You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2017/02/17 08:08:51 UTC
svn commit: r1783342 [1/2] -
/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/
Author: kwall
Date: Fri Feb 17 08:08:51 2017
New Revision: 1783342
URL: http://svn.apache.org/viewvc?rev=1783342&view=rev
Log:
QPID-7622: [Java Broker] [0-10] Collasped ServerConnection, SessionSession etc using inline refactoring
Methods in the base classes that were overridden and invoked using super.method() are renamed methodSuper()
Removed:
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Connection.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConnectionDelegate.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConnectionListener.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/SessionDelegate.java
Modified:
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/SessionListener.java
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java?rev=1783342&r1=1783341&r2=1783342&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java Fri Feb 17 08:08:51 2017
@@ -86,7 +86,7 @@ public class AMQPConnection_0_10Impl ext
_connection = new ServerConnection(id, broker, port, transport, this);
SubjectCreator subjectCreator = port.getAuthenticationProvider().getSubjectCreator(transport.isSecure());
- ConnectionDelegate connDelegate = new ServerConnectionDelegate(broker, subjectCreator);
+ ServerConnectionDelegate connDelegate = new ServerConnectionDelegate(broker, subjectCreator);
_connection.setConnectionDelegate(connDelegate);
_connection.setRemoteAddress(network.getRemoteAddress());
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java?rev=1783342&r1=1783341&r2=1783342&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java Fri Feb 17 08:08:51 2017
@@ -20,23 +20,36 @@
*/
package org.apache.qpid.server.protocol.v0_10;
-import static org.apache.qpid.server.protocol.v0_10.Connection.State.CLOSING;
+import static org.apache.qpid.server.protocol.v0_10.ServerConnection.State.CLOSED;
+import static org.apache.qpid.server.protocol.v0_10.ServerConnection.State.CLOSING;
+import static org.apache.qpid.server.protocol.v0_10.ServerConnection.State.NEW;
+import static org.apache.qpid.server.protocol.v0_10.ServerConnection.State.OPEN;
+import java.net.SocketAddress;
import java.security.AccessControlContext;
import java.security.AccessController;
import java.security.Principal;
import java.security.PrivilegedAction;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.security.auth.Subject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.NamedAddressSpace;
@@ -45,26 +58,25 @@ import org.apache.qpid.server.model.port
import org.apache.qpid.server.protocol.ConnectionClosingTicker;
import org.apache.qpid.server.protocol.ErrorCodes;
import org.apache.qpid.server.session.AMQPSession;
-import org.apache.qpid.server.transport.AMQPConnection;
-import org.apache.qpid.server.transport.ConnectionClose;
-import org.apache.qpid.server.transport.ConnectionCloseCode;
-import org.apache.qpid.server.transport.ConnectionCloseOk;
-import org.apache.qpid.server.transport.ExecutionErrorCode;
-import org.apache.qpid.server.transport.ExecutionException;
-import org.apache.qpid.server.transport.Method;
-import org.apache.qpid.server.transport.ProtocolEvent;
-import org.apache.qpid.server.transport.ServerNetworkConnection;
+import org.apache.qpid.server.transport.*;
+import org.apache.qpid.server.transport.network.NetworkConnection;
+import org.apache.qpid.server.transport.util.Waiter;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
-public class ServerConnection extends Connection
+public class ServerConnection extends ConnectionInvoker implements ProtocolEventReceiver, ProtocolEventSender
{
+ private static final Logger LOGGER = LoggerFactory.getLogger(ServerConnection.class);
private final Broker<?> _broker;
private final long _connectionId;
private final Object _reference = new Object();
private final AmqpPort<?> _port;
private final AtomicLong _lastIoTime = new AtomicLong();
+ final private Map<Binary,ServerSession> sessions = new HashMap<Binary,ServerSession>();
+ final private Map<Integer,ServerSession> channels = new ConcurrentHashMap<Integer,ServerSession>();
+ final private Object lock = new Object();
+ private final AtomicBoolean connectionLost = new AtomicBoolean(false);
private boolean _blocking;
private final Transport _transport;
@@ -74,6 +86,17 @@ public class ServerConnection extends Co
private final AMQPConnection_0_10 _amqpConnection;
private boolean _ignoreFutureInput;
private boolean _ignoreAllButConnectionCloseOk;
+ private NetworkConnection _networkConnection;
+ private FrameSizeObserver _frameSizeObserver;
+ private ServerConnectionDelegate delegate;
+ private ProtocolEventSender sender;
+ private State state = NEW;
+ private long timeout = 60000; // TODO server side close does not require this
+ private ConnectionException error = null;
+ private int channelMax = 1;
+ private String locale;
+ private SocketAddress _remoteAddress;
+ private SocketAddress _localAddress;
public ServerConnection(final long connectionId,
Broker<?> broker,
@@ -102,22 +125,32 @@ public class ServerConnection extends Co
@Override
protected void invoke(Method method)
{
- super.invoke(method);
+ invokeSuper(method);
if (method instanceof ConnectionClose)
{
_ignoreAllButConnectionCloseOk = true;
}
}
+ private void invokeSuper(Method method)
+ {
+ method.setChannel(0);
+ send(method);
+ if (!method.isBatch())
+ {
+ flush();
+ }
+ }
+
+
EventLogger getEventLogger()
{
return _amqpConnection.getEventLogger();
}
- @Override
protected void setState(State state)
{
- super.setState(state);
+ setStateSuper(state);
if(state == State.CLOSING)
{
@@ -130,10 +163,24 @@ public class ServerConnection extends Co
}
}
- @Override
+ private void setStateSuper(State state)
+ {
+ synchronized (lock)
+ {
+ this.state = state;
+ lock.notifyAll();
+ }
+ }
+
+
public ServerConnectionDelegate getConnectionDelegate()
{
- return (ServerConnectionDelegate) super.getConnectionDelegate();
+ return (ServerConnectionDelegate) getConnectionDelegateSuper();
+ }
+
+ private ServerConnectionDelegate getConnectionDelegateSuper()
+ {
+ return delegate;
}
public AMQPConnection_0_10 getAmqpConnection()
@@ -209,7 +256,7 @@ public class ServerConnection extends Co
{
try
{
- super.exception(t);
+ exceptionSuper(t);
}
finally
{
@@ -224,6 +271,12 @@ public class ServerConnection extends Co
}
}
+ private void exceptionSuper(Throwable t)
+ {
+ exception(new ConnectionException(t));
+ }
+
+
@Override
public void received(final ProtocolEvent event)
{
@@ -253,13 +306,23 @@ public class ServerConnection extends Co
@Override
public Void run()
{
- ServerConnection.super.received(event);
+ receivedSuper(event);
return null;
}
}, context);
}
}
+ private void receivedSuper(ProtocolEvent event)
+ {
+ if(LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("RECV: [{}] {}", this, String.valueOf(event));
+ }
+ event.delegate(this, delegate);
+ }
+
+
void sendConnectionCloseAsync(final ConnectionCloseCode replyCode, final String message)
{
addAsyncTask(new Action<ServerConnection>()
@@ -307,27 +370,39 @@ public class ServerConnection extends Co
}
}
- @Override
- public synchronized void registerSession(final Session ssn)
+ public synchronized void registerSession(final ServerSession ssn)
{
- super.registerSession(ssn);
+ registerSessionSuper(ssn);
if(_blocking)
{
((ServerSession)ssn).block();
}
}
+ private void registerSessionSuper(ServerSession ssn)
+ {
+ synchronized (lock)
+ {
+ sessions.put(ssn.getName(),ssn);
+ }
+ }
+
public Collection<? extends ServerSession> getSessionModels()
{
return Collections.unmodifiableCollection(getChannels());
}
- @Override
protected Collection<ServerSession> getChannels()
{
- return (Collection<ServerSession>) super.getChannels();
+ return (Collection<ServerSession>) getChannelsSuper();
+ }
+
+ private Collection<ServerSession> getChannelsSuper()
+ {
+ return new ArrayList<>(channels.values());
}
+
public void setAuthorizedSubject(final Subject authorizedSubject)
{
_amqpConnection.setSubject(authorizedSubject);
@@ -349,7 +424,7 @@ public class ServerConnection extends Co
try
{
performDeleteTasks();
- super.closed();
+ closedSuper();
}
finally
{
@@ -362,9 +437,39 @@ public class ServerConnection extends Co
}
+ private void closedSuper()
+ {
+ if (state == OPEN)
+ {
+ exception(new ConnectionException("connection aborted"));
+ }
+
+ LOGGER.debug("connection closed: {}", this);
+
+ synchronized (lock)
+ {
+ List<ServerSession> values = new ArrayList<ServerSession>(channels.values());
+ for (ServerSession ssn : values)
+ {
+ ssn.closed();
+ }
+
+ try
+ {
+ sender.close();
+ }
+ catch(Exception e)
+ {
+ // ignore.
+ }
+ sender = null;
+ setState(CLOSED);
+ }
+ }
+
private void markAllSessionsClosed()
{
- for (Session ssn : getChannels())
+ for (ServerSession ssn : getChannels())
{
final ServerSession session = (ServerSession) ssn;
((ServerSession) ssn).setClose(true);
@@ -374,7 +479,7 @@ public class ServerConnection extends Co
public void receivedComplete()
{
- for (Session ssn : getChannels())
+ for (ServerSession ssn : getChannels())
{
((ServerSession)ssn).receivedComplete();
}
@@ -384,9 +489,24 @@ public class ServerConnection extends Co
public void send(ProtocolEvent event)
{
_lastIoTime.set(System.currentTimeMillis());
- super.send(event);
+ sendSuper(event);
}
+ private void sendSuper(ProtocolEvent event)
+ {
+ if(LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("SEND: [{}] {}", this, String.valueOf(event));
+ }
+ ProtocolEventSender s = sender;
+ if (s == null)
+ {
+ throw new ConnectionException("connection closed");
+ }
+ s.send(event);
+ }
+
+
public String getRemoteContainerName()
{
return getConnectionDelegate().getClientId();
@@ -428,6 +548,355 @@ public class ServerConnection extends Co
return new ProcessPendingIterator(sessionsWithWork);
}
+ public void setConnectionDelegate(ServerConnectionDelegate delegate)
+ {
+ this.delegate = delegate;
+ }
+
+ public ProtocolEventSender getSender()
+ {
+ return sender;
+ }
+
+ public void setSender(ProtocolEventSender sender)
+ {
+ this.sender = sender;
+ }
+
+ protected void setLocale(String locale)
+ {
+ this.locale = locale;
+ }
+
+ String getLocale()
+ {
+ return locale;
+ }
+
+ public void removeSession(ServerSession ssn)
+ {
+ synchronized (lock)
+ {
+ sessions.remove(ssn.getName());
+ }
+ }
+
+ @Override
+ public void flush()
+ {
+ if(LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("FLUSH: [{}]", this);
+ }
+ final ProtocolEventSender theSender = sender;
+ if(theSender != null)
+ {
+ theSender.flush();
+ }
+ }
+
+ public void dispatch(Method method)
+ {
+ int channel = method.getChannel();
+ ServerSession ssn = getSession(channel);
+ if(ssn != null)
+ {
+ ssn.received(method);
+ }
+ else
+ {
+ /*
+ * A peer receiving any other control on a detached transport MUST discard it and
+ * send a session.detached with the "not-attached" reason code.
+ */
+ if(LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Control received on unattached channel : {}", channel);
+ }
+ invokeSessionDetached(channel, SessionDetachCode.NOT_ATTACHED);
+ }
+ }
+
+ public int getChannelMax()
+ {
+ return channelMax;
+ }
+
+ protected void setChannelMax(int max)
+ {
+ channelMax = max;
+ }
+
+ private int map(ServerSession ssn)
+ {
+ synchronized (lock)
+ {
+ //For a negotiated channelMax N, there are channels 0 to N-1 available.
+ for (int i = 0; i < getChannelMax(); i++)
+ {
+ if (!channels.containsKey(i))
+ {
+ map(ssn, i);
+ return i;
+ }
+ }
+
+ throw new RuntimeException("no more channels available");
+ }
+ }
+
+ protected void map(ServerSession ssn, int channel)
+ {
+ synchronized (lock)
+ {
+ channels.put(channel, ssn);
+ ssn.setChannel(channel);
+ }
+ }
+
+ void unmap(ServerSession ssn)
+ {
+ synchronized (lock)
+ {
+ channels.remove(ssn.getChannel());
+ }
+ }
+
+ public ServerSession getSession(int channel)
+ {
+ synchronized (lock)
+ {
+ return channels.get(channel);
+ }
+ }
+
+ public void resume()
+ {
+ synchronized (lock)
+ {
+ for (ServerSession ssn : sessions.values())
+ {
+ map(ssn);
+ ssn.resume();
+ }
+
+ setState(OPEN);
+ }
+ }
+
+ public void exception(ConnectionException e)
+ {
+ connectionLost.set(true);
+ synchronized (lock)
+ {
+ switch (state)
+ {
+ case OPENING:
+ case CLOSING:
+ error = e;
+ lock.notifyAll();
+ return;
+ }
+ }
+ }
+
+ public void closeCode(ConnectionClose close)
+ {
+ synchronized (lock)
+ {
+ ConnectionCloseCode code = close.getReplyCode();
+ if (code != ConnectionCloseCode.NORMAL)
+ {
+ exception(new ConnectionException(close));
+ }
+ }
+ }
+
+ @Override
+ public void close()
+ {
+ close(ConnectionCloseCode.NORMAL, null);
+ }
+
+ protected void sendConnectionClose(ConnectionCloseCode replyCode, String replyText, Option... _options)
+ {
+ connectionClose(replyCode, replyText, _options);
+ }
+
+ public void close(ConnectionCloseCode replyCode, String replyText, Option ... _options)
+ {
+ synchronized (lock)
+ {
+ switch (state)
+ {
+ case OPEN:
+ state = CLOSING;
+ connectionClose(replyCode, replyText, _options);
+ Waiter w = new Waiter(lock, timeout);
+ while (w.hasTime() && state == CLOSING && error == null)
+ {
+ w.await();
+ }
+
+ if (error != null)
+ {
+ close(replyCode, replyText, _options);
+ throw new ConnectionException(error);
+ }
+
+ switch (state)
+ {
+ case CLOSING:
+ close(replyCode, replyText, _options);
+ throw new ConnectionException("close() timed out");
+ case CLOSED:
+ break;
+ default:
+ throw new IllegalStateException(String.valueOf(state));
+ }
+ break;
+ case CLOSED:
+ break;
+ default:
+ if (sender != null)
+ {
+ sender.close();
+ w = new Waiter(lock, timeout);
+ while (w.hasTime() && sender != null && error == null)
+ {
+ w.await();
+ }
+
+ if (error != null)
+ {
+ throw new ConnectionException(error);
+ }
+
+ if (sender != null)
+ {
+ throw new ConnectionException("close() timed out");
+ }
+ }
+ break;
+ }
+ }
+ }
+
+ @Override
+ public String toString()
+ {
+ return String.format("conn:%x", System.identityHashCode(this));
+ }
+
+ protected boolean isConnectionLost()
+ {
+ return connectionLost.get();
+ }
+
+ public boolean hasSessionWithName(final byte[] name)
+ {
+ return sessions.containsKey(new Binary(name));
+ }
+
+ public SocketAddress getRemoteSocketAddress()
+ {
+ return _remoteAddress;
+ }
+
+ public SocketAddress getLocalAddress()
+ {
+ return _localAddress;
+ }
+
+ protected void setRemoteAddress(SocketAddress remoteAddress)
+ {
+ _remoteAddress = remoteAddress;
+ }
+
+ protected void setLocalAddress(SocketAddress localAddress)
+ {
+ _localAddress = localAddress;
+ }
+
+ private void invokeSessionDetached(int channel, SessionDetachCode sessionDetachCode)
+ {
+ SessionDetached sessionDetached = new SessionDetached();
+ sessionDetached.setChannel(channel);
+ sessionDetached.setCode(sessionDetachCode);
+ invoke(sessionDetached);
+ }
+
+ protected void doHeartBeat()
+ {
+ connectionHeartbeat();
+ }
+
+ public void setNetworkConnection(NetworkConnection network)
+ {
+ _networkConnection = network;
+ }
+
+ public NetworkConnection getNetworkConnection()
+ {
+ return _networkConnection;
+ }
+
+ public void setMaxFrameSize(final int maxFrameSize)
+ {
+ if(_frameSizeObserver != null)
+ {
+ _frameSizeObserver.setMaxFrameSize(maxFrameSize);
+ }
+ }
+
+ public void addFrameSizeObserver(final FrameSizeObserver frameSizeObserver)
+ {
+ if(_frameSizeObserver == null)
+ {
+ _frameSizeObserver = frameSizeObserver;
+ }
+ else
+ {
+ final FrameSizeObserver currentObserver = _frameSizeObserver;
+ _frameSizeObserver = new FrameSizeObserver()
+ {
+ @Override
+ public void setMaxFrameSize(final int frameSize)
+ {
+ currentObserver.setMaxFrameSize(frameSize);
+ frameSizeObserver.setMaxFrameSize(frameSize);
+ }
+ };
+ }
+ }
+
+ public boolean isClosing()
+ {
+ synchronized (lock)
+ {
+ return state == CLOSING || state == CLOSED;
+ }
+ }
+
+ protected void sendConnectionSecure(byte[] challenge, Option ... options)
+ {
+ super.connectionSecure(challenge, options);
+ }
+
+ protected void sendConnectionTune(int channelMax, int maxFrameSize, int heartbeatMin, int heartbeatMax, Option ... options)
+ {
+ super.connectionTune(channelMax, maxFrameSize, heartbeatMin, heartbeatMax, options);
+ }
+
+ protected void sendConnectionStart(final Map<String, Object> clientProperties,
+ final List<Object> mechanisms,
+ final List<Object> locales, final Option... options)
+ {
+ super.connectionStart(clientProperties, mechanisms, locales, options);
+ }
+
+ public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD, RESUMING }
+
private class ProcessPendingIterator implements Iterator<Runnable>
{
private final Collection<AMQPSession<?,?>> _sessionsWithPending;
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java?rev=1783342&r1=1783341&r2=1783342&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java Fri Feb 17 08:08:51 2017
@@ -20,7 +20,7 @@
*/
package org.apache.qpid.server.protocol.v0_10;
-import static org.apache.qpid.server.protocol.v0_10.Connection.State.CLOSE_RCVD;
+import static org.apache.qpid.server.protocol.v0_10.ServerConnection.State.CLOSE_RCVD;
import java.security.AccessControlException;
import java.security.Principal;
@@ -46,26 +46,18 @@ import org.apache.qpid.server.security.a
import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
import org.apache.qpid.server.security.auth.sasl.SaslNegotiator;
import org.apache.qpid.server.security.auth.sasl.SaslSettings;
-import org.apache.qpid.server.transport.AMQPConnection;
+import org.apache.qpid.server.transport.*;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
-import org.apache.qpid.server.transport.Binary;
-import org.apache.qpid.server.transport.ConnectionClose;
-import org.apache.qpid.server.transport.ConnectionCloseCode;
-import org.apache.qpid.server.transport.ConnectionOpen;
-import org.apache.qpid.server.transport.ConnectionOpenOk;
-import org.apache.qpid.server.transport.ConnectionRedirect;
-import org.apache.qpid.server.transport.ConnectionSecureOk;
-import org.apache.qpid.server.transport.ConnectionStartOk;
-import org.apache.qpid.server.transport.ConnectionTuneOk;
-import org.apache.qpid.server.transport.Constant;
-import org.apache.qpid.server.transport.ProtocolHeader;
-import org.apache.qpid.server.transport.SessionAttach;
-import org.apache.qpid.server.transport.SessionDetach;
-import org.apache.qpid.server.transport.SessionDetachCode;
-import org.apache.qpid.server.transport.SessionDetached;
-public class ServerConnectionDelegate extends ConnectionDelegate
+/*
+
+Method ConnectionDelegate.connectionClose(ServerConnection, ConnectionClose) is already overridden in class org.apache.qpid.server.protocol.v0_10.ServerConnectionDelegate. Method will not be pushed down to that class.
+
+
+
+ */
+public class ServerConnectionDelegate extends MethodDelegate<ServerConnection> implements ProtocolDelegate<ServerConnection>
{
private static final Logger LOGGER = LoggerFactory.getLogger(ServerConnectionDelegate.class);
@@ -81,6 +73,58 @@ public class ServerConnectionDelegate ex
private boolean _compressionSupported;
private volatile SaslNegotiator _saslNegotiator;
+ public void control(ServerConnection conn, Method method)
+ {
+ method.dispatch(conn, this);
+ }
+
+ public void command(ServerConnection conn, Method method)
+ {
+ method.dispatch(conn, this);
+ }
+
+ public void error(ServerConnection conn, ProtocolError error)
+ {
+ conn.exception(new ConnectionException(error.getMessage()));
+ }
+
+ public void handle(ServerConnection conn, Method method)
+ {
+ conn.dispatch(method);
+ }
+
+ @Override public void connectionHeartbeat(ServerConnection conn, ConnectionHeartbeat hearbeat)
+ {
+ // do nothing
+ }
+
+ protected void sendConnectionCloseOkAndCloseSender(ServerConnection conn)
+ {
+ conn.connectionCloseOk();
+ conn.getSender().close();
+ }
+
+ @Override public void connectionCloseOk(ServerConnection conn, ConnectionCloseOk ok)
+ {
+ conn.getSender().close();
+ }
+
+ @Override public void sessionDetached(ServerConnection conn, SessionDetached dtc)
+ {
+ ServerSession ssn = conn.getSession(dtc.getChannel());
+ if (ssn != null)
+ {
+ ssn.setDetachCode(dtc.getCode());
+ conn.unmap(ssn);
+ ssn.closed();
+ }
+ }
+
+ public void writerIdle(final ServerConnection connection)
+ {
+ connection.doHeartBeat();
+ }
+
enum ConnectionState
{
INIT,
@@ -134,7 +178,7 @@ public class ServerConnectionDelegate ex
}
@Override
- public void init(final Connection conn, final ProtocolHeader hdr)
+ public void init(final ServerConnection conn, final ProtocolHeader hdr)
{
ServerConnection serverConnection = (ServerConnection) conn;
assertState(serverConnection, ConnectionState.INIT);
@@ -178,7 +222,7 @@ public class ServerConnectionDelegate ex
}
@Override
- public void connectionSecureOk(final Connection conn, final ConnectionSecureOk ok)
+ public void connectionSecureOk(final ServerConnection conn, final ConnectionSecureOk ok)
{
ServerConnection serverConnection = (ServerConnection) conn;
assertState(serverConnection, ConnectionState.AWAIT_SECURE_OK);
@@ -223,7 +267,7 @@ public class ServerConnectionDelegate ex
}
@Override
- public void connectionClose(Connection conn, ConnectionClose close)
+ public void connectionClose(ServerConnection conn, ConnectionClose close)
{
final ServerConnection sconn = (ServerConnection) conn;
sconn.closeCode(close);
@@ -231,7 +275,8 @@ public class ServerConnectionDelegate ex
sendConnectionCloseOkAndCloseSender(conn);
}
- public void connectionOpen(Connection conn, ConnectionOpen open)
+
+ public void connectionOpen(ServerConnection conn, ConnectionOpen open)
{
final ServerConnection sconn = (ServerConnection) conn;
assertState(sconn, ConnectionState.AWAIT_OPEN);
@@ -255,7 +300,7 @@ public class ServerConnectionDelegate ex
{
if (!addressSpace.isActive())
{
- sconn.setState(Connection.State.CLOSING);
+ sconn.setState(ServerConnection.State.CLOSING);
final String redirectHost = addressSpace.getRedirectHost(port);
if(redirectHost == null)
{
@@ -274,25 +319,25 @@ public class ServerConnectionDelegate ex
sconn.setVirtualHost(addressSpace);
if(!addressSpace.authoriseCreateConnection(sconn.getAmqpConnection()))
{
- sconn.setState(Connection.State.CLOSING);
+ sconn.setState(ServerConnection.State.CLOSING);
sconn.sendConnectionClose(ConnectionCloseCode.CONNECTION_FORCED, "Connection not authorized");
return;
}
}
catch (AccessControlException | VirtualHostUnavailableException e)
{
- sconn.setState(Connection.State.CLOSING);
+ sconn.setState(ServerConnection.State.CLOSING);
sconn.sendConnectionClose(ConnectionCloseCode.CONNECTION_FORCED, e.getMessage());
return;
}
- sconn.setState(Connection.State.OPEN);
+ sconn.setState(ServerConnection.State.OPEN);
_state = ConnectionState.OPEN;
sconn.invoke(new ConnectionOpenOk(Collections.emptyList()));
}
else
{
- sconn.setState(Connection.State.CLOSING);
+ sconn.setState(ServerConnection.State.CLOSING);
sconn.sendConnectionClose(ConnectionCloseCode.INVALID_PATH,
"Unknown virtualhost '" + vhostName + "'");
}
@@ -300,7 +345,7 @@ public class ServerConnectionDelegate ex
}
@Override
- public void connectionTuneOk(final Connection conn, final ConnectionTuneOk ok)
+ public void connectionTuneOk(final ServerConnection conn, final ConnectionTuneOk ok)
{
ServerConnection sconn = (ServerConnection) conn;
assertState(sconn, ConnectionState.AWAIT_TUNE_OK);
@@ -369,18 +414,27 @@ public class ServerConnectionDelegate ex
return _maximumFrameSize;
}
- @Override public void sessionDetach(Connection conn, SessionDetach dtc)
+ @Override
+ public void sessionDetach(ServerConnection conn, SessionDetach dtc)
{
// To ensure a clean detach, we stop any remaining subscriptions. Stop ensures
// that any in-progress delivery (QueueRunner) is completed before the stop
// completes.
stopAllSubscriptions(conn, dtc);
- Session ssn = conn.getSession(dtc.getChannel());
+ ServerSession ssn = conn.getSession(dtc.getChannel());
((ServerSession)ssn).setClose(true);
- super.sessionDetach(conn, dtc);
+ sessionDetachSuper(conn, dtc);
+ }
+
+ private void sessionDetachSuper(ServerConnection conn, SessionDetach dtc)
+ {
+ ServerSession ssn = conn.getSession(dtc.getChannel());
+ ssn.sessionDetached(dtc.getName(), ssn.getDetachCode() == null? SessionDetachCode.NORMAL: ssn.getDetachCode());
+ conn.unmap(ssn);
+ ssn.closed();
}
- private void stopAllSubscriptions(Connection conn, SessionDetach dtc)
+ private void stopAllSubscriptions(ServerConnection conn, SessionDetach dtc)
{
final ServerSession ssn = (ServerSession) conn.getSession(dtc.getChannel());
final Collection<ConsumerTarget_0_10> subs = ssn.getSubscriptions();
@@ -392,12 +446,12 @@ public class ServerConnectionDelegate ex
@Override
- public void sessionAttach(final Connection conn, final SessionAttach atc)
+ public void sessionAttach(final ServerConnection conn, final SessionAttach atc)
{
ServerConnection serverConnection = (ServerConnection) conn;
assertState(serverConnection, ConnectionState.OPEN);
- SessionDelegate serverSessionDelegate = new ServerSessionDelegate();
+ ServerSessionDelegate serverSessionDelegate = new ServerSessionDelegate();
final ServerSession serverSession =
new ServerSession(conn, serverSessionDelegate, new Binary(atc.getName()), 0);
@@ -411,7 +465,7 @@ public class ServerConnectionDelegate ex
serverConnection.map(serverSession, atc.getChannel());
serverConnection.registerSession(serverSession);
serverSession.sendSessionAttached(atc.getName());
- serverSession.setState(Session.State.OPEN);
+ serverSession.setState(ServerSession.State.OPEN);
}
else
{
@@ -420,7 +474,7 @@ public class ServerConnectionDelegate ex
}
}
- private boolean isSessionNameUnique(final byte[] name, final Connection conn)
+ private boolean isSessionNameUnique(final byte[] name, final ServerConnection conn)
{
final ServerConnection sconn = (ServerConnection) conn;
final Principal authorizedPrincipal = sconn.getAuthorizedPrincipal();
@@ -444,7 +498,7 @@ public class ServerConnectionDelegate ex
}
@Override
- public void connectionStartOk(Connection conn, ConnectionStartOk ok)
+ public void connectionStartOk(ServerConnection conn, ConnectionStartOk ok)
{
ServerConnection serverConnection = (ServerConnection)conn;
assertState(serverConnection, ConnectionState.AWAIT_START_OK);
@@ -530,7 +584,7 @@ public class ServerConnectionDelegate ex
return _compressionSupported && _broker.isMessageCompressionEnabled();
}
- private void connectionAuthFailed(final Connection conn, Exception e)
+ private void connectionAuthFailed(final ServerConnection conn, Exception e)
{
ServerConnection serverConnection = (ServerConnection)conn;
if (e != null)
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1783342&r1=1783341&r2=1783342&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Fri Feb 17 08:08:51 2017
@@ -21,8 +21,23 @@
package org.apache.qpid.server.protocol.v0_10;
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT;
+import static org.apache.qpid.server.protocol.v0_10.ServerSession.State.CLOSED;
+import static org.apache.qpid.server.protocol.v0_10.ServerSession.State.CLOSING;
+import static org.apache.qpid.server.protocol.v0_10.ServerSession.State.DETACHED;
+import static org.apache.qpid.server.protocol.v0_10.ServerSession.State.NEW;
+import static org.apache.qpid.server.protocol.v0_10.ServerSession.State.OPEN;
+import static org.apache.qpid.server.protocol.v0_10.ServerSession.State.RESUMING;
+import static org.apache.qpid.server.transport.Option.COMPLETED;
+import static org.apache.qpid.server.transport.Option.SYNC;
+import static org.apache.qpid.server.transport.Option.TIMELY_REPLY;
+import static org.apache.qpid.server.util.Serial.ge;
import static org.apache.qpid.server.util.Serial.gt;
+import static org.apache.qpid.server.util.Serial.le;
+import static org.apache.qpid.server.util.Serial.lt;
+import static org.apache.qpid.server.util.Serial.max;
+import static org.apache.qpid.server.util.Strings.toUTF8;
+import java.nio.ByteBuffer;
import java.security.AccessControlContext;
import java.security.AccessController;
import java.security.Principal;
@@ -31,6 +46,7 @@ import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
@@ -43,6 +59,7 @@ import java.util.concurrent.ConcurrentHa
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -72,6 +89,8 @@ import org.apache.qpid.server.store.Stor
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.transport.AMQPConnection;
+import org.apache.qpid.server.transport.network.Frame;
+import org.apache.qpid.server.transport.util.Waiter;
import org.apache.qpid.server.txn.AlreadyKnownDtxException;
import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
import org.apache.qpid.server.txn.DistributedTransaction;
@@ -89,10 +108,11 @@ import org.apache.qpid.server.util.Actio
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.server.transport.*;
-public class ServerSession extends Session
+public class ServerSession extends SessionInvoker
implements LogSubject, AsyncAutoCommitTransaction.FutureRecorder
{
- private static final Logger _logger = LoggerFactory.getLogger(ServerSession.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(ServerSession.class);
+ public static final int UNLIMITED_CREDIT = 0xFFFFFFFF;
private static final String NULL_DESTINATION = UUID.randomUUID().toString();
private static final int PRODUCER_CREDIT_TOPUP_THRESHOLD = 1 << 30;
@@ -103,10 +123,832 @@ public class ServerSession extends Sessi
private final AtomicBoolean _blocking = new AtomicBoolean(false);
private final AtomicInteger _outstandingCredit = new AtomicInteger(UNLIMITED_CREDIT);
private final CheckCapacityAction _checkCapacityAction = new CheckCapacityAction();
+ private final long timeout = 60000; // TODO server side close does not require this
+ // completed incoming commands
+ private final Object processedLock = new Object();
+ private final int commandLimit = Integer.getInteger("qpid.session.command_limit", 64 * 1024);
+ private final Object commandsLock = new Object();
+ private final Object stateLock = new Object();
+ private final AtomicBoolean _failoverRequired = new AtomicBoolean(false);
private Session_0_10 _modelObject;
private long _blockTime;
private long _blockingTimeout;
private boolean _wireBlockingState;
+ private ServerConnection connection;
+ private Binary name;
+ private boolean closing;
+ private int channel;
+ private ServerSessionDelegate delegate;
+ private SessionListener listener = new DefaultSessionListener();
+ private boolean autoSync = false;
+ private boolean incomingInit;
+ // incoming command count
+ private int commandsIn;
+ private RangeSet processed;
+ private int maxProcessed;
+ private int syncPoint;
+ // outgoing command count
+ private int commandsOut = 0;
+ private Map<Integer,Method> commands = new HashMap<Integer, Method>();
+ private int commandBytes = 0;
+ private int byteLimit = Integer.getInteger("qpid.session.byte_limit", 1024 * 1024);
+ private int maxComplete = commandsOut - 1;
+ private boolean needSync = false;
+ private State state = NEW;
+ private Semaphore credit = new Semaphore(0);
+ private Thread resumer = null;
+ private boolean transacted = false;
+ private SessionDetachCode detachCode;
+ private boolean _isNoReplay = false;
+ private Map<Integer,ResultFuture<?>> results = new HashMap<Integer,ResultFuture<?>>();
+ private org.apache.qpid.server.transport.ExecutionException exception = null;
+
+ public Binary getName()
+ {
+ return name;
+ }
+
+ protected void setClose(boolean close)
+ {
+ this.closing = close;
+ }
+
+ public int getChannel()
+ {
+ return channel;
+ }
+
+ void setChannel(int channel)
+ {
+ this.channel = channel;
+ }
+
+ public SessionListener getSessionListener()
+ {
+ return listener;
+ }
+
+ protected State getState()
+ {
+ return this.state;
+ }
+
+ void addCredit(int value)
+ {
+ credit.release(value);
+ }
+
+ void drainCredit()
+ {
+ credit.drainPermits();
+ }
+
+ private void initReceiver()
+ {
+ synchronized (processedLock)
+ {
+ incomingInit = false;
+ processed = RangeSetFactory.createRangeSet();
+ }
+ }
+
+ void attach()
+ {
+ initReceiver();
+ sessionAttach(name.getBytes());
+ sessionRequestTimeout(0);//use expiry here only if/when session resume is supported
+ }
+
+ void resume()
+ {
+ _failoverRequired.set(false);
+
+ synchronized (commandsLock)
+ {
+ attach();
+
+ for (int i = maxComplete + 1; lt(i, commandsOut); i++)
+ {
+ Method m = getCommand(i);
+ if (m == null)
+ {
+ m = new ExecutionSync();
+ m.setId(i);
+ }
+ else if (m instanceof MessageTransfer)
+ {
+ MessageTransfer xfr = (MessageTransfer)m;
+
+ Header header = xfr.getHeader();
+
+ if (header != null)
+ {
+ if (header.getDeliveryProperties() != null)
+ {
+ header.getDeliveryProperties().setRedelivered(true);
+ }
+ else
+ {
+ DeliveryProperties deliveryProps = new DeliveryProperties();
+ deliveryProps.setRedelivered(true);
+
+ xfr.setHeader(new Header(deliveryProps, header.getMessageProperties(),
+ header.getNonStandardProperties()));
+ }
+
+ }
+ else
+ {
+ DeliveryProperties deliveryProps = new DeliveryProperties();
+ deliveryProps.setRedelivered(true);
+ xfr.setHeader(new Header(deliveryProps, null, null));
+ }
+ }
+ sessionCommandPoint(m.getId(), 0);
+ send(m);
+ }
+
+ sessionCommandPoint(commandsOut, 0);
+
+ sessionFlush(COMPLETED);
+ resumer = Thread.currentThread();
+ state = RESUMING;
+
+ if(isTransacted())
+ {
+ txSelect();
+ }
+
+ listener.resumed(this);
+ resumer = null;
+ }
+ }
+
+ private Method getCommand(int i)
+ {
+ return commands.get(i);
+ }
+
+ private void setCommand(int commandId, Method command)
+ {
+ commands.put(commandId, command);
+ }
+
+ private Method removeCommand(int id)
+ {
+ return commands.remove(id);
+ }
+
+ final void commandPoint(int id)
+ {
+ synchronized (processedLock)
+ {
+ this.commandsIn = id;
+ if (!incomingInit)
+ {
+ incomingInit = true;
+ maxProcessed = commandsIn - 1;
+ syncPoint = maxProcessed;
+ }
+ }
+ }
+
+ public int getCommandsOut()
+ {
+ return commandsOut;
+ }
+
+ public int getCommandsIn()
+ {
+ return commandsIn;
+ }
+
+ public int nextCommandId()
+ {
+ return commandsIn++;
+ }
+
+ final void identify(Method cmd)
+ {
+ if (!incomingInit)
+ {
+ throw new IllegalStateException();
+ }
+
+ int id = nextCommandId();
+ cmd.setId(id);
+
+ if(LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("identify: ch={}, commandId={}", this.channel, id);
+ }
+
+ if ((id & 0xff) == 0)
+ {
+ flushProcessed(TIMELY_REPLY);
+ }
+ }
+
+ public void processed(Method command)
+ {
+ processed(command.getId());
+ }
+
+ public void processed(int command)
+ {
+ processed(command, command);
+ }
+
+ public void processed(Range range)
+ {
+
+ processed(range.getLower(), range.getUpper());
+ }
+
+ public void processed(int lower, int upper)
+ {
+ if(LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("{} ch={} processed([{},{}]) {} {}", this, channel, lower, upper, syncPoint, maxProcessed);
+ }
+
+ boolean flush;
+ synchronized (processedLock)
+ {
+ if(LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("{} processed: {}", this, processed);
+ }
+
+ if (ge(upper, commandsIn))
+ {
+ throw new IllegalArgumentException
+ ("range exceeds max received command-id: " + Range.newInstance(lower, upper));
+ }
+
+ processed.add(lower, upper);
+
+ Range first = processed.getFirst();
+
+ int flower = first.getLower();
+ int fupper = first.getUpper();
+ int old = maxProcessed;
+ if (le(flower, maxProcessed + 1))
+ {
+ maxProcessed = max(maxProcessed, fupper);
+ }
+ boolean synced = ge(maxProcessed, syncPoint);
+ flush = lt(old, syncPoint) && synced;
+ if (synced)
+ {
+ syncPoint = maxProcessed;
+ }
+ }
+ if (flush)
+ {
+ flushProcessed();
+ }
+ }
+
+ void flushExpected()
+ {
+ RangeSet rs = RangeSetFactory.createRangeSet();
+ synchronized (processedLock)
+ {
+ if (incomingInit)
+ {
+ rs.add(commandsIn);
+ }
+ }
+ sessionExpected(rs, null);
+ }
+
+ public void flushProcessed(Option... options)
+ {
+ RangeSet copy;
+ synchronized (processedLock)
+ {
+ copy = processed.copy();
+ }
+
+ synchronized (commandsLock)
+ {
+ if (state == DETACHED || state == CLOSING || state == CLOSED)
+ {
+ return;
+ }
+ if (copy.size() > 0)
+ {
+ sessionCompleted(copy, options);
+ }
+ }
+ }
+
+ void knownComplete(RangeSet kc)
+ {
+ if (kc.size() > 0)
+ {
+ synchronized (processedLock)
+ {
+ processed.subtract(kc) ;
+ }
+ }
+ }
+
+ void syncPoint()
+ {
+ int id = getCommandsIn() - 1;
+ LOGGER.debug("{} synced to {}", this, id);
+ boolean flush;
+ synchronized (processedLock)
+ {
+ syncPoint = id;
+ flush = ge(maxProcessed, syncPoint);
+ }
+ if (flush)
+ {
+ flushProcessed();
+ }
+ }
+
+ protected boolean complete(int lower, int upper)
+ {
+ //avoid autoboxing
+ if(LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("{} complete({}, {})", this, lower, upper);
+ }
+ synchronized (commandsLock)
+ {
+ int old = maxComplete;
+ for (int id = max(maxComplete, lower); le(id, upper); id++)
+ {
+ Method m = removeCommand(id);
+ if (m != null)
+ {
+ commandBytes -= m.getBodySize();
+ m.complete();
+ }
+ }
+ if (le(lower, maxComplete + 1))
+ {
+ maxComplete = max(maxComplete, upper);
+ }
+
+ if(LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("{} commands remaining: {}", this, commandsOut - maxComplete);
+ }
+
+ commandsLock.notifyAll();
+ return gt(maxComplete, old);
+ }
+ }
+
+ void received(Method m)
+ {
+ m.delegate(this, delegate);
+ }
+
+ private void send(Method m)
+ {
+ m.setChannel(channel);
+ connection.send(m);
+
+ if (!m.isBatch())
+ {
+ connection.flush();
+ }
+ }
+
+ protected boolean isBytesFull()
+ {
+ return commandBytes >= byteLimit;
+ }
+
+ protected boolean isCommandsFull(int id)
+ {
+ return id - maxComplete >= commandLimit;
+ }
+
+ @Override
+ public void invoke(Method m)
+ {
+ invoke(m,(Runnable)null);
+ }
+
+ public void invoke(Method m, Runnable postIdSettingAction)
+ {
+ if (m.getEncodedTrack() == Frame.L4)
+ {
+ synchronized (commandsLock)
+ {
+ if (state == DETACHED && m.isUnreliable())
+ {
+ Thread current = Thread.currentThread();
+ if (!current.equals(resumer))
+ {
+ return;
+ }
+ }
+
+ if (state != OPEN && state != CLOSED && state != CLOSING)
+ {
+ Thread current = Thread.currentThread();
+ if (!current.equals(resumer) )
+ {
+ Waiter w = new Waiter(commandsLock, timeout);
+ while (w.hasTime() && (state != OPEN && state != CLOSED))
+ {
+ checkFailoverRequired("Command was interrupted because of failover, before being sent");
+ w.await();
+ }
+ }
+ }
+
+ switch (state)
+ {
+ case OPEN:
+ break;
+ case RESUMING:
+ Thread current = Thread.currentThread();
+ if (!current.equals(resumer))
+ {
+ throw new SessionException
+ ("timed out waiting for resume to finish");
+ }
+ break;
+ case CLOSING:
+ case CLOSED:
+ org.apache.qpid.server.transport.ExecutionException exc = getException();
+ if (exc != null)
+ {
+ throw new SessionException(exc);
+ }
+ else
+ {
+ throw new SessionClosedException();
+ }
+ default:
+ throw new SessionException
+ (String.format
+ ("timed out waiting for session to become open " +
+ "(state=%s)", state));
+ }
+
+ int next;
+ next = commandsOut++;
+ m.setId(next);
+ if(postIdSettingAction != null)
+ {
+ postIdSettingAction.run();
+ }
+
+ if (isFull(next))
+ {
+ Waiter w = new Waiter(commandsLock, timeout);
+ while (w.hasTime() && isFull(next) && state != CLOSED)
+ {
+ if (state == OPEN || state == RESUMING)
+ {
+ try
+ {
+ sessionFlush(COMPLETED);
+ }
+ catch (SenderException e)
+ {
+ if (!closing)
+ {
+ // if expiry is > 0 then this will
+ // happen again on resume
+ LOGGER.error("error sending flush (full replay buffer)", e);
+ }
+ else
+ {
+ e.rethrow();
+ }
+ }
+ }
+ checkFailoverRequired("Command was interrupted because of failover, before being sent");
+ w.await();
+ }
+ }
+
+ if (state == CLOSED)
+ {
+ org.apache.qpid.server.transport.ExecutionException exc = getException();
+ if (exc != null)
+ {
+ throw new SessionException(exc);
+ }
+ else
+ {
+ throw new SessionClosedException();
+ }
+ }
+
+ if (isFull(next))
+ {
+ throw new SessionException("timed out waiting for completion");
+ }
+
+ if (next == 0)
+ {
+ sessionCommandPoint(0, 0);
+ }
+
+ boolean replayTransfer = !_isNoReplay && !closing && !transacted &&
+ m instanceof MessageTransfer &&
+ ! m.isUnreliable();
+
+ if ((replayTransfer) || m.hasCompletionListener())
+ {
+ setCommand(next, m);
+ commandBytes += m.getBodySize();
+ }
+ if (autoSync)
+ {
+ m.setSync(true);
+ }
+ needSync = !m.isSync();
+
+ try
+ {
+ send(m);
+ }
+ catch (SenderException e)
+ {
+ if (!closing)
+ {
+ // if we are not closing then this will happen
+ // again on resume
+ LOGGER.error("error sending command", e);
+ }
+ else
+ {
+ e.rethrow();
+ }
+ }
+ if (autoSync)
+ {
+ sync();
+ }
+
+ // flush every 64K commands to avoid ambiguity on
+ // wraparound
+ if (shouldIssueFlush(next))
+ {
+ try
+ {
+ sessionFlush(COMPLETED);
+ }
+ catch (SenderException e)
+ {
+ if (!closing)
+ {
+ // if expiry is > 0 then this will happen
+ // again on resume
+ LOGGER.error("error sending flush (periodic)", e);
+ }
+ else
+ {
+ e.rethrow();
+ }
+ }
+ }
+ }
+ }
+ else
+ {
+ send(m);
+ }
+ }
+
+ private void checkFailoverRequired(String message)
+ {
+ if (_failoverRequired.get())
+ {
+ throw new SessionException(message);
+ }
+ }
+
+ protected boolean shouldIssueFlush(int next)
+ {
+ return (next % 65536) == 0;
+ }
+
+ public void sync()
+ {
+ sync(timeout);
+ }
+
+ public void sync(long timeout)
+ {
+ LOGGER.debug("{} sync()", this);
+ synchronized (commandsLock)
+ {
+ int point = commandsOut - 1;
+
+ if (needSync && lt(maxComplete, point))
+ {
+ executionSync(SYNC);
+ }
+
+ Waiter w = new Waiter(commandsLock, timeout);
+ while (w.hasTime() && state != CLOSED && lt(maxComplete, point))
+ {
+ checkFailoverRequired("Session sync was interrupted by failover.");
+ if(LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("{} waiting for[{}]: {}, {}", this, point, maxComplete, commands);
+ }
+ w.await();
+ }
+
+ if (lt(maxComplete, point))
+ {
+ if (state != CLOSED)
+ {
+ throw new SessionException(
+ String.format("timed out waiting for sync: complete = %s, point = %s",
+ maxComplete, point));
+ }
+ else
+ {
+ org.apache.qpid.server.transport.ExecutionException ee = getException();
+ if (ee != null)
+ {
+ throw new SessionException(ee);
+ }
+ }
+ }
+ }
+ }
+
+ void result(int command, Struct result)
+ {
+ ResultFuture<?> future;
+ synchronized (results)
+ {
+ future = results.remove(command);
+ }
+
+ if (future != null)
+ {
+ future.set(result);
+ }
+ else
+ {
+ LOGGER.warn("Received a response to a command" +
+ " that's no longer valid on the client side." +
+ " [ command id : {} , result : {} ]", command, result);
+ }
+ }
+
+ void setException(org.apache.qpid.server.transport.ExecutionException exc)
+ {
+ synchronized (results)
+ {
+ if (exception != null)
+ {
+ throw new IllegalStateException(
+ String.format("too many exceptions: %s, %s", exception, exc));
+ }
+ exception = exc;
+ }
+ }
+
+ org.apache.qpid.server.transport.ExecutionException getException()
+ {
+ synchronized (results)
+ {
+ return exception;
+ }
+ }
+
+ @Override
+ protected <T> Future<T> invoke(Method m, Class<T> klass)
+ {
+ synchronized (commandsLock)
+ {
+ int command = commandsOut;
+ ResultFuture<T> future = new ResultFuture<T>(klass);
+ synchronized (results)
+ {
+ results.put(command, future);
+ }
+ invoke(m);
+ return future;
+ }
+ }
+
+ public final void messageTransfer(String destination,
+ MessageAcceptMode acceptMode,
+ MessageAcquireMode acquireMode,
+ Header header,
+ byte[] body,
+ Option ... _options) {
+ messageTransfer(destination, acceptMode, acquireMode, header,
+ ByteBuffer.wrap(body), _options);
+ }
+
+ public final void messageTransfer(String destination,
+ MessageAcceptMode acceptMode,
+ MessageAcquireMode acquireMode,
+ Header header,
+ String body,
+ Option ... _options) {
+ messageTransfer(destination, acceptMode, acquireMode, header,
+ toUTF8(body), _options);
+ }
+
+ public void exception(Throwable t)
+ {
+ LOGGER.error("caught exception", t);
+ }
+
+ public void closed()
+ {
+ synchronized (commandsLock)
+ {
+ if (closing || getException() != null)
+ {
+ state = CLOSED;
+ }
+ else
+ {
+ state = DETACHED;
+ }
+
+ commandsLock.notifyAll();
+
+ synchronized (results)
+ {
+ for (ResultFuture<?> result : results.values())
+ {
+ synchronized(result)
+ {
+ result.notifyAll();
+ }
+ }
+ }
+ if(state == CLOSED)
+ {
+ delegate.closed(this);
+ }
+ else
+ {
+ delegate.detached(this);
+ }
+ }
+
+ if(state == CLOSED)
+ {
+ connection.removeSession(this);
+ listener.closed(this);
+ }
+ }
+
+ public boolean isClosing()
+ {
+ return state == CLOSED || state == CLOSING;
+ }
+
+ @Override
+ public String toString()
+ {
+ return String.format("ssn:%s", name);
+ }
+
+ public void setTransacted(boolean b) {
+ this.transacted = b;
+ }
+
+ public boolean isTransacted(){
+ return transacted;
+ }
+
+ public void setDetachCode(SessionDetachCode dtc)
+ {
+ this.detachCode = dtc;
+ }
+
+ public SessionDetachCode getDetachCode()
+ {
+ return this.detachCode;
+ }
+
+ public Object getStateLock()
+ {
+ return stateLock;
+ }
+
+ protected void sendSessionAttached(final byte[] name, final Option... options)
+ {
+ super.sessionAttached(name, options);
+ }
+
+ public enum State { NEW, DETACHED, RESUMING, OPEN, CLOSING, CLOSED }
public interface MessageDispositionChangeListener
{
@@ -142,9 +984,14 @@ public class ServerSession extends Sessi
private final List<StoredMessage<MessageMetaData_0_10>> _uncommittedMessages = new ArrayList<>();
- public ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry)
+ public ServerSession(ServerConnection connection, ServerSessionDelegate delegate, Binary name, long expiry)
{
- super(connection, delegate, name, expiry);
+ this.connection = connection;
+ this.delegate = delegate;
+ this.name = name;
+ this.closing = false;
+ this._isNoReplay = false;
+ initReceiver();
_transaction = new AsyncAutoCommitTransaction(this.getMessageStore(),this);
ServerConnection serverConnection = (ServerConnection) connection;
@@ -166,7 +1013,7 @@ public class ServerSession extends Sessi
{
if(runningAsSubject())
{
- super.setState(state);
+ setStateSuper(state);
if (state == State.OPEN)
{
@@ -188,6 +1035,16 @@ public class ServerSession extends Sessi
}
}
+ private void setStateSuper(State state)
+ {
+ synchronized (commandsLock)
+ {
+ this.state = state;
+ commandsLock.notifyAll();
+ }
+ }
+
+
private <T> T runAsSubject(final PrivilegedAction<T> privilegedAction)
{
return AccessController.doPrivileged(privilegedAction, getAccessControllerContext());
@@ -222,7 +1079,6 @@ public class ServerSession extends Sessi
_modelObject.getPublishAuthCache().authorisePublish(destination, routingKey, immediate, currentTime);
}
- @Override
protected boolean isFull(int id)
{
return isCommandsFull(id);
@@ -301,6 +1157,7 @@ public class ServerSession extends Sessi
{
dispositionChange(ranges, new MessageDispositionAction()
{
+ @Override
public void performAction(MessageDispositionChangeListener listener)
{
listener.onAccept();
@@ -313,6 +1170,7 @@ public class ServerSession extends Sessi
{
dispositionChange(ranges, new MessageDispositionAction()
{
+ @Override
public void performAction(MessageDispositionChangeListener listener)
{
listener.onRelease(setRedelivered);
@@ -324,6 +1182,7 @@ public class ServerSession extends Sessi
{
dispositionChange(ranges, new MessageDispositionAction()
{
+ @Override
public void performAction(MessageDispositionChangeListener listener)
{
listener.onReject();
@@ -469,7 +1328,6 @@ public class ServerSession extends Sessi
getAMQPConnection().getEventLogger().message(getLogSubject(), operationalLoggingMessage);
}
- @Override
protected void awaitClose()
{
// Broker shouldn't block awaiting close - thus do override this method to do nothing
@@ -737,12 +1595,15 @@ public class ServerSession extends Sessi
return getConnection().getAmqpConnection();
}
- @Override
public ServerConnection getConnection()
{
- return (ServerConnection) super.getConnection();
+ return (ServerConnection) getConnectionSuper();
}
+ private ServerConnection getConnectionSuper()
+ {
+ return connection;
+ }
public LogSubject getLogSubject()
{
@@ -839,7 +1700,7 @@ public class ServerSession extends Sessi
@Override
public String toLogString()
{
- long connectionId = super.getConnection() instanceof ServerConnection
+ long connectionId = getConnection() instanceof ServerConnection
? getConnection().getConnectionId()
: -1;
String authorizedPrincipal = (getAuthorizedPrincipal() == null) ? "?" : getAuthorizedPrincipal().getName();
@@ -861,7 +1722,6 @@ public class ServerSession extends Sessi
close();
}
- @Override
public void close()
{
// unregister subscriptions in order to prevent sending of new messages
@@ -871,7 +1731,35 @@ public class ServerSession extends Sessi
{
_modelObject.delete();
}
- super.close();
+ closeSuper();
+ }
+
+ private void closeSuper()
+ {
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Closing [{}] in state [{}]", this, state);
+ }
+ synchronized (commandsLock)
+ {
+ switch(state)
+ {
+ case DETACHED:
+ state = CLOSED;
+ delegate.closed(this);
+ connection.removeSession(this);
+ listener.closed(this);
+ break;
+ case CLOSED:
+ break;
+ default:
+ state = CLOSING;
+ setClose(true);
+ sessionRequestTimeout(0);
+ sessionDetach(name.getBytes());
+ awaitClose();
+ }
+ }
}
void unregisterSubscriptions()
@@ -946,6 +1834,7 @@ public class ServerSession extends Sessi
return _unfinishedCommandsQueue.isEmpty() ? null : _unfinishedCommandsQueue.getLast();
}
+ @Override
public void recordFuture(final ListenableFuture<Void> future, final ServerTransaction.Action action)
{
_unfinishedCommandsQueue.add(new AsyncCommand(future, action));
@@ -1068,6 +1957,31 @@ public class ServerSession extends Sessi
return _modelObject.getMaxUncommittedInMemorySize();
}
+ static class DefaultSessionListener implements SessionListener
+ {
+
+ @Override
+ public void opened(ServerSession ssn) {}
+
+ @Override
+ public void resumed(ServerSession ssn) {}
+
+ @Override
+ public void message(ServerSession ssn, MessageTransfer xfr)
+ {
+ LOGGER.info("message: {}", xfr);
+ }
+
+ @Override
+ public void exception(ServerSession ssn, SessionException exc)
+ {
+ LOGGER.error("session exception", exc);
+ }
+
+ @Override
+ public void closed(ServerSession ssn) {}
+ }
+
private class CheckCapacityAction implements Action<MessageInstance>
{
@Override
@@ -1080,4 +1994,75 @@ public class ServerSession extends Sessi
}
}
}
+
+ private class ResultFuture<T> implements Future<T>
+ {
+
+ private final Class<T> klass;
+ private T result;
+
+ private ResultFuture(Class<T> klass)
+ {
+ this.klass = klass;
+ }
+
+ private void set(Struct result)
+ {
+ synchronized (this)
+ {
+ this.result = klass.cast(result);
+ notifyAll();
+ }
+ }
+
+ public T get(long timeout)
+ {
+ synchronized (this)
+ {
+ Waiter w = new Waiter(this, timeout);
+ while (w.hasTime() && state != CLOSED && !isDone())
+ {
+ checkFailoverRequired("Operation was interrupted by failover.");
+ LOGGER.debug("{} waiting for result: {}", ServerSession.this, this);
+ w.await();
+ }
+ }
+
+ if (isDone())
+ {
+ return result;
+ }
+ else if (state == CLOSED)
+ {
+ org.apache.qpid.server.transport.ExecutionException ex = getException();
+ if(ex == null)
+ {
+ throw new SessionClosedException();
+ }
+ throw new SessionException(ex);
+ }
+ else
+ {
+ throw new SessionException(
+ String.format("%s timed out waiting for result: %s",
+ ServerSession.this, this));
+ }
+ }
+
+ public T get()
+ {
+ return get(timeout);
+ }
+
+ public boolean isDone()
+ {
+ return result != null;
+ }
+
+ public String toString()
+ {
+ return String.format("Future(%s)", isDone() ? result : klass);
+ }
+
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org