You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2008/10/09 19:08:01 UTC

svn commit: r703208 [2/2] - in /incubator/qpid/trunk/qpid/java: client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/ client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/ client/example/src/main/java/org/apache/qp...

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java?rev=703208&r1=703207&r2=703208&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java Thu Oct  9 10:07:59 2008
@@ -20,14 +20,22 @@
  */
 package org.apache.qpid.transport;
 
+import org.apache.qpid.transport.network.ConnectionBinding;
+import org.apache.qpid.transport.network.io.IoTransport;
 import org.apache.qpid.transport.util.Logger;
+import org.apache.qpid.transport.util.Waiter;
 
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import java.nio.ByteBuffer;
+import java.util.UUID;
+
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslServer;
+
+import static org.apache.qpid.transport.Connection.State.*;
 
 
 /**
@@ -44,23 +52,164 @@
     implements Receiver<ProtocolEvent>, Sender<ProtocolEvent>
 {
 
+    enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD }
+
     private static final Logger log = Logger.get(Connection.class);
 
-    final private Sender<ProtocolEvent> sender;
-    final private ConnectionDelegate delegate;
+    class DefaultConnectionListener implements ConnectionListener
+    {
+        public void opened(Connection conn) {}
+        public void exception(Connection conn, ConnectionException exception)
+        {
+            throw exception;
+        }
+        public void closed(Connection conn) {}
+    }
+
+    private ConnectionDelegate delegate;
+    private Sender<ProtocolEvent> sender;
+
+    final private Map<Integer,Channel> channels = new HashMap<Integer,Channel>();
+
+    private State state = NEW;
+    private Object lock = new Object();
+    private long timeout = 60000;
+    private ConnectionListener listener = new DefaultConnectionListener();
+    private Throwable error = null;
+
     private int channelMax = 1;
+    private String locale;
+    private SaslServer saslServer;
+    private SaslClient saslClient;
+
     // want to make this final
     private int _connectionId;
 
-    final private Map<Integer,Channel> channels = new HashMap<Integer,Channel>();
+    public Connection() {}
 
-    public Connection(Sender<ProtocolEvent> sender,
-                      ConnectionDelegate delegate)
+    public void setConnectionDelegate(ConnectionDelegate delegate)
     {
-        this.sender = sender;
         this.delegate = delegate;
     }
 
+    public void setConnectionListener(ConnectionListener listener)
+    {
+        if (listener == null)
+        {
+            this.listener = new DefaultConnectionListener();
+        }
+        else
+        {
+            this.listener = listener;
+        }
+    }
+
+    public Sender<ProtocolEvent> getSender()
+    {
+        return sender;
+    }
+
+    public void setSender(Sender<ProtocolEvent> sender)
+    {
+        this.sender = sender;
+    }
+
+    void setState(State state)
+    {
+        synchronized (lock)
+        {
+            this.state = state;
+            lock.notifyAll();
+        }
+    }
+
+    void setLocale(String locale)
+    {
+        this.locale = locale;
+    }
+
+    String getLocale()
+    {
+        return locale;
+    }
+
+    void setSaslServer(SaslServer saslServer)
+    {
+        this.saslServer = saslServer;
+    }
+
+    SaslServer getSaslServer()
+    {
+        return saslServer;
+    }
+
+    void setSaslClient(SaslClient saslClient)
+    {
+        this.saslClient = saslClient;
+    }
+
+    SaslClient getSaslClient()
+    {
+        return saslClient;
+    }
+
+    public void connect(String host, int port, String vhost, String username, String password)
+    {
+        synchronized (lock)
+        {
+            state = OPENING;
+
+            delegate = new ClientDelegate(vhost, username, password);
+
+            IoTransport.connect(host, port, ConnectionBinding.get(this));
+            send(new ProtocolHeader(1, 0, 10));
+
+            Waiter w = new Waiter(lock, timeout);
+            while (w.hasTime() && state == OPENING && error == null)
+            {
+                w.await();
+            }
+
+            if (error != null)
+            {
+                Throwable t = error;
+                error = null;
+                close();
+                throw new ConnectionException(t);
+            }
+
+            switch (state)
+            {
+            case OPENING:
+                close();
+                throw new ConnectionException("connect() timed out");
+            case OPEN:
+                break;
+            case CLOSED:
+                throw new ConnectionException("connect() aborted");
+            default:
+                throw new IllegalStateException(String.valueOf(state));
+            }
+        }
+
+        listener.opened(this);
+    }
+
+    public Session createSession()
+    {
+        return createSession(0);
+    }
+
+    public Session createSession(long expiryInSeconds)
+    {
+        Channel ch = getChannel();
+        Session ssn = new Session(UUID.randomUUID().toString().getBytes());
+        ssn.attach(ch);
+        ssn.sessionAttach(ssn.getName());
+        ssn.sessionRequestTimeout(expiryInSeconds);
+        return ssn;
+    }
+
     public void setConnectionId(int id)
     {
         _connectionId = id;
@@ -86,7 +235,12 @@
     public void send(ProtocolEvent event)
     {
         log.debug("SEND: [%s] %s", this, event);
-        sender.send(event);
+        Sender s = sender;
+        if (s == null)
+        {
+            throw new ConnectionException("connection closed");
+        }
+        s.send(event);
     }
 
     public void flush()
@@ -107,7 +261,7 @@
 
     public Channel getChannel()
     {
-        synchronized (channels)
+        synchronized (lock)
         {
             for (int i = 0; i < getChannelMax(); i++)
             {
@@ -123,7 +277,7 @@
 
     public Channel getChannel(int number)
     {
-        synchronized (channels)
+        synchronized (lock)
         {
             Channel channel = channels.get(number);
             if (channel == null)
@@ -137,45 +291,146 @@
 
     void removeChannel(int number)
     {
-        synchronized (channels)
+        synchronized (lock)
         {
             channels.remove(number);
         }
     }
 
+    public void exception(ConnectionException e)
+    {
+        synchronized (lock)
+        {
+            switch (state)
+            {
+            case OPENING:
+            case CLOSING:
+                error = e;
+                lock.notifyAll();
+                break;
+            default:
+                listener.exception(this, e);
+                break;
+            }
+        }
+    }
+
     public void exception(Throwable t)
     {
-        delegate.exception(t);
+        synchronized (lock)
+        {
+            switch (state)
+            {
+            case OPENING:
+            case CLOSING:
+                error = t;
+                lock.notifyAll();
+                break;
+            default:
+                listener.exception(this, new ConnectionException(t));
+                break;
+            }
+        }
     }
 
     void closeCode(ConnectionClose close)
     {
-        synchronized (channels)
+        synchronized (lock)
         {
             for (Channel ch : channels.values())
             {
                 ch.closeCode(close);
             }
+            ConnectionCloseCode code = close.getReplyCode();
+            if (code != ConnectionCloseCode.NORMAL)
+            {
+                exception(new ConnectionException(close));
+            }
         }
     }
 
     public void closed()
     {
         log.debug("connection closed: %s", this);
-        synchronized (channels)
+
+        if (state == OPEN)
+        {
+            exception(new ConnectionException("connection aborted"));
+        }
+
+        synchronized (lock)
         {
             List<Channel> values = new ArrayList<Channel>(channels.values());
             for (Channel ch : values)
             {
                 ch.closed();
             }
+
+            sender = null;
+            setState(CLOSED);
         }
-        delegate.closed();
+
+        listener.closed(this);
     }
 
     public void close()
     {
-        sender.close();
+        synchronized (lock)
+        {
+            switch (state)
+            {
+            case OPEN:
+                Channel ch = getChannel(0);
+                state = CLOSING;
+                ch.connectionClose(ConnectionCloseCode.NORMAL, null);
+                Waiter w = new Waiter(lock, timeout);
+                while (w.hasTime() && state == CLOSING && error == null)
+                {
+                    w.await();
+                }
+
+                if (error != null)
+                {
+                    close();
+                    throw new ConnectionException(error);
+                }
+
+                switch (state)
+                {
+                case CLOSING:
+                    close();
+                    throw new ConnectionException("close() timed out");
+                case CLOSED:
+                    break;
+                default:
+                    throw new IllegalStateException(String.valueOf(state));
+                }
+                break;
+            case CLOSED:
+                break;
+            default:
+                if (sender != null)
+                {
+                    sender.close();
+                    w = new Waiter(lock, timeout);
+                    while (w.hasTime() && sender != null && error == null)
+                    {
+                        w.await();
+                    }
+
+                    if (error != null)
+                    {
+                        throw new ConnectionException(error);
+                    }
+
+                    if (sender != null)
+                    {
+                        throw new ConnectionException("close() timed out");
+                    }
+                }
+                break;
+            }
+        }
     }
 
     public String toString()

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java?rev=703208&r1=703207&r2=703208&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java Thu Oct  9 10:07:59 2008
@@ -22,22 +22,7 @@
 
 import org.apache.qpid.transport.util.Logger;
 
-import org.apache.qpid.SecurityHelper;
-import org.apache.qpid.QpidException;
-
-import java.io.UnsupportedEncodingException;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-
-import javax.security.sasl.Sasl;
-import javax.security.sasl.SaslClient;
-import javax.security.sasl.SaslException;
-import javax.security.sasl.SaslServer;
+import static org.apache.qpid.transport.Connection.State.*;
 
 
 /**
@@ -57,231 +42,26 @@
 
     private static final Logger log = Logger.get(ConnectionDelegate.class);
 
-    private String _username = "guest";
-    private String _password = "guest";;
-    private String _mechanism;
-    private String _virtualHost;
-    private SaslClient saslClient;
-    private SaslServer saslServer;
-    private String _locale = "utf8";
-    private int maxFrame = 64*1024;
-    private Condition _negotiationComplete;
-    private Lock _negotiationCompleteLock;
-
-    public abstract SessionDelegate getSessionDelegate();
-
-    public abstract void exception(Throwable t);
-
-    public abstract void closed();
-
-    public void setCondition(Lock negotiationCompleteLock,Condition negotiationComplete)
-    {
-        _negotiationComplete = negotiationComplete;
-        _negotiationCompleteLock = negotiationCompleteLock;
-    }
-
-    public void init(Channel ch, ProtocolHeader hdr)
-    {
-        ch.getConnection().send(new ProtocolHeader (1, hdr.getMajor(), hdr.getMinor()));
-        List<Object> plain = new ArrayList<Object>();
-        plain.add("PLAIN");
-        List<Object> utf8 = new ArrayList<Object>();
-        utf8.add("utf8");
-        ch.connectionStart(null, plain, utf8);
-    }
-
-    // ----------------------------------------------
-    //           Client side
-    //-----------------------------------------------
-    @Override public void connectionStart(Channel context, ConnectionStart struct)
-    {
-        String mechanism = null;
-        byte[] response = null;
-        try
-        {
-            mechanism = SecurityHelper.chooseMechanism(struct.getMechanisms());
-            saslClient = Sasl.createSaslClient(new String[]{ mechanism },null, "AMQP", "localhost", null,
-                                                  SecurityHelper.createCallbackHandler(mechanism,_username,_password ));
-            response = saslClient.evaluateChallenge(new byte[0]);
-        }
-        catch (UnsupportedEncodingException e)
-        {
-           // need error handling
-        }
-        catch (SaslException e)
-        {
-          // need error handling
-        }
-        catch (QpidException e)
-        {
-          //  need error handling
-        }
-
-        Map<String,Object> props = new HashMap<String,Object>();
-        context.connectionStartOk(props, mechanism, response, _locale);
-    }
-
-    @Override public void connectionSecure(Channel context, ConnectionSecure struct)
-    {
-        try
-        {
-            byte[] response = saslClient.evaluateChallenge(struct.getChallenge());
-            context.connectionSecureOk(response);
-        }
-        catch (SaslException e)
-        {
-          // need error handling
-        }
-    }
-
-    @Override public void connectionTune(Channel context, ConnectionTune struct)
-    {
-        context.getConnection().setChannelMax(struct.getChannelMax());
-        context.connectionTuneOk(struct.getChannelMax(), struct.getMaxFrameSize(), struct.getHeartbeatMax());
-        context.connectionOpen(_virtualHost, null, Option.INSIST);
-    }
-
-
-    @Override public void connectionOpenOk(Channel context, ConnectionOpenOk struct)
-    {
-        List<Object> knownHosts = struct.getKnownHosts();
-        if(_negotiationCompleteLock != null)
-        {
-            _negotiationCompleteLock.lock();
-            try
-            {
-                _negotiationComplete.signalAll();
-            }
-            finally
-            {
-                _negotiationCompleteLock.unlock();
-            }
-        }
-    }
-
-    public void connectionRedirect(Channel context, ConnectionRedirect struct)
-    {
-        // not going to bother at the moment
-    }
-
-    //  ----------------------------------------------
-    //           Server side
-    //-----------------------------------------------
-    @Override public void connectionStartOk(Channel context, ConnectionStartOk struct)
-    {
-        //set the client side locale on the server side
-        _locale = struct.getLocale();
-        _mechanism = struct.getMechanism();
-
-        //try
-        //{
-            //saslServer = Sasl.createSaslServer(_mechanism, "AMQP", "ABC",null,SecurityHelper.createCallbackHandler(_mechanism,_username,_password));
-            //byte[] challenge = saslServer.evaluateResponse(struct.getResponse().getBytes());
-            byte[] challenge = null;
-            if ( challenge == null)
-            {
-                context.connectionTune(Integer.MAX_VALUE, maxFrame, 0, Integer.MAX_VALUE);
-            }
-            else
-            {
-                try
-                {
-                    context.connectionSecure(challenge);
-                }
-                catch(Exception e)
-                {
-
-                }
-            }
-
-
-        /*}
-        catch (SaslException e)
-        {
-          // need error handling
-        }
-        catch (QpidException e)
-        {
-          //  need error handling
-        }*/
-    }
-
-    @Override public void connectionSecureOk(Channel context, ConnectionSecureOk struct)
+    public SessionDelegate getSessionDelegate()
     {
-        try
-        {
-            saslServer = Sasl.createSaslServer(_mechanism, "AMQP", "ABC",new HashMap(),SecurityHelper.createCallbackHandler(_mechanism,_username,_password));
-            byte[] challenge = saslServer.evaluateResponse(struct.getResponse());
-            if ( challenge == null)
-            {
-                context.connectionTune(Integer.MAX_VALUE, maxFrame, 0, Integer.MAX_VALUE);
-            }
-            else
-            {
-                try
-                {
-                    context.connectionSecure(challenge);
-                }
-                catch(Exception e)
-                {
-
-                }
-            }
-
-
-        }
-        catch (SaslException e)
-        {
-          // need error handling
-        }
-        catch (QpidException e)
-        {
-          //  need error handling
-        }
+        return new SessionDelegate();
     }
 
