You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2016/05/24 18:21:48 UTC
qpid-jms git commit: QPIDJMS-180 Add JmsMessageIDPolicy and a default
implementation that preserves previous behavior.
Repository: qpid-jms
Updated Branches:
refs/heads/master 0251ae8ce -> 95bc1aa80
QPIDJMS-180 Add JmsMessageIDPolicy and a default implementation that
preserves previous behavior.
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/95bc1aa8
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/95bc1aa8
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/95bc1aa8
Branch: refs/heads/master
Commit: 95bc1aa809b406c16e31802cf66c863cb0686a53
Parents: 0251ae8
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue May 24 14:15:11 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue May 24 14:15:11 2016 -0400
----------------------------------------------------------------------
.../java/org/apache/qpid/jms/JmsConnection.java | 16 +-
.../apache/qpid/jms/JmsConnectionFactory.java | 75 ++++-
.../org/apache/qpid/jms/JmsMessageProducer.java | 11 +-
.../java/org/apache/qpid/jms/JmsSession.java | 5 +-
.../apache/qpid/jms/meta/JmsConnectionInfo.java | 15 +-
.../apache/qpid/jms/meta/JmsProducerInfo.java | 24 +-
.../jms/policy/JmsDefaultMessageIDPolicy.java | 81 +++++
.../jms/policy/JmsDefaultPresettlePolicy.java | 2 +-
.../jms/policy/JmsDefaultRedeliveryPolicy.java | 2 +-
.../qpid/jms/policy/JmsMessageIDPolicy.java | 48 +++
.../qpid/jms/JmsConnectionFactoryTest.java | 28 ++
.../ConnectionFactoryIntegrationTest.java | 335 +++++++++++++++++--
.../integration/ProducerIntegrationTest.java | 112 ++++++-
.../qpid/jms/meta/JmsProducerInfoTest.java | 39 +--
14 files changed, 703 insertions(+), 90 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/95bc1aa8/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
index 38aa2fc..7375c3f 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
@@ -54,7 +54,6 @@ import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.apache.qpid.jms.message.JmsMessage;
import org.apache.qpid.jms.message.JmsMessageFactory;
-import org.apache.qpid.jms.message.JmsMessageIDBuilder;
import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
import org.apache.qpid.jms.meta.JmsConnectionId;
import org.apache.qpid.jms.meta.JmsConnectionInfo;
@@ -67,6 +66,7 @@ import org.apache.qpid.jms.meta.JmsSessionId;
import org.apache.qpid.jms.meta.JmsSessionInfo;
import org.apache.qpid.jms.meta.JmsTransactionId;
import org.apache.qpid.jms.meta.JmsTransactionInfo;
+import org.apache.qpid.jms.policy.JmsMessageIDPolicy;
import org.apache.qpid.jms.policy.JmsPrefetchPolicy;
import org.apache.qpid.jms.policy.JmsPresettlePolicy;
import org.apache.qpid.jms.policy.JmsRedeliveryPolicy;
@@ -123,8 +123,8 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
// not have it's own mechanism for doing so.
executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
@Override
- public Thread newThread(Runnable r) {
- Thread thread = new Thread(r, "QpidJMS Connection Executor: " + connectionId);
+ public Thread newThread(Runnable target) {
+ Thread thread = new Thread(target, "QpidJMS Connection Executor: " + connectionId);
thread.setDaemon(false);
return thread;
}
@@ -1012,12 +1012,12 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
connectionInfo.setLocalMessageExpiry(localMessageExpiry);
}
- public JmsMessageIDBuilder getMessageIDBuilder() {
- return connectionInfo.getMessageIDBuilder();
+ public JmsMessageIDPolicy getMessageIDPolicy() {
+ return connectionInfo.getMessageIDPolicy();
}
- void setMessageIDBuilder(JmsMessageIDBuilder messageIDBuilder) {
- connectionInfo.setMessageIDBuilder(messageIDBuilder);
+ public void setMessageIDPolicy(JmsMessageIDPolicy messageIDPolicy) {
+ connectionInfo.setMessageIDPolicy(messageIDPolicy);
}
public boolean isPopulateJMSXUserID() {
@@ -1154,7 +1154,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
onProviderException(ex);
- for(AsyncResult request : requests.keySet()) {
+ for (AsyncResult request : requests.keySet()) {
try {
request.onFailure(ex);
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/95bc1aa8/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java
index f874193..32cb39a 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java
@@ -35,9 +35,11 @@ import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
import org.apache.qpid.jms.jndi.JNDIStorable;
import org.apache.qpid.jms.message.JmsMessageIDBuilder;
import org.apache.qpid.jms.meta.JmsConnectionInfo;
+import org.apache.qpid.jms.policy.JmsDefaultMessageIDPolicy;
import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy;
import org.apache.qpid.jms.policy.JmsDefaultPresettlePolicy;
import org.apache.qpid.jms.policy.JmsDefaultRedeliveryPolicy;
+import org.apache.qpid.jms.policy.JmsMessageIDPolicy;
import org.apache.qpid.jms.policy.JmsPrefetchPolicy;
import org.apache.qpid.jms.policy.JmsPresettlePolicy;
import org.apache.qpid.jms.policy.JmsRedeliveryPolicy;
@@ -93,7 +95,7 @@ public class JmsConnectionFactory extends JNDIStorable implements ConnectionFact
private JmsPrefetchPolicy prefetchPolicy = new JmsDefaultPrefetchPolicy();
private JmsRedeliveryPolicy redeliveryPolicy = new JmsDefaultRedeliveryPolicy();
private JmsPresettlePolicy presettlePolicy = new JmsDefaultPresettlePolicy();
- private JmsMessageIDBuilder messageIDBuilder = JmsMessageIDBuilder.BUILTIN.DEFAULT.createBuilder();
+ private JmsMessageIDPolicy messageIDPolicy = new JmsDefaultMessageIDPolicy();
public JmsConnectionFactory() {
}
@@ -208,9 +210,15 @@ public class JmsConnectionFactory extends JNDIStorable implements ConnectionFact
properties.remove(CLIENT_ID_PROP);
}
+ // Copy the configured policies before applying URI options that
+ // might make additional configuration changes.
+ connection.setMessageIDPolicy(messageIDPolicy.copy());
+ connection.setPrefetchPolicy(prefetchPolicy.copy());
+ connection.setPresettlePolicy(presettlePolicy.copy());
+ connection.setRedeliveryPolicy(redeliveryPolicy.copy());
+
PropertyUtil.setProperties(connection, properties);
connection.setExceptionListener(exceptionListener);
- connection.setMessageIDBuilder(messageIDBuilder);
connection.setUsername(username);
connection.setPassword(password);
connection.setConfiguredURI(remoteURI);
@@ -500,6 +508,10 @@ public class JmsConnectionFactory extends JNDIStorable implements ConnectionFact
}
public void setPrefetchPolicy(JmsPrefetchPolicy prefetchPolicy) {
+ if (prefetchPolicy == null) {
+ prefetchPolicy = new JmsDefaultPrefetchPolicy();
+ }
+
this.prefetchPolicy = prefetchPolicy;
}
@@ -519,6 +531,9 @@ public class JmsConnectionFactory extends JNDIStorable implements ConnectionFact
* The new redeliveryPolicy to set
*/
public void setRedeliveryPolicy(JmsRedeliveryPolicy redeliveryPolicy) {
+ if (redeliveryPolicy == null) {
+ redeliveryPolicy = new JmsDefaultRedeliveryPolicy();
+ }
this.redeliveryPolicy = redeliveryPolicy;
}
@@ -536,10 +551,35 @@ public class JmsConnectionFactory extends JNDIStorable implements ConnectionFact
* the presettlePolicy to use by connections created from this factory.
*/
public void setPresettlePolicy(JmsPresettlePolicy presettlePolicy) {
+ if (presettlePolicy == null) {
+ presettlePolicy = new JmsDefaultPresettlePolicy();
+ }
this.presettlePolicy = presettlePolicy;
}
/**
+ * @return the messageIDPolicy that is currently configured.
+ */
+ public JmsMessageIDPolicy getMessageIDPolicy() {
+ return messageIDPolicy;
+ }
+
+ /**
+ * Sets the JmsMessageIDPolicy that is used to configure the JmsMessageIDBuilder
+ * that is assigned to any new MessageProducer created from Connection instances
+ * that this factory has created.
+ *
+ * @param messageIDPolicy
+ * the messageIDPolicy to use by connections created from this factory.
+ */
+ public void setMessageIDPolicy(JmsMessageIDPolicy messageIDPolicy) {
+ if (messageIDPolicy == null) {
+ messageIDPolicy = new JmsDefaultMessageIDPolicy();
+ }
+ this.messageIDPolicy = messageIDPolicy;
+ }
+
+ /**
* @return the currently configured client ID prefix for auto-generated client IDs.
*/
public synchronized String getClientIDPrefix() {
@@ -667,22 +707,39 @@ public class JmsConnectionFactory extends JNDIStorable implements ConnectionFact
/**
* Sets the type of the Message IDs used to populate the outgoing Messages
*
+ * @deprecated use the jms.messageIDPolicy.messageIDType URI setting instead.
+ *
* @param type
* The name of the Message type to use when sending a message.
*/
+ @Deprecated
public void setMessageIDType(String type) {
- this.messageIDBuilder = JmsMessageIDBuilder.BUILTIN.create(type);
+ if (messageIDPolicy instanceof JmsDefaultMessageIDPolicy) {
+ ((JmsDefaultMessageIDPolicy) messageIDPolicy).setMessageIDType(type);
+ }
}
+ @Deprecated
public String getMessageIDType() {
- return this.messageIDBuilder.toString();
+ if (messageIDPolicy instanceof JmsDefaultMessageIDPolicy) {
+ return ((JmsDefaultMessageIDPolicy) this.messageIDPolicy).getMessageIDType();
+ }
+
+ return null;
}
/**
* @return the messageIDBuilder currently configured.
+ *
+ * @deprecated Create a custom JmsMessageIDPolicy to control the JmsMessageIDBuilder
*/
+ @Deprecated
public JmsMessageIDBuilder getMessageIDBuilder() {
- return messageIDBuilder;
+ if (messageIDPolicy instanceof JmsDefaultMessageIDPolicy) {
+ return ((JmsDefaultMessageIDPolicy) this.messageIDPolicy).getMessageIDBuilder();
+ }
+
+ return null;
}
/**
@@ -692,12 +749,14 @@ public class JmsConnectionFactory extends JNDIStorable implements ConnectionFact
*
* @param messageIDBuilder
* The custom JmsMessageIDBuilder to use to create outgoing Message IDs.
+ *
+ * @deprecated Create a custom JmsMessageIDPolicy to control the JmsMessageIDBuilder
*/
+ @Deprecated
public void setMessageIDBuilder(JmsMessageIDBuilder messageIDBuilder) {
- if (messageIDBuilder == null) {
- messageIDBuilder = JmsMessageIDBuilder.BUILTIN.DEFAULT.createBuilder();
+ if (messageIDPolicy instanceof JmsDefaultMessageIDPolicy) {
+ ((JmsDefaultMessageIDPolicy) this.messageIDPolicy).setMessageIDBuilder(messageIDBuilder);
}
- this.messageIDBuilder = messageIDBuilder;
}
public boolean isReceiveLocalOnly() {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/95bc1aa8/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
index c077230..89165e4 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
@@ -28,6 +28,7 @@ import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
+import org.apache.qpid.jms.message.JmsMessageIDBuilder;
import org.apache.qpid.jms.meta.JmsProducerId;
import org.apache.qpid.jms.meta.JmsProducerInfo;
import org.apache.qpid.jms.provider.Provider;
@@ -55,7 +56,11 @@ public class JmsMessageProducer implements AutoCloseable, MessageProducer {
this.session = session;
this.connection = session.getConnection();
this.anonymousProducer = destination == null;
- this.producerInfo = new JmsProducerInfo(producerId);
+
+ JmsMessageIDBuilder messageIDBuilder =
+ session.getConnection().getMessageIDPolicy().getMessageIDBuilder(session, destination);
+
+ this.producerInfo = new JmsProducerInfo(producerId, messageIDBuilder);
this.producerInfo.setDestination(destination);
this.producerInfo.setPresettle(session.getPresettlePolicy().isProducerPresttled(session, destination));
@@ -241,6 +246,10 @@ public class JmsMessageProducer implements AutoCloseable, MessageProducer {
return anonymousProducer;
}
+ protected JmsMessageIDBuilder getMessageIDBuilder() {
+ return producerInfo.getMessageIDBuilder();
+ }
+
void setFailureCause(Exception failureCause) {
this.failureCause.set(failureCause);
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/95bc1aa8/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index 66d9539..b2f87ed 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -60,7 +60,6 @@ import javax.jms.TopicSubscriber;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.apache.qpid.jms.message.JmsMessage;
-import org.apache.qpid.jms.message.JmsMessageIDBuilder;
import org.apache.qpid.jms.message.JmsMessageTransformation;
import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
import org.apache.qpid.jms.meta.JmsConsumerId;
@@ -97,7 +96,6 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
new LinkedBlockingQueue<JmsInboundMessageDispatch>(10000);
private final JmsPrefetchPolicy prefetchPolicy;
private final JmsPresettlePolicy presettlePolicy;
- private final JmsMessageIDBuilder messageIDBuilder;
private final JmsSessionInfo sessionInfo;
private volatile ExecutorService executor;
private final ReentrantLock sendLock = new ReentrantLock();
@@ -113,7 +111,6 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
this.acknowledgementMode = acknowledgementMode;
this.prefetchPolicy = connection.getPrefetchPolicy().copy();
this.presettlePolicy = connection.getPresettlePolicy().copy();
- this.messageIDBuilder = connection.getMessageIDBuilder();
if (acknowledgementMode == SESSION_TRANSACTED) {
setTransactionContext(new JmsLocalTransactionContext(this));
@@ -664,7 +661,7 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
long messageSequence = producer.getNextMessageSequence();
Object messageId = null;
if (!disableMsgId) {
- messageId = messageIDBuilder.createMessageID(producer.getProducerId().toString(), messageSequence);
+ messageId = producer.getMessageIDBuilder().createMessageID(producer.getProducerId().toString(), messageSequence);
}
JmsMessage copy = null;
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/95bc1aa8/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConnectionInfo.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConnectionInfo.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConnectionInfo.java
index 14661ca..f6bf6a7 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConnectionInfo.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConnectionInfo.java
@@ -19,10 +19,11 @@ package org.apache.qpid.jms.meta;
import java.net.URI;
import java.nio.charset.Charset;
-import org.apache.qpid.jms.message.JmsMessageIDBuilder;
+import org.apache.qpid.jms.policy.JmsDefaultMessageIDPolicy;
import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy;
import org.apache.qpid.jms.policy.JmsDefaultPresettlePolicy;
import org.apache.qpid.jms.policy.JmsDefaultRedeliveryPolicy;
+import org.apache.qpid.jms.policy.JmsMessageIDPolicy;
import org.apache.qpid.jms.policy.JmsPrefetchPolicy;
import org.apache.qpid.jms.policy.JmsPresettlePolicy;
import org.apache.qpid.jms.policy.JmsRedeliveryPolicy;
@@ -65,7 +66,7 @@ public final class JmsConnectionInfo implements JmsResource, Comparable<JmsConne
private JmsPrefetchPolicy prefetchPolicy = new JmsDefaultPrefetchPolicy();
private JmsRedeliveryPolicy redeliveryPolicy = new JmsDefaultRedeliveryPolicy();
private JmsPresettlePolicy presettlePolicy = new JmsDefaultPresettlePolicy();
- private JmsMessageIDBuilder messageIDBuilder = JmsMessageIDBuilder.BUILTIN.DEFAULT.createBuilder();
+ private JmsMessageIDPolicy messageIDPolicy = new JmsDefaultMessageIDPolicy();
private volatile byte[] encodedUserId;
@@ -96,7 +97,7 @@ public final class JmsConnectionInfo implements JmsResource, Comparable<JmsConne
copy.topicPrefix = topicPrefix;
copy.connectTimeout = connectTimeout;
copy.validatePropertyNames = validatePropertyNames;
- copy.messageIDBuilder = messageIDBuilder;
+ copy.messageIDPolicy = messageIDPolicy;
copy.prefetchPolicy = prefetchPolicy.copy();
copy.redeliveryPolicy = redeliveryPolicy.copy();
copy.presettlePolicy = presettlePolicy.copy();
@@ -283,12 +284,12 @@ public final class JmsConnectionInfo implements JmsResource, Comparable<JmsConne
this.presettlePolicy = presettlePolicy;
}
- public JmsMessageIDBuilder getMessageIDBuilder() {
- return messageIDBuilder;
+ public JmsMessageIDPolicy getMessageIDPolicy() {
+ return messageIDPolicy;
}
- public void setMessageIDBuilder(JmsMessageIDBuilder messageIDBuilder) {
- this.messageIDBuilder = messageIDBuilder;
+ public void setMessageIDPolicy(JmsMessageIDPolicy messageIDPolicy) {
+ this.messageIDPolicy = messageIDPolicy;
}
public boolean isPopulateJMSXUserID() {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/95bc1aa8/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsProducerInfo.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsProducerInfo.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsProducerInfo.java
index 8b1e019..1b006d0 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsProducerInfo.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsProducerInfo.java
@@ -17,31 +17,35 @@
package org.apache.qpid.jms.meta;
import org.apache.qpid.jms.JmsDestination;
+import org.apache.qpid.jms.message.JmsMessageIDBuilder;
public final class JmsProducerInfo implements JmsResource, Comparable<JmsProducerInfo> {
private final JmsProducerId producerId;
+ private final JmsMessageIDBuilder messageIDBuilder;
+
private JmsDestination destination;
private boolean presettle;
public JmsProducerInfo(JmsProducerId producerId) {
+ this(producerId, JmsMessageIDBuilder.BUILTIN.DEFAULT.createBuilder());
+ }
+
+ public JmsProducerInfo(JmsProducerId producerId, JmsMessageIDBuilder messageIDBuilder) {
if (producerId == null) {
throw new IllegalArgumentException("Producer ID cannot be null");
}
- this.producerId = producerId;
- }
-
- public JmsProducerInfo(JmsSessionInfo sessionInfo, long producerId) {
- if (sessionInfo == null) {
- throw new IllegalArgumentException("Parent Session Info object cannot be null");
+ if (messageIDBuilder == null) {
+ throw new IllegalArgumentException("Message ID Builder cannot be null");
}
- this.producerId = new JmsProducerId(sessionInfo.getId(), producerId);
+ this.producerId = producerId;
+ this.messageIDBuilder = messageIDBuilder;
}
public JmsProducerInfo copy() {
- JmsProducerInfo info = new JmsProducerInfo(producerId);
+ JmsProducerInfo info = new JmsProducerInfo(producerId, messageIDBuilder);
copy(info);
return info;
}
@@ -85,6 +89,10 @@ public final class JmsProducerInfo implements JmsResource, Comparable<JmsProduce
this.presettle = presettle;
}
+ public JmsMessageIDBuilder getMessageIDBuilder() {
+ return messageIDBuilder;
+ }
+
@Override
public String toString() {
return "JmsProducerInfo { " + getId() + ", destination = " + getDestination() + " }";
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/95bc1aa8/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsDefaultMessageIDPolicy.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsDefaultMessageIDPolicy.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsDefaultMessageIDPolicy.java
new file mode 100644
index 0000000..2051561
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsDefaultMessageIDPolicy.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.policy;
+
+import org.apache.qpid.jms.JmsDestination;
+import org.apache.qpid.jms.JmsSession;
+import org.apache.qpid.jms.message.JmsMessageIDBuilder;
+
+/**
+ * The default MessageID policy used for all MessageProducers created from the
+ * client's connection factory.
+ */
+public class JmsDefaultMessageIDPolicy implements JmsMessageIDPolicy {
+
+ private JmsMessageIDBuilder messageIDBuilder = JmsMessageIDBuilder.BUILTIN.DEFAULT.createBuilder();
+
+ /**
+ * Initialize default Message ID builder policy
+ */
+ public JmsDefaultMessageIDPolicy() {
+ }
+
+ /**
+ * Creates a new JmsDefaultMessageIDPolicy instance copied from the source policy.
+ *
+ * @param source
+ * The policy instance to copy values from.
+ */
+ public JmsDefaultMessageIDPolicy(JmsDefaultMessageIDPolicy source) {
+ this.messageIDBuilder = source.messageIDBuilder;
+ }
+
+ @Override
+ public JmsDefaultMessageIDPolicy copy() {
+ return new JmsDefaultMessageIDPolicy(this);
+ }
+
+ @Override
+ public JmsMessageIDBuilder getMessageIDBuilder(JmsSession session, JmsDestination destination) {
+ return messageIDBuilder;
+ }
+
+ /**
+ * Sets the type of the Message IDs used to populate the outgoing Messages
+ *
+ * @param type
+ * The name of the Message ID type to use when sending a message.
+ */
+ public void setMessageIDType(String type) {
+ this.messageIDBuilder = JmsMessageIDBuilder.BUILTIN.create(type);
+ }
+
+ /**
+ * @return the type name of the configured JmsMessageIDBuilder.
+ */
+ public String getMessageIDType() {
+ return this.messageIDBuilder.toString();
+ }
+
+ public JmsMessageIDBuilder getMessageIDBuilder() {
+ return messageIDBuilder;
+ }
+
+ public void setMessageIDBuilder(JmsMessageIDBuilder messageIDBuilder) {
+ this.messageIDBuilder = messageIDBuilder;
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/95bc1aa8/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsDefaultPresettlePolicy.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsDefaultPresettlePolicy.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsDefaultPresettlePolicy.java
index 8560750..2dc9f60 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsDefaultPresettlePolicy.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsDefaultPresettlePolicy.java
@@ -51,7 +51,7 @@ public class JmsDefaultPresettlePolicy implements JmsPresettlePolicy {
}
@Override
- public JmsPresettlePolicy copy() {
+ public JmsDefaultPresettlePolicy copy() {
return new JmsDefaultPresettlePolicy(this);
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/95bc1aa8/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsDefaultRedeliveryPolicy.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsDefaultRedeliveryPolicy.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsDefaultRedeliveryPolicy.java
index bf885ea..e9869b3 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsDefaultRedeliveryPolicy.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsDefaultRedeliveryPolicy.java
@@ -36,7 +36,7 @@ public class JmsDefaultRedeliveryPolicy implements JmsRedeliveryPolicy {
}
@Override
- public JmsRedeliveryPolicy copy() {
+ public JmsDefaultRedeliveryPolicy copy() {
return new JmsDefaultRedeliveryPolicy(this);
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/95bc1aa8/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsMessageIDPolicy.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsMessageIDPolicy.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsMessageIDPolicy.java
new file mode 100644
index 0000000..e519fc0
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsMessageIDPolicy.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.policy;
+
+import org.apache.qpid.jms.JmsDestination;
+import org.apache.qpid.jms.JmsSession;
+import org.apache.qpid.jms.message.JmsMessageIDBuilder;
+
+/**
+ * Interface for a policy that controls what kind of MessageID type is used for Messages
+ * sent to a specific destination.
+ */
+public interface JmsMessageIDPolicy {
+
+ /**
+ * Copy this policy into a newly allocated instance.
+ *
+ * @return a new JmsMessageIDPolicy that is a copy of this one.
+ */
+ JmsMessageIDPolicy copy();
+
+ /**
+ * Returns the JmsMessageIDBuilder that should be used with the producer being created.
+ *
+ * @param session
+ * the Session that owns the MessageProducer being created.
+ * @param destination
+ * the Destination that the producer will be sending to.
+ *
+ * @return the JmsMessageIDBuilder instance that is assigned to the new producer.
+ */
+ JmsMessageIDBuilder getMessageIDBuilder(JmsSession session, JmsDestination destination);
+
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/95bc1aa8/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionFactoryTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionFactoryTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionFactoryTest.java
index e5083cf..78e3164 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionFactoryTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionFactoryTest.java
@@ -370,6 +370,34 @@ public class JmsConnectionFactoryTest extends QpidJmsTestCase {
}
@Test
+ public void testSetRemoteURIThrowsOnNullURI() throws Exception {
+ JmsConnectionFactory cf = new JmsConnectionFactory();
+ try {
+ cf.setRemoteURI(null);
+ fail("Should not allow a null URI to be set.");
+ } catch (IllegalArgumentException e) {
+ }
+ }
+
+ @Test
+ public void testCreateWithNullURIRemoteURIThrows() throws Exception {
+ try {
+ new JmsConnectionFactory("user", "pass", (URI) null);
+ fail("Should not allow a null URI to be set.");
+ } catch (NullPointerException e) {
+ }
+ }
+
+ @Test
+ public void testCreateWithNullURIStringRemoteURIThrows() throws Exception {
+ try {
+ new JmsConnectionFactory("user", "pass", (String) null);
+ fail("Should not allow a null URI to be set.");
+ } catch (IllegalArgumentException e) {
+ }
+ }
+
+ @Test
public void testSerializeTwoConnectionFactories() throws Exception {
String uri = "amqp://localhost:1234";
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/95bc1aa8/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionFactoryIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionFactoryIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionFactoryIntegrationTest.java
index 016f188..cccb05e 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionFactoryIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionFactoryIntegrationTest.java
@@ -22,6 +22,7 @@ package org.apache.qpid.jms.integration;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -29,11 +30,23 @@ import java.net.URI;
import java.util.UUID;
import javax.jms.Connection;
+import javax.jms.QueueConnection;
+import javax.jms.TopicConnection;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsConnectionFactory;
+import org.apache.qpid.jms.JmsDestination;
+import org.apache.qpid.jms.JmsSession;
import org.apache.qpid.jms.message.JmsMessageIDBuilder;
import org.apache.qpid.jms.message.JmsMessageIDBuilder.BUILTIN;
+import org.apache.qpid.jms.policy.JmsDefaultMessageIDPolicy;
+import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy;
+import org.apache.qpid.jms.policy.JmsDefaultPresettlePolicy;
+import org.apache.qpid.jms.policy.JmsDefaultRedeliveryPolicy;
+import org.apache.qpid.jms.policy.JmsMessageIDPolicy;
+import org.apache.qpid.jms.policy.JmsPrefetchPolicy;
+import org.apache.qpid.jms.policy.JmsPresettlePolicy;
+import org.apache.qpid.jms.policy.JmsRedeliveryPolicy;
import org.apache.qpid.jms.test.QpidJmsTestCase;
import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
import org.junit.Test;
@@ -44,19 +57,6 @@ public class ConnectionFactoryIntegrationTest extends QpidJmsTestCase {
private static final Logger LOG = LoggerFactory.getLogger(ConnectionFactoryIntegrationTest.class);
- private final class TestJmsMessageIdBuilder implements JmsMessageIDBuilder {
-
- @Override
- public Object createMessageID(String producerId, long messageSequence) {
- return UUID.randomUUID();
- }
-
- @Override
- public String toString() {
- return "TEST";
- }
- }
-
@Test(timeout=20000)
public void testCreateConnectionGoodProviderURI() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
@@ -80,6 +80,28 @@ public class ConnectionFactoryIntegrationTest extends QpidJmsTestCase {
}
@Test(timeout=20000)
+ public void testTopicCreateConnectionGoodProviderString() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ // Don't create a test fixture; we will drive everything directly.
+ JmsConnectionFactory factory = new JmsConnectionFactory("amqp://127.0.0.1:" + testPeer.getServerPort());
+ TopicConnection connection = factory.createTopicConnection();
+ assertNotNull(connection);
+ connection.close();
+ }
+ }
+
+ @Test(timeout=20000)
+ public void testCreateQueueConnectionGoodProviderString() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ // Don't create a test fixture; we will drive everything directly.
+ JmsConnectionFactory factory = new JmsConnectionFactory("amqp://127.0.0.1:" + testPeer.getServerPort());
+ QueueConnection connection = factory.createQueueConnection();
+ assertNotNull(connection);
+ connection.close();
+ }
+ }
+
+ @Test(timeout=20000)
public void testUriOptionsAppliedToConnection() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
// DONT create a test fixture, we will drive everything directly.
@@ -114,7 +136,7 @@ public class ConnectionFactoryIntegrationTest extends QpidJmsTestCase {
public void testSetInvalidMessageIDFormatOption() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
// DONT create a test fixture, we will drive everything directly.
- String uri = "amqp://127.0.0.1:" + testPeer.getServerPort() + "?jms.messageIDType=UNKNOWN";
+ String uri = "amqp://127.0.0.1:" + testPeer.getServerPort() + "?jms.messageIDPolicy.messageIDType=UNKNOWN";
try {
new JmsConnectionFactory(uri);
fail("Should not be able to create a factory with invalid id type option value.");
@@ -124,14 +146,16 @@ public class ConnectionFactoryIntegrationTest extends QpidJmsTestCase {
}
}
+ // TODO - Remove once the deprecated methods are removed.
@Test(timeout=20000)
- public void testSetMessageIDFormatOptionAlteredCase() throws Exception {
+ public void testSetMessageIDFormatOptionAlteredCaseLegacy() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
// DONT create a test fixture, we will drive everything directly.
try {
String uri = "amqp://127.0.0.1:" + testPeer.getServerPort() + "?jms.messageIDType=uuid";
JmsConnectionFactory factory = new JmsConnectionFactory(uri);
- assertEquals(JmsMessageIDBuilder.BUILTIN.UUID.name(), factory.getMessageIDType());
+ JmsDefaultMessageIDPolicy policy = (JmsDefaultMessageIDPolicy) factory.getMessageIDPolicy();
+ assertEquals(JmsMessageIDBuilder.BUILTIN.UUID.name(), policy.getMessageIDType());
} catch (Exception ex) {
fail("Should have succeeded in creating factory");
}
@@ -139,15 +163,17 @@ public class ConnectionFactoryIntegrationTest extends QpidJmsTestCase {
try {
String uri = "amqp://127.0.0.1:" + testPeer.getServerPort() + "?jms.messageIDType=Uuid";
JmsConnectionFactory factory = new JmsConnectionFactory(uri);
- assertEquals(JmsMessageIDBuilder.BUILTIN.UUID.name(), factory.getMessageIDType());
+ JmsDefaultMessageIDPolicy policy = (JmsDefaultMessageIDPolicy) factory.getMessageIDPolicy();
+ assertEquals(JmsMessageIDBuilder.BUILTIN.UUID.name(), policy.getMessageIDType());
} catch (Exception ex) {
fail("Should have succeeded in creating factory");
}
}
}
+ // TODO - Remove once the deprecated methods are removed.
@Test(timeout=20000)
- public void testMessageIDFormatOptionApplied() throws Exception {
+ public void testMessageIDFormatOptionAppliedLegacy() throws Exception {
BUILTIN[] formatters = JmsMessageIDBuilder.BUILTIN.values();
for (BUILTIN formatter : formatters) {
@@ -156,18 +182,63 @@ public class ConnectionFactoryIntegrationTest extends QpidJmsTestCase {
// DONT create a test fixture, we will drive everything directly.
String uri = "amqp://127.0.0.1:" + testPeer.getServerPort() + "?jms.messageIDType=" + formatter.name();
JmsConnectionFactory factory = new JmsConnectionFactory(uri);
- assertEquals(formatter.name(), factory.getMessageIDType());
+ assertEquals(formatter.name(), ((JmsDefaultMessageIDPolicy) factory.getMessageIDPolicy()).getMessageIDType());
JmsConnection connection = (JmsConnection) factory.createConnection();
- assertEquals(formatter.name(), connection.getMessageIDBuilder().toString());
+ assertEquals(formatter.name(), ((JmsDefaultMessageIDPolicy) connection.getMessageIDPolicy()).getMessageIDBuilder().toString());
connection.close();
}
}
}
@Test(timeout=20000)
- public void testSetCustomMessageIDBuilder() throws Exception {
- TestJmsMessageIdBuilder custom = new TestJmsMessageIdBuilder();
+ public void testSetMessageIDFormatOptionAlteredCase() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ // Don't create a test fixture; we will drive everything directly.
+ try {
+ String uri = "amqp://127.0.0.1:" + testPeer.getServerPort() + "?jms.messageIDPolicy.messageIDType=uuid";
+ JmsConnectionFactory factory = new JmsConnectionFactory(uri);
+ JmsDefaultMessageIDPolicy policy = (JmsDefaultMessageIDPolicy) factory.getMessageIDPolicy();
+ assertEquals(JmsMessageIDBuilder.BUILTIN.UUID.name(), policy.getMessageIDType());
+ } catch (Exception ex) {
+ fail("Should have succeeded in creating factory");
+ }
+
+ try {
+ String uri = "amqp://127.0.0.1:" + testPeer.getServerPort() + "?jms.messageIDPolicy.messageIDType=Uuid";
+ JmsConnectionFactory factory = new JmsConnectionFactory(uri);
+ JmsDefaultMessageIDPolicy policy = (JmsDefaultMessageIDPolicy) factory.getMessageIDPolicy();
+ assertEquals(JmsMessageIDBuilder.BUILTIN.UUID.name(), policy.getMessageIDType());
+ } catch (Exception ex) {
+ fail("Should have succeeded in creating factory");
+ }
+ }
+ }
+
+ @Test(timeout=20000)
+ public void testMessageIDFormatOptionApplied() throws Exception {
+ BUILTIN[] formatters = JmsMessageIDBuilder.BUILTIN.values();
+
+ for (BUILTIN formatter : formatters) {
+ LOG.info("Testing application of Message ID Format: {}", formatter.name());
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ // Don't create a test fixture; we will drive everything directly.
+ String uri = "amqp://127.0.0.1:" + testPeer.getServerPort() + "?jms.messageIDPolicy.messageIDType=" + formatter.name();
+ JmsConnectionFactory factory = new JmsConnectionFactory(uri);
+ assertEquals(formatter.name(), ((JmsDefaultMessageIDPolicy) factory.getMessageIDPolicy()).getMessageIDType());
+
+ JmsConnection connection = (JmsConnection) factory.createConnection();
+ assertEquals(formatter.name(), ((JmsDefaultMessageIDPolicy) connection.getMessageIDPolicy()).getMessageIDBuilder().toString());
+ connection.close();
+ }
+ }
+ }
+
+ // TODO - Remove once the deprecated methods are removed.
+ @SuppressWarnings("deprecation")
+ @Test(timeout=20000)
+ public void testSetCustomMessageIDBuilderLegacy() throws Exception {
+ CustomJmsMessageIdBuilder custom = new CustomJmsMessageIdBuilder();
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
// DONT create a test fixture, we will drive everything directly.
@@ -178,8 +249,226 @@ public class ConnectionFactoryIntegrationTest extends QpidJmsTestCase {
assertEquals(custom.toString(), factory.getMessageIDType());
JmsConnection connection = (JmsConnection) factory.createConnection();
- assertEquals(custom.toString(), connection.getMessageIDBuilder().toString());
+ assertEquals(custom.toString(), ((JmsDefaultMessageIDPolicy) connection.getMessageIDPolicy()).getMessageIDBuilder().toString());
connection.close();
}
}
+
+ @Test(timeout=20000)
+ public void testSetCustomMessageIDBuilder() throws Exception {
+ CustomJmsMessageIdBuilder custom = new CustomJmsMessageIdBuilder();
+
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ // Don't create a test fixture; we will drive everything directly.
+ String uri = "amqp://127.0.0.1:" + testPeer.getServerPort();
+
+ JmsConnectionFactory factory = new JmsConnectionFactory(uri);
+ ((JmsDefaultMessageIDPolicy) factory.getMessageIDPolicy()).setMessageIDBuilder(custom);
+ assertEquals(custom.toString(), ((JmsDefaultMessageIDPolicy) factory.getMessageIDPolicy()).getMessageIDType());
+
+ JmsConnection connection = (JmsConnection) factory.createConnection();
+ assertEquals(custom.toString(), ((JmsDefaultMessageIDPolicy) connection.getMessageIDPolicy()).getMessageIDBuilder().toString());
+ connection.close();
+ }
+ }
+
+ @Test(timeout=20000)
+ public void testSetCustomMessageIDPolicy() throws Exception {
+ CustomJmsMessageIDPolicy custom = new CustomJmsMessageIDPolicy();
+
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ String uri = "amqp://127.0.0.1:" + testPeer.getServerPort();
+
+ JmsConnectionFactory factory = new JmsConnectionFactory(uri);
+ factory.setMessageIDPolicy(custom);
+ assertEquals(custom, factory.getMessageIDPolicy());
+
+ JmsConnection connection = (JmsConnection) factory.createConnection();
+ assertTrue(connection.getMessageIDPolicy() instanceof CustomJmsMessageIDPolicy);
+ assertNotSame(custom, connection.getMessageIDPolicy());
+ connection.close();
+ }
+ }
+
+ @Test(timeout=20000)
+ public void testSetCustomPrefetchPolicy() throws Exception {
+ CustomJmsPrefetchPolicy custom = new CustomJmsPrefetchPolicy();
+
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ String uri = "amqp://127.0.0.1:" + testPeer.getServerPort();
+
+ JmsConnectionFactory factory = new JmsConnectionFactory(uri);
+ factory.setPrefetchPolicy(custom);
+ assertEquals(custom, factory.getPrefetchPolicy());
+
+ JmsConnection connection = (JmsConnection) factory.createConnection();
+ assertTrue(connection.getPrefetchPolicy() instanceof CustomJmsPrefetchPolicy);
+ assertNotSame(custom, connection.getPrefetchPolicy());
+ connection.close();
+ }
+ }
+
+ @Test(timeout=20000)
+ public void testSetCustomPresettlePolicy() throws Exception {
+ CustomJmsPresettlePolicy custom = new CustomJmsPresettlePolicy();
+
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ String uri = "amqp://127.0.0.1:" + testPeer.getServerPort();
+
+ JmsConnectionFactory factory = new JmsConnectionFactory(uri);
+ factory.setPresettlePolicy(custom);
+ assertEquals(custom, factory.getPresettlePolicy());
+
+ JmsConnection connection = (JmsConnection) factory.createConnection();
+ assertTrue(connection.getPresettlePolicy() instanceof CustomJmsPresettlePolicy);
+ assertNotSame(custom, connection.getPresettlePolicy());
+ connection.close();
+ }
+ }
+
+ @Test(timeout=20000)
+ public void testSetCustomRedeliveryPolicy() throws Exception {
+ CustomJmsRedeliveryPolicy custom = new CustomJmsRedeliveryPolicy();
+
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ String uri = "amqp://127.0.0.1:" + testPeer.getServerPort();
+
+ JmsConnectionFactory factory = new JmsConnectionFactory(uri);
+ factory.setRedeliveryPolicy(custom);
+ assertEquals(custom, factory.getRedeliveryPolicy());
+
+ JmsConnection connection = (JmsConnection) factory.createConnection();
+ assertTrue(connection.getRedeliveryPolicy() instanceof CustomJmsRedeliveryPolicy);
+ assertNotSame(custom, connection.getRedeliveryPolicy());
+ connection.close();
+ }
+ }
+
+ @Test(timeout=10000)
+ public void testMessageIDPolicyCannotBeNulled() throws Exception {
+ CustomJmsMessageIDPolicy custom = new CustomJmsMessageIDPolicy();
+
+ JmsConnectionFactory factory = new JmsConnectionFactory();
+ assertTrue(factory.getMessageIDPolicy() instanceof JmsDefaultMessageIDPolicy);
+
+ factory.setMessageIDPolicy(custom);
+ assertTrue(factory.getMessageIDPolicy() instanceof CustomJmsMessageIDPolicy);
+
+ factory.setMessageIDPolicy(null);
+ assertTrue(factory.getMessageIDPolicy() instanceof JmsDefaultMessageIDPolicy);
+ }
+
+ @Test(timeout=10000)
+ public void testPrefetchPolicyCannotBeNulled() throws Exception {
+ CustomJmsPrefetchPolicy custom = new CustomJmsPrefetchPolicy();
+
+ JmsConnectionFactory factory = new JmsConnectionFactory();
+ assertTrue(factory.getPrefetchPolicy() instanceof JmsDefaultPrefetchPolicy);
+
+ factory.setPrefetchPolicy(custom);
+ assertTrue(factory.getPrefetchPolicy() instanceof CustomJmsPrefetchPolicy);
+
+ factory.setPrefetchPolicy(null);
+ assertTrue(factory.getPrefetchPolicy() instanceof JmsDefaultPrefetchPolicy);
+ }
+
+ @Test(timeout=10000)
+ public void testPresettlePolicyCannotBeNulled() throws Exception {
+ CustomJmsPresettlePolicy custom = new CustomJmsPresettlePolicy();
+
+ JmsConnectionFactory factory = new JmsConnectionFactory();
+ assertTrue(factory.getPresettlePolicy() instanceof JmsDefaultPresettlePolicy);
+
+ factory.setPresettlePolicy(custom);
+ assertTrue(factory.getPresettlePolicy() instanceof CustomJmsPresettlePolicy);
+
+ factory.setPresettlePolicy(null);
+ assertTrue(factory.getPresettlePolicy() instanceof JmsDefaultPresettlePolicy);
+ }
+
+ @Test(timeout=10000)
+ public void testRedeliveryPolicyCannotBeNulled() throws Exception {
+ CustomJmsRedeliveryPolicy custom = new CustomJmsRedeliveryPolicy();
+
+ JmsConnectionFactory factory = new JmsConnectionFactory();
+ assertTrue(factory.getRedeliveryPolicy() instanceof JmsDefaultRedeliveryPolicy);
+
+ factory.setRedeliveryPolicy(custom);
+ assertTrue(factory.getRedeliveryPolicy() instanceof CustomJmsRedeliveryPolicy);
+
+ factory.setRedeliveryPolicy(null);
+ assertTrue(factory.getRedeliveryPolicy() instanceof JmsDefaultRedeliveryPolicy);
+ }
+
+ //----- Custom Policy Objects --------------------------------------------//
+
+ private final class CustomJmsMessageIdBuilder implements JmsMessageIDBuilder {
+
+ @Override
+ public Object createMessageID(String producerId, long messageSequence) {
+ return UUID.randomUUID();
+ }
+
+ @Override
+ public String toString() {
+ return "TEST";
+ }
+ }
+
+ private class CustomJmsMessageIDPolicy implements JmsMessageIDPolicy {
+
+ @Override
+ public JmsMessageIDPolicy copy() {
+ return new CustomJmsMessageIDPolicy();
+ }
+
+ @Override
+ public JmsMessageIDBuilder getMessageIDBuilder(JmsSession session, JmsDestination destination) {
+ return JmsMessageIDBuilder.BUILTIN.UUID_STRING.createBuilder();
+ }
+ }
+
+ private class CustomJmsPrefetchPolicy implements JmsPrefetchPolicy {
+
+ @Override
+ public JmsPrefetchPolicy copy() {
+ return new CustomJmsPrefetchPolicy();
+ }
+
+ @Override
+ public int getConfiguredPrefetch(JmsSession session, JmsDestination destination, boolean durable, boolean browser) {
+ return JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_PREFETCH;
+ }
+ }
+
+ private class CustomJmsPresettlePolicy implements JmsPresettlePolicy {
+
+ @Override
+ public JmsPresettlePolicy copy() {
+ return new CustomJmsPresettlePolicy();
+ }
+
+ @Override
+ public boolean isProducerPresttled(JmsSession session, JmsDestination destination) {
+ return false;
+ }
+
+ @Override
+ public boolean isConsumerPresttled(JmsSession session, JmsDestination destination) {
+ return false;
+ }
+ }
+
+ private class CustomJmsRedeliveryPolicy implements JmsRedeliveryPolicy {
+
+ @Override
+ public JmsRedeliveryPolicy copy() {
+ return new CustomJmsRedeliveryPolicy();
+ }
+
+ @Override
+ public int getMaxRedeliveries(JmsDestination destination) {
+ return JmsDefaultRedeliveryPolicy.DEFAULT_MAX_REDELIVERIES;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/95bc1aa8/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
index 485e5d2..8a03b87 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
@@ -618,8 +618,9 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
}
}
+ // TODO - Remove when the deprecated methods are removed.
@Test(timeout=20000)
- public void testSendingMessageWithUUIDStringMessageFormat() throws Exception {
+ public void testSendingMessageWithUUIDStringMessageFormatLegacy() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
// DONT create a test fixture, we will drive everything directly.
String uri = "amqp://127.0.0.1:" + testPeer.getServerPort() + "?jms.messageIDType=UUID_STRING";
@@ -672,7 +673,61 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
}
@Test(timeout=20000)
- public void testSendingMessageWithUUIDMessageFormat() throws Exception {
+ public void testSendingMessageWithUUIDStringMessageFormat() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ // Don't create a test fixture; we will drive everything directly.
+ String uri = "amqp://127.0.0.1:" + testPeer.getServerPort() + "?jms.messageIDPolicy.messageIDType=UUID_STRING";
+ JmsConnectionFactory factory = new JmsConnectionFactory(uri);
+
+ Connection connection = factory.createConnection();
+ testPeer.expectSaslAnonymousConnect();
+ testPeer.expectBegin();
+
+ testPeer.expectBegin();
+ testPeer.expectSenderAttach();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ String queueName = "myQueue";
+ Queue queue = session.createQueue(queueName);
+ MessageProducer producer = session.createProducer(queue);
+
+ String text = "myMessage";
+ MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
+ MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
+ MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true).withMessageId(isA(String.class));
+ TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+ messageMatcher.setHeadersMatcher(headersMatcher);
+ messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
+ messageMatcher.setPropertiesMatcher(propsMatcher);
+ messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text));
+ testPeer.expectTransfer(messageMatcher);
+ testPeer.expectClose();
+
+ Message message = session.createTextMessage(text);
+
+ assertNull("JMSMessageID should not yet be set", message.getJMSMessageID());
+
+ producer.send(message);
+
+ String jmsMessageID = message.getJMSMessageID();
+ assertNotNull("JMSMessageID should be set", jmsMessageID);
+ assertTrue("JMS 'ID:' prefix not found", jmsMessageID.startsWith("ID:"));
+
+ connection.close();
+
+ // Get the value that was actually transmitted/received, verify it is a String, compare to what we have locally
+ testPeer.waitForAllHandlersToComplete(1000);
+
+ Object receivedMessageId = propsMatcher.getReceivedMessageId();
+
+ assertTrue("Expected UUID message id to be sent", receivedMessageId instanceof String);
+ assertTrue("Expected JMSMessageId value to be present in AMQP message", jmsMessageID.endsWith(receivedMessageId.toString()));
+ }
+ }
+
+ // TODO - Remove when the deprecated methods are removed.
+ @Test(timeout=20000)
+ public void testSendingMessageWithUUIDMessageFormatLegacy() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
// DONT create a test fixture, we will drive everything directly.
String uri = "amqp://127.0.0.1:" + testPeer.getServerPort() + "?jms.messageIDType=UUID";
@@ -724,6 +779,59 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
}
}
+ @Test(timeout=20000)
+ public void testSendingMessageWithUUIDMessageFormat() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ // Don't create a test fixture; we will drive everything directly.
+ String uri = "amqp://127.0.0.1:" + testPeer.getServerPort() + "?jms.messageIDPolicy.messageIDType=UUID";
+ JmsConnectionFactory factory = new JmsConnectionFactory(uri);
+
+ Connection connection = factory.createConnection();
+ testPeer.expectSaslAnonymousConnect();
+ testPeer.expectBegin();
+
+ testPeer.expectBegin();
+ testPeer.expectSenderAttach();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ String queueName = "myQueue";
+ Queue queue = session.createQueue(queueName);
+ MessageProducer producer = session.createProducer(queue);
+
+ String text = "myMessage";
+ MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
+ MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
+ MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true).withMessageId(isA(UUID.class));
+ TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+ messageMatcher.setHeadersMatcher(headersMatcher);
+ messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
+ messageMatcher.setPropertiesMatcher(propsMatcher);
+ messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text));
+ testPeer.expectTransfer(messageMatcher);
+ testPeer.expectClose();
+
+ Message message = session.createTextMessage(text);
+
+ assertNull("JMSMessageID should not yet be set", message.getJMSMessageID());
+
+ producer.send(message);
+
+ String jmsMessageID = message.getJMSMessageID();
+ assertNotNull("JMSMessageID should be set", jmsMessageID);
+ assertTrue("JMS 'ID:' prefix not found", jmsMessageID.startsWith("ID:"));
+
+ connection.close();
+
+ // Get the value that was actually transmitted/received, verify it is a UUID, compare to what we have locally
+ testPeer.waitForAllHandlersToComplete(1000);
+
+ Object receivedMessageId = propsMatcher.getReceivedMessageId();
+
+ assertTrue("Expected UUID message id to be sent", receivedMessageId instanceof UUID);
+ assertTrue("Expected JMSMessageId value to be present in AMQP message", jmsMessageID.endsWith(receivedMessageId.toString()));
+ }
+ }
+
/**
* Test that after sending a message with the disableMessageID hint set, the message
* object has a null JMSMessageID value, and no message-id field value was set.
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/95bc1aa8/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsProducerInfoTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsProducerInfoTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsProducerInfoTest.java
index e83e04b..f2371d0 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsProducerInfoTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsProducerInfoTest.java
@@ -25,13 +25,11 @@ import static org.junit.Assert.assertTrue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.qpid.jms.JmsTopic;
+import org.apache.qpid.jms.message.JmsMessageIDBuilder;
import org.apache.qpid.jms.util.IdGenerator;
import org.junit.Before;
import org.junit.Test;
-/**
- *
- */
public class JmsProducerInfoTest {
private JmsProducerId firstId;
@@ -53,33 +51,21 @@ public class JmsProducerInfoTest {
secondId = new JmsProducerId(secondSessionId, 2);
}
- @Test(expected=IllegalArgumentException.class)
- public void testExceptionWhenCreatedWithNullConnectionId() {
- new JmsProducerInfo(null);
- }
-
- @Test(expected=IllegalArgumentException.class)
- public void testExceptionWhenCreatedWithNullSessionInfo() {
- new JmsProducerInfo(null, 1);
+ private JmsProducerInfo createPorducerInfo(JmsProducerId producerId) {
+ return new JmsProducerInfo(producerId, JmsMessageIDBuilder.BUILTIN.DEFAULT.createBuilder());
}
@Test
public void testCreateFromProducerId() {
- JmsProducerInfo info = new JmsProducerInfo(firstId);
+ JmsProducerInfo info = createPorducerInfo(firstId);
assertSame(firstId, info.getId());
assertSame(firstId.getParentId(), info.getParentId());
assertNotNull(info.toString());
}
@Test
- public void testCreateFromSessionId() {
- JmsProducerInfo info = new JmsProducerInfo(new JmsSessionInfo(firstSessionId), 1);
- assertNotNull(info.toString());
- }
-
- @Test
public void testCopy() {
- JmsProducerInfo info = new JmsProducerInfo(firstId);
+ JmsProducerInfo info = createPorducerInfo(firstId);
info.setDestination(new JmsTopic("Test"));
JmsProducerInfo copy = info.copy();
@@ -90,8 +76,8 @@ public class JmsProducerInfoTest {
@Test
public void testCompareTo() {
- JmsProducerInfo first = new JmsProducerInfo(firstId);
- JmsProducerInfo second = new JmsProducerInfo(secondId);
+ JmsProducerInfo first = createPorducerInfo(firstId);
+ JmsProducerInfo second = createPorducerInfo(secondId);
assertEquals(-1, first.compareTo(second));
assertEquals(0, first.compareTo(first));
@@ -100,8 +86,8 @@ public class JmsProducerInfoTest {
@Test
public void testHashCode() {
- JmsProducerInfo first = new JmsProducerInfo(firstId);
- JmsProducerInfo second = new JmsProducerInfo(secondId);
+ JmsProducerInfo first = createPorducerInfo(firstId);
+ JmsProducerInfo second = createPorducerInfo(secondId);
assertEquals(first.hashCode(), first.hashCode());
assertEquals(second.hashCode(), second.hashCode());
@@ -111,8 +97,8 @@ public class JmsProducerInfoTest {
@Test
public void testEqualsCode() {
- JmsProducerInfo first = new JmsProducerInfo(firstId);
- JmsProducerInfo second = new JmsProducerInfo(secondId);
+ JmsProducerInfo first = createPorducerInfo(firstId);
+ JmsProducerInfo second = createPorducerInfo(secondId);
assertEquals(first, first);
assertEquals(second, second);
@@ -126,8 +112,7 @@ public class JmsProducerInfoTest {
@Test
public void testVisit() throws Exception {
- final JmsProducerInfo first = new JmsProducerInfo(firstId);
-
+ final JmsProducerInfo first = createPorducerInfo(firstId);
final AtomicBoolean visited = new AtomicBoolean();
first.visit(new JmsDefaultResourceVisitor() {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org