You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2016/05/24 21:05:09 UTC
qpid-jms git commit: QPIDJMS-177 Ensure that sessions snapshot the policy objects they use on creation.
Repository: qpid-jms
Updated Branches:
refs/heads/master 6b6e1a76c -> 7e2c5702d
QPIDJMS-177 Ensure that sessions snapshot the policy objects they use on
creation.
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/7e2c5702
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/7e2c5702
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/7e2c5702
Branch: refs/heads/master
Commit: 7e2c5702d448b2517cdfc6bfb3ac3e2101299076
Parents: 6b6e1a7
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue May 24 17:05:00 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue May 24 17:05:00 2016 -0400
----------------------------------------------------------------------
.../org/apache/qpid/jms/JmsMessageConsumer.java | 4 +-
.../org/apache/qpid/jms/JmsMessageProducer.java | 2 +-
.../java/org/apache/qpid/jms/JmsSession.java | 24 +++++++----
.../apache/qpid/jms/meta/JmsSessionInfo.java | 42 ++++++++++++++++++++
.../jms/integration/SessionIntegrationTest.java | 22 ++++++++++
5 files changed, 84 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7e2c5702/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
index fd74ffe..d797918 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
@@ -86,8 +86,8 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
this.messageQueue = new FifoMessageQueue();
}
- JmsPrefetchPolicy prefetchPolicy = connection.getPrefetchPolicy();
- JmsRedeliveryPolicy redeliveryPolicy = connection.getRedeliveryPolicy().copy();
+ JmsPrefetchPolicy prefetchPolicy = session.getPrefetchPolicy();
+ JmsRedeliveryPolicy redeliveryPolicy = session.getRedeliveryPolicy().copy();
consumerInfo = new JmsConsumerInfo(consumerId);
consumerInfo.setClientId(connection.getClientID());
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7e2c5702/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
index 89165e4..e040068 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
@@ -58,7 +58,7 @@ public class JmsMessageProducer implements AutoCloseable, MessageProducer {
this.anonymousProducer = destination == null;
JmsMessageIDBuilder messageIDBuilder =
- session.getConnection().getMessageIDPolicy().getMessageIDBuilder(session, destination);
+ session.getMessageIDPolicy().getMessageIDBuilder(session, destination);
this.producerInfo = new JmsProducerInfo(producerId, messageIDBuilder);
this.producerInfo.setDestination(destination);
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7e2c5702/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index b2f87ed..7b3d20f 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -68,8 +68,10 @@ import org.apache.qpid.jms.meta.JmsProducerId;
import org.apache.qpid.jms.meta.JmsProducerInfo;
import org.apache.qpid.jms.meta.JmsSessionId;
import org.apache.qpid.jms.meta.JmsSessionInfo;
+import org.apache.qpid.jms.policy.JmsMessageIDPolicy;
import org.apache.qpid.jms.policy.JmsPrefetchPolicy;
import org.apache.qpid.jms.policy.JmsPresettlePolicy;
+import org.apache.qpid.jms.policy.JmsRedeliveryPolicy;
import org.apache.qpid.jms.provider.Provider;
import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
import org.apache.qpid.jms.provider.ProviderFuture;
@@ -94,8 +96,6 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
private final AtomicBoolean started = new AtomicBoolean();
private final LinkedBlockingQueue<JmsInboundMessageDispatch> stoppedMessages =
new LinkedBlockingQueue<JmsInboundMessageDispatch>(10000);
- private final JmsPrefetchPolicy prefetchPolicy;
- private final JmsPresettlePolicy presettlePolicy;
private final JmsSessionInfo sessionInfo;
private volatile ExecutorService executor;
private final ReentrantLock sendLock = new ReentrantLock();
@@ -109,8 +109,6 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
protected JmsSession(JmsConnection connection, JmsSessionId sessionId, int acknowledgementMode) throws JMSException {
this.connection = connection;
this.acknowledgementMode = acknowledgementMode;
- this.prefetchPolicy = connection.getPrefetchPolicy().copy();
- this.presettlePolicy = connection.getPresettlePolicy().copy();
if (acknowledgementMode == SESSION_TRANSACTED) {
setTransactionContext(new JmsLocalTransactionContext(this));
@@ -121,6 +119,10 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
sessionInfo = new JmsSessionInfo(sessionId);
sessionInfo.setAcknowledgementMode(acknowledgementMode);
sessionInfo.setSendAcksAsync(connection.isForceAsyncAcks());
+ sessionInfo.setMessageIDPolicy(connection.getMessageIDPolicy().copy());
+ sessionInfo.setPrefetchPolicy(connection.getPrefetchPolicy().copy());
+ sessionInfo.setPresettlePolicy(connection.getPresettlePolicy().copy());
+ sessionInfo.setRedeliveryPolicy(connection.getRedeliveryPolicy().copy());
connection.createResource(sessionInfo);
@@ -710,7 +712,7 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
envelope.setDispatchId(messageSequence);
if (producer.isAnonymous()) {
- envelope.setPresettle(presettlePolicy.isProducerPresttled(this, destination));
+ envelope.setPresettle(getPresettlePolicy().isProducerPresttled(this, destination));
}
transactionContext.send(connection, envelope);
@@ -923,12 +925,20 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
}
}
+ public JmsMessageIDPolicy getMessageIDPolicy() {
+ return sessionInfo.getMessageIDPolicy();
+ }
+
public JmsPrefetchPolicy getPrefetchPolicy() {
- return prefetchPolicy;
+ return sessionInfo.getPrefetchPolicy();
}
public JmsPresettlePolicy getPresettlePolicy() {
- return presettlePolicy;
+ return sessionInfo.getPresettlePolicy();
+ }
+
+ public JmsRedeliveryPolicy getRedeliveryPolicy() {
+ return sessionInfo.getRedeliveryPolicy();
}
@Override
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7e2c5702/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsSessionInfo.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsSessionInfo.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsSessionInfo.java
index c1927af..a4733b1 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsSessionInfo.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsSessionInfo.java
@@ -18,11 +18,21 @@ package org.apache.qpid.jms.meta;
import javax.jms.Session;
+import org.apache.qpid.jms.policy.JmsMessageIDPolicy;
+import org.apache.qpid.jms.policy.JmsPrefetchPolicy;
+import org.apache.qpid.jms.policy.JmsPresettlePolicy;
+import org.apache.qpid.jms.policy.JmsRedeliveryPolicy;
+
public final class JmsSessionInfo implements JmsResource, Comparable<JmsSessionInfo> {
private final JmsSessionId sessionId;
+
private int acknowledgementMode;
private boolean sendAcksAsync;
+ private JmsMessageIDPolicy messageIDPolicy;
+ private JmsPrefetchPolicy prefetchPolicy;
+ private JmsPresettlePolicy presettlePolicy;
+ private JmsRedeliveryPolicy redeliveryPolicy;
public JmsSessionInfo(JmsConnectionInfo connectionInfo, long sessionId) {
if (connectionInfo == null) {
@@ -110,4 +120,36 @@ public final class JmsSessionInfo implements JmsResource, Comparable<JmsSessionI
public int compareTo(JmsSessionInfo other) {
return sessionId.compareTo(other.sessionId);
}
+
+ public JmsMessageIDPolicy getMessageIDPolicy() {
+ return messageIDPolicy;
+ }
+
+ public void setMessageIDPolicy(JmsMessageIDPolicy messageIDPolicy) {
+ this.messageIDPolicy = messageIDPolicy;
+ }
+
+ public JmsPrefetchPolicy getPrefetchPolicy() {
+ return prefetchPolicy;
+ }
+
+ public void setPrefetchPolicy(JmsPrefetchPolicy prefetchPolicy) {
+ this.prefetchPolicy = prefetchPolicy;
+ }
+
+ public JmsPresettlePolicy getPresettlePolicy() {
+ return presettlePolicy;
+ }
+
+ public void setPresettlePolicy(JmsPresettlePolicy presettlePolicy) {
+ this.presettlePolicy = presettlePolicy;
+ }
+
+ public JmsRedeliveryPolicy getRedeliveryPolicy() {
+ return redeliveryPolicy;
+ }
+
+ public void setRedeliveryPolicy(JmsRedeliveryPolicy redeliveryPolicy) {
+ this.redeliveryPolicy = redeliveryPolicy;
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7e2c5702/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
index fe9b4b4..bca3a57 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
@@ -26,6 +26,7 @@ import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -59,6 +60,7 @@ import javax.jms.TopicSubscriber;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsDefaultConnectionListener;
import org.apache.qpid.jms.JmsOperationTimedOutException;
+import org.apache.qpid.jms.JmsSession;
import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy;
import org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper;
import org.apache.qpid.jms.test.QpidJmsTestCase;
@@ -1541,4 +1543,24 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
}
}
}
+
+ @Test(timeout = 20000)
+ public void testSessionSnapshotsPolicyObjects() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+ connection.start();
+
+ testPeer.expectBegin();
+
+ JmsSession session = (JmsSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ assertNotSame(session.getMessageIDPolicy(), connection.getMessageIDPolicy());
+ assertNotSame(session.getPrefetchPolicy(), connection.getPrefetchPolicy());
+ assertNotSame(session.getPresettlePolicy(), connection.getPresettlePolicy());
+ assertNotSame(session.getRedeliveryPolicy(), connection.getRedeliveryPolicy());
+
+ testPeer.expectClose();
+ connection.close();
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org