You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2021/04/29 06:15:20 UTC
[beam] 01/03: [BEAM-12247] Reduce memory/string creations in InMemoryTimerInternals
This is an automated email from the ASF dual-hosted git repository.
iemejia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
commit b87f1f01197a78d60fb08a0ee7a4ef96bc4e08eb
Author: Daniel Kulp <dk...@apache.org>
AuthorDate: Wed Apr 28 14:16:31 2021 -0400
[BEAM-12247] Reduce memory/string creations in InMemoryTimerInternals
---
.../beam/runners/core/InMemoryTimerInternals.java | 46 ++++++++--------------
1 file changed, 16 insertions(+), 30 deletions(-)
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
index 8be9081..d0b3bed 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
@@ -63,6 +63,9 @@ public class InMemoryTimerInternals implements TimerInternals {
/** Current synchronized processing time. */
private Instant synchronizedProcessingTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
+ /** Class.getSimpleName() cached to avoid allocations for tracing. */
+ private static final String SIMPLE_NAME = InMemoryTimerInternals.class.getSimpleName();
+
@Override
public @Nullable Instant currentOutputWatermarkTime() {
return outputWatermarkTime;
@@ -125,17 +128,12 @@ public class InMemoryTimerInternals implements TimerInternals {
@Deprecated
@Override
public void setTimer(TimerData timerData) {
- WindowTracing.trace("{}.setTimer: {}", getClass().getSimpleName(), timerData);
+ WindowTracing.trace("{}.setTimer: {}", SIMPLE_NAME, timerData);
- @Nullable
- TimerData existing =
- existingTimers.get(
- timerData.getNamespace(), timerData.getTimerId() + '+' + timerData.getTimerFamilyId());
+ @Nullable String colKey = timerData.getTimerId() + '+' + timerData.getTimerFamilyId();
+ TimerData existing = existingTimers.get(timerData.getNamespace(), colKey);
if (existing == null) {
- existingTimers.put(
- timerData.getNamespace(),
- timerData.getTimerId() + '+' + timerData.getTimerFamilyId(),
- timerData);
+ existingTimers.put(timerData.getNamespace(), colKey, timerData);
timersForDomain(timerData.getDomain()).add(timerData);
} else {
checkArgument(
@@ -149,10 +147,7 @@ public class InMemoryTimerInternals implements TimerInternals {
NavigableSet<TimerData> timers = timersForDomain(timerData.getDomain());
timers.remove(existing);
timers.add(timerData);
- existingTimers.put(
- timerData.getNamespace(),
- timerData.getTimerId() + '+' + timerData.getTimerFamilyId(),
- timerData);
+ existingTimers.put(timerData.getNamespace(), colKey, timerData);
}
}
}
@@ -216,7 +211,7 @@ public class InMemoryTimerInternals implements TimerInternals {
newInputWatermark);
WindowTracing.trace(
"{}.advanceInputWatermark: from {} to {}",
- getClass().getSimpleName(),
+ SIMPLE_NAME,
inputWatermarkTime,
newInputWatermark);
inputWatermarkTime = newInputWatermark;
@@ -229,7 +224,7 @@ public class InMemoryTimerInternals implements TimerInternals {
if (newOutputWatermark.isAfter(inputWatermarkTime)) {
WindowTracing.trace(
"{}.advanceOutputWatermark: clipping output watermark from {} to {}",
- getClass().getSimpleName(),
+ SIMPLE_NAME,
newOutputWatermark,
inputWatermarkTime);
adjustedOutputWatermark = inputWatermarkTime;
@@ -244,7 +239,7 @@ public class InMemoryTimerInternals implements TimerInternals {
adjustedOutputWatermark);
WindowTracing.trace(
"{}.advanceOutputWatermark: from {} to {}",
- getClass().getSimpleName(),
+ SIMPLE_NAME,
outputWatermarkTime,
adjustedOutputWatermark);
outputWatermarkTime = adjustedOutputWatermark;
@@ -259,10 +254,7 @@ public class InMemoryTimerInternals implements TimerInternals {
processingTime,
newProcessingTime);
WindowTracing.trace(
- "{}.advanceProcessingTime: from {} to {}",
- getClass().getSimpleName(),
- processingTime,
- newProcessingTime);
+ "{}.advanceProcessingTime: from {} to {}", SIMPLE_NAME, processingTime, newProcessingTime);
processingTime = newProcessingTime;
}
@@ -277,7 +269,7 @@ public class InMemoryTimerInternals implements TimerInternals {
newSynchronizedProcessingTime);
WindowTracing.trace(
"{}.advanceProcessingTime: from {} to {}",
- getClass().getSimpleName(),
+ SIMPLE_NAME,
synchronizedProcessingTime,
newSynchronizedProcessingTime);
synchronizedProcessingTime = newSynchronizedProcessingTime;
@@ -288,10 +280,7 @@ public class InMemoryTimerInternals implements TimerInternals {
TimerData timer = removeNextTimer(inputWatermarkTime, TimeDomain.EVENT_TIME);
if (timer != null) {
WindowTracing.trace(
- "{}.removeNextEventTimer: firing {} at {}",
- getClass().getSimpleName(),
- timer,
- inputWatermarkTime);
+ "{}.removeNextEventTimer: firing {} at {}", SIMPLE_NAME, timer, inputWatermarkTime);
}
return timer;
}
@@ -301,10 +290,7 @@ public class InMemoryTimerInternals implements TimerInternals {
TimerData timer = removeNextTimer(processingTime, TimeDomain.PROCESSING_TIME);
if (timer != null) {
WindowTracing.trace(
- "{}.removeNextProcessingTimer: firing {} at {}",
- getClass().getSimpleName(),
- timer,
- processingTime);
+ "{}.removeNextProcessingTimer: firing {} at {}", SIMPLE_NAME, timer, processingTime);
}
return timer;
}
@@ -316,7 +302,7 @@ public class InMemoryTimerInternals implements TimerInternals {
if (timer != null) {
WindowTracing.trace(
"{}.removeNextSynchronizedProcessingTimer: firing {} at {}",
- getClass().getSimpleName(),
+ SIMPLE_NAME,
timer,
synchronizedProcessingTime);
}