You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2016/05/24 21:05:09 UTC

qpid-jms git commit: QPIDJMS-177 Ensure that sessions snapshot the policy objects they use on creation.

Repository: qpid-jms
Updated Branches:
  refs/heads/master 6b6e1a76c -> 7e2c5702d


QPIDJMS-177 Ensure that sessions snapshot, at creation time, the policy
objects they use.

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/7e2c5702
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/7e2c5702
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/7e2c5702

Branch: refs/heads/master
Commit: 7e2c5702d448b2517cdfc6bfb3ac3e2101299076
Parents: 6b6e1a7
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue May 24 17:05:00 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue May 24 17:05:00 2016 -0400

----------------------------------------------------------------------
 .../org/apache/qpid/jms/JmsMessageConsumer.java |  4 +-
 .../org/apache/qpid/jms/JmsMessageProducer.java |  2 +-
 .../java/org/apache/qpid/jms/JmsSession.java    | 24 +++++++----
 .../apache/qpid/jms/meta/JmsSessionInfo.java    | 42 ++++++++++++++++++++
 .../jms/integration/SessionIntegrationTest.java | 22 ++++++++++
 5 files changed, 84 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7e2c5702/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
index fd74ffe..d797918 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
@@ -86,8 +86,8 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
             this.messageQueue = new FifoMessageQueue();
         }
 
