You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ha...@apache.org on 2009/08/08 07:55:24 UTC

svn commit: r802301 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/component/dataset/ camel-core/src/main/java/org/apache/camel/component/mock/ camel-core/src/test/java/org/apache/camel/component/dataset/ components/camel-spring/src/test/...

Author: hadrian
Date: Sat Aug  8 05:55:24 2009
New Revision: 802301

URL: http://svn.apache.org/viewvc?rev=802301&view=rev
Log:
Fix wait timeout for DataSet and other minor things

Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetTest.java
    camel/trunk/components/camel-spring/src/test/resources/log4j.properties
    camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/component/dataset/SpringDataSetTest-context.xml

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java?rev=802301&r1=802300&r2=802301&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java Sat Aug  8 05:55:24 2009
@@ -39,6 +39,7 @@
     private static final transient Log LOG = LogFactory.getLog(DataSetEndpoint.class);
     private DataSet dataSet;
     private AtomicInteger receivedCounter = new AtomicInteger();
+    private int minRate;
     private long produceDelay;
     private long consumeDelay;
     private long startTime;
@@ -92,13 +93,26 @@
         return exchange;
     }
 
+    public int getMinRate() {
+        return minRate;
+    }
+
+    public void setMinRate(int minRate) {
+        this.minRate = minRate;
+    }
+
     @Override
-    protected void waitForCompleteLatch() throws InterruptedException {
-        // TODO lets do a much better version of this!
-        long size = getDataSet().getSize();
-        size *= 4000;
-        setResultWaitTime(size);
-        super.waitForCompleteLatch();
+    protected void waitForCompleteLatch(long timeout) throws InterruptedException {
+        super.waitForCompleteLatch(timeout);
+
+        if (minRate > 0) {
+            int count = getReceivedCounter();
+            do {
+                // wait as long as we get a decent message rate
+                super.waitForCompleteLatch(1000L);
+                count = getReceivedCounter() - count;
+            } while (count >= minRate);
+        }
     }
 
     // Properties
@@ -117,7 +131,7 @@
     }
 
     /**
-     * Sets how many messages should be preloaded (sent) before the route completes its initialisation
+     * Sets how many messages should be preloaded (sent) before the route completes its initialization
      */
     public void setPreloadSize(long preloadSize) {
         this.preloadSize = preloadSize;
@@ -159,7 +173,8 @@
 
         // now lets assert that they are the same
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Received message: " + index + " = " + actual);
+            Integer dsi = actual.getIn().getHeader(Exchange.DATASET_INDEX, Integer.class);
+            LOG.debug("Received message: " + index + " (DataSet index=" + dsi + ") = " + actual);
         }
 
         assertMessageExpected(index, expected, actual);
@@ -168,8 +183,7 @@
             Thread.sleep(consumeDelay);
         }
 
-        long group = getDataSet().getReportCount();
-        if (receivedCount % group == 0) {
+        if (receivedCount % getDataSet().getReportCount() == 0) {
             reportProgress(actual, receivedCount);
         }
     }
@@ -179,7 +193,8 @@
         long elapsed = time - startTime;
         startTime = time;
 
-        LOG.info("Received: " + receivedCount + " messages so far. Last group took: " + elapsed + " millis");
+        LOG.info("Received: " + receivedCount + " messages so far. Last group of " 
+            + getDataSet().getReportCount() + " took: " + elapsed + " millis");
     }
 
     protected void assertMessageExpected(long index, Exchange expected, Exchange actual) throws Exception {
@@ -192,8 +207,10 @@
     public void start() throws Exception {
         long size = getDataSet().getSize();
         expectedMessageCount((int) size);
+        LOG.info("Start: " + this + " expecting " + size + " messages");
     }
 
     public void stop() throws Exception {
+        LOG.info("Stop: " + this);
     }
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java?rev=802301&r1=802300&r2=802301&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java Sat Aug  8 05:55:24 2009
@@ -783,7 +783,7 @@
         tests = new CopyOnWriteArrayList<Runnable>();
         latch = null;
         sleepForEmptyTest = 0;
-        resultWaitTime = 20000L;
+        resultWaitTime = 0;
         resultMinimumWaitTime = 0L;
         expectedMinimumCount = -1;
         expectedBodyValues = null;
@@ -856,19 +856,26 @@
             fail("Should have a latch!");
         }
 
