You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by js...@apache.org on 2008/04/02 17:15:09 UTC

svn commit: r643934 - in /activemq/camel/trunk: camel-core/src/main/java/org/apache/camel/component/dataset/ camel-core/src/main/java/org/apache/camel/component/log/ camel-core/src/main/java/org/apache/camel/component/mock/ camel-core/src/main/java/org...

Author: jstrachan
Date: Wed Apr  2 08:15:07 2008
New Revision: 643934

URL: http://svn.apache.org/viewvc?rev=643934&view=rev
Log:
added patch for http://issues.apache.org/activemq/browse/CAMEL-429

Added:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThroughputLogger.java   (with props)
Modified:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/log/LogComponent.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockComponent.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileAsyncRouteTest.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerProducerRouteTest.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileDeleteRouteTest.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRouteGenerateEmptyExchangeWhenIdleTest.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRouteTest.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
    activemq/camel/trunk/components/camel-bam/src/test/java/org/apache/camel/bam/BamRouteTest.java

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java?rev=643934&r1=643933&r2=643934&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java Wed Apr  2 08:15:07 2008
@@ -93,14 +93,13 @@
         // TODO lets do a much better version of this!
         long size = getDataSet().getSize();
         size *= 4000;
-        setDefaulResultWaitMillis(size);
+        setResultWaitTime(size);
         super.waitForCompleteLatch();
     }
 
     // Properties
     //-------------------------------------------------------------------------
 
-
     public DataSet getDataSet() {
         return dataSet;
     }
@@ -131,12 +130,9 @@
         this.produceDelay = produceDelay;
     }
 
-
     // Implementation methods
     //-------------------------------------------------------------------------
 
