You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2017/03/03 17:30:54 UTC

qpid-jms git commit: QPIDJMS-269 Resolve performance regression for transacted sends

Repository: qpid-jms
Updated Branches:
  refs/heads/master 72efc6992 -> 1819bb277


QPIDJMS-269 Resolve performance regression for transacted sends

Remove the unnecessary send-completion accounting on transacted sends that
led to a performance degradation compared to previous releases.

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/1819bb27
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/1819bb27
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/1819bb27

Branch: refs/heads/master
Commit: 1819bb277f42a895c98c3b764299c0b7371af2e1
Parents: 72efc69
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Mar 3 12:30:41 2017 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri Mar 3 12:30:41 2017 -0500

----------------------------------------------------------------------
 .../amqp/AmqpAnonymousFallbackProducer.java     |  6 ---
 .../qpid/jms/provider/amqp/AmqpConsumer.java    |  4 +-
 .../jms/provider/amqp/AmqpFixedProducer.java    | 11 -----
 .../qpid/jms/provider/amqp/AmqpProducer.java    | 10 ----
 .../provider/amqp/AmqpTransactionContext.java   | 52 ++++++++------------
 5 files changed, 22 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/1819bb27/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java
index 447b4b2..a73ec92 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java
@@ -139,12 +139,6 @@ public class AmqpAnonymousFallbackProducer extends AmqpProducer {
         return new JmsProducerId(producerIdKey, -1, producerIdCount++);
     }
 