-        // now lets wait for the results
-        LOG.debug("Waiting on the latch for: " + resultWaitTime + " millis");
         long start = System.currentTimeMillis();
-        latch.await(resultWaitTime, TimeUnit.MILLISECONDS);
+        waitForCompleteLatch(resultWaitTime);
         long delta = System.currentTimeMillis() - start;
         LOG.debug("Took " + delta + " millis to complete latch");
 
         if (resultMinimumWaitTime > 0 && delta < resultMinimumWaitTime) {
             fail("Expected minimum " + resultMinimumWaitTime
-                    + " millis waiting on the result, but was faster with " + delta + " millis.");
+                + " millis waiting on the result, but was faster with " + delta + " millis.");
         }
     }
 
+    protected void waitForCompleteLatch(long timeout) throws InterruptedException {
+        // Wait for a default 10 seconds if resultWaitTime is not set
+        long waitTime = timeout == 0 ? 10000L : timeout;
+
+        // now lets wait for the results
+        LOG.debug("Waiting on the latch for: " + timeout + " millis");
+        latch.await(waitTime, TimeUnit.MILLISECONDS);
+    }
+
     protected void assertEquals(String message, Object expectedValue, Object actualValue) {
         if (!ObjectHelper.equal(expectedValue, actualValue)) {
             fail(message + ". Expected: <" + expectedValue + "> but was: <" + actualValue + ">");

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetTest.java?rev=802301&r1=802300&r2=802301&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetTest.java Sat Aug  8 05:55:24 2009
@@ -44,9 +44,9 @@
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
             public void configure() throws Exception {
-                from("dataset:foo").to("direct:foo");
-
-                from("direct:foo").to("dataset:foo");
+                // start this first to make sure the "direct:foo" consumer is ready
+                from("direct:foo").to("dataset:foo?minRate=50");
+                from("dataset:foo?minRate=50").to("direct:foo");
             }
         };
     }

Modified: camel/trunk/components/camel-spring/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/log4j.properties?rev=802301&r1=802300&r2=802301&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/test/resources/log4j.properties (original)
+++ camel/trunk/components/camel-spring/src/test/resources/log4j.properties Sat Aug  8 05:55:24 2009
@@ -18,14 +18,15 @@
 #
 # The logging properties used for eclipse testing, We want to see debug output on the console.
 #
-log4j.rootLogger=INFO, file
+log4j.rootLogger=WARN, file
 
 log4j.logger.org.springframework=WARN
-#log4j.logger.org.apache.camel=DEBUG
 log4j.logger.org.apache.camel.impl.converter=WARN
 log4j.logger.org.apache.camel.management=WARN
 log4j.logger.org.apache.camel.impl.DefaultPackageScanClassResolver=WARN
 
+#log4j.logger.org.apache.camel=DEBUG
+
 # CONSOLE appender not used by default
 log4j.appender.out=org.apache.log4j.ConsoleAppender
 log4j.appender.out.layout=org.apache.log4j.PatternLayout

Modified: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/component/dataset/SpringDataSetTest-context.xml
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/component/dataset/SpringDataSetTest-context.xml?rev=802301&r1=802300&r2=802301&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/component/dataset/SpringDataSetTest-context.xml (original)
+++ camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/component/dataset/SpringDataSetTest-context.xml Sat Aug  8 05:55:24 2009
@@ -25,14 +25,15 @@
 
   <camelContext xmlns="http://camel.apache.org/schema/spring">
 
+    <!-- start this first to make sure the "direct:foo" consumer is ready --> 
     <route>
-      <from uri="dataset:foo"/>
-      <to uri="direct:test.queue"/>
+      <from uri="direct:test.queue"/>
+      <to uri="dataset:foo?minRate=50"/>
     </route>
 
     <route>
-      <from uri="direct:test.queue"/>
-      <to uri="dataset:foo"/>
+      <from uri="dataset:foo?minRate=50"/>
+      <to uri="direct:test.queue"/>
     </route>
 
   </camelContext>