You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2008/10/13 18:07:01 UTC

svn commit: r704147 - in /incubator/qpid/trunk/qpid/java/common: ./ src/main/java/org/apache/qpid/ src/main/java/org/apache/qpid/transport/ src/main/java/org/apache/qpid/transport/network/io/ src/test/java/org/apache/qpid/transport/

Author: rhs
Date: Mon Oct 13 09:07:01 2008
New Revision: 704147

URL: http://svn.apache.org/viewvc?rev=704147&view=rev
Log:
QPID-1339:

 - Removed the Channel class in order to simplify the state management
   surrounding Sessions and Connections.

 - Consolidated the ChannelDelegate into the ConnectionDelegate.

 - Modified MethodDelegate to invoke a generic handle method as the
   default action for each dispatched method.

 - Modified the code generator to produce a separate ConnectionInvoker
   and SessionInvoker.

 - Modified the invoker template to use package-level visibility for
   all controls rather than public visibility.

Removed:
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Channel.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ChannelDelegate.java
Modified:
    incubator/qpid/trunk/qpid/java/common/Composite.tpl
    incubator/qpid/trunk/qpid/java/common/Invoker.tpl
    incubator/qpid/trunk/qpid/java/common/MethodDelegate.tpl
    incubator/qpid/trunk/qpid/java/common/codegen
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/ToyClient.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Binary.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionClosedException.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionException.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Sink.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
    incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java

Modified: incubator/qpid/trunk/qpid/java/common/Composite.tpl
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/Composite.tpl?rev=704147&r1=704146&r2=704147&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/Composite.tpl (original)
+++ incubator/qpid/trunk/qpid/java/common/Composite.tpl Mon Oct 13 09:07:01 2008
@@ -130,9 +130,13 @@
 }
     }
 
