You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2017/02/17 08:08:51 UTC

svn commit: r1783342 [1/2] - /qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/

Author: kwall
Date: Fri Feb 17 08:08:51 2017
New Revision: 1783342

URL: http://svn.apache.org/viewvc?rev=1783342&view=rev
Log:
QPID-7622: [Java Broker] [0-10] Collasped ServerConnection, SessionSession etc using inline refactoring

Methods in the base classes that were overridden and invoked using super.method() are renamed methodSuper()

Removed:
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Connection.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConnectionDelegate.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConnectionListener.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/SessionDelegate.java
Modified:
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/SessionListener.java

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java?rev=1783342&r1=1783341&r2=1783342&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java Fri Feb 17 08:08:51 2017
@@ -86,7 +86,7 @@ public class AMQPConnection_0_10Impl ext
         _connection = new ServerConnection(id, broker, port, transport, this);
 
         SubjectCreator subjectCreator = port.getAuthenticationProvider().getSubjectCreator(transport.isSecure());
-        ConnectionDelegate connDelegate = new ServerConnectionDelegate(broker, subjectCreator);
+        ServerConnectionDelegate connDelegate = new ServerConnectionDelegate(broker, subjectCreator);
 
         _connection.setConnectionDelegate(connDelegate);
         _connection.setRemoteAddress(network.getRemoteAddress());

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java?rev=1783342&r1=1783341&r2=1783342&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java Fri Feb 17 08:08:51 2017
@@ -20,23 +20,36 @@
  */
 package org.apache.qpid.server.protocol.v0_10;
 
-import static org.apache.qpid.server.protocol.v0_10.Connection.State.CLOSING;
+import static org.apache.qpid.server.protocol.v0_10.ServerConnection.State.CLOSED;
+import static org.apache.qpid.server.protocol.v0_10.ServerConnection.State.CLOSING;
+import static org.apache.qpid.server.protocol.v0_10.ServerConnection.State.NEW;
+import static org.apache.qpid.server.protocol.v0_10.ServerConnection.State.OPEN;
 
+import java.net.SocketAddress;
 import java.security.AccessControlContext;
 import java.security.AccessController;
 import java.security.Principal;
 import java.security.PrivilegedAction;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 import javax.security.auth.Subject;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.NamedAddressSpace;
@@ -45,26 +58,25 @@ import org.apache.qpid.server.model.port
 import org.apache.qpid.server.protocol.ConnectionClosingTicker;
 import org.apache.qpid.server.protocol.ErrorCodes;
 import org.apache.qpid.server.session.AMQPSession;
-import org.apache.qpid.server.transport.AMQPConnection;
-import org.apache.qpid.server.transport.ConnectionClose;
-import org.apache.qpid.server.transport.ConnectionCloseCode;
-import org.apache.qpid.server.transport.ConnectionCloseOk;
-import org.apache.qpid.server.transport.ExecutionErrorCode;
-import org.apache.qpid.server.transport.ExecutionException;
-import org.apache.qpid.server.transport.Method;
-import org.apache.qpid.server.transport.ProtocolEvent;
-import org.apache.qpid.server.transport.ServerNetworkConnection;
+import org.apache.qpid.server.transport.*;
+import org.apache.qpid.server.transport.network.NetworkConnection;
+import org.apache.qpid.server.transport.util.Waiter;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.ServerScopedRuntimeException;
 
-public class ServerConnection extends Connection
+public class ServerConnection extends ConnectionInvoker implements ProtocolEventReceiver, ProtocolEventSender
 {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ServerConnection.class);
     private final Broker<?> _broker;
 
     private final long _connectionId;
     private final Object _reference = new Object();
     private final AmqpPort<?> _port;
     private final AtomicLong _lastIoTime = new AtomicLong();
+    final private Map<Binary,ServerSession> sessions = new HashMap<Binary,ServerSession>();
+    final private Map<Integer,ServerSession> channels = new ConcurrentHashMap<Integer,ServerSession>();
+    final private Object lock = new Object();
+    private final AtomicBoolean connectionLost = new AtomicBoolean(false);
     private boolean _blocking;
     private final Transport _transport;
 
@@ -74,6 +86,17 @@ public class ServerConnection extends Co
     private final AMQPConnection_0_10 _amqpConnection;
     private boolean _ignoreFutureInput;
     private boolean _ignoreAllButConnectionCloseOk;
+    private NetworkConnection _networkConnection;
+    private FrameSizeObserver _frameSizeObserver;
+    private ServerConnectionDelegate delegate;
+    private ProtocolEventSender sender;
+    private State state = NEW;
+    private long timeout = 60000;  // TODO server side close does not require this
+    private ConnectionException error = null;
+    private int channelMax = 1;
+    private String locale;
+    private SocketAddress _remoteAddress;
+    private SocketAddress _localAddress;
 
     public ServerConnection(final long connectionId,
                             Broker<?> broker,
@@ -102,22 +125,32 @@ public class ServerConnection extends Co
     @Override
     protected void invoke(Method method)
     {
-        super.invoke(method);
+        invokeSuper(method);
         if (method instanceof ConnectionClose)
         {
             _ignoreAllButConnectionCloseOk = true;
         }
     }
 
+    private void invokeSuper(Method method)
+    {
+        method.setChannel(0);
+        send(method);
+        if (!method.isBatch())
+        {
+            flush();
+        }
+    }
+
+
     EventLogger getEventLogger()
     {
         return _amqpConnection.getEventLogger();
     }
 
-    @Override
     protected void setState(State state)
     {
-        super.setState(state);
+        setStateSuper(state);
 
         if(state == State.CLOSING)
         {
@@ -130,10 +163,24 @@ public class ServerConnection extends Co
         }
     }
 
-    @Override
+    private void setStateSuper(State state)
+    {
+        synchronized (lock)
+        {
+            this.state = state;
+            lock.notifyAll();
+        }
+    }
+
+
     public ServerConnectionDelegate getConnectionDelegate()
     {
-        return (ServerConnectionDelegate) super.getConnectionDelegate();
+        return (ServerConnectionDelegate) getConnectionDelegateSuper();
+    }
+
+    private ServerConnectionDelegate getConnectionDelegateSuper()
+    {
+        return delegate;
     }
 
     public AMQPConnection_0_10 getAmqpConnection()
@@ -209,7 +256,7 @@ public class ServerConnection extends Co
     {
         try
         {
-            super.exception(t);
+            exceptionSuper(t);
         }
         finally
         {
@@ -224,6 +271,12 @@ public class ServerConnection extends Co
         }
     }
 
+    private void exceptionSuper(Throwable t)
+    {
+        exception(new ConnectionException(t));
+    }
+
+
     @Override
     public void received(final ProtocolEvent event)
     {
@@ -253,13 +306,23 @@ public class ServerConnection extends Co
                 @Override
                 public Void run()
                 {
-                    ServerConnection.super.received(event);
+                    receivedSuper(event);
                     return null;
                 }
             }, context);
         }
     }
 
+    private void receivedSuper(ProtocolEvent event)
+    {
+        if(LOGGER.isDebugEnabled())
+        {
+            LOGGER.debug("RECV: [{}] {}", this, String.valueOf(event));
+        }
+        event.delegate(this, delegate);
+    }
+
+
     void sendConnectionCloseAsync(final ConnectionCloseCode replyCode, final String message)
     {
         addAsyncTask(new Action<ServerConnection>()
@@ -307,27 +370,39 @@ public class ServerConnection extends Co
         }
     }
 
-    @Override
-    public synchronized void registerSession(final Session ssn)
+    public synchronized void registerSession(final ServerSession ssn)
     {
-        super.registerSession(ssn);
+        registerSessionSuper(ssn);
         if(_blocking)
         {
             ((ServerSession)ssn).block();
         }
     }
 
