You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2018/05/11 16:31:59 UTC

[4/6] qpid-broker-j git commit: QPID-8182: [End to End Conversion Tests] Extend test mechanism to allow testing of JMS provider assigned message ids. Add test cases too.

QPID-8182: [End to End Conversion Tests] Extend test mechanism to allow testing of JMS provider assigned message ids.  Add test cases too.

(cherry picked from commit d293206d72989f1004f8fa3577f36d2da104f615)


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/40df8179
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/40df8179
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/40df8179

Branch: refs/heads/7.0.x
Commit: 40df8179df216530caa418197175fdf8bba8a0a3
Parents: d79537d
Author: Keith Wall <kw...@apache.org>
Authored: Mon May 7 07:39:27 2018 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Fri May 11 17:30:23 2018 +0100

----------------------------------------------------------------------
 .../ClientInstructionBuilder.java               |   7 +
 .../EndToEndConversionTestBase.java             |  67 +++--
 .../client/AugumentConnectionUrl.java           |  42 +++
 .../end_to_end_conversion/client/Client.java    | 162 ++++++++++-
 .../client/ClientMessage.java                   |  37 +++
 .../client/ClientResult.java                    |  18 +-
 .../SimpleConversionTest.java                   | 289 ++++++++++++++++++-
 7 files changed, 579 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/40df8179/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/ClientInstructionBuilder.java
----------------------------------------------------------------------
diff --git a/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/ClientInstructionBuilder.java b/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/ClientInstructionBuilder.java
index 3559693..2d208f4 100644
--- a/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/ClientInstructionBuilder.java
+++ b/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/ClientInstructionBuilder.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.qpid.systests.end_to_end_conversion.client.AugumentConnectionUrl;
 import org.apache.qpid.systests.end_to_end_conversion.client.ClientInstruction;
 import org.apache.qpid.systests.end_to_end_conversion.client.ConfigureDestination;
 import org.apache.qpid.systests.end_to_end_conversion.client.MessageDescription;
@@ -35,6 +36,12 @@ public class ClientInstructionBuilder
     private List<ClientInstruction> _clientInstructions = new ArrayList<>();
     private MessageDescription _latestMessageDescription;
 
+    public ClientInstructionBuilder configureConnectionUrl(final Map<String, String> connectionUrlConfig)
+    {
+        _clientInstructions.add(new AugumentConnectionUrl(connectionUrlConfig));
+        return this;
+    }
+
     public ClientInstructionBuilder publishMessage(final String destinationJndiName)
     {
         return publishMessage(destinationJndiName, new MessageDescription());

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/40df8179/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/EndToEndConversionTestBase.java
----------------------------------------------------------------------
diff --git a/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/EndToEndConversionTestBase.java b/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/EndToEndConversionTestBase.java
index 66e601a..1e52152 100644
--- a/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/EndToEndConversionTestBase.java
+++ b/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/EndToEndConversionTestBase.java
@@ -48,6 +48,7 @@ import org.slf4j.LoggerFactory;
 import org.slf4j.event.Level;
 
 import org.apache.qpid.server.model.Protocol;
+import org.apache.qpid.systests.end_to_end_conversion.client.AugumentConnectionUrl;
 import org.apache.qpid.systests.end_to_end_conversion.client.Client;
 import org.apache.qpid.systests.end_to_end_conversion.client.ClientInstruction;
 import org.apache.qpid.systests.end_to_end_conversion.client.ClientResult;
@@ -94,7 +95,7 @@ public class EndToEndConversionTestBase extends BrokerAdminUsingTestBase
         System.out.println("LQDEBUG: " + ClasspathQuery.getCacheStats());
     }
 
-    protected ListenableFuture<?> runPublisher(final List<ClientInstruction> clientInstructions)
+    protected ListenableFuture<ClientResult> runPublisher(final List<ClientInstruction> clientInstructions)
     {
         List<String> gavs = Arrays.asList(System.getProperty("qpid.systests.end_to_end_conversion.publisherGavs",
                                                              "org.apache.qpid:qpid-jms-client:LATEST")
@@ -105,11 +106,11 @@ public class EndToEndConversionTestBase extends BrokerAdminUsingTestBase
 
         return _executorService.submit(() -> {
             Thread.currentThread().setName("Publisher");
-            runClient(gavs, additionalJavaArgs, clientInstructions);
+            return runClient(gavs, additionalJavaArgs, clientInstructions);
         });
     }
 
-    protected ListenableFuture<?> runSubscriber(final List<ClientInstruction> clientInstructions)
+    protected ListenableFuture<ClientResult> runSubscriber(final List<ClientInstruction> clientInstructions)
     {
         List<String> gavs = Arrays.asList(System.getProperty("qpid.systests.end_to_end_conversion.subscriberGavs",
                                                              "org.apache.qpid:qpid-client:LATEST,org.apache.geronimo.specs:geronimo-jms_1.1_spec:1.1.1")
@@ -121,11 +122,11 @@ public class EndToEndConversionTestBase extends BrokerAdminUsingTestBase
 
         return _executorService.submit(() -> {
             Thread.currentThread().setName("Subscriber");
-            runClient(gavs, additionalJavaArgs, clientInstructions);
+            return runClient(gavs, additionalJavaArgs, clientInstructions);
         });
     }
 
-    private List<ClientInstruction> amendClientInstructions(final List<ClientInstruction> clientInstructions,
+    private List<ClientInstruction> amendClientInstructions(List<ClientInstruction> clientInstructions,
                                                             final boolean amqp0xClient)
     {
         if (clientInstructions.isEmpty())
@@ -146,18 +147,27 @@ public class EndToEndConversionTestBase extends BrokerAdminUsingTestBase
             }
         }
 
+        List<AugumentConnectionUrl> configUrls = clientInstructions.stream()
+                                                                   .filter(AugumentConnectionUrl.class::isInstance)
+                                                                   .map(AugumentConnectionUrl.class::cast)
+                                                                   .collect(Collectors.toList());
+
         final String contextFactory;
         final String connectionUrl;
         if (amqp0xClient)
         {
             contextFactory = getAmqp0xContextFactory();
-            connectionUrl = getAmqp0xConnectionUrl();
+            connectionUrl = getAmqp0xConnectionUrl(configUrls);
         }
         else
         {
             contextFactory = getAmqp10ContextFactory();
-            connectionUrl = getAmqp10ConnectionUrl();
+            connectionUrl = getAmqp10ConnectionUrl(configUrls);
         }
+
+        clientInstructions = new ArrayList<>(clientInstructions);
+        clientInstructions.removeAll(configUrls);
+
         ConfigureJndiContext jndiContext = new ConfigureJndiContext(contextFactory, connectionUrl);
         List<ClientInstruction> instructions = new ArrayList<>();
         instructions.add(jndiContext);
@@ -209,12 +219,13 @@ public class EndToEndConversionTestBase extends BrokerAdminUsingTestBase
                 return Protocol.AMQP_0_8;
             }
         }
