You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2012/07/27 00:45:25 UTC
svn commit: r1366218 - in
/qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl:
DriverImpl.java ListenerImpl.java MailServer.java ServerConnectorImpl.java
Author: rajith
Date: Thu Jul 26 22:45:24 2012
New Revision: 1366218
URL: http://svn.apache.org/viewvc?rev=1366218&view=rev
Log:
QPID-4312 Cleaned up the code a bit by removing unnessacery bits and
rearrainging parts of the code to improve readability.
Modified:
qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/DriverImpl.java
qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/ListenerImpl.java
qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/MailServer.java
qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/ServerConnectorImpl.java
Modified: qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/DriverImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/DriverImpl.java?rev=1366218&r1=1366217&r2=1366218&view=diff
==============================================================================
--- qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/DriverImpl.java (original)
+++ qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/DriverImpl.java Thu Jul 26 22:45:24 2012
@@ -37,18 +37,18 @@ import java.util.Set;
import org.apache.qpid.proton.driver.Connector;
import org.apache.qpid.proton.driver.Driver;
import org.apache.qpid.proton.driver.Listener;
+import org.apache.qpid.proton.engine.impl.SaslClientImpl;
+import org.apache.qpid.proton.engine.impl.SaslServerImpl;
import org.apache.qpid.proton.logging.LogHandler;
public class DriverImpl implements Driver
{
private Selector _selector;
private Set<SelectionKey> _selectedKeys = Collections.emptySet();
- private ConnectorFactory _connectorFactory;
private LogHandler _logger;
- public DriverImpl(ConnectorFactory connectorFactory, LogHandler logger) throws IOException
+ public DriverImpl(LogHandler logger) throws IOException
{
- _connectorFactory = connectorFactory;
_logger = logger;
_selector = Selector.open();
}
@@ -192,17 +192,10 @@ public class DriverImpl implements Drive
public <C> Listener<C> createListener(ServerSocketChannel c, C context)
{
- try
- {
- Listener<C> l = new ListenerImpl<C>(this, c, context);
- c.register(_selector, SelectionKey.OP_ACCEPT,l);
- return l;
- }
- catch (ClosedChannelException e)
- {
- e.printStackTrace(); // TODO - Implement
- throw new RuntimeException(e);
- }
+ Listener<C> l = new ListenerImpl<C>(this, c, context);
+ SelectionKey key = registerInterest(c,SelectionKey.OP_ACCEPT);
+ key.attach(l);
+ return l;
}
public <C> Connector<C> createConnector(String host, int port, C context)
@@ -225,13 +218,25 @@ public class DriverImpl implements Drive
public <C> Connector<C> createConnector(SelectableChannel c, C context)
{
+ SelectionKey key = registerInterest(c,SelectionKey.OP_READ | SelectionKey.OP_WRITE);
+ Connector<C> co = new ServerConnectorImpl<C>(this, null, new SaslClientImpl(),(SocketChannel)c, context, key);
+ key.attach(co);
+ return co;
+ }
+
+ protected <C> Connector<C> createServerConnector(SelectableChannel c, C context, Listener<C> l)
+ {
+ SelectionKey key = registerInterest(c,SelectionKey.OP_READ | SelectionKey.OP_WRITE);
+ Connector<C> co = new ServerConnectorImpl<C>(this, l, new SaslServerImpl(),(SocketChannel)c, context, key);
+ key.attach(co);
+ return co;
+ }
+
+ private <C> SelectionKey registerInterest(SelectableChannel c, int opKeys)
+ {
try
{
- int opKeys = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
- SelectionKey key = c.register(_selector, opKeys);
- Connector<C> co = _connectorFactory.createConnector(this, (SocketChannel)c, context, key);
- key.attach(co);
- return co;
+ return c.register(_selector, opKeys);
}
catch (ClosedChannelException e)
{
@@ -240,6 +245,7 @@ public class DriverImpl implements Drive
}
}
+
protected LogHandler getLogHandler()
{
return _logger;
Modified: qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/ListenerImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/ListenerImpl.java?rev=1366218&r1=1366217&r2=1366218&view=diff
==============================================================================
--- qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/ListenerImpl.java (original)
+++ qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/ListenerImpl.java Thu Jul 26 22:45:24 2012
@@ -39,7 +39,7 @@ class ListenerImpl<C> implements Listene
_context = context;
}
- public Connector accept()
+ public Connector<C> accept()
{
try
{
@@ -47,7 +47,7 @@ class ListenerImpl<C> implements Listene
if(c != null)
{
c.configureBlocking(false);
- return _driver.createConnector(c, _context);
+ return _driver.createServerConnector(c, _context, this);
}
}
catch (IOException e)
@@ -64,11 +64,19 @@ class ListenerImpl<C> implements Listene
public void close()
{
- //TODO - Implement
+ try
+ {
+ _channel.close();
+ }
+ catch (IOException e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
}
public void destroy()
{
- //TODO - Implement
+ close();
}
}
Modified: qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/MailServer.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/MailServer.java?rev=1366218&r1=1366217&r2=1366218&view=diff
==============================================================================
--- qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/MailServer.java (original)
+++ qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/MailServer.java Thu Jul 26 22:45:24 2012
@@ -32,13 +32,14 @@ public class MailServer
private Driver _driver;
private LogHandler _logger;
private Listener<State> _listener;
+ private String[] _saslMechs = "ANONYMOUS".split(",");
private int _counter;
private Map<String,List<byte[]>> _mailboxes = new HashMap<String,List<byte[]>>();
public MailServer() throws Exception
{
_logger = new Logger();
- _driver = new DriverImpl(new ServerConnectorFactory(),_logger);
+ _driver = new DriverImpl(_logger);
_listener = _driver.createListener("localhost", 5672, State.NEW);
}
@@ -54,6 +55,7 @@ public class MailServer
{
_logger.info("Accepting Connection.");
Connector<State> ctor = _listener.accept();
+ ctor.sasl().setMechanisms(_saslMechs);
ctor.setContext(State.AUTHENTICATING);
}
}
@@ -85,6 +87,10 @@ public class MailServer
// now generate any outbound network data generated in response to
// any work done by the engine.
ctor.process();
+ if (ctor.isClosed())
+ {
+ ctor.destroy();
+ }
ctor = _driver.connector();
}
@@ -181,7 +187,7 @@ public class MailServer
Delivery delivery = con.getWorkHead();
while (delivery != null)
{
- _logger.debug("Process delivery " + delivery.getTag());
+ _logger.debug("Process delivery " + String.valueOf(delivery.getTag()));
if (delivery.isReadable()) // inbound data available
{
@@ -193,14 +199,15 @@ public class MailServer
}
// check to see if the remote has accepted message we sent
- if (delivery.remotelySettled())
+ if (delivery.getRemoteState() != null)
{
+ _logger.debug("Remote has seen it, Settling delivery " + String.valueOf(delivery.getTag()));
// once we know the remote has seen the message, we can
// release the delivery.
delivery.settle();
}
- delivery = con.getWorkHead();
+ delivery = delivery.getWorkNext();
}
// Step 3: Clean up any links or sessions that have been closed by the
Modified: qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/ServerConnectorImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/ServerConnectorImpl.java?rev=1366218&r1=1366217&r2=1366218&view=diff
==============================================================================
--- qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/ServerConnectorImpl.java (original)
+++ qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/ServerConnectorImpl.java Thu Jul 26 22:45:24 2012
@@ -1,6 +1,6 @@
package org.apache.qpid.proton.driver.impl;
-import static org.apache.qpid.proton.driver.impl.ServerConnectorImpl.ConnectorState.NEW;
+import static org.apache.qpid.proton.driver.impl.ServerConnectorImpl.ConnectorState.UNINITIALIZED;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -23,16 +23,18 @@ class ServerConnectorImpl<C> implements
private static int writeBufferSize = Integer.getInteger
("pn.send_buffer_size", DEFAULT_BUFFER_SIZE);
- enum ConnectorState {NEW, OPENED, CLOSED};
+ enum ConnectorState {UNINITIALIZED, OPENED, EOS, CLOSED};
- private final SaslServerImpl _sasl;
+ private final Sasl _sasl;
private final DriverImpl _driver;
+ private final Listener<C> _listener;
private final SocketChannel _channel;
private final LogHandler _logger;
private C _context;
+
private Connection _connection;
private SelectionKey _key;
- private ConnectorState _state = NEW;
+ private ConnectorState _state = UNINITIALIZED;
private ByteBuffer _readBuffer = ByteBuffer.allocate(readBufferSize);
private int _bytesNotRead = 0;
@@ -40,12 +42,12 @@ class ServerConnectorImpl<C> implements
private int _bytesNotWritten = 0;
private ByteBuffer _writeBuffer = ByteBuffer.allocate(writeBufferSize);
- ServerConnectorImpl(DriverImpl driver, SocketChannel c, C context, SelectionKey key)
+ ServerConnectorImpl(DriverImpl driver, Listener<C> listener, Sasl sasl, SocketChannel c, C context, SelectionKey key)
{
_driver = driver;
+ _listener = listener;
_channel = c;
- _sasl = new SaslServerImpl();
- _sasl.setMechanisms(new String[]{"ANONYMOUS"}); //TODO
+ _sasl = sasl;
_logger = driver.getLogHandler();
_context = context;
_key = key;
@@ -55,6 +57,7 @@ class ServerConnectorImpl<C> implements
{
if (!_channel.isOpen())
{
+ _state = ConnectorState.CLOSED;
return;
}
@@ -69,6 +72,65 @@ class ServerConnectorImpl<C> implements
}
}
+ void read()
+ {
+ try
+ {
+ int bytesRead = _channel.read(_readBuffer);
+ int consumed = 0;
+ while (bytesRead > 0)
+ {
+ consumed = processInput(_readBuffer.array(), 0, bytesRead + _bytesNotRead);
+ if (consumed < bytesRead)
+ {
+ _readBuffer.compact();
+ _bytesNotRead = bytesRead - consumed;
+ }
+ else
+ {
+ _readBuffer.rewind();
+ _bytesNotRead = 0;
+ }
+ bytesRead = _channel.read(_readBuffer);
+ }
+ if (bytesRead == -1)
+ {
+ _state = ConnectorState.EOS;
+ }
+ }
+ catch (IOException e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+ void write()
+ {
+ try
+ {
+ processOutput();
+ if (_bytesNotWritten > 0)
+ {
+ _writeBuffer.limit(_bytesNotWritten);
+ int written = _channel.write(_writeBuffer);
+ if (_writeBuffer.hasRemaining())
+ {
+ _writeBuffer.compact();
+ _bytesNotWritten = _bytesNotWritten - written;
+ }
+ else
+ {
+ _writeBuffer.clear();
+ _bytesNotWritten = 0;
+ }
+ }
+ }
+ catch (IOException e)
+ {
+ e.printStackTrace();
+ }
+ }
+
int processInput(byte[] bytes, int offset, int size)
{
int read = 0;
@@ -76,13 +138,14 @@ class ServerConnectorImpl<C> implements
{
switch (_state)
{
- case NEW:
+ case UNINITIALIZED:
read += readSasl(bytes, offset, size);
- writeSasl();
break;
case OPENED:
read += readAMQPCommands(bytes, offset, size);
- writeAMQPCommands();
+ break;
+ case EOS:
+ case CLOSED:
break;
}
}
@@ -93,16 +156,21 @@ class ServerConnectorImpl<C> implements
{
switch (_state)
{
- case NEW:
+ case UNINITIALIZED:
writeSasl();
break;
case OPENED:
writeAMQPCommands();
break;
+ case EOS:
+ writeAMQPCommands();
+ case CLOSED: // not a valid option
+ //TODO
+ break;
}
}
- private int readAMQPCommands(byte[] bytes, int offset, int size)
+ int readAMQPCommands(byte[] bytes, int offset, int size)
{
int consumed = _connection.transport().input(bytes, offset, size);
if (consumed == END_OF_STREAM)
@@ -115,18 +183,13 @@ class ServerConnectorImpl<C> implements
}
}
- private void writeAMQPCommands()
+ void writeAMQPCommands()
{
int size = _writeBuffer.array().length - _bytesNotWritten;
_bytesNotWritten += _connection.transport().output(_writeBuffer.array(),
_bytesNotWritten, size);
}
- private void setState(ConnectorState newState)
- {
- _state = newState;
- }
-
int readSasl(byte[] bytes, int offset, int size)
{
int consumed = _sasl.input(bytes, offset, size);
@@ -147,64 +210,9 @@ class ServerConnectorImpl<C> implements
_bytesNotWritten, size);
}
- void write()
+ public Listener<C> listener()
{
- try
- {
- processOutput();
- if (_bytesNotWritten > 0)
- {
- _writeBuffer.limit(_bytesNotWritten);
- int written = _channel.write(_writeBuffer);
- if (_writeBuffer.hasRemaining())
- {
- _writeBuffer.compact();
- _bytesNotWritten = _bytesNotWritten - written;
- }
- else
- {
- _writeBuffer.clear();
- _bytesNotWritten = 0;
- }
- }
- }
- catch (IOException e)
- {
- e.printStackTrace();
- }
- }
-
- void read()
- {
- try
- {
- int bytesRead = _channel.read(_readBuffer);
- int consumed = 0;
- while (bytesRead > 0)
- {
- consumed = processInput(_readBuffer.array(), 0, bytesRead + _bytesNotRead);
- if (consumed < bytesRead)
- {
- _readBuffer.compact();
- _bytesNotRead = bytesRead - consumed;
- }
- else
- {
- _readBuffer.rewind();
- _bytesNotRead = 0;
- }
- bytesRead = _channel.read(_readBuffer);
- }
- }
- catch (IOException e)
- {
- e.printStackTrace();
- }
- }
-
- public Listener listener()
- {
- return null; //TODO - Implement
+ return _listener;
}
public Sasl sasl()
@@ -219,17 +227,10 @@ class ServerConnectorImpl<C> implements
public void setConnection(Connection connection)
{
- if (_sasl.isDone())
- {
- writeSasl();
- }
- else
- {
- throw new RuntimeException("Cannot set the connection before authentication is completed");
- }
-
+ // write any remaining data on to the wire.
+ writeSasl();
_connection = connection;
- // write any initial data
+ // write initial data
int size = _writeBuffer.array().length - _bytesNotWritten;
_bytesNotWritten += _connection.transport().output(_writeBuffer.array(),
_bytesNotWritten, size);
@@ -248,24 +249,38 @@ class ServerConnectorImpl<C> implements
public void close()
{
+ if (_state == ConnectorState.CLOSED)
+ {
+ return;
+ }
+
try
{
+ // If the connection was closed due to authentication error
+ // then there might be data available to write on to the wire.
writeSasl();
+ writeAMQPCommands(); // write any closing commands
_channel.close();
+ _state = ConnectorState.CLOSED;
}
catch (IOException e)
{
-
+ e.printStackTrace();
}
}
public boolean isClosed()
{
- return !_channel.isOpen();
+ return _state == ConnectorState.EOS || _state == ConnectorState.CLOSED;
}
public void destroy()
{
- close();
+ close(); // close if not closed already
+ }
+
+ private void setState(ConnectorState newState)
+ {
+ _state = newState;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org