You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ha...@apache.org on 2009/08/08 07:55:24 UTC
svn commit: r802301 - in /camel/trunk:
camel-core/src/main/java/org/apache/camel/component/dataset/
camel-core/src/main/java/org/apache/camel/component/mock/
camel-core/src/test/java/org/apache/camel/component/dataset/
components/camel-spring/src/test/...
Author: hadrian
Date: Sat Aug 8 05:55:24 2009
New Revision: 802301
URL: http://svn.apache.org/viewvc?rev=802301&view=rev
Log:
Fix wait timeout for DataSet and other minor things
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java
camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
camel/trunk/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetTest.java
camel/trunk/components/camel-spring/src/test/resources/log4j.properties
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/component/dataset/SpringDataSetTest-context.xml
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java?rev=802301&r1=802300&r2=802301&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java Sat Aug 8 05:55:24 2009
@@ -39,6 +39,7 @@
private static final transient Log LOG = LogFactory.getLog(DataSetEndpoint.class);
private DataSet dataSet;
private AtomicInteger receivedCounter = new AtomicInteger();
+ private int minRate;
private long produceDelay;
private long consumeDelay;
private long startTime;
@@ -92,13 +93,26 @@
return exchange;
}
+ public int getMinRate() {
+ return minRate;
+ }
+
+ public void setMinRate(int minRate) {
+ this.minRate = minRate;
+ }
+
@Override
- protected void waitForCompleteLatch() throws InterruptedException {
- // TODO lets do a much better version of this!
- long size = getDataSet().getSize();
- size *= 4000;
- setResultWaitTime(size);
- super.waitForCompleteLatch();
+ protected void waitForCompleteLatch(long timeout) throws InterruptedException {
+ super.waitForCompleteLatch(timeout);
+
+ if (minRate > 0) {
+ int count = getReceivedCounter();
+ do {
+ // wait as long as we get a decent message rate
+ super.waitForCompleteLatch(1000L);
+ count = getReceivedCounter() - count;
+ } while (count >= minRate);
+ }
}
// Properties
@@ -117,7 +131,7 @@
}
/**
- * Sets how many messages should be preloaded (sent) before the route completes its initialisation
+ * Sets how many messages should be preloaded (sent) before the route completes its initialization
*/
public void setPreloadSize(long preloadSize) {
this.preloadSize = preloadSize;
@@ -159,7 +173,8 @@
// now lets assert that they are the same
if (LOG.isDebugEnabled()) {
- LOG.debug("Received message: " + index + " = " + actual);
+ Integer dsi = actual.getIn().getHeader(Exchange.DATASET_INDEX, Integer.class);
+ LOG.debug("Received message: " + index + " (DataSet index=" + dsi + ") = " + actual);
}
assertMessageExpected(index, expected, actual);
@@ -168,8 +183,7 @@
Thread.sleep(consumeDelay);
}
- long group = getDataSet().getReportCount();
- if (receivedCount % group == 0) {
+ if (receivedCount % getDataSet().getReportCount() == 0) {
reportProgress(actual, receivedCount);
}
}
@@ -179,7 +193,8 @@
long elapsed = time - startTime;
startTime = time;
- LOG.info("Received: " + receivedCount + " messages so far. Last group took: " + elapsed + " millis");
+ LOG.info("Received: " + receivedCount + " messages so far. Last group of "
+ + getDataSet().getReportCount() + " took: " + elapsed + " millis");
}
protected void assertMessageExpected(long index, Exchange expected, Exchange actual) throws Exception {
@@ -192,8 +207,10 @@
public void start() throws Exception {
long size = getDataSet().getSize();
expectedMessageCount((int) size);
+ LOG.info("Start: " + this + " expecting " + size + " messages");
}
public void stop() throws Exception {
+ LOG.info("Stop: " + this);
}
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java?rev=802301&r1=802300&r2=802301&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java Sat Aug 8 05:55:24 2009
@@ -783,7 +783,7 @@
tests = new CopyOnWriteArrayList<Runnable>();
latch = null;
sleepForEmptyTest = 0;
- resultWaitTime = 20000L;
+ resultWaitTime = 0;
resultMinimumWaitTime = 0L;
expectedMinimumCount = -1;
expectedBodyValues = null;
@@ -856,19 +856,26 @@
fail("Should have a latch!");
}
- // now lets wait for the results
- LOG.debug("Waiting on the latch for: " + resultWaitTime + " millis");
long start = System.currentTimeMillis();
- latch.await(resultWaitTime, TimeUnit.MILLISECONDS);
+ waitForCompleteLatch(resultWaitTime);
long delta = System.currentTimeMillis() - start;
LOG.debug("Took " + delta + " millis to complete latch");
if (resultMinimumWaitTime > 0 && delta < resultMinimumWaitTime) {
fail("Expected minimum " + resultMinimumWaitTime
- + " millis waiting on the result, but was faster with " + delta + " millis.");
+ + " millis waiting on the result, but was faster with " + delta + " millis.");
}
}
+ protected void waitForCompleteLatch(long timeout) throws InterruptedException {
+ // Wait for a default 10 seconds if resultWaitTime is not set
+ long waitTime = timeout == 0 ? 10000L : timeout;
+
+ // now lets wait for the results
+ LOG.debug("Waiting on the latch for: " + timeout + " millis");
+ latch.await(waitTime, TimeUnit.MILLISECONDS);
+ }
+
protected void assertEquals(String message, Object expectedValue, Object actualValue) {
if (!ObjectHelper.equal(expectedValue, actualValue)) {
fail(message + ". Expected: <" + expectedValue + "> but was: <" + actualValue + ">");
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetTest.java?rev=802301&r1=802300&r2=802301&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetTest.java Sat Aug 8 05:55:24 2009
@@ -44,9 +44,9 @@
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
public void configure() throws Exception {
- from("dataset:foo").to("direct:foo");
-
- from("direct:foo").to("dataset:foo");
+ // start this first to make sure the "direct:foo" consumer is ready
+ from("direct:foo").to("dataset:foo?minRate=50");
+ from("dataset:foo?minRate=50").to("direct:foo");
}
};
}
Modified: camel/trunk/components/camel-spring/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/log4j.properties?rev=802301&r1=802300&r2=802301&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/test/resources/log4j.properties (original)
+++ camel/trunk/components/camel-spring/src/test/resources/log4j.properties Sat Aug 8 05:55:24 2009
@@ -18,14 +18,15 @@
#
# The logging properties used for eclipse testing, We want to see debug output on the console.
#
-log4j.rootLogger=INFO, file
+log4j.rootLogger=WARN, file
log4j.logger.org.springframework=WARN
-#log4j.logger.org.apache.camel=DEBUG
log4j.logger.org.apache.camel.impl.converter=WARN
log4j.logger.org.apache.camel.management=WARN
log4j.logger.org.apache.camel.impl.DefaultPackageScanClassResolver=WARN
+#log4j.logger.org.apache.camel=DEBUG
+
# CONSOLE appender not used by default
log4j.appender.out=org.apache.log4j.ConsoleAppender
log4j.appender.out.layout=org.apache.log4j.PatternLayout
Modified: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/component/dataset/SpringDataSetTest-context.xml
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/component/dataset/SpringDataSetTest-context.xml?rev=802301&r1=802300&r2=802301&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/component/dataset/SpringDataSetTest-context.xml (original)
+++ camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/component/dataset/SpringDataSetTest-context.xml Sat Aug 8 05:55:24 2009
@@ -25,14 +25,15 @@
<camelContext xmlns="http://camel.apache.org/schema/spring">
+ <!-- start this first to make sure the "direct:foo" consumer is ready -->
<route>
- <from uri="dataset:foo"/>
- <to uri="direct:test.queue"/>
+ <from uri="direct:test.queue"/>
+ <to uri="dataset:foo?minRate=50"/>
</route>
<route>
- <from uri="direct:test.queue"/>
- <to uri="dataset:foo"/>
+ <from uri="dataset:foo?minRate=50"/>
+ <to uri="direct:test.queue"/>
</route>
</camelContext>