You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2012/07/25 18:47:47 UTC
svn commit: r1365663 - in /qpid/proton/branches/rajith_sandbox:
examples/mailbox/ proton-j/src/org/apache/qpid/proton/driver/
proton-j/src/org/apache/qpid/proton/driver/impl/
proton-j/src/org/apache/qpid/proton/engine/impl/
proton-j/src/org/apache/qpid...
Author: rajith
Date: Wed Jul 25 16:47:46 2012
New Revision: 1365663
URL: http://svn.apache.org/viewvc?rev=1365663&view=rev
Log:
QPID-4132 Experimental Driver implementation for proton-j. This is
initial checkin focus on just getting it to work. A lot more
improvements are to follow.
Added:
qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/ConnectorFactory.java
- copied, changed from r1364816, qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/BytesTransport.java
qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/MailServer.java
qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/ServerConnectorFactory.java
- copied, changed from r1364816, qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/Application.java
qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/logging/
qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/logging/LogHandler.java
- copied, changed from r1364816, qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/ApplicationFactory.java
Removed:
qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/Application.java
qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/ApplicationFactory.java
qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/BytesTransport.java
qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/ConnectionTransport.java
qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/DelegatingTransport.java
qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/NIOAcceptingDriver.java
qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/NoddyBrokerApplication.java
Modified:
qpid/proton/branches/rajith_sandbox/examples/mailbox/fetch
qpid/proton/branches/rajith_sandbox/examples/mailbox/post
qpid/proton/branches/rajith_sandbox/examples/mailbox/server
qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/Connector.java
qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/Driver.java
qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/Listener.java
qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/DriverImpl.java
qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/ListenerImpl.java
qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/ServerConnectorImpl.java
qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/engine/impl/SaslImpl.java
qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/engine/impl/SaslServerImpl.java
qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/engine/impl/TransportImpl.java
Modified: qpid/proton/branches/rajith_sandbox/examples/mailbox/fetch
URL: http://svn.apache.org/viewvc/qpid/proton/branches/rajith_sandbox/examples/mailbox/fetch?rev=1365663&r1=1365662&r2=1365663&view=diff
==============================================================================
--- qpid/proton/branches/rajith_sandbox/examples/mailbox/fetch (original)
+++ qpid/proton/branches/rajith_sandbox/examples/mailbox/fetch Wed Jul 25 16:47:46 2012
@@ -77,6 +77,7 @@ class FetchClient(object):
# inform the engine about the connection, and link the driver to it.
self.conn = pn_connection()
+ pn_connection_set_container(self.conn, "fetch")
pn_connector_set_connection(self.cxtr, self.conn)
# create a session, and Link for receiving from the mailbox
Modified: qpid/proton/branches/rajith_sandbox/examples/mailbox/post
URL: http://svn.apache.org/viewvc/qpid/proton/branches/rajith_sandbox/examples/mailbox/post?rev=1365663&r1=1365662&r2=1365663&view=diff
==============================================================================
--- qpid/proton/branches/rajith_sandbox/examples/mailbox/post (original)
+++ qpid/proton/branches/rajith_sandbox/examples/mailbox/post Wed Jul 25 16:47:46 2012
@@ -76,6 +76,7 @@ class PostClient(object):
# inform the engine about the connection, and link the driver to it.
self.conn = pn_connection()
+ pn_connection_set_container(self.conn, "post")
pn_connector_set_connection(self.cxtr, self.conn)
# create a session, and Link for receiving from the mailbox
Modified: qpid/proton/branches/rajith_sandbox/examples/mailbox/server
URL: http://svn.apache.org/viewvc/qpid/proton/branches/rajith_sandbox/examples/mailbox/server?rev=1365663&r1=1365662&r2=1365663&view=diff
==============================================================================
--- qpid/proton/branches/rajith_sandbox/examples/mailbox/server (original)
+++ qpid/proton/branches/rajith_sandbox/examples/mailbox/server Wed Jul 25 16:47:46 2012
@@ -161,7 +161,9 @@ class MailboxServer(object):
state = pn_sasl_state(sasl)
if state == PN_SASL_PASS:
- pn_connector_set_connection(cxtr, pn_connection());
+ conn = pn_connection()
+ pn_connection_set_container(conn, "mail-server")
+ pn_connector_set_connection(cxtr, conn)
pn_connector_set_context(cxtr, CONNECTION_UP)
self.log("Authentication-PASSED")
elif state == PN_SASL_FAIL:
Modified: qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/Connector.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/Connector.java?rev=1365663&r1=1365662&r2=1365663&view=diff
==============================================================================
--- qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/Connector.java (original)
+++ qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/Connector.java Wed Jul 25 16:47:46 2012
@@ -1,77 +1,104 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.qpid.proton.driver;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Sasl;
+/**
+ * Client API
+ *
+ * @param <C> application supplied context
+ */
public interface Connector<C>
{
/** Service the given connector.
- *
- * Handle any inbound data, outbound data, or timing events pending on
- * the connector.
- *
- */
- void process();
-
- /** Access the listener which opened this connector.
- *
- * @return the listener which created this connector, or NULL if the
- * connector has no listener (e.g. an outbound client
- * connection)
- */
- Listener listener();
-
- /** Access the Authentication and Security context of the connector.
- *
- * @return the Authentication and Security context for the connector,
- * or NULL if none
- */
- Sasl sasl();
-
- /** Access the AMQP Connection associated with the connector.
- *
- * @return the connection context for the connector, or NULL if none
- */
- Connection getConnection();
-
- /** Assign the AMQP Connection associated with the connector.
- *
- * @param connection the connection to associate with the
- * connector
- */
- void setConnection(Connection connection);
-
- /** Access the application context that is associated with the
- * connector.
- *
- * @return the application context that was passed to pn_connector()
- * or pn_connector_fd()
- */
- C getContext();
-
- /** Assign a new application context to the connector.
- *
- * @param[in] context new application context to associate with the
- * connector
- */
- void setContext(C context);
-
- /** Close the socket used by the connector.
- *
- */
- void close();
-
- /** Determine if the connector is closed.
- *
- * @return True if closed, otherwise false
- */
- boolean isClosed();
-
- /** Destructor for the given connector.
- *
- * Assumes the connector's socket has been closed prior to call.
- *
- */
- void destroy();
-
+ *
+ * Handle any inbound data, outbound data, or timing events pending on
+ * the connector.
+ *
+ */
+ void process();
+
+ /** Access the listener which opened this connector.
+ *
+ * @return the listener which created this connector, or NULL if the
+ * connector has no listener (e.g. an outbound client
+ * connection).
+ */
+ @SuppressWarnings("rawtypes")
+ Listener listener();
+
+ /** Access the Authentication and Security context of the connector.
+ *
+ * @return the Authentication and Security context for the connector,
+ * or NULL if none.
+ */
+ Sasl sasl();
+
+ /** Access the AMQP Connection associated with the connector.
+ *
+ * @return the connection context for the connector, or NULL if none.
+ */
+ Connection getConnection();
+
+ /** Assign the AMQP Connection associated with the connector.
+ *
+ * @param connection the connection to associate with the connector.
+ */
+ void setConnection(Connection connection);
+
+ /**
+ * Access the application context that is associated with the connector.
+ *
+ * @return the application context that was passed when creating this
+ * connector. See
+ * {@link Driver#createConnector(String, int, Object)
+ * createConnector(String, int, Object)} and
+ * {@link Driver#createConnector(java.nio.channels.SelectableChannel, Object)
+ * createConnector(java.nio.channels.SelectableChannel, Object)}.
+ */
+ C getContext();
+
+ /** Assign a new application context to the connector.
+ *
+ * @param context new application context to associate with the connector
+ */
+ void setContext(C context);
+
+ /** Close the socket used by the connector.
+ *
+ */
+ void close();
+
+ /** Determine if the connector is closed.
+ *
+ * @return True if closed, otherwise false
+ */
+ boolean isClosed();
+
+ /** Destructor for the given connector.
+ *
+ * Assumes the connector's socket has been closed prior to call.
+ *
+ */
+ void destroy();
}
Modified: qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/Driver.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/Driver.java?rev=1365663&r1=1365662&r2=1365663&view=diff
==============================================================================
--- qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/Driver.java (original)
+++ qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/Driver.java Wed Jul 25 16:47:46 2012
@@ -1,88 +1,123 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
package org.apache.qpid.proton.driver;
import java.nio.channels.SelectableChannel;
import java.nio.channels.ServerSocketChannel;
+/**
+ * The Driver interface provides an abstraction for an implementation of
+ * a driver for the proton engine.
+ * A driver is responsible for providing input, output, and tick events,
+ * to the bottom half of the engine API. TODO See ::pn_input, ::pn_output, and
+ * ::pn_tick.
+ * The driver also provides an interface for the application to access,
+ * the top half of the API when the state of the engine may have changed
+ * due to I/O or timing events. Additionally the driver incorporates the SASL
+ * engine as well in order to provide a complete network stack: AMQP over SASL
+ * over TCP.
+ */
+
public interface Driver
{
- /** Force pn_driver_wait() to return
+ /**
+ * Force wait() to return
*
*/
void wakeup();
- /** Wait for an active connector or listener
+ /**
+ * Wait for an active connector or listener
*
- * @param[in] timeout maximum time in milliseconds to wait, -1 means
- * infinite wait
+ * @param timeout maximum time in milliseconds to wait.
+ * 0 means infinite wait
*/
void doWait(int timeout);
- /** Get the next listener with pending data in the driver.
+ /**
+ * Get the next listener with pending data in the driver.
*
* @return NULL if no active listener available
*/
+ @SuppressWarnings("rawtypes")
Listener listener();
- /** Get the next active connector in the driver.
+ /**
+ * Get the next active connector in the driver.
*
- * Returns the next connector with pending inbound data, available
- * capacity for outbound data, or pending tick.
+ * Returns the next connector with pending inbound data, available capacity
+ * for outbound data, or pending tick.
*
* @return NULL if no active connector available
*/
+ @SuppressWarnings("rawtypes")
Connector connector();
- /** Destruct the driver and all associated
- * listeners and connectors.
+ /**
+ * Destruct the driver and all associated listeners, connectors and other resources.
*/
void destroy();
-
-
- /** Construct a listener for the given address.
+ /**
+ * Construct a listener for the given address.
*
* @param host local host address to listen on
* @param port local port to listen on
* @param context application-supplied, can be accessed via
- * pn_listener_context()
+ * {@link Listener#getContext() getContext()} method on a listener.
* @return a new listener on the given host:port, NULL if error
*/
<C> Listener<C> createListener(String host, int port, C context);
- /** Create a listener using the existing channel.
+ /**
+ * Create a listener using the existing channel.
*
- * @param c existing file descriptor for listener to listen on
+ * @param c existing SocketChannel for listener to listen on
* @param context application-supplied, can be accessed via
- * pn_listener_context()
+ * {@link Listener#getContext() getContext()} method on a listener.
* @return a new listener on the given channel, NULL if error
*/
- <C> Listener<C> createListener(ServerSocketChannel c, C context);
+ <C> Listener<C> createListener(ServerSocketChannel c, C context);
-
- /** Construct a connector to the given remote address.
+ /**
+ * Construct a connector to the given remote address.
*
* @param host remote host to connect to.
* @param port remote port to connect to.
- * @param context application supplied, can be accessed via
- * pn_connector_context() @return a new connector
- * to the given remote, or NULL on error.
+ * @param context application-supplied, can be accessed via
+ * {@link Connector#getContext() getContext()} method on a listener.
+ *
+ * @return a new connector to the given remote, or NULL on error.
*/
<C> Connector<C> createConnector(String host, int port, C context);
- /** Create a connector using the existing file descriptor.
+ /**
+ * Create a connector using the existing file descriptor.
*
- * @param fd existing file descriptor to use for this connector.
+ * @param c existing SocketChannel for listener to listen on
* @param context application-supplied, can be accessed via
- * pn_connector_context()
+ * {@link Connector#getContext() getContext()} method on a listener.
+ *
* @return a new connector to the given host:port, NULL if error.
*/
<C> Connector<C> createConnector(SelectableChannel fd, C context);
-
- /** Set the tracing level for the given connector.
- *
- * @param trace the trace level to use.
- */
- //void pn_connector_trace(pn_connector_t *connector, pn_trace_t trace);
-
-
}
Modified: qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/Listener.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/Listener.java?rev=1365663&r1=1365662&r2=1365663&view=diff
==============================================================================
--- qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/Listener.java (original)
+++ qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/Listener.java Wed Jul 25 16:47:46 2012
@@ -1,44 +1,61 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.qpid.proton.driver;
+/**
+ * Server API.
+ *
+ * @param <C> application supplied context
+ */
public interface Listener<C>
{
- /** pn_listener - the server API **/
-
-
/**
- * @todo pn_listener_trace needs documentation
- */
- //void pn_listener_trace(pn_listener_t *listener, pn_trace_t trace);
-
- /** Accept a connection that is pending on the listener.
+ * Accept a connection that is pending on the listener.
*
- * @param[in] listener the listener to accept the connection on
- * @return a new connector for the remote, or NULL on error
+ * @return a new connector for the remote, or NULL on error.
*/
- Connector accept();
+ Connector<C> accept();
- /** Access the application context that is associated with the listener.
+ /**
+ * Access the application context that is associated with the listener.
*
- * @param[in] listener the listener whose context is to be returned
- * @return the application context that was passed to pn_listener() or
- * pn_listener_fd()
+ * @return the application context that was passed when creating this
+ * listener. See {@link Driver#createListener(String, int, Object)
+ * createListener(String, int, Object)} and
+ * {@link Driver#createConnector(java.nio.channels.SelectableChannel, Object)
+ * createConnector(java.nio.channels.SelectableChannel, Object)}
*/
C getContext();
- /** Close the socket used by the listener.
+ /**
+ * Close the socket used by the listener.
*
- * @param[in] listener the listener whose socket will be closed.
*/
void close();
- /** Destructor for the given listener.
+ /**
+ * Destructor for the given listener.
*
* Assumes the listener's socket has been closed prior to call.
*
- * @param[in] listener the listener object to destroy, no longer valid
- * on return
*/
void destroy();
-
-
}
Copied: qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/ConnectorFactory.java (from r1364816, qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/BytesTransport.java)
URL: http://svn.apache.org/viewvc/qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/ConnectorFactory.java?p2=qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/ConnectorFactory.java&p1=qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/BytesTransport.java&r1=1364816&r2=1365663&rev=1365663&view=diff
==============================================================================
--- qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/BytesTransport.java (original)
+++ qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/ConnectorFactory.java Wed Jul 25 16:47:46 2012
@@ -18,12 +18,14 @@
* under the License.
*
*/
+package org.apache.qpid.proton.driver.impl;
-package org.apache.qpid.proton.driver;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
-public interface BytesTransport
-{
- int input(byte[] bytes, int offset, int size);
+import org.apache.qpid.proton.driver.Connector;
- int output(byte[] bytes, int offset, int size);
+public interface ConnectorFactory
+{
+ <C> Connector<C> createConnector(DriverImpl driver, SocketChannel sc, C context, SelectionKey key);
}
Modified: qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/DriverImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/DriverImpl.java?rev=1365663&r1=1365662&r2=1365663&view=diff
==============================================================================
--- qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/DriverImpl.java (original)
+++ qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/DriverImpl.java Wed Jul 25 16:47:46 2012
@@ -1,23 +1,55 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.qpid.proton.driver.impl;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
-import java.nio.channels.*;
+import java.net.Socket;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
import java.util.Collections;
import java.util.Iterator;
import java.util.Set;
+
import org.apache.qpid.proton.driver.Connector;
import org.apache.qpid.proton.driver.Driver;
import org.apache.qpid.proton.driver.Listener;
+import org.apache.qpid.proton.logging.LogHandler;
public class DriverImpl implements Driver
{
private Selector _selector;
private Set<SelectionKey> _selectedKeys = Collections.emptySet();
+ private ConnectorFactory _connectorFactory;
+ private LogHandler _logger;
- public DriverImpl() throws IOException
+ public DriverImpl(ConnectorFactory connectorFactory, LogHandler logger) throws IOException
{
+ _connectorFactory = connectorFactory;
+ _logger = logger;
_selector = Selector.open();
}
@@ -35,10 +67,12 @@ public class DriverImpl implements Drive
}
catch (IOException e)
{
- e.printStackTrace(); // TODO - Implement
+ _logger.error(e, "Exception when waiting for IO Event");
+ throw new RuntimeException(e);
}
}
+ @SuppressWarnings("rawtypes")
public Listener listener()
{
Listener listener = null;
@@ -51,7 +85,8 @@ public class DriverImpl implements Drive
}
catch (IOException e)
{
- e.printStackTrace(); // TODO - Implement
+ _logger.error(e, "Exception when selecting");
+ throw new RuntimeException(e);
}
listener = getFirstListener();
}
@@ -64,6 +99,7 @@ public class DriverImpl implements Drive
_selectedKeys = _selector.selectedKeys();
}
+ @SuppressWarnings("rawtypes")
private Listener getFirstListener()
{
Iterator<SelectionKey> selectedIter = _selectedKeys.iterator();
@@ -74,14 +110,13 @@ public class DriverImpl implements Drive
selectedIter.remove();
if(key.isAcceptable())
{
- selectedIter.remove();
return (Listener) key.attachment();
-
}
}
return null;
}
+ @SuppressWarnings("rawtypes")
public Connector connector()
{
Connector connector = null;
@@ -94,13 +129,15 @@ public class DriverImpl implements Drive
}
catch (IOException e)
{
- e.printStackTrace(); // TODO - Implement
+ _logger.error(e, "Exception when selecting");
+ throw new RuntimeException(e);
}
connector = getFirstConnector();
}
return connector;
}
+ @SuppressWarnings("rawtypes")
private Connector getFirstConnector()
{
Iterator<SelectionKey> selectedIter = _selectedKeys.iterator();
@@ -111,7 +148,6 @@ public class DriverImpl implements Drive
selectedIter.remove();
if(key.isReadable() || key.isWritable())
{
- selectedIter.remove();
return (Connector) key.attachment();
}
@@ -122,7 +158,15 @@ public class DriverImpl implements Drive
public void destroy()
{
- //TODO - Implement
+ try
+ {
+ _selector.close();
+ }
+ catch (IOException e)
+ {
+ _logger.error(e, "Exception when closing selector");
+ throw new RuntimeException(e);
+ }
}
public <C> Listener<C> createListener(String host, int port, C context)
@@ -150,8 +194,9 @@ public class DriverImpl implements Drive
{
try
{
- c.register(_selector, SelectionKey.OP_ACCEPT);
- return new ListenerImpl<C>(this, c, context);
+ Listener<C> l = new ListenerImpl<C>(this, c, context);
+ c.register(_selector, SelectionKey.OP_ACCEPT,l);
+ return l;
}
catch (ClosedChannelException e)
{
@@ -162,11 +207,41 @@ public class DriverImpl implements Drive
public <C> Connector<C> createConnector(String host, int port, C context)
{
- return null; //TODO - Implement
+ try
+ {
+ SocketChannel channel = SocketChannel.open();
+ channel.configureBlocking(false);
+ Socket socket = channel.socket();
+ socket.bind(new InetSocketAddress(host, port));
+ return createConnector(channel, context);
+ }
+ catch (IOException e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ }
+
+ public <C> Connector<C> createConnector(SelectableChannel c, C context)
+ {
+ try
+ {
+ int opKeys = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
+ SelectionKey key = c.register(_selector, opKeys);
+ Connector<C> co = _connectorFactory.createConnector(this, (SocketChannel)c, context, key);
+ key.attach(co);
+ return co;
+ }
+ catch (ClosedChannelException e)
+ {
+ e.printStackTrace(); // TODO - Implement
+ throw new RuntimeException(e);
+ }
}
- public <C> Connector<C> createConnector(SelectableChannel fd, C context)
+ protected LogHandler getLogHandler()
{
- return null; //TODO - Implement
+ return _logger;
}
}
Modified: qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/ListenerImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/ListenerImpl.java?rev=1365663&r1=1365662&r2=1365663&view=diff
==============================================================================
--- qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/ListenerImpl.java (original)
+++ qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/ListenerImpl.java Wed Jul 25 16:47:46 2012
@@ -1,3 +1,23 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.qpid.proton.driver.impl;
import java.io.IOException;
@@ -27,7 +47,7 @@ class ListenerImpl<C> implements Listene
if(c != null)
{
c.configureBlocking(false);
- return new ServerConnectorImpl(_driver,c);
+ return _driver.createConnector(c, _context);
}
}
catch (IOException e)
Added: qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/MailServer.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/MailServer.java?rev=1365663&view=auto
==============================================================================
--- qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/MailServer.java (added)
+++ qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/MailServer.java Wed Jul 25 16:47:46 2012
@@ -0,0 +1,430 @@
+package org.apache.qpid.proton.driver.impl;
+
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.qpid.proton.driver.Connector;
+import org.apache.qpid.proton.driver.Driver;
+import org.apache.qpid.proton.driver.Listener;
+import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.EndpointState;
+import org.apache.qpid.proton.engine.Link;
+import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.engine.Sasl;
+import org.apache.qpid.proton.engine.Sasl.SaslOutcome;
+import org.apache.qpid.proton.engine.Sasl.SaslState;
+import org.apache.qpid.proton.engine.Sender;
+import org.apache.qpid.proton.engine.Session;
+import org.apache.qpid.proton.engine.impl.ConnectionImpl;
+import org.apache.qpid.proton.engine.impl.SaslServerImpl;
+import org.apache.qpid.proton.logging.LogHandler;
+import org.apache.qpid.proton.type.transport.DeliveryState;
+
+public class MailServer
+{
+ enum State {NEW,AUTHENTICATING,CONNECTION_UP, FAILED};
+
+ private Driver _driver;
+ private LogHandler _logger;
+ private Listener<State> _listener;
+ private int _counter;
+ private Map<String,List<byte[]>> _mailboxes = new HashMap<String,List<byte[]>>();
+
+ public MailServer() throws Exception
+ {
+ _logger = new Logger();
+ _driver = new DriverImpl(new ServerConnectorFactory(),_logger);
+ _listener = _driver.createListener("localhost", 5672, State.NEW);
+ }
+
+ public void doWait()
+ {
+ _driver.doWait(0);
+ }
+
+ public void acceptConnections()
+ {
+ // We have only one listener
+ if (_driver.listener() != null)
+ {
+ _logger.info("Accepting Connection.");
+ Connector<State> ctor = _listener.accept();
+ ctor.setContext(State.AUTHENTICATING);
+ }
+ }
+
+ public void processConnections() throws Exception
+ {
+ Connector<State> ctor = _driver.connector();
+ while (ctor != null)
+ {
+ // process any data coming from the network, this will update the
+ // engine's view of the state of the remote clients
+ ctor.process();
+ State state = ctor.getContext();
+ if (state == State.AUTHENTICATING)
+ {
+ //connection has not passed SASL authentication yet
+ authenticateConnector(ctor);
+ }
+ else if (state == State.CONNECTION_UP)
+ {
+ //active connection, service any engine events
+ serviceConnector(ctor);
+ }
+ else
+ {
+ _logger.error("Unknown state.");
+ }
+
+ // now generate any outbound network data generated in response to
+ // any work done by the engine.
+ ctor.process();
+
+ ctor = _driver.connector();
+ }
+ }
+
+ private void authenticateConnector(Connector<State> ctor) throws Exception
+ {
+ _logger.info("Authenticating...");
+ Sasl sasl = ctor.sasl();
+ SaslState state = sasl.getState();
+ while (state == SaslState.PN_SASL_CONF || state == SaslState.PN_SASL_STEP)
+ {
+ if (state == SaslState.PN_SASL_CONF)
+ {
+ _logger.info("Authenticating-CONF...");
+ sasl.setMechanisms(new String[]{"ANONYMOUS"});
+ }
+ else if (state == SaslState.PN_SASL_STEP)
+ {
+ _logger.info("Authenticating-STEP...");
+ String[] mechs = sasl.getRemoteMechanisms();
+ if (mechs[0] == "ANONYMOUS")
+ {
+ ((SaslServerImpl)sasl).done(SaslOutcome.PN_SASL_OK);
+ }
+ else
+ {
+ //sasl.(sasl, PN_SASL_AUTH)
+ }
+ }
+ state = sasl.getState();
+ }
+
+ if (state == SaslState.PN_SASL_PASS)
+ {
+ ctor.setConnection(new ConnectionImpl());
+ ctor.setContext(State.CONNECTION_UP);
+ _logger.info("Authentication-PASSED");
+ }
+ else if (state == SaslState.PN_SASL_FAIL)
+ {
+ ctor.setContext(State.FAILED);
+ ctor.close();
+ _logger.info("Authentication-FAILED");
+ }
+ else
+ {
+ _logger.info("Authentication-PENDING");
+ }
+
+ }
+
+ private void serviceConnector(Connector<State> ctor) throws Exception
+ {
+ Connection con = ctor.getConnection();
+
+ // Step 1: setup the engine's connection, and any sessions and links
+ // that may be pending.
+
+ // initialize the connection if it's new
+ if (con.getLocalState() == EndpointState.UNINITIALIZED)
+ {
+ con.open();
+ _logger.debug("Connection Opened.");
+ }
+
+ // open all pending sessions
+ Session ssn = con.sessionHead(EnumSet.of(EndpointState.UNINITIALIZED),
+ EnumSet.of(EndpointState.UNINITIALIZED,EndpointState.ACTIVE));
+ while (ssn != null)
+ {
+
+ ssn.open();
+ _logger.debug("Session Opened.");
+ ssn = con.sessionHead(EnumSet.of(EndpointState.UNINITIALIZED),
+ EnumSet.of(EndpointState.UNINITIALIZED,EndpointState.ACTIVE));
+ }
+
+ // configure and open any pending links
+ Link link = con.linkHead(EnumSet.of(EndpointState.UNINITIALIZED),
+ EnumSet.of(EndpointState.UNINITIALIZED,EndpointState.ACTIVE));
+ while (link != null)
+ {
+
+ setupLink(link);
+ _logger.debug("Link Opened.");
+ link = con.linkHead(EnumSet.of(EndpointState.UNINITIALIZED),
+ EnumSet.of(EndpointState.UNINITIALIZED,EndpointState.ACTIVE));
+ }
+
+ // Step 2: Now drain all the pending deliveries from the connection's
+ // work queue and process them
+
+ Delivery delivery = con.getWorkHead();
+ while (delivery != null)
+ {
+ _logger.debug("Process delivery " + delivery.getTag());
+
+ if (delivery.isReadable()) // inbound data available
+ {
+ processReceive(delivery);
+ }
+ else if (delivery.isWritable()) // can send a message
+ {
+ sendMessage(delivery);
+ }
+
+ // check to see if the remote has accepted message we sent
+ if (delivery.remotelySettled())
+ {
+ // once we know the remote has seen the message, we can
+ // release the delivery.
+ delivery.settle();
+ }
+
+ delivery = con.getWorkHead();
+ }
+
+ // Step 3: Clean up any links or sessions that have been closed by the
+ // remote. If the connection has been closed remotely, clean that up also.
+
+ // teardown any terminating links
+ link = con.linkHead(EnumSet.of(EndpointState.ACTIVE),
+ EnumSet.of(EndpointState.CLOSED));
+ while (link != null)
+ {
+ link.close();
+ _logger.debug("Link Closed");
+ link = con.linkHead(EnumSet.of(EndpointState.ACTIVE),
+ EnumSet.of(EndpointState.CLOSED));
+ }
+
+ // teardown any terminating sessions
+ ssn = con.sessionHead(EnumSet.of(EndpointState.ACTIVE),
+ EnumSet.of(EndpointState.CLOSED));
+ while (ssn != null)
+ {
+ ssn.close();
+ _logger.debug("Session Closed");
+ ssn = con.sessionHead(EnumSet.of(EndpointState.ACTIVE),
+ EnumSet.of(EndpointState.CLOSED));
+ }
+
+ // teardown the connection if it's terminating
+ if (con.getRemoteState() == EndpointState.CLOSED)
+ {
+ _logger.debug("Connection Closed");
+ con.close();
+ }
+ }
+
+ private void setupLink(Link link)
+ {
+ String src = link.getRemoteSourceAddress();
+ String target = link.getRemoteTargetAddress();
+
+ if (link instanceof Sender)
+ {
+ _logger.debug("Opening Link to read from mailbox: " + src);
+ if (!_mailboxes.containsKey(src))
+ {
+ _logger.error("Error: mailbox " + src + " does not exist!");
+ // TODO report error.
+ }
+ }
+ else
+ {
+ _logger.debug("Opening Link to write from mailbox: " + target);
+ if (!_mailboxes.containsKey(target))
+ {
+ _mailboxes.put(target, new ArrayList<byte[]>());
+ }
+ }
+
+ link.setLocalSourceAddress(src);
+ link.setLocalTargetAddress(target);
+
+ if (link instanceof Sender)
+ {
+ // grant a delivery to the link - it will become "writable" when the
+ // driver can accept messages for the sender.
+ String id = "server-delivery-" + _counter;
+ link.delivery(id.getBytes(),0,id.getBytes().length);
+ _counter++;
+ }
+ else
+ {
+ // Grant enough credit to the receiver to allow one inbound message
+ ((Receiver)link).flow(1);
+ }
+ link.open();
+ }
+
+ private void processReceive(Delivery d)
+ {
+ Receiver rec = (Receiver)d.getLink();
+ String mailboxName = rec.getRemoteTargetAddress();
+ List<byte[]> mailbox;
+ if (!_mailboxes.containsKey(mailboxName))
+ {
+ _logger.error("Error: cannot sent to mailbox " + mailboxName + " - dropping message.");
+ }
+ else
+ {
+ mailbox = _mailboxes.get(mailboxName);
+ byte[] readBuf = new byte[1024];
+ int bytesRead = rec.recv(readBuf, 0, readBuf.length);
+ ByteArrayOutputStream bout = new ByteArrayOutputStream();
+ while (bytesRead > 0)
+ {
+ bout.write(readBuf, 0, bytesRead);
+ bytesRead = rec.recv(readBuf, 0, readBuf.length);
+ }
+ mailbox.add(bout.toByteArray());
+ }
+
+ d.disposition(null);
+ d.settle();
+ rec.advance();
+ if (rec.getCredit() == 0)
+ {
+ rec.flow(1);
+ }
+ }
+
+ private void sendMessage(Delivery d)
+ {
+ Sender sender = (Sender)d.getLink();
+ String mailboxName = sender.getRemoteSourceAddress();
+ _logger.error("Request for Mailbox : " + mailboxName);
+ byte[] msg;
+ if (_mailboxes.containsKey(mailboxName))
+ {
+ msg = _mailboxes.get(mailboxName).remove(0);
+ _logger.debug("Fetching message " + new String(msg));
+ }
+ else
+ {
+ _logger.debug("Warning: mailbox " + mailboxName + " is empty, sending empty message.");
+ msg = "".getBytes();
+ }
+ sender.send(msg, 0, msg.length);
+ if (sender.advance())
+ {
+ String id = "server-delivery-" + _counter;
+ sender.delivery(id.getBytes(),0,id.getBytes().length);
+ _counter++;
+ }
+ }
+
+ public static void main(String[] args) throws Exception
+ {
+ MailServer server = new MailServer();
+ while (true)
+ {
+ server.doWait();
+ server.acceptConnections();
+ server.processConnections();
+ }
+ }
+
+ class Logger implements LogHandler
+ {
+ static final String prefix = "TestServer: ";
+
+ @Override
+ public boolean isTraceEnabled()
+ {
+ return true;
+ }
+
+ @Override
+ public void trace(String message)
+ {
+ System.out.println(prefix + message);
+ }
+
+ @Override
+ public boolean isDebugEnabled()
+ {
+ return true;
+ }
+
+ @Override
+ public void debug(String message)
+ {
+ System.out.println(prefix + "DEBUG : " + message);
+ }
+
+ @Override
+ public void debug(Throwable t, String message)
+ {
+ System.out.println(prefix + "DEBUG : " + message);
+ t.printStackTrace();
+ }
+
+ @Override
+ public boolean isInfoEnabled()
+ {
+ return true;
+ }
+
+ @Override
+ public void info(String message)
+ {
+ System.out.println(prefix + "INFO : " + message);
+ }
+
+ @Override
+ public void info(Throwable t, String message)
+ {
+ System.out.println(prefix + "INFO : " + message);
+ t.printStackTrace();
+ }
+
+ @Override
+ public void warn(String message)
+ {
+ System.out.println(prefix + "WARN : " + message);
+ }
+
+ @Override
+ public void warn(Throwable t, String message)
+ {
+ System.out.println(prefix + "INFO : " + message);
+ t.printStackTrace();
+ }
+
+ @Override
+ public void error(String message)
+ {
+ System.out.println(prefix + "ERROR : " + message);
+ }
+
+ @Override
+ public void error(Throwable t, String message)
+ {
+ System.out.println(prefix + "INFO : " + message);
+ t.printStackTrace();
+ }
+
+ }
+}
Copied: qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/ServerConnectorFactory.java (from r1364816, qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/Application.java)
URL: http://svn.apache.org/viewvc/qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/ServerConnectorFactory.java?p2=qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/ServerConnectorFactory.java&p1=qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/Application.java&r1=1364816&r2=1365663&rev=1365663&view=diff
==============================================================================
--- qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/Application.java (original)
+++ qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/ServerConnectorFactory.java Wed Jul 25 16:47:46 2012
@@ -18,12 +18,20 @@
* under the License.
*
*/
+package org.apache.qpid.proton.driver.impl;
-package org.apache.qpid.proton.driver;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
-import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.driver.Connector;
+import org.apache.qpid.proton.driver.Driver;
-public interface Application
+public class ServerConnectorFactory implements ConnectorFactory
{
- void process(Connection conn);
+ @Override
+ public <C> Connector<C> createConnector(DriverImpl driver, SocketChannel sc, C context, SelectionKey key)
+ {
+ return new ServerConnectorImpl<C>(driver, sc,context, key);
+ }
+
}
Modified: qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/ServerConnectorImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/ServerConnectorImpl.java?rev=1365663&r1=1365662&r2=1365663&view=diff
==============================================================================
--- qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/ServerConnectorImpl.java (original)
+++ qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/ServerConnectorImpl.java Wed Jul 25 16:47:46 2012
@@ -1,28 +1,205 @@
package org.apache.qpid.proton.driver.impl;
+import static org.apache.qpid.proton.driver.impl.ServerConnectorImpl.ConnectorState.NEW;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
+
import org.apache.qpid.proton.driver.Connector;
import org.apache.qpid.proton.driver.Listener;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Sasl;
import org.apache.qpid.proton.engine.impl.SaslServerImpl;
+import org.apache.qpid.proton.logging.LogHandler;
-class ServerConnectorImpl implements Connector
+class ServerConnectorImpl<C> implements Connector<C>
{
+ public static int END_OF_STREAM = -1;
+ private static int DEFAULT_BUFFER_SIZE = 64 * 1024;
+ private static int readBufferSize = Integer.getInteger
+ ("pn.receive_buffer_size", DEFAULT_BUFFER_SIZE);
+ private static int writeBufferSize = Integer.getInteger
+ ("pn.send_buffer_size", DEFAULT_BUFFER_SIZE);
+
+ enum ConnectorState {NEW, OPENED, CLOSED};
+
private final SaslServerImpl _sasl;
- private DriverImpl _driver;
- private SocketChannel _channel;
+ private final DriverImpl _driver;
+ private final SocketChannel _channel;
+ private final LogHandler _logger;
+ private C _context;
+ private Connection _connection;
+ private SelectionKey _key;
+ private ConnectorState _state = NEW;
+
+ private ByteBuffer _readBuffer = ByteBuffer.allocate(readBufferSize);
+ private int _bytesNotRead = 0;
+
+ private int _bytesNotWritten = 0;
+ private ByteBuffer _writeBuffer = ByteBuffer.allocate(writeBufferSize);
- ServerConnectorImpl(DriverImpl driver, SocketChannel c)
+ ServerConnectorImpl(DriverImpl driver, SocketChannel c, C context, SelectionKey key)
{
_driver = driver;
_channel = c;
_sasl = new SaslServerImpl();
+ _sasl.setMechanisms(new String[]{"ANONYMOUS"}); //TODO
+ _logger = driver.getLogHandler();
+ _context = context;
+ _key = key;
}
public void process()
{
- //TODO - Implement
+ if (!_channel.isOpen())
+ {
+ return;
+ }
+
+ if (_key.isReadable())
+ {
+ read();
+ }
+
+ if (_key.isWritable())
+ {
+ write();
+ }
+ }
+
+ int processInput(byte[] bytes, int offset, int size)
+ {
+ int read = 0;
+ while (read < size)
+ {
+ switch (_state)
+ {
+ case NEW:
+ read += readSasl(bytes, offset, size);
+ writeSasl();
+ break;
+ case OPENED:
+ read += readAMQPCommands(bytes, offset, size);
+ writeAMQPCommands();
+ break;
+ }
+ }
+ return read;
+ }
+
+ void processOutput()
+ {
+ switch (_state)
+ {
+ case NEW:
+ writeSasl();
+ break;
+ case OPENED:
+ writeAMQPCommands();
+ break;
+ }
+ }
+
+ private int readAMQPCommands(byte[] bytes, int offset, int size)
+ {
+ int consumed = _connection.transport().input(bytes, offset, size);
+ if (consumed == END_OF_STREAM)
+ {
+ return size;
+ }
+ else
+ {
+ return consumed;
+ }
+ }
+
+ private void writeAMQPCommands()
+ {
+ int size = _writeBuffer.array().length - _bytesNotWritten;
+ _bytesNotWritten += _connection.transport().output(_writeBuffer.array(),
+ _bytesNotWritten, size);
+ }
+
+ private void setState(ConnectorState newState)
+ {
+ _state = newState;
+ }
+
+ int readSasl(byte[] bytes, int offset, int size)
+ {
+ int consumed = _sasl.input(bytes, offset, size);
+ if (consumed == END_OF_STREAM)
+ {
+ return size;
+ }
+ else
+ {
+ return consumed;
+ }
+ }
+
+ void writeSasl()
+ {
+ int size = _writeBuffer.array().length - _bytesNotWritten;
+ _bytesNotWritten += _sasl.output(_writeBuffer.array(),
+ _bytesNotWritten, size);
+ }
+
+ void write()
+ {
+ try
+ {
+ processOutput();
+ if (_bytesNotWritten > 0)
+ {
+ _writeBuffer.limit(_bytesNotWritten);
+ int written = _channel.write(_writeBuffer);
+ if (_writeBuffer.hasRemaining())
+ {
+ _writeBuffer.compact();
+ _bytesNotWritten = _bytesNotWritten - written;
+ }
+ else
+ {
+ _writeBuffer.clear();
+ _bytesNotWritten = 0;
+ }
+ }
+ }
+ catch (IOException e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+ void read()
+ {
+ try
+ {
+ int bytesRead = _channel.read(_readBuffer);
+ int consumed = 0;
+ while (bytesRead > 0)
+ {
+ consumed = processInput(_readBuffer.array(), 0, bytesRead + _bytesNotRead);
+ if (consumed < bytesRead)
+ {
+ _readBuffer.compact();
+ _bytesNotRead = bytesRead - consumed;
+ }
+ else
+ {
+ _readBuffer.rewind();
+ _bytesNotRead = 0;
+ }
+ bytesRead = _channel.read(_readBuffer);
+ }
+ }
+ catch (IOException e)
+ {
+ e.printStackTrace();
+ }
}
public Listener listener()
@@ -32,41 +209,63 @@ class ServerConnectorImpl implements Con
public Sasl sasl()
{
- return null; //TODO - Implement
+ return _sasl;
}
public Connection getConnection()
{
- return null; //TODO - Implement
+ return _connection;
}
public void setConnection(Connection connection)
{
- //TODO - Implement
+ if (_sasl.isDone())
+ {
+ writeSasl();
+ }
+ else
+ {
+ throw new RuntimeException("Cannot set the connection before authentication is completed");
+ }
+
+ _connection = connection;
+ // write any initial data
+ int size = _writeBuffer.array().length - _bytesNotWritten;
+ _bytesNotWritten += _connection.transport().output(_writeBuffer.array(),
+ _bytesNotWritten, size);
+ setState(ConnectorState.OPENED);
}
- public Object getContext()
+ public C getContext()
{
- return null; //TODO - Implement
+ return _context;
}
- public void setContext(Object context)
+ public void setContext(C context)
{
- //TODO - Implement
+ _context = context;
}
public void close()
{
- //TODO - Implement
+ try
+ {
+ writeSasl();
+ _channel.close();
+ }
+ catch (IOException e)
+ {
+
+ }
}
public boolean isClosed()
{
- return false; //TODO - Implement
+ return !_channel.isOpen();
}
public void destroy()
{
- //TODO - Implement
+ close();
}
}
Modified: qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/engine/impl/SaslImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/engine/impl/SaslImpl.java?rev=1365663&r1=1365662&r2=1365663&view=diff
==============================================================================
--- qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/engine/impl/SaslImpl.java (original)
+++ qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/engine/impl/SaslImpl.java Wed Jul 25 16:47:46 2012
@@ -69,6 +69,7 @@ public abstract class SaslImpl implement
if(!_overflowBuffer.hasRemaining())
{
_overflowBuffer.clear();
+ _overflowBuffer.limit(0);
CompositeWritableBuffer outputBuffer =
new CompositeWritableBuffer(
Modified: qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/engine/impl/SaslServerImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/engine/impl/SaslServerImpl.java?rev=1365663&r1=1365662&r2=1365663&view=diff
==============================================================================
--- qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/engine/impl/SaslServerImpl.java (original)
+++ qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/engine/impl/SaslServerImpl.java Wed Jul 25 16:47:46 2012
@@ -40,6 +40,7 @@ public class SaslServerImpl extends Sasl
mechanisms.setSaslServerMechanisms(_mechanisms);
written += writeFrame(outputBuffer, mechanisms);
+ _state = SaslState.PN_SASL_STEP;
}
else if(getChallengeResponse() != null)
{
@@ -97,11 +98,13 @@ public class SaslServerImpl extends Sasl
{
//TODO - Implement
// error - should only be sent server -> client
+ System.out.println(saslMechanisms);
}
public void handleInit(SaslInit saslInit, Binary payload, Void context)
{
_hostname = saslInit.getHostname();
+ setMechanism(saslInit.getMechanism());
if(saslInit.getInitialResponse() != null)
{
setPending(saslInit.getInitialResponse().asByteBuffer());
@@ -112,6 +115,7 @@ public class SaslServerImpl extends Sasl
{
//TODO - Implement
// error - should only be sent server -> client
+ System.out.println(saslChallenge);
}
public void handleResponse(SaslResponse saslResponse, Binary payload, Void context)
@@ -125,6 +129,7 @@ public class SaslServerImpl extends Sasl
{
//TODO - Implement
// error - should only be sent server -> client
+ System.out.println(saslOutcome);
}
@Override
Modified: qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/engine/impl/TransportImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/engine/impl/TransportImpl.java?rev=1365663&r1=1365662&r2=1365663&view=diff
==============================================================================
--- qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/engine/impl/TransportImpl.java (original)
+++ qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/engine/impl/TransportImpl.java Wed Jul 25 16:47:46 2012
@@ -142,7 +142,6 @@ public class TransportImpl extends Endpo
written += processAttach(outputBuffer);
written += processReceiverFlow(outputBuffer);
written += processReceiverDisposition(outputBuffer);
- written += processReceiverFlow(outputBuffer); // TODO
written += processMessageData(outputBuffer);
written += processSenderDisposition(outputBuffer);
written += processSenderFlow(outputBuffer);
@@ -448,7 +447,7 @@ public class TransportImpl extends Endpo
}
}
}
- endpoint = endpoint.getNext();
+ endpoint = endpoint.transportNext();
}
endpoint = _connectionEndpoint.getTransportHead();
while(endpoint != null && buffer.remaining() >= _maxFrameSize)
@@ -540,7 +539,7 @@ public class TransportImpl extends Endpo
}
}
- endpoint = endpoint.getNext();
+ endpoint = endpoint.transportNext();
}
return written;
}
Copied: qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/logging/LogHandler.java (from r1364816, qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/ApplicationFactory.java)
URL: http://svn.apache.org/viewvc/qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/logging/LogHandler.java?p2=qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/logging/LogHandler.java&p1=qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/ApplicationFactory.java&r1=1364816&r2=1365663&rev=1365663&view=diff
==============================================================================
--- qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/ApplicationFactory.java (original)
+++ qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/logging/LogHandler.java Wed Jul 25 16:47:46 2012
@@ -18,10 +18,31 @@
* under the License.
*
*/
+package org.apache.qpid.proton.logging;
-package org.apache.qpid.proton.driver;
-
-public interface ApplicationFactory
+public interface LogHandler
{
- Application createApplication();
+ public boolean isTraceEnabled();
+
+ public void trace(String message);
+
+ public boolean isDebugEnabled();
+
+ public void debug(String message);
+
+ public void debug(Throwable t, String message);
+
+ public boolean isInfoEnabled();
+
+ public void info(String message);
+
+ public void info(Throwable t, String message);
+
+ public void warn(String message);
+
+ public void warn(Throwable t, String message);
+
+ public void error(String message);
+
+ public void error(Throwable t, String message);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org