-
-    @Override public void connectionOpen(Channel context, ConnectionOpen struct)
-    {
-        List<Object> hosts = new ArrayList<Object>();
-        hosts.add("amqp:1223243232325");
-        context.connectionOpenOk(hosts);
-    }
+    public abstract void init(Channel ch, ProtocolHeader hdr);
 
     @Override public void connectionClose(Channel ch, ConnectionClose close)
     {
-        ch.getConnection().closeCode(close);
+        Connection conn = ch.getConnection();
         ch.connectionCloseOk();
+        conn.getSender().close();
+        conn.closeCode(close);
+        conn.setState(CLOSE_RCVD);
     }
 
-    public String getPassword()
-    {
-        return _password;
-    }
-
-    public void setPassword(String password)
-    {
-        _password = password;
-    }
-
-    public String getUsername()
-    {
-        return _username;
-    }
-
-    public void setUsername(String username)
-    {
-        _username = username;
-    }
-
-    public String getVirtualHost()
-    {
-        return _virtualHost;
-    }
-
-    public void setVirtualHost(String host)
+    @Override public void connectionCloseOk(Channel ch, ConnectionCloseOk ok)
     {
-        _virtualHost = host;
+        Connection conn = ch.getConnection();
+        conn.getSender().close();
     }
 
 }

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionException.java?rev=703208&r1=703207&r2=703208&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionException.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionException.java Thu Oct  9 10:07:59 2008
@@ -26,17 +26,37 @@
  *
  */
 
