You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/11/29 14:22:31 UTC
[2/2] qpid-broker-j git commit: QPID-6933: [System Tests] Move
StreamMessageTest into JMS 1.1 system tests
QPID-6933: [System Tests] Move StreamMessageTest into JMS 1.1 system tests
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/1298c891
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/1298c891
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/1298c891
Branch: refs/heads/master
Commit: 1298c89140edf118ab4bfaf96e71dd411405a4de
Parents: 2344478
Author: Alex Rudyy <or...@apache.org>
Authored: Wed Nov 29 14:22:13 2017 +0000
Committer: Alex Rudyy <or...@apache.org>
Committed: Wed Nov 29 14:22:13 2017 +0000
----------------------------------------------------------------------
.../org/apache/qpid/systests/JmsProvider.java | 2 +-
.../org/apache/qpid/systests/JmsTestBase.java | 23 +++
.../qpid/systests/QpidJmsClient0xProvider.java | 2 +-
.../qpid/systests/QpidJmsClientProvider.java | 2 +-
.../jms_1_1/message/StreamMessageTest.java | 164 +++++++++++++++++++
.../qpid/test/utils/QpidBrokerTestCase.java | 4 +-
.../test/unit/message/StreamMessageTest.java | 146 -----------------
7 files changed, 192 insertions(+), 151 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1298c891/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/JmsProvider.java
----------------------------------------------------------------------
diff --git a/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/JmsProvider.java b/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/JmsProvider.java
index 6686b2a..beece11 100644
--- a/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/JmsProvider.java
+++ b/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/JmsProvider.java
@@ -38,7 +38,7 @@ public interface JmsProvider
Queue getQueueFromName(Session session, String name) throws JMSException;
- Queue createTestQueue(Session session, String queueName) throws JMSException;
+ Queue createQueue(Session session, String queueName) throws JMSException;
Topic getTestTopic(String testQueueName) throws NamingException;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1298c891/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/JmsTestBase.java
----------------------------------------------------------------------
diff --git a/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/JmsTestBase.java b/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/JmsTestBase.java
index ca3be01..44ad555 100644
--- a/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/JmsTestBase.java
+++ b/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/JmsTestBase.java
@@ -25,6 +25,7 @@ import java.util.Map;
import javax.jms.Connection;
import javax.jms.JMSException;
+import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.Topic;
import javax.naming.NamingException;
@@ -139,4 +140,26 @@ public abstract class JmsTestBase extends BrokerAdminUsingTestBase
connection.close();
}
}
+
+ protected Queue createQueue(final String queueName) throws Exception
+ {
+ Connection connection = getConnection();
+ try
+ {
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ try
+ {
+ return _jmsProvider.createQueue(session, queueName);
+ }
+ finally
+ {
+ session.close();
+ }
+ }
+ finally
+ {
+ connection.close();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1298c891/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/QpidJmsClient0xProvider.java
----------------------------------------------------------------------
diff --git a/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/QpidJmsClient0xProvider.java b/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/QpidJmsClient0xProvider.java
index 82549c0..89de60a 100644
--- a/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/QpidJmsClient0xProvider.java
+++ b/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/QpidJmsClient0xProvider.java
@@ -70,7 +70,7 @@ public class QpidJmsClient0xProvider implements JmsProvider
}
@Override
- public Queue createTestQueue(Session session, String queueName) throws JMSException
+ public Queue createQueue(Session session, String queueName) throws JMSException
{
Queue amqQueue = null;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1298c891/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/QpidJmsClientProvider.java
----------------------------------------------------------------------
diff --git a/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/QpidJmsClientProvider.java b/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/QpidJmsClientProvider.java
index 9a22204..d8af7f6 100644
--- a/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/QpidJmsClientProvider.java
+++ b/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/QpidJmsClientProvider.java
@@ -61,7 +61,7 @@ public class QpidJmsClientProvider implements JmsProvider
}
@Override
- public Queue createTestQueue(Session session, String queueName) throws JMSException
+ public Queue createQueue(Session session, String queueName) throws JMSException
{
_managementFacade.createEntityUsingAmqpManagement(queueName, session, "org.apache.qpid.Queue");
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1298c891/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/message/StreamMessageTest.java
----------------------------------------------------------------------
diff --git a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/message/StreamMessageTest.java b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/message/StreamMessageTest.java
new file mode 100644
index 0000000..625f372
--- /dev/null
+++ b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/message/StreamMessageTest.java
@@ -0,0 +1,164 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.systests.jms_1_1.message;
+
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageEOFException;
+import javax.jms.MessageListener;
+import javax.jms.MessageNotWriteableException;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+
+import junit.framework.TestCase;
+import org.junit.Test;
+
+import org.apache.qpid.systests.JmsTestBase;
+
+public class StreamMessageTest extends JmsTestBase
+{
+ @Test
+ public void testStreamMessageEOF() throws Exception
+ {
+ Queue queue = createQueue(getTestName());
+ Connection consumerConnection = getConnection();
+ try
+ {
+ Session consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageConsumer consumer = consumerSession.createConsumer(queue);
+
+ Connection producerConnection = getConnection();
+ try
+ {
+ Session producerSession = producerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageProducer producer = producerSession.createProducer(queue);
+ StreamMessage msg = producerSession.createStreamMessage();
+ msg.writeByte((byte) 42);
+ producer.send(msg);
+
+ consumerConnection.start();
+
+ Message receivedMessage = consumer.receive(getReceiveTimeout());
+ assertTrue(receivedMessage instanceof StreamMessage);
+ StreamMessage streamMessage = (StreamMessage)receivedMessage;
+ streamMessage.readByte();
+ try
+ {
+ streamMessage.readByte();
+ fail("Expected exception not thrown");
+ }
+ catch (Exception e)
+ {
+ assertTrue("Expected MessageEOFException: " + e, e instanceof MessageEOFException);
+ }
+
+ try
+ {
+ streamMessage.writeByte((byte) 42);
+ fail("Expected exception not thrown");
+ }
+ catch (MessageNotWriteableException e)
+ {
+ // pass
+ }
+ }
+ finally
+ {
+ producerConnection.close();
+ }
+ }
+ finally
+ {
+ consumerConnection.close();
+ }
+ }
+
+ @Test
+ public void testModifyReceivedMessageContent() throws Exception
+ {
+ Queue queue = createQueue(getTestName());
+ final CountDownLatch awaitMessages = new CountDownLatch(1);
+ final AtomicReference<Throwable> listenerCaughtException = new AtomicReference<Throwable>();
+
+ Connection consumerConnection = getConnection();
+ try
+ {
+ Session session = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ session.close();
+
+ Session consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageConsumer consumer = consumerSession.createConsumer(queue);
+ consumer.setMessageListener(message -> {
+ final StreamMessage sm = (StreamMessage) message;
+ try
+ {
+ sm.clearBody();
+ // it is legal to extend a stream message's content
+ sm.writeString("dfgjshfslfjshflsjfdlsjfhdsljkfhdsljkfhsd");
+ }
+ catch (Throwable t)
+ {
+ listenerCaughtException.set(t);
+ }
+ finally
+ {
+ awaitMessages.countDown();
+ }
+ });
+
+ Connection producerConnection = getConnection();
+ try
+ {
+ Session producerSession = producerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageProducer producer = producerSession.createProducer(queue);
+
+ StreamMessage message = producerSession.createStreamMessage();
+ message.writeInt(42);
+ producer.send(message);
+
+ consumerConnection.start();
+ assertTrue("Message did not arrive with consumer within a reasonable time",
+ awaitMessages.await(getReceiveTimeout(), TimeUnit.SECONDS));
+ assertNull("No exception should be caught by listener : " + listenerCaughtException.get(),
+ listenerCaughtException.get());
+ }
+ finally
+ {
+ producerConnection.close();
+ }
+ }
+ finally
+ {
+ consumerConnection.close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1298c891/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
----------------------------------------------------------------------
diff --git a/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java b/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
index c0297c8..7c7aaa1 100755
--- a/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
+++ b/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
@@ -301,12 +301,12 @@ public class QpidBrokerTestCase extends QpidTestCase
public Queue createTestQueue(Session session) throws JMSException
{
- return _jmsProvider.createTestQueue(session, getTestQueueName());
+ return _jmsProvider.createQueue(session, getTestQueueName());
}
public Queue createTestQueue(Session session, String queueName) throws JMSException
{
- return _jmsProvider.createTestQueue(session, queueName);
+ return _jmsProvider.createQueue(session, queueName);
}
/**
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1298c891/systests/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java b/systests/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
deleted file mode 100644
index e59ada9..0000000
--- a/systests/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.test.unit.message;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-import javax.jms.Connection;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageEOFException;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.StreamMessage;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-
-/**
- * @author Apache Software Foundation
- */
-public class StreamMessageTest extends QpidBrokerTestCase
-{
- private static final Logger LOGGER = LoggerFactory.getLogger(StreamMessageTest.class);
-
- public void testStreamMessageEOF() throws Exception
- {
- Connection con = getConnection();
- Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- Queue queue = createTestQueue(consumerSession);
- MessageConsumer consumer = consumerSession.createConsumer(queue);
-
- Connection con2 = getConnection();
-
- Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
- // Need to start the "producer" connection in order to receive bounced messages
- LOGGER.info("Starting producer connection");
- con2.start();
-
- MessageProducer mandatoryProducer = producerSession.createProducer(queue);
-
- // Third test - should be routed
- StreamMessage msg = producerSession.createStreamMessage();
-
- msg.writeByte((byte) 42);
-
- mandatoryProducer.send(msg);
-
- LOGGER.info("Starting consumer connection");
- con.start();
-
- StreamMessage msg2 = (StreamMessage) consumer.receive(2000);
- assertNotNull(msg2);
-
- msg2.readByte();
- try
- {
- msg2.readByte();
- fail("Expected exception not thrown");
- }
- catch (Exception e)
- {
- assertTrue("Expected MessageEOFException: " + e, e instanceof MessageEOFException);
- }
- con.close();
- con2.close();
- }
-
- public void testModifyReceivedMessageExpandsBuffer() throws Exception
- {
- final CountDownLatch awaitMessages = new CountDownLatch(1);
- final AtomicReference<Throwable> listenerCaughtException = new AtomicReference<Throwable>();
-
- Connection con = getConnection();
- Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Queue queue = createTestQueue(session);
- session.close();
-
- Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- MessageConsumer consumer = consumerSession.createConsumer(queue);
- consumer.setMessageListener(new MessageListener()
- {
-
- @Override
- public void onMessage(Message message)
- {
- final StreamMessage sm = (StreamMessage) message;
- try
- {
- sm.clearBody();
- // it is legal to extend a stream message's content
- sm.writeString("dfgjshfslfjshflsjfdlsjfhdsljkfhdsljkfhsd");
- }
- catch (Throwable t)
- {
- listenerCaughtException.set(t);
- }
- finally
- {
- awaitMessages.countDown();
- }
- }
- });
-
- Connection con2 = getConnection();
- Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- MessageProducer producer = producerSession.createProducer(queue);
- con.start();
- StreamMessage sm = producerSession.createStreamMessage();
- sm.writeInt(42);
- producer.send(sm);
-
- // Allow up to five seconds for the message to arrive with the consumer
- final boolean completed = awaitMessages.await(5, TimeUnit.SECONDS);
- assertTrue("Message did not arrive with consumer within a reasonable time", completed);
- final Throwable listenerException = listenerCaughtException.get();
- assertNull("No exception should be caught by listener : " + listenerException, listenerException);
-
- con.close();
- con2.close();
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org