You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2016/03/16 04:44:49 UTC

[39/49] bookkeeper git commit: BOOKKEEPER-769: Remove the Hedwig Code

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/JMSUsecaseTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/JMSUsecaseTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/JMSUsecaseTest.java
deleted file mode 100644
index cadb071..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/JMSUsecaseTest.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import javax.jms.DeliveryMode;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import junit.framework.Test;
-
-import javax.jms.Destination;
-
-import org.apache.hedwig.jms.MessagingSessionFacade;
-import org.apache.hedwig.jms.SessionImpl;
-import org.apache.hedwig.jms.message.MessageImpl;
-
-public class JMSUsecaseTest extends JmsTestSupport {
-
-    public Destination destination;
-    public int deliveryMode;
-    public int prefetch;
-    public MessagingSessionFacade.DestinationType destinationType;
-    public boolean durableConsumer;
-
-    public static Test suite() {
-        return suite(JMSUsecaseTest.class);
-    }
-
-    public static void main(String[] args) {
-        junit.textui.TestRunner.run(suite());
-    }
-
-    public void initCombosForTestSendReceive() {
-        addCombinationValues("deliveryMode", new Object[] {
-                Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
-        addCombinationValues("destinationType", new Object[] {MessagingSessionFacade.DestinationType.TOPIC});
-    }
-
-    public void testSendReceive() throws Exception {
-        // Send a message to the broker.
-        connection.start();
-        SessionImpl session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        destination = createDestination(session, destinationType);
-        MessageProducer producer = session.createProducer(destination);
-        MessageConsumer consumer = session.createConsumer(destination);
-        MessageImpl message = new MessageImpl(session);
-        producer.send(message);
-
-        // Make sure only 1 message was delivered.
-        assertNotNull(consumer.receive(1000));
-        assertNull(consumer.receiveNoWait());
-    }
-
-    public void initCombosForTestSendReceiveTransacted() {
-        addCombinationValues("deliveryMode", new Object[] {
-                Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
-        addCombinationValues("destinationType", new Object[] {MessagingSessionFacade.DestinationType.TOPIC});
-    }
-
-    public void testSendReceiveTransacted() throws Exception {
-        // Send a message to the broker.
-        connection.start();
-        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
-        destination = createDestination(session, destinationType);
-        MessageProducer producer = session.createProducer(destination);
-        MessageConsumer consumer = session.createConsumer(destination);
-        producer.send(session.createTextMessage("test"));
-
-        // Message should not be delivered until commit.
-        assertNull(consumer.receiveNoWait());
-        session.commit();
-
-        // Make sure only 1 message was delivered.
-        Message message = consumer.receive(1000);
-        assertNotNull(message);
-        assertFalse(message.getJMSRedelivered());
-        assertNull(consumer.receiveNoWait());
-
-        // Message should be redelivered is rollback is used.
-        session.rollback();
-
-        // Make sure only 1 message was delivered.
-        message = consumer.receive(2000);
-        assertNotNull(message);
-        assertTrue(message.getJMSRedelivered());
-        assertNull(consumer.receiveNoWait());
-
-        // If we commit now, the message should not be redelivered.
-        session.commit();
-        assertNull(consumer.receiveNoWait());
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/JmsAutoAckListenerTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsAutoAckListenerTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/JmsAutoAckListenerTest.java
deleted file mode 100644
index ec5243f..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsAutoAckListenerTest.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import javax.jms.Topic;
-import javax.jms.Connection;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-public class JmsAutoAckListenerTest extends TestSupport implements MessageListener {
-
-    private Connection connection;
-
-    protected void setUp() throws Exception {
-        super.setUp();
-        connection = createConnection();
-    }
-
-    /**
-     * @see junit.framework.TestCase#tearDown()
-     */
-    protected void tearDown() throws Exception {
-        if (connection != null) {
-            connection.close();
-            connection = null;
-        }
-        super.tearDown();
-    }
-
-    /**
-     * Tests if acknowleged messages are being consumed.
-     *
-     * @throws javax.jms.JMSException
-     */
-    public void testAckedMessageAreConsumed() throws Exception {
-        connection.start();
-        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        Topic queue = session.createTopic("test");
-        MessageProducer producer = session.createProducer(queue);
-        MessageConsumer consumer = session.createDurableSubscriber(queue, "subscriber-id1");
-        consumer.setMessageListener(this);
-        producer.send(session.createTextMessage("Hello"));
-
-        // Consume the message...
-
-        Thread.sleep(10000);
-        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        // Attempt to Consume the message...check if message was acknowledge
-        consumer = session.createDurableSubscriber(queue, "subscriber-id1");
-        Message msg = consumer.receive(1000);
-        assertNull(msg);
-
-        session.close();
-    }
-
-    public void onMessage(Message message) {
-        assertNotNull(message);
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/JmsAutoAckTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsAutoAckTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/JmsAutoAckTest.java
deleted file mode 100644
index 13eaa27..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsAutoAckTest.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import javax.jms.Topic;
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-public class JmsAutoAckTest extends TestSupport {
-
-    private Connection connection;
-
-    protected void setUp() throws Exception {
-        super.setUp();
-        connection = createConnection();
-    }
-
-    /**
-     * @see junit.framework.TestCase#tearDown()
-     */
-    protected void tearDown() throws Exception {
-        if (connection != null) {
-            connection.close();
-            connection = null;
-        }
-        super.tearDown();
-    }
-
-    /**
-     * Tests if acknowleged messages are being consumed.
-     *
-     * @throws javax.jms.JMSException
-     */
-    public void testAckedMessageAreConsumed() throws JMSException {
-        connection.start();
-        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        Topic queue = session.createTopic("test");
-        MessageProducer producer = session.createProducer(queue);
-        MessageConsumer consumer = session.createDurableSubscriber(queue, "subscriber-id1");
-        producer.send(session.createTextMessage("Hello"));
-
-        // Consume the message...
-        Message msg = consumer.receive(1000);
-        assertNotNull(msg);
-
-        // Reset the session.
-        session.close();
-        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-        // Attempt to Consume the message...
-        consumer = session.createDurableSubscriber(queue, "subscriber-id1");
-        msg = consumer.receive(1000);
-        assertNull(msg);
-
-        session.close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/JmsBenchmark.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsBenchmark.java b/hedwig-client-jms/src/test/java/org/apache/activemq/JmsBenchmark.java
deleted file mode 100644
index dd914b5..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsBenchmark.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import javax.jms.Topic;
-import org.apache.hedwig.jms.SessionImpl;
-import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.jms.BytesMessage;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.DeliveryMode;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import junit.framework.Test;
-
-
-
-import javax.jms.Destination;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Benchmarks the broker by starting many consumer and producers against the
- * same destination. Make sure you run with jvm option -server (makes a big
- * difference). The tests simulate storing 1000 1k jms messages to see the rate
- * of processing msg/sec.
- */
-public class JmsBenchmark extends JmsTestSupport {
-    private static final transient Logger LOG = LoggerFactory.getLogger(JmsBenchmark.class);
-
-    private static final long SAMPLE_DELAY = Integer.parseInt(System.getProperty("SAMPLE_DELAY", "" + 1000 * 5));
-    private static final long SAMPLES = Integer.parseInt(System.getProperty("SAMPLES", "10"));
-    private static final long SAMPLE_DURATION = Integer.parseInt(System.getProperty("SAMPLES_DURATION",
-                                                                                    "" + 1000 * 60));
-    private static final int PRODUCER_COUNT = Integer.parseInt(System.getProperty("PRODUCER_COUNT", "10"));
-    private static final int CONSUMER_COUNT = Integer.parseInt(System.getProperty("CONSUMER_COUNT", "10"));
-
-    public Destination destination;
-
-    public static Test suite() {
-        return suite(JmsBenchmark.class);
-    }
-
-    public static void main(String[] args) {
-        junit.textui.TestRunner.run(JmsBenchmark.class);
-    }
-
-    public void initCombos() {
-        addCombinationValues("destination", new Object[] {SessionImpl.asTopic("TEST")});
-    }
-
-    protected ConnectionFactory createConnectionFactory() throws URISyntaxException, IOException {
-        return new HedwigConnectionFactoryImpl();
-    }
-
-    /**
-     * @throws Throwable
-     */
-    public void testConcurrentSendReceive() throws Throwable {
-
-        final Semaphore connectionsEstablished = new Semaphore(1 - (CONSUMER_COUNT + PRODUCER_COUNT));
-        final Semaphore workerDone = new Semaphore(1 - (CONSUMER_COUNT + PRODUCER_COUNT));
-        final CountDownLatch sampleTimeDone = new CountDownLatch(1);
-
-        final AtomicInteger producedMessages = new AtomicInteger(0);
-        final AtomicInteger receivedMessages = new AtomicInteger(0);
-
-        final Callable producer = new Callable() {
-            public Object call() throws JMSException, InterruptedException {
-                Connection connection = factory.createConnection();
-                connections.add(connection);
-                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-                MessageProducer producer = session.createProducer(destination);
-                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-                BytesMessage message = session.createBytesMessage();
-                message.writeBytes(new byte[1024]);
-                connection.start();
-                connectionsEstablished.release();
-
-                while (!sampleTimeDone.await(0, TimeUnit.MILLISECONDS)) {
-                    producer.send(message);
-                    producedMessages.incrementAndGet();
-                }
-
-                connection.close();
-                workerDone.release();
-                return null;
-            }
-        };
-
-        final Callable consumer = new Callable() {
-            public Object call() throws JMSException, InterruptedException {
-                Connection connection = factory.createConnection();
-                connections.add(connection);
-                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-                MessageConsumer consumer = session.createConsumer(destination);
-
-                consumer.setMessageListener(new MessageListener() {
-                    public void onMessage(Message msg) {
-                        receivedMessages.incrementAndGet();
-                    }
-                });
-                connection.start();
-
-                connectionsEstablished.release();
-                sampleTimeDone.await();
-
-                connection.close();
-                workerDone.release();
-                return null;
-            }
-        };
-
-        final Throwable workerError[] = new Throwable[1];
-        for (int i = 0; i < PRODUCER_COUNT; i++) {
-            new Thread("Producer:" + i) {
-                public void run() {
-                    try {
-                        producer.call();
-                    } catch (Throwable e) {
-                        e.printStackTrace();
-                        workerError[0] = e;
-                    }
-                }
-            }.start();
-        }
-
-        for (int i = 0; i < CONSUMER_COUNT; i++) {
-            new Thread("Consumer:" + i) {
-                public void run() {
-                    try {
-                        consumer.call();
-                    } catch (Throwable e) {
-                        e.printStackTrace();
-                        workerError[0] = e;
-                    }
-                }
-            }.start();
-        }
-
-        LOG.info(getName() + ": Waiting for Producers and Consumers to startup.");
-        connectionsEstablished.acquire();
-        LOG.info("Producers and Consumers are now running.  Waiting for system to reach steady state: "
-                 + (SAMPLE_DELAY / 1000.0f) + " seconds");
-        Thread.sleep(1000 * 10);
-
-        LOG.info("Starting sample: " + SAMPLES + " each lasting " + (SAMPLE_DURATION / 1000.0f) + " seconds");
-
-        for (int i = 0; i < SAMPLES; i++) {
-
-            long start = System.currentTimeMillis();
-            producedMessages.set(0);
-            receivedMessages.set(0);
-
-            Thread.sleep(SAMPLE_DURATION);
-
-            long end = System.currentTimeMillis();
-            int r = receivedMessages.get();
-            int p = producedMessages.get();
-
-            LOG.info("published: " + p + " msgs at " + (p * 1000f / (end - start)) + " msgs/sec, "
-                     + "consumed: " + r + " msgs at " + (r * 1000f / (end - start)) + " msgs/sec");
-        }
-
-        LOG.info("Sample done.");
-        sampleTimeDone.countDown();
-
-        workerDone.acquire();
-        if (workerError[0] != null) {
-            throw workerError[0];
-        }
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/JmsClientAckListenerTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsClientAckListenerTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/JmsClientAckListenerTest.java
deleted file mode 100644
index 78d7fb3..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsClientAckListenerTest.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import javax.jms.Topic;
-import javax.jms.Connection;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-public class JmsClientAckListenerTest extends TestSupport implements MessageListener {
-
-    private Connection connection;
-    private boolean dontAck;
-
-    protected void setUp() throws Exception {
-        super.setUp();
-        connection = createConnection();
-    }
-
-    /**
-     * @see junit.framework.TestCase#tearDown()
-     */
-    protected void tearDown() throws Exception {
-        if (connection != null) {
-            connection.close();
-            connection = null;
-        }
-        super.tearDown();
-    }
-
-    /**
-     * Tests if acknowleged messages are being consumed.
-     *
-     * @throws javax.jms.JMSException
-     */
-    public void testAckedMessageAreConsumed() throws Exception {
-        connection.start();
-        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-        Topic queue = session.createTopic("test");
-        MessageProducer producer = session.createProducer(queue);
-        producer.send(session.createTextMessage("Hello"));
-
-        // Consume the message...
-        MessageConsumer consumer = session.createDurableSubscriber(queue, "subscriber-id1");
-        consumer.setMessageListener(this);
-
-        Thread.sleep(10000);
-
-        // Reset the session.
-        session.close();
-
-        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
-        // Attempt to Consume the message...
-        consumer = session.createDurableSubscriber(queue, "subscriber-id1");
-        Message msg = consumer.receive(1000);
-        assertNull(msg);
-
-        session.close();
-    }
-
-    /**
-     * Tests if unacknowleged messages are being redelivered when the consumer
-     * connects again.
-     *
-     * @throws javax.jms.JMSException
-     */
-    public void testUnAckedMessageAreNotConsumedOnSessionClose() throws Exception {
-        connection.start();
-        // don't aknowledge message on onMessage() call
-        dontAck = true;
-        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-        Topic queue = session.createTopic("test");
-        MessageProducer producer = session.createProducer(queue);
-        // Consume the message...
-        MessageConsumer consumer = session.createDurableSubscriber(queue, "subscriber-id2");
-        consumer.setMessageListener(this);
-
-        // Don't ack the message.
-        producer.send(session.createTextMessage("Hello"));
-
-        // Reset the session. This should cause the Unacked message to be
-        // redelivered.
-        session.close();
-
-        Thread.sleep(10000);
-        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-        // Attempt to Consume the message...
-        consumer = session.createDurableSubscriber(queue, "subscriber-id2");
-        Message msg = consumer.receive(2000);
-        assertNotNull(msg);
-        msg.acknowledge();
-
-        session.close();
-    }
-
-    public void onMessage(Message message) {
-
-        assertNotNull(message);
-        if (!dontAck) {
-            try {
-                message.acknowledge();
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-
-        }
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/JmsClientAckTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsClientAckTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/JmsClientAckTest.java
deleted file mode 100644
index c4aa3c6..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsClientAckTest.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import javax.jms.Topic;
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-public class JmsClientAckTest extends TestSupport {
-
-    private Connection connection;
-
-    protected void setUp() throws Exception {
-        super.setUp();
-        connection = createConnection();
-    }
-
-    /**
-     * @see junit.framework.TestCase#tearDown()
-     */
-    protected void tearDown() throws Exception {
-        if (connection != null) {
-            connection.close();
-            connection = null;
-        }
-        super.tearDown();
-    }
-
-    /**
-     * Tests if acknowledged messages are being consumed.
-     *
-     * @throws JMSException
-     */
-    public void testAckedMessageAreConsumed() throws JMSException {
-        connection.start();
-        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-        Topic queue = session.createTopic(getQueueName());
-        MessageProducer producer = session.createProducer(queue);
-        MessageConsumer consumer = session.createDurableSubscriber(queue, "subscriber-id1");
-        producer.send(session.createTextMessage("Hello"));
-
-        // Consume the message...
-        Message msg = consumer.receive(1000);
-        assertNotNull(msg);
-        msg.acknowledge();
-
-        // Reset the session.
-        session.close();
-        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
-        // Attempt to Consume the message...
-        consumer = session.createDurableSubscriber(queue, "subscriber-id1");
-        msg = consumer.receive(1000);
-        assertNull(msg);
-
-        session.close();
-    }
-
-    /**
-     * Tests if acknowledged messages are being consumed.
-     *
-     * @throws JMSException
-     */
-    public void testLastMessageAcked() throws JMSException {
-        connection.start();
-        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-        Topic queue = session.createTopic(getQueueName());
-        MessageProducer producer = session.createProducer(queue);
-        MessageConsumer consumer = session.createDurableSubscriber(queue, "subscriber-id2");
-        producer.send(session.createTextMessage("Hello"));
-        producer.send(session.createTextMessage("Hello2"));
-        producer.send(session.createTextMessage("Hello3"));
-
-        // Consume the message...
-        Message msg = consumer.receive(1000);
-        assertNotNull(msg);
-        msg = consumer.receive(1000);
-        assertNotNull(msg);
-        msg = consumer.receive(1000);
-        assertNotNull(msg);
-        msg.acknowledge();
-
-        // Reset the session.
-        session.close();
-        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
-        // Attempt to Consume the message...
-        consumer = session.createDurableSubscriber(queue, "subscriber-id2");
-        msg = consumer.receive(1000);
-        assertNull(msg);
-
-        session.close();
-    }
-
-    /**
-     * Tests if unacknowledged messages are being re-delivered when the consumer connects again.
-     *
-     * @throws JMSException
-     */
-    public void testUnAckedMessageAreNotConsumedOnSessionClose() throws JMSException {
-        connection.start();
-        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-        Topic queue = session.createTopic(getQueueName());
-        MessageProducer producer = session.createProducer(queue);
-        MessageConsumer consumer = session.createDurableSubscriber(queue, "subscriber-id3");
-        producer.send(session.createTextMessage("Hello"));
-
-        // Consume the message...
-        Message msg = consumer.receive(1000);
-        assertNotNull(msg);
-        // Don't ack the message.
-
-        // Reset the session.  This should cause the unacknowledged message to be re-delivered.
-        session.close();
-        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
-        // Attempt to Consume the message...
-        consumer = session.createDurableSubscriber(queue, "subscriber-id3");
-        msg = consumer.receive(2000);
-        assertNotNull(msg);
-        msg.acknowledge();
-
-        session.close();
-    }
-
-    protected String getQueueName() {
-        return getClass().getName() + "." + getName();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/JmsConnectionStartStopTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsConnectionStartStopTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/JmsConnectionStartStopTest.java
deleted file mode 100644
index 3649614..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsConnectionStartStopTest.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import org.apache.hedwig.jms.DebugUtil;
-import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl;
-import java.util.Random;
-import java.util.Vector;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-
-public class JmsConnectionStartStopTest extends TestSupport {
-
-    private static final org.apache.commons.logging.Log LOG = org.apache.commons.logging.LogFactory
-        .getLog(JmsConnectionStartStopTest.class);
-
-    private Connection startedConnection;
-    private Connection stoppedConnection;
-
-    /**
-     * @see junit.framework.TestCase#setUp()
-     */
-    protected void setUp() throws Exception {
-
-        super.setUp();
-        LOG.info(getClass().getClassLoader().getResource("log4j.properties"));
-
-        HedwigConnectionFactoryImpl factory = createConnectionFactory();
-        startedConnection = factory.createConnection();
-        startedConnection.start();
-        stoppedConnection = factory.createConnection();
-    }
-
-    /**
-     * @see junit.framework.TestCase#tearDown()
-     */
-    protected void tearDown() throws Exception {
-        stoppedConnection.close();
-        startedConnection.close();
-        super.tearDown();
-    }
-
-    /**
-     * Tests if the consumer receives the messages that were sent before the
-     * connection was started.
-     *
-     * @throws JMSException
-     */
-    public void testStoppedConsumerHoldsMessagesTillStarted() throws JMSException {
-        Session startedSession = startedConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        Session stoppedSession = stoppedConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-        // Setup the consumers.
-        Topic topic = startedSession.createTopic("test");
-        MessageConsumer startedConsumer = startedSession.createConsumer(topic);
-        MessageConsumer stoppedConsumer = stoppedSession.createConsumer(topic);
-
-        // Send the message.
-        MessageProducer producer = startedSession.createProducer(topic);
-        TextMessage message = startedSession.createTextMessage("Hello");
-        producer.send(message);
-
-        // Test the assertions.
-        Message m = startedConsumer.receive(1000);
-        assertNotNull(m);
-
-        m = stoppedConsumer.receive(1000);
-        assertNull(m);
-
-        stoppedConnection.start();
-        m = stoppedConsumer.receive(5000);
-        assertNotNull(m);
-
-        startedSession.close();
-        stoppedSession.close();
-    }
-
-    /**
-     * Tests if the consumer is able to receive messages eveb when the
-     * connecction restarts multiple times.
-     *
-     * @throws Exception
-     */
-    public void testMultipleConnectionStops() throws Exception {
-        testStoppedConsumerHoldsMessagesTillStarted();
-        stoppedConnection.stop();
-        testStoppedConsumerHoldsMessagesTillStarted();
-        stoppedConnection.stop();
-        testStoppedConsumerHoldsMessagesTillStarted();
-    }
-
-
-    public void testConcurrentSessionCreateWithStart() throws Exception {
-        ThreadPoolExecutor executor = new ThreadPoolExecutor(50, Integer.MAX_VALUE,
-                                      60L, TimeUnit.SECONDS,
-                                      new SynchronousQueue<Runnable>());
-        final Vector<Throwable> exceptions = new Vector<Throwable>();
-        final Random rand = new Random();
-        final int numIterations = 100;
-        final CountDownLatch latch = new CountDownLatch(numIterations * 2);
-        Runnable createSessionTask = new Runnable() {
-            public void run() {
-                try {
-                    TimeUnit.MILLISECONDS.sleep(rand.nextInt(10));
-                    latch.countDown();
-                    stoppedConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-                } catch (Exception e) {
-                    e.printStackTrace();
-                    exceptions.add(e);
-                }
-            }
-        };
-
-        Runnable startStopTask = new Runnable() {
-            public void run() {
-                try {
-                    TimeUnit.MILLISECONDS.sleep(rand.nextInt(10));
-                    latch.countDown();
-                    stoppedConnection.start();
-                    stoppedConnection.stop();
-                } catch (Exception e) {
-                    e.printStackTrace();
-                    exceptions.add(e);
-                }
-            }
-        };
-
-        for (int i=0; i<numIterations; i++) {
-            executor.execute(createSessionTask);
-            executor.execute(startStopTask);
-        }
-
-        executor.shutdown();
-        final long remaining;
-        {
-            boolean terminated = executor.awaitTermination(30, TimeUnit.SECONDS);
-            remaining = latch.getCount();
-            if (!terminated){
-                DebugUtil.dumpAllStacktraces(System.err);
-            }
-            assertTrue("executor terminated. remaining : " + remaining, terminated);
-        }
-        assertTrue("remaining : " + remaining + ", no exceptions: " + exceptions, exceptions.isEmpty());
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/JmsConsumerResetActiveListenerTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsConsumerResetActiveListenerTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/JmsConsumerResetActiveListenerTest.java
deleted file mode 100644
index aaf47f2..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsConsumerResetActiveListenerTest.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import javax.jms.Topic;
-import java.util.Vector;
-
-import org.apache.hedwig.JmsTestBase;
-import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import junit.framework.TestCase;
-
-
-public class JmsConsumerResetActiveListenerTest extends JmsTestBase {
-
-    private Connection connection;
-    private HedwigConnectionFactoryImpl factory;
-
-    protected void setUp() throws Exception {
-        super.setUp();
-        factory = new HedwigConnectionFactoryImpl();
-        connection = factory.createConnection();
-    }
-
-    protected void tearDown() throws Exception {
-        if (connection != null) {
-            connection.close();
-            connection = null;
-        }
-        super.tearDown();
-    }
-
-    /**
-     * verify the (undefined by spec) behaviour of setting a listener while receiving a message.
-     *
-     * @throws Exception
-     */
-    public void testSetListenerFromListener() throws Exception {
-        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-        Destination dest = session.createTopic("Queue-" + getName());
-        final MessageConsumer consumer = session.createConsumer(dest);
-
-        final CountDownLatch latch = new CountDownLatch(2);
-        final AtomicBoolean first = new AtomicBoolean(true);
-        final Vector<Object> results = new Vector<Object>();
-        consumer.setMessageListener(new MessageListener() {
-
-            public void onMessage(Message message) {
-                if (first.compareAndSet(true, false)) {
-                    try {
-                        consumer.setMessageListener(this);
-                        results.add(message);
-                    } catch (JMSException e) {
-                        results.add(e);
-                    }
-                } else {
-                    results.add(message);
-                }
-                latch.countDown();
-            }
-        });
-
-        connection.start();
-
-        MessageProducer producer = session.createProducer(dest);
-        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-        producer.send(session.createTextMessage("First"));
-        producer.send(session.createTextMessage("Second"));
-
-        assertTrue("we did not timeout", latch.await(5, TimeUnit.SECONDS));
-
-        assertEquals("we have a result", 2, results.size());
-        Object result = results.get(0);
-        assertTrue(result instanceof TextMessage);
-        assertEquals("result is first", "First", ((TextMessage)result).getText());
-        result = results.get(1);
-        assertTrue(result instanceof TextMessage);
-        assertEquals("result is first", "Second", ((TextMessage)result).getText());
-    }
-
-    /**
-     * and a listener on a new consumer, just in case.
-      *
-     * @throws Exception
-     */
-    public void testNewConsumerSetListenerFromListener() throws Exception {
-        final Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-        final Destination dest = session.createTopic("Queue-" + getName());
-        final MessageConsumer consumer = session.createConsumer(dest);
-
-        final CountDownLatch latch = new CountDownLatch(2);
-        final AtomicBoolean first = new AtomicBoolean(true);
-        final Vector<Object> results = new Vector<Object>();
-        consumer.setMessageListener(new MessageListener() {
-
-            public void onMessage(Message message) {
-                if (first.compareAndSet(true, false)) {
-                    try {
-                        MessageConsumer anotherConsumer = session.createConsumer(dest);
-                        anotherConsumer.setMessageListener(this);
-                        results.add(message);
-                    } catch (JMSException e) {
-                        results.add(e);
-                    }
-                } else {
-                    results.add(message);
-                }
-                latch.countDown();
-            }
-        });
-
-        connection.start();
-
-        MessageProducer producer = session.createProducer(dest);
-        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-        producer.send(session.createTextMessage("First"));
-        producer.send(session.createTextMessage("Second"));
-
-        assertTrue("we did not timeout", latch.await(5, TimeUnit.SECONDS));
-
-        assertEquals("we have a result", 2, results.size());
-        Object result = results.get(0);
-        assertTrue(result instanceof TextMessage);
-        assertEquals("result is first", "First", ((TextMessage)result).getText());
-        result = results.get(1);
-        assertTrue(result instanceof TextMessage);
-        assertEquals("result is first", "Second", ((TextMessage)result).getText());
-    }
- }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/JmsCreateConsumerInOnMessageTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsCreateConsumerInOnMessageTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/JmsCreateConsumerInOnMessageTest.java
deleted file mode 100644
index 218bbe5..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsCreateConsumerInOnMessageTest.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import javax.jms.Connection;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.Topic;
-
-public class JmsCreateConsumerInOnMessageTest extends TestSupport implements MessageListener {
-
-    private Connection connection;
-    private Session publisherSession;
-    private Session consumerSession;
-    private MessageConsumer consumer;
-    private MessageConsumer testConsumer;
-    private MessageProducer producer;
-    private Topic topic;
-    private Object lock = new Object();
-
-    /*
-     * @see junit.framework.TestCase#setUp()
-     */
-    protected void setUp() throws Exception {
-        super.setUp();
-        super.topic = true;
-        connection = createConnection(false);
-        connection.setClientID("connection:" + getSubject());
-        publisherSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        topic = (Topic)super.createDestination("Test.Topic");
-        consumer = consumerSession.createConsumer(topic);
-        consumer.setMessageListener(this);
-        producer = publisherSession.createProducer(topic);
-        connection.start();
-    }
-
-    /*
-     * @see junit.framework.TestCase#tearDown()
-     */
-    protected void tearDown() throws Exception {
-        connection.close();
-        super.tearDown();
-    }
-
-    /**
-     * Tests if a consumer can be created asynchronusly
-     *
-     * @throws Exception
-     */
-    public void testCreateConsumer() throws Exception {
-        Message msg = super.createMessage();
-        producer.send(msg);
-        if (testConsumer == null) {
-            synchronized (lock) {
-                lock.wait(3000);
-            }
-        }
-        assertTrue(testConsumer != null);
-    }
-
-    /**
-     * Use the asynchronous subscription mechanism
-     *
-     * @param message
-     */
-    public void onMessage(Message message) {
-        try {
-            testConsumer = consumerSession.createConsumer(topic);
-            consumerSession.createProducer(topic);
-            synchronized (lock) {
-                lock.notify();
-            }
-        } catch (Exception ex) {
-            ex.printStackTrace();
-            assertTrue(false);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/JmsDurableTopicSelectorTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsDurableTopicSelectorTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/JmsDurableTopicSelectorTest.java
deleted file mode 100644
index 548e7a8..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsDurableTopicSelectorTest.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import javax.jms.Topic;
-
-public class JmsDurableTopicSelectorTest extends JmsTopicSelectorTest {
-    public void setUp() throws Exception {
-        durable = true;
-        super.setUp();
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/JmsDurableTopicSendReceiveTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsDurableTopicSendReceiveTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/JmsDurableTopicSendReceiveTest.java
deleted file mode 100644
index b2e2ed7..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsDurableTopicSendReceiveTest.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-
-import org.apache.activemq.test.JmsTopicSendReceiveTest;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class JmsDurableTopicSendReceiveTest extends JmsTopicSendReceiveTest {
-    private static final Logger LOG = LoggerFactory.getLogger(JmsDurableTopicSendReceiveTest.class);
-
-    protected Connection connection2;
-    protected Session session2;
-    protected Session consumeSession2;
-    protected MessageConsumer consumer2;
-    protected MessageProducer producer2;
-    protected Destination consumerDestination2;
-    protected Destination producerDestination2;
-
-    /**
-     * Set up a durable suscriber test.
-     *
-     * @see junit.framework.TestCase#setUp()
-     */
-    protected void setUp() throws Exception {
-        this.durable = true;
-        super.setUp();
-    }
-
-    /**
-     * Test if all the messages sent are being received.
-     *
-     * @throws Exception
-     */
-    public void testSendWhileClosed() throws Exception {
-        connection2 = createConnection(false);
-        if (null == connection.getClientID()) connection2.setClientID(getName() + "test");
-        connection2.start();
-        session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        producer2 = session2.createProducer(null);
-        producer2.setDeliveryMode(deliveryMode);
-        producerDestination2 = session2.createTopic(getProducerSubject() + "2");
-        Thread.sleep(1000);
-
-        consumeSession2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        consumerDestination2 = session2.createTopic(getConsumerSubject() + "2");
-        consumer2 = consumeSession2.createDurableSubscriber((Topic)consumerDestination2, getName());
-        Thread.sleep(1000);
-        consumer2.close();
-        TextMessage message = session2.createTextMessage("test");
-        message.setStringProperty("test", "test");
-        message.setJMSType("test");
-        producer2.send(producerDestination2, message);
-        LOG.info("Creating durable consumer");
-        consumer2 = consumeSession2.createDurableSubscriber((Topic)consumerDestination2, getName());
-        Message msg = consumer2.receive(1000);
-        assertNotNull(msg);
-        assertEquals(((TextMessage)msg).getText(), "test");
-        assertEquals(msg.getJMSType(), "test");
-        assertEquals(msg.getStringProperty("test"), "test");
-        connection2.stop();
-        connection2.close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/JmsDurableTopicTransactionTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsDurableTopicTransactionTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/JmsDurableTopicTransactionTest.java
deleted file mode 100644
index c0da42f..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsDurableTopicTransactionTest.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import javax.jms.Topic;
-import javax.jms.DeliveryMode;
-
-import org.apache.activemq.test.JmsResourceProvider;
-
-public class JmsDurableTopicTransactionTest extends JmsTopicTransactionTest {
-
-    /**
-     * @see JmsTransactionTestSupport#getJmsResourceProvider()
-     */
-    protected JmsResourceProvider getJmsResourceProvider() {
-        JmsResourceProvider provider = new JmsResourceProvider();
-        provider.setTopic(true);
-        provider.setDeliveryMode(DeliveryMode.PERSISTENT);
-        provider.setClientID(getClass().getName());
-        provider.setDurableName(getName());
-        return provider;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java b/hedwig-client-jms/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java
deleted file mode 100644
index 3873a0f..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java
+++ /dev/null
@@ -1,281 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import java.net.URI;
-import org.apache.hedwig.jms.SessionImpl;
-import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.TopicSubscriber;
-
-
-
-import javax.jms.Destination;
-
-
-import org.apache.activemq.util.MessageIdList;
-
-/**
- * Test case support used to test multiple message comsumers and message
- * producers connecting to a single broker.
- */
-public class JmsMultipleClientsTestSupport extends CombinationTestSupport {
-
-    protected Map<MessageConsumer, MessageIdList> consumers
-        = new HashMap<MessageConsumer, MessageIdList>(); // Map of consumer with messages received
-    protected int consumerCount = 1;
-    protected int producerCount = 1;
-
-    protected int messageSize = 1024;
-
-    protected boolean useConcurrentSend = true;
-    protected boolean autoFail = true;
-    protected boolean durable;
-    public boolean topic;
-    protected boolean persistent;
-
-    protected Destination destination;
-    protected List<Connection> connections = Collections.synchronizedList(new ArrayList<Connection>());
-    protected MessageIdList allMessagesList = new MessageIdList();
-
-    private AtomicInteger producerLock;
-
-    protected void startProducers(Destination dest, int msgCount) throws Exception {
-        startProducers(createConnectionFactory(), dest, msgCount);
-    }
-
-    protected void startProducers(final ConnectionFactory factory,
-                                  final Destination dest, final int msgCount) throws Exception {
-        // Use concurrent send
-        if (useConcurrentSend) {
-            producerLock = new AtomicInteger(producerCount);
-
-            for (int i = 0; i < producerCount; i++) {
-                Thread t = new Thread(new Runnable() {
-                    public void run() {
-                        try {
-                            sendMessages(factory.createConnection(), dest, msgCount);
-                        } catch (Exception e) {
-                            e.printStackTrace();
-                        }
-
-                        synchronized (producerLock) {
-                            producerLock.decrementAndGet();
-                            producerLock.notifyAll();
-                        }
-                    }
-                });
-
-                t.start();
-            }
-
-            // Wait for all producers to finish sending
-            synchronized (producerLock) {
-                while (producerLock.get() != 0) {
-                    producerLock.wait(2000);
-                }
-            }
-
-            // Use serialized send
-        } else {
-            for (int i = 0; i < producerCount; i++) {
-                sendMessages(factory.createConnection(), dest, msgCount);
-            }
-        }
-    }
-
-    protected void sendMessages(Connection connection, Destination destination, int count) throws Exception {
-        connections.add(connection);
-        connection.start();
-
-        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageProducer producer = session.createProducer(destination);
-        producer.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-
-        for (int i = 0; i < count; i++) {
-            TextMessage msg = createTextMessage(session, "" + i);
-            producer.send(msg);
-        }
-
-        producer.close();
-        session.close();
-        connection.close();
-    }
-
-    protected TextMessage createTextMessage(Session session, String initText) throws Exception {
-        TextMessage msg = session.createTextMessage();
-
-        // Pad message text
-        if (initText.length() < messageSize) {
-            char[] data = new char[messageSize - initText.length()];
-            Arrays.fill(data, '*');
-            String str = new String(data);
-            msg.setText(initText + str);
-
-            // Do not pad message text
-        } else {
-            msg.setText(initText);
-        }
-
-        return msg;
-    }
-
-    protected void startConsumers(Destination dest) throws Exception {
-        startConsumers(createConnectionFactory(), dest);
-    }
-
-    protected void startConsumers(ConnectionFactory factory, Destination dest) throws Exception {
-        MessageConsumer consumer;
-        for (int i = 0; i < consumerCount; i++) {
-            if (durable && topic) {
-                consumer = createDurableSubscriber(factory.createConnection(), dest, "consumer" + (i + 1));
-            } else {
-                consumer = createMessageConsumer(factory.createConnection(), dest);
-            }
-            MessageIdList list = new MessageIdList();
-            list.setParent(allMessagesList);
-            consumer.setMessageListener(list);
-            consumers.put(consumer, list);
-        }
-    }
-
-    protected MessageConsumer createMessageConsumer(Connection conn, Destination dest) throws Exception {
-        connections.add(conn);
-
-        Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        final MessageConsumer consumer = sess.createConsumer(dest);
-        conn.start();
-
-        return consumer;
-    }
-
-    protected TopicSubscriber createDurableSubscriber(Connection conn, Destination dest, String name) throws Exception {
-        conn.setClientID(name);
-        connections.add(conn);
-        conn.start();
-
-        Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        final TopicSubscriber consumer = sess.createDurableSubscriber((javax.jms.Topic)dest, name);
-
-        return consumer;
-    }
-
-    protected void waitForAllMessagesToBeReceived(int messageCount) throws Exception {
-        allMessagesList.waitForMessagesToArrive(messageCount);
-    }
-
-    protected Destination createDestination() throws JMSException {
-        String name = "." + getClass().getName() + "." + getName();
-        // ensure not inadvertently composite because of combos
-        name = name.replace(' ','_');
-        name = name.replace(',','&');
-        if (topic) {
-            destination = SessionImpl.asTopic("Topic" + name);
-            return (Destination)destination;
-        } else {
-            destination = SessionImpl.asTopic("Queue" + name);
-            return (Destination)destination;
-        }
-    }
-
-    protected ConnectionFactory createConnectionFactory() throws Exception {
-        return new HedwigConnectionFactoryImpl();
-    }
-
-    protected void setUp() throws Exception {
-        super.setAutoFail(autoFail);
-        super.setUp();
-    }
-
-    protected void tearDown() throws Exception {
-        for (Iterator<Connection> iter = connections.iterator(); iter.hasNext();) {
-            Connection conn = iter.next();
-            try {
-                conn.close();
-            } catch (Throwable e) {
-            }
-        }
-        allMessagesList.flushMessages();
-        consumers.clear();
-        super.tearDown();
-    }
-
-    /*
-     * Some helpful assertions for multiple consumers.
-     */
-    protected void assertConsumerReceivedAtLeastXMessages(MessageConsumer consumer, int msgCount) {
-        MessageIdList messageIdList = consumers.get(consumer);
-        messageIdList.assertAtLeastMessagesReceived(msgCount);
-    }
-
-    protected void assertConsumerReceivedAtMostXMessages(MessageConsumer consumer, int msgCount) {
-        MessageIdList messageIdList = consumers.get(consumer);
-        messageIdList.assertAtMostMessagesReceived(msgCount);
-    }
-
-    protected void assertConsumerReceivedXMessages(MessageConsumer consumer, int msgCount) {
-        MessageIdList messageIdList = consumers.get(consumer);
-        messageIdList.assertMessagesReceivedNoWait(msgCount);
-    }
-
-    protected void assertEachConsumerReceivedAtLeastXMessages(int msgCount) {
-        for (Iterator<MessageConsumer> i = consumers.keySet().iterator(); i.hasNext();) {
-            assertConsumerReceivedAtLeastXMessages(i.next(), msgCount);
-        }
-    }
-
-    protected void assertEachConsumerReceivedAtMostXMessages(int msgCount) {
-        for (Iterator<MessageConsumer> i = consumers.keySet().iterator(); i.hasNext();) {
-            assertConsumerReceivedAtMostXMessages(i.next(), msgCount);
-        }
-    }
-
-    protected void assertEachConsumerReceivedXMessages(int msgCount) {
-        for (Iterator<MessageConsumer> i = consumers.keySet().iterator(); i.hasNext();) {
-            assertConsumerReceivedXMessages(i.next(), msgCount);
-        }
-    }
-
-    protected void assertTotalMessagesReceived(int msgCount) {
-        allMessagesList.assertMessagesReceivedNoWait(msgCount);
-
-        // now lets count the individual messages received
-        int totalMsg = 0;
-        for (Iterator<MessageConsumer> i = consumers.keySet().iterator(); i.hasNext();) {
-            MessageIdList messageIdList = consumers.get(i.next());
-            totalMsg += messageIdList.getMessageCount();
-        }
-        assertEquals("Total of consumers message count", msgCount, totalMsg);
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/JmsRedeliveredTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsRedeliveredTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/JmsRedeliveredTest.java
deleted file mode 100644
index fba7064..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsRedeliveredTest.java
+++ /dev/null
@@ -1,380 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import javax.jms.Connection;
-
-import org.apache.hedwig.JmsTestBase;
-import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-
-import junit.framework.Test;
-import junit.framework.TestCase;
-import junit.framework.TestSuite;
-
-public class JmsRedeliveredTest extends JmsTestBase {
-
-    private Connection connection;
-
-    /*
-     * (non-Javadoc)
-     *
-     * @see junit.framework.TestCase#setUp()
-     */
-    protected void setUp() throws Exception {
-        super.setUp();
-        connection = createConnection();
-    }
-
-    /**
-     * @see junit.framework.TestCase#tearDown()
-     */
-    protected void tearDown() throws Exception {
-        if (connection != null) {
-            connection.close();
-            connection = null;
-        }
-        super.tearDown();
-    }
-
-    /**
-     * Creates a connection.
-     *
-     * @return connection
-     * @throws Exception
-     */
-    protected Connection createConnection() throws Exception {
-        HedwigConnectionFactoryImpl factory = new HedwigConnectionFactoryImpl();
-        return factory.createConnection();
-    }
-
-    /**
-     * Tests if a message unacknowledged message gets to be resent when the
-     * session is closed and then a new consumer session is created.
-     *
-     */
-    public void testTopicSessionCloseMarksMessageRedelivered() throws JMSException {
-        connection.start();
-
-        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-        Topic queue = session.createTopic("queue-" + getName());
-        MessageProducer producer = createProducer(session, queue);
-        MessageConsumer consumer = session.createDurableSubscriber(queue, "subscriber-id1");
-        producer.send(createTextMessage(session));
-
-        // Consume the message...
-        Message msg = consumer.receive(1000);
-        assertNotNull(msg);
-        assertFalse("Message should not be redelivered.", msg.getJMSRedelivered());
-        // Don't ack the message.
-
-        // Reset the session. This should cause the Unacked message to be
-        // redelivered.
-        session.close();
-        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
-        // Attempt to Consume the message...
-        consumer = session.createDurableSubscriber(queue, "subscriber-id1");
-        msg = consumer.receive(2000);
-        assertNotNull(msg);
-        // Since we only simulate this in provider, we cannot do this across consumers !
-        // assertTrue("Message should be redelivered.", msg.getJMSRedelivered());
-        msg.acknowledge();
-
-        session.close();
-    }
-
-
-    public void testTopicSessionCloseMarksUnAckedMessageRedelivered() throws JMSException {
-        connection.start();
-
-        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-        Topic queue = session.createTopic("queue-" + getName());
-        MessageProducer producer = createProducer(session, queue);
-        MessageConsumer consumer = session.createDurableSubscriber(queue, "subscriber-id2");
-        producer.send(createTextMessage(session, "1"));
-        producer.send(createTextMessage(session, "2"));
-
-        // Consume the message...
-        Message msg = consumer.receive(1000);
-        assertNotNull(msg);
-        assertFalse("Message should not be redelivered.", msg.getJMSRedelivered());
-        assertEquals("1", ((TextMessage)msg).getText());
-        msg.acknowledge();
-
-        // Don't ack the message.
-        msg = consumer.receive(1000);
-        assertNotNull(msg);
-        assertFalse("Message should not be redelivered.", msg.getJMSRedelivered());
-        assertEquals("2", ((TextMessage)msg).getText());
-
-        // Reset the session. This should cause the Unacked message to be
-        // redelivered.
-        session.close();
-        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
-        // Attempt to Consume the message...
-        consumer = session.createDurableSubscriber(queue, "subscriber-id2");
-        msg = consumer.receive(2000);
-        assertNotNull(msg);
-        assertEquals("2", ((TextMessage)msg).getText());
-        // Since we only simulate this in provider, we cannot do this across consumers !
-        // assertTrue("Message should be redelivered.", msg.getJMSRedelivered());
-        msg.acknowledge();
-
-        session.close();
-    }
-
-    /**
-     * Tests session recovery and that the redelivered message is marked as
-     * such. Session uses client acknowledgement, the destination is a queue.
-     *
-     * @throws JMSException
-     */
-    public void testTopicRecoverMarksMessageRedelivered() throws Exception {
-        connection.setClientID(getName());
-        connection.start();
-
-        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-        Topic queue = session.createTopic("queue-" + getName());
-        MessageProducer producer = createProducer(session, queue);
-        MessageConsumer consumer = session.createDurableSubscriber(queue, getName() + " - subscriber");
-        producer.send(createTextMessage(session));
-
-        // Consume the message...
-        Message msg = consumer.receive(1000);
-        assertNotNull(msg);
-        assertFalse("Message should not be redelivered.", msg.getJMSRedelivered());
-        // Don't ack the message.
-
-        // We DO NOT support session recovery
-        // - to unblock this test, I am stopp'ing and start'ing connection : not the same, but ...
-        // Reset the session. This should cause the Unacked message to be
-        // redelivered.
-        // session.recover();
-        connection.close();
-        connection = createConnection();
-        connection.setClientID(getName());
-        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-        consumer = session.createDurableSubscriber(queue, getName() + " - subscriber");
-        connection.start();
-
-        // Attempt to Consume the message...
-        msg = consumer.receive(2000);
-        assertNotNull(msg);
-        // assertTrue("Message should be redelivered.", msg.getJMSRedelivered());
-        msg.acknowledge();
-
-        session.close();
-    }
-
-    /**
-     * Tests rollback message to be marked as redelivered. Session uses client
-     * acknowledgement and the destination is a queue.
-     *
-     * @throws JMSException
-     */
-    public void testTopicRollbackMarksMessageRedelivered() throws JMSException {
-        connection.start();
-
-        Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
-        Topic queue = session.createTopic("queue-" + getName());
-        MessageProducer producer = createProducer(session, queue);
-        MessageConsumer consumer = session.createConsumer(queue);
-        producer.send(createTextMessage(session));
-        session.commit();
-
-        // Get the message... Should not be redelivered.
-        Message msg = consumer.receive(1000);
-        assertNotNull(msg);
-        assertFalse("Message should not be redelivered.", msg.getJMSRedelivered());
-
-        // Rollback.. should cause redelivery.
-        session.rollback();
-
-        // Attempt to Consume the message...
-        msg = consumer.receive(2000);
-        assertNotNull(msg);
-        assertTrue("Message should be redelivered.", msg.getJMSRedelivered());
-
-        session.commit();
-        session.close();
-    }
-
-    /**
-     * Tests if the message gets to be re-delivered when the session closes and
-     * that the re-delivered message is marked as such. Session uses client
-     * acknowledgment, the destination is a topic and the consumer is a durable
-     * subscriber.
-     *
-     * @throws JMSException
-     */
-    public void testDurableTopicSessionCloseMarksMessageRedelivered() throws JMSException {
-        connection.setClientID(getName());
-        connection.start();
-
-        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-        Topic topic = session.createTopic("topic-" + getName());
-        MessageConsumer consumer = session.createDurableSubscriber(topic, "sub1");
-
-        // This case only works with persistent messages since transient
-        // messages
-        // are dropped when the consumer goes offline.
-        MessageProducer producer = session.createProducer(topic);
-        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
-        producer.send(createTextMessage(session));
-
-        // Consume the message...
-        Message msg = consumer.receive(1000);
-        assertNotNull(msg);
-        assertFalse("Message should not be re-delivered.", msg.getJMSRedelivered());
-        // Don't ack the message.
-
-        // Reset the session. This should cause the Unacked message to be
-        // re-delivered.
-        session.close();
-        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
-        // Attempt to Consume the message...
-        consumer = session.createDurableSubscriber(topic, "sub1");
-        msg = consumer.receive(2000);
-        assertNotNull(msg);
-        // Since we only simulate this in provider, we cannot do this across consumers !
-        // assertTrue("Message should be redelivered.", msg.getJMSRedelivered());
-        msg.acknowledge();
-
-        session.close();
-    }
-
-    /**
-     * Tests rollback message to be marked as redelivered. Session uses client
-     * acknowledgement and the destination is a topic.
-     *
-     * @throws JMSException
-     */
-    public void testDurableTopicRollbackMarksMessageRedelivered() throws JMSException {
-        if (null == connection.getClientID()) connection.setClientID(getName());
-        connection.start();
-
-        Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
-        Topic topic = session.createTopic("topic-" + getName());
-        MessageConsumer consumer = session.createDurableSubscriber(topic, "sub1");
-
-        MessageProducer producer = createProducer(session, topic);
-        producer.send(createTextMessage(session));
-        session.commit();
-
-        // Get the message... Should not be redelivered.
-        Message msg = consumer.receive(1000);
-        assertNotNull(msg);
-        assertFalse("Message should not be redelivered.", msg.getJMSRedelivered());
-
-        // Rollback.. should cause redelivery.
-        session.rollback();
-
-        // Attempt to Consume the message...
-        msg = consumer.receive(2000);
-        assertNotNull(msg);
-        assertTrue("Message should be redelivered.", msg.getJMSRedelivered());
-
-        session.commit();
-        session.close();
-    }
-
-    /**
-     * Creates a text message.
-     *
-     * @param session
-     * @return TextMessage.
-     * @throws JMSException
-     */
-    private TextMessage createTextMessage(Session session) throws JMSException {
-        return createTextMessage(session, "Hello");
-    }
-
-    private TextMessage createTextMessage(Session session, String txt) throws JMSException {
-        return session.createTextMessage(txt);
-    }
-
-    /**
-     * Creates a producer.
-     *
-     * @param session
-     * @param queue - destination.
-     * @return MessageProducer
-     * @throws JMSException
-     */
-    private MessageProducer createProducer(Session session, Destination queue) throws JMSException {
-        MessageProducer producer = session.createProducer(queue);
-        producer.setDeliveryMode(getDeliveryMode());
-        return producer;
-    }
-
-    /**
-     * Returns delivery mode.
-     *
-     * @return int - persistent delivery mode.
-     */
-    protected int getDeliveryMode() {
-        return DeliveryMode.PERSISTENT;
-    }
-
-    /**
-     * Run the JmsRedeliverTest with the delivery mode set as persistent.
-     */
-    public static final class PersistentCase extends JmsRedeliveredTest {
-
-        /**
-         * Returns delivery mode.
-         *
-         * @return int - persistent delivery mode.
-         */
-        protected int getDeliveryMode() {
-            return DeliveryMode.PERSISTENT;
-        }
-    }
-
-    /**
-     * Run the JmsRedeliverTest with the delivery mode set as non-persistent.
-     */
-    public static final class TransientCase extends JmsRedeliveredTest {
-
-        /**
-         * Returns delivery mode.
-         *
-         * @return int - non-persistent delivery mode.
-         */
-        protected int getDeliveryMode() {
-            return DeliveryMode.NON_PERSISTENT;
-        }
-    }
-
-    public static Test suite() {
-        TestSuite suite = new TestSuite();
-        suite.addTestSuite(PersistentCase.class);
-        suite.addTestSuite(TransientCase.class);
-        return suite;
-    }
-}