You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2012/07/25 18:47:47 UTC

svn commit: r1365663 - in /qpid/proton/branches/rajith_sandbox: examples/mailbox/ proton-j/src/org/apache/qpid/proton/driver/ proton-j/src/org/apache/qpid/proton/driver/impl/ proton-j/src/org/apache/qpid/proton/engine/impl/ proton-j/src/org/apache/qpid...

Author: rajith
Date: Wed Jul 25 16:47:46 2012
New Revision: 1365663

URL: http://svn.apache.org/viewvc?rev=1365663&view=rev
Log:
QPID-4132 Experimental Driver implementation for proton-j. This is
initial checkin focus on just getting it to work. A lot more
improvements are to follow.

Added:
    qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/ConnectorFactory.java
      - copied, changed from r1364816, qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/BytesTransport.java
    qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/MailServer.java
    qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/ServerConnectorFactory.java
      - copied, changed from r1364816, qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/Application.java
    qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/logging/
    qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/logging/LogHandler.java
      - copied, changed from r1364816, qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/ApplicationFactory.java
Removed:
    qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/Application.java
    qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/ApplicationFactory.java
    qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/BytesTransport.java
    qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/ConnectionTransport.java
    qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/DelegatingTransport.java
    qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/NIOAcceptingDriver.java
    qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/NoddyBrokerApplication.java
Modified:
    qpid/proton/branches/rajith_sandbox/examples/mailbox/fetch
    qpid/proton/branches/rajith_sandbox/examples/mailbox/post
    qpid/proton/branches/rajith_sandbox/examples/mailbox/server
    qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/Connector.java
    qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/Driver.java
    qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/Listener.java
    qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/DriverImpl.java
    qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/ListenerImpl.java
    qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/ServerConnectorImpl.java
    qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/engine/impl/SaslImpl.java
    qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/engine/impl/SaslServerImpl.java
    qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/engine/impl/TransportImpl.java

Modified: qpid/proton/branches/rajith_sandbox/examples/mailbox/fetch
URL: http://svn.apache.org/viewvc/qpid/proton/branches/rajith_sandbox/examples/mailbox/fetch?rev=1365663&r1=1365662&r2=1365663&view=diff
==============================================================================
--- qpid/proton/branches/rajith_sandbox/examples/mailbox/fetch (original)
+++ qpid/proton/branches/rajith_sandbox/examples/mailbox/fetch Wed Jul 25 16:47:46 2012
@@ -77,6 +77,7 @@ class FetchClient(object):
 
         # inform the engine about the connection, and link the driver to it.
         self.conn = pn_connection()
+        pn_connection_set_container(self.conn, "fetch")
         pn_connector_set_connection(self.cxtr, self.conn)
 
         # create a session, and Link for receiving from the mailbox

Modified: qpid/proton/branches/rajith_sandbox/examples/mailbox/post
URL: http://svn.apache.org/viewvc/qpid/proton/branches/rajith_sandbox/examples/mailbox/post?rev=1365663&r1=1365662&r2=1365663&view=diff
==============================================================================
--- qpid/proton/branches/rajith_sandbox/examples/mailbox/post (original)
+++ qpid/proton/branches/rajith_sandbox/examples/mailbox/post Wed Jul 25 16:47:46 2012
@@ -76,6 +76,7 @@ class PostClient(object):
 
         # inform the engine about the connection, and link the driver to it.
         self.conn = pn_connection()
+        pn_connection_set_container(self.conn, "post")
         pn_connector_set_connection(self.cxtr, self.conn)
 
         # create a session, and Link for receiving from the mailbox

Modified: qpid/proton/branches/rajith_sandbox/examples/mailbox/server
URL: http://svn.apache.org/viewvc/qpid/proton/branches/rajith_sandbox/examples/mailbox/server?rev=1365663&r1=1365662&r2=1365663&view=diff
==============================================================================
--- qpid/proton/branches/rajith_sandbox/examples/mailbox/server (original)
+++ qpid/proton/branches/rajith_sandbox/examples/mailbox/server Wed Jul 25 16:47:46 2012
@@ -161,7 +161,9 @@ class MailboxServer(object):
             state = pn_sasl_state(sasl)
 
         if state == PN_SASL_PASS:
-            pn_connector_set_connection(cxtr, pn_connection());
+            conn =  pn_connection()
+            pn_connection_set_container(conn, "mail-server")
+            pn_connector_set_connection(cxtr, conn)
             pn_connector_set_context(cxtr, CONNECTION_UP)
             self.log("Authentication-PASSED")
         elif state == PN_SASL_FAIL:

Modified: qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/Connector.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/Connector.java?rev=1365663&r1=1365662&r2=1365663&view=diff
==============================================================================
--- qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/Connector.java (original)
+++ qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/Connector.java Wed Jul 25 16:47:46 2012
@@ -1,77 +1,104 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
 package org.apache.qpid.proton.driver;
 
 import org.apache.qpid.proton.engine.Connection;
 import org.apache.qpid.proton.engine.Sasl;
 
