You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2012/11/25 19:43:57 UTC

svn commit: r1413385 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/api/management/mbean/ main/java/org/apache/camel/component/log/ main/java/org/apache/camel/management/ main/java/org/apache/camel/management/mbean/ main/java/org/apache/c...

Author: davsclaus
Date: Sun Nov 25 18:43:55 2012
New Revision: 1413385

URL: http://svn.apache.org/viewvc?rev=1413385&view=rev
Log:
CAMEL-5822: Added JMX operations to throughput logger.

Added:
    camel/trunk/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThroughputLoggerMBean.java
      - copied, changed from r1413338, camel/trunk/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThrottlerMBean.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThroughputLogger.java   (with props)
    camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedLogEndpointTest.java
      - copied, changed from r1413338, camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedLoadBalancerTest.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/log/LogEndpoint.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/log/LogProducer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThroughputLogger.java

Copied: camel/trunk/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThroughputLoggerMBean.java (from r1413338, camel/trunk/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThrottlerMBean.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThroughputLoggerMBean.java?p2=camel/trunk/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThroughputLoggerMBean.java&p1=camel/trunk/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThrottlerMBean.java&r1=1413338&r2=1413385&rev=1413385&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThrottlerMBean.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThroughputLoggerMBean.java Sun Nov 25 18:43:55 2012
@@ -17,19 +17,23 @@
 package org.apache.camel.api.management.mbean;
 
 import org.apache.camel.api.management.ManagedAttribute;
+import org.apache.camel.api.management.ManagedOperation;
 
-public interface ManagedThrottlerMBean extends ManagedProcessorMBean {
+public interface ManagedThroughputLoggerMBean extends ManagedProcessorMBean {
 
-    @ManagedAttribute(description = "Maximum requires per period")
-    long getMaximumRequestsPerPeriod();
+    @ManagedAttribute(description = "The received number of messages")
+    int getReceivedCounter();
 
-    @ManagedAttribute(description = "Maximum requires per period")
-    void setMaximumRequestsPerPeriod(long maximumRequestsPerPeriod);
+    @ManagedAttribute(description = "The average throughput")
+    double getAverage();
 
-    @ManagedAttribute(description = "Time period in millis")
-    long getTimePeriodMillis();
+    @ManagedAttribute(description = "The throughput rate")
+    double getRate();
 
-    @ManagedAttribute(description = "Time period in millis")
-    void setTimePeriodMillis(long timePeriodMillis);
+    @ManagedAttribute(description = "The last log message")
+    String getLastLogMessage();
+
+    @ManagedOperation(description = "Resets the throughput logger statistics")
+    void resetThroughputLogger();
 
 }
\ No newline at end of file

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/log/LogEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/log/LogEndpoint.java?rev=1413385&r1=1413384&r2=1413385&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/log/LogEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/log/LogEndpoint.java Sun Nov 25 18:43:55 2012
@@ -57,6 +57,10 @@ public class LogEndpoint extends Process
         setProcessor(this.logger);
     }
 
+    public Processor getLogger() {
+        return logger;
+    }
+
     @Override
     public Producer createProducer() throws Exception {
         return new LogProducer(this, this.logger);

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/log/LogProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/log/LogProducer.java?rev=1413385&r1=1413384&r2=1413385&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/log/LogProducer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/log/LogProducer.java Sun Nov 25 18:43:55 2012
@@ -44,4 +44,8 @@ public class LogProducer extends Default
         }
         return true;
     }
+
+    public Processor getLogger() {
+        return logger;
+    }
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java?rev=1413385&r1=1413384&r2=1413385&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java Sun Nov 25 18:43:55 2012
@@ -29,6 +29,7 @@ import org.apache.camel.Producer;
 import org.apache.camel.Route;
 import org.apache.camel.Service;
 import org.apache.camel.component.bean.BeanProcessor;
+import org.apache.camel.component.log.LogEndpoint;
 import org.apache.camel.impl.ScheduledPollConsumer;
 import org.apache.camel.management.mbean.ManagedBeanProcessor;
 import org.apache.camel.management.mbean.ManagedBrowsableEndpoint;
@@ -49,12 +50,14 @@ import org.apache.camel.management.mbean
 import org.apache.camel.management.mbean.ManagedSuspendableRoute;
 import org.apache.camel.management.mbean.ManagedThreadPool;
 import org.apache.camel.management.mbean.ManagedThrottler;
+import org.apache.camel.management.mbean.ManagedThroughputLogger;
 import org.apache.camel.model.ModelCamelContext;
 import org.apache.camel.model.ProcessorDefinition;
 import org.apache.camel.processor.Delayer;
 import org.apache.camel.processor.ErrorHandler;
 import org.apache.camel.processor.SendProcessor;
 import org.apache.camel.processor.Throttler;
+import org.apache.camel.processor.ThroughputLogger;
 import org.apache.camel.processor.idempotent.IdempotentConsumer;
 import org.apache.camel.spi.BrowsableEndpoint;
 import org.apache.camel.spi.EventNotifier;
