You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@beam.apache.org by mx...@apache.org on 2016/09/28 16:48:59 UTC

[1/2] incubator-beam git commit: [BEAM-615] Add Support for Processing-Time Timers in FlinkRunner Window Operator

Repository: incubator-beam
Updated Branches:
  refs/heads/master b5853a624 -> a1ac2222d


[BEAM-615] Add Support for Processing-Time Timers in FlinkRunner Window Operator


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/59f62318
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/59f62318
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/59f62318

Branch: refs/heads/master
Commit: 59f623189184b225723ebd5686d912aa296ce35b
Parents: 3879db0
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Sep 28 11:49:54 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Wed Sep 28 18:46:30 2016 +0200

----------------------------------------------------------------------
 .../wrappers/streaming/WindowDoFnOperator.java  | 179 +++++++++++++++++--
 1 file changed, 165 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59f62318/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
index 14a3ca7..e06a783 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
@@ -17,6 +17,9 @@
  */
 package org.apache.beam.runners.flink.translation.wrappers.streaming;
 
+
+import com.google.common.collect.HashMultiset;
+import com.google.common.collect.Multiset;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -26,12 +29,14 @@ import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.PriorityQueue;
 import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn;
 import org.apache.beam.runners.core.SystemReduceFn;
@@ -53,12 +58,15 @@ import org.apache.beam.sdk.util.state.StateInternalsFactory;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
+
+
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
 import org.joda.time.Instant;
 
@@ -69,7 +77,8 @@ import org.joda.time.Instant;
  * @param <OutputT>
  */
 public class WindowDoFnOperator<K, InputT, OutputT>
-    extends DoFnOperator<KeyedWorkItem<K, InputT>, KV<K, OutputT>, WindowedValue<KV<K, OutputT>>> {
+    extends DoFnOperator<KeyedWorkItem<K, InputT>, KV<K, OutputT>, WindowedValue<KV<K, OutputT>>>
+    implements Triggerable {
 
   private final Coder<K> keyCoder;
   private final TimerInternals.TimerDataCoder timerCoder;
@@ -77,6 +86,11 @@ public class WindowDoFnOperator<K, InputT, OutputT>
   private transient Set<Tuple2<ByteBuffer, TimerInternals.TimerData>> watermarkTimers;
   private transient Queue<Tuple2<ByteBuffer, TimerInternals.TimerData>> watermarkTimersQueue;
 
+  private transient Queue<Tuple2<ByteBuffer, TimerInternals.TimerData>> processingTimeTimersQueue;
+  private transient Set<Tuple2<ByteBuffer, TimerInternals.TimerData>> processingTimeTimers;
+  private transient Multiset<Long> processingTimeTimerTimestamps;
+  private transient Map<Long, ScheduledFuture<?>> processingTimeTimerFutures;
+
   private FlinkStateInternals<K> stateInternals;
 
   private final SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> systemReduceFn;
@@ -151,6 +165,24 @@ public class WindowDoFnOperator<K, InputT, OutputT>
           });
     }
 
+    if (processingTimeTimers == null) {
+      processingTimeTimers = new HashSet<>();
+      processingTimeTimerTimestamps = HashMultiset.create();
+      processingTimeTimersQueue = new PriorityQueue<>(
+          10,
+          new Comparator<Tuple2<ByteBuffer, TimerInternals.TimerData>>() {
+            @Override
+            public int compare(
+                Tuple2<ByteBuffer, TimerInternals.TimerData> o1,
+                Tuple2<ByteBuffer, TimerInternals.TimerData> o2) {
+              return o1.f1.compareTo(o2.f1);
+            }
+          });
+    }
+
+    // ScheduledFutures are not checkpointed
+    processingTimeTimerFutures = new HashMap<>();
+
     stateInternals = new FlinkStateInternals<>(getStateBackend(), keyCoder);
 
     // call super at the end because this will call getDoFn() which requires stateInternals
@@ -177,6 +209,69 @@ public class WindowDoFnOperator<K, InputT, OutputT>
     if (watermarkTimers.remove(keyedTimer)) {
       watermarkTimersQueue.remove(keyedTimer);
     }