-public class ConnectionException extends RuntimeException
+public class ConnectionException extends TransportException
 {
 
     private ConnectionClose close;
 
-    public ConnectionException(ConnectionClose close)
+    public ConnectionException(String message, ConnectionClose close, Throwable cause)
     {
-        super(close.getReplyText());
+        super(message, cause);
         this.close = close;
     }
 
+    public ConnectionException(String message)
+    {
+        this(message, null, null);
+    }
+
+    public ConnectionException(String message, Throwable cause)
+    {
+        this(message, null, cause);
+    }
+
+    public ConnectionException(Throwable cause)
+    {
+        this(cause.getMessage(), null, cause);
+    }
+
+    public ConnectionException(ConnectionClose close)
+    {
+        this(close.getReplyText(), close, null);
+    }
+
     public ConnectionClose getClose()
     {
         return close;

Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionListener.java?rev=703208&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionListener.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionListener.java Thu Oct  9 10:07:59 2008
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+
+/**
+ * ConnectionListener
+ *
+ */
+
+public interface ConnectionListener
+{
+
+    void opened(Connection connection);
+
+    void exception(Connection connection, ConnectionException exception);
+
+    void closed(Connection connection);
+
+}

Propchange: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionListener.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java?rev=703208&r1=703207&r2=703208&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java Thu Oct  9 10:07:59 2008
@@ -43,25 +43,16 @@
 
     public static final void main(String[] args) throws IOException
     {
-        ConnectionDelegate delegate = new ConnectionDelegate()
+        ConnectionDelegate delegate = new ServerDelegate()
         {
             public SessionDelegate getSessionDelegate()
             {
                 return new Echo();
             }
-            public void exception(Throwable t)
-            {
-                t.printStackTrace();
-            }
-            public void closed() {}
         };
 
-        //hack
-        delegate.setUsername("guest");
-        delegate.setPassword("guest");
-
         IoAcceptor ioa = new IoAcceptor
-            ("0.0.0.0", 5672, new ConnectionBinding(delegate));
+            ("0.0.0.0", 5672, ConnectionBinding.get(delegate));
         ioa.start();
     }
 

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolVersionException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolVersionException.java?rev=703208&r1=703207&r2=703208&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolVersionException.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolVersionException.java Thu Oct  9 10:07:59 2008
@@ -26,7 +26,7 @@
  *
  */
 
-public final class ProtocolVersionException extends TransportException
+public final class ProtocolVersionException extends ConnectionException
 {
 
     private final byte major;

Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java?rev=703208&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java Thu Oct  9 10:07:59 2008
@@ -0,0 +1,145 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+import java.util.Collections;
+
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+
+import java.io.UnsupportedEncodingException;
+
+import org.apache.qpid.QpidException;
+
+import org.apache.qpid.SecurityHelper;
+
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+
+
+import static org.apache.qpid.transport.Connection.State.*;
+
+
+/**
+ * ServerDelegate
+ *
+ */
+
+public class ServerDelegate extends ConnectionDelegate
+{
+
+    private SaslServer saslServer;
+
+    public void init(Channel ch, ProtocolHeader hdr)
+    {
+        Connection conn = ch.getConnection();
+        conn.send(new ProtocolHeader(1, 0, 10));
+        List<Object> utf8 = new ArrayList<Object>();
+        utf8.add("utf8");
+        ch.connectionStart(null, Collections.EMPTY_LIST, utf8);
+    }
+
+    @Override public void connectionStartOk(Channel ch, ConnectionStartOk ok)
+    {
+        Connection conn = ch.getConnection();
+        conn.setLocale(ok.getLocale());
+        String mechanism = ok.getMechanism();
+
+        if (mechanism == null || mechanism.length() == 0)
+        {
+            ch.connectionTune
+                (Integer.MAX_VALUE,
+                 org.apache.qpid.transport.network.ConnectionBinding.MAX_FRAME_SIZE,
+                 0, Integer.MAX_VALUE);
+            return;
+        }
+
+        try
+        {
+            SaslServer ss = Sasl.createSaslServer
+                (mechanism, "AMQP", "localhost", null, null);
+            if (ss == null)
+            {
+                ch.connectionClose
+                    (ConnectionCloseCode.CONNECTION_FORCED,
+                     "null SASL mechanism: " + mechanism);
+                return;
+            }
+            conn.setSaslServer(ss);
+            secure(ch, ok.getResponse());
+        }
+        catch (SaslException e)
+        {
+            conn.exception(e);
+        }
+    }
+
+    private void secure(Channel ch, byte[] response)
+    {
+        Connection conn = ch.getConnection();
+        SaslServer ss = conn.getSaslServer();
+        try
+        {
+            byte[] challenge = ss.evaluateResponse(response);
+            if (ss.isComplete())
+            {
+                ss.dispose();
+                ch.connectionTune
+                    (Integer.MAX_VALUE,
+                     org.apache.qpid.transport.network.ConnectionBinding.MAX_FRAME_SIZE,
+                     0, Integer.MAX_VALUE);
+            }
+            else
+            {
+                ch.connectionSecure(challenge);
+            }
+        }
+        catch (SaslException e)
+        {
+            conn.exception(e);
+        }
+    }
+
+    @Override public void connectionSecureOk(Channel ch, ConnectionSecureOk ok)
+    {
+        secure(ch, ok.getResponse());
+    }
+
+    @Override public void connectionTuneOk(Channel ch, ConnectionTuneOk ok)
+    {
+        
+    }
+
+    @Override public void connectionOpen(Channel ch, ConnectionOpen open)
+    {
+        Connection conn = ch.getConnection();
+        ch.connectionOpenOk(Collections.EMPTY_LIST);
+        conn.setState(OPEN);
+    }
+
+}

Propchange: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java?rev=703208&r1=703207&r2=703208&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java Thu Oct  9 10:07:59 2008
@@ -49,6 +49,26 @@
 
     private static final Logger log = Logger.get(Session.class);
 
+    class DefaultSessionListener implements SessionListener
+    {
+
+        public void opened(Session ssn) {}
+
+        public void message(Session ssn, MessageTransfer xfr)
+        {
+            log.info("message: %s", xfr);
+        }
+
+        public void exception(Session ssn, SessionException exc)
+        {
+            throw exc;
+        }
+
+        public void closed(Session ssn) {}
+    }
+
+    public static final int UNLIMITED_CREDIT = 0xFFFFFFFF;
+
     private static boolean ENABLE_REPLAY = false;
 
     static
@@ -65,6 +85,7 @@
     }
 
     private byte[] name;
+    private SessionListener listener = new DefaultSessionListener();
     private long timeout = 60000;
     private boolean autoSync = false;
 
@@ -97,6 +118,23 @@
         return name;
     }
 
+    public void setSessionListener(SessionListener listener)
+    {
+        if (listener == null)
+        {
+            this.listener = new DefaultSessionListener();
+        }
+        else
+        {
+            this.listener = listener;
+        }
+    }
+
+    public SessionListener getSessionListener()
+    {
+        return listener;
+    }
+
     public void setAutoSync(boolean value)
     {
         synchronized (commands)
@@ -270,8 +308,8 @@
     {
         if (closed.get())
         {
-            List<ExecutionException> exc = getExceptions();
-            if (!exc.isEmpty())
+            ExecutionException exc = getException();
+            if (exc != null)
             {
                 throw new SessionException(exc);
             }
@@ -361,7 +399,7 @@
             {
                 if (closed.get())
                 {
-                    throw new SessionException(getExceptions());
+                    throw new SessionException(getException());
                 }
                 else
                 {
@@ -375,8 +413,7 @@
 
     private Map<Integer,ResultFuture<?>> results =
         new HashMap<Integer,ResultFuture<?>>();
-    private List<ExecutionException> exceptions =
-        new ArrayList<ExecutionException>();
+    private ExecutionException exception = null;
 
     void result(int command, Struct result)
     {
@@ -388,11 +425,17 @@
         future.set(result);
     }
 
-    void addException(ExecutionException exc)
+    void setException(ExecutionException exc)
     {
-        synchronized (exceptions)
+        synchronized (results)
         {
-            exceptions.add(exc);
+            if (exception != null)
+            {
+                throw new IllegalStateException
+                    (String.format
+                     ("too many exceptions: %s, %s", exception, exc));
+            }
+            exception = exc;
         }
     }
 
@@ -403,11 +446,11 @@
         this.close = close;
     }
 
-    List<ExecutionException> getExceptions()
+    ExecutionException getException()
     {
-        synchronized (exceptions)
+        synchronized (results)
         {
-            return new ArrayList<ExecutionException>(exceptions);
+            return exception;
         }
     }
 
@@ -473,7 +516,7 @@
             }
             else if (closed.get())
             {
-                throw new SessionException(getExceptions());
+                throw new SessionException(getException());
             }
             else
             {

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionClosedException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionClosedException.java?rev=703208&r1=703207&r2=703208&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionClosedException.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionClosedException.java Thu Oct  9 10:07:59 2008
@@ -33,7 +33,7 @@
 
     public SessionClosedException()
     {
-        super(Collections.EMPTY_LIST);
+        super(null);
     }
 
 }

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java?rev=703208&r1=703207&r2=703208&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java Thu Oct  9 10:07:59 2008
@@ -29,7 +29,7 @@
  * @author Rafael H. Schloming
  */
 
-public abstract class SessionDelegate
+public class SessionDelegate
     extends MethodDelegate<Session>
     implements ProtocolDelegate<Session>
 {
@@ -57,7 +57,7 @@
 
     @Override public void executionException(Session ssn, ExecutionException exc)
     {
-        ssn.addException(exc);
+        ssn.setException(exc);
     }
 
     @Override public void sessionCompleted(Session ssn, SessionCompleted cmp)
@@ -122,4 +122,9 @@
         ssn.syncPoint();
     }
 
+    @Override public void messageTransfer(Session ssn, MessageTransfer xfr)
+    {
+        ssn.getSessionListener().message(ssn, xfr);
+    }
+
 }

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionException.java?rev=703208&r1=703207&r2=703208&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionException.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionException.java Thu Oct  9 10:07:59 2008
@@ -27,20 +27,20 @@
  *
  */
 
-public class SessionException extends RuntimeException
+public class SessionException extends TransportException
 {
 
-    private List<ExecutionException> exceptions;
+    private ExecutionException exception;
 
-    public SessionException(List<ExecutionException> exceptions)
+    public SessionException(ExecutionException exception)
     {
-        super(exceptions.isEmpty() ? "" : exceptions.toString());
-        this.exceptions = exceptions;
+        super(String.valueOf(exception));
+        this.exception = exception;
     }
 
-    public List<ExecutionException> getExceptions()
+    public ExecutionException getException()
     {
-        return exceptions;
+        return exception;
     }
 
 }

Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionListener.java?rev=703208&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionListener.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionListener.java Thu Oct  9 10:07:59 2008
@@ -0,0 +1,40 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+
+/**
+ * SessionListener
+ *
+ */
+
+public interface SessionListener
+{
+
+    void opened(Session session);
+
+    void message(Session ssn, MessageTransfer xfr);
+
+    void exception(Session session, SessionException exception);
+
+    void closed(Session session);
+
+}

Propchange: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionListener.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Sink.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Sink.java?rev=703208&r1=703207&r2=703208&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Sink.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Sink.java Thu Oct  9 10:07:59 2008
@@ -103,28 +103,17 @@
 
     public static final void main(String[] args) throws IOException
     {
-        ConnectionDelegate delegate = new ConnectionDelegate()
+        ConnectionDelegate delegate = new ServerDelegate()
         {
 
             public SessionDelegate getSessionDelegate()
             {
                 return new Sink();
             }
-
-            public void exception(Throwable t)
-            {
-                t.printStackTrace();
-            }
-
-            public void closed() {}
         };
 
-        //hack
-        delegate.setUsername("guest");
-        delegate.setPassword("guest");
-
         IoAcceptor ioa = new IoAcceptor
-            ("0.0.0.0", 5672, new ConnectionBinding(delegate));
+            ("0.0.0.0", 5672, ConnectionBinding.get(delegate));
         System.out.println
             (String.format
              (FORMAT_HDR, "Session", "Count/MBytes", "Cumulative Rate", "Interval Rate"));

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java?rev=703208&r1=703207&r2=703208&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java Thu Oct  9 10:07:59 2008
@@ -33,23 +33,46 @@
  *
  */
 
-public class ConnectionBinding implements Binding<Connection,ByteBuffer>
+public abstract class ConnectionBinding
+    implements Binding<Connection,ByteBuffer>
 {
 
-    private static final int MAX_FRAME_SIZE = 64 * 1024 - 1;
-
-    private final ConnectionDelegate delegate;
+    public static Binding<Connection,ByteBuffer> get(final Connection connection)
+    {
+        return new ConnectionBinding()
+        {
+            public Connection connection()
+            {
+                return connection;
+            }
+        };
+    }
 
-    public ConnectionBinding(ConnectionDelegate delegate)
+    public static Binding<Connection,ByteBuffer> get(final ConnectionDelegate delegate)
     {
-        this.delegate = delegate;
+        return new ConnectionBinding()
+        {
+            public Connection connection()
+            {
+                Connection conn = new Connection();
+                conn.setConnectionDelegate(delegate);
+                return conn;
+            }
+        };
     }
 
+    public static final int MAX_FRAME_SIZE = 64 * 1024 - 1;
+
+    public abstract Connection connection();
+
     public Connection endpoint(Sender<ByteBuffer> sender)
     {
+        Connection conn = connection();
+
         // XXX: hardcoded max-frame
-        return new Connection
-            (new Disassembler(sender, MAX_FRAME_SIZE), delegate);
+        Disassembler dis = new Disassembler(sender, MAX_FRAME_SIZE);
+        conn.setSender(dis);
+        return conn;
     }
 
     public Receiver<ByteBuffer> receiver(Connection conn)

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java?rev=703208&r1=703207&r2=703208&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java Thu Oct  9 10:07:59 2008
@@ -61,7 +61,7 @@
         start();
     }
 
-    void close()
+    void close(boolean block)
     {
         if (!closed.getAndSet(true))
         {
@@ -75,7 +75,7 @@
                 {
                     socket.shutdownInput();
                 }
-                if (Thread.currentThread() != this)
+                if (block && Thread.currentThread() != this)
                 {
                     join(timeout);
                     if (isAlive())
@@ -121,6 +121,7 @@
                     }
                 }
             }
+            socket.close();
         }
         catch (Throwable t)
         {
@@ -129,7 +130,6 @@
         finally
         {
             receiver.closed();
-            transport.getSender().close();
         }
     }
 

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java?rev=703208&r1=703207&r2=703208&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java Thu Oct  9 10:07:59 2008
@@ -196,17 +196,12 @@
                         throw new TransportException("join timed out");
                     }
                 }
-                transport.getReceiver().close();
-                socket.close();
+                transport.getReceiver().close(false);
             }
             catch (InterruptedException e)
             {
                 throw new TransportException(e);
             }
