You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2016/03/16 04:44:43 UTC
[33/49] bookkeeper git commit: BOOKKEEPER-769: Remove the Hedwig Code
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/TopicRedeliverTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/TopicRedeliverTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/TopicRedeliverTest.java
deleted file mode 100644
index 2c1e24c..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/TopicRedeliverTest.java
+++ /dev/null
@@ -1,300 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.usecases;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-
-import org.apache.activemq.test.TestSupport;
-import org.apache.activemq.util.IdGenerator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Exercises acknowledgement, rollback and redelivery of messages published to a
- * topic (the {@code topic} flag is forced to {@code true} in {@link #setUp()}).
- */
-public class TopicRedeliverTest extends TestSupport {
-
-    private static final Logger LOG = LoggerFactory.getLogger(TopicRedeliverTest.class);
-    /** Timeout, in milliseconds, passed to every blocking {@code receive} call. */
-    private static final int RECEIVE_TIMEOUT = 10000;
-
-    protected int deliveryMode = DeliveryMode.PERSISTENT;
-    private IdGenerator idGen = new IdGenerator();
-
-    public TopicRedeliverTest() {
-    }
-
-    public TopicRedeliverTest(String n) {
-        super(n);
-    }
-
-    protected void setUp() throws Exception {
-        super.setUp();
-        topic = true;
-    }
-
-    /**
-     * Asserts that {@code received} is a {@link TextMessage} whose text equals the
-     * text of {@code sent}. Uses JUnit assertions rather than the {@code assert}
-     * keyword so that the check is enforced even when the JVM runs without
-     * {@code -ea}.
-     *
-     * @param received the message obtained from the consumer (may be null)
-     * @param sent the message that was published
-     * @throws JMSException if the text of either message cannot be read
-     */
-    private void assertReceivedTextMatches(Message received, TextMessage sent) throws JMSException {
-        assertTrue("expected a TextMessage but received == " + received, received instanceof TextMessage);
-        assertTrue(((TextMessage) received).getText().equals(sent.getText()));
-    }
-
-    /**
-     * test messages are acknowledged and recovered properly
-     * @throws Exception
-     */
-    public void testClientAcknowledge() throws Exception {
-        Destination destination = createDestination(getClass().getName());
-        Connection connection = createConnection();
-        final String clientId = idGen.generateId();
-        connection.setClientID(clientId);
-        connection.start();
-        Session consumerSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-        MessageConsumer consumer = consumerSession.createDurableSubscriber((Topic) destination, "subscriber-id1");
-        Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageProducer producer = producerSession.createProducer(destination);
-        producer.setDeliveryMode(deliveryMode);
-
-        // send some messages
-
-        TextMessage sent1 = producerSession.createTextMessage();
-        sent1.setText("msg1");
-        producer.send(sent1);
-
-        // Each message gets its own text; previously the text was set on sent1
-        // again, so sent2 and sent3 were published without any text.
-        TextMessage sent2 = producerSession.createTextMessage();
-        sent2.setText("msg2");
-        producer.send(sent2);
-
-        TextMessage sent3 = producerSession.createTextMessage();
-        sent3.setText("msg3");
-        producer.send(sent3);
-
-        consumer.receive(RECEIVE_TIMEOUT);
-        Message rec2 = consumer.receive(RECEIVE_TIMEOUT);
-        consumer.receive(RECEIVE_TIMEOUT);
-
-        // ack rec2 - in hedwig, this implicitly ack's rec1 too ...
-        assertNotNull("second message was not received", rec2);
-        rec2.acknowledge();
-
-        TextMessage sent4 = producerSession.createTextMessage();
-        sent4.setText("msg4");
-        producer.send(sent4);
-
-        Message rec4 = consumer.receive(RECEIVE_TIMEOUT);
-        // assertTrue(rec4.equals(sent4));
-        assertReceivedTextMatches(rec4, sent4);
-        // We DO NOT support session recovery - to unblock this test,
-        // I am stopp'ing and start'ing connection : not the same, but ...
-        // consumerSession.recover();
-        connection.close();
-        connection = createConnection();
-        // same client id !
-        connection.setClientID(clientId);
-        consumerSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-        consumer = consumerSession.createDurableSubscriber((Topic) destination, "subscriber-id1");
-        connection.start();
-
-        // Skip one message first (presumably the never-acknowledged msg3 - confirm),
-        // then expect msg4 to be delivered again.
-        consumer.receive(RECEIVE_TIMEOUT);
-        rec4 = consumer.receive(RECEIVE_TIMEOUT);
-        // assertTrue(rec4.equals(sent4));
-        assertReceivedTextMatches(rec4, sent4);
-        // assertTrue(rec4.getJMSRedelivered());
-        rec4.acknowledge();
-        connection.close();
-
-    }
-
-    /**
-     * Test redelivered flag is set on rolled-back transactions
-     *
-     * @throws Exception
-     */
-    public void testRedilveredFlagSetOnRollback() throws Exception {
-        Destination destination = createDestination(getClass().getName());
-        Connection connection = createConnection();
-        connection.setClientID(idGen.generateId());
-        connection.start();
-        Session consumerSession = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
-        MessageConsumer consumer = null;
-        if (topic) {
-            consumer = consumerSession.createDurableSubscriber((Topic) destination, "TESTRED");
-        } else {
-            consumer = consumerSession.createConsumer(destination);
-        }
-        Session producerSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
-        MessageProducer producer = producerSession.createProducer(destination);
-        producer.setDeliveryMode(deliveryMode);
-
-        TextMessage sentMsg = producerSession.createTextMessage();
-        sentMsg.setText("msg1");
-        producer.send(sentMsg);
-        producerSession.commit();
-
-        Message recMsg = consumer.receive(RECEIVE_TIMEOUT);
-        assertNotNull("no message received before rollback", recMsg);
-        assertFalse(recMsg.getJMSRedelivered());
-        // NOTE(review): only one message was sent, so this receive presumably
-        // times out and returns null before the rollback - confirm intent.
-        recMsg = consumer.receive(RECEIVE_TIMEOUT);
-        consumerSession.rollback();
-        recMsg = consumer.receive(RECEIVE_TIMEOUT);
-        assertNotNull("no message redelivered after rollback", recMsg);
-        assertTrue(recMsg.getJMSRedelivered());
-        consumerSession.commit();
-        // assertTrue(recMsg.equals(sentMsg));
-        assertReceivedTextMatches(recMsg, sentMsg);
-        assertTrue(recMsg.getJMSRedelivered());
-        connection.close();
-    }
-
-    /**
-     * Rolls the consumer session back twice and checks that the message is
-     * flagged as redelivered each time and that the connection's exception
-     * listener is never invoked.
-     *
-     * @throws Exception
-     */
-    public void testNoExceptionOnRedeliveryAckWithSimpleTopicConsumer() throws Exception {
-        Destination destination = createDestination(getClass().getName());
-        Connection connection = createConnection();
-        final AtomicBoolean gotException = new AtomicBoolean();
-        connection.setExceptionListener(new ExceptionListener() {
-            public void onException(JMSException exception) {
-                // Pass the exception as well so the stack trace reaches the log.
-                LOG.error("unexpected ex:" + exception, exception);
-                gotException.set(true);
-            }
-        });
-        connection.setClientID(idGen.generateId());
-        connection.start();
-        Session consumerSession = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
-        MessageConsumer consumer = null;
-        if (topic) {
-            consumer = consumerSession.createConsumer((Topic) destination);
-        } else {
-            consumer = consumerSession.createConsumer(destination);
-        }
-        Session producerSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
-        MessageProducer producer = producerSession.createProducer(destination);
-        producer.setDeliveryMode(deliveryMode);
-
-        TextMessage sentMsg = producerSession.createTextMessage();
-        sentMsg.setText("msg1");
-        producer.send(sentMsg);
-        producerSession.commit();
-
-        Message recMsg = consumer.receive(RECEIVE_TIMEOUT);
-        assertNotNull("no message received before rollback", recMsg);
-        assertFalse(recMsg.getJMSRedelivered());
-        // NOTE(review): only one message was sent, so this receive presumably
-        // times out and returns null before the rollback - confirm intent.
-        recMsg = consumer.receive(RECEIVE_TIMEOUT);
-        consumerSession.rollback();
-        recMsg = consumer.receive(RECEIVE_TIMEOUT);
-        assertNotNull("no message redelivered after first rollback", recMsg);
-        assertTrue(recMsg.getJMSRedelivered());
-        consumerSession.rollback();
-        recMsg = consumer.receive(RECEIVE_TIMEOUT);
-        assertNotNull("no message redelivered after second rollback", recMsg);
-        assertTrue(recMsg.getJMSRedelivered());
-        consumerSession.commit();
-        // assertTrue(recMsg.equals(sentMsg));
-        assertReceivedTextMatches(recMsg, sentMsg);
-        assertTrue(recMsg.getJMSRedelivered());
-        connection.close();
-
-        assertFalse("no exception", gotException.get());
-    }
-
-    /**
-     * Check a session is rolled back on a Session close().
-     * The {@code x} prefix on the method name keeps it out of the JUnit 3 run.
-     *
-     * @throws Exception
-     */
-    public void xtestTransactionRollbackOnSessionClose() throws Exception {
-        Destination destination = createDestination(getClass().getName());
-        Connection connection = createConnection();
-        connection.setClientID(idGen.generateId());
-        connection.start();
-        Session consumerSession = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
-        MessageConsumer consumer = null;
-        if (topic) {
-            consumer = consumerSession.createDurableSubscriber((Topic) destination, "TESTRED");
-        } else {
-            consumer = consumerSession.createConsumer(destination);
-        }
-        Session producerSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
-        MessageProducer producer = producerSession.createProducer(destination);
-        producer.setDeliveryMode(deliveryMode);
-
-        TextMessage sentMsg = producerSession.createTextMessage();
-        sentMsg.setText("msg1");
-        producer.send(sentMsg);
-
-        producerSession.commit();
-
-        Message recMsg = consumer.receive(RECEIVE_TIMEOUT);
-        assertNotNull("no message received before session close", recMsg);
-        assertFalse(recMsg.getJMSRedelivered());
-        consumerSession.close();
-        consumerSession = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
-        consumer = consumerSession.createConsumer(destination);
-
-        recMsg = consumer.receive(RECEIVE_TIMEOUT);
-        consumerSession.commit();
-        // assertTrue(recMsg.equals(sentMsg));
-        assertReceivedTextMatches(recMsg, sentMsg);
-        connection.close();
-    }
-
-    /**
-     * Checks that a message sent in a rolled-back transaction is not delivered:
-     * msg2 is sent and rolled back, msg3 is sent and committed, and the next
-     * message received must carry msg3's text.
-     *
-     * @throws Exception
-     */
-    public void testTransactionRollbackOnSend() throws Exception {
-        Destination destination = createDestination(getClass().getName());
-        Connection connection = createConnection();
-        connection.setClientID(idGen.generateId());
-        connection.start();
-        Session consumerSession = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
-        MessageConsumer consumer = consumerSession.createConsumer(destination);
-        Session producerSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
-        MessageProducer producer = producerSession.createProducer(destination);
-        producer.setDeliveryMode(deliveryMode);
-
-        TextMessage sentMsg = producerSession.createTextMessage();
-        sentMsg.setText("msg1");
-        producer.send(sentMsg);
-        producerSession.commit();
-
-        Message recMsg = consumer.receive(RECEIVE_TIMEOUT);
-        consumerSession.commit();
-        // assertTrue(recMsg.equals(sentMsg));
-        assertReceivedTextMatches(recMsg, sentMsg);
-
-        sentMsg = producerSession.createTextMessage();
-        sentMsg.setText("msg2");
-        producer.send(sentMsg);
-        producerSession.rollback();
-
-        sentMsg = producerSession.createTextMessage();
-        sentMsg.setText("msg3");
-        producer.send(sentMsg);
-        producerSession.commit();
-
-        recMsg = consumer.receive(RECEIVE_TIMEOUT);
-        // assertTrue(recMsg.equals(sentMsg));
-        assertReceivedTextMatches(recMsg, sentMsg);
-        consumerSession.commit();
-
-        connection.close();
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/TransactionRollbackOrderTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/TransactionRollbackOrderTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/TransactionRollbackOrderTest.java
deleted file mode 100644
index 375004b..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/TransactionRollbackOrderTest.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.usecases;
-
-import javax.jms.Topic;
-
-import org.apache.hedwig.JmsTestBase;
-import org.apache.hedwig.jms.SessionImpl;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import junit.framework.TestCase;
-import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Test case for AMQ-268
- */
-public final class TransactionRollbackOrderTest extends JmsTestBase {
- private static final Logger LOG = LoggerFactory.getLogger(TransactionRollbackOrderTest.class);
-
- private volatile String receivedText;
-
- private Session producerSession;
- private Session consumerSession;
- private Destination queue;
-
- private MessageProducer producer;
- private MessageConsumer consumer;
- private Connection connection;
- private CountDownLatch latch = new CountDownLatch(1);
- private int numMessages = 5;
- private List<String> msgSent = new ArrayList<String>();
- private List<String> msgCommitted = new ArrayList<String>();
- private List<String> msgRolledBack = new ArrayList<String>();
- private List<String> msgRedelivered = new ArrayList<String>();
-
- public void testTransaction() throws Exception {
-
- HedwigConnectionFactoryImpl factory = new HedwigConnectionFactoryImpl();
-
- connection = factory.createConnection();
- queue = SessionImpl.asTopic(getClass().getName() + "." + getName());
-
- producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- consumerSession = connection.createSession(true, 0);
-
- producer = producerSession.createProducer(queue);
-
- consumer = consumerSession.createConsumer(queue);
- consumer.setMessageListener(new MessageListener() {
-
- int msgCount;
- int msgCommittedCount;
-
- public void onMessage(Message m) {
- try {
- msgCount++;
- TextMessage tm = (TextMessage)m;
- receivedText = tm.getText();
-
- if (tm.getJMSRedelivered()) {
- msgRedelivered.add(receivedText);
- }
-
- LOG.info("consumer received message: " + receivedText
- + (tm.getJMSRedelivered() ? " ** Redelivered **" : ""));
- if (msgCount == 3) {
- msgRolledBack.add(receivedText);
- consumerSession.rollback();
- LOG.info("[msg: " + receivedText + "] ** rolled back **");
- } else {
- msgCommittedCount++;
- msgCommitted.add(receivedText);
- consumerSession.commit();
- LOG.info("[msg: " + receivedText + "] committed transaction ");
- }
- if (msgCommittedCount == numMessages) {
- latch.countDown();
- }
- } catch (JMSException e) {
- try {
- consumerSession.rollback();
- LOG.info("rolled back transaction");
- } catch (JMSException e1) {
- LOG.info(e1.toString());
- e1.printStackTrace();
- }
- LOG.info(e.toString());
- e.printStackTrace();
- }
- }
- });
- connection.start();
-
- TextMessage tm = null;
- try {
- for (int i = 1; i <= numMessages; i++) {
- tm = producerSession.createTextMessage();
- tm.setText("Hello " + i);
- msgSent.add(tm.getText());
- producer.send(tm);
- LOG.info("producer sent message: " + tm.getText());
- }
- } catch (JMSException e) {
- e.printStackTrace();
- }
-
- LOG.info("Waiting for latch");
- latch.await();
-
- assertEquals(1, msgRolledBack.size());
- assertEquals(1, msgRedelivered.size());
-
- LOG.info("msg RolledBack = " + msgRolledBack.get(0));
- LOG.info("msg Redelivered = " + msgRedelivered.get(0));
-
- assertEquals(msgRolledBack.get(0), msgRedelivered.get(0));
-
- assertEquals(numMessages, msgSent.size());
- assertEquals(numMessages, msgCommitted.size());
-
- assertEquals(msgSent, msgCommitted);
-
- }
-
- protected void tearDown() throws Exception {
- if (connection != null) {
- LOG.info("Closing the connection");
- connection.close();
- }
- super.tearDown();
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/TransactionTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/TransactionTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/TransactionTest.java
deleted file mode 100644
index 6b9a2f9..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/TransactionTest.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.usecases;
-
-import javax.jms.Topic;
-
-import org.apache.hedwig.JmsTestBase;
-import org.apache.hedwig.jms.SessionImpl;
-import java.util.Date;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import junit.framework.TestCase;
-import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public final class TransactionTest extends JmsTestBase {
-
- private static final Logger LOG = LoggerFactory.getLogger(TransactionTest.class);
-
- private volatile String receivedText;
-
- private Session producerSession;
- private Session consumerSession;
- private Destination queue;
-
- private MessageProducer producer;
- private MessageConsumer consumer;
- private Connection connection;
- private CountDownLatch latch = new CountDownLatch(1);
-
- public void testTransaction() throws Exception {
-
- HedwigConnectionFactoryImpl factory = new HedwigConnectionFactoryImpl();
- connection = factory.createConnection();
- queue = SessionImpl.asTopic(getClass().getName() + "." + getName());
-
- producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- consumerSession = connection.createSession(true, 0);
-
- producer = producerSession.createProducer(queue);
-
- consumer = consumerSession.createConsumer(queue);
- consumer.setMessageListener(new MessageListener() {
-
- public void onMessage(Message m) {
- try {
- TextMessage tm = (TextMessage)m;
- receivedText = tm.getText();
- latch.countDown();
-
- LOG.info("consumer received message :" + receivedText);
- consumerSession.commit();
- LOG.info("committed transaction");
- } catch (JMSException e) {
- try {
- consumerSession.rollback();
- LOG.info("rolled back transaction");
- } catch (JMSException e1) {
- LOG.info(e1.toString());
- e1.printStackTrace();
- }
- LOG.info(e.toString());
- e.printStackTrace();
- }
- }
- });
-
- connection.start();
-
- TextMessage tm = null;
- try {
- tm = producerSession.createTextMessage();
- tm.setText("Hello, " + new Date());
- producer.send(tm);
- LOG.info("producer sent message :" + tm.getText());
- } catch (JMSException e) {
- e.printStackTrace();
- }
-
- LOG.info("Waiting for latch");
- latch.await(2,TimeUnit.SECONDS);
- assertNotNull(receivedText);
- LOG.info("test completed, destination=" + receivedText);
- }
-
- protected void tearDown() throws Exception {
- if (connection != null) {
- connection.close();
- }
- super.tearDown();
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/util/ConsumerThread.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/util/ConsumerThread.java b/hedwig-client-jms/src/test/java/org/apache/activemq/util/ConsumerThread.java
deleted file mode 100644
index 1f9ce8e..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/util/ConsumerThread.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.util;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-
-public class ConsumerThread extends Thread {
-
- private static final Logger LOG = LoggerFactory.getLogger(ConsumerThread.class);
-
- int messageCount = 1000;
- int received = 0;
- Destination dest;
- Session sess;
- boolean breakOnNull = true;
-
- public ConsumerThread(Session sess, Destination dest) {
- this.dest = dest;
- this.sess = sess;
- }
-
- @Override
- public void run() {
- MessageConsumer consumer = null;
-
- try {
- consumer = sess.createConsumer(dest);
- while (received < messageCount) {
- Message msg = consumer.receive(3000);
- if (msg != null) {
- LOG.info("Received " + ((TextMessage)msg).getText());
- received++;
- } else {
- if (breakOnNull) {
- break;
- }
- }
- }
- } catch (JMSException e) {
- e.printStackTrace();
- } finally {
- if (consumer != null) {
- try {
- consumer.close();
- } catch (JMSException e) {
- e.printStackTrace();
- }
- }
- }
- }
-
- public int getReceived() {
- return received;
- }
-
- public void setMessageCount(int messageCount) {
- this.messageCount = messageCount;
- }
-
- public void setBreakOnNull(boolean breakOnNull) {
- this.breakOnNull = breakOnNull;
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/util/DefaultTestAppender.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/util/DefaultTestAppender.java b/hedwig-client-jms/src/test/java/org/apache/activemq/util/DefaultTestAppender.java
deleted file mode 100644
index c4f55d4..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/util/DefaultTestAppender.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.util;
-
-import org.apache.log4j.Appender;
-import org.apache.log4j.Layout;
-import org.apache.log4j.spi.ErrorHandler;
-import org.apache.log4j.spi.Filter;
-
-/**
- * Skeleton {@link Appender} for tests: every method overridden here either does
- * nothing or returns a fixed value, apart from the name accessors. Any
- * {@code Appender} method not overridden here (presumably the one that handles
- * log events - confirm against the log4j API) is left for subclasses.
- */
-public abstract class DefaultTestAppender implements Appender {
-    /** Appender name; starts out as the simple name of the concrete class. */
-    String name = getClass().getSimpleName();
-
-    // ---- name -------------------------------------------------------------
-
-    @Override
-    public String getName() {
-        return name;
-    }
-
-    @Override
-    public void setName(String name) {
-        this.name = name;
-    }
-
-    // ---- filters: none are kept ---------------------------------------------
-
-    @Override
-    public void addFilter(Filter newFilter) {
-        // intentionally ignored
-    }
-
-    @Override
-    public void clearFilters() {
-        // nothing to clear
-    }
-
-    @Override
-    public Filter getFilter() {
-        return null;
-    }
-
-    // ---- layout: not used ---------------------------------------------------
-
-    @Override
-    public boolean requiresLayout() {
-        return false;
-    }
-
-    @Override
-    public void setLayout(Layout layout) {
-        // intentionally ignored
-    }
-
-    @Override
-    public Layout getLayout() {
-        return null;
-    }
-
-    // ---- error handler: not used ----------------------------------------------
-
-    @Override
-    public void setErrorHandler(ErrorHandler errorHandler) {
-        // intentionally ignored
-    }
-
-    @Override
-    public ErrorHandler getErrorHandler() {
-        return null;
-    }
-
-    // ---- lifecycle ------------------------------------------------------------
-
-    @Override
-    public void close() {
-        // nothing to release
-    }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/util/IdGenerator.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/util/IdGenerator.java b/hedwig-client-jms/src/test/java/org/apache/activemq/util/IdGenerator.java
deleted file mode 100644
index 197e3fd..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/util/IdGenerator.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.util;
-
-import org.apache.hedwig.jms.SessionImpl;
-
-/**
- * Supplies identifiers for tests by delegating to
- * {@link SessionImpl#generateRandomString()}.
- */
-public class IdGenerator {
-    /**
-     * @return the string produced by {@code SessionImpl.generateRandomString()}
-     */
-    public String generateId() {
-        final String id = SessionImpl.generateRandomString();
-        return id;
-    }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/util/MessageIdList.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/util/MessageIdList.java b/hedwig-client-jms/src/test/java/org/apache/activemq/util/MessageIdList.java
deleted file mode 100644
index a9584a8..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/util/MessageIdList.java
+++ /dev/null
@@ -1,267 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.util;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-
-import junit.framework.Assert;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A simple container of message ids for performing testing and rendezvous style
- * code. You can use this class as a {@link MessageListener} and then make
- * assertions about how many messages it has received, allowing a certain maximum
- * amount of time to ensure that the test does not hang forever. You can also
- * chain these instances together with the {@link #setParent(MessageListener)}
- * method so that you can aggregate the total number of messages consumed across
- * a number of consumers.
- *
- * <p>Every access to the id list happens while holding the {@code semaphore}
- * monitor, which is also the object {@link #waitForMessagesToArrive(int)} waits
- * on and {@link #onMessage(Message)} notifies.
- */
-public class MessageIdList extends Assert implements MessageListener {
-
-    private static final Logger LOG = LoggerFactory.getLogger(MessageIdList.class);
-
-    private List<String> messageIds = new ArrayList<String>();
-    private Object semaphore;
-    private boolean verbose;
-    private MessageListener parent;
-    private long maximumDuration = 15000L;
-    private long processingDelay;
-
-    private CountDownLatch countDownLatch;
-
-    /**
-     * Creates a list guarded by its own private monitor object.
-     */
-    public MessageIdList() {
-        this(new Object());
-    }
-
-    /**
-     * @param semaphore the monitor used to guard the id list and to wait for / signal arrivals
-     */
-    public MessageIdList(Object semaphore) {
-        this.semaphore = semaphore;
-    }
-
-    /**
-     * Two lists are equal when they currently hold the same ids in the same order.
-     */
-    @Override
-    public boolean equals(Object that) {
-        if (that instanceof MessageIdList) {
-            MessageIdList thatList = (MessageIdList)that;
-            return getMessageIds().equals(thatList.getMessageIds());
-        }
-        return false;
-    }
-
-    @Override
-    public int hashCode() {
-        synchronized (semaphore) {
-            return messageIds.hashCode() + 1;
-        }
-    }
-
-    @Override
-    public String toString() {
-        synchronized (semaphore) {
-            return messageIds.toString();
-        }
-    }
-
-    /**
-     * @return all the messages on the list so far, clearing the buffer
-     */
-    public List<String> flushMessages() {
-        synchronized (semaphore) {
-            List<String> answer = new ArrayList<String>(messageIds);
-            messageIds.clear();
-            return answer;
-        }
-    }
-
-    /**
-     * @return a snapshot copy of the ids received so far; the buffer is left untouched
-     */
-    public List<String> getMessageIds() {
-        // The semaphore monitor alone guards messageIds (as in every other accessor),
-        // so no additional lock on this instance is taken here.
-        synchronized (semaphore) {
-            return new ArrayList<String>(messageIds);
-        }
-    }
-
-    /**
-     * Records the JMS message id, wakes up any waiter, counts down the optional
-     * latch, forwards the message to the optional parent listener and finally
-     * sleeps for the configured processing delay.
-     */
-    @Override
-    public void onMessage(Message message) {
-        try {
-            String id = message.getJMSMessageID();
-            synchronized (semaphore) {
-                messageIds.add(id);
-                semaphore.notifyAll();
-            }
-            if (countDownLatch != null) {
-                countDownLatch.countDown();
-            }
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Received message: " + message);
-            }
-        } catch (JMSException e) {
-            LOG.warn("Could not read the JMS message id of " + message, e);
-        }
-        if (parent != null) {
-            parent.onMessage(message);
-        }
-        if (processingDelay > 0) {
-            try {
-                Thread.sleep(processingDelay);
-            } catch (InterruptedException e) {
-                // Keep the interrupt visible to whoever owns the delivery thread.
-                Thread.currentThread().interrupt();
-            }
-        }
-    }
-
-    public int getMessageCount() {
-        synchronized (semaphore) {
-            return messageIds.size();
-        }
-    }
-
-    /**
-     * Blocks until at least {@code messageCount} messages have been received,
-     * {@link #getMaximumDuration()} milliseconds have elapsed, or the calling
-     * thread is interrupted, whichever happens first.
-     *
-     * @param messageCount the number of messages to wait for
-     */
-    public void waitForMessagesToArrive(int messageCount) {
-        LOG.info("Waiting for " + messageCount + " message(s) to arrive");
-
-        long start = System.currentTimeMillis();
-
-        // The count is checked while holding the monitor so that a message arriving
-        // between the check and the wait cannot be missed, and the loop re-checks
-        // after every wake-up so unrelated notifications do not end the wait early.
-        synchronized (semaphore) {
-            while (!hasReceivedMessages(messageCount)) {
-                long remaining = maximumDuration - (System.currentTimeMillis() - start);
-                if (remaining <= 0) {
-                    // wait(0) would block forever, so stop as soon as the budget is spent.
-                    break;
-                }
-                try {
-                    semaphore.wait(remaining);
-                } catch (InterruptedException e) {
-                    LOG.info("Caught: " + e);
-                    Thread.currentThread().interrupt();
-                    break;
-                }
-            }
-        }
-        long end = System.currentTimeMillis() - start;
-
-        LOG.info("End of wait for " + end + " millis and received: " + getMessageCount() + " messages");
-    }
-
-    /**
-     * Performs a testing assertion that the correct number of messages have
-     * been received without waiting
-     *
-     * @param messageCount the exact number of messages expected
-     */
-    public void assertMessagesReceivedNoWait(int messageCount) {
-        assertEquals("expected number of messages when received", messageCount, getMessageCount());
-    }
-
-    /**
-     * Performs a testing assertion that the correct number of messages have
-     * been received waiting for the messages to arrive up to a fixed amount of
-     * time.
-     *
-     * @param messageCount the exact number of messages expected
-     */
-    public void assertMessagesReceived(int messageCount) {
-        waitForMessagesToArrive(messageCount);
-
-        assertMessagesReceivedNoWait(messageCount);
-    }
-
-    /**
-     * Asserts that there are at least the given number of messages received
-     * without waiting.
-     *
-     * @param messageCount the minimum number of messages expected
-     */
-    public void assertAtLeastMessagesReceived(int messageCount) {
-        int actual = getMessageCount();
-        assertTrue("at least: " + messageCount + " messages received. Actual: " + actual, actual >= messageCount);
-    }
-
-    /**
-     * Asserts that there are at most the number of messages received without
-     * waiting
-     *
-     * @param messageCount the maximum number of messages expected
-     */
-    public void assertAtMostMessagesReceived(int messageCount) {
-        int actual = getMessageCount();
-        assertTrue("at most: " + messageCount + " messages received. Actual: " + actual, actual <= messageCount);
-    }
-
-    /**
-     * @return {@code true} once at least one message has been received
-     */
-    public boolean hasReceivedMessage() {
-        // Previously compared with "== 0", which reported true only for an empty list.
-        return getMessageCount() > 0;
-    }
-
-    /**
-     * @param messageCount the number of messages to compare against
-     * @return {@code true} when at least {@code messageCount} messages have been received
-     */
-    public boolean hasReceivedMessages(int messageCount) {
-        return getMessageCount() >= messageCount;
-    }
-
-    public boolean isVerbose() {
-        return verbose;
-    }
-
-    public void setVerbose(boolean verbose) {
-        this.verbose = verbose;
-    }
-
-    public MessageListener getParent() {
-        return parent;
-    }
-
-    /**
-     * Allows a parent listener to be specified such as to aggregate messages
-     * consumed across consumers
-     */
-    public void setParent(MessageListener parent) {
-        this.parent = parent;
-    }
-
-    /**
-     * @return the maximumDuration, in milliseconds, that
-     *         {@link #waitForMessagesToArrive(int)} may block
-     */
-    public long getMaximumDuration() {
-        return this.maximumDuration;
-    }
-
-    /**
-     * @param maximumDuration the maximumDuration to set, in milliseconds
-     */
-    public void setMaximumDuration(long maximumDuration) {
-        this.maximumDuration = maximumDuration;
-    }
-
-    /**
-     * @param countDownLatch latch counted down once per successfully recorded message
-     */
-    public void setCountDownLatch(CountDownLatch countDownLatch) {
-        this.countDownLatch = countDownLatch;
-    }
-
-    /**
-     * Gets the amount of time the message listener will spend sleeping to
-     * simulate a processing delay.
-     *
-     * @return the delay in milliseconds
-     */
-    public long getProcessingDelay() {
-        return processingDelay;
-    }
-
-    /**
-     * Sets the amount of time the message listener will spend sleeping to
-     * simulate a processing delay.
-     *
-     * @param processingDelay the delay in milliseconds; values {@code <= 0} disable the sleep
-     */
-    public void setProcessingDelay(long processingDelay) {
-        this.processingDelay = processingDelay;
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/util/ProducerThread.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/util/ProducerThread.java b/hedwig-client-jms/src/test/java/org/apache/activemq/util/ProducerThread.java
deleted file mode 100644
index 12dfe3a..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/util/ProducerThread.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.util;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-
-/**
- * Thread that creates a {@link MessageProducer} on the given session and sends
- * {@code messageCount} messages to the destination, optionally sleeping between
- * sends. Failures are logged and end the run; the producer is always closed.
- */
-public class ProducerThread extends Thread {
-
-    private static final Logger LOG = LoggerFactory.getLogger(ProducerThread.class);
-
-    int messageCount = 1000;
-    Destination dest;
-    protected Session sess;
-    int sleep = 0;
-    // Written only by this thread in run(), read by other threads via getSentCount();
-    // volatile makes the progress visible to them.
-    volatile int sentCount = 0;
-
-    /**
-     * @param sess the session used to create the producer and the messages
-     * @param dest the destination the messages are sent to
-     */
-    public ProducerThread(Session sess, Destination dest) {
-        this.dest = dest;
-        this.sess = sess;
-    }
-
-    /**
-     * Sends the configured number of messages, then closes the producer.
-     */
-    @Override
-    public void run() {
-        MessageProducer producer = null;
-        try {
-            producer = sess.createProducer(dest);
-            for (sentCount = 0; sentCount < messageCount; sentCount++) {
-                producer.send(createMessage(sentCount));
-                LOG.info("Sent 'test message: " + sentCount + "'");
-                if (sleep > 0) {
-                    Thread.sleep(sleep);
-                }
-            }
-        } catch (InterruptedException e) {
-            // Restore the flag so code inspecting this thread can see the interrupt.
-            Thread.currentThread().interrupt();
-            LOG.warn("Producer interrupted, sentCount=" + sentCount, e);
-        } catch (Exception e) {
-            LOG.error("Producer failed, sentCount=" + sentCount, e);
-        } finally {
-            if (producer != null) {
-                try {
-                    producer.close();
-                } catch (JMSException e) {
-                    LOG.warn("Failed to close producer", e);
-                }
-            }
-        }
-    }
-
-    /**
-     * Builds the message for the given index; subclasses may override to send
-     * other payloads.
-     *
-     * @param i zero-based index of the message being sent
-     * @return the message to send
-     * @throws Exception if the message cannot be created
-     */
-    protected Message createMessage(int i) throws Exception {
-        return sess.createTextMessage("test message: " + i);
-    }
-
-    public void setMessageCount(int messageCount) {
-        this.messageCount = messageCount;
-    }
-
-    /**
-     * @param sleep pause after each send, in milliseconds; values {@code <= 0} disable the pause
-     */
-    public void setSleep(int sleep) {
-        this.sleep = sleep;
-    }
-
-    public int getMessageCount() {
-        return messageCount;
-    }
-
-    /**
-     * @return the current value of the send-loop counter
-     */
-    public int getSentCount() {
-        return sentCount;
-    }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/util/SimplePojo.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/util/SimplePojo.java b/hedwig-client-jms/src/test/java/org/apache/activemq/util/SimplePojo.java
deleted file mode 100644
index 4953762..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/util/SimplePojo.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.util;
-
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
-
-import javax.jms.Destination;
-
-/**
- * Plain bean with a handful of scalar properties and three destination lists.
- * It carries no behavior beyond getters and setters.
- */
-public class SimplePojo {
-
-    String name;
-    int age;
-    boolean enabled;
-    URI uri;
-    List<Destination> favorites = new ArrayList<Destination>();
-    List<Destination> nonFavorites = new ArrayList<Destination>();
-    List<Destination> others = new ArrayList<Destination>();
-
-    // --- scalar properties ---
-
-    public String getName() {
-        return this.name;
-    }
-
-    public void setName(String name) {
-        this.name = name;
-    }
-
-    public int getAge() {
-        return this.age;
-    }
-
-    public void setAge(int age) {
-        this.age = age;
-    }
-
-    public boolean isEnabled() {
-        return this.enabled;
-    }
-
-    public void setEnabled(boolean enabled) {
-        this.enabled = enabled;
-    }
-
-    public URI getUri() {
-        return this.uri;
-    }
-
-    public void setUri(URI uri) {
-        this.uri = uri;
-    }
-
-    // --- destination lists (stored and returned by reference, never copied) ---
-
-    public List<Destination> getFavorites() {
-        return this.favorites;
-    }
-
-    public void setFavorites(List<Destination> favorites) {
-        this.favorites = favorites;
-    }
-
-    public List<Destination> getNonFavorites() {
-        return this.nonFavorites;
-    }
-
-    public void setNonFavorites(List<Destination> nonFavorites) {
-        this.nonFavorites = nonFavorites;
-    }
-
-    public List<Destination> getOthers() {
-        return this.others;
-    }
-
-    public void setOthers(List<Destination> others) {
-        this.others = others;
-    }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/util/Wait.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/util/Wait.java b/hedwig-client-jms/src/test/java/org/apache/activemq/util/Wait.java
deleted file mode 100644
index 1a89fe5..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/util/Wait.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.util;
-
-
-/**
- * Polling helper for tests: repeatedly evaluates a {@link Condition} until it
- * holds or a deadline passes.
- */
-public class Wait {
-    /** Default timeout used by {@link #waitFor(Condition)}. */
-    public static final long MAX_WAIT_MILLIS = 30*1000;
-
-    /** Longest pause between two evaluations of the condition. */
-    private static final long POLL_INTERVAL_MILLIS = 1000;
-
-    /** Predicate that is polled by {@link Wait#waitFor(Condition, long)}. */
-    public interface Condition {
-        boolean isSatisified() throws Exception;
-    }
-
-    /**
-     * Polls the condition for up to {@link #MAX_WAIT_MILLIS} milliseconds.
-     *
-     * @param condition the condition to poll
-     * @return the result of the last evaluation of the condition
-     * @throws Exception if the condition throws or the thread is interrupted while sleeping
-     */
-    public static boolean waitFor(Condition condition) throws Exception {
-        return waitFor(condition, MAX_WAIT_MILLIS);
-    }
-
-    /**
-     * Evaluates the condition immediately and then again after each pause until
-     * it holds or {@code duration} milliseconds have elapsed.
-     *
-     * @param condition the condition to poll
-     * @param duration maximum time to keep polling, in milliseconds
-     * @return the result of the last evaluation of the condition
-     * @throws Exception if the condition throws or the thread is interrupted while sleeping
-     */
-    public static boolean waitFor(final Condition condition, final long duration) throws Exception {
-        final long expiry = System.currentTimeMillis() + duration;
-        boolean conditionSatisified = condition.isSatisified();
-        while (!conditionSatisified) {
-            final long remaining = expiry - System.currentTimeMillis();
-            if (remaining <= 0) {
-                break;
-            }
-            // Never sleep past the deadline: a short timeout must not block for a full interval.
-            Thread.sleep(Math.min(POLL_INTERVAL_MILLIS, remaining));
-            conditionSatisified = condition.isSatisified();
-        }
-        return conditionSatisified;
-    }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/hedwig/JmsTestBase.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/hedwig/JmsTestBase.java b/hedwig-client-jms/src/test/java/org/apache/hedwig/JmsTestBase.java
deleted file mode 100644
index ecf8830..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/hedwig/JmsTestBase.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hedwig;
-
-import org.apache.hedwig.server.HedwigHubTestBase;
-import org.apache.hedwig.server.HedwigHubTestBase;
-import java.io.*;
-
-/**
- * Does any jms specific initializations: forces a single-bookie, non-SSL,
- * standalone hub setup and points the JMS client at it through a generated
- * client configuration file.
- */
-public class JmsTestBase extends HedwigHubTestBase {
-    /** Absolute path of the generated client config, or {@code null} when none exists. */
-    protected String generatedConfig;
-
-    public JmsTestBase(){
-        super(1);
-    }
-
-    public JmsTestBase(String name){
-        super(name, 1);
-    }
-
-    private void init() {
-        // single bookie
-        this.numBookies = 1;
-        // is this required ?
-        // disable ssl
-        this.sslEnabled = false;
-        // Not sure why it works only in standalone mode - something for hedwig folks to debug ?
-        this.standalone = true;
-        // this.standalone = false;
-        // required ?
-        // this.readDelay = 1L;
-    }
-
-    /**
-     * Starts the hub via the superclass, then writes the client configuration
-     * file and publishes its path through the
-     * {@code ConnectionImpl.HEDWIG_CLIENT_CONFIG_FILE} system property.
-     */
-    @Override
-    protected void setUp() throws Exception {
-        init();
-        super.setUp();
-        // Now generate HEDWIG_CLIENT_CONFIG_FILE and set the right host/port to it.
-        this.generatedConfig = generateConfig(new HubClientConfiguration()
-            .getDefaultServerHedwigSocketAddress().getPort());
-        System.setProperty(org.apache.hedwig.jms.ConnectionImpl.HEDWIG_CLIENT_CONFIG_FILE, generatedConfig);
-    }
-
-    /**
-     * Stops the hub via the superclass and removes the generated configuration
-     * file, even when the superclass tear-down fails.
-     */
-    @Override
-    protected void tearDown() throws Exception {
-        try {
-            super.tearDown();
-        } finally {
-            // best case
-            if (null != generatedConfig) {
-                (new File(generatedConfig)).delete();
-                generatedConfig = null;
-            }
-        }
-    }
-
-    /**
-     * Writes a temporary client configuration file.
-     * Override only standalone, sslEnabled, server port - the config we rely on.
-     *
-     * @param serverPort port written into {@code default_server_host}
-     * @return absolute path of the generated file
-     * @throws IOException if the file cannot be created or written
-     */
-    protected String generateConfig(int serverPort) throws IOException {
-        File configFile = File.createTempFile("jms_", ".conf");
-        // Registered up front so the file is removed at JVM exit even if writing fails.
-        configFile.deleteOnExit();
-        BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(configFile), "utf-8"));
-        try {
-            writer.write("# The default Hedwig server host to contact\n"
-                + "default_server_host=" + HOST + ":" + serverPort + "\n");
-            writer.write("# Flag indicating if the server should also operate in SSL mode.\nssl_enabled=false\n");
-            writer.flush();
-        } finally {
-            // Always release the file handle, including when a write fails.
-            writer.close();
-        }
-        return configFile.getAbsolutePath();
-    }
-
-    @Override
-    protected void startHubServers() throws Exception {
-        super.startHubServers();
-        System.out.println("startHubServers done ... " + serversList);
-    }
-
-
-    @Override
-    protected void stopHubServers() throws Exception {
-        super.stopHubServers();
-        System.out.println("stopHubServers done ... " + serversList);
-    }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/hedwig/jms/BasicJMSTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/hedwig/jms/BasicJMSTest.java b/hedwig-client-jms/src/test/java/org/apache/hedwig/jms/BasicJMSTest.java
deleted file mode 100644
index be64fc4..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/hedwig/jms/BasicJMSTest.java
+++ /dev/null
@@ -1,337 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.jms;
-
-import junit.framework.Assert;
-import org.apache.hedwig.JmsTestBase;
-import org.apache.hedwig.jms.jndi.HedwigInitialContext;
-
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-import javax.jms.TopicConnection;
-import javax.jms.TopicConnectionFactory;
-import javax.jms.TopicPublisher;
-import javax.jms.TopicSession;
-import javax.jms.TopicSubscriber;
-import javax.naming.Context;
-import javax.naming.NamingException;
-
-/**
- * Basic JMS testcases: a publish/receive round trip, a listener based "chat"
- * scenario and a message selector scenario, each run non-transacted and
- * transacted.
- */
-public class BasicJMSTest extends JmsTestBase {
-
-    private static final int NUM_ITERATIONS = 1;
-    private static final String TEXT_MESSAGE = "test_message";
-
-    // private static final String CHAT_TOPIC_NAME = "chat_topic";
-    private static final String[] CHAT_MESSAGES = {"message1", "message2", "message3", "message4"};
-    private static final String ATTRIBUTE_KEY = "key";
-    private static final String ATTRIBUTE_VALUE = "value";
-
-    private TopicConnectionFactory topicConnectionFactory;
-
-    // private static final String testTopicName = "test_topic3";
-    private String testTopicName;
-
-    @Override
-    protected void setUp() throws Exception {
-        super.setUp();
-        Context messaging = new HedwigInitialContext();
-        topicConnectionFactory = (TopicConnectionFactory) messaging.lookup(
-            HedwigInitialContext.TOPIC_CONNECTION_FACTORY_NAME);
-        // A fresh random topic per test keeps runs independent of each other.
-        testTopicName = SessionImpl.generateRandomString();
-    }
-
-    @Override
-    public void tearDown() throws Exception {
-        super.tearDown();
-        testTopicName = null;
-    }
-
-    public void testSimpleJms() throws JMSException {
-        for (int i = 0; i < NUM_ITERATIONS; i++) {
-            simpleJMSTestImpl(false);
-        }
-        for (int i = 0; i < NUM_ITERATIONS; i++) {
-            simpleJMSTestImpl(true);
-        }
-    }
-
-    /**
-     * Publishes one text message and checks that a durable subscriber on a
-     * second session receives it, then that receive() yields null once the
-     * subscribing session is closed.
-     *
-     * @param transacted whether both sessions are transacted
-     */
-    private void simpleJMSTestImpl(boolean transacted) throws JMSException {
-        TopicConnection connection = topicConnectionFactory.createTopicConnection();
-        try {
-            // Creating two sessions : one to subscribe, the other to publish and test between them ...
-            TopicSession publishTopicSession = connection.createTopicSession(transacted, Session.AUTO_ACKNOWLEDGE);
-            TopicSession subscribeTopicSession = connection.createTopicSession(transacted, Session.AUTO_ACKNOWLEDGE);
-
-            TopicPublisher publisher = publishTopicSession.createPublisher(
-                publishTopicSession.createTopic(testTopicName));
-            TopicSubscriber subscriber = subscribeTopicSession.createDurableSubscriber(
-                publishTopicSession.createTopic(testTopicName), "test_subscriber");
-            //TopicSubscriber subscriber_dup =
-            subscribeTopicSession.createDurableSubscriber(
-                publishTopicSession.createTopic(testTopicName), "test_subscriber");
-            // TopicSubscriber subscriber1 =
-            subscribeTopicSession.createDurableSubscriber(
-                publishTopicSession.createTopic(testTopicName), "test_subscriber1");
-
-            // Start connection ...
-            connection.start();
-            // subscriber.receiveNoWait();
-
-            publisher.publish(publishTopicSession.createTextMessage(TEXT_MESSAGE));
-            if (transacted) publishTopicSession.commit();
-
-            Message message = subscriber.receive();
-
-            Assert.assertNotNull(message);
-            Assert.assertTrue(message instanceof TextMessage);
-            // JUnit order is (expected, actual) so a failure reports the values the right way round.
-            Assert.assertEquals(TEXT_MESSAGE, ((TextMessage) message).getText());
-
-            if (transacted) subscribeTopicSession.commit();
-
-            subscribeTopicSession.close();
-            // Must return null, we have closed the session.
-            Assert.assertNull(subscriber.receive());
-        } finally {
-            // Close even when an assertion fails so the connection does not leak into later tests.
-            connection.close();
-        }
-    }
-
-
-    // Based on code from http://onjava.com/pub/a/onjava/excerpt/jms_ch2/index.html?page=2
-    public void testSimpleChat() throws NamingException, JMSException {
-        for (int i = 0; i < NUM_ITERATIONS; i++) {
-            simpleChatTestImpl(false);
-        }
-        for (int i = 0; i < NUM_ITERATIONS; i++) {
-            simpleChatTestImpl(true);
-        }
-    }
-
-    /**
-     * Publishes {@link #CHAT_MESSAGES} to a random topic and verifies them with
-     * two subscribers: one through an asynchronous listener that checks order,
-     * text and the string property, one through synchronous receive calls.
-     *
-     * @param transacted whether both sessions are transacted
-     */
-    private void simpleChatTestImpl(boolean transacted) throws NamingException, JMSException {
-        // Create a JMS connection
-        TopicConnection connection = topicConnectionFactory.createTopicConnection();
-        try {
-            // Create two JMS session objects
-            TopicSession pubSession = connection.createTopicSession(transacted, Session.AUTO_ACKNOWLEDGE);
-            TopicSession subSession = connection.createTopicSession(transacted, Session.AUTO_ACKNOWLEDGE);
-
-            // Look up a JMS topic
-            // Topic chatTopic = pubSession.createTopic(CHAT_TOPIC_NAME);
-            Topic chatTopic = pubSession.createTopic(SessionImpl.generateRandomString());
-
-            // Create a JMS publisher and subscriber
-            TopicPublisher publisher = pubSession.createPublisher(chatTopic);
-            TopicSubscriber subscriber = subSession.createSubscriber(chatTopic);
-            TopicSubscriber subscriber1 = subSession.createSubscriber(chatTopic);
-
-            final Mutable<Boolean> error = new Mutable<Boolean>(false);
-            final Mutable<String> errorMessage = new Mutable<String>(null);
-            final Mutable<Integer> messageCount = new Mutable<Integer>(0);
-            // Set a JMS message listener
-            subscriber.setMessageListener(new MessageListener() {
-                @Override
-                public void onMessage(Message message) {
-                    // if already failed, ignore.
-                    if (error.getValue()) return;
-
-                    if (!(message instanceof TextMessage)) {
-                        errorMessage.setValue("Not text message ?");
-                        error.setValue(true);
-                        return;
-                    }
-                    TextMessage textMessage = (TextMessage) message;
-                    String text;
-                    try {
-                        text = textMessage.getText();
-                    } catch (JMSException e) {
-                        e.printStackTrace();
-                        errorMessage.setValue("Exception getting text : " + e);
-                        error.setValue(true);
-                        return;
-                    }
-
-                    final int count = messageCount.getValue();
-                    messageCount.setValue(messageCount.getValue() + 1);
-
-                    if (count >= CHAT_MESSAGES.length) {
-                        errorMessage.setValue("Unexpected message count : " + count);
-                        error.setValue(true);
-                        return;
-                    }
-                    if (!CHAT_MESSAGES[count].equals(text)) {
-                        errorMessage.setValue("Message mismatch. expected : "
-                            + CHAT_MESSAGES[count] + ", received : " + text);
-                        error.setValue(true);
-                        return;
-                    }
-                    try {
-                        if (!ATTRIBUTE_VALUE.equals(textMessage.getStringProperty(ATTRIBUTE_KEY))) {
-                            errorMessage.setValue("Attribute value mismatch. Expected : " + ATTRIBUTE_VALUE
-                                + ", found : " + textMessage.getStringProperty(ATTRIBUTE_KEY));
-                            error.setValue(true);
-                            return;
-                        }
-                    } catch (JMSException e) {
-                        e.printStackTrace();
-                        // This branch reads the string property, not the body text.
-                        errorMessage.setValue("Exception getting attribute : " + e);
-                        error.setValue(true);
-                        return;
-                    }
-                }
-            });
-
-            // Start the JMS connection; allows messages to be delivered
-            connection.start();
-            for (String message : CHAT_MESSAGES) {
-                TextMessage tmessage = pubSession.createTextMessage(message);
-                tmessage.setStringProperty(ATTRIBUTE_KEY, ATTRIBUTE_VALUE);
-                publisher.publish(tmessage);
-            }
-
-            if (transacted) pubSession.commit();
-
-            try {
-                Thread.sleep(10);
-            } catch (InterruptedException e) {
-                e.printStackTrace();
-                Thread.currentThread().interrupt();
-            }
-
-            for (int i = 0; i < CHAT_MESSAGES.length; i++) {
-                Message receivedMessage = subscriber1.receive(100);
-                Assert.assertNotNull(receivedMessage);
-            }
-
-            if (messageCount.getValue() != CHAT_MESSAGES.length) {
-                error.setValue(true);
-                errorMessage.setValue("Expected to receive " + CHAT_MESSAGES.length
-                    + ", got " + messageCount.getValue() + " messages.");
-            }
-
-            if (transacted) subSession.commit();
-
-            // Report the collected diagnostic rather than the bare error flag.
-            Assert.assertFalse(errorMessage.getValue(), error.getValue());
-        } finally {
-            // Close even when an assertion fails so the connection does not leak into later tests.
-            connection.close();
-        }
-    }
-
-
-    public void testSimpleSelector() throws JMSException {
-        for (int i = 0; i < NUM_ITERATIONS; i++) {
-            simpleSelectorImpl(false);
-        }
-        for (int i = 0; i < NUM_ITERATIONS; i++) {
-            simpleSelectorImpl(true);
-        }
-    }
-
-    /**
-     * Publishes one message carrying {@code key=value} and checks that a durable
-     * subscription whose selector rejects that value receives nothing, while a
-     * selector-less validation subscription receives the message.
-     *
-     * @param transacted whether all three sessions are transacted
-     */
-    private void simpleSelectorImpl(boolean transacted) throws JMSException {
-        TopicConnection connection = topicConnectionFactory.createTopicConnection();
-        try {
-            // Creating three sessions : one to subscribe with selector,
-            // one to publish and third to validate that the message was/was-not delivered !
-            TopicSession publishTopicSession = connection.createTopicSession(transacted, Session.AUTO_ACKNOWLEDGE);
-            TopicSession subscribeTopicSession = connection.createTopicSession(transacted, Session.AUTO_ACKNOWLEDGE);
-            TopicSession subscribeValidationTopicSession = connection.createTopicSession(transacted,
-                Session.AUTO_ACKNOWLEDGE);
-
-
-            TopicPublisher publisher = publishTopicSession.createPublisher(
-                publishTopicSession.createTopic(testTopicName));
-
-            // The first subscriber's subscription should be overridden by the second
-            // hence, we MUST have selector enabled.
-            TopicSubscriber selectorSubscriber = subscribeTopicSession.createDurableSubscriber(
-                publishTopicSession.createTopic(testTopicName), "test_subscriber");
-            TopicSubscriber selectorSubscriber_dup = subscribeTopicSession.createDurableSubscriber(
-                publishTopicSession.createTopic(testTopicName),
-                "test_subscriber", ATTRIBUTE_KEY + " <> '" + ATTRIBUTE_VALUE + "'", false);
-
-            // without selector.
-            TopicSubscriber subscriberValidation =
-                subscribeValidationTopicSession.createDurableSubscriber(
-                    publishTopicSession.createTopic(testTopicName), "test_subscriber1");
-
-            // Start connection ...
-            connection.start();
-
-            final String textMessage = TEXT_MESSAGE + ", transacted : " + transacted;
-            // Send the message.
-            {
-                TextMessage message = publishTopicSession.createTextMessage(textMessage);
-                message.setStringProperty(ATTRIBUTE_KEY, ATTRIBUTE_VALUE);
-                publisher.publish(message);
-            }
-
-            if (transacted) {
-                // Must return null ... no publish must happen until we commit !
-                Message message = subscriberValidation.receive(200);
-                Assert.assertNull("Unexpected message : " + message, message);
-                publishTopicSession.commit();
-            }
-
-            // subscriberValidation must get the message as soon as it is available,
-            // while selectorSubscriber might/might not
-            // (depending on whether Selector works :-) ). So wait on subscriberValidation
-            {
-                Message receivedMessage = subscriberValidation.receive(200);
-
-                // Validate whether it is the correct message.
-                Assert.assertNotNull("receivedMessage was expected", receivedMessage);
-                Assert.assertTrue("receivedMessage not a textMessage ? " + receivedMessage,
-                    receivedMessage instanceof TextMessage);
-                Assert.assertEquals("test content does not match ? " + ((TextMessage) receivedMessage).getText(),
-                    textMessage, ((TextMessage) receivedMessage).getText());
-
-                final String attrValue = receivedMessage.getStringProperty(ATTRIBUTE_KEY);
-                // JUnit order is (message, expected, actual).
-                Assert.assertEquals("attribute value invalid ? " + attrValue, ATTRIBUTE_VALUE, attrValue);
-
-                if (transacted) subscribeValidationTopicSession.commit();
-            }
-
-            // Now that subscriberValidation received the message,
-            // selectorSubscriber and/or selectorSubscriber_dup must also receive the
-            // message or they will never receive the message (since selector blocked it).
-            {
-                // Even though selectorSubscriber was subscribed WITHOUT selector, we create
-                // selectorSubscriber_dup LATER to override the subscription policy using the
-                // SAME subscription id/topic - so selectorSubscriber_dup config MUST take
-                // precedence ...
-                Message msg = selectorSubscriber.receive(100);
-                Message msg1 = selectorSubscriber_dup.receive(100);
-                Assert.assertNull("Unexpected received message " + msg, msg);
-                Assert.assertNull("Unexpected received message " + msg1, msg1);
-            }
-
-            // close all sessions.
-            subscribeTopicSession.close();
-            subscribeValidationTopicSession.close();
-            publishTopicSession.close();
-
-            // Must return null, we have closed the session ! simple validation :-)
-            {
-                Message smsg = subscriberValidation.receive();
-                Assert.assertNull("Unexpected validation message received " + smsg, smsg);
-            }
-        } finally {
-            // close connection ... even when an assertion above fails.
-            connection.close();
-        }
-    }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/hedwig/jms/selector/BasicSelectorGrammarTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/hedwig/jms/selector/BasicSelectorGrammarTest.java b/hedwig-client-jms/src/test/java/org/apache/hedwig/jms/selector/BasicSelectorGrammarTest.java
deleted file mode 100644
index 918dbbf..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/hedwig/jms/selector/BasicSelectorGrammarTest.java
+++ /dev/null
@@ -1,376 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.jms.selector;
-
-import junit.framework.Assert;
-import org.apache.hedwig.jms.message.MessageImpl;
-import org.apache.hedwig.jms.message.TextMessageImpl;
-import org.junit.Before;
-import org.junit.Test;
-
-import javax.jms.JMSException;
-
-/**
- * Tests basic message selector grammar: each case parses a selector with SelectorParser and evaluates it against one fixed test message.
- */
-public class BasicSelectorGrammarTest {
-
- private static final String TEST_MESSAGE = "test_message";
-
- private static final String BOOLEAN_HEADER1 = "boolean_header1";
- private static final boolean BOOLEAN_VALUE1 = true;
-
-
- private static final String INT_HEADER1 = "int_header1";
- private static final int INT_VALUE1 = 1;
-
- private static final String INT_HEADER2 = "int_header2";
- private static final int INT_VALUE2 = 2;
-
- private static final String INT_HEADER3 = "int_header3";
- private static final int INT_VALUE3 = 3;
-
-
- private static final String DOUBLE_HEADER1 = "double_header1";
- private static final double DOUBLE_VALUE1 = 1;
-
- private static final String DOUBLE_HEADER2 = "double_header2";
- private static final double DOUBLE_VALUE2 = 2;
-
-
- private static final String STRING_HEADER1 = "string_header1";
- private static final String STRING_VALUE1 = "header_value1";
-
- private static final String STRING_HEADER2 = "string_header2";
- private static final String STRING_VALUE2 = "header_value2";
-
- private static final String STRING_HEADER3 = "string_header3";
- private static final String STRING_VALUE3 = "header_value3";
-
- private static final String STRING_HEADER4 = "string_header4";
- private static final String STRING_VALUE4 = "header_value4";
-
- // Value contains both LIKE wildcard characters ('_' and '%') as well as a newline and a tab.
- private static final String STRING_LIKE_HEADER = "string_like_header";
- private static final String STRING_LIKE_VALUE = "value with a _ and % in it and a \n with \t also for testing.";
-
- private static final String STRING_QUOTES_HEADER = "string_quotes_header";
- private static final String STRING_QUOTES_VALUE = "quotes's value";
- private static final String STRING_QUOTED_QUOTES_VALUE = "quotes''s value";
-
-
- private MessageImpl message;
-
- @Before
- public void createMessage() {
- try {
- // Build the message directly rather than through a session - this is test-only setup.
- TextMessageImpl message = new TextMessageImpl(null, TEST_MESSAGE);
-
- message.setBooleanProperty(BOOLEAN_HEADER1, BOOLEAN_VALUE1);
-
- message.setIntProperty(INT_HEADER1, INT_VALUE1);
- message.setIntProperty(INT_HEADER2, INT_VALUE2);
- message.setIntProperty(INT_HEADER3, INT_VALUE3);
-
- message.setDoubleProperty(DOUBLE_HEADER1, DOUBLE_VALUE1);
- message.setDoubleProperty(DOUBLE_HEADER2, DOUBLE_VALUE2);
-
- message.setStringProperty(STRING_HEADER1, STRING_VALUE1);
- message.setStringProperty(STRING_HEADER2, STRING_VALUE2);
- message.setStringProperty(STRING_HEADER3, STRING_VALUE3);
- message.setStringProperty(STRING_HEADER4, STRING_VALUE4);
- message.setStringProperty(STRING_LIKE_HEADER, STRING_LIKE_VALUE);
-
- message.setStringProperty(STRING_QUOTES_HEADER, STRING_QUOTES_VALUE);
-
- this.message = message; // the local 'message' shadows the field; publish it for the test methods
- } catch (JMSException e) {
- throw new IllegalStateException("Unexpected ... ", e);
- }
- }
-
-
- @Test
- public void testBasicLookup() throws ParseException {
- // simple checks for boolean header.
- Assert.assertEquals(Boolean.TRUE,
- SelectorParser.evaluateSelector(
- SelectorParser.parseMessageSelector(BOOLEAN_HEADER1 + " = " + BOOLEAN_VALUE1),
- message)
- );
- Assert.assertEquals(Boolean.FALSE,
- SelectorParser.evaluateSelector(
- SelectorParser.parseMessageSelector(BOOLEAN_HEADER1 + " <> " + BOOLEAN_VALUE1),
- message)
- );
-
- // simple checks for int header.
- Assert.assertEquals(Boolean.TRUE,
- SelectorParser.evaluateSelector(
- SelectorParser.parseMessageSelector(INT_HEADER1 + " = " + INT_VALUE1),
- message)
- );
- Assert.assertEquals(Boolean.FALSE,
- SelectorParser.evaluateSelector(
- SelectorParser.parseMessageSelector(INT_HEADER1 + " <> " + INT_VALUE1),
- message)
- );
-
- // simple checks for double header.
- Assert.assertEquals(Boolean.TRUE,
- SelectorParser.evaluateSelector(
- SelectorParser.parseMessageSelector(DOUBLE_HEADER1 + " = " + DOUBLE_VALUE1),
- message)
- );
- Assert.assertEquals(Boolean.FALSE,
- SelectorParser.evaluateSelector(
- SelectorParser.parseMessageSelector(DOUBLE_HEADER1 + " <> " + DOUBLE_VALUE1),
- message)
- );
-
- // simple checks for String header.
- Assert.assertEquals(Boolean.TRUE,
- SelectorParser.evaluateSelector(
- SelectorParser.parseMessageSelector(STRING_HEADER1 + " = '" + STRING_VALUE1 + "'"),
- message)
- );
- Assert.assertEquals(Boolean.FALSE,
- SelectorParser.evaluateSelector(
- SelectorParser.parseMessageSelector(STRING_HEADER1 + " <> '" + STRING_VALUE1 + "'"),
- message)
- );
-
- // check for String header whose value contains a quote; the selector literal escapes it by doubling ('').
- Assert.assertEquals(Boolean.TRUE,
- SelectorParser.evaluateSelector(
- SelectorParser.parseMessageSelector(STRING_QUOTES_HEADER + " = '"
- + STRING_QUOTED_QUOTES_VALUE + "'"),
- message)
- );
- Assert.assertEquals(Boolean.FALSE,
- SelectorParser.evaluateSelector(
- SelectorParser.parseMessageSelector(STRING_QUOTES_HEADER
- + " <> '" + STRING_QUOTED_QUOTES_VALUE + "'"),
- message)
- );
-
-
- // incompatible operand types or unknown identifiers: evaluation is expected to return null.
- Assert.assertNull(
- SelectorParser.evaluateSelector(
- SelectorParser.parseMessageSelector(STRING_QUOTES_HEADER + " = " + INT_VALUE1),
- message)
- );
- Assert.assertNull(
- SelectorParser.evaluateSelector(
- SelectorParser.parseMessageSelector(BOOLEAN_HEADER1 + " = " + DOUBLE_VALUE1),
- message)
- );
- Assert.assertNull(
- SelectorParser.evaluateSelector(
- SelectorParser.parseMessageSelector("unknown_header = " + STRING_VALUE1),
- message)
- );
- }
-
-
- @Test
- public void testArithmetic() throws ParseException {
- Assert.assertEquals(Boolean.TRUE,
- SelectorParser.evaluateSelector(
- SelectorParser.parseMessageSelector(INT_HEADER1 + " + " + INT_HEADER2 + " > " + INT_VALUE1),
- message)
- );
- Assert.assertEquals(Boolean.TRUE,
- SelectorParser.evaluateSelector(
- SelectorParser.parseMessageSelector(INT_HEADER1 + " + ( 2 * " + INT_HEADER2
- + " + " + INT_HEADER1 + " ) < " +
- " 4 * ( " + INT_HEADER1 + " + " + INT_VALUE2 + " ) "),
- message)
- );
-
- Assert.assertEquals(Boolean.TRUE,
- SelectorParser.evaluateSelector(
- SelectorParser.parseMessageSelector(DOUBLE_HEADER1 + " + "
- + DOUBLE_HEADER2 + " > " + DOUBLE_VALUE1),
- message)
- );
- Assert.assertEquals(Boolean.TRUE,
- SelectorParser.evaluateSelector(
- SelectorParser.parseMessageSelector(DOUBLE_HEADER1 + " * 7.5 + 1 + 2 * ( "
- + DOUBLE_HEADER2 + " + 2.0 * " + DOUBLE_HEADER1 + " ) = " +
- " 0.5 + 4 * ( 2.0 * " + DOUBLE_HEADER1 + " + " + DOUBLE_VALUE2 + " ) "),
- message)
- );
-
- // Incompatible header in computation - a string header used in arithmetic is expected to yield null.
- Assert.assertNull(
- SelectorParser.evaluateSelector(
- SelectorParser.parseMessageSelector(STRING_HEADER1 + " * 4 + " + DOUBLE_HEADER1
- + " * 7.5 + 1 + 2 * ( " + DOUBLE_HEADER2
- + "+ 2.0 * " + DOUBLE_HEADER1 + " ) = " +
- " 0.5 + 4 * ( 2.0 * " + DOUBLE_HEADER1 + " + " + DOUBLE_VALUE2 + " ) "),
- message)
- );
-
- // Unknown header in computation - also expected to yield null.
- Assert.assertNull(
- SelectorParser.evaluateSelector(
- SelectorParser.parseMessageSelector(" unknown_header * 4 + "
- + DOUBLE_HEADER1 + " * 7.5 + 1 + 2 * ( "
- + DOUBLE_HEADER2 + " + 2.0 * " + DOUBLE_HEADER1 + " ) = " +
- " 0.5 + 4 * ( 2.0 * " + DOUBLE_HEADER1 + " + " + DOUBLE_VALUE2 + " ) "),
- message)
- );
-
- }
-
- @Test
- public void testFunctions() throws ParseException {
-
- // IS [NOT] NULL.
- Assert.assertEquals(Boolean.TRUE,
- SelectorParser.evaluateSelector(
- SelectorParser.parseMessageSelector(DOUBLE_HEADER2 + " IS NOT NULL"),
- message)
- );
- Assert.assertEquals(Boolean.FALSE,
- SelectorParser.evaluateSelector(
- SelectorParser.parseMessageSelector(STRING_HEADER1 + " IS NULL"),
- message)
- );
- // unknown header: IS NULL is expected to be true (lower-case keywords are used here).
- Assert.assertEquals(Boolean.TRUE,
- SelectorParser.evaluateSelector(
- SelectorParser.parseMessageSelector("unknown_header is null"),
- message)
- );
-
-
- // [NOT] BETWEEN, with literal and computed bounds.
- Assert.assertEquals(Boolean.TRUE,
- SelectorParser.evaluateSelector(
- SelectorParser.parseMessageSelector(DOUBLE_HEADER2 + " BETWEEN 1 AND 2"),
- message)
- );
- Assert.assertEquals(Boolean.TRUE,
- SelectorParser.evaluateSelector(
- SelectorParser.parseMessageSelector(" ( - " + INT_HEADER1 + " * 2 + "
- + DOUBLE_HEADER1 + " * 4 ) / 10.0 between 0 and 3 * "
- + INT_HEADER2 + " * 2.4"),
- message)
- );
- Assert.assertEquals(Boolean.FALSE,
- SelectorParser.evaluateSelector(
- SelectorParser.parseMessageSelector(INT_HEADER2
- + " not between (0.4 * 2 + (0.01 + 0.3 * 0.2) ) AND 10.0"),
- // (simpler alternative, disabled) SelectorParser.parseMessageSelector(INT_HEADER2 + " NOT BETWEEN 1 AND 3"),
- message)
- );
- Assert.assertEquals(Boolean.FALSE,
- SelectorParser.evaluateSelector(
- SelectorParser.parseMessageSelector(INT_HEADER2
- + " NOT BETWEEN (0.4 * 2 + (0.01 + 0.3 * 0.2) + "
- + DOUBLE_VALUE1 + " / 10.0 ) AND 10.0"),
- message)
- );
-
- // string header used in arithmetic: must parse successfully (no parse-time exception) and evaluation is expected to return null.
- Assert.assertNull(
- SelectorParser.evaluateSelector(
- SelectorParser.parseMessageSelector(STRING_HEADER1 + " * 2 + " + DOUBLE_HEADER1
- + " / 1.4 NOT BETWEEN (0.4 * 2 + (0.01 + 0.3 * 0.2) + "
- + DOUBLE_VALUE1 + " / 10.0 ) AND 10.0"),
- message)
- );
-
-
- // [NOT] IN
- Assert.assertEquals(Boolean.TRUE,
- SelectorParser.evaluateSelector(
- SelectorParser.parseMessageSelector(
- STRING_HEADER3 + " IN ( '" + STRING_VALUE1
- + "', '" + STRING_VALUE2 + "', '" + STRING_VALUE3 +
- "', '" + STRING_VALUE4 + "', '" + STRING_QUOTED_QUOTES_VALUE + "') "),
- message)
- );
- Assert.assertEquals(Boolean.TRUE,
- SelectorParser.evaluateSelector(
- SelectorParser.parseMessageSelector(
- STRING_QUOTES_HEADER + " IN ( '" + STRING_VALUE1
- + "', '" + STRING_VALUE2 + "', '" + STRING_VALUE3 +
- "', '" + STRING_VALUE4 + "', '" + STRING_QUOTED_QUOTES_VALUE + "') "),
- message)
- );
- Assert.assertEquals(Boolean.TRUE,
- SelectorParser.evaluateSelector(
- SelectorParser.parseMessageSelector(
- STRING_QUOTES_HEADER + " NOT IN ( '" + STRING_VALUE1
- + "', '" + STRING_VALUE2 + "', '" + STRING_VALUE3 +
- "', '" + STRING_VALUE4 + "') "),
- message)
- );
- // non-string identifiers used with the 'IN' construct should evaluate to null.
- Assert.assertNull(
- SelectorParser.evaluateSelector(
- SelectorParser.parseMessageSelector(
- INT_HEADER1 + " NOT IN ( '" + STRING_VALUE1 + "', '"
- + STRING_VALUE2 + "', '" + STRING_VALUE3 +
- "', '" + STRING_VALUE4 + "') "),
- message)
- );
- Assert.assertNull(
- SelectorParser.evaluateSelector(
- SelectorParser.parseMessageSelector(
- BOOLEAN_HEADER1 + " IN ( '" + STRING_VALUE1 + "', '" + STRING_VALUE2
- + "', '" + STRING_VALUE3 + "', '" + STRING_VALUE4 + "') "),
- message)
- );
-
-
- // [NOT] LIKE, with and without an ESCAPE character.
- Assert.assertEquals(Boolean.TRUE,
- SelectorParser.evaluateSelector(
- SelectorParser.parseMessageSelector(
- STRING_HEADER1 + " LIKE 'header\\_%' ESCAPE '\\'"),
- message)
- );
-
- // value is "value with a _ and % in it and a \n with \t also for testing."; '_' and '%' are escaped below to match literally.
- Assert.assertEquals(Boolean.TRUE,
- SelectorParser.evaluateSelector(
- SelectorParser.parseMessageSelector(
- STRING_LIKE_HEADER + " LIKE '% with a \\_ and \\%%' ESCAPE '\\'"),
- message)
- );
-
- Assert.assertEquals(Boolean.TRUE,
- SelectorParser.evaluateSelector(
- SelectorParser.parseMessageSelector(
- STRING_LIKE_HEADER + " LIKE '%\n%'"),
- message)
- );
- Assert.assertEquals(Boolean.TRUE,
- SelectorParser.evaluateSelector(
- SelectorParser.parseMessageSelector(
- STRING_LIKE_HEADER + " NOT LIKE '%\r%'"),
- message)
- );
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/hedwig/jms/selector/activemq/SelectorParserTest.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/hedwig/jms/selector/activemq/SelectorParserTest.java b/hedwig-client-jms/src/test/java/org/apache/hedwig/jms/selector/activemq/SelectorParserTest.java
deleted file mode 100644
index 7895b1d..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/hedwig/jms/selector/activemq/SelectorParserTest.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.jms.selector.activemq;
-
-import org.apache.hedwig.jms.message.TextMessageImpl;
-import org.apache.hedwig.jms.selector.Node;
-import org.apache.hedwig.jms.selector.ParseException;
-import org.apache.hedwig.jms.selector.SelectorParser;
-import org.junit.Test;
-
-import javax.jms.JMSException;
-
-/**
- * Selector parser test based on ActiveMQ's codebase, modified to suit this codebase.
- */
-public class SelectorParserTest {
-
- @Test
- public void testParseWithParensAround() throws JMSException, ParseException {
- String[] values = {"x = 1 and y = 2", "(x = 1) and (y = 2)", "((x = 1) and (y = 2))"}; // same condition with increasing levels of parentheses
-
- TextMessageImpl message = new TextMessageImpl(null, "test");
- message.setIntProperty("x", 1);
- message.setIntProperty("y", 2);
-
- for (String value : values) {
- Node ast = SelectorParser.parseMessageSelector(value);
- assert Boolean.TRUE.equals(SelectorParser.evaluateSelector(ast, message)); // NOTE(review): plain Java assert - only checked when the JVM runs with assertions enabled (-ea)
- }
- }
-
-}