You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2016/03/16 04:44:50 UTC
[40/49] bookkeeper git commit: BOOKKEEPER-769: Remove the Hedwig Code
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/CombinationTestSupport.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/CombinationTestSupport.java b/hedwig-client-jms/src/test/java/org/apache/activemq/CombinationTestSupport.java
deleted file mode 100644
index d4155a7..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/CombinationTestSupport.java
+++ /dev/null
@@ -1,250 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import java.lang.reflect.Field;
-import java.lang.reflect.Method;
-import java.lang.reflect.Modifier;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import junit.framework.Test;
-import junit.framework.TestSuite;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Poor mans way of getting JUnit to run a test case through a few different
- * combinations of options. Usage: If you have a test case called testFoo what
- * you want to run through a few combinations, of of values for the attributes
- * age and color, you would something like: <code>
- * public void initCombosForTestFoo() {
- * addCombinationValues( "age", new Object[]{ new Integer(21), new Integer(30) } );
- * addCombinationValues( "color", new Object[]{"blue", "green"} );
- * }
- * </code>
- * The testFoo test case would be run for each possible combination of age and
- * color that you setup in the initCombosForTestFoo method. Before each
- * combination is run, the age and color fields of the test class are set to one
- * of the values defined. This is done before the normal setUp method is called.
- * If you want the test combinations to show up as separate test runs in the
- * JUnit reports, add a suite method to your test case similar to: <code>
- * public static Test suite() {
- * return suite(FooTest.class);
- * }
- * </code>
- *
- *
- */
-public abstract class CombinationTestSupport extends AutoFailTestSupport {
-
- private static final Logger LOG = LoggerFactory.getLogger(CombinationTestSupport.class);
-
- private HashMap<String, ComboOption> comboOptions = new HashMap<String, ComboOption>();
- private boolean combosEvaluated;
- private Map<String, Object> options;
-
- static class ComboOption {
- final String attribute;
- final LinkedHashSet<Object> values = new LinkedHashSet<Object>();
-
- public ComboOption(String attribute, Collection<Object> options) {
- this.attribute = attribute;
- this.values.addAll(options);
- }
- }
-
- public void addCombinationValues(String attribute, Object[] options) {
- ComboOption co = this.comboOptions.get(attribute);
- if (co == null) {
- this.comboOptions.put(attribute, new ComboOption(attribute, Arrays.asList(options)));
- } else {
- co.values.addAll(Arrays.asList(options));
- }
- }
-
- public void runBare() throws Throwable {
- if (combosEvaluated) {
- super.runBare();
- } else {
- CombinationTestSupport[] combinations = getCombinations();
- for (int i = 0; i < combinations.length; i++) {
- CombinationTestSupport test = combinations[i];
- if (getName() == null || getName().equals(test.getName())) {
- test.runBare();
- }
- }
- }
- }
-
- private void setOptions(Map<String, Object> options) throws NoSuchFieldException, IllegalAccessException {
- this.options = options;
- for (Iterator<String> iterator = options.keySet().iterator(); iterator.hasNext();) {
- String attribute = iterator.next();
- Object value = options.get(attribute);
- try {
- Field field = getClass().getField(attribute);
- field.set(this, value);
- } catch (Throwable e) {
- try {
- boolean found = false;
- String setterName = "set" + attribute.substring(0, 1).toUpperCase() +
- attribute.substring(1);
- for(Method method : getClass().getMethods()) {
- if (method.getName().equals(setterName)) {
- method.invoke(this, value);
- found = true;
- break;
- }
- }
-
- if (!found) {
- throw new NoSuchMethodError("No setter found for field: " + attribute);
- }
-
- } catch(Throwable ex) {
- LOG.info("Could not set field '" + attribute + "' to value '" + value +
- "', make sure the field exists and is public or has a setter.");
- }
- }
- }
- }
-
- private CombinationTestSupport[] getCombinations() {
- try {
- Method method = getClass().getMethod("initCombos", (Class[])null);
- method.invoke(this, (Object[])null);
- } catch (Throwable e) {
- }
-
- String name = getName().split(" ")[0];
- String comboSetupMethodName = "initCombosFor" + Character.toUpperCase(name.charAt(0)) + name.substring(1);
- try {
- Method method = getClass().getMethod(comboSetupMethodName, (Class[])null);
- method.invoke(this, (Object[])null);
- } catch (Throwable e) {
- }
-
- try {
- ArrayList<HashMap<String, Object>> expandedOptions = new ArrayList<HashMap<String, Object>>();
- expandCombinations(new ArrayList<ComboOption>(comboOptions.values()), expandedOptions);
-
- if (expandedOptions.isEmpty()) {
- combosEvaluated = true;
- return new CombinationTestSupport[] {this};
- } else {
-
- ArrayList<CombinationTestSupport> result = new ArrayList<CombinationTestSupport>();
- // Run the test case for each possible combination
- for (Iterator<HashMap<String, Object>> iter = expandedOptions.iterator(); iter.hasNext();) {
- CombinationTestSupport combo = (CombinationTestSupport)TestSuite.createTest(getClass(), name);
- combo.combosEvaluated = true;
- combo.setOptions(iter.next());
- result.add(combo);
- }
-
- CombinationTestSupport rc[] = new CombinationTestSupport[result.size()];
- result.toArray(rc);
- return rc;
- }
- } catch (Throwable e) {
- combosEvaluated = true;
- return new CombinationTestSupport[] {this};
- }
-
- }
-
- private void expandCombinations(List<ComboOption> optionsLeft, List<HashMap<String, Object>> expandedCombos) {
- if (!optionsLeft.isEmpty()) {
- HashMap<String, Object> map;
- if (comboOptions.size() == optionsLeft.size()) {
- map = new HashMap<String, Object>();
- expandedCombos.add(map);
- } else {
- map = expandedCombos.get(expandedCombos.size() - 1);
- }
-
- LinkedList<ComboOption> l = new LinkedList<ComboOption>(optionsLeft);
- ComboOption comboOption = l.removeLast();
- int i = 0;
- for (Iterator<Object> iter = comboOption.values.iterator(); iter.hasNext();) {
- Object value = iter.next();
- if (i != 0) {
- map = new HashMap<String, Object>(map);
- expandedCombos.add(map);
- }
- map.put(comboOption.attribute, value);
- expandCombinations(l, expandedCombos);
- i++;
- }
- }
- }
-
- public static Test suite(Class<? extends CombinationTestSupport> clazz) {
- TestSuite suite = new TestSuite();
-
- ArrayList<String> names = new ArrayList<String>();
- Method[] methods = clazz.getMethods();
- for (int i = 0; i < methods.length; i++) {
- String name = methods[i].getName();
- if (names.contains(name) || !isPublicTestMethod(methods[i])) {
- continue;
- }
- names.add(name);
- Test test = TestSuite.createTest(clazz, name);
- if (test instanceof CombinationTestSupport) {
- CombinationTestSupport[] combinations = ((CombinationTestSupport)test).getCombinations();
- for (int j = 0; j < combinations.length; j++) {
- suite.addTest(combinations[j]);
- }
- } else {
- suite.addTest(test);
- }
- }
- return suite;
- }
-
- private static boolean isPublicTestMethod(Method m) {
- return isTestMethod(m) && Modifier.isPublic(m.getModifiers());
- }
-
- private static boolean isTestMethod(Method m) {
- String name = m.getName();
- Class<?>[] parameters = m.getParameterTypes();
- Class<?> returnType = m.getReturnType();
- return parameters.length == 0 && name.startsWith("test") && returnType.equals(Void.TYPE);
- }
-
- public String getName() {
- return getName(false);
- }
-
- public String getName(boolean original) {
- if (options != null && !original) {
- return super.getName() + " " + options;
- }
- return super.getName();
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/ConnectionCleanupTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/ConnectionCleanupTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/ConnectionCleanupTest.java
deleted file mode 100644
index e1e85e5..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/ConnectionCleanupTest.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import javax.jms.JMSException;
-import javax.jms.Session;
-
-import junit.framework.TestCase;
-import org.apache.hedwig.JmsTestBase;
-import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl;
-import org.apache.hedwig.jms.spi.HedwigConnectionImpl;
-
-public class ConnectionCleanupTest extends JmsTestBase {
-
- private HedwigConnectionImpl connection;
- private HedwigConnectionFactoryImpl factory;
-
- protected void setUp() throws Exception {
- super.setUp();
- this.factory = new HedwigConnectionFactoryImpl();
- connection = factory.createConnection();
- }
-
- /**
- * @see junit.framework.TestCase#tearDown()
- */
- protected void tearDown() throws Exception {
- connection.close();
- super.tearDown();
- }
-
- /**
- * @throws JMSException
- */
- public void testChangeClientID() throws JMSException {
-
- connection.setClientID("test");
- connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- try {
- connection.setClientID("test");
- fail("Should have received JMSException");
- } catch (JMSException e) {
- }
-
- connection.close();
- connection = factory.createConnection();
- connection.setClientID("test");
-
- connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- try {
- connection.setClientID("test");
- fail("Should have received JMSException");
- } catch (JMSException e) {
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/ConsumerReceiveWithTimeoutTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/ConsumerReceiveWithTimeoutTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/ConsumerReceiveWithTimeoutTest.java
deleted file mode 100644
index 93e0615..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/ConsumerReceiveWithTimeoutTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.Topic;
-
-public class ConsumerReceiveWithTimeoutTest extends TestSupport {
-
- private Connection connection;
-
- protected void setUp() throws Exception {
- super.setUp();
- connection = createConnection();
- }
-
- /**
- * @see junit.framework.TestCase#tearDown()
- */
- protected void tearDown() throws Exception {
- if (connection != null) {
- connection.close();
- connection = null;
- }
- super.tearDown();
- }
-
- /**
- * Test to check if consumer thread wakes up inside a receive(timeout) after
- * a message is dispatched to the consumer
- *
- * @throws javax.jms.JMSException
- */
- public void testConsumerReceiveBeforeMessageDispatched() throws JMSException {
-
- connection.start();
-
- final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- final Topic queue = session.createTopic("test");
- MessageConsumer consumer = session.createConsumer(queue);
-
- Thread t = new Thread() {
- public void run() {
- try {
- // wait for 10 seconds to allow consumer.receive to be run
- // first
- Thread.sleep(10000);
- MessageProducer producer = session.createProducer(queue);
- producer.send(session.createTextMessage("Hello"));
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- };
-
- t.start();
-
- // Consume the message...
- Message msg = consumer.receive(60000);
- assertNotNull(msg);
- session.close();
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/ExpiryHogTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/ExpiryHogTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/ExpiryHogTest.java
deleted file mode 100644
index 6856837..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/ExpiryHogTest.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import java.util.concurrent.TimeUnit;
-import javax.jms.ConnectionFactory;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-
-
-
-
-/**
- * User: gtully
- */
-public class ExpiryHogTest extends JmsMultipleClientsTestSupport {
- boolean sleep = false;
-
- int numMessages = 4;
-
- public void testImmediateDispatchWhenCacheDisabled() throws Exception {
- ConnectionFactory f = createConnectionFactory();
- destination = createDestination();
- startConsumers(f, destination);
- sleep = true;
- this.startProducers(f, destination, numMessages);
- allMessagesList.assertMessagesReceived(numMessages);
- }
-
- protected TextMessage createTextMessage(Session session, String initText) throws Exception {
- if (sleep) {
- TimeUnit.SECONDS.sleep(10);
- }
- TextMessage msg = super.createTextMessage(session, initText);
- // what is the point of setting this !
- // msg.setJMSExpiration(4000);
- return msg;
- }
-
- @Override
- protected void setUp() throws Exception {
- autoFail = false;
- persistent = true;
- super.setUp();
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/JMSConsumerTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/JMSConsumerTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/JMSConsumerTest.java
deleted file mode 100644
index 408b40e..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/JMSConsumerTest.java
+++ /dev/null
@@ -1,936 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import java.lang.Thread.UncaughtExceptionHandler;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.jms.BytesMessage;
-import javax.jms.DeliveryMode;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-
-
-
-
-import junit.framework.Test;
-
-
-import javax.jms.Destination;
-
-import org.apache.hedwig.jms.MessagingSessionFacade;
-import org.apache.hedwig.jms.SessionImpl;
-import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl;
-import org.apache.hedwig.jms.spi.HedwigConnectionImpl;
-import org.junit.Ignore;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Test cases used to test the JMS message consumer.
- */
-public class JMSConsumerTest extends JmsTestSupport {
-
- private static final Logger LOG = LoggerFactory.getLogger(JMSConsumerTest.class);
-
- public Destination destination;
- public int deliveryMode;
- public int prefetch;
- public int ackMode;
- public MessagingSessionFacade.DestinationType destinationType;
- public boolean durableConsumer;
-
- public static Test suite() {
- return suite(JMSConsumerTest.class);
- }
-
- public static void main(String[] args) {
- junit.textui.TestRunner.run(suite());
- }
-
- public void initCombosForTestMessageListenerWithConsumerCanBeStopped() {
- addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
- Integer.valueOf(DeliveryMode.PERSISTENT)});
- addCombinationValues("destinationType", new MessagingSessionFacade.DestinationType[] {
- MessagingSessionFacade.DestinationType.TOPIC});
- }
-
- public void testMessageListenerWithConsumerCanBeStopped() throws Exception {
-
- final AtomicInteger counter = new AtomicInteger(0);
- final CountDownLatch done1 = new CountDownLatch(1);
- final CountDownLatch done2 = new CountDownLatch(1);
-
- // Receive a message with the JMS API
- connection.start();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- destination = createDestination(session, destinationType);
- MessageConsumer consumer = (MessageConsumer)session.createConsumer(destination);
- consumer.setMessageListener(new MessageListener() {
- public void onMessage(Message m) {
- counter.incrementAndGet();
- if (counter.get() == 1) {
- done1.countDown();
- }
- if (counter.get() == 2) {
- done2.countDown();
- }
- }
- });
-
- // Send a first message to make sure that the consumer dispatcher is
- // running
- sendMessages(session, destination, 1);
- assertTrue(done1.await(1, TimeUnit.SECONDS));
- assertEquals(1, counter.get());
-
- // Stop the consumer.
- connection.stop();
-
- // Send a message, but should not get delivered.
- sendMessages(session, destination, 1);
- assertFalse(done2.await(1, TimeUnit.SECONDS));
- assertEquals(1, counter.get());
-
- // Start the consumer, and the message should now get delivered.
- connection.start();
- assertTrue(done2.await(1, TimeUnit.SECONDS));
- assertEquals(2, counter.get());
- }
-
- public void testMessageListenerWithConsumerCanBeStoppedConcurently() throws Exception {
-
- final AtomicInteger counter = new AtomicInteger(0);
- final CountDownLatch closeDone = new CountDownLatch(1);
-
- connection.start();
- Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- destination = createDestination(session, MessagingSessionFacade.DestinationType.TOPIC);
-
- final Map<Thread, Throwable> exceptions =
- Collections.synchronizedMap(new HashMap<Thread, Throwable>());
- Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() {
- public void uncaughtException(Thread t, Throwable e) {
- LOG.error("Uncaught exception:", e);
- exceptions.put(t, e);
- }
- });
-
- final int numOutStanding = (connection.getHedwigClientConfig().getMaximumOutstandingMessages() * 2 / 3) + 1;
-
- final MessageConsumer consumer = (MessageConsumer)session.createConsumer(destination);
-
- final class AckAndClose implements Runnable {
- private Message message;
-
- public AckAndClose(Message m) {
- this.message = m;
- }
-
- public void run() {
- try {
- message.acknowledge();
- int count = counter.incrementAndGet();
- if (590 == count) {
- // close in a separate thread is ok by jms
- consumer.close();
- closeDone.countDown();
- }
- } catch (Exception e) {
- LOG.error("Exception on close or ack:", e);
- exceptions.put(Thread.currentThread(), e);
- }
- }
- };
-
- final AtomicInteger listenerReceivedCount = new AtomicInteger(0);
- // final ExecutorService executor = Executors.newSingleThreadExecutor();
- final ExecutorService executor = Executors.newCachedThreadPool();
- consumer.setMessageListener(new MessageListener() {
- public void onMessage(Message m) {
- // close can be in a different thread, but NOT acknowledge iirc
- // - this will not cause a problem for us though ...
- // ack and close eventually in separate thread
- int val = listenerReceivedCount.incrementAndGet();
- // System.out.println("message count : " + val + ", message : " + m);
- executor.execute(new AckAndClose(m));
- // new AckAndClose(m).run();
- }
- });
-
- // preload the queue
- sendMessages(session, destination, 600);
-
- assert closeDone.await(10, TimeUnit.SECONDS) :
- "closeDone : " + closeDone.getCount() + ", counter : " + counter.get()
- + ", listenerReceivedCount : " + listenerReceivedCount.get();
- // await possible exceptions
- Thread.sleep(1000);
- assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
- }
-
-
- public void initCombosForTestMutiReceiveWithPrefetch1() {
- addCombinationValues("deliveryMode", new Object[] {
- Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
- addCombinationValues("ackMode", new Object[] {
- Integer.valueOf(Session.AUTO_ACKNOWLEDGE), Integer.valueOf(Session.DUPS_OK_ACKNOWLEDGE),
- Integer.valueOf(Session.CLIENT_ACKNOWLEDGE)});
- addCombinationValues("destinationType", new Object[] {MessagingSessionFacade.DestinationType.TOPIC});
- }
-
- public void testMutiReceiveWithPrefetch1() throws Exception {
-
- // Set prefetch to 1
- connection.start();
-
- // Use all the ack modes
- Session session = connection.createSession(false, ackMode);
- destination = createDestination(session, destinationType);
- MessageConsumer consumer = session.createConsumer(destination);
-
- // Send the messages
- sendMessages(session, destination, 4);
-
- // Make sure 4 messages were delivered.
- Message message = null;
- for (int i = 0; i < 4; i++) {
- message = consumer.receive(1000);
- assertNotNull(message);
- }
- assertNull(consumer.receiveNoWait());
- assert null != message;
- message.acknowledge();
- }
-
- public void initCombosForTestDurableConsumerSelectorChange() {
- addCombinationValues("deliveryMode", new Object[] {
- Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
- addCombinationValues("destinationType", new Object[] {MessagingSessionFacade.DestinationType.TOPIC});
- }
-
- public void testDurableConsumerSelectorChange() throws Exception {
-
- // Receive a message with the JMS API
- if (null == connection.getClientID()) connection.setClientID(getName() + "test");
- connection.start();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- destination = createDestination(session, destinationType);
- MessageProducer producer = session.createProducer(destination);
- producer.setDeliveryMode(deliveryMode);
- MessageConsumer consumer = session.createDurableSubscriber((Topic)destination, "test", "color='red'", false);
-
- // Send the messages
- TextMessage message = session.createTextMessage("1st");
- message.setStringProperty("color", "red");
- producer.send(message);
-
- Message m = consumer.receive(1000);
- assertNotNull(m);
- assertEquals("1st", ((TextMessage)m).getText());
-
- // Change the subscription.
- consumer.close();
- consumer = session.createDurableSubscriber((Topic)destination, "test", "color='blue'", false);
-
- message = session.createTextMessage("2nd");
- message.setStringProperty("color", "red");
- producer.send(message);
- message = session.createTextMessage("3rd");
- message.setStringProperty("color", "blue");
- producer.send(message);
-
- // Selector should skip the 2nd message.
- m = consumer.receive(1000);
- assertNotNull(m);
- assertEquals("3rd", ((TextMessage)m).getText());
-
- assertNull(consumer.receiveNoWait());
- }
-
- public void initCombosForTestSendReceiveBytesMessage() {
- addCombinationValues("deliveryMode", new Object[] {
- Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
- addCombinationValues("destinationType", new Object[] {MessagingSessionFacade.DestinationType.TOPIC});
- }
-
- public void testSendReceiveBytesMessage() throws Exception {
-
- // Receive a message with the JMS API
- connection.start();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- destination = createDestination(session, destinationType);
- MessageConsumer consumer = session.createConsumer(destination);
- MessageProducer producer = session.createProducer(destination);
-
- BytesMessage message = session.createBytesMessage();
- message.writeBoolean(true);
- message.writeBoolean(false);
- producer.send(message);
-
- // Make sure only 1 message was delivered.
- BytesMessage m = (BytesMessage)consumer.receive(1000);
- assertNotNull(m);
- assertTrue(m.readBoolean());
- assertFalse(m.readBoolean());
-
- assertNull(consumer.receiveNoWait());
- }
-
- public void initCombosForTestSetMessageListenerAfterStart() {
- addCombinationValues("deliveryMode", new Object[] {
- Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
- addCombinationValues("destinationType", new Object[] {MessagingSessionFacade.DestinationType.TOPIC});
- }
-
- public void testSetMessageListenerAfterStart() throws Exception {
-
- final AtomicInteger counter = new AtomicInteger(0);
- final CountDownLatch done = new CountDownLatch(1);
-
- // Receive a message with the JMS API
- connection.start();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- destination = createDestination(session, destinationType);
- MessageConsumer consumer = session.createConsumer(destination);
-
- // See if the message get sent to the listener
- consumer.setMessageListener(new MessageListener() {
- public void onMessage(Message m) {
- counter.incrementAndGet();
- if (counter.get() == 4) {
- done.countDown();
- }
- }
- });
-
- // Send the messages
- sendMessages(session, destination, 4);
-
- assertTrue(done.await(1000, TimeUnit.MILLISECONDS));
- Thread.sleep(200);
-
- // Make sure only 4 messages were delivered.
- assertEquals(4, counter.get());
- }
-
- public void initCombosForTestPassMessageListenerIntoCreateConsumer() {
- addCombinationValues("destinationType", new Object[] {MessagingSessionFacade.DestinationType.TOPIC});
- }
-
- public void testPassMessageListenerIntoCreateConsumer() throws Exception {
-
- final AtomicInteger counter = new AtomicInteger(0);
- final CountDownLatch done = new CountDownLatch(1);
-
- // Receive a message with the JMS API
- connection.start();
- SessionImpl session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- destination = createDestination(session, destinationType);
- MessageConsumer consumer = session.createConsumer(destination);
- consumer.setMessageListener(new MessageListener() {
- public void onMessage(Message m) {
- counter.incrementAndGet();
- if (counter.get() == 4) {
- done.countDown();
- }
- }
- });
-
- // Send the messages
- sendMessages(session, destination, 4);
-
- assertTrue(done.await(1000, TimeUnit.MILLISECONDS));
- Thread.sleep(200);
-
- // Make sure only 4 messages were delivered.
- assertEquals(4, counter.get());
- }
-
- public void initCombosForTestMessageListenerOnMessageCloseUnackedWithPrefetch1StayInQueue() {
- addCombinationValues("deliveryMode", new Object[] {
- Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
- addCombinationValues("ackMode", new Object[] {Integer.valueOf(Session.CLIENT_ACKNOWLEDGE)});
- addCombinationValues("destinationType", new Object[] {MessagingSessionFacade.DestinationType.TOPIC});
- }
-
- public void testMessageListenerOnMessageCloseUnackedWithPrefetch1StayInQueue() throws Exception {
- final AtomicInteger counter = new AtomicInteger(0);
- final CountDownLatch sendDone = new CountDownLatch(1);
- final CountDownLatch got2Done = new CountDownLatch(1);
-
- // Set prefetch to 1
- // This test case does not work if optimized message dispatch is used as
- // the main thread send block until the consumer receives the
- // message. This test depends on thread decoupling so that the main
- // thread can stop the consumer thread.
- if (null == connection.getClientID()) connection.setClientID(getName() + "test-client-id-1");
- connection.start();
-
- // Use all the ack modes
- Session session = connection.createSession(false, ackMode);
- destination = createDestination(session, destinationType);
- MessageConsumer consumer = session.createDurableSubscriber((Topic) destination, "subscriber-id1");
- consumer.setMessageListener(new MessageListener() {
- private final HedwigConnectionImpl _connection = connection;
- public void onMessage(Message m) {
- try {
- TextMessage tm = (TextMessage)m;
- LOG.info("Got in first listener: " + tm.getText());
- assertEquals(messageTextPrefix + counter.get(), tm.getText());
- counter.incrementAndGet();
- if (counter.get() == 2) {
- sendDone.await();
- _connection.close();
- got2Done.countDown();
- }
- // will fail when we close connection when counter == 2 !
- tm.acknowledge();
- } catch (Throwable e) {
- // e.printStackTrace();
- }
- }
- });
-
- // Send the messages
- sendMessages(session, destination, 4);
- sendDone.countDown();
-
- // Wait for first 2 messages to arrive.
- assert got2Done.await(5, TimeUnit.SECONDS) :
- "counter1 : " + counter.get() + ", got2Done : " + got2Done.getCount() + ", sendDone : " + sendDone.getCount();
-
- // Re-start connection.
- connection.close();
- connection = (HedwigConnectionImpl)factory.createConnection();
- if (null == connection.getClientID()) connection.setClientID(getName() + "test-client-id-1");
- connections.add(connection);
-
- // Pickup the remaining messages.
- final CountDownLatch done2 = new CountDownLatch(1);
- session = connection.createSession(false, ackMode);
- consumer = session.createDurableSubscriber((Topic) destination, "subscriber-id1");
- consumer.setMessageListener(new MessageListener() {
- public void onMessage(Message m) {
- try {
- TextMessage tm = (TextMessage)m;
- LOG.info("Got in second listener: " + tm.getText());
- // order is not guaranteed as the connection is started before the listener is set.
- // assertEquals(messageTextPrefix + counter.get(), tm.getText());
- counter.incrementAndGet();
- tm.acknowledge();
- if (counter.get() == 4) {
- done2.countDown();
- }
- } catch (Throwable e) {
- LOG.error("unexpected ex onMessage: ", e);
- }
- }
- });
-
- connection.start();
-
- assert done2.await(2000, TimeUnit.MILLISECONDS) :
- "count2 : " + done2.getCount() + ", counter : " + counter.get();
- Thread.sleep(200);
-
- // assert msg 2 was redelivered as close() from onMessages() will only ack in auto_ack and dups_ok mode
- assert 5 == counter.get(): "count3 : " + done2.getCount() + ", counter : " + counter.get();
- }
-
- public void initCombosForTestMessageListenerAutoAckOnCloseWithPrefetch1() {
- addCombinationValues("deliveryMode", new Object[] {
- Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
- addCombinationValues("ackMode", new Object[] {
- Integer.valueOf(Session.AUTO_ACKNOWLEDGE), Integer.valueOf(Session.CLIENT_ACKNOWLEDGE)});
- addCombinationValues("destinationType", new Object[] {MessagingSessionFacade.DestinationType.TOPIC});
- }
-
- public void testMessageListenerAutoAckOnCloseWithPrefetch1() throws Exception {
- final AtomicInteger counter = new AtomicInteger(0);
- final CountDownLatch sendDone = new CountDownLatch(1);
- final CountDownLatch got2Done = new CountDownLatch(1);
-
- // Set prefetch to 1
- // This test case does not work if optimized message dispatch is used as
- // the main thread send block until the consumer receives the
- // message. This test depends on thread decoupling so that the main
- // thread can stop the consumer thread.
- if (null == connection.getClientID()) connection.setClientID(getName() + "test-client-id-2");
- connection.start();
-
- // Use all the ack modes
- Session session = connection.createSession(false, ackMode);
- destination = createDestination(session, destinationType);
- MessageConsumer consumer = session.createDurableSubscriber((Topic) destination, "subscriber-id2");
- final List<Message> receivedMessages = new ArrayList<Message>(8);
- consumer.setMessageListener(new MessageListener() {
- final HedwigConnectionImpl _connection = connection;
- public void onMessage(Message m) {
- try {
- TextMessage tm = (TextMessage)m;
- LOG.info("Got in first listener: " + tm.getText());
- assertEquals(messageTextPrefix + counter.get(), tm.getText());
- counter.incrementAndGet();
- m.acknowledge();
- receivedMessages.add(m);
- if (counter.get() == 2) {
- sendDone.await();
- _connection.close();
- got2Done.countDown();
- }
- } catch (Throwable e) {
- e.printStackTrace();
- }
- }
- });
-
- // Send the messages
- sendMessages(session, destination, 4);
- sendDone.countDown();
-
- // Wait for first 2 messages to arrive.
- assert got2Done.await(5, TimeUnit.SECONDS) :
- "counter : " + counter.get() + ", got2Done : " + got2Done.getCount() + ", sendDone : " + sendDone.getCount();
-
- // Re-start connection.
- connection.close();
- connection = (HedwigConnectionImpl)factory.createConnection();
- if (null == connection.getClientID()) connection.setClientID(getName() + "test-client-id-2");
- connections.add(connection);
-
- // Pickup the remaining messages.
- final CountDownLatch done2 = new CountDownLatch(1);
- session = connection.createSession(false, ackMode);
- consumer = session.createDurableSubscriber((Topic) destination, "subscriber-id2");
- consumer.setMessageListener(new MessageListener() {
- public void onMessage(Message m) {
- try {
- TextMessage tm = (TextMessage)m;
- LOG.info("Got in second listener: " + tm.getText());
- counter.incrementAndGet();
- m.acknowledge();
- receivedMessages.add(m);
- if (counter.get() == 4) {
- done2.countDown();
- }
- } catch (Throwable e) {
- LOG.error("unexpected ex onMessage: ", e);
- }
- }
- });
-
- connection.start();
-
- assert done2.await(5, TimeUnit.SECONDS) : "count : " + done2.getCount() + ", counter : " + counter.get();
- Thread.sleep(200);
-
- // close from onMessage with Auto_ack will ack
- // Make sure only 4 messages were delivered.
- assert 4 == counter.get() :
- "counter : " + counter.get() + ", got2Done : " + got2Done.getCount() + ", sendDone : "
- + sendDone.getCount() + ", messages : " + receivedMessages;
- }
-
- public void initCombosForTestMessageListenerWithConsumerWithPrefetch1() {
- addCombinationValues("deliveryMode", new Object[] {
- Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
- addCombinationValues("destinationType", new Object[] {MessagingSessionFacade.DestinationType.TOPIC});
- }
-
- public void testMessageListenerWithConsumerWithPrefetch1() throws Exception {
-
- final AtomicInteger counter = new AtomicInteger(0);
- final CountDownLatch done = new CountDownLatch(1);
-
- // Receive a message with the JMS API
- connection.start();
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- destination = createDestination(session, destinationType);
- MessageConsumer consumer = session.createConsumer(destination);
- consumer.setMessageListener(new MessageListener() {
- public void onMessage(Message m) {
- counter.incrementAndGet();
- if (counter.get() == 4) {
- done.countDown();
- }
- }
- });
-
- // Send the messages
- sendMessages(session, destination, 4);
-
- assertTrue(done.await(1000, TimeUnit.MILLISECONDS));
- Thread.sleep(200);
-
- // Make sure only 4 messages were delivered.
- assertEquals(4, counter.get());
- }
-
- public void initCombosForTestMessageListenerWithConsumer() {
- addCombinationValues("deliveryMode", new Object[] {
- Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
- addCombinationValues("destinationType", new Object[] {MessagingSessionFacade.DestinationType.TOPIC});
- }
-
- public void testMessageListenerWithConsumer() throws Exception {
-
- final AtomicInteger counter = new AtomicInteger(0);
- final CountDownLatch done = new CountDownLatch(1);
-
- // Receive a message with the JMS API
- connection.start();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- destination = createDestination(session, destinationType);
- MessageConsumer consumer = session.createConsumer(destination);
- consumer.setMessageListener(new MessageListener() {
- public void onMessage(Message m) {
- counter.incrementAndGet();
- if (counter.get() == 4) {
- done.countDown();
- }
- }
- });
-
- // Send the messages
- sendMessages(session, destination, 4);
-
- assertTrue(done.await(1000, TimeUnit.MILLISECONDS));
- Thread.sleep(200);
-
- // Make sure only 4 messages were delivered.
- assertEquals(4, counter.get());
- }
-
- public void initCombosForTestUnackedWithPrefetch1StayInQueue() {
- addCombinationValues("deliveryMode", new Object[] {
- Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
- addCombinationValues("ackMode", new Object[] {
- Integer.valueOf(Session.AUTO_ACKNOWLEDGE), Integer.valueOf(Session.DUPS_OK_ACKNOWLEDGE),
- Integer.valueOf(Session.CLIENT_ACKNOWLEDGE)});
- addCombinationValues("destinationType", new Object[] {MessagingSessionFacade.DestinationType.TOPIC});
- }
-
- public void testUnackedWithPrefetch1StayInQueue() throws Exception {
-
- // Set prefetch to 1
- if (null == connection.getClientID()) connection.setClientID(getName() + "test-client-id-3");
- connection.start();
-
- // Use all the ack modes
- Session session = connection.createSession(false, ackMode);
- destination = createDestination(session, destinationType);
- MessageConsumer consumer = session.createDurableSubscriber((Topic) destination, "subscriber-id3");
-
- // Send the messages
- sendMessages(session, destination, 4);
-
- // Only pick up the first 2 messages.
- Message message = null;
- for (int i = 0; i < 2; i++) {
- message = consumer.receive(1000);
- assertNotNull(message);
- assert (message instanceof TextMessage);
- assert (((TextMessage) message).getText().equals(messageTextPrefix + i))
- : "Received message " + ((TextMessage) message).getText() + " .. i = " + i;
- }
- assert null != message;
- message.acknowledge();
-
- connection.close();
- connection = (HedwigConnectionImpl)factory.createConnection();
- if (null == connection.getClientID()) connection.setClientID(getName() + "test-client-id-3");
- // Use all the ack modes
- session = connection.createSession(false, ackMode);
- consumer = session.createDurableSubscriber((Topic) destination, "subscriber-id3");
- connections.add(connection);
- connection.start();
-
- // Pickup the rest of the messages.
- for (int i = 0; i < 2; i++) {
- message = consumer.receive(1000);
- assertNotNull(message);
- assert (message instanceof TextMessage);
- assert (((TextMessage) message).getText().equals(messageTextPrefix + (i + 2))) :
- "Received message " + ((TextMessage) message).getText() + " .. i = " + i;
- }
- message.acknowledge();
- // assertNull(consumer.receiveNoWait());
- {
- Message msg = consumer.receiveNoWait();
- assert null == msg : "Unexpected message " + msg;
- }
-
- }
-
- public void initCombosForTestPrefetch1MessageNotDispatched() {
- addCombinationValues("deliveryMode", new Object[] {
- Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
- }
-
- public void testPrefetch1MessageNotDispatched() throws Exception {
-
- // Set prefetch to 1
- connection.start();
-
- Session session = connection.createSession(true, 0);
- destination = SessionImpl.asTopic("TEST");
- MessageConsumer consumer = session.createConsumer(destination);
-
- // The prefetch should fill up with 1 message.
- // Since prefetch is still full, the 2nd message should get dispatched
- // to another consumer.. lets create the 2nd consumer test that it does
- // make sure it does.
- HedwigConnectionImpl connection2 = (HedwigConnectionImpl)factory.createConnection();
- connection2.start();
- connections.add(connection2);
- Session session2 = connection2.createSession(true, 0);
- MessageConsumer consumer2 = session2.createConsumer(destination);
-
- // Send 2 messages to the destination.
- sendMessages(session, destination, 2);
- session.commit();
-
- // Pick up the first message.
- Message message1 = consumer.receive(1000);
- assertNotNull(message1);
- assertNotNull(consumer.receive(1000));
-
- // Pick up the 2nd messages.
- Message message2 = consumer2.receive(5000);
- assertNotNull(message2);
- assertNotNull(consumer2.receive(1000));
-
- session.commit();
- session2.commit();
-
- assertNull(consumer.receiveNoWait());
-
- }
-
- public void initCombosForTestDontStart() {
- addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT)});
- addCombinationValues("destinationType", new Object[] { MessagingSessionFacade.DestinationType.TOPIC});
- }
-
- public void testDontStart() throws Exception {
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- destination = createDestination(session, destinationType);
- MessageConsumer consumer = session.createConsumer(destination);
-
- // Send the messages
- sendMessages(session, destination, 1);
-
- // Make sure no messages were delivered.
- assertNull(consumer.receive(1000));
- }
-
- public void initCombosForTestStartAfterSend() {
- addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT)});
- addCombinationValues("destinationType", new Object[] {MessagingSessionFacade.DestinationType.TOPIC});
- }
-
- public void testStartAfterSend() throws Exception {
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- destination = createDestination(session, destinationType);
- MessageConsumer consumer = session.createConsumer(destination);
-
- // Send the messages
- sendMessages(session, destination, 1);
-
- // Start the conncection after the message was sent.
- connection.start();
-
- // Make sure only 1 message was delivered.
- assertNotNull(consumer.receive(1000));
- assertNull(consumer.receiveNoWait());
- }
-
- public void initCombosForTestReceiveMessageWithConsumer() {
- addCombinationValues("deliveryMode", new Object[] {
- Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
- addCombinationValues("destinationType", new Object[] {MessagingSessionFacade.DestinationType.TOPIC});
- }
-
- public void testReceiveMessageWithConsumer() throws Exception {
-
- // Receive a message with the JMS API
- connection.start();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- destination = createDestination(session, destinationType);
- MessageConsumer consumer = session.createConsumer(destination);
-
- // Send the messages
- sendMessages(session, destination, 1);
-
- // Make sure only 1 message was delivered.
- Message m = consumer.receive(1000);
- assertNotNull(m);
- assertEquals("0", ((TextMessage)m).getText());
- assertNull(consumer.receiveNoWait());
- }
-
-
- public void testDupsOkConsumer() throws Exception {
-
- // Receive a message with the JMS API
- connection.start();
- Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
- destination = createDestination(session, MessagingSessionFacade.DestinationType.TOPIC);
- MessageConsumer consumer = session.createDurableSubscriber((Topic) destination, "subscriber-id4");
-
- // Send the messages
- sendMessages(session, destination, 4);
-
- // Make sure only 4 message are delivered.
- for( int i=0; i < 4; i++){
- Message m = consumer.receive(1000);
- assertNotNull(m);
- }
- assertNull(consumer.receive(1000));
-
- // Close out the consumer.. no other messages should be left on the queue.
- consumer.close();
-
- consumer = session.createDurableSubscriber((Topic) destination, "subscriber-id4");
- assertNull(consumer.receive(1000));
- }
-
- public void testRedispatchOfUncommittedTx() throws Exception {
-
- connection.start();
- Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
- destination = createDestination(session, MessagingSessionFacade.DestinationType.TOPIC);
- MessageConsumer consumer = session.createDurableSubscriber((Topic) destination, "subscriber-id2");
-
- sendMessages(connection, destination, 2);
-
- assertNotNull(consumer.receive(1000));
- assertNotNull(consumer.receive(1000));
-
- // install another consumer while message dispatch is unacked/uncommitted
-
- // no commit so will auto rollback and get re-dispatched to redisptachConsumer
- session.close();
-
- Session redispatchSession = connection.createSession(true, Session.SESSION_TRANSACTED);
- MessageConsumer redispatchConsumer
- = redispatchSession.createDurableSubscriber((Topic) destination, "subscriber-id2");
-
- Message msg = redispatchConsumer.receive(1000);
- assertNotNull(msg);
- // assertTrue("redelivered flag set", msg.getJMSRedelivered());
-
- msg = redispatchConsumer.receive(1000);
- assertNotNull(msg);
- // assertTrue(msg.getJMSRedelivered());
- redispatchSession.commit();
-
- assertNull(redispatchConsumer.receive(500));
- redispatchSession.close();
- }
-
-
- public void testRedispatchOfRolledbackTx() throws Exception {
-
- connection.start();
- Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
- destination = createDestination(session, MessagingSessionFacade.DestinationType.TOPIC);
- MessageConsumer consumer = session.createDurableSubscriber((Topic) destination, "subscriber-id1");
-
- sendMessages(connection, destination, 2);
-
- assertNotNull(consumer.receive(1000));
- assertNotNull(consumer.receive(1000));
-
- // install another consumer while message dispatch is unacked/uncommitted
-
- session.rollback();
- session.close();
-
- Session redispatchSession = connection.createSession(true, Session.SESSION_TRANSACTED);
- MessageConsumer redispatchConsumer
- = redispatchSession.createDurableSubscriber((Topic) destination, "subscriber-id1");
-
- Message msg = redispatchConsumer.receive(1000);
- assertNotNull(msg);
- // assertTrue(msg.getJMSRedelivered());
- msg = redispatchConsumer.receive(1000);
- assertNotNull(msg);
- // assertTrue(msg.getJMSRedelivered());
- redispatchSession.commit();
-
- assertNull(redispatchConsumer.receive(500));
- redispatchSession.close();
- }
-
- public void initCombosForTestAckOfExpired() {
- addCombinationValues("destinationType",
- new Object[] {MessagingSessionFacade.DestinationType.TOPIC});
- }
-
- public void testAckOfExpired() throws Exception {
- HedwigConnectionFactoryImpl fact = new HedwigConnectionFactoryImpl();
- connection = fact.createConnection();
-
- connection.start();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- destination = (Destination) (destinationType == MessagingSessionFacade.DestinationType.QUEUE ?
- session.createTopic("test") : session.createTopic("test"));
-
- MessageConsumer consumer = session.createConsumer(destination);
-
- Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = sendSession.createProducer(destination);
- final int count = 4;
-
-
- // producer.setTimeToLive(0);
- for (int i = 0; i < count; i++) {
- TextMessage message = sendSession.createTextMessage("no expiry" + i);
- producer.send(message);
- }
-
- MessageConsumer amqConsumer = (MessageConsumer) consumer;
-
- for(int i=0; i<count; i++) {
- TextMessage msg = (TextMessage) amqConsumer.receive();
- assertNotNull(msg);
- assertTrue("message has \"no expiry\" text: " + msg.getText(), msg.getText().contains("no expiry"));
-
- // force an ack when there are expired messages
- msg.acknowledge();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/JMSDurableTopicRedeliverTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/JMSDurableTopicRedeliverTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/JMSDurableTopicRedeliverTest.java
deleted file mode 100644
index 343919a..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/JMSDurableTopicRedeliverTest.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import javax.jms.Topic;
-import javax.jms.Message;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class JMSDurableTopicRedeliverTest extends JmsTopicRedeliverTest {
-
- private static final Logger LOG = LoggerFactory.getLogger(JMSDurableTopicRedeliverTest.class);
-
- protected void setUp() throws Exception {
- durable = true;
- super.setUp();
- }
-
- /**
- * Sends and consumes the messages.
- *
- * @throws Exception
- */
- public void testRedeliverNewSession() throws Exception {
- String text = "TEST: " + System.currentTimeMillis();
- Message sendMessage = session.createTextMessage(text);
-
- if (verbose) {
- LOG.info("About to send a message: " + sendMessage + " with text: " + text);
- }
- producer.send(producerDestination, sendMessage);
-
- // receive but don't acknowledge
- Message unackMessage = consumer.receive(1000);
- assertNotNull(unackMessage);
- String unackId = unackMessage.getJMSMessageID();
- assertEquals(((TextMessage)unackMessage).getText(), text);
- assertFalse(unackMessage.getJMSRedelivered());
- consumeSession.close();
- consumer.close();
-
- // receive then acknowledge
- consumeSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- consumer = createConsumer();
- Message ackMessage = consumer.receive(1000);
- assertNotNull(ackMessage);
- ackMessage.acknowledge();
- String ackId = ackMessage.getJMSMessageID();
- assertEquals(((TextMessage)ackMessage).getText(), text);
- // assertTrue(ackMessage.getJMSRedelivered());
- assertEquals(unackId, ackId);
- consumeSession.close();
- consumer.close();
-
- consumeSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- consumer = createConsumer();
- assertNull(consumer.receive(1000));
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/JMSIndividualAckTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/JMSIndividualAckTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/JMSIndividualAckTest.java
deleted file mode 100644
index 24551f3..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/JMSIndividualAckTest.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-
-/**
- *
- */
-public class JMSIndividualAckTest extends TestSupport {
-
- private Connection connection;
-
- protected void setUp() throws Exception {
- super.setUp();
- connection = createConnection();
- }
-
- /**
- * @see junit.framework.TestCase#tearDown()
- */
- protected void tearDown() throws Exception {
- if (connection != null) {
- connection.close();
- connection = null;
- }
- super.tearDown();
- }
-
- /**
- * Tests if acknowledged messages are being consumed.
- *
- * @throws JMSException
- */
- public void testAckedMessageAreConsumed() throws JMSException {
- connection.start();
- Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- Topic queue = session.createTopic(getQueueName());
- MessageProducer producer = session.createProducer(queue);
- MessageConsumer consumer = session.createDurableSubscriber(queue, "subscriber-id1");
-
- producer.send(session.createTextMessage("Hello"));
-
- // Consume the message...
- Message msg = consumer.receive(1000);
- assertNotNull(msg);
- msg.acknowledge();
-
- // Reset the session.
- session.close();
- session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
- // Attempt to Consume the message...
- consumer = session.createDurableSubscriber(queue, "subscriber-id1");
- msg = consumer.receive(1000);
- assertNull(msg);
-
- session.close();
- }
-
- /**
- * Tests if acknowledged messages are being consumed.
- *
- * @throws JMSException
- */
- // This test cant, unfortunately, pass
- //- in hedwig, acknowledge is a ACKNOWLEDGE UNTIL. So the last ack will ack all messages until then ...
- /*
- public void testLastMessageAcked() throws JMSException {
- connection.start();
- Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- Topic queue = session.createTopic(getQueueName());
- MessageProducer producer = session.createProducer(queue);
- MessageConsumer consumer = session.createDurableSubscriber(queue, "subscriber-id2");
- TextMessage msg1 = session.createTextMessage("msg1");
- TextMessage msg2 = session.createTextMessage("msg2");
- TextMessage msg3 = session.createTextMessage("msg3");
- producer.send(msg1);
- producer.send(msg2);
- producer.send(msg3);
-
- // Consume the message...
- Message msg = consumer.receive(1000);
- assertNotNull(msg);
- msg = consumer.receive(1000);
- assertNotNull(msg);
- msg = consumer.receive(1000);
- assertNotNull(msg);
- msg.acknowledge();
-
- // Reset the session.
- session.close();
- session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
- // Attempt to Consume the message...
- consumer = session.createDurableSubscriber(queue, "subscriber-id2");
- msg = consumer.receive(1000);
- assertNotNull(msg);
- assertEquals(msg1,msg);
- msg = consumer.receive(1000);
- assertNotNull(msg);
- assertEquals(msg2,msg);
- msg = consumer.receive(1000);
- assertNull(msg);
- session.close();
- }
- */
-
- /**
- * Tests if unacknowledged messages are being re-delivered when the consumer connects again.
- *
- * @throws JMSException
- */
- public void testUnAckedMessageAreNotConsumedOnSessionClose() throws JMSException {
- connection.start();
- Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- Topic queue = session.createTopic(getQueueName());
- MessageProducer producer = session.createProducer(queue);
- MessageConsumer consumer = session.createDurableSubscriber(queue, "subscriber-id3");
- producer.send(session.createTextMessage("Hello"));
-
- // Consume the message...
- Message msg = consumer.receive(1000);
- assertNotNull(msg);
- // Don't ack the message.
-
- // Reset the session. This should cause the unacknowledged message to be re-delivered.
- session.close();
- session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
- // Attempt to Consume the message...
- consumer = session.createDurableSubscriber(queue, "subscriber-id3");
- msg = consumer.receive(2000);
- assertNotNull(msg);
- msg.acknowledge();
-
- session.close();
- }
-
- protected String getQueueName() {
- return getClass().getName() + "." + getName();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/JMSMessageTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/JMSMessageTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/JMSMessageTest.java
deleted file mode 100644
index cee1698..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/JMSMessageTest.java
+++ /dev/null
@@ -1,504 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import java.net.URISyntaxException;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.Vector;
-
-import javax.jms.BytesMessage;
-import javax.jms.ConnectionFactory;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MapMessage;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageEOFException;
-import javax.jms.MessageFormatException;
-import javax.jms.MessageProducer;
-import javax.jms.ObjectMessage;
-import javax.jms.Session;
-import javax.jms.StreamMessage;
-import javax.jms.TextMessage;
-
-import junit.framework.Test;
-import org.apache.hedwig.jms.MessagingSessionFacade;
-import org.apache.hedwig.jms.message.StreamMessageImpl;
-import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl;
-
-import javax.jms.Destination;
-
-/**
- * Test cases used to test the JMS message consumer.
- */
-public class JMSMessageTest extends JmsTestSupport {
-
- public Destination destination;
- public int deliveryMode = DeliveryMode.NON_PERSISTENT;
- public int prefetch;
- public int ackMode;
- public MessagingSessionFacade.DestinationType destinationType = MessagingSessionFacade.DestinationType.TOPIC;
- public boolean durableConsumer;
-
- /**
- * Run all these tests in both marshaling and non-marshaling mode.
- */
- public void initCombos() {
- addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
- Integer.valueOf(DeliveryMode.PERSISTENT)});
- addCombinationValues("destinationType", new Object[] {MessagingSessionFacade.DestinationType.TOPIC});
- }
-
- public void testTextMessage() throws Exception {
-
- // Receive a message with the JMS API
- connection.start();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- destination = createDestination(session, destinationType);
- MessageConsumer consumer = session.createConsumer(destination);
- MessageProducer producer = session.createProducer(destination);
-
- // Send the message.
- {
- TextMessage message = session.createTextMessage();
- message.setText("Hi");
- producer.send(message);
- }
-
- // Check the Message
- {
- TextMessage message = (TextMessage)consumer.receive(1000);
- assertNotNull(message);
- assertEquals("Hi", message.getText());
- }
-
- assertNull(consumer.receiveNoWait());
- }
-
- public static Test suite() {
- return suite(JMSMessageTest.class);
- }
-
- public static void main(String[] args) {
- junit.textui.TestRunner.run(suite());
- }
-
- protected ConnectionFactory createConnectionFactory() throws URISyntaxException {
- HedwigConnectionFactoryImpl factory = new HedwigConnectionFactoryImpl();
- return factory;
- }
-
- public void testBytesMessageLength() throws Exception {
-
- // Receive a message with the JMS API
- connection.start();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- destination = createDestination(session, destinationType);
- MessageConsumer consumer = session.createConsumer(destination);
- MessageProducer producer = session.createProducer(destination);
-
- // Send the message
- {
- BytesMessage message = session.createBytesMessage();
- message.writeInt(1);
- message.writeInt(2);
- message.writeInt(3);
- message.writeInt(4);
- producer.send(message);
- }
-
- // Check the message.
- {
- BytesMessage message = (BytesMessage)consumer.receive(1000);
- assertNotNull(message);
- assertEquals(16, message.getBodyLength());
- }
-
- assertNull(consumer.receiveNoWait());
- }
-
- public void testObjectMessage() throws Exception {
-
- // Receive a message with the JMS API
- connection.start();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- destination = createDestination(session, destinationType);
- MessageConsumer consumer = session.createConsumer(destination);
- MessageProducer producer = session.createProducer(destination);
-
- // send the message.
- {
- ObjectMessage message = session.createObjectMessage();
- message.setObject("Hi");
- producer.send(message);
- }
-
- // Check the message
- {
- ObjectMessage message = (ObjectMessage)consumer.receive(1000);
- assertNotNull(message);
- assertEquals("Hi", message.getObject());
- }
- assertNull(consumer.receiveNoWait());
- }
-
- public void testBytesMessage() throws Exception {
-
- // Receive a message with the JMS API
- connection.start();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- destination = createDestination(session, destinationType);
- MessageConsumer consumer = session.createConsumer(destination);
- MessageProducer producer = session.createProducer(destination);
-
- // Send the message
- {
- BytesMessage message = session.createBytesMessage();
- message.writeBoolean(true);
- producer.send(message);
- }
-
- // Check the message
- {
- BytesMessage message = (BytesMessage)consumer.receive(1000);
- assertNotNull(message);
- assertTrue(message.readBoolean());
-
- try {
- message.readByte();
- fail("Expected exception not thrown.");
- } catch (MessageEOFException e) {
- }
-
- }
- assertNull(consumer.receiveNoWait());
- }
-
- public void testStreamMessage() throws Exception {
-
- // Receive a message with the JMS API
- connection.start();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- destination = createDestination(session, destinationType);
- MessageConsumer consumer = session.createConsumer(destination);
- MessageProducer producer = session.createProducer(destination);
-
- // Send the message.
- {
- StreamMessage message = session.createStreamMessage();
- message.writeString("This is a test to see how it works.");
- producer.send(message);
- }
-
- // Check the message.
- {
- StreamMessage message = (StreamMessage)consumer.receive(1000);
- assertNotNull(message);
-
- // Invalid conversion should throw exception and not move the stream
- // position.
- try {
- message.readByte();
- fail("Should have received NumberFormatException");
- } catch (NumberFormatException e) {
- } catch (MessageFormatException e) {
- }
-
- assertEquals("This is a test to see how it works.", message.readString());
-
- // Invalid conversion should throw exception and not move the stream
- // position.
- try {
- message.readByte();
- fail("Should have received MessageEOFException");
- } catch (MessageEOFException e) {
- }
- }
- assertNull(consumer.receiveNoWait());
- }
-
- public void testMapMessage() throws Exception {
-
- // Receive a message with the JMS API
- connection.start();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- destination = createDestination(session, destinationType);
- MessageConsumer consumer = session.createConsumer(destination);
- MessageProducer producer = session.createProducer(destination);
-
- // send the message.
- {
- MapMessage message = session.createMapMessage();
- message.setBoolean("boolKey", true);
- producer.send(message);
- }
-
- // get the message.
- {
- MapMessage message = (MapMessage)consumer.receive(1000);
- assertNotNull(message);
- assertTrue(message.getBoolean("boolKey"));
- }
- assertNull(consumer.receiveNoWait());
- }
-
- static class ForeignMessage implements TextMessage {
-
- public int deliveryMode;
-
- private String messageId;
- private long timestamp;
- private String correlationId;
- private Destination replyTo;
- private Destination destination;
- private boolean redelivered;
- private String type;
- private long expiration;
- private int priority;
- private String text;
- private HashMap<String, Object> props = new HashMap<String, Object>();
-
- public String getJMSMessageID() throws JMSException {
- return messageId;
- }
-
- public void setJMSMessageID(String arg0) throws JMSException {
- messageId = arg0;
- }
-
- public long getJMSTimestamp() throws JMSException {
- return timestamp;
- }
-
- public void setJMSTimestamp(long arg0) throws JMSException {
- timestamp = arg0;
- }
-
- public byte[] getJMSCorrelationIDAsBytes() throws JMSException {
- return null;
- }
-
- public void setJMSCorrelationIDAsBytes(byte[] arg0) throws JMSException {
- }
-
- public void setJMSCorrelationID(String arg0) throws JMSException {
- correlationId = arg0;
- }
-
- public String getJMSCorrelationID() throws JMSException {
- return correlationId;
- }
-
- public Destination getJMSReplyTo() throws JMSException {
- return replyTo;
- }
-
- public void setJMSReplyTo(Destination arg0) throws JMSException {
- replyTo = arg0;
- }
-
- public Destination getJMSDestination() throws JMSException {
- return destination;
- }
-
- public void setJMSDestination(Destination arg0) throws JMSException {
- destination = arg0;
- }
-
- public int getJMSDeliveryMode() throws JMSException {
- return deliveryMode;
- }
-
- public void setJMSDeliveryMode(int arg0) throws JMSException {
- deliveryMode = arg0;
- }
-
- public boolean getJMSRedelivered() throws JMSException {
- return redelivered;
- }
-
- public void setJMSRedelivered(boolean arg0) throws JMSException {
- redelivered = arg0;
- }
-
- public String getJMSType() throws JMSException {
- return type;
- }
-
- public void setJMSType(String arg0) throws JMSException {
- type = arg0;
- }
-
- public long getJMSExpiration() throws JMSException {
- return expiration;
- }
-
- public void setJMSExpiration(long arg0) throws JMSException {
- expiration = arg0;
- }
-
- public int getJMSPriority() throws JMSException {
- return priority;
- }
-
- public void setJMSPriority(int arg0) throws JMSException {
- priority = arg0;
- }
-
- public void clearProperties() throws JMSException {
- }
-
- public boolean propertyExists(String arg0) throws JMSException {
- return false;
- }
-
- public boolean getBooleanProperty(String arg0) throws JMSException {
- return false;
- }
-
- public byte getByteProperty(String arg0) throws JMSException {
- return 0;
- }
-
- public short getShortProperty(String arg0) throws JMSException {
- return 0;
- }
-
- public int getIntProperty(String arg0) throws JMSException {
- return 0;
- }
-
- public long getLongProperty(String arg0) throws JMSException {
- return 0;
- }
-
- public float getFloatProperty(String arg0) throws JMSException {
- return 0;
- }
-
- public double getDoubleProperty(String arg0) throws JMSException {
- return 0;
- }
-
- public String getStringProperty(String arg0) throws JMSException {
- return (String)props.get(arg0);
- }
-
- public Object getObjectProperty(String arg0) throws JMSException {
- return props.get(arg0);
- }
-
- public Enumeration getPropertyNames() throws JMSException {
- return new Vector<String>(props.keySet()).elements();
- }
-
- public void setBooleanProperty(String arg0, boolean arg1) throws JMSException {
- }
-
- public void setByteProperty(String arg0, byte arg1) throws JMSException {
- }
-
- public void setShortProperty(String arg0, short arg1) throws JMSException {
- }
-
- public void setIntProperty(String arg0, int arg1) throws JMSException {
- }
-
- public void setLongProperty(String arg0, long arg1) throws JMSException {
- }
-
- public void setFloatProperty(String arg0, float arg1) throws JMSException {
- }
-
- public void setDoubleProperty(String arg0, double arg1) throws JMSException {
- }
-
- public void setStringProperty(String arg0, String arg1) throws JMSException {
- props.put(arg0, arg1);
- }
-
- public void setObjectProperty(String arg0, Object arg1) throws JMSException {
- props.put(arg0, arg1);
- }
-
- public void acknowledge() throws JMSException {
- }
-
- public void clearBody() throws JMSException {
- }
-
- public void setText(String arg0) throws JMSException {
- text = arg0;
- }
-
- public String getText() throws JMSException {
- return text;
- }
- }
-
- public void testForeignMessage() throws Exception {
-
- // Receive a message with the JMS API
- connection.start();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- destination = createDestination(session, destinationType);
- MessageConsumer consumer = session.createConsumer(destination);
- MessageProducer producer = session.createProducer(destination);
-
- // Send the message.
- {
- ForeignMessage message = new ForeignMessage();
- message.text = "Hello";
- message.setStringProperty("test", "value");
- // long timeToLive = 10000L;
- long timeToLive = 0L;
- long start = System.currentTimeMillis();
- producer.send(message, Session.AUTO_ACKNOWLEDGE, 7, timeToLive);
- long end = System.currentTimeMillis();
-
-
- //validate jms spec 1.1 section 3.4.11 table 3.1
- // JMSDestination, JMSDeliveryMode, JMSPriority, JMSMessageID, and JMSTimestamp
- //must be set by sending a message.
-
- // This is NOT specified in the spec !
- // exception for jms destination as the format is provider defined so it is only set on the copy
- // assertNull(message.getJMSDestination());
-
- assertEquals(Session.AUTO_ACKNOWLEDGE, message.getJMSDeliveryMode());
- // assertTrue(start + timeToLive <= message.getJMSExpiration());
- // assertTrue(end + timeToLive >= message.getJMSExpiration());
- assertEquals(7, message.getJMSPriority());
- assertNotNull(message.getJMSMessageID());
- assertTrue(start <= message.getJMSTimestamp());
- assertTrue(end >= message.getJMSTimestamp());
- }
-
- // Validate message is OK.
- {
- TextMessage message = (TextMessage)consumer.receive(1000);
- assertNotNull(message);
- assertEquals("Hello", message.getText());
- assertEquals("value", message.getStringProperty("test"));
- }
-
- assertNull(consumer.receiveNoWait());
- }
-
-}