You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2017/03/03 17:30:54 UTC
qpid-jms git commit: QPIDJMS-269 Resolve performance regression for transacted sends
Repository: qpid-jms
Updated Branches:
refs/heads/master 72efc6992 -> 1819bb277
QPIDJMS-269 Resolve performance regression for transacted sends
Remove some unnecessary send-completion accounting on transacted sends that
led to a performance degradation compared to previous releases.
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/1819bb27
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/1819bb27
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/1819bb27
Branch: refs/heads/master
Commit: 1819bb277f42a895c98c3b764299c0b7371af2e1
Parents: 72efc69
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Mar 3 12:30:41 2017 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri Mar 3 12:30:41 2017 -0500
----------------------------------------------------------------------
.../amqp/AmqpAnonymousFallbackProducer.java | 6 ---
.../qpid/jms/provider/amqp/AmqpConsumer.java | 4 +-
.../jms/provider/amqp/AmqpFixedProducer.java | 11 -----
.../qpid/jms/provider/amqp/AmqpProducer.java | 10 ----
.../provider/amqp/AmqpTransactionContext.java | 52 ++++++++------------
5 files changed, 22 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/1819bb27/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java
index 447b4b2..a73ec92 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java
@@ -139,12 +139,6 @@ public class AmqpAnonymousFallbackProducer extends AmqpProducer {
return new JmsProducerId(producerIdKey, -1, producerIdCount++);
}
- @Override
- public void addSendCompletionWatcher(AsyncResult watcher) {
- throw new UnsupportedOperationException(
- "The fallback producer parent should never have a watcher assigned.");
- }
-
//----- AsyncResult objects used to complete the sends -------------------//
private abstract class AnonymousRequest extends WrappedAsyncResult {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/1819bb27/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index 28f2ba5..b682c73 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -138,7 +138,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
public void run() {
LOG.trace("Consumer {} drain request timed out", getConsumerId());
Exception cause = new JmsOperationTimedOutException("Remote did not respond to a drain request in time");
- if (session.isTransacted() && session.getTransactionContext().isInTransaction(AmqpConsumer.this)) {
+ if (session.isTransacted() && session.getTransactionContext().isInTransaction(getConsumerId())) {
stopRequest.onFailure(cause);
stopRequest = null;
} else {
@@ -645,7 +645,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
}
private boolean shouldDeferClose() {
- if (getSession().isTransacted() && getSession().getTransactionContext().isInTransaction(this)) {
+ if (getSession().isTransacted() && getSession().getTransactionContext().isInTransaction(getConsumerId())) {
return true;
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/1819bb27/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
index 4acd419..4297147 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
@@ -303,17 +303,6 @@ public class AmqpFixedProducer extends AmqpProducer {
}
}
- @Override
- public void addSendCompletionWatcher(AsyncResult watcher) {
- // If none pending signal done already.
- // TODO - If we don't include blocked sends then update this.
- if (blocked.isEmpty() && sent.isEmpty()) {
- watcher.onSuccess();
- } else {
- this.sendCompletionWatcher = watcher;
- }
- }
-
//----- Class used to manage held sends ----------------------------------//
private class InFlightSend implements AsyncResult, AmqpExceptionBuilder {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/1819bb27/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProducer.java
index b862fb0..6999d76 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProducer.java
@@ -94,14 +94,4 @@ public abstract class AmqpProducer extends AmqpAbstractResource<JmsProducerInfo,
public void setDelayedDeliverySupported(boolean delayedDeliverySupported) {
this.delayedDeliverySupported = delayedDeliverySupported;
}
-
- /**
- * Allows a completion request to be added to this producer that will be notified
- * once all outstanding sends have completed.
- *
- * @param watcher
- * The AsyncResult that will be signaled once this producer has no pending sends.
- */
- public abstract void addSendCompletionWatcher(AsyncResult watcher);
-
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/1819bb27/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
index 8f3aab8..1a43aef 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
@@ -17,12 +17,14 @@
package org.apache.qpid.jms.provider.amqp;
import java.io.IOException;
-import java.util.LinkedHashSet;
-import java.util.Set;
+import java.util.HashMap;
+import java.util.Map;
import javax.jms.IllegalStateException;
import javax.jms.TransactionRolledBackException;
+import org.apache.qpid.jms.meta.JmsConsumerId;
+import org.apache.qpid.jms.meta.JmsProducerId;
import org.apache.qpid.jms.meta.JmsSessionInfo;
import org.apache.qpid.jms.meta.JmsTransactionId;
import org.apache.qpid.jms.meta.JmsTransactionInfo;
@@ -43,8 +45,8 @@ public class AmqpTransactionContext implements AmqpResourceParent {
private static final Logger LOG = LoggerFactory.getLogger(AmqpTransactionContext.class);
private final AmqpSession session;
- private final Set<AmqpConsumer> txConsumers = new LinkedHashSet<AmqpConsumer>();
- private final Set<AmqpProducer> txProducers = new LinkedHashSet<AmqpProducer>();
+ private final Map<JmsConsumerId, AmqpConsumer> txConsumers = new HashMap<>();
+ private final Map<JmsProducerId, AmqpProducer> txProducers = new HashMap<>();
private JmsTransactionId current;
private AmqpTransactionCoordinator coordinator;
@@ -130,15 +132,8 @@ public class AmqpTransactionContext implements AmqpResourceParent {
DischargeCompletion dischargeResult = new DischargeCompletion(request, true);
- if (txProducers.isEmpty()) {
- LOG.trace("TX Context[{}] committing current TX[[]]", this, current);
- coordinator.discharge(current, dischargeResult, true);
- } else {
- SendCompletion producersSendCompletion = new SendCompletion(transactionInfo, dischargeResult, txProducers.size(), true);
- for (AmqpProducer producer : txProducers) {
- producer.addSendCompletionWatcher(producersSendCompletion);
- }
- }
+ LOG.trace("TX Context[{}] committing current TX[[]]", this, current);
+ coordinator.discharge(current, dischargeResult, true);
}
public void rollback(JmsTransactionInfo transactionInfo, final AsyncResult request) throws Exception {
@@ -157,33 +152,26 @@ public class AmqpTransactionContext implements AmqpResourceParent {
DischargeCompletion dischargeResult = new DischargeCompletion(request, false);
- if (txProducers.isEmpty()) {
- LOG.trace("TX Context[{}] rolling back current TX[[]]", this, current);
- coordinator.discharge(current, dischargeResult, false);
- } else {
- SendCompletion producersSendCompletion = new SendCompletion(transactionInfo, dischargeResult, txProducers.size(), false);
- for (AmqpProducer producer : txProducers) {
- producer.addSendCompletionWatcher(producersSendCompletion);
- }
- }
+ LOG.trace("TX Context[{}] rolling back current TX[[]]", this, current);
+ coordinator.discharge(current, dischargeResult, false);
}
//----- Context utility methods ------------------------------------------//
public void registerTxConsumer(AmqpConsumer consumer) {
- txConsumers.add(consumer);
+ txConsumers.put(consumer.getConsumerId(), consumer);
}
- public boolean isInTransaction(AmqpConsumer consumer) {
- return txConsumers.contains(consumer);
+ public boolean isInTransaction(JmsConsumerId consumerId) {
+ return txConsumers.containsKey(consumerId);
}
public void registerTxProducer(AmqpProducer producer) {
- txProducers.add(producer);
+ txProducers.put(producer.getProducerId(), producer);
}
- public boolean isInTransaction(AmqpProducer producer) {
- return txProducers.contains(producer);
+ public boolean isInTransaction(JmsProducerId producerId) {
+ return txProducers.containsKey(producerId);
}
public AmqpSession getSession() {
@@ -214,19 +202,19 @@ public class AmqpTransactionContext implements AmqpResourceParent {
//----- Transaction pre / post completion --------------------------------//
private void preCommit() {
- for (AmqpConsumer consumer : txConsumers) {
+ for (AmqpConsumer consumer : txConsumers.values()) {
consumer.preCommit();
}
}
private void preRollback() {
- for (AmqpConsumer consumer : txConsumers) {
+ for (AmqpConsumer consumer : txConsumers.values()) {
consumer.preRollback();
}
}
private void postCommit() {
- for (AmqpConsumer consumer : txConsumers) {
+ for (AmqpConsumer consumer : txConsumers.values()) {
consumer.postCommit();
}
@@ -235,7 +223,7 @@ public class AmqpTransactionContext implements AmqpResourceParent {
}
private void postRollback() {
- for (AmqpConsumer consumer : txConsumers) {
+ for (AmqpConsumer consumer : txConsumers.values()) {
consumer.postRollback();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org