-    @Override
-    public void addSendCompletionWatcher(AsyncResult watcher) {
-        throw new UnsupportedOperationException(
-            "The fallback producer parent should never have a watcher assigned.");
-    }
-
     //----- AsyncResult objects used to complete the sends -------------------//
 
     private abstract class AnonymousRequest extends WrappedAsyncResult {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/1819bb27/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index 28f2ba5..b682c73 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -138,7 +138,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
                     public void run() {
                         LOG.trace("Consumer {} drain request timed out", getConsumerId());
                         Exception cause = new JmsOperationTimedOutException("Remote did not respond to a drain request in time");
-                        if (session.isTransacted() && session.getTransactionContext().isInTransaction(AmqpConsumer.this)) {
+                        if (session.isTransacted() && session.getTransactionContext().isInTransaction(getConsumerId())) {
                             stopRequest.onFailure(cause);
                             stopRequest = null;
                         } else {
@@ -645,7 +645,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
     }
 
     private boolean shouldDeferClose() {
-        if (getSession().isTransacted() && getSession().getTransactionContext().isInTransaction(this)) {
+        if (getSession().isTransacted() && getSession().getTransactionContext().isInTransaction(getConsumerId())) {
             return true;
         }
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/1819bb27/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
index 4acd419..4297147 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
@@ -303,17 +303,6 @@ public class AmqpFixedProducer extends AmqpProducer {
         }
     }
 
-    @Override
-    public void addSendCompletionWatcher(AsyncResult watcher) {
-        // If none pending signal done already.
-        // TODO - If we don't include blocked sends then update this.
-        if (blocked.isEmpty() && sent.isEmpty()) {
-            watcher.onSuccess();
-        } else {
-            this.sendCompletionWatcher = watcher;
-        }
-    }
-
     //----- Class used to manage held sends ----------------------------------//
 
     private class InFlightSend implements AsyncResult, AmqpExceptionBuilder {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/1819bb27/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProducer.java
index b862fb0..6999d76 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProducer.java
@@ -94,14 +94,4 @@ public abstract class AmqpProducer extends AmqpAbstractResource<JmsProducerInfo,
     public void setDelayedDeliverySupported(boolean delayedDeliverySupported) {
         this.delayedDeliverySupported = delayedDeliverySupported;
     }
-
-    /**
-     * Allows a completion request to be added to this producer that will be notified
-     * once all outstanding sends have completed.
-     *
-     * @param watcher
-     *      The AsyncResult that will be signaled once this producer has no pending sends.
-     */
-    public abstract void addSendCompletionWatcher(AsyncResult watcher);
-
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/1819bb27/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
index 8f3aab8..1a43aef 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
@@ -17,12 +17,14 @@
 package org.apache.qpid.jms.provider.amqp;
 
 import java.io.IOException;
-import java.util.LinkedHashSet;
-import java.util.Set;
+import java.util.HashMap;
+import java.util.Map;
 
 import javax.jms.IllegalStateException;
 import javax.jms.TransactionRolledBackException;
 
+import org.apache.qpid.jms.meta.JmsConsumerId;
+import org.apache.qpid.jms.meta.JmsProducerId;
 import org.apache.qpid.jms.meta.JmsSessionInfo;
 import org.apache.qpid.jms.meta.JmsTransactionId;
 import org.apache.qpid.jms.meta.JmsTransactionInfo;
@@ -43,8 +45,8 @@ public class AmqpTransactionContext implements AmqpResourceParent {
     private static final Logger LOG = LoggerFactory.getLogger(AmqpTransactionContext.class);
 
     private final AmqpSession session;
-    private final Set<AmqpConsumer> txConsumers = new LinkedHashSet<AmqpConsumer>();
-    private final Set<AmqpProducer> txProducers = new LinkedHashSet<AmqpProducer>();
+    private final Map<JmsConsumerId, AmqpConsumer> txConsumers = new HashMap<>();
+    private final Map<JmsProducerId, AmqpProducer> txProducers = new HashMap<>();
 
     private JmsTransactionId current;
     private AmqpTransactionCoordinator coordinator;
@@ -130,15 +132,8 @@ public class AmqpTransactionContext implements AmqpResourceParent {
 
         DischargeCompletion dischargeResult = new DischargeCompletion(request, true);
 
-        if (txProducers.isEmpty()) {
-            LOG.trace("TX Context[{}] committing current TX[[]]", this, current);
-            coordinator.discharge(current, dischargeResult, true);
-        } else {
-            SendCompletion producersSendCompletion = new SendCompletion(transactionInfo, dischargeResult, txProducers.size(), true);
-            for (AmqpProducer producer : txProducers) {
-                producer.addSendCompletionWatcher(producersSendCompletion);
-            }
-        }
+        LOG.trace("TX Context[{}] committing current TX[[]]", this, current);
+        coordinator.discharge(current, dischargeResult, true);
     }
 
     public void rollback(JmsTransactionInfo transactionInfo, final AsyncResult request) throws Exception {
@@ -157,33 +152,26 @@ public class AmqpTransactionContext implements AmqpResourceParent {
 
         DischargeCompletion dischargeResult = new DischargeCompletion(request, false);
 
-        if (txProducers.isEmpty()) {
-            LOG.trace("TX Context[{}] rolling back current TX[[]]", this, current);
-            coordinator.discharge(current, dischargeResult, false);
-        } else {
-            SendCompletion producersSendCompletion = new SendCompletion(transactionInfo, dischargeResult, txProducers.size(), false);
-            for (AmqpProducer producer : txProducers) {
-                producer.addSendCompletionWatcher(producersSendCompletion);
-            }
-        }
+        LOG.trace("TX Context[{}] rolling back current TX[[]]", this, current);
+        coordinator.discharge(current, dischargeResult, false);
     }
 
     //----- Context utility methods ------------------------------------------//
 
     public void registerTxConsumer(AmqpConsumer consumer) {
-        txConsumers.add(consumer);
+        txConsumers.put(consumer.getConsumerId(), consumer);
     }
 
-    public boolean isInTransaction(AmqpConsumer consumer) {
-        return txConsumers.contains(consumer);
+    public boolean isInTransaction(JmsConsumerId consumerId) {
+        return txConsumers.containsKey(consumerId);
     }
 
     public void registerTxProducer(AmqpProducer producer) {
-        txProducers.add(producer);
+        txProducers.put(producer.getProducerId(), producer);
     }
 
-    public boolean isInTransaction(AmqpProducer producer) {
-        return txProducers.contains(producer);
+    public boolean isInTransaction(JmsProducerId producerId) {
+        return txProducers.containsKey(producerId);
     }
 
     public AmqpSession getSession() {
@@ -214,19 +202,19 @@ public class AmqpTransactionContext implements AmqpResourceParent {
     //----- Transaction pre / post completion --------------------------------//
 
     private void preCommit() {
-        for (AmqpConsumer consumer : txConsumers) {
+        for (AmqpConsumer consumer : txConsumers.values()) {
             consumer.preCommit();
         }
     }
 
     private void preRollback() {
-        for (AmqpConsumer consumer : txConsumers) {
+        for (AmqpConsumer consumer : txConsumers.values()) {
             consumer.preRollback();
         }
     }
 
     private void postCommit() {
-        for (AmqpConsumer consumer : txConsumers) {
+        for (AmqpConsumer consumer : txConsumers.values()) {
             consumer.postCommit();
         }
 
@@ -235,7 +223,7 @@ public class AmqpTransactionContext implements AmqpResourceParent {
     }
 
     private void postRollback() {
-        for (AmqpConsumer consumer : txConsumers) {
+        for (AmqpConsumer consumer : txConsumers.values()) {
             consumer.postRollback();
         }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org