You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2016/03/16 04:44:50 UTC

[40/49] bookkeeper git commit: BOOKKEEPER-769: Remove the Hedwig Code

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/CombinationTestSupport.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/CombinationTestSupport.java b/hedwig-client-jms/src/test/java/org/apache/activemq/CombinationTestSupport.java
deleted file mode 100644
index d4155a7..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/CombinationTestSupport.java
+++ /dev/null
@@ -1,250 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import java.lang.reflect.Field;
-import java.lang.reflect.Method;
-import java.lang.reflect.Modifier;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import junit.framework.Test;
-import junit.framework.TestSuite;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Poor man's way of getting JUnit to run a test case through several
- * combinations of options.
- * <p>
- * Usage: if you have a test case called testFoo that you want to run through
- * every combination of values for the attributes age and color, you would
- * write something like: <code>
- *    public void initCombosForTestFoo() {
- *        addCombinationValues( "age", new Object[]{ new Integer(21), new Integer(30) } );
- *        addCombinationValues( "color", new Object[]{"blue", "green"} );
- *    }
- * </code>
- * The testFoo test case is then run for each possible combination of age and
- * color set up in the initCombosForTestFoo method. A public method called
- * initCombos, if present, is invoked as well, for every test method. Before
- * each combination is run, the age and color attributes of the test instance
- * are set to one of the defined values, either through a public field or
- * through a public single-argument setter. This is done before the normal
- * setUp method is called.
- * <p>
- * If you want the test combinations to show up as separate test runs in the
- * JUnit reports, add a suite method to your test case similar to: <code>
- *     public static Test suite() {
- *         return suite(FooTest.class);
- *     }
- * </code>
- */
-public abstract class CombinationTestSupport extends AutoFailTestSupport {
-
-    private static final Logger LOG = LoggerFactory.getLogger(CombinationTestSupport.class);
-
-    /** Values registered through {@link #addCombinationValues}, keyed by attribute name. */
-    private HashMap<String, ComboOption> comboOptions = new HashMap<String, ComboOption>();
-    /** True once this instance stands for one concrete combination, or has nothing to expand. */
-    private boolean combosEvaluated;
-    /** Attribute values applied to this instance; null on an instance that was not expanded. */
-    private Map<String, Object> options;
-
-    /** An attribute name together with the distinct values it takes, in registration order. */
-    static class ComboOption {
-        final String attribute;
-        final LinkedHashSet<Object> values = new LinkedHashSet<Object>();
-
-        public ComboOption(String attribute, Collection<Object> options) {
-            this.attribute = attribute;
-            this.values.addAll(options);
-        }
-    }
-
-    /**
-     * Registers values for an attribute. Calling this again for the same
-     * attribute adds to the values already registered; duplicates are dropped.
-     *
-     * @param attribute name of the public field, or of the property with a
-     *        public setter, that is varied
-     * @param options the values the attribute takes
-     */
-    public void addCombinationValues(String attribute, Object[] options) {
-        ComboOption co = this.comboOptions.get(attribute);
-        if (co == null) {
-            this.comboOptions.put(attribute, new ComboOption(attribute, Arrays.asList(options)));
-        } else {
-            co.values.addAll(Arrays.asList(options));
-        }
-    }
-
-    /**
-     * Runs this test directly when it already stands for one combination;
-     * otherwise expands the combinations and runs the selected ones.
-     * <p>
-     * A plain method name selects every combination of that method. A name
-     * that carries an options suffix selects only the combination whose full
-     * name is equal to it.
-     *
-     * @throws Throwable whatever the underlying test run throws
-     */
-    @Override
-    public void runBare() throws Throwable {
-        if (combosEvaluated) {
-            super.runBare();
-            return;
-        }
-        String requested = getName();
-        for (CombinationTestSupport test : getCombinations()) {
-            // Comparing against the suffixed name alone never matches a plain
-            // method name, which would silently skip every combination.
-            if (requested == null
-                    || requested.equals(test.getName())
-                    || requested.equals(test.getName(true))) {
-                test.runBare();
-            }
-        }
-    }
-
-    /**
-     * Remembers the given values and applies each one to this instance, first
-     * through a public field of that name and, failing that, through a setter.
-     * A value that cannot be applied is logged and otherwise ignored.
-     */
-    private void setOptions(Map<String, Object> options) throws NoSuchFieldException, IllegalAccessException {
-        this.options = options;
-        for (Map.Entry<String, Object> entry : options.entrySet()) {
-            String attribute = entry.getKey();
-            Object value = entry.getValue();
-            try {
-                Field field = getClass().getField(attribute);
-                field.set(this, value);
-            } catch (Throwable e) {
-                // No usable public field: fall back to a setter.
-                try {
-                    invokeSetter(attribute, value);
-                } catch (Throwable ex) {
-                    LOG.info("Could not set field '" + attribute + "' to value '" + value +
-                             "', make sure the field exists and is public or has a setter.", ex);
-                }
-            }
-        }
-    }
-
-    /** Invokes the first public single-argument method named set + capitalized attribute. */
-    private void invokeSetter(String attribute, Object value) throws Exception {
-        String setterName = "set" + Character.toUpperCase(attribute.charAt(0)) + attribute.substring(1);
-        for (Method method : getClass().getMethods()) {
-            if (method.getName().equals(setterName) && method.getParameterTypes().length == 1) {
-                method.invoke(this, value);
-                return;
-            }
-        }
-        throw new NoSuchMethodException("No setter found for field: " + attribute);
-    }
-
-    /** Invokes a public no-argument setup hook if the test class declares one. */
-    private void invokeOptionalInitMethod(String methodName) {
-        try {
-            Method method = getClass().getMethod(methodName, (Class<?>[])null);
-            method.invoke(this, (Object[])null);
-        } catch (NoSuchMethodException ignored) {
-            // The hook is optional; a test class does not have to declare it.
-        } catch (Throwable e) {
-            LOG.warn("Combination setup method '" + methodName + "' failed", e);
-        }
-    }
-
-    /**
-     * Runs the setup hooks and returns one configured test instance per
-     * combination. When there is nothing to expand, or expansion fails, this
-     * instance is marked as evaluated and returned on its own.
-     */
-    private CombinationTestSupport[] getCombinations() {
-        String fullName = getName();
-        // The name may carry an options suffix after the first space.
-        String name = fullName == null ? "" : fullName;
-        int space = name.indexOf(' ');
-        if (space >= 0) {
-            name = name.substring(0, space);
-        }
-        if (name.length() == 0) {
-            // Without a method name there is no hook to look up; let the
-            // normal run report the missing name.
-            combosEvaluated = true;
-            return new CombinationTestSupport[] {this};
-        }
-
-        invokeOptionalInitMethod("initCombos");
-        invokeOptionalInitMethod("initCombosFor" + Character.toUpperCase(name.charAt(0)) + name.substring(1));
-
-        try {
-            List<HashMap<String, Object>> expandedOptions = new ArrayList<HashMap<String, Object>>();
-            expandCombinations(new ArrayList<ComboOption>(comboOptions.values()), expandedOptions);
-
-            if (expandedOptions.isEmpty()) {
-                combosEvaluated = true;
-                return new CombinationTestSupport[] {this};
-            }
-
-            // One fresh test instance per combination.
-            List<CombinationTestSupport> result =
-                new ArrayList<CombinationTestSupport>(expandedOptions.size());
-            for (HashMap<String, Object> combination : expandedOptions) {
-                CombinationTestSupport combo = (CombinationTestSupport)TestSuite.createTest(getClass(), name);
-                combo.combosEvaluated = true;
-                combo.setOptions(combination);
-                result.add(combo);
-            }
-            return result.toArray(new CombinationTestSupport[result.size()]);
-        } catch (Throwable e) {
-            LOG.warn("Could not expand the combinations of '" + name + "', running it once without them", e);
-            combosEvaluated = true;
-            return new CombinationTestSupport[] {this};
-        }
-    }
-
-    /**
-     * Appends to expandedCombos one map per combination of the values of the
-     * given options, recursing over the options from last to first.
-     */
-    private void expandCombinations(List<ComboOption> optionsLeft, List<HashMap<String, Object>> expandedCombos) {
-        if (optionsLeft.isEmpty()) {
-            return;
-        }
-
-        HashMap<String, Object> map;
-        if (comboOptions.size() == optionsLeft.size()) {
-            // Outermost call: start the first combination.
-            map = new HashMap<String, Object>();
-            expandedCombos.add(map);
-        } else {
-            // Nested call: keep filling in the combination added most recently.
-            map = expandedCombos.get(expandedCombos.size() - 1);
-        }
-
-        LinkedList<ComboOption> remaining = new LinkedList<ComboOption>(optionsLeft);
-        ComboOption comboOption = remaining.removeLast();
-        boolean first = true;
-        for (Object value : comboOption.values) {
-            if (!first) {
-                // Each further value branches off a copy of the current combination.
-                map = new HashMap<String, Object>(map);
-                expandedCombos.add(map);
-            }
-            map.put(comboOption.attribute, value);
-            expandCombinations(remaining, expandedCombos);
-            first = false;
-        }
-    }
-
-    /**
-     * Builds a suite that holds every combination of every public test method
-     * of the given class as a separate test.
-     *
-     * @param clazz the test class to expand
-     * @return the suite of expanded tests
-     */
-    public static Test suite(Class<? extends CombinationTestSupport> clazz) {
-        TestSuite suite = new TestSuite();
-
-        Collection<String> names = new LinkedHashSet<String>();
-        for (Method method : clazz.getMethods()) {
-            String name = method.getName();
-            // add() returns false for a name that was handled already.
-            if (!isPublicTestMethod(method) || !names.add(name)) {
-                continue;
-            }
-            Test test = TestSuite.createTest(clazz, name);
-            if (test instanceof CombinationTestSupport) {
-                for (CombinationTestSupport combination : ((CombinationTestSupport)test).getCombinations()) {
-                    suite.addTest(combination);
-                }
-            } else {
-                suite.addTest(test);
-            }
-        }
-        return suite;
-    }
-
-    private static boolean isPublicTestMethod(Method m) {
-        return isTestMethod(m) && Modifier.isPublic(m.getModifiers());
-    }
-
-    /** A test method takes no parameters, returns void and has a name starting with "test". */
-    private static boolean isTestMethod(Method m) {
-        String name = m.getName();
-        Class<?>[] parameters = m.getParameterTypes();
-        Class<?> returnType = m.getReturnType();
-        return parameters.length == 0 && name.startsWith("test") && returnType.equals(Void.TYPE);
-    }
-
-    /** Returns the test name, followed by the applied options when there are any. */
-    @Override
-    public String getName() {
-        return getName(false);
-    }
-
-    /**
-     * Returns the test name.
-     *
-     * @param original true to get the name without the options suffix
-     * @return the name, with a space and the options appended unless the
-     *         original name was asked for or no options are applied
-     */
-    public String getName(boolean original) {
-        if (options != null && !original) {
-            return super.getName() + " " + options;
-        }
-        return super.getName();
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/ConnectionCleanupTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/ConnectionCleanupTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/ConnectionCleanupTest.java
deleted file mode 100644
index e1e85e5..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/ConnectionCleanupTest.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import javax.jms.JMSException;
-import javax.jms.Session;
-
-import junit.framework.TestCase;
-import org.apache.hedwig.JmsTestBase;
-import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl;
-import org.apache.hedwig.jms.spi.HedwigConnectionImpl;
-
-/**
- * Checks that the client ID of a connection cannot be set again once a
- * session was created, also on a connection created after the previous one
- * was closed.
- */
-public class ConnectionCleanupTest extends JmsTestBase {
-
-    private HedwigConnectionImpl connection;
-    private HedwigConnectionFactoryImpl factory;
-
-    @Override
-    protected void setUp() throws Exception {
-        super.setUp();
-        this.factory = new HedwigConnectionFactoryImpl();
-        connection = factory.createConnection();
-    }
-
-    /**
-     * @see junit.framework.TestCase#tearDown()
-     */
-    @Override
-    protected void tearDown() throws Exception {
-        try {
-            // setUp may have failed before the connection was created.
-            if (connection != null) {
-                connection.close();
-            }
-        } finally {
-            connection = null;
-            super.tearDown();
-        }
-    }
-
-    /**
-     * @throws JMSException
-     */
-    public void testChangeClientID() throws JMSException {
-        assertClientIDCannotBeSetAgain();
-
-        // Same check on a brand new connection that reuses the client ID.
-        connection.close();
-        connection = factory.createConnection();
-        assertClientIDCannotBeSetAgain();
-    }
-
-    /** Sets the client ID, creates a session and expects a second set to be rejected. */
-    private void assertClientIDCannotBeSetAgain() throws JMSException {
-        connection.setClientID("test");
-        connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-        try {
-            connection.setClientID("test");
-            fail("Should have received JMSException");
-        } catch (JMSException expected) {
-            // This is the outcome the test asks for.
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/ConsumerReceiveWithTimeoutTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/ConsumerReceiveWithTimeoutTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/ConsumerReceiveWithTimeoutTest.java
deleted file mode 100644
index 93e0615..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/ConsumerReceiveWithTimeoutTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.Topic;
-
-/**
- * Checks that a consumer blocked in receive(timeout) is woken up by a message
- * that is published after the receive call started.
- */
-public class ConsumerReceiveWithTimeoutTest extends TestSupport {
-
-    private Connection connection;
-
-    /** Failure of the producer thread, if any; volatile because the test thread reads it. */
-    private volatile Throwable producerFailure;
-
-    @Override
-    protected void setUp() throws Exception {
-        super.setUp();
-        connection = createConnection();
-    }
-
-    /**
-     * @see junit.framework.TestCase#tearDown()
-     */
-    @Override
-    protected void tearDown() throws Exception {
-        if (connection != null) {
-            connection.close();
-            connection = null;
-        }
-        super.tearDown();
-    }
-
-    /**
-     * Test to check if consumer thread wakes up inside a receive(timeout) after
-     * a message is dispatched to the consumer
-     *
-     * @throws javax.jms.JMSException
-     */
-    public void testConsumerReceiveBeforeMessageDispatched() throws JMSException {
-
-        connection.start();
-
-        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        final Topic queue = session.createTopic("test");
-        MessageConsumer consumer = session.createConsumer(queue);
-
-        Thread t = new Thread() {
-            @Override
-            public void run() {
-                try {
-                    // wait for 10 seconds to allow consumer.receive to be run
-                    // first
-                    Thread.sleep(10000);
-                    MessageProducer producer = session.createProducer(queue);
-                    producer.send(session.createTextMessage("Hello"));
-                } catch (Exception e) {
-                    // Keep the cause so that the assertion below can report it.
-                    producerFailure = e;
-                }
-            }
-        };
-
-        t.start();
-
-        // Consume the message...
-        Message msg = consumer.receive(60000);
-        assertNotNull("No message received, producer failure: " + producerFailure, msg);
-        session.close();
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/ExpiryHogTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/ExpiryHogTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/ExpiryHogTest.java
deleted file mode 100644
index 6856837..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/ExpiryHogTest.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import java.util.concurrent.TimeUnit;
-import javax.jms.ConnectionFactory;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-
-
-
-
-/**
- * User: gtully
- */
-public class ExpiryHogTest extends JmsMultipleClientsTestSupport {
-    /** While true, creating a text message first sleeps for ten seconds. */
-    boolean sleep;
-
-    int numMessages = 4;
-
-    @Override
-    protected void setUp() throws Exception {
-        autoFail = false;
-        persistent = true;
-        super.setUp();
-    }
-
-    /**
-     * Starts the consumers, turns the delay on, starts the producers and
-     * expects all messages to arrive.
-     */
-    public void testImmediateDispatchWhenCacheDisabled() throws Exception {
-        ConnectionFactory connectionFactory = createConnectionFactory();
-        destination = createDestination();
-        startConsumers(connectionFactory, destination);
-        sleep = true;
-        startProducers(connectionFactory, destination, numMessages);
-        allMessagesList.assertMessagesReceived(numMessages);
-    }
-
-    /** Creates the message through the superclass, after the optional delay. */
-    protected TextMessage createTextMessage(Session session, String initText) throws Exception {
-        if (sleep) {
-            TimeUnit.SECONDS.sleep(10);
-        }
-        // No JMS expiration is set on the message.
-        return super.createTextMessage(session, initText);
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/JMSConsumerTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/JMSConsumerTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/JMSConsumerTest.java
deleted file mode 100644
index 408b40e..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/JMSConsumerTest.java
+++ /dev/null
@@ -1,936 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import java.lang.Thread.UncaughtExceptionHandler;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.jms.BytesMessage;
-import javax.jms.DeliveryMode;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-
-
-
-
-import junit.framework.Test;
-
-
-import javax.jms.Destination;
-
-import org.apache.hedwig.jms.MessagingSessionFacade;
-import org.apache.hedwig.jms.SessionImpl;
-import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl;
-import org.apache.hedwig.jms.spi.HedwigConnectionImpl;
-import org.junit.Ignore;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Test cases used to test the JMS message consumer.
- */
-public class JMSConsumerTest extends JmsTestSupport {
-
-    private static final Logger LOG = LoggerFactory.getLogger(JMSConsumerTest.class);
-
-    public Destination destination;
-    public int deliveryMode;
-    public int prefetch;
-    public int ackMode;
-    public MessagingSessionFacade.DestinationType destinationType;
-    public boolean durableConsumer;
-
-    public static Test suite() {
-        return suite(JMSConsumerTest.class);
-    }
-
-    public static void main(String[] args) {
-        junit.textui.TestRunner.run(suite());
-    }
-
-    public void initCombosForTestMessageListenerWithConsumerCanBeStopped() {
-        addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
-                                                           Integer.valueOf(DeliveryMode.PERSISTENT)});
-        addCombinationValues("destinationType", new MessagingSessionFacade.DestinationType[] {
-                MessagingSessionFacade.DestinationType.TOPIC});
-    }
-
-    public void testMessageListenerWithConsumerCanBeStopped() throws Exception {
-
-        final AtomicInteger counter = new AtomicInteger(0);
-        final CountDownLatch done1 = new CountDownLatch(1);
-        final CountDownLatch done2 = new CountDownLatch(1);
-
-        // Receive a message with the JMS API
-        connection.start();
-        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        destination = createDestination(session, destinationType);
-        MessageConsumer consumer = (MessageConsumer)session.createConsumer(destination);
-        consumer.setMessageListener(new MessageListener() {
-            public void onMessage(Message m) {
-                counter.incrementAndGet();
-                if (counter.get() == 1) {
-                    done1.countDown();
-                }
-                if (counter.get() == 2) {
-                    done2.countDown();
-                }
-            }
-        });
-
-        // Send a first message to make sure that the consumer dispatcher is
-        // running
-        sendMessages(session, destination, 1);
-        assertTrue(done1.await(1, TimeUnit.SECONDS));
-        assertEquals(1, counter.get());
-
-        // Stop the consumer.
-        connection.stop();
-
-        // Send a message, but should not get delivered.
-        sendMessages(session, destination, 1);
-        assertFalse(done2.await(1, TimeUnit.SECONDS));
-        assertEquals(1, counter.get());
-
-        // Start the consumer, and the message should now get delivered.
-        connection.start();
-        assertTrue(done2.await(1, TimeUnit.SECONDS));
-        assertEquals(2, counter.get());
-    }
-
-    /**
-     * Acknowledges messages from pool threads while one of those threads
-     * closes the consumer, and expects neither a failed acknowledge/close nor
-     * an uncaught exception in any thread.
-     */
-    public void testMessageListenerWithConsumerCanBeStoppedConcurently() throws Exception {
-
-        final int messagesToSend = 600;
-        // The consumer is closed by the task that handles this message.
-        final int closeAfter = 590;
-
-        final AtomicInteger counter = new AtomicInteger(0);
-        final CountDownLatch closeDone = new CountDownLatch(1);
-
-        connection.start();
-        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-        destination = createDestination(session, MessagingSessionFacade.DestinationType.TOPIC);
-
-        final Map<Thread, Throwable> exceptions =
-            Collections.synchronizedMap(new HashMap<Thread, Throwable>());
-        // The default handler is JVM-wide, so remember the current one and
-        // put it back when this test is done.
-        final UncaughtExceptionHandler previousHandler = Thread.getDefaultUncaughtExceptionHandler();
-        Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() {
-            public void uncaughtException(Thread t, Throwable e) {
-                LOG.error("Uncaught exception:", e);
-                exceptions.put(t, e);
-            }
-        });
-
-        try {
-            // NOTE(review): computed but not used below; the message counts are fixed values.
-            final int numOutStanding =
-                (connection.getHedwigClientConfig().getMaximumOutstandingMessages() * 2 / 3) + 1;
-
-            final MessageConsumer consumer = session.createConsumer(destination);
-
-            final class AckAndClose implements Runnable {
-                private final Message message;
-
-                public AckAndClose(Message m) {
-                    this.message = m;
-                }
-
-                public void run() {
-                    try {
-                        message.acknowledge();
-                        int count = counter.incrementAndGet();
-                        if (closeAfter == count) {
-                            // close in a separate thread is ok by jms
-                            consumer.close();
-                            closeDone.countDown();
-                        }
-                    } catch (Exception e) {
-                        LOG.error("Exception on close or ack:", e);
-                        exceptions.put(Thread.currentThread(), e);
-                    }
-                }
-            }
-
-            final AtomicInteger listenerReceivedCount = new AtomicInteger(0);
-            final ExecutorService executor = Executors.newCachedThreadPool();
-            consumer.setMessageListener(new MessageListener() {
-                public void onMessage(Message m) {
-                    // close can be in a different thread, but NOT acknowledge iirc
-                    // - this will not cause a problem for us though ...
-                    // ack and close eventually in separate thread
-                    listenerReceivedCount.incrementAndGet();
-                    executor.execute(new AckAndClose(m));
-                }
-            });
-
-            // preload the queue
-            sendMessages(session, destination, messagesToSend);
-
-            // Wait outside of the assertion: a Java assert statement is not
-            // evaluated at all when assertions are disabled, which would skip
-            // the wait itself.
-            boolean closed = closeDone.await(10, TimeUnit.SECONDS);
-            assertTrue("closeDone : " + closeDone.getCount() + ", counter : " + counter.get()
-                       + ", listenerReceivedCount : " + listenerReceivedCount.get(), closed);
-            // await possible exceptions
-            Thread.sleep(1000);
-            assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
-        } finally {
-            Thread.setDefaultUncaughtExceptionHandler(previousHandler);
-        }
-    }
-
-
-    public void initCombosForTestMutiReceiveWithPrefetch1() {
-        addCombinationValues("deliveryMode", new Object[] {
-                Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
-        addCombinationValues("ackMode", new Object[] {
-                Integer.valueOf(Session.AUTO_ACKNOWLEDGE), Integer.valueOf(Session.DUPS_OK_ACKNOWLEDGE),
-                Integer.valueOf(Session.CLIENT_ACKNOWLEDGE)});
-        addCombinationValues("destinationType", new Object[] {MessagingSessionFacade.DestinationType.TOPIC});
-    }
-
-    /**
-     * Sends four messages and expects exactly four to be received one after
-     * the other, for every configured acknowledge mode.
-     */
-    public void testMutiReceiveWithPrefetch1() throws Exception {
-
-        connection.start();
-
-        // Use all the ack modes
-        Session session = connection.createSession(false, ackMode);
-        destination = createDestination(session, destinationType);
-        MessageConsumer consumer = session.createConsumer(destination);
-
-        // Send the messages
-        sendMessages(session, destination, 4);
-
-        // Make sure 4 messages were delivered.
-        Message message = null;
-        for (int i = 0; i < 4; i++) {
-            message = consumer.receive(1000);
-            assertNotNull(message);
-        }
-        assertNull(consumer.receiveNoWait());
-        // A JUnit assertion is checked on every run, a Java assert statement
-        // only when the JVM has assertions enabled.
-        assertNotNull(message);
-        message.acknowledge();
-    }
-
-    public void initCombosForTestDurableConsumerSelectorChange() {
-        addCombinationValues("deliveryMode", new Object[] {
-                Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
-        addCombinationValues("destinationType", new Object[] {MessagingSessionFacade.DestinationType.TOPIC});
-    }
-
-    /**
-     * Re-creates a durable subscription under the same name with a different
-     * selector and expects only messages matching the new selector afterwards.
-     */
-    public void testDurableConsumerSelectorChange() throws Exception {
-
-        // A durable subscriber needs a client ID; set one unless it is there already.
-        if (connection.getClientID() == null) {
-            connection.setClientID(getName() + "test");
-        }
-        connection.start();
-        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        destination = createDestination(session, destinationType);
-        MessageProducer producer = session.createProducer(destination);
-        producer.setDeliveryMode(deliveryMode);
-        Topic topic = (Topic)destination;
-        MessageConsumer consumer = session.createDurableSubscriber(topic, "test", "color='red'", false);
-
-        // A red message passes the first selector.
-        TextMessage first = session.createTextMessage("1st");
-        first.setStringProperty("color", "red");
-        producer.send(first);
-
-        Message received = consumer.receive(1000);
-        assertNotNull(received);
-        assertEquals("1st", ((TextMessage)received).getText());
-
-        // Same subscription name, new selector.
-        consumer.close();
-        consumer = session.createDurableSubscriber(topic, "test", "color='blue'", false);
-
-        TextMessage second = session.createTextMessage("2nd");
-        second.setStringProperty("color", "red");
-        producer.send(second);
-        TextMessage third = session.createTextMessage("3rd");
-        third.setStringProperty("color", "blue");
-        producer.send(third);
-
-        // The red 2nd message is filtered out, so the 3rd one comes next.
-        received = consumer.receive(1000);
-        assertNotNull(received);
-        assertEquals("3rd", ((TextMessage)received).getText());
-
-        assertNull(consumer.receiveNoWait());
-    }
-
-    public void initCombosForTestSendReceiveBytesMessage() {
-        addCombinationValues("deliveryMode", new Object[] {
-                Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
-        addCombinationValues("destinationType", new Object[] {MessagingSessionFacade.DestinationType.TOPIC});
-    }
-
-    /**
-     * Sends one bytes message holding two booleans and expects to read the
-     * same two values back from the single message that is received.
-     */
-    public void testSendReceiveBytesMessage() throws Exception {
-
-        connection.start();
-        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        destination = createDestination(session, destinationType);
-        MessageConsumer consumer = session.createConsumer(destination);
-        MessageProducer producer = session.createProducer(destination);
-
-        BytesMessage sent = session.createBytesMessage();
-        sent.writeBoolean(true);
-        sent.writeBoolean(false);
-        producer.send(sent);
-
-        // The payload comes back in the order it was written.
-        BytesMessage received = (BytesMessage)consumer.receive(1000);
-        assertNotNull(received);
-        assertTrue(received.readBoolean());
-        assertFalse(received.readBoolean());
-
-        // Nothing else was delivered.
-        assertNull(consumer.receiveNoWait());
-    }
-
-    public void initCombosForTestSetMessageListenerAfterStart() {
-        addCombinationValues("deliveryMode", new Object[] {
-                Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
-        addCombinationValues("destinationType", new Object[] {MessagingSessionFacade.DestinationType.TOPIC});
-    }
-
-    public void testSetMessageListenerAfterStart() throws Exception {
-
-        final AtomicInteger counter = new AtomicInteger(0);
-        final CountDownLatch done = new CountDownLatch(1);
-
-        // Receive a message with the JMS API
-        connection.start();
-        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        destination = createDestination(session, destinationType);
-        MessageConsumer consumer = session.createConsumer(destination);
-
-        // See if the message get sent to the listener
-        consumer.setMessageListener(new MessageListener() {
-            public void onMessage(Message m) {
-                counter.incrementAndGet();
-                if (counter.get() == 4) {
-                    done.countDown();
-                }
-            }
-        });
-
-        // Send the messages
-        sendMessages(session, destination, 4);
-
-        assertTrue(done.await(1000, TimeUnit.MILLISECONDS));
-        Thread.sleep(200);
-
-        // Make sure only 4 messages were delivered.
-        assertEquals(4, counter.get());
-    }
-
-    public void initCombosForTestPassMessageListenerIntoCreateConsumer() {
-        addCombinationValues("destinationType", new Object[] {MessagingSessionFacade.DestinationType.TOPIC});
-    }
-
-    public void testPassMessageListenerIntoCreateConsumer() throws Exception {
-
-        final AtomicInteger counter = new AtomicInteger(0);
-        final CountDownLatch done = new CountDownLatch(1);
-
-        // Receive a message with the JMS API
-        connection.start();
-        SessionImpl session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        destination = createDestination(session, destinationType);
-        MessageConsumer consumer = session.createConsumer(destination);
-        consumer.setMessageListener(new MessageListener() {
-            public void onMessage(Message m) {
-                counter.incrementAndGet();
-                if (counter.get() == 4) {
-                    done.countDown();
-                }
-            }
-        });
-
-        // Send the messages
-        sendMessages(session, destination, 4);
-
-        assertTrue(done.await(1000, TimeUnit.MILLISECONDS));
-        Thread.sleep(200);
-
-        // Make sure only 4 messages were delivered.
-        assertEquals(4, counter.get());
-    }
-
-    public void initCombosForTestMessageListenerOnMessageCloseUnackedWithPrefetch1StayInQueue() {
-        addCombinationValues("deliveryMode", new Object[] {
-                Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
-        addCombinationValues("ackMode", new Object[] {Integer.valueOf(Session.CLIENT_ACKNOWLEDGE)});
-        addCombinationValues("destinationType", new Object[] {MessagingSessionFacade.DestinationType.TOPIC});
-    }
-
-    public void testMessageListenerOnMessageCloseUnackedWithPrefetch1StayInQueue() throws Exception {
-        final AtomicInteger counter = new AtomicInteger(0);
-        final CountDownLatch sendDone = new CountDownLatch(1);
-        final CountDownLatch got2Done = new CountDownLatch(1);
-
-        // Set prefetch to 1
-        // This test case does not work if optimized message dispatch is used as
-        // the main thread send block until the consumer receives the
-        // message. This test depends on thread decoupling so that the main
-        // thread can stop the consumer thread.
-        if (null == connection.getClientID()) connection.setClientID(getName() + "test-client-id-1");
-        connection.start();
-
-        // Use all the ack modes
-        Session session = connection.createSession(false, ackMode);
-        destination = createDestination(session, destinationType);
-        MessageConsumer consumer = session.createDurableSubscriber((Topic) destination, "subscriber-id1");
-        consumer.setMessageListener(new MessageListener() {
-            private final HedwigConnectionImpl _connection = connection;
-            public void onMessage(Message m) {
-                try {
-                    TextMessage tm = (TextMessage)m;
-                    LOG.info("Got in first listener: " + tm.getText());
-                    assertEquals(messageTextPrefix + counter.get(), tm.getText());
-                    counter.incrementAndGet();
-                    if (counter.get() == 2) {
-                        sendDone.await();
-                        _connection.close();
-                        got2Done.countDown();
-                    }
-                    // will fail when we close connection when counter == 2 !
-                    tm.acknowledge();
-                } catch (Throwable e) {
-                    // e.printStackTrace();
-                }
-            }
-        });
-
-        // Send the messages
-        sendMessages(session, destination, 4);
-        sendDone.countDown();
-
-        // Wait for first 2 messages to arrive.
-        assert got2Done.await(5, TimeUnit.SECONDS) :
-        "counter1 : " + counter.get() + ", got2Done : " + got2Done.getCount() + ", sendDone : " + sendDone.getCount();
-
-        // Re-start connection.
-        connection.close();
-        connection = (HedwigConnectionImpl)factory.createConnection();
-        if (null == connection.getClientID()) connection.setClientID(getName() + "test-client-id-1");
-        connections.add(connection);
-
-        // Pickup the remaining messages.
-        final CountDownLatch done2 = new CountDownLatch(1);
-        session = connection.createSession(false, ackMode);
-        consumer = session.createDurableSubscriber((Topic) destination, "subscriber-id1");
-        consumer.setMessageListener(new MessageListener() {
-            public void onMessage(Message m) {
-                try {
-                    TextMessage tm = (TextMessage)m;
-                    LOG.info("Got in second listener: " + tm.getText());
-                    // order is not guaranteed as the connection is started before the listener is set.
-                    // assertEquals(messageTextPrefix + counter.get(), tm.getText());
-                    counter.incrementAndGet();
-                    tm.acknowledge();
-                    if (counter.get() == 4) {
-                        done2.countDown();
-                    }
-                } catch (Throwable e) {
-                    LOG.error("unexpected ex onMessage: ", e);
-                }
-            }
-        });
-
-        connection.start();
-
-        assert done2.await(2000, TimeUnit.MILLISECONDS) :
-        "count2 : " + done2.getCount() + ", counter : " + counter.get();
-        Thread.sleep(200);
-
-        // assert msg 2 was redelivered as close() from onMessages() will only ack in auto_ack and dups_ok mode
-        assert 5 == counter.get(): "count3 : " + done2.getCount() + ", counter : " + counter.get();
-    }
-
-    public void initCombosForTestMessageListenerAutoAckOnCloseWithPrefetch1() {
-        addCombinationValues("deliveryMode", new Object[] {
-                Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
-        addCombinationValues("ackMode", new Object[] {
-                Integer.valueOf(Session.AUTO_ACKNOWLEDGE), Integer.valueOf(Session.CLIENT_ACKNOWLEDGE)});
-        addCombinationValues("destinationType", new Object[] {MessagingSessionFacade.DestinationType.TOPIC});
-    }
-
-    public void testMessageListenerAutoAckOnCloseWithPrefetch1() throws Exception {
-        final AtomicInteger counter = new AtomicInteger(0);
-        final CountDownLatch sendDone = new CountDownLatch(1);
-        final CountDownLatch got2Done = new CountDownLatch(1);
-
-        // Set prefetch to 1
-        // This test case does not work if optimized message dispatch is used as
-        // the main thread send block until the consumer receives the
-        // message. This test depends on thread decoupling so that the main
-        // thread can stop the consumer thread.
-        if (null == connection.getClientID()) connection.setClientID(getName() + "test-client-id-2");
-        connection.start();
-
-        // Use all the ack modes
-        Session session = connection.createSession(false, ackMode);
-        destination = createDestination(session, destinationType);
-        MessageConsumer consumer = session.createDurableSubscriber((Topic) destination, "subscriber-id2");
-        final List<Message> receivedMessages = new ArrayList<Message>(8);
-        consumer.setMessageListener(new MessageListener() {
-            final HedwigConnectionImpl _connection = connection;
-            public void onMessage(Message m) {
-                try {
-                    TextMessage tm = (TextMessage)m;
-                    LOG.info("Got in first listener: " + tm.getText());
-                    assertEquals(messageTextPrefix + counter.get(), tm.getText());
-                    counter.incrementAndGet();
-                    m.acknowledge();
-                    receivedMessages.add(m);
-                    if (counter.get() == 2) {
-                        sendDone.await();
-                        _connection.close();
-                        got2Done.countDown();
-                    }
-                } catch (Throwable e) {
-                    e.printStackTrace();
-                }
-            }
-        });
-
-        // Send the messages
-        sendMessages(session, destination, 4);
-        sendDone.countDown();
-
-        // Wait for first 2 messages to arrive.
-        assert got2Done.await(5, TimeUnit.SECONDS) :
-        "counter : " + counter.get() + ", got2Done : " + got2Done.getCount() + ", sendDone : " + sendDone.getCount();
-
-        // Re-start connection.
-        connection.close();
-        connection = (HedwigConnectionImpl)factory.createConnection();
-        if (null == connection.getClientID()) connection.setClientID(getName() + "test-client-id-2");
-        connections.add(connection);
-
-        // Pickup the remaining messages.
-        final CountDownLatch done2 = new CountDownLatch(1);
-        session = connection.createSession(false, ackMode);
-        consumer = session.createDurableSubscriber((Topic) destination, "subscriber-id2");
-        consumer.setMessageListener(new MessageListener() {
-            public void onMessage(Message m) {
-                try {
-                    TextMessage tm = (TextMessage)m;
-                    LOG.info("Got in second listener: " + tm.getText());
-                    counter.incrementAndGet();
-                    m.acknowledge();
-                    receivedMessages.add(m);
-                    if (counter.get() == 4) {
-                        done2.countDown();
-                    }
-                } catch (Throwable e) {
-                    LOG.error("unexpected ex onMessage: ", e);
-                }
-            }
-        });
-
-        connection.start();
-
-        assert done2.await(5, TimeUnit.SECONDS) : "count : " + done2.getCount() + ", counter : " + counter.get();
-        Thread.sleep(200);
-
-        // close from onMessage with Auto_ack will ack
-        // Make sure only 4 messages were delivered.
-        assert 4 == counter.get() :
-        "counter : " + counter.get() + ", got2Done : " + got2Done.getCount() + ", sendDone : "
-            + sendDone.getCount() + ", messages : " + receivedMessages;
-    }
-
-    public void initCombosForTestMessageListenerWithConsumerWithPrefetch1() {
-        addCombinationValues("deliveryMode", new Object[] {
-                Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
-        addCombinationValues("destinationType", new Object[] {MessagingSessionFacade.DestinationType.TOPIC});
-    }
-
-    public void testMessageListenerWithConsumerWithPrefetch1() throws Exception {
-
-        final AtomicInteger counter = new AtomicInteger(0);
-        final CountDownLatch done = new CountDownLatch(1);
-
-        // Receive a message with the JMS API
-        connection.start();
-
-        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        destination = createDestination(session, destinationType);
-        MessageConsumer consumer = session.createConsumer(destination);
-        consumer.setMessageListener(new MessageListener() {
-            public void onMessage(Message m) {
-                counter.incrementAndGet();
-                if (counter.get() == 4) {
-                    done.countDown();
-                }
-            }
-        });
-
-        // Send the messages
-        sendMessages(session, destination, 4);
-
-        assertTrue(done.await(1000, TimeUnit.MILLISECONDS));
-        Thread.sleep(200);
-
-        // Make sure only 4 messages were delivered.
-        assertEquals(4, counter.get());
-    }
-
-    public void initCombosForTestMessageListenerWithConsumer() {
-        addCombinationValues("deliveryMode", new Object[] {
-                Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
-        addCombinationValues("destinationType", new Object[] {MessagingSessionFacade.DestinationType.TOPIC});
-    }
-
-    public void testMessageListenerWithConsumer() throws Exception {
-
-        final AtomicInteger counter = new AtomicInteger(0);
-        final CountDownLatch done = new CountDownLatch(1);
-
-        // Receive a message with the JMS API
-        connection.start();
-        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        destination = createDestination(session, destinationType);
-        MessageConsumer consumer = session.createConsumer(destination);
-        consumer.setMessageListener(new MessageListener() {
-            public void onMessage(Message m) {
-                counter.incrementAndGet();
-                if (counter.get() == 4) {
-                    done.countDown();
-                }
-            }
-        });
-
-        // Send the messages
-        sendMessages(session, destination, 4);
-
-        assertTrue(done.await(1000, TimeUnit.MILLISECONDS));
-        Thread.sleep(200);
-
-        // Make sure only 4 messages were delivered.
-        assertEquals(4, counter.get());
-    }
-
-    public void initCombosForTestUnackedWithPrefetch1StayInQueue() {
-        addCombinationValues("deliveryMode", new Object[] {
-                Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
-        addCombinationValues("ackMode", new Object[] {
-                Integer.valueOf(Session.AUTO_ACKNOWLEDGE), Integer.valueOf(Session.DUPS_OK_ACKNOWLEDGE),
-                Integer.valueOf(Session.CLIENT_ACKNOWLEDGE)});
-        addCombinationValues("destinationType", new Object[] {MessagingSessionFacade.DestinationType.TOPIC});
-    }
-
-    public void testUnackedWithPrefetch1StayInQueue() throws Exception {
-
-        // Set prefetch to 1
-        if (null == connection.getClientID()) connection.setClientID(getName() + "test-client-id-3");
-        connection.start();
-
-        // Use all the ack modes
-        Session session = connection.createSession(false, ackMode);
-        destination = createDestination(session, destinationType);
-        MessageConsumer consumer = session.createDurableSubscriber((Topic) destination, "subscriber-id3");
-
-        // Send the messages
-        sendMessages(session, destination, 4);
-
-        // Only pick up the first 2 messages.
-        Message message = null;
-        for (int i = 0; i < 2; i++) {
-            message = consumer.receive(1000);
-            assertNotNull(message);
-            assert (message instanceof TextMessage);
-            assert (((TextMessage) message).getText().equals(messageTextPrefix  + i))
-                : "Received message " + ((TextMessage) message).getText() + " .. i = " + i;
-        }
-        assert null != message;
-        message.acknowledge();
-
-        connection.close();
-        connection = (HedwigConnectionImpl)factory.createConnection();
-        if (null == connection.getClientID()) connection.setClientID(getName() + "test-client-id-3");
-        // Use all the ack modes
-        session = connection.createSession(false, ackMode);
-        consumer = session.createDurableSubscriber((Topic) destination, "subscriber-id3");
-        connections.add(connection);
-        connection.start();
-
-        // Pickup the rest of the messages.
-        for (int i = 0; i < 2; i++) {
-            message = consumer.receive(1000);
-            assertNotNull(message);
-            assert (message instanceof TextMessage);
-            assert (((TextMessage) message).getText().equals(messageTextPrefix  + (i + 2))) :
-            "Received message " + ((TextMessage) message).getText() + " .. i = " + i;
-        }
-        message.acknowledge();
-        // assertNull(consumer.receiveNoWait());
-        {
-            Message msg = consumer.receiveNoWait();
-            assert null == msg : "Unexpected message " + msg;
-        }
-
-    }
-
-    public void initCombosForTestPrefetch1MessageNotDispatched() {
-        addCombinationValues("deliveryMode", new Object[] {
-                Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
-    }
-
-    public void testPrefetch1MessageNotDispatched() throws Exception {
-
-        // Set prefetch to 1
-        connection.start();
-
-        Session session = connection.createSession(true, 0);
-        destination = SessionImpl.asTopic("TEST");
-        MessageConsumer consumer = session.createConsumer(destination);
-
-        // The prefetch should fill up with 1 message.
-        // Since prefetch is still full, the 2nd message should get dispatched
-        // to another consumer.. lets create the 2nd consumer test that it does
-        // make sure it does.
-        HedwigConnectionImpl connection2 = (HedwigConnectionImpl)factory.createConnection();
-        connection2.start();
-        connections.add(connection2);
-        Session session2 = connection2.createSession(true, 0);
-        MessageConsumer consumer2 = session2.createConsumer(destination);
-
-        // Send 2 messages to the destination.
-        sendMessages(session, destination, 2);
-        session.commit();
-
-        // Pick up the first message.
-        Message message1 = consumer.receive(1000);
-        assertNotNull(message1);
-        assertNotNull(consumer.receive(1000));
-
-        // Pick up the 2nd messages.
-        Message message2 = consumer2.receive(5000);
-        assertNotNull(message2);
-        assertNotNull(consumer2.receive(1000));
-
-        session.commit();
-        session2.commit();
-
-        assertNull(consumer.receiveNoWait());
-
-    }
-
-    public void initCombosForTestDontStart() {
-        addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT)});
-        addCombinationValues("destinationType", new Object[] { MessagingSessionFacade.DestinationType.TOPIC});
-    }
-
-    public void testDontStart() throws Exception {
-
-        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        destination = createDestination(session, destinationType);
-        MessageConsumer consumer = session.createConsumer(destination);
-
-        // Send the messages
-        sendMessages(session, destination, 1);
-
-        // Make sure no messages were delivered.
-        assertNull(consumer.receive(1000));
-    }
-
-    public void initCombosForTestStartAfterSend() {
-        addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT)});
-        addCombinationValues("destinationType", new Object[] {MessagingSessionFacade.DestinationType.TOPIC});
-    }
-
-    public void testStartAfterSend() throws Exception {
-
-        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        destination = createDestination(session, destinationType);
-        MessageConsumer consumer = session.createConsumer(destination);
-
-        // Send the messages
-        sendMessages(session, destination, 1);
-
-        // Start the conncection after the message was sent.
-        connection.start();
-
-        // Make sure only 1 message was delivered.
-        assertNotNull(consumer.receive(1000));
-        assertNull(consumer.receiveNoWait());
-    }
-
-    public void initCombosForTestReceiveMessageWithConsumer() {
-        addCombinationValues("deliveryMode", new Object[] {
-                Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
-        addCombinationValues("destinationType", new Object[] {MessagingSessionFacade.DestinationType.TOPIC});
-    }
-
-    public void testReceiveMessageWithConsumer() throws Exception {
-
-        // Receive a message with the JMS API
-        connection.start();
-        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        destination = createDestination(session, destinationType);
-        MessageConsumer consumer = session.createConsumer(destination);
-
-        // Send the messages
-        sendMessages(session, destination, 1);
-
-        // Make sure only 1 message was delivered.
-        Message m = consumer.receive(1000);
-        assertNotNull(m);
-        assertEquals("0", ((TextMessage)m).getText());
-        assertNull(consumer.receiveNoWait());
-    }
-
-
-    public void testDupsOkConsumer() throws Exception {
-
-        // Receive a message with the JMS API
-        connection.start();
-        Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
-        destination = createDestination(session, MessagingSessionFacade.DestinationType.TOPIC);
-        MessageConsumer consumer = session.createDurableSubscriber((Topic) destination, "subscriber-id4");
-
-        // Send the messages
-        sendMessages(session, destination, 4);
-
-        // Make sure only 4 message are delivered.
-        for( int i=0; i < 4; i++){
-            Message m = consumer.receive(1000);
-            assertNotNull(m);
-        }
-        assertNull(consumer.receive(1000));
-
-        // Close out the consumer.. no other messages should be left on the queue.
-        consumer.close();
-
-        consumer = session.createDurableSubscriber((Topic) destination, "subscriber-id4");
-        assertNull(consumer.receive(1000));
-    }
-
-    public void testRedispatchOfUncommittedTx() throws Exception {
-
-        connection.start();
-        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
-        destination = createDestination(session, MessagingSessionFacade.DestinationType.TOPIC);
-        MessageConsumer consumer = session.createDurableSubscriber((Topic) destination, "subscriber-id2");
-
-        sendMessages(connection, destination, 2);
-
-        assertNotNull(consumer.receive(1000));
-        assertNotNull(consumer.receive(1000));
-
-        // install another consumer while message dispatch is unacked/uncommitted
-
-        // no commit so will auto rollback and get re-dispatched to redisptachConsumer
-        session.close();
-
-        Session redispatchSession = connection.createSession(true, Session.SESSION_TRANSACTED);
-        MessageConsumer redispatchConsumer
-            = redispatchSession.createDurableSubscriber((Topic) destination, "subscriber-id2");
-
-        Message msg = redispatchConsumer.receive(1000);
-        assertNotNull(msg);
-        // assertTrue("redelivered flag set", msg.getJMSRedelivered());
-
-        msg = redispatchConsumer.receive(1000);
-        assertNotNull(msg);
-        // assertTrue(msg.getJMSRedelivered());
-        redispatchSession.commit();
-
-        assertNull(redispatchConsumer.receive(500));
-        redispatchSession.close();
-    }
-
-
-    public void testRedispatchOfRolledbackTx() throws Exception {
-
-        connection.start();
-        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
-        destination = createDestination(session, MessagingSessionFacade.DestinationType.TOPIC);
-        MessageConsumer consumer = session.createDurableSubscriber((Topic) destination, "subscriber-id1");
-
-        sendMessages(connection, destination, 2);
-
-        assertNotNull(consumer.receive(1000));
-        assertNotNull(consumer.receive(1000));
-
-        // install another consumer while message dispatch is unacked/uncommitted
-
-        session.rollback();
-        session.close();
-
-        Session redispatchSession = connection.createSession(true, Session.SESSION_TRANSACTED);
-        MessageConsumer redispatchConsumer
-            = redispatchSession.createDurableSubscriber((Topic) destination, "subscriber-id1");
-
-        Message msg = redispatchConsumer.receive(1000);
-        assertNotNull(msg);
-        // assertTrue(msg.getJMSRedelivered());
-        msg = redispatchConsumer.receive(1000);
-        assertNotNull(msg);
-        // assertTrue(msg.getJMSRedelivered());
-        redispatchSession.commit();
-
-        assertNull(redispatchConsumer.receive(500));
-        redispatchSession.close();
-    }
-
-    public void initCombosForTestAckOfExpired() {
-        addCombinationValues("destinationType",
-                new Object[] {MessagingSessionFacade.DestinationType.TOPIC});
-    }
-
-    public void testAckOfExpired() throws Exception {
-        HedwigConnectionFactoryImpl fact = new HedwigConnectionFactoryImpl();
-        connection = fact.createConnection();
-
-        connection.start();
-        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        destination = (Destination) (destinationType == MessagingSessionFacade.DestinationType.QUEUE ?
-                session.createTopic("test") : session.createTopic("test"));
-
-        MessageConsumer consumer = session.createConsumer(destination);
-
-        Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-            MessageProducer producer = sendSession.createProducer(destination);
-        final int count = 4;
-
-
-        // producer.setTimeToLive(0);
-        for (int i = 0; i < count; i++) {
-            TextMessage message = sendSession.createTextMessage("no expiry" + i);
-            producer.send(message);
-        }
-
-        MessageConsumer amqConsumer = (MessageConsumer) consumer;
-
-        for(int i=0; i<count; i++) {
-            TextMessage msg = (TextMessage) amqConsumer.receive();
-            assertNotNull(msg);
-            assertTrue("message has \"no expiry\" text: " + msg.getText(), msg.getText().contains("no expiry"));
-
-            // force an ack when there are expired messages
-            msg.acknowledge();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/JMSDurableTopicRedeliverTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/JMSDurableTopicRedeliverTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/JMSDurableTopicRedeliverTest.java
deleted file mode 100644
index 343919a..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/JMSDurableTopicRedeliverTest.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import javax.jms.Topic;
-import javax.jms.Message;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class JMSDurableTopicRedeliverTest extends JmsTopicRedeliverTest {
-
-    private static final Logger LOG = LoggerFactory.getLogger(JMSDurableTopicRedeliverTest.class);
-
-    protected void setUp() throws Exception {
-        durable = true;
-        super.setUp();
-    }
-
-    /**
-     * Sends and consumes the messages.
-     *
-     * @throws Exception
-     */
-    public void testRedeliverNewSession() throws Exception {
-        String text = "TEST: " + System.currentTimeMillis();
-        Message sendMessage = session.createTextMessage(text);
-
-        if (verbose) {
-            LOG.info("About to send a message: " + sendMessage + " with text: " + text);
-        }
-        producer.send(producerDestination, sendMessage);
-
-        // receive but don't acknowledge
-        Message unackMessage = consumer.receive(1000);
-        assertNotNull(unackMessage);
-        String unackId = unackMessage.getJMSMessageID();
-        assertEquals(((TextMessage)unackMessage).getText(), text);
-        assertFalse(unackMessage.getJMSRedelivered());
-        consumeSession.close();
-        consumer.close();
-
-        // receive then acknowledge
-        consumeSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-        consumer = createConsumer();
-        Message ackMessage = consumer.receive(1000);
-        assertNotNull(ackMessage);
-        ackMessage.acknowledge();
-        String ackId = ackMessage.getJMSMessageID();
-        assertEquals(((TextMessage)ackMessage).getText(), text);
-        // assertTrue(ackMessage.getJMSRedelivered());
-        assertEquals(unackId, ackId);
-        consumeSession.close();
-        consumer.close();
-
-        consumeSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-        consumer = createConsumer();
-        assertNull(consumer.receive(1000));
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/JMSIndividualAckTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/JMSIndividualAckTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/JMSIndividualAckTest.java
deleted file mode 100644
index 24551f3..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/JMSIndividualAckTest.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-
-/**
- * Exercises CLIENT_ACKNOWLEDGE semantics of durable topic subscribers across
- * session restarts: an acknowledged message must not be delivered again, while
- * an unacknowledged one must be.
- */
-public class JMSIndividualAckTest extends TestSupport {
-
-    private Connection connection;
-
-    /**
-     * Opens the connection shared by the tests in this class.
-     */
-    protected void setUp() throws Exception {
-        super.setUp();
-        connection = createConnection();
-    }
-
-    /**
-     * Closes the connection opened in {@link #setUp()}. The superclass
-     * tear-down runs even when closing the connection throws.
-     *
-     * @see junit.framework.TestCase#tearDown()
-     */
-    protected void tearDown() throws Exception {
-        try {
-            if (connection != null) {
-                connection.close();
-            }
-        } finally {
-            connection = null;
-            super.tearDown();
-        }
-    }
-
-    /**
-     * Verifies that a message acknowledged by a durable subscriber is not
-     * delivered again when the same subscription is reopened on a new session.
-     *
-     * @throws JMSException if any JMS operation fails
-     */
-    public void testAckedMessageAreConsumed() throws JMSException {
-        connection.start();
-        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-        Topic topic = session.createTopic(getQueueName());
-        MessageProducer producer = session.createProducer(topic);
-        MessageConsumer consumer = session.createDurableSubscriber(topic, "subscriber-id1");
-
-        producer.send(session.createTextMessage("Hello"));
-
-        // Consume and acknowledge the message.
-        Message msg = consumer.receive(1000);
-        assertNotNull(msg);
-        msg.acknowledge();
-
-        // Reset the session.
-        session.close();
-        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
-        // The acknowledged message must not be delivered again.
-        consumer = session.createDurableSubscriber(topic, "subscriber-id1");
-        msg = consumer.receive(1000);
-        assertNull(msg);
-
-        session.close();
-    }
-
-    // testLastMessageAcked is disabled: it can't, unfortunately, pass.
-    // In hedwig, acknowledge is an ACKNOWLEDGE UNTIL, so acknowledging the last
-    // message also acknowledges every message delivered before it.
-    /*
-    public void testLastMessageAcked() throws JMSException {
-        connection.start();
-        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-        Topic queue = session.createTopic(getQueueName());
-        MessageProducer producer = session.createProducer(queue);
-        MessageConsumer consumer = session.createDurableSubscriber(queue, "subscriber-id2");
-        TextMessage msg1 = session.createTextMessage("msg1");
-        TextMessage msg2 = session.createTextMessage("msg2");
-        TextMessage msg3 = session.createTextMessage("msg3");
-        producer.send(msg1);
-        producer.send(msg2);
-        producer.send(msg3);
-
-        // Consume the message...
-        Message msg = consumer.receive(1000);
-        assertNotNull(msg);
-        msg = consumer.receive(1000);
-        assertNotNull(msg);
-        msg = consumer.receive(1000);
-        assertNotNull(msg);
-        msg.acknowledge();
-
-        // Reset the session.
-        session.close();
-        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
-        // Attempt to Consume the message...
-        consumer = session.createDurableSubscriber(queue, "subscriber-id2");
-        msg = consumer.receive(1000);
-        assertNotNull(msg);
-        assertEquals(msg1,msg);
-        msg = consumer.receive(1000);
-        assertNotNull(msg);
-        assertEquals(msg2,msg);
-        msg = consumer.receive(1000);
-        assertNull(msg);
-        session.close();
-    }
-    */
-
-    /**
-     * Verifies that a message which was received but not acknowledged is
-     * delivered again when the durable subscription is reopened on a new
-     * session.
-     *
-     * @throws JMSException if any JMS operation fails
-     */
-    public void testUnAckedMessageAreNotConsumedOnSessionClose() throws JMSException {
-        connection.start();
-        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-        Topic topic = session.createTopic(getQueueName());
-        MessageProducer producer = session.createProducer(topic);
-        MessageConsumer consumer = session.createDurableSubscriber(topic, "subscriber-id3");
-        producer.send(session.createTextMessage("Hello"));
-
-        // Consume the message but deliberately do not acknowledge it.
-        Message msg = consumer.receive(1000);
-        assertNotNull(msg);
-
-        // Reset the session.  This should cause the unacknowledged message to be re-delivered.
-        session.close();
-        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
-        // The message must come back; acknowledge it this time.
-        consumer = session.createDurableSubscriber(topic, "subscriber-id3");
-        msg = consumer.receive(2000);
-        assertNotNull(msg);
-        msg.acknowledge();
-
-        session.close();
-    }
-
-    /**
-     * Returns the destination name for the running test, built from this
-     * class's name and the test name so each test gets its own destination.
-     * Despite the method name, the tests here use it to create a topic.
-     */
-    protected String getQueueName() {
-        return getClass().getName() + "." + getName();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/JMSMessageTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/JMSMessageTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/JMSMessageTest.java
deleted file mode 100644
index cee1698..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/JMSMessageTest.java
+++ /dev/null
@@ -1,504 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import java.net.URISyntaxException;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.Vector;
-
-import javax.jms.BytesMessage;
-import javax.jms.ConnectionFactory;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MapMessage;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageEOFException;
-import javax.jms.MessageFormatException;
-import javax.jms.MessageProducer;
-import javax.jms.ObjectMessage;
-import javax.jms.Session;
-import javax.jms.StreamMessage;
-import javax.jms.TextMessage;
-
-import junit.framework.Test;
-import org.apache.hedwig.jms.MessagingSessionFacade;
-import org.apache.hedwig.jms.message.StreamMessageImpl;
-import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl;
-
-import javax.jms.Destination;
-
-/**
- * Test cases used to test the JMS message consumer.
- */
-public class JMSMessageTest extends JmsTestSupport {
-
-    // Test parameters. "deliveryMode" and "destinationType" are referenced by
-    // string name in initCombos(); presumably the combination support assigns
-    // them by field name, so they must stay public and keep these names -
-    // TODO confirm against CombinationTestSupport.
-
-    // Destination under test; each test assigns it from createDestination(session, destinationType).
-    public Destination destination;
-    // Defaults to NON_PERSISTENT; initCombos() registers both delivery modes for it.
-    public int deliveryMode = DeliveryMode.NON_PERSISTENT;
-    // Not referenced by any test visible in this class.
-    public int prefetch;
-    // Not referenced by any test visible in this class.
-    public int ackMode;
-    // Kind of destination passed to createDestination(); only TOPIC is registered in initCombos().
-    public MessagingSessionFacade.DestinationType destinationType = MessagingSessionFacade.DestinationType.TOPIC;
-    // Not referenced by any test visible in this class.
-    public boolean durableConsumer;
-
-    /**
-     * Registers the value sets for the "deliveryMode" and "destinationType"
-     * combination attributes: both JMS delivery modes, and the topic
-     * destination type only.
-     */
-    public void initCombos() {
-        final Object[] deliveryModes = {
-            Integer.valueOf(DeliveryMode.NON_PERSISTENT),
-            Integer.valueOf(DeliveryMode.PERSISTENT)
-        };
-        final Object[] destinationTypes = {MessagingSessionFacade.DestinationType.TOPIC};
-
-        addCombinationValues("deliveryMode", deliveryModes);
-        addCombinationValues("destinationType", destinationTypes);
-    }
-
-    /**
-     * Round-trips a {@link TextMessage}: sends "Hi" to the test destination and
-     * checks that it is received with the same text and that no further
-     * message is pending.
-     */
-    public void testTextMessage() throws Exception {
-        connection.start();
-        final Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        destination = createDestination(jmsSession, destinationType);
-        final MessageConsumer receiver = jmsSession.createConsumer(destination);
-        final MessageProducer sender = jmsSession.createProducer(destination);
-
-        // Publish a single text message.
-        final TextMessage outgoing = jmsSession.createTextMessage();
-        outgoing.setText("Hi");
-        sender.send(outgoing);
-
-        // It must arrive with its body intact...
-        final TextMessage incoming = (TextMessage)receiver.receive(1000);
-        assertNotNull(incoming);
-        assertEquals("Hi", incoming.getText());
-
-        // ...and nothing else may be waiting.
-        assertNull(receiver.receiveNoWait());
-    }
-
-    /**
-     * Builds the JUnit suite for this class by delegating to the
-     * {@code suite(Class)} helper.
-     */
-    public static Test suite() {
-        final Test allTests = suite(JMSMessageTest.class);
-        return allTests;
-    }
-
-    /** Command-line entry point: runs {@link #suite()} with the JUnit text runner. */
-    public static void main(String[] args) {
-        final Test allTests = suite();
-        junit.textui.TestRunner.run(allTests);
-    }
-
-    /**
-     * Creates a default-constructed {@link HedwigConnectionFactoryImpl}.
-     */
-    protected ConnectionFactory createConnectionFactory() throws URISyntaxException {
-        return new HedwigConnectionFactoryImpl();
-    }
-
-    /**
-     * Sends a {@link BytesMessage} holding four ints and checks that the
-     * received message reports a body length of 16 bytes.
-     */
-    public void testBytesMessageLength() throws Exception {
-        connection.start();
-        final Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        destination = createDestination(jmsSession, destinationType);
-        final MessageConsumer receiver = jmsSession.createConsumer(destination);
-        final MessageProducer sender = jmsSession.createProducer(destination);
-
-        // Write the ints 1..4 into the body and publish it.
-        final BytesMessage outgoing = jmsSession.createBytesMessage();
-        for (int value = 1; value <= 4; value++) {
-            outgoing.writeInt(value);
-        }
-        sender.send(outgoing);
-
-        // Four 4-byte ints make a 16-byte body.
-        final BytesMessage incoming = (BytesMessage)receiver.receive(1000);
-        assertNotNull(incoming);
-        assertEquals(16, incoming.getBodyLength());
-
-        // Nothing else may be waiting.
-        assertNull(receiver.receiveNoWait());
-    }
-
-    /**
-     * Round-trips an {@link ObjectMessage} whose payload is the string "Hi" and
-     * checks that an equal object is received and nothing else is pending.
-     */
-    public void testObjectMessage() throws Exception {
-        connection.start();
-        final Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        destination = createDestination(jmsSession, destinationType);
-        final MessageConsumer receiver = jmsSession.createConsumer(destination);
-        final MessageProducer sender = jmsSession.createProducer(destination);
-
-        // Publish an object message carrying a string.
-        final ObjectMessage outgoing = jmsSession.createObjectMessage();
-        outgoing.setObject("Hi");
-        sender.send(outgoing);
-
-        // The payload must come back equal to what was sent.
-        final ObjectMessage incoming = (ObjectMessage)receiver.receive(1000);
-        assertNotNull(incoming);
-        assertEquals("Hi", incoming.getObject());
-
-        // Nothing else may be waiting.
-        assertNull(receiver.receiveNoWait());
-    }
-
-    /**
-     * Round-trips a {@link BytesMessage} containing a single boolean and checks
-     * that reading past it raises {@link MessageEOFException}.
-     */
-    public void testBytesMessage() throws Exception {
-        connection.start();
-        final Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        destination = createDestination(jmsSession, destinationType);
-        final MessageConsumer receiver = jmsSession.createConsumer(destination);
-        final MessageProducer sender = jmsSession.createProducer(destination);
-
-        // Publish a body that holds exactly one boolean.
-        final BytesMessage outgoing = jmsSession.createBytesMessage();
-        outgoing.writeBoolean(true);
-        sender.send(outgoing);
-
-        // The boolean must be readable...
-        final BytesMessage incoming = (BytesMessage)receiver.receive(1000);
-        assertNotNull(incoming);
-        assertTrue(incoming.readBoolean());
-
-        // ...and the body must be exhausted afterwards.
-        try {
-            incoming.readByte();
-            fail("Expected exception not thrown.");
-        } catch (MessageEOFException expected) {
-            // reading beyond the end of the body is what this test wants to see
-        }
-
-        // Nothing else may be waiting.
-        assertNull(receiver.receiveNoWait());
-    }
-
-    /**
-     * Round-trips a {@link StreamMessage} holding one string and checks three
-     * things on the received copy: reading the string as a byte is rejected
-     * without consuming it, the string itself reads back unchanged, and a
-     * further read reports end of stream.
-     */
-    public void testStreamMessage() throws Exception {
-
-        // Receive a message with the JMS API
-        connection.start();
-        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        destination = createDestination(session, destinationType);
-        MessageConsumer consumer = session.createConsumer(destination);
-        MessageProducer producer = session.createProducer(destination);
-
-        // Send the message.
-        {
-            StreamMessage message = session.createStreamMessage();
-            message.writeString("This is a test to see how it works.");
-            producer.send(message);
-        }
-
-        // Check the message.
-        {
-            StreamMessage message = (StreamMessage)consumer.receive(1000);
-            assertNotNull(message);
-
-            // Invalid conversion should throw exception and not move the stream
-            // position. Either exception type is accepted.
-            try {
-                message.readByte();
-                fail("Should have received NumberFormatException or MessageFormatException");
-            } catch (NumberFormatException expected) {
-            } catch (MessageFormatException expected) {
-            }
-
-            // The failed read above must not have consumed the string.
-            assertEquals("This is a test to see how it works.", message.readString());
-
-            // The stream is now exhausted, so any further read must report
-            // end of stream.
-            try {
-                message.readByte();
-                fail("Should have received MessageEOFException");
-            } catch (MessageEOFException expected) {
-            }
-        }
-        assertNull(consumer.receiveNoWait());
-    }
-
-    /**
-     * Round-trips a {@link MapMessage} with one boolean entry and checks that
-     * the entry reads back as {@code true} and nothing else is pending.
-     */
-    public void testMapMessage() throws Exception {
-        connection.start();
-        final Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        destination = createDestination(jmsSession, destinationType);
-        final MessageConsumer receiver = jmsSession.createConsumer(destination);
-        final MessageProducer sender = jmsSession.createProducer(destination);
-
-        // Publish a map message with a single boolean entry.
-        final MapMessage outgoing = jmsSession.createMapMessage();
-        outgoing.setBoolean("boolKey", true);
-        sender.send(outgoing);
-
-        // The entry must survive the round trip.
-        final MapMessage incoming = (MapMessage)receiver.receive(1000);
-        assertNotNull(incoming);
-        assertTrue(incoming.getBoolean("boolKey"));
-
-        // Nothing else may be waiting.
-        assertNull(receiver.receiveNoWait());
-    }
-
-    /**
-     * Minimal hand-written {@link TextMessage} that does not come from the
-     * provider under test. Header fields and the text body are plain
-     * getter/setter pairs. Only String and Object properties are stored; the
-     * primitive property setters do nothing and the primitive getters return
-     * {@code false} or {@code 0}. {@code propertyExists} always answers
-     * {@code false}, and {@code clearProperties}, {@code clearBody},
-     * {@code acknowledge} and the correlation-id-as-bytes accessors are no-ops.
-     */
-    static class ForeignMessage implements TextMessage {
-
-        public int deliveryMode;
-
-        private String messageId;
-        private long timestamp;
-        private String correlationId;
-        private Destination replyTo;
-        private Destination destination;
-        private boolean redelivered;
-        private String type;
-        private long expiration;
-        private int priority;
-        private String text;
-        private HashMap<String, Object> props = new HashMap<String, Object>();
-
-        // ---- JMS header fields -------------------------------------------
-
-        public String getJMSMessageID() throws JMSException {
-            return messageId;
-        }
-
-        public void setJMSMessageID(String newMessageId) throws JMSException {
-            messageId = newMessageId;
-        }
-
-        public long getJMSTimestamp() throws JMSException {
-            return timestamp;
-        }
-
-        public void setJMSTimestamp(long newTimestamp) throws JMSException {
-            timestamp = newTimestamp;
-        }
-
-        /** Not supported by this stub: always {@code null}. */
-        public byte[] getJMSCorrelationIDAsBytes() throws JMSException {
-            return null;
-        }
-
-        /** Not supported by this stub: the value is discarded. */
-        public void setJMSCorrelationIDAsBytes(byte[] ignoredBytes) throws JMSException {
-        }
-
-        public void setJMSCorrelationID(String newCorrelationId) throws JMSException {
-            correlationId = newCorrelationId;
-        }
-
-        public String getJMSCorrelationID() throws JMSException {
-            return correlationId;
-        }
-
-        public Destination getJMSReplyTo() throws JMSException {
-            return replyTo;
-        }
-
-        public void setJMSReplyTo(Destination newReplyTo) throws JMSException {
-            replyTo = newReplyTo;
-        }
-
-        public Destination getJMSDestination() throws JMSException {
-            return destination;
-        }
-
-        public void setJMSDestination(Destination newDestination) throws JMSException {
-            destination = newDestination;
-        }
-
-        public int getJMSDeliveryMode() throws JMSException {
-            return deliveryMode;
-        }
-
-        public void setJMSDeliveryMode(int newDeliveryMode) throws JMSException {
-            deliveryMode = newDeliveryMode;
-        }
-
-        public boolean getJMSRedelivered() throws JMSException {
-            return redelivered;
-        }
-
-        public void setJMSRedelivered(boolean newRedelivered) throws JMSException {
-            redelivered = newRedelivered;
-        }
-
-        public String getJMSType() throws JMSException {
-            return type;
-        }
-
-        public void setJMSType(String newType) throws JMSException {
-            type = newType;
-        }
-
-        public long getJMSExpiration() throws JMSException {
-            return expiration;
-        }
-
-        public void setJMSExpiration(long newExpiration) throws JMSException {
-            expiration = newExpiration;
-        }
-
-        public int getJMSPriority() throws JMSException {
-            return priority;
-        }
-
-        public void setJMSPriority(int newPriority) throws JMSException {
-            priority = newPriority;
-        }
-
-        // ---- properties --------------------------------------------------
-
-        /** No-op: stored properties are left untouched. */
-        public void clearProperties() throws JMSException {
-        }
-
-        /** Always {@code false}, regardless of what has been stored. */
-        public boolean propertyExists(String name) throws JMSException {
-            return false;
-        }
-
-        public boolean getBooleanProperty(String name) throws JMSException {
-            return false;
-        }
-
-        public byte getByteProperty(String name) throws JMSException {
-            return 0;
-        }
-
-        public short getShortProperty(String name) throws JMSException {
-            return 0;
-        }
-
-        public int getIntProperty(String name) throws JMSException {
-            return 0;
-        }
-
-        public long getLongProperty(String name) throws JMSException {
-            return 0;
-        }
-
-        public float getFloatProperty(String name) throws JMSException {
-            return 0;
-        }
-
-        public double getDoubleProperty(String name) throws JMSException {
-            return 0;
-        }
-
-        public String getStringProperty(String name) throws JMSException {
-            final Object stored = props.get(name);
-            return (String)stored;
-        }
-
-        public Object getObjectProperty(String name) throws JMSException {
-            return props.get(name);
-        }
-
-        public Enumeration getPropertyNames() throws JMSException {
-            final Vector<String> names = new Vector<String>(props.keySet());
-            return names.elements();
-        }
-
-        public void setBooleanProperty(String name, boolean value) throws JMSException {
-        }
-
-        public void setByteProperty(String name, byte value) throws JMSException {
-        }
-
-        public void setShortProperty(String name, short value) throws JMSException {
-        }
-
-        public void setIntProperty(String name, int value) throws JMSException {
-        }
-
-        public void setLongProperty(String name, long value) throws JMSException {
-        }
-
-        public void setFloatProperty(String name, float value) throws JMSException {
-        }
-
-        public void setDoubleProperty(String name, double value) throws JMSException {
-        }
-
-        public void setStringProperty(String name, String value) throws JMSException {
-            props.put(name, value);
-        }
-
-        public void setObjectProperty(String name, Object value) throws JMSException {
-            props.put(name, value);
-        }
-
-        // ---- acknowledgement and body ------------------------------------
-
-        public void acknowledge() throws JMSException {
-        }
-
-        public void clearBody() throws JMSException {
-        }
-
-        public void setText(String newText) throws JMSException {
-            text = newText;
-        }
-
-        public String getText() throws JMSException {
-            return text;
-        }
-    }
-
-    /**
-     * Sends a {@link ForeignMessage}, i.e. a {@link TextMessage} implementation
-     * not created by the session, and checks two things: that sending fills in
-     * the delivery mode, priority, message id and timestamp headers on the
-     * caller's message object, and that the text and the string property
-     * arrive at the consumer.
-     */
-    public void testForeignMessage() throws Exception {
-
-        // Receive a message with the JMS API
-        connection.start();
-        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        destination = createDestination(session, destinationType);
-        MessageConsumer consumer = session.createConsumer(destination);
-        MessageProducer producer = session.createProducer(destination);
-
-        // Send the message.
-        {
-            ForeignMessage message = new ForeignMessage();
-            message.text = "Hello";
-            message.setStringProperty("test", "value");
-            // long timeToLive = 10000L;
-            long timeToLive = 0L;
-            long start = System.currentTimeMillis();
-            // The second argument of send(...) is a delivery mode, not an
-            // acknowledge mode, so use the DeliveryMode constant.
-            producer.send(message, DeliveryMode.NON_PERSISTENT, 7, timeToLive);
-            long end = System.currentTimeMillis();
-
-
-            //validate jms spec 1.1 section 3.4.11 table 3.1
-            // JMSDestination, JMSDeliveryMode,  JMSPriority, JMSMessageID, and JMSTimestamp
-            //must be set by sending a message.
-
-            // This is NOT specified in the spec !
-            // exception for jms destination as the format is provider defined so it is only set on the copy
-            // assertNull(message.getJMSDestination());
-
-            assertEquals(DeliveryMode.NON_PERSISTENT, message.getJMSDeliveryMode());
-            // assertTrue(start  + timeToLive <= message.getJMSExpiration());
-            // assertTrue(end + timeToLive >= message.getJMSExpiration());
-            assertEquals(7, message.getJMSPriority());
-            assertNotNull(message.getJMSMessageID());
-            // The timestamp must fall inside the window measured around send().
-            assertTrue(start <= message.getJMSTimestamp());
-            assertTrue(end >= message.getJMSTimestamp());
-        }
-
-        // Validate message is OK.
-        {
-            TextMessage message = (TextMessage)consumer.receive(1000);
-            assertNotNull(message);
-            assertEquals("Hello", message.getText());
-            assertEquals("value", message.getStringProperty("test"));
-        }
-
-        assertNull(consumer.receiveNoWait());
-    }
-
-}