+  }
+
+  private void registerProcessingTimeTimer(TimerInternals.TimerData timer) {
+    Tuple2<ByteBuffer, TimerInternals.TimerData> keyedTimer =
+        new Tuple2<>((ByteBuffer) getStateBackend().getCurrentKey(), timer);
+    if (processingTimeTimers.add(keyedTimer)) {
+      processingTimeTimersQueue.add(keyedTimer);
+
+      // If this is the first timer added for this timestamp register a timer Task
+      if (processingTimeTimerTimestamps.add(timer.getTimestamp().getMillis(), 1) == 0) {
+        ScheduledFuture<?> scheduledFuture = registerTimer(timer.getTimestamp().getMillis(), this);
+        processingTimeTimerFutures.put(timer.getTimestamp().getMillis(), scheduledFuture);
+      }
+    }
+  }
+
+  private void deleteProcessingTimeTimer(TimerInternals.TimerData timer) {
+    Tuple2<ByteBuffer, TimerInternals.TimerData> keyedTimer =
+        new Tuple2<>((ByteBuffer) getStateBackend().getCurrentKey(), timer);
+    if (processingTimeTimers.remove(keyedTimer)) {
+      processingTimeTimersQueue.remove(keyedTimer);
+
+      // If there are no timers left for this timestamp, remove it from queue and cancel the
+      // timer Task
+      if (processingTimeTimerTimestamps.remove(timer.getTimestamp().getMillis(), 1) == 1) {
+        ScheduledFuture<?> triggerTaskFuture =
+            processingTimeTimerFutures.remove(timer.getTimestamp().getMillis());
+        if (triggerTaskFuture != null && !triggerTaskFuture.isDone()) {
+          triggerTaskFuture.cancel(false);
+        }
+      }
+
+    }
+  }
+
+  @Override
+  public void trigger(long time) throws Exception {
+
+    //Remove information about the triggering task
+    processingTimeTimerFutures.remove(time);
+    processingTimeTimerTimestamps.setCount(time, 0);
+
+    boolean fire;
+
+    do {
+      Tuple2<ByteBuffer, TimerInternals.TimerData> timer = processingTimeTimersQueue.peek();
+      if (timer != null && timer.f1.getTimestamp().getMillis() <= time) {
+        fire = true;
+
+        processingTimeTimersQueue.remove();
+        processingTimeTimers.remove(timer);
+
+        setKeyContext(timer.f0);
+
+        pushbackDoFnRunner.processElement(WindowedValue.valueInGlobalWindow(
+            KeyedWorkItems.<K, InputT>timersWorkItem(
+                stateInternals.getKey(),
+                Collections.singletonList(timer.f1))));
+
+      } else {
+        fire = false;
+      }
+    } while (fire);
 
   }
 
@@ -262,20 +357,21 @@ public class WindowDoFnOperator<K, InputT, OutputT>
 
   private void restoreTimers(InputStream in) throws IOException {
     DataInputStream dataIn = new DataInputStream(in);
+
     int numWatermarkTimers = dataIn.readInt();
 
     watermarkTimers = new HashSet<>(numWatermarkTimers);
 
     watermarkTimersQueue = new PriorityQueue<>(
-            Math.max(numWatermarkTimers, 1),
-            new Comparator<Tuple2<ByteBuffer, TimerInternals.TimerData>>() {
-              @Override
-              public int compare(
-                      Tuple2<ByteBuffer, TimerInternals.TimerData> o1,
-                      Tuple2<ByteBuffer, TimerInternals.TimerData> o2) {
-                return o1.f1.compareTo(o2.f1);
-              }
-            });
+        Math.max(numWatermarkTimers, 1),
+        new Comparator<Tuple2<ByteBuffer, TimerInternals.TimerData>>() {
+          @Override
+          public int compare(
+                  Tuple2<ByteBuffer, TimerInternals.TimerData> o1,
+                  Tuple2<ByteBuffer, TimerInternals.TimerData> o2) {
+            return o1.f1.compareTo(o2.f1);
+          }
+        });
 
     for (int i = 0; i < numWatermarkTimers; i++) {
       int length = dataIn.readInt();
@@ -288,6 +384,44 @@ public class WindowDoFnOperator<K, InputT, OutputT>
         watermarkTimersQueue.add(keyedTimer);
       }
     }
