You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/06/11 05:08:14 UTC

svn commit: r783612 [3/3] - in /activemq/sandbox/activemq-flow: activemq-all/ activemq-all/src/test/java/org/apache/activemq/ activemq-all/src/test/java/org/apache/activemq/blob/ activemq-all/src/test/java/org/apache/activemq/broker/ activemq-all/src/t...

Added: activemq/sandbox/activemq-flow/activemq-transport/src/test/java/org/apache/activemq/transport/mock/MockTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/test/java/org/apache/activemq/transport/mock/MockTransport.java?rev=783612&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-transport/src/test/java/org/apache/activemq/transport/mock/MockTransport.java (added)
+++ activemq/sandbox/activemq-flow/activemq-transport/src/test/java/org/apache/activemq/transport/mock/MockTransport.java Thu Jun 11 03:08:06 2009
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.mock;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.activemq.transport.DefaultTransportListener;
+import org.apache.activemq.transport.FutureResponse;
+import org.apache.activemq.transport.ResponseCallback;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportFilter;
+import org.apache.activemq.transport.TransportListener;
+import org.apache.activemq.wireformat.WireFormat;
+
+/**
+ * A pass-through {@link Transport} for tests. Outbound operations are delegated
+ * to the wrapped {@code next} transport; commands and exceptions delivered by
+ * {@code next} are forwarded to the registered {@link TransportListener}.
+ *
+ * @version $Revision: 1.5 $
+ */
+public class MockTransport extends DefaultTransportListener implements Transport {
+
+    protected Transport next;
+    protected TransportListener transportListener;
+
+    /**
+     * @param next the transport every call is delegated to
+     */
+    public MockTransport(Transport next) {
+        this.next = next;
+    }
+
+    /**
+     * Stores the listener and wires this object into the next transport. When a
+     * listener is given, this instance registers itself as the listener of
+     * {@code next}; when {@code null} is given, the listener of {@code next} is
+     * cleared too.
+     *
+     * @param channelListener the listener to forward commands and exceptions
+     *            to, or {@code null} to detach
+     */
+    public synchronized void setTransportListener(TransportListener channelListener) {
+        this.transportListener = channelListener;
+        if (channelListener == null) {
+            getNext().setTransportListener(null);
+        } else {
+            getNext().setTransportListener(this);
+        }
+    }
+
+    /**
+     * Starts the next transport after checking that this one is fully wired.
+     *
+     * @see org.apache.activemq.Service#start()
+     * @throws IOException if the next transport or the transport listener has
+     *             not been set.
+     */
+    public void start() throws Exception {
+        if (getNext() == null) {
+            throw new IOException("The next channel has not been set.");
+        }
+        if (transportListener == null) {
+            throw new IOException("The command listener has not been set.");
+        }
+        getNext().start();
+    }
+
+    /**
+     * Stops the next transport.
+     *
+     * @see org.apache.activemq.Service#stop()
+     */
+    public void stop() throws Exception {
+        getNext().stop();
+    }
+
+    /**
+     * Forwards a command received from the next transport to the registered
+     * listener.
+     */
+    public void onCommand(Object command) {
+        getTransportListener().onCommand(command);
+    }
+
+    /**
+     * @return the transport that calls are delegated to.
+     */
+    public synchronized Transport getNext() {
+        return next;
+    }
+
+    /**
+     * @return the listener that commands and exceptions are forwarded to.
+     */
+    public synchronized TransportListener getTransportListener() {
+        return transportListener;
+    }
+
+    /**
+     * @return the string form of the next transport.
+     */
+    public String toString() {
+        return getNext().toString();
+    }
+
+    /** Delegates to the next transport. */
+    public void oneway(Object command) throws IOException {
+        getNext().oneway(command);
+    }
+
+    /**
+     * Delegates to the next transport, handing over the caller's callback.
+     *
+     * @param command the command to send
+     * @param responseCallback the callback supplied by the caller; it is passed
+     *            on unchanged (it used to be replaced by {@code null}, so it
+     *            never reached the next transport)
+     * @return whatever the next transport returns
+     */
+    public <T> FutureResponse<T> asyncRequest(Object command, ResponseCallback<T> responseCallback) throws IOException {
+        return getNext().asyncRequest(command, responseCallback);
+    }
+
+    /** Delegates to the next transport. */
+    public Object request(Object command) throws IOException {
+        return getNext().request(command);
+    }
+
+    /** Delegates to the next transport, including the timeout. */
+    public Object request(Object command, int timeout) throws IOException {
+        return getNext().request(command, timeout);
+    }
+
+    /**
+     * Forwards an exception reported by the next transport to the registered
+     * listener.
+     */
+    public void onException(IOException error) {
+        getTransportListener().onException(error);
+    }
+
+    /**
+     * @return this instance when it is assignable to {@code target}, otherwise
+     *         the result of asking the next transport.
+     */
+    public <T> T narrow(Class<T> target) {
+        if (target.isAssignableFrom(getClass())) {
+            return target.cast(this);
+        }
+        return getNext().narrow(target);
+    }
+
+    /**
+     * @param next the transport that calls are delegated to from now on
+     */
+    public synchronized void setNext(Transport next) {
+        this.next = next;
+    }
+
+    /**
+     * Places {@code filter} between this transport and the current next
+     * transport: the filter reports to this instance, the current next
+     * transport reports to the filter, and the filter becomes the new next.
+     * The filter is presumably constructed around the current next transport
+     * already; that is not checked here.
+     */
+    public void install(TransportFilter filter) {
+        filter.setTransportListener(this);
+        getNext().setTransportListener(filter);
+        setNext(filter);
+    }
+
+    /** Delegates to the next transport. */
+    public String getRemoteAddress() {
+        return getNext().getRemoteAddress();
+    }
+
+    /**
+     * Delegates to the next transport.
+     *
+     * @see org.apache.activemq.transport.Transport#isFaultTolerant()
+     */
+    public boolean isFaultTolerant() {
+        return getNext().isFaultTolerant();
+    }
+
+    /** Delegates to the next transport. */
+    public boolean isDisposed() {
+        return getNext().isDisposed();
+    }
+
+    /** Delegates to the next transport. */
+    public boolean isConnected() {
+        return getNext().isConnected();
+    }
+
+    /** Delegates to the next transport. */
+    public void reconnect(URI uri) throws IOException {
+        getNext().reconnect(uri);
+    }
+
+    /** Delegates to the next transport. */
+    public boolean isUseInactivityMonitor() {
+        return getNext().isUseInactivityMonitor();
+    }
+
+    /** Delegates to the next transport. */
+    public WireFormat getWireformat() {
+        return getNext().getWireformat();
+    }
+}

