You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2023/12/20 16:34:22 UTC
(camel) branch main updated: CAMEL-20256: adjust ThroughputLogger to use the StopWatch (#12508)
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new a44c375ebb5 CAMEL-20256: adjust ThroughputLogger to use the StopWatch (#12508)
a44c375ebb5 is described below
commit a44c375ebb5c2c4b6a3651a815b8ebbd7c30a386
Author: Otavio Rodolfo Piske <or...@users.noreply.github.com>
AuthorDate: Wed Dec 20 13:34:15 2023 -0300
CAMEL-20256: adjust ThroughputLogger to use the StopWatch (#12508)
---
.../camel/support/processor/ThroughputLogger.java | 52 +++++++---------------
1 file changed, 15 insertions(+), 37 deletions(-)
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/processor/ThroughputLogger.java b/core/camel-support/src/main/java/org/apache/camel/support/processor/ThroughputLogger.java
index a72555c8bcd..9ebc7341b27 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/processor/ThroughputLogger.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/processor/ThroughputLogger.java
@@ -30,6 +30,7 @@ import org.apache.camel.spi.IdAware;
import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,8 +50,8 @@ public class ThroughputLogger extends AsyncProcessorSupport implements AsyncProc
private Integer groupSize;
private long groupDelay = 1000;
private Long groupInterval;
- private long startTime;
- private long groupStartTime;
+ private final StopWatch groupWatch = new StopWatch();
+ private final StopWatch testWatch = new StopWatch();
private String action = "Received";
private CamelContext camelContext;
private ScheduledExecutorService logSchedulerService;
@@ -101,9 +102,6 @@ public class ThroughputLogger extends AsyncProcessorSupport implements AsyncProc
@Override
public void process(Exchange exchange) throws Exception {
- if (startTime == 0) {
- startTime = System.currentTimeMillis();
- }
long receivedCount = receivedCounter.incrementAndGet();
//only process if groupSize is set...otherwise we're in groupInterval mode
@@ -181,9 +179,7 @@ public class ThroughputLogger extends AsyncProcessorSupport implements AsyncProc
}
public void reset() {
- startTime = 0;
receivedCounter.set(0);
- groupStartTime = 0;
groupReceivedCount = 0;
average = 0.0d;
rate = 0.0d;
@@ -230,18 +226,13 @@ public class ThroughputLogger extends AsyncProcessorSupport implements AsyncProc
}
protected String createLogMessage(Exchange exchange, long receivedCount) {
- long time = System.currentTimeMillis();
- if (groupStartTime == 0) {
- groupStartTime = startTime;
- }
-
- rate = messagesPerSecond(groupSize, groupStartTime, time);
- average = messagesPerSecond(receivedCount, startTime, time);
+ final long groupDuration = groupWatch.takenAndRestart();
+ final long testDuration = testWatch.taken();
- long duration = time - groupStartTime;
- groupStartTime = time;
+ rate = messagesPerSecond(groupSize, groupDuration);
+ average = messagesPerSecond(receivedCount, testDuration);
- return getAction() + ": " + receivedCount + " messages so far. Last group took: " + duration
+ return getAction() + ": " + receivedCount + " messages so far. Last group took: " + groupDuration
+ " millis which is: " + numberFormat.format(rate)
+ " messages per second. average: " + numberFormat.format(average);
}
@@ -265,45 +256,32 @@ public class ThroughputLogger extends AsyncProcessorSupport implements AsyncProc
}
protected void createGroupIntervalLogMessage() {
-
- // this indicates that no messages have been received yet...don't logger yet
- if (startTime == 0) {
- return;
- }
-
long receivedCount = receivedCounter.get();
- // if configured, hide logger messages when no new messages have been received
if (groupActiveOnly && receivedCount == groupReceivedCount) {
return;
}
- long time = System.currentTimeMillis();
- if (groupStartTime == 0) {
- groupStartTime = startTime;
- }
+ final long groupDuration = groupWatch.takenAndRestart();
+ final long testDuration = testWatch.taken();
- long duration = time - groupStartTime;
long currentCount = receivedCount - groupReceivedCount;
- rate = messagesPerSecond(currentCount, groupStartTime, time);
- average = messagesPerSecond(receivedCount, startTime, time);
+ rate = messagesPerSecond(currentCount, groupDuration);
+ average = messagesPerSecond(receivedCount, testDuration);
- groupStartTime = time;
groupReceivedCount = receivedCount;
lastLogMessage = getAction() + ": " + currentCount + " new messages, with total " + receivedCount
- + " so far. Last group took: " + duration
+ + " so far. Last group took: " + groupDuration
+ " millis which is: " + numberFormat.format(rate)
+ " messages per second. average: " + numberFormat.format(average);
logger.log(lastLogMessage);
}
- protected double messagesPerSecond(long messageCount, long startTime, long endTime) {
+ protected double messagesPerSecond(long messageCount, long duration) {
// timeOneMessage = elapsed / messageCount
// messagesPerSecond = 1000 / timeOneMessage
- double rate = messageCount * 1000.0;
- rate /= endTime - startTime;
- return rate;
+ return (messageCount * 1000.0) / duration;
}
}