-        throw new RuntimeException("Unable to determine client protocol version");
+        throw new RuntimeException(String.format("Unable to determine client protocol version. Addition args are : "
+                                                 + "[%s]", additionalArgs));
     }
 
-    private void runClient(final Collection<String> clientGavs,
-                           final List<String> additionalJavaArguments,
-                           final List<ClientInstruction> jmsInstructions)
+    private ClientResult runClient(final Collection<String> clientGavs,
+                                   final List<String> additionalJavaArguments,
+                                   final List<ClientInstruction> jmsInstructions)
     {
         final List<ClientInstruction> clientInstructions = amendClientInstructions(jmsInstructions,
                                                                                    isAmqp0xClient(clientGavs));
@@ -252,11 +263,12 @@ public class EndToEndConversionTestBase extends BrokerAdminUsingTestBase
                     final Object result = inputStream.readObject();
                     if (result instanceof ClientResult)
                     {
-                        final ClientResult publisherResult = (ClientResult) result;
-                        if (publisherResult.getException() != null)
+                        final ClientResult clientResult = (ClientResult) result;
+                        if (clientResult.getException() != null)
                         {
-                            throw publisherResult.getException();
+                            throw clientResult.getException();
                         }
+                        return clientResult;
                     }
                     else
                     {
@@ -269,10 +281,9 @@ public class EndToEndConversionTestBase extends BrokerAdminUsingTestBase
                     p.waitFor();
                     loggingThread.flush();
                     loggingThread.stop();
+                    LOGGER.debug("client process finished exit value: {}", p.exitValue());
                 }
             }
-
-            LOGGER.debug("client process finished exit value: {}", p.exitValue());
         }
         catch (RuntimeException e)
         {
@@ -295,11 +306,22 @@ public class EndToEndConversionTestBase extends BrokerAdminUsingTestBase
         return "org.apache.qpid.jndi.PropertiesFileInitialContextFactory";
     }
 
-    private String getAmqp0xConnectionUrl()
+    private String getAmqp0xConnectionUrl(final List<AugumentConnectionUrl> configUrls)
     {
         InetSocketAddress brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
         int port = brokerAddress.getPort();
         String hostString = "localhost";
+
+        if (!configUrls.isEmpty())
+        {
+            for (final AugumentConnectionUrl configUrl : configUrls)
+            {
+                if (!configUrl.getConnectionUrlConfig().isEmpty())
+                {
+                    throw new UnsupportedOperationException("Not implemented");
+                }
+            }
+        }
         return String.format("amqp://clientid/?brokerlist='tcp://%s:%d'", hostString, port);
     }
 
@@ -308,13 +330,20 @@ public class EndToEndConversionTestBase extends BrokerAdminUsingTestBase
         return "org.apache.qpid.jms.jndi.JmsInitialContextFactory";
     }
 
