You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by js...@apache.org on 2008/03/04 19:58:14 UTC
svn commit: r633584 - in
/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset:
DataSet.java DataSetConsumer.java DataSetEndpoint.java DataSetSupport.java
Author: jstrachan
Date: Tue Mar 4 10:58:11 2008
New Revision: 633584
URL: http://svn.apache.org/viewvc?rev=633584&view=rev
Log:
improvements to the DataSet support to allow custom delays to be used when sending or consuming messages; also added a report group so that for massive tests we can report every N messages so we can see the progress
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSet.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetConsumer.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetSupport.java
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSet.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSet.java?rev=633584&r1=633583&r2=633584&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSet.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSet.java Tue Mar 4 10:58:11 2008
@@ -43,4 +43,9 @@
* Asserts that the expected message has been received for the given index
*/
void assertMessageExpected(DataSetEndpoint dataSetEndpoint, Exchange expected, Exchange actual, long index) throws Exception;
+
+ /**
+ * Returns the number of messages which should be received before reporting on the progress of the test
+ */
+ long getReportCount();
}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetConsumer.java?rev=633584&r1=633583&r2=633584&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetConsumer.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetConsumer.java Tue Mar 4 10:58:11 2008
@@ -41,6 +41,11 @@
for (long i = 0; i < dataSet.getSize(); i++) {
Exchange exchange = endpoint.createExchange(i);
getProcessor().process(exchange);
+
+ long delay = endpoint.getProduceDelay();
+ if (delay > 0) {
+ Thread.sleep(delay);
+ }
}
}
}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java?rev=633584&r1=633583&r2=633584&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java Tue Mar 4 10:58:11 2008
@@ -43,6 +43,9 @@
private static final transient Log LOG = LogFactory.getLog(DataSetEndpoint.class);
private DataSet dataSet;
private AtomicInteger receivedCounter = new AtomicInteger();
+ private long produceDelay = -1;
+ private long consumeDelay = -1;
+ private long startTime;
public static void assertEquals(String description, Object expected, Object actual, Exchange exchange) {
if (!ObjectHelper.equal(expected, actual)) {
@@ -90,6 +93,10 @@
}
+ // Properties
+ //-------------------------------------------------------------------------
+
+
public DataSet getDataSet() {
return dataSet;
}
@@ -98,8 +105,39 @@
this.dataSet = dataSet;
}
+ public long getConsumeDelay() {
+ return consumeDelay;
+ }
+
+ /**
+ * Allows a delay to be specified which causes consumers to pause - to simulate slow consumers
+ */
+ public void setConsumeDelay(long consumeDelay) {
+ this.consumeDelay = consumeDelay;
+ }
+
+ public long getProduceDelay() {
+ return produceDelay;
+ }
+
+ /**
+ * Allows a delay to be specified which causes producers to pause - to simpulate slow producers
+ */
+ public void setProduceDelay(long produceDelay) {
+ this.produceDelay = produceDelay;
+ }
+
+
+ // Implementation methods
+ //-------------------------------------------------------------------------
+
+
+
@Override
protected void performAssertions(Exchange actual) throws Exception {
+ if (startTime == 0) {
+ startTime = System.currentTimeMillis();
+ }
int receivedCount = receivedCounter.incrementAndGet();
long index = receivedCount - 1;
Exchange expected = createExchange(index);
@@ -108,6 +146,23 @@
LOG.debug("Received message: " + index + " = " + actual);
assertMessageExpected(index, expected, actual);
+
+ if (consumeDelay > 0) {
+ Thread.sleep(consumeDelay);
+ }
+
+ long group = getDataSet().getReportCount();
+ if (receivedCount % group == 0) {
+ reportProgress(actual, receivedCount);
+ }
+ }
+
+ protected void reportProgress(Exchange actual, int receivedCount) {
+ long time = System.currentTimeMillis();
+ long elapsed = time - startTime;
+ startTime = time;
+
+ LOG.info("Received: " + receivedCount + " messages so far. Last group took: " + elapsed + " millis");
}
protected void assertMessageExpected(long index, Exchange expected, Exchange actual) throws Exception {
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetSupport.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetSupport.java?rev=633584&r1=633583&r2=633584&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetSupport.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetSupport.java Tue Mar 4 10:58:11 2008
@@ -32,6 +32,7 @@
private Map<String, Object> defaultHeaders;
private Processor outputTransformer;
private long size = 10;
+ private long reportCount = -1;
public DataSetSupport() {
}
@@ -61,6 +62,9 @@
DataSetEndpoint.assertEquals("message body", expectedBody, actualBody, actual);
}
+ // Properties
+ //-------------------------------------------------------------------------
+
public long getSize() {
return size;
}
@@ -69,6 +73,20 @@
this.size = size;
}
+ public long getReportCount() {
+ if (reportCount <= 0) {
+ reportCount = getSize() / 5;
+ }
+ return reportCount;
+ }
+
+ /**
+ * Sets the number of messages in a group on which we will report that messages have been received.
+ */
+ public void setReportCount(long reportCount) {
+ this.reportCount = reportCount;
+ }
+
public Map<String, Object> getDefaultHeaders() {
if (defaultHeaders == null) {
defaultHeaders = new HashMap<String, Object>();
@@ -88,6 +106,9 @@
public void setOutputTransformer(Processor outputTransformer) {
this.outputTransformer = outputTransformer;
}
+
+ // Implementation methods
+ //-------------------------------------------------------------------------
protected abstract Object createMessageBody(long messageIndex);