+/**
+ * Client API
+ *
+ * @param <C> application supplied context
+ */
 public interface Connector<C>
 {
     /** Service the given connector.
-         *
-         * Handle any inbound data, outbound data, or timing events pending on
-         * the connector.
-         *
-         */
-        void process();
-
-        /** Access the listener which opened this connector.
-         *
-         * @return the listener which created this connector, or NULL if the
-         *         connector has no listener (e.g. an outbound client
-         *         connection)
-         */
-        Listener listener();
-
-        /** Access the Authentication and Security context of the connector.
-         *
-         * @return the Authentication and Security context for the connector,
-         *         or NULL if none
-         */
-        Sasl sasl();
-
-        /** Access the AMQP Connection associated with the connector.
-         *
-         * @return the connection context for the connector, or NULL if none
-         */
-        Connection getConnection();
-
-        /** Assign the AMQP Connection associated with the connector.
-         *
-         * @param connection the connection to associate with the
-         *                       connector
-         */
-        void setConnection(Connection connection);
-
-        /** Access the application context that is associated with the
-         *  connector.
-         *
-         * @return the application context that was passed to pn_connector()
-         *         or pn_connector_fd()
-         */
-        C getContext();
-
-        /** Assign a new application context to the connector.
-         *
-         * @param[in] context new application context to associate with the
-         *                    connector
-         */
-        void setContext(C context);
-
-        /** Close the socket used by the connector.
-         *
-        */
-        void close();
-
-        /** Determine if the connector is closed.
-         *
-         * @return True if closed, otherwise false
-         */
-        boolean isClosed();
-
-        /** Destructor for the given connector.
-         *
-         * Assumes the connector's socket has been closed prior to call.
-         *
-         */
-        void destroy();
-
+     *
+     * Handle any inbound data, outbound data, or timing events pending on
+     * the connector.
+     *
+     */
+    void process();
+
+    /** Access the listener which opened this connector.
+     *
+     * @return the listener which created this connector, or NULL if the
+     *         connector has no listener (e.g. an outbound client
+     *         connection).
+     */
+    @SuppressWarnings("rawtypes")
+    Listener listener();
+
+    /** Access the Authentication and Security context of the connector.
+     *
+     * @return the Authentication and Security context for the connector,
+     *         or NULL if none.
+     */
+    Sasl sasl();
+
+    /** Access the AMQP Connection associated with the connector.
+     *
+     * @return the connection context for the connector, or NULL if none.
+     */
+    Connection getConnection();
+
+    /** Assign the AMQP Connection associated with the connector.
+     *
+     * @param connection the connection to associate with the connector.
+     */
+    void setConnection(Connection connection);
+
+    /**
+     * Access the application context that is associated with the connector.
+     *
+     * @return the application context that was passed when creating this
+     *         connector. See
+     *         {@link Driver#createConnector(String, int, Object)
+     *         createConnector(String, int, Object)} and
+     *         {@link Driver#createConnector(java.nio.channels.SelectableChannel, Object)
+     *         createConnector(java.nio.channels.SelectableChannel, Object)}.
+     */
+    C getContext();
+
+    /** Assign a new application context to the connector.
+     *
+     * @param context new application context to associate with the connector
+     */
+    void setContext(C context);
+
+    /** Close the socket used by the connector.
+     *
+     */
+    void close();
+
+    /** Determine if the connector is closed.
+     *
+     * @return True if closed, otherwise false
+     */
+    boolean isClosed();
+
+    /** Destructor for the given connector.
+     *
+     * Assumes the connector's socket has been closed prior to call.
+     *
+     */
+    void destroy();
 }

Modified: qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/Driver.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/Driver.java?rev=1365663&r1=1365662&r2=1365663&view=diff
==============================================================================
--- qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/Driver.java (original)
+++ qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/Driver.java Wed Jul 25 16:47:46 2012
@@ -1,88 +1,123 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
 package org.apache.qpid.proton.driver;
 
 import java.nio.channels.SelectableChannel;
 import java.nio.channels.ServerSocketChannel;
 
+/**
+ * The Driver interface provides an abstraction for an implementation of
+ * a driver for the proton engine.
+ * A driver is responsible for providing input, output, and tick events,
+ * to the bottom half of the engine API. TODO See ::pn_input, ::pn_output, and
+ * ::pn_tick.
+ * The driver also provides an interface for the application to access,
+ * the top half of the API when the state of the engine may have changed
+ * due to I/O or timing events. Additionally the driver incorporates the SASL
+ * engine as well in order to provide a complete network stack: AMQP over SASL
+ * over TCP.
+ */
+
 public interface Driver
 {
-    /** Force pn_driver_wait() to return
+    /**
+     * Force wait() to return
      *
      */
     void wakeup();
 
-    /** Wait for an active connector or listener
+    /**
+     * Wait for an active connector or listener
      *
-     * @param[in] timeout maximum time in milliseconds to wait, -1 means
-     *                    infinite wait
+     * @param timeout maximum time in milliseconds to wait.
+     *                0 means infinite wait
      */
     void doWait(int timeout);
 
-    /** Get the next listener with pending data in the driver.
+    /**
+     * Get the next listener with pending data in the driver.
      *
      * @return NULL if no active listener available
      */
+    @SuppressWarnings("rawtypes")
     Listener listener();
 
-    /** Get the next active connector in the driver.
+    /**
+     * Get the next active connector in the driver.
      *
-     * Returns the next connector with pending inbound data, available
-     * capacity for outbound data, or pending tick.
+     * Returns the next connector with pending inbound data, available capacity
+     * for outbound data, or pending tick.
      *
      * @return NULL if no active connector available
      */
+    @SuppressWarnings("rawtypes")
     Connector connector();
 
-    /** Destruct the driver and all associated
-     *  listeners and connectors.
+    /**
+     * Destruct the driver and all associated listeners, connectors and other resources.
      */
     void destroy();
 
-
-
-    /** Construct a listener for the given address.
+    /**
+     * Construct a listener for the given address.
      *
      * @param host local host address to listen on
      * @param port local port to listen on
      * @param context application-supplied, can be accessed via
-     *                    pn_listener_context()
+     *                {@link Listener#getContext() getContext()} method on a listener.
      * @return a new listener on the given host:port, NULL if error
      */
     <C> Listener<C> createListener(String host, int port, C context);
 
-    /** Create a listener using the existing channel.
+    /**
+     * Create a listener using the existing channel.
      *
-     * @param c existing file descriptor for listener to listen on
+     * @param c   existing SocketChannel for listener to listen on
      * @param context application-supplied, can be accessed via
-     *                    pn_listener_context()
+     *                {@link Listener#getContext() getContext()} method on a listener.
      * @return a new listener on the given channel, NULL if error
      */
-    <C> Listener<C>  createListener(ServerSocketChannel c, C context);
+    <C> Listener<C> createListener(ServerSocketChannel c, C context);
 
-
-    /** Construct a connector to the given remote address.
+    /**
+     * Construct a connector to the given remote address.
      *
      * @param host remote host to connect to.
      * @param port remote port to connect to.
-     * @param context application supplied, can be accessed via
-     *                    pn_connector_context() @return a new connector
-     *                    to the given remote, or NULL on error.
+     * @param context application-supplied, can be accessed via
+     *                {@link Connector#getContext() getContext()} method on a listener.
+     *
+     * @return a new connector to the given remote, or NULL on error.
      */
     <C> Connector<C> createConnector(String host, int port, C context);
 
-    /** Create a connector using the existing file descriptor.
+    /**
+     * Create a connector using the existing file descriptor.
      *
-     * @param fd existing file descriptor to use for this connector.
+     * @param c   existing SocketChannel for listener to listen on
      * @param context application-supplied, can be accessed via
-     *                    pn_connector_context()
+     *                {@link Connector#getContext() getContext()} method on a listener.
+     *
      * @return a new connector to the given host:port, NULL if error.
      */
     <C> Connector<C> createConnector(SelectableChannel fd, C context);
-
-    /** Set the tracing level for the given connector.
-     *
-     * @param trace the trace level to use.
-     */
-    //void pn_connector_trace(pn_connector_t *connector, pn_trace_t trace);
-
-
 }