+    private void registerSessionSuper(ServerSession ssn)
+    {
+        synchronized (lock)
+        {
+            sessions.put(ssn.getName(),ssn);
+        }
+    }
+
     public Collection<? extends ServerSession> getSessionModels()
     {
         return Collections.unmodifiableCollection(getChannels());
     }
 
-    @Override
     protected Collection<ServerSession> getChannels()
     {
-        return  (Collection<ServerSession>) super.getChannels();
+        return  (Collection<ServerSession>) getChannelsSuper();
+    }
+
+    private Collection<ServerSession> getChannelsSuper()
+    {
+        return new ArrayList<>(channels.values());
     }
 
+
     public void setAuthorizedSubject(final Subject authorizedSubject)
     {
         _amqpConnection.setSubject(authorizedSubject);
@@ -349,7 +424,7 @@ public class ServerConnection extends Co
         try
         {
             performDeleteTasks();
-            super.closed();
+            closedSuper();
         }
         finally
         {
@@ -362,9 +437,39 @@ public class ServerConnection extends Co
 
     }
 
+    private void closedSuper()
+    {
+        if (state == OPEN)
+        {
+            exception(new ConnectionException("connection aborted"));
+        }
+
+        LOGGER.debug("connection closed: {}", this);
+
+        synchronized (lock)
+        {
+            List<ServerSession> values = new ArrayList<ServerSession>(channels.values());
+            for (ServerSession ssn : values)
+            {
+                ssn.closed();
+            }
+
+            try
+            {
+                sender.close();
+            }
+            catch(Exception e)
+            {
+                // ignore.
+            }
+            sender = null;
+            setState(CLOSED);
+        }
+    }
+
     private void markAllSessionsClosed()
     {
-        for (Session ssn :  getChannels())
+        for (ServerSession ssn :  getChannels())
         {
             final ServerSession session = (ServerSession) ssn;
             ((ServerSession) ssn).setClose(true);
@@ -374,7 +479,7 @@ public class ServerConnection extends Co
 
     public void receivedComplete()
     {
-        for (Session ssn : getChannels())
+        for (ServerSession ssn : getChannels())
         {
             ((ServerSession)ssn).receivedComplete();
         }
@@ -384,9 +489,24 @@ public class ServerConnection extends Co
     public void send(ProtocolEvent event)
     {
         _lastIoTime.set(System.currentTimeMillis());
-        super.send(event);
+        sendSuper(event);
     }
 
+    private void sendSuper(ProtocolEvent event)
+    {
+        if(LOGGER.isDebugEnabled())
+        {
+            LOGGER.debug("SEND: [{}] {}", this, String.valueOf(event));
+        }
+        ProtocolEventSender s = sender;
+        if (s == null)
+        {
+            throw new ConnectionException("connection closed");
+        }
+        s.send(event);
+    }
+
+
     public String getRemoteContainerName()
     {
         return getConnectionDelegate().getClientId();
@@ -428,6 +548,355 @@ public class ServerConnection extends Co
         return new ProcessPendingIterator(sessionsWithWork);
     }
 
+    public void setConnectionDelegate(ServerConnectionDelegate delegate)
+    {
+        this.delegate = delegate;
+    }
+
+    public ProtocolEventSender getSender()
+    {
+        return sender;
+    }
+
+    public void setSender(ProtocolEventSender sender)
+    {
+        this.sender = sender;
+    }
+
+    protected void setLocale(String locale)
+    {
+        this.locale = locale;
+    }
+
+    String getLocale()
+    {
+        return locale;
+    }
+
+    public void removeSession(ServerSession ssn)
+    {
+        synchronized (lock)
+        {
+            sessions.remove(ssn.getName());
+        }
+    }
+
+    @Override
+    public void flush()
+    {
+        if(LOGGER.isDebugEnabled())
+        {
+            LOGGER.debug("FLUSH: [{}]", this);
+        }
+        final ProtocolEventSender theSender = sender;
+        if(theSender != null)
+        {
+            theSender.flush();
+        }
+    }
+
+    public void dispatch(Method method)
+    {
+        int channel = method.getChannel();
+        ServerSession ssn = getSession(channel);
+        if(ssn != null)
+        {
+            ssn.received(method);
+        }
+        else
+        {
+            /*
+             * A peer receiving any other control on a detached transport MUST discard it and
+             * send a session.detached with the "not-attached" reason code.
+             */
+            if(LOGGER.isDebugEnabled())
+            {
+                LOGGER.debug("Control received on unattached channel : {}", channel);
+            }
+            invokeSessionDetached(channel, SessionDetachCode.NOT_ATTACHED);
+        }
+    }
+
+    public int getChannelMax()
+    {
+        return channelMax;
+    }
+
+    protected void setChannelMax(int max)
+    {
+        channelMax = max;
+    }
+
+    private int map(ServerSession ssn)
+    {
+        synchronized (lock)
+        {
+            //For a negotiated channelMax N, there are channels 0 to N-1 available.
+            for (int i = 0; i < getChannelMax(); i++)
+            {
+                if (!channels.containsKey(i))
+                {
+                    map(ssn, i);
+                    return i;
+                }
+            }
+
+            throw new RuntimeException("no more channels available");
+        }
+    }
+
+    protected void map(ServerSession ssn, int channel)
+    {
+        synchronized (lock)
+        {
+            channels.put(channel, ssn);
+            ssn.setChannel(channel);
+        }
+    }
+
+    void unmap(ServerSession ssn)
+    {
+        synchronized (lock)
+        {
+            channels.remove(ssn.getChannel());
+        }
+    }
+
+    public ServerSession getSession(int channel)
+    {
+        synchronized (lock)
+        {
+            return channels.get(channel);
+        }
+    }
+
+    public void resume()
+    {
+        synchronized (lock)
+        {
+            for (ServerSession ssn : sessions.values())
+            {
+                map(ssn);
+                ssn.resume();
+            }
+
+            setState(OPEN);
+        }
+    }
+
+    public void exception(ConnectionException e)
+    {
+        connectionLost.set(true);
+        synchronized (lock)
+        {
+            switch (state)
+            {
+            case OPENING:
+            case CLOSING:
+                error = e;
+                lock.notifyAll();
+                return;
+            }
+        }
+    }
+
+    public void closeCode(ConnectionClose close)
+    {
+        synchronized (lock)
+        {
+            ConnectionCloseCode code = close.getReplyCode();
+            if (code != ConnectionCloseCode.NORMAL)
+            {
+                exception(new ConnectionException(close));
+            }
+        }
+    }
+
+    @Override
+    public void close()
+    {
+        close(ConnectionCloseCode.NORMAL, null);
+    }
+
+    protected void sendConnectionClose(ConnectionCloseCode replyCode, String replyText, Option... _options)
+    {
+        connectionClose(replyCode, replyText, _options);
+    }
+
+    public void close(ConnectionCloseCode replyCode, String replyText, Option ... _options)
+    {
+        synchronized (lock)
+        {
+            switch (state)
+            {
+            case OPEN:
+                state = CLOSING;
+                connectionClose(replyCode, replyText, _options);
+                Waiter w = new Waiter(lock, timeout);
+                while (w.hasTime() && state == CLOSING && error == null)
+                {
+                    w.await();
+                }
+
+                if (error != null)
+                {
+                    close(replyCode, replyText, _options);
+                    throw new ConnectionException(error);
+                }
+
+                switch (state)
+                {
+                case CLOSING:
+                    close(replyCode, replyText, _options);
+                    throw new ConnectionException("close() timed out");
+                case CLOSED:
+                    break;
+                default:
+                    throw new IllegalStateException(String.valueOf(state));
+                }
+                break;
+            case CLOSED:
+                break;
+            default:
+                if (sender != null)
+                {
+                    sender.close();
+                    w = new Waiter(lock, timeout);
+                    while (w.hasTime() && sender != null && error == null)
+                    {
+                        w.await();
+                    }
+
+                    if (error != null)
+                    {
+                        throw new ConnectionException(error);
+                    }
+
+                    if (sender != null)
+                    {
+                        throw new ConnectionException("close() timed out");
+                    }
+                }
+                break;
+            }
+        }
+    }
+
+    @Override
+    public String toString()
+    {
+        return String.format("conn:%x", System.identityHashCode(this));
+    }
+
+    protected boolean isConnectionLost()
+    {
+        return connectionLost.get();
+    }
+
+    public boolean hasSessionWithName(final byte[] name)
+    {
+        return sessions.containsKey(new Binary(name));
+    }
+
+    public SocketAddress getRemoteSocketAddress()
+    {
+        return _remoteAddress;
+    }
+
+    public SocketAddress getLocalAddress()
+    {
+        return _localAddress;
+    }
+
+    protected void setRemoteAddress(SocketAddress remoteAddress)
+    {
+        _remoteAddress = remoteAddress;
+    }
+
+    protected void setLocalAddress(SocketAddress localAddress)
+    {
+        _localAddress = localAddress;
+    }
+
+    private void invokeSessionDetached(int channel, SessionDetachCode sessionDetachCode)
+    {
+        SessionDetached sessionDetached = new SessionDetached();
+        sessionDetached.setChannel(channel);
+        sessionDetached.setCode(sessionDetachCode);
+        invoke(sessionDetached);
+    }
+
+    protected void doHeartBeat()
+    {
+        connectionHeartbeat();
+    }
+
+    public void setNetworkConnection(NetworkConnection network)
+    {
+        _networkConnection = network;
+    }
+
+    public NetworkConnection getNetworkConnection()
+    {
+        return _networkConnection;
+    }
+
+    public void setMaxFrameSize(final int maxFrameSize)
+    {
+        if(_frameSizeObserver != null)
+        {
+            _frameSizeObserver.setMaxFrameSize(maxFrameSize);
+        }
+    }
+
+    public void addFrameSizeObserver(final FrameSizeObserver frameSizeObserver)
+    {
+        if(_frameSizeObserver == null)
+        {
+            _frameSizeObserver = frameSizeObserver;
+        }
+        else
+        {
+            final FrameSizeObserver currentObserver = _frameSizeObserver;
+            _frameSizeObserver = new FrameSizeObserver()
+                                    {
+                                        @Override
+                                        public void setMaxFrameSize(final int frameSize)
+                                        {
+                                            currentObserver.setMaxFrameSize(frameSize);
+                                            frameSizeObserver.setMaxFrameSize(frameSize);
+                                        }
+                                    };
+        }
+    }
+
+    public boolean isClosing()
+    {
+        synchronized (lock)
+        {
+            return state == CLOSING || state == CLOSED;
+        }
+    }
+
+    protected void sendConnectionSecure(byte[] challenge, Option ... options)
+    {
+        super.connectionSecure(challenge, options);
+    }
+
+    protected void sendConnectionTune(int channelMax, int maxFrameSize, int heartbeatMin, int heartbeatMax, Option ... options)
+    {
+        super.connectionTune(channelMax, maxFrameSize, heartbeatMin, heartbeatMax, options);
+    }
+
+    protected void sendConnectionStart(final Map<String, Object> clientProperties,
+                                       final List<Object> mechanisms,
+                                       final List<Object> locales, final Option... options)
+    {
+        super.connectionStart(clientProperties, mechanisms, locales, options);
+    }
+
+    public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD, RESUMING }
+
     private class ProcessPendingIterator implements Iterator<Runnable>
     {
         private final Collection<AMQPSession<?,?>> _sessionsWithPending;

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java?rev=1783342&r1=1783341&r2=1783342&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java Fri Feb 17 08:08:51 2017
@@ -20,7 +20,7 @@
  */
 package org.apache.qpid.server.protocol.v0_10;
 
-import static org.apache.qpid.server.protocol.v0_10.Connection.State.CLOSE_RCVD;
+import static org.apache.qpid.server.protocol.v0_10.ServerConnection.State.CLOSE_RCVD;
 
 import java.security.AccessControlException;
 import java.security.Principal;
@@ -46,26 +46,18 @@ import org.apache.qpid.server.security.a
 import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
 import org.apache.qpid.server.security.auth.sasl.SaslNegotiator;
 import org.apache.qpid.server.security.auth.sasl.SaslSettings;
-import org.apache.qpid.server.transport.AMQPConnection;
+import org.apache.qpid.server.transport.*;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
-import org.apache.qpid.server.transport.Binary;
-import org.apache.qpid.server.transport.ConnectionClose;
-import org.apache.qpid.server.transport.ConnectionCloseCode;
-import org.apache.qpid.server.transport.ConnectionOpen;
-import org.apache.qpid.server.transport.ConnectionOpenOk;
-import org.apache.qpid.server.transport.ConnectionRedirect;
-import org.apache.qpid.server.transport.ConnectionSecureOk;
-import org.apache.qpid.server.transport.ConnectionStartOk;
-import org.apache.qpid.server.transport.ConnectionTuneOk;
-import org.apache.qpid.server.transport.Constant;
-import org.apache.qpid.server.transport.ProtocolHeader;
-import org.apache.qpid.server.transport.SessionAttach;
-import org.apache.qpid.server.transport.SessionDetach;
-import org.apache.qpid.server.transport.SessionDetachCode;
-import org.apache.qpid.server.transport.SessionDetached;
 
-public class ServerConnectionDelegate extends ConnectionDelegate
+/*
+
+Method ConnectionDelegate.connectionClose(ServerConnection, ConnectionClose) is already overridden in class org.apache.qpid.server.protocol.v0_10.ServerConnectionDelegate. Method will not be pushed down to that class.
+
+
+
+ */
+public class ServerConnectionDelegate extends MethodDelegate<ServerConnection> implements ProtocolDelegate<ServerConnection>
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(ServerConnectionDelegate.class);
 
@@ -81,6 +73,58 @@ public class ServerConnectionDelegate ex
     private boolean _compressionSupported;
     private volatile SaslNegotiator _saslNegotiator;
 
+    public void control(ServerConnection conn, Method method)
+    {
+        method.dispatch(conn, this);
+    }
+
+    public void command(ServerConnection conn, Method method)
+    {
+        method.dispatch(conn, this);
+    }
+
+    public void error(ServerConnection conn, ProtocolError error)
+    {
+        conn.exception(new ConnectionException(error.getMessage()));
+    }
+
+    public void handle(ServerConnection conn, Method method)
+    {
+        conn.dispatch(method);
+    }
+
+    @Override public void connectionHeartbeat(ServerConnection conn, ConnectionHeartbeat hearbeat)
+    {
+        // do nothing
+    }
+
+    protected void sendConnectionCloseOkAndCloseSender(ServerConnection conn)
+    {
+        conn.connectionCloseOk();
+        conn.getSender().close();
+    }
+
+    @Override public void connectionCloseOk(ServerConnection conn, ConnectionCloseOk ok)
+    {
+        conn.getSender().close();
+    }
+
+    @Override public void sessionDetached(ServerConnection conn, SessionDetached dtc)
+    {
+        ServerSession ssn = conn.getSession(dtc.getChannel());
+        if (ssn != null)
+        {
+            ssn.setDetachCode(dtc.getCode());
+            conn.unmap(ssn);
+            ssn.closed();
+        }
+    }
+
+    public void writerIdle(final ServerConnection connection)
+    {
+        connection.doHeartBeat();
+    }
+
     enum ConnectionState
     {
         INIT,
@@ -134,7 +178,7 @@ public class ServerConnectionDelegate ex
     }
 
     @Override
-    public void init(final Connection conn, final ProtocolHeader hdr)
+    public void init(final ServerConnection conn, final ProtocolHeader hdr)
     {
         ServerConnection serverConnection = (ServerConnection) conn;
         assertState(serverConnection, ConnectionState.INIT);
@@ -178,7 +222,7 @@ public class ServerConnectionDelegate ex
     }
 
     @Override
-    public void connectionSecureOk(final Connection conn, final ConnectionSecureOk ok)
+    public void connectionSecureOk(final ServerConnection conn, final ConnectionSecureOk ok)
     {
         ServerConnection serverConnection = (ServerConnection) conn;
         assertState(serverConnection, ConnectionState.AWAIT_SECURE_OK);
@@ -223,7 +267,7 @@ public class ServerConnectionDelegate ex
     }
 
     @Override
-    public void connectionClose(Connection conn, ConnectionClose close)
+    public void connectionClose(ServerConnection conn, ConnectionClose close)
     {
         final ServerConnection sconn = (ServerConnection) conn;
         sconn.closeCode(close);
@@ -231,7 +275,8 @@ public class ServerConnectionDelegate ex
         sendConnectionCloseOkAndCloseSender(conn);
     }
 
-    public void connectionOpen(Connection conn, ConnectionOpen open)
+
+    public void connectionOpen(ServerConnection conn, ConnectionOpen open)
     {
         final ServerConnection sconn = (ServerConnection) conn;
         assertState(sconn, ConnectionState.AWAIT_OPEN);
@@ -255,7 +300,7 @@ public class ServerConnectionDelegate ex
         {
             if (!addressSpace.isActive())
             {
-                sconn.setState(Connection.State.CLOSING);
+                sconn.setState(ServerConnection.State.CLOSING);
                 final String redirectHost = addressSpace.getRedirectHost(port);
                 if(redirectHost == null)
                 {
@@ -274,25 +319,25 @@ public class ServerConnectionDelegate ex
                 sconn.setVirtualHost(addressSpace);
                 if(!addressSpace.authoriseCreateConnection(sconn.getAmqpConnection()))
                 {
-                    sconn.setState(Connection.State.CLOSING);
+                    sconn.setState(ServerConnection.State.CLOSING);
                     sconn.sendConnectionClose(ConnectionCloseCode.CONNECTION_FORCED, "Connection not authorized");
                     return;
                 }
             }
             catch (AccessControlException | VirtualHostUnavailableException e)
             {
-                sconn.setState(Connection.State.CLOSING);
+                sconn.setState(ServerConnection.State.CLOSING);
                 sconn.sendConnectionClose(ConnectionCloseCode.CONNECTION_FORCED, e.getMessage());
                 return;
             }
 
-            sconn.setState(Connection.State.OPEN);
+            sconn.setState(ServerConnection.State.OPEN);
             _state = ConnectionState.OPEN;
             sconn.invoke(new ConnectionOpenOk(Collections.emptyList()));
         }
         else
         {
-            sconn.setState(Connection.State.CLOSING);
+            sconn.setState(ServerConnection.State.CLOSING);
             sconn.sendConnectionClose(ConnectionCloseCode.INVALID_PATH,
                                              "Unknown virtualhost '" + vhostName + "'");
         }
@@ -300,7 +345,7 @@ public class ServerConnectionDelegate ex
     }
 
     @Override
-    public void connectionTuneOk(final Connection conn, final ConnectionTuneOk ok)
+    public void connectionTuneOk(final ServerConnection conn, final ConnectionTuneOk ok)
     {
         ServerConnection sconn = (ServerConnection) conn;
         assertState(sconn, ConnectionState.AWAIT_TUNE_OK);
@@ -369,18 +414,27 @@ public class ServerConnectionDelegate ex
         return _maximumFrameSize;
     }
 
-    @Override public void sessionDetach(Connection conn, SessionDetach dtc)
+    @Override
+    public void sessionDetach(ServerConnection conn, SessionDetach dtc)
     {
         // To ensure a clean detach, we stop any remaining subscriptions. Stop ensures
         // that any in-progress delivery (QueueRunner) is completed before the stop
         // completes.
         stopAllSubscriptions(conn, dtc);
-        Session ssn = conn.getSession(dtc.getChannel());
+        ServerSession ssn = conn.getSession(dtc.getChannel());
         ((ServerSession)ssn).setClose(true);
-        super.sessionDetach(conn, dtc);
+        sessionDetachSuper(conn, dtc);
+    }
+
+    private void sessionDetachSuper(ServerConnection conn, SessionDetach dtc)
+    {
+        ServerSession ssn = conn.getSession(dtc.getChannel());
+        ssn.sessionDetached(dtc.getName(), ssn.getDetachCode() == null? SessionDetachCode.NORMAL: ssn.getDetachCode());
+        conn.unmap(ssn);
+        ssn.closed();
     }
 
-    private void stopAllSubscriptions(Connection conn, SessionDetach dtc)
+    private void stopAllSubscriptions(ServerConnection conn, SessionDetach dtc)
     {
         final ServerSession ssn = (ServerSession) conn.getSession(dtc.getChannel());
         final Collection<ConsumerTarget_0_10> subs = ssn.getSubscriptions();
@@ -392,12 +446,12 @@ public class ServerConnectionDelegate ex
 
 
     @Override
-    public void sessionAttach(final Connection conn, final SessionAttach atc)
+    public void sessionAttach(final ServerConnection conn, final SessionAttach atc)
     {
         ServerConnection serverConnection = (ServerConnection) conn;
         assertState(serverConnection, ConnectionState.OPEN);
 
-        SessionDelegate serverSessionDelegate = new ServerSessionDelegate();
+        ServerSessionDelegate serverSessionDelegate = new ServerSessionDelegate();
 
         final ServerSession serverSession =
                 new ServerSession(conn, serverSessionDelegate, new Binary(atc.getName()), 0);
@@ -411,7 +465,7 @@ public class ServerConnectionDelegate ex
             serverConnection.map(serverSession, atc.getChannel());
             serverConnection.registerSession(serverSession);
             serverSession.sendSessionAttached(atc.getName());
-            serverSession.setState(Session.State.OPEN);
+            serverSession.setState(ServerSession.State.OPEN);
         }
         else
         {
@@ -420,7 +474,7 @@ public class ServerConnectionDelegate ex
         }
     }
 
-    private boolean isSessionNameUnique(final byte[] name, final Connection conn)
+    private boolean isSessionNameUnique(final byte[] name, final ServerConnection conn)
     {
         final ServerConnection sconn = (ServerConnection) conn;
         final Principal authorizedPrincipal = sconn.getAuthorizedPrincipal();
@@ -444,7 +498,7 @@ public class ServerConnectionDelegate ex
     }
 
     @Override
-    public void connectionStartOk(Connection conn, ConnectionStartOk ok)
+    public void connectionStartOk(ServerConnection conn, ConnectionStartOk ok)
     {
         ServerConnection serverConnection = (ServerConnection)conn;
         assertState(serverConnection, ConnectionState.AWAIT_START_OK);
@@ -530,7 +584,7 @@ public class ServerConnectionDelegate ex
         return _compressionSupported && _broker.isMessageCompressionEnabled();
     }
 
-    private void connectionAuthFailed(final Connection conn, Exception e)
+    private void connectionAuthFailed(final ServerConnection conn, Exception e)
     {
         ServerConnection serverConnection = (ServerConnection)conn;
         if (e != null)

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1783342&r1=1783341&r2=1783342&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Fri Feb 17 08:08:51 2017
@@ -21,8 +21,23 @@
 package org.apache.qpid.server.protocol.v0_10;
 
 import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT;
+import static org.apache.qpid.server.protocol.v0_10.ServerSession.State.CLOSED;
+import static org.apache.qpid.server.protocol.v0_10.ServerSession.State.CLOSING;
+import static org.apache.qpid.server.protocol.v0_10.ServerSession.State.DETACHED;
+import static org.apache.qpid.server.protocol.v0_10.ServerSession.State.NEW;
+import static org.apache.qpid.server.protocol.v0_10.ServerSession.State.OPEN;
+import static org.apache.qpid.server.protocol.v0_10.ServerSession.State.RESUMING;
+import static org.apache.qpid.server.transport.Option.COMPLETED;
+import static org.apache.qpid.server.transport.Option.SYNC;
+import static org.apache.qpid.server.transport.Option.TIMELY_REPLY;
+import static org.apache.qpid.server.util.Serial.ge;
 import static org.apache.qpid.server.util.Serial.gt;
+import static org.apache.qpid.server.util.Serial.le;
+import static org.apache.qpid.server.util.Serial.lt;
+import static org.apache.qpid.server.util.Serial.max;
+import static org.apache.qpid.server.util.Strings.toUTF8;
 
+import java.nio.ByteBuffer;
 import java.security.AccessControlContext;
 import java.security.AccessController;
 import java.security.Principal;
@@ -31,6 +46,7 @@ import java.text.MessageFormat;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -43,6 +59,7 @@ import java.util.concurrent.ConcurrentHa
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -72,6 +89,8 @@ import org.apache.qpid.server.store.Stor
 import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.transport.AMQPConnection;
+import org.apache.qpid.server.transport.network.Frame;
+import org.apache.qpid.server.transport.util.Waiter;
 import org.apache.qpid.server.txn.AlreadyKnownDtxException;
 import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
 import org.apache.qpid.server.txn.DistributedTransaction;
@@ -89,10 +108,11 @@ import org.apache.qpid.server.util.Actio
 import org.apache.qpid.server.util.ServerScopedRuntimeException;
 import org.apache.qpid.server.transport.*;
 
-public class ServerSession extends Session
+public class ServerSession extends SessionInvoker
         implements LogSubject, AsyncAutoCommitTransaction.FutureRecorder
 {
-    private static final Logger _logger = LoggerFactory.getLogger(ServerSession.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(ServerSession.class);
+    public static final int UNLIMITED_CREDIT = 0xFFFFFFFF;
 
     private static final String NULL_DESTINATION = UUID.randomUUID().toString();
     private static final int PRODUCER_CREDIT_TOPUP_THRESHOLD = 1 << 30;
@@ -103,10 +123,832 @@ public class ServerSession extends Sessi
     private final AtomicBoolean _blocking = new AtomicBoolean(false);
     private final AtomicInteger _outstandingCredit = new AtomicInteger(UNLIMITED_CREDIT);
     private final CheckCapacityAction _checkCapacityAction = new CheckCapacityAction();
+    private final long timeout = 60000;  // TODO server side close does not require this
+    // completed incoming commands
+    private final Object processedLock = new Object();
+    private final int commandLimit = Integer.getInteger("qpid.session.command_limit", 64 * 1024);
+    private final Object commandsLock = new Object();
+    private final Object stateLock = new Object();
+    private final AtomicBoolean _failoverRequired = new AtomicBoolean(false);
     private Session_0_10 _modelObject;
     private long _blockTime;
     private long _blockingTimeout;
     private boolean _wireBlockingState;
+    private ServerConnection connection;
+    private Binary name;
+    private boolean closing;
+    private int channel;
+    private ServerSessionDelegate delegate;
+    private SessionListener listener = new DefaultSessionListener();
+    private boolean autoSync = false;
+    private boolean incomingInit;
+    // incoming command count
+    private int commandsIn;
+    private RangeSet processed;
+    private int maxProcessed;
+    private int syncPoint;
+    // outgoing command count
+    private int commandsOut = 0;
+    private Map<Integer,Method> commands = new HashMap<Integer, Method>();
+    private int commandBytes = 0;
+    private int byteLimit = Integer.getInteger("qpid.session.byte_limit", 1024 * 1024);
+    private int maxComplete = commandsOut - 1;
+    private boolean needSync = false;
+    private State state = NEW;
+    private Semaphore credit = new Semaphore(0);
+    private Thread resumer = null;
+    private boolean transacted = false;
+    private SessionDetachCode detachCode;
+    private boolean _isNoReplay = false;
+    private Map<Integer,ResultFuture<?>> results = new HashMap<Integer,ResultFuture<?>>();
+    private org.apache.qpid.server.transport.ExecutionException exception = null;
+
+    public Binary getName()
+    {
+        return name;
+    }
+
+    protected void setClose(boolean close)
+    {
+        this.closing = close;
+    }
+
+    public int getChannel()
+    {
+        return channel;
+    }
+
+    void setChannel(int channel)
+    {
+        this.channel = channel;
+    }
+
+    public SessionListener getSessionListener()
+    {
+        return listener;
+    }
+
+    protected State getState()
+    {
+        return this.state;
+    }
+
+    void addCredit(int value)
+    {
+        credit.release(value);
+    }
+
+    void drainCredit()
+    {
+        credit.drainPermits();
+    }
+
+    private void initReceiver()
+    {
+        synchronized (processedLock)
+        {
+            incomingInit = false;
+            processed = RangeSetFactory.createRangeSet();
+        }
+    }
+
+    void attach()
+    {
+        initReceiver();
+        sessionAttach(name.getBytes());
+        sessionRequestTimeout(0);//use expiry here only if/when session resume is supported
+    }
+
+    void resume()
+    {
+        _failoverRequired.set(false);
+
+        synchronized (commandsLock)
+        {
+            attach();
+
+            for (int i = maxComplete + 1; lt(i, commandsOut); i++)
+            {
+                Method m = getCommand(i);
+                if (m == null)
+                {
+                    m = new ExecutionSync();
+                    m.setId(i);
+                }
+                else if (m instanceof MessageTransfer)
+                {
+                    MessageTransfer xfr = (MessageTransfer)m;
+
+                    Header header = xfr.getHeader();
+
+                    if (header != null)
+                    {
+                        if (header.getDeliveryProperties() != null)
+                        {
+                           header.getDeliveryProperties().setRedelivered(true);
+                        }
+                        else
+                        {
+                            DeliveryProperties deliveryProps = new DeliveryProperties();
+                            deliveryProps.setRedelivered(true);
+
+                            xfr.setHeader(new Header(deliveryProps, header.getMessageProperties(),
+                                                     header.getNonStandardProperties()));
+                        }
+
+                    }
+                    else
+                    {
+                        DeliveryProperties deliveryProps = new DeliveryProperties();
+                        deliveryProps.setRedelivered(true);
+                        xfr.setHeader(new Header(deliveryProps, null, null));
+                    }
+                }
+                sessionCommandPoint(m.getId(), 0);
+                send(m);
+            }
+
+            sessionCommandPoint(commandsOut, 0);
+
+            sessionFlush(COMPLETED);
+            resumer = Thread.currentThread();
+            state = RESUMING;
+
+            if(isTransacted())
+            {
+                txSelect();
+            }
+
+            listener.resumed(this);
+            resumer = null;
+        }
+    }
+
+    private Method getCommand(int i)
+    {
+        return commands.get(i);
+    }
+
+    private void setCommand(int commandId, Method command)
+    {
+        commands.put(commandId, command);
+    }
+
+    private Method removeCommand(int id)
+    {
+        return commands.remove(id);
+    }
+
+    final void commandPoint(int id)
+    {
+        synchronized (processedLock)
+        {
+            this.commandsIn = id;
+            if (!incomingInit)
+            {
+                incomingInit = true;
+                maxProcessed = commandsIn - 1;
+                syncPoint = maxProcessed;
+            }
+        }
+    }
+
+    public int getCommandsOut()
+    {
+        return commandsOut;
+    }
+
+    public int getCommandsIn()
+    {
+        return commandsIn;
+    }
+
+    public int nextCommandId()
+    {
+        return commandsIn++;
+    }
+
+    final void identify(Method cmd)
+    {
+        if (!incomingInit)
+        {
+            throw new IllegalStateException();
+        }
+
+        int id = nextCommandId();
+        cmd.setId(id);
+
+        if(LOGGER.isDebugEnabled())
+        {
+            LOGGER.debug("identify: ch={}, commandId={}", this.channel, id);
+        }
+
+        if ((id & 0xff) == 0)
+        {
+            flushProcessed(TIMELY_REPLY);
+        }
+    }
+
+    public void processed(Method command)
+    {
+        processed(command.getId());
+    }
+
+    public void processed(int command)
+    {
+        processed(command, command);
+    }
+
+    public void processed(Range range)
+    {
+
+        processed(range.getLower(), range.getUpper());
+    }
+
+    public void processed(int lower, int upper)
+    {
+        if(LOGGER.isDebugEnabled())
+        {
+            LOGGER.debug("{} ch={} processed([{},{}]) {} {}", this, channel, lower, upper, syncPoint, maxProcessed);
+        }
+
+        boolean flush;
+        synchronized (processedLock)
+        {
+            if(LOGGER.isDebugEnabled())
+            {
+                LOGGER.debug("{} processed: {}", this, processed);
+            }
+
+            if (ge(upper, commandsIn))
+            {
+                throw new IllegalArgumentException
+                    ("range exceeds max received command-id: " + Range.newInstance(lower, upper));
+            }
+
+            processed.add(lower, upper);
+
+            Range first = processed.getFirst();
+
+            int flower = first.getLower();
+            int fupper = first.getUpper();
+            int old = maxProcessed;
+            if (le(flower, maxProcessed + 1))
+            {
+                maxProcessed = max(maxProcessed, fupper);
+            }
+            boolean synced = ge(maxProcessed, syncPoint);
+            flush = lt(old, syncPoint) && synced;
+            if (synced)
+            {
+                syncPoint = maxProcessed;
+            }
+        }
+        if (flush)
+        {
+            flushProcessed();
+        }
+    }
+
+    void flushExpected()
+    {
+        RangeSet rs = RangeSetFactory.createRangeSet();
+        synchronized (processedLock)
+        {
+            if (incomingInit)
+            {
+                rs.add(commandsIn);
+            }
+        }
+        sessionExpected(rs, null);
+    }
+
+    public void flushProcessed(Option... options)
+    {
+        RangeSet copy;
+        synchronized (processedLock)
+        {
+            copy = processed.copy();
+        }
+
+        synchronized (commandsLock)
+        {
+            if (state == DETACHED || state == CLOSING || state == CLOSED)
+            {
+                return;
+            }
+            if (copy.size() > 0)
+            {
+                sessionCompleted(copy, options);
+            }
+        }
+    }
+
+    void knownComplete(RangeSet kc)
+    {
+        if (kc.size() > 0)
+        {
+            synchronized (processedLock)
+            {
+                processed.subtract(kc) ;
+            }
+        }
+    }
+
+    void syncPoint()
+    {
+        int id = getCommandsIn() - 1;
+        LOGGER.debug("{} synced to {}", this, id);
+        boolean flush;
+        synchronized (processedLock)
+        {
+            syncPoint = id;
+            flush = ge(maxProcessed, syncPoint);
+        }
+        if (flush)
+        {
+            flushProcessed();
+        }
+    }
+
+    protected boolean complete(int lower, int upper)
+    {
+        //avoid autoboxing
+        if(LOGGER.isDebugEnabled())
+        {
+            LOGGER.debug("{} complete({}, {})", this, lower, upper);
+        }
+        synchronized (commandsLock)
+        {
+            int old = maxComplete;
+            for (int id = max(maxComplete, lower); le(id, upper); id++)
+            {
+                Method m = removeCommand(id);
+                if (m != null)
+                {
+                    commandBytes -= m.getBodySize();
+                    m.complete();
+                }
+            }
+            if (le(lower, maxComplete + 1))
+            {
+                maxComplete = max(maxComplete, upper);
+            }
+
+            if(LOGGER.isDebugEnabled())
+            {
+                LOGGER.debug("{}   commands remaining: {}", this, commandsOut - maxComplete);
+            }
+
+            commandsLock.notifyAll();
+            return gt(maxComplete, old);
+        }
+    }
+
+    void received(Method m)
+    {
+        m.delegate(this, delegate);
+    }
+
+    private void send(Method m)
+    {
+        m.setChannel(channel);
+        connection.send(m);
+
+        if (!m.isBatch())
+        {
+            connection.flush();
+        }
+    }
+
+    protected boolean isBytesFull()
+    {
+        return commandBytes >= byteLimit;
+    }
+
+    protected boolean isCommandsFull(int id)
+    {
+        return id - maxComplete >= commandLimit;
+    }
+
+    @Override
+    public void invoke(Method m)
+    {
+        invoke(m,(Runnable)null);
+    }
+
+    public void invoke(Method m, Runnable postIdSettingAction)
+    {
+        if (m.getEncodedTrack() == Frame.L4)
+        {
+            synchronized (commandsLock)
+            {
+                if (state == DETACHED && m.isUnreliable())
+                {
+                    Thread current = Thread.currentThread();
+                    if (!current.equals(resumer))
+                    {
+                        return;
+                    }
+                }
+
+                if (state != OPEN && state != CLOSED && state != CLOSING)
+                {
+                    Thread current = Thread.currentThread();
+                    if (!current.equals(resumer) )
+                    {
+                        Waiter w = new Waiter(commandsLock, timeout);
+                        while (w.hasTime() && (state != OPEN && state != CLOSED))
+                        {
+                            checkFailoverRequired("Command was interrupted because of failover, before being sent");
+                            w.await();
+                        }
+                    }
+                }
+
+                switch (state)
+                {
+                case OPEN:
+                    break;
+                case RESUMING:
+                    Thread current = Thread.currentThread();
+                    if (!current.equals(resumer))
+                    {
+                        throw new SessionException
+                            ("timed out waiting for resume to finish");
+                    }
+                    break;
+                case CLOSING:
+                case CLOSED:
+                    org.apache.qpid.server.transport.ExecutionException exc = getException();
+                    if (exc != null)
+                    {
+                        throw new SessionException(exc);
+                    }
+                    else
+                    {
+                        throw new SessionClosedException();
+                    }
+                default:
+                    throw new SessionException
+                        (String.format
+                         ("timed out waiting for session to become open " +
+                          "(state=%s)", state));
+                }
+
+                int next;
+                next = commandsOut++;
+                m.setId(next);
+                if(postIdSettingAction != null)
+                {
+                    postIdSettingAction.run();
+                }
+
+                if (isFull(next))
+                {
+                    Waiter w = new Waiter(commandsLock, timeout);
+                    while (w.hasTime() && isFull(next) && state != CLOSED)
+                    {
+                        if (state == OPEN || state == RESUMING)
+                        {
+                            try
+                            {
+                                sessionFlush(COMPLETED);
+                            }
+                            catch (SenderException e)
+                            {
+                                if (!closing)
+                                {
+                                    // if expiry is > 0 then this will
+                                    // happen again on resume
+                                    LOGGER.error("error sending flush (full replay buffer)", e);
+                                }
+                                else
+                                {
+                                    e.rethrow();
+                                }
+                            }
+                        }
+                        checkFailoverRequired("Command was interrupted because of failover, before being sent");
+                        w.await();
+                    }
+                }
+
+                if (state == CLOSED)
+                {
+                    org.apache.qpid.server.transport.ExecutionException exc = getException();
+                    if (exc != null)
+                    {
+                        throw new SessionException(exc);
+                    }
+                    else
+                    {
+                        throw new SessionClosedException();
+                    }
+                }
+
+                if (isFull(next))
+                {
+                    throw new SessionException("timed out waiting for completion");
+                }
+
+                if (next == 0)
+                {
+                    sessionCommandPoint(0, 0);
+                }
+
+                boolean replayTransfer = !_isNoReplay && !closing && !transacted &&
+                                         m instanceof MessageTransfer &&
+                                         ! m.isUnreliable();
+
+                if ((replayTransfer) || m.hasCompletionListener())
+                {
+                    setCommand(next, m);
+                    commandBytes += m.getBodySize();
+                }
+                if (autoSync)
+                {
+                    m.setSync(true);
+                }
+                needSync = !m.isSync();
+
+                try
+                {
+                    send(m);
+                }
+                catch (SenderException e)
+                {
+                    if (!closing)
+                    {
+                        // if we are not closing then this will happen
+                        // again on resume
+                        LOGGER.error("error sending command", e);
+                    }
+                    else
+                    {
+                        e.rethrow();
+                    }
+                }
+                if (autoSync)
+                {
+                    sync();
+                }
+
+                // flush every 64K commands to avoid ambiguity on
+                // wraparound
+                if (shouldIssueFlush(next))
+                {
+                    try
+                    {
+                        sessionFlush(COMPLETED);
+                    }
+                    catch (SenderException e)
+                    {
+                        if (!closing)
+                        {
+                            // if expiry is > 0 then this will happen
+                            // again on resume
+                            LOGGER.error("error sending flush (periodic)", e);
+                        }
+                        else
+                        {
+                            e.rethrow();
+                        }
+                    }
+                }
+            }
+        }
+        else
+        {
+            send(m);
+        }
+    }
+
+    private void checkFailoverRequired(String message)
+    {
+        if (_failoverRequired.get())
+        {
+            throw new SessionException(message);
+        }
+    }
+
+    protected boolean shouldIssueFlush(int next)
+    {
+        return (next % 65536) == 0;
+    }
+
+    public void sync()
+    {
+        sync(timeout);
+    }
+
+    public void sync(long timeout)
+    {
+        LOGGER.debug("{} sync()", this);
+        synchronized (commandsLock)
+        {
+            int point = commandsOut - 1;
+
+            if (needSync && lt(maxComplete, point))
+            {
+                executionSync(SYNC);
+            }
+
+            Waiter w = new Waiter(commandsLock, timeout);
+            while (w.hasTime() && state != CLOSED && lt(maxComplete, point))
+            {
+                checkFailoverRequired("Session sync was interrupted by failover.");
+                if(LOGGER.isDebugEnabled())
+                {
+                    LOGGER.debug("{}   waiting for[{}]: {}, {}", this, point, maxComplete, commands);
+                }
+                w.await();
+            }
+
+            if (lt(maxComplete, point))
+            {
+                if (state != CLOSED)
+                {
+                    throw new SessionException(
+                            String.format("timed out waiting for sync: complete = %s, point = %s",
+                                    maxComplete, point));
+                }
+                else
+                {
+                    org.apache.qpid.server.transport.ExecutionException ee = getException();
+                    if (ee != null)
+                    {
+                        throw new SessionException(ee);
+                    }
+                }
+            }
+        }
+    }
+
+    void result(int command, Struct result)
+    {
+        ResultFuture<?> future;
+        synchronized (results)
+        {
+            future = results.remove(command);
+        }
+
+        if (future != null)
+        {
+            future.set(result);
+        }
+        else
+        {
+            LOGGER.warn("Received a response to a command" +
+                     " that's no longer valid on the client side." +
+                     " [ command id : {} , result : {} ]", command, result);
+        }
+    }
+
+    void setException(org.apache.qpid.server.transport.ExecutionException exc)
+    {
+        synchronized (results)
+        {
+            if (exception != null)
+            {
+                throw new IllegalStateException(
+                        String.format("too many exceptions: %s, %s", exception, exc));
+            }
+            exception = exc;
+        }
+    }
+
+    org.apache.qpid.server.transport.ExecutionException getException()
+    {
+        synchronized (results)
+        {
+            return exception;
+        }
+    }
+
+    @Override
+    protected <T> Future<T> invoke(Method m, Class<T> klass)
+    {
+        synchronized (commandsLock)
+        {
+            int command = commandsOut;
+            ResultFuture<T> future = new ResultFuture<T>(klass);
+            synchronized (results)
+            {
+                results.put(command, future);
+            }
+            invoke(m);
+            return future;
+        }
+    }
+
+    public final void messageTransfer(String destination,
+                                      MessageAcceptMode acceptMode,
+                                      MessageAcquireMode acquireMode,
+                                      Header header,
+                                      byte[] body,
+                                      Option ... _options) {
+        messageTransfer(destination, acceptMode, acquireMode, header,
+                        ByteBuffer.wrap(body), _options);
+    }
+
+    public final void messageTransfer(String destination,
+                                      MessageAcceptMode acceptMode,
+                                      MessageAcquireMode acquireMode,
+                                      Header header,
+                                      String body,
+                                      Option ... _options) {
+        messageTransfer(destination, acceptMode, acquireMode, header,
+                        toUTF8(body), _options);
+    }
+
+    public void exception(Throwable t)
+    {
+        LOGGER.error("caught exception", t);
+    }
+
+    public void closed()
+    {
+        synchronized (commandsLock)
+        {
+            if (closing || getException() != null)
+            {
+                state = CLOSED;
+            }
+            else
+            {
+                state = DETACHED;
+            }
+
+            commandsLock.notifyAll();
+
+            synchronized (results)
+            {
+                for (ResultFuture<?> result : results.values())
+                {
+                    synchronized(result)
+                    {
+                        result.notifyAll();
+                    }
+                }
+            }
+            if(state == CLOSED)
+            {
+                delegate.closed(this);
+            }
+            else
+            {
+                delegate.detached(this);
+            }
+        }
+
+        if(state == CLOSED)
+        {
+            connection.removeSession(this);
+            listener.closed(this);
+        }
+    }
+
+    public boolean isClosing()
+    {
+        return state == CLOSED || state == CLOSING;
+    }
+
+    @Override
+    public String toString()
+    {
+        return String.format("ssn:%s", name);
+    }
+
+    public void setTransacted(boolean b) {
+        this.transacted = b;
+    }
+
+    public boolean isTransacted(){
+        return transacted;
+    }
+
+    public void setDetachCode(SessionDetachCode dtc)
+    {
+        this.detachCode = dtc;
+    }
+
+    public SessionDetachCode getDetachCode()
+    {
+        return this.detachCode;
+    }
+
+    public Object getStateLock()
+    {
+        return stateLock;
+    }
+
+    protected void sendSessionAttached(final byte[] name, final Option... options)
+    {
+        super.sessionAttached(name, options);
+    }
+
+    public enum State { NEW, DETACHED, RESUMING, OPEN, CLOSING, CLOSED }
 
     public interface MessageDispositionChangeListener
     {
@@ -142,9 +984,14 @@ public class ServerSession extends Sessi
 
     private final List<StoredMessage<MessageMetaData_0_10>> _uncommittedMessages = new ArrayList<>();
 
-    public ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry)
+    public ServerSession(ServerConnection connection, ServerSessionDelegate delegate, Binary name, long expiry)
     {
-        super(connection, delegate, name, expiry);
+        this.connection = connection;
+        this.delegate = delegate;
+        this.name = name;
+        this.closing = false;
+        this._isNoReplay = false;
+        initReceiver();
         _transaction = new AsyncAutoCommitTransaction(this.getMessageStore(),this);
 
         ServerConnection serverConnection = (ServerConnection) connection;
@@ -166,7 +1013,7 @@ public class ServerSession extends Sessi
     {
         if(runningAsSubject())
         {
-            super.setState(state);
+            setStateSuper(state);
 
             if (state == State.OPEN)
             {
@@ -188,6 +1035,16 @@ public class ServerSession extends Sessi
         }
     }
 
+    private void setStateSuper(State state)
+    {
+        synchronized (commandsLock)
+        {
+            this.state = state;
+            commandsLock.notifyAll();
+        }
+    }
+
+
     private <T> T runAsSubject(final PrivilegedAction<T> privilegedAction)
     {
         return AccessController.doPrivileged(privilegedAction, getAccessControllerContext());
@@ -222,7 +1079,6 @@ public class ServerSession extends Sessi
         _modelObject.getPublishAuthCache().authorisePublish(destination, routingKey, immediate, currentTime);
     }
 
-    @Override
     protected boolean isFull(int id)
     {
         return isCommandsFull(id);
@@ -301,6 +1157,7 @@ public class ServerSession extends Sessi
     {
         dispositionChange(ranges, new MessageDispositionAction()
         {
+            @Override
             public void performAction(MessageDispositionChangeListener listener)
             {
                 listener.onAccept();
@@ -313,6 +1170,7 @@ public class ServerSession extends Sessi
     {
         dispositionChange(ranges, new MessageDispositionAction()
                                       {
+                                          @Override
                                           public void performAction(MessageDispositionChangeListener listener)
                                           {
                                               listener.onRelease(setRedelivered);
@@ -324,6 +1182,7 @@ public class ServerSession extends Sessi
     {
         dispositionChange(ranges, new MessageDispositionAction()
                                       {
+                                          @Override
                                           public void performAction(MessageDispositionChangeListener listener)
                                           {
                                               listener.onReject();
@@ -469,7 +1328,6 @@ public class ServerSession extends Sessi
         getAMQPConnection().getEventLogger().message(getLogSubject(), operationalLoggingMessage);
     }
 
-    @Override
     protected void awaitClose()
     {
         // Broker shouldn't block awaiting close - thus do override this method to do nothing
@@ -737,12 +1595,15 @@ public class ServerSession extends Sessi
         return getConnection().getAmqpConnection();
     }
 
-    @Override
     public ServerConnection getConnection()
     {
-        return (ServerConnection) super.getConnection();
+        return (ServerConnection) getConnectionSuper();
     }
 
+    private ServerConnection getConnectionSuper()
+    {
+        return connection;
+    }
 
     public LogSubject getLogSubject()
     {
@@ -839,7 +1700,7 @@ public class ServerSession extends Sessi
     @Override
     public String toLogString()
     {
-        long connectionId = super.getConnection() instanceof ServerConnection
+        long connectionId = getConnection() instanceof ServerConnection
                             ? getConnection().getConnectionId()
                             : -1;
         String authorizedPrincipal = (getAuthorizedPrincipal() == null) ? "?" : getAuthorizedPrincipal().getName();
@@ -861,7 +1722,6 @@ public class ServerSession extends Sessi
         close();
     }
 
-    @Override
     public void close()
     {
         // unregister subscriptions in order to prevent sending of new messages
@@ -871,7 +1731,35 @@ public class ServerSession extends Sessi
         {
             _modelObject.delete();
         }
-        super.close();
+        closeSuper();
+    }
+
+    private void closeSuper()
+    {
+        if (LOGGER.isDebugEnabled())
+        {
+            LOGGER.debug("Closing [{}] in state [{}]", this, state);
+        }
+        synchronized (commandsLock)
+        {
+            switch(state)
+            {
+                case DETACHED:
+                    state = CLOSED;
+                    delegate.closed(this);
+                    connection.removeSession(this);
+                    listener.closed(this);
+                    break;
+                case CLOSED:
+                    break;
+                default:
+                    state = CLOSING;
+                    setClose(true);
+                    sessionRequestTimeout(0);
+                    sessionDetach(name.getBytes());
+                    awaitClose();
+            }
+        }
     }
 
     void unregisterSubscriptions()
@@ -946,6 +1834,7 @@ public class ServerSession extends Sessi
         return _unfinishedCommandsQueue.isEmpty() ? null : _unfinishedCommandsQueue.getLast();
     }
 
+    @Override
     public void recordFuture(final ListenableFuture<Void> future, final ServerTransaction.Action action)
     {
         _unfinishedCommandsQueue.add(new AsyncCommand(future, action));
@@ -1068,6 +1957,31 @@ public class ServerSession extends Sessi
         return _modelObject.getMaxUncommittedInMemorySize();
     }
 
+    static class DefaultSessionListener implements SessionListener
+    {
+
+        @Override
+        public void opened(ServerSession ssn) {}
+
+        @Override
+        public void resumed(ServerSession ssn) {}
+
+        @Override
+        public void message(ServerSession ssn, MessageTransfer xfr)
+        {
+            LOGGER.info("message: {}", xfr);
+        }
+
+        @Override
+        public void exception(ServerSession ssn, SessionException exc)
+        {
+            LOGGER.error("session exception", exc);
+        }
+
+        @Override
+        public void closed(ServerSession ssn) {}
+    }
+
     private class CheckCapacityAction implements Action<MessageInstance>
     {
         @Override
@@ -1080,4 +1994,75 @@ public class ServerSession extends Sessi
             }
         }
     }
+
+    private class ResultFuture<T> implements Future<T>
+    {
+
+        private final Class<T> klass;
+        private T result;
+
+        private ResultFuture(Class<T> klass)
+        {
+            this.klass = klass;
+        }
+
+        private void set(Struct result)
+        {
+            synchronized (this)
+            {
+                this.result = klass.cast(result);
+                notifyAll();
+            }
+        }
+
+        public T get(long timeout)
+        {
+            synchronized (this)
+            {
+                Waiter w = new Waiter(this, timeout);
+                while (w.hasTime() && state != CLOSED && !isDone())
+                {
+                    checkFailoverRequired("Operation was interrupted by failover.");
+                    LOGGER.debug("{} waiting for result: {}", ServerSession.this, this);
+                    w.await();
+                }
+            }
+
+            if (isDone())
+            {
+                return result;
+            }
+            else if (state == CLOSED)
+            {
+                org.apache.qpid.server.transport.ExecutionException ex = getException();
+                if(ex == null)
+                {
+                    throw new SessionClosedException();
+                }
+                throw new SessionException(ex);
+            }
+            else
+            {
+                throw new SessionException(
+                        String.format("%s timed out waiting for result: %s",
+                                      ServerSession.this, this));
+            }
+        }
+
+        public T get()
+        {
+            return get(timeout);
+        }
+
+        public boolean isDone()
+        {
+            return result != null;
+        }
+
+        public String toString()
+        {
+            return String.format("Future(%s)", isDone() ? result : klass);
+        }
+
+    }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org