-    public <C> void dispatch(C context, MethodDelegate<C> delegate) {
+${
+
+if base == "Method":
+  out("""    public <C> void dispatch(C context, MethodDelegate<C> delegate) {
         delegate.$(dromedary(name))(context, this);
-    }
+    }""")
+}
 
 ${
 for f in fields:

Modified: incubator/qpid/trunk/qpid/java/common/Invoker.tpl
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/Invoker.tpl?rev=704147&r1=704146&r2=704147&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/Invoker.tpl (original)
+++ incubator/qpid/trunk/qpid/java/common/Invoker.tpl Mon Oct 13 09:07:01 2008
@@ -5,14 +5,12 @@
 import java.util.Map;
 import java.util.UUID;
 
-public abstract class Invoker {
-
-    protected abstract void invoke(Method method);
-    protected abstract <T> Future<T> invoke(Method method, Class<T> resultClass);
-
+public abstract class $(invoker) {
 ${
 from genutil import *
 
+results = False
+
 for c in composites:
   name = cname(c)
   fields = get_fields(c)
@@ -20,6 +18,7 @@
   args = get_arguments(c, fields)
   result = c["result"]
   if result:
+    results = True
     if not result["@type"]:
       rname = cname(result["struct"])
     else:
@@ -32,11 +31,22 @@
     jreturn = ""
     jclass = ""
 
+  if c.name == "command":
+    access = "public "
+  else:
+    access = ""
+
   out("""
-    public final $jresult $(dromedary(name))($(", ".join(params))) {
+    $(access)final $jresult $(dromedary(name))($(", ".join(params))) {
         $(jreturn)invoke(new $name($(", ".join(args)))$jclass);
     }
 """)
 }
-
+    protected abstract void invoke(Method method);
+${
+if results:
+  out("""
+    protected abstract <T> Future<T> invoke(Method method, Class<T> resultClass);
+""")
+}
 }

Modified: incubator/qpid/trunk/qpid/java/common/MethodDelegate.tpl
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/MethodDelegate.tpl?rev=704147&r1=704146&r2=704147&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/MethodDelegate.tpl (original)
+++ incubator/qpid/trunk/qpid/java/common/MethodDelegate.tpl Mon Oct 13 09:07:01 2008
@@ -2,11 +2,15 @@
 
 public abstract class MethodDelegate<C> {
 
+    public abstract void handle(C context, Method method);
 ${
 from genutil import *
 
 for c in composites:
   name = cname(c)
-  out("    public void $(dromedary(name))(C context, $name struct) {}\n")
+  out("""
+    public void $(dromedary(name))(C context, $name method) {
+        handle(context, method);
+    }""")
 }
 }

Modified: incubator/qpid/trunk/qpid/java/common/codegen
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/codegen?rev=704147&r1=704146&r2=704147&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/codegen (original)
+++ incubator/qpid/trunk/qpid/java/common/codegen Mon Oct 13 09:07:01 2008
@@ -43,14 +43,20 @@
 commands = spec.query["amqp/class/command", excludes]
 
 composites = structs + controls + commands
+actions = controls + commands
+connection = [c for c in actions if c.parent["@name"] == "connection"]
+session = [c for c in actions if c.parent["@name"] != "connection"]
 
 for c in composites:
   name = cname(c)
   execute("%s.java" % name, "Composite.tpl", type = c, name = name)
 
-execute("MethodDelegate.java", "MethodDelegate.tpl", composites = composites)
+execute("MethodDelegate.java", "MethodDelegate.tpl", composites = actions)
 execute("Option.java", "Option.tpl", composites = composites)
-execute("Invoker.java", "Invoker.tpl", composites = controls + commands)
+execute("ConnectionInvoker.java", "Invoker.tpl", invoker = "ConnectionInvoker",
+        composites = connection)
+execute("SessionInvoker.java", "Invoker.tpl", invoker = "SessionInvoker",
+        composites = session)
 execute("StructFactory.java", "StructFactory.tpl", composites = composites)
 
 def is_enum(nd):

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/ToyClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/ToyClient.java?rev=704147&r1=704146&r2=704147&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/ToyClient.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/ToyClient.java Mon Oct 13 09:07:01 2008
@@ -33,48 +33,28 @@
  * @author Rafael H. Schloming
  */
 
-class ToyClient extends SessionDelegate
+class ToyClient implements SessionListener
 {
+    public void opened(Session ssn) {}
 
-    @Override public void messageReject(Session ssn, MessageReject reject)
+    public void exception(Session ssn, SessionException exc)
     {
-        for (Range range : reject.getTransfers())
-        {
-            for (long l = range.getLower(); l <= range.getUpper(); l++)
-            {
-                System.out.println("message rejected: " +
-                                   ssn.getCommand((int) l));
-            }
-        }
+        exc.printStackTrace();
     }
 
-    @Override public void messageTransfer(Session ssn, MessageTransfer xfr)
+    public void message(Session ssn, MessageTransfer xfr)
     {
         System.out.println("msg: " + xfr);
     }
 
+    public void closed(Session ssn) {}
+
     public static final void main(String[] args)
     {
-        Connection conn = MinaHandler.connect("0.0.0.0", 5672,
-                                              new ClientDelegate(null, "guest", "guest")
-                                              {
-                                                  public SessionDelegate getSessionDelegate()
-                                                  {
-                                                      return new ToyClient();
-                                                  }
-                                                  public void exception(Throwable t)
-                                                  {
-                                                      t.printStackTrace();
-                                                  }
-                                                  public void closed() {}
-                                              });
-        conn.send(new ProtocolHeader
-                  (1, 0, 10));
-
-        Channel ch = conn.getChannel(0);
-        Session ssn = new Session("my-session".getBytes());
-        ssn.attach(ch);
-        ssn.sessionAttach(ssn.getName());
+        Connection conn = new Connection();
+        conn.connect("0.0.0.0", 5672, null, "guest", "guest");
+        Session ssn = conn.createSession();
+        ssn.setSessionListener(new ToyClient());
 
         ssn.queueDeclare("asdf", null, null);
         ssn.sync();

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Binary.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Binary.java?rev=704147&r1=704146&r2=704147&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Binary.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Binary.java Mon Oct 13 09:07:01 2008
@@ -20,6 +20,10 @@
  */
 package org.apache.qpid.transport;
 
+import java.nio.ByteBuffer;
+
+import static org.apache.qpid.transport.util.Functions.*;
+
 
 /**
  * Binary
@@ -51,6 +55,13 @@
         this(bytes, 0, bytes.length);
     }
 
+    public final byte[] getBytes()
+    {
+        byte[] result = new byte[size];
+        System.arraycopy(bytes, offset, result, 0, size);
+        return result;
+    }
+
     public final byte[] array()
     {
         return bytes;
@@ -126,4 +137,9 @@
         return true;
     }
 
+    public String toString()
+    {
+        return str(ByteBuffer.wrap(bytes, offset, size));
+    }
+
 }

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java?rev=704147&r1=704146&r2=704147&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java Mon Oct 13 09:07:01 2008
@@ -62,23 +62,20 @@
         this.password = password;
     }
 
-    public void init(Channel ch, ProtocolHeader hdr)
+    public void init(Connection conn, ProtocolHeader hdr)
     {
         if (!(hdr.getMajor() == 0 && hdr.getMinor() == 10))
         {
-            Connection conn = ch.getConnection();
             conn.exception(new ProtocolVersionException(hdr.getMajor(), hdr.getMinor()));
         }
-
     }
 
-    @Override public void connectionStart(Channel ch, ConnectionStart start)
+    @Override public void connectionStart(Connection conn, ConnectionStart start)
     {
-        Connection conn = ch.getConnection();
         List<Object> mechanisms = start.getMechanisms();
         if (mechanisms == null || mechanisms.isEmpty())
         {
-            ch.connectionStartOk
+            conn.connectionStartOk
                 (Collections.EMPTY_MAP, null, null, conn.getLocale());
             return;
         }
@@ -97,7 +94,7 @@
 
             byte[] response = sc.hasInitialResponse() ?
                 sc.evaluateChallenge(new byte[0]) : null;
-            ch.connectionStartOk
+            conn.connectionStartOk
                 (Collections.EMPTY_MAP, sc.getMechanismName(), response,
                  conn.getLocale());
         }
@@ -107,14 +104,13 @@
         }
     }
 
-    @Override public void connectionSecure(Channel ch, ConnectionSecure secure)
+    @Override public void connectionSecure(Connection conn, ConnectionSecure secure)
     {
-        Connection conn = ch.getConnection();
         SaslClient sc = conn.getSaslClient();
         try
         {
             byte[] response = sc.evaluateChallenge(secure.getChallenge());
-            ch.connectionSecureOk(response);
+            conn.connectionSecureOk(response);
         }
         catch (SaslException e)
         {
@@ -122,20 +118,19 @@
         }
     }
 
-    @Override public void connectionTune(Channel ch, ConnectionTune tune)
+    @Override public void connectionTune(Connection conn, ConnectionTune tune)
     {
-        Connection conn = ch.getConnection();
         conn.setChannelMax(tune.getChannelMax());
-        ch.connectionTuneOk(tune.getChannelMax(), tune.getMaxFrameSize(), tune.getHeartbeatMax());
-        ch.connectionOpen(vhost, null, Option.INSIST);
+        conn.connectionTuneOk(tune.getChannelMax(), tune.getMaxFrameSize(), tune.getHeartbeatMax());
+        conn.connectionOpen(vhost, null, Option.INSIST);
     }
 
-    @Override public void connectionOpenOk(Channel ch, ConnectionOpenOk ok)
+    @Override public void connectionOpenOk(Connection conn, ConnectionOpenOk ok)
     {
-        ch.getConnection().setState(OPEN);
+        conn.setState(OPEN);
     }
 
-    @Override public void connectionRedirect(Channel ch, ConnectionRedirect redir)
+    @Override public void connectionRedirect(Connection conn, ConnectionRedirect redir)
     {
         throw new UnsupportedOperationException();
     }

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java?rev=704147&r1=704146&r2=704147&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java Mon Oct 13 09:07:01 2008
@@ -24,6 +24,7 @@
 import org.apache.qpid.transport.network.io.IoTransport;
 import org.apache.qpid.transport.util.Logger;
 import org.apache.qpid.transport.util.Waiter;
+import org.apache.qpid.util.Strings;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -48,7 +49,7 @@
  * short instead of Short
  */
 
-public class Connection
+public class Connection extends ConnectionInvoker
     implements Receiver<ProtocolEvent>, Sender<ProtocolEvent>
 {
 
@@ -69,7 +70,8 @@
     private ConnectionDelegate delegate;
     private Sender<ProtocolEvent> sender;
 
-    final private Map<Integer,Channel> channels = new HashMap<Integer,Channel>();
+    final private Map<Binary,Session> sessions = new HashMap<Binary,Session>();
+    final private Map<Integer,Session> channels = new HashMap<Integer,Session>();
 
     private State state = NEW;
     private Object lock = new Object();
@@ -200,14 +202,45 @@
         return createSession(0);
     }
 
-    public Session createSession(long expiryInSeconds)
+    public Session createSession(long timeout)
     {
-        Channel ch = getChannel();
-        Session ssn = new Session(UUID.randomUUID().toString().getBytes());
-        ssn.attach(ch);
-        ssn.sessionAttach(ssn.getName());
-        ssn.sessionRequestTimeout(expiryInSeconds);
-        return ssn;
+        return createSession(UUID.randomUUID().toString(), timeout);
+    }
+
+    public Session createSession(String name)
+    {
+        return createSession(name, 0);
+    }
+
+    public Session createSession(String name, long timeout)
+    {
+        return createSession(Strings.toUTF8(name), timeout);
+    }
+
+    public Session createSession(byte[] name, long timeout)
+    {
+        return createSession(new Binary(name), timeout);
+    }
+
+    public Session createSession(Binary name, long timeout)
+    {
+        synchronized (lock)
+        {
+            Session ssn = new Session(this, name);
+            sessions.put(name, ssn);
+            map(ssn);
+            ssn.sessionAttach(name.getBytes());
+            ssn.sessionRequestTimeout(timeout);
+            return ssn;
+        }
+    }
+
+    void removeSession(Session ssn)
+    {
+        synchronized (lock)
+        {
+            sessions.remove(ssn.getName());
+        }
     }
 
     public void setConnectionId(int id)
@@ -228,8 +261,7 @@
     public void received(ProtocolEvent event)
     {
         log.debug("RECV: [%s] %s", this, event);
-        Channel channel = getChannel(event.getChannel());
-        channel.received(event);
+        event.delegate(this, delegate);
     }
 
     public void send(ProtocolEvent event)
@@ -249,6 +281,22 @@
         sender.flush();
     }
 
+    protected void invoke(Method method)
+    {
+        method.setChannel(0);
+        send(method);
+        if (!method.isBatch())
+        {
+            flush();
+        }
+    }
+
+    public void dispatch(Method method)
+    {
+        Session ssn = getSession(method.getChannel());
+        ssn.received(method);
+    }
+
     public int getChannelMax()
     {
         return channelMax;
@@ -259,7 +307,7 @@
         channelMax = max;
     }
 
-    public Channel getChannel()
+    private int map(Session ssn)
     {
         synchronized (lock)
         {
@@ -267,7 +315,8 @@
             {
                 if (!channels.containsKey(i))
                 {
-                    return getChannel(i);
+                    map(ssn, i);
+                    return i;
                 }
             }
 
@@ -275,25 +324,28 @@
         }
     }
 
-    public Channel getChannel(int number)
+    void map(Session ssn, int channel)
     {
         synchronized (lock)
         {
-            Channel channel = channels.get(number);
-            if (channel == null)
-            {
-                channel = new Channel(this, number, delegate.getSessionDelegate());
-                channels.put(number, channel);
-            }
-            return channel;
+            channels.put(channel, ssn);
+            ssn.setChannel(channel);
+        }
+    }
+
+    void unmap(Session ssn)
+    {
+        synchronized (lock)
+        {
+            channels.remove(ssn.getChannel());
         }
     }
 
-    void removeChannel(int number)
+    Session getSession(int channel)
     {
         synchronized (lock)
         {
-            channels.remove(number);
+            return channels.get(channel);
         }
     }
 
@@ -324,9 +376,9 @@
     {
         synchronized (lock)
         {
-            for (Channel ch : channels.values())
+            for (Session ssn : channels.values())
             {
-                ch.closeCode(close);
+                ssn.closeCode(close);
             }
             ConnectionCloseCode code = close.getReplyCode();
             if (code != ConnectionCloseCode.NORMAL)
@@ -347,10 +399,10 @@
 
         synchronized (lock)
         {
-            List<Channel> values = new ArrayList<Channel>(channels.values());
-            for (Channel ch : values)
+            List<Session> values = new ArrayList<Session>(channels.values());
+            for (Session ssn : values)
             {
-                ch.closed();
+                ssn.closed();
             }
 
             sender = null;
@@ -367,9 +419,8 @@
             switch (state)
             {
             case OPEN:
-                Channel ch = getChannel(0);
                 state = CLOSING;
-                ch.connectionClose(ConnectionCloseCode.NORMAL, null);
+                connectionClose(ConnectionCloseCode.NORMAL, null);
                 Waiter w = new Waiter(lock, timeout);
                 while (w.hasTime() && state == CLOSING && error == null)
                 {

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java?rev=704147&r1=704146&r2=704147&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java Mon Oct 13 09:07:01 2008
@@ -37,31 +37,64 @@
  *
  * the connectionClose is kind of different for both sides
  */
-public abstract class ConnectionDelegate extends MethodDelegate<Channel>
+public abstract class ConnectionDelegate
+    extends MethodDelegate<Connection>
+    implements ProtocolDelegate<Connection>
 {
 
     private static final Logger log = Logger.get(ConnectionDelegate.class);
 
-    public SessionDelegate getSessionDelegate()
+    public void control(Connection conn, Method method)
     {
-        return new SessionDelegate();
+        method.dispatch(conn, this);
     }
 
-    public abstract void init(Channel ch, ProtocolHeader hdr);
+    public void command(Connection conn, Method method)
+    {
+        method.dispatch(conn, this);
+    }
+
+    public void error(Connection conn, ProtocolError error)
+    {
+        conn.exception(new ConnectionException(error.getMessage()));
+    }
 
-    @Override public void connectionClose(Channel ch, ConnectionClose close)
+    public void handle(Connection conn, Method method)
     {
-        Connection conn = ch.getConnection();
-        ch.connectionCloseOk();
+        conn.dispatch(method);
+    }
+
+    @Override public void connectionClose(Connection conn, ConnectionClose close)
+    {
+        conn.connectionCloseOk();
         conn.getSender().close();
         conn.closeCode(close);
         conn.setState(CLOSE_RCVD);
     }
 
-    @Override public void connectionCloseOk(Channel ch, ConnectionCloseOk ok)
+    @Override public void connectionCloseOk(Connection conn, ConnectionCloseOk ok)
     {
-        Connection conn = ch.getConnection();
         conn.getSender().close();
     }
 
+    @Override public void sessionAttached(Connection conn, SessionAttached atc)
+    {
+        
+    }
+
+    @Override public void sessionDetach(Connection conn, SessionDetach dtc)
+    {
+        Session ssn = conn.getSession(dtc.getChannel());
+        conn.unmap(ssn);
+        ssn.sessionDetached(dtc.getName(), SessionDetachCode.NORMAL);
+        ssn.closed();
+    }
+
+    @Override public void sessionDetached(Connection conn, SessionDetached dtc)
+    {
+        Session ssn = conn.getSession(dtc.getChannel());
+        conn.unmap(ssn);
+        ssn.closed();
+    }
+
 }

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java?rev=704147&r1=704146&r2=704147&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java Mon Oct 13 09:07:01 2008
@@ -32,22 +32,33 @@
  *
  */
 
-public class Echo extends SessionDelegate
+public class Echo implements SessionListener
 {
 
-    public void messageTransfer(Session ssn, MessageTransfer xfr)
+    public void opened(Session ssn) {}
+
+    public void message(Session ssn, MessageTransfer xfr)
     {
         ssn.invoke(xfr);
         ssn.processed(xfr);
     }
 
+    public void exception(Session ssn, SessionException exc)
+    {
+        exc.printStackTrace();
+    }
+
+    public void closed(Session ssn) {}
+
     public static final void main(String[] args) throws IOException
     {
         ConnectionDelegate delegate = new ServerDelegate()
         {
-            public SessionDelegate getSessionDelegate()
+            @Override public Session getSession(Connection conn, SessionAttach atc)
             {
-                return new Echo();
+                Session ssn = super.getSession(conn, atc);
+                ssn.setSessionListener(new Echo());
+                return ssn;
             }
         };
 

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java?rev=704147&r1=704146&r2=704147&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java Mon Oct 13 09:07:01 2008
@@ -55,24 +55,22 @@
 
     private SaslServer saslServer;
 
-    public void init(Channel ch, ProtocolHeader hdr)
+    public void init(Connection conn, ProtocolHeader hdr)
     {
-        Connection conn = ch.getConnection();
         conn.send(new ProtocolHeader(1, 0, 10));
         List<Object> utf8 = new ArrayList<Object>();
         utf8.add("utf8");
-        ch.connectionStart(null, Collections.EMPTY_LIST, utf8);
+        conn.connectionStart(null, Collections.EMPTY_LIST, utf8);
     }
 
-    @Override public void connectionStartOk(Channel ch, ConnectionStartOk ok)
+    @Override public void connectionStartOk(Connection conn, ConnectionStartOk ok)
     {
-        Connection conn = ch.getConnection();
         conn.setLocale(ok.getLocale());
         String mechanism = ok.getMechanism();
 
         if (mechanism == null || mechanism.length() == 0)
         {
-            ch.connectionTune
+            conn.connectionTune
                 (Integer.MAX_VALUE,
                  org.apache.qpid.transport.network.ConnectionBinding.MAX_FRAME_SIZE,
                  0, Integer.MAX_VALUE);
@@ -85,13 +83,13 @@
                 (mechanism, "AMQP", "localhost", null, null);
             if (ss == null)
             {
-                ch.connectionClose
+                conn.connectionClose
                     (ConnectionCloseCode.CONNECTION_FORCED,
                      "null SASL mechanism: " + mechanism);
                 return;
             }
             conn.setSaslServer(ss);
-            secure(ch, ok.getResponse());
+            secure(conn, ok.getResponse());
         }
         catch (SaslException e)
         {
@@ -99,9 +97,8 @@
         }
     }
 
-    private void secure(Channel ch, byte[] response)
+    private void secure(Connection conn, byte[] response)
     {
-        Connection conn = ch.getConnection();
         SaslServer ss = conn.getSaslServer();
         try
         {
@@ -109,14 +106,14 @@
             if (ss.isComplete())
             {
                 ss.dispose();
-                ch.connectionTune
+                conn.connectionTune
                     (Integer.MAX_VALUE,
                      org.apache.qpid.transport.network.ConnectionBinding.MAX_FRAME_SIZE,
                      0, Integer.MAX_VALUE);
             }
             else
             {
-                ch.connectionSecure(challenge);
+                conn.connectionSecure(challenge);
             }
         }
         catch (SaslException e)
@@ -125,21 +122,32 @@
         }
     }
 
-    @Override public void connectionSecureOk(Channel ch, ConnectionSecureOk ok)
+    @Override public void connectionSecureOk(Connection conn, ConnectionSecureOk ok)
     {
-        secure(ch, ok.getResponse());
+        secure(conn, ok.getResponse());
     }
 
-    @Override public void connectionTuneOk(Channel ch, ConnectionTuneOk ok)
+    @Override public void connectionTuneOk(Connection conn, ConnectionTuneOk ok)
     {
         
     }
 
-    @Override public void connectionOpen(Channel ch, ConnectionOpen open)
+    @Override public void connectionOpen(Connection conn, ConnectionOpen open)
     {
-        Connection conn = ch.getConnection();
-        ch.connectionOpenOk(Collections.EMPTY_LIST);
+        conn.connectionOpenOk(Collections.EMPTY_LIST);
         conn.setState(OPEN);
     }
 
+    public Session getSession(Connection conn, SessionAttach atc)
+    {
+        return new Session(conn, new Binary(atc.getName()));
+    }
+
+    @Override public void sessionAttach(Connection conn, SessionAttach atc)
+    {
+        Session ssn = getSession(conn, atc);
+        conn.map(ssn, atc.getChannel());
+        ssn.sessionAttached(atc.getName());
+    }
+
 }

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java?rev=704147&r1=704146&r2=704147&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java Mon Oct 13 09:07:01 2008
@@ -44,7 +44,7 @@
  * @author Rafael H. Schloming
  */
 
-public class Session extends Invoker
+public class Session extends SessionInvoker
 {
 
     private static final Logger log = Logger.get(Session.class);
@@ -84,14 +84,14 @@
         }
     }
 
-    private byte[] name;
+    private Connection connection;
+    private Binary name;
+    private int channel;
+    private SessionDelegate delegate = new SessionDelegate();
     private SessionListener listener = new DefaultSessionListener();
     private long timeout = 60000;
     private boolean autoSync = false;
 
-    // channel may be null
-    Channel channel;
-
     // incoming command count
     int commandsIn = 0;
     // completed incoming commands
@@ -108,16 +108,27 @@
 
     private AtomicBoolean closed = new AtomicBoolean(false);
 
-    public Session(byte[] name)
+    Session(Connection connection, Binary name)
     {
+        this.connection = connection;
         this.name = name;
     }
 
-    public byte[] getName()
+    public Binary getName()
     {
         return name;
     }
 
+    int getChannel()
+    {
+        return channel;
+    }
+
+    void setChannel(int channel)
+    {
+        this.channel = channel;
+    }
+
     public void setSessionListener(SessionListener listener)
     {
         if (listener == null)
@@ -212,8 +223,12 @@
             {
                 maxProcessed = max(maxProcessed, upper);
             }
-            flush = lt(old, syncPoint) && ge(maxProcessed, syncPoint);
-            syncPoint = maxProcessed;
+            boolean synced = ge(maxProcessed, syncPoint);
+            flush = lt(old, syncPoint) && synced;
+            if (synced)
+            {
+                syncPoint = maxProcessed;
+            }
         }
         if (flush)
         {
@@ -266,12 +281,6 @@
         }
     }
 
-    public void attach(Channel channel)
-    {
-        this.channel = channel;
-        channel.setSession(this);
-    }
-
     public Method getCommand(int id)
     {
         synchronized (commands)
@@ -304,6 +313,22 @@
         }
     }
 
+    void received(Method m)
+    {
+        m.delegate(this, delegate);
+    }
+
+    private void send(Method m)
+    {
+        m.setChannel(channel);
+        connection.send(m);
+
+        if (!m.isBatch())
+        {
+            connection.flush();
+        }
+    }
+
     public void invoke(Method m)
     {
         if (closed.get())
@@ -342,7 +367,7 @@
                     m.setSync(true);
                 }
                 needSync = !m.isSync();
-                channel.method(m);
+                send(m);
                 if (autoSync)
                 {
                     sync();
@@ -358,7 +383,7 @@
         }
         else
         {
-            channel.method(m);
+            send(m);
         }
     }
 
@@ -564,7 +589,7 @@
     public void close()
     {
         sessionRequestTimeout(0);
-        sessionDetach(name);
+        sessionDetach(name.getBytes());
         synchronized (commands)
         {
             long start = System.currentTimeMillis();
@@ -576,12 +601,19 @@
                     commands.wait(timeout - elapsed);
                     elapsed = System.currentTimeMillis() - start;
                 }
+
+                if (!closed.get())
+                {
+                    throw new SessionException("close() timed out");
+                }
             }
             catch (InterruptedException e)
             {
                 throw new RuntimeException(e);
             }
         }
+
+        connection.removeSession(this);
     }
 
     public void exception(Throwable t)
@@ -606,13 +638,11 @@
                 }
             }
         }
