You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2012/07/27 00:45:25 UTC

svn commit: r1366218 - in /qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl: DriverImpl.java ListenerImpl.java MailServer.java ServerConnectorImpl.java

Author: rajith
Date: Thu Jul 26 22:45:24 2012
New Revision: 1366218

URL: http://svn.apache.org/viewvc?rev=1366218&view=rev
Log:
QPID-4312 Cleaned up the code a bit by removing unnessacery bits and
rearrainging parts of the code to improve readability.

Modified:
    qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/DriverImpl.java
    qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/ListenerImpl.java
    qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/MailServer.java
    qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/ServerConnectorImpl.java

Modified: qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/DriverImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/DriverImpl.java?rev=1366218&r1=1366217&r2=1366218&view=diff
==============================================================================
--- qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/DriverImpl.java (original)
+++ qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/DriverImpl.java Thu Jul 26 22:45:24 2012
@@ -37,18 +37,18 @@ import java.util.Set;
 import org.apache.qpid.proton.driver.Connector;
 import org.apache.qpid.proton.driver.Driver;
 import org.apache.qpid.proton.driver.Listener;
+import org.apache.qpid.proton.engine.impl.SaslClientImpl;
+import org.apache.qpid.proton.engine.impl.SaslServerImpl;
 import org.apache.qpid.proton.logging.LogHandler;
 
 public class DriverImpl implements Driver
 {
     private Selector _selector;
     private Set<SelectionKey> _selectedKeys = Collections.emptySet();
-    private ConnectorFactory _connectorFactory;
     private LogHandler _logger;
 
-    public DriverImpl(ConnectorFactory connectorFactory, LogHandler logger) throws IOException
+    public DriverImpl(LogHandler logger) throws IOException
     {
-        _connectorFactory = connectorFactory;
         _logger = logger;
         _selector = Selector.open();
     }
@@ -192,17 +192,10 @@ public class DriverImpl implements Drive
 
     public <C> Listener<C> createListener(ServerSocketChannel c, C context)
     {
-        try
-        {
-            Listener<C> l = new ListenerImpl<C>(this, c, context);
-            c.register(_selector, SelectionKey.OP_ACCEPT,l);
-            return l;
-        }
-        catch (ClosedChannelException e)
-        {
-            e.printStackTrace();  // TODO - Implement
-            throw new RuntimeException(e);
-        }
+        Listener<C> l = new ListenerImpl<C>(this, c, context);
+        SelectionKey key = registerInterest(c,SelectionKey.OP_ACCEPT);
+        key.attach(l);
+        return l;
     }
 
     public <C> Connector<C> createConnector(String host, int port, C context)
@@ -225,13 +218,25 @@ public class DriverImpl implements Drive
 
     public <C> Connector<C> createConnector(SelectableChannel c, C context)
     {
+        SelectionKey key = registerInterest(c,SelectionKey.OP_READ | SelectionKey.OP_WRITE);
+        Connector<C> co = new ServerConnectorImpl<C>(this, null, new SaslClientImpl(),(SocketChannel)c, context, key);
+        key.attach(co);
+        return co;
+    }
+
+    protected <C> Connector<C> createServerConnector(SelectableChannel c, C context, Listener<C> l)
+    {
+        SelectionKey key = registerInterest(c,SelectionKey.OP_READ | SelectionKey.OP_WRITE);
+        Connector<C> co = new ServerConnectorImpl<C>(this, l, new SaslServerImpl(),(SocketChannel)c, context, key);
+        key.attach(co);
+        return co;
+    }
+
+    private <C> SelectionKey registerInterest(SelectableChannel c, int opKeys)
+    {
         try
         {
-            int opKeys = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
-            SelectionKey key = c.register(_selector, opKeys);
-            Connector<C> co = _connectorFactory.createConnector(this, (SocketChannel)c, context, key);
-            key.attach(co);
-            return co;
+            return c.register(_selector, opKeys);
         }
         catch (ClosedChannelException e)
         {
@@ -240,6 +245,7 @@ public class DriverImpl implements Drive
         }
     }
 
+
     protected LogHandler getLogHandler()
     {
         return _logger;

Modified: qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/ListenerImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/ListenerImpl.java?rev=1366218&r1=1366217&r2=1366218&view=diff
==============================================================================
--- qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/ListenerImpl.java (original)
+++ qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/ListenerImpl.java Thu Jul 26 22:45:24 2012
@@ -39,7 +39,7 @@ class ListenerImpl<C> implements Listene
         _context = context;
     }
 
-    public Connector accept()
+    public Connector<C> accept()
     {
         try
         {
@@ -47,7 +47,7 @@ class ListenerImpl<C> implements Listene
             if(c != null)
             {
                 c.configureBlocking(false);
-                return _driver.createConnector(c, _context);
+                return _driver.createServerConnector(c, _context, this);
             }
         }
         catch (IOException e)
@@ -64,11 +64,19 @@ class ListenerImpl<C> implements Listene
 
     public void close()
     {
-        //TODO - Implement
+        try
+        {
+            _channel.close();
+        }
+        catch (IOException e)
+        {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+        }
     }
 
     public void destroy()
     {
-        //TODO - Implement
+        close();
     }
 }

Modified: qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/MailServer.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/MailServer.java?rev=1366218&r1=1366217&r2=1366218&view=diff
==============================================================================
--- qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/MailServer.java (original)
+++ qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/MailServer.java Thu Jul 26 22:45:24 2012
@@ -32,13 +32,14 @@ public class MailServer
     private Driver _driver;
     private LogHandler _logger;
     private Listener<State> _listener;
+    private String[] _saslMechs = "ANONYMOUS".split(",");
     private int _counter;
     private Map<String,List<byte[]>> _mailboxes = new HashMap<String,List<byte[]>>();
 
     public MailServer() throws Exception
     {
         _logger = new Logger();
-        _driver = new DriverImpl(new ServerConnectorFactory(),_logger);
+        _driver = new DriverImpl(_logger);
         _listener = _driver.createListener("localhost", 5672, State.NEW);
     }
 
@@ -54,6 +55,7 @@ public class MailServer
         {
             _logger.info("Accepting Connection.");
             Connector<State> ctor = _listener.accept();
+            ctor.sasl().setMechanisms(_saslMechs);
             ctor.setContext(State.AUTHENTICATING);
         }
     }