Added: activemq/sandbox/activemq-flow/activemq-transport/src/test/java/org/apache/activemq/transport/mock/MockTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/test/java/org/apache/activemq/transport/mock/MockTransportFactory.java?rev=783612&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-transport/src/test/java/org/apache/activemq/transport/mock/MockTransportFactory.java (added)
+++ activemq/sandbox/activemq-flow/activemq-transport/src/test/java/org/apache/activemq/transport/mock/MockTransportFactory.java Thu Jun 11 03:08:06 2009
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.mock;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.apache.activemq.transport.MutexTransport;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportFactory;
+import org.apache.activemq.transport.TransportServer;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.util.URISupport;
+import org.apache.activemq.util.URISupport.CompositeData;
+
+public class MockTransportFactory extends TransportFactory {
+
+    /**
+     * Creates the mock transport described by {@code location} and wraps it in
+     * a {@link MutexTransport}.
+     */
+    public Transport doConnect(URI location) throws URISyntaxException, Exception {
+        CompositeData parsed = URISupport.parseComposite(location);
+        Transport mock = createTransport(parsed);
+        // No ResponseCorrelator layer is added here:
+        //        transport = new ResponseCorrelator(transport);
+        return new MutexTransport(mock);
+    }
+
+    /**
+     * Creates the mock transport described by {@code location} without any
+     * additional wrapping.
+     */
+    public Transport doCompositeConnect(URI location) throws URISyntaxException, Exception {
+        CompositeData parsed = URISupport.parseComposite(location);
+        return createTransport(parsed);
+    }
+
+    /**
+     * Connects to the first component of the composite URI, wraps the result
+     * in a {@link MockTransport} and applies the composite's parameters to the
+     * wrapper through {@link IntrospectionSupport#setProperties}.
+     *
+     * @param compositData the parsed composite URI
+     * @return the configured mock transport
+     * @throws Exception if connecting to the first component fails
+     */
+    public Transport createTransport(CompositeData compositData) throws Exception {
+        Transport inner = TransportFactory.compositeConnect(compositData.getComponents()[0]);
+        MockTransport mock = new MockTransport(inner);
+        IntrospectionSupport.setProperties(mock, compositData.getParameters());
+        return mock;
+    }
+
+    /**
+     * Always fails: this factory only creates client-side transports.
+     *
+     * @throws IOException always
+     */
+    public TransportServer doBind(URI location) throws IOException {
+        throw new IOException("This protocol does not support being bound.");
+    }
+
+}