-            catch (IOException e)
-            {
-                throw new TransportException(e);
-            }
 
             if (reportException && exception != null)
             {

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java?rev=703208&r1=703207&r2=703208&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java Thu Oct  9 10:07:59 2008
@@ -108,7 +108,7 @@
     public static final Connection connect(String host, int port,
                                            ConnectionDelegate delegate)
     {
-        return connect(host, port, new ConnectionBinding(delegate));
+        return connect(host, port, ConnectionBinding.get(delegate));
     }
 
     public static void connect_0_9(AMQVersionAwareProtocolSession session, String host, int port)

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java?rev=703208&r1=703207&r2=703208&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java Thu Oct  9 10:07:59 2008
@@ -262,13 +262,13 @@
                                     ConnectionDelegate delegate)
         throws IOException
     {
-        accept(host, port, new ConnectionBinding(delegate));
+        accept(host, port, ConnectionBinding.get(delegate));
     }
 
     public static final Connection connect(String host, int port,
                                            ConnectionDelegate delegate)
     {
-        return connect(host, port, new ConnectionBinding(delegate));
+        return connect(host, port, ConnectionBinding.get(delegate));
     }
 
 }

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java?rev=703208&r1=703207&r2=703208&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java Thu Oct  9 10:07:59 2008
@@ -87,8 +87,9 @@
         }
 
         NioSender sender = new NioSender(_ch);