+
+    int numProcessingTimeTimers = dataIn.readInt();
+
+    processingTimeTimers = new HashSet<>(numProcessingTimeTimers);
+    processingTimeTimersQueue = new PriorityQueue<>(
+        Math.max(numProcessingTimeTimers, 1),
+        new Comparator<Tuple2<ByteBuffer, TimerInternals.TimerData>>() {
+          @Override
+          public int compare(
+              Tuple2<ByteBuffer, TimerInternals.TimerData> o1,
+              Tuple2<ByteBuffer, TimerInternals.TimerData> o2) {
+            return o1.f1.compareTo(o2.f1);
+          }
+        });
+
+    processingTimeTimerTimestamps = HashMultiset.create();
+    processingTimeTimerFutures = new HashMap<>();
+
+    for (int i = 0; i < numProcessingTimeTimers; i++) {
+      int length = dataIn.readInt();
+      byte[] keyBytes = new byte[length];
+      dataIn.readFully(keyBytes);
+      TimerInternals.TimerData timerData = timerCoder.decode(dataIn, Coder.Context.NESTED);
+      Tuple2<ByteBuffer, TimerInternals.TimerData> keyedTimer =
+          new Tuple2<>(ByteBuffer.wrap(keyBytes), timerData);
+      if (processingTimeTimers.add(keyedTimer)) {
+        processingTimeTimersQueue.add(keyedTimer);
+
+        //If this is the first timer added for this timestamp register a timer Task
+        if (processingTimeTimerTimestamps.add(timerData.getTimestamp().getMillis(), 1) == 0) {
+          // this registers a timer with the Flink processing-time service
+          ScheduledFuture<?> scheduledFuture =
+              registerTimer(timerData.getTimestamp().getMillis(), this);
+          processingTimeTimerFutures.put(timerData.getTimestamp().getMillis(), scheduledFuture);
+        }
+
+      }
+    }
   }
 
   private void snapshotTimers(OutputStream out) throws IOException {
@@ -298,6 +432,13 @@ public class WindowDoFnOperator<K, InputT, OutputT>
       dataOut.write(timer.f0.array(), 0, timer.f0.limit());
       timerCoder.encode(timer.f1, dataOut, Coder.Context.NESTED);
     }
+
+    dataOut.writeInt(processingTimeTimersQueue.size());
+    for (Tuple2<ByteBuffer, TimerInternals.TimerData> timer : processingTimeTimersQueue) {
+      dataOut.writeInt(timer.f0.limit());
+      dataOut.write(timer.f0.array(), 0, timer.f0.limit());
+      timerCoder.encode(timer.f1, dataOut, Coder.Context.NESTED);
+    }
   }
 
   /**
@@ -313,25 +454,35 @@ public class WindowDoFnOperator<K, InputT, OutputT>
         public void setTimer(TimerData timerKey) {
           if (timerKey.getDomain().equals(TimeDomain.EVENT_TIME)) {
             registerEventTimeTimer(timerKey);
+          } else if (timerKey.getDomain().equals(TimeDomain.PROCESSING_TIME)) {
+            registerProcessingTimeTimer(timerKey);
           } else {
-            throw new UnsupportedOperationException("Processing-time timers not supported.");
+            throw new UnsupportedOperationException(
+                "Unsupported time domain: " + timerKey.getDomain());
           }
         }
 
         @Override
         public void deleteTimer(TimerData timerKey) {
-          deleteEventTimeTimer(timerKey);
+          if (timerKey.getDomain().equals(TimeDomain.EVENT_TIME)) {
+            deleteEventTimeTimer(timerKey);
+          } else if (timerKey.getDomain().equals(TimeDomain.PROCESSING_TIME)) {
+            deleteProcessingTimeTimer(timerKey);
+          } else {
+            throw new UnsupportedOperationException(
+                "Unsupported time domain: " + timerKey.getDomain());
+          }
         }
 
         @Override
         public Instant currentProcessingTime() {
-          return Instant.now();
+          return new Instant(getCurrentProcessingTime());
         }
 
         @Nullable
         @Override
         public Instant currentSynchronizedProcessingTime() {
-          return Instant.now();
+          return new Instant(getCurrentProcessingTime());
         }
 
         @Override


[2/2] incubator-beam git commit: This closes #1021

Posted by mx...@apache.org.
This closes #1021


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a1ac2222
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a1ac2222
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a1ac2222

Branch: refs/heads/master
Commit: a1ac2222dca13f6902faad30f37a877c1e6fb218
Parents: b5853a6 59f6231
Author: Maximilian Michels <mx...@apache.org>
Authored: Wed Sep 28 18:46:58 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Wed Sep 28 18:46:58 2016 +0200

----------------------------------------------------------------------
 .../wrappers/streaming/WindowDoFnOperator.java  | 179 +++++++++++++++++--
 1 file changed, 165 insertions(+), 14 deletions(-)
----------------------------------------------------------------------