Propchange: activemq/sandbox/activemq-flow/activemq-transport/src/test/java/org/apache/activemq/transport/mock/MockTransportFactory.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-transport/src/test/java/org/apache/activemq/transport/mock/package.html
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/test/java/org/apache/activemq/transport/mock/package.html?rev=783612&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-transport/src/test/java/org/apache/activemq/transport/mock/package.html (added)
+++ activemq/sandbox/activemq-flow/activemq-transport/src/test/java/org/apache/activemq/transport/mock/package.html Thu Jun 11 03:08:06 2009
@@ -0,0 +1,25 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+ A mock implementation of the Transport layer, useful for testing.
+ 
+</body>
+</html>

Propchange: activemq/sandbox/activemq-flow/activemq-transport/src/test/java/org/apache/activemq/transport/mock/package.html
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-transport/src/test/resources/META-INF/services/org/apache/activemq/transport/mock
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/test/resources/META-INF/services/org/apache/activemq/transport/mock?rev=783612&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-transport/src/test/resources/META-INF/services/org/apache/activemq/transport/mock (added)
+++ activemq/sandbox/activemq-flow/activemq-transport/src/test/resources/META-INF/services/org/apache/activemq/transport/mock Thu Jun 11 03:08:06 2009
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+## 
+## http://www.apache.org/licenses/LICENSE-2.0
+## 
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.activemq.transport.mock.MockTransportFactory

Modified: activemq/sandbox/activemq-flow/activemq-util/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-util/pom.xml?rev=783612&r1=783611&r2=783612&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-util/pom.xml (original)
+++ activemq/sandbox/activemq-flow/activemq-util/pom.xml Thu Jun 11 03:08:06 2009
@@ -58,5 +58,23 @@
     </dependency>
       
   </dependencies>
+  
+  <build>
+    <plugins>
+
+      <!-- Generate a test jar for the test cases in this package -->
+      <plugin>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+
+    </plugins>
+  </build>  
 
 </project>

