You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2017/02/17 08:08:51 UTC

svn commit: r1783342 [2/2] - /qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java?rev=1783342&r1=1783341&r2=1783342&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java Fri Feb 17 08:08:51 2017
@@ -84,7 +84,7 @@ import org.apache.qpid.server.virtualhos
 import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
 import org.apache.qpid.server.transport.*;
 
-public class ServerSessionDelegate extends SessionDelegate
+public class ServerSessionDelegate extends MethodDelegate<ServerSession> implements ProtocolDelegate<ServerSession>
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(ServerSessionDelegate.class);
 
@@ -94,14 +94,14 @@ public class ServerSessionDelegate exten
     }
 
     @Override
-    public void command(Session session, Method method)
+    public void command(ServerSession session, Method method)
     {
         try
         {
             if(!session.isClosing())
             {
                 Object asyncCommandMark = ((ServerSession)session).getAsyncCommandMark();
-                super.command(session, method, false);
+                command(session, method, false);
                 Object newOutstanding = ((ServerSession)session).getAsyncCommandMark();
                 if(newOutstanding == null || newOutstanding == asyncCommandMark)
                 {
@@ -132,7 +132,7 @@ public class ServerSessionDelegate exten
     }
 
     @Override
-    public void messageAccept(Session session, MessageAccept method)
+    public void messageAccept(ServerSession session, MessageAccept method)
     {
         final ServerSession serverSession = (ServerSession) session;
         serverSession.accept(method.getTransfers());
@@ -144,19 +144,19 @@ public class ServerSessionDelegate exten
     }
 
     @Override
-    public void messageReject(Session session, MessageReject method)
+    public void messageReject(ServerSession session, MessageReject method)
     {
         ((ServerSession)session).reject(method.getTransfers());
     }
 
     @Override
-    public void messageRelease(Session session, MessageRelease method)
+    public void messageRelease(ServerSession session, MessageRelease method)
     {
         ((ServerSession)session).release(method.getTransfers(), method.getSetRedelivered());
     }
 
     @Override
-    public void messageAcquire(Session session, MessageAcquire method)
+    public void messageAcquire(ServerSession session, MessageAcquire method)
     {
         RangeSet acquiredRanges = ((ServerSession)session).acquire(method.getTransfers());
 
@@ -169,13 +169,13 @@ public class ServerSessionDelegate exten
     }
 
     @Override
-    public void messageResume(Session session, MessageResume method)
+    public void messageResume(ServerSession session, MessageResume method)
     {
         super.messageResume(session, method);
     }
 
     @Override
-    public void messageSubscribe(Session session, MessageSubscribe method)
+    public void messageSubscribe(ServerSession session, MessageSubscribe method)
     {
         /*
           TODO - work around broken Python tests
@@ -411,7 +411,7 @@ public class ServerSessionDelegate exten
     }
 
     @Override
-    public void messageTransfer(Session ssn, final MessageTransfer xfr)
+    public void messageTransfer(ServerSession ssn, final MessageTransfer xfr)
     {
         try
         {
@@ -549,7 +549,7 @@ public class ServerSessionDelegate exten
     }
 
     @Override
-    public void messageCancel(Session session, MessageCancel method)
+    public void messageCancel(ServerSession session, MessageCancel method)
     {
         String destination = method.getDestination();
 
@@ -566,7 +566,7 @@ public class ServerSessionDelegate exten
     }
 
     @Override
-    public void messageFlush(Session session, MessageFlush method)
+    public void messageFlush(ServerSession session, MessageFlush method)
     {
         String destination = method.getDestination();
 
@@ -583,35 +583,35 @@ public class ServerSessionDelegate exten
     }
 
     @Override
-    public void txSelect(Session session, TxSelect method)
+    public void txSelect(ServerSession session, TxSelect method)
     {
         // TODO - check current tx mode
         ((ServerSession)session).selectTx();
     }
 
     @Override
-    public void txCommit(Session session, TxCommit method)
+    public void txCommit(ServerSession session, TxCommit method)
     {
         // TODO - check current tx mode
         ((ServerSession)session).commit();
     }
 
     @Override
-    public void txRollback(Session session, TxRollback method)
+    public void txRollback(ServerSession session, TxRollback method)
     {
         // TODO - check current tx mode
         ((ServerSession)session).rollback();
     }
 
     @Override
-    public void dtxSelect(Session session, DtxSelect method)
+    public void dtxSelect(ServerSession session, DtxSelect method)
     {
         // TODO - check current tx mode
         ((ServerSession)session).selectDtx();
     }
 
     @Override
-    public void dtxStart(Session session, DtxStart method)
+    public void dtxStart(ServerSession session, DtxStart method)
     {
         XaResult result = new XaResult();
         result.setStatus(DtxXaStatus.XA_OK);
@@ -641,7 +641,7 @@ public class ServerSessionDelegate exten
     }
 
     @Override
-    public void dtxEnd(Session session, DtxEnd method)
+    public void dtxEnd(ServerSession session, DtxEnd method)
     {
         XaResult result = new XaResult();
         result.setStatus(DtxXaStatus.XA_OK);
@@ -677,7 +677,7 @@ public class ServerSessionDelegate exten
     }
 
     @Override
-    public void dtxCommit(Session session, DtxCommit method)
+    public void dtxCommit(ServerSession session, DtxCommit method)
     {
         XaResult result = new XaResult();
         result.setStatus(DtxXaStatus.XA_OK);
@@ -713,7 +713,7 @@ public class ServerSessionDelegate exten
     }
 
     @Override
-    public void dtxForget(Session session, DtxForget method)
+    public void dtxForget(ServerSession session, DtxForget method)
     {
         try
         {
@@ -731,7 +731,7 @@ public class ServerSessionDelegate exten
     }
 
     @Override
-    public void dtxGetTimeout(Session session, DtxGetTimeout method)
+    public void dtxGetTimeout(ServerSession session, DtxGetTimeout method)
     {
         GetTimeoutResult result = new GetTimeoutResult();
         try
@@ -746,7 +746,7 @@ public class ServerSessionDelegate exten
     }
 
     @Override
-    public void dtxPrepare(Session session, DtxPrepare method)
+    public void dtxPrepare(ServerSession session, DtxPrepare method)
     {
         XaResult result = new XaResult();
         result.setStatus(DtxXaStatus.XA_OK);
@@ -782,7 +782,7 @@ public class ServerSessionDelegate exten
     }
 
     @Override
-    public void dtxRecover(Session session, DtxRecover method)
+    public void dtxRecover(ServerSession session, DtxRecover method)
     {
         RecoverResult result = new RecoverResult();
         List inDoubt = ((ServerSession)session).recoverDtx();
@@ -791,7 +791,7 @@ public class ServerSessionDelegate exten
     }
 
     @Override
-    public void dtxRollback(Session session, DtxRollback method)
+    public void dtxRollback(ServerSession session, DtxRollback method)
     {
 
         XaResult result = new XaResult();
@@ -824,7 +824,7 @@ public class ServerSessionDelegate exten
     }
 
     @Override
-    public void dtxSetTimeout(Session session, DtxSetTimeout method)
+    public void dtxSetTimeout(ServerSession session, DtxSetTimeout method)
     {
         try
         {
@@ -837,14 +837,20 @@ public class ServerSessionDelegate exten
     }
 
     @Override
-    public void executionSync(final Session ssn, final ExecutionSync sync)
+    public void executionSync(final ServerSession ssn, final ExecutionSync sync)
     {
         ((ServerSession)ssn).awaitCommandCompletion();
-        super.executionSync(ssn, sync);
+        executionSyncSuper(ssn, sync);
     }
 
+    private void executionSyncSuper(ServerSession ssn, ExecutionSync sync)
+    {
+        ssn.syncPoint();
+    }
+
+
     @Override
-    public void exchangeDeclare(Session session, ExchangeDeclare method)
+    public void exchangeDeclare(ServerSession session, ExchangeDeclare method)
     {
         String exchangeName = method.getExchange();
         NamedAddressSpace addressSpace = getAddressSpace(session);
@@ -965,7 +971,7 @@ public class ServerSessionDelegate exten
         }
     }
 
-    private void exception(Session session, Method method, ExecutionErrorCode errorCode, String description)
+    private void exception(ServerSession session, Method method, ExecutionErrorCode errorCode, String description)
     {
         ExecutionException ex = new ExecutionException();
         ex.setErrorCode(errorCode);
@@ -977,7 +983,7 @@ public class ServerSessionDelegate exten
         ((ServerSession)session).close(errorCode.getValue(), description);
     }
 
-    private Exchange<?> getExchange(Session session, String exchangeName)
+    private Exchange<?> getExchange(ServerSession session, String exchangeName)
     {
         return getExchange(getAddressSpace(session),exchangeName);
     }
@@ -995,7 +1001,7 @@ public class ServerSessionDelegate exten
         return source instanceof Queue ? (Queue<?>) source : null;
     }
 
-    private MessageDestination getDestinationForMessage(Session ssn, MessageTransfer xfr)
+    private MessageDestination getDestinationForMessage(ServerSession ssn, MessageTransfer xfr)
     {
         NamedAddressSpace addressSpace = getAddressSpace(ssn);
 
@@ -1046,29 +1052,29 @@ public class ServerSessionDelegate exten
         return destination;
     }
 
-    private NamedAddressSpace getAddressSpace(Session session)
+    private NamedAddressSpace getAddressSpace(ServerSession session)
     {
         ServerConnection conn = getServerConnection(session);
         return conn.getAddressSpace();
     }
 
-    private ServerConnection getServerConnection(Session session)
+    private ServerConnection getServerConnection(ServerSession session)
     {
         return (ServerConnection) session.getConnection();
     }
 
-    private <T> T getContextValue(Session session, Class<T> clazz, String name)
+    private <T> T getContextValue(ServerSession session, Class<T> clazz, String name)
     {
         return getServerConnection(session).getAmqpConnection().getContextProvider().getContextValue(clazz, name);
     }
 
-    private EventLogger getEventLogger(Session session)
+    private EventLogger getEventLogger(ServerSession session)
     {
         return getServerConnection(session).getAmqpConnection().getEventLogger();
     }
 
     @Override
-    public void exchangeDelete(Session session, ExchangeDelete method)
+    public void exchangeDelete(ServerSession session, ExchangeDelete method)
     {
         if (nameNullOrEmpty(method.getExchange()))
         {
@@ -1121,7 +1127,7 @@ public class ServerSessionDelegate exten
     }
 
     @Override
-    public void exchangeQuery(Session session, ExchangeQuery method)
+    public void exchangeQuery(ServerSession session, ExchangeQuery method)
     {
 
         ExchangeQueryResult result = new ExchangeQueryResult();
@@ -1155,7 +1161,7 @@ public class ServerSessionDelegate exten
     }
 
     @Override
-    public void exchangeBind(Session session, ExchangeBind method)
+    public void exchangeBind(ServerSession session, ExchangeBind method)
     {
 
         NamedAddressSpace addressSpace = getAddressSpace(session);
@@ -1221,7 +1227,7 @@ public class ServerSessionDelegate exten
     }
 
     @Override
-    public void exchangeUnbind(Session session, ExchangeUnbind method)
+    public void exchangeUnbind(ServerSession session, ExchangeUnbind method)
     {
         NamedAddressSpace addressSpace = getAddressSpace(session);
 
@@ -1267,7 +1273,7 @@ public class ServerSessionDelegate exten
     }
 
     @Override
-    public void exchangeBound(Session session, ExchangeBound method)
+    public void exchangeBound(ServerSession session, ExchangeBound method)
     {
 
         ExchangeBoundResult result = new ExchangeBoundResult();
@@ -1453,18 +1459,18 @@ public class ServerSessionDelegate exten
 
     }
 
-    private MessageSource getMessageSource(Session session, String queue)
+    private MessageSource getMessageSource(ServerSession session, String queue)
     {
         return getAddressSpace(session).getAttainedMessageSource(queue);
     }
 
-    private Queue<?> getQueue(Session session, String queue)
+    private Queue<?> getQueue(ServerSession session, String queue)
     {
         return getQueue(getAddressSpace(session), queue);
     }
 
     @Override
-    public void queueDeclare(Session session, final QueueDeclare method)
+    public void queueDeclare(ServerSession session, final QueueDeclare method)
     {
 
         final NamedAddressSpace addressSpace = getAddressSpace(session);
@@ -1581,7 +1587,7 @@ public class ServerSessionDelegate exten
     }
 
     @Override
-    public void queueDelete(Session session, QueueDelete method)
+    public void queueDelete(ServerSession session, QueueDelete method)
     {
         String queueName = method.getQueue();
         if(queueName == null || queueName.length()==0)
@@ -1630,7 +1636,7 @@ public class ServerSessionDelegate exten
     }
 
     @Override
-    public void queuePurge(Session session, QueuePurge method)
+    public void queuePurge(ServerSession session, QueuePurge method)
     {
         String queueName = method.getQueue();
         if(queueName == null || queueName.length()==0)
@@ -1660,7 +1666,7 @@ public class ServerSessionDelegate exten
     }
 
     @Override
-    public void queueQuery(Session session, QueueQuery method)
+    public void queueQuery(ServerSession session, QueueQuery method)
     {
         QueueQueryResult result = new QueueQueryResult();
 
@@ -1701,7 +1707,7 @@ public class ServerSessionDelegate exten
     }
 
     @Override
-    public void messageSetFlowMode(Session session, MessageSetFlowMode sfm)
+    public void messageSetFlowMode(ServerSession session, MessageSetFlowMode sfm)
     {
         String destination = sfm.getDestination();
 
@@ -1722,7 +1728,7 @@ public class ServerSessionDelegate exten
     }
 
     @Override
-    public void messageStop(Session session, MessageStop stop)
+    public void messageStop(ServerSession session, MessageStop stop)
     {
         String destination = stop.getDestination();
 
@@ -1740,7 +1746,7 @@ public class ServerSessionDelegate exten
     }
 
     @Override
-    public void messageFlow(Session session, MessageFlow flow)
+    public void messageFlow(ServerSession session, MessageFlow flow)
     {
         String destination = flow.getDestination();
 
@@ -1757,8 +1763,7 @@ public class ServerSessionDelegate exten
 
     }
 
-    @Override
-    public void closed(Session session)
+    public void closed(ServerSession session)
     {
         ServerSession serverSession = (ServerSession)session;
 
@@ -1767,12 +1772,150 @@ public class ServerSessionDelegate exten
         serverSession.unregisterSubscriptions();
     }
 
-    @Override
-    public void detached(Session session)
+    public void detached(ServerSession session)
     {
         closed(session);
     }
 
+    public void init(ServerSession ssn, ProtocolHeader hdr)
+    {
+        LOGGER.warn("INIT: [{}] {}", ssn, hdr);
+    }
+
+    public void control(ServerSession ssn, Method method)
+    {
+        method.dispatch(ssn, this);
+    }
+
+    public void command(ServerSession ssn, Method method, boolean processed)
+    {
+        ssn.identify(method);
+        method.dispatch(ssn, this);
+        if (processed)
+        {
+            ssn.processed(method);
+        }
+    }
+
+    public void error(ServerSession ssn, ProtocolError error)
+    {
+        LOGGER.warn("ERROR: [{}] {}", ssn, error);
+    }
+
+    public void handle(ServerSession ssn, Method method)
+    {
+        LOGGER.warn("UNHANDLED: [{}] {}", ssn, method);
+    }
+
+    @Override public void sessionRequestTimeout(ServerSession ssn, SessionRequestTimeout t)
+    {
+        if (t.getTimeout() == 0)
+        {
+            ssn.setClose(true);
+        }
+        ssn.sessionTimeout(0); // Always report back an expiry of 0 until it is implemented
+    }
+
+    @Override public void sessionAttached(ServerSession ssn, SessionAttached atc)
+    {
+        ssn.setState(ServerSession.State.OPEN);
+        synchronized (ssn.getStateLock())
+        {
+            ssn.getStateLock().notifyAll();
+        }
+    }
+
+    @Override public void sessionTimeout(ServerSession ssn, SessionTimeout t)
+    {
+        // Setting of expiry is not implemented
+    }
+
+    @Override public void sessionCompleted(ServerSession ssn, SessionCompleted cmp)
+    {
+        RangeSet ranges = cmp.getCommands();
+        RangeSet known = null;
+
+        if (ranges != null)
+        {
+            if(ranges.size() == 1)
+            {
+                Range range = ranges.getFirst();
+                boolean advanced = ssn.complete(range.getLower(), range.getUpper());
+
+                if(advanced && cmp.getTimelyReply())
+                {
+                    known = range;
+                }
+            }
+            else
+            {
+                if (cmp.getTimelyReply())
+                {
+                    known = RangeSetFactory.createRangeSet();
+                }
+                for (Range range : ranges)
+                {
+                    boolean advanced = ssn.complete(range.getLower(), range.getUpper());
+                    if (advanced && known != null)
+                    {
+                        known.add(range);
+                    }
+                }
+            }
+        }
+        else if (cmp.getTimelyReply())
+        {
+            known = RangeSetFactory.createRangeSet();
+        }
+
+        if (known != null)
+        {
+            ssn.sessionKnownCompleted(known);
+        }
+    }
+
+    @Override public void sessionKnownCompleted(ServerSession ssn, SessionKnownCompleted kcmp)
+    {
+        RangeSet kc = kcmp.getCommands();
+        if (kc != null)
+        {
+            ssn.knownComplete(kc);
+        }
+    }
+
+    @Override public void sessionFlush(ServerSession ssn, SessionFlush flush)
+    {
+        if (flush.getCompleted())
+        {
+            ssn.flushProcessed();
+        }
+        if (flush.getConfirmed())
+        {
+           ssn.flushProcessed();
+        }
+        if (flush.getExpected())
+        {
+            ssn.flushExpected();
+        }
+    }
+
+    @Override public void sessionCommandPoint(ServerSession ssn, SessionCommandPoint scp)
+    {
+        ssn.commandPoint(scp.getCommandId());
+    }
+
+    @Override public void executionResult(ServerSession ssn, ExecutionResult result)
+    {
+        ssn.result(result.getCommandId(), result.getValue());
+    }
+
+    @Override public void executionException(ServerSession ssn, ExecutionException exc)
+    {
+        ssn.setException(exc);
+        ssn.getSessionListener().exception(ssn, new SessionException(exc));
+        ssn.closed();
+    }
+
     private static class CommandProcessedAction implements ServerTransaction.Action
     {
         private final ServerSession _serverSession;

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/SessionListener.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/SessionListener.java?rev=1783342&r1=1783341&r2=1783342&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/SessionListener.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/SessionListener.java Fri Feb 17 08:08:51 2017
@@ -32,14 +32,14 @@ import org.apache.qpid.server.transport.
 public interface SessionListener
 {
 
-    void opened(Session session);
+    void opened(ServerSession session);
 
-    void resumed(Session session);
+    void resumed(ServerSession session);
 
-    void message(Session ssn, MessageTransfer xfr);
+    void message(ServerSession ssn, MessageTransfer xfr);
 
-    void exception(Session session, SessionException exception);
+    void exception(ServerSession session, SessionException exception);
 
-    void closed(Session session);
+    void closed(ServerSession session);
 
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org