Modified: qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/Listener.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/Listener.java?rev=1365663&r1=1365662&r2=1365663&view=diff
==============================================================================
--- qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/Listener.java (original)
+++ qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/Listener.java Wed Jul 25 16:47:46 2012
@@ -1,44 +1,61 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
 package org.apache.qpid.proton.driver;
 
+/**
+ * Server API.
+ *
+ * @param <C> application supplied context
+ */
 public interface Listener<C>
 {
-    /** pn_listener - the server API **/
-
-
     /**
-     * @todo pn_listener_trace needs documentation
-     */
-    //void pn_listener_trace(pn_listener_t *listener, pn_trace_t trace);
-
-    /** Accept a connection that is pending on the listener.
+     * Accept a connection that is pending on the listener.
      *
-     * @param[in] listener the listener to accept the connection on
-     * @return a new connector for the remote, or NULL on error
+     * @return a new connector for the remote, or NULL on error.
      */
-    Connector accept();
+    Connector<C> accept();
 
-    /** Access the application context that is associated with the listener.
+    /**
+     * Access the application context that is associated with the listener.
      *
-     * @param[in] listener the listener whose context is to be returned
-     * @return the application context that was passed to pn_listener() or
-     *         pn_listener_fd()
+     * @return the application context that was passed when creating this
+     *         listener. See {@link Driver#createListener(String, int, Object)
+     *         createListener(String, int, Object)} and
+     *         {@link Driver#createConnector(java.nio.channels.SelectableChannel, Object)
+     *         createConnector(java.nio.channels.SelectableChannel, Object)}
      */
     C getContext();
 
-    /** Close the socket used by the listener.
+    /**
+     * Close the socket used by the listener.
      *
-     * @param[in] listener the listener whose socket will be closed.
      */
     void close();
 
-    /** Destructor for the given listener.
+    /**
+     * Destructor for the given listener.
      *
      * Assumes the listener's socket has been closed prior to call.
      *
-     * @param[in] listener the listener object to destroy, no longer valid
-     *            on return
      */
     void destroy();
-
-
 }

Copied: qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/ConnectorFactory.java (from r1364816, qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/BytesTransport.java)
URL: http://svn.apache.org/viewvc/qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/ConnectorFactory.java?p2=qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/ConnectorFactory.java&p1=qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/BytesTransport.java&r1=1364816&r2=1365663&rev=1365663&view=diff
==============================================================================
--- qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/BytesTransport.java (original)
+++ qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/ConnectorFactory.java Wed Jul 25 16:47:46 2012
@@ -18,12 +18,14 @@
  * under the License.
  *
  */
+package org.apache.qpid.proton.driver.impl;
 
-package org.apache.qpid.proton.driver;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
 
-public interface BytesTransport
-{
-    int input(byte[] bytes, int offset, int size);
+import org.apache.qpid.proton.driver.Connector;
 
-    int output(byte[] bytes, int offset, int size);
+public interface ConnectorFactory
+{
+    <C> Connector<C> createConnector(DriverImpl driver, SocketChannel sc, C context, SelectionKey key);
 }

Modified: qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/DriverImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/DriverImpl.java?rev=1365663&r1=1365662&r2=1365663&view=diff
==============================================================================
--- qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/DriverImpl.java (original)
+++ qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/DriverImpl.java Wed Jul 25 16:47:46 2012
@@ -1,23 +1,55 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
 package org.apache.qpid.proton.driver.impl;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
-import java.nio.channels.*;
+import java.net.Socket;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.Set;
+
 import org.apache.qpid.proton.driver.Connector;
 import org.apache.qpid.proton.driver.Driver;
 import org.apache.qpid.proton.driver.Listener;
+import org.apache.qpid.proton.logging.LogHandler;
 
 public class DriverImpl implements Driver
 {
     private Selector _selector;
     private Set<SelectionKey> _selectedKeys = Collections.emptySet();
+    private ConnectorFactory _connectorFactory;
+    private LogHandler _logger;
 
-    public DriverImpl() throws IOException
+    public DriverImpl(ConnectorFactory connectorFactory, LogHandler logger) throws IOException
     {
+        _connectorFactory = connectorFactory;
+        _logger = logger;
         _selector = Selector.open();
     }
 
@@ -35,10 +67,12 @@ public class DriverImpl implements Drive
         }
         catch (IOException e)
         {
-            e.printStackTrace();  // TODO - Implement
+            _logger.error(e, "Exception when waiting for IO Event");
+            throw new RuntimeException(e);
         }
     }
 
+    @SuppressWarnings("rawtypes")
     public Listener listener()
     {
         Listener listener = null;
@@ -51,7 +85,8 @@ public class DriverImpl implements Drive
             }
             catch (IOException e)
             {
-                e.printStackTrace();  // TODO - Implement
+                _logger.error(e, "Exception when selecting");
+                throw new RuntimeException(e);
             }
             listener = getFirstListener();
         }
@@ -64,6 +99,7 @@ public class DriverImpl implements Drive
         _selectedKeys = _selector.selectedKeys();
     }
 