-        Connection con = new Connection
-            (new Disassembler(sender, 64*1024 - 1), delegate);
+        Connection con = new Connection();
+        con.setSender(new Disassembler(sender, 64*1024 - 1));
+        con.setConnectionDelegate(delegate);
 
         con.setConnectionId(_count.incrementAndGet());
         _handlers.put(con.getConnectionId(),sender);

Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/util/Waiter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/util/Waiter.java?rev=703208&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/util/Waiter.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/util/Waiter.java Thu Oct  9 10:07:59 2008
@@ -0,0 +1,63 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport.util;
+
+
+/**
+ * Waiter
+ *
+ */
+
+public final class Waiter
+{
+
+    private final Object lock;
+    private final long timeout;
+    private final long start;
+    private long elapsed;
+
+    public Waiter(Object lock, long timeout)
+    {
+        this.lock = lock;
+        this.timeout = timeout;
+        this.start = System.currentTimeMillis();
+        this.elapsed = 0;
+    }
+
+    public boolean hasTime()
+    {
+        return elapsed < timeout;
+    }
+
+    public void await()
+    {
+        try
+        {
+            lock.wait(timeout - elapsed);
+        }
+        catch (InterruptedException e)
+        {
+            // pass
+        }
+        elapsed = System.currentTimeMillis() - start;
+    }
+
+}

Propchange: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/util/Waiter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java?rev=703208&r1=703207&r2=703208&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java Thu Oct  9 10:07:59 2008
@@ -50,38 +50,30 @@
 
         port = AvailablePortFinder.getNextAvailable(12000);
 
-        ConnectionDelegate server = new ConnectionDelegate() {
-            public void init(Channel ch, ProtocolHeader hdr) {
+        ConnectionDelegate server = new ServerDelegate() {
+            @Override public void connectionOpen(Channel ch, ConnectionOpen open)
+            {
+                super.connectionOpen(ch, open);
                 ch.getConnection().close();
             }
-
-            public SessionDelegate getSessionDelegate() {
-                return new SessionDelegate() {};
-            }
-            public void exception(Throwable t) {
-                log.error(t, "exception caught");
-            }
-            public void closed() {}
         };
 
         IoAcceptor ioa = new IoAcceptor
-            ("localhost", port, new ConnectionBinding(server));
+            ("localhost", port, ConnectionBinding.get(server));
         ioa.start();
     }
 
     private Connection connect(final Condition closed)
     {
-        Connection conn = IoTransport.connect("localhost", port, new ConnectionDelegate()
+        Connection conn = new Connection();
+        conn.setConnectionListener(new ConnectionListener()
         {
-            public SessionDelegate getSessionDelegate()
+            public void opened(Connection conn) {}
+            public void exception(Connection conn, ConnectionException exc)
             {
-                return new SessionDelegate() {};
+                exc.printStackTrace();
             }
-            public void exception(Throwable t)
-            {
-                t.printStackTrace();
-            }
-            public void closed()
+            public void closed(Connection conn)
             {
                 if (closed != null)
                 {
@@ -89,8 +81,7 @@
                 }
             }
         });
-
-        conn.send(new ProtocolHeader(1, 0, 10));
+        conn.connect("localhost", port, null, "guest", "guest");
         return conn;
     }
 

Modified: incubator/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/QpidDatasource.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/QpidDatasource.java?rev=703208&r1=703207&r2=703208&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/QpidDatasource.java (original)
+++ incubator/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/QpidDatasource.java Thu Oct  9 10:07:59 2008
@@ -28,140 +28,95 @@
 import org.apache.commons.pool.ObjectPool;
 import org.apache.commons.pool.impl.GenericObjectPool;
 import org.apache.commons.pool.impl.GenericObjectPoolFactory;
-import org.apache.qpid.ErrorCode;
-import org.apache.qpid.QpidException;
-import org.apache.qpid.nclient.Client;
-import org.apache.qpid.nclient.ClosedListener;
-import org.apache.qpid.nclient.Connection;
-import org.apache.qpid.nclient.DtxSession;
-import org.apache.qpid.nclient.Session;
+import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.ConnectionException;
 import org.apache.qpid.transport.util.Logger;
 
 /**
  * Qpid datasource.
- * Basically it is a connection pool manager used for optimizing broker connections usage. 
- * 
+ * Basically it is a connection pool manager used for optimizing broker connections usage.
+ *
  * @author Andrea Gazzarini
  */
-public final class QpidDatasource 
+public final class QpidDatasource
 {
     private final static Logger LOGGER = Logger.get(QpidDatasource.class);
-    
+
     /**
      * A connection decorator used for adding pool interaction behaviour to an existing connection.
-     * 
+     *
      * @author Andrea Gazzarini
      */
-    public class ConnectionDecorator implements Connection,ClosedListener 
+    class PooledConnection extends Connection
     {
-        private final Connection _decoratee;
         private final UUID _brokerId;
         private boolean _valid;
-      
+
         /**
          * Builds a new decorator with the given connection.
-         * 
+         *
          * @param brokerId the broker identifier.
          * @param decoratee the underlying connection.
          */
-        private ConnectionDecorator(UUID brokerId, Connection decoratee)
+        private PooledConnection(UUID brokerId)
         {
-            this._decoratee = decoratee;
             this._brokerId = brokerId;
-            _decoratee.setClosedListener(this);
             _valid = true;
         }
-        
+
         /**
          * Returns true if the underlying connection is still valid and can be used.
-         * 
+         *
          * @return true if the underlying connection is still valid and can be used.
          */
         boolean isValid()
         {
             return _valid;
         }
-        
+
+        void reallyClose()
+        {
+            super.close();
+        }
+
         /**
          * Returns the connection to the pool. That is, marks this connections as available.
          * After that, this connection will be available for further operations.
          */
-        public void close () throws QpidException
+        public void close()
         {
             try
             {
                 pools.get(_brokerId).returnObject(this);
                 LOGGER.debug("<QMAN-200012> : Connection %s returned to the pool.", this);
-            } catch (Exception exception)
+            }
+            catch (Exception e)
             {
-                throw new QpidException("Error while closing connection.",ErrorCode.CONNECTION_ERROR,exception);
-            }  
-        }
-
-        /**
-         * Do nothing : underlying connection is already connected.
-         */
-        public void connect (String host, int port, String virtualHost, String username, String password)
-                throws QpidException
-        {
-            // DO NOTHING : DECORATEE CONNECTION IS ALREADY CONNECTED.
-        }
-        
-        /**
-         * Do nothing : underlying connection is already connected.
-         */
-        public void connect (String url) throws QpidException
-        {
-            // DO NOTHING : DECORATEE CONNECTION IS ALREADY CONNECTED.
+                throw new ConnectionException(e);
+            }
         }
 
-        /**
-         * @see Connection#createDTXSession(int)
-         */
-        public DtxSession createDTXSession (int expiryInSeconds)
+        public void exception(Throwable t)
         {
-            return _decoratee.createDTXSession(expiryInSeconds);
-        }
-
-        /**
-         * @see Connection#createSession(long)
-         */
-        public Session createSession (long expiryInSeconds)
-        {
-            return _decoratee.createSession(expiryInSeconds);
+            super.exception(t);
+            _valid = false;
         }
+    }
 
-        /**
-         * Do nothing : closed listener  has been already injected.
-         */
-        public void setClosedListener (ClosedListener exceptionListner)
-        {
-        }
-        
-        /**
-         * Callback method used for error notifications while underlying connection is closing.
-         */
-        public void onClosed (ErrorCode errorCode, String reason, Throwable t)
-        {
-            _valid = false;
-            LOGGER.error(t,"<QMAN-100012> : Error on closing connection. Reason is : %s, error code is %s",reason,errorCode.getCode());
-        }        
-    };
-        
     /**
-     * This is the connection factory, that is, the factory used to manage the lifecycle (create, validate & destroy) of 
+     * This is the connection factory, that is, the factory used to manage the lifecycle (create, validate & destroy) of
      * the broker connection(s).
-     * 
+     *
      * @author Andrea Gazzarini
      */
     class QpidConnectionFactory extends BasePoolableObjectFactory