@@ -85,6 +87,10 @@ public class MailServer
             // now generate any outbound network data generated in response to
             // any work done by the engine.
             ctor.process();
+            if (ctor.isClosed())
+            {
+                ctor.destroy();
+            }
 
             ctor = _driver.connector();
         }
@@ -181,7 +187,7 @@ public class MailServer
        Delivery delivery = con.getWorkHead();
        while (delivery != null)
        {
-           _logger.debug("Process delivery " + delivery.getTag());
+           _logger.debug("Process delivery " + String.valueOf(delivery.getTag()));
 
            if (delivery.isReadable())   // inbound data available
            {
@@ -193,14 +199,15 @@ public class MailServer
            }
 
            // check to see if the remote has accepted message we sent
-           if (delivery.remotelySettled())
+           if (delivery.getRemoteState() != null)
            {
+               _logger.debug("Remote has seen it, Settling delivery " + String.valueOf(delivery.getTag()));
                // once we know the remote has seen the message, we can
                // release the delivery.
                delivery.settle();
            }
 
-           delivery = con.getWorkHead();
+           delivery = delivery.getWorkNext();
        }
 
        // Step 3: Clean up any links or sessions that have been closed by the

Modified: qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/ServerConnectorImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/ServerConnectorImpl.java?rev=1366218&r1=1366217&r2=1366218&view=diff
==============================================================================
--- qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/ServerConnectorImpl.java (original)
+++ qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/ServerConnectorImpl.java Thu Jul 26 22:45:24 2012
@@ -1,6 +1,6 @@
 package org.apache.qpid.proton.driver.impl;
 
-import static org.apache.qpid.proton.driver.impl.ServerConnectorImpl.ConnectorState.NEW;
+import static org.apache.qpid.proton.driver.impl.ServerConnectorImpl.ConnectorState.UNINITIALIZED;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -23,16 +23,18 @@ class ServerConnectorImpl<C> implements 
     private static int writeBufferSize = Integer.getInteger
         ("pn.send_buffer_size", DEFAULT_BUFFER_SIZE);
 
