You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2018/05/11 16:31:59 UTC
[4/6] qpid-broker-j git commit: QPID-8182: [End to End Conversion
Tests] Extend test mechanism to allow testing of JMS provider assigned
message ids. Add test cases too.
QPID-8182: [End to End Conversion Tests] Extend test mechanism to allow testing of JMS provider assigned message ids. Add test cases too.
(cherry picked from commit d293206d72989f1004f8fa3577f36d2da104f615)
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/40df8179
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/40df8179
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/40df8179
Branch: refs/heads/7.0.x
Commit: 40df8179df216530caa418197175fdf8bba8a0a3
Parents: d79537d
Author: Keith Wall <kw...@apache.org>
Authored: Mon May 7 07:39:27 2018 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Fri May 11 17:30:23 2018 +0100
----------------------------------------------------------------------
.../ClientInstructionBuilder.java | 7 +
.../EndToEndConversionTestBase.java | 67 +++--
.../client/AugumentConnectionUrl.java | 42 +++
.../end_to_end_conversion/client/Client.java | 162 ++++++++++-
.../client/ClientMessage.java | 37 +++
.../client/ClientResult.java | 18 +-
.../SimpleConversionTest.java | 289 ++++++++++++++++++-
7 files changed, 579 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/40df8179/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/ClientInstructionBuilder.java
----------------------------------------------------------------------
diff --git a/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/ClientInstructionBuilder.java b/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/ClientInstructionBuilder.java
index 3559693..2d208f4 100644
--- a/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/ClientInstructionBuilder.java
+++ b/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/ClientInstructionBuilder.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import org.apache.qpid.systests.end_to_end_conversion.client.AugumentConnectionUrl;
import org.apache.qpid.systests.end_to_end_conversion.client.ClientInstruction;
import org.apache.qpid.systests.end_to_end_conversion.client.ConfigureDestination;
import org.apache.qpid.systests.end_to_end_conversion.client.MessageDescription;
@@ -35,6 +36,12 @@ public class ClientInstructionBuilder
private List<ClientInstruction> _clientInstructions = new ArrayList<>();
private MessageDescription _latestMessageDescription;
+ public ClientInstructionBuilder configureConnectionUrl(final Map<String, String> connectionUrlConfig)
+ {
+ _clientInstructions.add(new AugumentConnectionUrl(connectionUrlConfig));
+ return this;
+ }
+
public ClientInstructionBuilder publishMessage(final String destinationJndiName)
{
return publishMessage(destinationJndiName, new MessageDescription());
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/40df8179/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/EndToEndConversionTestBase.java
----------------------------------------------------------------------
diff --git a/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/EndToEndConversionTestBase.java b/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/EndToEndConversionTestBase.java
index 66e601a..1e52152 100644
--- a/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/EndToEndConversionTestBase.java
+++ b/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/EndToEndConversionTestBase.java
@@ -48,6 +48,7 @@ import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;
import org.apache.qpid.server.model.Protocol;
+import org.apache.qpid.systests.end_to_end_conversion.client.AugumentConnectionUrl;
import org.apache.qpid.systests.end_to_end_conversion.client.Client;
import org.apache.qpid.systests.end_to_end_conversion.client.ClientInstruction;
import org.apache.qpid.systests.end_to_end_conversion.client.ClientResult;
@@ -94,7 +95,7 @@ public class EndToEndConversionTestBase extends BrokerAdminUsingTestBase
System.out.println("LQDEBUG: " + ClasspathQuery.getCacheStats());
}
- protected ListenableFuture<?> runPublisher(final List<ClientInstruction> clientInstructions)
+ protected ListenableFuture<ClientResult> runPublisher(final List<ClientInstruction> clientInstructions)
{
List<String> gavs = Arrays.asList(System.getProperty("qpid.systests.end_to_end_conversion.publisherGavs",
"org.apache.qpid:qpid-jms-client:LATEST")
@@ -105,11 +106,11 @@ public class EndToEndConversionTestBase extends BrokerAdminUsingTestBase
return _executorService.submit(() -> {
Thread.currentThread().setName("Publisher");
- runClient(gavs, additionalJavaArgs, clientInstructions);
+ return runClient(gavs, additionalJavaArgs, clientInstructions);
});
}
- protected ListenableFuture<?> runSubscriber(final List<ClientInstruction> clientInstructions)
+ protected ListenableFuture<ClientResult> runSubscriber(final List<ClientInstruction> clientInstructions)
{
List<String> gavs = Arrays.asList(System.getProperty("qpid.systests.end_to_end_conversion.subscriberGavs",
"org.apache.qpid:qpid-client:LATEST,org.apache.geronimo.specs:geronimo-jms_1.1_spec:1.1.1")
@@ -121,11 +122,11 @@ public class EndToEndConversionTestBase extends BrokerAdminUsingTestBase
return _executorService.submit(() -> {
Thread.currentThread().setName("Subscriber");
- runClient(gavs, additionalJavaArgs, clientInstructions);
+ return runClient(gavs, additionalJavaArgs, clientInstructions);
});
}
- private List<ClientInstruction> amendClientInstructions(final List<ClientInstruction> clientInstructions,
+ private List<ClientInstruction> amendClientInstructions(List<ClientInstruction> clientInstructions,
final boolean amqp0xClient)
{
if (clientInstructions.isEmpty())
@@ -146,18 +147,27 @@ public class EndToEndConversionTestBase extends BrokerAdminUsingTestBase
}
}
+ List<AugumentConnectionUrl> configUrls = clientInstructions.stream()
+ .filter(AugumentConnectionUrl.class::isInstance)
+ .map(AugumentConnectionUrl.class::cast)
+ .collect(Collectors.toList());
+
final String contextFactory;
final String connectionUrl;
if (amqp0xClient)
{
contextFactory = getAmqp0xContextFactory();
- connectionUrl = getAmqp0xConnectionUrl();
+ connectionUrl = getAmqp0xConnectionUrl(configUrls);
}
else
{
contextFactory = getAmqp10ContextFactory();
- connectionUrl = getAmqp10ConnectionUrl();
+ connectionUrl = getAmqp10ConnectionUrl(configUrls);
}
+
+ clientInstructions = new ArrayList<>(clientInstructions);
+ clientInstructions.removeAll(configUrls);
+
ConfigureJndiContext jndiContext = new ConfigureJndiContext(contextFactory, connectionUrl);
List<ClientInstruction> instructions = new ArrayList<>();
instructions.add(jndiContext);
@@ -209,12 +219,13 @@ public class EndToEndConversionTestBase extends BrokerAdminUsingTestBase
return Protocol.AMQP_0_8;
}
}
- throw new RuntimeException("Unable to determine client protocol version");
+ throw new RuntimeException(String.format("Unable to determine client protocol version. Addition args are : "
+ + "[%s]", additionalArgs));
}
- private void runClient(final Collection<String> clientGavs,
- final List<String> additionalJavaArguments,
- final List<ClientInstruction> jmsInstructions)
+ private ClientResult runClient(final Collection<String> clientGavs,
+ final List<String> additionalJavaArguments,
+ final List<ClientInstruction> jmsInstructions)
{
final List<ClientInstruction> clientInstructions = amendClientInstructions(jmsInstructions,
isAmqp0xClient(clientGavs));
@@ -252,11 +263,12 @@ public class EndToEndConversionTestBase extends BrokerAdminUsingTestBase
final Object result = inputStream.readObject();
if (result instanceof ClientResult)
{
- final ClientResult publisherResult = (ClientResult) result;
- if (publisherResult.getException() != null)
+ final ClientResult clientResult = (ClientResult) result;
+ if (clientResult.getException() != null)
{
- throw publisherResult.getException();
+ throw clientResult.getException();
}
+ return clientResult;
}
else
{
@@ -269,10 +281,9 @@ public class EndToEndConversionTestBase extends BrokerAdminUsingTestBase
p.waitFor();
loggingThread.flush();
loggingThread.stop();
+ LOGGER.debug("client process finished exit value: {}", p.exitValue());
}
}
-
- LOGGER.debug("client process finished exit value: {}", p.exitValue());
}
catch (RuntimeException e)
{
@@ -295,11 +306,22 @@ public class EndToEndConversionTestBase extends BrokerAdminUsingTestBase
return "org.apache.qpid.jndi.PropertiesFileInitialContextFactory";
}
- private String getAmqp0xConnectionUrl()
+ private String getAmqp0xConnectionUrl(final List<AugumentConnectionUrl> configUrls)
{
InetSocketAddress brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
int port = brokerAddress.getPort();
String hostString = "localhost";
+
+ if (!configUrls.isEmpty())
+ {
+ for (final AugumentConnectionUrl configUrl : configUrls)
+ {
+ if (!configUrl.getConnectionUrlConfig().isEmpty())
+ {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+ }
+ }
return String.format("amqp://clientid/?brokerlist='tcp://%s:%d'", hostString, port);
}
@@ -308,13 +330,20 @@ public class EndToEndConversionTestBase extends BrokerAdminUsingTestBase
return "org.apache.qpid.jms.jndi.JmsInitialContextFactory";
}
- private String getAmqp10ConnectionUrl()
+ private String getAmqp10ConnectionUrl(final List<AugumentConnectionUrl> configUrls)
{
InetSocketAddress brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
int port = brokerAddress.getPort();
String hostString = "localhost";
int connectTimeout = 30000;
- return String.format("amqp://%s:%d?jms.connectTimeout=%d", hostString, port, connectTimeout);
+
+ String additional = configUrls.stream()
+ .map(i -> i.getConnectionUrlConfig().entrySet())
+ .flatMap(Collection::stream)
+ .map(e -> String.format("%s=%s", e.getKey(), e.getValue()))
+ .collect(Collectors.joining("&", "&", ""));
+
+ return String.format("amqp://%s:%d?jms.connectTimeout=%d%s", hostString, port, connectTimeout, additional);
}
private boolean isAmqp0xClient(final Collection<String> gavs)
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/40df8179/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/AugumentConnectionUrl.java
----------------------------------------------------------------------
diff --git a/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/AugumentConnectionUrl.java b/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/AugumentConnectionUrl.java
new file mode 100644
index 0000000..df7137e
--- /dev/null
+++ b/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/AugumentConnectionUrl.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.systests.end_to_end_conversion.client;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+public class AugumentConnectionUrl implements ClientInstruction
+{
+ private Map<String, String> _connectionUrlConfig;
+
+ public AugumentConnectionUrl(final Map<String, String> connectionUrlConfig)
+ {
+
+ _connectionUrlConfig = new HashMap<>(connectionUrlConfig);
+ }
+
+
+ public Map<String, String> getConnectionUrlConfig()
+ {
+ return Collections.unmodifiableMap(_connectionUrlConfig);
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/40df8179/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/Client.java
----------------------------------------------------------------------
diff --git a/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/Client.java b/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/Client.java
index b6ad152..9e53200 100644
--- a/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/Client.java
+++ b/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/Client.java
@@ -26,6 +26,7 @@ import java.io.PrintWriter;
import java.io.Serializable;
import java.io.StringWriter;
import java.net.Socket;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Hashtable;
import java.util.List;
@@ -83,6 +84,7 @@ public class Client
}
System.out.println(String.format("Received instructions : %s", instructions.toString()));
+ List<ClientMessage> clientMessages = new ArrayList<>();
if (!instructions.isEmpty())
{
String connectionUrl = null;
@@ -111,7 +113,12 @@ public class Client
try
{
connection.start();
- handleInstructions(context, connection, instructions.subList(i, instructions.size()));
+ List<ClientMessage> messages = handleInstructions(context,
+ connection,
+ instructions.subList(i,
+ instructions
+ .size()));
+ clientMessages.addAll(messages);
}
finally
{
@@ -122,7 +129,7 @@ public class Client
}
}
System.out.println("Finished successfully");
- objectOutputStream.writeObject(new ClientResult());
+ objectOutputStream.writeObject(new ClientResult(clientMessages));
}
catch (VerificationException e)
{
@@ -153,21 +160,23 @@ public class Client
return sw.toString();
}
- private void handleInstructions(final Context context,
- final Connection connection,
- final List<ClientInstruction> instructions) throws Exception
+ private List<ClientMessage> handleInstructions(final Context context,
+ final Connection connection,
+ final List<ClientInstruction> instructions) throws Exception
{
+ List<ClientMessage> clientMessages = new ArrayList<>(instructions.size());
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
try
{
for (ClientInstruction instruction : instructions)
{
System.out.println(String.format("Process instruction: %s", instruction));
+ final ClientMessage clientMessage;
if (instruction instanceof MessagingInstruction.PublishMessage)
{
final MessagingInstruction.PublishMessage publishInstruction =
(MessagingInstruction.PublishMessage) instruction;
- publishMessage(context, session, publishInstruction);
+ clientMessage = publishMessage(context, session, publishInstruction);
}
else if (instruction instanceof MessagingInstruction.ReceiveMessage)
{
@@ -176,24 +185,26 @@ public class Client
final Destination destination =
(Destination) context.lookup(receiveInstruction.getDestinationJndiName());
final MessageDescription messageDescription = receiveInstruction.getMessageDescription();
- receiveMessage(session, destination, messageDescription);
+ clientMessage = receiveMessage(session, destination, messageDescription);
}
else
{
throw new RuntimeException(String.format("Unknown jmsInstruction class: '%s'",
instruction.getClass().getName()));
}
+ clientMessages.add(clientMessage);
}
}
finally
{
session.close();
}
+ return clientMessages;
}
- private void receiveMessage(final Session session,
- final Destination queue,
- final MessageDescription messageDescription) throws Exception
+ private ClientMessage receiveMessage(final Session session,
+ final Destination queue,
+ final MessageDescription messageDescription) throws Exception
{
final Message message;
MessageConsumer consumer = session.createConsumer(queue);
@@ -215,11 +226,14 @@ public class Client
message.getJMSReplyTo(),
messageDescription.getHeader(MessageDescription.MessageHeader.CORRELATION_ID));
}
+
+ return buildClientMessage(message);
}
- private void publishMessage(final Context context,
- final Session session,
- final MessagingInstruction.PublishMessage publishMessageInstruction) throws Exception
+ private ClientMessage publishMessage(final Context context,
+ final Session session,
+ final MessagingInstruction.PublishMessage publishMessageInstruction)
+ throws Exception
{
final MessageDescription messageDescription = publishMessageInstruction.getMessageDescription();
@@ -286,6 +300,9 @@ public class Client
replyToConsumer.close();
}
}
+
+ return buildClientMessage(message);
+
}
private void receiveReply(final MessageConsumer consumer, final Serializable expectedCorrelationId)
@@ -338,4 +355,123 @@ public class Client
producer.close();
}
}
+
+ private ClientMessage buildClientMessage(final Message message) throws JMSException
+ {
+ String jmsMessageID = message.getJMSMessageID();
+ String jmsCorrelationID = message.getJMSCorrelationID();
+ byte[] jmsCorrelationIDAsBytes;
+ try
+ {
+ jmsCorrelationIDAsBytes = message.getJMSCorrelationIDAsBytes();
+ }
+ // NPE is thrown in 6.1.x JMS AMQP 0-x client on attempt to retrieve correlation ID when it is not set
+ // The issue was fixed in 6.3.0 client as part of QPID-7897
+ catch (JMSException | NullPointerException e)
+ {
+ jmsCorrelationIDAsBytes = null;
+ }
+ long jmsTimestamp = message.getJMSTimestamp();
+ int jmsDeliveryMode = message.getJMSDeliveryMode();
+ boolean jmsRedelivered = message.getJMSRedelivered();
+ String jmsType = message.getJMSType();
+ long jmsExpiration = message.getJMSExpiration();
+ int jmsPriority = message.getJMSPriority();
+
+ return new JMSMessageAdaptor(jmsMessageID,
+ jmsTimestamp,
+ jmsCorrelationID,
+ jmsCorrelationIDAsBytes,
+ jmsDeliveryMode,
+ jmsRedelivered,
+ jmsType,
+ jmsExpiration,
+ jmsPriority);
+ }
+
+ private static class JMSMessageAdaptor implements ClientMessage
+ {
+ private final String _jmsMessageID;
+ private final long _jmsTimestamp;
+ private final String _jmsCorrelationID;
+ private final byte[] _jmsCorrelationIDAsBytes;
+ private final int _jmsDeliveryMode;
+ private final boolean _jmsRedelivered;
+ private final String _jmsType;
+ private final long _jmsExpiration;
+ private final int _jmsPriority;
+
+ JMSMessageAdaptor(final String jmsMessageID,
+ final long jmsTimestamp,
+ final String jmsCorrelationID,
+ final byte[] jmsCorrelationIDAsBytes,
+ final int jmsDeliveryMode,
+ final boolean jmsRedelivered,
+ final String jmsType, final long jmsExpiration, final int jmsPriority)
+ {
+ _jmsMessageID = jmsMessageID;
+ _jmsTimestamp = jmsTimestamp;
+ _jmsCorrelationID = jmsCorrelationID;
+ _jmsCorrelationIDAsBytes = jmsCorrelationIDAsBytes;
+ _jmsDeliveryMode = jmsDeliveryMode;
+ _jmsRedelivered = jmsRedelivered;
+ _jmsType = jmsType;
+ _jmsExpiration = jmsExpiration;
+ _jmsPriority = jmsPriority;
+ }
+
+ @Override
+ public String getJMSMessageID()
+ {
+ return _jmsMessageID;
+ }
+
+ @Override
+ public long getJMSTimestamp()
+ {
+ return _jmsTimestamp;
+ }
+
+ @Override
+ public String getJMSCorrelationID()
+ {
+ return _jmsCorrelationID;
+ }
+
+ @Override
+ public byte[] getJMSCorrelationIDAsBytes()
+ {
+ return _jmsCorrelationIDAsBytes;
+ }
+
+ @Override
+ public int getJMSDeliveryMode()
+ {
+ return _jmsDeliveryMode;
+ }
+
+ @Override
+ public boolean getJMSRedelivered()
+ {
+ return _jmsRedelivered;
+ }
+
+ @Override
+ public String getJMSType()
+ {
+ return _jmsType;
+ }
+
+ @Override
+ public long getJMSExpiration()
+ {
+ return _jmsExpiration;
+ }
+
+ @Override
+ public int getJMSPriority()
+ {
+ return _jmsPriority;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/40df8179/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/ClientMessage.java
----------------------------------------------------------------------
diff --git a/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/ClientMessage.java b/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/ClientMessage.java
new file mode 100644
index 0000000..d3eb146
--- /dev/null
+++ b/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/ClientMessage.java
@@ -0,0 +1,37 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.systests.end_to_end_conversion.client;
+
+import java.io.Serializable;
+
+public interface ClientMessage extends Serializable
+{
+ String getJMSMessageID();
+ long getJMSTimestamp();
+ String getJMSCorrelationID();
+ byte[] getJMSCorrelationIDAsBytes();
+ int getJMSDeliveryMode();
+ boolean getJMSRedelivered();
+ String getJMSType();
+ long getJMSExpiration();
+ int getJMSPriority();
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/40df8179/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/ClientResult.java
----------------------------------------------------------------------
diff --git a/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/ClientResult.java b/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/ClientResult.java
index f8eef68..ff91780 100644
--- a/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/ClientResult.java
+++ b/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/ClientResult.java
@@ -21,23 +21,33 @@
package org.apache.qpid.systests.end_to_end_conversion.client;
import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
public class ClientResult implements Serializable
{
private final Exception _exception;
+ private final List<ClientMessage> _clientMessages;
- public ClientResult()
+ public ClientResult(final Exception exception)
{
- this(null);
+ _exception = exception;
+ _clientMessages = Collections.emptyList();
}
- public ClientResult(final Exception exception)
+ public ClientResult(final List<ClientMessage> clientMessages)
{
- _exception = exception;
+ _exception = null;
+ _clientMessages = clientMessages;
}
public Exception getException()
{
return _exception;
}
+
+ public List<ClientMessage> getClientMessages()
+ {
+ return Collections.unmodifiableList(_clientMessages);
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/40df8179/systests/end-to-end-conversion-tests/src/test/java/org/apache/qpid/systests/end_to_end_conversion/SimpleConversionTest.java
----------------------------------------------------------------------
diff --git a/systests/end-to-end-conversion-tests/src/test/java/org/apache/qpid/systests/end_to_end_conversion/SimpleConversionTest.java b/systests/end-to-end-conversion-tests/src/test/java/org/apache/qpid/systests/end_to_end_conversion/SimpleConversionTest.java
index dbad8fd..2385cd2 100644
--- a/systests/end-to-end-conversion-tests/src/test/java/org/apache/qpid/systests/end_to_end_conversion/SimpleConversionTest.java
+++ b/systests/end-to-end-conversion-tests/src/test/java/org/apache/qpid/systests/end_to_end_conversion/SimpleConversionTest.java
@@ -20,13 +20,20 @@
package org.apache.qpid.systests.end_to_end_conversion;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.CoreMatchers.startsWith;
+import static org.junit.Assert.assertThat;
import static org.junit.Assume.assumeFalse;
import static org.junit.Assume.assumeTrue;
+import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -34,12 +41,15 @@ import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.systests.end_to_end_conversion.client.ClientInstruction;
+import org.apache.qpid.systests.end_to_end_conversion.client.ClientMessage;
+import org.apache.qpid.systests.end_to_end_conversion.client.ClientResult;
import org.apache.qpid.systests.end_to_end_conversion.client.MessageDescription;
import org.apache.qpid.systests.end_to_end_conversion.client.SerializableTestClass;
import org.apache.qpid.systests.end_to_end_conversion.client.VerificationException;
@@ -48,6 +58,9 @@ public class SimpleConversionTest extends EndToEndConversionTestBase
{
private static final long TEST_TIMEOUT = 30000L;
private static final String QUEUE_JNDI_NAME = "queue";
+ private static final EnumSet<Protocol> AMQP_PRE010_PROTOCOLS =
+ EnumSet.of(Protocol.AMQP_0_9, Protocol.AMQP_0_9_1, Protocol.AMQP_0_8);
+ private static final String JMS_MESSAGE_IDPOLICY_MESSAGE_IDTYPE = "jms.messageIDPolicy.messageIDType";
private HashMap<String, String> _defaultDestinations;
@Rule
@@ -178,6 +191,240 @@ public class SimpleConversionTest extends EndToEndConversionTestBase
}
@Test
+ public void providerAssignedMessageId09_010() throws Exception
+ {
+ assumeTrue(AMQP_PRE010_PROTOCOLS.contains(getPublisherProtocolVersion())
+ && EnumSet.of(Protocol.AMQP_0_10).contains(getSubscriberProtocolVersion()));
+
+ List<ClientMessage> clientResults = performProviderAssignedMessageIdTest(Collections.emptyMap());
+
+ ClientMessage publishedMessage = clientResults.get(0);
+ ClientMessage subscriberMessage = clientResults.get(1);
+
+ // Subscriber receives 0-10 UUID message-id. Qpid JMS 0-x library synthesizes the ID: prefix
+ assertThat(subscriberMessage.getJMSMessageID(), equalTo(publishedMessage.getJMSMessageID()));
+ }
+
+ @Test
+ public void providerAssignedMessageId09_10() throws Exception
+ {
+ assumeTrue(AMQP_PRE010_PROTOCOLS.contains(getPublisherProtocolVersion())
+ && EnumSet.of(Protocol.AMQP_1_0).contains(getSubscriberProtocolVersion()));
+
+ List<ClientMessage> clientResults = performProviderAssignedMessageIdTest(Collections.emptyMap());
+
+ ClientMessage publishedMessage = clientResults.get(0);
+ ClientMessage subscriberMessage = clientResults.get(1);
+
+ // TODO: On the wire the AMQP 1.0 client receives a string containing a message with
+ // message-id-string contain a ID: prefixed UUID. Would be better if the conversion layer sent a message-id-uuid
+ // as this would offer most compatibility and miminise the exposure of the ID prefix.
+ assertThat(subscriberMessage.getJMSMessageID(), equalTo(publishedMessage.getJMSMessageID()));
+ }
+
+ @Test
+ public void providerAssignedMessageId010_09() throws Exception
+ {
+ assumeTrue(EnumSet.of(Protocol.AMQP_0_10).contains(getPublisherProtocolVersion())
+ && AMQP_PRE010_PROTOCOLS.contains(getSubscriberProtocolVersion()));
+
+ List<ClientMessage> clientResults = performProviderAssignedMessageIdTest(Collections.emptyMap());
+
+ ClientMessage publishedMessage = clientResults.get(0);
+ ClientMessage subscriberMessage = clientResults.get(1);
+
+ // Subscriber receives AMQShortString message-id with a ID: prefix. The conversion layer already synthesizes
+ // this. See MessageConverter_0_10_to_0_8.java:130
+ assertThat(subscriberMessage.getJMSMessageID(), equalTo(publishedMessage.getJMSMessageID()));
+ }
+
+ @Test
+ public void providerAssignedMessageId010_10() throws Exception
+ {
+ assumeTrue(EnumSet.of(Protocol.AMQP_0_10).contains(getPublisherProtocolVersion())
+ && EnumSet.of(Protocol.AMQP_1_0).contains(getSubscriberProtocolVersion()));
+
+ List<ClientMessage> clientResults = performProviderAssignedMessageIdTest(Collections.emptyMap());
+
+ ClientMessage publishedMessage = clientResults.get(0);
+ ClientMessage subscriberMessage = clientResults.get(1);
+
+ // Publisher sends a 0-10 UUID message-id. This is converted into message-id-uuid. The Qpid JMS
+ // Client returns a ID:AMQP_UUID:
+ final String publishedJmsMessageID = publishedMessage.getJMSMessageID();
+ assertThat(publishedJmsMessageID, startsWith("ID:"));
+ String barePublishedJmsMessageID = publishedJmsMessageID.substring("ID:".length());
+ String expectedSubscriberJmsMessageID = String.format("ID:AMQP_UUID:%s", barePublishedJmsMessageID);
+ assertThat(subscriberMessage.getJMSMessageID(), equalTo(expectedSubscriberJmsMessageID));
+ }
+
+ @Test
+ public void providerAssignedMessageId_DefaultMode_10_09() throws Exception
+ {
+ assumeTrue(EnumSet.of(Protocol.AMQP_1_0).contains(getPublisherProtocolVersion())
+ && AMQP_PRE010_PROTOCOLS.contains(getSubscriberProtocolVersion()));
+
+ List<ClientMessage> clientResults = performProviderAssignedMessageIdTest(Collections.emptyMap());
+
+ // On the wire the "message-id-string comprises an identity of the publisher + a message sequence number
+ ClientMessage publishedMessage = clientResults.get(0);
+ ClientMessage subscriberMessage = clientResults.get(1);
+
+ assertThat(subscriberMessage.getJMSMessageID(), equalTo(publishedMessage.getJMSMessageID()));
+ }
+
+ @Test
+ @Ignore("Currently subscriber receives the correct message id but without the ID prefix")
+ public void providerAssignedMessageId_UuidMode_10_09() throws Exception
+ {
+ assumeTrue(EnumSet.of(Protocol.AMQP_1_0).contains(getPublisherProtocolVersion())
+ && AMQP_PRE010_PROTOCOLS
+ .contains(getSubscriberProtocolVersion()));
+
+ List<ClientMessage> clientResults = performProviderAssignedMessageIdTest(Collections.singletonMap(
+ JMS_MESSAGE_IDPOLICY_MESSAGE_IDTYPE, "UUID"));
+
+ ClientMessage publishedMessage = clientResults.get(0);
+ ClientMessage subscriberMessage = clientResults.get(1);
+
+ // On the wire the message id is a AMQP 1.0 UUID
+ final String publishedJmsMessageID = publishedMessage.getJMSMessageID();
+ assertThat(publishedJmsMessageID, startsWith("ID:AMQP_UUID:"));
+ String barePublishedJmsMessageID = publishedJmsMessageID.substring("ID:AMQP_UUID:".length());
+ String expectedSubscriberJmsMessageID = String.format("ID:%s", barePublishedJmsMessageID);
+ assertThat(subscriberMessage.getJMSMessageID(), equalTo(expectedSubscriberJmsMessageID));
+
+
+ // TODO: On the wire the AMQP 0-x client receives a message id containing the a stringified UUID without prefix.
+ // This is inconsistent - in all other cases the client receives message ids prefixed. Would be
+ // better if the conversion layer sent a synthesized the ID prefix.
+ }
+
+ @Test
+ @Ignore("Currently subscriber receives the correct message id but without the ID prefix")
+ public void providerAssignedMessageId_UuidStringMode_10_09() throws Exception
+ {
+ assumeTrue(EnumSet.of(Protocol.AMQP_1_0).contains(getPublisherProtocolVersion())
+ && AMQP_PRE010_PROTOCOLS.contains(getSubscriberProtocolVersion()));
+
+ List<ClientMessage> clientResults = performProviderAssignedMessageIdTest(Collections.singletonMap(
+ JMS_MESSAGE_IDPOLICY_MESSAGE_IDTYPE, "UUID_STRING"));
+
+ ClientMessage publishedMessage = clientResults.get(0);
+ ClientMessage subscriberMessage = clientResults.get(1);
+
+ // On the wire the message-id is a string containing a UUID with no prefix
+ final String publishedJmsMessageID = publishedMessage.getJMSMessageID();
+ assertThat(publishedJmsMessageID, startsWith("ID:AMQP_NO_PREFIX:"));
+ String barePublishedJmsMessageID = publishedJmsMessageID.substring("ID:AMQP_NO_PREFIX:".length());
+ String expectedSubscriberJmsMessageID = String.format("ID:%s", barePublishedJmsMessageID);
+ assertThat(subscriberMessage.getJMSMessageID(), equalTo(expectedSubscriberJmsMessageID));
+
+ // TODO ditto above
+ }
+
+ @Test
+ public void providerAssignedMessageId_PrefixedUuidStringMode_10_09() throws Exception
+ {
+ assumeTrue(EnumSet.of(Protocol.AMQP_1_0).contains(getPublisherProtocolVersion())
+ && AMQP_PRE010_PROTOCOLS.contains(getSubscriberProtocolVersion()));
+
+ List<ClientMessage> clientResults = performProviderAssignedMessageIdTest(Collections.singletonMap(
+ JMS_MESSAGE_IDPOLICY_MESSAGE_IDTYPE, "PREFIXED_UUID_STRING"));
+
+ ClientMessage publishedMessage = clientResults.get(0);
+ ClientMessage subscriberMessage = clientResults.get(1);
+
+ // On the wire the message-id is a message-id-string containing a UUID with ID: prefix
+ final String publishedJmsMessageID = publishedMessage.getJMSMessageID();
+ assertThat(publishedJmsMessageID, startsWith("ID:"));
+ String barePublishedJmsMessageID = publishedJmsMessageID.substring("ID:".length());
+ String expectedSubscriberJmsMessageID = String.format("ID:%s", barePublishedJmsMessageID);
+ assertThat(subscriberMessage.getJMSMessageID(), equalTo(expectedSubscriberJmsMessageID));
+ }
+
+ @Test
+ public void providerAssignedMessageId_DefaultMode_10_010() throws Exception
+ {
+ assumeTrue(EnumSet.of(Protocol.AMQP_1_0).contains(getPublisherProtocolVersion())
+ && EnumSet.of(Protocol.AMQP_0_10).contains(getSubscriberProtocolVersion()));
+
+ List<ClientMessage> clientResults = performProviderAssignedMessageIdTest(Collections.emptyMap());
+
+ // On the wire the message-id is a message-id-string comprising an identity of the pubisher + a message
+ // sequence number
+ ClientMessage publishedMessage = clientResults.get(0);
+ ClientMessage subscriberMessage = clientResults.get(1);
+
+ // Conversion layer manufactures an UUID. This will be unpredictable to the client.
+ assertThat(subscriberMessage.getJMSMessageID(), is(notNullValue()));
+ }
+
+ @Test
+ public void providerAssignedMessageId_UuidMode_10_010() throws Exception
+ {
+ assumeTrue(EnumSet.of(Protocol.AMQP_1_0).contains(getPublisherProtocolVersion())
+ && EnumSet.of(Protocol.AMQP_0_10).contains(getSubscriberProtocolVersion()));
+
+ List<ClientMessage> clientResults = performProviderAssignedMessageIdTest(Collections.singletonMap(
+ JMS_MESSAGE_IDPOLICY_MESSAGE_IDTYPE, "UUID"));
+
+ ClientMessage publishedMessage = clientResults.get(0);
+ ClientMessage subscriberMessage = clientResults.get(1);
+
+ // On the wire the message-id is a message-id-uuid
+ final String publishedJmsMessageID = publishedMessage.getJMSMessageID();
+ assertThat(publishedJmsMessageID, startsWith("ID:AMQP_UUID:"));
+ String barePublishedJmsMessageID = publishedJmsMessageID.substring("ID:AMQP_UUID:".length());
+ String expectedSubscriberJmsMessageID = String.format("ID:%s", barePublishedJmsMessageID);
+ assertThat(subscriberMessage.getJMSMessageID(), equalTo(expectedSubscriberJmsMessageID));
+ }
+
+ @Test
+ public void providerAssignedMessageId_UuidStringMode_10_010() throws Exception
+ {
+ assumeTrue(EnumSet.of(Protocol.AMQP_1_0).contains(getPublisherProtocolVersion())
+ && EnumSet.of(Protocol.AMQP_0_10).contains(getSubscriberProtocolVersion()));
+
+ List<ClientMessage> clientResults = performProviderAssignedMessageIdTest(Collections.singletonMap(
+ JMS_MESSAGE_IDPOLICY_MESSAGE_IDTYPE, "UUID_STRING"));
+
+ ClientMessage publishedMessage = clientResults.get(0);
+ ClientMessage subscriberMessage = clientResults.get(1);
+
+ // On the wire the message-id is a message-id-string containing a UUID without prefix
+ final String publishedJmsMessageID = publishedMessage.getJMSMessageID();
+ assertThat(publishedJmsMessageID, startsWith("ID:AMQP_NO_PREFIX:"));
+ String barePublishedJmsMessageID = publishedJmsMessageID.substring("ID:AMQP_NO_PREFIX:".length());
+ String expectedSubscriberJmsMessageID = String.format("ID:%s", barePublishedJmsMessageID);
+ assertThat(subscriberMessage.getJMSMessageID(), equalTo(expectedSubscriberJmsMessageID));
+ }
+
+ @Test
+ @Ignore("Currently subscriber receives a UUID that differs from the one sent")
+ public void providerAssignedMessageId_PrefixedUuidStringMode_10_010() throws Exception
+ {
+ assumeTrue(EnumSet.of(Protocol.AMQP_1_0).contains(getPublisherProtocolVersion())
+ && EnumSet.of(Protocol.AMQP_0_10).contains(getSubscriberProtocolVersion()));
+
+ List<ClientMessage> clientResults = performProviderAssignedMessageIdTest(Collections.singletonMap(
+ JMS_MESSAGE_IDPOLICY_MESSAGE_IDTYPE, "PREFIXED_UUID_STRING"));
+
+ ClientMessage publishedMessage = clientResults.get(0);
+ ClientMessage subscriberMessage = clientResults.get(1);
+
+ // On the wire the message-id is a message-id-string containing a UUID with ID: prefix
+ final String publishedJmsMessageID = publishedMessage.getJMSMessageID();
+ assertThat(publishedJmsMessageID, startsWith("ID:"));
+ String barePublishedJmsMessageID = publishedJmsMessageID.substring("ID:".length());
+ String expectedSubscriberJmsMessageID = String.format("ID:%s", barePublishedJmsMessageID);
+ assertThat(subscriberMessage.getJMSMessageID(), equalTo(expectedSubscriberJmsMessageID));
+
+ // TODO correct conversion layer so that a string that contains a ID prefixed UUID is converted
+ // as a UUID.
+ }
+
+ @Test
public void property() throws Exception
{
final MessageDescription messageDescription = new MessageDescription();
@@ -297,7 +544,7 @@ public class SimpleConversionTest extends EndToEndConversionTestBase
performTest(publisherInstructions, subscriberInstructions);
}
- private void performSimpleTest(final MessageDescription messageDescription) throws Exception
+ private List<ClientResult> performSimpleTest(final MessageDescription messageDescription) throws Exception
{
final String destinationJndiName = QUEUE_JNDI_NAME;
final List<ClientInstruction> publisherInstructions =
@@ -308,17 +555,17 @@ public class SimpleConversionTest extends EndToEndConversionTestBase
new ClientInstructionBuilder().configureDestinations(_defaultDestinations)
.receiveMessage(destinationJndiName, messageDescription)
.build();
- performTest(publisherInstructions,subscriberInstructions);
+ return performTest(publisherInstructions,subscriberInstructions);
}
- private void performTest(final List<ClientInstruction> publisherInstructions,
- final List<ClientInstruction> subscriberInstructions) throws Exception
+ private List<ClientResult> performTest(final List<ClientInstruction> publisherInstructions,
+ final List<ClientInstruction> subscriberInstructions) throws Exception
{
- final ListenableFuture<?> publisherFuture = runPublisher(publisherInstructions);
- final ListenableFuture<?> subscriberFuture = runSubscriber(subscriberInstructions);
+ final ListenableFuture<ClientResult> publisherFuture = runPublisher(publisherInstructions);
+ final ListenableFuture<ClientResult> subscriberFuture = runSubscriber(subscriberInstructions);
try
{
- Futures.allAsList(publisherFuture, subscriberFuture).get(TEST_TIMEOUT, TimeUnit.MILLISECONDS);
+ return Futures.allAsList(publisherFuture, subscriberFuture).get(TEST_TIMEOUT, TimeUnit.MILLISECONDS);
}
catch (ExecutionException e)
{
@@ -337,4 +584,32 @@ public class SimpleConversionTest extends EndToEndConversionTestBase
}
}
}
+
+ private List<ClientMessage> performProviderAssignedMessageIdTest(final Map<String, String> publisherConnectionUrlConfig) throws Exception
+ {
+ final MessageDescription messageDescription = new MessageDescription();
+
+ final String destinationJndiName = QUEUE_JNDI_NAME;
+ final List<ClientInstruction> publisherInstructions =
+ new ClientInstructionBuilder().configureConnectionUrl(publisherConnectionUrlConfig)
+ .configureDestinations(_defaultDestinations)
+ .publishMessage(destinationJndiName, messageDescription)
+ .build();
+ final List<ClientInstruction> subscriberInstructions =
+ new ClientInstructionBuilder().configureDestinations(_defaultDestinations)
+ .receiveMessage(destinationJndiName, messageDescription)
+ .build();
+ List<ClientResult> clientResults = performTest(publisherInstructions, subscriberInstructions);
+ assertThat("Unexpected number of client results", clientResults.size(), equalTo(2));
+
+ ClientResult publishedClientResult = clientResults.get(0);
+ assertThat("Unexpected number of published client messages", publishedClientResult.getClientMessages().size(), equalTo(1));
+ ClientMessage publishedMessage = publishedClientResult.getClientMessages().get(0);
+
+ ClientResult subscriberClientResults = clientResults.get(1);
+ assertThat("Unexpected number of published client messages", subscriberClientResults.getClientMessages().size(), equalTo(1));
+ ClientMessage subscriberMessage = subscriberClientResults.getClientMessages().get(0);
+
+ return Arrays.asList(publishedMessage, subscriberMessage);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org