Added: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/IOHelper.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/IOHelper.java?rev=783612&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/IOHelper.java (added)
+++ activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/IOHelper.java Thu Jun 11 03:08:06 2009
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.util;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * Static helpers for default data-directory names, file-system-safe names and
+ * basic file operations (delete, move, copy, mkdirs).
+ *
+ * @version $Revision: 661435 $
+ */
+public final class IOHelper {
+    // Name-length limits, overridable through the system properties read
+    // below. A non-numeric property value makes class initialization fail.
+    protected static final int MAX_DIR_NAME_LENGTH =
+        Integer.parseInt(System.getProperty("MaximumDirNameLength", "200"));
+    protected static final int MAX_FILE_NAME_LENGTH =
+        Integer.parseInt(System.getProperty("MaximumFileNameLength", "64"));
+    private static final int DEFAULT_BUFFER_SIZE = 4096;
+
+    private IOHelper() {
+    }
+
+    /**
+     * @return the default directory prefix followed by "activemq-data"
+     */
+    public static String getDefaultDataDirectory() {
+        return getDefaultDirectoryPrefix() + "activemq-data";
+    }
+
+    /**
+     * @return the default directory prefix followed by "amqstore"
+     */
+    public static String getDefaultStoreDirectory() {
+        return getDefaultDirectoryPrefix() + "amqstore";
+    }
+
+    /**
+     * Allows a system property to be used to overload the default data
+     * directory which can be useful for forcing the test cases to use a target/
+     * prefix
+     *
+     * @return the value of "org.apache.activemq.default.directory.prefix", or
+     *         the empty string when it is unset or cannot be read
+     */
+    public static String getDefaultDirectoryPrefix() {
+        try {
+            return System.getProperty("org.apache.activemq.default.directory.prefix", "");
+        } catch (Exception e) {
+            return "";
+        }
+    }
+
+    /**
+     * Same as {@link #toFileSystemSafeName(String, boolean, int)} with
+     * directory separators kept and the directory-name length limit applied.
+     *
+     * @param name the text to convert
+     * @return the converted name
+     */
+    public static String toFileSystemDirectorySafeName(String name) {
+        return toFileSystemSafeName(name, true, MAX_DIR_NAME_LENGTH);
+    }
+
+    /**
+     * Same as {@link #toFileSystemSafeName(String, boolean, int)} with
+     * directory separators encoded and the file-name length limit applied.
+     *
+     * @param name the text to convert
+     * @return the converted name
+     */
+    public static String toFileSystemSafeName(String name) {
+        return toFileSystemSafeName(name, false, MAX_FILE_NAME_LENGTH);
+    }
+
+    /**
+     * Converts any string into a string that is safe to use as a file name.
+     * ASCII letters, digits and the characters "-", "_", "." and "#" are kept
+     * as they are. Every other character is replaced by "#" followed by the
+     * output of {@code HexSupport.toHexFromInt(c, true)}. Because "#" is also
+     * kept verbatim, the result cannot always be decoded unambiguously.
+     *
+     * @param name the text to convert
+     * @param dirSeparators when true, "/" and "\" are kept instead of encoded
+     * @param maxFileLength maximum length of the result; when the converted
+     *            name is longer, only its last {@code maxFileLength}
+     *            characters are returned
+     * @return the converted, possibly truncated name
+     */
+    public static String toFileSystemSafeName(String name,boolean dirSeparators,int maxFileLength) {
+        int size = name.length();
+        StringBuilder rc = new StringBuilder(size * 2);
+        for (int i = 0; i < size; i++) {
+            char c = name.charAt(i);
+            boolean valid = c >= 'a' && c <= 'z';
+            valid = valid || (c >= 'A' && c <= 'Z');
+            valid = valid || (c >= '0' && c <= '9');
+            valid = valid || (c == '_') || (c == '-') || (c == '.') || (c=='#')
+                    ||(dirSeparators && ( (c == '/') || (c == '\\')));
+
+            if (valid) {
+                rc.append(c);
+            } else {
+                // Encode the character using hex notation
+                rc.append('#');
+                rc.append(HexSupport.toHexFromInt(c, true));
+            }
+        }
+        String result = rc.toString();
+        if (result.length() > maxFileLength) {
+            // Keep the tail of the name rather than the head.
+            result = result.substring(result.length()-maxFileLength,result.length());
+        }
+        return result;
+    }
+
+    /**
+     * Deletes a file, or a directory together with everything below it.
+     *
+     * @param fileToDelete the file or directory; may be null
+     * @return true when nothing had to be deleted (null or non-existing
+     *         argument) or every deletion succeeded
+     */
+    public static boolean deleteFile(File fileToDelete) {
+        if (fileToDelete == null || !fileToDelete.exists()) {
+            return true;
+        }
+        boolean result = deleteChildren(fileToDelete);
+        result &= fileToDelete.delete();
+        return result;
+    }
+
+    /**
+     * Deletes everything below a directory but not the directory itself.
+     *
+     * @param parent the directory to empty
+     * @return false when {@code parent} is null, does not exist, cannot be
+     *         listed, or any entry could not be deleted; true otherwise
+     *         (including when {@code parent} is not a directory, in which case
+     *         nothing is deleted)
+     */
+    public static boolean deleteChildren(File parent) {
+        if (parent == null || !parent.exists()) {
+            return false;
+        }
+        boolean result = true;
+        if (parent.isDirectory()) {
+            File[] files = parent.listFiles();
+            if (files == null) {
+                result = false;
+            } else {
+                for (int i = 0; i < files.length; i++) {
+                    File file = files[i];
+                    if (file.getName().equals(".")
+                            || file.getName().equals("..")) {
+                        continue;
+                    }
+                    if (file.isDirectory()) {
+                        result &= deleteFile(file);
+                    } else {
+                        result &= file.delete();
+                    }
+                }
+            }
+        }
+
+        return result;
+    }
+
+    /**
+     * Moves a file into a directory, keeping its name, by renaming it.
+     *
+     * @param src the file to move
+     * @param targetDirectory the directory to move it into
+     * @throws IOException if the rename fails
+     */
+    public static void moveFile(File src, File targetDirectory) throws IOException {
+        if (!src.renameTo(new File(targetDirectory, src.getName()))) {
+            throw new IOException("Failed to move " + src + " to " + targetDirectory);
+        }
+    }
+
+    /**
+     * Copies the content of one file to another.
+     *
+     * @param src the file to read
+     * @param dest the file to write
+     * @throws IOException if either file cannot be opened or the copy fails
+     */
+    public static void copyFile(File src, File dest) throws IOException {
+        FileInputStream fileSrc = new FileInputStream(src);
+        FileOutputStream fileDest = null;
+        try {
+            fileDest = new FileOutputStream(dest);
+        } finally {
+            if (fileDest == null) {
+                // The destination could not be opened: release the source
+                // handle and let the original failure propagate.
+                try {
+                    fileSrc.close();
+                } catch (IOException ignored) {
+                    // the failure to open the destination is the one to report
+                }
+            }
+        }
+        // copyInputStream closes both streams.
+        copyInputStream(fileSrc, fileDest);
+    }
+
+    /**
+     * Copies everything from {@code in} to {@code out} and closes both
+     * streams, whether or not the copy succeeds.
+     *
+     * @param in the stream to read until end of stream
+     * @param out the stream to write to
+     * @throws IOException if reading, writing or closing fails
+     */
+    public static void copyInputStream(InputStream in, OutputStream out) throws IOException {
+        try {
+            byte[] buffer = new byte[DEFAULT_BUFFER_SIZE];
+            int len = in.read(buffer);
+            while (len >= 0) {
+                out.write(buffer, 0, len);
+                len = in.read(buffer);
+            }
+        } finally {
+            // Nested finally so that a failing in.close() cannot prevent
+            // out.close() from running.
+            try {
+                in.close();
+            } finally {
+                out.close();
+            }
+        }
+    }
+
+    /**
+     * Makes sure a directory exists, creating it and any missing parents.
+     *
+     * @param dir the directory to create
+     * @throws IOException if {@code dir} exists but is not a directory, or if
+     *             it cannot be created
+     */
+    public static void mkdirs(File dir) throws IOException {
+        if (dir.exists()) {
+            if (!dir.isDirectory()) {
+                throw new IOException("Failed to create directory '" + dir +"', regular file already existed with that name");
+            }
+
+        } else {
+            if (!dir.mkdirs()) {
+                throw new IOException("Failed to create directory '" + dir+"'");
+            }
+        }
+    }
+}

