You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2016/04/04 18:09:11 UTC

[02/42] activemq-artemis git commit: ARTEMIS-463 Using OperationContext for async support

ARTEMIS-463 Using OperationContext for async support


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/3560415b
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/3560415b
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/3560415b

Branch: refs/heads/master
Commit: 3560415bcbcebfa853a941c4ac9fcf707ed2beaf
Parents: 3aedf27
Author: Clebert Suconic <cl...@apache.org>
Authored: Fri Apr 1 17:24:41 2016 -0400
Committer: jbertram <jb...@apache.org>
Committed: Mon Apr 4 11:08:43 2016 -0500

----------------------------------------------------------------------
 .../protocol/openwire/OpenWireConnection.java   | 67 +++++++++++++++-----
 1 file changed, 52 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3560415b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index f9e8838..3ccb98d 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -40,6 +40,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
 import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.persistence.OperationContext;
 import org.apache.activemq.artemis.core.postoffice.Binding;
 import org.apache.activemq.artemis.core.postoffice.Bindings;
 import org.apache.activemq.artemis.core.postoffice.QueueBinding;
@@ -224,15 +225,12 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
       super.bufferReceived(connectionID, buffer);
       try {
 
-         // TODO-NOW: set OperationContext
-
          Command command = (Command) wireFormat.unmarshal(buffer);
 
          boolean responseRequired = command.isResponseRequired();
          int commandId = command.getCommandId();
 
-         // TODO-NOW: the server should send packets to the client based on the requested times
-         //           need to look at what Andy did on AMQP
+         // TODO: the server should send packets to the client based on the requested times
 
          // the connection handles pings, negotiations directly.
          // and delegate all other commands to manager.
@@ -285,7 +283,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
                }
             }
 
-            // TODO-NOW: response through operation-context
+            // TODO: response through operation-context
 
             if (response != null && !protocolManager.isStopping()) {
                response.setCorrelationId(commandId);
@@ -1076,6 +1074,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
       public Response processBeginTransaction(TransactionInfo info) throws Exception {
          final TransactionId txID = info.getTransactionId();
 
+         setOperationContext(null);
          try {
             internalSession.resetTX(null);
             if (txID.isXATransaction()) {
@@ -1095,6 +1094,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
          }
          finally {
             internalSession.resetTX(null);
+            clearOpeartionContext();
          }
          return null;
       }
@@ -1111,7 +1111,13 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
 
          AMQSession session = (AMQSession) tx.getProtocolData();
 
-         tx.commit(onePhase);
+         setOperationContext(session);
+         try {
+            tx.commit(onePhase);
+         }
+         finally {
+            clearOpeartionContext();
+         }
 
          return null;
       }
@@ -1125,18 +1131,24 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
       public Response processForgetTransaction(TransactionInfo info) throws Exception {
          TransactionId txID = info.getTransactionId();
 
-         if (txID.isXATransaction()) {
-            try {
-               Xid xid = OpenWireUtil.toXID(info.getTransactionId());
-               internalSession.xaForget(xid);
+         setOperationContext(null);
+         try {
+            if (txID.isXATransaction()) {
+               try {
+                  Xid xid = OpenWireUtil.toXID(info.getTransactionId());
+                  internalSession.xaForget(xid);
+               }
+               catch (Exception e) {
+                  e.printStackTrace();
+                  throw e;
+               }
             }
-            catch (Exception e) {
-               e.printStackTrace();
-               throw e;
+            else {
+               txMap.remove(txID);
             }
          }
-         else {
-            txMap.remove(txID);
+         finally {
+            clearOpeartionContext();
          }
 
          return null;
@@ -1146,6 +1158,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
       public Response processPrepareTransaction(TransactionInfo info) throws Exception {
          TransactionId txID = info.getTransactionId();
 
+         setOperationContext(null);
          try {
             if (txID.isXATransaction()) {
                try {
@@ -1164,6 +1177,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
          }
          finally {
             internalSession.resetTX(null);
+            clearOpeartionContext();
          }
 
          return new IntegerResponse(XAResource.XA_RDONLY);
@@ -1173,6 +1187,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
       public Response processEndTransaction(TransactionInfo info) throws Exception {
          TransactionId txID = info.getTransactionId();
 
+         setOperationContext(null);
          if (txID.isXATransaction()) {
             try {
                Transaction tx = lookupTX(txID, null);
@@ -1192,6 +1207,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
          }
          else {
             txMap.remove(info);
+            clearOpeartionContext();
          }
 
          return null;
@@ -1255,14 +1271,17 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
 
          Transaction tx = lookupTX(messageSend.getTransactionId(), session);
 
+         setOperationContext(session);
          session.getCoreSession().resetTX(tx);
          try {
             session.send(producerInfo, messageSend, sendProducerAck);
          }
          finally {
             session.getCoreSession().resetTX(null);
+            clearOpeartionContext();
          }
 
+
          return null;
       }
 
@@ -1270,6 +1289,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
       public Response processMessageAck(MessageAck ack) throws Exception {
          AMQSession session = getSession(ack.getConsumerId().getParentId());
          Transaction tx = lookupTX(ack.getTransactionId(), session);
+         setOperationContext(session);
          session.getCoreSession().resetTX(tx);
 
          try {
@@ -1278,6 +1298,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
          }
          finally {
             session.getCoreSession().resetTX(null);
+            clearOpeartionContext();
          }
          return null;
       }
@@ -1354,6 +1375,22 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
 
    }
 
+   private void setOperationContext(AMQSession session) {
+      OperationContext ctx;
+      if (session == null) {
+         ctx = this.internalSession.getSessionContext();
+      }
+      else {
+         ctx = session.getCoreSession().getSessionContext();
+      }
+      server.getStorageManager().setContext(ctx);
+   }
+
+
+   private void clearOpeartionContext() {
+      server.getStorageManager().clearContext();
+   }
+
    private Transaction lookupTX(TransactionId txID, AMQSession session) throws IllegalStateException {
       if (txID == null) {
          return null;