-        channel.setSession(null);
-        channel = null;
     }
 
     public String toString()
     {
-        return String.format("ssn:%s", str(name));
+        return String.format("ssn:%s", name);
     }
 
 }

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionClosedException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionClosedException.java?rev=704147&r1=704146&r2=704147&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionClosedException.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionClosedException.java Mon Oct 13 09:07:01 2008
@@ -38,7 +38,7 @@
 
     public SessionClosedException(Throwable cause)
     {
-        super(null, cause);
+        super("session closed", null, cause);
     }
 
     @Override public void rethrow()

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java?rev=704147&r1=704146&r2=704147&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java Mon Oct 13 09:07:01 2008
@@ -20,7 +20,7 @@
  */
 package org.apache.qpid.transport;
 
-import org.apache.qpid.transport.network.Frame;
+import org.apache.qpid.transport.util.Logger;
 
 
 /**
@@ -33,6 +33,8 @@
     extends MethodDelegate<Session>
     implements ProtocolDelegate<Session>
 {
+    private static final Logger log = Logger.get(SessionDelegate.class);
+
     public void init(Session ssn, ProtocolHeader hdr) { }
 
     public void control(Session ssn, Method method) {
@@ -50,15 +52,12 @@
 
     public void error(Session ssn, ProtocolError error) { }
 
-    @Override public void executionResult(Session ssn, ExecutionResult result)
+    public void handle(Session ssn, Method method)
     {
-        ssn.result(result.getCommandId(), result.getValue());
+        log.warn("UNHANDLED: [%s] %s", ssn, method);
     }
 
-    @Override public void executionException(Session ssn, ExecutionException exc)
-    {
-        ssn.setException(exc);
-    }
+    @Override public void sessionTimeout(Session ssn, SessionTimeout t) {}
 
     @Override public void sessionCompleted(Session ssn, SessionCompleted cmp)
     {
@@ -122,6 +121,16 @@
         ssn.syncPoint();
     }
 
+    @Override public void executionResult(Session ssn, ExecutionResult result)
+    {
+        ssn.result(result.getCommandId(), result.getValue());
+    }
+
+    @Override public void executionException(Session ssn, ExecutionException exc)
+    {
+        ssn.setException(exc);
+    }
+
     @Override public void messageTransfer(Session ssn, MessageTransfer xfr)
     {
         ssn.getSessionListener().message(ssn, xfr);

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionException.java?rev=704147&r1=704146&r2=704147&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionException.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionException.java Mon Oct 13 09:07:01 2008
@@ -32,15 +32,20 @@
 
     private ExecutionException exception;
 
-    public SessionException(ExecutionException exception, Throwable cause)
+    public SessionException(String message, ExecutionException exception, Throwable cause)
     {
-        super(String.valueOf(exception), cause);
+        super(message, cause);
         this.exception = exception;
     }
 
     public SessionException(ExecutionException exception)
     {
-        this(exception, null);
+        this(String.valueOf(exception), exception, null);
+    }
+
+    public SessionException(String message)
+    {
+        this(message, null, null);
     }
 
     public ExecutionException getException()
@@ -50,7 +55,7 @@
 
     @Override public void rethrow()
     {
-        throw new SessionException(exception, this);
+        throw new SessionException(getMessage(), exception, this);
     }
 
 }

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Sink.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Sink.java?rev=704147&r1=704146&r2=704147&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Sink.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Sink.java Mon Oct 13 09:07:01 2008
@@ -31,7 +31,7 @@
  *
  */
 
