You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2015/01/28 22:20:59 UTC
[5/6] qpid-jms git commit: A bit more work on TX handling during
failover.
A bit more work on TX handling during failover.
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/58e14248
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/58e14248
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/58e14248
Branch: refs/heads/master
Commit: 58e1424820255690ab7fb6a01b7ca78297901c12
Parents: 7698953
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Jan 28 16:08:59 2015 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed Jan 28 16:08:59 2015 -0500
----------------------------------------------------------------------
.../qpid/jms/JmsLocalTransactionContext.java | 62 +++++++++-----------
.../qpid/jms/JmsNoTxTransactionContext.java | 13 ++--
.../java/org/apache/qpid/jms/JmsSession.java | 8 +--
.../apache/qpid/jms/JmsTransactionContext.java | 17 +++---
4 files changed, 50 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/58e14248/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
index a7dc974..0e3eddf 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
@@ -27,7 +27,9 @@ import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
import org.apache.qpid.jms.meta.JmsTransactionId;
import org.apache.qpid.jms.meta.JmsTransactionInfo;
+import org.apache.qpid.jms.provider.Provider;
import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
+import org.apache.qpid.jms.provider.ProviderFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,10 +54,6 @@ public class JmsLocalTransactionContext implements JmsTransactionContext {
@Override
public void send(JmsConnection connection, JmsOutboundMessageDispatch envelope) throws JMSException {
- // TODO - Optional throw an exception here to give early warning that
- // the transaction is in a failed state and must be rolled back.
-
- //TODO: Is it worth holding the producer here (or earlier) while recovery is known to be in progress?
if (!isFailed()) {
begin();
connection.send(envelope);
@@ -64,26 +62,13 @@ public class JmsLocalTransactionContext implements JmsTransactionContext {
@Override
public void acknowledge(JmsConnection connection, JmsInboundMessageDispatch envelope, ACK_TYPE ackType) throws JMSException {
- // TODO - Should we ACK as delivered if failed just to ensure that prefetch is
- // extended and new message arrive until commit or rollback is called.
- // A quiet consumer could be misleading and prevent the code from doing
- // its normal batched receive / commit.
- //
- // Reply: I think at least we would want a way to replenish the credit,
- // even if we didn't call the ack method (avoiding using the provider or
- // pumping the proton transport), especially if it was a low-prefetch
- // consumer to begin with. I think we always 'consumed ack' transacted
- // messages currently, never 'delivered ack' since we don't need to do
- // session recover for them.
- if (!isFailed()) {
- // Consumed or delivered messages fall into a transaction so we must check
- // that there is an active one and start one if not.
- if (ackType == ACK_TYPE.CONSUMED || ackType == ACK_TYPE.DELIVERED) {
- begin();
- }
-
- connection.acknowledge(envelope, ackType);
+ // Consumed or delivered messages fall into a transaction so we must check
+ // that there is an active one and start one if not.
+ if (ackType == ACK_TYPE.CONSUMED || ackType == ACK_TYPE.DELIVERED) {
+ begin();
}
+
+ connection.acknowledge(envelope, ackType);
}
@Override
@@ -95,16 +80,6 @@ public class JmsLocalTransactionContext implements JmsTransactionContext {
}
@Override
- public void markAsFailed() {
- //TODO: do we need to adjust this (or perhaps when we start the transaction?)
- // to handle an ack for a stale message delivery via onMessage starting
- // a transaction after this method was originally called?
- if (isInTransaction()) {
- failed = true;
- }
- }
-
- @Override
public boolean isFailed() {
return failed;
}
@@ -159,7 +134,10 @@ public class JmsLocalTransactionContext implements JmsTransactionContext {
if (isFailed()) {
failed = false;
transactionId = null;
- //TODO: we need to actually roll back if we have let any acks etc occur after the recovery.
+ try {
+ rollback();
+ } catch (Exception e) {
+ }
throw new TransactionRolledBackException("Transaction failed and has been rolled back.");
}
@@ -187,6 +165,22 @@ public class JmsLocalTransactionContext implements JmsTransactionContext {
}
@Override
+ public void onConnectionInterrupted() {
+ if (isInTransaction()) {
+ failed = true;
+ }
+ }
+
+ @Override
+ public void onConnectionRecovery(Provider provider) throws Exception {
+ transactionId = connection.getNextTransactionId();
+ JmsTransactionInfo transaction = new JmsTransactionInfo(session.getSessionId(), transactionId);
+ ProviderFuture request = new ProviderFuture();
+ provider.create(transaction, request);
+ request.sync();
+ }
+
+ @Override
public String toString() {
return "JmsLocalTransactionContext{ transactionId=" + transactionId + " }";
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/58e14248/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsNoTxTransactionContext.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsNoTxTransactionContext.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsNoTxTransactionContext.java
index 97f6df7..b26ba03 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsNoTxTransactionContext.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsNoTxTransactionContext.java
@@ -21,6 +21,7 @@ import javax.jms.JMSException;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
import org.apache.qpid.jms.meta.JmsTransactionId;
+import org.apache.qpid.jms.provider.Provider;
import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
/**
@@ -44,10 +45,6 @@ public class JmsNoTxTransactionContext implements JmsTransactionContext {
}
@Override
- public void markAsFailed() {
- }
-
- @Override
public boolean isFailed() {
return false;
}
@@ -84,4 +81,12 @@ public class JmsNoTxTransactionContext implements JmsTransactionContext {
public boolean isInTransaction() {
return false;
}
+
+ @Override
+ public void onConnectionInterrupted() {
+ }
+
+ @Override
+ public void onConnectionRecovery(Provider provider) throws Exception {
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/58e14248/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index a71adb2..3d8618c 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -920,11 +920,7 @@ public class JmsSession implements Session, QueueSession, TopicSession, JmsMessa
protected void onConnectionInterrupted() {
- if (this.acknowledgementMode == SESSION_TRANSACTED) {
- if (transactionContext.isInTransaction()) {
- transactionContext.markAsFailed();
- }
- }
+ transactionContext.onConnectionInterrupted();
for (JmsMessageProducer producer : producers) {
producer.onConnectionInterrupted();
@@ -941,6 +937,8 @@ public class JmsSession implements Session, QueueSession, TopicSession, JmsMessa
provider.create(sessionInfo, request);
request.sync();
+ transactionContext.onConnectionRecovery(provider);
+
for (JmsMessageProducer producer : producers) {
producer.onConnectionRecovery(provider);
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/58e14248/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTransactionContext.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTransactionContext.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTransactionContext.java
index 2d5524d..b1ba332 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTransactionContext.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTransactionContext.java
@@ -21,6 +21,7 @@ import javax.jms.JMSException;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
import org.apache.qpid.jms.meta.JmsTransactionId;
+import org.apache.qpid.jms.provider.Provider;
import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
/**
@@ -70,13 +71,6 @@ public interface JmsTransactionContext {
void addSynchronization(JmsTxSynchronization sync);
/**
- * Marks an currently active Transaction as being failed, usually due to a
- * connection failure. Once failed all the transaction must be rolled back
- * before new work can be done.
- */
- void markAsFailed();
-
- /**
* @returns if the currently transaction has been marked as being failed.
*/
boolean isFailed();
@@ -134,4 +128,13 @@ public interface JmsTransactionContext {
*/
boolean isInTransaction();
+ void onConnectionInterrupted();
+
+ /**
+ * Called when the connection to the remote peer has been lost and then a new
+ * connection established. The context should perform any necessary processing
+ * to recover and reset its internal state.
+ */
+ void onConnectionRecovery(Provider provider) throws Exception;
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org