-    private String getAmqp10ConnectionUrl()
+    private String getAmqp10ConnectionUrl(final List<AugumentConnectionUrl> configUrls)
     {
         InetSocketAddress brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
         int port = brokerAddress.getPort();
         String hostString = "localhost";
         int connectTimeout = 30000;
-        return String.format("amqp://%s:%d?jms.connectTimeout=%d", hostString, port, connectTimeout);
+
+        String additional = configUrls.stream()
+                                      .map(i -> i.getConnectionUrlConfig().entrySet())
+                                      .flatMap(Collection::stream)
+                                      .map(e -> String.format("%s=%s", e.getKey(), e.getValue()))
+                                      .collect(Collectors.joining("&", "&", ""));
+
+        return String.format("amqp://%s:%d?jms.connectTimeout=%d%s", hostString, port, connectTimeout, additional);
     }
 
     private boolean isAmqp0xClient(final Collection<String> gavs)

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/40df8179/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/AugumentConnectionUrl.java
----------------------------------------------------------------------
diff --git a/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/AugumentConnectionUrl.java b/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/AugumentConnectionUrl.java
new file mode 100644
index 0000000..df7137e
--- /dev/null
+++ b/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/AugumentConnectionUrl.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.systests.end_to_end_conversion.client;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+public class AugumentConnectionUrl implements ClientInstruction
+{
+    private Map<String, String> _connectionUrlConfig;
+
+    public AugumentConnectionUrl(final Map<String, String> connectionUrlConfig)
+    {
+
+        _connectionUrlConfig = new HashMap<>(connectionUrlConfig);
+    }
+
+
+    public Map<String, String> getConnectionUrlConfig()
+    {
+        return Collections.unmodifiableMap(_connectionUrlConfig);
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/40df8179/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/Client.java
----------------------------------------------------------------------
diff --git a/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/Client.java b/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/Client.java
index b6ad152..9e53200 100644
--- a/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/Client.java
+++ b/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/Client.java
@@ -26,6 +26,7 @@ import java.io.PrintWriter;
 import java.io.Serializable;
 import java.io.StringWriter;
 import java.net.Socket;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Hashtable;
 import java.util.List;
@@ -83,6 +84,7 @@ public class Client
                 }
                 System.out.println(String.format("Received instructions : %s", instructions.toString()));
 
+                List<ClientMessage> clientMessages = new ArrayList<>();
                 if (!instructions.isEmpty())
                 {
                     String connectionUrl = null;
@@ -111,7 +113,12 @@ public class Client
                             try
                             {
                                 connection.start();
-                                handleInstructions(context, connection, instructions.subList(i, instructions.size()));
+                                List<ClientMessage> messages = handleInstructions(context,
+                                                                                  connection,
+                                                                                  instructions.subList(i,
+                                                                                                       instructions
+                                                                                                               .size()));
+                                clientMessages.addAll(messages);
                             }
                             finally
                             {
@@ -122,7 +129,7 @@ public class Client
                     }
                 }
                 System.out.println("Finished successfully");
-                objectOutputStream.writeObject(new ClientResult());
+                objectOutputStream.writeObject(new ClientResult(clientMessages));
             }
             catch (VerificationException e)
             {
@@ -153,21 +160,23 @@ public class Client
         return sw.toString();
     }
 
-    private void handleInstructions(final Context context,
-                                    final Connection connection,
-                                    final List<ClientInstruction> instructions) throws Exception
+    private List<ClientMessage> handleInstructions(final Context context,
+                                                   final Connection connection,
+                                                   final List<ClientInstruction> instructions) throws Exception
     {
+        List<ClientMessage> clientMessages = new ArrayList<>(instructions.size());
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         try
         {
             for (ClientInstruction instruction : instructions)
             {
                 System.out.println(String.format("Process instruction: %s", instruction));
+                final ClientMessage clientMessage;
                 if (instruction instanceof MessagingInstruction.PublishMessage)
                 {
                     final MessagingInstruction.PublishMessage publishInstruction =
                             (MessagingInstruction.PublishMessage) instruction;
-                    publishMessage(context, session, publishInstruction);
+                    clientMessage = publishMessage(context, session, publishInstruction);
                 }
                 else if (instruction instanceof MessagingInstruction.ReceiveMessage)
                 {
@@ -176,24 +185,26 @@ public class Client
                     final Destination destination =
                             (Destination) context.lookup(receiveInstruction.getDestinationJndiName());
                     final MessageDescription messageDescription = receiveInstruction.getMessageDescription();
-                    receiveMessage(session, destination, messageDescription);
+                    clientMessage = receiveMessage(session, destination, messageDescription);
                 }
                 else
                 {
                     throw new RuntimeException(String.format("Unknown jmsInstruction class: '%s'",
                                                              instruction.getClass().getName()));
                 }
+                clientMessages.add(clientMessage);
             }
         }
         finally
         {
             session.close();
         }
+        return clientMessages;
     }
 
-    private void receiveMessage(final Session session,
-                                final Destination queue,
-                                final MessageDescription messageDescription) throws Exception
+    private ClientMessage receiveMessage(final Session session,
+                                         final Destination queue,
+                                         final MessageDescription messageDescription) throws Exception
     {
         final Message message;
         MessageConsumer consumer = session.createConsumer(queue);
@@ -215,11 +226,14 @@ public class Client
                       message.getJMSReplyTo(),
                       messageDescription.getHeader(MessageDescription.MessageHeader.CORRELATION_ID));
         }
+
+        return buildClientMessage(message);
     }
 
-    private void publishMessage(final Context context,
-                                final Session session,
-                                final MessagingInstruction.PublishMessage publishMessageInstruction) throws Exception
+    private ClientMessage publishMessage(final Context context,
+                                         final Session session,
+                                         final MessagingInstruction.PublishMessage publishMessageInstruction)
+            throws Exception
     {
         final MessageDescription messageDescription = publishMessageInstruction.getMessageDescription();
 
@@ -286,6 +300,9 @@ public class Client
                 replyToConsumer.close();
             }
         }
+
+        return buildClientMessage(message);
+
     }
 
     private void receiveReply(final MessageConsumer consumer, final Serializable expectedCorrelationId)
