You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/06/11 05:08:14 UTC
svn commit: r783612 [3/3] - in /activemq/sandbox/activemq-flow:
activemq-all/ activemq-all/src/test/java/org/apache/activemq/
activemq-all/src/test/java/org/apache/activemq/blob/
activemq-all/src/test/java/org/apache/activemq/broker/ activemq-all/src/t...
Added: activemq/sandbox/activemq-flow/activemq-transport/src/test/java/org/apache/activemq/transport/mock/MockTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/test/java/org/apache/activemq/transport/mock/MockTransport.java?rev=783612&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-transport/src/test/java/org/apache/activemq/transport/mock/MockTransport.java (added)
+++ activemq/sandbox/activemq-flow/activemq-transport/src/test/java/org/apache/activemq/transport/mock/MockTransport.java Thu Jun 11 03:08:06 2009
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.mock;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.activemq.transport.DefaultTransportListener;
+import org.apache.activemq.transport.FutureResponse;
+import org.apache.activemq.transport.ResponseCallback;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportFilter;
+import org.apache.activemq.transport.TransportListener;
+import org.apache.activemq.wireformat.WireFormat;
+
+/**
+ * @version $Revision: 1.5 $
+ */
+public class MockTransport extends DefaultTransportListener implements Transport {
+
+ protected Transport next;
+ protected TransportListener transportListener;
+
+ public MockTransport(Transport next) {
+ this.next = next;
+ }
+
+ /**
+ */
+ public synchronized void setTransportListener(TransportListener channelListener) {
+ this.transportListener = channelListener;
+ if (channelListener == null) {
+ getNext().setTransportListener(null);
+ } else {
+ getNext().setTransportListener(this);
+ }
+ }
+
+ /**
+ * @see org.apache.activemq.Service#start()
+ * @throws IOException if the next channel has not been set.
+ */
+ public void start() throws Exception {
+ if (getNext() == null) {
+ throw new IOException("The next channel has not been set.");
+ }
+ if (transportListener == null) {
+ throw new IOException("The command listener has not been set.");
+ }
+ getNext().start();
+ }
+
+ /**
+ * @see org.apache.activemq.Service#stop()
+ */
+ public void stop() throws Exception {
+ getNext().stop();
+ }
+
+ public void onCommand(Object command) {
+ getTransportListener().onCommand(command);
+ }
+
+ /**
+ * @return Returns the getNext().
+ */
+ public synchronized Transport getNext() {
+ return next;
+ }
+
+ /**
+ * @return Returns the packetListener.
+ */
+ public synchronized TransportListener getTransportListener() {
+ return transportListener;
+ }
+
+ public String toString() {
+ return getNext().toString();
+ }
+
+ public void oneway(Object command) throws IOException {
+ getNext().oneway(command);
+ }
+
+ public <T> FutureResponse<T> asyncRequest(Object command, ResponseCallback<T> responseCallback) throws IOException {
+ return getNext().asyncRequest(command, null);
+ }
+
+ public Object request(Object command) throws IOException {
+ return getNext().request(command);
+ }
+
+ public Object request(Object command, int timeout) throws IOException {
+ return getNext().request(command, timeout);
+ }
+
+ public void onException(IOException error) {
+ getTransportListener().onException(error);
+ }
+
+ public <T> T narrow(Class<T> target) {
+ if (target.isAssignableFrom(getClass())) {
+ return target.cast(this);
+ }
+ return getNext().narrow(target);
+ }
+
+ public synchronized void setNext(Transport next) {
+ this.next = next;
+ }
+
+ public void install(TransportFilter filter) {
+ filter.setTransportListener(this);
+ getNext().setTransportListener(filter);
+ setNext(filter);
+ }
+
+ public String getRemoteAddress() {
+ return getNext().getRemoteAddress();
+ }
+
+ /**
+ * @see org.apache.activemq.transport.Transport#isFaultTolerant()
+ */
+ public boolean isFaultTolerant() {
+ return getNext().isFaultTolerant();
+ }
+
+ public boolean isDisposed() {
+ return getNext().isDisposed();
+ }
+
+ public boolean isConnected() {
+ return getNext().isConnected();
+ }
+
+ public void reconnect(URI uri) throws IOException {
+ getNext().reconnect(uri);
+ }
+
+ public boolean isUseInactivityMonitor() {
+ return getNext().isUseInactivityMonitor();
+ }
+
+ public WireFormat getWireformat() {
+ return getNext().getWireformat();
+ }
+}
Added: activemq/sandbox/activemq-flow/activemq-transport/src/test/java/org/apache/activemq/transport/mock/MockTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/test/java/org/apache/activemq/transport/mock/MockTransportFactory.java?rev=783612&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-transport/src/test/java/org/apache/activemq/transport/mock/MockTransportFactory.java (added)
+++ activemq/sandbox/activemq-flow/activemq-transport/src/test/java/org/apache/activemq/transport/mock/MockTransportFactory.java Thu Jun 11 03:08:06 2009
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.mock;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.apache.activemq.transport.MutexTransport;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportFactory;
+import org.apache.activemq.transport.TransportServer;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.util.URISupport;
+import org.apache.activemq.util.URISupport.CompositeData;
+
+public class MockTransportFactory extends TransportFactory {
+
+ public Transport doConnect(URI location) throws URISyntaxException, Exception {
+ Transport transport = createTransport(URISupport.parseComposite(location));
+ transport = new MutexTransport(transport);
+// transport = new ResponseCorrelator(transport);
+ return transport;
+ }
+
+ public Transport doCompositeConnect(URI location) throws URISyntaxException, Exception {
+ return createTransport(URISupport.parseComposite(location));
+ }
+
+ /**
+ * @param location
+ * @return
+ * @throws Exception
+ */
+ public Transport createTransport(CompositeData compositData) throws Exception {
+ MockTransport transport = new MockTransport(TransportFactory.compositeConnect(compositData.getComponents()[0]));
+ IntrospectionSupport.setProperties(transport, compositData.getParameters());
+ return transport;
+ }
+
+ public TransportServer doBind(URI location) throws IOException {
+ throw new IOException("This protocol does not support being bound.");
+ }
+
+}
Propchange: activemq/sandbox/activemq-flow/activemq-transport/src/test/java/org/apache/activemq/transport/mock/MockTransportFactory.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-transport/src/test/java/org/apache/activemq/transport/mock/package.html
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/test/java/org/apache/activemq/transport/mock/package.html?rev=783612&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-transport/src/test/java/org/apache/activemq/transport/mock/package.html (added)
+++ activemq/sandbox/activemq-flow/activemq-transport/src/test/java/org/apache/activemq/transport/mock/package.html Thu Jun 11 03:08:06 2009
@@ -0,0 +1,25 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+ A mock implementation of the Transport layer useful for testing
+
+</body>
+</html>
Propchange: activemq/sandbox/activemq-flow/activemq-transport/src/test/java/org/apache/activemq/transport/mock/package.html
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-transport/src/test/resources/META-INF/services/org/apache/activemq/transport/mock
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/test/resources/META-INF/services/org/apache/activemq/transport/mock?rev=783612&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-transport/src/test/resources/META-INF/services/org/apache/activemq/transport/mock (added)
+++ activemq/sandbox/activemq-flow/activemq-transport/src/test/resources/META-INF/services/org/apache/activemq/transport/mock Thu Jun 11 03:08:06 2009
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.activemq.transport.mock.MockTransportFactory
Modified: activemq/sandbox/activemq-flow/activemq-util/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-util/pom.xml?rev=783612&r1=783611&r2=783612&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-util/pom.xml (original)
+++ activemq/sandbox/activemq-flow/activemq-util/pom.xml Thu Jun 11 03:08:06 2009
@@ -58,5 +58,23 @@
</dependency>
</dependencies>
+
+ <build>
+ <plugins>
+
+ <!-- Generate a test jar for the test cases in this package -->
+ <plugin>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ </plugins>
+ </build>
</project>
Added: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/IOHelper.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/IOHelper.java?rev=783612&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/IOHelper.java (added)
+++ activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/IOHelper.java Thu Jun 11 03:08:06 2009
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.util;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * @version $Revision: 661435 $
+ */
+public final class IOHelper {
+ protected static final int MAX_DIR_NAME_LENGTH;
+ protected static final int MAX_FILE_NAME_LENGTH;
+ private static final int DEFAULT_BUFFER_SIZE = 4096;
+ private IOHelper() {
+ }
+
+ public static String getDefaultDataDirectory() {
+ return getDefaultDirectoryPrefix() + "activemq-data";
+ }
+
+ public static String getDefaultStoreDirectory() {
+ return getDefaultDirectoryPrefix() + "amqstore";
+ }
+
+ /**
+ * Allows a system property to be used to overload the default data
+ * directory which can be useful for forcing the test cases to use a target/
+ * prefix
+ */
+ public static String getDefaultDirectoryPrefix() {
+ try {
+ return System.getProperty("org.apache.activemq.default.directory.prefix", "");
+ } catch (Exception e) {
+ return "";
+ }
+ }
+
+ /**
+ * Converts any string into a string that is safe to use as a file name.
+ * The result will only include ascii characters and numbers, and the "-","_", and "." characters.
+ *
+ * @param name
+ * @return
+ */
+ public static String toFileSystemDirectorySafeName(String name) {
+ return toFileSystemSafeName(name, true, MAX_DIR_NAME_LENGTH);
+ }
+
+ public static String toFileSystemSafeName(String name) {
+ return toFileSystemSafeName(name, false, MAX_FILE_NAME_LENGTH);
+ }
+
+ /**
+ * Converts any string into a string that is safe to use as a file name.
+ * The result will only include ascii characters and numbers, and the "-","_", and "." characters.
+ *
+ * @param name
+ * @param dirSeparators
+ * @param maxFileLength
+ * @return
+ */
+ public static String toFileSystemSafeName(String name,boolean dirSeparators,int maxFileLength) {
+ int size = name.length();
+ StringBuffer rc = new StringBuffer(size * 2);
+ for (int i = 0; i < size; i++) {
+ char c = name.charAt(i);
+ boolean valid = c >= 'a' && c <= 'z';
+ valid = valid || (c >= 'A' && c <= 'Z');
+ valid = valid || (c >= '0' && c <= '9');
+ valid = valid || (c == '_') || (c == '-') || (c == '.') || (c=='#')
+ ||(dirSeparators && ( (c == '/') || (c == '\\')));
+
+ if (valid) {
+ rc.append(c);
+ } else {
+ // Encode the character using hex notation
+ rc.append('#');
+ rc.append(HexSupport.toHexFromInt(c, true));
+ }
+ }
+ String result = rc.toString();
+ if (result.length() > maxFileLength) {
+ result = result.substring(result.length()-maxFileLength,result.length());
+ }
+ return result;
+ }
+
+ public static boolean deleteFile(File fileToDelete) {
+ if (fileToDelete == null || !fileToDelete.exists()) {
+ return true;
+ }
+ boolean result = deleteChildren(fileToDelete);
+ result &= fileToDelete.delete();
+ return result;
+ }
+
+ public static boolean deleteChildren(File parent) {
+ if (parent == null || !parent.exists()) {
+ return false;
+ }
+ boolean result = true;
+ if (parent.isDirectory()) {
+ File[] files = parent.listFiles();
+ if (files == null) {
+ result = false;
+ } else {
+ for (int i = 0; i < files.length; i++) {
+ File file = files[i];
+ if (file.getName().equals(".")
+ || file.getName().equals("..")) {
+ continue;
+ }
+ if (file.isDirectory()) {
+ result &= deleteFile(file);
+ } else {
+ result &= file.delete();
+ }
+ }
+ }
+ }
+
+ return result;
+ }
+
+
+ public static void moveFile(File src, File targetDirectory) throws IOException {
+ if (!src.renameTo(new File(targetDirectory, src.getName()))) {
+ throw new IOException("Failed to move " + src + " to " + targetDirectory);
+ }
+ }
+
+ public static void copyFile(File src, File dest) throws IOException {
+ FileInputStream fileSrc = new FileInputStream(src);
+ FileOutputStream fileDest = new FileOutputStream(dest);
+ copyInputStream(fileSrc, fileDest);
+ }
+
+ public static void copyInputStream(InputStream in, OutputStream out) throws IOException {
+ byte[] buffer = new byte[DEFAULT_BUFFER_SIZE];
+ int len = in.read(buffer);
+ while (len >= 0) {
+ out.write(buffer, 0, len);
+ len = in.read(buffer);
+ }
+ in.close();
+ out.close();
+ }
+
+ static {
+ MAX_DIR_NAME_LENGTH = Integer.valueOf(System.getProperty("MaximumDirNameLength","200")).intValue();
+ MAX_FILE_NAME_LENGTH = Integer.valueOf(System.getProperty("MaximumFileNameLength","64")).intValue();
+ }
+
+
+ public static void mkdirs(File dir) throws IOException {
+ if (dir.exists()) {
+ if (!dir.isDirectory()) {
+ throw new IOException("Failed to create directory '" + dir +"', regular file already existed with that name");
+ }
+
+ } else {
+ if (!dir.mkdirs()) {
+ throw new IOException("Failed to create directory '" + dir+"'");
+ }
+ }
+ }
+}
Added: activemq/sandbox/activemq-flow/activemq-util/src/test/java/org/apache/activemq/util/MessageIdList.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-util/src/test/java/org/apache/activemq/util/MessageIdList.java?rev=783612&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-util/src/test/java/org/apache/activemq/util/MessageIdList.java (added)
+++ activemq/sandbox/activemq-flow/activemq-util/src/test/java/org/apache/activemq/util/MessageIdList.java Thu Jun 11 03:08:06 2009
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.util;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A simple container of messages for performing testing and rendezvous style
+ * code. You can use this class a {@link MessageListener} and then make
+ * assertions about how many messages it has received allowing a certain maximum
+ * amount of time to ensure that the test does not hang forever. Also you can
+ * chain these instances together with the {@link #setParent(MessageListener)}
+ * method so that you can aggregate the total number of messages consumed across
+ * a number of consumers.
+ *
+ * @version $Revision: 1.6 $
+ */
+public class MessageIdList extends Assert implements MessageListener {
+
+ private static final Log LOG = LogFactory.getLog(MessageIdList.class);
+
+ private List<String> messageIds = new ArrayList<String>();
+ private Object semaphore;
+ private boolean verbose;
+ private MessageListener parent;
+ private long maximumDuration = 15000L;
+ private long processingDelay;
+
+ private CountDownLatch countDownLatch;
+
+ public MessageIdList() {
+ this(new Object());
+ }
+
+ public MessageIdList(Object semaphore) {
+ this.semaphore = semaphore;
+ }
+
+ public boolean equals(Object that) {
+ if (that instanceof MessageIdList) {
+ MessageIdList thatList = (MessageIdList)that;
+ return getMessageIds().equals(thatList.getMessageIds());
+ }
+ return false;
+ }
+
+ public int hashCode() {
+ synchronized (semaphore) {
+ return messageIds.hashCode() + 1;
+ }
+ }
+
+ public String toString() {
+ synchronized (semaphore) {
+ return messageIds.toString();
+ }
+ }
+
+ /**
+ * @return all the messages on the list so far, clearing the buffer
+ */
+ public List<String> flushMessages() {
+ synchronized (semaphore) {
+ List<String> answer = new ArrayList<String>(messageIds);
+ messageIds.clear();
+ return answer;
+ }
+ }
+
+ public synchronized List<String> getMessageIds() {
+ synchronized (semaphore) {
+ return new ArrayList<String>(messageIds);
+ }
+ }
+
+ public void onMessage(Message message) {
+ String id = null;
+ try {
+ id = message.getJMSMessageID();
+ synchronized (semaphore) {
+ messageIds.add(id);
+ semaphore.notifyAll();
+ }
+ if (countDownLatch != null) {
+ countDownLatch.countDown();
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Received message: " + message);
+ }
+ } catch (JMSException e) {
+ e.printStackTrace();
+ }
+ if (parent != null) {
+ parent.onMessage(message);
+ }
+ if (processingDelay > 0) {
+ try {
+ Thread.sleep(processingDelay);
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+
+ public int getMessageCount() {
+ synchronized (semaphore) {
+ return messageIds.size();
+ }
+ }
+
+ public void waitForMessagesToArrive(int messageCount) {
+ LOG.info("Waiting for " + messageCount + " message(s) to arrive");
+
+ long start = System.currentTimeMillis();
+
+ for (int i = 0; i < messageCount; i++) {
+ try {
+ if (hasReceivedMessages(messageCount)) {
+ break;
+ }
+ long duration = System.currentTimeMillis() - start;
+ if (duration >= maximumDuration) {
+ break;
+ }
+ synchronized (semaphore) {
+ semaphore.wait(maximumDuration - duration);
+ }
+ } catch (InterruptedException e) {
+ LOG.info("Caught: " + e);
+ }
+ }
+ long end = System.currentTimeMillis() - start;
+
+ LOG.info("End of wait for " + end + " millis and received: " + getMessageCount() + " messages");
+ }
+
+ /**
+ * Performs a testing assertion that the correct number of messages have
+ * been received without waiting
+ *
+ * @param messageCount
+ */
+ public void assertMessagesReceivedNoWait(int messageCount) {
+ assertEquals("expected number of messages when received", messageCount, getMessageCount());
+ }
+
+ /**
+ * Performs a testing assertion that the correct number of messages have
+ * been received waiting for the messages to arrive up to a fixed amount of
+ * time.
+ *
+ * @param messageCount
+ */
+ public void assertMessagesReceived(int messageCount) {
+ waitForMessagesToArrive(messageCount);
+
+ assertMessagesReceivedNoWait(messageCount);
+ }
+
+ /**
+ * Asserts that there are at least the given number of messages received
+ * without waiting.
+ */
+ public void assertAtLeastMessagesReceived(int messageCount) {
+ int actual = getMessageCount();
+ assertTrue("at least: " + messageCount + " messages received. Actual: " + actual, actual >= messageCount);
+ }
+
+ /**
+ * Asserts that there are at most the number of messages received without
+ * waiting
+ *
+ * @param messageCount
+ */
+ public void assertAtMostMessagesReceived(int messageCount) {
+ int actual = getMessageCount();
+ assertTrue("at most: " + messageCount + " messages received. Actual: " + actual, actual <= messageCount);
+ }
+
+ public boolean hasReceivedMessage() {
+ return getMessageCount() == 0;
+ }
+
+ public boolean hasReceivedMessages(int messageCount) {
+ return getMessageCount() >= messageCount;
+ }
+
+ public boolean isVerbose() {
+ return verbose;
+ }
+
+ public void setVerbose(boolean verbose) {
+ this.verbose = verbose;
+ }
+
+ public MessageListener getParent() {
+ return parent;
+ }
+
+ /**
+ * Allows a parent listener to be specified such as to aggregate messages
+ * consumed across consumers
+ */
+ public void setParent(MessageListener parent) {
+ this.parent = parent;
+ }
+
+ /**
+ * @return the maximumDuration
+ */
+ public long getMaximumDuration() {
+ return this.maximumDuration;
+ }
+
+ /**
+ * @param maximumDuration the maximumDuration to set
+ */
+ public void setMaximumDuration(long maximumDuration) {
+ this.maximumDuration = maximumDuration;
+ }
+
+ public void setCountDownLatch(CountDownLatch countDownLatch) {
+ this.countDownLatch = countDownLatch;
+ }
+
+ /**
+ * Gets the amount of time the message listener will spend sleeping to
+ * simulate a processing delay.
+ *
+ * @return
+ */
+ public long getProcessingDelay() {
+ return processingDelay;
+ }
+
+ /**
+ * Sets the amount of time the message listener will spend sleeping to
+ * simulate a processing delay.
+ *
+ * @param processingDelay
+ */
+ public void setProcessingDelay(long processingDelay) {
+ this.processingDelay = processingDelay;
+ }
+
+}