You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2021/04/29 06:15:20 UTC

[beam] 01/03: [BEAM-12247] Reduce memory/string creations in InMemoryTimerInternals

This is an automated email from the ASF dual-hosted git repository.

iemejia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit b87f1f01197a78d60fb08a0ee7a4ef96bc4e08eb
Author: Daniel Kulp <dk...@apache.org>
AuthorDate: Wed Apr 28 14:16:31 2021 -0400

    [BEAM-12247] Reduce memory/string creations in InMemoryTimerInternals
---
 .../beam/runners/core/InMemoryTimerInternals.java  | 46 ++++++++--------------
 1 file changed, 16 insertions(+), 30 deletions(-)

diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
index 8be9081..d0b3bed 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
@@ -63,6 +63,9 @@ public class InMemoryTimerInternals implements TimerInternals {
   /** Current synchronized processing time. */
   private Instant synchronizedProcessingTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
 
+  /** Class.getSimpleName() cached to avoid allocations for tracing. */
+  private static final String SIMPLE_NAME = InMemoryTimerInternals.class.getSimpleName();
+
   @Override
   public @Nullable Instant currentOutputWatermarkTime() {
     return outputWatermarkTime;
@@ -125,17 +128,12 @@ public class InMemoryTimerInternals implements TimerInternals {
   @Deprecated
   @Override
   public void setTimer(TimerData timerData) {
-    WindowTracing.trace("{}.setTimer: {}", getClass().getSimpleName(), timerData);
+    WindowTracing.trace("{}.setTimer: {}", SIMPLE_NAME, timerData);
 
-    @Nullable
-    TimerData existing =
-        existingTimers.get(
-            timerData.getNamespace(), timerData.getTimerId() + '+' + timerData.getTimerFamilyId());
+    @Nullable String colKey = timerData.getTimerId() + '+' + timerData.getTimerFamilyId();
+    TimerData existing = existingTimers.get(timerData.getNamespace(), colKey);
     if (existing == null) {
-      existingTimers.put(
-          timerData.getNamespace(),
-          timerData.getTimerId() + '+' + timerData.getTimerFamilyId(),
-          timerData);
+      existingTimers.put(timerData.getNamespace(), colKey, timerData);
       timersForDomain(timerData.getDomain()).add(timerData);
     } else {
       checkArgument(
@@ -149,10 +147,7 @@ public class InMemoryTimerInternals implements TimerInternals {
         NavigableSet<TimerData> timers = timersForDomain(timerData.getDomain());
         timers.remove(existing);
         timers.add(timerData);
-        existingTimers.put(
-            timerData.getNamespace(),
-            timerData.getTimerId() + '+' + timerData.getTimerFamilyId(),
-            timerData);
+        existingTimers.put(timerData.getNamespace(), colKey, timerData);
       }
     }
   }
@@ -216,7 +211,7 @@ public class InMemoryTimerInternals implements TimerInternals {
         newInputWatermark);
     WindowTracing.trace(
         "{}.advanceInputWatermark: from {} to {}",
-        getClass().getSimpleName(),
+        SIMPLE_NAME,
         inputWatermarkTime,
         newInputWatermark);
     inputWatermarkTime = newInputWatermark;
@@ -229,7 +224,7 @@ public class InMemoryTimerInternals implements TimerInternals {
     if (newOutputWatermark.isAfter(inputWatermarkTime)) {
       WindowTracing.trace(
           "{}.advanceOutputWatermark: clipping output watermark from {} to {}",
-          getClass().getSimpleName(),
+          SIMPLE_NAME,
           newOutputWatermark,
           inputWatermarkTime);
       adjustedOutputWatermark = inputWatermarkTime;
@@ -244,7 +239,7 @@ public class InMemoryTimerInternals implements TimerInternals {
         adjustedOutputWatermark);
     WindowTracing.trace(
         "{}.advanceOutputWatermark: from {} to {}",
-        getClass().getSimpleName(),
+        SIMPLE_NAME,
         outputWatermarkTime,
         adjustedOutputWatermark);
     outputWatermarkTime = adjustedOutputWatermark;
@@ -259,10 +254,7 @@ public class InMemoryTimerInternals implements TimerInternals {
         processingTime,
         newProcessingTime);
     WindowTracing.trace(
-        "{}.advanceProcessingTime: from {} to {}",
-        getClass().getSimpleName(),
-        processingTime,
-        newProcessingTime);
+        "{}.advanceProcessingTime: from {} to {}", SIMPLE_NAME, processingTime, newProcessingTime);
     processingTime = newProcessingTime;
   }
 
@@ -277,7 +269,7 @@ public class InMemoryTimerInternals implements TimerInternals {
         newSynchronizedProcessingTime);
     WindowTracing.trace(
         "{}.advanceProcessingTime: from {} to {}",
-        getClass().getSimpleName(),
+        SIMPLE_NAME,
         synchronizedProcessingTime,
         newSynchronizedProcessingTime);
     synchronizedProcessingTime = newSynchronizedProcessingTime;
@@ -288,10 +280,7 @@ public class InMemoryTimerInternals implements TimerInternals {
     TimerData timer = removeNextTimer(inputWatermarkTime, TimeDomain.EVENT_TIME);
     if (timer != null) {
       WindowTracing.trace(
-          "{}.removeNextEventTimer: firing {} at {}",
-          getClass().getSimpleName(),
-          timer,
-          inputWatermarkTime);
+          "{}.removeNextEventTimer: firing {} at {}", SIMPLE_NAME, timer, inputWatermarkTime);
     }
     return timer;
   }
@@ -301,10 +290,7 @@ public class InMemoryTimerInternals implements TimerInternals {
     TimerData timer = removeNextTimer(processingTime, TimeDomain.PROCESSING_TIME);
     if (timer != null) {
       WindowTracing.trace(
-          "{}.removeNextProcessingTimer: firing {} at {}",
-          getClass().getSimpleName(),
-          timer,
-          processingTime);
+          "{}.removeNextProcessingTimer: firing {} at {}", SIMPLE_NAME, timer, processingTime);
     }
     return timer;
   }
@@ -316,7 +302,7 @@ public class InMemoryTimerInternals implements TimerInternals {
     if (timer != null) {
       WindowTracing.trace(
           "{}.removeNextSynchronizedProcessingTimer: firing {} at {}",
-          getClass().getSimpleName(),
+          SIMPLE_NAME,
           timer,
           synchronizedProcessingTime);
     }