-    {    
+    {
         private final BrokerConnectionData _connectionData;
         private final UUID _brokerId;
-        
+
         /**
          * Builds a new connection factory with the given parameters.
-         * 
+         *
          * @param brokerId the broker identifier.
          * @param connectionData the connecton data.
          */
@@ -170,35 +125,35 @@
             this._connectionData = connectionData;
             this._brokerId = brokerId;
         }
-        
+
         /**
          * Creates a new underlying connection.
          */
         @Override
         public Connection makeObject () throws Exception
         {
-            Connection connection = Client.createConnection();
+            PooledConnection connection = new PooledConnection(_brokerId);
             connection.connect(
-                    _connectionData.getHost(), 
-                    _connectionData.getPort(), 
-                    _connectionData.getVirtualHost(), 
-                    _connectionData.getUsername(), 
+                    _connectionData.getHost(),
+                    _connectionData.getPort(),
+                    _connectionData.getVirtualHost(),
+                    _connectionData.getUsername(),
                     _connectionData.getPassword());
-            return new ConnectionDecorator(_brokerId,connection);
+            return connection;
         }
-    
+
         /**
          * Validates the underlying connection.
          */
         @Override
         public boolean validateObject (Object obj)
         {
-            ConnectionDecorator connection = (ConnectionDecorator) obj;
+            PooledConnection connection = (PooledConnection) obj;
             boolean isValid = connection.isValid();
             LOGGER.debug("<QMAN-200013> : Test connection on reserve. Is valid? %s",isValid);
             return isValid;
         }
-        
+
         /**
          * Closes the underlying connection.
          */
@@ -207,8 +162,8 @@
         {
             try
             {
-                ConnectionDecorator connection = (ConnectionDecorator) obj;
-                connection._decoratee.close();
+                PooledConnection connection = (PooledConnection) obj;
+                connection.reallyClose();
                 LOGGER.debug("<QMAN-200014> : Connection has been destroyed.");
             } catch (Exception e)
             {
@@ -216,21 +171,21 @@
             }
         }
     }
-   
+
     // Singleton instance.
     private static QpidDatasource instance = new QpidDatasource();
 
     // Each entry contains a connection pool for a specific broker.
     private Map<UUID, ObjectPool> pools = new HashMap<UUID, ObjectPool>();
-    
+
     // Private constructor.
     private QpidDatasource()
     {
     }
-    
+
     /**
      * Gets an available connection from the pool of the given broker.
-     * 
+     *
      * @param brokerId the broker identifier.
      * @return a valid connection to the broker associated with the given identifier.
      */
@@ -238,20 +193,20 @@
     {
         return (Connection) pools.get(brokerId).borrowObject();
     }
-    
+
     /**
      * Entry point method for retrieving the singleton instance of this datasource.
-     * 
+     *
      * @return the qpid datasource singleton instance.
      */
-    public static QpidDatasource getInstance() 
+    public static QpidDatasource getInstance()
     {
         return instance;
     }
-    
+
     /**
      * Adds a connection pool to this datasource.
-     * 
+     *
      * @param brokerId the broker identifier that will be associated with the new connection pool.
      * @param connectionData the broker connection data.
      * @throws Exception when the pool cannot be created.
@@ -265,12 +220,12 @@
                 true,
                 false);
         ObjectPool pool = factory.createPool();
-        
+
         for (int i  = 0; i < connectionData.getInitialPoolCapacity(); i++)
         {
             pool.returnObject(pool.borrowObject());
         }
-            
+
         pools.put(brokerId,pool);
     }
 }
\ No newline at end of file

Modified: incubator/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java?rev=703208&r1=703207&r2=703208&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java (original)
+++ incubator/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java Thu Oct  9 10:07:59 2008
@@ -22,42 +22,47 @@
 
 import java.nio.ByteBuffer;
 import java.util.Arrays;
+import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.qpid.QpidException;
 import org.apache.qpid.management.Constants;
 import org.apache.qpid.management.Names;
 import org.apache.qpid.management.configuration.Configuration;
 import org.apache.qpid.management.configuration.QpidDatasource;
-import org.apache.qpid.nclient.Connection;
-import org.apache.qpid.nclient.Session;
 import org.apache.qpid.nclient.util.MessageListener;
 import org.apache.qpid.nclient.util.MessagePartListenerAdapter;
+import org.apache.qpid.transport.Connection;
 import org.apache.qpid.transport.MessageAcceptMode;
 import org.apache.qpid.transport.MessageAcquireMode;
 import org.apache.qpid.transport.MessageCreditUnit;
+import org.apache.qpid.transport.MessageTransfer;
 import org.apache.qpid.transport.Option;
+import org.apache.qpid.transport.Session;
+import org.apache.qpid.transport.SessionException;
+import org.apache.qpid.transport.SessionListener;
 import org.apache.qpid.transport.util.Logger;
 
 /**
  * Qpid Broker facade.
- * 
+ *
  * @author Andrea Gazzarini
  */