-public class Sink extends SessionDelegate
+public class Sink implements SessionListener
 {
 
     private static final String FORMAT_HDR = "%-12s %-18s %-18s %-18s";
@@ -85,7 +85,9 @@
         return String.format("%d/%.2f", count, ((double) bytes)/(1024*1024));
     }
 
-    public void messageTransfer(Session ssn, MessageTransfer xfr)
+    public void opened(Session ssn) {}
+
+    public void message(Session ssn, MessageTransfer xfr)
     {
         count++;
         bytes += xfr.getBody().remaining();
@@ -101,14 +103,22 @@
         ssn.processed(xfr);
     }
 
+    public void exception(Session ssn, SessionException exc)
+    {
+        exc.printStackTrace();
+    }
+
+    public void closed(Session ssn) {}
+
     public static final void main(String[] args) throws IOException
     {
         ConnectionDelegate delegate = new ServerDelegate()
         {
-
-            public SessionDelegate getSessionDelegate()
+            @Override public Session getSession(Connection conn, SessionAttach atc)
             {
-                return new Sink();
+                Session ssn = super.getSession(conn, atc);
+                ssn.setSessionListener(new Sink());
+                return ssn;
             }
         };
 

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java?rev=704147&r1=704146&r2=704147&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java Mon Oct 13 09:07:01 2008
@@ -181,6 +181,11 @@
     {
         if (!closed.getAndSet(true))
         {
+            synchronized (notFull)
+            {
+                notFull.notify();
+            }
+
             synchronized (notEmpty)
             {
                 notEmpty.notify();

Modified: incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java?rev=704147&r1=704146&r2=704147&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java Mon Oct 13 09:07:01 2008
@@ -51,10 +51,10 @@
         port = AvailablePortFinder.getNextAvailable(12000);
 
         ConnectionDelegate server = new ServerDelegate() {
-            @Override public void connectionOpen(Channel ch, ConnectionOpen open)
+            @Override public void connectionOpen(Connection conn, ConnectionOpen open)
             {
-                super.connectionOpen(ch, open);
-                ch.getConnection().close();
+                super.connectionOpen(conn, open);
+                conn.close();
             }
         };
 
@@ -94,13 +94,9 @@
             fail("never got notified of connection close");
         }
 
-        Channel ch = conn.getChannel(0);
-        Session ssn = new Session("test".getBytes());
-        ssn.attach(ch);
-
         try
         {
-            ssn.sessionAttach(ssn.getName());
+            conn.connectionCloseOk();
             fail("writing to a closed socket succeeded");
         }
         catch (TransportException e)