You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2017/08/25 15:03:04 UTC
[2/7] qpid-broker-j git commit: QPID-7896: [Java System Tests] Add a
reply-to conversion test and refactor EndToEndConversionTest module
QPID-7896: [Java System Tests] Add a reply-to conversion test and refactor EndToEndConversionTest module
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/0822e1a8
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/0822e1a8
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/0822e1a8
Branch: refs/heads/master
Commit: 0822e1a880283a63a5e8b2c832524b2f64042381
Parents: d12b40a
Author: Lorenz Quack <lq...@apache.org>
Authored: Thu Aug 24 14:49:04 2017 +0100
Committer: Lorenz Quack <lq...@apache.org>
Committed: Fri Aug 25 10:30:40 2017 +0100
----------------------------------------------------------------------
.../ClientInstructionBuilder.java | 109 ++++++++
.../EndToEndConversionTestBase.java | 121 ++++++--
.../JmsInstructionBuilder.java | 107 -------
.../end_to_end_conversion/JmsInstructions.java | 165 -----------
.../LoggingOutputStream.java | 277 -------------------
.../end_to_end_conversion/client/Client.java | 266 ++++++++++++------
.../client/ClientInstruction.java | 27 ++
.../client/ClientInstructions.java | 86 ------
.../client/ConfigureDestination.java | 39 +++
.../client/ConfigureJndiContext.java | 43 +++
.../client/MessageCreator.java | 21 +-
.../client/MessageDescription.java | 144 ++++++++++
.../client/MessageVerifier.java | 18 +-
.../client/MessagingInstruction.java | 68 +++++
.../dependency_resolution/ClasspathQuery.java | 32 ++-
.../utils/LoggingOutputStream.java | 277 +++++++++++++++++++
.../SimpleConversionTest.java | 131 +++++++--
17 files changed, 1135 insertions(+), 796 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0822e1a8/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/ClientInstructionBuilder.java
----------------------------------------------------------------------
diff --git a/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/ClientInstructionBuilder.java b/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/ClientInstructionBuilder.java
new file mode 100644
index 0000000..7c50832
--- /dev/null
+++ b/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/ClientInstructionBuilder.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.systests.end_to_end_conversion;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.qpid.systests.end_to_end_conversion.client.ClientInstruction;
+import org.apache.qpid.systests.end_to_end_conversion.client.ConfigureDestination;
+import org.apache.qpid.systests.end_to_end_conversion.client.MessageDescription;
+import org.apache.qpid.systests.end_to_end_conversion.client.MessagingInstruction;
+
+/**
+ * Fluent builder that assembles the ordered list of {@link ClientInstruction}s a test
+ * hands to a client.
+ * <p>
+ * {@code publishMessage} and {@code receiveMessage} each append a messaging instruction and
+ * make that instruction's {@link MessageDescription} the "latest" one. The {@code with...}
+ * methods then refine this latest description. Calling a {@code with...} method before any
+ * message instruction has been added is a usage error and fails with an
+ * {@link IllegalStateException}.
+ */
+public class ClientInstructionBuilder
+{
+    private final List<ClientInstruction> _clientInstructions = new ArrayList<>();
+    private MessageDescription _latestMessageDescription;
+
+    /**
+     * Appends a publish instruction with a default (empty) message description.
+     *
+     * @param destinationJndiName JNDI name of the destination passed to the instruction
+     * @return this builder
+     */
+    public ClientInstructionBuilder publishMessage(final String destinationJndiName)
+    {
+        return publishMessage(destinationJndiName, new MessageDescription());
+    }
+
+    /**
+     * Appends a publish instruction. The given description is copied, so later
+     * {@code with...} calls change the builder's copy and not the caller's object.
+     *
+     * @param destinationJndiName JNDI name of the destination passed to the instruction
+     * @param messageDescription  template for the message to publish
+     * @return this builder
+     */
+    public ClientInstructionBuilder publishMessage(final String destinationJndiName,
+                                                   final MessageDescription messageDescription)
+    {
+        _latestMessageDescription = new MessageDescription(messageDescription);
+        _clientInstructions.add(new MessagingInstruction.PublishMessage(destinationJndiName,
+                                                                        _latestMessageDescription));
+        return this;
+    }
+
+    /**
+     * Appends a receive instruction with a default (empty) message description.
+     *
+     * @param destinationJndiName JNDI name of the destination passed to the instruction
+     * @return this builder
+     */
+    public ClientInstructionBuilder receiveMessage(final String destinationJndiName)
+    {
+        return receiveMessage(destinationJndiName, new MessageDescription());
+    }
+
+    /**
+     * Appends a receive instruction. The given description is copied, so later
+     * {@code with...} calls change the builder's copy and not the caller's object.
+     *
+     * @param destinationJndiName JNDI name of the destination passed to the instruction
+     * @param messageDescription  template for the message expected to be received
+     * @return this builder
+     */
+    public ClientInstructionBuilder receiveMessage(final String destinationJndiName,
+                                                   final MessageDescription messageDescription)
+    {
+        _latestMessageDescription = new MessageDescription(messageDescription);
+        _clientInstructions.add(new MessagingInstruction.ReceiveMessage(destinationJndiName,
+                                                                        _latestMessageDescription));
+        return this;
+    }
+
+    /**
+     * Sets the message type on the latest message description.
+     *
+     * @throws IllegalStateException if no message instruction has been added yet
+     */
+    public ClientInstructionBuilder withMessageType(MessageDescription.MessageType messageType)
+    {
+        latestMessageDescription().setMessageType(messageType);
+        return this;
+    }
+
+    /**
+     * Sets the content on the latest message description.
+     *
+     * @throws IllegalStateException if no message instruction has been added yet
+     */
+    public ClientInstructionBuilder withMessageContent(Serializable content)
+    {
+        latestMessageDescription().setContent(content);
+        return this;
+    }
+
+    /**
+     * Sets a header on the latest message description.
+     *
+     * @throws IllegalStateException if no message instruction has been added yet
+     */
+    public ClientInstructionBuilder withHeader(final MessageDescription.MessageHeader header,
+                                               final Serializable value)
+    {
+        latestMessageDescription().setHeader(header, value);
+        return this;
+    }
+
+    /**
+     * Sets a property on the latest message description.
+     *
+     * @throws IllegalStateException if no message instruction has been added yet
+     */
+    public ClientInstructionBuilder withProperty(final String property, final Serializable value)
+    {
+        latestMessageDescription().setProperty(property, value);
+        return this;
+    }
+
+    /**
+     * Sets the reply-to JNDI name on the latest message description.
+     *
+     * @throws IllegalStateException if no message instruction has been added yet
+     */
+    public ClientInstructionBuilder withReplyToJndiName(final String replyToJndiName)
+    {
+        latestMessageDescription().setReplyToJndiName(replyToJndiName);
+        return this;
+    }
+
+    /**
+     * Returns the instructions added so far, in insertion order.
+     * <p>
+     * The returned list is a new mutable list, so instructions added to this builder later
+     * do not appear in it. The copy is shallow: the instruction objects themselves are
+     * shared with the builder.
+     *
+     * @return a snapshot of the accumulated instructions
+     */
+    public List<ClientInstruction> build()
+    {
+        return new ArrayList<>(_clientInstructions);
+    }
+
+    /**
+     * Appends a {@link ConfigureDestination} instruction built from the given map.
+     * This does not change which message description the {@code with...} methods act on.
+     *
+     * @param destinations destination configuration handed unchanged to {@link ConfigureDestination}
+     * @return this builder
+     */
+    public ClientInstructionBuilder configureDestinations(final Map<String, String> destinations)
+    {
+        _clientInstructions.add(new ConfigureDestination(destinations));
+        return this;
+    }
+
+    /**
+     * Returns the description of the most recently added message instruction, failing with a
+     * descriptive error instead of a bare NullPointerException when there is none.
+     */
+    private MessageDescription latestMessageDescription()
+    {
+        if (_latestMessageDescription == null)
+        {
+            throw new IllegalStateException(
+                    "No message instruction to modify: call publishMessage() or receiveMessage() first");
+        }
+        return _latestMessageDescription;
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0822e1a8/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/EndToEndConversionTestBase.java
----------------------------------------------------------------------
diff --git a/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/EndToEndConversionTestBase.java b/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/EndToEndConversionTestBase.java
index 28b1c6e..da04c66 100644
--- a/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/EndToEndConversionTestBase.java
+++ b/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/EndToEndConversionTestBase.java
@@ -28,6 +28,7 @@ import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
@@ -39,42 +40,39 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.junit.AfterClass;
-import org.junit.Before;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;
+import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.systests.end_to_end_conversion.client.Client;
-import org.apache.qpid.systests.end_to_end_conversion.client.ClientInstructions;
+import org.apache.qpid.systests.end_to_end_conversion.client.ClientInstruction;
import org.apache.qpid.systests.end_to_end_conversion.client.ClientResult;
+import org.apache.qpid.systests.end_to_end_conversion.client.ConfigureDestination;
+import org.apache.qpid.systests.end_to_end_conversion.client.ConfigureJndiContext;
import org.apache.qpid.systests.end_to_end_conversion.dependency_resolution.ClasspathQuery;
+import org.apache.qpid.systests.end_to_end_conversion.utils.LoggingOutputStream;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
public class EndToEndConversionTestBase extends BrokerAdminUsingTestBase
{
- public static final String QUEUE_NAME = "queue";
+ public static final String TEMPORARY_QUEUE_JNDI_NAME = "<TEMPORARY>";
public static final int CLIENT_SOCKET_TIMEOUT = 30000;
+ private static final int SERVER_SOCKET_TIMEOUT = 30000;
private static final Logger LOGGER = LoggerFactory.getLogger(EndToEndConversionTestBase.class);
private static final Logger CLIENT_LOGGER = LoggerFactory.getLogger(Client.class);
- private static final int SERVER_SOCKET_TIMEOUT = 30000;
- private ListeningExecutorService _executorService =
+ private final ListeningExecutorService _executorService =
MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
- @Before
- public void setUp()
- {
- getBrokerAdmin().createQueue(QUEUE_NAME);
- }
-
@AfterClass
public static void reportStats()
{
System.out.println("LQDEBUG: " + ClasspathQuery.getCacheStats());
}
- protected ListenableFuture<?> runPublisher(final List<JmsInstructions> jmsInstructions)
+ protected ListenableFuture<?> runPublisher(final List<ClientInstruction> clientInstructions)
{
List<String> gavs = Arrays.asList(System.getProperty("qpid.systests.end_to_end_conversion.publisherGavs",
"org.apache.qpid:qpid-jms-client:LATEST")
@@ -85,11 +83,11 @@ public class EndToEndConversionTestBase extends BrokerAdminUsingTestBase
return _executorService.submit(() -> {
Thread.currentThread().setName("Publisher");
- runClient(gavs, additionalJavaArgs, jmsInstructions);
+ runClient(gavs, additionalJavaArgs, clientInstructions);
});
}
- protected ListenableFuture<?> runSubscriber(final List<JmsInstructions> jmsInstructions)
+ protected ListenableFuture<?> runSubscriber(final List<ClientInstruction> clientInstructions)
{
List<String> gavs = Arrays.asList(System.getProperty("qpid.systests.end_to_end_conversion.subscriberGavs",
"org.apache.qpid:qpid-client:LATEST,org.apache.geronimo.specs:geronimo-jms_1.1_spec:1.1.1")
@@ -101,34 +99,103 @@ public class EndToEndConversionTestBase extends BrokerAdminUsingTestBase
return _executorService.submit(() -> {
Thread.currentThread().setName("Subscriber");
- runClient(gavs, additionalJavaArgs, jmsInstructions);
+ runClient(gavs, additionalJavaArgs, clientInstructions);
});
}
- private ClientInstructions getClientInstructions(final List<JmsInstructions> jmsInstructions,
- final boolean amqp0xClient)
+ private List<ClientInstruction> amendClientInstructions(final List<ClientInstruction> clientInstructions,
+ final boolean amqp0xClient)
{
- ClientInstructions clientInstructions = new ClientInstructions();
- clientInstructions.setJmsInstructions(jmsInstructions);
+ if (clientInstructions.isEmpty())
+ {
+ LOGGER.warn("client instructions are empty!");
+ }
+ else
+ {
+ if (!(clientInstructions.get(0) instanceof ConfigureDestination))
+ {
+ LOGGER.warn(String.format("first client instructions should be a 'ConfigureDestination' but is '%s'!",
+ clientInstructions.get(0).getClass().getSimpleName()));
+ }
+ if (clientInstructions.stream().filter(item -> item instanceof ConfigureJndiContext).count() != 0)
+ {
+ LOGGER.warn("Test should not set a 'ConfigureContext' client instruction!"
+ + " This is set by the base class.");
+ }
+ }
+
+ final String contextFactory;
+ final String connectionUrl;
if (amqp0xClient)
{
- clientInstructions.setContextFactory(getAmqp0xContextFactory());
- clientInstructions.setConnectionUrl(getAmqp0xConnectionUrl());
+ contextFactory = getAmqp0xContextFactory();
+ connectionUrl = getAmqp0xConnectionUrl();
+ }
+ else
+ {
+ contextFactory = getAmqp10ContextFactory();
+ connectionUrl = getAmqp10ConnectionUrl();
+ }
+ ConfigureJndiContext jndiContext = new ConfigureJndiContext(contextFactory, connectionUrl);
+ List<ClientInstruction> instructions = new ArrayList<>();
+ instructions.add(jndiContext);
+ instructions.addAll(clientInstructions);
+ return instructions;
+ }
+
+ /**
+  * Determines the AMQP protocol version used by the publishing client.
+  * <p>
+  * The publisher GAVs and additional Java arguments are read from system properties. They
+  * default to the {@code qpid-jms-client} artifact and to no additional arguments. The
+  * decision itself is delegated to {@code getClientProtocolVersion}.
+  *
+  * @return the protocol derived from the configured publisher client
+  */
+ protected Protocol getPublisherProtocolVersion()
+ {
+     return getClientProtocolVersion(
+             System.getProperty("qpid.systests.end_to_end_conversion.publisherGavs",
+                                "org.apache.qpid:qpid-jms-client:LATEST"),
+             System.getProperty("qpid.systests.end_to_end_conversion.publisherAdditionalJavaArguments", ""));
+ }
+
+ /**
+  * Determines the AMQP protocol version used by the subscribing client.
+  * <p>
+  * The subscriber GAVs and additional Java arguments are read from system properties. The
+  * GAV default must match the one used by {@code runSubscriber}. Otherwise, with no
+  * properties set, this method would report a different protocol from the one the
+  * subscriber actually runs with.
+  *
+  * @return the protocol derived from the configured subscriber client
+  * @throws RuntimeException propagated from {@code getClientProtocolVersion} when the
+  *                          version cannot be determined
+  */
+ protected Protocol getSubscriberProtocolVersion()
+ {
+     // Same default as runSubscriber(): the AMQP 0-x client, not the AMQP 1.0 qpid-jms-client.
+     final String subscriberGavs = System.getProperty("qpid.systests.end_to_end_conversion.subscriberGavs",
+             "org.apache.qpid:qpid-client:LATEST,org.apache.geronimo.specs:geronimo-jms_1.1_spec:1.1.1");
+     final String additionalArgs = System.getProperty(
+             "qpid.systests.end_to_end_conversion.subscriberAdditionalJavaArguments",
+             "-Dqpid.amqp.version=0-9-1");
+     return getClientProtocolVersion(subscriberGavs, additionalArgs);
+ }
+
+ private Protocol getClientProtocolVersion(final String publisherGavs, final String additionalArgs)
+ {
+ if (publisherGavs.contains("org.apache.qpid:qpid-jms-client"))
+ {
+ return Protocol.AMQP_1_0;
}
else
{
- clientInstructions.setContextFactory(getAmqp10ContextFactory());
- clientInstructions.setConnectionUrl(getAmqp10ConnectionUrl());
+ if (additionalArgs.contains("0-10"))
+ {
+ return Protocol.AMQP_0_10;
+ }
+ else if (additionalArgs.contains("0-9-1"))
+ {
+ return Protocol.AMQP_0_9_1;
+ }
+ else if (additionalArgs.contains("0-9"))
+ {
+ return Protocol.AMQP_0_9;
+ }
+ else if (additionalArgs.contains("0-8"))
+ {
+ return Protocol.AMQP_0_8;
+ }
}
- clientInstructions.setQueueName(QUEUE_NAME);
- return clientInstructions;
+ throw new RuntimeException("Unable to determine client protocol version");
}
private void runClient(final Collection<String> clientGavs,
final List<String> additionalJavaArguments,
- final List<JmsInstructions> jmsInstructions)
+ final List<ClientInstruction> jmsInstructions)
{
- final ClientInstructions clientInstructions = getClientInstructions(jmsInstructions, isAmqp0xClient(clientGavs));
+ final List<ClientInstruction> clientInstructions = amendClientInstructions(jmsInstructions,
+ isAmqp0xClient(clientGavs));
final ClasspathQuery classpathQuery = new ClasspathQuery(Client.class, clientGavs);
try (final ServerSocket serverSocket = new ServerSocket(0))
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0822e1a8/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/JmsInstructionBuilder.java
----------------------------------------------------------------------
diff --git a/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/JmsInstructionBuilder.java b/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/JmsInstructionBuilder.java
deleted file mode 100644
index 3561ed8..0000000
--- a/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/JmsInstructionBuilder.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.systests.end_to_end_conversion;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-
-public class JmsInstructionBuilder
-{
- private List<JmsInstructions> _jmsInstructions = new ArrayList<>();
- private JmsInstructions.MessageDescription _latestMessageDescription;
-
- public static List<JmsInstructions> publishSingleMessage(final JmsInstructions.MessageDescription messageDescription)
- {
- return new JmsInstructionBuilder().publishMessage(messageDescription).build();
- }
-
- public static List<JmsInstructions> receiveSingleMessage(final JmsInstructions.MessageDescription messageDescription)
- {
- return new JmsInstructionBuilder().receiveMessage(messageDescription).build();
- }
-
- public JmsInstructionBuilder publishMessage()
- {
- return publishMessage(new JmsInstructions.MessageDescription());
- }
-
- public JmsInstructionBuilder publishMessage(final JmsInstructions.MessageDescription messageDescription)
- {
- _latestMessageDescription = messageDescription;
- _jmsInstructions.add(new JmsInstructions.PublishMessage(_latestMessageDescription));
- return this;
- }
-
- public JmsInstructionBuilder withMessageType(JmsInstructions.MessageDescription.MessageType messageType)
- {
- _latestMessageDescription.setMessageType(messageType);
- return this;
- }
-
- public JmsInstructionBuilder withMessageContent(Serializable content)
- {
- _latestMessageDescription.setContent(content);
- return this;
- }
-
- public JmsInstructionBuilder withHeader(final JmsInstructions.MessageDescription.MessageHeader header,
- final Serializable value)
- {
- _latestMessageDescription.setHeader(header, value);
- return this;
- }
-
- public JmsInstructionBuilder withProperty(final String property, final Serializable value)
- {
- _latestMessageDescription.setProperty(property, value);
- return this;
- }
-
- public JmsInstructionBuilder receiveMessage()
- {
- return receiveMessage(new JmsInstructions.MessageDescription());
- }
-
- public JmsInstructionBuilder receiveMessage(final JmsInstructions.MessageDescription messageDescription)
- {
- _latestMessageDescription = messageDescription;
- _jmsInstructions.add(new JmsInstructions.ReceiveMessage(_latestMessageDescription));
- return this;
- }
-
- public JmsInstructionBuilder replyToMessage()
- {
- return replyToMessage(new JmsInstructions.MessageDescription());
- }
-
- public JmsInstructionBuilder replyToMessage(final JmsInstructions.MessageDescription latestMessageDescription)
- {
- _latestMessageDescription = latestMessageDescription;
- _jmsInstructions.add(new JmsInstructions.ReplyToMessage(_latestMessageDescription));
- return this;
- }
-
- public List<JmsInstructions> build()
- {
- return _jmsInstructions;
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0822e1a8/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/JmsInstructions.java
----------------------------------------------------------------------
diff --git a/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/JmsInstructions.java b/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/JmsInstructions.java
deleted file mode 100644
index c3b43ad..0000000
--- a/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/JmsInstructions.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.systests.end_to_end_conversion;
-
-import java.io.Serializable;
-import java.util.HashMap;
-
-public abstract class JmsInstructions implements Serializable
-{
- private final MessageDescription _messageDescription;
-
- public JmsInstructions(final MessageDescription messageDescription)
- {
- _messageDescription = messageDescription;
- }
-
- public MessageDescription getMessageDescription()
- {
- return _messageDescription;
- }
-
- @Override
- public String toString()
- {
- return getClass().getSimpleName() + "{" +
- "_messageDescription=" + _messageDescription +
- '}';
- }
-
- public static class PublishMessage extends JmsInstructions
- {
- PublishMessage(final MessageDescription messageDescription)
- {
- super(messageDescription);
- }
- }
-
- public static class ReceiveMessage extends JmsInstructions
- {
- public ReceiveMessage(final MessageDescription messageDescription)
- {
- super(messageDescription);
- }
- }
-
- public static class ReplyToMessage extends JmsInstructions
- {
- public ReplyToMessage(final MessageDescription messageDescription)
- {
- super(messageDescription);
- }
- }
-
- public static class MessageDescription implements Serializable
- {
- private final HashMap<MessageDescription.MessageHeader, Serializable> _header;
- private final HashMap<String, Serializable> _properties;
- private MessageDescription.MessageType _messageType;
- private Object _content;
- public MessageDescription()
- {
- _header = new HashMap<>();
- _properties = new HashMap<>();
- _messageType = MessageType.MESSAGE;
- }
-
- public MessageDescription.MessageType getMessageType()
- {
- return _messageType;
- }
-
- public void setMessageType(final MessageDescription.MessageType messageType)
- {
- _messageType = messageType;
- }
-
- public Object getContent()
- {
- return _content;
- }
-
- public void setContent(final Object content)
- {
- _content = content;
- }
-
- public HashMap<MessageDescription.MessageHeader, Serializable> getHeaders()
- {
- return _header;
- }
-
- public <T extends Serializable> T getHeader(final MessageDescription.MessageHeader header, final T defaultValue)
- {
- return (T) (_header != null ? _header.getOrDefault(header, defaultValue) : defaultValue);
- }
-
- public void setHeader(final MessageDescription.MessageHeader header, final Serializable value)
- {
- _header.put(header, value);
- }
-
- public HashMap<String, Serializable> getProperties()
- {
- return _properties;
- }
-
- public void setProperty(final String property, final Serializable value)
- {
- _properties.put(property, value);
- }
-
- @Override
- public String toString()
- {
- return "MessageDescription{" +
- "_messageType=" + _messageType +
- ", _content=" + _content +
- ", _header=" + _header +
- ", _properties=" + _properties +
- '}';
- }
-
- public enum MessageType
- {
- MESSAGE,
- BYTES_MESSAGE,
- MAP_MESSAGE,
- OBJECT_MESSAGE,
- STREAM_MESSAGE,
- TEXT_MESSAGE;
- }
-
- public enum MessageHeader
- {
- DESTINATION,
- DELIVERY_MODE,
- MESSAGE_ID,
- TIMESTAMP,
- CORRELATION_ID,
- REPLY_TO,
- REDELIVERED,
- TYPE,
- EXPIRATION,
- PRIORITY
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0822e1a8/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/LoggingOutputStream.java
----------------------------------------------------------------------
diff --git a/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/LoggingOutputStream.java b/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/LoggingOutputStream.java
deleted file mode 100644
index 83940b2..0000000
--- a/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/LoggingOutputStream.java
+++ /dev/null
@@ -1,277 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.systests.end_to_end_conversion;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-import org.slf4j.Logger;
-import org.slf4j.event.Level;
-
-/*
- * Original code by Jim Moore
- * See: https://www.mail-archive.com/user@slf4j.org/msg00673.html
- * Adapted for Qpid needs.
- */
-
-/**
- * An OutputStream that flushes out to a Category.<p>
- * <p/>
- * Note that no data is written out to the Category until the stream is
- * flushed or closed.<p>
- * <p/>
- * Example:<pre>
- * // make sure everything sent to System.err is logged
- * System.setErr(new PrintStream(new
- * LoggingOutputStream(Logger.getRootCategory(),
- * Level.WARN), true));
- * <p/>
- * // make sure everything sent to System.out is also logged
- * System.setOut(new PrintStream(new
- * LoggingOutputStream(Logger.getRootCategory(),
- * Level.INFO), true));
- * </pre>
- *
- * @author <a href="[EMAIL PROTECTED]">Jim Moore</a>
- */
-
-//
-public class LoggingOutputStream extends OutputStream
-{
- /**
- * Platform dependant line separator
- */
- private static final byte[] LINE_SEPARATOR_BYTES = System.getProperty("line.separator").getBytes();
- /**
- * The default number of bytes in the buffer. =2048
- */
- private static final int DEFAULT_BUFFER_LENGTH = 2048;
- /**
- * Used to maintain the contract of [EMAIL PROTECTED] #close()}.
- */
- private boolean hasBeenClosed = false;
- /**
- * The internal buffer where data is stored.
- */
- private byte[] buf;
- /**
- * The number of valid bytes in the buffer. This value is always
- * in the range <tt>0</tt> through <tt>buf.length</tt>; elements
- * <tt>buf[0]</tt> through <tt>buf[count-1]</tt> contain valid
- * byte data.
- */
- private int count;
- /**
- * Remembers the size of the buffer for speed.
- */
- private int bufLength;
- /**
- * The category to write to.
- */
- private Logger logger;
-
- /**
- * The priority to use when writing to the Category.
- */
- private Level level;
-
- /**
- * Creates the LoggingOutputStream to flush to the given Category.
- *
- * @param log the Logger to write to
- * @param level the Level to use when writing to the Logger
- * @throws IllegalArgumentException if cat == null or priority ==
- * null
- */
- public LoggingOutputStream(Logger log, Level level) throws IllegalArgumentException
- {
- if (log == null)
- {
- throw new IllegalArgumentException("cat == null");
- }
- if (level == null)
- {
- throw new IllegalArgumentException("priority == null");
- }
-
- this.level = level;
-
- logger = log;
- bufLength = DEFAULT_BUFFER_LENGTH;
- buf = new byte[DEFAULT_BUFFER_LENGTH];
- count = 0;
- }
-
-
- /**
- * Closes this output stream and releases any system resources
- * associated with this stream. The general contract of
- * <code>close</code>
- * is that it closes the output stream. A closed stream cannot
- * perform
- * output operations and cannot be reopened.
- */
- public void close()
- {
- flush();
- hasBeenClosed = true;
- }
-
-
- /**
- * Writes the specified byte to this output stream. The general
- * contract for <code>write</code> is that one byte is written
- * to the output stream. The byte to be written is the eight
- * low-order bits of the argument <code>b</code>. The 24
- * high-order bits of <code>b</code> are ignored.
- *
- * @param b the <code>byte</code> to write
- * @throws java.io.IOException if an I/O error occurs. In particular, an
- * <code>IOException</code> may be
- * thrown if the output stream has been closed.
- */
- public void write(final int b) throws IOException
- {
- if (hasBeenClosed)
- {
- throw new IOException("The stream has been closed.");
- }
-
- // would this be writing past the buffer?
-
- if (count == bufLength)
- {
- // grow the buffer
- final int newBufLength = bufLength + DEFAULT_BUFFER_LENGTH;
- final byte[] newBuf = new byte[newBufLength];
-
- System.arraycopy(buf, 0, newBuf, 0, bufLength);
- buf = newBuf;
-
- bufLength = newBufLength;
- }
-
- buf[count] = (byte) b;
-
- count++;
-
- if (endsWithNewLine())
- {
- flush();
- }
- }
-
- private boolean endsWithNewLine()
- {
- if (count >= LINE_SEPARATOR_BYTES.length)
- {
- for (int i = 0; i < LINE_SEPARATOR_BYTES.length; i++)
- {
- if (buf[count - LINE_SEPARATOR_BYTES.length + i] != LINE_SEPARATOR_BYTES[i])
- {
- return false;
- }
- }
- return true;
- }
- else
- {
- return false;
- }
- }
-
-
- /**
- * Flushes this output stream and forces any buffered output bytes
- * to be written out. The general contract of <code>flush</code> is
- * that calling it is an indication that, if any bytes previously
- * written have been buffered by the implementation of the output
- * stream, such bytes should immediately be written to their
- * intended destination.
- */
- public void flush()
- {
-
- if (count == 0)
- {
- return;
- }
-
- // don't print out blank lines; flushing from PrintStream puts
-
- // For linux system
-
- if (count == 1 && ((char) buf[0]) == '\n')
- {
- reset();
- return;
- }
-
- // For mac system
-
- if (count == 1 && ((char) buf[0]) == '\r')
- {
- reset();
- return;
- }
-
- // On windows system
-
- if (count == 2 && (char) buf[0] == '\r' && (char) buf[1] == '\n')
- {
- reset();
- return;
- }
-
- while (endsWithNewLine())
- {
- count -= LINE_SEPARATOR_BYTES.length;
- }
- final byte[] theBytes = new byte[count];
- System.arraycopy(buf, 0, theBytes, 0, count);
- final String message = new String(theBytes);
- switch (level)
- {
- case ERROR:
- logger.error(message);
- break;
- case WARN:
- logger.warn(message);
- break;
- case INFO:
- logger.info(message);
- break;
- case DEBUG:
- logger.debug(message);
- break;
- case TRACE:
- logger.trace(message);
- break;
- }
- reset();
- }
-
- private void reset()
- {
- // not resetting the buffer -- assuming that if it grew then it will likely grow similarly again
- count = 0;
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0822e1a8/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/Client.java
----------------------------------------------------------------------
diff --git a/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/Client.java b/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/Client.java
index 9236c1e..fe875f1 100644
--- a/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/Client.java
+++ b/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/Client.java
@@ -20,12 +20,10 @@
package org.apache.qpid.systests.end_to_end_conversion.client;
-import java.io.ByteArrayOutputStream;
-import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
-import java.io.OutputStream;
import java.io.PrintWriter;
+import java.io.Serializable;
import java.io.StringWriter;
import java.net.Socket;
import java.util.Arrays;
@@ -36,6 +34,7 @@ import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
+import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
@@ -44,7 +43,6 @@ import javax.naming.Context;
import javax.naming.InitialContext;
import org.apache.qpid.systests.end_to_end_conversion.EndToEndConversionTestBase;
-import org.apache.qpid.systests.end_to_end_conversion.JmsInstructions;
public class Client
{
@@ -66,15 +64,18 @@ public class Client
final ObjectInputStream inputStream = new ObjectInputStream(socket.getInputStream());
final ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());)
{
- System.out.println(String.format("Connected to controller %d -> %d", socket.getLocalPort(), socket.getPort()));
+ System.out.println(String.format("Connected to controller %d -> %d",
+ socket.getLocalPort(),
+ socket.getPort()));
socket.setSoTimeout(EndToEndConversionTestBase.CLIENT_SOCKET_TIMEOUT);
try
{
final Object o = inputStream.readObject();
- final ClientInstructions instructions;
- if (o instanceof ClientInstructions)
+ final List<ClientInstruction> instructions;
+
+ if (o instanceof List && ((List<?>) o).stream().allMatch(item -> item instanceof ClientInstruction))
{
- instructions = (ClientInstructions) o;
+ instructions = (List<ClientInstruction>) o;
}
else
{
@@ -82,93 +83,108 @@ public class Client
}
System.out.println(String.format("Received instructions : %s", instructions.toString()));
- String contextFactory = instructions.getContextFactory();
- String connectionUrl = instructions.getConnectionUrl();
- String queueName = instructions.getQueueName();
-
- Connection connection = null;
- try
+ if (!instructions.isEmpty())
{
+ String connectionUrl = null;
+ javax.naming.Context context = null;
Hashtable<Object, Object> env = new Hashtable<>();
- env.put(Context.INITIAL_CONTEXT_FACTORY, contextFactory);
- env.put("connectionfactory.myFactoryLookup", connectionUrl);
- env.put("queue.myQueueLookup", queueName);
-
- javax.naming.Context context = new InitialContext(env);
-
- ConnectionFactory factory = (ConnectionFactory) context.lookup("myFactoryLookup");
- Destination queue = (Destination) context.lookup("myQueueLookup");
-
- System.out.println(String.format("Connecting to broker: %s", connectionUrl));
- connection = factory.createConnection();
-
- handleInstructions(connection, queue, instructions.getJmsInstructions());
- }
- finally
- {
- if (connection != null)
+ for (int i = 0; i < instructions.size(); i++)
{
- connection.close();
+ final ClientInstruction instruction = instructions.get(i);
+ if (instruction instanceof ConfigureJndiContext)
+ {
+ env.put(Context.INITIAL_CONTEXT_FACTORY,
+ ((ConfigureJndiContext) instruction).getContextFactory());
+ connectionUrl = ((ConfigureJndiContext) instruction).getConnectionUrl();
+ env.put("connectionfactory.myFactoryLookup", connectionUrl);
+ }
+ else if (instruction instanceof ConfigureDestination)
+ {
+ env.putAll(((ConfigureDestination) instruction).getDestinations());
+ }
+ else
+ {
+ context = new InitialContext(env);
+ ConnectionFactory factory = (ConnectionFactory) context.lookup("myFactoryLookup");
+ System.out.println(String.format("Connecting to broker: %s", connectionUrl));
+ Connection connection = factory.createConnection();
+ try
+ {
+ connection.start();
+ handleInstructions(context, connection, instructions.subList(i, instructions.size()));
+ }
+ finally
+ {
+ connection.close();
+ }
+ break;
+ }
}
}
System.out.println("Finished successfully");
objectOutputStream.writeObject(new ClientResult());
}
+ catch (VerificationException e)
+ {
+ final VerificationException serializableException = new VerificationException(stringifyStacktrace(e));
+ objectOutputStream.writeObject(new ClientResult(serializableException));
+ }
catch (Exception e)
{
- System.out.println("Encountered exception: " + e.getMessage());
- try (OutputStream baos = new ByteArrayOutputStream();
- ObjectOutputStream oos = new ObjectOutputStream(baos))
- {
- oos.writeObject(e);
- objectOutputStream.writeObject(new ClientResult(e));
- }
- catch (NotSerializableException nse)
- {
- StringWriter sw = new StringWriter();
- PrintWriter pw = new PrintWriter(sw);
- e.printStackTrace(pw);
- final RuntimeException serializableException = new RuntimeException(
- "Client failed with non-serializable exception",
- new Exception(sw.toString()));
- objectOutputStream.writeObject(new ClientResult(serializableException));
- }
+ final String stringifiedStacktrace = stringifyStacktrace(e);
+ System.out.println(stringifiedStacktrace);
+ final RuntimeException serializableException =
+ new RuntimeException("Client failed with exception", new Exception(stringifiedStacktrace));
+ objectOutputStream.writeObject(new ClientResult(serializableException));
}
}
catch (Exception e)
{
- System.out.println("Encountered exception: " + e.getMessage());
e.printStackTrace();
+ e.printStackTrace(System.out);
}
}
- private void handleInstructions(final Connection connection,
- final Destination queue,
- final List<JmsInstructions> jmsInstructions) throws Exception
+    /**
+     * Renders the complete stack trace of a throwable as text.
+     *
+     * @param e throwable whose stack trace is rendered
+     * @return the text produced by {@link Throwable#printStackTrace(PrintWriter)}
+     */
+    private String stringifyStacktrace(final Throwable e)
+    {
+        final StringWriter traceBuffer = new StringWriter();
+        e.printStackTrace(new PrintWriter(traceBuffer));
+        return traceBuffer.toString();
+    }
+
+ private void handleInstructions(final Context context,
+ final Connection connection,
+ final List<ClientInstruction> instructions) throws Exception
{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
try
{
- for (JmsInstructions jmsInstruction : jmsInstructions)
+ for (ClientInstruction instruction : instructions)
{
- System.out.println(String.format("Process instruction: %s", jmsInstruction));
- if (jmsInstruction instanceof JmsInstructions.PublishMessage)
- {
- publishMessage(session, queue, jmsInstruction.getMessageDescription());
- }
- else if (jmsInstruction instanceof JmsInstructions.ReceiveMessage)
+ System.out.println(String.format("Process instruction: %s", instruction));
+ if (instruction instanceof MessagingInstruction.PublishMessage)
{
- connection.start();
- receiveMessage(session, queue, jmsInstruction.getMessageDescription());
+ final MessagingInstruction.PublishMessage publishInstruction =
+ (MessagingInstruction.PublishMessage) instruction;
+ final Destination destination =
+ (Destination) context.lookup(publishInstruction.getDestinationJndiName());
+ final MessageDescription messageDescription = publishInstruction.getMessageDescription();
+ publishMessage(context, session, destination, messageDescription);
}
- else if (jmsInstruction instanceof JmsInstructions.ReplyToMessage)
+ else if (instruction instanceof MessagingInstruction.ReceiveMessage)
{
- throw new RuntimeException("ReplyTo is not implemented, yet.");
+ final MessagingInstruction.ReceiveMessage receiveInstruction =
+ (MessagingInstruction.ReceiveMessage) instruction;
+ final Destination destination =
+ (Destination) context.lookup(receiveInstruction.getDestinationJndiName());
+ final MessageDescription messageDescription = receiveInstruction.getMessageDescription();
+ receiveMessage(session, destination, messageDescription);
}
else
{
throw new RuntimeException(String.format("Unknown jmsInstruction class: '%s'",
- jmsInstruction.getClass().getName()));
+ instruction.getClass().getName()));
}
}
}
@@ -180,31 +196,59 @@ public class Client
private void receiveMessage(final Session session,
final Destination queue,
- final JmsInstructions.MessageDescription messageDescription) throws Exception
+ final MessageDescription messageDescription) throws Exception
{
+ final Message message;
MessageConsumer consumer = session.createConsumer(queue);
+ try
+ {
+ message = consumer.receive(RECEIVE_TIMEOUT);
+ MessageVerifier.verifyMessage(messageDescription, message);
+ System.out.println(String.format("Received message: %s", message));
+ }
+ finally
+ {
+ consumer.close();
+ }
- final Message message = consumer.receive(RECEIVE_TIMEOUT);
- MessageVerifier.verifyMessage(messageDescription, message);
- System.out.println(String.format("Received message: %s", message));
+ if (message != null && message.getJMSReplyTo() != null)
+ {
+ System.out.println(String.format("Received message had replyTo: %s", message.getJMSReplyTo()));
+ sendReply(session,
+ message.getJMSReplyTo(),
+ messageDescription.getHeader(MessageDescription.MessageHeader.CORRELATION_ID));
+ }
}
-
- private void publishMessage(final Session session,
+ private void publishMessage(final Context context,
+ final Session session,
final Destination queue,
- final JmsInstructions.MessageDescription messageDescription)
- throws Exception
+ final MessageDescription messageDescription) throws Exception
{
+ Message message = MessageCreator.fromMessageDescription(session, messageDescription);
+ Destination replyToDestination = null;
+ if (messageDescription.getReplyToJndiName() != null)
+ {
+ final String replyToJndiName = messageDescription.getReplyToJndiName();
+ if (replyToJndiName.equals(EndToEndConversionTestBase.TEMPORARY_QUEUE_JNDI_NAME))
+ {
+ replyToDestination = session.createTemporaryQueue();
+ }
+ else
+ {
+ replyToDestination = (Destination) context.lookup(replyToJndiName);
+ }
+ message.setJMSReplyTo(replyToDestination);
+ }
MessageProducer messageProducer = session.createProducer(queue);
try
{
- Message message = MessageCreator.fromMessageDescription(session, messageDescription);
messageProducer.send(message,
- messageDescription.getHeader(JmsInstructions.MessageDescription.MessageHeader.DELIVERY_MODE,
+ messageDescription.getHeader(MessageDescription.MessageHeader.DELIVERY_MODE,
DeliveryMode.NON_PERSISTENT),
- messageDescription.getHeader(JmsInstructions.MessageDescription.MessageHeader.PRIORITY,
+ messageDescription.getHeader(MessageDescription.MessageHeader.PRIORITY,
Message.DEFAULT_PRIORITY),
- messageDescription.getHeader(JmsInstructions.MessageDescription.MessageHeader.EXPIRATION,
+ messageDescription.getHeader(MessageDescription.MessageHeader.EXPIRATION,
Message.DEFAULT_TIME_TO_LIVE));
System.out.println(String.format("Sent message: %s", message));
}
@@ -212,5 +256,73 @@ public class Client
{
messageProducer.close();
}
+
+ if (replyToDestination != null)
+ {
+ receiveReply(session,
+ replyToDestination,
+ messageDescription.getHeader(MessageDescription.MessageHeader.CORRELATION_ID));
+ }
+ }
+
+    /**
+     * Waits for the reply to a previously published message and checks its correlation id.
+     *
+     * @param session session used to create the short-lived reply consumer
+     * @param jmsReplyTo destination on which the reply is expected
+     * @param expectedCorrelationId correlation id the reply must carry; a {@code byte[]} is compared with
+     *                              the reply's byte correlation id, any other value with its String
+     *                              correlation id, and {@code null} skips the comparison
+     * @throws VerificationException if no reply arrives within {@code RECEIVE_TIMEOUT} or the correlation
+     *                               id does not match
+     * @throws Exception if the JMS provider reports a failure
+     */
+    private void receiveReply(final Session session,
+                              final Destination jmsReplyTo,
+                              final Serializable expectedCorrelationId)
+            throws Exception
+    {
+        final MessageConsumer consumer = session.createConsumer(jmsReplyTo);
+        try
+        {
+            final Message message = consumer.receive(RECEIVE_TIMEOUT);
+            System.out.println(String.format("Received message: %s", message));
+            if (message == null)
+            {
+                // receive(timeout) returns null when nothing arrived in time; without this check a missing
+                // reply either passed unnoticed or surfaced as a NullPointerException below
+                throw new VerificationException("No reply message received.");
+            }
+            if (expectedCorrelationId != null)
+            {
+                final boolean matches;
+                if (expectedCorrelationId instanceof byte[])
+                {
+                    matches = Arrays.equals((byte[]) expectedCorrelationId, message.getJMSCorrelationIDAsBytes());
+                }
+                else
+                {
+                    matches = expectedCorrelationId.equals(message.getJMSCorrelationID());
+                }
+                if (!matches)
+                {
+                    throw new VerificationException("ReplyTo message has unexpected correlationId.");
+                }
+            }
+        }
+        finally
+        {
+            // the consumer only exists for this single reply
+            consumer.close();
+        }
+    }
+
+    /**
+     * Sends an empty reply message to the given reply-to destination.
+     *
+     * @param session session used to create the reply message and its producer
+     * @param jmsReplyTo destination the reply is sent to
+     * @param correlationId correlation id to put on the reply: a {@code byte[]} is set as the byte
+     *                      correlation id, any other non-null value is cast to String; {@code null}
+     *                      leaves the correlation id unset
+     * @throws JMSException if the JMS provider reports a failure
+     */
+    private void sendReply(final Session session, final Destination jmsReplyTo, final Serializable correlationId)
+            throws JMSException
+    {
+        final Message reply = session.createMessage();
+        if (correlationId instanceof byte[])
+        {
+            reply.setJMSCorrelationIDAsBytes((byte[]) correlationId);
+        }
+        else if (correlationId != null)
+        {
+            reply.setJMSCorrelationID((String) correlationId);
+        }
+        System.out.println(String.format("Sending reply message: %s", reply));
+
+        final MessageProducer replyProducer = session.createProducer(jmsReplyTo);
+        try
+        {
+            replyProducer.send(reply);
+        }
+        finally
+        {
+            replyProducer.close();
+        }
+    }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0822e1a8/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/ClientInstruction.java
----------------------------------------------------------------------
diff --git a/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/ClientInstruction.java b/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/ClientInstruction.java
new file mode 100644
index 0000000..1d49d35
--- /dev/null
+++ b/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/ClientInstruction.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.systests.end_to_end_conversion.client;
+
+import java.io.Serializable;
+
+/**
+ * Marker type for the serializable instructions a {@link Client} reads from its controller
+ * connection and then processes in list order.
+ */
+public interface ClientInstruction extends Serializable
+{
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0822e1a8/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/ClientInstructions.java
----------------------------------------------------------------------
diff --git a/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/ClientInstructions.java b/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/ClientInstructions.java
deleted file mode 100644
index 791eed3..0000000
--- a/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/ClientInstructions.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.systests.end_to_end_conversion.client;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.qpid.systests.end_to_end_conversion.JmsInstructions;
-
-public class ClientInstructions implements Serializable
-{
- private String _contextFactory;
- private String _connectionUrl;
- private String _queueName;
- private ArrayList<JmsInstructions> _jmsInstructions;
-
- public String getContextFactory()
- {
- return _contextFactory;
- }
-
- public void setContextFactory(final String contextFactory)
- {
- _contextFactory = contextFactory;
- }
-
- public String getConnectionUrl()
- {
- return _connectionUrl;
- }
-
- public void setConnectionUrl(final String connectionUrl)
- {
- _connectionUrl = connectionUrl;
- }
-
- public String getQueueName()
- {
- return _queueName;
- }
-
- public void setQueueName(final String queueName)
- {
- _queueName = queueName;
- }
-
- public List<JmsInstructions> getJmsInstructions()
- {
- return _jmsInstructions;
- }
-
- public void setJmsInstructions(final List<JmsInstructions> jmsInstructions)
- {
- _jmsInstructions = new ArrayList<>(jmsInstructions);
- }
-
- @Override
- public String toString()
- {
- return "ClientInstructions{" +
- "_contextFactory='" + _contextFactory + '\'' +
- ", _connectionUrl='" + _connectionUrl + '\'' +
- ", _queueName='" + _queueName + '\'' +
- ", _jmsInstructions=" + _jmsInstructions +
- '}';
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0822e1a8/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/ConfigureDestination.java
----------------------------------------------------------------------
diff --git a/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/ConfigureDestination.java b/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/ConfigureDestination.java
new file mode 100644
index 0000000..e804d64
--- /dev/null
+++ b/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/ConfigureDestination.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.systests.end_to_end_conversion.client;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Instruction carrying destination entries for the client's JNDI environment.
+ * <p>
+ * The {@link Client} adds every entry to the environment it later passes to the initial context.
+ */
+public class ConfigureDestination implements ClientInstruction
+{
+    private final HashMap<String, String> _destinations;
+
+    /**
+     * @param destinations entries to add to the JNDI environment; the map is copied, so later changes
+     *                     to the argument do not affect this instruction
+     */
+    public ConfigureDestination(final Map<String, String> destinations)
+    {
+        _destinations = new HashMap<>(destinations);
+    }
+
+    /**
+     * @return a copy of the configured entries; modifying it does not change this instruction
+     */
+    public HashMap<String, String> getDestinations()
+    {
+        // hand out a copy, like MessageDescription does for its maps, to keep the instruction immutable
+        return new HashMap<>(_destinations);
+    }
+
+    /**
+     * @return a readable form for the client's instruction log
+     */
+    @Override
+    public String toString()
+    {
+        return "ConfigureDestination{" +
+               "_destinations=" + _destinations +
+               '}';
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0822e1a8/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/ConfigureJndiContext.java
----------------------------------------------------------------------
diff --git a/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/ConfigureJndiContext.java b/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/ConfigureJndiContext.java
new file mode 100644
index 0000000..e45dbb7
--- /dev/null
+++ b/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/ConfigureJndiContext.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.systests.end_to_end_conversion.client;
+
+/**
+ * Instruction telling the client which JNDI initial context factory and connection URL to use.
+ * <p>
+ * The {@link Client} stores the factory under {@code Context.INITIAL_CONTEXT_FACTORY} and the URL under
+ * {@code connectionfactory.myFactoryLookup} in its JNDI environment.
+ */
+public class ConfigureJndiContext implements ClientInstruction
+{
+    private final String _contextFactory;
+    private final String _connectionUrl;
+
+    /**
+     * @param contextFactory value for the initial context factory entry of the JNDI environment
+     * @param connectionUrl connection URL the connection factory is configured with
+     */
+    public ConfigureJndiContext(final String contextFactory, final String connectionUrl)
+    {
+        _contextFactory = contextFactory;
+        _connectionUrl = connectionUrl;
+    }
+
+    /**
+     * @return the initial context factory value given at construction
+     */
+    public String getContextFactory()
+    {
+        return _contextFactory;
+    }
+
+    /**
+     * @return the connection URL given at construction
+     */
+    public String getConnectionUrl()
+    {
+        return _connectionUrl;
+    }
+
+    /**
+     * @return a readable form for the client's instruction log
+     */
+    @Override
+    public String toString()
+    {
+        return "ConfigureJndiContext{" +
+               "_contextFactory='" + _contextFactory + '\'' +
+               ", _connectionUrl='" + _connectionUrl + '\'' +
+               '}';
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0822e1a8/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/MessageCreator.java
----------------------------------------------------------------------
diff --git a/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/MessageCreator.java b/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/MessageCreator.java
index 669b941..8028c33 100644
--- a/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/MessageCreator.java
+++ b/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/MessageCreator.java
@@ -33,12 +33,10 @@ import javax.jms.Message;
import javax.jms.Session;
import javax.jms.StreamMessage;
-import org.apache.qpid.systests.end_to_end_conversion.JmsInstructions;
-
public class MessageCreator
{
public static Message fromMessageDescription(final Session session,
- final JmsInstructions.MessageDescription messageDescription)
+ final MessageDescription messageDescription)
throws Exception
{
Message message = createMessage(messageDescription, session);
@@ -47,7 +45,7 @@ public class MessageCreator
return message;
}
- private static void setProperties(final JmsInstructions.MessageDescription messageDescription,
+ private static void setProperties(final MessageDescription messageDescription,
final Message message)
{
final HashMap<String, Serializable> properties = messageDescription.getProperties();
@@ -71,7 +69,7 @@ public class MessageCreator
}
}
- private static Message createMessage(final JmsInstructions.MessageDescription messageDescription,
+ private static Message createMessage(final MessageDescription messageDescription,
final Session session)
throws Exception
{
@@ -81,7 +79,7 @@ public class MessageCreator
switch (messageDescription.getMessageType())
{
case MESSAGE:
- message = session.createMessage();
+ message = session.createTextMessage();
break;
case BYTES_MESSAGE:
message = session.createBytesMessage();
@@ -121,10 +119,11 @@ public class MessageCreator
return message;
}
- private static void setJmsHeader(final JmsInstructions.MessageDescription messageDescription, final Message message)
+ private static void setJmsHeader(final MessageDescription messageDescription,
+ final Message message)
throws JMSException
{
- final HashMap<JmsInstructions.MessageDescription.MessageHeader, Serializable> header =
+ final HashMap<MessageDescription.MessageHeader, Serializable> header =
messageDescription.getHeaders();
if (header == null)
@@ -132,7 +131,7 @@ public class MessageCreator
return;
}
- for (Map.Entry<JmsInstructions.MessageDescription.MessageHeader, Serializable> entry : header.entrySet())
+ for (Map.Entry<MessageDescription.MessageHeader, Serializable> entry : header.entrySet())
{
try
{
@@ -161,8 +160,8 @@ public class MessageCreator
}
break;
case REPLY_TO:
- message.setJMSReplyTo((Destination) entry.getValue());
- break;
+ throw new RuntimeException("The Test should not set the replyTo header."
+ + " It should rather use the dedicated method");
case REDELIVERED:
message.setJMSRedelivered((Boolean) entry.getValue());
break;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0822e1a8/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/MessageDescription.java
----------------------------------------------------------------------
diff --git a/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/MessageDescription.java b/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/MessageDescription.java
new file mode 100644
index 0000000..ce62da6
--- /dev/null
+++ b/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/MessageDescription.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.systests.end_to_end_conversion.client;
+
+import java.io.Serializable;
+import java.util.HashMap;
+
+/**
+ * Serializable description of a JMS message: its type, content, headers, properties and an
+ * optional JNDI name for the reply-to destination.
+ * <p>
+ * NOTE(review): the content is held as a plain {@code Object}; it presumably has to be serializable
+ * for the description to be sent to a client - confirm against the callers.
+ */
+public class MessageDescription implements Serializable
+{
+    private final HashMap<MessageHeader, Serializable> _header;
+    private final HashMap<String, Serializable> _properties;
+    private MessageType _messageType;
+    private Object _content;
+    private String _replyToJndiName;
+
+    /**
+     * Creates an empty description of type {@link MessageType#MESSAGE}.
+     */
+    public MessageDescription()
+    {
+        _header = new HashMap<>();
+        _properties = new HashMap<>();
+        _messageType = MessageType.MESSAGE;
+    }
+
+    /**
+     * Creates a copy of another description. The header and property maps are copied, the content
+     * object itself is shared.
+     *
+     * @param other description to copy
+     */
+    public MessageDescription(final MessageDescription other)
+    {
+        _header = new HashMap<>(other._header);
+        _properties = new HashMap<>(other._properties);
+        _messageType = other._messageType;
+        _content = other._content;
+        // previously omitted, so a copy silently lost its reply-to configuration
+        _replyToJndiName = other._replyToJndiName;
+    }
+
+    /**
+     * @return the described message type
+     */
+    public MessageType getMessageType()
+    {
+        return _messageType;
+    }
+
+    /**
+     * @param messageType the message type to describe
+     */
+    public void setMessageType(final MessageType messageType)
+    {
+        _messageType = messageType;
+    }
+
+    /**
+     * @return the described content, or {@code null} if none was set
+     */
+    public Object getContent()
+    {
+        return _content;
+    }
+
+    /**
+     * @param content the content to describe
+     */
+    public void setContent(final Object content)
+    {
+        _content = content;
+    }
+
+    /**
+     * @return a copy of all described headers
+     */
+    public HashMap<MessageHeader, Serializable> getHeaders()
+    {
+        return new HashMap<>(_header);
+    }
+
+    /**
+     * Looks up a single header value.
+     *
+     * @param header header to look up
+     * @param defaultValue value returned when the header has not been set
+     * @param <T> type the caller expects; the stored value is cast without a runtime check
+     * @return the stored value, or {@code defaultValue} if the header is absent
+     */
+    public <T extends Serializable> T getHeader(final MessageHeader header, final T defaultValue)
+    {
+        // the caller chooses T; a mismatch only shows up as a ClassCastException at the use site
+        @SuppressWarnings("unchecked")
+        final T value = (T) _header.getOrDefault(header, defaultValue);
+        return value;
+    }
+
+    /**
+     * Looks up a single header value.
+     *
+     * @param header header to look up
+     * @param <T> type the caller expects; the stored value is cast without a runtime check
+     * @return the stored value, or {@code null} if the header is absent
+     */
+    public <T extends Serializable> T getHeader(final MessageHeader header)
+    {
+        return getHeader(header, null);
+    }
+
+    /**
+     * @param header header to set
+     * @param value value to store for the header
+     */
+    public void setHeader(final MessageHeader header, final Serializable value)
+    {
+        _header.put(header, value);
+    }
+
+    /**
+     * @return a copy of all described message properties
+     */
+    public HashMap<String, Serializable> getProperties()
+    {
+        return new HashMap<>(_properties);
+    }
+
+    /**
+     * @param property name of the message property
+     * @param value value to store for the property
+     */
+    public void setProperty(final String property, final Serializable value)
+    {
+        _properties.put(property, value);
+    }
+
+    /**
+     * @return the JNDI name of the reply-to destination, or {@code null} if no reply is requested
+     */
+    public String getReplyToJndiName()
+    {
+        return _replyToJndiName;
+    }
+
+    /**
+     * @param replyToJndiName JNDI name of the reply-to destination
+     */
+    public void setReplyToJndiName(final String replyToJndiName)
+    {
+        _replyToJndiName = replyToJndiName;
+    }
+
+    @Override
+    public String toString()
+    {
+        return "MessageDescription{" +
+               "_header=" + _header +
+               ", _properties=" + _properties +
+               ", _messageType=" + _messageType +
+               ", _content=" + _content +
+               ", _replyToJndiName='" + _replyToJndiName + '\'' +
+               '}';
+    }
+
+    /** Kinds of JMS message that can be described. */
+    public enum MessageType
+    {
+        MESSAGE,
+        BYTES_MESSAGE,
+        MAP_MESSAGE,
+        OBJECT_MESSAGE,
+        STREAM_MESSAGE,
+        TEXT_MESSAGE
+    }
+
+    /** JMS headers that can be described. */
+    public enum MessageHeader
+    {
+        DESTINATION,
+        DELIVERY_MODE,
+        MESSAGE_ID,
+        TIMESTAMP,
+        CORRELATION_ID,
+        REPLY_TO,
+        REDELIVERED,
+        TYPE,
+        EXPIRATION,
+        PRIORITY
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0822e1a8/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/MessageVerifier.java
----------------------------------------------------------------------
diff --git a/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/MessageVerifier.java b/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/MessageVerifier.java
index 63cb89c..079c893 100644
--- a/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/MessageVerifier.java
+++ b/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/MessageVerifier.java
@@ -20,8 +20,6 @@
package org.apache.qpid.systests.end_to_end_conversion.client;
-import static java.nio.charset.StandardCharsets.UTF_8;
-
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
@@ -41,11 +39,9 @@ import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import javax.xml.bind.DatatypeConverter;
-import org.apache.qpid.systests.end_to_end_conversion.JmsInstructions;
-
public class MessageVerifier
{
- public static void verifyMessage(final JmsInstructions.MessageDescription messageDescription, final Message message)
+ public static void verifyMessage(final MessageDescription messageDescription, final Message message)
throws VerificationException
{
verifyNotNull("No message received", message);
@@ -54,10 +50,10 @@ public class MessageVerifier
verifyMessageProperties(messageDescription, message);
}
- private static void verifyMessageTypeAndContent(final JmsInstructions.MessageDescription messageDescription,
+ private static void verifyMessageTypeAndContent(final MessageDescription messageDescription,
final Message message) throws VerificationException
{
- final JmsInstructions.MessageDescription.MessageType messageType = messageDescription.getMessageType();
+ final MessageDescription.MessageType messageType = messageDescription.getMessageType();
Object expectedMessageContent = messageDescription.getContent();
Serializable actualContent;
Class<? extends Message> expectedMessageClass;
@@ -122,13 +118,13 @@ public class MessageVerifier
}
}
- private static void verifyMessageHeaders(final JmsInstructions.MessageDescription messageDescription,
+ private static void verifyMessageHeaders(final MessageDescription messageDescription,
final Message message) throws VerificationException
{
try
{
- for (Map.Entry<JmsInstructions.MessageDescription.MessageHeader, Serializable> entry : messageDescription.getHeaders()
- .entrySet())
+ for (Map.Entry<MessageDescription.MessageHeader, Serializable> entry : messageDescription.getHeaders()
+ .entrySet())
{
Object actualValue;
@@ -186,7 +182,7 @@ public class MessageVerifier
}
}
- private static void verifyMessageProperties(final JmsInstructions.MessageDescription messageDescription,
+ private static void verifyMessageProperties(final MessageDescription messageDescription,
final Message message) throws VerificationException
{
try
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0822e1a8/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/MessagingInstruction.java
----------------------------------------------------------------------
diff --git a/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/MessagingInstruction.java b/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/MessagingInstruction.java
new file mode 100644
index 0000000..2cd555e
--- /dev/null
+++ b/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/client/MessagingInstruction.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.systests.end_to_end_conversion.client;
+
+public abstract class MessagingInstruction implements ClientInstruction // Abstract base for client instructions that carry a destination JNDI name and a MessageDescription; the concrete variants are the nested PublishMessage and ReceiveMessage.
+{
+ private final MessageDescription _messageDescription; // message description handed to the constructor
+ private final String _destinationJndiName; // destination JNDI name handed to the constructor
+ /** Stores the given destination JNDI name and message description; nothing is validated here, so null values are kept as-is. */
+ public MessagingInstruction(final String destinationJndiName, final MessageDescription messageDescription)
+ {
+ _messageDescription = messageDescription;
+ _destinationJndiName = destinationJndiName;
+ }
+ /** Returns the message description given at construction. */
+ public MessageDescription getMessageDescription()
+ {
+ return _messageDescription;
+ }
+ /** Returns the destination JNDI name given at construction. */
+ public String getDestinationJndiName()
+ {
+ return _destinationJndiName;
+ }
+ /** Renders both fields; the "MessagingInstruction{" prefix is hard-coded and the nested subclasses do not override this method, so they print the same prefix. */
+ @Override
+ public String toString()
+ {
+ return "MessagingInstruction{" +
+ "_messageDescription=" + _messageDescription +
+ ", _destinationJndiName='" + _destinationJndiName + '\'' +
+ '}';
+ }
+ /** Variant that only forwards its arguments to the base constructor; presumably marks a message to be published to the destination - the code acting on it is not in this file, confirm there. */
+ public static class PublishMessage extends MessagingInstruction
+ {
+ public PublishMessage(final String destinationJndiName, final MessageDescription messageDescription)
+ {
+ super(destinationJndiName, messageDescription);
+ }
+ }
+ /** Variant that only forwards its arguments to the base constructor; presumably marks a message expected to be received from the destination - the code acting on it is not in this file, confirm there. */
+ public static class ReceiveMessage extends MessagingInstruction
+ {
+ public ReceiveMessage(final String destinationJndiName, final MessageDescription messageDescription)
+ {
+ super(destinationJndiName, messageDescription);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0822e1a8/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/dependency_resolution/ClasspathQuery.java
----------------------------------------------------------------------
diff --git a/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/dependency_resolution/ClasspathQuery.java b/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/dependency_resolution/ClasspathQuery.java
index 78c8736..2337317 100644
--- a/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/dependency_resolution/ClasspathQuery.java
+++ b/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/dependency_resolution/ClasspathQuery.java
@@ -52,6 +52,7 @@ public class ClasspathQuery
private static final LoadingCache<Collection<String>, List<File>> _classpathCache;
private static final RepositorySystem _mavenRepositorySystem;
private static final RepositorySystemSession _mavenRepositorySession;
+
static
{
_mavenRepositorySystem = Booter.newRepositorySystem();
@@ -68,6 +69,7 @@ public class ClasspathQuery
}
});
}
+
private final Class<?> _clientClass;
private final Collection<String> _clientGavs;
@@ -78,21 +80,6 @@ public class ClasspathQuery
_clientGavs = gavs;
}
- public Class<?> getClientClass()
- {
- return _clientClass;
- }
-
- public Collection<String> getClientGavs()
- {
- return _clientGavs;
- }
-
- public String getClasspath()
- {
- return buildClassPath(_clientClass, _clientGavs);
- }
-
public static String getCacheStats()
{
return _classpathCache.stats().toString();
@@ -148,6 +135,21 @@ public class ClasspathQuery
return jars;
}
+ public Class<?> getClientClass() // Returns the value of the final field _clientClass.
+ {
+ return _clientClass;
+ }
+
+ public Collection<String> getClientGavs() // Returns the collection held in _clientGavs (the same reference, not a copy).
+ {
+ return _clientGavs;
+ }
+
+ public String getClasspath() // Delegates to buildClassPath with this query's client class and GAV collection.
+ {
+ return buildClassPath(_clientClass, _clientGavs);
+ }
+
private String buildClassPath(final Class<?> clientClazz, final Collection<String> gavs)
{
List<File> classpathElements = _classpathCache.getUnchecked(gavs);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org