You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2016/03/16 04:44:45 UTC
[35/49] bookkeeper git commit: BOOKKEEPER-769: Remove the Hedwig Code
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/test/TestSupport.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/test/TestSupport.java b/hedwig-client-jms/src/test/java/org/apache/activemq/test/TestSupport.java
deleted file mode 100644
index 6de9021..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/test/TestSupport.java
+++ /dev/null
@@ -1,261 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.test;
-
-import javax.jms.Topic;
-import java.io.File;
-
-import org.apache.hedwig.JmsTestBase;
-import org.apache.hedwig.jms.SessionImpl;
-import java.lang.reflect.Array;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.TextMessage;
-
-import junit.framework.TestCase;
-
-import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl;
-import org.apache.hedwig.jms.message.MessageImpl;
-
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Useful base class for unit test cases
- */
-public abstract class TestSupport extends JmsTestBase {
- private static final Logger LOG = LoggerFactory.getLogger(TestSupport.class);
-
- protected HedwigConnectionFactoryImpl connectionFactory;
- protected boolean topic = true;
-
- public TestSupport() {
- super();
- }
-
- public TestSupport(String name) {
- super(name);
- }
-
- /**
- * Creates an MessageImpl.
- *
- * @return MessageImpl
- */
- protected MessageImpl createMessage() {
- return new MessageImpl(null);
- }
-
- /**
- * Creates a destination.
- *
- * @param subject - topic or queue name.
- * @return Destination - either an Topic or ActiveMQQUeue.
- */
- protected Destination createDestination(String subject) {
- if (topic) {
- return SessionImpl.asTopic(subject);
- } else {
- return SessionImpl.asTopic(subject);
- }
- }
-
- /**
- * Tests if firstSet and secondSet are equal.
- *
- * @param messsage - string to be displayed when the assertion fails.
- * @param firstSet[] - set of messages to be compared with its counterpart
- * in the secondset.
- * @param secondSet[] - set of messages to be compared with its counterpart
- * in the firstset.
- * @throws JMSException
- */
- protected void assertTextMessagesEqual(Message[] firstSet, Message[] secondSet) throws JMSException {
- assertTextMessagesEqual("", firstSet, secondSet);
- }
-
- /**
- * Tests if firstSet and secondSet are equal.
- *
- * @param messsage - string to be displayed when the assertion fails.
- * @param firstSet[] - set of messages to be compared with its counterpart
- * in the secondset.
- * @param secondSet[] - set of messages to be compared with its counterpart
- * in the firstset.
- */
- protected void assertTextMessagesEqual(String messsage, Message[] firstSet,
- Message[] secondSet) throws JMSException {
- assertEquals("Message count does not match: " + messsage, firstSet.length, secondSet.length);
-
- for (int i = 0; i < secondSet.length; i++) {
- TextMessage m1 = (TextMessage)firstSet[i];
- TextMessage m2 = (TextMessage)secondSet[i];
- assertTextMessageEqual("Message " + (i + 1) + " did not match : ", m1, m2);
- }
- }
-
- /**
- * Tests if m1 and m2 are equal.
- *
- * @param m1 - message to be compared with m2.
- * @param m2 - message to be compared with m1.
- * @throws JMSException
- */
- protected void assertEquals(TextMessage m1, TextMessage m2) throws JMSException {
- assertEquals("", m1, m2);
- }
-
- /**
- * Tests if m1 and m2 are equal.
- *
- * @param message - string to be displayed when the assertion fails.
- * @param m1 - message to be compared with m2.
- * @param m2 - message to be compared with m1.
- */
- protected void assertTextMessageEqual(String message, TextMessage m1, TextMessage m2) throws JMSException {
- assertFalse(message + ": expected {" + m1 + "}, but was {" + m2 + "}", m1 == null ^ m2 == null);
-
- if (m1 == null) {
- return;
- }
-
- assertEquals(message, m1.getText(), m2.getText());
- }
-
- /**
- * Tests if m1 and m2 are equal.
- *
- * @param m1 - message to be compared with m2.
- * @param m2 - message to be compared with m1.
- * @throws JMSException
- */
- protected void assertEquals(Message m1, Message m2) throws JMSException {
- assertEquals("", m1, m2);
- }
-
- /**
- * Tests if m1 and m2 are equal.
- *
- * @param message - error message.
- * @param m1 - message to be compared with m2.
- * @param m2 -- message to be compared with m1.
- */
- protected void assertEquals(String message, Message m1, Message m2) throws JMSException {
- assertFalse(message + ": expected {" + m1 + "}, but was {" + m2 + "}", m1 == null ^ m2 == null);
-
- if (m1 == null) {
- return;
- }
-
- assertTrue(message + ": expected {" + m1 + "}, but was {" + m2 + "}", m1.getClass() == m2.getClass());
-
- if (m1 instanceof TextMessage) {
- assertTextMessageEqual(message, (TextMessage)m1, (TextMessage)m2);
- } else {
- assertEquals(message, m1, m2);
- }
- }
-
- /**
- * Test if base directory contains spaces
- */
- protected void assertBaseDirectoryContainsSpaces() {
- assertFalse("Base directory cannot contain spaces.",
- new File(System.getProperty("basedir", ".")).getAbsoluteFile().toString().contains(" "));
- }
-
- /**
- * Creates an HedwigConnectionFactoryImpl.
- *
- * @return HedwigConnectionFactoryImpl
- * @throws Exception
- */
- protected HedwigConnectionFactoryImpl createConnectionFactory() throws Exception {
- return new HedwigConnectionFactoryImpl();
- }
-
- /**
- * Factory method to create a new connection.
- *
- * @return connection
- * @throws Exception
- */
- protected Connection createConnection() throws Exception {
- return getConnectionFactory().createConnection();
- }
-
- /**
- * Creates an ActiveMQ connection factory.
- *
- * @return connectionFactory
- * @throws Exception
- */
- public HedwigConnectionFactoryImpl getConnectionFactory() throws Exception {
- if (connectionFactory == null) {
- connectionFactory = createConnectionFactory();
- assertTrue("Should have created a connection factory!", connectionFactory != null);
- }
-
- return connectionFactory;
- }
-
- /**
- * Returns the consumer subject.
- *
- * @return String
- */
- protected String getConsumerSubject() {
- return getSubject();
- }
-
- /**
- * Returns the producer subject.
- *
- * @return String
- */
- protected String getProducerSubject() {
- return getSubject();
- }
-
- /**
- * Returns the subject.
- *
- * @return String
- */
- protected String getSubject() {
- return getClass().getName() + "." + getName();
- }
-
- protected void assertArrayEqual(String message, Object[] expected, Object[] actual) {
- assertEquals(message + ". Array length", expected.length, actual.length);
- for (int i = 0; i < expected.length; i++) {
- assertEquals(message + ". element: " + i, expected[i], actual[i]);
- }
- }
-
- protected void assertPrimitiveArrayEqual(String message, Object expected, Object actual) {
- int length = Array.getLength(expected);
- assertEquals(message + ". Array length", length, Array.getLength(actual));
- for (int i = 0; i < length; i++) {
- assertEquals(message + ". element: " + i, Array.get(expected, i), Array.get(actual, i));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/test/rollback/DelegatingTransactionalMessageListener.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/test/rollback/DelegatingTransactionalMessageListener.java b/hedwig-client-jms/src/test/java/org/apache/activemq/test/rollback/DelegatingTransactionalMessageListener.java
deleted file mode 100644
index 0bc8a8e..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/test/rollback/DelegatingTransactionalMessageListener.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.test.rollback;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.Session;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class DelegatingTransactionalMessageListener implements MessageListener {
- private static final transient Logger LOG = LoggerFactory.getLogger(DelegatingTransactionalMessageListener.class);
-
- private final MessageListener underlyingListener;
- private boolean transacted = true;
- private int ackMode = Session.AUTO_ACKNOWLEDGE;
- private Session session;
-
- public DelegatingTransactionalMessageListener(MessageListener underlyingListener,
- Connection connection, Destination destination) {
- this.underlyingListener = underlyingListener;
-
- try {
- session = connection.createSession(transacted, ackMode);
- MessageConsumer consumer = session.createConsumer(destination);
- consumer.setMessageListener(this);
- } catch (JMSException e) {
- throw new IllegalStateException("Could not listen to " + destination, e);
- }
- }
-
- public void onMessage(Message message) {
- try {
- underlyingListener.onMessage(message);
- session.commit();
- } catch (Throwable e) {
- rollback();
- }
- }
-
- private void rollback() {
- try {
- session.rollback();
- } catch (JMSException e) {
- LOG.error("Failed to rollback: " + e, e);
- }
- }
-
- public Session getSession() {
- return session;
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/ChangeSentMessageTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/ChangeSentMessageTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/ChangeSentMessageTest.java
deleted file mode 100644
index 5bc1a07..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/ChangeSentMessageTest.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.usecases;
-
-import java.util.HashMap;
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.ObjectMessage;
-import javax.jms.Session;
-
-import org.apache.activemq.test.TestSupport;
-
-public class ChangeSentMessageTest extends TestSupport {
- private static final int COUNT = 200;
- private static final String VALUE_NAME = "value";
-
- /**
- * test Object messages can be changed after sending with no side-affects
- *
- * @throws Exception
- */
- public void testDoChangeSentMessage() throws Exception {
- Destination destination = createDestination("test-" + ChangeSentMessageTest.class.getName());
- Connection connection = createConnection();
- connection.start();
- Session consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer = consumerSession.createConsumer(destination);
- Session publisherSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = publisherSession.createProducer(destination);
- HashMap<String, Integer> map = new HashMap<String, Integer>();
- ObjectMessage message = publisherSession.createObjectMessage();
- for (int i = 0; i < COUNT; i++) {
- map.put(VALUE_NAME, Integer.valueOf(i));
- message.setObject(map);
- producer.send(message);
- assertTrue(message.getObject() == map);
- }
- for (int i = 0; i < COUNT; i++) {
- ObjectMessage msg = (ObjectMessage)consumer.receive();
- HashMap receivedMap = (HashMap)msg.getObject();
- Integer intValue = (Integer)receivedMap.get(VALUE_NAME);
- assertTrue(intValue.intValue() == i);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/ChangeSessionDeliveryModeTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/ChangeSessionDeliveryModeTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/ChangeSessionDeliveryModeTest.java
deleted file mode 100644
index b4da7a7..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/ChangeSessionDeliveryModeTest.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.usecases;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.IllegalStateException;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.Session;
-
-import org.apache.activemq.test.TestSupport;
-
-public class ChangeSessionDeliveryModeTest extends TestSupport implements MessageListener {
-
- /**
- * test following condition- which are defined by JMS Spec 1.1:
- * MessageConsumers cannot use a MessageListener and receive() from the same
- * session
- *
- * @throws Exception
- */
- public void testDoChangeSessionDeliveryMode() throws Exception {
- Destination destination = createDestination("foo.bar");
- Connection connection = createConnection();
- connection.start();
- Session consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- /*
- MessageConsumer consumer1 = consumerSession.createConsumer(destination);
- consumer1.setMessageListener(this);
- JMSException jmsEx = null;
- MessageConsumer consumer2 = consumerSession.createConsumer(destination);
-
- try {
- consumer2.receive(10);
- fail("Did not receive expected exception.");
- } catch (JMSException e) {
- assertTrue(e instanceof IllegalStateException);
- }
- */
- MessageConsumer consumer1 = consumerSession.createConsumer(destination);
- consumer1.setMessageListener(this);
-
- try {
- consumer1.receive(10);
- fail("Did not receive expected exception.");
- } catch (JMSException e) {
- assertTrue(e instanceof IllegalStateException);
- }
- }
-
- public void onMessage(Message msg) {
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/CompositeConsumeTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/CompositeConsumeTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/CompositeConsumeTest.java
deleted file mode 100644
index 7ea8cc0..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/CompositeConsumeTest.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.usecases;
-
-import javax.jms.Destination;
-import javax.jms.Message;
-
-
-import org.apache.activemq.test.JmsTopicSendReceiveWithTwoConnectionsTest;
-
-import org.apache.hedwig.jms.SessionImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class CompositeConsumeTest extends JmsTopicSendReceiveWithTwoConnectionsTest {
- private static final Logger LOG = LoggerFactory.getLogger(CompositeConsumeTest.class);
-
- public void testSendReceive() throws Exception {
- messages.clear();
-
- Destination[] destinations = getDestinations();
- int destIdx = 0;
-
- for (int i = 0; i < data.length; i++) {
- Message message = session.createTextMessage(data[i]);
-
- if (verbose) {
- LOG.info("About to send a message: " + message + " with text: " + data[i]);
- }
-
- producer.send(destinations[destIdx], message);
-
- if (++destIdx >= destinations.length) {
- destIdx = 0;
- }
- }
-
- assertMessagesAreReceived();
- }
-
- /**
- * Returns the subscription subject
- */
- protected String getSubject() {
- // return getPrefix() + "FOO.BAR," + getPrefix() + "FOO.X.Y," + getPrefix() + "BAR.>";
- return getPrefix() + "FOO.BAR";
- }
-
- /**
- * Returns the destinations on which we publish
- */
- protected Destination[] getDestinations() {
- return new Destination[]{SessionImpl.asTopic(getSubject())};
- }
-
- protected String getPrefix() {
- return super.getSubject() + ".";
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/CompositePublishTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/CompositePublishTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/CompositePublishTest.java
deleted file mode 100644
index 7833e01..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/CompositePublishTest.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.usecases;
-
-import java.util.List;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.Session;
-
-import org.apache.hedwig.jms.SessionImpl;
-import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl;
-
-import org.apache.activemq.test.JmsSendReceiveTestSupport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class CompositePublishTest extends JmsSendReceiveTestSupport {
- private static final Logger LOG = LoggerFactory.getLogger(CompositePublishTest.class);
-
- protected Connection sendConnection;
- protected Connection receiveConnection;
- protected Session receiveSession;
- protected MessageConsumer[] consumers;
- protected List[] messageLists;
-
- @SuppressWarnings("unchecked")
- protected void setUp() throws Exception {
- super.setUp();
-
- connectionFactory = createConnectionFactory();
-
- sendConnection = createConnection(false);
- sendConnection.start();
-
- receiveConnection = createConnection(false);
- receiveConnection.start();
-
- LOG.info("Created sendConnection: " + sendConnection);
- LOG.info("Created receiveConnection: " + receiveConnection);
-
- session = sendConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- receiveSession = receiveConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- LOG.info("Created sendSession: " + session);
- LOG.info("Created receiveSession: " + receiveSession);
-
- producer = session.createProducer(null);
-
- LOG.info("Created producer: " + producer);
-
- consumerDestination = session.createTopic(getConsumerSubject());
- producerDestination = session.createTopic(getProducerSubject());
-
- LOG.info("Created consumer destination: " + consumerDestination
- + " of type: " + consumerDestination.getClass());
- LOG.info("Created producer destination: " + producerDestination
- + " of type: " + producerDestination.getClass());
-
- Destination[] destinations = getDestinations();
- consumers = new MessageConsumer[destinations.length];
- messageLists = new List[destinations.length];
- for (int i = 0; i < destinations.length; i++) {
- Destination dest = destinations[i];
- messageLists[i] = createConcurrentList();
- consumers[i] = receiveSession.createConsumer(dest);
- consumers[i].setMessageListener(createMessageListener(i, messageLists[i]));
- }
-
- LOG.info("Started connections");
- }
-
- protected MessageListener createMessageListener(int i, final List<Message> messageList) {
- return new MessageListener() {
- public void onMessage(Message message) {
- consumeMessage(message, messageList);
- }
- };
- }
-
- /**
- * Returns the subject on which we publish
- */
- protected String getSubject() {
- // return getPrefix() + "FOO.BAR," + getPrefix() + "FOO.X.Y";
- return getPrefix() + "FOO.BAR";
- }
-
- /**
- * Returns the destinations to which we consume
- */
- protected Destination[] getDestinations() {
- // return new Destination[] {SessionImpl.asTopic(getPrefix() + "FOO.BAR"),
- // SessionImpl.asTopic(getPrefix() + "FOO.*"), SessionImpl.asTopic(getPrefix() + "FOO.X.Y")};
- return new Destination[] {SessionImpl.asTopic(getPrefix() + "FOO.BAR")};
- }
-
- protected String getPrefix() {
- return super.getSubject() + ".";
- }
-
- @SuppressWarnings("unchecked")
- protected void assertMessagesAreReceived() throws JMSException {
- waitForMessagesToBeDelivered();
- int size = messageLists.length;
- for (int i = 0; i < size; i++) {
- LOG.info("Message list: " + i + " contains: " + messageLists[i].size() + " message(s)");
- }
- size = messageLists.length;
- for (int i = 0; i < size; i++) {
- assertMessagesReceivedAreValid(messageLists[i]);
- }
- }
-
- protected HedwigConnectionFactoryImpl createConnectionFactory() {
- return new HedwigConnectionFactoryImpl();
- }
-
- protected void tearDown() throws Exception {
- session.close();
- receiveSession.close();
-
- sendConnection.close();
- receiveConnection.close();
- super.tearDown();
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/ConcurrentProducerDurableConsumerTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/ConcurrentProducerDurableConsumerTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/ConcurrentProducerDurableConsumerTest.java
deleted file mode 100644
index 087c5ef..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/ConcurrentProducerDurableConsumerTest.java
+++ /dev/null
@@ -1,413 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.usecases;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.TopicSubscriber;
-import junit.framework.Test;
-import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl;
-import org.apache.activemq.TestSupport;
-
-import org.apache.activemq.util.MessageIdList;
-import org.apache.activemq.util.Wait;
-//import org.apache.commons.dbcp.BasicDataSource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ConcurrentProducerDurableConsumerTest extends TestSupport {
- private static final Logger LOG = LoggerFactory.getLogger(ConcurrentProducerDurableConsumerTest.class);
- private int consumerCount = 5;
- protected List<Connection> connections = Collections.synchronizedList(new ArrayList<Connection>());
- protected Map<MessageConsumer, TimedMessageListener> consumers
- = new HashMap<MessageConsumer, TimedMessageListener>();
- protected MessageIdList allMessagesList = new MessageIdList();
- private int messageSize = 1024;
-
- public void testSendRateWithActivatingConsumers() throws Exception {
- final Destination destination = createDestination();
- final ConnectionFactory factory = createConnectionFactory();
- startInactiveConsumers(factory, destination);
-
- Connection connection = factory.createConnection();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = createMessageProducer(session, destination);
-
- // preload the durable consumers
- double[] inactiveConsumerStats = produceMessages(destination, 500, 10, session, producer, null);
- LOG.info("With inactive consumers: ave: " + inactiveConsumerStats[1]
- + ", max: " + inactiveConsumerStats[0] + ", multiplier: "
- + (inactiveConsumerStats[0]/inactiveConsumerStats[1]));
-
- // periodically start a durable sub that has a backlog
- final int consumersToActivate = 5;
- final Object addConsumerSignal = new Object();
- Executors.newCachedThreadPool(new ThreadFactory() {
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, "ActivateConsumer" + this);
- }
- }).execute(new Runnable() {
- @Override
- public void run() {
- try {
- MessageConsumer consumer = null;
- for (int i = 0; i < consumersToActivate; i++) {
- LOG.info("Waiting for add signal from producer...");
- synchronized (addConsumerSignal) {
- addConsumerSignal.wait(30 * 60 * 1000);
- }
- TimedMessageListener listener = new TimedMessageListener();
- consumer = createDurableSubscriber(factory.createConnection(),
- destination, "consumer" + (i + 1));
- LOG.info("Created consumer " + consumer);
- consumer.setMessageListener(listener);
- consumers.put(consumer, listener);
- }
- } catch (Exception e) {
- LOG.error("failed to start consumer", e);
- }
- }
- });
-
-
- double[] statsWithActive = produceMessages(destination, 500, 10, session, producer, addConsumerSignal);
-
- LOG.info(" with concurrent activate, ave: " + statsWithActive[1]
- + ", max: " + statsWithActive[0] + ", multiplier: " + (statsWithActive[0]/ statsWithActive[1]));
-
- while(consumers.size() < consumersToActivate) {
- TimeUnit.SECONDS.sleep(2);
- }
-
- long timeToFirstAccumulator = 0;
- for (TimedMessageListener listener : consumers.values()) {
- long time = listener.getFirstReceipt();
- timeToFirstAccumulator += time;
- LOG.info("Time to first " + time);
- }
- LOG.info("Ave time to first message =" + timeToFirstAccumulator/consumers.size());
-
- for (TimedMessageListener listener : consumers.values()) {
- LOG.info("Ave batch receipt time: " + listener.waitForReceivedLimit(10000)
- + " max receipt: " + listener.maxReceiptTime);
- }
-
- //assertTrue("max (" + statsWithActive[0] + ") within reasonable
- // multiplier of ave (" + statsWithActive[1] + ")",
- // statsWithActive[0] < 5 * statsWithActive[1]);
-
- // compare no active to active
- LOG.info("Ave send time with active: " + statsWithActive[1]
- + " as multiplier of ave with none active: " + inactiveConsumerStats[1]
- + ", multiplier=" + (statsWithActive[1]/inactiveConsumerStats[1]));
-
- assertTrue("Ave send time with active: " + statsWithActive[1]
- + " within reasonable multpler of ave with none active: " + inactiveConsumerStats[1]
- + ", multiplier " + (statsWithActive[1]/inactiveConsumerStats[1]),
- statsWithActive[1] < 15 * inactiveConsumerStats[1]);
- }
-
-
- public void x_testSendWithInactiveAndActiveConsumers() throws Exception {
- Destination destination = createDestination();
- ConnectionFactory factory = createConnectionFactory();
- startInactiveConsumers(factory, destination);
-
- Connection connection = factory.createConnection();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(destination);
- producer.setDeliveryMode(DeliveryMode.PERSISTENT);
-
- final int toSend = 100;
- final int numIterations = 5;
-
- double[] noConsumerStats = produceMessages(destination, toSend, numIterations, session, producer, null);
-
- startConsumers(factory, destination);
- LOG.info("Activated consumer");
-
- double[] withConsumerStats = produceMessages(destination, toSend, numIterations, session, producer, null);
-
- LOG.info("With consumer: " + withConsumerStats[1] + " , with noConsumer: " + noConsumerStats[1]
- + ", multiplier: " + (withConsumerStats[1]/noConsumerStats[1]));
- final int reasonableMultiplier = 15; // not so reasonable but improving
- assertTrue("max X times as slow with consumer: " + withConsumerStats[1] + ", with no Consumer: "
- + noConsumerStats[1] + ", multiplier: " + (withConsumerStats[1]/noConsumerStats[1]),
- withConsumerStats[1] < noConsumerStats[1] * reasonableMultiplier);
-
- final int toReceive = toSend * numIterations * consumerCount * 2;
- Wait.waitFor(new Wait.Condition() {
- public boolean isSatisified() throws Exception {
- LOG.info("count: " + allMessagesList.getMessageCount());
- return toReceive == allMessagesList.getMessageCount();
- }
- }, 60 * 1000);
-
- assertEquals("got all messages", toReceive, allMessagesList.getMessageCount());
- }
-
-
- private MessageProducer createMessageProducer(Session session, Destination destination) throws JMSException {
- MessageProducer producer = session.createProducer(destination);
- producer.setDeliveryMode(DeliveryMode.PERSISTENT);
- return producer;
- }
-
-
- private void startInactiveConsumers(ConnectionFactory factory, Destination destination) throws Exception {
- // create off line consumers
- startConsumers(factory, destination);
- for (Connection connection: connections) {
- connection.close();
- }
- connections.clear();
- consumers.clear();
- }
-
-
- protected void startConsumers(ConnectionFactory factory, Destination dest) throws Exception {
- MessageConsumer consumer;
- for (int i = 0; i < consumerCount; i++) {
- TimedMessageListener list = new TimedMessageListener();
- consumer = createDurableSubscriber(factory.createConnection(), dest, "consumer" + (i + 1));
- consumer.setMessageListener(list);
- consumers.put(consumer, list);
- }
- }
-
- protected TopicSubscriber createDurableSubscriber(Connection conn,
- Destination dest, String name) throws Exception {
- conn.setClientID(name);
- connections.add(conn);
- conn.start();
-
- Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- final TopicSubscriber consumer = sess.createDurableSubscriber((javax.jms.Topic)dest, name);
-
- return consumer;
- }
-
- /**
- * @return max and ave send time
- * @throws Exception
- */
- private double[] produceMessages(Destination destination,
- final int toSend,
- final int numIterations,
- Session session,
- MessageProducer producer,
- Object addConsumerSignal) throws Exception {
- long start;
- long count = 0;
- double batchMax = 0, max = 0, sum = 0;
- for (int i=0; i<numIterations; i++) {
- start = System.currentTimeMillis();
- for (int j=0; j < toSend; j++) {
- long singleSendstart = System.currentTimeMillis();
- TextMessage msg = createTextMessage(session, "" + j);
- // rotate
- int priority = ((int)count%10);
- producer.send(msg, DeliveryMode.PERSISTENT, priority, 0);
- max = Math.max(max, (System.currentTimeMillis() - singleSendstart));
- if (++count % 500 == 0) {
- if (addConsumerSignal != null) {
- synchronized (addConsumerSignal) {
- addConsumerSignal.notifyAll();
- LOG.info("Signalled add consumer");
- }
- }
- }
- ;
- if (count % 5000 == 0) {
- LOG.info("Sent " + count + ", singleSendMax:" + max);
- }
-
- }
- long duration = System.currentTimeMillis() - start;
- batchMax = Math.max(batchMax, duration);
- sum += duration;
- LOG.info("Iteration " + i + ", sent " + toSend + ", time: "
- + duration + ", batchMax:" + batchMax + ", singleSendMax:" + max);
- }
-
- LOG.info("Sent: " + toSend * numIterations + ", batchMax: " + batchMax + " singleSendMax: " + max);
- return new double[]{batchMax, sum/numIterations};
- }
-
- protected TextMessage createTextMessage(Session session, String initText) throws Exception {
- TextMessage msg = session.createTextMessage();
-
- // Pad message text
- if (initText.length() < messageSize) {
- char[] data = new char[messageSize - initText.length()];
- Arrays.fill(data, '*');
- String str = new String(data);
- msg.setText(initText + str);
-
- // Do not pad message text
- } else {
- msg.setText(initText);
- }
-
- return msg;
- }
-
- @Override
- protected void setUp() throws Exception {
- topic = true;
- super.setUp();
- }
-
- @Override
- protected void tearDown() throws Exception {
- for (Iterator<Connection> iter = connections.iterator(); iter.hasNext();) {
- Connection conn = iter.next();
- try {
- conn.close();
- } catch (Throwable e) {
- }
- }
- allMessagesList.flushMessages();
- consumers.clear();
- super.tearDown();
- }
-
-
- protected HedwigConnectionFactoryImpl createConnectionFactory() throws Exception {
- HedwigConnectionFactoryImpl factory = new HedwigConnectionFactoryImpl();
- return factory;
- }
-
- public static Test suite() {
- return suite(ConcurrentProducerDurableConsumerTest.class);
- }
-
- class TimedMessageListener implements MessageListener {
- final int batchSize = 1000;
- CountDownLatch firstReceiptLatch = new CountDownLatch(1);
- long mark = System.currentTimeMillis();
- long firstReceipt = 0l;
- long receiptAccumulator = 0;
- long batchReceiptAccumulator = 0;
- long maxReceiptTime = 0;
- AtomicLong count = new AtomicLong(0);
- Map<Integer, MessageIdList> messageLists
- = new ConcurrentHashMap<Integer, MessageIdList>(new HashMap<Integer, MessageIdList>());
-
- @Override
- public void onMessage(Message message) {
- final long current = System.currentTimeMillis();
- final long duration = current - mark;
- receiptAccumulator += duration;
- int priority = 0;
- try {
- priority = message.getJMSPriority();
- } catch (JMSException ignored) {}
- if (!messageLists.containsKey(priority)) {
- messageLists.put(priority, new MessageIdList());
- }
- messageLists.get(priority).onMessage(message);
- if (count.incrementAndGet() == 1) {
- firstReceipt = duration;
- firstReceiptLatch.countDown();
- LOG.info("First receipt in " + firstReceipt + "ms");
- } else if (count.get() % batchSize == 0) {
- LOG.info("Consumed " + count.get() + " in "
- + batchReceiptAccumulator + "ms" + ", priority:" + priority);
- batchReceiptAccumulator=0;
- }
- maxReceiptTime = Math.max(maxReceiptTime, duration);
- receiptAccumulator += duration;
- batchReceiptAccumulator += duration;
- mark = current;
- }
-
- long getMessageCount() {
- return count.get();
- }
-
- long getFirstReceipt() throws Exception {
- firstReceiptLatch.await(30, TimeUnit.SECONDS);
- return firstReceipt;
- }
-
- public long waitForReceivedLimit(long limit) throws Exception {
- final long expiry = System.currentTimeMillis() + 30*60*1000;
- while (count.get() < limit) {
- if (System.currentTimeMillis() > expiry) {
- throw new RuntimeException("Expired waiting for X messages, " + limit);
- }
- TimeUnit.SECONDS.sleep(2);
- String missing = findFirstMissingMessage();
- if (missing != null) {
- LOG.info("first missing = " + missing);
- throw new RuntimeException("We have a missing message. " + missing);
- }
-
- }
- return receiptAccumulator/(limit/batchSize);
- }
-
- private String findFirstMissingMessage() {
- /*
- MessageId current = new MessageId();
- for (MessageIdList priorityList : messageLists.values()) {
- MessageId previous = null;
- for (String id : priorityList.getMessageIds()) {
- current.setValue(id);
- if (previous == null) {
- previous = current.copy();
- } else {
- if (current.getProducerSequenceId() - 1 != previous.getProducerSequenceId() &&
- current.getProducerSequenceId() - 10 != previous.getProducerSequenceId()) {
- return "Missing next after: " + previous + ", got: " + current;
- } else {
- previous = current.copy();
- }
- }
- }
- }
- return null;
- */
- return null;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/DiscriminatingConsumerLoadTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/DiscriminatingConsumerLoadTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/DiscriminatingConsumerLoadTest.java
deleted file mode 100644
index b14ef71..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/DiscriminatingConsumerLoadTest.java
+++ /dev/null
@@ -1,322 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.usecases;
-
-
-import javax.jms.Topic;
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl;
-import org.apache.activemq.JmsConnectionStartStopTest;
-
-
-
-
-/**
- * Test case intended to demonstrate delivery interruption to queue consumers when
- * a JMS selector leaves some messages on the queue (due to use of a JMS Selector)
- *
- * testNonDiscriminatingConsumer() demonstrates proper functionality for consumers that don't use
- * a selector to qualify their input.
- *
- * testDiscriminatingConsumer() demonstrates the failure condition in which delivery to the consumer
- * eventually halts.
- *
- * The expected behavior is for the delivery to the client to be maintained regardless of the depth
- * of the queue, particularly when the messages in the queue do not meet the selector criteria of the
- * client.
- *
- * https://issues.apache.org/activemq/browse/AMQ-2217
- *
- */
-public class DiscriminatingConsumerLoadTest extends TestSupport {
-
- private static final org.apache.commons.logging.Log LOG = org.apache.commons.logging.LogFactory
- .getLog(DiscriminatingConsumerLoadTest.class);
-
- private Connection producerConnection;
- private Connection consumerConnection;
- private int counterSent = 0;
- private int counterReceived = 0;
-
- public static final String JMSTYPE_EATME = "DiscriminatingLoadClient.EatMe";
- public static final String JMSTYPE_IGNOREME = "DiscriminatingLoadClient.IgnoreMe";
-
- private int testSize = 5000; // setting this to a small number will pass all tests
-
-
- protected void setUp() throws Exception {
- super.setUp();
- this.producerConnection = this.createConnection();
- this.consumerConnection = this.createConnection();
- }
-
- /**
- * @see junit.framework.TestCase#tearDown()
- */
- protected void tearDown() throws Exception {
- if (producerConnection != null) {
- producerConnection.close();
- producerConnection = null;
- }
- if (consumerConnection != null) {
- consumerConnection.close();
- consumerConnection = null;
- }
- super.tearDown();
- }
-
- /**
- * Test to check if a single consumer with no JMS selector will receive all intended messages
- *
- * @throws java.lang.Exception
- */
- public void testNonDiscriminatingConsumer() throws Exception {
- consumerConnection = createConnection();
- consumerConnection.start();
- LOG.info("consumerConnection = " +consumerConnection);
-
- try {Thread.sleep(1000); } catch (Exception e) {}
-
- // here we pass in null for the JMS selector
- Consumer consumer = new Consumer(consumerConnection, null);
- Thread consumerThread = new Thread(consumer);
-
- consumerThread.start();
-
- producerConnection = createConnection();
- producerConnection.start();
- LOG.info("producerConnection = " +producerConnection);
-
- try {Thread.sleep(3000); } catch (Exception e) {}
-
- Producer producer = new Producer(producerConnection);
- Thread producerThread = new Thread(producer);
- producerThread.start();
-
- // now that everything is running, let's wait for the consumer thread to finish ...
- consumerThread.join();
- producer.stop = true;
-
- if (consumer.getCount() == testSize )
- LOG.info("test complete .... all messsages consumed!!");
- else
- LOG.info("test failed .... Sent " + (testSize / 1) +
- " messages intended to be consumed ( " + testSize
- + " total), but only consumed " + consumer.getCount());
-
-
- assertTrue("Sent " + testSize + " messages intended to be consumed, but only consumed " + consumer.getCount(),
- (consumer.getCount() == testSize ));
- assertFalse("Delivery of messages to consumer was halted during this test", consumer.deliveryHalted());
-
-
- }
-
- /**
- * Test to check if a single consumer with a JMS selector will receive all intended messages
- *
- * @throws java.lang.Exception
- */
- public void testDiscriminatingConsumer() throws Exception {
-
- consumerConnection = createConnection();
- consumerConnection.start();
- LOG.info("consumerConnection = " +consumerConnection);
-
- try {Thread.sleep(1000); } catch (Exception e) {}
-
- // here we pass the JMS selector we intend to consume
- Consumer consumer = new Consumer(consumerConnection, JMSTYPE_EATME);
- Thread consumerThread = new Thread(consumer);
-
- consumerThread.start();
-
- producerConnection = createConnection();
- producerConnection.start();
- LOG.info("producerConnection = " +producerConnection);
-
- try {Thread.sleep(3000); } catch (Exception e) {}
-
- Producer producer = new Producer(producerConnection);
- Thread producerThread = new Thread(producer);
- producerThread.start();
-
- // now that everything is running, let's wait for the consumer thread to finish ...
- consumerThread.join();
- producer.stop = true;
-
- if (consumer.getCount() == (testSize / 2))
- {
- LOG.info("test complete .... all messsages consumed!!");
- }
- else
- {
- LOG.info("test failed .... Sent " + testSize
- + " original messages, only half of which (" + (testSize / 2) +
- ") were intended to be consumed: consumer paused at: " + consumer.getCount());
- //System.out.println("test failed .... Sent " + testSize
- //+ " original messages, only half of which (" + (testSize / 2) +
- // ") were intended to be consumed: consumer paused at: " + consumer.getCount());
- }
- assertTrue("Sent " + testSize + " original messages, only half of which (" + (testSize / 2) +
- ") were intended to be consumed: consumer paused at: " + consumer.getCount(),
- (consumer.getCount() == (testSize / 2)));
- assertTrue("Delivery of messages to consumer was halted during this test as it only wants half",
- consumer.deliveryHalted());
- }
-
- /**
- * Helper class that will publish 2 * testSize messages. The messages will be distributed evenly
- * between the following two JMS types:
- *
- * @see JMSTYPE_INTENDED_FOR_CONSUMPTION
- * @see JMSTYPE_NOT_INTENDED_FOR_CONSUMPTION
- *
- */
- private class Producer extends Thread
- {
- private int counterSent = 0;
- private Connection connection = null;
- public boolean stop = false;
-
- public Producer(Connection connection)
- {
- this.connection = connection;
- }
-
- public void run() {
- try {
- final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- final Topic queue = session.createTopic("test");
-
- // wait for 10 seconds to allow consumer.receive to be run
- // first
- Thread.sleep(10000);
- MessageProducer producer = session.createProducer(queue);
-
- while (!stop && (counterSent < testSize))
- {
- // first send a message intended to be consumed ....
- TextMessage message = session.createTextMessage("*** Ill ....... Ini ***"); // alma mater ...
- message.setJMSType(JMSTYPE_EATME);
- //LOG.info("sending .... JMSType = " + message.getJMSType());
- producer.send(message,DeliveryMode.NON_PERSISTENT,0,1800000);
-
- counterSent++;
-
- // now send a message intended to be consumed by some other consumer in the the future
- // ... we expect these messages to accrue in the queue
- message = session.createTextMessage("*** Ill ....... Ini ***"); // alma mater ...
- message.setJMSType(JMSTYPE_IGNOREME);
- //LOG.info("sending .... JMSType = " + message.getJMSType());
- producer.send(message,DeliveryMode.NON_PERSISTENT,0,1800000);
-
- counterSent++;
- }
-
- session.close();
-
- } catch (Exception e) {
- e.printStackTrace();
- }
- LOG.info("producer thread complete ... " + counterSent + " messages sent to the queue");
- }
-
- public int getCount()
- {
- return this.counterSent;
- }
- }
-
- /**
- * Helper class that will consume messages from the queue based on the supplied JMS selector.
- * Thread will stop after the first receive(..) timeout, or once all expected messages have
- * been received (see testSize). If the thread stops due to a timeout, it is experiencing the
- * delivery pause that is symptomatic of a bug in the broker.
- */
- private class Consumer extends Thread
- {
- protected int counterReceived = 0;
- private Connection connection = null;
- private String jmsSelector = null;
- private boolean deliveryHalted = false;
-
- public Consumer(Connection connection, String jmsSelector)
- {
- this.connection = connection;
- this.jmsSelector = jmsSelector;
- }
-
- public void run() {
- boolean testComplete = false;
- try {
- Session session = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- final Topic queue = session.createTopic("test");
- MessageConsumer consumer = null;
- if (null != this.jmsSelector)
- {
- consumer = session.createConsumer(queue, "JMSType='" + this.jmsSelector + "'");
- }
- else
- {
- consumer = session.createConsumer(queue);
- }
-
- while (!deliveryHalted && (counterReceived < testSize))
- {
- TextMessage result = (TextMessage) consumer.receive(30000);
- if (result != null) {
- counterReceived++;
- //System.out.println("consuming .... JMSType = " + result.getJMSType()
- // + " received = " + counterReceived);
- LOG.info("consuming .... JMSType = " + result.getJMSType()
- + " received = " + counterReceived);
- } else
- {
- LOG.info("consuming .... timeout while waiting for a message ."
- + ".. broker must have stopped delivery ... received = " + counterReceived);
- deliveryHalted = true;
- }
- }
- session.close();
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- }
-
- public int getCount()
- {
- return this.counterReceived;
- }
-
- public boolean deliveryHalted()
- {
- return this.deliveryHalted;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/DispatchMultipleConsumersTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/DispatchMultipleConsumersTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/DispatchMultipleConsumersTest.java
deleted file mode 100644
index fa79d69..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/DispatchMultipleConsumersTest.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.usecases;
-
-import javax.jms.Topic;
-import java.util.concurrent.CountDownLatch;
-
-import org.apache.hedwig.JmsTestBase;
-import org.apache.hedwig.jms.SessionImpl;
-import java.util.concurrent.atomic.AtomicInteger;
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import junit.framework.TestCase;
-import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl;
-
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class DispatchMultipleConsumersTest extends JmsTestBase {
- private final static Logger logger = LoggerFactory.getLogger(DispatchMultipleConsumersTest.class);
- Destination dest;
- String destinationName = "TEST.Q";
- String msgStr = "Test text message";
- int messagesPerThread = 20;
- int producerThreads = 50;
- int consumerCount = 2;
- AtomicInteger sentCount;
- AtomicInteger consumedCount;
- CountDownLatch producerLatch;
- CountDownLatch consumerLatch;
- String userName = "";
- String password = "";
-
- @Override
- protected void setUp() throws Exception {
- super.setUp();
- dest = SessionImpl.asTopic(destinationName);
- }
-
- private void resetCounters() {
- sentCount = new AtomicInteger(0);
- consumedCount = new AtomicInteger(0);
- producerLatch = new CountDownLatch(producerThreads);
- consumerLatch = new CountDownLatch(consumerCount);
- }
-
- public void testDispatch1() {
- for (int i = 1; i <= 5; i++) {
- resetCounters();
- dispatch();
- assertEquals("Incorrect messages in Iteration " + i, sentCount.get() * consumerCount, consumedCount.get());
- }
- }
-
- private void dispatch() {
- startConsumers();
- startProducers();
- try {
- producerLatch.await();
- consumerLatch.await();
- } catch (InterruptedException e) {
- fail("test interrupted!");
- }
- }
-
- private void startConsumers() {
- HedwigConnectionFactoryImpl connFactory = new HedwigConnectionFactoryImpl();
- Connection conn;
- try {
- conn = connFactory.createConnection(userName, password);
- conn.start();
- for (int i = 0; i < consumerCount; i++) {
- ConsumerThread th = new ConsumerThread(conn, "ConsumerThread"+i);
- th.start();
- }
- } catch (JMSException e) {
- logger.error("Failed to start consumers", e);
- }
- }
-
- private void startProducers() {
- HedwigConnectionFactoryImpl connFactory = new HedwigConnectionFactoryImpl();
- for (int i = 0; i < producerThreads; i++) {
- Thread th = new ProducerThread(connFactory, messagesPerThread, "ProducerThread"+i);
- th.start();
- }
- }
-
- private class ConsumerThread extends Thread {
- private Session session;
- private MessageConsumer consumer;
-
- public ConsumerThread(Connection conn, String name) {
- super();
- this.setName(name);
- logger.trace("Created new consumer thread:" + name);
- try {
- session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- consumer = session.createConsumer(dest);
- } catch (JMSException e) {
- logger.error("Failed to start consumer thread:" + name, e);
- }
- }
-
- @Override
- public void run() {
- int msgCount = 0;
- int nullCount = 0;
- while (true) {
- try {
- Message msg = consumer.receive(200);
- if (msg == null) {
- if (producerLatch.getCount() > 0) {
- continue;
- }
- nullCount++;
- if (nullCount > 10) {
- //assume that we are not getting any more messages
- break;
- } else {
- continue;
- }
- } else {
- nullCount = 0;
- }
- // Thread.sleep(100);
- if (logger.isTraceEnabled()) {
- logger.trace("Message received:" + msg.getJMSMessageID());
- }
- msgCount++;
- } catch (JMSException e) {
- logger.error("Failed to consume:", e);
- /*
- } catch (InterruptedException e) {
- logger.error("Interrupted!", e);
- */
- }
- }
- try {
- consumer.close();
- } catch (JMSException e) {
- logger.error("Failed to close consumer " + getName(), e);
- }
- consumedCount.addAndGet(msgCount);
- consumerLatch.countDown();
- logger.trace("Consumed " + msgCount + " messages using thread " + getName());
- }
- }
-
- private class ProducerThread extends Thread {
- private int count;
- private Connection conn;
- private Session session;
- private MessageProducer producer;
-
- public ProducerThread(HedwigConnectionFactoryImpl connFactory, int count, String name) {
- super();
- this.count = count;
- this.setName(name);
- logger.trace("Created new producer thread:" + name);
- try {
- conn = connFactory.createConnection();
- conn.start();
- session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- producer = session.createProducer(dest);
- } catch (JMSException e) {
- logger.error("Failed to start producer thread:" + name, e);
- }
- }
-
- @Override
- public void run() {
- int i = 0;
- try {
- for (; i < count; i++) {
- producer.send(session.createTextMessage(msgStr));
- // Thread.sleep(500);
- }
- conn.close();
- } catch (JMSException e) {
- logger.error(e.getMessage(), e);
- /*
- } catch (InterruptedException e) {
- logger.error("Interrupted!", e);
- */
- }
- sentCount.addAndGet(i);
- producerLatch.countDown();
- if (logger.isTraceEnabled()) {
- logger.trace("Sent " + i + " messages from thread " + getName());
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/DurableConsumerCloseAndReconnectTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/DurableConsumerCloseAndReconnectTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/DurableConsumerCloseAndReconnectTest.java
deleted file mode 100644
index c4fa74d..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/DurableConsumerCloseAndReconnectTest.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.usecases;
-
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-
-import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl;
-import org.apache.activemq.test.TestSupport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class DurableConsumerCloseAndReconnectTest extends TestSupport {
- protected static final long RECEIVE_TIMEOUT = 5000L;
- private static final Logger LOG = LoggerFactory.getLogger(DurableConsumerCloseAndReconnectTest.class);
-
- protected Connection connection;
- private Session session;
- private MessageConsumer consumer;
- private MessageProducer producer;
- private Destination destination;
- private int messageCount;
-
- @Override
- protected void setUp() throws Exception {
- super.setUp();
- deleteAllMessages();
- }
-
- @Override
- protected void tearDown() throws Exception {
- super.tearDown();
- deleteAllMessages();
- }
-
- private void deleteAllMessages() throws Exception {
- HedwigConnectionFactoryImpl fac = new HedwigConnectionFactoryImpl();
- Connection dummyConnection = fac.createConnection();
- dummyConnection.start();
- dummyConnection.close();
- }
-
- protected HedwigConnectionFactoryImpl createConnectionFactory() throws Exception {
- return new HedwigConnectionFactoryImpl();
- }
-
- public void testCreateDurableConsumerCloseThenReconnect() throws Exception {
- // force the server to stay up across both connection tests
- Connection dummyConnection = createConnection();
- dummyConnection.start();
-
- consumeMessagesDeliveredWhileConsumerClosed();
-
- dummyConnection.close();
-
- // now lets try again without one connection open
- consumeMessagesDeliveredWhileConsumerClosed();
- }
-
- protected void consumeMessagesDeliveredWhileConsumerClosed() throws Exception {
- makeConsumer();
- closeConsumer();
-
- publish();
-
- // wait a few moments for the close to really occur
- Thread.sleep(1000);
-
- makeConsumer();
-
- Message message = consumer.receive(RECEIVE_TIMEOUT);
- assertTrue("Should have received a message!", message != null);
-
- closeConsumer();
-
- LOG.info("Now lets create the consumer again and because we didn't ack, we should get it again");
- makeConsumer();
-
- message = consumer.receive(RECEIVE_TIMEOUT);
- assertTrue("Should have received a message!", message != null);
- message.acknowledge();
-
- closeConsumer();
-
- LOG.info("Now lets create the consumer again and because we did ack, we should not get it again");
- makeConsumer();
-
- message = consumer.receive(2000);
- assertTrue("Should have no more messages left!", message == null);
-
- closeConsumer();
-
- LOG.info("Lets publish one more message now");
- publish();
-
- makeConsumer();
- message = consumer.receive(RECEIVE_TIMEOUT);
- assertTrue("Should have received a message!", message != null);
- message.acknowledge();
-
- closeConsumer();
- }
-
- protected void publish() throws Exception {
- connection = createConnection();
- connection.start();
-
- session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- destination = createDestination();
-
- producer = session.createProducer(destination);
- producer.setDeliveryMode(DeliveryMode.PERSISTENT);
- TextMessage msg = session.createTextMessage("This is a test: " + messageCount++);
- producer.send(msg);
-
- producer.close();
- producer = null;
- closeSession();
- }
-
- protected Destination createDestination() throws JMSException {
- if (isTopic()) {
- return session.createTopic(getSubject());
- } else {
- return session.createTopic(getSubject());
- }
- }
-
- protected boolean isTopic() {
- return true;
- }
-
- protected void closeConsumer() throws JMSException {
- consumer.close();
- consumer = null;
- closeSession();
- }
-
- protected void closeSession() throws JMSException {
- session.close();
- session = null;
- connection.close();
- connection = null;
- }
-
- protected void makeConsumer() throws Exception {
- String durableName = getName();
- String clientID = getSubject();
- LOG.info("Creating a durable subscribe for clientID: " + clientID + " and durable name: " + durableName);
- createSession(clientID);
- consumer = createConsumer(durableName);
- }
-
- private MessageConsumer createConsumer(String durableName) throws JMSException {
- if (destination instanceof Topic) {
- return session.createDurableSubscriber((Topic)destination, durableName);
- } else {
- return session.createConsumer(destination);
- }
- }
-
- protected void createSession(String clientID) throws Exception {
- connection = createConnection();
- connection.setClientID(clientID);
- connection.start();
-
- session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- destination = createDestination();
- }
-}