You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/03/15 16:15:31 UTC
svn commit: r386092 -
/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/LargeStreamletTest.java
Author: chirino
Date: Wed Mar 15 07:15:29 2006
New Revision: 386092
URL: http://svn.apache.org/viewcvs?rev=386092&view=rev
Log:
Be more careful about how long we wait for the test to time out
Modified:
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/LargeStreamletTest.java
Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/LargeStreamletTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/LargeStreamletTest.java?rev=386092&r1=386091&r2=386092&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/LargeStreamletTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/LargeStreamletTest.java Wed Mar 15 07:15:29 2006
@@ -25,10 +25,8 @@
import junit.framework.Assert;
import junit.framework.TestCase;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
/**
* @author rnewson
@@ -41,9 +39,9 @@
private static final int MESSAGE_COUNT = 1024*1024;
- private int totalRead;
+ private AtomicInteger totalRead = new AtomicInteger();
- private int totalWritten;
+ private AtomicInteger totalWritten = new AtomicInteger();
private AtomicBoolean stopThreads = new AtomicBoolean(false);
@@ -66,7 +64,7 @@
final Thread readerThread = new Thread(new Runnable() {
public void run() {
- totalRead = 0;
+ totalRead.set(0);
try {
final InputStream inputStream = connection
.createInputStream(destination);
@@ -75,7 +73,7 @@
final byte[] buf = new byte[BUFFER_SIZE];
while (!stopThreads.get()
&& (read = inputStream.read(buf)) != -1) {
- totalRead += read;
+ totalRead.addAndGet(read);
}
} finally {
inputStream.close();
@@ -93,7 +91,7 @@
final Thread writerThread = new Thread(new Runnable() {
public void run() {
- totalWritten = 0;
+ totalWritten.set(0);
int count = MESSAGE_COUNT;
try {
final OutputStream outputStream = connection
@@ -103,7 +101,7 @@
new Random().nextBytes(buf);
while (count > 0 && !stopThreads.get()) {
outputStream.write(buf);
- totalWritten += buf.length;
+ totalWritten.addAndGet(buf.length);
count--;
}
} finally {
@@ -122,8 +120,19 @@
readerThread.start();
writerThread.start();
- writerThread.join(60 * 1000);
- readerThread.join(60 * 1000);
+
+ // Wait until the reader has finished receiving all the messages or it has stopped
+ // receiving messages.
+ Thread.sleep(1000);
+ int lastRead = totalRead.get();
+ while( readerThread.isAlive() ) {
+ readerThread.join(1000);
+ // No progress? Then stop waiting.
+ if( lastRead == totalRead.get() ) {
+ break;
+ }
+ lastRead = totalRead.get();
+ }
stopThreads.set(true);
@@ -131,7 +140,7 @@
assertTrue("Should not have received a writer exception", writerException == null);
Assert.assertEquals("Not all messages accounted for",
- totalWritten, totalRead);
+ totalWritten.get(), totalRead.get());
} finally {
session.close();