@@ -338,4 +355,123 @@ public class Client
             producer.close();
         }
     }
+
+    private ClientMessage buildClientMessage(final Message message) throws JMSException
+    {
+        String jmsMessageID = message.getJMSMessageID();
+        String jmsCorrelationID = message.getJMSCorrelationID();
+        byte[] jmsCorrelationIDAsBytes;
+        try
+        {
+            jmsCorrelationIDAsBytes = message.getJMSCorrelationIDAsBytes();
+        }
+        // NPE is thrown in 6.1.x JMS AMQP 0-x client on attempt to retrieve correlation ID when it is not set
+        // The issue was fixed in 6.3.0 client as part of QPID-7897
+        catch (JMSException | NullPointerException e)
+        {
+            jmsCorrelationIDAsBytes = null;
+        }
+        long jmsTimestamp = message.getJMSTimestamp();
+        int jmsDeliveryMode = message.getJMSDeliveryMode();
+        boolean jmsRedelivered = message.getJMSRedelivered();
+        String jmsType = message.getJMSType();
+        long jmsExpiration = message.getJMSExpiration();
+        int jmsPriority = message.getJMSPriority();
+
+        return new JMSMessageAdaptor(jmsMessageID,
+                                     jmsTimestamp,
+                                     jmsCorrelationID,
+                                     jmsCorrelationIDAsBytes,
+                                     jmsDeliveryMode,
+                                     jmsRedelivered,
+                                     jmsType,
+                                     jmsExpiration,
+                                     jmsPriority);
+    }
+
+    private static class JMSMessageAdaptor implements ClientMessage
+    {
+        private final String _jmsMessageID;
+        private final long _jmsTimestamp;
+        private final String _jmsCorrelationID;
+        private final byte[] _jmsCorrelationIDAsBytes;
+        private final int _jmsDeliveryMode;
+        private final boolean _jmsRedelivered;
+        private final String _jmsType;
+        private final long _jmsExpiration;
+        private final int _jmsPriority;
+
+        JMSMessageAdaptor(final String jmsMessageID,
+                          final long jmsTimestamp,
+                          final String jmsCorrelationID,
+                          final byte[] jmsCorrelationIDAsBytes,
+                          final int jmsDeliveryMode,
+                          final boolean jmsRedelivered,
+                          final String jmsType, final long jmsExpiration, final int jmsPriority)
+        {
+            _jmsMessageID = jmsMessageID;
+            _jmsTimestamp = jmsTimestamp;
+            _jmsCorrelationID = jmsCorrelationID;
+            _jmsCorrelationIDAsBytes = jmsCorrelationIDAsBytes;
+            _jmsDeliveryMode = jmsDeliveryMode;
+            _jmsRedelivered = jmsRedelivered;
+            _jmsType = jmsType;
+            _jmsExpiration = jmsExpiration;
+            _jmsPriority = jmsPriority;
+        }
+
+        @Override
+        public String getJMSMessageID()
+        {
+            return _jmsMessageID;
+        }
+
+        @Override
+        public long getJMSTimestamp()
+        {
+            return _jmsTimestamp;
+        }
+
+        @Override
+        public String getJMSCorrelationID()
+        {
+            return _jmsCorrelationID;
+        }
+
+        @Override
+        public byte[] getJMSCorrelationIDAsBytes()
+        {
+            return _jmsCorrelationIDAsBytes;
+        }
+
+        @Override
+        public int getJMSDeliveryMode()
+        {
+            return _jmsDeliveryMode;
+        }
+
+        @Override
+        public boolean getJMSRedelivered()
+        {
+            return _jmsRedelivered;
+        }
+
+        @Override
+        public String getJMSType()
+        {
+            return _jmsType;
+        }
+
+        @Override
+        public long getJMSExpiration()
+        {
+            return _jmsExpiration;
+        }
+
+        @Override
+        public int getJMSPriority()
+        {
+            return _jmsPriority;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/40df8179/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/ClientMessage.java
----------------------------------------------------------------------
diff --git a/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/ClientMessage.java b/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/ClientMessage.java
new file mode 100644
index 0000000..d3eb146
--- /dev/null
+++ b/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/ClientMessage.java
@@ -0,0 +1,37 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.systests.end_to_end_conversion.client;
+
+import java.io.Serializable;
+
+public interface ClientMessage extends Serializable
+{
+    String getJMSMessageID();
+    long getJMSTimestamp();
+    String getJMSCorrelationID();
+    byte[] getJMSCorrelationIDAsBytes();
+    int getJMSDeliveryMode();
+    boolean getJMSRedelivered();
+    String getJMSType();
+    long getJMSExpiration();
+    int getJMSPriority();
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/40df8179/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/ClientResult.java
----------------------------------------------------------------------
diff --git a/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/ClientResult.java b/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/ClientResult.java
index f8eef68..ff91780 100644
--- a/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/ClientResult.java
+++ b/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/ClientResult.java
@@ -21,23 +21,33 @@
 package org.apache.qpid.systests.end_to_end_conversion.client;
 
 import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
 
 public class ClientResult implements Serializable
 {
     private final Exception _exception;
+    private final List<ClientMessage> _clientMessages;
 
-    public ClientResult()
+    public ClientResult(final Exception exception)
     {
-        this(null);
+        _exception = exception;
+        _clientMessages = Collections.emptyList();
     }
 
-    public ClientResult(final Exception exception)
+    public ClientResult(final List<ClientMessage> clientMessages)
     {
-        _exception = exception;
+        _exception = null;
+        _clientMessages = clientMessages;
     }
 
     public Exception getException()
     {
         return _exception;
     }
+
+    public List<ClientMessage> getClientMessages()
+    {
+        return Collections.unmodifiableList(_clientMessages);
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/40df8179/systests/end-to-end-conversion-tests/src/test/java/org/apache/qpid/systests/end_to_end_conversion/SimpleConversionTest.java
----------------------------------------------------------------------
diff --git a/systests/end-to-end-conversion-tests/src/test/java/org/apache/qpid/systests/end_to_end_conversion/SimpleConversionTest.java b/systests/end-to-end-conversion-tests/src/test/java/org/apache/qpid/systests/end_to_end_conversion/SimpleConversionTest.java
index dbad8fd..2385cd2 100644
--- a/systests/end-to-end-conversion-tests/src/test/java/org/apache/qpid/systests/end_to_end_conversion/SimpleConversionTest.java
+++ b/systests/end-to-end-conversion-tests/src/test/java/org/apache/qpid/systests/end_to_end_conversion/SimpleConversionTest.java
@@ -20,13 +20,20 @@
 
 package org.apache.qpid.systests.end_to_end_conversion;
 
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.CoreMatchers.startsWith;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assume.assumeFalse;
 import static org.junit.Assume.assumeTrue;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
@@ -34,12 +41,15 @@ import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
 
 import org.apache.qpid.server.model.Protocol;
 import org.apache.qpid.systests.end_to_end_conversion.client.ClientInstruction;
+import org.apache.qpid.systests.end_to_end_conversion.client.ClientMessage;
+import org.apache.qpid.systests.end_to_end_conversion.client.ClientResult;
 import org.apache.qpid.systests.end_to_end_conversion.client.MessageDescription;
 import org.apache.qpid.systests.end_to_end_conversion.client.SerializableTestClass;
 import org.apache.qpid.systests.end_to_end_conversion.client.VerificationException;
@@ -48,6 +58,9 @@ public class SimpleConversionTest extends EndToEndConversionTestBase
 {
     private static final long TEST_TIMEOUT = 30000L;
     private static final String QUEUE_JNDI_NAME = "queue";
+    private static final EnumSet<Protocol> AMQP_PRE010_PROTOCOLS =
+            EnumSet.of(Protocol.AMQP_0_9, Protocol.AMQP_0_9_1, Protocol.AMQP_0_8);
+    private static final String JMS_MESSAGE_IDPOLICY_MESSAGE_IDTYPE = "jms.messageIDPolicy.messageIDType";
 
     private HashMap<String, String> _defaultDestinations;
     @Rule
@@ -178,6 +191,240 @@ public class SimpleConversionTest extends EndToEndConversionTestBase
     }
 
     @Test
+    public void providerAssignedMessageId09_010() throws Exception
+    {
+        assumeTrue(AMQP_PRE010_PROTOCOLS.contains(getPublisherProtocolVersion())
+                   && EnumSet.of(Protocol.AMQP_0_10).contains(getSubscriberProtocolVersion()));
+        // The assumption above restricts this test to a pre-0-10 publisher paired with a 0-10 subscriber.
+        List<ClientMessage> clientResults = performProviderAssignedMessageIdTest(Collections.emptyMap());
+
+        ClientMessage publishedMessage = clientResults.get(0);
+        ClientMessage subscriberMessage = clientResults.get(1);
+
+        // Subscriber receives a 0-10 UUID message-id.  The Qpid JMS 0-x library synthesizes the ID: prefix
+        assertThat(subscriberMessage.getJMSMessageID(), equalTo(publishedMessage.getJMSMessageID()));
+    }
+
+    @Test
+    public void providerAssignedMessageId09_10() throws Exception
+    {
+        assumeTrue(AMQP_PRE010_PROTOCOLS.contains(getPublisherProtocolVersion())
+                   && EnumSet.of(Protocol.AMQP_1_0).contains(getSubscriberProtocolVersion()));
+
+        List<ClientMessage> clientResults = performProviderAssignedMessageIdTest(Collections.emptyMap());
+
+        ClientMessage publishedMessage = clientResults.get(0);
+        ClientMessage subscriberMessage = clientResults.get(1);
+
+        // TODO: On the wire the AMQP 1.0 client receives a message whose message-id-string contains
+        // an ID: prefixed UUID. It would be better if the conversion layer sent a message-id-uuid,
+        // as this would offer the most compatibility and minimise the exposure of the ID prefix.
+        assertThat(subscriberMessage.getJMSMessageID(), equalTo(publishedMessage.getJMSMessageID()));
+    }
+
+    @Test
+    public void providerAssignedMessageId010_09() throws Exception
+    {
+        assumeTrue(EnumSet.of(Protocol.AMQP_0_10).contains(getPublisherProtocolVersion())
+                   && AMQP_PRE010_PROTOCOLS.contains(getSubscriberProtocolVersion()));
+
+        List<ClientMessage> clientResults = performProviderAssignedMessageIdTest(Collections.emptyMap());
+
+        ClientMessage publishedMessage = clientResults.get(0);
+        ClientMessage subscriberMessage = clientResults.get(1);
+
+        // Subscriber receives an AMQShortString message-id with an ID: prefix.  The conversion layer already
+        // synthesizes this. See MessageConverter_0_10_to_0_8.java:130
+        assertThat(subscriberMessage.getJMSMessageID(), equalTo(publishedMessage.getJMSMessageID()));
+    }
+
+    @Test
+    public void providerAssignedMessageId010_10() throws Exception
+    {
+        assumeTrue(EnumSet.of(Protocol.AMQP_0_10).contains(getPublisherProtocolVersion())
+                   && EnumSet.of(Protocol.AMQP_1_0).contains(getSubscriberProtocolVersion()));
+
+        List<ClientMessage> clientResults = performProviderAssignedMessageIdTest(Collections.emptyMap());
+
+        ClientMessage publishedMessage = clientResults.get(0);
+        ClientMessage subscriberMessage = clientResults.get(1);
+
+        // Publisher sends a 0-10 UUID message-id. This is converted into message-id-uuid.  The Qpid JMS
+        // Client returns an id prefixed with ID:AMQP_UUID:
+        final String publishedJmsMessageID = publishedMessage.getJMSMessageID();
+        assertThat(publishedJmsMessageID, startsWith("ID:"));
+        String barePublishedJmsMessageID = publishedJmsMessageID.substring("ID:".length());
+        String expectedSubscriberJmsMessageID = String.format("ID:AMQP_UUID:%s", barePublishedJmsMessageID);
+        assertThat(subscriberMessage.getJMSMessageID(), equalTo(expectedSubscriberJmsMessageID));
+    }
+
+    @Test
+    public void providerAssignedMessageId_DefaultMode_10_09() throws Exception
+    {
+        assumeTrue(EnumSet.of(Protocol.AMQP_1_0).contains(getPublisherProtocolVersion())
+                   && AMQP_PRE010_PROTOCOLS.contains(getSubscriberProtocolVersion()));
+
+        List<ClientMessage> clientResults = performProviderAssignedMessageIdTest(Collections.emptyMap());
+
+        // On the wire the message-id-string comprises an identity of the publisher + a message sequence number
+        ClientMessage publishedMessage = clientResults.get(0);
+        ClientMessage subscriberMessage = clientResults.get(1);
+
+        assertThat(subscriberMessage.getJMSMessageID(), equalTo(publishedMessage.getJMSMessageID()));
+    }
+
+    @Test
+    @Ignore("Currently subscriber receives the correct message id but without the ID prefix")
+    public void providerAssignedMessageId_UuidMode_10_09() throws Exception
+    {
+        assumeTrue(EnumSet.of(Protocol.AMQP_1_0).contains(getPublisherProtocolVersion())
+                   && AMQP_PRE010_PROTOCOLS
+                           .contains(getSubscriberProtocolVersion()));
+
+        List<ClientMessage> clientResults = performProviderAssignedMessageIdTest(Collections.singletonMap(
+                JMS_MESSAGE_IDPOLICY_MESSAGE_IDTYPE, "UUID"));
+
+        ClientMessage publishedMessage = clientResults.get(0);
+        ClientMessage subscriberMessage = clientResults.get(1);
+
+        // On the wire the message id is an AMQP 1.0 UUID
+        final String publishedJmsMessageID = publishedMessage.getJMSMessageID();
+        assertThat(publishedJmsMessageID, startsWith("ID:AMQP_UUID:"));
+        String barePublishedJmsMessageID = publishedJmsMessageID.substring("ID:AMQP_UUID:".length());
+        String expectedSubscriberJmsMessageID = String.format("ID:%s", barePublishedJmsMessageID);
+        assertThat(subscriberMessage.getJMSMessageID(), equalTo(expectedSubscriberJmsMessageID));
+
+
+        // TODO: On the wire the AMQP 0-x client receives a message id containing a stringified UUID without prefix.
+        // This is inconsistent - in all other cases the client receives message ids prefixed.  It would be
+        // better if the conversion layer synthesized the ID prefix.
+    }
+
+    @Test
+    @Ignore("Currently subscriber receives the correct message id but without the ID prefix")
+    public void providerAssignedMessageId_UuidStringMode_10_09() throws Exception
+    {
+        assumeTrue(EnumSet.of(Protocol.AMQP_1_0).contains(getPublisherProtocolVersion())
+                   && AMQP_PRE010_PROTOCOLS.contains(getSubscriberProtocolVersion()));
+
+        List<ClientMessage> clientResults = performProviderAssignedMessageIdTest(Collections.singletonMap(
+                JMS_MESSAGE_IDPOLICY_MESSAGE_IDTYPE, "UUID_STRING"));
+
+        ClientMessage publishedMessage = clientResults.get(0);
+        ClientMessage subscriberMessage = clientResults.get(1);
+
+        // On the wire the message-id is a string containing a UUID with no prefix
+        final String publishedJmsMessageID = publishedMessage.getJMSMessageID();
+        assertThat(publishedJmsMessageID, startsWith("ID:AMQP_NO_PREFIX:"));
+        String barePublishedJmsMessageID = publishedJmsMessageID.substring("ID:AMQP_NO_PREFIX:".length());
+        String expectedSubscriberJmsMessageID = String.format("ID:%s", barePublishedJmsMessageID);
+        assertThat(subscriberMessage.getJMSMessageID(), equalTo(expectedSubscriberJmsMessageID));
+
+        // TODO: as for providerAssignedMessageId_UuidMode_10_09, the conversion layer should synthesize the ID prefix.
+    }
+
+    @Test
+    public void providerAssignedMessageId_PrefixedUuidStringMode_10_09() throws Exception
+    {
+        assumeTrue(EnumSet.of(Protocol.AMQP_1_0).contains(getPublisherProtocolVersion())
+                   && AMQP_PRE010_PROTOCOLS.contains(getSubscriberProtocolVersion()));
+
+        List<ClientMessage> clientResults = performProviderAssignedMessageIdTest(Collections.singletonMap(
+                JMS_MESSAGE_IDPOLICY_MESSAGE_IDTYPE, "PREFIXED_UUID_STRING"));
+
+        ClientMessage publishedMessage = clientResults.get(0);
+        ClientMessage subscriberMessage = clientResults.get(1);
+        // The "ID:" prefix is stripped and re-added below, so the expected id equals the published id.
+        // On the wire the message-id is a message-id-string containing a UUID with ID: prefix
+        final String publishedJmsMessageID = publishedMessage.getJMSMessageID();
+        assertThat(publishedJmsMessageID, startsWith("ID:"));
+        String barePublishedJmsMessageID = publishedJmsMessageID.substring("ID:".length());
+        String expectedSubscriberJmsMessageID = String.format("ID:%s", barePublishedJmsMessageID);
+        assertThat(subscriberMessage.getJMSMessageID(), equalTo(expectedSubscriberJmsMessageID));
+    }
+
+    @Test
+    public void providerAssignedMessageId_DefaultMode_10_010() throws Exception
+    {
+        assumeTrue(EnumSet.of(Protocol.AMQP_1_0).contains(getPublisherProtocolVersion())
+                   && EnumSet.of(Protocol.AMQP_0_10).contains(getSubscriberProtocolVersion()));
+
+        List<ClientMessage> clientResults = performProviderAssignedMessageIdTest(Collections.emptyMap());
+
+        // On the wire the message-id is a message-id-string comprising an identity of the publisher + a message
+        // sequence number
+        ClientMessage publishedMessage = clientResults.get(0);
+        ClientMessage subscriberMessage = clientResults.get(1);
+
+        // Conversion layer manufactures a UUID.  This will be unpredictable to the client.
+        assertThat(subscriberMessage.getJMSMessageID(), is(notNullValue()));
+    }
+
+    @Test
+    public void providerAssignedMessageId_UuidMode_10_010() throws Exception
+    {
+        assumeTrue(EnumSet.of(Protocol.AMQP_1_0).contains(getPublisherProtocolVersion())
+                   && EnumSet.of(Protocol.AMQP_0_10).contains(getSubscriberProtocolVersion()));
+
+        List<ClientMessage> clientResults = performProviderAssignedMessageIdTest(Collections.singletonMap(
+                JMS_MESSAGE_IDPOLICY_MESSAGE_IDTYPE, "UUID"));
+
+        ClientMessage publishedMessage = clientResults.get(0);
+        ClientMessage subscriberMessage = clientResults.get(1);
+        // Expected subscriber id: the published id with the "AMQP_UUID:" segment removed.
+        // On the wire the message-id is a message-id-uuid
+        final String publishedJmsMessageID = publishedMessage.getJMSMessageID();
+        assertThat(publishedJmsMessageID, startsWith("ID:AMQP_UUID:"));
+        String barePublishedJmsMessageID = publishedJmsMessageID.substring("ID:AMQP_UUID:".length());
+        String expectedSubscriberJmsMessageID = String.format("ID:%s", barePublishedJmsMessageID);
+        assertThat(subscriberMessage.getJMSMessageID(), equalTo(expectedSubscriberJmsMessageID));
+    }
+
+    @Test
+    public void providerAssignedMessageId_UuidStringMode_10_010() throws Exception
+    {
+        assumeTrue(EnumSet.of(Protocol.AMQP_1_0).contains(getPublisherProtocolVersion())
+                   && EnumSet.of(Protocol.AMQP_0_10).contains(getSubscriberProtocolVersion()));
+
+        List<ClientMessage> clientResults = performProviderAssignedMessageIdTest(Collections.singletonMap(
+                JMS_MESSAGE_IDPOLICY_MESSAGE_IDTYPE, "UUID_STRING"));
+
+        ClientMessage publishedMessage = clientResults.get(0);
+        ClientMessage subscriberMessage = clientResults.get(1);
+        // Expected subscriber id: the published id with the "AMQP_NO_PREFIX:" segment removed.
+        // On the wire the message-id is a message-id-string containing a UUID without prefix
+        final String publishedJmsMessageID = publishedMessage.getJMSMessageID();
+        assertThat(publishedJmsMessageID, startsWith("ID:AMQP_NO_PREFIX:"));
+        String barePublishedJmsMessageID = publishedJmsMessageID.substring("ID:AMQP_NO_PREFIX:".length());
+        String expectedSubscriberJmsMessageID = String.format("ID:%s", barePublishedJmsMessageID);
+        assertThat(subscriberMessage.getJMSMessageID(), equalTo(expectedSubscriberJmsMessageID));
+    }
+
+    @Test
+    @Ignore("Currently subscriber receives a UUID that differs from the one sent")
+    public void providerAssignedMessageId_PrefixedUuidStringMode_10_010() throws Exception
+    {
+        assumeTrue(EnumSet.of(Protocol.AMQP_1_0).contains(getPublisherProtocolVersion())
+                   && EnumSet.of(Protocol.AMQP_0_10).contains(getSubscriberProtocolVersion()));
+
+        List<ClientMessage> clientResults = performProviderAssignedMessageIdTest(Collections.singletonMap(
+                JMS_MESSAGE_IDPOLICY_MESSAGE_IDTYPE, "PREFIXED_UUID_STRING"));
+
+        ClientMessage publishedMessage = clientResults.get(0);
+        ClientMessage subscriberMessage = clientResults.get(1);
+
+        // On the wire the message-id is a message-id-string containing a UUID with ID: prefix
+        final String publishedJmsMessageID = publishedMessage.getJMSMessageID();
+        assertThat(publishedJmsMessageID, startsWith("ID:"));
+        String barePublishedJmsMessageID = publishedJmsMessageID.substring("ID:".length());
+        String expectedSubscriberJmsMessageID = String.format("ID:%s", barePublishedJmsMessageID);
+        assertThat(subscriberMessage.getJMSMessageID(), equalTo(expectedSubscriberJmsMessageID));
+
+        // TODO: correct the conversion layer so that a string that contains an ID prefixed UUID is converted
+        // as a UUID.
+    }
+
+    @Test
     public void property() throws Exception
     {
         final MessageDescription messageDescription = new MessageDescription();
@@ -297,7 +544,7 @@ public class SimpleConversionTest extends EndToEndConversionTestBase
         performTest(publisherInstructions, subscriberInstructions);
     }
 
-    private void performSimpleTest(final MessageDescription messageDescription) throws Exception
+    private List<ClientResult> performSimpleTest(final MessageDescription messageDescription) throws Exception
     {
         final String destinationJndiName = QUEUE_JNDI_NAME;
         final List<ClientInstruction> publisherInstructions =
@@ -308,17 +555,17 @@ public class SimpleConversionTest extends EndToEndConversionTestBase
                 new ClientInstructionBuilder().configureDestinations(_defaultDestinations)
                                               .receiveMessage(destinationJndiName, messageDescription)
                                               .build();
-        performTest(publisherInstructions,subscriberInstructions);
+        return performTest(publisherInstructions,subscriberInstructions);
     }
 
-    private void performTest(final List<ClientInstruction> publisherInstructions,
-                            final List<ClientInstruction> subscriberInstructions) throws Exception
+    private List<ClientResult> performTest(final List<ClientInstruction> publisherInstructions,
+                                           final List<ClientInstruction> subscriberInstructions) throws Exception
     {
-        final ListenableFuture<?> publisherFuture = runPublisher(publisherInstructions);
-        final ListenableFuture<?> subscriberFuture = runSubscriber(subscriberInstructions);
+        final ListenableFuture<ClientResult> publisherFuture = runPublisher(publisherInstructions);
+        final ListenableFuture<ClientResult> subscriberFuture = runSubscriber(subscriberInstructions);
         try
         {
-            Futures.allAsList(publisherFuture, subscriberFuture).get(TEST_TIMEOUT, TimeUnit.MILLISECONDS);
+            return Futures.allAsList(publisherFuture, subscriberFuture).get(TEST_TIMEOUT, TimeUnit.MILLISECONDS);
         }
         catch (ExecutionException e)
         {
@@ -337,4 +584,32 @@ public class SimpleConversionTest extends EndToEndConversionTestBase
             }
         }
     }
+
+    /**
+     * Publishes and receives one message; publisherConnectionUrlConfig is passed to configureConnectionUrl.
+     * Returns a two-element list: the publisher's client message first, then the subscriber's.
+     */
+    private List<ClientMessage> performProviderAssignedMessageIdTest(final Map<String, String> publisherConnectionUrlConfig) throws Exception
+    {
+        final MessageDescription messageDescription = new MessageDescription();
+        final String destinationJndiName = QUEUE_JNDI_NAME;
+        final List<ClientInstruction> publisherInstructions =
+                new ClientInstructionBuilder().configureConnectionUrl(publisherConnectionUrlConfig)
+                                              .configureDestinations(_defaultDestinations)
+                                              .publishMessage(destinationJndiName, messageDescription)
+                                              .build();
+        final List<ClientInstruction> subscriberInstructions =
+                new ClientInstructionBuilder().configureDestinations(_defaultDestinations)
+                                              .receiveMessage(destinationJndiName, messageDescription)
+                                              .build();
+        List<ClientResult> clientResults = performTest(publisherInstructions, subscriberInstructions);
+        assertThat("Unexpected number of client results", clientResults.size(), equalTo(2));
+        ClientResult publishedClientResult = clientResults.get(0);
+        assertThat("Unexpected number of published client messages", publishedClientResult.getClientMessages().size(), equalTo(1));
+        ClientMessage publishedMessage = publishedClientResult.getClientMessages().get(0);
+        ClientResult subscriberClientResults = clientResults.get(1);
+        assertThat("Unexpected number of subscriber client messages", subscriberClientResults.getClientMessages().size(), equalTo(1));
+        ClientMessage subscriberMessage = subscriberClientResults.getClientMessages().get(0);
+        return Arrays.asList(publishedMessage, subscriberMessage);
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org