You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/08/17 11:07:16 UTC

[GitHub] [beam] jbartok commented on a change in pull request #12567: Update Jet Runner

jbartok commented on a change in pull request #12567:
URL: https://github.com/apache/beam/pull/12567#discussion_r471405630



##########
File path: runners/jet/src/main/java/org/apache/beam/runners/jet/processors/StatefulParDoP.java
##########
@@ -276,36 +262,94 @@ Processor getEx(
           inputValueCoder,
           outputValueCoders,
           ordinalToSideInput,
+          sideInputMapping,
           ownerId,
           stepId);
     }
   }
 
-  private static class KeyedStepContext implements StepContext {
+  private class KeyedStepContext implements StepContext {
 
-    private final Map<Object, InMemoryStateInternals> stateInternalsOfKeys;
-    private final InMemoryTimerInternals timerInternals;
+    private final Object nullKey = new Object();
 
-    private InMemoryStateInternals currentStateInternals;
+    private final ConcurrentHashMap<Object, InMemoryStateInternals> keyedStateInternals;
+    private final ConcurrentHashMap<Object, InMemoryTimerInternals> keyedTimerInternals;
 
-    KeyedStepContext(InMemoryTimerInternals timerInternals) {
-      this.stateInternalsOfKeys = new HashMap<>();
-      this.timerInternals = timerInternals;
+    @SuppressWarnings("ThreadLocalUsage")
+    private final ThreadLocal<Object> currentKey = new ThreadLocal<>();
+
+    KeyedStepContext() {
+      this.keyedStateInternals = new ConcurrentHashMap<>();
+      this.keyedTimerInternals = new ConcurrentHashMap<>();
     }
 
     void setKey(Object key) {
-      currentStateInternals =
-          stateInternalsOfKeys.computeIfAbsent(key, InMemoryStateInternals::forKey);
+      Object normalizedKey = key == null ? nullKey : key;
+      currentKey.set(normalizedKey);
+      keyedStateInternals.computeIfAbsent(normalizedKey, InMemoryStateInternals::forKey);
+      keyedTimerInternals.computeIfAbsent(normalizedKey, k -> new InMemoryTimerInternals());
+    }
+
+    void clearKey() {
+      currentKey.remove();
     }
 
     @Override
     public StateInternals stateInternals() {
-      return currentStateInternals;
+      Object key = currentKey.get();
+      if (key == null) {
+        throw new IllegalStateException("Active key should be set");
+      }
+      return keyedStateInternals.get(key);
     }
 
     @Override
     public TimerInternals timerInternals() {
-      return timerInternals;
+      Object key = currentKey.get();
+      if (key == null) {
+        throw new IllegalStateException("Active key should be set");
+      }
+      return keyedTimerInternals.get(key);
+    }
+
+    public void advanceProcessingTimes() {
+      Instant now = Instant.now();
+      keyedTimerInternals
+          .values()
+          .forEach(
+              timerInternals -> {
+                try {
+                  timerInternals.advanceProcessingTime(now);
+                  timerInternals.advanceSynchronizedProcessingTime(now);
+                } catch (Exception e) {
+                  throw new RuntimeException("Failed advancing time!");
+                }
+              });
+    }
+
+    public void flushTimers(long watermark) {
+      Instant watermarkInstant = new Instant(watermark);
+      keyedTimerInternals
+          .entrySet()
+          .forEach(
+              (entry) -> {
+                InMemoryTimerInternals timerInternals = entry.getValue();
+                if (timerInternals.currentInputWatermarkTime().isBefore(watermark)) {
+                  try {
+                    timerInternals.advanceInputWatermark(watermarkInstant);
+                    if (watermarkInstant.equals(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
+                      timerInternals.advanceProcessingTime(watermarkInstant);
+                      timerInternals.advanceSynchronizedProcessingTime(watermarkInstant);

Review comment:
       ok




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org