-        JmsPrefetchPolicy prefetchPolicy = connection.getPrefetchPolicy();
-        JmsRedeliveryPolicy redeliveryPolicy = connection.getRedeliveryPolicy().copy();
+        JmsPrefetchPolicy prefetchPolicy = session.getPrefetchPolicy();
+        JmsRedeliveryPolicy redeliveryPolicy = session.getRedeliveryPolicy().copy();
 
         consumerInfo = new JmsConsumerInfo(consumerId);
         consumerInfo.setClientId(connection.getClientID());

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7e2c5702/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
index 89165e4..e040068 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
@@ -58,7 +58,7 @@ public class JmsMessageProducer implements AutoCloseable, MessageProducer {
         this.anonymousProducer = destination == null;
 
         JmsMessageIDBuilder messageIDBuilder =
-            session.getConnection().getMessageIDPolicy().getMessageIDBuilder(session, destination);
+            session.getMessageIDPolicy().getMessageIDBuilder(session, destination);
 
         this.producerInfo = new JmsProducerInfo(producerId, messageIDBuilder);
         this.producerInfo.setDestination(destination);

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7e2c5702/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index b2f87ed..7b3d20f 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -68,8 +68,10 @@ import org.apache.qpid.jms.meta.JmsProducerId;
 import org.apache.qpid.jms.meta.JmsProducerInfo;
 import org.apache.qpid.jms.meta.JmsSessionId;
 import org.apache.qpid.jms.meta.JmsSessionInfo;
+import org.apache.qpid.jms.policy.JmsMessageIDPolicy;
 import org.apache.qpid.jms.policy.JmsPrefetchPolicy;
 import org.apache.qpid.jms.policy.JmsPresettlePolicy;
+import org.apache.qpid.jms.policy.JmsRedeliveryPolicy;
 import org.apache.qpid.jms.provider.Provider;
 import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
 import org.apache.qpid.jms.provider.ProviderFuture;
@@ -94,8 +96,6 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
     private final AtomicBoolean started = new AtomicBoolean();
     private final LinkedBlockingQueue<JmsInboundMessageDispatch> stoppedMessages =
         new LinkedBlockingQueue<JmsInboundMessageDispatch>(10000);
-    private final JmsPrefetchPolicy prefetchPolicy;
-    private final JmsPresettlePolicy presettlePolicy;
     private final JmsSessionInfo sessionInfo;
     private volatile ExecutorService executor;
     private final ReentrantLock sendLock = new ReentrantLock();
@@ -109,8 +109,6 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
     protected JmsSession(JmsConnection connection, JmsSessionId sessionId, int acknowledgementMode) throws JMSException {
         this.connection = connection;
         this.acknowledgementMode = acknowledgementMode;
-        this.prefetchPolicy = connection.getPrefetchPolicy().copy();
-        this.presettlePolicy = connection.getPresettlePolicy().copy();
 
         if (acknowledgementMode == SESSION_TRANSACTED) {
             setTransactionContext(new JmsLocalTransactionContext(this));
@@ -121,6 +119,10 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
         sessionInfo = new JmsSessionInfo(sessionId);
         sessionInfo.setAcknowledgementMode(acknowledgementMode);
         sessionInfo.setSendAcksAsync(connection.isForceAsyncAcks());
+        sessionInfo.setMessageIDPolicy(connection.getMessageIDPolicy().copy());
+        sessionInfo.setPrefetchPolicy(connection.getPrefetchPolicy().copy());
+        sessionInfo.setPresettlePolicy(connection.getPresettlePolicy().copy());
+        sessionInfo.setRedeliveryPolicy(connection.getRedeliveryPolicy().copy());
 
         connection.createResource(sessionInfo);
 
@@ -710,7 +712,7 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
             envelope.setDispatchId(messageSequence);
 
             if (producer.isAnonymous()) {
-                envelope.setPresettle(presettlePolicy.isProducerPresttled(this, destination));
+                envelope.setPresettle(getPresettlePolicy().isProducerPresttled(this, destination));
             }
 
             transactionContext.send(connection, envelope);
@@ -923,12 +925,20 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
         }
     }
 
+    public JmsMessageIDPolicy getMessageIDPolicy() {
+        return sessionInfo.getMessageIDPolicy();
+    }
+
     public JmsPrefetchPolicy getPrefetchPolicy() {
-        return prefetchPolicy;
+        return sessionInfo.getPrefetchPolicy();
     }
 
     public JmsPresettlePolicy getPresettlePolicy() {
-        return presettlePolicy;
+        return sessionInfo.getPresettlePolicy();
+    }
+
+    public JmsRedeliveryPolicy getRedeliveryPolicy() {
+        return sessionInfo.getRedeliveryPolicy();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7e2c5702/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsSessionInfo.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsSessionInfo.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsSessionInfo.java
index c1927af..a4733b1 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsSessionInfo.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsSessionInfo.java
@@ -18,11 +18,21 @@ package org.apache.qpid.jms.meta;
 
 import javax.jms.Session;
 
+import org.apache.qpid.jms.policy.JmsMessageIDPolicy;
+import org.apache.qpid.jms.policy.JmsPrefetchPolicy;
+import org.apache.qpid.jms.policy.JmsPresettlePolicy;
+import org.apache.qpid.jms.policy.JmsRedeliveryPolicy;
+
 public final class JmsSessionInfo implements JmsResource, Comparable<JmsSessionInfo> {
 
     private final JmsSessionId sessionId;
+
     private int acknowledgementMode;
     private boolean sendAcksAsync;
+    private JmsMessageIDPolicy messageIDPolicy;
+    private JmsPrefetchPolicy prefetchPolicy;
+    private JmsPresettlePolicy presettlePolicy;
+    private JmsRedeliveryPolicy redeliveryPolicy;
 
     public JmsSessionInfo(JmsConnectionInfo connectionInfo, long sessionId) {
         if (connectionInfo == null) {
@@ -110,4 +120,36 @@ public final class JmsSessionInfo implements JmsResource, Comparable<JmsSessionI
     public int compareTo(JmsSessionInfo other) {
         return sessionId.compareTo(other.sessionId);
     }
+
+    public JmsMessageIDPolicy getMessageIDPolicy() {
+        return messageIDPolicy;
+    }
+
+    public void setMessageIDPolicy(JmsMessageIDPolicy messageIDPolicy) {
+        this.messageIDPolicy = messageIDPolicy;
+    }
+
+    public JmsPrefetchPolicy getPrefetchPolicy() {
+        return prefetchPolicy;
+    }
+
+    public void setPrefetchPolicy(JmsPrefetchPolicy prefetchPolicy) {
+        this.prefetchPolicy = prefetchPolicy;
+    }
+
+    public JmsPresettlePolicy getPresettlePolicy() {
+        return presettlePolicy;
+    }
+
+    public void setPresettlePolicy(JmsPresettlePolicy presettlePolicy) {
+        this.presettlePolicy = presettlePolicy;
+    }
+
+    public JmsRedeliveryPolicy getRedeliveryPolicy() {
+        return redeliveryPolicy;
+    }
+
+    public void setRedeliveryPolicy(JmsRedeliveryPolicy redeliveryPolicy) {
+        this.redeliveryPolicy = redeliveryPolicy;
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7e2c5702/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
index fe9b4b4..bca3a57 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
@@ -26,6 +26,7 @@ import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -59,6 +60,7 @@ import javax.jms.TopicSubscriber;
 import org.apache.qpid.jms.JmsConnection;
 import org.apache.qpid.jms.JmsDefaultConnectionListener;
 import org.apache.qpid.jms.JmsOperationTimedOutException;
+import org.apache.qpid.jms.JmsSession;
 import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy;
 import org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper;
 import org.apache.qpid.jms.test.QpidJmsTestCase;
@@ -1541,4 +1543,24 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
             }
         }
     }
+
+    @Test(timeout = 20000)
+    public void testSessionSnapshotsPolicyObjects() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            testPeer.expectBegin();
+
+            JmsSession session = (JmsSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            assertNotSame(session.getMessageIDPolicy(), connection.getMessageIDPolicy());
+            assertNotSame(session.getPrefetchPolicy(), connection.getPrefetchPolicy());
+            assertNotSame(session.getPresettlePolicy(), connection.getPresettlePolicy());
+            assertNotSame(session.getRedeliveryPolicy(), connection.getRedeliveryPolicy());
+
+            testPeer.expectClose();
+            connection.close();
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org