You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ma...@apache.org on 2018/08/16 19:57:35 UTC
[2/2] activemq-artemis git commit: ARTEMIS-2023 extend 1x naming to other ops
ARTEMIS-2023 extend 1x naming to other ops
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/a0b4c4dd
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/a0b4c4dd
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/a0b4c4dd
Branch: refs/heads/master
Commit: a0b4c4dd1923497a03255d4a27d0ed9b7948931e
Parents: dd15aa8
Author: Justin Bertram <jb...@apache.org>
Authored: Mon Aug 13 21:40:52 2018 -0500
Committer: Martyn Taylor <mt...@redhat.com>
Committed: Thu Aug 16 20:57:11 2018 +0100
----------------------------------------------------------------------
.../artemis/jms/client/ActiveMQDestination.java | 61 +++++++++++++++-----
.../artemis/jms/client/ActiveMQMessage.java | 54 +++++++++++++++--
.../jms/client/ActiveMQMessageConsumer.java | 4 ++
.../artemis/jms/client/ActiveMQQueue.java | 2 +-
.../jms/client/ActiveMQQueueBrowser.java | 11 +++-
.../artemis/jms/client/ActiveMQSession.java | 6 +-
.../jms/client/JMSMessageListenerWrapper.java | 4 ++
.../artemis/ra/ActiveMQResourceAdapter.java | 2 +
.../artemis/ra/inflow/ActiveMQActivation.java | 1 +
.../integration/jms/SimpleJNDIClientTest.java | 1 -
10 files changed, 121 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a0b4c4dd/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java
index 81aada1..6d1b409 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java
@@ -53,6 +53,10 @@ public class ActiveMQDestination extends JNDIStorable implements Destination, Se
return input.replace("\\", "\\\\").replace(".", "\\.");
}
+ protected void setName(String name) {
+ this.name = name;
+ }
+
/**
* Static helper method for working with destinations.
*/
@@ -84,21 +88,43 @@ public class ActiveMQDestination extends JNDIStorable implements Destination, Se
}
public static Destination fromPrefixedName(final String name) {
+ return fromPrefixedName(name, name);
+ }
+
+ public static Destination fromPrefixedName(final String addr, final String name) {
+
+ ActiveMQDestination destination;
+ if (addr.startsWith(ActiveMQDestination.QUEUE_QUALIFIED_PREFIX)) {
+ String address = addr.substring(ActiveMQDestination.QUEUE_QUALIFIED_PREFIX.length());
+ destination = createQueue(address);
+ } else if (addr.startsWith(ActiveMQDestination.TOPIC_QUALIFIED_PREFIX)) {
+ String address = addr.substring(ActiveMQDestination.TOPIC_QUALIFIED_PREFIX.length());
+ destination = createTopic(address);
+ } else if (addr.startsWith(ActiveMQDestination.TEMP_QUEUE_QUALIFED_PREFIX)) {
+ String address = addr.substring(ActiveMQDestination.TEMP_QUEUE_QUALIFED_PREFIX.length());
+ destination = new ActiveMQTemporaryQueue(address, null);
+ } else if (addr.startsWith(ActiveMQDestination.TEMP_TOPIC_QUALIFED_PREFIX)) {
+ String address = addr.substring(ActiveMQDestination.TEMP_TOPIC_QUALIFED_PREFIX.length());
+ destination = new ActiveMQTemporaryTopic(address, null);
+ } else {
+ destination = new ActiveMQDestination(addr, TYPE.DESTINATION, null);
+ }
+
+ String unprefixedName = name;
+
if (name.startsWith(ActiveMQDestination.QUEUE_QUALIFIED_PREFIX)) {
- String address = name.substring(ActiveMQDestination.QUEUE_QUALIFIED_PREFIX.length());
- return createQueue(address);
+ unprefixedName = name.substring(ActiveMQDestination.QUEUE_QUALIFIED_PREFIX.length());
} else if (name.startsWith(ActiveMQDestination.TOPIC_QUALIFIED_PREFIX)) {
- String address = name.substring(ActiveMQDestination.TOPIC_QUALIFIED_PREFIX.length());
- return createTopic(address);
+ unprefixedName = name.substring(ActiveMQDestination.TOPIC_QUALIFIED_PREFIX.length());
} else if (name.startsWith(ActiveMQDestination.TEMP_QUEUE_QUALIFED_PREFIX)) {
- String address = name.substring(ActiveMQDestination.TEMP_QUEUE_QUALIFED_PREFIX.length());
- return new ActiveMQTemporaryQueue(address, null);
+ unprefixedName = name.substring(ActiveMQDestination.TEMP_QUEUE_QUALIFED_PREFIX.length());
} else if (name.startsWith(ActiveMQDestination.TEMP_TOPIC_QUALIFED_PREFIX)) {
- String address = name.substring(ActiveMQDestination.TEMP_TOPIC_QUALIFED_PREFIX.length());
- return new ActiveMQTemporaryTopic(address, null);
- } else {
- return new ActiveMQDestination(name, TYPE.DESTINATION, null);
+ unprefixedName = name.substring(ActiveMQDestination.TEMP_TOPIC_QUALIFED_PREFIX.length());
}
+
+ destination.setName(unprefixedName);
+
+ return destination;
}
public static SimpleString createQueueNameForSubscription(final boolean isDurable,
@@ -274,10 +300,6 @@ public class ActiveMQDestination extends JNDIStorable implements Destination, Se
@Deprecated
private String address;
- /**
- * The "JMS" name of the destination. Needed for serialization backwards compatibility.
- */
- @Deprecated
private String name;
private final boolean temporary;
@@ -313,7 +335,6 @@ public class ActiveMQDestination extends JNDIStorable implements Destination, Se
this.queue = TYPE.isQueue(type);
}
- @Deprecated
protected ActiveMQDestination(final String address,
final String name,
final TYPE type,
@@ -337,6 +358,16 @@ public class ActiveMQDestination extends JNDIStorable implements Destination, Se
setSimpleAddress(SimpleString.toSimpleString(address));
}
+ @Override
+ public String toString() {
+ return "ActiveMQDestination [address=" + simpleAddress.toString() +
+ ", name=" +
+ name +
+ ", type =" +
+ thetype +
+ "]";
+ }
+
public void setSimpleAddress(SimpleString address) {
if (address == null) {
throw new IllegalArgumentException("address cannot be null");
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a0b4c4dd/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
index 18f8000..ff7da00 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
@@ -46,6 +46,7 @@ import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants;
import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
import org.apache.activemq.artemis.core.client.impl.ClientMessageInternal;
import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.utils.UUID;
@@ -65,6 +66,11 @@ public class ActiveMQMessage implements javax.jms.Message {
// Constants -----------------------------------------------------
public static final byte TYPE = org.apache.activemq.artemis.api.core.Message.DEFAULT_TYPE;
+ public static final SimpleString OLD_QUEUE_QUALIFIED_PREFIX = SimpleString.toSimpleString(ActiveMQDestination.QUEUE_QUALIFIED_PREFIX + PacketImpl.OLD_QUEUE_PREFIX);
+ public static final SimpleString OLD_TEMP_QUEUE_QUALIFED_PREFIX = SimpleString.toSimpleString(ActiveMQDestination.TEMP_QUEUE_QUALIFED_PREFIX + PacketImpl.OLD_TEMP_QUEUE_PREFIX);
+ public static final SimpleString OLD_TOPIC_QUALIFIED_PREFIX = SimpleString.toSimpleString(ActiveMQDestination.TOPIC_QUALIFIED_PREFIX + PacketImpl.OLD_TOPIC_PREFIX);
+ public static final SimpleString OLD_TEMP_TOPIC_QUALIFED_PREFIX = SimpleString.toSimpleString(ActiveMQDestination.TEMP_TOPIC_QUALIFED_PREFIX + PacketImpl.OLD_TEMP_TOPIC_PREFIX);
+
public static Map<String, Object> coreMaptoJMSMap(final Map<String, Object> coreMessage) {
Map<String, Object> jmsMessage = new HashMap<>();
@@ -203,6 +209,8 @@ public class ActiveMQMessage implements javax.jms.Message {
private boolean clientAck;
+ private boolean enable1xPrefixes;
+
private long jmsDeliveryTime;
// Constructors --------------------------------------------------
@@ -358,11 +366,23 @@ public class ActiveMQMessage implements javax.jms.Message {
@Override
public Destination getJMSReplyTo() throws JMSException {
if (replyTo == null) {
-
- SimpleString repl = MessageUtil.getJMSReplyTo(message);
-
- if (repl != null) {
- replyTo = ActiveMQDestination.fromPrefixedName(repl.toString());
+ SimpleString address = MessageUtil.getJMSReplyTo(message);
+ if (address != null) {
+ String name = address.toString();
+
+ // swap the old prefixes for the new ones so the proper destination type gets created
+ if (enable1xPrefixes) {
+ if (address.startsWith(OLD_QUEUE_QUALIFIED_PREFIX)) {
+ name = address.subSeq(OLD_QUEUE_QUALIFIED_PREFIX.length(), address.length()).toString();
+ } else if (address.startsWith(OLD_TEMP_QUEUE_QUALIFED_PREFIX)) {
+ name = address.subSeq(OLD_TEMP_QUEUE_QUALIFED_PREFIX.length(), address.length()).toString();
+ } else if (address.startsWith(OLD_TOPIC_QUALIFIED_PREFIX)) {
+ name = address.subSeq(OLD_TOPIC_QUALIFIED_PREFIX.length(), address.length()).toString();
+ } else if (address.startsWith(OLD_TEMP_TOPIC_QUALIFED_PREFIX)) {
+ name = address.subSeq(OLD_TEMP_TOPIC_QUALIFED_PREFIX.length(), address.length()).toString();
+ }
+ }
+ replyTo = ActiveMQDestination.fromPrefixedName(address.toString(), name);
}
}
return replyTo;
@@ -401,6 +421,20 @@ public class ActiveMQMessage implements javax.jms.Message {
public Destination getJMSDestination() throws JMSException {
if (dest == null) {
SimpleString address = message.getAddressSimpleString();
+ SimpleString name = address;
+
+ if (address != null & enable1xPrefixes) {
+ if (address.startsWith(PacketImpl.OLD_QUEUE_PREFIX)) {
+ name = address.subSeq(PacketImpl.OLD_QUEUE_PREFIX.length(), address.length());
+ } else if (address.startsWith(PacketImpl.OLD_TEMP_QUEUE_PREFIX)) {
+ name = address.subSeq(PacketImpl.OLD_TEMP_QUEUE_PREFIX.length(), address.length());
+ } else if (address.startsWith(PacketImpl.OLD_TOPIC_PREFIX)) {
+ name = address.subSeq(PacketImpl.OLD_TOPIC_PREFIX.length(), address.length());
+ } else if (address.startsWith(PacketImpl.OLD_TEMP_TOPIC_PREFIX)) {
+ name = address.subSeq(PacketImpl.OLD_TEMP_TOPIC_PREFIX.length(), address.length());
+ }
+ }
+
if (address == null) {
dest = null;
} else if (RoutingType.ANYCAST.equals(message.getRoutingType())) {
@@ -408,7 +442,11 @@ public class ActiveMQMessage implements javax.jms.Message {
} else if (RoutingType.MULTICAST.equals(message.getRoutingType())) {
dest = ActiveMQDestination.createTopic(address);
} else {
- dest = ActiveMQDestination.fromPrefixedName(address.toString());
+ dest = (ActiveMQDestination) ActiveMQDestination.fromPrefixedName(address.toString());
+ }
+
+ if (name != null) {
+ ((ActiveMQDestination) dest).setName(name.toString());
}
}
@@ -865,6 +903,10 @@ public class ActiveMQMessage implements javax.jms.Message {
}
}
+ public void setEnable1xPrefixes(boolean enable1xPrefixes) {
+ this.enable1xPrefixes = enable1xPrefixes;
+ }
+
@Override
public String toString() {
StringBuffer sb = new StringBuffer("ActiveMQMessage[");
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a0b4c4dd/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java
index 4664bb9..8fabe8b 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java
@@ -220,6 +220,10 @@ public final class ActiveMQMessageConsumer implements QueueReceiver, TopicSubscr
coreMessage.getType() == ActiveMQObjectMessage.TYPE;
jmsMsg = ActiveMQMessage.createMessage(coreMessage, needSession ? coreSession : null, options);
+ if (session.isEnable1xPrefixes()) {
+ jmsMsg.setEnable1xPrefixes(true);
+ }
+
try {
jmsMsg.doBeforeReceive();
} catch (IndexOutOfBoundsException ioob) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a0b4c4dd/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQQueue.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQQueue.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQQueue.java
index f2680f8..f1b69d7 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQQueue.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQQueue.java
@@ -72,7 +72,7 @@ public class ActiveMQQueue extends ActiveMQDestination implements Queue {
@Override
public String getQueueName() {
- return getAddress();
+ return getName();
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a0b4c4dd/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQQueueBrowser.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQQueueBrowser.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQQueueBrowser.java
index bb58f3d..716d044 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQQueueBrowser.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQQueueBrowser.java
@@ -49,18 +49,22 @@ public final class ActiveMQQueueBrowser implements QueueBrowser {
private SimpleString filterString;
+ private final boolean enable1xPrefixes;
+
// Constructors ---------------------------------------------------------------------------------
protected ActiveMQQueueBrowser(final ConnectionFactoryOptions options,
final ActiveMQQueue queue,
final String messageSelector,
- final ClientSession session) throws JMSException {
+ final ClientSession session,
+ final boolean enable1xPrefixes) throws JMSException {
this.options = options;
this.session = session;
this.queue = queue;
if (messageSelector != null) {
filterString = new SimpleString(SelectorTranslator.convertToActiveMQFilterString(messageSelector));
}
+ this.enable1xPrefixes = enable1xPrefixes;
}
// QueueBrowser implementation -------------------------------------------------------------------
@@ -138,6 +142,11 @@ public final class ActiveMQQueueBrowser implements QueueBrowser {
ClientMessage next = current;
current = null;
msg = ActiveMQMessage.createMessage(next, session, options);
+
+ if (enable1xPrefixes) {
+ msg.setEnable1xPrefixes(true);
+ }
+
try {
msg.doBeforeReceive();
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a0b4c4dd/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java
index 8d34ca1..278b429 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java
@@ -879,7 +879,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
throw JMSExceptionHelper.convertFromActiveMQException(e);
}
- return new ActiveMQQueueBrowser(options, (ActiveMQQueue) activeMQDestination, filterString, session);
+ return new ActiveMQQueueBrowser(options, (ActiveMQQueue) activeMQDestination, filterString, session, enable1xPrefixes);
}
@@ -1126,6 +1126,10 @@ public class ActiveMQSession implements QueueSession, TopicSession {
consumers.remove(consumer);
}
+ public boolean isEnable1xPrefixes() {
+ return enable1xPrefixes;
+ }
+
// Package protected ---------------------------------------------
void deleteQueue(final SimpleString queueName) throws JMSException {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a0b4c4dd/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSMessageListenerWrapper.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSMessageListenerWrapper.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSMessageListenerWrapper.java
index 5d9f6ed..0d2420b 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSMessageListenerWrapper.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSMessageListenerWrapper.java
@@ -82,6 +82,10 @@ public class JMSMessageListenerWrapper implements MessageHandler {
msg.setClientAcknowledge();
}
+ if (session.isEnable1xPrefixes()) {
+ msg.setEnable1xPrefixes(true);
+ }
+
try {
msg.doBeforeReceive();
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a0b4c4dd/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java
----------------------------------------------------------------------
diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java
index f3e4614..0b4cbb4 100644
--- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java
+++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java
@@ -1793,6 +1793,7 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable {
}
cf.setEnableSharedClientID(true);
+ cf.setEnable1xPrefixes(raProperties.isEnable1xPrefixes() == null ? false : raProperties.isEnable1xPrefixes());
setParams(cf, overrideProperties);
return cf;
}
@@ -1859,6 +1860,7 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable {
cf.setReconnectAttempts(0);
cf.setInitialConnectAttempts(0);
+ cf.setEnable1xPrefixes(raProperties.isEnable1xPrefixes() == null ? false : raProperties.isEnable1xPrefixes());
cf.setEnableSharedClientID(true);
return cf;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a0b4c4dd/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java
----------------------------------------------------------------------
diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java
index 9d4b096..863a252 100644
--- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java
+++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java
@@ -455,6 +455,7 @@ public class ActiveMQActivation {
// to make sure we won't close anyone's connection factory when we stop the MDB
factory = ActiveMQJMSClient.createConnectionFactory(((ActiveMQConnectionFactory) fac).toURI().toString(), "internalConnection");
factory.setEnableSharedClientID(true);
+ factory.setEnable1xPrefixes(((ActiveMQResourceAdapter) fac).isEnable1xPrefixes());
} else {
factory = ra.newConnectionFactory(spec);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a0b4c4dd/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/SimpleJNDIClientTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/SimpleJNDIClientTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/SimpleJNDIClientTest.java
index dc48d9a..2acdb0b 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/SimpleJNDIClientTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/SimpleJNDIClientTest.java
@@ -676,7 +676,6 @@ public class SimpleJNDIClientTest extends ActiveMQTestBase {
Context ctx = new InitialContext(props);
ConnectionFactory connectionFactory = (ConnectionFactory) ctx.lookup("ConnectionFactory");
- ((ActiveMQConnectionFactory)connectionFactory).setEnable1xPrefixes(true);
Connection connection = connectionFactory.createConnection();
Session session = connection.createSession();