-public class QpidService
+public class QpidService implements SessionListener
 {
     private final static Logger LOGGER = Logger.get(QpidService.class);
 
     // Inner static class used for logging and avoid conditional logic (isDebugEnabled()) duplication.
-    private static class Log 
-    {            
+    private static class Log
+    {
         /**
          * Logs the content f the message.
          * This will be written on log only if DEBUG level is enabled.
-         * 
+         *
          * @param messageContent the raw content of the message.
          */
-        static void logMessageContent(byte [] messageContent) 
+        static void logMessageContent(byte [] messageContent)
         {
             if (LOGGER.isDebugEnabled()) {
                 LOGGER.debug(
@@ -65,56 +70,81 @@
                         Arrays.toString(messageContent));
             }
         }
-        
+
         /**
          * Logs the content f the message.
          * This will be written on log only if DEBUG level is enabled.
-         * 
+         *
          * @param messageContent the raw content of the message.
          */
-        static void logMessageContent(ByteBuffer messageContent) 
+        static void logMessageContent(ByteBuffer messageContent)
         {
             if (LOGGER.isDebugEnabled()) {
                 LOGGER.debug(
                         "<QMAN-200002> : Message has been sent to management exchange.");
             }
-        }        
+        }
     }
-    
+
     private UUID _brokerId;
     private Connection _connection;
     private Session _session;
-    
+    private Map<String,MessagePartListenerAdapter> _listeners;
+
     /**
      * Builds a new service with the given connection data.
-     * 
+     *
      * @param connectionData the connection data of the broker.
      */
-    public QpidService(UUID brokerId) 
+    public QpidService(UUID brokerId)
     {
         this._brokerId = brokerId;
     }
-    
+
     /**
      * Estabilishes a connection with the broker.
-     * 
+     *
      * @throws QpidException in case of connection failure.
      */
     public void connect() throws Exception
     {
         _connection = QpidDatasource.getInstance().getConnection(_brokerId);
+        _listeners = new ConcurrentHashMap<String,MessagePartListenerAdapter>();
         _session = _connection.createSession(Constants.NO_EXPIRATION);
+        _session.setSessionListener(this);
     }
-    
+
+    public void opened(Session ssn) {}
+
+    public void message(Session ssn, MessageTransfer xfr)
+    {
+        MessagePartListenerAdapter l = _listeners.get(xfr.getDestination());
+        if (l == null)
+        {
+            LOGGER.error("unhandled message: %s", xfr);
+        }
+        else
+        {
+            l.messageTransfer(xfr);
+        }
+    }
+
+    public void exception(Session ssn, SessionException exc)
+    {
+        LOGGER.error(exc, "session %s exception", ssn);
+    }
+
+    public void closed(Session ssn) {}
+
     /**
-     * All the previously entered outstanding commands are asynchronous. 
+     * All the previously entered outstanding commands are asynchronous.
      * Synchronous behavior is achieved through invoking this  method.
      */
-    public void sync() 
+    public void sync()
     {
         _session.sync();
     }
-    
+
     /**
      * Closes communication with broker.
      */
@@ -124,48 +154,50 @@
         {
             _session.close();
             _session = null;
+            _listeners = null;
         } catch (Exception e)
         {
         }
         try
         {
-            _connection.close();        
+            _connection.close();
             _connection = null;
         } catch (Exception e)
         {
         }
     }
-    
+
     /**
      * Associate a message listener with a destination therefore creating a new subscription.
-     * 
+     *
      * @param queueName The name of the queue that the subscriber is receiving messages from.
      * @param destinationName the name of the destination, or delivery tag, for the subscriber.
-     * @param listener the listener for this destination. 
-     * 
+     * @param listener the listener for this destination.
+     *
      * @see Session#messageSubscribe(String, String, short, short, org.apache.qpid.nclient.MessagePartListener, java.util.Map, org.apache.qpid.transport.Option...)
      */
-    public void createSubscription(String queueName, String destinationName,MessageListener listener)
+    public void createSubscription(String queueName, String destinationName, MessageListener listener)
     {
-        _session.messageSubscribe(
-                queueName,
-                destinationName,
-                Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED,
-                Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE,
-                new MessagePartListenerAdapter(listener), null);
-        
-        _session.messageFlow(destinationName, MessageCreditUnit.BYTE, Session.MESSAGE_FLOW_MAX_BYTES);
-        _session.messageFlow(destinationName, MessageCreditUnit.MESSAGE, Integer.MAX_VALUE);
-        
+        _listeners.put(destinationName, new MessagePartListenerAdapter(listener));
+        _session.messageSubscribe
+            (queueName,
+             destinationName,
+             MessageAcceptMode.NONE,
+             MessageAcquireMode.PRE_ACQUIRED,
+             null, 0, null);
+
+        _session.messageFlow(destinationName, MessageCreditUnit.BYTE, Session.UNLIMITED_CREDIT);
+        _session.messageFlow(destinationName, MessageCreditUnit.MESSAGE, Session.UNLIMITED_CREDIT);
+
         LOGGER.debug(
-                "<QMAN-200003> : New subscription between queue %s and destination %s has been declared.", 
+                "<QMAN-200003> : New subscription between queue %s and destination %s has been declared.",
                 queueName,
                 destinationName);
     }
-    
+
     /**
      * Removes a previously declared consumer from the broker.
-     * 
+     *
      * @param destinationName the name of the destination, or delivery tag, for the subscriber.
      * @see Session#messageCancel(String, Option...)
      */
@@ -173,10 +205,10 @@
     {
         _session.messageCancel(destinationName);
         LOGGER.debug(
-                "<QMAN-200026> : Subscription named %s has been removed from remote broker.", 
+                "<QMAN-200026> : Subscription named %s has been removed from remote broker.",
                 destinationName);
-    }    
-    
+    }
+
     /**
      * Declares a queue on the broker with the given name.
      *
@@ -200,27 +232,27 @@
         _session.queueDelete(queueName);
         LOGGER.debug("<QMAN-2000025> : Queue with name %s has been removed.",queueName);
     }
-    
+
     /**
      * Binds (on the broker) a queue with an exchange.
      *
-     * @param queueName the name of the queue to bind. 
+     * @param queueName the name of the queue to bind.
      * @param exchangeName the exchange name.
-     * @param routingKey the routing key used for the binding. 
+     * @param routingKey the routing key used for the binding.
      * @see Session#exchangeBind(String, String, String, java.util.Map, Option...)
      */
     public void declareBinding(String queueName, String exchangeName, String routingKey)
     {
         _session.exchangeBind(queueName, exchangeName, routingKey, null);
         LOGGER.debug(
-                "<QMAN-200005> : New binding with %s as routing key has been declared between queue %s and exchange %s.", 
+                "<QMAN-200005> : New binding with %s as routing key has been declared between queue %s and exchange %s.",
                 routingKey,queueName,
                 exchangeName);
     }
-    
+
     /**
      * Removes a previously declare binding between an exchange and a queue.
-     * 
+     *
      * @param queueName the name of the queue.
      * @param exchangeName the name of the exchange.
      * @param routingKey the routing key used for binding.
@@ -229,42 +261,42 @@
     {
         _session.exchangeUnbind(queueName, exchangeName, routingKey);
         LOGGER.debug(
-                "<QMAN-200005> : Binding with %s as routing key has been removed between queue %s and exchange %s.", 
+                "<QMAN-200005> : Binding with %s as routing key has been removed between queue %s and exchange %s.",
                 routingKey,queueName,
                 exchangeName);
     }
-    
+
     /**
      * Sends a command message with the given data on the management queue.
-     * 
+     *
      * @param messageData the command message content.
      */
