You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2016/04/04 18:09:11 UTC
[02/42] activemq-artemis git commit: ARTEMIS-463 Using OperationContext for async support
ARTEMIS-463 Using OperationContext for async support
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/3560415b
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/3560415b
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/3560415b
Branch: refs/heads/master
Commit: 3560415bcbcebfa853a941c4ac9fcf707ed2beaf
Parents: 3aedf27
Author: Clebert Suconic <cl...@apache.org>
Authored: Fri Apr 1 17:24:41 2016 -0400
Committer: jbertram <jb...@apache.org>
Committed: Mon Apr 4 11:08:43 2016 -0500
----------------------------------------------------------------------
.../protocol/openwire/OpenWireConnection.java | 67 +++++++++++++++-----
1 file changed, 52 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3560415b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index f9e8838..3ccb98d 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -40,6 +40,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
@@ -224,15 +225,12 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
super.bufferReceived(connectionID, buffer);
try {
- // TODO-NOW: set OperationContext
-
Command command = (Command) wireFormat.unmarshal(buffer);
boolean responseRequired = command.isResponseRequired();
int commandId = command.getCommandId();
- // TODO-NOW: the server should send packets to the client based on the requested times
- // need to look at what Andy did on AMQP
+ // TODO: the server should send packets to the client based on the requested times
// the connection handles pings, negotiations directly.
// and delegate all other commands to manager.
@@ -285,7 +283,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
}
}
- // TODO-NOW: response through operation-context
+ // TODO: response through operation-context
if (response != null && !protocolManager.isStopping()) {
response.setCorrelationId(commandId);
@@ -1076,6 +1074,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
public Response processBeginTransaction(TransactionInfo info) throws Exception {
final TransactionId txID = info.getTransactionId();
+ setOperationContext(null);
try {
internalSession.resetTX(null);
if (txID.isXATransaction()) {
@@ -1095,6 +1094,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
}
finally {
internalSession.resetTX(null);
+ clearOpeartionContext();
}
return null;
}
@@ -1111,7 +1111,13 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
AMQSession session = (AMQSession) tx.getProtocolData();
- tx.commit(onePhase);
+ setOperationContext(session);
+ try {
+ tx.commit(onePhase);
+ }
+ finally {
+ clearOpeartionContext();
+ }
return null;
}
@@ -1125,18 +1131,24 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
public Response processForgetTransaction(TransactionInfo info) throws Exception {
TransactionId txID = info.getTransactionId();
- if (txID.isXATransaction()) {
- try {
- Xid xid = OpenWireUtil.toXID(info.getTransactionId());
- internalSession.xaForget(xid);
+ setOperationContext(null);
+ try {
+ if (txID.isXATransaction()) {
+ try {
+ Xid xid = OpenWireUtil.toXID(info.getTransactionId());
+ internalSession.xaForget(xid);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ throw e;
+ }
}
- catch (Exception e) {
- e.printStackTrace();
- throw e;
+ else {
+ txMap.remove(txID);
}
}
- else {
- txMap.remove(txID);
+ finally {
+ clearOpeartionContext();
}
return null;
@@ -1146,6 +1158,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
public Response processPrepareTransaction(TransactionInfo info) throws Exception {
TransactionId txID = info.getTransactionId();
+ setOperationContext(null);
try {
if (txID.isXATransaction()) {
try {
@@ -1164,6 +1177,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
}
finally {
internalSession.resetTX(null);
+ clearOpeartionContext();
}
return new IntegerResponse(XAResource.XA_RDONLY);
@@ -1173,6 +1187,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
public Response processEndTransaction(TransactionInfo info) throws Exception {
TransactionId txID = info.getTransactionId();
+ setOperationContext(null);
if (txID.isXATransaction()) {
try {
Transaction tx = lookupTX(txID, null);
@@ -1192,6 +1207,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
}
else {
txMap.remove(info);
+ clearOpeartionContext();
}
return null;
@@ -1255,14 +1271,17 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
Transaction tx = lookupTX(messageSend.getTransactionId(), session);
+ setOperationContext(session);
session.getCoreSession().resetTX(tx);
try {
session.send(producerInfo, messageSend, sendProducerAck);
}
finally {
session.getCoreSession().resetTX(null);
+ clearOpeartionContext();
}
+
return null;
}
@@ -1270,6 +1289,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
public Response processMessageAck(MessageAck ack) throws Exception {
AMQSession session = getSession(ack.getConsumerId().getParentId());
Transaction tx = lookupTX(ack.getTransactionId(), session);
+ setOperationContext(session);
session.getCoreSession().resetTX(tx);
try {
@@ -1278,6 +1298,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
}
finally {
session.getCoreSession().resetTX(null);
+ clearOpeartionContext();
}
return null;
}
@@ -1354,6 +1375,22 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
}
+ private void setOperationContext(AMQSession session) {
+ OperationContext ctx;
+ if (session == null) {
+ ctx = this.internalSession.getSessionContext();
+ }
+ else {
+ ctx = session.getCoreSession().getSessionContext();
+ }
+ server.getStorageManager().setContext(ctx);
+ }
+
+
+ private void clearOpeartionContext() {
+ server.getStorageManager().clearContext();
+ }
+
private Transaction lookupTX(TransactionId txID, AMQSession session) throws IllegalStateException {
if (txID == null) {
return null;