You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2015/01/28 22:20:59 UTC

[5/6] qpid-jms git commit: A bit more work on TX handling during failover.

A bit more work on TX handling during failover.

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/58e14248
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/58e14248
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/58e14248

Branch: refs/heads/master
Commit: 58e1424820255690ab7fb6a01b7ca78297901c12
Parents: 7698953
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Jan 28 16:08:59 2015 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed Jan 28 16:08:59 2015 -0500

----------------------------------------------------------------------
 .../qpid/jms/JmsLocalTransactionContext.java    | 62 +++++++++-----------
 .../qpid/jms/JmsNoTxTransactionContext.java     | 13 ++--
 .../java/org/apache/qpid/jms/JmsSession.java    |  8 +--
 .../apache/qpid/jms/JmsTransactionContext.java  | 17 +++---
 4 files changed, 50 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/58e14248/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
index a7dc974..0e3eddf 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
@@ -27,7 +27,9 @@ import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
 import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
 import org.apache.qpid.jms.meta.JmsTransactionId;
 import org.apache.qpid.jms.meta.JmsTransactionInfo;
+import org.apache.qpid.jms.provider.Provider;
 import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
+import org.apache.qpid.jms.provider.ProviderFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,10 +54,6 @@ public class JmsLocalTransactionContext implements JmsTransactionContext {
 
     @Override
     public void send(JmsConnection connection, JmsOutboundMessageDispatch envelope) throws JMSException {
-        // TODO - Optional throw an exception here to give early warning that
-        //        the transaction is in a failed state and must be rolled back.
-
-        //TODO: Is it worth holding the producer here (or earlier) while recovery is known to be in progress?
         if (!isFailed()) {
             begin();
             connection.send(envelope);
@@ -64,26 +62,13 @@ public class JmsLocalTransactionContext implements JmsTransactionContext {
 
     @Override
     public void acknowledge(JmsConnection connection, JmsInboundMessageDispatch envelope, ACK_TYPE ackType) throws JMSException {
-        // TODO - Should we ACK as delivered if failed just to ensure that prefetch is
-        //        extended and new message arrive until commit or rollback is called.
-        //        A quiet consumer could be misleading and prevent the code from doing
-        //        its normal batched receive / commit.
-        //
-        //        Reply: I think at least we would want a way to replenish the credit,
-        //        even if we didn't call the ack method (avoiding using the provider or
-        //        pumping the proton transport), especially if it was a low-prefetch
-        //        consumer to begin with. I think we always 'consumed ack' transacted
-        //        messages currently, never 'delivered ack' since we don't need to do
-        //        session recover for them.
-        if (!isFailed()) {
-            // Consumed or delivered messages fall into a transaction so we must check
-            // that there is an active one and start one if not.
-            if (ackType == ACK_TYPE.CONSUMED || ackType == ACK_TYPE.DELIVERED) {
-                begin();
-            }
-
-            connection.acknowledge(envelope, ackType);
+        // Consumed or delivered messages fall into a transaction so we must check
+        // that there is an active one and start one if not.
+        if (ackType == ACK_TYPE.CONSUMED || ackType == ACK_TYPE.DELIVERED) {
+            begin();
         }
+
+        connection.acknowledge(envelope, ackType);
     }
 
     @Override
@@ -95,16 +80,6 @@ public class JmsLocalTransactionContext implements JmsTransactionContext {
     }
 
     @Override
-    public void markAsFailed() {
-        //TODO: do we need to adjust this (or perhaps when we start the transaction?)
-        //      to handle an ack for a stale message delivery via onMessage starting
-        //      a transaction after this method was originally called?
-        if (isInTransaction()) {
-            failed = true;
-        }
-    }
-
-    @Override
     public boolean isFailed() {
         return failed;
     }
@@ -159,7 +134,10 @@ public class JmsLocalTransactionContext implements JmsTransactionContext {
         if (isFailed()) {
             failed = false;
             transactionId = null;
-            //TODO: we need to actually roll back if we have let any acks etc occur after the recovery.
+            try {
+                rollback();
+            } catch (Exception e) {
+            }
             throw new TransactionRolledBackException("Transaction failed and has been rolled back.");
         }
 
@@ -187,6 +165,22 @@ public class JmsLocalTransactionContext implements JmsTransactionContext {
     }
 
     @Override
+    public void onConnectionInterrupted() {
+        if (isInTransaction()) {
+            failed = true;
+        }
+    }
+
+    @Override
+    public void onConnectionRecovery(Provider provider) throws Exception {
+        transactionId = connection.getNextTransactionId();
+        JmsTransactionInfo transaction = new JmsTransactionInfo(session.getSessionId(), transactionId);
+        ProviderFuture request = new ProviderFuture();
+        provider.create(transaction, request);
+        request.sync();
+    }
+
+    @Override
     public String toString() {
         return "JmsLocalTransactionContext{ transactionId=" + transactionId + " }";
     }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/58e14248/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsNoTxTransactionContext.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsNoTxTransactionContext.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsNoTxTransactionContext.java
index 97f6df7..b26ba03 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsNoTxTransactionContext.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsNoTxTransactionContext.java
@@ -21,6 +21,7 @@ import javax.jms.JMSException;
 import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
 import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
 import org.apache.qpid.jms.meta.JmsTransactionId;
+import org.apache.qpid.jms.provider.Provider;
 import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
 
 /**
@@ -44,10 +45,6 @@ public class JmsNoTxTransactionContext implements JmsTransactionContext {
     }
 
     @Override
-    public void markAsFailed() {
-    }
-
-    @Override
     public boolean isFailed() {
         return false;
     }
@@ -84,4 +81,12 @@ public class JmsNoTxTransactionContext implements JmsTransactionContext {
     public boolean isInTransaction() {
         return false;
     }
+
+    @Override
+    public void onConnectionInterrupted() {
+    }
+
+    @Override
+    public void onConnectionRecovery(Provider provider) throws Exception {
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/58e14248/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index a71adb2..3d8618c 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -920,11 +920,7 @@ public class JmsSession implements Session, QueueSession, TopicSession, JmsMessa
 
     protected void onConnectionInterrupted() {
 
-        if (this.acknowledgementMode == SESSION_TRANSACTED) {
-            if (transactionContext.isInTransaction()) {
-                transactionContext.markAsFailed();
-            }
-        }
+        transactionContext.onConnectionInterrupted();
 
         for (JmsMessageProducer producer : producers) {
             producer.onConnectionInterrupted();
@@ -941,6 +937,8 @@ public class JmsSession implements Session, QueueSession, TopicSession, JmsMessa
         provider.create(sessionInfo, request);
         request.sync();
 
+        transactionContext.onConnectionRecovery(provider);
+
         for (JmsMessageProducer producer : producers) {
             producer.onConnectionRecovery(provider);
         }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/58e14248/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTransactionContext.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTransactionContext.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTransactionContext.java
index 2d5524d..b1ba332 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTransactionContext.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTransactionContext.java
@@ -21,6 +21,7 @@ import javax.jms.JMSException;
 import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
 import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
 import org.apache.qpid.jms.meta.JmsTransactionId;
+import org.apache.qpid.jms.provider.Provider;
 import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
 
 /**
@@ -70,13 +71,6 @@ public interface JmsTransactionContext {
     void addSynchronization(JmsTxSynchronization sync);
 
     /**
-     * Marks an currently active Transaction as being failed, usually due to a
-     * connection failure.  Once failed all the transaction must be rolled back
-     * before new work can be done.
-     */
-    void markAsFailed();
-
-    /**
      * @returns if the currently transaction has been marked as being failed.
      */
     boolean isFailed();
@@ -134,4 +128,13 @@ public interface JmsTransactionContext {
      */
     boolean isInTransaction();
 
+    void onConnectionInterrupted();
+
+    /**
+     * Called when the connection to the remote peer has been lost and then a new
+     * connection established.  The context should perform any necessary processing
+     * to recover and reset its internal state.
+     */
+    void onConnectionRecovery(Provider provider) throws Exception;
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org