+    @SuppressWarnings("rawtypes")
     private Listener getFirstListener()
     {
         Iterator<SelectionKey> selectedIter = _selectedKeys.iterator();
@@ -74,14 +110,13 @@ public class DriverImpl implements Drive
             selectedIter.remove();
             if(key.isAcceptable())
             {
-                selectedIter.remove();
                 return (Listener) key.attachment();
-
             }
         }
         return null;
     }
 
+    @SuppressWarnings("rawtypes")
     public Connector connector()
     {
         Connector connector = null;
@@ -94,13 +129,15 @@ public class DriverImpl implements Drive
             }
             catch (IOException e)
             {
-                e.printStackTrace();  // TODO - Implement
+                _logger.error(e, "Exception when selecting");
+                throw new RuntimeException(e);
             }
             connector = getFirstConnector();
         }
         return connector;
     }
 
+    @SuppressWarnings("rawtypes")
     private Connector getFirstConnector()
     {
         Iterator<SelectionKey> selectedIter = _selectedKeys.iterator();
@@ -111,7 +148,6 @@ public class DriverImpl implements Drive
             selectedIter.remove();
             if(key.isReadable() || key.isWritable())
             {
-                selectedIter.remove();
                 return (Connector) key.attachment();
 
             }
@@ -122,7 +158,15 @@ public class DriverImpl implements Drive
 
     public void destroy()
     {
-        //TODO - Implement
+        try
+        {
+            _selector.close();
+        }
+        catch (IOException e)
+        {
+            _logger.error(e, "Exception when closing selector");
+            throw new RuntimeException(e);
+        }
     }
 
     public <C> Listener<C> createListener(String host, int port, C context)
@@ -150,8 +194,9 @@ public class DriverImpl implements Drive
     {
         try
         {
-            c.register(_selector, SelectionKey.OP_ACCEPT);
-            return new ListenerImpl<C>(this, c, context);
+            Listener<C> l = new ListenerImpl<C>(this, c, context);
+            c.register(_selector, SelectionKey.OP_ACCEPT,l);
+            return l;
         }
         catch (ClosedChannelException e)
         {
@@ -162,11 +207,41 @@ public class DriverImpl implements Drive
 
     public <C> Connector<C> createConnector(String host, int port, C context)
     {
-        return null;  //TODO - Implement
+        try
+        {
+            SocketChannel channel = SocketChannel.open();
+            channel.configureBlocking(false);
+            Socket socket = channel.socket();
+            socket.bind(new InetSocketAddress(host, port));
+            return createConnector(channel, context);
+        }
+        catch (IOException e)
+        {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+            throw new RuntimeException(e);
+        }
+    }
+
+    public <C> Connector<C> createConnector(SelectableChannel c, C context)
+    {
+        try
+        {
+            int opKeys = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
+            SelectionKey key = c.register(_selector, opKeys);
+            Connector<C> co = _connectorFactory.createConnector(this, (SocketChannel)c, context, key);
+            key.attach(co);
+            return co;
+        }
+        catch (ClosedChannelException e)
+        {
+            e.printStackTrace();  // TODO - Implement
+            throw new RuntimeException(e);
+        }
     }
 
-    public <C> Connector<C> createConnector(SelectableChannel fd, C context)
+    protected LogHandler getLogHandler()
     {
-        return null;  //TODO - Implement
+        return _logger;
     }
 }

Modified: qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/ListenerImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/ListenerImpl.java?rev=1365663&r1=1365662&r2=1365663&view=diff
==============================================================================
--- qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/ListenerImpl.java (original)
+++ qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/ListenerImpl.java Wed Jul 25 16:47:46 2012
@@ -1,3 +1,23 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
 package org.apache.qpid.proton.driver.impl;
 
 import java.io.IOException;
@@ -27,7 +47,7 @@ class ListenerImpl<C> implements Listene
             if(c != null)
             {
                 c.configureBlocking(false);
-                return new ServerConnectorImpl(_driver,c);
+                return _driver.createConnector(c, _context);
             }
         }
         catch (IOException e)

Added: qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/MailServer.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/MailServer.java?rev=1365663&view=auto
==============================================================================
--- qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/MailServer.java (added)
+++ qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/MailServer.java Wed Jul 25 16:47:46 2012
@@ -0,0 +1,430 @@
+package org.apache.qpid.proton.driver.impl;
+
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.qpid.proton.driver.Connector;
+import org.apache.qpid.proton.driver.Driver;
+import org.apache.qpid.proton.driver.Listener;
+import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.EndpointState;
+import org.apache.qpid.proton.engine.Link;
+import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.engine.Sasl;
+import org.apache.qpid.proton.engine.Sasl.SaslOutcome;
+import org.apache.qpid.proton.engine.Sasl.SaslState;
+import org.apache.qpid.proton.engine.Sender;
+import org.apache.qpid.proton.engine.Session;
+import org.apache.qpid.proton.engine.impl.ConnectionImpl;
+import org.apache.qpid.proton.engine.impl.SaslServerImpl;
+import org.apache.qpid.proton.logging.LogHandler;
+import org.apache.qpid.proton.type.transport.DeliveryState;
+
+public class MailServer
+{
+    enum State {NEW,AUTHENTICATING,CONNECTION_UP, FAILED};
+
+    private Driver _driver;
+    private LogHandler _logger;
+    private Listener<State> _listener;
+    private int _counter;
+    private Map<String,List<byte[]>> _mailboxes = new HashMap<String,List<byte[]>>();
+
+    public MailServer() throws Exception
+    {
+        _logger = new Logger();
+        _driver = new DriverImpl(new ServerConnectorFactory(),_logger);
+        _listener = _driver.createListener("localhost", 5672, State.NEW);
+    }
+
+    public void doWait()
+    {
+        _driver.doWait(0);
+    }
+
+    public void acceptConnections()
+    {
+        // We have only one listener
+        if (_driver.listener() != null)
+        {
+            _logger.info("Accepting Connection.");
+            Connector<State> ctor = _listener.accept();
+            ctor.setContext(State.AUTHENTICATING);
+        }
+    }
+
+    public void processConnections() throws Exception
+    {
+        Connector<State> ctor = _driver.connector();
+        while (ctor != null)
+        {
+            // process any data coming from the network, this will update the
+            // engine's view of the state of the remote clients
+            ctor.process();
+            State state = ctor.getContext();
+            if (state == State.AUTHENTICATING)
+            {
+                //connection has not passed SASL authentication yet
+                authenticateConnector(ctor);
+            }
+            else if (state == State.CONNECTION_UP)
+            {
+                //active connection, service any engine events
+                serviceConnector(ctor);
+            }
+            else
+            {
+                _logger.error("Unknown state.");
+            }
+
+            // now generate any outbound network data generated in response to
+            // any work done by the engine.
+            ctor.process();
+
+            ctor = _driver.connector();
+        }
+    }
+
+    private void authenticateConnector(Connector<State> ctor) throws Exception
+    {
+        _logger.info("Authenticating...");
+        Sasl sasl = ctor.sasl();
+        SaslState state = sasl.getState();
+        while (state == SaslState.PN_SASL_CONF || state == SaslState.PN_SASL_STEP)
+        {
+            if (state == SaslState.PN_SASL_CONF)
+            {
+                _logger.info("Authenticating-CONF...");
+                sasl.setMechanisms(new String[]{"ANONYMOUS"});
+            }
+            else if (state == SaslState.PN_SASL_STEP)
+            {
+                _logger.info("Authenticating-STEP...");
+                String[] mechs = sasl.getRemoteMechanisms();
+                if (mechs[0] == "ANONYMOUS")
+                {
+                    ((SaslServerImpl)sasl).done(SaslOutcome.PN_SASL_OK);
+                }
+                else
+                {
+                    //sasl.(sasl, PN_SASL_AUTH)
+                }
+            }
+            state = sasl.getState();
+        }
+
+        if (state == SaslState.PN_SASL_PASS)
+        {
+            ctor.setConnection(new ConnectionImpl());
+            ctor.setContext(State.CONNECTION_UP);
+            _logger.info("Authentication-PASSED");
+        }
+        else if (state == SaslState.PN_SASL_FAIL)
+        {
+            ctor.setContext(State.FAILED);
+            ctor.close();
+            _logger.info("Authentication-FAILED");
+        }
+        else
+        {
+            _logger.info("Authentication-PENDING");
+        }
+
+    }
+
+    private void serviceConnector(Connector<State> ctor) throws Exception
+    {
+       Connection con = ctor.getConnection();
+
+       // Step 1: setup the engine's connection, and any sessions and links
+       // that may be pending.
+
+       // initialize the connection if it's new
+       if (con.getLocalState() == EndpointState.UNINITIALIZED)
+       {
+           con.open();
+           _logger.debug("Connection Opened.");
+       }
+
+       // open all pending sessions
+       Session ssn = con.sessionHead(EnumSet.of(EndpointState.UNINITIALIZED),
+               EnumSet.of(EndpointState.UNINITIALIZED,EndpointState.ACTIVE));
+       while (ssn != null)
+       {
+
+           ssn.open();
+           _logger.debug("Session Opened.");
+           ssn = con.sessionHead(EnumSet.of(EndpointState.UNINITIALIZED),
+           EnumSet.of(EndpointState.UNINITIALIZED,EndpointState.ACTIVE));
+       }
+
+       // configure and open any pending links
+       Link link = con.linkHead(EnumSet.of(EndpointState.UNINITIALIZED),
+               EnumSet.of(EndpointState.UNINITIALIZED,EndpointState.ACTIVE));
+       while (link != null)
+       {
+
+           setupLink(link);
+           _logger.debug("Link Opened.");
+           link = con.linkHead(EnumSet.of(EndpointState.UNINITIALIZED),
+           EnumSet.of(EndpointState.UNINITIALIZED,EndpointState.ACTIVE));
+       }
+
+       // Step 2: Now drain all the pending deliveries from the connection's
+       // work queue and process them
+
+       Delivery delivery = con.getWorkHead();
+       while (delivery != null)
+       {
+           _logger.debug("Process delivery " + delivery.getTag());
+
+           if (delivery.isReadable())   // inbound data available
+           {
+               processReceive(delivery);
+           }
+           else if (delivery.isWritable()) // can send a message
+           {
+               sendMessage(delivery);
+           }
+
+           // check to see if the remote has accepted message we sent
+           if (delivery.remotelySettled())
+           {
+               // once we know the remote has seen the message, we can
+               // release the delivery.
+               delivery.settle();
+           }
+
+           delivery = con.getWorkHead();
+       }
+
+       // Step 3: Clean up any links or sessions that have been closed by the
+       // remote.  If the connection has been closed remotely, clean that up also.
+
+       // teardown any terminating links
+       link = con.linkHead(EnumSet.of(EndpointState.ACTIVE),
+               EnumSet.of(EndpointState.CLOSED));
+       while (link != null)
+       {
+           link.close();
+           _logger.debug("Link Closed");
+           link = con.linkHead(EnumSet.of(EndpointState.ACTIVE),
+                   EnumSet.of(EndpointState.CLOSED));
+       }
+
+       // teardown any terminating sessions
+       ssn = con.sessionHead(EnumSet.of(EndpointState.ACTIVE),
+               EnumSet.of(EndpointState.CLOSED));
+       while (ssn != null)
+       {
+           ssn.close();
+           _logger.debug("Session Closed");
+           ssn = con.sessionHead(EnumSet.of(EndpointState.ACTIVE),
+                   EnumSet.of(EndpointState.CLOSED));
+       }
+
+       // teardown the connection if it's terminating
+       if (con.getRemoteState() == EndpointState.CLOSED)
+       {
+           _logger.debug("Connection Closed");
+           con.close();
+       }
+    }
+
+    private void setupLink(Link link)
+    {
+        String src = link.getRemoteSourceAddress();
+        String target = link.getRemoteTargetAddress();
+
+        if (link instanceof Sender)
+        {
+            _logger.debug("Opening Link to read from mailbox: " + src);
+            if (!_mailboxes.containsKey(src))
+            {
+                _logger.error("Error: mailbox " + src + " does not exist!");
+                // TODO report error.
+            }
+        }
+        else
+        {
+            _logger.debug("Opening Link to write from mailbox: " + target);
+            if (!_mailboxes.containsKey(target))
+            {
+                _mailboxes.put(target, new ArrayList<byte[]>());
+            }
+        }
+
+        link.setLocalSourceAddress(src);
+        link.setLocalTargetAddress(target);
+
+        if (link instanceof Sender)
+        {
+            // grant a delivery to the link - it will become "writable" when the
+            // driver can accept messages for the sender.
+            String id = "server-delivery-" + _counter;
+            link.delivery(id.getBytes(),0,id.getBytes().length);
+            _counter++;
+        }
+        else
+        {
+            // Grant enough credit to the receiver to allow one inbound message
+            ((Receiver)link).flow(1);
+        }
+        link.open();
+    }
+
+    private void processReceive(Delivery d)
+    {
+        Receiver rec = (Receiver)d.getLink();
+        String mailboxName = rec.getRemoteTargetAddress();
+        List<byte[]> mailbox;
+        if (!_mailboxes.containsKey(mailboxName))
+        {
+            _logger.error("Error: cannot sent to mailbox " + mailboxName + " - dropping message.");
+        }
+        else
+        {
+            mailbox = _mailboxes.get(mailboxName);
+            byte[] readBuf = new byte[1024];
+            int  bytesRead = rec.recv(readBuf, 0, readBuf.length);
+            ByteArrayOutputStream bout = new ByteArrayOutputStream();
+            while (bytesRead > 0)
+            {
+                bout.write(readBuf, 0, bytesRead);
+                bytesRead = rec.recv(readBuf, 0, readBuf.length);
+            }
+            mailbox.add(bout.toByteArray());
+        }
+
+        d.disposition(null);
+        d.settle();
+        rec.advance();
+        if (rec.getCredit() == 0)
+        {
+            rec.flow(1);
+        }
+    }
+
+    private void sendMessage(Delivery d)
+    {
+        Sender sender = (Sender)d.getLink();
+        String mailboxName = sender.getRemoteSourceAddress();
+        _logger.error("Request for Mailbox : " + mailboxName);
+        byte[] msg;
+        if (_mailboxes.containsKey(mailboxName))
+        {
+            msg = _mailboxes.get(mailboxName).remove(0);
+            _logger.debug("Fetching message " + new String(msg));
+        }
+        else
+        {
+            _logger.debug("Warning: mailbox " + mailboxName + " is empty, sending empty message.");
+            msg = "".getBytes();
+        }
+        sender.send(msg, 0, msg.length);
+        if (sender.advance())
+        {
+            String id = "server-delivery-" + _counter;
+            sender.delivery(id.getBytes(),0,id.getBytes().length);
+            _counter++;
+        }
+    }
+
+    public static void main(String[] args) throws Exception
+    {
+        MailServer server = new MailServer();
+        while (true)
+        {
+            server.doWait();
+            server.acceptConnections();
+            server.processConnections();
+        }
+    }
+
+    class Logger implements LogHandler
+    {
+        static final String prefix = "TestServer: ";
+
+        @Override
+        public boolean isTraceEnabled()
+        {
+            return true;
+        }
+
+        @Override
+        public void trace(String message)
+        {
+            System.out.println(prefix + message);
+        }
+
+        @Override
+        public boolean isDebugEnabled()
+        {
+            return true;
+        }
+
+        @Override
+        public void debug(String message)
+        {
+            System.out.println(prefix + "DEBUG : " + message);
+        }
+
+        @Override
+        public void debug(Throwable t, String message)
+        {
+            System.out.println(prefix + "DEBUG : " + message);
+            t.printStackTrace();
+        }
+
+        @Override
+        public boolean isInfoEnabled()
+        {
+            return true;
+        }
+
+        @Override
+        public void info(String message)
+        {
+            System.out.println(prefix + "INFO : " + message);
+        }
+
+        @Override
+        public void info(Throwable t, String message)
+        {
+            System.out.println(prefix + "INFO : " + message);
+            t.printStackTrace();
+        }
+
+        @Override
+        public void warn(String message)
+        {
+            System.out.println(prefix + "WARN : " + message);
+        }
+
+        @Override
+        public void warn(Throwable t, String message)
+        {
+            System.out.println(prefix + "INFO : " + message);
+            t.printStackTrace();
+        }
+
+        @Override
+        public void error(String message)
+        {
+            System.out.println(prefix + "ERROR : " + message);
+        }
+
+        @Override
+        public void error(Throwable t, String message)
+        {
+            System.out.println(prefix + "INFO : " + message);
+            t.printStackTrace();
+        }
+
+    }
+}

Copied: qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/ServerConnectorFactory.java (from r1364816, qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/Application.java)
URL: http://svn.apache.org/viewvc/qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/ServerConnectorFactory.java?p2=qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/ServerConnectorFactory.java&p1=qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/Application.java&r1=1364816&r2=1365663&rev=1365663&view=diff
==============================================================================
--- qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/Application.java (original)
+++ qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/ServerConnectorFactory.java Wed Jul 25 16:47:46 2012
@@ -18,12 +18,20 @@
  * under the License.
  *
  */
+package org.apache.qpid.proton.driver.impl;
 
-package org.apache.qpid.proton.driver;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
 
-import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.driver.Connector;
+import org.apache.qpid.proton.driver.Driver;
 
-public interface Application
+public class ServerConnectorFactory implements ConnectorFactory
 {
-    void process(Connection conn);
+    @Override
+    public <C> Connector<C> createConnector(DriverImpl driver, SocketChannel sc, C context, SelectionKey key)
+    {
+        return new ServerConnectorImpl<C>(driver, sc,context, key);
+    }
+
 }

Modified: qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/ServerConnectorImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/ServerConnectorImpl.java?rev=1365663&r1=1365662&r2=1365663&view=diff
==============================================================================
--- qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/ServerConnectorImpl.java (original)
+++ qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/ServerConnectorImpl.java Wed Jul 25 16:47:46 2012
@@ -1,28 +1,205 @@
 package org.apache.qpid.proton.driver.impl;
 
+import static org.apache.qpid.proton.driver.impl.ServerConnectorImpl.ConnectorState.NEW;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
 import java.nio.channels.SocketChannel;
+
 import org.apache.qpid.proton.driver.Connector;
 import org.apache.qpid.proton.driver.Listener;
 import org.apache.qpid.proton.engine.Connection;
 import org.apache.qpid.proton.engine.Sasl;
 import org.apache.qpid.proton.engine.impl.SaslServerImpl;
+import org.apache.qpid.proton.logging.LogHandler;
 
-class ServerConnectorImpl implements Connector
+class ServerConnectorImpl<C> implements Connector<C>
 {
+    public static int END_OF_STREAM = -1;
+    private static int DEFAULT_BUFFER_SIZE = 64 * 1024;
+    private static int readBufferSize = Integer.getInteger
+        ("pn.receive_buffer_size", DEFAULT_BUFFER_SIZE);
+    private static int writeBufferSize = Integer.getInteger
+        ("pn.send_buffer_size", DEFAULT_BUFFER_SIZE);
+
+    enum ConnectorState {NEW, OPENED, CLOSED};
+
     private final SaslServerImpl _sasl;
-    private DriverImpl _driver;
-    private SocketChannel _channel;
+    private final DriverImpl _driver;
+    private final SocketChannel _channel;
+    private final LogHandler _logger;
+    private C _context;
+    private Connection _connection;
+    private SelectionKey _key;
+    private ConnectorState _state = NEW;
+
+    private ByteBuffer _readBuffer = ByteBuffer.allocate(readBufferSize);
+    private int _bytesNotRead = 0;
+
+    private int _bytesNotWritten = 0;
+    private ByteBuffer _writeBuffer = ByteBuffer.allocate(writeBufferSize);
 
-    ServerConnectorImpl(DriverImpl driver, SocketChannel c)
+    ServerConnectorImpl(DriverImpl driver, SocketChannel c, C context, SelectionKey key)
     {
         _driver = driver;
         _channel = c;
         _sasl = new SaslServerImpl();
+        _sasl.setMechanisms(new String[]{"ANONYMOUS"}); //TODO
+        _logger = driver.getLogHandler();
+        _context = context;
+        _key = key;
     }
 
     public void process()
     {
-        //TODO - Implement
+        if (!_channel.isOpen())
+        {
+            return;
+        }
+
+        if (_key.isReadable())
+        {
+            read();
+        }
+
+        if (_key.isWritable())
+        {
+            write();
+        }
+    }
+
+    int processInput(byte[] bytes, int offset, int size)
+    {
+        int read = 0;
+        while (read < size)
+        {
+            switch (_state)
+            {
+            case NEW:
+                read += readSasl(bytes, offset, size);
+                writeSasl();
+                break;
+            case OPENED:
+                read += readAMQPCommands(bytes, offset, size);
+                writeAMQPCommands();
+                break;
+            }
+        }
+        return read;
+    }
+
+    void processOutput()
+    {
+        switch (_state)
+        {
+        case NEW:
+            writeSasl();
+            break;
+        case OPENED:
+            writeAMQPCommands();
+            break;
+        }
+    }
+
+    private int readAMQPCommands(byte[] bytes, int offset, int size)
+    {
+        int consumed = _connection.transport().input(bytes, offset, size);
+        if (consumed == END_OF_STREAM)
+        {
+            return size;
+        }
+        else
+        {
+            return consumed;
+        }
+    }
+
+    private void writeAMQPCommands()
+    {
+        int size = _writeBuffer.array().length - _bytesNotWritten;
+        _bytesNotWritten += _connection.transport().output(_writeBuffer.array(),
+                _bytesNotWritten, size);
+    }
+
+    private void setState(ConnectorState newState)
+    {
+        _state = newState;
+    }
+
+    int readSasl(byte[] bytes, int offset, int size)
+    {
+        int consumed = _sasl.input(bytes, offset, size);
+        if (consumed == END_OF_STREAM)
+        {
+            return size;
+        }
+        else
+        {
+            return consumed;
+        }
+    }
+
+    void writeSasl()
+    {
+        int size = _writeBuffer.array().length - _bytesNotWritten;
+        _bytesNotWritten += _sasl.output(_writeBuffer.array(),
+                _bytesNotWritten, size);
+    }
+
+    void write()
+    {
+        try
+        {
+            processOutput();
+            if (_bytesNotWritten > 0)
+            {
+                _writeBuffer.limit(_bytesNotWritten);
+                int written = _channel.write(_writeBuffer);
+                if (_writeBuffer.hasRemaining())
+                {
+                    _writeBuffer.compact();
+                    _bytesNotWritten = _bytesNotWritten - written;
+                }
+                else
+                {
+                    _writeBuffer.clear();
+                    _bytesNotWritten = 0;
+                }
+            }
+        }
+        catch (IOException e)
+        {
+            e.printStackTrace();
+        }
+    }
+
+    void read()
+    {
+        try
+        {
+            int  bytesRead = _channel.read(_readBuffer);
+            int consumed = 0;
+            while (bytesRead > 0)
+            {
+                consumed = processInput(_readBuffer.array(), 0, bytesRead + _bytesNotRead);
+                if (consumed < bytesRead)
+                {
+                    _readBuffer.compact();
+                    _bytesNotRead = bytesRead - consumed;
+                }
+                else
+                {
+                    _readBuffer.rewind();
+                    _bytesNotRead = 0;
+                }
+                bytesRead = _channel.read(_readBuffer);
+            }
+        }
+        catch (IOException e)
+        {
+            e.printStackTrace();
+        }
     }
 
     public Listener listener()
@@ -32,41 +209,63 @@ class ServerConnectorImpl implements Con
 
     public Sasl sasl()
     {
-        return null;  //TODO - Implement
+        return _sasl;
     }
 
     public Connection getConnection()
     {
-        return null;  //TODO - Implement
+        return _connection;
     }
 
     public void setConnection(Connection connection)
     {
-        //TODO - Implement
+        if (_sasl.isDone())
+        {
+            writeSasl();
+        }
+        else
+        {
+            throw new RuntimeException("Cannot set the connection before authentication is completed");
+        }
+
+        _connection = connection;
+        // write any initial data
+        int size = _writeBuffer.array().length - _bytesNotWritten;
+        _bytesNotWritten += _connection.transport().output(_writeBuffer.array(),
+                _bytesNotWritten, size);
+        setState(ConnectorState.OPENED);
     }
 
-    public Object getContext()
+    public C getContext()
     {
-        return null;  //TODO - Implement
+        return _context;
     }
 
-    public void setContext(Object context)
+    public void setContext(C context)
     {
-        //TODO - Implement
+        _context = context;
     }
 
     public void close()
     {
-        //TODO - Implement
+        try
+        {
+            writeSasl();
+            _channel.close();
+        }
+        catch (IOException e)
+        {
+
+        }
     }
 
     public boolean isClosed()
     {
-        return false;  //TODO - Implement
+        return !_channel.isOpen();
     }
 
     public void destroy()
     {
-        //TODO - Implement
+        close();
     }
 }

Modified: qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/engine/impl/SaslImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/engine/impl/SaslImpl.java?rev=1365663&r1=1365662&r2=1365663&view=diff
==============================================================================
--- qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/engine/impl/SaslImpl.java (original)
+++ qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/engine/impl/SaslImpl.java Wed Jul 25 16:47:46 2012
@@ -69,6 +69,7 @@ public abstract class SaslImpl implement
         if(!_overflowBuffer.hasRemaining())
         {
             _overflowBuffer.clear();
+            _overflowBuffer.limit(0);
 
             CompositeWritableBuffer outputBuffer =
                     new CompositeWritableBuffer(

Modified: qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/engine/impl/SaslServerImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/engine/impl/SaslServerImpl.java?rev=1365663&r1=1365662&r2=1365663&view=diff
==============================================================================
--- qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/engine/impl/SaslServerImpl.java (original)
+++ qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/engine/impl/SaslServerImpl.java Wed Jul 25 16:47:46 2012
@@ -40,6 +40,7 @@ public class SaslServerImpl extends Sasl
 
             mechanisms.setSaslServerMechanisms(_mechanisms);
             written += writeFrame(outputBuffer, mechanisms);
+            _state = SaslState.PN_SASL_STEP;
         }
         else if(getChallengeResponse() != null)
         {
@@ -97,11 +98,13 @@ public class SaslServerImpl extends Sasl
     {
         //TODO - Implement
         // error - should only be sent server -> client
+        System.out.println(saslMechanisms);
     }
 
     public void handleInit(SaslInit saslInit, Binary payload, Void context)
     {
         _hostname = saslInit.getHostname();
+        setMechanism(saslInit.getMechanism());
         if(saslInit.getInitialResponse() != null)
         {
             setPending(saslInit.getInitialResponse().asByteBuffer());
@@ -112,6 +115,7 @@ public class SaslServerImpl extends Sasl
     {
         //TODO - Implement
         // error - should only be sent server -> client
+        System.out.println(saslChallenge);
     }
 
     public void handleResponse(SaslResponse saslResponse, Binary payload, Void context)
@@ -125,6 +129,7 @@ public class SaslServerImpl extends Sasl
     {
         //TODO - Implement
         // error - should only be sent server -> client
+        System.out.println(saslOutcome);
     }
 
     @Override

Modified: qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/engine/impl/TransportImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/engine/impl/TransportImpl.java?rev=1365663&r1=1365662&r2=1365663&view=diff
==============================================================================
--- qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/engine/impl/TransportImpl.java (original)
+++ qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/engine/impl/TransportImpl.java Wed Jul 25 16:47:46 2012
@@ -142,7 +142,6 @@ public class TransportImpl extends Endpo
             written += processAttach(outputBuffer);
             written += processReceiverFlow(outputBuffer);
             written += processReceiverDisposition(outputBuffer);
-            written += processReceiverFlow(outputBuffer);       // TODO
             written += processMessageData(outputBuffer);
             written += processSenderDisposition(outputBuffer);
             written += processSenderFlow(outputBuffer);
@@ -448,7 +447,7 @@ public class TransportImpl extends Endpo
                     }
                 }
             }
-            endpoint = endpoint.getNext();
+            endpoint = endpoint.transportNext();
         }
         endpoint = _connectionEndpoint.getTransportHead();
         while(endpoint != null && buffer.remaining() >= _maxFrameSize)
@@ -540,7 +539,7 @@ public class TransportImpl extends Endpo
                 }
 
             }