Added: activemq/sandbox/activemq-flow/activemq-util/src/test/java/org/apache/activemq/util/MessageIdList.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-util/src/test/java/org/apache/activemq/util/MessageIdList.java?rev=783612&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-util/src/test/java/org/apache/activemq/util/MessageIdList.java (added)
+++ activemq/sandbox/activemq-flow/activemq-util/src/test/java/org/apache/activemq/util/MessageIdList.java Thu Jun 11 03:08:06 2009
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.util;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A simple container of messages for performing testing and rendezvous style
+ * code. You can use this class as a {@link MessageListener} and then make
+ * assertions about how many messages it has received allowing a certain maximum
+ * amount of time to ensure that the test does not hang forever. Also you can
+ * chain these instances together with the {@link #setParent(MessageListener)}
+ * method so that you can aggregate the total number of messages consumed across
+ * a number of consumers.
+ * 
+ * @version $Revision: 1.6 $
+ */
+public class MessageIdList extends Assert implements MessageListener {
+
+    private static final Log LOG = LogFactory.getLog(MessageIdList.class);
+
+    // Only read or modified while holding the "semaphore" monitor.
+    private final List<String> messageIds = new ArrayList<String>();
+    // Lock and wait/notify monitor. It may be supplied by the caller, so
+    // several lists can share one monitor.
+    private final Object semaphore;
+    private boolean verbose;
+    private MessageListener parent;
+    private long maximumDuration = 15000L;
+    private long processingDelay;
+
+    private CountDownLatch countDownLatch;
+
+    /**
+     * Creates a list that uses a private monitor.
+     */
+    public MessageIdList() {
+        this(new Object());
+    }
+
+    /**
+     * @param semaphore the object used as lock and wait/notify monitor
+     */
+    public MessageIdList(Object semaphore) {
+        this.semaphore = semaphore;
+    }
+
+    /**
+     * Two lists are equal when they currently hold the same message ids in the
+     * same order.
+     */
+    public boolean equals(Object that) {
+        if (that instanceof MessageIdList) {
+            MessageIdList thatList = (MessageIdList)that;
+            return getMessageIds().equals(thatList.getMessageIds());
+        }
+        return false;
+    }
+
+    public int hashCode() {
+        synchronized (semaphore) {
+            return messageIds.hashCode() + 1;
+        }
+    }
+
+    public String toString() {
+        synchronized (semaphore) {
+            return messageIds.toString();
+        }
+    }
+
+    /**
+     * @return all the messages on the list so far, clearing the buffer
+     */
+    public List<String> flushMessages() {
+        synchronized (semaphore) {
+            List<String> answer = new ArrayList<String>(messageIds);
+            messageIds.clear();
+            return answer;
+        }
+    }
+
+    /**
+     * @return a copy of the message ids received so far
+     */
+    public List<String> getMessageIds() {
+        // The semaphore is the only lock guarding messageIds; locking the
+        // instance as well added nothing.
+        synchronized (semaphore) {
+            return new ArrayList<String>(messageIds);
+        }
+    }
+
+    /**
+     * Records the JMS message id, wakes up waiting threads, counts down the
+     * optional latch, passes the message on to the optional parent listener
+     * and finally sleeps for the configured processing delay.
+     */
+    public void onMessage(Message message) {
+        try {
+            String id = message.getJMSMessageID();
+            synchronized (semaphore) {
+                messageIds.add(id);
+                semaphore.notifyAll();
+            }
+            if (countDownLatch != null) {
+                countDownLatch.countDown();
+            }
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Received message: " + message);
+            }
+        } catch (JMSException e) {
+            LOG.warn("Could not read the JMS message id of: " + message, e);
+        }
+        if (parent != null) {
+            parent.onMessage(message);
+        }
+        if (processingDelay > 0) {
+            try {
+                Thread.sleep(processingDelay);
+            } catch (InterruptedException e) {
+                // Keep the interrupt visible to the delivering thread.
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * @return the number of message ids currently held
+     */
+    public int getMessageCount() {
+        synchronized (semaphore) {
+            return messageIds.size();
+        }
+    }
+
+    /**
+     * Blocks until at least {@code messageCount} messages have been received,
+     * the maximum duration has elapsed, or the calling thread is interrupted
+     * (in which case the interrupt flag is set again before returning).
+     * 
+     * @param messageCount the number of messages to wait for
+     */
+    public void waitForMessagesToArrive(int messageCount) {
+        LOG.info("Waiting for " + messageCount + " message(s) to arrive");
+
+        long start = System.currentTimeMillis();
+
+        // The condition is tested while holding the monitor so that a message
+        // arriving between the test and the wait cannot be missed. It is
+        // re-tested after every wake-up, because notifications may come from
+        // another list sharing the monitor or be spurious.
+        synchronized (semaphore) {
+            while (!hasReceivedMessages(messageCount)) {
+                long remaining = maximumDuration - (System.currentTimeMillis() - start);
+                if (remaining <= 0) {
+                    // wait(0) would block forever, so stop here
+                    break;
+                }
+                try {
+                    semaphore.wait(remaining);
+                } catch (InterruptedException e) {
+                    LOG.info("Caught: " + e);
+                    Thread.currentThread().interrupt();
+                    break;
+                }
+            }
+        }
+        long end = System.currentTimeMillis() - start;
+
+        LOG.info("End of wait for " + end + " millis and received: " + getMessageCount() + " messages");
+    }
+
+    /**
+     * Performs a testing assertion that the correct number of messages have
+     * been received without waiting
+     * 
+     * @param messageCount the exact number of messages expected
+     */
+    public void assertMessagesReceivedNoWait(int messageCount) {
+        assertEquals("expected number of messages when received", messageCount, getMessageCount());
+    }
+
+    /**
+     * Performs a testing assertion that the correct number of messages have
+     * been received waiting for the messages to arrive up to a fixed amount of
+     * time.
+     * 
+     * @param messageCount the exact number of messages expected
+     */
+    public void assertMessagesReceived(int messageCount) {
+        waitForMessagesToArrive(messageCount);
+
+        assertMessagesReceivedNoWait(messageCount);
+    }
+
+    /**
+     * Asserts that there are at least the given number of messages received
+     * without waiting.
+     * 
+     * @param messageCount the minimum number of messages expected
+     */
+    public void assertAtLeastMessagesReceived(int messageCount) {
+        int actual = getMessageCount();
+        assertTrue("at least: " + messageCount + " messages received. Actual: " + actual, actual >= messageCount);
+    }
+
+    /**
+     * Asserts that there are at most the number of messages received without
+     * waiting
+     * 
+     * @param messageCount the maximum number of messages expected
+     */
+    public void assertAtMostMessagesReceived(int messageCount) {
+        int actual = getMessageCount();
+        assertTrue("at most: " + messageCount + " messages received. Actual: " + actual, actual <= messageCount);
+    }
+
+    /**
+     * @return true once at least one message id is held
+     */
+    public boolean hasReceivedMessage() {
+        return getMessageCount() > 0;
+    }
+
+    /**
+     * @param messageCount the number of messages to compare against
+     * @return true when at least {@code messageCount} message ids are held
+     */
+    public boolean hasReceivedMessages(int messageCount) {
+        return getMessageCount() >= messageCount;
+    }
+
+    public boolean isVerbose() {
+        return verbose;
+    }
+
+    public void setVerbose(boolean verbose) {
+        this.verbose = verbose;
+    }
+
+    public MessageListener getParent() {
+        return parent;
+    }
+
+    /**
+     * Allows a parent listener to be specified such as to aggregate messages
+     * consumed across consumers
+     */
+    public void setParent(MessageListener parent) {
+        this.parent = parent;
+    }
+
+    /**
+     * @return the maximum time, in milliseconds, that
+     *         {@link #waitForMessagesToArrive(int)} waits
+     */
+    public long getMaximumDuration() {
+        return this.maximumDuration;
+    }
+
+    /**
+     * @param maximumDuration the maximum time, in milliseconds, that
+     *            {@link #waitForMessagesToArrive(int)} waits
+     */
+    public void setMaximumDuration(long maximumDuration) {
+        this.maximumDuration = maximumDuration;
+    }
+
+    /**
+     * @param countDownLatch a latch counted down once for every message whose
+     *            id could be read, or null for none
+     */
+    public void setCountDownLatch(CountDownLatch countDownLatch) {
+        this.countDownLatch = countDownLatch;
+    }
+
+    /**
+     * Gets the amount of time the message listener will spend sleeping to
+     * simulate a processing delay.
+     * 
+     * @return the delay in milliseconds
+     */
+    public long getProcessingDelay() {
+        return processingDelay;
+    }
+
+    /**
+     * Sets the amount of time the message listener will spend sleeping to
+     * simulate a processing delay.
+     * 
+     * @param processingDelay the delay in milliseconds; no sleep happens when
+     *            it is zero or negative
+     */
+    public void setProcessingDelay(long processingDelay) {
+        this.processingDelay = processingDelay;
+    }
+
+}