-
-
     @Override
     protected void performAssertions(Exchange actual) throws Exception {
         if (startTime == 0) {
@@ -147,7 +143,9 @@
         Exchange expected = createExchange(index);
 
         // now lets assert that they are the same
-        LOG.debug("Received message: " + index + " = " + actual);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Received message: " + index + " = " + actual);
+        }
 
         assertMessageExpected(index, expected, actual);
 

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/log/LogComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/log/LogComponent.java?rev=643934&r1=643933&r2=643934&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/log/LogComponent.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/log/LogComponent.java Wed Apr  2 08:15:07 2008
@@ -20,10 +20,12 @@
 
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
+import org.apache.camel.converter.ObjectConverter;
 import org.apache.camel.impl.DefaultComponent;
 import org.apache.camel.impl.ProcessorEndpoint;
 import org.apache.camel.processor.Logger;
 import org.apache.camel.processor.LoggingLevel;
+import org.apache.camel.processor.ThroughputLogger;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -35,7 +37,15 @@
 
     protected Endpoint<Exchange> createEndpoint(String uri, String remaining, Map parameters) throws Exception {
         LoggingLevel level = getLoggingLevel(parameters);
-        Logger logger = new Logger(remaining, level);
+        Object value = parameters.remove("groupSize");
+
+        Logger logger;
+        if (value != null) {
+            logger = new ThroughputLogger(remaining, level, ObjectConverter.toInteger(value));
+        }
+        else {
+            logger = new Logger(remaining, level);
+        }
 
         return new ProcessorEndpoint(uri, this, logger);
     }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockComponent.java?rev=643934&r1=643933&r2=643934&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockComponent.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockComponent.java Wed Apr  2 08:15:07 2008
@@ -20,6 +20,9 @@
 
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.processor.ThroughputLogger;
+import org.apache.camel.converter.ObjectConverter;
 import org.apache.camel.impl.DefaultComponent;
 
 /**
@@ -31,6 +34,12 @@
 
     @Override
     protected Endpoint<Exchange> createEndpoint(String uri, String remaining, Map parameters) throws Exception {
-        return new MockEndpoint(uri, this);
+        MockEndpoint endpoint = new MockEndpoint(uri, this);
+        Integer value = ObjectConverter.toInteger(parameters.remove("reportGroup"));
+        if (value != null) {
+            Processor reporter = new ThroughputLogger("org.apache.camel.mock:" + remaining, value);
+            endpoint.setReporter(reporter);
+        }
+        return endpoint;
     }
 }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java?rev=643934&r1=643933&r2=643934&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java Wed Apr  2 08:15:07 2008
@@ -61,7 +61,7 @@
     private List<Runnable> tests;
     private CountDownLatch latch;
     private long sleepForEmptyTest;
-    private long defaulResultWaitMillis;
+    private long resultWaitTime;
     private int expectedMinimumCount;
     private List expectedBodyValues;
     private List actualBodyValues;
@@ -69,6 +69,7 @@
     private String headerName;
     private String headerValue;
     private Object actualHeader;
+    private Processor reporter;
 
     public MockEndpoint(String endpointUri, Component component) {
         super(endpointUri, component);
@@ -152,6 +153,11 @@
         };
     }
 
+    public void reset() {
+        init();
+    }
+
+
     // Testing API
     // -------------------------------------------------------------------------
 
@@ -250,12 +256,7 @@
      *                expected by this endpoint
      */
     public void expectedMessageCount(int expectedCount) {
-        this.expectedCount = expectedCount;
-        if (expectedCount <= 0) {
-            latch = null;
-        } else {
-            latch = new CountDownLatch(expectedCount);
-        }
+        setExpectedMessageCount(expectedCount);
     }
 
     /**
@@ -266,12 +267,7 @@
      *                expected by this endpoint
      */
     public void expectedMinimumMessageCount(int expectedCount) {
-        this.expectedMinimumCount = expectedCount;
-        if (expectedCount <= 0) {
-            latch = null;
-        } else {
-            latch = new CountDownLatch(expectedMinimumCount);
-        }
+        setMinimumExpectedMessageCount(expectedCount);
     }
 
     /**
@@ -511,22 +507,63 @@
         this.sleepForEmptyTest = sleepForEmptyTest;
     }
 
-    public long getDefaulResultWaitMillis() {
-        return defaulResultWaitMillis;
+    public long getResultWaitTime() {
+        return resultWaitTime;
     }
 
     /**
      * Sets the maximum amount of time the {@link #assertIsSatisfied()} will
      * wait on a latch until it is satisfied
      */
-    public void setDefaulResultWaitMillis(long defaulResultWaitMillis) {
-        this.defaulResultWaitMillis = defaulResultWaitMillis;
+    public void setResultWaitTime(long resultWaitTime) {
+        this.resultWaitTime = resultWaitTime;
     }
 
-    public void reset() {
-        init();
+    /**
+     * Specifies the expected number of message exchanges that should be
+     * received by this endpoint
+     *
+     * @param expectedCount the number of message exchanges that should be
+     *                expected by this endpoint
+     */
+    public void setExpectedMessageCount(int expectedCount) {
+        this.expectedCount = expectedCount;
+        if (expectedCount <= 0) {
+            latch = null;
+        } else {
+            latch = new CountDownLatch(expectedCount);
+        }
+    }
+
+    /**
+     * Specifies the minimum number of expected message exchanges that should be
+     * received by this endpoint
+     *
+     * @param expectedCount the number of message exchanges that should be
+     *                expected by this endpoint
+     */
+    public void setMinimumExpectedMessageCount(int expectedCount) {
+        this.expectedMinimumCount = expectedCount;
+        if (expectedCount <= 0) {
+            latch = null;
+        } else {
+            latch = new CountDownLatch(expectedMinimumCount);
+        }
+    }
+
+    public Processor getReporter() {
+        return reporter;
+    }
+
+    /**
+     * Allows a processor to added to the endpoint to report on progress of the test
+     */
+    public void setReporter(Processor reporter) {
+        this.reporter = reporter;
     }
 
+    // Implementation methods
+    // -------------------------------------------------------------------------
     private void init() {
         expectedCount = -1;
         counter = 0;
@@ -536,18 +573,19 @@
         tests = new CopyOnWriteArrayList<Runnable>();
         latch = null;
         sleepForEmptyTest = 1000L;
-        defaulResultWaitMillis = 20000L;
+        resultWaitTime = 20000L;
         expectedMinimumCount = -1;
         expectedBodyValues = null;
         actualBodyValues = new ArrayList();
     }
 
-    // Implementation methods
-    // -------------------------------------------------------------------------
     protected synchronized void onExchange(Exchange exchange) {
         try {
-            performAssertions(exchange);
+            if (reporter != null) {
+                reporter.process(exchange);
+            }
 
+            performAssertions(exchange);
         } catch (Throwable e) {
             failures.add(e);
         }
@@ -593,8 +631,8 @@
         }
 
         // now lets wait for the results
-        LOG.debug("Waiting on the latch for: " + defaulResultWaitMillis + " millis");
-        latch.await(defaulResultWaitMillis, TimeUnit.MILLISECONDS);
+        LOG.debug("Waiting on the latch for: " + resultWaitTime + " millis");
+        latch.await(resultWaitTime, TimeUnit.MILLISECONDS);
     }
 
     protected void assertEquals(String message, Object expectedValue, Object actualValue) {

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThroughputLogger.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThroughputLogger.java?rev=643934&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThroughputLogger.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThroughputLogger.java Wed Apr  2 08:15:07 2008
@@ -0,0 +1,112 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.text.MessageFormat;
+import java.text.NumberFormat;
+
+import org.apache.camel.Exchange;
+import org.apache.commons.logging.Log;
+
+/**
+ * @version $Revision: 1.1 $
+ */
+public class ThroughputLogger extends Logger {
+    private int groupSize = 100;
+    private long startTime;
+    private AtomicInteger receivedCounter = new AtomicInteger();
+    private NumberFormat numberFormat = NumberFormat.getNumberInstance();
+
+    public ThroughputLogger() {
+    }
+
+    public ThroughputLogger(Log log) {
+        super(log);
+    }
+
+    public ThroughputLogger(Log log, LoggingLevel level) {
+        super(log, level);
+    }
+
+    public ThroughputLogger(String logName) {
+        super(logName);
+    }
+
+    public ThroughputLogger(String logName, LoggingLevel level) {
+        super(logName, level);
+    }
+
+    public ThroughputLogger(String logName, LoggingLevel level, int groupSize) {
+        super(logName, level);
+        setGroupSize(groupSize);
+    }
+
+    public ThroughputLogger(String logName, int groupSize) {
+        super(logName);
+        setGroupSize(groupSize);
+    }
+
+    public ThroughputLogger(int groupSize) {
+        setGroupSize(groupSize);
+    }
+
+    @Override
+    public void process(Exchange exchange) {
+        if (startTime == 0) {
+            startTime = System.currentTimeMillis();
+        }
+        int receivedCount = receivedCounter.incrementAndGet();
+        if (receivedCount % groupSize == 0) {
+            super.process(exchange);
+        }
+    }
+
+    public int getGroupSize() {
+        return groupSize;
+    }
+
+    public void setGroupSize(int groupSize) {
+        if (groupSize == 0) {
+            throw new IllegalArgumentException("groupSize cannot be zero!");
+        }
+        this.groupSize = groupSize;
+    }
+
+    public NumberFormat getNumberFormat() {
+        return numberFormat;
+    }
+
+    public void setNumberFormat(NumberFormat numberFormat) {
+        this.numberFormat = numberFormat;
+    }
+
+    @Override
+    protected Object logMessage(Exchange exchange) {
+        long time = System.currentTimeMillis();
+        long elapsed = time - startTime;
+        startTime = time;
+
+        // timeOneMessage = time / group
+        // messagePerSend = 1000 / timeOneMessage
+        double rate = groupSize * 1000.0;
+        rate /= elapsed;
+
+        return "Received: " + receivedCounter.get() + " messages so far. Last group took: " + elapsed + " millis which is: " + numberFormat.format(rate) + " messages per second";
+    }
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThroughputLogger.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileAsyncRouteTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileAsyncRouteTest.java?rev=643934&r1=643933&r2=643934&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileAsyncRouteTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileAsyncRouteTest.java Wed Apr  2 08:15:07 2008
@@ -54,7 +54,7 @@
     public void testFileRoute() throws Exception {
         MockEndpoint result = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
         result.expectedBodiesReceived(expectedBody);
-        result.setDefaulResultWaitMillis(5000);
+        result.setResultWaitTime(5000);
 
         template.sendBodyAndHeader(uri, expectedBody, "cheese", 123);
 

Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerProducerRouteTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerProducerRouteTest.java?rev=643934&r1=643933&r2=643934&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerProducerRouteTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerProducerRouteTest.java Wed Apr  2 08:15:07 2008
@@ -27,7 +27,7 @@
     public void testFileRoute() throws Exception {
         MockEndpoint result = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
         result.expectedMessageCount(2);
-        result.setDefaulResultWaitMillis(10000);
+        result.setResultWaitTime(10000);
 
         result.assertIsSatisfied();
     }

Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileDeleteRouteTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileDeleteRouteTest.java?rev=643934&r1=643933&r2=643934&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileDeleteRouteTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileDeleteRouteTest.java Wed Apr  2 08:15:07 2008
@@ -33,7 +33,7 @@
     public void testFileRoute() throws Exception {
         MockEndpoint result = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
         result.expectedBodiesReceived(expectedBody);
-        result.setDefaulResultWaitMillis(5000);
+        result.setResultWaitTime(5000);
 
         template.sendBodyAndHeader(uri, expectedBody, "cheese", 123);
 

Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRouteGenerateEmptyExchangeWhenIdleTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRouteGenerateEmptyExchangeWhenIdleTest.java?rev=643934&r1=643933&r2=643934&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRouteGenerateEmptyExchangeWhenIdleTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRouteGenerateEmptyExchangeWhenIdleTest.java Wed Apr  2 08:15:07 2008
@@ -31,7 +31,7 @@
     public void testFileRoute() throws Exception {
         MockEndpoint result = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
         result.expectedMinimumMessageCount(2);
-        result.setDefaulResultWaitMillis(5000);
+        result.setResultWaitTime(5000);
 
         template.sendBodyAndHeader(uri, expectedBody, "cheese", 123);
 

Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRouteTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRouteTest.java?rev=643934&r1=643933&r2=643934&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRouteTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRouteTest.java Wed Apr  2 08:15:07 2008
@@ -30,7 +30,7 @@
     public void testFileRoute() throws Exception {
         MockEndpoint result = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
         result.expectedBodiesReceived(expectedBody);
-        result.setDefaulResultWaitMillis(5000);
+        result.setResultWaitTime(5000);
 
         template.sendBodyAndHeader(uri, expectedBody, "cheese", 123);
 

Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java?rev=643934&r1=643933&r2=643934&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java Wed Apr  2 08:15:07 2008
@@ -29,7 +29,7 @@
     public void testSendLotsOfMessagesButOnly3GetThrough() throws Exception {
         MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
         resultEndpoint.expectedMessageCount(3);
-        resultEndpoint.setDefaulResultWaitMillis(1000);
+        resultEndpoint.setResultWaitTime(1000);
 
         for (int i = 0; i < messageCount; i++) {
             template.sendBody("seda:a", "<message>" + i + "</message>");

Modified: activemq/camel/trunk/components/camel-bam/src/test/java/org/apache/camel/bam/BamRouteTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-bam/src/test/java/org/apache/camel/bam/BamRouteTest.java?rev=643934&r1=643933&r2=643934&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-bam/src/test/java/org/apache/camel/bam/BamRouteTest.java (original)
+++ activemq/camel/trunk/components/camel-bam/src/test/java/org/apache/camel/bam/BamRouteTest.java Wed Apr  2 08:15:07 2008
@@ -59,7 +59,7 @@
         super.setUp();
 
         overdueEndpoint = getMockEndpoint("mock:overdue");
-        overdueEndpoint.setDefaulResultWaitMillis(8000);
+        overdueEndpoint.setResultWaitTime(8000);
     }
 
     protected RouteBuilder createRouteBuilder() throws Exception {