-    enum ConnectorState {NEW, OPENED, CLOSED};
+    enum ConnectorState {UNINITIALIZED, OPENED, EOS, CLOSED};
 
-    private final SaslServerImpl _sasl;
+    private final Sasl _sasl;
     private final DriverImpl _driver;
+    private final Listener<C> _listener;
     private final SocketChannel _channel;
     private final LogHandler _logger;
     private C _context;
+
     private Connection _connection;
     private SelectionKey _key;
-    private ConnectorState _state = NEW;
+    private ConnectorState _state = UNINITIALIZED;
 
     private ByteBuffer _readBuffer = ByteBuffer.allocate(readBufferSize);
     private int _bytesNotRead = 0;
@@ -40,12 +42,12 @@ class ServerConnectorImpl<C> implements 
     private int _bytesNotWritten = 0;
     private ByteBuffer _writeBuffer = ByteBuffer.allocate(writeBufferSize);
 
-    ServerConnectorImpl(DriverImpl driver, SocketChannel c, C context, SelectionKey key)
+    ServerConnectorImpl(DriverImpl driver, Listener<C> listener, Sasl sasl, SocketChannel c, C context, SelectionKey key)
     {
         _driver = driver;
+        _listener = listener;
         _channel = c;
-        _sasl = new SaslServerImpl();
-        _sasl.setMechanisms(new String[]{"ANONYMOUS"}); //TODO
+        _sasl = sasl;
         _logger = driver.getLogHandler();
         _context = context;
         _key = key;
@@ -55,6 +57,7 @@ class ServerConnectorImpl<C> implements 
     {
         if (!_channel.isOpen())
         {
+            _state = ConnectorState.CLOSED;
             return;
         }
 
@@ -69,6 +72,65 @@ class ServerConnectorImpl<C> implements 
         }
     }
 
+    void read()
+    {
+        try
+        {
+            int  bytesRead = _channel.read(_readBuffer);
+            int consumed = 0;
+            while (bytesRead > 0)
+            {
+                consumed = processInput(_readBuffer.array(), 0, bytesRead + _bytesNotRead);
+                if (consumed < bytesRead)
+                {
+                    _readBuffer.compact();
+                    _bytesNotRead = bytesRead - consumed;
+                }
+                else
+                {
+                    _readBuffer.rewind();
+                    _bytesNotRead = 0;
+                }
+                bytesRead = _channel.read(_readBuffer);
+            }
+            if (bytesRead == -1)
+            {
+                _state = ConnectorState.EOS;
+            }
+        }
+        catch (IOException e)
+        {
+            e.printStackTrace();
+        }
+    }
+
+    void write()
+    {
+        try
+        {
+            processOutput();
+            if (_bytesNotWritten > 0)
+            {
+                _writeBuffer.limit(_bytesNotWritten);
+                int written = _channel.write(_writeBuffer);
+                if (_writeBuffer.hasRemaining())
+                {
+                    _writeBuffer.compact();
+                    _bytesNotWritten = _bytesNotWritten - written;
+                }
+                else
+                {
+                    _writeBuffer.clear();
+                    _bytesNotWritten = 0;
+                }
+            }
+        }
+        catch (IOException e)
+        {
+            e.printStackTrace();
+        }
+    }
+
     int processInput(byte[] bytes, int offset, int size)
     {
         int read = 0;
@@ -76,13 +138,14 @@ class ServerConnectorImpl<C> implements 
         {
             switch (_state)
             {
-            case NEW:
+            case UNINITIALIZED:
                 read += readSasl(bytes, offset, size);
-                writeSasl();
                 break;
             case OPENED:
                 read += readAMQPCommands(bytes, offset, size);
-                writeAMQPCommands();
+                break;
+            case EOS:
+            case CLOSED:
                 break;
             }
         }
@@ -93,16 +156,21 @@ class ServerConnectorImpl<C> implements 
     {
         switch (_state)
         {
-        case NEW:
+        case UNINITIALIZED:
             writeSasl();
             break;
         case OPENED:
             writeAMQPCommands();
             break;
+        case EOS:
+            writeAMQPCommands();
+        case CLOSED:  // not a valid option
+            //TODO
+            break;
         }
     }
 
-    private int readAMQPCommands(byte[] bytes, int offset, int size)
+    int readAMQPCommands(byte[] bytes, int offset, int size)
     {
         int consumed = _connection.transport().input(bytes, offset, size);
         if (consumed == END_OF_STREAM)
@@ -115,18 +183,13 @@ class ServerConnectorImpl<C> implements 
         }
     }
 