-            endpoint = endpoint.getNext();
+            endpoint = endpoint.transportNext();
         }
         return written;
     }

Copied: qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/logging/LogHandler.java (from r1364816, qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/ApplicationFactory.java)
URL: http://svn.apache.org/viewvc/qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/logging/LogHandler.java?p2=qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/logging/LogHandler.java&p1=qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/ApplicationFactory.java&r1=1364816&r2=1365663&rev=1365663&view=diff
==============================================================================
--- qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/ApplicationFactory.java (original)
+++ qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/logging/LogHandler.java Wed Jul 25 16:47:46 2012
@@ -18,10 +18,31 @@
  * under the License.
  *
  */
+package org.apache.qpid.proton.logging;
 
-package org.apache.qpid.proton.driver;
-
-public interface ApplicationFactory
+public interface LogHandler
 {
-    Application createApplication();
+   public boolean isTraceEnabled();
+    
+   public void trace(String message);
+   
+   public boolean isDebugEnabled();
+        
+   public void debug(String message);
+
+   public void debug(Throwable t, String message);
+
+   public boolean isInfoEnabled();
+   
+   public void info(String message);
+
+   public void info(Throwable t, String message);
+
+   public void warn(String message);
+   
+   public void warn(Throwable t, String message);
+   
+   public void error(String message);
+
+   public void error(Throwable t, String message);   
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org