You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by js...@apache.org on 2008/03/04 19:58:14 UTC

svn commit: r633584 - in /activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset: DataSet.java DataSetConsumer.java DataSetEndpoint.java DataSetSupport.java

Author: jstrachan
Date: Tue Mar  4 10:58:11 2008
New Revision: 633584

URL: http://svn.apache.org/viewvc?rev=633584&view=rev
Log:
improvements to the DataSet support to allow custom delays to be used when sending or consuming messages; also added a report group so that for massive tests we can report every N messages so we can see the progress

Modified:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSet.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetConsumer.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetSupport.java

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSet.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSet.java?rev=633584&r1=633583&r2=633584&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSet.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSet.java Tue Mar  4 10:58:11 2008
@@ -43,4 +43,9 @@
      * Asserts that the expected message has been received for the given index
      */
     void assertMessageExpected(DataSetEndpoint dataSetEndpoint, Exchange expected, Exchange actual, long index) throws Exception;
+
+    /**
+     * Returns the number of messages which should be received before reporting on the progress of the test
+     */
+    long getReportCount();
 }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetConsumer.java?rev=633584&r1=633583&r2=633584&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetConsumer.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetConsumer.java Tue Mar  4 10:58:11 2008
@@ -41,6 +41,11 @@
         for (long i = 0; i < dataSet.getSize(); i++) {
             Exchange exchange = endpoint.createExchange(i);
             getProcessor().process(exchange);
+
+            long delay = endpoint.getProduceDelay();
+            if (delay > 0) {
+                Thread.sleep(delay);
+            }
         }
     }
 }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java?rev=633584&r1=633583&r2=633584&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java Tue Mar  4 10:58:11 2008
@@ -43,6 +43,9 @@
     private static final transient Log LOG = LogFactory.getLog(DataSetEndpoint.class);
     private DataSet dataSet;
     private AtomicInteger receivedCounter = new AtomicInteger();
+    private long produceDelay = -1;
+    private long consumeDelay = -1;
+    private long startTime;
 
     public static void assertEquals(String description, Object expected, Object actual, Exchange exchange) {
         if (!ObjectHelper.equal(expected, actual)) {
@@ -90,6 +93,10 @@
     }
 
 
+    // Properties
+    //-------------------------------------------------------------------------
+
+
     public DataSet getDataSet() {
         return dataSet;
     }
@@ -98,8 +105,39 @@
         this.dataSet = dataSet;
     }
 
+    public long getConsumeDelay() {
+        return consumeDelay;
+    }
+
+    /**
+     * Allows a delay to be specified which causes consumers to pause - to simulate slow consumers
+     */
+    public void setConsumeDelay(long consumeDelay) {
+        this.consumeDelay = consumeDelay;
+    }
+
+    public long getProduceDelay() {
+        return produceDelay;
+    }
+
+    /**
+     * Allows a delay to be specified which causes producers to pause - to simulate slow producers
+     */
+    public void setProduceDelay(long produceDelay) {
+        this.produceDelay = produceDelay;
+    }
+
+
+    // Implementation methods
+    //-------------------------------------------------------------------------
+
+
+
     @Override
     protected void performAssertions(Exchange actual) throws Exception {
+        if (startTime == 0) {
+            startTime = System.currentTimeMillis();
+        }
         int receivedCount = receivedCounter.incrementAndGet();
         long index = receivedCount - 1;
         Exchange expected = createExchange(index);
@@ -108,6 +146,23 @@
         LOG.debug("Received message: " + index + " = " + actual);
 
         assertMessageExpected(index, expected, actual);
+
+        if (consumeDelay > 0) {
+            Thread.sleep(consumeDelay);
+        }
+
+        long group = getDataSet().getReportCount();
+        if (receivedCount % group == 0) {
+            reportProgress(actual, receivedCount);
+        }
+    }
+
+    protected void reportProgress(Exchange actual, int receivedCount) {
+        long time = System.currentTimeMillis();
+        long elapsed = time - startTime;
+        startTime = time;
+
+        LOG.info("Received: " + receivedCount + " messages so far. Last group took: " + elapsed + " millis");
     }
 
     protected void assertMessageExpected(long index, Exchange expected, Exchange actual) throws Exception {

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetSupport.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetSupport.java?rev=633584&r1=633583&r2=633584&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetSupport.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetSupport.java Tue Mar  4 10:58:11 2008
@@ -32,6 +32,7 @@
     private Map<String, Object> defaultHeaders;
     private Processor outputTransformer;
     private long size = 10;
+    private long reportCount = -1;
 
     public DataSetSupport() {
     }
@@ -61,6 +62,9 @@
         DataSetEndpoint.assertEquals("message body", expectedBody, actualBody, actual);
     }
 
+    // Properties
+    //-------------------------------------------------------------------------
+
     public long getSize() {
         return size;
     }
@@ -69,6 +73,20 @@
         this.size = size;
     }
 
+    public long getReportCount() {
+        if (reportCount <= 0) {
+            reportCount = getSize() / 5;
+        }
+        return reportCount;
+    }
+
+    /**
+     * Sets the number of messages in a group on which we will report that messages have been received.
+     */
+    public void setReportCount(long reportCount) {
+        this.reportCount = reportCount;
+    }
+
     public Map<String, Object> getDefaultHeaders() {
         if (defaultHeaders == null) {
             defaultHeaders = new HashMap<String, Object>();
@@ -88,6 +106,9 @@
     public void setOutputTransformer(Processor outputTransformer) {
         this.outputTransformer = outputTransformer;
     }
+
+    // Implementation methods
+    //-------------------------------------------------------------------------
 
     protected abstract Object createMessageBody(long messageIndex);