You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2016/03/16 04:44:48 UTC
[38/49] bookkeeper git commit: BOOKKEEPER-769: Remove the Hedwig Code
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java
deleted file mode 100644
index 82572be..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java
+++ /dev/null
@@ -1,339 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq;
-
-
-import javax.jms.Topic;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-
-import org.apache.hedwig.jms.SessionImpl;
-import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class JmsRollbackRedeliveryTest extends AutoFailTestSupport {
- protected static final Logger LOG = LoggerFactory.getLogger(JmsRollbackRedeliveryTest.class);
- final int nbMessages = 10;
- final String destinationName = "Destination";
- boolean consumerClose = true;
- boolean rollback = true;
-
- public void setUp() throws Exception {
- setAutoFail(true);
- super.setUp();
- }
-
- public void tearDown() throws Exception {
- super.tearDown();
- }
-
- public void testRedelivery() throws Exception {
- doTestRedelivery(false);
- }
-
- public void testRedeliveryWithInterleavedProducer() throws Exception {
- doTestRedelivery(true);
- }
-
- public void doTestRedelivery(boolean interleaveProducer) throws Exception {
-
- ConnectionFactory connectionFactory = new HedwigConnectionFactoryImpl();
- Connection connection = connectionFactory.createConnection();
- Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
- Destination destination = session.createTopic(destinationName);
- MessageConsumer consumer = session.createConsumer(destination);
- connection.start();
-
- if (interleaveProducer) {
- populateDestinationWithInterleavedProducer(nbMessages, destinationName, connection);
- } else {
- populateDestination(nbMessages, destinationName, connection);
- }
- // Consume messages and rollback transactions
- {
- AtomicInteger received = new AtomicInteger();
- Map<String, Boolean> rolledback = new ConcurrentHashMap<String, Boolean>();
- while (received.get() < nbMessages) {
- TextMessage msg = (TextMessage) consumer.receive(6000000);
- if (msg != null) {
- if (msg != null && rolledback.put(msg.getText(), Boolean.TRUE) != null) {
- LOG.info("Received message " + msg.getText()
- + " (" + received.getAndIncrement() + ")" + msg.getJMSMessageID());
- assertTrue(msg.getJMSRedelivered());
- // assertEquals(2, msg.getLongProperty("JMSXDeliveryCount"));
- session.commit();
- } else {
- LOG.info("Rollback message " + msg.getText() + " id: " + msg.getJMSMessageID());
- assertFalse("should not have redelivery flag set, id: "
- + msg.getJMSMessageID(), msg.getJMSRedelivered());
- session.rollback();
- }
- }
- }
- consumer.close();
- session.close();
- }
- }
-
- public void testRedeliveryOnSingleConsumer() throws Exception {
-
- ConnectionFactory connectionFactory =
- new HedwigConnectionFactoryImpl();
- Connection connection = connectionFactory.createConnection();
- connection.start();
- Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
- Destination destination = session.createTopic(destinationName);
- MessageConsumer consumer = session.createConsumer(destination);
-
- populateDestinationWithInterleavedProducer(nbMessages, destinationName, connection);
-
- // Consume messages and rollback transactions
- {
- AtomicInteger received = new AtomicInteger();
- Map<String, Boolean> rolledback = new ConcurrentHashMap<String, Boolean>();
- while (received.get() < nbMessages) {
- TextMessage msg = (TextMessage) consumer.receive(6000000);
- if (msg != null) {
- if (msg != null && rolledback.put(msg.getText(), Boolean.TRUE) != null) {
- LOG.info("Received message " + msg.getText() + " ("
- + received.getAndIncrement() + ")" + msg.getJMSMessageID());
- assertTrue(msg.getJMSRedelivered());
- session.commit();
- } else {
- LOG.info("Rollback message " + msg.getText() + " id: " + msg.getJMSMessageID());
- session.rollback();
- }
- }
- }
- consumer.close();
- session.close();
- }
- }
-
- public void testRedeliveryOnSingleSession() throws Exception {
-
- ConnectionFactory connectionFactory =
- new HedwigConnectionFactoryImpl();
- Connection connection = connectionFactory.createConnection();
- Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
- Destination destination = session.createTopic(destinationName);
- MessageConsumer consumer = session.createConsumer(destination);
- connection.start();
-
- populateDestination(nbMessages, destinationName, connection);
-
- // Consume messages and rollback transactions
- {
- AtomicInteger received = new AtomicInteger();
- Map<String, Boolean> rolledback = new ConcurrentHashMap<String, Boolean>();
- while (received.get() < nbMessages) {
- TextMessage msg = (TextMessage) consumer.receive(6000000);
- if (msg != null) {
- if (msg != null && rolledback.put(msg.getText(), Boolean.TRUE) != null) {
- LOG.info("Received message " + msg.getText() + " ("
- + received.getAndIncrement() + ")" + msg.getJMSMessageID());
- assertTrue(msg.getJMSRedelivered());
- session.commit();
- } else {
- LOG.info("Rollback message " + msg.getText() + " id: " + msg.getJMSMessageID());
- session.rollback();
- }
- }
- }
- consumer.close();
- session.close();
- }
- }
-
- // AMQ-1593
- public void testValidateRedeliveryCountOnRollback() throws Exception {
-
- final int numMessages = 1;
- ConnectionFactory connectionFactory =
- new HedwigConnectionFactoryImpl();
- Connection connection = connectionFactory.createConnection();
- Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
- Destination destination = session.createTopic(destinationName);
-
- MessageConsumer consumer = session.createDurableSubscriber((Topic) destination, "subscriber-id-1");
- connection.start();
-
- populateDestination(numMessages, destinationName, connection);
-
- {
- AtomicInteger received = new AtomicInteger();
- // hardcoded, we actually allow for infinite rollback/redelivery ...
- final int maxRetries = 5;
- while (received.get() < maxRetries) {
- TextMessage msg = (TextMessage) consumer.receive(1000);
- assert msg != null;
- if (msg != null) {
- LOG.info("Received message " + msg.getText() + " ("
- + received.getAndIncrement() + ")" + msg.getJMSMessageID());
- session.rollback();
- }
- }
- session.close();
- consumeMessage(connection, "subscriber-id-1");
- }
- }
-
- // AMQ-1593
- public void testValidateRedeliveryCountOnRollbackWithPrefetch0() throws Exception {
-
- final int numMessages = 1;
- ConnectionFactory connectionFactory =
- new HedwigConnectionFactoryImpl();
- Connection connection = connectionFactory.createConnection();
- Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
- Destination destination = session.createTopic(destinationName);
-
- MessageConsumer consumer = session.createDurableSubscriber((Topic) destination, "subscriber-id-2");
- connection.start();
-
- populateDestination(numMessages, destinationName, connection);
-
- {
- AtomicInteger received = new AtomicInteger();
- // hardcoded, we actually allow for infinite rollback/redelivery ...
- final int maxRetries = 5;
- while (received.get() < maxRetries) {
- TextMessage msg = (TextMessage) consumer.receive(1000);
- assert msg != null;
- if (msg != null) {
- LOG.info("Received message " + msg.getText() + " ("
- + received.getAndIncrement() + ")" + msg.getJMSMessageID());
- session.rollback();
- }
- }
-
- session.close();
- consumeMessage(connection, "subscriber-id-2");
- }
- }
-
-
- private void consumeMessage(Connection connection, String subscriberId)
- throws JMSException {
- Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
- Destination destination = session.createTopic(destinationName);
- MessageConsumer consumer;
- if (null == subscriberId) consumer = session.createConsumer(destination);
- else consumer = session.createDurableSubscriber((Topic) destination, subscriberId);
-
- TextMessage msg = (TextMessage) consumer.receive(1000);
- assertNotNull(msg);
- session.commit();
- session.close();
- }
-
- public void testRedeliveryPropertyWithNoRollback() throws Exception {
- final int numMessages = 1;
- ConnectionFactory connectionFactory =
- new HedwigConnectionFactoryImpl();
- Connection connection = connectionFactory.createConnection();
- // ensure registration of durable subscription
- {
- connection.setClientID(getName() + "-client-id-1");
- Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
- Destination destination = session.createTopic(destinationName);
-
- MessageConsumer consumer = session.createDurableSubscriber((Topic) destination, "subscriber-id-3");
- }
- connection.start();
-
- populateDestination(numMessages, destinationName, connection);
- connection.close();
- {
- AtomicInteger received = new AtomicInteger();
- // hardcoded, we actually allow for infinite rollback/redelivery ...
- final int maxRetries = 5;
- while (received.get() < maxRetries) {
- connection = connectionFactory.createConnection();
- connection.setClientID(getName() + "-client-id-1");
- connection.start();
- Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
- Destination destination = session.createTopic(destinationName);
-
- MessageConsumer consumer = session.createDurableSubscriber((Topic) destination, "subscriber-id-3");
- TextMessage msg = (TextMessage) consumer.receive(2000);
- assert msg != null;
- if (msg != null) {
- LOG.info("Received message " + msg.getText() + " ("
- + received.getAndIncrement() + ")" + msg.getJMSMessageID());
- }
- session.close();
- connection.close();
- }
- connection = connectionFactory.createConnection();
- connection.setClientID(getName() + "-client-id-1");
- connection.start();
- consumeMessage(connection, "subscriber-id-3");
- }
- }
-
- private void populateDestination(final int nbMessages,
- final String destinationName, Connection connection)
- throws JMSException {
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Destination destination = session.createTopic(destinationName);
- MessageProducer producer = session.createProducer(destination);
- for (int i = 1; i <= nbMessages; i++) {
- producer.send(session.createTextMessage("<hello id='" + i + "'/>"));
- }
- producer.close();
- session.close();
- }
-
-
- private void populateDestinationWithInterleavedProducer(final int nbMessages,
- final String destinationName, Connection connection)
- throws JMSException {
- Session session1 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Destination destination1 = session1.createTopic(destinationName);
- MessageProducer producer1 = session1.createProducer(destination1);
- Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Destination destination2 = session2.createTopic(destinationName);
- MessageProducer producer2 = session2.createProducer(destination2);
-
- for (int i = 1; i <= nbMessages; i++) {
- if (i%2 == 0) {
- producer1.send(session1.createTextMessage("<hello id='" + i + "'/>"));
- } else {
- producer2.send(session2.createTextMessage("<hello id='" + i + "'/>"));
- }
- }
- producer1.close();
- session1.close();
- producer2.close();
- session2.close();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/JmsSendReceiveTestSupport.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsSendReceiveTestSupport.java b/hedwig-client-jms/src/test/java/org/apache/activemq/JmsSendReceiveTestSupport.java
deleted file mode 100644
index 62a1715..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsSendReceiveTestSupport.java
+++ /dev/null
@@ -1,234 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Date;
-import java.util.Iterator;
-import java.util.List;
-
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class JmsSendReceiveTestSupport extends TestSupport implements MessageListener {
- private static final Logger LOG = LoggerFactory.getLogger(JmsSendReceiveTestSupport.class);
-
- protected int messageCount = 100;
- protected String[] data;
- protected Session session;
- protected MessageConsumer consumer;
- protected MessageProducer producer;
- protected Destination consumerDestination;
- protected Destination producerDestination;
- protected List<Message> messages = createConcurrentList();
- protected boolean topic = true;
- protected boolean durable;
- protected int deliveryMode = DeliveryMode.PERSISTENT;
- protected final Object lock = new Object();
- protected boolean verbose;
-
- /*
- * @see junit.framework.TestCase#setUp()
- */
- protected void setUp() throws Exception {
- super.setUp();
- String temp = System.getProperty("messageCount");
-
- if (temp != null) {
- int i = Integer.parseInt(temp);
- if (i > 0) {
- messageCount = i;
- }
- }
-
- LOG.info("Message count for test case is: " + messageCount);
- data = new String[messageCount];
-
- for (int i = 0; i < messageCount; i++) {
- data[i] = "Text for message: " + i + " at " + new Date();
- }
- }
-
- /**
- * Sends and consumes the messages.
- *
- * @throws Exception
- */
- public void testSendReceive() throws Exception {
- messages.clear();
- for (int i = 0; i < data.length; i++) {
- Message message = session.createTextMessage(data[i]);
- message.setStringProperty("stringProperty", data[i]);
- message.setIntProperty("intProperty", i);
-
- if (verbose) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("About to send a message: " + message + " with text: " + data[i]);
- }
- }
-
- sendToProducer(producer, producerDestination, message);
- messageSent();
- }
-
- assertMessagesAreReceived();
- LOG.info("" + data.length + " messages(s) received, closing down connections");
- }
-
- /**
- * Sends a message to a destination using the supplied producer
- * @param producer
- * @param producerDestination
- * @param message
- * @throws JMSException
- */
- protected void sendToProducer(MessageProducer producer,
- Destination producerDestination, Message message) throws JMSException {
- producer.send(producerDestination, message);
- }
-
- /**
- * Asserts messages are received.
- *
- * @throws JMSException
- */
- protected void assertMessagesAreReceived() throws JMSException {
- waitForMessagesToBeDelivered();
- assertMessagesReceivedAreValid(messages);
- }
-
- /**
- * Tests if the messages received are valid.
- *
- * @param receivedMessages - list of received messages.
- * @throws JMSException
- */
- protected void assertMessagesReceivedAreValid(List<Message> receivedMessages) throws JMSException {
- List<Object> copyOfMessages = Arrays.asList(receivedMessages.toArray());
- int counter = 0;
-
- if (data.length != copyOfMessages.size()) {
- for (Iterator<Object> iter = copyOfMessages.iterator(); iter.hasNext();) {
- TextMessage message = (TextMessage)iter.next();
- if (LOG.isInfoEnabled()) {
- LOG.info("<== " + counter++ + " = " + message.getText());
- }
- }
- }
-
- assertEquals("Not enough messages received", data.length, receivedMessages.size());
-
- for (int i = 0; i < data.length; i++) {
- TextMessage received = (TextMessage)receivedMessages.get(i);
- String text = received.getText();
- String stringProperty = received.getStringProperty("stringProperty");
- int intProperty = received.getIntProperty("intProperty");
-
- if (verbose) {
- if (LOG.isDebugEnabled()) {
- LOG.info("Received Text: " + text);
- }
- }
-
- assertEquals("Message: " + i, data[i], text);
- assertEquals(data[i], stringProperty);
- assertEquals(i, intProperty);
- }
- }
-
- /**
- * Waits for messages to be delivered.
- */
- protected void waitForMessagesToBeDelivered() {
- long maxWaitTime = 30000;
- long waitTime = maxWaitTime;
- long start = (maxWaitTime <= 0) ? 0 : System.currentTimeMillis();
-
- synchronized (lock) {
- while (messages.size() < data.length && waitTime >= 0) {
- try {
- lock.wait(200);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
-
- waitTime = maxWaitTime - (System.currentTimeMillis() - start);
- }
- }
- }
-
- /*
- * (non-Javadoc)
- *
- * @see javax.jms.MessageListener#onMessage(javax.jms.Message)
- */
- public synchronized void onMessage(Message message) {
- consumeMessage(message, messages);
- }
-
- /**
- * Consumes messages.
- *
- * @param message - message to be consumed.
- * @param messageList -list of consumed messages.
- */
- protected void consumeMessage(Message message, List<Message> messageList) {
- if (verbose) {
- if (LOG.isDebugEnabled()) {
- LOG.info("Received message: " + message);
- }
- }
-
- messageList.add(message);
-
- if (messageList.size() >= data.length) {
- synchronized (lock) {
- lock.notifyAll();
- }
- }
- }
-
- /**
- * Returns the ArrayList as a synchronized list.
- *
- * @return List
- */
- protected List<Message> createConcurrentList() {
- return Collections.synchronizedList(new ArrayList<Message>());
- }
-
- /**
- * Just a hook so can insert failure tests
- *
- * @throws Exception
- */
- protected void messageSent() throws Exception {
-
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/JmsSendReceiveWithMessageExpirationTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsSendReceiveWithMessageExpirationTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/JmsSendReceiveWithMessageExpirationTest.java
deleted file mode 100644
index 82d2108..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsSendReceiveWithMessageExpirationTest.java
+++ /dev/null
@@ -1,239 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import java.util.Date;
-import java.util.Vector;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.Topic;
-
-import org.apache.hedwig.jms.spi.HedwigConnectionImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class JmsSendReceiveWithMessageExpirationTest extends TestSupport {
-
- private static final Logger LOG = LoggerFactory.getLogger(JmsSendReceiveWithMessageExpirationTest.class);
-
- protected int messageCount = 100;
- protected String[] data;
- protected Session session;
- protected Destination consumerDestination;
- protected Destination producerDestination;
- protected boolean durable;
- protected int deliveryMode = DeliveryMode.PERSISTENT;
- protected long timeToLive = 5000;
- protected boolean verbose;
-
- protected Connection connection;
-
- protected void setUp() throws Exception {
-
- super.setUp();
-
- data = new String[messageCount];
-
- for (int i = 0; i < messageCount; i++) {
- data[i] = "Text for message: " + i + " at " + new Date();
- }
-
- connectionFactory = createConnectionFactory();
- connection = createConnection(!durable);
-
- if (durable) {
- connection.setClientID(getClass().getName());
- }
-
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- }
-
- /**
- * Test consuming an expired queue.
- *
- * @throws Exception
- */
- public void testConsumeExpiredQueue() throws Exception {
-
- MessageProducer producer = createProducer(timeToLive);
-
- consumerDestination = session.createTopic(getConsumerSubject());
- producerDestination = session.createTopic(getProducerSubject());
-
- MessageConsumer consumer = createConsumer();
- connection.start();
-
- for (int i = 0; i < data.length; i++) {
- Message message = session.createTextMessage(data[i]);
- message.setStringProperty("stringProperty", data[i]);
- message.setIntProperty("intProperty", i);
-
- if (verbose) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("About to send a queue message: " + message + " with text: " + data[i]);
- }
- }
-
- producer.send(producerDestination, message, producer.getDeliveryMode(),
- producer.getPriority(), timeToLive);
- }
-
- // sleeps a second longer than the expiration time.
- // Basically waits till queue expires.
- Thread.sleep(timeToLive + 1000);
-
- // message should have expired.
- assertNull(consumer.receive(1000));
- }
-
- /**
- * Sends and consumes the messages to a queue destination.
- *
- * @throws Exception
- */
- public void testConsumeQueue() throws Exception {
-
- MessageProducer producer = createProducer(0);
-
- consumerDestination = session.createTopic(getConsumerSubject());
- producerDestination = session.createTopic(getProducerSubject());
-
- MessageConsumer consumer = createConsumer();
- connection.start();
-
- for (int i = 0; i < data.length; i++) {
- Message message = session.createTextMessage(data[i]);
- message.setStringProperty("stringProperty", data[i]);
- message.setIntProperty("intProperty", i);
-
- if (verbose) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("About to send a queue message: " + message + " with text: " + data[i]);
- }
- }
-
- producer.send(producerDestination, message);
- }
-
- // should receive a queue since there is no expiration.
- assertNotNull(consumer.receive(1000));
- }
-
- /**
- * Test consuming an expired topic.
- *
- * @throws Exception
- */
- public void testConsumeExpiredTopic() throws Exception {
-
- MessageProducer producer = createProducer(timeToLive);
-
- consumerDestination = session.createTopic(getConsumerSubject());
- producerDestination = session.createTopic(getProducerSubject());
-
- MessageConsumer consumer = createConsumer();
- connection.start();
-
- for (int i = 0; i < data.length; i++) {
- Message message = session.createTextMessage(data[i]);
- message.setStringProperty("stringProperty", data[i]);
- message.setIntProperty("intProperty", i);
-
- if (verbose) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("About to send a topic message: " + message + " with text: " + data[i]);
- }
- }
-
- producer.send(producerDestination, message);
- }
-
- // sleeps a second longer than the expiration time.
- // Basically waits till topic expires.
- Thread.sleep(timeToLive + 1000);
-
- // message should have expired.
- assertNull(consumer.receive(1000));
- }
-
- /**
- * Sends and consumes the messages to a topic destination.
- *
- * @throws Exception
- */
- public void testConsumeTopic() throws Exception {
-
- MessageProducer producer = createProducer(0);
-
- consumerDestination = session.createTopic(getConsumerSubject());
- producerDestination = session.createTopic(getProducerSubject());
-
- MessageConsumer consumer = createConsumer();
- connection.start();
-
- for (int i = 0; i < data.length; i++) {
- Message message = session.createTextMessage(data[i]);
- message.setStringProperty("stringProperty", data[i]);
- message.setIntProperty("intProperty", i);
-
- if (verbose) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("About to send a topic message: " + message + " with text: " + data[i]);
- }
- }
-
- producer.send(producerDestination, message);
- }
-
- // should receive a topic since there is no expiration.
- assertNotNull(consumer.receive(1000));
- }
-
- protected MessageProducer createProducer(long timeToLive) throws JMSException {
- MessageProducer producer = session.createProducer(null);
- producer.setDeliveryMode(deliveryMode);
- producer.setTimeToLive(timeToLive);
-
- return producer;
- }
-
- protected MessageConsumer createConsumer() throws JMSException {
- if (durable) {
- LOG.info("Creating durable consumer");
- return session.createDurableSubscriber((Topic)consumerDestination, getName());
- }
- return session.createConsumer(consumerDestination);
- }
-
- protected void tearDown() throws Exception {
- LOG.info("Dumping stats...");
- LOG.info("Closing down connection");
-
- session.close();
- connection.close();
- super.tearDown();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/JmsSendWithAsyncCallbackTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsSendWithAsyncCallbackTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/JmsSendWithAsyncCallbackTest.java
deleted file mode 100644
index 7b60106..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsSendWithAsyncCallbackTest.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.Topic;
-import javax.jms.Message;
-
-public class JmsSendWithAsyncCallbackTest extends TestSupport {
-
- private Connection connection;
-
- protected void setUp() throws Exception {
- super.setUp();
- connection = createConnection();
- }
-
- /**
- * @see junit.framework.TestCase#tearDown()
- */
- protected void tearDown() throws Exception {
- if (connection != null) {
- connection.close();
- connection = null;
- }
- super.tearDown();
- }
-
- public void testAsyncCallbackIsFaster() throws JMSException, InterruptedException {
- connection.start();
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Topic queue = session.createTopic(getName());
-
- // setup a consumer to drain messages..
- MessageConsumer consumer = session.createConsumer(queue);
- consumer.setMessageListener(new MessageListener() {
- @Override
- public void onMessage(Message message) {
- }
- });
-
- // warmup...
- for(int i=0; i < 10; i++) {
- benchmarkNonCallbackRate();
- benchmarkCallbackRate();
- }
-
- double callbackRate = benchmarkCallbackRate();
- double nonCallbackRate = benchmarkNonCallbackRate();
-
- System.out.println(String.format("AsyncCallback Send rate: %,.2f m/s", callbackRate));
- System.out.println(String.format("NonAsyncCallback Send rate: %,.2f m/s", nonCallbackRate));
-
- // there is no such requirement in hedwig case :-)
- // The async style HAS to be faster than the non-async style..
- // assertTrue( callbackRate/nonCallbackRate > 1.5 );
- }
-
- private double benchmarkNonCallbackRate() throws JMSException {
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Topic queue = session.createTopic(getName());
- int count = 1000;
- MessageProducer producer = session.createProducer(queue);
- producer.setDeliveryMode(DeliveryMode.PERSISTENT);
- long start = System.currentTimeMillis();
- for (int i = 0; i < count; i++) {
- producer.send(session.createTextMessage("Hello"));
- }
- return 1000.0 * count / (System.currentTimeMillis() - start);
- }
-
- private double benchmarkCallbackRate() throws JMSException, InterruptedException {
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Topic queue = session.createTopic(getName());
- int count = 1000;
- MessageProducer producer = session.createProducer(queue);
- producer.setDeliveryMode(DeliveryMode.PERSISTENT);
- long start = System.currentTimeMillis();
- for (int i = 0; i < count; i++) {
- producer.send(session.createTextMessage("Hello"));
- }
- return 1000.0 * count / (System.currentTimeMillis() - start);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/JmsTestSupport.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsTestSupport.java b/hedwig-client-jms/src/test/java/org/apache/activemq/JmsTestSupport.java
deleted file mode 100644
index 82ec6ae..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsTestSupport.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import javax.jms.Topic;
-import org.apache.hedwig.jms.MessagingSessionFacade;
-import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl;
-import org.apache.hedwig.jms.spi.HedwigConnectionImpl;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-
-
-import javax.jms.Destination;
-
-/**
- * Test cases used to test the JMS message consumer.
- */
-public class JmsTestSupport extends CombinationTestSupport {
-
- static final private AtomicLong TEST_COUNTER = new AtomicLong();
- public String userName;
- public String password;
- public String messageTextPrefix = "";
-
- protected ConnectionFactory factory;
- protected HedwigConnectionImpl connection;
-
- protected List<Connection> connections = Collections.synchronizedList(new ArrayList<Connection>());
-
- // /////////////////////////////////////////////////////////////////
- //
- // Test support methods.
- //
- // /////////////////////////////////////////////////////////////////
- protected Destination createDestination(Session session,
- MessagingSessionFacade.DestinationType type) throws JMSException {
- String testMethod = getName();
- if( testMethod.indexOf(" ")>0 ) {
- testMethod = testMethod.substring(0, testMethod.indexOf(" "));
- }
- String name = "TEST." + getClass().getName() + "." +testMethod+"."+TEST_COUNTER.getAndIncrement();
- switch (type) {
- case QUEUE:
- return (Destination)session.createTopic(name);
- case TOPIC:
- return (Destination)session.createTopic(name);
- default:
- throw new IllegalArgumentException("type: " + type);
- }
- }
-
- protected void sendMessages(Destination destination, int count) throws Exception {
- ConnectionFactory factory = createConnectionFactory();
- Connection connection = factory.createConnection();
- connection.start();
- sendMessages(connection, destination, count);
- connection.close();
- }
-
- protected void sendMessages(Connection connection, Destination destination, int count) throws JMSException {
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- sendMessages(session, destination, count);
- session.close();
- }
-
- protected void sendMessages(Session session, Destination destination, int count) throws JMSException {
- MessageProducer producer = session.createProducer(destination);
- for (int i = 0; i < count; i++) {
- producer.send(session.createTextMessage(messageTextPrefix + i));
- }
- producer.close();
- }
-
- protected ConnectionFactory createConnectionFactory() throws Exception {
- return new HedwigConnectionFactoryImpl();
- }
-
- protected void setUp() throws Exception {
- super.setUp();
-
- if (System.getProperty("basedir") == null) {
- File file = new File(".");
- System.setProperty("basedir", file.getAbsolutePath());
- }
-
- factory = createConnectionFactory();
- connection = (HedwigConnectionImpl)factory.createConnection(userName, password);
- connections.add(connection);
- }
-
- protected void tearDown() throws Exception {
- for (Iterator iter = connections.iterator(); iter.hasNext();) {
- Connection conn = (Connection)iter.next();
- try {
- conn.close();
- } catch (Throwable e) {
- }
- iter.remove();
- }
- super.tearDown();
- }
-
- protected void safeClose(Connection c) {
- try {
- c.close();
- } catch (Throwable e) {
- }
- }
-
- protected void safeClose(Session s) {
- try {
- s.close();
- } catch (Throwable e) {
- }
- }
-
- protected void safeClose(MessageConsumer c) {
- try {
- c.close();
- } catch (Throwable e) {
- }
- }
-
- protected void safeClose(MessageProducer p) {
- try {
- p.close();
- } catch (Throwable e) {
- }
- }
-
- protected void profilerPause(String prompt) throws IOException {
- if (System.getProperty("profiler") != null) {
- pause(prompt);
- }
- }
-
- protected void pause(String prompt) throws IOException {
- System.out.println();
- System.out.println(prompt + "> Press enter to continue: ");
- while (System.in.read() != '\n') {
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/JmsTopicCompositeSendReceiveTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsTopicCompositeSendReceiveTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/JmsTopicCompositeSendReceiveTest.java
deleted file mode 100644
index 78afcdd..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsTopicCompositeSendReceiveTest.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.MessageConsumer;
-import javax.jms.Topic;
-
-import org.apache.activemq.test.JmsTopicSendReceiveTest;
-import org.apache.hedwig.jms.SessionImpl;
-import org.apache.hedwig.jms.spi.HedwigConnectionImpl;
-
-public class JmsTopicCompositeSendReceiveTest extends JmsTopicSendReceiveTest {
- private static final org.apache.commons.logging.Log LOG = org.apache.commons.logging.LogFactory
- .getLog(JmsTopicCompositeSendReceiveTest.class);
-
- Destination consumerDestination2;
- MessageConsumer consumer2;
-
- /**
- * Sets a test to have a queue destination and non-persistent delivery mode.
- *
- * @see junit.framework.TestCase#setUp()
- */
- protected void setUp() throws Exception {
- deliveryMode = DeliveryMode.NON_PERSISTENT;
- super.setUp();
- consumerDestination2 = consumeSession.createTopic("FOO.BAR.HUMBUG2");
- LOG.info("Created consumer destination: " + consumerDestination2
- + " of type: " + consumerDestination2.getClass());
-
- /*
- JMS spec 6.11.1
-"Sessions with durable subscribers must always provide the same client
-identifier. In addition, each client must specify a name that uniquely identifies
-(within client identifier) each durable subscription it creates. Only one session
-at a time can have a TopicSubscriber for a particular durable subscription. See
-Section 4.3.2, “Client Identifier,” for more information.
-A client can change an existing durable subscription by creating a durable
-TopicSubscriber with the same name and a new topic and/or message selector,
-or NoLocal attribute. Changing a durable subscription is equivalent to deleting
-and recreating it."
-
- So, we CANNOT reuse the subscriber id !
- */
- if (durable) {
- LOG.info("Creating durable consumer");
- consumer2 = consumeSession.createDurableSubscriber((Topic) consumerDestination2, getName() + "_2");
- } else {
- consumer2 = consumeSession.createConsumer(consumerDestination2);
- }
- consumer2.setMessageListener(this);
- }
-
- /**
- * Returns the consumer subject.
- *
- * @return String - consumer subject
- * @see org.apache.activemq.test.TestSupport#getConsumerSubject()
- */
- protected String getConsumerSubject() {
- return "FOO.BAR.HUMBUG";
- }
-
- /**
- * Returns the producer subject.
- *
- * @return String - producer subject
- * @see org.apache.activemq.test.TestSupport#getProducerSubject()
- */
- protected String getProducerSubject() {
- return "FOO.BAR.HUMBUG";
- }
-
- /**
- * Test if all the messages sent are being received.
- *
- * @throws Exception
- */
- public void testSendReceive() throws Exception {
- super.testSendReceive();
- // messages.clear();
- assertMessagesAreReceived();
- LOG.info("" + data.length + " messages(s) received, closing down connections");
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/JmsTopicRedeliverTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsTopicRedeliverTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/JmsTopicRedeliverTest.java
deleted file mode 100644
index ac25e58..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsTopicRedeliverTest.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-
-import org.apache.hedwig.jms.spi.HedwigConnectionImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class JmsTopicRedeliverTest extends TestSupport {
-
- private static final Logger LOG = LoggerFactory.getLogger(JmsTopicRedeliverTest.class);
-
- protected Connection connection;
- protected Session session;
- protected Session consumeSession;
- protected MessageConsumer consumer;
- protected MessageProducer producer;
- protected Destination consumerDestination;
- protected Destination producerDestination;
- protected boolean topic = true;
- protected boolean durable = true;
- protected boolean verbose;
- // hardcoded to some random default.
- protected long initRedeliveryDelay = 1000L;
-
- protected void setUp() throws Exception {
- super.setUp();
-
- connectionFactory = createConnectionFactory();
- connection = createConnection();
-
- if (durable && null == connection.getClientID()) {
- connection.setClientID(getClass().getName());
- }
-
- LOG.info("Created connection: " + connection);
-
- session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- consumeSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
- LOG.info("Created session: " + session);
- LOG.info("Created consumeSession: " + consumeSession);
- producer = session.createProducer(null);
- // producer.setDeliveryMode(deliveryMode);
-
- LOG.info("Created producer: " + producer);
-
- if (topic) {
- consumerDestination = session.createTopic(getConsumerSubject());
- producerDestination = session.createTopic(getProducerSubject());
- } else {
- consumerDestination = session.createTopic(getConsumerSubject());
- producerDestination = session.createTopic(getProducerSubject());
- }
-
- LOG.info("Created consumer destination: "
- + consumerDestination + " of type: " + consumerDestination.getClass());
- LOG.info("Created producer destination: "
- + producerDestination + " of type: " + producerDestination.getClass());
- consumer = createConsumer();
- connection.start();
-
- LOG.info("Created connection: " + connection);
- }
-
- protected void tearDown() throws Exception {
- if (connection != null) {
- connection.close();
- }
- super.tearDown();
- }
-
- /**
- * Returns the consumer subject.
- *
- * @return String - consumer subject
- * @see org.apache.activemq.test.TestSupport#getConsumerSubject()
- */
- protected String getConsumerSubject() {
- return "TEST";
- }
-
- /**
- * Returns the producer subject.
- *
- * @return String - producer subject
- * @see org.apache.activemq.test.TestSupport#getProducerSubject()
- */
- protected String getProducerSubject() {
- return "TEST";
- }
-
- /**
- * Sends and consumes the messages.
- *
- * @throws Exception
- */
- public void testRecover() throws Exception {
- String text = "TEST";
- Message sendMessage = session.createTextMessage(text);
-
- if (verbose) {
- LOG.info("About to send a message: " + sendMessage + " with text: " + text);
- }
- producer.send(producerDestination, sendMessage);
-
- // receive but don't acknowledge
- Message unackMessage = consumer.receive(initRedeliveryDelay + 1000);
- assertNotNull(unackMessage);
- String unackId = unackMessage.getJMSMessageID();
- assertEquals(((TextMessage)unackMessage).getText(), text);
- assertFalse(unackMessage.getJMSRedelivered());
-
- // We DO NOT support session recovery
- // - to unblock this test, I am stopp'ing and start'ing connection : not the same, but ...
- // receive then acknowledge
- // consumeSession.recover();
- connection.close();
- connection = createConnection();
- consumeSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- consumer = createConsumer();
- connection.start();
-
- Message ackMessage = consumer.receive(initRedeliveryDelay + 1000);
- assertNotNull(ackMessage);
- ackMessage.acknowledge();
- String ackId = ackMessage.getJMSMessageID();
- assertEquals(((TextMessage)ackMessage).getText(), text);
- // assertTrue(ackMessage.getJMSRedelivered());
- assertEquals(unackId, ackId);
-
- // We DO NOT support session recovery
- // - to unblock this test, I am stopp'ing and start'ing connection : not the same, but ...
- // consumeSession.recover();
- connection.close();
- connection = createConnection();
- consumeSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- consumer = createConsumer();
- connection.start();
-
- assertNull(consumer.receiveNoWait());
- }
-
- protected MessageConsumer createConsumer() throws JMSException {
- if (durable) {
- LOG.info("Creating durable consumer");
- return consumeSession.createDurableSubscriber((Topic)consumerDestination, getName());
- }
- return consumeSession.createConsumer(consumerDestination);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/JmsTopicSelectorTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsTopicSelectorTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/JmsTopicSelectorTest.java
deleted file mode 100644
index 73dd2ed..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsTopicSelectorTest.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq;
-
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- *
- */
-public class JmsTopicSelectorTest extends TestSupport {
- private static final Logger LOG = LoggerFactory.getLogger(JmsTopicSelectorTest.class);
-
- protected Connection connection;
- protected Session session;
- protected MessageConsumer consumer;
- protected MessageProducer producer;
- protected Destination consumerDestination;
- protected Destination producerDestination;
- protected boolean topic = true;
- protected boolean durable;
- protected int deliveryMode = DeliveryMode.PERSISTENT;
-
- public void setUp() throws Exception {
- super.setUp();
-
- connectionFactory = createConnectionFactory();
- connection = createConnection(!durable);
- if (durable) {
- connection.setClientID(getClass().getName());
- }
-
- LOG.info("Created connection: " + connection);
-
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- LOG.info("Created session: " + session);
-
- if (topic) {
- consumerDestination = session.createTopic(getConsumerSubject());
- producerDestination = session.createTopic(getProducerSubject());
- } else {
- consumerDestination = session.createTopic(getConsumerSubject());
- producerDestination = session.createTopic(getProducerSubject());
- }
-
- LOG.info("Created consumer destination: " + consumerDestination
- + " of type: " + consumerDestination.getClass());
- LOG.info("Created producer destination: " + producerDestination
- + " of type: " + producerDestination.getClass());
- producer = session.createProducer(producerDestination);
- producer.setDeliveryMode(deliveryMode);
-
- LOG.info("Created producer: " + producer + " delivery mode = "
- + (deliveryMode == DeliveryMode.PERSISTENT ? "PERSISTENT" : "NON_PERSISTENT"));
- connection.start();
- }
-
- public void tearDown() throws Exception {
- session.close();
- connection.close();
- super.tearDown();
- }
-
- protected MessageConsumer createConsumer(String selector) throws JMSException {
- if (durable) {
- LOG.info("Creating durable consumer");
- return session.createDurableSubscriber((Topic)consumerDestination, getName(), selector, false);
- }
- return session.createConsumer(consumerDestination, selector);
- }
-
- public void sendMessages() throws Exception {
- TextMessage message = session.createTextMessage("1");
- message.setIntProperty("id", 1);
- message.setJMSType("a");
- message.setStringProperty("stringProperty", "a");
- message.setLongProperty("longProperty", 1);
- message.setBooleanProperty("booleanProperty", true);
- producer.send(message);
-
- message = session.createTextMessage("2");
- message.setIntProperty("id", 2);
- message.setJMSType("a");
- message.setStringProperty("stringProperty", "a");
- message.setLongProperty("longProperty", 1);
- message.setBooleanProperty("booleanProperty", false);
- producer.send(message);
-
- message = session.createTextMessage("3");
- message.setIntProperty("id", 3);
- message.setJMSType("a");
- message.setStringProperty("stringProperty", "a");
- message.setLongProperty("longProperty", 1);
- message.setBooleanProperty("booleanProperty", true);
- producer.send(message);
-
- message = session.createTextMessage("4");
- message.setIntProperty("id", 4);
- message.setJMSType("b");
- message.setStringProperty("stringProperty", "b");
- message.setLongProperty("longProperty", 2);
- message.setBooleanProperty("booleanProperty", false);
- producer.send(message);
-
- message = session.createTextMessage("5");
- message.setIntProperty("id", 5);
- message.setJMSType("c");
- message.setStringProperty("stringProperty", "c");
- message.setLongProperty("longProperty", 3);
- message.setBooleanProperty("booleanProperty", true);
- producer.send(message);
- }
-
- public void consumeMessages(int remaining) throws Exception {
- consumer = createConsumer(null);
- for (int i = 0; i < remaining; i++) {
- consumer.receive(1000);
- }
- consumer.close();
-
- }
-
- public void testEmptyPropertySelector() throws Exception {
- int remaining = 5;
- Message message = null;
- consumer = createConsumer("");
- sendMessages();
- while (true) {
- message = consumer.receive(1000);
- if (message == null) {
- break;
- }
-
- remaining--;
- }
- assertEquals(remaining, 0);
- consumer.close();
- consumeMessages(remaining);
- }
-
- public void testPropertySelector() throws Exception {
- int remaining = 5;
- Message message = null;
- consumer = createConsumer("stringProperty = 'a' and longProperty = 1 and booleanProperty = true");
- sendMessages();
- while (true) {
- message = consumer.receive(1000);
- if (message == null) {
- break;
- }
- String text = ((TextMessage)message).getText();
- if (!text.equals("1") && !text.equals("3")) {
- fail("unexpected message: " + text);
- }
- remaining--;
- }
- assertEquals(remaining, 3);
- consumer.close();
- consumeMessages(remaining);
-
- }
-
- public void testJMSPropertySelector() throws Exception {
- int remaining = 5;
- Message message = null;
- consumer = createConsumer("JMSType = 'a' and stringProperty = 'a'");
- sendMessages();
- while (true) {
- message = consumer.receive(1000);
- if (message == null) {
- break;
- }
- String text = ((TextMessage)message).getText();
- if (!text.equals("1") && !text.equals("2") && !text.equals("3")) {
- fail("unexpected message: " + text);
- }
- remaining--;
- }
- assertEquals(remaining, 2);
- consumer.close();
- consumeMessages(remaining);
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/JmsTopicSendReceiveSubscriberTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsTopicSendReceiveSubscriberTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/JmsTopicSendReceiveSubscriberTest.java
deleted file mode 100644
index e75d6fe..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsTopicSendReceiveSubscriberTest.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.Topic;
-import javax.jms.TopicSession;
-
-public class JmsTopicSendReceiveSubscriberTest extends JmsTopicSendReceiveTest {
- protected MessageConsumer createConsumer() throws JMSException {
- if (durable) {
- return super.createConsumer();
- } else {
- TopicSession topicSession = (TopicSession)session;
- return topicSession.createSubscriber((Topic)consumerDestination, null, false);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/JmsTopicSendReceiveTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsTopicSendReceiveTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/JmsTopicSendReceiveTest.java
deleted file mode 100644
index dc590d8..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsTopicSendReceiveTest.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.Session;
-import javax.jms.Topic;
-import javax.jms.TopicConnection;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class JmsTopicSendReceiveTest extends JmsSendReceiveTestSupport {
- private static final Logger LOG = LoggerFactory.getLogger(JmsTopicSendReceiveTest.class);
-
- protected Connection connection;
-
- protected void setUp() throws Exception {
- super.setUp();
-
- connectionFactory = createConnectionFactory();
- connection = createConnection(!durable);
- if (durable) {
- connection.setClientID(getClass().getName());
- }
-
- LOG.info("Created connection: " + connection);
-
- session = ((TopicConnection) connection).createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
-
- LOG.info("Created session: " + session);
- producer = session.createProducer(null);
- producer.setDeliveryMode(deliveryMode);
-
- LOG.info("Created producer: " + producer + " delivery mode = "
- + (deliveryMode == DeliveryMode.PERSISTENT ? "PERSISTENT" : "NON_PERSISTENT"));
-
- if (topic) {
- consumerDestination = session.createTopic(getConsumerSubject());
- producerDestination = session.createTopic(getProducerSubject());
- } else {
- consumerDestination = session.createTopic(getConsumerSubject());
- producerDestination = session.createTopic(getProducerSubject());
- }
-
- LOG.info("Created consumer destination: " + consumerDestination
- + " of type: " + consumerDestination.getClass());
- LOG.info("Created producer destination: " + producerDestination
- + " of type: " + producerDestination.getClass());
- consumer = createConsumer();
- consumer.setMessageListener(this);
- connection.start();
-
- // log.info("Created connection: " + connection);
- }
-
- protected MessageConsumer createConsumer() throws JMSException {
- if (durable) {
- LOG.info("Creating durable consumer");
- return session.createDurableSubscriber((Topic)consumerDestination, getName());
- }
- return session.createConsumer(consumerDestination);
- }
-
- protected void tearDown() throws Exception {
- LOG.info("Dumping stats...");
- // connectionFactory.getStats().reset();
-
- LOG.info("Closing down connection");
-
- /** TODO we should be able to shut down properly */
- session.close();
- connection.close();
-
- super.tearDown();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/JmsTopicSendReceiveWithTwoConnectionsTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsTopicSendReceiveWithTwoConnectionsTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/JmsTopicSendReceiveWithTwoConnectionsTest.java
deleted file mode 100644
index 36f4beb..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsTopicSendReceiveWithTwoConnectionsTest.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import javax.jms.Topic;
-import javax.jms.Connection;
-import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.Session;
-
-/**
- * @version
- */
-public class JmsTopicSendReceiveWithTwoConnectionsTest extends JmsSendReceiveTestSupport {
-
- private static final org.apache.commons.logging.Log LOG = org.apache.commons.logging.LogFactory
- .getLog(JmsTopicSendReceiveWithTwoConnectionsTest.class);
-
- protected Connection sendConnection;
- protected Connection receiveConnection;
- protected Session receiveSession;
-
- protected void setUp() throws Exception {
- super.setUp();
-
- connectionFactory = createConnectionFactory();
-
- sendConnection = createSendConnection();
- sendConnection.start();
-
- receiveConnection = createReceiveConnection();
- receiveConnection.start();
-
- LOG.info("Created sendConnection: " + sendConnection);
- LOG.info("Created receiveConnection: " + receiveConnection);
-
- session = createSendSession(sendConnection);
- receiveSession = createReceiveSession(receiveConnection);
-
- LOG.info("Created sendSession: " + session);
- LOG.info("Created receiveSession: " + receiveSession);
-
- producer = session.createProducer(null);
- producer.setDeliveryMode(deliveryMode);
-
- LOG.info("Created producer: " + producer + " delivery mode = "
- + (deliveryMode == DeliveryMode.PERSISTENT ? "PERSISTENT" : "NON_PERSISTENT"));
-
- if (topic) {
- consumerDestination = session.createTopic(getConsumerSubject());
- producerDestination = session.createTopic(getProducerSubject());
- } else {
- consumerDestination = session.createTopic(getConsumerSubject());
- producerDestination = session.createTopic(getProducerSubject());
- }
-
- LOG.info("Created consumer destination: " + consumerDestination + " of type: "
- + consumerDestination.getClass());
- LOG.info("Created producer destination: " + producerDestination + " of type: "
- + producerDestination.getClass());
-
- consumer = createConsumer(receiveSession, consumerDestination);
- consumer.setMessageListener(this);
-
- LOG.info("Started connections");
- }
-
- protected Session createReceiveSession(Connection receiveConnection) throws Exception {
- return receiveConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- }
-
- protected Session createSendSession(Connection sendConnection) throws Exception {
- return sendConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- }
-
- protected Connection createReceiveConnection() throws Exception {
- return createConnection(false);
- }
-
- protected Connection createSendConnection() throws Exception {
- return createConnection(false);
- }
-
- protected MessageConsumer createConsumer(Session session, Destination dest) throws JMSException {
- return session.createConsumer(dest);
- }
-
- protected HedwigConnectionFactoryImpl createConnectionFactory() throws Exception {
- return new HedwigConnectionFactoryImpl();
- }
-
- protected void tearDown() throws Exception {
- session.close();
- receiveSession.close();
- sendConnection.close();
- receiveConnection.close();
- super.tearDown();
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/JmsTopicSendReceiveWithTwoConnectionsWithJMXTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsTopicSendReceiveWithTwoConnectionsWithJMXTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/JmsTopicSendReceiveWithTwoConnectionsWithJMXTest.java
deleted file mode 100644
index f122176..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsTopicSendReceiveWithTwoConnectionsWithJMXTest.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import javax.jms.Topic;
-import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl;
-
-public class JmsTopicSendReceiveWithTwoConnectionsWithJMXTest extends
- JmsTopicSendReceiveWithTwoConnectionsTest {
-
- protected HedwigConnectionFactoryImpl createConnectionFactory() throws Exception {
- return new HedwigConnectionFactoryImpl();
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/JmsTopicSendSameMessageTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsTopicSendSameMessageTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/JmsTopicSendSameMessageTest.java
deleted file mode 100644
index e7fd5bf..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsTopicSendSameMessageTest.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import javax.jms.Topic;
-import javax.jms.TextMessage;
-
-public class JmsTopicSendSameMessageTest extends JmsTopicSendReceiveWithTwoConnectionsTest {
-
- private static final org.apache.commons.logging.Log LOG = org.apache.commons.logging.LogFactory
- .getLog(JmsTopicSendSameMessageTest.class);
-
- public void testSendReceive() throws Exception {
- messages.clear();
-
- TextMessage message = session.createTextMessage();
-
- for (int i = 0; i < data.length; i++) {
- message.setText(data[i]);
- message.setStringProperty("stringProperty", data[i]);
- message.setIntProperty("intProperty", i);
-
- if (verbose) {
- LOG.info("About to send a message: " + message + " with text: " + data[i]);
- }
-
- producer.send(producerDestination, message);
- }
-
- assertMessagesAreReceived();
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/JmsTopicTransactionTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsTopicTransactionTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/JmsTopicTransactionTest.java
deleted file mode 100644
index 81060f6..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsTopicTransactionTest.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import javax.jms.Topic;
-import org.apache.activemq.test.JmsResourceProvider;
-
-
-public class JmsTopicTransactionTest extends JmsTransactionTestSupport {
-
- /**
- * @see org.apache.activemq.JmsTransactionTestSupport#getJmsResourceProvider()
- */
- protected JmsResourceProvider getJmsResourceProvider() {
- JmsResourceProvider p = new JmsResourceProvider();
- p.setTopic(true);
- p.setDurableName("testsub");
- p.setClientID("testclient");
- return p;
- }
-
-}