You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2017/09/06 17:00:00 UTC
[1/2] qpid-jms git commit: NO-JIRA: make the test peer track heartbeats in a thread-safe manner
Repository: qpid-jms
Updated Branches:
refs/heads/master 22c3358a5 -> e8a8358a1
NO-JIRA: make the test peer track heartbeats in a thread-safe manner
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/ea7b7c99
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/ea7b7c99
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/ea7b7c99
Branch: refs/heads/master
Commit: ea7b7c991779e0ea02f6277e593db630b664e606
Parents: 22c3358
Author: Robbie Gemmell <ro...@apache.org>
Authored: Wed Sep 6 17:53:55 2017 +0100
Committer: Robbie Gemmell <ro...@apache.org>
Committed: Wed Sep 6 17:54:40 2017 +0100
----------------------------------------------------------------------
.../java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java | 7 ++++---
1 file changed, 4 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ea7b7c99/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index 2e2c9af..5c856cf 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -42,6 +42,7 @@ import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.net.ssl.SSLContext;
import javax.security.auth.Subject;
@@ -159,7 +160,7 @@ public class TestAmqpPeer implements AutoCloseable
private UnsignedInteger _lastInitiatedLinkHandle = null;
private UnsignedInteger _lastInitiatedCoordinatorLinkHandle = null;
private int advertisedIdleTimeout = 0;
- private int _emptyFrameCount = 0;
+ private AtomicInteger _emptyFrameCount = new AtomicInteger();
public TestAmqpPeer() throws IOException
{
@@ -263,7 +264,7 @@ public class TestAmqpPeer implements AutoCloseable
}
public int getEmptyFrameCount() {
- return _emptyFrameCount;
+ return _emptyFrameCount.get();
}
void receiveHeader(byte[] header)
@@ -304,7 +305,7 @@ public class TestAmqpPeer implements AutoCloseable
void receiveEmptyFrame(int type, int channel)
{
- _emptyFrameCount ++;
+ _emptyFrameCount.incrementAndGet();
LOGGER.debug("Received empty frame");
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/2] qpid-jms git commit: QPIDJMS-321: pre-size the inbound message queue based on prefetch
Posted by ro...@apache.org.
QPIDJMS-321: pre-size the inbound message queue based on prefetch
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/e8a8358a
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/e8a8358a
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/e8a8358a
Branch: refs/heads/master
Commit: e8a8358a1a903d24dac697f3f5cd43765130d8fe
Parents: ea7b7c9
Author: Robbie Gemmell <ro...@apache.org>
Authored: Wed Sep 6 17:54:47 2017 +0100
Committer: Robbie Gemmell <ro...@apache.org>
Committed: Wed Sep 6 17:54:47 2017 +0100
----------------------------------------------------------------------
.../java/org/apache/qpid/jms/JmsMessageConsumer.java | 15 ++++++++-------
.../org/apache/qpid/jms/util/FifoMessageQueue.java | 6 +++++-
.../apache/qpid/jms/util/FifoMessageQueueTest.java | 4 ++--
3 files changed, 15 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e8a8358a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
index 58609b3..bd09e77 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
@@ -86,16 +86,18 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
connection.checkConsumeFromTemporaryDestination((JmsTemporaryDestination) destination);
}
+ JmsPrefetchPolicy prefetchPolicy = session.getPrefetchPolicy();
+ JmsRedeliveryPolicy redeliveryPolicy = session.getRedeliveryPolicy().copy();
+ JmsDeserializationPolicy deserializationPolicy = session.getDeserializationPolicy().copy();
+
+ int configuredPrefetch = prefetchPolicy.getConfiguredPrefetch(session, destination, isDurableSubscription(), isBrowser());
+
if (connection.isLocalMessagePriority()) {
this.messageQueue = new PriorityMessageQueue();
} else {
- this.messageQueue = new FifoMessageQueue();
+ this.messageQueue = new FifoMessageQueue(configuredPrefetch);
}
- JmsPrefetchPolicy prefetchPolicy = session.getPrefetchPolicy();
- JmsRedeliveryPolicy redeliveryPolicy = session.getRedeliveryPolicy().copy();
- JmsDeserializationPolicy deserializationPolicy = session.getDeserializationPolicy().copy();
-
consumerInfo = new JmsConsumerInfo(consumerId, messageQueue);
consumerInfo.setExplicitClientID(connection.isExplicitClientID());
consumerInfo.setSelector(selector);
@@ -106,8 +108,7 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
consumerInfo.setAcknowledgementMode(acknowledgementMode);
consumerInfo.setNoLocal(noLocal);
consumerInfo.setBrowser(isBrowser());
- consumerInfo.setPrefetchSize(
- prefetchPolicy.getConfiguredPrefetch(session, destination, isDurableSubscription(), isBrowser()));
+ consumerInfo.setPrefetchSize(configuredPrefetch);
consumerInfo.setRedeliveryPolicy(redeliveryPolicy);
consumerInfo.setLocalMessageExpiry(connection.isLocalMessageExpiry());
consumerInfo.setPresettle(session.getPresettlePolicy().isConsumerPresttled(session, destination));
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e8a8358a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/FifoMessageQueue.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/FifoMessageQueue.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/FifoMessageQueue.java
index 7a2a4ba..a6860d1 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/FifoMessageQueue.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/FifoMessageQueue.java
@@ -28,7 +28,11 @@ import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
*/
public final class FifoMessageQueue extends AbstractMessageQueue {
- protected final Deque<JmsInboundMessageDispatch> queue = new ArrayDeque<JmsInboundMessageDispatch>();
+ protected final Deque<JmsInboundMessageDispatch> queue;
+
+ public FifoMessageQueue(int prefetchSize) {
+ this.queue = new ArrayDeque<JmsInboundMessageDispatch>(prefetchSize);
+ }
@Override
public void enqueueFirst(JmsInboundMessageDispatch envelope) {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e8a8358a/qpid-jms-client/src/test/java/org/apache/qpid/jms/util/FifoMessageQueueTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/util/FifoMessageQueueTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/util/FifoMessageQueueTest.java
index f0009da..3d3416e 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/util/FifoMessageQueueTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/util/FifoMessageQueueTest.java
@@ -47,7 +47,7 @@ public class FifoMessageQueueTest {
@Before
public void setUp() {
- queue = new FifoMessageQueue();
+ queue = new FifoMessageQueue(1000);
queue.start();
}
@@ -63,7 +63,7 @@ public class FifoMessageQueueTest {
@Test
public void testCreate() {
- FifoMessageQueue queue = new FifoMessageQueue();
+ FifoMessageQueue queue = new FifoMessageQueue(1000);
assertFalse(queue.isClosed());
assertTrue(queue.isEmpty());
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org