You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by js...@apache.org on 2008/04/02 17:15:09 UTC
svn commit: r643934 - in /activemq/camel/trunk:
camel-core/src/main/java/org/apache/camel/component/dataset/
camel-core/src/main/java/org/apache/camel/component/log/
camel-core/src/main/java/org/apache/camel/component/mock/
camel-core/src/main/java/org...
Author: jstrachan
Date: Wed Apr 2 08:15:07 2008
New Revision: 643934
URL: http://svn.apache.org/viewvc?rev=643934&view=rev
Log:
added patch for http://issues.apache.org/activemq/browse/CAMEL-429
Added:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThroughputLogger.java (with props)
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/log/LogComponent.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockComponent.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileAsyncRouteTest.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerProducerRouteTest.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileDeleteRouteTest.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRouteGenerateEmptyExchangeWhenIdleTest.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRouteTest.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
activemq/camel/trunk/components/camel-bam/src/test/java/org/apache/camel/bam/BamRouteTest.java
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java?rev=643934&r1=643933&r2=643934&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java Wed Apr 2 08:15:07 2008
@@ -93,14 +93,13 @@
// TODO lets do a much better version of this!
long size = getDataSet().getSize();
size *= 4000;
- setDefaulResultWaitMillis(size);
+ setResultWaitTime(size);
super.waitForCompleteLatch();
}
// Properties
//-------------------------------------------------------------------------
-
public DataSet getDataSet() {
return dataSet;
}
@@ -131,12 +130,9 @@
this.produceDelay = produceDelay;
}
-
// Implementation methods
//-------------------------------------------------------------------------
-
-
@Override
protected void performAssertions(Exchange actual) throws Exception {
if (startTime == 0) {
@@ -147,7 +143,9 @@
Exchange expected = createExchange(index);
// now lets assert that they are the same
- LOG.debug("Received message: " + index + " = " + actual);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Received message: " + index + " = " + actual);
+ }
assertMessageExpected(index, expected, actual);
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/log/LogComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/log/LogComponent.java?rev=643934&r1=643933&r2=643934&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/log/LogComponent.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/log/LogComponent.java Wed Apr 2 08:15:07 2008
@@ -20,10 +20,12 @@
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
+import org.apache.camel.converter.ObjectConverter;
import org.apache.camel.impl.DefaultComponent;
import org.apache.camel.impl.ProcessorEndpoint;
import org.apache.camel.processor.Logger;
import org.apache.camel.processor.LoggingLevel;
+import org.apache.camel.processor.ThroughputLogger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -35,7 +37,15 @@
protected Endpoint<Exchange> createEndpoint(String uri, String remaining, Map parameters) throws Exception {
LoggingLevel level = getLoggingLevel(parameters);
- Logger logger = new Logger(remaining, level);
+ Object value = parameters.remove("groupSize");
+
+ Logger logger;
+ if (value != null) {
+ logger = new ThroughputLogger(remaining, level, ObjectConverter.toInteger(value));
+ }
+ else {
+ logger = new Logger(remaining, level);
+ }
return new ProcessorEndpoint(uri, this, logger);
}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockComponent.java?rev=643934&r1=643933&r2=643934&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockComponent.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockComponent.java Wed Apr 2 08:15:07 2008
@@ -20,6 +20,9 @@
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.processor.ThroughputLogger;
+import org.apache.camel.converter.ObjectConverter;
import org.apache.camel.impl.DefaultComponent;
/**
@@ -31,6 +34,12 @@
@Override
protected Endpoint<Exchange> createEndpoint(String uri, String remaining, Map parameters) throws Exception {
- return new MockEndpoint(uri, this);
+ MockEndpoint endpoint = new MockEndpoint(uri, this);
+ Integer value = ObjectConverter.toInteger(parameters.remove("reportGroup"));
+ if (value != null) {
+ Processor reporter = new ThroughputLogger("org.apache.camel.mock:" + remaining, value);
+ endpoint.setReporter(reporter);
+ }
+ return endpoint;
}
}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java?rev=643934&r1=643933&r2=643934&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java Wed Apr 2 08:15:07 2008
@@ -61,7 +61,7 @@
private List<Runnable> tests;
private CountDownLatch latch;
private long sleepForEmptyTest;
- private long defaulResultWaitMillis;
+ private long resultWaitTime;
private int expectedMinimumCount;
private List expectedBodyValues;
private List actualBodyValues;
@@ -69,6 +69,7 @@
private String headerName;
private String headerValue;
private Object actualHeader;
+ private Processor reporter;
public MockEndpoint(String endpointUri, Component component) {
super(endpointUri, component);
@@ -152,6 +153,11 @@
};
}
+ public void reset() {
+ init();
+ }
+
+
// Testing API
// -------------------------------------------------------------------------
@@ -250,12 +256,7 @@
* expected by this endpoint
*/
public void expectedMessageCount(int expectedCount) {
- this.expectedCount = expectedCount;
- if (expectedCount <= 0) {
- latch = null;
- } else {
- latch = new CountDownLatch(expectedCount);
- }
+ setExpectedMessageCount(expectedCount);
}
/**
@@ -266,12 +267,7 @@
* expected by this endpoint
*/
public void expectedMinimumMessageCount(int expectedCount) {
- this.expectedMinimumCount = expectedCount;
- if (expectedCount <= 0) {
- latch = null;
- } else {
- latch = new CountDownLatch(expectedMinimumCount);
- }
+ setMinimumExpectedMessageCount(expectedCount);
}
/**
@@ -511,22 +507,63 @@
this.sleepForEmptyTest = sleepForEmptyTest;
}
- public long getDefaulResultWaitMillis() {
- return defaulResultWaitMillis;
+ public long getResultWaitTime() {
+ return resultWaitTime;
}
/**
* Sets the maximum amount of time the {@link #assertIsSatisfied()} will
* wait on a latch until it is satisfied
*/
- public void setDefaulResultWaitMillis(long defaulResultWaitMillis) {
- this.defaulResultWaitMillis = defaulResultWaitMillis;
+ public void setResultWaitTime(long resultWaitTime) {
+ this.resultWaitTime = resultWaitTime;
}
- public void reset() {
- init();
+ /**
+ * Specifies the expected number of message exchanges that should be
+ * received by this endpoint
+ *
+ * @param expectedCount the number of message exchanges that should be
+ * expected by this endpoint
+ */
+ public void setExpectedMessageCount(int expectedCount) {
+ this.expectedCount = expectedCount;
+ if (expectedCount <= 0) {
+ latch = null;
+ } else {
+ latch = new CountDownLatch(expectedCount);
+ }
+ }
+
+ /**
+ * Specifies the minimum number of expected message exchanges that should be
+ * received by this endpoint
+ *
+ * @param expectedCount the number of message exchanges that should be
+ * expected by this endpoint
+ */
+ public void setMinimumExpectedMessageCount(int expectedCount) {
+ this.expectedMinimumCount = expectedCount;
+ if (expectedCount <= 0) {
+ latch = null;
+ } else {
+ latch = new CountDownLatch(expectedMinimumCount);
+ }
+ }
+
+ public Processor getReporter() {
+ return reporter;
+ }
+
+ /**
+ * Allows a processor to added to the endpoint to report on progress of the test
+ */
+ public void setReporter(Processor reporter) {
+ this.reporter = reporter;
}
+ // Implementation methods
+ // -------------------------------------------------------------------------
private void init() {
expectedCount = -1;
counter = 0;
@@ -536,18 +573,19 @@
tests = new CopyOnWriteArrayList<Runnable>();
latch = null;
sleepForEmptyTest = 1000L;
- defaulResultWaitMillis = 20000L;
+ resultWaitTime = 20000L;
expectedMinimumCount = -1;
expectedBodyValues = null;
actualBodyValues = new ArrayList();
}
- // Implementation methods
- // -------------------------------------------------------------------------
protected synchronized void onExchange(Exchange exchange) {
try {
- performAssertions(exchange);
+ if (reporter != null) {
+ reporter.process(exchange);
+ }
+ performAssertions(exchange);
} catch (Throwable e) {
failures.add(e);
}
@@ -593,8 +631,8 @@
}
// now lets wait for the results
- LOG.debug("Waiting on the latch for: " + defaulResultWaitMillis + " millis");
- latch.await(defaulResultWaitMillis, TimeUnit.MILLISECONDS);
+ LOG.debug("Waiting on the latch for: " + resultWaitTime + " millis");
+ latch.await(resultWaitTime, TimeUnit.MILLISECONDS);
}
protected void assertEquals(String message, Object expectedValue, Object actualValue) {
Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThroughputLogger.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThroughputLogger.java?rev=643934&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThroughputLogger.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThroughputLogger.java Wed Apr 2 08:15:07 2008
@@ -0,0 +1,112 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.text.MessageFormat;
+import java.text.NumberFormat;
+
+import org.apache.camel.Exchange;
+import org.apache.commons.logging.Log;
+
+/**
+ * @version $Revision: 1.1 $
+ */
+public class ThroughputLogger extends Logger {
+ private int groupSize = 100;
+ private long startTime;
+ private AtomicInteger receivedCounter = new AtomicInteger();
+ private NumberFormat numberFormat = NumberFormat.getNumberInstance();
+
+ public ThroughputLogger() {
+ }
+
+ public ThroughputLogger(Log log) {
+ super(log);
+ }
+
+ public ThroughputLogger(Log log, LoggingLevel level) {
+ super(log, level);
+ }
+
+ public ThroughputLogger(String logName) {
+ super(logName);
+ }
+
+ public ThroughputLogger(String logName, LoggingLevel level) {
+ super(logName, level);
+ }
+
+ public ThroughputLogger(String logName, LoggingLevel level, int groupSize) {
+ super(logName, level);
+ setGroupSize(groupSize);
+ }
+
+ public ThroughputLogger(String logName, int groupSize) {
+ super(logName);
+ setGroupSize(groupSize);
+ }
+
+ public ThroughputLogger(int groupSize) {
+ setGroupSize(groupSize);
+ }
+
+ @Override
+ public void process(Exchange exchange) {
+ if (startTime == 0) {
+ startTime = System.currentTimeMillis();
+ }
+ int receivedCount = receivedCounter.incrementAndGet();
+ if (receivedCount % groupSize == 0) {
+ super.process(exchange);
+ }
+ }
+
+ public int getGroupSize() {
+ return groupSize;
+ }
+
+ public void setGroupSize(int groupSize) {
+ if (groupSize == 0) {
+ throw new IllegalArgumentException("groupSize cannot be zero!");
+ }
+ this.groupSize = groupSize;
+ }
+
+ public NumberFormat getNumberFormat() {
+ return numberFormat;
+ }
+
+ public void setNumberFormat(NumberFormat numberFormat) {
+ this.numberFormat = numberFormat;
+ }
+
+ @Override
+ protected Object logMessage(Exchange exchange) {
+ long time = System.currentTimeMillis();
+ long elapsed = time - startTime;
+ startTime = time;
+
+ // timeOneMessage = time / group
+ // messagePerSend = 1000 / timeOneMessage
+ double rate = groupSize * 1000.0;
+ rate /= elapsed;
+
+ return "Received: " + receivedCounter.get() + " messages so far. Last group took: " + elapsed + " millis which is: " + numberFormat.format(rate) + " messages per second";
+ }
+}
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThroughputLogger.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileAsyncRouteTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileAsyncRouteTest.java?rev=643934&r1=643933&r2=643934&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileAsyncRouteTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileAsyncRouteTest.java Wed Apr 2 08:15:07 2008
@@ -54,7 +54,7 @@
public void testFileRoute() throws Exception {
MockEndpoint result = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
result.expectedBodiesReceived(expectedBody);
- result.setDefaulResultWaitMillis(5000);
+ result.setResultWaitTime(5000);
template.sendBodyAndHeader(uri, expectedBody, "cheese", 123);
Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerProducerRouteTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerProducerRouteTest.java?rev=643934&r1=643933&r2=643934&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerProducerRouteTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerProducerRouteTest.java Wed Apr 2 08:15:07 2008
@@ -27,7 +27,7 @@
public void testFileRoute() throws Exception {
MockEndpoint result = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
result.expectedMessageCount(2);
- result.setDefaulResultWaitMillis(10000);
+ result.setResultWaitTime(10000);
result.assertIsSatisfied();
}
Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileDeleteRouteTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileDeleteRouteTest.java?rev=643934&r1=643933&r2=643934&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileDeleteRouteTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileDeleteRouteTest.java Wed Apr 2 08:15:07 2008
@@ -33,7 +33,7 @@
public void testFileRoute() throws Exception {
MockEndpoint result = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
result.expectedBodiesReceived(expectedBody);
- result.setDefaulResultWaitMillis(5000);
+ result.setResultWaitTime(5000);
template.sendBodyAndHeader(uri, expectedBody, "cheese", 123);
Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRouteGenerateEmptyExchangeWhenIdleTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRouteGenerateEmptyExchangeWhenIdleTest.java?rev=643934&r1=643933&r2=643934&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRouteGenerateEmptyExchangeWhenIdleTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRouteGenerateEmptyExchangeWhenIdleTest.java Wed Apr 2 08:15:07 2008
@@ -31,7 +31,7 @@
public void testFileRoute() throws Exception {
MockEndpoint result = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
result.expectedMinimumMessageCount(2);
- result.setDefaulResultWaitMillis(5000);
+ result.setResultWaitTime(5000);
template.sendBodyAndHeader(uri, expectedBody, "cheese", 123);
Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRouteTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRouteTest.java?rev=643934&r1=643933&r2=643934&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRouteTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRouteTest.java Wed Apr 2 08:15:07 2008
@@ -30,7 +30,7 @@
public void testFileRoute() throws Exception {
MockEndpoint result = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
result.expectedBodiesReceived(expectedBody);
- result.setDefaulResultWaitMillis(5000);
+ result.setResultWaitTime(5000);
template.sendBodyAndHeader(uri, expectedBody, "cheese", 123);
Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java?rev=643934&r1=643933&r2=643934&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java Wed Apr 2 08:15:07 2008
@@ -29,7 +29,7 @@
public void testSendLotsOfMessagesButOnly3GetThrough() throws Exception {
MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
resultEndpoint.expectedMessageCount(3);
- resultEndpoint.setDefaulResultWaitMillis(1000);
+ resultEndpoint.setResultWaitTime(1000);
for (int i = 0; i < messageCount; i++) {
template.sendBody("seda:a", "<message>" + i + "</message>");
Modified: activemq/camel/trunk/components/camel-bam/src/test/java/org/apache/camel/bam/BamRouteTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-bam/src/test/java/org/apache/camel/bam/BamRouteTest.java?rev=643934&r1=643933&r2=643934&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-bam/src/test/java/org/apache/camel/bam/BamRouteTest.java (original)
+++ activemq/camel/trunk/components/camel-bam/src/test/java/org/apache/camel/bam/BamRouteTest.java Wed Apr 2 08:15:07 2008
@@ -59,7 +59,7 @@
super.setUp();
overdueEndpoint = getMockEndpoint("mock:overdue");
- overdueEndpoint.setDefaulResultWaitMillis(8000);
+ overdueEndpoint.setResultWaitTime(8000);
}
protected RouteBuilder createRouteBuilder() throws Exception {