You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@beam.apache.org by mx...@apache.org on 2016/09/28 16:48:59 UTC
[1/2] incubator-beam git commit: [BEAM-615] Add Support for Processing-Time Timers in FlinkRunner Window Operator
Repository: incubator-beam
Updated Branches:
refs/heads/master b5853a624 -> a1ac2222d
[BEAM-615] Add Support for Processing-Time Timers in FlinkRunner Window Operator
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/59f62318
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/59f62318
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/59f62318
Branch: refs/heads/master
Commit: 59f623189184b225723ebd5686d912aa296ce35b
Parents: 3879db0
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Sep 28 11:49:54 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Wed Sep 28 18:46:30 2016 +0200
----------------------------------------------------------------------
.../wrappers/streaming/WindowDoFnOperator.java | 179 +++++++++++++++++--
1 file changed, 165 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59f62318/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
index 14a3ca7..e06a783 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
@@ -17,6 +17,9 @@
*/
package org.apache.beam.runners.flink.translation.wrappers.streaming;
+
+import com.google.common.collect.HashMultiset;
+import com.google.common.collect.Multiset;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
@@ -26,12 +29,14 @@ import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn;
import org.apache.beam.runners.core.SystemReduceFn;
@@ -53,12 +58,15 @@ import org.apache.beam.sdk.util.state.StateInternalsFactory;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
+
+
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
import org.joda.time.Instant;
@@ -69,7 +77,8 @@ import org.joda.time.Instant;
* @param <OutputT>
*/
public class WindowDoFnOperator<K, InputT, OutputT>
- extends DoFnOperator<KeyedWorkItem<K, InputT>, KV<K, OutputT>, WindowedValue<KV<K, OutputT>>> {
+ extends DoFnOperator<KeyedWorkItem<K, InputT>, KV<K, OutputT>, WindowedValue<KV<K, OutputT>>>
+ implements Triggerable {
private final Coder<K> keyCoder;
private final TimerInternals.TimerDataCoder timerCoder;
@@ -77,6 +86,11 @@ public class WindowDoFnOperator<K, InputT, OutputT>
private transient Set<Tuple2<ByteBuffer, TimerInternals.TimerData>> watermarkTimers;
private transient Queue<Tuple2<ByteBuffer, TimerInternals.TimerData>> watermarkTimersQueue;
+ private transient Queue<Tuple2<ByteBuffer, TimerInternals.TimerData>> processingTimeTimersQueue;
+ private transient Set<Tuple2<ByteBuffer, TimerInternals.TimerData>> processingTimeTimers;
+ private transient Multiset<Long> processingTimeTimerTimestamps;
+ private transient Map<Long, ScheduledFuture<?>> processingTimeTimerFutures;
+
private FlinkStateInternals<K> stateInternals;
private final SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> systemReduceFn;
@@ -151,6 +165,24 @@ public class WindowDoFnOperator<K, InputT, OutputT>
});
}
+ if (processingTimeTimers == null) {
+ processingTimeTimers = new HashSet<>();
+ processingTimeTimerTimestamps = HashMultiset.create();
+ processingTimeTimersQueue = new PriorityQueue<>(
+ 10,
+ new Comparator<Tuple2<ByteBuffer, TimerInternals.TimerData>>() {
+ @Override
+ public int compare(
+ Tuple2<ByteBuffer, TimerInternals.TimerData> o1,
+ Tuple2<ByteBuffer, TimerInternals.TimerData> o2) {
+ return o1.f1.compareTo(o2.f1);
+ }
+ });
+ }
+
+ // ScheduledFutures are not checkpointed
+ processingTimeTimerFutures = new HashMap<>();
+
stateInternals = new FlinkStateInternals<>(getStateBackend(), keyCoder);
// call super at the end because this will call getDoFn() which requires stateInternals
@@ -177,6 +209,69 @@ public class WindowDoFnOperator<K, InputT, OutputT>
if (watermarkTimers.remove(keyedTimer)) {
watermarkTimersQueue.remove(keyedTimer);
}
+ }
+
+ private void registerProcessingTimeTimer(TimerInternals.TimerData timer) {
+ Tuple2<ByteBuffer, TimerInternals.TimerData> keyedTimer =
+ new Tuple2<>((ByteBuffer) getStateBackend().getCurrentKey(), timer);
+ if (processingTimeTimers.add(keyedTimer)) {
+ processingTimeTimersQueue.add(keyedTimer);
+
+ // If this is the first timer added for this timestamp register a timer Task
+ if (processingTimeTimerTimestamps.add(timer.getTimestamp().getMillis(), 1) == 0) {
+ ScheduledFuture<?> scheduledFuture = registerTimer(timer.getTimestamp().getMillis(), this);
+ processingTimeTimerFutures.put(timer.getTimestamp().getMillis(), scheduledFuture);
+ }
+ }
+ }
+
+ private void deleteProcessingTimeTimer(TimerInternals.TimerData timer) {
+ Tuple2<ByteBuffer, TimerInternals.TimerData> keyedTimer =
+ new Tuple2<>((ByteBuffer) getStateBackend().getCurrentKey(), timer);
+ if (processingTimeTimers.remove(keyedTimer)) {
+ processingTimeTimersQueue.remove(keyedTimer);
+
+ // If there are no timers left for this timestamp, remove it from queue and cancel the
+ // timer Task
+ if (processingTimeTimerTimestamps.remove(timer.getTimestamp().getMillis(), 1) == 1) {
+ ScheduledFuture<?> triggerTaskFuture =
+ processingTimeTimerFutures.remove(timer.getTimestamp().getMillis());
+ if (triggerTaskFuture != null && !triggerTaskFuture.isDone()) {
+ triggerTaskFuture.cancel(false);
+ }
+ }
+
+ }
+ }
+
+ @Override
+ public void trigger(long time) throws Exception {
+
+ //Remove information about the triggering task
+ processingTimeTimerFutures.remove(time);
+ processingTimeTimerTimestamps.setCount(time, 0);
+
+ boolean fire;
+
+ do {
+ Tuple2<ByteBuffer, TimerInternals.TimerData> timer = processingTimeTimersQueue.peek();
+ if (timer != null && timer.f1.getTimestamp().getMillis() <= time) {
+ fire = true;
+
+ processingTimeTimersQueue.remove();
+ processingTimeTimers.remove(timer);
+
+ setKeyContext(timer.f0);
+
+ pushbackDoFnRunner.processElement(WindowedValue.valueInGlobalWindow(
+ KeyedWorkItems.<K, InputT>timersWorkItem(
+ stateInternals.getKey(),
+ Collections.singletonList(timer.f1))));
+
+ } else {
+ fire = false;
+ }
+ } while (fire);
}
@@ -262,20 +357,21 @@ public class WindowDoFnOperator<K, InputT, OutputT>
private void restoreTimers(InputStream in) throws IOException {
DataInputStream dataIn = new DataInputStream(in);
+
int numWatermarkTimers = dataIn.readInt();
watermarkTimers = new HashSet<>(numWatermarkTimers);
watermarkTimersQueue = new PriorityQueue<>(
- Math.max(numWatermarkTimers, 1),
- new Comparator<Tuple2<ByteBuffer, TimerInternals.TimerData>>() {
- @Override
- public int compare(
- Tuple2<ByteBuffer, TimerInternals.TimerData> o1,
- Tuple2<ByteBuffer, TimerInternals.TimerData> o2) {
- return o1.f1.compareTo(o2.f1);
- }
- });
+ Math.max(numWatermarkTimers, 1),
+ new Comparator<Tuple2<ByteBuffer, TimerInternals.TimerData>>() {
+ @Override
+ public int compare(
+ Tuple2<ByteBuffer, TimerInternals.TimerData> o1,
+ Tuple2<ByteBuffer, TimerInternals.TimerData> o2) {
+ return o1.f1.compareTo(o2.f1);
+ }
+ });
for (int i = 0; i < numWatermarkTimers; i++) {
int length = dataIn.readInt();
@@ -288,6 +384,44 @@ public class WindowDoFnOperator<K, InputT, OutputT>
watermarkTimersQueue.add(keyedTimer);
}
}
+
+ int numProcessingTimeTimers = dataIn.readInt();
+
+ processingTimeTimers = new HashSet<>(numProcessingTimeTimers);
+ processingTimeTimersQueue = new PriorityQueue<>(
+ Math.max(numProcessingTimeTimers, 1),
+ new Comparator<Tuple2<ByteBuffer, TimerInternals.TimerData>>() {
+ @Override
+ public int compare(
+ Tuple2<ByteBuffer, TimerInternals.TimerData> o1,
+ Tuple2<ByteBuffer, TimerInternals.TimerData> o2) {
+ return o1.f1.compareTo(o2.f1);
+ }
+ });
+
+ processingTimeTimerTimestamps = HashMultiset.create();
+ processingTimeTimerFutures = new HashMap<>();
+
+ for (int i = 0; i < numProcessingTimeTimers; i++) {
+ int length = dataIn.readInt();
+ byte[] keyBytes = new byte[length];
+ dataIn.readFully(keyBytes);
+ TimerInternals.TimerData timerData = timerCoder.decode(dataIn, Coder.Context.NESTED);
+ Tuple2<ByteBuffer, TimerInternals.TimerData> keyedTimer =
+ new Tuple2<>(ByteBuffer.wrap(keyBytes), timerData);
+ if (processingTimeTimers.add(keyedTimer)) {
+ processingTimeTimersQueue.add(keyedTimer);
+
+ //If this is the first timer added for this timestamp register a timer Task
+ if (processingTimeTimerTimestamps.add(timerData.getTimestamp().getMillis(), 1) == 0) {
+ // this registers a timer with the Flink processing-time service
+ ScheduledFuture<?> scheduledFuture =
+ registerTimer(timerData.getTimestamp().getMillis(), this);
+ processingTimeTimerFutures.put(timerData.getTimestamp().getMillis(), scheduledFuture);
+ }
+
+ }
+ }
}
private void snapshotTimers(OutputStream out) throws IOException {
@@ -298,6 +432,13 @@ public class WindowDoFnOperator<K, InputT, OutputT>
dataOut.write(timer.f0.array(), 0, timer.f0.limit());
timerCoder.encode(timer.f1, dataOut, Coder.Context.NESTED);
}
+
+ dataOut.writeInt(processingTimeTimersQueue.size());
+ for (Tuple2<ByteBuffer, TimerInternals.TimerData> timer : processingTimeTimersQueue) {
+ dataOut.writeInt(timer.f0.limit());
+ dataOut.write(timer.f0.array(), 0, timer.f0.limit());
+ timerCoder.encode(timer.f1, dataOut, Coder.Context.NESTED);
+ }
}
/**
@@ -313,25 +454,35 @@ public class WindowDoFnOperator<K, InputT, OutputT>
public void setTimer(TimerData timerKey) {
if (timerKey.getDomain().equals(TimeDomain.EVENT_TIME)) {
registerEventTimeTimer(timerKey);
+ } else if (timerKey.getDomain().equals(TimeDomain.PROCESSING_TIME)) {
+ registerProcessingTimeTimer(timerKey);
} else {
- throw new UnsupportedOperationException("Processing-time timers not supported.");
+ throw new UnsupportedOperationException(
+ "Unsupported time domain: " + timerKey.getDomain());
}
}
@Override
public void deleteTimer(TimerData timerKey) {
- deleteEventTimeTimer(timerKey);
+ if (timerKey.getDomain().equals(TimeDomain.EVENT_TIME)) {
+ deleteEventTimeTimer(timerKey);
+ } else if (timerKey.getDomain().equals(TimeDomain.PROCESSING_TIME)) {
+ deleteProcessingTimeTimer(timerKey);
+ } else {
+ throw new UnsupportedOperationException(
+ "Unsupported time domain: " + timerKey.getDomain());
+ }
}
@Override
public Instant currentProcessingTime() {
- return Instant.now();
+ return new Instant(getCurrentProcessingTime());
}
@Nullable
@Override
public Instant currentSynchronizedProcessingTime() {
- return Instant.now();
+ return new Instant(getCurrentProcessingTime());
}
@Override
[2/2] incubator-beam git commit: This closes #1021
Posted by mx...@apache.org.
This closes #1021
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a1ac2222
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a1ac2222
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a1ac2222
Branch: refs/heads/master
Commit: a1ac2222dca13f6902faad30f37a877c1e6fb218
Parents: b5853a6 59f6231
Author: Maximilian Michels <mx...@apache.org>
Authored: Wed Sep 28 18:46:58 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Wed Sep 28 18:46:58 2016 +0200
----------------------------------------------------------------------
.../wrappers/streaming/WindowDoFnOperator.java | 179 +++++++++++++++++--
1 file changed, 165 insertions(+), 14 deletions(-)
----------------------------------------------------------------------