You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2016/03/16 04:44:47 UTC
[37/49] bookkeeper git commit: BOOKKEEPER-769: Remove the Hedwig Code
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java b/hedwig-client-jms/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java
deleted file mode 100644
index 23a9121..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java
+++ /dev/null
@@ -1,692 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.ObjectMessage;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-
-
-import org.apache.activemq.test.JmsResourceProvider;
-import org.apache.activemq.test.TestSupport;
-import org.apache.hedwig.jms.spi.HedwigConnectionImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public abstract class JmsTransactionTestSupport extends TestSupport implements MessageListener {
-
- private static final Logger LOG = LoggerFactory.getLogger(JmsTransactionTestSupport.class);
- private static final int MESSAGE_COUNT = 5;
- private static final String MESSAGE_TEXT = "message";
-
- protected ConnectionFactory connectionFactory;
- protected Connection connection;
- protected Session session;
- protected MessageConsumer consumer;
- protected MessageProducer producer;
- protected JmsResourceProvider resourceProvider;
- protected Destination destination;
- protected int batchCount = 10;
- // protected int batchSize = 20;
-
- // for message listener test
- private List<Message> unackMessages = new ArrayList<Message>(MESSAGE_COUNT);
- private List<Message> ackMessages = new ArrayList<Message>(MESSAGE_COUNT);
- private boolean resendPhase;
-
- public JmsTransactionTestSupport() {
- super();
- }
-
- public JmsTransactionTestSupport(String name) {
- super(name);
- }
-
- /*
- * (non-Javadoc)
- *
- * @see junit.framework.TestCase#setUp()
- */
- protected void setUp() throws Exception {
- super.setUp();
- resourceProvider = getJmsResourceProvider();
- topic = resourceProvider.isTopic();
- // We will be using transacted sessions.
- setSessionTransacted();
- connectionFactory = newConnectionFactory();
- reconnect();
- }
-
- protected void setSessionTransacted() {
- resourceProvider.setTransacted(true);
- }
-
- protected ConnectionFactory newConnectionFactory() throws Exception {
- return resourceProvider.createConnectionFactory();
- }
-
- protected void beginTx() throws Exception {
- //no-op for local tx
- }
-
- protected void commitTx() throws Exception {
- session.commit();
- }
-
- protected void rollbackTx() throws Exception {
- session.rollback();
- }
-
- /*
- * (non-Javadoc)
- *
- * @see junit.framework.TestCase#tearDown()
- */
- protected void tearDown() throws Exception {
- LOG.info("Closing down connection");
-
- session.close();
- session = null;
- connection.close();
- connection = null;
- LOG.info("Connection closed.");
- super.tearDown();
- }
-
- protected abstract JmsResourceProvider getJmsResourceProvider();
-
- /**
- * Sends a batch of messages and validates that the messages are received.
- *
- * @throws Exception
- */
- public void testSendReceiveTransactedBatches() throws Exception {
-
- final int batchSize = ((HedwigConnectionImpl) connection).getHedwigClientConfig()
- .getMaximumOutstandingMessages() - 1;
-
- TextMessage message = session.createTextMessage("Batch Message");
- for (int j = 0; j < batchCount; j++) {
- LOG.info("Producing bacth " + j + " of " + batchSize + " messages");
-
- beginTx();
- for (int i = 0; i < batchSize; i++) {
- producer.send(message);
- }
- messageSent();
- commitTx();
- LOG.info("Consuming bacth " + j + " of " + batchSize + " messages");
-
- beginTx();
- for (int i = 0; i < batchSize; i++) {
- message = (TextMessage)consumer.receive(1000 * 5);
- assertNotNull("Received only " + i + " messages in batch " + j, message);
- assertEquals("Batch Message", message.getText());
- }
-
- commitTx();
- }
- }
-
- protected void messageSent() throws Exception {
- }
-
- /**
- * Sends a batch of messages and validates that the rollbacked message was
- * not consumed.
- *
- * @throws Exception
- */
- public void testSendRollback() throws Exception {
- Message[] outbound = new Message[] {session.createTextMessage("First Message"),
- session.createTextMessage("Second Message")};
-
- // sends a message
- beginTx();
- producer.send(outbound[0]);
- commitTx();
-
- // sends a message that gets rollbacked
- beginTx();
- producer.send(session.createTextMessage("I'm going to get rolled back."));
- rollbackTx();
-
- // sends a message
- beginTx();
- producer.send(outbound[1]);
- commitTx();
-
- // receives the first message
- beginTx();
- ArrayList<Message> messages = new ArrayList<Message>();
- LOG.info("About to consume message 1");
- Message message = consumer.receive(1000);
- messages.add(message);
- LOG.info("Received: " + message);
-
- // receives the second message
- LOG.info("About to consume message 2");
- message = consumer.receive(4000);
- messages.add(message);
- LOG.info("Received: " + message);
-
- // validates that the rollbacked was not consumed
- commitTx();
- Message inbound[] = new Message[messages.size()];
- messages.toArray(inbound);
- assertTextMessagesEqual("Rollback did not work.", outbound, inbound);
- }
-
- /**
- * spec section 3.6 acking a message with automation acks has no effect.
- * @throws Exception
- */
- public void testAckMessageInTx() throws Exception {
- Message[] outbound = new Message[] {session.createTextMessage("First Message")};
-
- // sends a message
- beginTx();
- producer.send(outbound[0]);
- outbound[0].acknowledge();
- commitTx();
- outbound[0].acknowledge();
-
- // receives the first message
- beginTx();
- ArrayList<Message> messages = new ArrayList<Message>();
- LOG.info("About to consume message 1");
- Message message = consumer.receive(1000);
- messages.add(message);
- LOG.info("Received: " + message);
-
- // validates that the rollbacked was not consumed
- commitTx();
- Message inbound[] = new Message[messages.size()];
- messages.toArray(inbound);
- assertTextMessagesEqual("Message not delivered.", outbound, inbound);
- }
-
- /**
- * Sends a batch of messages and validates that the message sent before
- * session close is not consumed.
- *
- * This test only works with local transactions, not xa.
- * @throws Exception
- */
- public void testSendSessionClose() throws Exception {
- Message[] outbound = new Message[] {session.createTextMessage("First Message"),
- session.createTextMessage("Second Message")};
-
- // sends a message
- beginTx();
- producer.send(outbound[0]);
- commitTx();
-
- // sends a message that gets rollbacked
- beginTx();
- producer.send(session.createTextMessage("I'm going to get rolled back."));
- consumer.close();
-
- reconnectSession();
-
- // sends a message
- producer.send(outbound[1]);
- commitTx();
-
- // receives the first message
- ArrayList<Message> messages = new ArrayList<Message>();
- LOG.info("About to consume message 1");
- beginTx();
- Message message = consumer.receive(1000);
- messages.add(message);
- LOG.info("Received: " + message);
-
- // receives the second message
- LOG.info("About to consume message 2");
- message = consumer.receive(4000);
- messages.add(message);
- LOG.info("Received: " + message);
-
- // validates that the rollbacked was not consumed
- commitTx();
- Message inbound[] = new Message[messages.size()];
- messages.toArray(inbound);
- assertTextMessagesEqual("Rollback did not work.", outbound, inbound);
- }
-
- /**
- * Sends a batch of messages and validates that the message sent before
- * session close is not consumed.
- *
- * @throws Exception
- */
- public void testSendSessionAndConnectionClose() throws Exception {
- Message[] outbound = new Message[] {session.createTextMessage("First Message"),
- session.createTextMessage("Second Message")};
-
- // sends a message
- beginTx();
- producer.send(outbound[0]);
- commitTx();
-
- // sends a message that gets rollbacked
- beginTx();
- producer.send(session.createTextMessage("I'm going to get rolled back."));
- consumer.close();
- session.close();
-
- reconnect();
-
- // sends a message
- beginTx();
- producer.send(outbound[1]);
- commitTx();
-
- // receives the first message
- ArrayList<Message> messages = new ArrayList<Message>();
- LOG.info("About to consume message 1");
- beginTx();
- Message message = consumer.receive(1000);
- messages.add(message);
- LOG.info("Received: " + message);
-
- // receives the second message
- LOG.info("About to consume message 2");
- message = consumer.receive(4000);
- messages.add(message);
- LOG.info("Received: " + message);
-
- // validates that the rollbacked was not consumed
- commitTx();
- Message inbound[] = new Message[messages.size()];
- messages.toArray(inbound);
- assertTextMessagesEqual("Rollback did not work.", outbound, inbound);
- }
-
- /**
- * Sends a batch of messages and validates that the rollbacked message was
- * redelivered.
- *
- * @throws Exception
- */
- public void testReceiveRollback() throws Exception {
- Message[] outbound = new Message[] {session.createTextMessage("First Message"),
- session.createTextMessage("Second Message")};
-
- // lets consume any outstanding messages from prev test runs
- beginTx();
- while (consumer.receive(1000) != null) {
- }
- commitTx();
-
- // sent both messages
- beginTx();
- producer.send(outbound[0]);
- producer.send(outbound[1]);
- commitTx();
-
- LOG.info("Sent 0: " + outbound[0]);
- LOG.info("Sent 1: " + outbound[1]);
-
- ArrayList<Message> messages = new ArrayList<Message>();
- beginTx();
- Message message = consumer.receive(1000);
- messages.add(message);
- assertEquals(outbound[0], message);
- commitTx();
-
- // rollback so we can get that last message again.
- beginTx();
- message = consumer.receive(1000);
- assertNotNull(message);
- assertEquals(outbound[1], message);
- rollbackTx();
-
- // Consume again.. the prev message should
- // get redelivered.
- beginTx();
- message = consumer.receive(5000);
- assertNotNull("Should have re-received the message again!", message);
- messages.add(message);
- commitTx();
-
- Message inbound[] = new Message[messages.size()];
- messages.toArray(inbound);
- assertTextMessagesEqual("Rollback did not work", outbound, inbound);
- }
-
- /**
- * Sends a batch of messages and validates that the rollbacked message was
- * redelivered.
- *
- * @throws Exception
- */
- public void testReceiveTwoThenRollback() throws Exception {
- TextMessage[] outbound = new TextMessage[] {session.createTextMessage("First Message"),
- session.createTextMessage("Second Message")};
-
- // lets consume any outstanding messages from prev test runs
- beginTx();
- while (consumer.receive(1000) != null) {
- }
- commitTx();
-
- //
- beginTx();
- producer.send(outbound[0]);
- producer.send(outbound[1]);
- commitTx();
-
- LOG.info("Sent 0: " + outbound[0]);
- LOG.info("Sent 1: " + outbound[1]);
-
- ArrayList<Message> messages = new ArrayList<Message>();
- beginTx();
- Message message = consumer.receive(1000);
- assert message instanceof TextMessage;
- assertEquals(outbound[0].getText(), ((TextMessage) message).getText());
-
- message = consumer.receive(1000);
- assertNotNull(message);
- assert message instanceof TextMessage;
- assertEquals(outbound[1].getText(), ((TextMessage) message).getText());
- rollbackTx();
-
- // Consume again.. the prev message should
- // get redelivered.
- beginTx();
- message = consumer.receive(5000);
- assertNotNull("Should have re-received the first message again!", message);
- messages.add(message);
- assert message instanceof TextMessage;
- assertEquals(outbound[0].getText(), ((TextMessage) message).getText());
- message = consumer.receive(5000);
- assertNotNull("Should have re-received the second message again!", message);
- messages.add(message);
- assert message instanceof TextMessage;
- assertEquals(outbound[1].getText(), ((TextMessage) message).getText());
-
- assertNull(consumer.receiveNoWait());
- commitTx();
-
- Message inbound[] = new Message[messages.size()];
- messages.toArray(inbound);
- assertTextMessagesEqual("Rollback did not work", outbound, inbound);
- }
-
- /**
- * Sends a batch of messages and validates that the rollbacked message was
- * not consumed.
- *
- * @throws Exception
- */
- public void testSendReceiveWithPrefetchOne() throws Exception {
- Message[] outbound = new Message[] {session.createTextMessage("First Message"),
- session.createTextMessage("Second Message"),
- session.createTextMessage("Third Message"),
- session.createTextMessage("Fourth Message")};
-
- beginTx();
- for (int i = 0; i < outbound.length; i++) {
- // sends a message
- producer.send(outbound[i]);
- }
- commitTx();
-
- // receives the first message
- beginTx();
- for (int i = 0; i < outbound.length; i++) {
- LOG.info("About to consume message 1");
- Message message = consumer.receive(1000);
- assertNotNull(message);
- LOG.info("Received: " + message);
- }
-
- // validates that the rollbacked was not consumed
- commitTx();
- }
-
- /**
- * Perform the test that validates if the rollbacked message was redelivered
- * multiple times.
- *
- * @throws Exception
- */
- public void testReceiveTwoThenRollbackManyTimes() throws Exception {
- for (int i = 0; i < 5; i++) {
- testReceiveTwoThenRollback();
- }
- }
-
- /**
- * Sends a batch of messages and validates that the rollbacked message was
- * not consumed. This test differs by setting the message prefetch to one.
- *
- * @throws Exception
- */
- public void testSendRollbackWithPrefetchOfOne() throws Exception {
- testSendRollback();
- }
-
- /**
- * Sends a batch of messages and and validates that the rollbacked message
- * was redelivered. This test differs by setting the message prefetch to
- * one.
- *
- * @throws Exception
- */
- public void testReceiveRollbackWithPrefetchOfOne() throws Exception {
- testReceiveRollback();
- }
-
- /**
- * Tests if the messages can still be received if the consumer is closed
- * (session is not closed).
- *
- * @throws Exception see http://jira.codehaus.org/browse/AMQ-143
- */
- public void testCloseConsumerBeforeCommit() throws Exception {
- TextMessage[] outbound = new TextMessage[] {session.createTextMessage("First Message"),
- session.createTextMessage("Second Message")};
-
- // lets consume any outstanding messages from prev test runs
- beginTx();
- while (consumer.receiveNoWait() != null) {
- }
-
- commitTx();
-
- // sends the messages
- beginTx();
- producer.send(outbound[0]);
- producer.send(outbound[1]);
- commitTx();
- LOG.info("Sent 0: " + outbound[0]);
- LOG.info("Sent 1: " + outbound[1]);
-
- beginTx();
- TextMessage message = (TextMessage)consumer.receive(1000);
- assertEquals(outbound[0].getText(), message.getText());
- // Close the consumer before the commit. This should not cause the
- // received message
- // to rollback.
- consumer.close();
- commitTx();
- reconnectSession();
-
- // Create a new consumer
- consumer = resourceProvider.createConsumer(session, destination);
- LOG.info("Created consumer: " + consumer);
-
- beginTx();
- message = (TextMessage)consumer.receive(1000);
- assert null != message;
- assertEquals(outbound[1].getText(), message.getText());
- commitTx();
- }
-
- public void testChangeMutableObjectInObjectMessageThenRollback() throws Exception {
- ArrayList<String> list = new ArrayList<String>();
- list.add("First");
- Message outbound = session.createObjectMessage(list);
- outbound.setStringProperty("foo", "abc");
-
- beginTx();
- producer.send(outbound);
- commitTx();
-
- LOG.info("About to consume message 1");
- beginTx();
- Message message = consumer.receive(5000);
-
- List<String> body = assertReceivedObjectMessageWithListBody(message);
-
- // now lets try mutate it
- try {
- message.setStringProperty("foo", "def");
- fail("Cannot change properties of the object!");
- } catch (JMSException e) {
- LOG.info("Caught expected exception: " + e, e);
- }
- body.clear();
- body.add("This should never be seen!");
- rollbackTx();
-
- beginTx();
- message = consumer.receive(5000);
- List<String> secondBody = assertReceivedObjectMessageWithListBody(message);
- assertNotSame("Second call should return a different body", secondBody, body);
- commitTx();
- }
-
- @SuppressWarnings("unchecked")
- protected List<String> assertReceivedObjectMessageWithListBody(Message message) throws JMSException {
- assertNotNull("Should have received a message!", message);
- assertEquals("foo header", "abc", message.getStringProperty("foo"));
-
- assertTrue("Should be an object message but was: " + message, message instanceof ObjectMessage);
- ObjectMessage objectMessage = (ObjectMessage)message;
- List<String> body = (List<String>)objectMessage.getObject();
- LOG.info("Received body: " + body);
-
- assertEquals("Size of list should be 1", 1, body.size());
- assertEquals("element 0 of list", "First", body.get(0));
- return body;
- }
-
- /**
- * Recreates the connection.
- *
- * @throws JMSException
- */
- protected void reconnect() throws Exception {
-
- if (connection != null) {
- // Close the prev connection.
- connection.close();
- }
- session = null;
- connection = resourceProvider.createConnection(connectionFactory);
- reconnectSession();
- connection.start();
- }
-
- /**
- * Recreates the connection.
- *
- * @throws JMSException
- */
- protected void reconnectSession() throws JMSException {
- if (session != null) {
- session.close();
- }
-
- session = resourceProvider.createSession(connection);
- destination = resourceProvider.createDestination(session, getSubject());
- producer = resourceProvider.createProducer(session, destination);
- consumer = resourceProvider.createConsumer(session, destination);
- }
-
- //This test won't work with xa tx so no beginTx() has been added.
- public void testMessageListener() throws Exception {
- consumer.setMessageListener(this);
- // send messages
- for (int i = 0; i < MESSAGE_COUNT; i++) {
- producer.send(session.createTextMessage(MESSAGE_TEXT + i));
- }
- commitTx();
- // wait receive
- waitReceiveUnack();
- assertEquals(unackMessages.size(), MESSAGE_COUNT);
- // resend phase
- waitReceiveAck();
- assertEquals(ackMessages.size(), MESSAGE_COUNT);
- // should no longer re-receive
- consumer.setMessageListener(null);
- assertNull(consumer.receive(500));
- reconnect();
- }
-
- public void onMessage(Message message) {
- if (!resendPhase) {
- unackMessages.add(message);
- if (unackMessages.size() == MESSAGE_COUNT) {
- try {
- rollbackTx();
- resendPhase = true;
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- } else {
- ackMessages.add(message);
- if (ackMessages.size() == MESSAGE_COUNT) {
- try {
- commitTx();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- }
-
- private void waitReceiveUnack() throws Exception {
- for (int i = 0; i < 100 && !resendPhase; i++) {
- Thread.sleep(100);
- }
- assertTrue(resendPhase);
- }
-
- private void waitReceiveAck() throws Exception {
- for (int i = 0; i < 100 && ackMessages.size() < MESSAGE_COUNT; i++) {
- Thread.sleep(100);
- }
- assertFalse(ackMessages.size() < MESSAGE_COUNT);
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/LoadTestBurnIn.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/LoadTestBurnIn.java b/hedwig-client-jms/src/test/java/org/apache/activemq/LoadTestBurnIn.java
deleted file mode 100644
index 5593daf..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/LoadTestBurnIn.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import java.io.IOException;
-
-import org.apache.hedwig.jms.MessagingSessionFacade;
-import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.BytesMessage;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.DeliveryMode;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.Topic;
-
-import junit.framework.Test;
-
-
-
-import javax.jms.Destination;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Small burn test moves sends a moderate amount of messages through the broker,
- * to checking to make sure that the broker does not lock up after a while of
- * sustained messaging.
- */
-public class LoadTestBurnIn extends JmsTestSupport {
- private static final transient Logger LOG = LoggerFactory.getLogger(LoadTestBurnIn.class);
-
- public Destination destination;
- public int deliveryMode;
- public MessagingSessionFacade.DestinationType destinationType;
- public boolean durableConsumer;
- public int messageCount = 50000;
- public int messageSize = 1024;
-
- public static Test suite() {
- return suite(LoadTestBurnIn.class);
- }
-
- protected void setUp() throws Exception {
- LOG.info("Start: " + getName());
- super.setUp();
- }
-
- protected void tearDown() throws Exception {
- try {
- super.tearDown();
- } catch (Throwable e) {
- e.printStackTrace(System.out);
- } finally {
- LOG.info("End: " + getName());
- }
- }
-
- public static void main(String[] args) {
- junit.textui.TestRunner.run(suite());
- }
-
- protected ConnectionFactory createConnectionFactory() throws URISyntaxException, IOException {
- return new HedwigConnectionFactoryImpl();
- }
-
- public void initCombosForTestSendReceive() {
- addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
- Integer.valueOf(DeliveryMode.PERSISTENT)});
- addCombinationValues("destinationType", new Object[] {MessagingSessionFacade.DestinationType.TOPIC});
- addCombinationValues("durableConsumer", new Object[] {Boolean.TRUE});
- addCombinationValues("messageSize", new Object[] {Integer.valueOf(101), Integer.valueOf(102),
- Integer.valueOf(103), Integer.valueOf(104),
- Integer.valueOf(105), Integer.valueOf(106),
- Integer.valueOf(107), Integer.valueOf(108)});
- }
-
- public void testSendReceive() throws Exception {
-
- // Durable consumer combination is only valid with topics
- if (durableConsumer && destinationType != MessagingSessionFacade.DestinationType.TOPIC) {
- return;
- }
-
- if (null == connection.getClientID()) connection.setClientID(getName());
- connection.start();
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- destination = createDestination(session, destinationType);
- MessageConsumer consumer;
- if (durableConsumer) {
- consumer = session.createDurableSubscriber((Topic)destination, "sub1:"
- + System.currentTimeMillis());
- } else {
- consumer = session.createConsumer(destination);
- }
- profilerPause("Ready: ");
-
- final CountDownLatch producerDoneLatch = new CountDownLatch(1);
-
- // Send the messages, async
- new Thread() {
- public void run() {
- Connection connection2 = null;
- try {
- connection2 = factory.createConnection();
- Session session = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(destination);
- producer.setDeliveryMode(deliveryMode);
- for (int i = 0; i < messageCount; i++) {
- BytesMessage m = session.createBytesMessage();
- m.writeBytes(new byte[messageSize]);
- producer.send(m);
- }
- producer.close();
- } catch (JMSException e) {
- e.printStackTrace();
- } finally {
- safeClose(connection2);
- producerDoneLatch.countDown();
- }
-
- }
- }.start();
-
- // Make sure all the messages were delivered.
- Message message = null;
- for (int i = 0; i < messageCount; i++) {
- message = consumer.receive(5000);
- assertNotNull("Did not get message: " + i, message);
- }
-
- profilerPause("Done: ");
-
- assertNull(consumer.receiveNoWait());
- message.acknowledge();
-
- // Make sure the producer thread finishes.
- assertTrue(producerDoneLatch.await(5, TimeUnit.SECONDS));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java
deleted file mode 100644
index 735d701..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java
+++ /dev/null
@@ -1,193 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import javax.jms.Topic;
-
-import org.apache.hedwig.JmsTestBase;
-import org.apache.hedwig.jms.SessionImpl;
-import java.util.ArrayList;
-import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import junit.framework.TestCase;
-
-import javax.jms.Destination;
-import org.apache.hedwig.jms.message.MessageImpl;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class MessageListenerRedeliveryTest extends JmsTestBase {
-
- private static final Logger LOG = LoggerFactory.getLogger(MessageListenerRedeliveryTest.class);
-
- private Connection connection;
-
- protected void setUp() throws Exception {
- super.setUp();
- connection = createConnection();
- }
-
- /**
- * @see junit.framework.TestCase#tearDown()
- */
- protected void tearDown() throws Exception {
- if (connection != null) {
- connection.close();
- connection = null;
- }
- super.tearDown();
- }
-
- protected Connection createConnection() throws Exception {
- HedwigConnectionFactoryImpl factory = new HedwigConnectionFactoryImpl();
- return factory.createConnection();
- }
-
- private class TestMessageListener implements MessageListener {
-
- public int counter = 0;
- private Session session;
- static final int ROLLBACK_COUNT = 5;
-
- public TestMessageListener(Session session) {
- this.session = session;
- }
-
- public void onMessage(Message message) {
- try {
- LOG.info("Message Received: " + message);
- counter++;
- if (counter < ROLLBACK_COUNT) {
- LOG.info("Message Rollback.");
- session.rollback();
- } else {
- LOG.info("Message Commit.");
- message.acknowledge();
- session.commit();
- }
- } catch (JMSException e) {
- LOG.error("Error when rolling back transaction");
- }
- }
- }
-
- public void testTopicRollbackConsumerListener() throws JMSException {
- connection.start();
-
- Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
- Topic queue = session.createTopic("queue-" + getName());
- MessageProducer producer = createProducer(session, queue);
- Message message = createTextMessage(session);
- MessageConsumer consumer = session.createConsumer(queue);
- TestMessageListener listener = new TestMessageListener(session);
- consumer.setMessageListener(listener);
- producer.send(message);
- session.commit();
-
-
- MessageConsumer mc = (MessageConsumer)consumer;
-
-
- try {
- Thread.sleep(500);
- } catch (InterruptedException e) {
-
- }
-
- assertEquals(TestMessageListener.ROLLBACK_COUNT, listener.counter);
-
- session.close();
- }
-
- public void testTopicSessionListenerExceptionRetry() throws Exception {
- connection.start();
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Topic queue = session.createTopic("queue-" + getName());
- Message message = createTextMessage(session, "1");
- MessageConsumer consumer = session.createConsumer(queue);
-
- final int maxDeliveries = 2;
- final CountDownLatch gotMessage = new CountDownLatch(2);
- final AtomicInteger count = new AtomicInteger(0);
- final ArrayList<String> received = new ArrayList<String>();
- consumer.setMessageListener(new MessageListener() {
- public void onMessage(Message message) {
- LOG.info("Message Received: " + message);
- try {
- received.add(((TextMessage) message).getText());
- } catch (JMSException e) {
- e.printStackTrace();
- fail(e.toString());
- }
- if (count.incrementAndGet() < maxDeliveries) {
- throw new RuntimeException(getName() + " force a redelivery");
- }
- // new blood
- count.set(0);
- gotMessage.countDown();
- }
- });
-
- MessageProducer producer = createProducer(session, queue);
- producer.send(message);
- message = createTextMessage(session, "2");
- producer.send(message);
-
- assertTrue("got message before retry expiry", gotMessage.await(20, TimeUnit.SECONDS));
-
- for (int i=0; i<maxDeliveries; i++) {
- assertEquals("got first redelivered: " + i, "1", received.get(i));
- }
- for (int i=maxDeliveries; i<maxDeliveries*2; i++) {
- assertEquals("got first redelivered: " + i, "2", received.get(i));
- }
- session.close();
- }
-
- private TextMessage createTextMessage(Session session, String text) throws JMSException {
- return session.createTextMessage(text);
- }
- private TextMessage createTextMessage(Session session) throws JMSException {
- return session.createTextMessage("Hello");
- }
-
- private MessageProducer createProducer(Session session, Destination queue) throws JMSException {
- MessageProducer producer = session.createProducer(queue);
- producer.setDeliveryMode(getDeliveryMode());
- return producer;
- }
-
- protected int getDeliveryMode() {
- return DeliveryMode.PERSISTENT;
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
deleted file mode 100644
index 002fea0..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import javax.jms.Topic;
-import org.apache.hedwig.jms.SessionImpl;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import junit.framework.Test;
-
-
-
-/**
- * Test cases used to test the JMS message exclusive consumers.
- *
- *
- */
-public class RedeliveryPolicyTest extends JmsTestSupport {
-
- public static Test suite() {
- return suite(RedeliveryPolicyTest.class);
- }
-
- public static void main(String[] args) {
- junit.textui.TestRunner.run(suite());
- }
-
- /**
- * @throws Exception
- */
- public void testExponentialRedeliveryPolicyDelaysDeliveryOnRollback() throws Exception {
-
- connection.start();
- Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
- Topic destination = SessionImpl.asTopic(getName());
- MessageProducer producer = session.createProducer(destination);
-
- MessageConsumer consumer = session.createConsumer(destination);
-
- // Send the messages
- producer.send(session.createTextMessage("1st"));
- producer.send(session.createTextMessage("2nd"));
- session.commit();
-
- TextMessage m;
- m = (TextMessage)consumer.receive(1000);
- assertNotNull(m);
- assertEquals("1st", m.getText());
- session.rollback();
-
- // No delay on first rollback..
- m = (TextMessage)consumer.receive(100);
- assertNotNull(m);
- session.rollback();
-
- m = (TextMessage)consumer.receive(700);
- assertNotNull(m);
- assertEquals("1st", m.getText());
- session.rollback();
-
- // Show re-delivery delay is incrementing exponentially
- m = (TextMessage)consumer.receive(100);
- assertNotNull(m);
- assertEquals("1st", m.getText());
-
- m = (TextMessage)consumer.receive(100);
- assertNotNull(m);
- assertEquals("2nd", m.getText());
- }
-
-
- /**
- * @throws Exception
- */
- public void testInfiniteMaximumNumberOfRedeliveries() throws Exception {
-
- connection.start();
- Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
- Topic destination = SessionImpl.asTopic("TEST");
- MessageProducer producer = session.createProducer(destination);
-
- MessageConsumer consumer = session.createConsumer(destination);
-
- // Send the messages
- producer.send(session.createTextMessage("1st"));
- producer.send(session.createTextMessage("2nd"));
- session.commit();
-
- TextMessage m;
-
- m = (TextMessage)consumer.receive(1000);
- assertNotNull(m);
- assertEquals("1st", m.getText());
- session.rollback();
-
- //we should be able to get the 1st message redelivered until a session.commit is called
- m = (TextMessage)consumer.receive(1000);
- assertNotNull(m);
- assertEquals("1st", m.getText());
- session.rollback();
-
- m = (TextMessage)consumer.receive(1000);
- assertNotNull(m);
- assertEquals("1st", m.getText());
- session.rollback();
-
- m = (TextMessage)consumer.receive(1000);
- assertNotNull(m);
- assertEquals("1st", m.getText());
- session.rollback();
-
- m = (TextMessage)consumer.receive(1000);
- assertNotNull(m);
- assertEquals("1st", m.getText());
- session.rollback();
-
- m = (TextMessage)consumer.receive(1000);
- assertNotNull(m);
- assertEquals("1st", m.getText());
- session.commit();
-
- m = (TextMessage)consumer.receive(1000);
- assertNotNull(m);
- assertEquals("2nd", m.getText());
- session.commit();
-
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/TestSupport.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/TestSupport.java b/hedwig-client-jms/src/test/java/org/apache/activemq/TestSupport.java
deleted file mode 100644
index fd838ab..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/TestSupport.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import javax.jms.Topic;
-import java.io.File;
-import java.io.IOException;
-import java.util.Enumeration;
-import java.util.Map;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.TextMessage;
-
-
-import org.apache.hedwig.jms.SessionImpl;
-import org.apache.hedwig.jms.message.MessageImpl;
-import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl;
-import org.apache.hedwig.jms.spi.HedwigConnectionImpl;
-
-
-/**
- * Useful base class for unit test cases
- */
-public abstract class TestSupport extends CombinationTestSupport {
-
- protected HedwigConnectionFactoryImpl connectionFactory;
- protected boolean topic = true;
-
- protected MessageImpl createMessage() {
- return new MessageImpl(null);
- }
-
- protected Destination createDestination(String subject) {
- if (topic) {
- return SessionImpl.asTopic(subject);
- } else {
- throw new IllegalArgumentException("Queue NOT supported");
- }
- }
-
- protected Destination createDestination() {
- return createDestination(getDestinationString());
- }
-
- /**
- * Returns the name of the destination used in this test case
- */
- protected String getDestinationString() {
- return getClass().getName() + "." + getName(true);
- }
-
- /**
- * @param messsage
- * @param firstSet
- * @param secondSet
- */
- protected void assertTextMessagesEqual(String messsage, Message[] firstSet, Message[] secondSet)
- throws JMSException {
- assertEquals("Message count does not match: " + messsage, firstSet.length, secondSet.length);
- for (int i = 0; i < secondSet.length; i++) {
- TextMessage m1 = (TextMessage)firstSet[i];
- TextMessage m2 = (TextMessage)secondSet[i];
- assertFalse("Message " + (i + 1) + " did not match : " + messsage + ": expected {" + m1
- + "}, but was {" + m2 + "}", m1 == null ^ m2 == null);
- assertEquals("Message " + (i + 1) + " did not match: " + messsage + ": expected {" + m1
- + "}, but was {" + m2 + "}", m1.getText(), m2.getText());
- }
- }
-
- protected HedwigConnectionFactoryImpl createConnectionFactory() throws Exception {
- return new HedwigConnectionFactoryImpl();
- }
-
- /**
- * Factory method to create a new connection
- */
- protected Connection createConnection() throws Exception {
- return createConnection(true);
- }
-
- protected Connection createConnection(boolean setClientId) throws Exception {
- HedwigConnectionImpl connection = getConnectionFactory().createConnection();
- if (setClientId) connection.setClientID(getName());
- return connection;
- }
-
- public HedwigConnectionFactoryImpl getConnectionFactory() throws Exception {
- if (connectionFactory == null) {
- connectionFactory = createConnectionFactory();
- assertTrue("Should have created a connection factory!", connectionFactory != null);
- }
- return connectionFactory;
- }
-
- protected String getConsumerSubject() {
- return getSubject();
- }
-
- protected String getProducerSubject() {
- return getSubject();
- }
-
- protected String getSubject() {
- return getName();
- }
-
- public static void recursiveDelete(File f) {
- if (f.isDirectory()) {
- File[] files = f.listFiles();
- for (int i = 0; i < files.length; i++) {
- recursiveDelete(files[i]);
- }
- }
- f.delete();
- }
-
- public static void removeMessageStore() {
- if (System.getProperty("activemq.store.dir") != null) {
- recursiveDelete(new File(System.getProperty("activemq.store.dir")));
- }
- if (System.getProperty("derby.system.home") != null) {
- recursiveDelete(new File(System.getProperty("derby.system.home")));
- }
- }
-
- /**
- * Test if base directory contains spaces
- */
- protected void assertBaseDirectoryContainsSpaces() {
- assertFalse("Base directory cannot contain spaces.",
- new File(System.getProperty("basedir", ".")).getAbsoluteFile().toString().contains(" "));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/TimeStampTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/TimeStampTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/TimeStampTest.java
deleted file mode 100644
index 4b12ccc..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/TimeStampTest.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import javax.jms.Topic;
-import javax.jms.Connection;
-
-import org.apache.hedwig.JmsTestBase;
-import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import junit.framework.TestCase;
-
-
-public class TimeStampTest extends JmsTestBase {
- public void test() throws Exception {
- // Create a ConnectionFactory
- HedwigConnectionFactoryImpl connectionFactory =
- new HedwigConnectionFactoryImpl();
-
- // Create a Connection
- Connection connection = connectionFactory.createConnection();
- connection.start();
-
- // Create a Session
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- // Create the destination Queue
- Destination destination = session.createTopic("TEST.FOO");
-
- // Create a MessageProducer from the Session to the Topic or Queue
- MessageProducer producer = session.createProducer(destination);
- producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-
- // Create a MessageConsumer from the Session to the Topic or Queue
- MessageConsumer consumer = session.createConsumer(destination);
- // Create a messages
- Message sentMessage = session.createMessage();
-
- // Tell the producer to send the message
- long beforeSend = System.currentTimeMillis();
- producer.send(sentMessage);
- long afterSend = System.currentTimeMillis();
-
- // assert message timestamp is in window
- assertTrue(beforeSend <= sentMessage.getJMSTimestamp() && sentMessage.getJMSTimestamp() <= afterSend);
-
-
- // Wait for a message
- Message receivedMessage = consumer.receive(1000);
-
- // assert we got the same message ID we sent
- assertEquals(sentMessage.getJMSMessageID(), receivedMessage.getJMSMessageID());
-
- // assert message timestamp is in window
- assertTrue("JMS Message Timestamp should be set during the send method: \n" + " beforeSend = "
- + beforeSend + "\n" + " getJMSTimestamp = "
- + receivedMessage.getJMSTimestamp() + "\n" + " afterSend = "
- + afterSend + "\n", beforeSend <= receivedMessage.getJMSTimestamp()
- && receivedMessage.getJMSTimestamp() <= afterSend);
-
- // assert message timestamp is unchanged
- assertEquals("JMS Message Timestamp of recieved message should be the same as the sent message\n ",
- sentMessage.getJMSTimestamp(), receivedMessage.getJMSTimestamp());
-
- // Clean up
- producer.close();
- consumer.close();
- session.close();
- connection.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/demo/SimpleConsumer.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/demo/SimpleConsumer.java b/hedwig-client-jms/src/test/java/org/apache/activemq/demo/SimpleConsumer.java
deleted file mode 100644
index 1d18724..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/demo/SimpleConsumer.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * The SimpleQueueReceiver class consists only of a main method,
- * which fetches one or more messages from a queue using
- * synchronous message delivery. Run this program in conjunction
- * with SimpleQueueSender. Specify a queue name on the command
- * line when you run the program.
- */
-package org.apache.activemq.demo;
-
-import javax.jms.Topic;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.naming.Context;
-import javax.naming.InitialContext;
-import javax.naming.NamingException;
-
-/**
- * A simple polymorphic JMS consumer which can work with Queues or Topics which
- * uses JNDI to lookup the JMS connection factory and destination
- */
-public final class SimpleConsumer {
-
- private static final org.apache.commons.logging.Log LOG = org.apache.commons.logging.LogFactory
- .getLog(SimpleConsumer.class);
-
- private SimpleConsumer() {
- }
-
- /**
- * @param args the queue used by the example
- */
- public static void main(String[] args) {
- String destinationName = null;
- Context jndiContext = null;
- ConnectionFactory connectionFactory = null;
- Connection connection = null;
- Session session = null;
- Destination destination = null;
- MessageConsumer consumer = null;
-
- /*
- * Read destination name from command line and display it.
- */
- if (args.length != 1) {
- LOG.info("Usage: java SimpleConsumer <destination-name>");
- System.exit(1);
- }
- destinationName = args[0];
- LOG.info("Destination name is " + destinationName);
-
- /*
- * Create a JNDI API InitialContext object
- */
- try {
- jndiContext = new InitialContext();
- } catch (NamingException e) {
- LOG.info("Could not create JNDI API " + "context: " + e.toString());
- System.exit(1);
- }
-
- /*
- * Look up connection factory and destination.
- */
- try {
- connectionFactory = (ConnectionFactory)jndiContext.lookup("ConnectionFactory");
- destination = (Destination)jndiContext.lookup(destinationName);
- } catch (NamingException e) {
- LOG.info("JNDI API lookup failed: " + e.toString());
- System.exit(1);
- }
-
- /*
- * Create connection. Create session from connection; false means
- * session is not transacted. Create receiver, then start message
- * delivery. Receive all text messages from destination until a non-text
- * message is received indicating end of message stream. Close
- * connection.
- */
- try {
- connection = connectionFactory.createConnection();
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- consumer = session.createConsumer(destination);
- connection.start();
- while (true) {
- Message m = consumer.receive(1);
- if (m != null) {
- if (m instanceof TextMessage) {
- TextMessage message = (TextMessage)m;
- LOG.info("Reading message: " + message.getText());
- } else {
- break;
- }
- }
- }
- } catch (JMSException e) {
- LOG.info("Exception occurred: " + e);
- } finally {
- if (connection != null) {
- try {
- connection.close();
- } catch (JMSException e) {
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/demo/SimpleProducer.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/demo/SimpleProducer.java b/hedwig-client-jms/src/test/java/org/apache/activemq/demo/SimpleProducer.java
deleted file mode 100644
index 4facc3d..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/demo/SimpleProducer.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * The SimpleQueueSender class consists only of a main method,
- * which sends several messages to a queue.
- *
- * Run this program in conjunction with SimpleQueueReceiver.
- * Specify a queue name on the command line when you run the
- * program. By default, the program sends one message. Specify
- * a number after the queue name to send that number of messages.
- */
-package org.apache.activemq.demo;
-
-// START SNIPPET: demo
-
-import javax.jms.Topic;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.naming.Context;
-import javax.naming.InitialContext;
-import javax.naming.NamingException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A simple polymorphic JMS producer which can work with Queues or Topics which
- * uses JNDI to lookup the JMS connection factory and destination
- */
-public final class SimpleProducer {
-
- private static final Logger LOG = LoggerFactory.getLogger(SimpleProducer.class);
-
- private SimpleProducer() {
- }
-
- /**
- * @param args the destination name to send to and optionally, the number of
- * messages to send
- */
- public static void main(String[] args) {
- Context jndiContext = null;
- ConnectionFactory connectionFactory = null;
- Connection connection = null;
- Session session = null;
- Destination destination = null;
- MessageProducer producer = null;
- String destinationName = null;
- final int numMsgs;
-
- if ((args.length < 1) || (args.length > 2)) {
- LOG.info("Usage: java SimpleProducer <destination-name> [<number-of-messages>]");
- System.exit(1);
- }
- destinationName = args[0];
- LOG.info("Destination name is " + destinationName);
- if (args.length == 2) {
- numMsgs = (new Integer(args[1])).intValue();
- } else {
- numMsgs = 1;
- }
-
- /*
- * Create a JNDI API InitialContext object
- */
- try {
- jndiContext = new InitialContext();
- } catch (NamingException e) {
- LOG.info("Could not create JNDI API context: " + e.toString());
- System.exit(1);
- }
-
- /*
- * Look up connection factory and destination.
- */
- try {
- connectionFactory = (ConnectionFactory)jndiContext.lookup("ConnectionFactory");
- destination = (Destination)jndiContext.lookup(destinationName);
- } catch (NamingException e) {
- LOG.info("JNDI API lookup failed: " + e);
- System.exit(1);
- }
-
- /*
- * Create connection. Create session from connection; false means
- * session is not transacted. Create sender and text message. Send
- * messages, varying text slightly. Send end-of-messages message.
- * Finally, close connection.
- */
- try {
- connection = connectionFactory.createConnection();
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- producer = session.createProducer(destination);
- TextMessage message = session.createTextMessage();
- for (int i = 0; i < numMsgs; i++) {
- message.setText("This is message " + (i + 1));
- LOG.info("Sending message: " + message.getText());
- producer.send(message);
- }
-
- /*
- * Send a non-text control message indicating end of messages.
- */
- producer.send(session.createMessage());
- } catch (JMSException e) {
- LOG.info("Exception occurred: " + e);
- } finally {
- if (connection != null) {
- try {
- connection.close();
- } catch (JMSException e) {
- }
- }
- }
- }
-}
-
-// END SNIPPET: demo
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/load/LoadClient.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/load/LoadClient.java b/hedwig-client-jms/src/test/java/org/apache/activemq/load/LoadClient.java
deleted file mode 100644
index c5d9b0c..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/load/LoadClient.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.load;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.activemq.perf.PerfRate;
-import org.apache.hedwig.jms.LRUCacheSet;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class LoadClient implements Runnable{
- private static final Logger LOG = LoggerFactory.getLogger(LoadClient.class);
- protected static int SLEEP_TIME = 2;
- protected String name;
- protected ConnectionFactory factory;
- protected Connection connection;
- protected Destination startDestination;
- protected Destination nextDestination;
- protected Session session;
- protected MessageConsumer consumer;
- protected MessageProducer producer;
- protected PerfRate rate = new PerfRate();
- protected int deliveryMode = DeliveryMode.PERSISTENT;
- protected boolean connectionPerMessage = false;
- protected boolean running;
- protected int timeout = 10000;
-
- public LoadClient(String name,ConnectionFactory factory) {
- this.name=name;
- this.factory = factory;
- }
-
- public synchronized void start() throws JMSException {
- if (!running) {
- rate.reset();
- running = true;
- if (!connectionPerMessage) {
- connection = factory.createConnection();
- connection.start();
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- consumer = session.createConsumer(getConsumeDestination());
- producer = session.createProducer(getSendDestination());
- producer.setDeliveryMode(this.deliveryMode);
- }
- Thread t = new Thread(this);
- t.setName(name);
- t.start();
- }
- }
-
- public void stop() throws JMSException, InterruptedException {
- running = false;
- if(connection != null) {
- connection.stop();
- }
- }
-
- public void run() {
- try {
- while (running) {
- String result = consume();
- if(result != null) {
- send(result);
- rate.increment();
- }
- else if (running) {
- LOG.error(name + " Failed to consume!");
- }
- }
- } catch (Throwable e) {
- e.printStackTrace();
- }
- }
-
- private LRUCacheSet<String> messageIdCache = new LRUCacheSet<String>(2048, false);
- protected String consume() throws Exception {
- Connection con = null;
- MessageConsumer c = consumer;
- if (connectionPerMessage){
- con = factory.createConnection();
- con.start();
- Session s = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- c = s.createConsumer(getConsumeDestination());
- }
- TextMessage result = (TextMessage) c.receive(timeout);
- if (result != null) {
- if (messageIdCache.contains(result.getJMSMessageID())) {
- throw new JMSException("Received duplicate " + result.getText());
- }
- messageIdCache.add(result.getJMSMessageID());
-
- if (connectionPerMessage) {
- Thread.sleep(SLEEP_TIME);//give the broker a chance
- con.close();
- }
- }
- return result != null ? result.getText() : null;
- }
-
- protected void send(String text) throws Exception {
- Connection con = connection;
- MessageProducer p = producer;
- Session s = session;
- if (connectionPerMessage){
- con = factory.createConnection();
- con.start();
- s = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- p = s.createProducer(getSendDestination());
- p.setDeliveryMode(deliveryMode);
- }
- TextMessage message = s.createTextMessage(text);
- p.send(message);
- if (connectionPerMessage) {
- Thread.sleep(SLEEP_TIME);//give the broker a chance
- con.close();
- }
- }
-
-
-
- public String getName() {
- return name;
- }
-
-
-
- public void setName(String name) {
- this.name = name;
- }
-
-
-
- public Destination getStartDestination() {
- return startDestination;
- }
-
-
-
- public void setStartDestination(Destination startDestination) {
- this.startDestination = startDestination;
- }
-
-
-
- public Destination getNextDestination() {
- return nextDestination;
- }
-
-
-
- public void setNextDestination(Destination nextDestination) {
- this.nextDestination = nextDestination;
- }
-
-
-
- public int getDeliveryMode() {
- return deliveryMode;
- }
-
-
-
- public void setDeliveryMode(int deliveryMode) {
- this.deliveryMode = deliveryMode;
- }
-
-
-
- public boolean isConnectionPerMessage() {
- return connectionPerMessage;
- }
-
-
-
- public void setConnectionPerMessage(boolean connectionPerMessage) {
- this.connectionPerMessage = connectionPerMessage;
- }
-
-
-
- public int getTimeout() {
- return timeout;
- }
-
-
-
- public void setTimeout(int timeout) {
- this.timeout = timeout;
- }
-
- protected Destination getSendDestination() {
- return nextDestination;
- }
-
- protected Destination getConsumeDestination() {
- return startDestination;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/load/LoadController.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/load/LoadController.java b/hedwig-client-jms/src/test/java/org/apache/activemq/load/LoadController.java
deleted file mode 100644
index 8149814..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/load/LoadController.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.load;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-
-public class LoadController extends LoadClient{
- private int numberOfBatches=1;
- private int batchSize =1000;
- private int count;
- private final CountDownLatch stopped = new CountDownLatch(1);
-
- public LoadController(String name,ConnectionFactory factory) {
- super(name,factory);
- }
-
- public int awaitTestComplete() throws InterruptedException {
- boolean complete = stopped.await(60*5,TimeUnit.SECONDS);
- return count;
- }
-
- public void stop() throws JMSException, InterruptedException {
- running = false;
- stopped.countDown();
- if (connection != null) {
- this.connection.stop();
- }
- }
-
- public void run() {
- try {
- for (int i = 0; i < numberOfBatches; i++) {
- for (int j = 0; j < batchSize; j++) {
- String payLoad = "batch[" + i + "]no:" + j;
- send(payLoad);
- }
- for (int j = 0; j < batchSize; j++) {
- String result = consume();
- if (result != null) {
- count++;
- rate.increment();
- }
- }
- }
- } catch (Throwable e) {
- e.printStackTrace();
- } finally {
- stopped.countDown();
- }
- }
-
-
- public int getNumberOfBatches() {
- return numberOfBatches;
- }
-
-
- public void setNumberOfBatches(int numberOfBatches) {
- this.numberOfBatches = numberOfBatches;
- }
-
-
- public int getBatchSize() {
- return batchSize;
- }
-
-
- public void setBatchSize(int batchSize) {
- this.batchSize = batchSize;
- }
-
- protected Destination getSendDestination() {
- return startDestination;
- }
-
- protected Destination getConsumeDestination() {
- return nextDestination;
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/load/LoadTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/load/LoadTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/load/LoadTest.java
deleted file mode 100644
index 66880af..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/load/LoadTest.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.load;
-
-
-import javax.jms.Topic;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Session;
-import junit.framework.TestCase;
-import org.apache.hedwig.JmsTestBase;
-import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl;
-
-import org.junit.Ignore;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-// For now, ignore it ...
-/**
- * Load test that configures {@code numberOfClients} {@link LoadClient}s as a
- * chain of topics between a start and an end destination. A
- * {@link LoadController} sends batches to the start destination and counts
- * what it consumes from the end destination.
- */
-@Ignore
-public class LoadTest extends JmsTestBase {
-
-    private static final Logger LOG = LoggerFactory.getLogger(LoadTest.class);
-
-    protected LoadController controller;
-    protected LoadClient[] clients;
-    protected ConnectionFactory factory;
-    protected Destination destination;
-    protected int numberOfClients = 50;
-    protected int deliveryMode = DeliveryMode.PERSISTENT;
-    protected int batchSize = 1000;
-    protected int numberOfBatches = 10;
-    protected int timeout = Integer.MAX_VALUE;
-    protected boolean connectionPerMessage = false;
-    protected Connection managementConnection;
-    protected Session managementSession;
-
-    /**
-     * Creates the management connection and session, the controller and the
-     * chain of clients. Client 0 reads from the start destination, the last
-     * client writes to the end destination, and every other client writes to
-     * the destination the following client reads from.
-     *
-     * @see junit.framework.TestCase#setUp()
-     */
-    @Override
-    protected void setUp() throws Exception {
-        // The base fixture must be initialised exactly once.
-        super.setUp();
-        factory = createConnectionFactory();
-        managementConnection = factory.createConnection();
-        managementSession = managementConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-        // Class#getName() instead of the implicit Class#toString(): the latter
-        // would prefix every topic name with "class " (including the space).
-        final String prefix = getClass().getName();
-        Destination startDestination = createDestination(managementSession, prefix + ".start");
-        Destination endDestination = createDestination(managementSession, prefix + ".end");
-        LOG.info("Running with " + numberOfClients + " clients - sending "
-                + numberOfBatches + " batches of " + batchSize + " messages");
-
-        controller = new LoadController("Controller", factory);
-        controller.setBatchSize(batchSize);
-        controller.setNumberOfBatches(numberOfBatches);
-        controller.setDeliveryMode(deliveryMode);
-        controller.setConnectionPerMessage(connectionPerMessage);
-        controller.setStartDestination(startDestination);
-        controller.setNextDestination(endDestination);
-        controller.setTimeout(timeout);
-
-        clients = new LoadClient[numberOfClients];
-        // Each client's input is the previous client's output, so the hand-over
-        // destination is created once and reused instead of being looked up twice.
-        Destination inDestination = startDestination;
-        for (int i = 0; i < numberOfClients; i++) {
-            Destination outDestination;
-            if (i == numberOfClients - 1) {
-                outDestination = endDestination;
-            } else {
-                outDestination = createDestination(managementSession, prefix + ".client." + (i + 1));
-            }
-            LoadClient client = new LoadClient("client(" + i + ")", factory);
-            client.setTimeout(timeout);
-            client.setDeliveryMode(deliveryMode);
-            client.setConnectionPerMessage(connectionPerMessage);
-            client.setStartDestination(inDestination);
-            client.setNextDestination(outDestination);
-            clients[i] = client;
-            inDestination = outDestination;
-        }
-    }
-
-    /**
-     * Releases what {@link #setUp()} created, in reverse order: clients and
-     * controller first, then the management connection, and the base fixture
-     * last. Later steps still run when an earlier one throws, and anything
-     * that was never created (setUp failed part-way) is skipped.
-     */
-    @Override
-    protected void tearDown() throws Exception {
-        try {
-            stopLoadGenerators();
-        } finally {
-            try {
-                if (managementConnection != null) {
-                    managementConnection.close();
-                }
-            } finally {
-                super.tearDown();
-            }
-        }
-    }
-
-    /** Stops every client that was created and then the controller. */
-    private void stopLoadGenerators() throws Exception {
-        try {
-            if (clients != null) {
-                for (LoadClient client : clients) {
-                    if (client != null) {
-                        client.stop();
-                    }
-                }
-            }
-        } finally {
-            if (controller != null) {
-                controller.stop();
-            }
-        }
-    }
-
-    /** Creates the destination used for one link of the chain; a topic by default. */
-    protected Destination createDestination(Session s, String destinationName) throws JMSException {
-        return s.createTopic(destinationName);
-    }
-
-    protected HedwigConnectionFactoryImpl createConnectionFactory() throws Exception {
-        return new HedwigConnectionFactoryImpl();
-    }
-
-    /**
-     * Starts all clients and the controller, then expects the controller to
-     * have consumed {@code batchSize * numberOfBatches} messages.
-     */
-    public void testLoad() throws JMSException, InterruptedException {
-        for (LoadClient client : clients) {
-            client.start();
-        }
-        controller.start();
-        assertEquals(batchSize * numberOfBatches, controller.awaitTestComplete());
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/perf/ConnectionChurnTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/perf/ConnectionChurnTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/perf/ConnectionChurnTest.java
deleted file mode 100644
index a47ba67..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/perf/ConnectionChurnTest.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.perf;
-
-import java.util.ArrayList;
-import java.util.List;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.JMSException;
-import junit.framework.TestCase;
-import org.apache.hedwig.JmsTestBase;
-import org.apache.hedwig.jms.spi.HedwigConnectionImpl;
-import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl;
-
-
-import org.junit.Ignore;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-// For now, ignore it ...
-/**
- * Opens and starts {@link #CONNECTION_COUNT} connections, closing them in
- * batches, to exercise connection set-up and tear-down.
- */
-@Ignore
-public class ConnectionChurnTest extends JmsTestBase {
-    protected static final int CONNECTION_COUNT = 200;
-    private static final Logger LOG = LoggerFactory.getLogger(ConnectionChurnTest.class);
-    /** Open connections are closed in bulk each time this many have accumulated. */
-    private static final int CLOSE_BATCH_SIZE = 100;
-    protected int topicCount;
-
-    /**
-     * Creates and starts the connections one by one and closes them whenever a
-     * full batch is open. Whatever is still open at the end, including after a
-     * failure part-way through, is closed before the method returns.
-     */
-    public void testPerformance() throws Exception {
-        ConnectionFactory factory = createConnectionFactory();
-        List<Connection> open = new ArrayList<Connection>();
-        try {
-            for (int i = 0; i < CONNECTION_COUNT; i++) {
-                Connection connection = factory.createConnection();
-                // Track the connection before starting it so that it is still
-                // closed if start() throws.
-                open.add(connection);
-                connection.start();
-                LOG.info("Created " + i);
-                // (i + 1) so that the first batch holds CLOSE_BATCH_SIZE
-                // connections rather than just the very first one.
-                if ((i + 1) % CLOSE_BATCH_SIZE == 0) {
-                    closeConnections(open);
-                }
-            }
-        } finally {
-            closeConnections(open);
-        }
-    }
-
-    /**
-     * Closes every connection in {@code list} and empties the list. A failing
-     * close does not stop the remaining connections from being closed.
-     *
-     * @param list connections to close; cleared on return
-     * @throws JMSException the first failure seen while closing, rethrown after
-     *         all connections have been attempted; later failures are logged
-     */
-    protected void closeConnections(List<Connection> list) throws JMSException {
-        JMSException firstFailure = null;
-        for (Connection c : list) {
-            try {
-                c.close();
-            } catch (JMSException e) {
-                if (firstFailure == null) {
-                    firstFailure = e;
-                } else {
-                    LOG.warn("Failed to close connection", e);
-                }
-            }
-        }
-        list.clear();
-        if (firstFailure != null) {
-            throw firstFailure;
-        }
-    }
-
-    @Override
-    protected void setUp() throws Exception {
-        super.setUp();
-    }
-
-    @Override
-    protected void tearDown() throws Exception {
-        super.tearDown();
-    }
-
-    protected HedwigConnectionFactoryImpl createConnectionFactory()
-            throws Exception {
-        return new HedwigConnectionFactoryImpl();
-    }
-
-}