You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2012/11/25 19:43:57 UTC
svn commit: r1413385 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/api/management/mbean/
main/java/org/apache/camel/component/log/
main/java/org/apache/camel/management/
main/java/org/apache/camel/management/mbean/ main/java/org/apache/c...
Author: davsclaus
Date: Sun Nov 25 18:43:55 2012
New Revision: 1413385
URL: http://svn.apache.org/viewvc?rev=1413385&view=rev
Log:
CAMEL-5822: Added JMX operations to throughput logger.
Added:
camel/trunk/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThroughputLoggerMBean.java
- copied, changed from r1413338, camel/trunk/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThrottlerMBean.java
camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThroughputLogger.java (with props)
camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedLogEndpointTest.java
- copied, changed from r1413338, camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedLoadBalancerTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/component/log/LogEndpoint.java
camel/trunk/camel-core/src/main/java/org/apache/camel/component/log/LogProducer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThroughputLogger.java
Copied: camel/trunk/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThroughputLoggerMBean.java (from r1413338, camel/trunk/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThrottlerMBean.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThroughputLoggerMBean.java?p2=camel/trunk/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThroughputLoggerMBean.java&p1=camel/trunk/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThrottlerMBean.java&r1=1413338&r2=1413385&rev=1413385&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThrottlerMBean.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThroughputLoggerMBean.java Sun Nov 25 18:43:55 2012
@@ -17,19 +17,23 @@
package org.apache.camel.api.management.mbean;
import org.apache.camel.api.management.ManagedAttribute;
+import org.apache.camel.api.management.ManagedOperation;
-public interface ManagedThrottlerMBean extends ManagedProcessorMBean {
+public interface ManagedThroughputLoggerMBean extends ManagedProcessorMBean {
- @ManagedAttribute(description = "Maximum requires per period")
- long getMaximumRequestsPerPeriod();
+ @ManagedAttribute(description = "The received number of messages")
+ int getReceivedCounter();
- @ManagedAttribute(description = "Maximum requires per period")
- void setMaximumRequestsPerPeriod(long maximumRequestsPerPeriod);
+ @ManagedAttribute(description = "The average throughput")
+ double getAverage();
- @ManagedAttribute(description = "Time period in millis")
- long getTimePeriodMillis();
+ @ManagedAttribute(description = "The throughput rate")
+ double getRate();
- @ManagedAttribute(description = "Time period in millis")
- void setTimePeriodMillis(long timePeriodMillis);
+ @ManagedAttribute(description = "The last log message")
+ String getLastLogMessage();
+
+ @ManagedOperation(description = "Resets the throughput logger statistics")
+ void resetThroughputLogger();
}
\ No newline at end of file
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/log/LogEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/log/LogEndpoint.java?rev=1413385&r1=1413384&r2=1413385&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/log/LogEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/log/LogEndpoint.java Sun Nov 25 18:43:55 2012
@@ -57,6 +57,10 @@ public class LogEndpoint extends Process
setProcessor(this.logger);
}
+ public Processor getLogger() {
+ return logger;
+ }
+
@Override
public Producer createProducer() throws Exception {
return new LogProducer(this, this.logger);
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/log/LogProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/log/LogProducer.java?rev=1413385&r1=1413384&r2=1413385&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/log/LogProducer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/log/LogProducer.java Sun Nov 25 18:43:55 2012
@@ -44,4 +44,8 @@ public class LogProducer extends Default
}
return true;
}
+
+ public Processor getLogger() {
+ return logger;
+ }
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java?rev=1413385&r1=1413384&r2=1413385&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java Sun Nov 25 18:43:55 2012
@@ -29,6 +29,7 @@ import org.apache.camel.Producer;
import org.apache.camel.Route;
import org.apache.camel.Service;
import org.apache.camel.component.bean.BeanProcessor;
+import org.apache.camel.component.log.LogEndpoint;
import org.apache.camel.impl.ScheduledPollConsumer;
import org.apache.camel.management.mbean.ManagedBeanProcessor;
import org.apache.camel.management.mbean.ManagedBrowsableEndpoint;
@@ -49,12 +50,14 @@ import org.apache.camel.management.mbean
import org.apache.camel.management.mbean.ManagedSuspendableRoute;
import org.apache.camel.management.mbean.ManagedThreadPool;
import org.apache.camel.management.mbean.ManagedThrottler;
+import org.apache.camel.management.mbean.ManagedThroughputLogger;
import org.apache.camel.model.ModelCamelContext;
import org.apache.camel.model.ProcessorDefinition;
import org.apache.camel.processor.Delayer;
import org.apache.camel.processor.ErrorHandler;
import org.apache.camel.processor.SendProcessor;
import org.apache.camel.processor.Throttler;
+import org.apache.camel.processor.ThroughputLogger;
import org.apache.camel.processor.idempotent.IdempotentConsumer;
import org.apache.camel.spi.BrowsableEndpoint;
import org.apache.camel.spi.EventNotifier;
@@ -177,7 +180,19 @@ public class DefaultManagementObjectStra
} else if (target instanceof Throttler) {
answer = new ManagedThrottler(context, (Throttler) target, definition);
} else if (target instanceof SendProcessor) {
- answer = new ManagedSendProcessor(context, (SendProcessor) target, definition);
+ SendProcessor sp = (SendProcessor) target;
+ // special for sending to throughput logger
+ if (sp.getDestination() instanceof LogEndpoint) {
+ LogEndpoint le = (LogEndpoint) sp.getDestination();
+ if (le.getLogger() instanceof ThroughputLogger) {
+ ThroughputLogger tl = (ThroughputLogger) le.getLogger();
+ answer = new ManagedThroughputLogger(context, tl, definition);
+ }
+ }
+ // regular send processor
+ if (answer == null) {
+ answer = new ManagedSendProcessor(context, (SendProcessor) target, definition);
+ }
} else if (target instanceof BeanProcessor) {
answer = new ManagedBeanProcessor(context, (BeanProcessor) target, definition);
} else if (target instanceof IdempotentConsumer) {
Added: camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThroughputLogger.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThroughputLogger.java?rev=1413385&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThroughputLogger.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThroughputLogger.java Sun Nov 25 18:43:55 2012
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.management.mbean;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.api.management.ManagedResource;
+import org.apache.camel.api.management.mbean.ManagedThroughputLoggerMBean;
+import org.apache.camel.model.ProcessorDefinition;
+import org.apache.camel.processor.ThroughputLogger;
+
+/**
+ *
+ */
+@ManagedResource(description = "Managed ThroughputLogger")
+public class ManagedThroughputLogger extends ManagedProcessor implements ManagedThroughputLoggerMBean {
+
+ private final ThroughputLogger logger;
+
+ public ManagedThroughputLogger(CamelContext context, ThroughputLogger logger, ProcessorDefinition<?> definition) {
+ super(context, logger, definition);
+ this.logger = logger;
+ }
+
+ public ThroughputLogger getLogger() {
+ return logger;
+ }
+
+ @Override
+ public synchronized void reset() {
+ super.reset();
+ logger.reset();
+ }
+
+ public int getReceivedCounter() {
+ return logger.getReceivedCounter();
+ }
+
+ public double getAverage() {
+ return logger.getAverage();
+ }
+
+ public double getRate() {
+ return logger.getRate();
+ }
+
+ public String getLastLogMessage() {
+ return logger.getLastLogMessage();
+ }
+
+ public void resetThroughputLogger() {
+ logger.reset();
+ }
+}
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThroughputLogger.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThroughputLogger.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThroughputLogger.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThroughputLogger.java?rev=1413385&r1=1413384&r2=1413385&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThroughputLogger.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThroughputLogger.java Sun Nov 25 18:43:55 2012
@@ -51,6 +51,9 @@ public class ThroughputLogger extends Se
private CamelContext camelContext;
private ScheduledExecutorService logSchedulerService;
private CamelLogger log;
+ private String lastLogMessage;
+ private double rate;
+ private double average;
public ThroughputLogger(CamelLogger log) {
this.log = log;
@@ -81,7 +84,8 @@ public class ThroughputLogger extends Se
//only process if groupSize is set...otherwise we're in groupInterval mode
if (groupSize != null) {
if (receivedCount % groupSize == 0) {
- log.log(createLogMessage(exchange, receivedCount));
+ lastLogMessage = createLogMessage(exchange, receivedCount);
+ log.log(lastLogMessage);
}
}
}
@@ -139,7 +143,33 @@ public class ThroughputLogger extends Se
public void setAction(String action) {
this.action = action;
}
-
+
+ public void reset() {
+ startTime = 0;
+ receivedCounter.set(0);
+ groupStartTime = 0;
+ groupReceivedCount = 0;
+ average = 0.0d;
+ rate = 0.0d;
+ lastLogMessage = null;
+ }
+
+ public double getRate() {
+ return rate;
+ }
+
+ public double getAverage() {
+ return average;
+ }
+
+ public int getReceivedCounter() {
+ return receivedCounter.get();
+ }
+
+ public String getLastLogMessage() {
+ return lastLogMessage;
+ }
+
@Override
public void doStart() throws Exception {
// if an interval was specified, create a background thread
@@ -168,8 +198,8 @@ public class ThroughputLogger extends Se
groupStartTime = startTime;
}
- double rate = messagesPerSecond(groupSize, groupStartTime, time);
- double average = messagesPerSecond(receivedCount, startTime, time);
+ rate = messagesPerSecond(groupSize, groupStartTime, time);
+ average = messagesPerSecond(receivedCount, startTime, time);
long duration = time - groupStartTime;
groupStartTime = time;
@@ -216,16 +246,16 @@ public class ThroughputLogger extends Se
long duration = time - groupStartTime;
long currentCount = receivedCount - groupReceivedCount;
- double rate = messagesPerSecond(currentCount, groupStartTime, time);
- double average = messagesPerSecond(receivedCount, startTime, time);
+ rate = messagesPerSecond(currentCount, groupStartTime, time);
+ average = messagesPerSecond(receivedCount, startTime, time);
groupStartTime = time;
groupReceivedCount = receivedCount;
- String message = getAction() + ": " + currentCount + " new messages, with total " + receivedCount + " so far. Last group took: " + duration
+ lastLogMessage = getAction() + ": " + currentCount + " new messages, with total " + receivedCount + " so far. Last group took: " + duration
+ " millis which is: " + numberFormat.format(rate)
+ " messages per second. average: " + numberFormat.format(average);
- log.log(message);
+ log.log(lastLogMessage);
}
protected double messagesPerSecond(long messageCount, long startTime, long endTime) {
Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedLogEndpointTest.java (from r1413338, camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedLoadBalancerTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedLogEndpointTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedLogEndpointTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedLoadBalancerTest.java&r1=1413338&r2=1413385&rev=1413385&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedLoadBalancerTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedLogEndpointTest.java Sun Nov 25 18:43:55 2012
@@ -24,33 +24,60 @@ import org.apache.camel.builder.RouteBui
/**
* @version
*/
-public class ManagedLoadBalancerTest extends ManagementTestSupport {
+public class ManagedLogEndpointTest extends ManagementTestSupport {
- public void testLoadBalancer() throws Exception {
- getMockEndpoint("mock:a").expectedBodiesReceived("Hello World", "Hi World");
- getMockEndpoint("mock:b").expectedBodiesReceived("Bye World");
-
- template.sendBody("direct:start", "Hello World");
- template.sendBody("direct:start", "Bye World");
- template.sendBody("direct:start", "Hi World");
+ public void testLogEndpoint() throws Exception {
+ getMockEndpoint("mock:a").expectedMessageCount(10);
+
+ for (int i = 0; i < 10; i++) {
+ template.sendBody("direct:start", "" + i);
+ Thread.sleep(100);
+ }
assertMockEndpointsSatisfied();
MBeanServer mbeanServer = getMBeanServer();
- ObjectName name = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=endpoints,name=\"mock://a\"");
- Long queueSize = (Long) mbeanServer.invoke(name, "queueSize", null, null);
- assertEquals(2, queueSize.intValue());
-
- name = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=endpoints,name=\"mock://b\"");
- queueSize = (Long) mbeanServer.invoke(name, "queueSize", null, null);
- assertEquals(1, queueSize.intValue());
-
- name = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=processors,name=\"myBalancer\"");
+ ObjectName name = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=processors,name=\"log-foo\"");
mbeanServer.isRegistered(name);
Long total = (Long) mbeanServer.getAttribute(name, "ExchangesTotal");
- assertEquals(3, total.intValue());
+ assertEquals(10, total.intValue());
+
+ Integer received = (Integer) mbeanServer.getAttribute(name, "ReceivedCounter");
+ assertEquals(10, received.intValue());
+
+ String last = (String) mbeanServer.getAttribute(name, "LastLogMessage");
+ assertNotNull(last);
+ assertTrue(last.startsWith("Received: 10 messages so far."));
+
+ Double rate = (Double) mbeanServer.getAttribute(name, "Rate");
+ assertNotNull(rate);
+ assertTrue(rate > 0);
+
+ Double average = (Double) mbeanServer.getAttribute(name, "Average");
+ assertNotNull(average);
+ assertTrue(average > 0);
+
+ // reset
+ mbeanServer.invoke(name, "resetThroughputLogger", null, null);
+
+ // total not reset
+ total = (Long) mbeanServer.getAttribute(name, "ExchangesTotal");
+ assertEquals(10, total.intValue());
+
+ // but the last log message is
+ last = (String) mbeanServer.getAttribute(name, "LastLogMessage");
+ assertNull(last);
+
+ received = (Integer) mbeanServer.getAttribute(name, "ReceivedCounter");
+ assertEquals(0, received.intValue());
+
+ rate = (Double) mbeanServer.getAttribute(name, "Rate");
+ assertEquals(0.0d, rate);
+
+ average = (Double) mbeanServer.getAttribute(name, "Average");
+ assertEquals(0.0d, average);
}
@Override
@@ -59,8 +86,8 @@ public class ManagedLoadBalancerTest ext
@Override
public void configure() throws Exception {
from("direct:start").routeId("foo")
- .loadBalance().id("myBalancer").roundRobin()
- .to("mock:a").to("mock:b");
+ .to("log:foo?groupSize=10").id("log-foo")
+ .to("mock:a");
}
};
}