-    public void sendCommandMessage(byte [] messageData) 
+    public void sendCommandMessage(byte [] messageData)
     {
         _session.messageTransfer(
                 Names.MANAGEMENT_EXCHANGE,
                 MessageAcceptMode.EXPLICIT,
                 MessageAcquireMode.PRE_ACQUIRED,
                 Configuration.getInstance().getCommandMessageHeader(),
-                messageData);     
-        
+                messageData);
+
         Log.logMessageContent (messageData);
     }
-    
+
     /**
      * Sends a command message with the given data on the management queue.
-     * 
+     *
      * @param messageData the command message content.
      */
-    public void sendCommandMessage(ByteBuffer messageData) 
+    public void sendCommandMessage(ByteBuffer messageData)
     {
         _session.messageTransfer(
                 Names.MANAGEMENT_EXCHANGE,
                 MessageAcceptMode.EXPLICIT,
                 MessageAcquireMode.PRE_ACQUIRED,
                 Configuration.getInstance().getCommandMessageHeader(),
-                messageData);     
-        
+                messageData);
+
         Log.logMessageContent (messageData);
-    }    
+    }
 }
\ No newline at end of file

Modified: incubator/qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java?rev=703208&r1=703207&r2=703208&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java (original)
+++ incubator/qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java Thu Oct  9 10:07:59 2008
@@ -640,71 +640,47 @@
     }
 
     private static final org.apache.qpid.transport.Connection getConnection
-        (Options opts, final SessionDelegate delegate)
+        (Options opts)
     {
-        final Object lock = new Object();
         org.apache.qpid.transport.Connection conn =
-            IoTransport.connect(opts.broker, opts.port,
-                                new ClientDelegate()
-                                {
-                                    public SessionDelegate getSessionDelegate()
-                                    {
-                                        return delegate;
-                                    }
-                                    public void exception(Throwable t)
-                                    {
-                                        t.printStackTrace();
-                                    }
-                                    public void closed() {}
-                                    @Override public void connectionOpenOk(Channel ch,
-                                                                           ConnectionOpenOk ok)
-                                    {
-                                        synchronized (lock)
-                                        {
-                                            lock.notify();
-                                        }
-                                    }
-                                });
-        conn.send(new ProtocolHeader(1, 0, 10));
-
-        synchronized (lock)
+            new org.apache.qpid.transport.Connection();
+        conn.connect(opts.broker, opts.port, null, "guest", "guest");
+        return conn;
+    }
+
+    private static abstract class NativeListener implements SessionListener
+    {
+
+        public void opened(org.apache.qpid.transport.Session ssn) {}
+
+        public void exception(org.apache.qpid.transport.Session ssn,
+                              SessionException exc)
         {
-            try
-            {
-                lock.wait();
-            }
-            catch (InterruptedException e)
-            {
-                throw new RuntimeException(e);
-            }
+            exc.printStackTrace();
         }
 
-        return conn;
+        public void closed(org.apache.qpid.transport.Session ssn) {}
+
     }
 
     private static final void native_publisher(Options opts) throws Exception
     {
         final long[] echos = { 0 };
-        org.apache.qpid.transport.Connection conn = getConnection
-            (opts,
-             new SessionDelegate() {
-                 @Override public void messageTransfer
-                     (org.apache.qpid.transport.Session ssn,
-                      MessageTransfer mt)
-                 {
-                     synchronized (echos)
-                     {
-                         echos[0]++;
-                         echos.notify();
-                     }
-                     ssn.processed(mt);
-                 }
-             });
-
-        Channel ch = conn.getChannel(0);
-        org.apache.qpid.transport.Session ssn = new org.apache.qpid.transport.Session("spam-session".getBytes());
-        ssn.attach(ch);
-        ssn.sessionAttach(ssn.getName());
+        org.apache.qpid.transport.Connection conn = getConnection(opts);
+        org.apache.qpid.transport.Session ssn = conn.createSession();
+        ssn.setSessionListener(new NativeListener()
+        {
+            public void message(org.apache.qpid.transport.Session ssn,
+                                MessageTransfer xfr)
+            {
+                synchronized (echos)
+                {
+                    echos[0]++;
+                    echos.notify();
+                }
+                ssn.processed(xfr);
+            }
+        });
 
         ssn.invoke(new QueueDeclare().queue("test-queue").durable(false));
         ssn.invoke(new QueueDeclare().queue("echo-queue").durable(false));
@@ -794,6 +770,7 @@
         ssn.messageCancel("echo-queue");
 
         ssn.sync();
+        ssn.close();
         conn.close();
     }
 
@@ -805,57 +782,51 @@
         dp.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
         final MessageProperties mp = new MessageProperties();
         final Object done = new Object();
-        org.apache.qpid.transport.Connection conn = getConnection
-            (opts,
-             new SessionDelegate() {
-
-                 private long count = 0;
-                 private long lastTime = 0;
-                 private long start;
-
-                 @Override public void messageTransfer
-                     (org.apache.qpid.transport.Session ssn,
-                      MessageTransfer mt)
-                 {
-                     if (count == 0)
-                     {
-                         start = System.currentTimeMillis();
-                     }
-
-                     boolean sample = opts.sample > 0 && (count % opts.sample) == 0;
-                     long time = sample ? System.currentTimeMillis() : 0;
-
-                     if (opts.window > 0 && (count % opts.window) == 0)
-                     {
-                         ssn.messageTransfer("amq.direct",
-                                             MessageAcceptMode.NONE,
-                                             MessageAcquireMode.PRE_ACQUIRED,
-                                             new Header(dp, mp),
-                                             echo);
-                     }
-
-                     if (sample)
-                     {
-                         sample(opts, Column.RIGHT, "NC", count, start, time, lastTime);
-                         lastTime = time;
-                     }
-                     ssn.processed(mt);
-                     count++;
-
-                     if (opts.count > 0 && count >= opts.count)
-                     {
-                         synchronized (done)
-                         {
-                             done.notify();
-                         }
-                     }
-                 }
-             });
-
-        Channel ch = conn.getChannel(0);
-        org.apache.qpid.transport.Session ssn = new org.apache.qpid.transport.Session("listener-session".getBytes());
-        ssn.attach(ch);
-        ssn.sessionAttach(ssn.getName());
+        org.apache.qpid.transport.Connection conn = getConnection(opts);
+        org.apache.qpid.transport.Session ssn = conn.createSession();
+        ssn.setSessionListener(new NativeListener()
+        {
+            private long count = 0;
+            private long lastTime = 0;
+            private long start;
+
+            public void message(org.apache.qpid.transport.Session ssn,
+                                MessageTransfer xfr)
+            {
+                if (count == 0)
+                {
+                    start = System.currentTimeMillis();
+                }
+
+                boolean sample = opts.sample > 0 && (count % opts.sample) == 0;
+                long time = sample ? System.currentTimeMillis() : 0;
+
+                if (opts.window > 0 && (count % opts.window) == 0)
+                {
+                    ssn.messageTransfer("amq.direct",
+                                        MessageAcceptMode.NONE,
+                                        MessageAcquireMode.PRE_ACQUIRED,
+                                        new Header(dp, mp),
+                                        echo);
+                }
+
+                if (sample)
+                {
+                    sample(opts, Column.RIGHT, "NC", count, start, time, lastTime);
+                    lastTime = time;
+                }
+                ssn.processed(xfr);
+                count++;
+
+                if (opts.count > 0 && count >= opts.count)
+                {
+                    synchronized (done)
+                    {
+                        done.notify();
+                    }
+                }
+            }
+        });
 
         ssn.invoke(new QueueDeclare().queue("test-queue").durable(false));
         ssn.invoke(new QueueDeclare().queue("echo-queue").durable(false));
@@ -879,6 +850,7 @@
         ssn.messageCancel("test-queue");
 
         ssn.sync();
+        ssn.close();
         conn.close();
     }