You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2017/02/17 08:08:51 UTC
svn commit: r1783342 [2/2] -
/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java?rev=1783342&r1=1783341&r2=1783342&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java Fri Feb 17 08:08:51 2017
@@ -84,7 +84,7 @@ import org.apache.qpid.server.virtualhos
import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
import org.apache.qpid.server.transport.*;
-public class ServerSessionDelegate extends SessionDelegate
+public class ServerSessionDelegate extends MethodDelegate<ServerSession> implements ProtocolDelegate<ServerSession>
{
private static final Logger LOGGER = LoggerFactory.getLogger(ServerSessionDelegate.class);
@@ -94,14 +94,14 @@ public class ServerSessionDelegate exten
}
@Override
- public void command(Session session, Method method)
+ public void command(ServerSession session, Method method)
{
try
{
if(!session.isClosing())
{
Object asyncCommandMark = ((ServerSession)session).getAsyncCommandMark();
- super.command(session, method, false);
+ command(session, method, false);
Object newOutstanding = ((ServerSession)session).getAsyncCommandMark();
if(newOutstanding == null || newOutstanding == asyncCommandMark)
{
@@ -132,7 +132,7 @@ public class ServerSessionDelegate exten
}
@Override
- public void messageAccept(Session session, MessageAccept method)
+ public void messageAccept(ServerSession session, MessageAccept method)
{
final ServerSession serverSession = (ServerSession) session;
serverSession.accept(method.getTransfers());
@@ -144,19 +144,19 @@ public class ServerSessionDelegate exten
}
@Override
- public void messageReject(Session session, MessageReject method)
+ public void messageReject(ServerSession session, MessageReject method)
{
((ServerSession)session).reject(method.getTransfers());
}
@Override
- public void messageRelease(Session session, MessageRelease method)
+ public void messageRelease(ServerSession session, MessageRelease method)
{
((ServerSession)session).release(method.getTransfers(), method.getSetRedelivered());
}
@Override
- public void messageAcquire(Session session, MessageAcquire method)
+ public void messageAcquire(ServerSession session, MessageAcquire method)
{
RangeSet acquiredRanges = ((ServerSession)session).acquire(method.getTransfers());
@@ -169,13 +169,13 @@ public class ServerSessionDelegate exten
}
@Override
- public void messageResume(Session session, MessageResume method)
+ public void messageResume(ServerSession session, MessageResume method)
{
super.messageResume(session, method);
}
@Override
- public void messageSubscribe(Session session, MessageSubscribe method)
+ public void messageSubscribe(ServerSession session, MessageSubscribe method)
{
/*
TODO - work around broken Python tests
@@ -411,7 +411,7 @@ public class ServerSessionDelegate exten
}
@Override
- public void messageTransfer(Session ssn, final MessageTransfer xfr)
+ public void messageTransfer(ServerSession ssn, final MessageTransfer xfr)
{
try
{
@@ -549,7 +549,7 @@ public class ServerSessionDelegate exten
}
@Override
- public void messageCancel(Session session, MessageCancel method)
+ public void messageCancel(ServerSession session, MessageCancel method)
{
String destination = method.getDestination();
@@ -566,7 +566,7 @@ public class ServerSessionDelegate exten
}
@Override
- public void messageFlush(Session session, MessageFlush method)
+ public void messageFlush(ServerSession session, MessageFlush method)
{
String destination = method.getDestination();
@@ -583,35 +583,35 @@ public class ServerSessionDelegate exten
}
@Override
- public void txSelect(Session session, TxSelect method)
+ public void txSelect(ServerSession session, TxSelect method)
{
// TODO - check current tx mode
((ServerSession)session).selectTx();
}
@Override
- public void txCommit(Session session, TxCommit method)
+ public void txCommit(ServerSession session, TxCommit method)
{
// TODO - check current tx mode
((ServerSession)session).commit();
}
@Override
- public void txRollback(Session session, TxRollback method)
+ public void txRollback(ServerSession session, TxRollback method)
{
// TODO - check current tx mode
((ServerSession)session).rollback();
}
@Override
- public void dtxSelect(Session session, DtxSelect method)
+ public void dtxSelect(ServerSession session, DtxSelect method)
{
// TODO - check current tx mode
((ServerSession)session).selectDtx();
}
@Override
- public void dtxStart(Session session, DtxStart method)
+ public void dtxStart(ServerSession session, DtxStart method)
{
XaResult result = new XaResult();
result.setStatus(DtxXaStatus.XA_OK);
@@ -641,7 +641,7 @@ public class ServerSessionDelegate exten
}
@Override
- public void dtxEnd(Session session, DtxEnd method)
+ public void dtxEnd(ServerSession session, DtxEnd method)
{
XaResult result = new XaResult();
result.setStatus(DtxXaStatus.XA_OK);
@@ -677,7 +677,7 @@ public class ServerSessionDelegate exten
}
@Override
- public void dtxCommit(Session session, DtxCommit method)
+ public void dtxCommit(ServerSession session, DtxCommit method)
{
XaResult result = new XaResult();
result.setStatus(DtxXaStatus.XA_OK);
@@ -713,7 +713,7 @@ public class ServerSessionDelegate exten
}
@Override
- public void dtxForget(Session session, DtxForget method)
+ public void dtxForget(ServerSession session, DtxForget method)
{
try
{
@@ -731,7 +731,7 @@ public class ServerSessionDelegate exten
}
@Override
- public void dtxGetTimeout(Session session, DtxGetTimeout method)
+ public void dtxGetTimeout(ServerSession session, DtxGetTimeout method)
{
GetTimeoutResult result = new GetTimeoutResult();
try
@@ -746,7 +746,7 @@ public class ServerSessionDelegate exten
}
@Override
- public void dtxPrepare(Session session, DtxPrepare method)
+ public void dtxPrepare(ServerSession session, DtxPrepare method)
{
XaResult result = new XaResult();
result.setStatus(DtxXaStatus.XA_OK);
@@ -782,7 +782,7 @@ public class ServerSessionDelegate exten
}
@Override
- public void dtxRecover(Session session, DtxRecover method)
+ public void dtxRecover(ServerSession session, DtxRecover method)
{
RecoverResult result = new RecoverResult();
List inDoubt = ((ServerSession)session).recoverDtx();
@@ -791,7 +791,7 @@ public class ServerSessionDelegate exten
}
@Override
- public void dtxRollback(Session session, DtxRollback method)
+ public void dtxRollback(ServerSession session, DtxRollback method)
{
XaResult result = new XaResult();
@@ -824,7 +824,7 @@ public class ServerSessionDelegate exten
}
@Override
- public void dtxSetTimeout(Session session, DtxSetTimeout method)
+ public void dtxSetTimeout(ServerSession session, DtxSetTimeout method)
{
try
{
@@ -837,14 +837,20 @@ public class ServerSessionDelegate exten
}
@Override
- public void executionSync(final Session ssn, final ExecutionSync sync)
+ public void executionSync(final ServerSession ssn, final ExecutionSync sync)
{
((ServerSession)ssn).awaitCommandCompletion();
- super.executionSync(ssn, sync);
+ executionSyncSuper(ssn, sync);
}
+ private void executionSyncSuper(ServerSession ssn, ExecutionSync sync) // local stand-in for the removed super.executionSync(ssn, sync) call; 'sync' is not read here
+ {
+ ssn.syncPoint(); // the only action taken; what syncPoint() does is not visible in this diff
+ }
+
+
@Override
- public void exchangeDeclare(Session session, ExchangeDeclare method)
+ public void exchangeDeclare(ServerSession session, ExchangeDeclare method)
{
String exchangeName = method.getExchange();
NamedAddressSpace addressSpace = getAddressSpace(session);
@@ -965,7 +971,7 @@ public class ServerSessionDelegate exten
}
}
- private void exception(Session session, Method method, ExecutionErrorCode errorCode, String description)
+ private void exception(ServerSession session, Method method, ExecutionErrorCode errorCode, String description)
{
ExecutionException ex = new ExecutionException();
ex.setErrorCode(errorCode);
@@ -977,7 +983,7 @@ public class ServerSessionDelegate exten
((ServerSession)session).close(errorCode.getValue(), description);
}
- private Exchange<?> getExchange(Session session, String exchangeName)
+ private Exchange<?> getExchange(ServerSession session, String exchangeName)
{
return getExchange(getAddressSpace(session),exchangeName);
}
@@ -995,7 +1001,7 @@ public class ServerSessionDelegate exten
return source instanceof Queue ? (Queue<?>) source : null;
}
- private MessageDestination getDestinationForMessage(Session ssn, MessageTransfer xfr)
+ private MessageDestination getDestinationForMessage(ServerSession ssn, MessageTransfer xfr)
{
NamedAddressSpace addressSpace = getAddressSpace(ssn);
@@ -1046,29 +1052,29 @@ public class ServerSessionDelegate exten
return destination;
}
- private NamedAddressSpace getAddressSpace(Session session)
+ private NamedAddressSpace getAddressSpace(ServerSession session)
{
ServerConnection conn = getServerConnection(session);
return conn.getAddressSpace();
}
- private ServerConnection getServerConnection(Session session)
+ private ServerConnection getServerConnection(ServerSession session)
{
return (ServerConnection) session.getConnection();
}
- private <T> T getContextValue(Session session, Class<T> clazz, String name)
+ private <T> T getContextValue(ServerSession session, Class<T> clazz, String name)
{
return getServerConnection(session).getAmqpConnection().getContextProvider().getContextValue(clazz, name);
}
- private EventLogger getEventLogger(Session session)
+ private EventLogger getEventLogger(ServerSession session)
{
return getServerConnection(session).getAmqpConnection().getEventLogger();
}
@Override
- public void exchangeDelete(Session session, ExchangeDelete method)
+ public void exchangeDelete(ServerSession session, ExchangeDelete method)
{
if (nameNullOrEmpty(method.getExchange()))
{
@@ -1121,7 +1127,7 @@ public class ServerSessionDelegate exten
}
@Override
- public void exchangeQuery(Session session, ExchangeQuery method)
+ public void exchangeQuery(ServerSession session, ExchangeQuery method)
{
ExchangeQueryResult result = new ExchangeQueryResult();
@@ -1155,7 +1161,7 @@ public class ServerSessionDelegate exten
}
@Override
- public void exchangeBind(Session session, ExchangeBind method)
+ public void exchangeBind(ServerSession session, ExchangeBind method)
{
NamedAddressSpace addressSpace = getAddressSpace(session);
@@ -1221,7 +1227,7 @@ public class ServerSessionDelegate exten
}
@Override
- public void exchangeUnbind(Session session, ExchangeUnbind method)
+ public void exchangeUnbind(ServerSession session, ExchangeUnbind method)
{
NamedAddressSpace addressSpace = getAddressSpace(session);
@@ -1267,7 +1273,7 @@ public class ServerSessionDelegate exten
}
@Override
- public void exchangeBound(Session session, ExchangeBound method)
+ public void exchangeBound(ServerSession session, ExchangeBound method)
{
ExchangeBoundResult result = new ExchangeBoundResult();
@@ -1453,18 +1459,18 @@ public class ServerSessionDelegate exten
}
- private MessageSource getMessageSource(Session session, String queue)
+ private MessageSource getMessageSource(ServerSession session, String queue)
{
return getAddressSpace(session).getAttainedMessageSource(queue);
}
- private Queue<?> getQueue(Session session, String queue)
+ private Queue<?> getQueue(ServerSession session, String queue)
{
return getQueue(getAddressSpace(session), queue);
}
@Override
- public void queueDeclare(Session session, final QueueDeclare method)
+ public void queueDeclare(ServerSession session, final QueueDeclare method)
{
final NamedAddressSpace addressSpace = getAddressSpace(session);
@@ -1581,7 +1587,7 @@ public class ServerSessionDelegate exten
}
@Override
- public void queueDelete(Session session, QueueDelete method)
+ public void queueDelete(ServerSession session, QueueDelete method)
{
String queueName = method.getQueue();
if(queueName == null || queueName.length()==0)
@@ -1630,7 +1636,7 @@ public class ServerSessionDelegate exten
}
@Override
- public void queuePurge(Session session, QueuePurge method)
+ public void queuePurge(ServerSession session, QueuePurge method)
{
String queueName = method.getQueue();
if(queueName == null || queueName.length()==0)
@@ -1660,7 +1666,7 @@ public class ServerSessionDelegate exten
}
@Override
- public void queueQuery(Session session, QueueQuery method)
+ public void queueQuery(ServerSession session, QueueQuery method)
{
QueueQueryResult result = new QueueQueryResult();
@@ -1701,7 +1707,7 @@ public class ServerSessionDelegate exten
}
@Override
- public void messageSetFlowMode(Session session, MessageSetFlowMode sfm)
+ public void messageSetFlowMode(ServerSession session, MessageSetFlowMode sfm)
{
String destination = sfm.getDestination();
@@ -1722,7 +1728,7 @@ public class ServerSessionDelegate exten
}
@Override
- public void messageStop(Session session, MessageStop stop)
+ public void messageStop(ServerSession session, MessageStop stop)
{
String destination = stop.getDestination();
@@ -1740,7 +1746,7 @@ public class ServerSessionDelegate exten
}
@Override
- public void messageFlow(Session session, MessageFlow flow)
+ public void messageFlow(ServerSession session, MessageFlow flow)
{
String destination = flow.getDestination();
@@ -1757,8 +1763,7 @@ public class ServerSessionDelegate exten
}
- @Override
- public void closed(Session session)
+ public void closed(ServerSession session)
{
ServerSession serverSession = (ServerSession)session;
@@ -1767,12 +1772,150 @@ public class ServerSessionDelegate exten
serverSession.unregisterSubscriptions();
}
- @Override
- public void detached(Session session)
+ public void detached(ServerSession session)
{
closed(session);
}
+ public void init(ServerSession ssn, ProtocolHeader hdr) // only logs the session and protocol header; no other handling is performed
+ {
+ LOGGER.warn("INIT: [{}] {}", ssn, hdr); // logged at WARN level
+ }
+
+ public void control(ServerSession ssn, Method method) // forwards a control to method.dispatch with this delegate as the handler
+ {
+ method.dispatch(ssn, this); // presumably routes to the matching typed handler on this class — TODO confirm against Method.dispatch
+ }
+
+ public void command(ServerSession ssn, Method method, boolean processed) // three-argument form now called by command(session, method) in place of the former super.command(...)
+ {
+ ssn.identify(method); // called before dispatch; presumably assigns the command its id — TODO confirm in ServerSession
+ method.dispatch(ssn, this); // this delegate is passed as the handler
+ if (processed) // the two-argument command(...) in this file passes false, so this branch is skipped on that path
+ {
+ ssn.processed(method);
+ }
+ }
+
+ public void error(ServerSession ssn, ProtocolError error) // only logs the protocol error; the session is not modified here
+ {
+ LOGGER.warn("ERROR: [{}] {}", ssn, error); // logged at WARN level
+ }
+
+ public void handle(ServerSession ssn, Method method) // logs the method as UNHANDLED; presumably the fallback for methods without a specific override — TODO confirm against MethodDelegate
+ {
+ LOGGER.warn("UNHANDLED: [{}] {}", ssn, method); // logged at WARN level; nothing is sent back to the peer from here
+ }
+
+ @Override public void sessionRequestTimeout(ServerSession ssn, SessionRequestTimeout t) // handles a timeout request; always answers with a timeout of 0
+ {
+ if (t.getTimeout() == 0) // a requested timeout of exactly 0 flags the session via setClose(true)
+ {
+ ssn.setClose(true);
+ }
+ ssn.sessionTimeout(0); // Always report back an expiry of 0 until it is implemented
+ }
+
+ @Override public void sessionAttached(ServerSession ssn, SessionAttached atc) // marks the session OPEN and notifies all threads waiting on its state lock; 'atc' is not read
+ {
+ ssn.setState(ServerSession.State.OPEN); // NOTE(review): state is set before the lock below is taken — confirm setState or the waiters synchronize appropriately
+ synchronized (ssn.getStateLock()) // the monitor must be held to call notifyAll()
+ {
+ ssn.getStateLock().notifyAll();
+ }
+ }
+
+ @Override public void sessionTimeout(ServerSession ssn, SessionTimeout t) // intentionally a no-op: neither argument is used
+ {
+ // Setting of expiry is not implemented
+ }
+
+ @Override public void sessionCompleted(ServerSession ssn, SessionCompleted cmp) // records completed command ranges on the session and, when a timely reply is requested, answers with sessionKnownCompleted
+ {
+ RangeSet ranges = cmp.getCommands(); // may be null; handled by the else-if branch below
+ RangeSet known = null; // reply set; stays null when no reply is to be sent
+
+ if (ranges != null)
+ {
+ if(ranges.size() == 1) // single-range case: the Range itself is reused as the reply set, avoiding a new RangeSet
+ {
+ Range range = ranges.getFirst();
+ boolean advanced = ssn.complete(range.getLower(), range.getUpper());
+
+ if(advanced && cmp.getTimelyReply()) // reply only if complete(...) returned true and a timely reply was requested
+ {
+ known = range;
+ }
+ }
+ else
+ {
+ if (cmp.getTimelyReply()) // allocate a reply set only when a timely reply was requested
+ {
+ known = RangeSetFactory.createRangeSet();
+ }
+ for (Range range : ranges) // complete(...) is called for every range whether or not a reply is wanted
+ {
+ boolean advanced = ssn.complete(range.getLower(), range.getUpper());
+ if (advanced && known != null) // known is non-null here only when a timely reply was requested
+ {
+ known.add(range);
+ }
+ }
+ }
+ }
+ else if (cmp.getTimelyReply()) // no ranges supplied: reply with a freshly created (presumably empty) range set
+ {
+ known = RangeSetFactory.createRangeSet();
+ }
+
+ if (known != null) // note: in the multi-range case a reply is sent even if no range was added to it
+ {
+ ssn.sessionKnownCompleted(known);
+ }
+ }
+
+ @Override public void sessionKnownCompleted(ServerSession ssn, SessionKnownCompleted kcmp) // passes the received known-completed ranges to the session
+ {
+ RangeSet kc = kcmp.getCommands();
+ if (kc != null) // a null range set is silently ignored
+ {
+ ssn.knownComplete(kc);
+ }
+ }
+
+ @Override public void sessionFlush(ServerSession ssn, SessionFlush flush) // maps each flag set on the flush control to a flush call on the session
+ {
+ if (flush.getCompleted())
+ {
+ ssn.flushProcessed();
+ }
+ if (flush.getConfirmed()) // NOTE(review): calls the same flushProcessed() as 'completed', so it runs twice when both flags are set — confirm this is intended
+ {
+ ssn.flushProcessed();
+ }
+ if (flush.getExpected())
+ {
+ ssn.flushExpected();
+ }
+ }
+
+ @Override public void sessionCommandPoint(ServerSession ssn, SessionCommandPoint scp) // forwards the command id from the control to the session
+ {
+ ssn.commandPoint(scp.getCommandId()); // only the command id is read from scp here
+ }
+
+ @Override public void executionResult(ServerSession ssn, ExecutionResult result) // hands a received execution result to the session
+ {
+ ssn.result(result.getCommandId(), result.getValue()); // passes the command id together with the result value
+ }
+
+ @Override public void executionException(ServerSession ssn, ExecutionException exc) // handles a received execution exception: record it, notify the listener, then call closed()
+ {
+ ssn.setException(exc); // stored on the session before the listener is notified
+ ssn.getSessionListener().exception(ssn, new SessionException(exc)); // the listener receives the exception wrapped in a SessionException
+ ssn.closed(); // invoked last, after the listener callback returns
+ }
+
private static class CommandProcessedAction implements ServerTransaction.Action
{
private final ServerSession _serverSession;
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/SessionListener.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/SessionListener.java?rev=1783342&r1=1783341&r2=1783342&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/SessionListener.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/SessionListener.java Fri Feb 17 08:08:51 2017
@@ -32,14 +32,14 @@ import org.apache.qpid.server.transport.
public interface SessionListener
{
- void opened(Session session);
+ void opened(ServerSession session);
- void resumed(Session session);
+ void resumed(ServerSession session);
- void message(Session ssn, MessageTransfer xfr);
+ void message(ServerSession ssn, MessageTransfer xfr);
- void exception(Session session, SessionException exception);
+ void exception(ServerSession session, SessionException exception);
- void closed(Session session);
+ void closed(ServerSession session);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org