@@ -177,7 +180,19 @@ public class DefaultManagementObjectStra
             } else if (target instanceof Throttler) {
                 answer = new ManagedThrottler(context, (Throttler) target, definition);
             } else if (target instanceof SendProcessor) {
-                answer = new ManagedSendProcessor(context, (SendProcessor) target, definition);
+                SendProcessor sp = (SendProcessor) target;
+                // special for sending to throughput logger
+                if (sp.getDestination() instanceof LogEndpoint) {
+                    LogEndpoint le = (LogEndpoint) sp.getDestination();
+                    if (le.getLogger() instanceof ThroughputLogger) {
+                        ThroughputLogger tl = (ThroughputLogger) le.getLogger();
+                        answer = new ManagedThroughputLogger(context, tl, definition);
+                    }
+                }
+                // regular send processor
+                if (answer == null) {
+                    answer = new ManagedSendProcessor(context, (SendProcessor) target, definition);
+                }
             } else if (target instanceof BeanProcessor) {
                 answer = new ManagedBeanProcessor(context, (BeanProcessor) target, definition);
             } else if (target instanceof IdempotentConsumer) {

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThroughputLogger.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThroughputLogger.java?rev=1413385&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThroughputLogger.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThroughputLogger.java Sun Nov 25 18:43:55 2012
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.management.mbean;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.api.management.ManagedResource;
+import org.apache.camel.api.management.mbean.ManagedThroughputLoggerMBean;
+import org.apache.camel.model.ProcessorDefinition;
+import org.apache.camel.processor.ThroughputLogger;
+
+/**
+ *
+ */
+@ManagedResource(description = "Managed ThroughputLogger")
+public class ManagedThroughputLogger extends ManagedProcessor implements ManagedThroughputLoggerMBean {
+
+    private final ThroughputLogger logger;
+
+    public ManagedThroughputLogger(CamelContext context, ThroughputLogger logger, ProcessorDefinition<?> definition) {
+        super(context, logger, definition);
+        this.logger = logger;
+    }
+
+    public ThroughputLogger getLogger() {
+        return logger;
+    }
+
+    @Override
+    public synchronized void reset() {
+        super.reset();
+        logger.reset();
+    }
+
+    public int getReceivedCounter() {
+        return logger.getReceivedCounter();
+    }
+
+    public double getAverage() {
+        return logger.getAverage();
+    }
+
+    public double getRate() {
+        return logger.getRate();
+    }
+
+    public String getLastLogMessage() {
+        return logger.getLastLogMessage();
+    }
+
+    public void resetThroughputLogger() {
+        logger.reset();
+    }
+}

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThroughputLogger.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThroughputLogger.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThroughputLogger.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThroughputLogger.java?rev=1413385&r1=1413384&r2=1413385&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThroughputLogger.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThroughputLogger.java Sun Nov 25 18:43:55 2012
@@ -51,6 +51,9 @@ public class ThroughputLogger extends Se
     private CamelContext camelContext;
     private ScheduledExecutorService logSchedulerService;
     private CamelLogger log;
+    private String lastLogMessage;
+    private double rate;
+    private double average;
 
     public ThroughputLogger(CamelLogger log) {
         this.log = log;
@@ -81,7 +84,8 @@ public class ThroughputLogger extends Se
         //only process if groupSize is set...otherwise we're in groupInterval mode
         if (groupSize != null) {
             if (receivedCount % groupSize == 0) {
-                log.log(createLogMessage(exchange, receivedCount));
+                lastLogMessage = createLogMessage(exchange, receivedCount);
+                log.log(lastLogMessage);
             }
         }
     }
@@ -139,7 +143,33 @@ public class ThroughputLogger extends Se
     public void setAction(String action) {
         this.action = action;
     }
-    
+
+    public void reset() {
+        startTime = 0;
+        receivedCounter.set(0);
+        groupStartTime = 0;
+        groupReceivedCount = 0;
+        average = 0.0d;
+        rate = 0.0d;
+        lastLogMessage = null;
+    }
+
+    public double getRate() {
+        return rate;
+    }
+
+    public double getAverage() {
+        return average;
+    }
+
+    public int getReceivedCounter() {
+        return receivedCounter.get();
+    }
+
+    public String getLastLogMessage() {
+        return lastLogMessage;
+    }
+
     @Override
     public void doStart() throws Exception {
         // if an interval was specified, create a background thread
@@ -168,8 +198,8 @@ public class ThroughputLogger extends Se
             groupStartTime = startTime;
         }
 
-        double rate = messagesPerSecond(groupSize, groupStartTime, time);
-        double average = messagesPerSecond(receivedCount, startTime, time);
+        rate = messagesPerSecond(groupSize, groupStartTime, time);
+        average = messagesPerSecond(receivedCount, startTime, time);
 
         long duration = time - groupStartTime;
         groupStartTime = time;
@@ -216,16 +246,16 @@ public class ThroughputLogger extends Se
 
         long duration = time - groupStartTime;
         long currentCount = receivedCount - groupReceivedCount;
-        double rate = messagesPerSecond(currentCount, groupStartTime, time);
-        double average = messagesPerSecond(receivedCount, startTime, time);
+        rate = messagesPerSecond(currentCount, groupStartTime, time);
+        average = messagesPerSecond(receivedCount, startTime, time);
 
         groupStartTime = time;
         groupReceivedCount = receivedCount;
 
-        String message = getAction() + ": " + currentCount + " new messages, with total " + receivedCount + " so far. Last group took: " + duration
+        lastLogMessage = getAction() + ": " + currentCount + " new messages, with total " + receivedCount + " so far. Last group took: " + duration
                 + " millis which is: " + numberFormat.format(rate)
                 + " messages per second. average: " + numberFormat.format(average);
-        log.log(message);
+        log.log(lastLogMessage);
     }
 
     protected double messagesPerSecond(long messageCount, long startTime, long endTime) {

Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedLogEndpointTest.java (from r1413338, camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedLoadBalancerTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedLogEndpointTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedLogEndpointTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedLoadBalancerTest.java&r1=1413338&r2=1413385&rev=1413385&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedLoadBalancerTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedLogEndpointTest.java Sun Nov 25 18:43:55 2012
@@ -24,33 +24,60 @@ import org.apache.camel.builder.RouteBui
 /**
  * @version 
  */
-public class ManagedLoadBalancerTest extends ManagementTestSupport {
+public class ManagedLogEndpointTest extends ManagementTestSupport {
 
-    public void testLoadBalancer() throws Exception {
-        getMockEndpoint("mock:a").expectedBodiesReceived("Hello World", "Hi World");
-        getMockEndpoint("mock:b").expectedBodiesReceived("Bye World");
-
-        template.sendBody("direct:start", "Hello World");
-        template.sendBody("direct:start", "Bye World");
-        template.sendBody("direct:start", "Hi World");
+    public void testLogEndpoint() throws Exception {
+        getMockEndpoint("mock:a").expectedMessageCount(10);
+
+        for (int i = 0; i < 10; i++) {
+            template.sendBody("direct:start", "" + i);
+            Thread.sleep(100);
+        }
 
         assertMockEndpointsSatisfied();
 
         MBeanServer mbeanServer = getMBeanServer();
 
-        ObjectName name = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=endpoints,name=\"mock://a\"");
-        Long queueSize = (Long) mbeanServer.invoke(name, "queueSize", null, null);
-        assertEquals(2, queueSize.intValue());
-
-        name = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=endpoints,name=\"mock://b\"");
-        queueSize = (Long) mbeanServer.invoke(name, "queueSize", null, null);
-        assertEquals(1, queueSize.intValue());
-
-        name = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=processors,name=\"myBalancer\"");
+        ObjectName name = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=processors,name=\"log-foo\"");
         mbeanServer.isRegistered(name);
         
         Long total = (Long) mbeanServer.getAttribute(name, "ExchangesTotal");
-        assertEquals(3, total.intValue());
+        assertEquals(10, total.intValue());
+
+        Integer received = (Integer) mbeanServer.getAttribute(name, "ReceivedCounter");
+        assertEquals(10, received.intValue());
+
+        String last = (String) mbeanServer.getAttribute(name, "LastLogMessage");
+        assertNotNull(last);
+        assertTrue(last.startsWith("Received: 10 messages so far."));
+
+        Double rate = (Double) mbeanServer.getAttribute(name, "Rate");
+        assertNotNull(rate);
+        assertTrue(rate > 0);
+
+        Double average = (Double) mbeanServer.getAttribute(name, "Average");
+        assertNotNull(average);
+        assertTrue(average > 0);
+
+        // reset
+        mbeanServer.invoke(name, "resetThroughputLogger", null, null);
+
+        // total not reset
+        total = (Long) mbeanServer.getAttribute(name, "ExchangesTotal");
+        assertEquals(10, total.intValue());
+
+        // but the last log message is
+        last = (String) mbeanServer.getAttribute(name, "LastLogMessage");
+        assertNull(last);
+
+        received = (Integer) mbeanServer.getAttribute(name, "ReceivedCounter");
+        assertEquals(0, received.intValue());
+
+        rate = (Double) mbeanServer.getAttribute(name, "Rate");
+        assertEquals(0.0d, rate);
+
+        average = (Double) mbeanServer.getAttribute(name, "Average");
+        assertEquals(0.0d, average);
     }
 
     @Override
@@ -59,8 +86,8 @@ public class ManagedLoadBalancerTest ext
             @Override
             public void configure() throws Exception {
                 from("direct:start").routeId("foo")
-                    .loadBalance().id("myBalancer").roundRobin()
-                        .to("mock:a").to("mock:b");
+                    .to("log:foo?groupSize=10").id("log-foo")
+                    .to("mock:a");
             }
         };
     }