You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/06/18 14:42:29 UTC
svn commit: r786040 [4/6] - in /activemq/sandbox/activemq-flow:
activemq-all/src/test/java/org/apache/activemq/legacy/
activemq-all/src/test/java/org/apache/activemq/legacy/broker/
activemq-all/src/test/java/org/apache/activemq/legacy/broker/advisory/ ...
Added: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JMSIndividualAckTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JMSIndividualAckTest.java?rev=786040&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JMSIndividualAckTest.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JMSIndividualAckTest.java Thu Jun 18 12:42:23 2009
@@ -0,0 +1,146 @@
+package org.apache.activemq.legacy.test2;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQSession;
+
+/**
+ * @version $Revision: 1.4 $
+ */
+public class JMSIndividualAckTest extends TestSupport {
+
+ private Connection connection;
+
+ protected void setUp() throws Exception {
+ super.setUp();
+ connection = createConnection();
+ }
+
+ /**
+ * @see junit.framework.TestCase#tearDown()
+ */
+ protected void tearDown() throws Exception {
+ if (connection != null) {
+ connection.close();
+ connection = null;
+ }
+ super.tearDown();
+ }
+
+ /**
+ * Tests if acknowledged messages are being consumed.
+ *
+ * @throws JMSException
+ */
+ public void testAckedMessageAreConsumed() throws JMSException {
+ connection.start();
+ Session session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
+ Queue queue = session.createQueue(getQueueName());
+ MessageProducer producer = session.createProducer(queue);
+ producer.send(session.createTextMessage("Hello"));
+
+ // Consume the message...
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message msg = consumer.receive(1000);
+ assertNotNull(msg);
+ msg.acknowledge();
+
+ // Reset the session.
+ session.close();
+ session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
+
+ // Attempt to Consume the message...
+ consumer = session.createConsumer(queue);
+ msg = consumer.receive(1000);
+ assertNull(msg);
+
+ session.close();
+ }
+
+ /**
+ * Tests if acknowledged messages are being consumed.
+ *
+ * @throws JMSException
+ */
+ public void testLastMessageAcked() throws JMSException {
+ connection.start();
+ Session session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
+ Queue queue = session.createQueue(getQueueName());
+ MessageProducer producer = session.createProducer(queue);
+ TextMessage msg1 = session.createTextMessage("msg1");
+ TextMessage msg2 = session.createTextMessage("msg2");
+ TextMessage msg3 = session.createTextMessage("msg3");
+ producer.send(msg1);
+ producer.send(msg2);
+ producer.send(msg3);
+
+ // Consume the message...
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message msg = consumer.receive(1000);
+ assertNotNull(msg);
+ msg = consumer.receive(1000);
+ assertNotNull(msg);
+ msg = consumer.receive(1000);
+ assertNotNull(msg);
+ msg.acknowledge();
+
+ // Reset the session.
+ session.close();
+ session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
+
+ // Attempt to Consume the message...
+ consumer = session.createConsumer(queue);
+ msg = consumer.receive(1000);
+ assertNotNull(msg);
+ assertEquals(msg1,msg);
+ msg = consumer.receive(1000);
+ assertNotNull(msg);
+ assertEquals(msg2,msg);
+ msg = consumer.receive(1000);
+ assertNull(msg);
+ session.close();
+ }
+
+ /**
+ * Tests if unacknowledged messages are being re-delivered when the consumer connects again.
+ *
+ * @throws JMSException
+ */
+ public void testUnAckedMessageAreNotConsumedOnSessionClose() throws JMSException {
+ connection.start();
+ Session session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
+ Queue queue = session.createQueue(getQueueName());
+ MessageProducer producer = session.createProducer(queue);
+ producer.send(session.createTextMessage("Hello"));
+
+ // Consume the message...
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message msg = consumer.receive(1000);
+ assertNotNull(msg);
+ // Don't ack the message.
+
+ // Reset the session. This should cause the unacknowledged message to be re-delivered.
+ session.close();
+ session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
+
+ // Attempt to Consume the message...
+ consumer = session.createConsumer(queue);
+ msg = consumer.receive(2000);
+ assertNotNull(msg);
+ msg.acknowledge();
+
+ session.close();
+ }
+
+ protected String getQueueName() {
+ return getClass().getName() + "." + getName();
+ }
+
+}
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JMSIndividualAckTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JMSQueueRedeliverTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JMSQueueRedeliverTest.java?rev=786040&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JMSQueueRedeliverTest.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JMSQueueRedeliverTest.java Thu Jun 18 12:42:23 2009
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.legacy.test2;
+
+
+/**
+ * @version $Revision: 1.2 $
+ */
+public class JMSQueueRedeliverTest extends JmsTopicRedeliverTest {
+ protected void setUp() throws Exception {
+ topic = false;
+ super.setUp();
+ }
+}
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JMSQueueRedeliverTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JMSQueueRedeliverTest.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsAutoAckListenerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsAutoAckListenerTest.java?rev=786040&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsAutoAckListenerTest.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsAutoAckListenerTest.java Thu Jun 18 12:42:23 2009
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.legacy.test2;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+
+/**
+ * @version $Revision: 1.2 $
+ */
+public class JmsAutoAckListenerTest extends TestSupport implements MessageListener {
+
+ private Connection connection;
+
+ protected void setUp() throws Exception {
+ super.setUp();
+ connection = createConnection();
+ }
+
+ /**
+ * @see junit.framework.TestCase#tearDown()
+ */
+ protected void tearDown() throws Exception {
+ if (connection != null) {
+ connection.close();
+ connection = null;
+ }
+ super.tearDown();
+ }
+
+ /**
+ * Tests if acknowleged messages are being consumed.
+ *
+ * @throws javax.jms.JMSException
+ */
+ public void testAckedMessageAreConsumed() throws Exception {
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue("test");
+ MessageProducer producer = session.createProducer(queue);
+ producer.send(session.createTextMessage("Hello"));
+
+ // Consume the message...
+ MessageConsumer consumer = session.createConsumer(queue);
+ consumer.setMessageListener(this);
+
+ Thread.sleep(10000);
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ // Attempt to Consume the message...check if message was acknowledge
+ consumer = session.createConsumer(queue);
+ Message msg = consumer.receive(1000);
+ assertNull(msg);
+
+ session.close();
+ }
+
+ public void onMessage(Message message) {
+ assertNotNull(message);
+
+ }
+}
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsAutoAckListenerTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsAutoAckListenerTest.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsAutoAckTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsAutoAckTest.java?rev=786040&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsAutoAckTest.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsAutoAckTest.java Thu Jun 18 12:42:23 2009
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.legacy.test2;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+
+/**
+ * @version $Revision: 1.2 $
+ */
+public class JmsAutoAckTest extends TestSupport {
+
+ private Connection connection;
+
+ protected void setUp() throws Exception {
+ super.setUp();
+ connection = createConnection();
+ }
+
+ /**
+ * @see junit.framework.TestCase#tearDown()
+ */
+ protected void tearDown() throws Exception {
+ if (connection != null) {
+ connection.close();
+ connection = null;
+ }
+ super.tearDown();
+ }
+
+ /**
+ * Tests if acknowleged messages are being consumed.
+ *
+ * @throws javax.jms.JMSException
+ */
+ public void testAckedMessageAreConsumed() throws JMSException {
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue("test");
+ MessageProducer producer = session.createProducer(queue);
+ producer.send(session.createTextMessage("Hello"));
+
+ // Consume the message...
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message msg = consumer.receive(1000);
+ assertNotNull(msg);
+
+ // Reset the session.
+ session.close();
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // Attempt to Consume the message...
+ consumer = session.createConsumer(queue);
+ msg = consumer.receive(1000);
+ assertNull(msg);
+
+ session.close();
+ }
+
+
+}
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsAutoAckTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsAutoAckTest.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsClientAckListenerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsClientAckListenerTest.java?rev=786040&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsClientAckListenerTest.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsClientAckListenerTest.java Thu Jun 18 12:42:23 2009
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.legacy.test2;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+
+/**
+ * @version $Revision: 1.3 $
+ */
+public class JmsClientAckListenerTest extends TestSupport implements MessageListener {
+
+ private Connection connection;
+ private boolean dontAck;
+
+ protected void setUp() throws Exception {
+ super.setUp();
+ connection = createConnection();
+ }
+
+ /**
+ * @see junit.framework.TestCase#tearDown()
+ */
+ protected void tearDown() throws Exception {
+ if (connection != null) {
+ connection.close();
+ connection = null;
+ }
+ super.tearDown();
+ }
+
+ /**
+ * Tests if acknowleged messages are being consumed.
+ *
+ * @throws javax.jms.JMSException
+ */
+ public void testAckedMessageAreConsumed() throws Exception {
+ connection.start();
+ Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ Queue queue = session.createQueue("test");
+ MessageProducer producer = session.createProducer(queue);
+ producer.send(session.createTextMessage("Hello"));
+
+ // Consume the message...
+ MessageConsumer consumer = session.createConsumer(queue);
+ consumer.setMessageListener(this);
+
+ Thread.sleep(10000);
+
+ // Reset the session.
+ session.close();
+
+ session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ // Attempt to Consume the message...
+ consumer = session.createConsumer(queue);
+ Message msg = consumer.receive(1000);
+ assertNull(msg);
+
+ session.close();
+ }
+
+ /**
+ * Tests if unacknowleged messages are being redelivered when the consumer
+ * connects again.
+ *
+ * @throws javax.jms.JMSException
+ */
+ public void testUnAckedMessageAreNotConsumedOnSessionClose() throws Exception {
+ connection.start();
+ // don't aknowledge message on onMessage() call
+ dontAck = true;
+ Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ Queue queue = session.createQueue("test");
+ MessageProducer producer = session.createProducer(queue);
+ producer.send(session.createTextMessage("Hello"));
+
+ // Consume the message...
+ MessageConsumer consumer = session.createConsumer(queue);
+ consumer.setMessageListener(this);
+ // Don't ack the message.
+
+ // Reset the session. This should cause the Unacked message to be
+ // redelivered.
+ session.close();
+
+ Thread.sleep(10000);
+ session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ // Attempt to Consume the message...
+ consumer = session.createConsumer(queue);
+ Message msg = consumer.receive(2000);
+ assertNotNull(msg);
+ msg.acknowledge();
+
+ session.close();
+ }
+
+ public void onMessage(Message message) {
+
+ assertNotNull(message);
+ if (!dontAck) {
+ try {
+ message.acknowledge();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ }
+
+ }
+}
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsClientAckListenerTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsClientAckListenerTest.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsClientAckTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsClientAckTest.java?rev=786040&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsClientAckTest.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsClientAckTest.java Thu Jun 18 12:42:23 2009
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.legacy.test2;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+
+/**
+ * @version $Revision: 1.4 $
+ */
+public class JmsClientAckTest extends TestSupport {
+
+ private Connection connection;
+
+ protected void setUp() throws Exception {
+ super.setUp();
+ connection = createConnection();
+ }
+
+ /**
+ * @see junit.framework.TestCase#tearDown()
+ */
+ protected void tearDown() throws Exception {
+ if (connection != null) {
+ connection.close();
+ connection = null;
+ }
+ super.tearDown();
+ }
+
+ /**
+ * Tests if acknowledged messages are being consumed.
+ *
+ * @throws JMSException
+ */
+ public void testAckedMessageAreConsumed() throws JMSException {
+ connection.start();
+ Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ Queue queue = session.createQueue(getQueueName());
+ MessageProducer producer = session.createProducer(queue);
+ producer.send(session.createTextMessage("Hello"));
+
+ // Consume the message...
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message msg = consumer.receive(1000);
+ assertNotNull(msg);
+ msg.acknowledge();
+
+ // Reset the session.
+ session.close();
+ session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ // Attempt to Consume the message...
+ consumer = session.createConsumer(queue);
+ msg = consumer.receive(1000);
+ assertNull(msg);
+
+ session.close();
+ }
+
+ /**
+ * Tests if acknowledged messages are being consumed.
+ *
+ * @throws JMSException
+ */
+ public void testLastMessageAcked() throws JMSException {
+ connection.start();
+ Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ Queue queue = session.createQueue(getQueueName());
+ MessageProducer producer = session.createProducer(queue);
+ producer.send(session.createTextMessage("Hello"));
+ producer.send(session.createTextMessage("Hello2"));
+ producer.send(session.createTextMessage("Hello3"));
+
+ // Consume the message...
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message msg = consumer.receive(1000);
+ assertNotNull(msg);
+ msg = consumer.receive(1000);
+ assertNotNull(msg);
+ msg = consumer.receive(1000);
+ assertNotNull(msg);
+ msg.acknowledge();
+
+ // Reset the session.
+ session.close();
+ session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ // Attempt to Consume the message...
+ consumer = session.createConsumer(queue);
+ msg = consumer.receive(1000);
+ assertNull(msg);
+
+ session.close();
+ }
+
+ /**
+ * Tests if unacknowledged messages are being re-delivered when the consumer connects again.
+ *
+ * @throws JMSException
+ */
+ public void testUnAckedMessageAreNotConsumedOnSessionClose() throws JMSException {
+ connection.start();
+ Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ Queue queue = session.createQueue(getQueueName());
+ MessageProducer producer = session.createProducer(queue);
+ producer.send(session.createTextMessage("Hello"));
+
+ // Consume the message...
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message msg = consumer.receive(1000);
+ assertNotNull(msg);
+ // Don't ack the message.
+
+ // Reset the session. This should cause the unacknowledged message to be re-delivered.
+ session.close();
+ session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ // Attempt to Consume the message...
+ consumer = session.createConsumer(queue);
+ msg = consumer.receive(2000);
+ assertNotNull(msg);
+ msg.acknowledge();
+
+ session.close();
+ }
+
+ protected String getQueueName() {
+ return getClass().getName() + "." + getName();
+ }
+
+}
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsClientAckTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsClientAckTest.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsConnectionStartStopTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsConnectionStartStopTest.java?rev=786040&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsConnectionStartStopTest.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsConnectionStartStopTest.java Thu Jun 18 12:42:23 2009
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.legacy.test2;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+
+/**
+ * @version $Revision: 1.3 $
+ */
+public class JmsConnectionStartStopTest extends TestSupport {
+
+ private static final org.apache.commons.logging.Log LOG = org.apache.commons.logging.LogFactory
+ .getLog(JmsConnectionStartStopTest.class);
+
+ private Connection startedConnection;
+ private Connection stoppedConnection;
+
+ /**
+ * @see junit.framework.TestCase#setUp()
+ */
+ protected void setUp() throws Exception {
+
+ LOG.info(getClass().getClassLoader().getResource("log4j.properties"));
+
+ ActiveMQConnectionFactory factory = createConnectionFactory();
+ startedConnection = factory.createConnection();
+ startedConnection.start();
+ stoppedConnection = factory.createConnection();
+ }
+
+ /**
+ * @see junit.framework.TestCase#tearDown()
+ */
+ protected void tearDown() throws Exception {
+ stoppedConnection.close();
+ startedConnection.close();
+ }
+
+ /**
+ * Tests if the consumer receives the messages that were sent before the
+ * connection was started.
+ *
+ * @throws JMSException
+ */
+ public void testStoppedConsumerHoldsMessagesTillStarted() throws JMSException {
+ Session startedSession = startedConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session stoppedSession = stoppedConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // Setup the consumers.
+ Topic topic = startedSession.createTopic("test");
+ MessageConsumer startedConsumer = startedSession.createConsumer(topic);
+ MessageConsumer stoppedConsumer = stoppedSession.createConsumer(topic);
+
+ // Send the message.
+ MessageProducer producer = startedSession.createProducer(topic);
+ TextMessage message = startedSession.createTextMessage("Hello");
+ producer.send(message);
+
+ // Test the assertions.
+ Message m = startedConsumer.receive(1000);
+ assertNotNull(m);
+
+ m = stoppedConsumer.receive(1000);
+ assertNull(m);
+
+ stoppedConnection.start();
+ m = stoppedConsumer.receive(5000);
+ assertNotNull(m);
+
+ startedSession.close();
+ stoppedSession.close();
+ }
+
+ /**
+ * Tests if the consumer is able to receive messages eveb when the
+ * connecction restarts multiple times.
+ *
+ * @throws Exception
+ */
+ public void testMultipleConnectionStops() throws Exception {
+ testStoppedConsumerHoldsMessagesTillStarted();
+ stoppedConnection.stop();
+ testStoppedConsumerHoldsMessagesTillStarted();
+ stoppedConnection.stop();
+ testStoppedConsumerHoldsMessagesTillStarted();
+ }
+}
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsConnectionStartStopTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsConnectionStartStopTest.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsCreateConsumerInOnMessageTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsCreateConsumerInOnMessageTest.java?rev=786040&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsCreateConsumerInOnMessageTest.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsCreateConsumerInOnMessageTest.java Thu Jun 18 12:42:23 2009
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.legacy.test2;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+
+
+/**
+ * @version $Revision: 1.2 $
+ */
+public class JmsCreateConsumerInOnMessageTest extends TestSupport implements MessageListener {
+
+ private Connection connection;
+ private Session publisherSession;
+ private Session consumerSession;
+ private MessageConsumer consumer;
+ private MessageConsumer testConsumer;
+ private MessageProducer producer;
+ private Topic topic;
+ private Object lock = new Object();
+
+ /*
+ * @see junit.framework.TestCase#setUp()
+ */
+ protected void setUp() throws Exception {
+ super.setUp();
+ super.topic = true;
+ connection = createConnection();
+ connection.setClientID("connection:" + getSubject());
+ publisherSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ topic = (Topic)super.createDestination("Test.Topic");
+ consumer = consumerSession.createConsumer(topic);
+ consumer.setMessageListener(this);
+ producer = publisherSession.createProducer(topic);
+ connection.start();
+ }
+
+ /*
+ * @see junit.framework.TestCase#tearDown()
+ */
+ protected void tearDown() throws Exception {
+ super.tearDown();
+ connection.close();
+ }
+
+ /**
+ * Tests if a consumer can be created asynchronusly
+ *
+ * @throws Exception
+ */
+ public void testCreateConsumer() throws Exception {
+ Message msg = super.createMessage();
+ producer.send(msg);
+ if (testConsumer == null) {
+ synchronized (lock) {
+ lock.wait(3000);
+ }
+ }
+ assertTrue(testConsumer != null);
+ }
+
+ /**
+ * Use the asynchronous subscription mechanism
+ *
+ * @param message
+ */
+ public void onMessage(Message message) {
+ try {
+ testConsumer = consumerSession.createConsumer(topic);
+ consumerSession.createProducer(topic);
+ synchronized (lock) {
+ lock.notify();
+ }
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ assertTrue(false);
+ }
+ }
+}
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsCreateConsumerInOnMessageTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsCreateConsumerInOnMessageTest.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsDurableQueueWildcardSendReceiveTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsDurableQueueWildcardSendReceiveTest.java?rev=786040&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsDurableQueueWildcardSendReceiveTest.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsDurableQueueWildcardSendReceiveTest.java Thu Jun 18 12:42:23 2009
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.legacy.test2;
+
+import javax.jms.DeliveryMode;
+
+
+/**
+ * @version $Revision: 1.2 $
+ */
+public class JmsDurableQueueWildcardSendReceiveTest extends JmsTopicSendReceiveTest {
+
+ /**
+ * Set up the test with a queue and persistent delivery mode.
+ *
+ * @see junit.framework.TestCase#setUp()
+ */
+ protected void setUp() throws Exception {
+ topic = false;
+ deliveryMode = DeliveryMode.PERSISTENT;
+ super.setUp();
+ }
+
+ /**
+ * Returns the consumer subject.
+ */
+ protected String getConsumerSubject() {
+ return "FOO.>";
+ }
+
+ /**
+ * Returns the producer subject.
+ */
+ protected String getProducerSubject() {
+ return "FOO.BAR.HUMBUG";
+ }
+}
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsDurableQueueWildcardSendReceiveTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsDurableQueueWildcardSendReceiveTest.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsDurableTopicSelectorTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsDurableTopicSelectorTest.java?rev=786040&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsDurableTopicSelectorTest.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsDurableTopicSelectorTest.java Thu Jun 18 12:42:23 2009
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.legacy.test2;
+
+
+/**
+ * @version $Revision: 1.2 $
+ */
+public class JmsDurableTopicSelectorTest extends JmsTopicSelectorTest {
+ public void setUp() throws Exception {
+ durable = true;
+ super.setUp();
+ }
+}
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsDurableTopicSelectorTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsDurableTopicSelectorTest.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsDurableTopicSendReceiveTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsDurableTopicSendReceiveTest.java?rev=786040&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsDurableTopicSendReceiveTest.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsDurableTopicSendReceiveTest.java Thu Jun 18 12:42:23 2009
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.legacy.test2;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * @version $Revision: 1.5 $
+ */
+public class JmsDurableTopicSendReceiveTest extends JmsTopicSendReceiveTest {
+ private static final Log LOG = LogFactory.getLog(JmsDurableTopicSendReceiveTest.class);
+
+ protected Connection connection2;
+ protected Session session2;
+ protected Session consumeSession2;
+ protected MessageConsumer consumer2;
+ protected MessageProducer producer2;
+ protected Destination consumerDestination2;
+ protected Destination producerDestination2;
+
+ /**
+ * Set up a durable suscriber test.
+ *
+ * @see junit.framework.TestCase#setUp()
+ */
+ protected void setUp() throws Exception {
+ this.durable = true;
+ super.setUp();
+ }
+
+ /**
+ * Test if all the messages sent are being received.
+ *
+ * @throws Exception
+ */
+ public void testSendWhileClosed() throws Exception {
+ connection2 = createConnection();
+ connection2.setClientID("test");
+ connection2.start();
+ session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ producer2 = session2.createProducer(null);
+ producer2.setDeliveryMode(deliveryMode);
+ producerDestination2 = session2.createTopic(getProducerSubject() + "2");
+ Thread.sleep(1000);
+
+ consumeSession2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ consumerDestination2 = session2.createTopic(getConsumerSubject() + "2");
+ consumer2 = consumeSession2.createDurableSubscriber((Topic)consumerDestination2, getName());
+ Thread.sleep(1000);
+ consumer2.close();
+ TextMessage message = session2.createTextMessage("test");
+ message.setStringProperty("test", "test");
+ message.setJMSType("test");
+ producer2.send(producerDestination2, message);
+ LOG.info("Creating durable consumer");
+ consumer2 = consumeSession2.createDurableSubscriber((Topic)consumerDestination2, getName());
+ Message msg = consumer2.receive(1000);
+ assertNotNull(msg);
+ assertEquals(((TextMessage)msg).getText(), "test");
+ assertEquals(msg.getJMSType(), "test");
+ assertEquals(msg.getStringProperty("test"), "test");
+ connection2.stop();
+ connection2.close();
+ }
+}
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsDurableTopicSendReceiveTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsDurableTopicSendReceiveTest.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsDurableTopicWildcardSendReceiveTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsDurableTopicWildcardSendReceiveTest.java?rev=786040&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsDurableTopicWildcardSendReceiveTest.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsDurableTopicWildcardSendReceiveTest.java Thu Jun 18 12:42:23 2009
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.legacy.test2;
+
+import javax.jms.DeliveryMode;
+
+
+/**
+ * @version $Revision: 1.2 $
+ */
+public class JmsDurableTopicWildcardSendReceiveTest extends JmsTopicSendReceiveTest {
+
+ /**
+ * Sets up a test with a topic destination, durable suscriber and persistent
+ * delivery mode.
+ *
+ * @see junit.framework.TestCase#setUp()
+ */
+ protected void setUp() throws Exception {
+ topic = true;
+ durable = true;
+ deliveryMode = DeliveryMode.PERSISTENT;
+ super.setUp();
+ }
+
+ /**
+ * Returns the consumer subject.
+ */
+ protected String getConsumerSubject() {
+ return "FOO.>";
+ }
+
+ /**
+ * Returns the producer subject.
+ */
+ protected String getProducerSubject() {
+ return "FOO.BAR.HUMBUG";
+ }
+}
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsDurableTopicWildcardSendReceiveTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsDurableTopicWildcardSendReceiveTest.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsQueueSelectorTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsQueueSelectorTest.java?rev=786040&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsQueueSelectorTest.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsQueueSelectorTest.java Thu Jun 18 12:42:23 2009
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.legacy.test2;
+
+
+/**
+ * @version $Revision: 1.2 $
+ */
+public class JmsQueueSelectorTest extends JmsTopicSelectorTest {
+ public void setUp() throws Exception {
+ topic = false;
+ super.setUp();
+ }
+}
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsQueueSelectorTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsQueueSelectorTest.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsSendReceiveTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsSendReceiveTestSupport.java?rev=786040&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsSendReceiveTestSupport.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsSendReceiveTestSupport.java Thu Jun 18 12:42:23 2009
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.legacy.test2;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * @version $Revision: 1.7 $
+ */
+public class JmsSendReceiveTestSupport extends TestSupport implements MessageListener {
+ private static final Log LOG = LogFactory.getLog(JmsSendReceiveTestSupport.class);
+
+ protected int messageCount = 100;
+ protected String[] data;
+ protected Session session;
+ protected MessageConsumer consumer;
+ protected MessageProducer producer;
+ protected Destination consumerDestination;
+ protected Destination producerDestination;
+ protected List<Message> messages = createConcurrentList();
+ protected boolean topic = true;
+ protected boolean durable;
+ protected int deliveryMode = DeliveryMode.PERSISTENT;
+ protected final Object lock = new Object();
+ protected boolean verbose;
+
+ /*
+ * @see junit.framework.TestCase#setUp()
+ */
+ protected void setUp() throws Exception {
+ super.setUp();
+ String temp = System.getProperty("messageCount");
+
+ if (temp != null) {
+ int i = Integer.parseInt(temp);
+ if (i > 0) {
+ messageCount = i;
+ }
+ }
+
+ LOG.info("Message count for test case is: " + messageCount);
+ data = new String[messageCount];
+
+ for (int i = 0; i < messageCount; i++) {
+ data[i] = "Text for message: " + i + " at " + new Date();
+ }
+ }
+
+ /**
+ * Sends and consumes the messages.
+ *
+ * @throws Exception
+ */
+ public void testSendReceive() throws Exception {
+ messages.clear();
+ for (int i = 0; i < data.length; i++) {
+ Message message = session.createTextMessage(data[i]);
+ message.setStringProperty("stringProperty", data[i]);
+ message.setIntProperty("intProperty", i);
+
+ if (verbose) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("About to send a message: " + message + " with text: " + data[i]);
+ }
+ }
+
+ sendToProducer(producer, producerDestination, message);
+ messageSent();
+ }
+
+ assertMessagesAreReceived();
+ LOG.info("" + data.length + " messages(s) received, closing down connections");
+ }
+
+ /**
+ * Sends a message to a destination using the supplied producer
+ * @param producer
+ * @param producerDestination
+ * @param message
+ * @throws JMSException
+ */
+ protected void sendToProducer(MessageProducer producer,
+ Destination producerDestination, Message message) throws JMSException {
+ producer.send(producerDestination, message);
+ }
+
+ /**
+ * Asserts messages are received.
+ *
+ * @throws JMSException
+ */
+ protected void assertMessagesAreReceived() throws JMSException {
+ waitForMessagesToBeDelivered();
+ assertMessagesReceivedAreValid(messages);
+ }
+
+ /**
+ * Tests if the messages received are valid.
+ *
+ * @param receivedMessages - list of received messages.
+ * @throws JMSException
+ */
+ protected void assertMessagesReceivedAreValid(List<Message> receivedMessages) throws JMSException {
+ List<Object> copyOfMessages = Arrays.asList(receivedMessages.toArray());
+ int counter = 0;
+
+ if (data.length != copyOfMessages.size()) {
+ for (Iterator<Object> iter = copyOfMessages.iterator(); iter.hasNext();) {
+ TextMessage message = (TextMessage)iter.next();
+ if (LOG.isInfoEnabled()) {
+ LOG.info("<== " + counter++ + " = " + message.getText());
+ }
+ }
+ }
+
+ assertEquals("Not enough messages received", data.length, receivedMessages.size());
+
+ for (int i = 0; i < data.length; i++) {
+ TextMessage received = (TextMessage)receivedMessages.get(i);
+ String text = received.getText();
+ String stringProperty = received.getStringProperty("stringProperty");
+ int intProperty = received.getIntProperty("intProperty");
+
+ if (verbose) {
+ if (LOG.isDebugEnabled()) {
+ LOG.info("Received Text: " + text);
+ }
+ }
+
+ assertEquals("Message: " + i, data[i], text);
+ assertEquals(data[i], stringProperty);
+ assertEquals(i, intProperty);
+ }
+ }
+
+ /**
+ * Waits for messages to be delivered.
+ */
+ protected void waitForMessagesToBeDelivered() {
+ long maxWaitTime = 30000;
+ long waitTime = maxWaitTime;
+ long start = (maxWaitTime <= 0) ? 0 : System.currentTimeMillis();
+
+ synchronized (lock) {
+ while (messages.size() < data.length && waitTime >= 0) {
+ try {
+ lock.wait(200);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ waitTime = maxWaitTime - (System.currentTimeMillis() - start);
+ }
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see javax.jms.MessageListener#onMessage(javax.jms.Message)
+ */
+ public synchronized void onMessage(Message message) {
+ consumeMessage(message, messages);
+ }
+
+ /**
+ * Consumes messages.
+ *
+ * @param message - message to be consumed.
+ * @param messageList -list of consumed messages.
+ */
+ protected void consumeMessage(Message message, List<Message> messageList) {
+ if (verbose) {
+ if (LOG.isDebugEnabled()) {
+ LOG.info("Received message: " + message);
+ }
+ }
+
+ messageList.add(message);
+
+ if (messageList.size() >= data.length) {
+ synchronized (lock) {
+ lock.notifyAll();
+ }
+ }
+ }
+
+ /**
+ * Returns the ArrayList as a synchronized list.
+ *
+ * @return List
+ */
+ protected List<Message> createConcurrentList() {
+ return Collections.synchronizedList(new ArrayList<Message>());
+ }
+
+ /**
+ * Just a hook so can insert failure tests
+ *
+ * @throws Exception
+ */
+ protected void messageSent() throws Exception {
+
+ }
+}
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsSendReceiveTestSupport.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsSendReceiveTestSupport.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsSendReceiveWithMessageExpirationTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsSendReceiveWithMessageExpirationTest.java?rev=786040&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsSendReceiveWithMessageExpirationTest.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsSendReceiveWithMessageExpirationTest.java Thu Jun 18 12:42:23 2009
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.legacy.test2;
+
+import java.util.Date;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ *
+ */
+public class JmsSendReceiveWithMessageExpirationTest extends TestSupport {
+
+ private static final Log LOG = LogFactory.getLog(JmsSendReceiveWithMessageExpirationTest.class);
+
+ protected int messageCount = 100;
+ protected String[] data;
+ protected Session session;
+ protected Destination consumerDestination;
+ protected Destination producerDestination;
+ protected boolean durable;
+ protected int deliveryMode = DeliveryMode.PERSISTENT;
+ protected long timeToLive = 5000;
+ protected boolean verbose;
+
+ protected Connection connection;
+
+ protected void setUp() throws Exception {
+
+ super.setUp();
+
+ data = new String[messageCount];
+
+ for (int i = 0; i < messageCount; i++) {
+ data[i] = "Text for message: " + i + " at " + new Date();
+ }
+
+ connectionFactory = createConnectionFactory();
+ connection = createConnection();
+
+ if (durable) {
+ connection.setClientID(getClass().getName());
+ }
+
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ }
+
+ /**
+ * Test consuming an expired queue.
+ *
+ * @throws Exception
+ */
+ public void testConsumeExpiredQueue() throws Exception {
+
+ MessageProducer producer = createProducer(timeToLive);
+
+ consumerDestination = session.createQueue(getConsumerSubject());
+ producerDestination = session.createQueue(getProducerSubject());
+
+ MessageConsumer consumer = createConsumer();
+ connection.start();
+
+ for (int i = 0; i < data.length; i++) {
+ Message message = session.createTextMessage(data[i]);
+ message.setStringProperty("stringProperty", data[i]);
+ message.setIntProperty("intProperty", i);
+
+ if (verbose) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("About to send a queue message: " + message + " with text: " + data[i]);
+ }
+ }
+
+ producer.send(producerDestination, message);
+ }
+
+ // sleeps a second longer than the expiration time.
+ // Basically waits till queue expires.
+ Thread.sleep(timeToLive + 1000);
+
+ // message should have expired.
+ assertNull(consumer.receive(1000));
+ }
+
+ /**
+ * Sends and consumes the messages to a queue destination.
+ *
+ * @throws Exception
+ */
+ public void testConsumeQueue() throws Exception {
+
+ MessageProducer producer = createProducer(0);
+
+ consumerDestination = session.createQueue(getConsumerSubject());
+ producerDestination = session.createQueue(getProducerSubject());
+
+ MessageConsumer consumer = createConsumer();
+ connection.start();
+
+ for (int i = 0; i < data.length; i++) {
+ Message message = session.createTextMessage(data[i]);
+ message.setStringProperty("stringProperty", data[i]);
+ message.setIntProperty("intProperty", i);
+
+ if (verbose) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("About to send a queue message: " + message + " with text: " + data[i]);
+ }
+ }
+
+ producer.send(producerDestination, message);
+ }
+
+ // should receive a queue since there is no expiration.
+ assertNotNull(consumer.receive(1000));
+ }
+
+ /**
+ * Test consuming an expired topic.
+ *
+ * @throws Exception
+ */
+ public void testConsumeExpiredTopic() throws Exception {
+
+ MessageProducer producer = createProducer(timeToLive);
+
+ consumerDestination = session.createTopic(getConsumerSubject());
+ producerDestination = session.createTopic(getProducerSubject());
+
+ MessageConsumer consumer = createConsumer();
+ connection.start();
+
+ for (int i = 0; i < data.length; i++) {
+ Message message = session.createTextMessage(data[i]);
+ message.setStringProperty("stringProperty", data[i]);
+ message.setIntProperty("intProperty", i);
+
+ if (verbose) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("About to send a topic message: " + message + " with text: " + data[i]);
+ }
+ }
+
+ producer.send(producerDestination, message);
+ }
+
+ // sleeps a second longer than the expiration time.
+ // Basically waits till topic expires.
+ Thread.sleep(timeToLive + 1000);
+
+ // message should have expired.
+ assertNull(consumer.receive(1000));
+ }
+
+ /**
+ * Sends and consumes the messages to a topic destination.
+ *
+ * @throws Exception
+ */
+ public void testConsumeTopic() throws Exception {
+
+ MessageProducer producer = createProducer(0);
+
+ consumerDestination = session.createTopic(getConsumerSubject());
+ producerDestination = session.createTopic(getProducerSubject());
+
+ MessageConsumer consumer = createConsumer();
+ connection.start();
+
+ for (int i = 0; i < data.length; i++) {
+ Message message = session.createTextMessage(data[i]);
+ message.setStringProperty("stringProperty", data[i]);
+ message.setIntProperty("intProperty", i);
+
+ if (verbose) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("About to send a topic message: " + message + " with text: " + data[i]);
+ }
+ }
+
+ producer.send(producerDestination, message);
+ }
+
+ // should receive a topic since there is no expiration.
+ assertNotNull(consumer.receive(1000));
+ }
+
+ protected MessageProducer createProducer(long timeToLive) throws JMSException {
+ MessageProducer producer = session.createProducer(null);
+ producer.setDeliveryMode(deliveryMode);
+ producer.setTimeToLive(timeToLive);
+
+ return producer;
+ }
+
+ protected MessageConsumer createConsumer() throws JMSException {
+ if (durable) {
+ LOG.info("Creating durable consumer");
+ return session.createDurableSubscriber((Topic)consumerDestination, getName());
+ }
+ return session.createConsumer(consumerDestination);
+ }
+
+ protected void tearDown() throws Exception {
+ LOG.info("Dumping stats...");
+ LOG.info("Closing down connection");
+
+ session.close();
+ connection.close();
+ }
+
+}
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsSendReceiveWithMessageExpirationTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsTopicRedeliverTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsTopicRedeliverTest.java?rev=786040&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsTopicRedeliverTest.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsTopicRedeliverTest.java Thu Jun 18 12:42:23 2009
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.legacy.test2;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * @version $Revision: 1.4 $
+ */
+public class JmsTopicRedeliverTest extends TestSupport {
+
+ private static final Log LOG = LogFactory.getLog(JmsTopicRedeliverTest.class);
+
+ protected Connection connection;
+ protected Session session;
+ protected Session consumeSession;
+ protected MessageConsumer consumer;
+ protected MessageProducer producer;
+ protected Destination consumerDestination;
+ protected Destination producerDestination;
+ protected boolean topic = true;
+ protected boolean durable;
+ protected boolean verbose;
+ protected long initRedeliveryDelay;
+
+ protected void setUp() throws Exception {
+ super.setUp();
+
+ connectionFactory = createConnectionFactory();
+ connection = createConnection();
+ initRedeliveryDelay = ((ActiveMQConnection)connection).getRedeliveryPolicy().getInitialRedeliveryDelay();
+
+ if (durable) {
+ connection.setClientID(getClass().getName());
+ }
+
+ LOG.info("Created connection: " + connection);
+
+ session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ consumeSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ LOG.info("Created session: " + session);
+ LOG.info("Created consumeSession: " + consumeSession);
+ producer = session.createProducer(null);
+ // producer.setDeliveryMode(deliveryMode);
+
+ LOG.info("Created producer: " + producer);
+
+ if (topic) {
+ consumerDestination = session.createTopic(getConsumerSubject());
+ producerDestination = session.createTopic(getProducerSubject());
+ } else {
+ consumerDestination = session.createQueue(getConsumerSubject());
+ producerDestination = session.createQueue(getProducerSubject());
+ }
+
+ LOG.info("Created consumer destination: " + consumerDestination + " of type: " + consumerDestination.getClass());
+ LOG.info("Created producer destination: " + producerDestination + " of type: " + producerDestination.getClass());
+ consumer = createConsumer();
+ connection.start();
+
+ LOG.info("Created connection: " + connection);
+ }
+
+ protected void tearDown() throws Exception {
+ if (connection != null) {
+ connection.close();
+ }
+ super.tearDown();
+ }
+
+ /**
+ * Returns the consumer subject.
+ *
+ * @return String - consumer subject
+ * @see org.apache.activemq.legacy.testsupport.test.TestSupport#getConsumerSubject()
+ */
+ protected String getConsumerSubject() {
+ return "TEST";
+ }
+
+ /**
+ * Returns the producer subject.
+ *
+ * @return String - producer subject
+ * @see org.apache.activemq.legacy.testsupport.test.TestSupport#getProducerSubject()
+ */
+ protected String getProducerSubject() {
+ return "TEST";
+ }
+
+ /**
+ * Sends and consumes the messages.
+ *
+ * @throws Exception
+ */
+ public void testRecover() throws Exception {
+ String text = "TEST";
+ Message sendMessage = session.createTextMessage(text);
+
+ if (verbose) {
+ LOG.info("About to send a message: " + sendMessage + " with text: " + text);
+ }
+ producer.send(producerDestination, sendMessage);
+
+ // receive but don't acknowledge
+ Message unackMessage = consumer.receive(initRedeliveryDelay + 1000);
+ assertNotNull(unackMessage);
+ String unackId = unackMessage.getJMSMessageID();
+ assertEquals(((TextMessage)unackMessage).getText(), text);
+ assertFalse(unackMessage.getJMSRedelivered());
+ // assertEquals(unackMessage.getIntProperty("JMSXDeliveryCount"),1);
+
+ // receive then acknowledge
+ consumeSession.recover();
+ Message ackMessage = consumer.receive(initRedeliveryDelay + 1000);
+ assertNotNull(ackMessage);
+ ackMessage.acknowledge();
+ String ackId = ackMessage.getJMSMessageID();
+ assertEquals(((TextMessage)ackMessage).getText(), text);
+ assertTrue(ackMessage.getJMSRedelivered());
+ // assertEquals(ackMessage.getIntProperty("JMSXDeliveryCount"),2);
+ assertEquals(unackId, ackId);
+ consumeSession.recover();
+ assertNull(consumer.receiveNoWait());
+ }
+
+ protected MessageConsumer createConsumer() throws JMSException {
+ if (durable) {
+ LOG.info("Creating durable consumer");
+ return consumeSession.createDurableSubscriber((Topic)consumerDestination, getName());
+ }
+ return consumeSession.createConsumer(consumerDestination);
+ }
+
+}
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsTopicRedeliverTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsTopicRedeliverTest.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsTopicSelectorTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsTopicSelectorTest.java?rev=786040&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsTopicSelectorTest.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsTopicSelectorTest.java Thu Jun 18 12:42:23 2009
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.legacy.test2;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * @version $Revision: 1.2 $
+ */
+public class JmsTopicSelectorTest extends TestSupport {
+ private static final Log LOG = LogFactory.getLog(JmsTopicSelectorTest.class);
+
+ protected Connection connection;
+ protected Session session;
+ protected MessageConsumer consumer;
+ protected MessageProducer producer;
+ protected Destination consumerDestination;
+ protected Destination producerDestination;
+ protected boolean topic = true;
+ protected boolean durable;
+ protected int deliveryMode = DeliveryMode.PERSISTENT;
+
+ public JmsTopicSelectorTest() {
+ super();
+ }
+
+ public JmsTopicSelectorTest(String name) {
+ super(name);
+ }
+
+ public void setUp() throws Exception {
+ super.setUp();
+
+ connectionFactory = createConnectionFactory();
+ connection = createConnection();
+ if (durable) {
+ connection.setClientID(getClass().getName());
+ }
+
+ LOG.info("Created connection: " + connection);
+
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ LOG.info("Created session: " + session);
+
+ if (topic) {
+ consumerDestination = session.createTopic(getConsumerSubject());
+ producerDestination = session.createTopic(getProducerSubject());
+ } else {
+ consumerDestination = session.createQueue(getConsumerSubject());
+ producerDestination = session.createQueue(getProducerSubject());
+ }
+
+ LOG.info("Created consumer destination: " + consumerDestination + " of type: " + consumerDestination.getClass());
+ LOG.info("Created producer destination: " + producerDestination + " of type: " + producerDestination.getClass());
+ producer = session.createProducer(producerDestination);
+ producer.setDeliveryMode(deliveryMode);
+
+ LOG.info("Created producer: " + producer + " delivery mode = " + (deliveryMode == DeliveryMode.PERSISTENT ? "PERSISTENT" : "NON_PERSISTENT"));
+ connection.start();
+ }
+
+ public void tearDown() throws Exception {
+ session.close();
+ connection.close();
+ }
+
+ protected MessageConsumer createConsumer(String selector) throws JMSException {
+ if (durable) {
+ LOG.info("Creating durable consumer");
+ return session.createDurableSubscriber((Topic)consumerDestination, getName(), selector, false);
+ }
+ return session.createConsumer(consumerDestination, selector);
+ }
+
+ public void sendMessages() throws Exception {
+ TextMessage message = session.createTextMessage("1");
+ message.setIntProperty("id", 1);
+ message.setJMSType("a");
+ message.setStringProperty("stringProperty", "a");
+ message.setLongProperty("longProperty", 1);
+ message.setBooleanProperty("booleanProperty", true);
+ producer.send(message);
+
+ message = session.createTextMessage("2");
+ message.setIntProperty("id", 2);
+ message.setJMSType("a");
+ message.setStringProperty("stringProperty", "a");
+ message.setLongProperty("longProperty", 1);
+ message.setBooleanProperty("booleanProperty", false);
+ producer.send(message);
+
+ message = session.createTextMessage("3");
+ message.setIntProperty("id", 3);
+ message.setJMSType("a");
+ message.setStringProperty("stringProperty", "a");
+ message.setLongProperty("longProperty", 1);
+ message.setBooleanProperty("booleanProperty", true);
+ producer.send(message);
+
+ message = session.createTextMessage("4");
+ message.setIntProperty("id", 4);
+ message.setJMSType("b");
+ message.setStringProperty("stringProperty", "b");
+ message.setLongProperty("longProperty", 2);
+ message.setBooleanProperty("booleanProperty", false);
+ producer.send(message);
+
+ message = session.createTextMessage("5");
+ message.setIntProperty("id", 5);
+ message.setJMSType("c");
+ message.setStringProperty("stringProperty", "c");
+ message.setLongProperty("longProperty", 3);
+ message.setBooleanProperty("booleanProperty", true);
+ producer.send(message);
+ }
+
+ public void consumeMessages(int remaining) throws Exception {
+ consumer = createConsumer(null);
+ for (int i = 0; i < remaining; i++) {
+ consumer.receive(1000);
+ }
+ consumer.close();
+
+ }
+
+ public void testPropertySelector() throws Exception {
+ int remaining = 5;
+ Message message = null;
+ consumer = createConsumer("stringProperty = 'a' and longProperty = 1 and booleanProperty = true");
+ sendMessages();
+ while (true) {
+ message = consumer.receive(1000);
+ if (message == null) {
+ break;
+ }
+ String text = ((TextMessage)message).getText();
+ if (!text.equals("1") && !text.equals("3")) {
+ fail("unexpected message: " + text);
+ }
+ remaining--;
+ }
+ assertEquals(remaining, 3);
+ consumer.close();
+ consumeMessages(remaining);
+
+ }
+
+ public void testJMSPropertySelector() throws Exception {
+ int remaining = 5;
+ Message message = null;
+ consumer = createConsumer("JMSType = 'a' and stringProperty = 'a'");
+ sendMessages();
+ while (true) {
+ message = consumer.receive(1000);
+ if (message == null) {
+ break;
+ }
+ String text = ((TextMessage)message).getText();
+ if (!text.equals("1") && !text.equals("2") && !text.equals("3")) {
+ fail("unexpected message: " + text);
+ }
+ remaining--;
+ }
+ assertEquals(remaining, 2);
+ consumer.close();
+ consumeMessages(remaining);
+
+ }
+
+}
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsTopicSelectorTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsTopicSelectorTest.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsTopicSendReceiveSubscriberTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsTopicSendReceiveSubscriberTest.java?rev=786040&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsTopicSendReceiveSubscriberTest.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsTopicSendReceiveSubscriberTest.java Thu Jun 18 12:42:23 2009
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.legacy.test2;
+
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.Topic;
+import javax.jms.TopicSession;
+
+
+/**
+ * @version $Revision: 1.4 $
+ */
+public class JmsTopicSendReceiveSubscriberTest extends JmsTopicSendReceiveTest {
+ protected MessageConsumer createConsumer() throws JMSException {
+ if (durable) {
+ return super.createConsumer();
+ } else {
+ TopicSession topicSession = (TopicSession)session;
+ return topicSession.createSubscriber((Topic)consumerDestination, null, false);
+ }
+ }
+}
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsTopicSendReceiveSubscriberTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsTopicSendReceiveSubscriberTest.java
------------------------------------------------------------------------------
svn:executable = *