-    private void writeAMQPCommands()
+    void writeAMQPCommands()
     {
         int size = _writeBuffer.array().length - _bytesNotWritten;
         _bytesNotWritten += _connection.transport().output(_writeBuffer.array(),
                 _bytesNotWritten, size);
     }
 
-    private void setState(ConnectorState newState)
-    {
-        _state = newState;
-    }
-
     int readSasl(byte[] bytes, int offset, int size)
     {
         int consumed = _sasl.input(bytes, offset, size);
@@ -147,64 +210,9 @@ class ServerConnectorImpl<C> implements 
                 _bytesNotWritten, size);
     }
 
-    void write()
+    public Listener<C> listener()
     {
-        try
-        {
-            processOutput();
-            if (_bytesNotWritten > 0)
-            {
-                _writeBuffer.limit(_bytesNotWritten);
-                int written = _channel.write(_writeBuffer);
-                if (_writeBuffer.hasRemaining())
-                {
-                    _writeBuffer.compact();
-                    _bytesNotWritten = _bytesNotWritten - written;
-                }
-                else
-                {
-                    _writeBuffer.clear();
-                    _bytesNotWritten = 0;
-                }
-            }
-        }
-        catch (IOException e)
-        {
-            e.printStackTrace();
-        }
-    }
-
-    void read()
-    {
-        try
-        {
-            int  bytesRead = _channel.read(_readBuffer);
-            int consumed = 0;
-            while (bytesRead > 0)
-            {
-                consumed = processInput(_readBuffer.array(), 0, bytesRead + _bytesNotRead);
-                if (consumed < bytesRead)
-                {
-                    _readBuffer.compact();
-                    _bytesNotRead = bytesRead - consumed;
-                }
-                else
-                {
-                    _readBuffer.rewind();
-                    _bytesNotRead = 0;
-                }
-                bytesRead = _channel.read(_readBuffer);
-            }
-        }
-        catch (IOException e)
-        {
-            e.printStackTrace();
-        }
-    }
-
-    public Listener listener()
-    {
-        return null;  //TODO - Implement
+        return _listener;
     }
 
     public Sasl sasl()
@@ -219,17 +227,10 @@ class ServerConnectorImpl<C> implements 
 
     public void setConnection(Connection connection)
     {
-        if (_sasl.isDone())
-        {
-            writeSasl();
-        }
-        else
-        {
-            throw new RuntimeException("Cannot set the connection before authentication is completed");
-        }
-
+        // write any remaining data on to the wire.
+        writeSasl();
         _connection = connection;
-        // write any initial data
+        // write initial data
         int size = _writeBuffer.array().length - _bytesNotWritten;
         _bytesNotWritten += _connection.transport().output(_writeBuffer.array(),
                 _bytesNotWritten, size);
@@ -248,24 +249,38 @@ class ServerConnectorImpl<C> implements 
 
     public void close()
     {
+        if (_state == ConnectorState.CLOSED)
+        {
+            return;
+        }
+
         try
         {
+            // If the connection was closed due to authentication error
+            // then there might be data available to write on to the wire.
             writeSasl();
+            writeAMQPCommands(); // write any closing commands
             _channel.close();
+            _state = ConnectorState.CLOSED;
         }
         catch (IOException e)
         {
-
+            e.printStackTrace();
         }
     }
 
     public boolean isClosed()
     {
-        return !_channel.isOpen();
+        return _state == ConnectorState.EOS || _state == ConnectorState.CLOSED;
     }
 
     public void destroy()
     {
-        close();
+        close(); // close if not closed already
+    }
+
+    private void setState(ConnectorState newState)
+    {
+        _state = newState;
     }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org