You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2016/03/16 04:44:49 UTC
[39/49] bookkeeper git commit: BOOKKEEPER-769: Remove the Hedwig Code
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/JMSUsecaseTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/JMSUsecaseTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/JMSUsecaseTest.java
deleted file mode 100644
index cadb071..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/JMSUsecaseTest.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import javax.jms.DeliveryMode;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import junit.framework.Test;
-
-import javax.jms.Destination;
-
-import org.apache.hedwig.jms.MessagingSessionFacade;
-import org.apache.hedwig.jms.SessionImpl;
-import org.apache.hedwig.jms.message.MessageImpl;
-
-public class JMSUsecaseTest extends JmsTestSupport {
-
- public Destination destination;
- public int deliveryMode;
- public int prefetch;
- public MessagingSessionFacade.DestinationType destinationType;
- public boolean durableConsumer;
-
- public static Test suite() {
- return suite(JMSUsecaseTest.class);
- }
-
- public static void main(String[] args) {
- junit.textui.TestRunner.run(suite());
- }
-
- public void initCombosForTestSendReceive() {
- addCombinationValues("deliveryMode", new Object[] {
- Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
- addCombinationValues("destinationType", new Object[] {MessagingSessionFacade.DestinationType.TOPIC});
- }
-
- public void testSendReceive() throws Exception {
- // Send a message to the broker.
- connection.start();
- SessionImpl session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- destination = createDestination(session, destinationType);
- MessageProducer producer = session.createProducer(destination);
- MessageConsumer consumer = session.createConsumer(destination);
- MessageImpl message = new MessageImpl(session);
- producer.send(message);
-
- // Make sure only 1 message was delivered.
- assertNotNull(consumer.receive(1000));
- assertNull(consumer.receiveNoWait());
- }
-
- public void initCombosForTestSendReceiveTransacted() {
- addCombinationValues("deliveryMode", new Object[] {
- Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
- addCombinationValues("destinationType", new Object[] {MessagingSessionFacade.DestinationType.TOPIC});
- }
-
- public void testSendReceiveTransacted() throws Exception {
- // Send a message to the broker.
- connection.start();
- Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
- destination = createDestination(session, destinationType);
- MessageProducer producer = session.createProducer(destination);
- MessageConsumer consumer = session.createConsumer(destination);
- producer.send(session.createTextMessage("test"));
-
- // Message should not be delivered until commit.
- assertNull(consumer.receiveNoWait());
- session.commit();
-
- // Make sure only 1 message was delivered.
- Message message = consumer.receive(1000);
- assertNotNull(message);
- assertFalse(message.getJMSRedelivered());
- assertNull(consumer.receiveNoWait());
-
- // Message should be redelivered is rollback is used.
- session.rollback();
-
- // Make sure only 1 message was delivered.
- message = consumer.receive(2000);
- assertNotNull(message);
- assertTrue(message.getJMSRedelivered());
- assertNull(consumer.receiveNoWait());
-
- // If we commit now, the message should not be redelivered.
- session.commit();
- assertNull(consumer.receiveNoWait());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/JmsAutoAckListenerTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsAutoAckListenerTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/JmsAutoAckListenerTest.java
deleted file mode 100644
index ec5243f..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsAutoAckListenerTest.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import javax.jms.Topic;
-import javax.jms.Connection;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-public class JmsAutoAckListenerTest extends TestSupport implements MessageListener {
-
- private Connection connection;
-
- protected void setUp() throws Exception {
- super.setUp();
- connection = createConnection();
- }
-
- /**
- * @see junit.framework.TestCase#tearDown()
- */
- protected void tearDown() throws Exception {
- if (connection != null) {
- connection.close();
- connection = null;
- }
- super.tearDown();
- }
-
- /**
- * Tests if acknowleged messages are being consumed.
- *
- * @throws javax.jms.JMSException
- */
- public void testAckedMessageAreConsumed() throws Exception {
- connection.start();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Topic queue = session.createTopic("test");
- MessageProducer producer = session.createProducer(queue);
- MessageConsumer consumer = session.createDurableSubscriber(queue, "subscriber-id1");
- consumer.setMessageListener(this);
- producer.send(session.createTextMessage("Hello"));
-
- // Consume the message...
-
- Thread.sleep(10000);
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- // Attempt to Consume the message...check if message was acknowledge
- consumer = session.createDurableSubscriber(queue, "subscriber-id1");
- Message msg = consumer.receive(1000);
- assertNull(msg);
-
- session.close();
- }
-
- public void onMessage(Message message) {
- assertNotNull(message);
-
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/JmsAutoAckTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsAutoAckTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/JmsAutoAckTest.java
deleted file mode 100644
index 13eaa27..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsAutoAckTest.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import javax.jms.Topic;
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-public class JmsAutoAckTest extends TestSupport {
-
- private Connection connection;
-
- protected void setUp() throws Exception {
- super.setUp();
- connection = createConnection();
- }
-
- /**
- * @see junit.framework.TestCase#tearDown()
- */
- protected void tearDown() throws Exception {
- if (connection != null) {
- connection.close();
- connection = null;
- }
- super.tearDown();
- }
-
- /**
- * Tests if acknowleged messages are being consumed.
- *
- * @throws javax.jms.JMSException
- */
- public void testAckedMessageAreConsumed() throws JMSException {
- connection.start();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Topic queue = session.createTopic("test");
- MessageProducer producer = session.createProducer(queue);
- MessageConsumer consumer = session.createDurableSubscriber(queue, "subscriber-id1");
- producer.send(session.createTextMessage("Hello"));
-
- // Consume the message...
- Message msg = consumer.receive(1000);
- assertNotNull(msg);
-
- // Reset the session.
- session.close();
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- // Attempt to Consume the message...
- consumer = session.createDurableSubscriber(queue, "subscriber-id1");
- msg = consumer.receive(1000);
- assertNull(msg);
-
- session.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/JmsBenchmark.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsBenchmark.java b/hedwig-client-jms/src/test/java/org/apache/activemq/JmsBenchmark.java
deleted file mode 100644
index dd914b5..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsBenchmark.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import javax.jms.Topic;
-import org.apache.hedwig.jms.SessionImpl;
-import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.jms.BytesMessage;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.DeliveryMode;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import junit.framework.Test;
-
-
-
-import javax.jms.Destination;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Benchmarks the broker by starting many consumer and producers against the
- * same destination. Make sure you run with jvm option -server (makes a big
- * difference). The tests simulate storing 1000 1k jms messages to see the rate
- * of processing msg/sec.
- */
-public class JmsBenchmark extends JmsTestSupport {
- private static final transient Logger LOG = LoggerFactory.getLogger(JmsBenchmark.class);
-
- private static final long SAMPLE_DELAY = Integer.parseInt(System.getProperty("SAMPLE_DELAY", "" + 1000 * 5));
- private static final long SAMPLES = Integer.parseInt(System.getProperty("SAMPLES", "10"));
- private static final long SAMPLE_DURATION = Integer.parseInt(System.getProperty("SAMPLES_DURATION",
- "" + 1000 * 60));
- private static final int PRODUCER_COUNT = Integer.parseInt(System.getProperty("PRODUCER_COUNT", "10"));
- private static final int CONSUMER_COUNT = Integer.parseInt(System.getProperty("CONSUMER_COUNT", "10"));
-
- public Destination destination;
-
- public static Test suite() {
- return suite(JmsBenchmark.class);
- }
-
- public static void main(String[] args) {
- junit.textui.TestRunner.run(JmsBenchmark.class);
- }
-
- public void initCombos() {
- addCombinationValues("destination", new Object[] {SessionImpl.asTopic("TEST")});
- }
-
- protected ConnectionFactory createConnectionFactory() throws URISyntaxException, IOException {
- return new HedwigConnectionFactoryImpl();
- }
-
- /**
- * @throws Throwable
- */
- public void testConcurrentSendReceive() throws Throwable {
-
- final Semaphore connectionsEstablished = new Semaphore(1 - (CONSUMER_COUNT + PRODUCER_COUNT));
- final Semaphore workerDone = new Semaphore(1 - (CONSUMER_COUNT + PRODUCER_COUNT));
- final CountDownLatch sampleTimeDone = new CountDownLatch(1);
-
- final AtomicInteger producedMessages = new AtomicInteger(0);
- final AtomicInteger receivedMessages = new AtomicInteger(0);
-
- final Callable producer = new Callable() {
- public Object call() throws JMSException, InterruptedException {
- Connection connection = factory.createConnection();
- connections.add(connection);
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(destination);
- producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- BytesMessage message = session.createBytesMessage();
- message.writeBytes(new byte[1024]);
- connection.start();
- connectionsEstablished.release();
-
- while (!sampleTimeDone.await(0, TimeUnit.MILLISECONDS)) {
- producer.send(message);
- producedMessages.incrementAndGet();
- }
-
- connection.close();
- workerDone.release();
- return null;
- }
- };
-
- final Callable consumer = new Callable() {
- public Object call() throws JMSException, InterruptedException {
- Connection connection = factory.createConnection();
- connections.add(connection);
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer = session.createConsumer(destination);
-
- consumer.setMessageListener(new MessageListener() {
- public void onMessage(Message msg) {
- receivedMessages.incrementAndGet();
- }
- });
- connection.start();
-
- connectionsEstablished.release();
- sampleTimeDone.await();
-
- connection.close();
- workerDone.release();
- return null;
- }
- };
-
- final Throwable workerError[] = new Throwable[1];
- for (int i = 0; i < PRODUCER_COUNT; i++) {
- new Thread("Producer:" + i) {
- public void run() {
- try {
- producer.call();
- } catch (Throwable e) {
- e.printStackTrace();
- workerError[0] = e;
- }
- }
- }.start();
- }
-
- for (int i = 0; i < CONSUMER_COUNT; i++) {
- new Thread("Consumer:" + i) {
- public void run() {
- try {
- consumer.call();
- } catch (Throwable e) {
- e.printStackTrace();
- workerError[0] = e;
- }
- }
- }.start();
- }
-
- LOG.info(getName() + ": Waiting for Producers and Consumers to startup.");
- connectionsEstablished.acquire();
- LOG.info("Producers and Consumers are now running. Waiting for system to reach steady state: "
- + (SAMPLE_DELAY / 1000.0f) + " seconds");
- Thread.sleep(1000 * 10);
-
- LOG.info("Starting sample: " + SAMPLES + " each lasting " + (SAMPLE_DURATION / 1000.0f) + " seconds");
-
- for (int i = 0; i < SAMPLES; i++) {
-
- long start = System.currentTimeMillis();
- producedMessages.set(0);
- receivedMessages.set(0);
-
- Thread.sleep(SAMPLE_DURATION);
-
- long end = System.currentTimeMillis();
- int r = receivedMessages.get();
- int p = producedMessages.get();
-
- LOG.info("published: " + p + " msgs at " + (p * 1000f / (end - start)) + " msgs/sec, "
- + "consumed: " + r + " msgs at " + (r * 1000f / (end - start)) + " msgs/sec");
- }
-
- LOG.info("Sample done.");
- sampleTimeDone.countDown();
-
- workerDone.acquire();
- if (workerError[0] != null) {
- throw workerError[0];
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/JmsClientAckListenerTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsClientAckListenerTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/JmsClientAckListenerTest.java
deleted file mode 100644
index 78d7fb3..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsClientAckListenerTest.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import javax.jms.Topic;
-import javax.jms.Connection;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-public class JmsClientAckListenerTest extends TestSupport implements MessageListener {
-
- private Connection connection;
- private boolean dontAck;
-
- protected void setUp() throws Exception {
- super.setUp();
- connection = createConnection();
- }
-
- /**
- * @see junit.framework.TestCase#tearDown()
- */
- protected void tearDown() throws Exception {
- if (connection != null) {
- connection.close();
- connection = null;
- }
- super.tearDown();
- }
-
- /**
- * Tests if acknowleged messages are being consumed.
- *
- * @throws javax.jms.JMSException
- */
- public void testAckedMessageAreConsumed() throws Exception {
- connection.start();
- Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- Topic queue = session.createTopic("test");
- MessageProducer producer = session.createProducer(queue);
- producer.send(session.createTextMessage("Hello"));
-
- // Consume the message...
- MessageConsumer consumer = session.createDurableSubscriber(queue, "subscriber-id1");
- consumer.setMessageListener(this);
-
- Thread.sleep(10000);
-
- // Reset the session.
- session.close();
-
- session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
- // Attempt to Consume the message...
- consumer = session.createDurableSubscriber(queue, "subscriber-id1");
- Message msg = consumer.receive(1000);
- assertNull(msg);
-
- session.close();
- }
-
- /**
- * Tests if unacknowleged messages are being redelivered when the consumer
- * connects again.
- *
- * @throws javax.jms.JMSException
- */
- public void testUnAckedMessageAreNotConsumedOnSessionClose() throws Exception {
- connection.start();
- // don't aknowledge message on onMessage() call
- dontAck = true;
- Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- Topic queue = session.createTopic("test");
- MessageProducer producer = session.createProducer(queue);
- // Consume the message...
- MessageConsumer consumer = session.createDurableSubscriber(queue, "subscriber-id2");
- consumer.setMessageListener(this);
-
- // Don't ack the message.
- producer.send(session.createTextMessage("Hello"));
-
- // Reset the session. This should cause the Unacked message to be
- // redelivered.
- session.close();
-
- Thread.sleep(10000);
- session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- // Attempt to Consume the message...
- consumer = session.createDurableSubscriber(queue, "subscriber-id2");
- Message msg = consumer.receive(2000);
- assertNotNull(msg);
- msg.acknowledge();
-
- session.close();
- }
-
- public void onMessage(Message message) {
-
- assertNotNull(message);
- if (!dontAck) {
- try {
- message.acknowledge();
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/JmsClientAckTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsClientAckTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/JmsClientAckTest.java
deleted file mode 100644
index c4aa3c6..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsClientAckTest.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import javax.jms.Topic;
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-public class JmsClientAckTest extends TestSupport {
-
- private Connection connection;
-
- protected void setUp() throws Exception {
- super.setUp();
- connection = createConnection();
- }
-
- /**
- * @see junit.framework.TestCase#tearDown()
- */
- protected void tearDown() throws Exception {
- if (connection != null) {
- connection.close();
- connection = null;
- }
- super.tearDown();
- }
-
- /**
- * Tests if acknowledged messages are being consumed.
- *
- * @throws JMSException
- */
- public void testAckedMessageAreConsumed() throws JMSException {
- connection.start();
- Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- Topic queue = session.createTopic(getQueueName());
- MessageProducer producer = session.createProducer(queue);
- MessageConsumer consumer = session.createDurableSubscriber(queue, "subscriber-id1");
- producer.send(session.createTextMessage("Hello"));
-
- // Consume the message...
- Message msg = consumer.receive(1000);
- assertNotNull(msg);
- msg.acknowledge();
-
- // Reset the session.
- session.close();
- session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
- // Attempt to Consume the message...
- consumer = session.createDurableSubscriber(queue, "subscriber-id1");
- msg = consumer.receive(1000);
- assertNull(msg);
-
- session.close();
- }
-
- /**
- * Tests if acknowledged messages are being consumed.
- *
- * @throws JMSException
- */
- public void testLastMessageAcked() throws JMSException {
- connection.start();
- Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- Topic queue = session.createTopic(getQueueName());
- MessageProducer producer = session.createProducer(queue);
- MessageConsumer consumer = session.createDurableSubscriber(queue, "subscriber-id2");
- producer.send(session.createTextMessage("Hello"));
- producer.send(session.createTextMessage("Hello2"));
- producer.send(session.createTextMessage("Hello3"));
-
- // Consume the message...
- Message msg = consumer.receive(1000);
- assertNotNull(msg);
- msg = consumer.receive(1000);
- assertNotNull(msg);
- msg = consumer.receive(1000);
- assertNotNull(msg);
- msg.acknowledge();
-
- // Reset the session.
- session.close();
- session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
- // Attempt to Consume the message...
- consumer = session.createDurableSubscriber(queue, "subscriber-id2");
- msg = consumer.receive(1000);
- assertNull(msg);
-
- session.close();
- }
-
- /**
- * Tests if unacknowledged messages are being re-delivered when the consumer connects again.
- *
- * @throws JMSException
- */
- public void testUnAckedMessageAreNotConsumedOnSessionClose() throws JMSException {
- connection.start();
- Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- Topic queue = session.createTopic(getQueueName());
- MessageProducer producer = session.createProducer(queue);
- MessageConsumer consumer = session.createDurableSubscriber(queue, "subscriber-id3");
- producer.send(session.createTextMessage("Hello"));
-
- // Consume the message...
- Message msg = consumer.receive(1000);
- assertNotNull(msg);
- // Don't ack the message.
-
- // Reset the session. This should cause the unacknowledged message to be re-delivered.
- session.close();
- session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
- // Attempt to Consume the message...
- consumer = session.createDurableSubscriber(queue, "subscriber-id3");
- msg = consumer.receive(2000);
- assertNotNull(msg);
- msg.acknowledge();
-
- session.close();
- }
-
- protected String getQueueName() {
- return getClass().getName() + "." + getName();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/JmsConnectionStartStopTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsConnectionStartStopTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/JmsConnectionStartStopTest.java
deleted file mode 100644
index 3649614..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsConnectionStartStopTest.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import org.apache.hedwig.jms.DebugUtil;
-import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl;
-import java.util.Random;
-import java.util.Vector;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-
-public class JmsConnectionStartStopTest extends TestSupport {
-
- private static final org.apache.commons.logging.Log LOG = org.apache.commons.logging.LogFactory
- .getLog(JmsConnectionStartStopTest.class);
-
- private Connection startedConnection;
- private Connection stoppedConnection;
-
- /**
- * @see junit.framework.TestCase#setUp()
- */
- protected void setUp() throws Exception {
-
- super.setUp();
- LOG.info(getClass().getClassLoader().getResource("log4j.properties"));
-
- HedwigConnectionFactoryImpl factory = createConnectionFactory();
- startedConnection = factory.createConnection();
- startedConnection.start();
- stoppedConnection = factory.createConnection();
- }
-
- /**
- * @see junit.framework.TestCase#tearDown()
- */
- protected void tearDown() throws Exception {
- stoppedConnection.close();
- startedConnection.close();
- super.tearDown();
- }
-
- /**
- * Tests if the consumer receives the messages that were sent before the
- * connection was started.
- *
- * @throws JMSException
- */
- public void testStoppedConsumerHoldsMessagesTillStarted() throws JMSException {
- Session startedSession = startedConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Session stoppedSession = stoppedConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- // Setup the consumers.
- Topic topic = startedSession.createTopic("test");
- MessageConsumer startedConsumer = startedSession.createConsumer(topic);
- MessageConsumer stoppedConsumer = stoppedSession.createConsumer(topic);
-
- // Send the message.
- MessageProducer producer = startedSession.createProducer(topic);
- TextMessage message = startedSession.createTextMessage("Hello");
- producer.send(message);
-
- // Test the assertions.
- Message m = startedConsumer.receive(1000);
- assertNotNull(m);
-
- m = stoppedConsumer.receive(1000);
- assertNull(m);
-
- stoppedConnection.start();
- m = stoppedConsumer.receive(5000);
- assertNotNull(m);
-
- startedSession.close();
- stoppedSession.close();
- }
-
- /**
- * Tests if the consumer is able to receive messages eveb when the
- * connecction restarts multiple times.
- *
- * @throws Exception
- */
- public void testMultipleConnectionStops() throws Exception {
- testStoppedConsumerHoldsMessagesTillStarted();
- stoppedConnection.stop();
- testStoppedConsumerHoldsMessagesTillStarted();
- stoppedConnection.stop();
- testStoppedConsumerHoldsMessagesTillStarted();
- }
-
-
- public void testConcurrentSessionCreateWithStart() throws Exception {
- ThreadPoolExecutor executor = new ThreadPoolExecutor(50, Integer.MAX_VALUE,
- 60L, TimeUnit.SECONDS,
- new SynchronousQueue<Runnable>());
- final Vector<Throwable> exceptions = new Vector<Throwable>();
- final Random rand = new Random();
- final int numIterations = 100;
- final CountDownLatch latch = new CountDownLatch(numIterations * 2);
- Runnable createSessionTask = new Runnable() {
- public void run() {
- try {
- TimeUnit.MILLISECONDS.sleep(rand.nextInt(10));
- latch.countDown();
- stoppedConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- } catch (Exception e) {
- e.printStackTrace();
- exceptions.add(e);
- }
- }
- };
-
- Runnable startStopTask = new Runnable() {
- public void run() {
- try {
- TimeUnit.MILLISECONDS.sleep(rand.nextInt(10));
- latch.countDown();
- stoppedConnection.start();
- stoppedConnection.stop();
- } catch (Exception e) {
- e.printStackTrace();
- exceptions.add(e);
- }
- }
- };
-
- for (int i=0; i<numIterations; i++) {
- executor.execute(createSessionTask);
- executor.execute(startStopTask);
- }
-
- executor.shutdown();
- final long remaining;
- {
- boolean terminated = executor.awaitTermination(30, TimeUnit.SECONDS);
- remaining = latch.getCount();
- if (!terminated){
- DebugUtil.dumpAllStacktraces(System.err);
- }
- assertTrue("executor terminated. remaining : " + remaining, terminated);
- }
- assertTrue("remaining : " + remaining + ", no exceptions: " + exceptions, exceptions.isEmpty());
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/JmsConsumerResetActiveListenerTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsConsumerResetActiveListenerTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/JmsConsumerResetActiveListenerTest.java
deleted file mode 100644
index aaf47f2..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsConsumerResetActiveListenerTest.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import javax.jms.Topic;
-import java.util.Vector;
-
-import org.apache.hedwig.JmsTestBase;
-import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import junit.framework.TestCase;
-
-
-public class JmsConsumerResetActiveListenerTest extends JmsTestBase {
-
- private Connection connection;
- private HedwigConnectionFactoryImpl factory;
-
- protected void setUp() throws Exception {
- super.setUp();
- factory = new HedwigConnectionFactoryImpl();
- connection = factory.createConnection();
- }
-
- protected void tearDown() throws Exception {
- if (connection != null) {
- connection.close();
- connection = null;
- }
- super.tearDown();
- }
-
- /**
- * verify the (undefined by spec) behaviour of setting a listener while receiving a message.
- *
- * @throws Exception
- */
- public void testSetListenerFromListener() throws Exception {
- Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- Destination dest = session.createTopic("Queue-" + getName());
- final MessageConsumer consumer = session.createConsumer(dest);
-
- final CountDownLatch latch = new CountDownLatch(2);
- final AtomicBoolean first = new AtomicBoolean(true);
- final Vector<Object> results = new Vector<Object>();
- consumer.setMessageListener(new MessageListener() {
-
- public void onMessage(Message message) {
- if (first.compareAndSet(true, false)) {
- try {
- consumer.setMessageListener(this);
- results.add(message);
- } catch (JMSException e) {
- results.add(e);
- }
- } else {
- results.add(message);
- }
- latch.countDown();
- }
- });
-
- connection.start();
-
- MessageProducer producer = session.createProducer(dest);
- producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- producer.send(session.createTextMessage("First"));
- producer.send(session.createTextMessage("Second"));
-
- assertTrue("we did not timeout", latch.await(5, TimeUnit.SECONDS));
-
- assertEquals("we have a result", 2, results.size());
- Object result = results.get(0);
- assertTrue(result instanceof TextMessage);
- assertEquals("result is first", "First", ((TextMessage)result).getText());
- result = results.get(1);
- assertTrue(result instanceof TextMessage);
- assertEquals("result is first", "Second", ((TextMessage)result).getText());
- }
-
- /**
- * and a listener on a new consumer, just in case.
- *
- * @throws Exception
- */
- public void testNewConsumerSetListenerFromListener() throws Exception {
- final Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- final Destination dest = session.createTopic("Queue-" + getName());
- final MessageConsumer consumer = session.createConsumer(dest);
-
- final CountDownLatch latch = new CountDownLatch(2);
- final AtomicBoolean first = new AtomicBoolean(true);
- final Vector<Object> results = new Vector<Object>();
- consumer.setMessageListener(new MessageListener() {
-
- public void onMessage(Message message) {
- if (first.compareAndSet(true, false)) {
- try {
- MessageConsumer anotherConsumer = session.createConsumer(dest);
- anotherConsumer.setMessageListener(this);
- results.add(message);
- } catch (JMSException e) {
- results.add(e);
- }
- } else {
- results.add(message);
- }
- latch.countDown();
- }
- });
-
- connection.start();
-
- MessageProducer producer = session.createProducer(dest);
- producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- producer.send(session.createTextMessage("First"));
- producer.send(session.createTextMessage("Second"));
-
- assertTrue("we did not timeout", latch.await(5, TimeUnit.SECONDS));
-
- assertEquals("we have a result", 2, results.size());
- Object result = results.get(0);
- assertTrue(result instanceof TextMessage);
- assertEquals("result is first", "First", ((TextMessage)result).getText());
- result = results.get(1);
- assertTrue(result instanceof TextMessage);
- assertEquals("result is first", "Second", ((TextMessage)result).getText());
- }
- }
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/JmsCreateConsumerInOnMessageTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsCreateConsumerInOnMessageTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/JmsCreateConsumerInOnMessageTest.java
deleted file mode 100644
index 218bbe5..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsCreateConsumerInOnMessageTest.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import javax.jms.Connection;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.Topic;
-
-public class JmsCreateConsumerInOnMessageTest extends TestSupport implements MessageListener {
-
- private Connection connection;
- private Session publisherSession;
- private Session consumerSession;
- private MessageConsumer consumer;
- private MessageConsumer testConsumer;
- private MessageProducer producer;
- private Topic topic;
- private Object lock = new Object();
-
- /*
- * @see junit.framework.TestCase#setUp()
- */
- protected void setUp() throws Exception {
- super.setUp();
- super.topic = true;
- connection = createConnection(false);
- connection.setClientID("connection:" + getSubject());
- publisherSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- topic = (Topic)super.createDestination("Test.Topic");
- consumer = consumerSession.createConsumer(topic);
- consumer.setMessageListener(this);
- producer = publisherSession.createProducer(topic);
- connection.start();
- }
-
- /*
- * @see junit.framework.TestCase#tearDown()
- */
- protected void tearDown() throws Exception {
- connection.close();
- super.tearDown();
- }
-
- /**
- * Tests if a consumer can be created asynchronusly
- *
- * @throws Exception
- */
- public void testCreateConsumer() throws Exception {
- Message msg = super.createMessage();
- producer.send(msg);
- if (testConsumer == null) {
- synchronized (lock) {
- lock.wait(3000);
- }
- }
- assertTrue(testConsumer != null);
- }
-
- /**
- * Use the asynchronous subscription mechanism
- *
- * @param message
- */
- public void onMessage(Message message) {
- try {
- testConsumer = consumerSession.createConsumer(topic);
- consumerSession.createProducer(topic);
- synchronized (lock) {
- lock.notify();
- }
- } catch (Exception ex) {
- ex.printStackTrace();
- assertTrue(false);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/JmsDurableTopicSelectorTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsDurableTopicSelectorTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/JmsDurableTopicSelectorTest.java
deleted file mode 100644
index 548e7a8..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsDurableTopicSelectorTest.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import javax.jms.Topic;
-
-public class JmsDurableTopicSelectorTest extends JmsTopicSelectorTest {
- public void setUp() throws Exception {
- durable = true;
- super.setUp();
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/JmsDurableTopicSendReceiveTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsDurableTopicSendReceiveTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/JmsDurableTopicSendReceiveTest.java
deleted file mode 100644
index b2e2ed7..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsDurableTopicSendReceiveTest.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-
-import org.apache.activemq.test.JmsTopicSendReceiveTest;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class JmsDurableTopicSendReceiveTest extends JmsTopicSendReceiveTest {
- private static final Logger LOG = LoggerFactory.getLogger(JmsDurableTopicSendReceiveTest.class);
-
- protected Connection connection2;
- protected Session session2;
- protected Session consumeSession2;
- protected MessageConsumer consumer2;
- protected MessageProducer producer2;
- protected Destination consumerDestination2;
- protected Destination producerDestination2;
-
- /**
- * Set up a durable suscriber test.
- *
- * @see junit.framework.TestCase#setUp()
- */
- protected void setUp() throws Exception {
- this.durable = true;
- super.setUp();
- }
-
- /**
- * Test if all the messages sent are being received.
- *
- * @throws Exception
- */
- public void testSendWhileClosed() throws Exception {
- connection2 = createConnection(false);
- if (null == connection.getClientID()) connection2.setClientID(getName() + "test");
- connection2.start();
- session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
- producer2 = session2.createProducer(null);
- producer2.setDeliveryMode(deliveryMode);
- producerDestination2 = session2.createTopic(getProducerSubject() + "2");
- Thread.sleep(1000);
-
- consumeSession2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
- consumerDestination2 = session2.createTopic(getConsumerSubject() + "2");
- consumer2 = consumeSession2.createDurableSubscriber((Topic)consumerDestination2, getName());
- Thread.sleep(1000);
- consumer2.close();
- TextMessage message = session2.createTextMessage("test");
- message.setStringProperty("test", "test");
- message.setJMSType("test");
- producer2.send(producerDestination2, message);
- LOG.info("Creating durable consumer");
- consumer2 = consumeSession2.createDurableSubscriber((Topic)consumerDestination2, getName());
- Message msg = consumer2.receive(1000);
- assertNotNull(msg);
- assertEquals(((TextMessage)msg).getText(), "test");
- assertEquals(msg.getJMSType(), "test");
- assertEquals(msg.getStringProperty("test"), "test");
- connection2.stop();
- connection2.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/JmsDurableTopicTransactionTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsDurableTopicTransactionTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/JmsDurableTopicTransactionTest.java
deleted file mode 100644
index c0da42f..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsDurableTopicTransactionTest.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import javax.jms.Topic;
-import javax.jms.DeliveryMode;
-
-import org.apache.activemq.test.JmsResourceProvider;
-
-public class JmsDurableTopicTransactionTest extends JmsTopicTransactionTest {
-
- /**
- * @see JmsTransactionTestSupport#getJmsResourceProvider()
- */
- protected JmsResourceProvider getJmsResourceProvider() {
- JmsResourceProvider provider = new JmsResourceProvider();
- provider.setTopic(true);
- provider.setDeliveryMode(DeliveryMode.PERSISTENT);
- provider.setClientID(getClass().getName());
- provider.setDurableName(getName());
- return provider;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java b/hedwig-client-jms/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java
deleted file mode 100644
index 3873a0f..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java
+++ /dev/null
@@ -1,281 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import java.net.URI;
-import org.apache.hedwig.jms.SessionImpl;
-import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.TopicSubscriber;
-
-
-
-import javax.jms.Destination;
-
-
-import org.apache.activemq.util.MessageIdList;
-
-/**
- * Test case support used to test multiple message comsumers and message
- * producers connecting to a single broker.
- */
-public class JmsMultipleClientsTestSupport extends CombinationTestSupport {
-
- protected Map<MessageConsumer, MessageIdList> consumers
- = new HashMap<MessageConsumer, MessageIdList>(); // Map of consumer with messages received
- protected int consumerCount = 1;
- protected int producerCount = 1;
-
- protected int messageSize = 1024;
-
- protected boolean useConcurrentSend = true;
- protected boolean autoFail = true;
- protected boolean durable;
- public boolean topic;
- protected boolean persistent;
-
- protected Destination destination;
- protected List<Connection> connections = Collections.synchronizedList(new ArrayList<Connection>());
- protected MessageIdList allMessagesList = new MessageIdList();
-
- private AtomicInteger producerLock;
-
- protected void startProducers(Destination dest, int msgCount) throws Exception {
- startProducers(createConnectionFactory(), dest, msgCount);
- }
-
- protected void startProducers(final ConnectionFactory factory,
- final Destination dest, final int msgCount) throws Exception {
- // Use concurrent send
- if (useConcurrentSend) {
- producerLock = new AtomicInteger(producerCount);
-
- for (int i = 0; i < producerCount; i++) {
- Thread t = new Thread(new Runnable() {
- public void run() {
- try {
- sendMessages(factory.createConnection(), dest, msgCount);
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- synchronized (producerLock) {
- producerLock.decrementAndGet();
- producerLock.notifyAll();
- }
- }
- });
-
- t.start();
- }
-
- // Wait for all producers to finish sending
- synchronized (producerLock) {
- while (producerLock.get() != 0) {
- producerLock.wait(2000);
- }
- }
-
- // Use serialized send
- } else {
- for (int i = 0; i < producerCount; i++) {
- sendMessages(factory.createConnection(), dest, msgCount);
- }
- }
- }
-
- protected void sendMessages(Connection connection, Destination destination, int count) throws Exception {
- connections.add(connection);
- connection.start();
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(destination);
- producer.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-
- for (int i = 0; i < count; i++) {
- TextMessage msg = createTextMessage(session, "" + i);
- producer.send(msg);
- }
-
- producer.close();
- session.close();
- connection.close();
- }
-
- protected TextMessage createTextMessage(Session session, String initText) throws Exception {
- TextMessage msg = session.createTextMessage();
-
- // Pad message text
- if (initText.length() < messageSize) {
- char[] data = new char[messageSize - initText.length()];
- Arrays.fill(data, '*');
- String str = new String(data);
- msg.setText(initText + str);
-
- // Do not pad message text
- } else {
- msg.setText(initText);
- }
-
- return msg;
- }
-
- protected void startConsumers(Destination dest) throws Exception {
- startConsumers(createConnectionFactory(), dest);
- }
-
- protected void startConsumers(ConnectionFactory factory, Destination dest) throws Exception {
- MessageConsumer consumer;
- for (int i = 0; i < consumerCount; i++) {
- if (durable && topic) {
- consumer = createDurableSubscriber(factory.createConnection(), dest, "consumer" + (i + 1));
- } else {
- consumer = createMessageConsumer(factory.createConnection(), dest);
- }
- MessageIdList list = new MessageIdList();
- list.setParent(allMessagesList);
- consumer.setMessageListener(list);
- consumers.put(consumer, list);
- }
- }
-
- protected MessageConsumer createMessageConsumer(Connection conn, Destination dest) throws Exception {
- connections.add(conn);
-
- Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- final MessageConsumer consumer = sess.createConsumer(dest);
- conn.start();
-
- return consumer;
- }
-
- protected TopicSubscriber createDurableSubscriber(Connection conn, Destination dest, String name) throws Exception {
- conn.setClientID(name);
- connections.add(conn);
- conn.start();
-
- Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- final TopicSubscriber consumer = sess.createDurableSubscriber((javax.jms.Topic)dest, name);
-
- return consumer;
- }
-
- protected void waitForAllMessagesToBeReceived(int messageCount) throws Exception {
- allMessagesList.waitForMessagesToArrive(messageCount);
- }
-
- protected Destination createDestination() throws JMSException {
- String name = "." + getClass().getName() + "." + getName();
- // ensure not inadvertently composite because of combos
- name = name.replace(' ','_');
- name = name.replace(',','&');
- if (topic) {
- destination = SessionImpl.asTopic("Topic" + name);
- return (Destination)destination;
- } else {
- destination = SessionImpl.asTopic("Queue" + name);
- return (Destination)destination;
- }
- }
-
- protected ConnectionFactory createConnectionFactory() throws Exception {
- return new HedwigConnectionFactoryImpl();
- }
-
- protected void setUp() throws Exception {
- super.setAutoFail(autoFail);
- super.setUp();
- }
-
- protected void tearDown() throws Exception {
- for (Iterator<Connection> iter = connections.iterator(); iter.hasNext();) {
- Connection conn = iter.next();
- try {
- conn.close();
- } catch (Throwable e) {
- }
- }
- allMessagesList.flushMessages();
- consumers.clear();
- super.tearDown();
- }
-
- /*
- * Some helpful assertions for multiple consumers.
- */
- protected void assertConsumerReceivedAtLeastXMessages(MessageConsumer consumer, int msgCount) {
- MessageIdList messageIdList = consumers.get(consumer);
- messageIdList.assertAtLeastMessagesReceived(msgCount);
- }
-
- protected void assertConsumerReceivedAtMostXMessages(MessageConsumer consumer, int msgCount) {
- MessageIdList messageIdList = consumers.get(consumer);
- messageIdList.assertAtMostMessagesReceived(msgCount);
- }
-
- protected void assertConsumerReceivedXMessages(MessageConsumer consumer, int msgCount) {
- MessageIdList messageIdList = consumers.get(consumer);
- messageIdList.assertMessagesReceivedNoWait(msgCount);
- }
-
- protected void assertEachConsumerReceivedAtLeastXMessages(int msgCount) {
- for (Iterator<MessageConsumer> i = consumers.keySet().iterator(); i.hasNext();) {
- assertConsumerReceivedAtLeastXMessages(i.next(), msgCount);
- }
- }
-
- protected void assertEachConsumerReceivedAtMostXMessages(int msgCount) {
- for (Iterator<MessageConsumer> i = consumers.keySet().iterator(); i.hasNext();) {
- assertConsumerReceivedAtMostXMessages(i.next(), msgCount);
- }
- }
-
- protected void assertEachConsumerReceivedXMessages(int msgCount) {
- for (Iterator<MessageConsumer> i = consumers.keySet().iterator(); i.hasNext();) {
- assertConsumerReceivedXMessages(i.next(), msgCount);
- }
- }
-
- protected void assertTotalMessagesReceived(int msgCount) {
- allMessagesList.assertMessagesReceivedNoWait(msgCount);
-
- // now lets count the individual messages received
- int totalMsg = 0;
- for (Iterator<MessageConsumer> i = consumers.keySet().iterator(); i.hasNext();) {
- MessageIdList messageIdList = consumers.get(i.next());
- totalMsg += messageIdList.getMessageCount();
- }
- assertEquals("Total of consumers message count", msgCount, totalMsg);
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/JmsRedeliveredTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsRedeliveredTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/JmsRedeliveredTest.java
deleted file mode 100644
index fba7064..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsRedeliveredTest.java
+++ /dev/null
@@ -1,380 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import javax.jms.Connection;
-
-import org.apache.hedwig.JmsTestBase;
-import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-
-import junit.framework.Test;
-import junit.framework.TestCase;
-import junit.framework.TestSuite;
-
-public class JmsRedeliveredTest extends JmsTestBase {
-
- private Connection connection;
-
- /*
- * (non-Javadoc)
- *
- * @see junit.framework.TestCase#setUp()
- */
- protected void setUp() throws Exception {
- super.setUp();
- connection = createConnection();
- }
-
- /**
- * @see junit.framework.TestCase#tearDown()
- */
- protected void tearDown() throws Exception {
- if (connection != null) {
- connection.close();
- connection = null;
- }
- super.tearDown();
- }
-
- /**
- * Creates a connection.
- *
- * @return connection
- * @throws Exception
- */
- protected Connection createConnection() throws Exception {
- HedwigConnectionFactoryImpl factory = new HedwigConnectionFactoryImpl();
- return factory.createConnection();
- }
-
- /**
- * Tests if a message unacknowledged message gets to be resent when the
- * session is closed and then a new consumer session is created.
- *
- */
- public void testTopicSessionCloseMarksMessageRedelivered() throws JMSException {
- connection.start();
-
- Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- Topic queue = session.createTopic("queue-" + getName());
- MessageProducer producer = createProducer(session, queue);
- MessageConsumer consumer = session.createDurableSubscriber(queue, "subscriber-id1");
- producer.send(createTextMessage(session));
-
- // Consume the message...
- Message msg = consumer.receive(1000);
- assertNotNull(msg);
- assertFalse("Message should not be redelivered.", msg.getJMSRedelivered());
- // Don't ack the message.
-
- // Reset the session. This should cause the Unacked message to be
- // redelivered.
- session.close();
- session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
- // Attempt to Consume the message...
- consumer = session.createDurableSubscriber(queue, "subscriber-id1");
- msg = consumer.receive(2000);
- assertNotNull(msg);
- // Since we only simulate this in provider, we cannot do this across consumers !
- // assertTrue("Message should be redelivered.", msg.getJMSRedelivered());
- msg.acknowledge();
-
- session.close();
- }
-
-
- public void testTopicSessionCloseMarksUnAckedMessageRedelivered() throws JMSException {
- connection.start();
-
- Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- Topic queue = session.createTopic("queue-" + getName());
- MessageProducer producer = createProducer(session, queue);
- MessageConsumer consumer = session.createDurableSubscriber(queue, "subscriber-id2");
- producer.send(createTextMessage(session, "1"));
- producer.send(createTextMessage(session, "2"));
-
- // Consume the message...
- Message msg = consumer.receive(1000);
- assertNotNull(msg);
- assertFalse("Message should not be redelivered.", msg.getJMSRedelivered());
- assertEquals("1", ((TextMessage)msg).getText());
- msg.acknowledge();
-
- // Don't ack the message.
- msg = consumer.receive(1000);
- assertNotNull(msg);
- assertFalse("Message should not be redelivered.", msg.getJMSRedelivered());
- assertEquals("2", ((TextMessage)msg).getText());
-
- // Reset the session. This should cause the Unacked message to be
- // redelivered.
- session.close();
- session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
- // Attempt to Consume the message...
- consumer = session.createDurableSubscriber(queue, "subscriber-id2");
- msg = consumer.receive(2000);
- assertNotNull(msg);
- assertEquals("2", ((TextMessage)msg).getText());
- // Since we only simulate this in provider, we cannot do this across consumers !
- // assertTrue("Message should be redelivered.", msg.getJMSRedelivered());
- msg.acknowledge();
-
- session.close();
- }
-
- /**
- * Tests session recovery and that the redelivered message is marked as
- * such. Session uses client acknowledgement, the destination is a queue.
- *
- * @throws JMSException
- */
- public void testTopicRecoverMarksMessageRedelivered() throws Exception {
- connection.setClientID(getName());
- connection.start();
-
- Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- Topic queue = session.createTopic("queue-" + getName());
- MessageProducer producer = createProducer(session, queue);
- MessageConsumer consumer = session.createDurableSubscriber(queue, getName() + " - subscriber");
- producer.send(createTextMessage(session));
-
- // Consume the message...
- Message msg = consumer.receive(1000);
- assertNotNull(msg);
- assertFalse("Message should not be redelivered.", msg.getJMSRedelivered());
- // Don't ack the message.
-
- // We DO NOT support session recovery
- // - to unblock this test, I am stopp'ing and start'ing connection : not the same, but ...
- // Reset the session. This should cause the Unacked message to be
- // redelivered.
- // session.recover();
- connection.close();
- connection = createConnection();
- connection.setClientID(getName());
- session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- consumer = session.createDurableSubscriber(queue, getName() + " - subscriber");
- connection.start();
-
- // Attempt to Consume the message...
- msg = consumer.receive(2000);
- assertNotNull(msg);
- // assertTrue("Message should be redelivered.", msg.getJMSRedelivered());
- msg.acknowledge();
-
- session.close();
- }
-
- /**
- * Tests rollback message to be marked as redelivered. Session uses client
- * acknowledgement and the destination is a queue.
- *
- * @throws JMSException
- */
- public void testTopicRollbackMarksMessageRedelivered() throws JMSException {
- connection.start();
-
- Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
- Topic queue = session.createTopic("queue-" + getName());
- MessageProducer producer = createProducer(session, queue);
- MessageConsumer consumer = session.createConsumer(queue);
- producer.send(createTextMessage(session));
- session.commit();
-
- // Get the message... Should not be redelivered.
- Message msg = consumer.receive(1000);
- assertNotNull(msg);
- assertFalse("Message should not be redelivered.", msg.getJMSRedelivered());
-
- // Rollback.. should cause redelivery.
- session.rollback();
-
- // Attempt to Consume the message...
- msg = consumer.receive(2000);
- assertNotNull(msg);
- assertTrue("Message should be redelivered.", msg.getJMSRedelivered());
-
- session.commit();
- session.close();
- }
-
- /**
- * Tests if the message gets to be re-delivered when the session closes and
- * that the re-delivered message is marked as such. Session uses client
- * acknowledgment, the destination is a topic and the consumer is a durable
- * subscriber.
- *
- * @throws JMSException
- */
- public void testDurableTopicSessionCloseMarksMessageRedelivered() throws JMSException {
- connection.setClientID(getName());
- connection.start();
-
- Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- Topic topic = session.createTopic("topic-" + getName());
- MessageConsumer consumer = session.createDurableSubscriber(topic, "sub1");
-
- // This case only works with persistent messages since transient
- // messages
- // are dropped when the consumer goes offline.
- MessageProducer producer = session.createProducer(topic);
- producer.setDeliveryMode(DeliveryMode.PERSISTENT);
- producer.send(createTextMessage(session));
-
- // Consume the message...
- Message msg = consumer.receive(1000);
- assertNotNull(msg);
- assertFalse("Message should not be re-delivered.", msg.getJMSRedelivered());
- // Don't ack the message.
-
- // Reset the session. This should cause the Unacked message to be
- // re-delivered.
- session.close();
- session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
- // Attempt to Consume the message...
- consumer = session.createDurableSubscriber(topic, "sub1");
- msg = consumer.receive(2000);
- assertNotNull(msg);
- // Since we only simulate this in provider, we cannot do this across consumers !
- // assertTrue("Message should be redelivered.", msg.getJMSRedelivered());
- msg.acknowledge();
-
- session.close();
- }
-
- /**
- * Tests rollback message to be marked as redelivered. Session uses client
- * acknowledgement and the destination is a topic.
- *
- * @throws JMSException
- */
- public void testDurableTopicRollbackMarksMessageRedelivered() throws JMSException {
- if (null == connection.getClientID()) connection.setClientID(getName());
- connection.start();
-
- Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
- Topic topic = session.createTopic("topic-" + getName());
- MessageConsumer consumer = session.createDurableSubscriber(topic, "sub1");
-
- MessageProducer producer = createProducer(session, topic);
- producer.send(createTextMessage(session));
- session.commit();
-
- // Get the message... Should not be redelivered.
- Message msg = consumer.receive(1000);
- assertNotNull(msg);
- assertFalse("Message should not be redelivered.", msg.getJMSRedelivered());
-
- // Rollback.. should cause redelivery.
- session.rollback();
-
- // Attempt to Consume the message...
- msg = consumer.receive(2000);
- assertNotNull(msg);
- assertTrue("Message should be redelivered.", msg.getJMSRedelivered());
-
- session.commit();
- session.close();
- }
-
- /**
- * Creates a text message.
- *
- * @param session
- * @return TextMessage.
- * @throws JMSException
- */
- private TextMessage createTextMessage(Session session) throws JMSException {
- return createTextMessage(session, "Hello");
- }
-
- private TextMessage createTextMessage(Session session, String txt) throws JMSException {
- return session.createTextMessage(txt);
- }
-
- /**
- * Creates a producer.
- *
- * @param session
- * @param queue - destination.
- * @return MessageProducer
- * @throws JMSException
- */
- private MessageProducer createProducer(Session session, Destination queue) throws JMSException {
- MessageProducer producer = session.createProducer(queue);
- producer.setDeliveryMode(getDeliveryMode());
- return producer;
- }
-
- /**
- * Returns delivery mode.
- *
- * @return int - persistent delivery mode.
- */
- protected int getDeliveryMode() {
- return DeliveryMode.PERSISTENT;
- }
-
- /**
- * Run the JmsRedeliverTest with the delivery mode set as persistent.
- */
- public static final class PersistentCase extends JmsRedeliveredTest {
-
- /**
- * Returns delivery mode.
- *
- * @return int - persistent delivery mode.
- */
- protected int getDeliveryMode() {
- return DeliveryMode.PERSISTENT;
- }
- }
-
- /**
- * Run the JmsRedeliverTest with the delivery mode set as non-persistent.
- */
- public static final class TransientCase extends JmsRedeliveredTest {
-
- /**
- * Returns delivery mode.
- *
- * @return int - non-persistent delivery mode.
- */
- protected int getDeliveryMode() {
- return DeliveryMode.NON_PERSISTENT;
- }
- }
-
- public static Test suite() {
- TestSuite suite = new TestSuite();
- suite.addTestSuite(PersistentCase.class);
- suite.addTestSuite(TransientCase.class);
- return suite;
- }
-}