Posted to commits@beam.apache.org by mx...@apache.org on 2016/03/15 17:07:04 UTC
[09/13] incubator-beam git commit: [flink] restructure and cleanup Maven layout
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
new file mode 100644
index 0000000..10c8bbf
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming;
+
+import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.common.base.Preconditions;
+import com.google.cloud.dataflow.sdk.transforms.Aggregator;
+import com.google.cloud.dataflow.sdk.transforms.Combine;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
+import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
+import com.google.cloud.dataflow.sdk.util.*;
+import com.google.cloud.dataflow.sdk.values.PCollectionView;
+import com.google.cloud.dataflow.sdk.values.TupleTag;
+import com.google.common.base.Throwables;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.AccumulatorHelper;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.util.Collector;
+import org.joda.time.Instant;
+import org.joda.time.format.PeriodFormat;
+
+import java.util.Collection;
+
+/**
+ * An abstract class that encapsulates the code shared by the {@link com.google.cloud.dataflow.sdk.transforms.ParDo.Bound}
+ * and {@link com.google.cloud.dataflow.sdk.transforms.ParDo.BoundMulti} wrappers. See {@link FlinkParDoBoundWrapper} and
+ * {@link FlinkParDoBoundMultiWrapper} for the actual wrappers of the aforementioned transformations.
+ */
+public abstract class FlinkAbstractParDoWrapper<IN, OUTDF, OUTFL> extends RichFlatMapFunction<WindowedValue<IN>, WindowedValue<OUTFL>> {
+
+ private final DoFn<IN, OUTDF> doFn;
+ private final WindowingStrategy<?, ?> windowingStrategy;
+ private transient PipelineOptions options;
+
+ private DoFnProcessContext context;
+
+ public FlinkAbstractParDoWrapper(PipelineOptions options, WindowingStrategy<?, ?> windowingStrategy, DoFn<IN, OUTDF> doFn) {
+ Preconditions.checkNotNull(options);
+ Preconditions.checkNotNull(windowingStrategy);
+ Preconditions.checkNotNull(doFn);
+
+ this.doFn = doFn;
+ this.options = options;
+ this.windowingStrategy = windowingStrategy;
+ }
+
+ private void initContext(DoFn<IN, OUTDF> function, Collector<WindowedValue<OUTFL>> outCollector) {
+ if (this.context == null) {
+ this.context = new DoFnProcessContext(function, outCollector);
+ }
+ }
+
+ @Override
+ public void flatMap(WindowedValue<IN> value, Collector<WindowedValue<OUTFL>> out) throws Exception {
+ this.initContext(doFn, out);
+
+ // for each window the element belongs to, create a new copy here.
+ Collection<? extends BoundedWindow> windows = value.getWindows();
+ if (windows.size() <= 1) {
+ processElement(value);
+ } else {
+ for (BoundedWindow window : windows) {
+ processElement(WindowedValue.of(
+ value.getValue(), value.getTimestamp(), window, value.getPane()));
+ }
+ }
+ }
+
+ private void processElement(WindowedValue<IN> value) throws Exception {
+ this.context.setElement(value);
+ this.doFn.startBundle(context);
+ doFn.processElement(context);
+ this.doFn.finishBundle(context);
+ }
+
+ private class DoFnProcessContext extends DoFn<IN, OUTDF>.ProcessContext {
+
+ private final DoFn<IN, OUTDF> fn;
+
+ protected final Collector<WindowedValue<OUTFL>> collector;
+
+ private WindowedValue<IN> element;
+
+ private DoFnProcessContext(DoFn<IN, OUTDF> function, Collector<WindowedValue<OUTFL>> outCollector) {
+ function.super();
+ super.setupDelegateAggregators();
+
+ this.fn = function;
+ this.collector = outCollector;
+ }
+
+ public void setElement(WindowedValue<IN> value) {
+ this.element = value;
+ }
+
+ @Override
+ public IN element() {
+ return this.element.getValue();
+ }
+
+ @Override
+ public Instant timestamp() {
+ return this.element.getTimestamp();
+ }
+
+ @Override
+ public BoundedWindow window() {
+ if (!(fn instanceof DoFn.RequiresWindowAccess)) {
+ throw new UnsupportedOperationException(
+ "window() is only available in the context of a DoFn marked as RequiresWindow.");
+ }
+
+ Collection<? extends BoundedWindow> windows = this.element.getWindows();
+ if (windows.size() != 1) {
+ throw new IllegalArgumentException("Each element is expected to belong to 1 window. " +
+ "This belongs to " + windows.size() + ".");
+ }
+ return windows.iterator().next();
+ }
+
+ @Override
+ public PaneInfo pane() {
+ return this.element.getPane();
+ }
+
+ @Override
+ public WindowingInternals<IN, OUTDF> windowingInternals() {
+ return windowingInternalsHelper(element, collector);
+ }
+
+ @Override
+ public PipelineOptions getPipelineOptions() {
+ return options;
+ }
+
+ @Override
+ public <T> T sideInput(PCollectionView<T> view) {
+ throw new RuntimeException("sideInput() is not supported in Streaming mode.");
+ }
+
+ @Override
+ public void output(OUTDF output) {
+ outputWithTimestamp(output, this.element.getTimestamp());
+ }
+
+ @Override
+ public void outputWithTimestamp(OUTDF output, Instant timestamp) {
+ outputWithTimestampHelper(element, output, timestamp, collector);
+ }
+
+ @Override
+ public <T> void sideOutput(TupleTag<T> tag, T output) {
+ sideOutputWithTimestamp(tag, output, this.element.getTimestamp());
+ }
+
+ @Override
+ public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+ sideOutputWithTimestampHelper(element, output, timestamp, collector, tag);
+ }
+
+ @Override
+ protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) {
+ Accumulator acc = getRuntimeContext().getAccumulator(name);
+ if (acc != null) {
+ AccumulatorHelper.compareAccumulatorTypes(name,
+ SerializableFnAggregatorWrapper.class, acc.getClass());
+ return (Aggregator<AggInputT, AggOutputT>) acc;
+ }
+
+ SerializableFnAggregatorWrapper<AggInputT, AggOutputT> accumulator =
+ new SerializableFnAggregatorWrapper<>(combiner);
+ getRuntimeContext().addAccumulator(name, accumulator);
+ return accumulator;
+ }
+ }
+
+ protected void checkTimestamp(WindowedValue<IN> ref, Instant timestamp) {
+ if (timestamp.isBefore(ref.getTimestamp().minus(doFn.getAllowedTimestampSkew()))) {
+ throw new IllegalArgumentException(String.format(
+ "Cannot output with timestamp %s. Output timestamps must be no earlier than the "
+ + "timestamp of the current input (%s) minus the allowed skew (%s). See the "
+ + "DoFn#getAllowedTimestmapSkew() Javadoc for details on changing the allowed skew.",
+ timestamp, ref.getTimestamp(),
+ PeriodFormat.getDefault().print(doFn.getAllowedTimestampSkew().toPeriod())));
+ }
+ }
+
+ protected <T> WindowedValue<T> makeWindowedValue(
+ T output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
+ final Instant inputTimestamp = timestamp;
+ final WindowFn windowFn = windowingStrategy.getWindowFn();
+
+ if (timestamp == null) {
+ timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
+ }
+
+ if (windows == null) {
+ try {
+ windows = windowFn.assignWindows(windowFn.new AssignContext() {
+ @Override
+ public Object element() {
+ throw new UnsupportedOperationException(
+ "WindowFn attempted to access input element when none was available");
+ }
+
+ @Override
+ public Instant timestamp() {
+ if (inputTimestamp == null) {
+ throw new UnsupportedOperationException(
+ "WindowFn attempted to access input timestamp when none was available");
+ }
+ return inputTimestamp;
+ }
+
+ @Override
+ public Collection<? extends BoundedWindow> windows() {
+ throw new UnsupportedOperationException(
+ "WindowFn attempted to access input windows when none were available");
+ }
+ });
+ } catch (Exception e) {
+ throw UserCodeException.wrap(e);
+ }
+ }
+
+ return WindowedValue.of(output, timestamp, windows, pane);
+ }
+
+ /////////// ABSTRACT METHODS TO BE IMPLEMENTED BY SUBCLASSES /////////////////
+
+ public abstract void outputWithTimestampHelper(
+ WindowedValue<IN> inElement,
+ OUTDF output,
+ Instant timestamp,
+ Collector<WindowedValue<OUTFL>> outCollector);
+
+ public abstract <T> void sideOutputWithTimestampHelper(
+ WindowedValue<IN> inElement,
+ T output,
+ Instant timestamp,
+ Collector<WindowedValue<OUTFL>> outCollector,
+ TupleTag<T> tag);
+
+ public abstract WindowingInternals<IN, OUTDF> windowingInternalsHelper(
+ WindowedValue<IN> inElement,
+ Collector<WindowedValue<OUTFL>> outCollector);
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
new file mode 100644
index 0000000..e115a15
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
@@ -0,0 +1,631 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming;
+
+import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
+import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.state.*;
+import com.google.cloud.dataflow.sdk.coders.*;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
+import com.google.cloud.dataflow.sdk.transforms.Aggregator;
+import com.google.cloud.dataflow.sdk.transforms.Combine;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.OutputTimeFn;
+import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
+import com.google.cloud.dataflow.sdk.util.*;
+import com.google.cloud.dataflow.sdk.values.*;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.AccumulatorHelper;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.operators.*;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
+import org.joda.time.Instant;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * This class is the key class implementing all the windowing/triggering logic of Apache Beam.
+ * To provide full compatibility and support for all the windowing/triggering combinations offered by
+ * Beam, we opted for a strategy that uses the SDK's code for doing these operations. See the code in
+ * {@link com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsDoFn}.
+ * <p/>
+ * In a nutshell, when execution reaches this operator, we expect to have a stream <b>already
+ * grouped by key</b>. Each element that enters here registers a timer
+ * (see {@link TimerInternals#setTimer(TimerInternals.TimerData)}) in the
+ * {@link FlinkGroupAlsoByWindowWrapper#activeTimers} map.
+ * This is essentially a timestamp indicating when to trigger the computation over the window this
+ * element belongs to.
+ * <p/>
+ * When a watermark arrives, all the registered timers are checked to see which ones are ready to
+ * fire (see {@link FlinkGroupAlsoByWindowWrapper#processWatermark(Watermark)}). These are deregistered from
+ * the {@link FlinkGroupAlsoByWindowWrapper#activeTimers}
+ * map, and are fed into the {@link com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsDoFn}
+ * for further processing.
+ */
+public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT>
+ extends AbstractStreamOperator<WindowedValue<KV<K, VOUT>>>
+ implements OneInputStreamOperator<WindowedValue<KV<K, VIN>>, WindowedValue<KV<K, VOUT>>> {
+
+ private static final long serialVersionUID = 1L;
+
+ private transient PipelineOptions options;
+
+ private transient CoderRegistry coderRegistry;
+
+ private DoFn<KeyedWorkItem<K, VIN>, KV<K, VOUT>> operator;
+
+ private ProcessContext context;
+
+ private final WindowingStrategy<KV<K, VIN>, BoundedWindow> windowingStrategy;
+
+ private final Combine.KeyedCombineFn<K, VIN, VACC, VOUT> combineFn;
+
+ private final KvCoder<K, VIN> inputKvCoder;
+
+ /**
+ * State is kept <b>per key</b>. This data structure maintains the mapping between an active key, i.e. a
+ * key whose elements are currently waiting to be processed, and its associated state.
+ */
+ private Map<K, FlinkStateInternals<K>> perKeyStateInternals = new HashMap<>();
+
+ /**
+ * Timers waiting to be processed.
+ */
+ private Map<K, Set<TimerInternals.TimerData>> activeTimers = new HashMap<>();
+
+ private FlinkTimerInternals timerInternals = new FlinkTimerInternals();
+
+ /**
+ * Creates a DataStream where elements are grouped in windows based on the specified windowing strategy.
+ * This method assumes that <b>elements are already grouped by key</b>.
+ * <p/>
+ * The difference from {@link #createForIterable(PipelineOptions, PCollection, KeyedStream)}
+ * is that this method assumes that a combiner function is provided
+ * (see {@link com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn}).
+ * A combiner increases speed and, in most cases, reduces the per-window state.
+ *
+ * @param options the general job configuration options.
+ * @param input the input Dataflow {@link com.google.cloud.dataflow.sdk.values.PCollection}.
+ * @param groupedStreamByKey the input stream; it is assumed to already be grouped by key.
+ * @param combiner the combiner to be used.
+ * @param outputKvCoder the coder for the output key/value pairs.
+ */
+ public static <K, VIN, VACC, VOUT> DataStream<WindowedValue<KV<K, VOUT>>> create(
+ PipelineOptions options,
+ PCollection input,
+ KeyedStream<WindowedValue<KV<K, VIN>>, K> groupedStreamByKey,
+ Combine.KeyedCombineFn<K, VIN, VACC, VOUT> combiner,
+ KvCoder<K, VOUT> outputKvCoder) {
+ Preconditions.checkNotNull(options);
+
+ KvCoder<K, VIN> inputKvCoder = (KvCoder<K, VIN>) input.getCoder();
+ FlinkGroupAlsoByWindowWrapper windower = new FlinkGroupAlsoByWindowWrapper<>(options,
+ input.getPipeline().getCoderRegistry(), input.getWindowingStrategy(), inputKvCoder, combiner);
+
+ Coder<WindowedValue<KV<K, VOUT>>> windowedOutputElemCoder = WindowedValue.FullWindowedValueCoder.of(
+ outputKvCoder,
+ input.getWindowingStrategy().getWindowFn().windowCoder());
+
+ CoderTypeInformation<WindowedValue<KV<K, VOUT>>> outputTypeInfo =
+ new CoderTypeInformation<>(windowedOutputElemCoder);
+
+ DataStream<WindowedValue<KV<K, VOUT>>> groupedByKeyAndWindow = groupedStreamByKey
+ .transform("GroupByWindowWithCombiner",
+ new CoderTypeInformation<>(outputKvCoder),
+ windower)
+ .returns(outputTypeInfo);
+
+ return groupedByKeyAndWindow;
+ }
+
+ /**
+ * Creates a DataStream where elements are grouped in windows based on the specified windowing strategy.
+ * This method assumes that <b>elements are already grouped by key</b>.
+ * <p/>
+ * The difference from {@link #create(PipelineOptions, PCollection, KeyedStream, Combine.KeyedCombineFn, KvCoder)}
+ * is that this method assumes no combiner function
+ * (see {@link com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn}).
+ *
+ * @param options the general job configuration options.
+ * @param input the input Dataflow {@link com.google.cloud.dataflow.sdk.values.PCollection}.
+ * @param groupedStreamByKey the input stream; it is assumed to already be grouped by key.
+ */
+ public static <K, VIN> DataStream<WindowedValue<KV<K, Iterable<VIN>>>> createForIterable(
+ PipelineOptions options,
+ PCollection input,
+ KeyedStream<WindowedValue<KV<K, VIN>>, K> groupedStreamByKey) {
+ Preconditions.checkNotNull(options);
+
+ KvCoder<K, VIN> inputKvCoder = (KvCoder<K, VIN>) input.getCoder();
+ Coder<K> keyCoder = inputKvCoder.getKeyCoder();
+ Coder<VIN> inputValueCoder = inputKvCoder.getValueCoder();
+
+ FlinkGroupAlsoByWindowWrapper windower = new FlinkGroupAlsoByWindowWrapper(options,
+ input.getPipeline().getCoderRegistry(), input.getWindowingStrategy(), inputKvCoder, null);
+
+ Coder<Iterable<VIN>> valueIterCoder = IterableCoder.of(inputValueCoder);
+ KvCoder<K, Iterable<VIN>> outputElemCoder = KvCoder.of(keyCoder, valueIterCoder);
+
+ Coder<WindowedValue<KV<K, Iterable<VIN>>>> windowedOutputElemCoder = WindowedValue.FullWindowedValueCoder.of(
+ outputElemCoder,
+ input.getWindowingStrategy().getWindowFn().windowCoder());
+
+ CoderTypeInformation<WindowedValue<KV<K, Iterable<VIN>>>> outputTypeInfo =
+ new CoderTypeInformation<>(windowedOutputElemCoder);
+
+ DataStream<WindowedValue<KV<K, Iterable<VIN>>>> groupedByKeyAndWindow = groupedStreamByKey
+ .transform("GroupByWindow",
+ new CoderTypeInformation<>(windowedOutputElemCoder),
+ windower)
+ .returns(outputTypeInfo);
+
+ return groupedByKeyAndWindow;
+ }
+
+ public static <K, VIN, VACC, VOUT> FlinkGroupAlsoByWindowWrapper
+ createForTesting(PipelineOptions options,
+ CoderRegistry registry,
+ WindowingStrategy<KV<K, VIN>, BoundedWindow> windowingStrategy,
+ KvCoder<K, VIN> inputCoder,
+ Combine.KeyedCombineFn<K, VIN, VACC, VOUT> combiner) {
+ Preconditions.checkNotNull(options);
+
+ return new FlinkGroupAlsoByWindowWrapper(options, registry, windowingStrategy, inputCoder, combiner);
+ }
+
+ private FlinkGroupAlsoByWindowWrapper(PipelineOptions options,
+ CoderRegistry registry,
+ WindowingStrategy<KV<K, VIN>, BoundedWindow> windowingStrategy,
+ KvCoder<K, VIN> inputCoder,
+ Combine.KeyedCombineFn<K, VIN, VACC, VOUT> combiner) {
+ Preconditions.checkNotNull(options);
+
+ this.options = Preconditions.checkNotNull(options);
+ this.coderRegistry = Preconditions.checkNotNull(registry);
+ this.inputKvCoder = Preconditions.checkNotNull(inputCoder);
+ this.windowingStrategy = Preconditions.checkNotNull(windowingStrategy);
+ this.combineFn = combiner;
+ this.operator = createGroupAlsoByWindowOperator();
+ this.chainingStrategy = ChainingStrategy.ALWAYS;
+ }
+
+ @Override
+ public void open() throws Exception {
+ super.open();
+ this.context = new ProcessContext(operator, new TimestampedCollector<>(output), this.timerInternals);
+ }
+
+ /**
+ * Creates the appropriate {@link com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsDoFn},
+ * <b>if not already created</b>.
+ * If a {@link com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn} was provided, then
+ * a function with that combiner is created, so that elements are combined as they arrive. This is
+ * done for speed and, in most cases, to reduce the per-window state.
+ */
+ private <W extends BoundedWindow> DoFn<KeyedWorkItem<K, VIN>, KV<K, VOUT>> createGroupAlsoByWindowOperator() {
+ if (this.operator == null) {
+ if (this.combineFn == null) {
+ // Thus VOUT == Iterable<VIN>
+ Coder<VIN> inputValueCoder = inputKvCoder.getValueCoder();
+
+ this.operator = (DoFn) GroupAlsoByWindowViaWindowSetDoFn.create(
+ (WindowingStrategy<?, W>) this.windowingStrategy, SystemReduceFn.<K, VIN, W>buffering(inputValueCoder));
+ } else {
+ Coder<K> inputKeyCoder = inputKvCoder.getKeyCoder();
+
+ AppliedCombineFn<K, VIN, VACC, VOUT> appliedCombineFn = AppliedCombineFn
+ .withInputCoder(combineFn, coderRegistry, inputKvCoder);
+
+ this.operator = GroupAlsoByWindowViaWindowSetDoFn.create(
+ (WindowingStrategy<?, W>) this.windowingStrategy, SystemReduceFn.<K, VIN, VACC, VOUT, W>combining(inputKeyCoder, appliedCombineFn));
+ }
+ }
+ return this.operator;
+ }
+
+ private void processKeyedWorkItem(KeyedWorkItem<K, VIN> workItem) throws Exception {
+ context.setElement(workItem, getStateInternalsForKey(workItem.key()));
+
+ // TODO: Ideally startBundle/finishBundle would be called when the operator is first used / about to be discarded.
+ operator.startBundle(context);
+ operator.processElement(context);
+ operator.finishBundle(context);
+ }
+
+ @Override
+ public void processElement(StreamRecord<WindowedValue<KV<K, VIN>>> element) throws Exception {
+ ArrayList<WindowedValue<VIN>> elements = new ArrayList<>();
+ elements.add(WindowedValue.of(element.getValue().getValue().getValue(), element.getValue().getTimestamp(),
+ element.getValue().getWindows(), element.getValue().getPane()));
+ processKeyedWorkItem(KeyedWorkItems.elementsWorkItem(element.getValue().getValue().getKey(), elements));
+ }
+
+ @Override
+ public void processWatermark(Watermark mark) throws Exception {
+ context.setCurrentInputWatermark(new Instant(mark.getTimestamp()));
+
+ Multimap<K, TimerInternals.TimerData> timers = getTimersReadyToProcess(mark.getTimestamp());
+ if (!timers.isEmpty()) {
+ for (K key : timers.keySet()) {
+ processKeyedWorkItem(KeyedWorkItems.<K, VIN>timersWorkItem(key, timers.get(key)));
+ }
+ }
+
+ /*
+ * This is to take into account the different semantics of the watermark in Flink and
+ * in Dataflow. To understand the reasoning behind the Dataflow semantics and its
+ * watermark holding logic, see the documentation of
+ * WatermarkHold#addHold(ReduceFn.ProcessValueContext, boolean).
+ */
+ long millis = Long.MAX_VALUE;
+ for (FlinkStateInternals state : perKeyStateInternals.values()) {
+ Instant watermarkHold = state.getWatermarkHold();
+ if (watermarkHold != null && watermarkHold.getMillis() < millis) {
+ millis = watermarkHold.getMillis();
+ }
+ }
+
+ if (mark.getTimestamp() < millis) {
+ millis = mark.getTimestamp();
+ }
+
+ context.setCurrentOutputWatermark(new Instant(millis));
+
+ // Don't forget to re-emit the watermark to the downstream operators.
+ // This is critical for jobs with multiple aggregation steps.
+ // Imagine a job with a groupByKey() on key K1, followed by a map() that changes
+ // the key K1 to K2, and another groupByKey() on K2. In this case, if the watermark
+ // is not re-emitted, the second aggregation would never be triggered, and no result
+ // will be produced.
+ output.emitWatermark(new Watermark(millis));
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ }
+
+ private void registerActiveTimer(K key, TimerInternals.TimerData timer) {
+ Set<TimerInternals.TimerData> timersForKey = activeTimers.get(key);
+ if (timersForKey == null) {
+ timersForKey = new HashSet<>();
+ }
+ timersForKey.add(timer);
+ activeTimers.put(key, timersForKey);
+ }
+
+ private void unregisterActiveTimer(K key, TimerInternals.TimerData timer) {
+ Set<TimerInternals.TimerData> timersForKey = activeTimers.get(key);
+ if (timersForKey != null) {
+ timersForKey.remove(timer);
+ if (timersForKey.isEmpty()) {
+ activeTimers.remove(key);
+ } else {
+ activeTimers.put(key, timersForKey);
+ }
+ }
+ }
+
+ /**
+ * Returns the timers that are ready to fire. These are the timers
+ * that are registered to be triggered at a time before the current watermark.
+ * We keep these timers in a Set, so that they are deduplicated: the same
+ * timer can be registered multiple times.
+ */
+ private Multimap<K, TimerInternals.TimerData> getTimersReadyToProcess(long currentWatermark) {
+
+ // We collect the timers to fire in a separate collection and launch them later,
+ // because we cannot prevent a trigger from registering another timer,
+ // which would lead to a ConcurrentModificationException.
+ Multimap<K, TimerInternals.TimerData> toFire = HashMultimap.create();
+
+ Iterator<Map.Entry<K, Set<TimerInternals.TimerData>>> it = activeTimers.entrySet().iterator();
+ while (it.hasNext()) {
+ Map.Entry<K, Set<TimerInternals.TimerData>> keyWithTimers = it.next();
+
+ Iterator<TimerInternals.TimerData> timerIt = keyWithTimers.getValue().iterator();
+ while (timerIt.hasNext()) {
+ TimerInternals.TimerData timerData = timerIt.next();
+ if (timerData.getTimestamp().isBefore(currentWatermark)) {
+ toFire.put(keyWithTimers.getKey(), timerData);
+ timerIt.remove();
+ }
+ }
+
+ if (keyWithTimers.getValue().isEmpty()) {
+ it.remove();
+ }
+ }
+ return toFire;
+ }
+
+ /**
+ * Gets the state associated with the specified key.
+ *
+ * @param key the key whose state we want.
+ * @return The {@link FlinkStateInternals}
+ * associated with that key.
+ */
+ private FlinkStateInternals<K> getStateInternalsForKey(K key) {
+ FlinkStateInternals<K> stateInternals = perKeyStateInternals.get(key);
+ if (stateInternals == null) {
+ Coder<? extends BoundedWindow> windowCoder = this.windowingStrategy.getWindowFn().windowCoder();
+ OutputTimeFn<? super BoundedWindow> outputTimeFn = this.windowingStrategy.getWindowFn().getOutputTimeFn();
+ stateInternals = new FlinkStateInternals<>(key, inputKvCoder.getKeyCoder(), windowCoder, outputTimeFn);
+ perKeyStateInternals.put(key, stateInternals);
+ }
+ return stateInternals;
+ }
+
+ private class FlinkTimerInternals extends AbstractFlinkTimerInternals<K, VIN> {
+ @Override
+ public void setTimer(TimerData timerKey) {
+ registerActiveTimer(context.element().key(), timerKey);
+ }
+
+ @Override
+ public void deleteTimer(TimerData timerKey) {
+ unregisterActiveTimer(context.element().key(), timerKey);
+ }
+ }
+
+ private class ProcessContext extends GroupAlsoByWindowViaWindowSetDoFn<K, VIN, VOUT, ?, KeyedWorkItem<K, VIN>>.ProcessContext {
+
+ private final FlinkTimerInternals timerInternals;
+
+ private final TimestampedCollector<WindowedValue<KV<K, VOUT>>> collector;
+
+ private FlinkStateInternals<K> stateInternals;
+
+ private KeyedWorkItem<K, VIN> element;
+
+ public ProcessContext(DoFn<KeyedWorkItem<K, VIN>, KV<K, VOUT>> function,
+ TimestampedCollector<WindowedValue<KV<K, VOUT>>> outCollector,
+ FlinkTimerInternals timerInternals) {
+ function.super();
+ super.setupDelegateAggregators();
+
+ this.collector = Preconditions.checkNotNull(outCollector);
+ this.timerInternals = Preconditions.checkNotNull(timerInternals);
+ }
+
+ public void setElement(KeyedWorkItem<K, VIN> element,
+ FlinkStateInternals<K> stateForKey) {
+ this.element = element;
+ this.stateInternals = stateForKey;
+ }
+
+ public void setCurrentInputWatermark(Instant watermark) {
+ this.timerInternals.setCurrentInputWatermark(watermark);
+ }
+
+ public void setCurrentOutputWatermark(Instant watermark) {
+ this.timerInternals.setCurrentOutputWatermark(watermark);
+ }
+
+ @Override
+ public KeyedWorkItem<K, VIN> element() {
+ return this.element;
+ }
+
+ @Override
+ public Instant timestamp() {
+ throw new UnsupportedOperationException("timestamp() is not available when processing KeyedWorkItems.");
+ }
+
+ @Override
+ public PipelineOptions getPipelineOptions() {
+ // TODO: PipelineOptions need to be available on the workers.
+ // Ideally they are captured as part of the pipeline.
+ // For now, construct empty options so that StateContexts.createFromComponents
+ // will yield a valid StateContext, which is needed to support StateContext.window().
+ if (options == null) {
+ options = new PipelineOptions() {
+ @Override
+ public <T extends PipelineOptions> T as(Class<T> kls) {
+ return null;
+ }
+
+ @Override
+ public <T extends PipelineOptions> T cloneAs(Class<T> kls) {
+ return null;
+ }
+
+ @Override
+ public Class<? extends PipelineRunner<?>> getRunner() {
+ return null;
+ }
+
+ @Override
+ public void setRunner(Class<? extends PipelineRunner<?>> kls) {
+
+ }
+
+ @Override
+ public CheckEnabled getStableUniqueNames() {
+ return null;
+ }
+
+ @Override
+ public void setStableUniqueNames(CheckEnabled enabled) {
+ }
+ };
+ }
+ return options;
+ }
+
+ @Override
+ public void output(KV<K, VOUT> output) {
+ throw new UnsupportedOperationException(
+ "output() is not available when processing KeyedWorkItems.");
+ }
+
+ @Override
+ public void outputWithTimestamp(KV<K, VOUT> output, Instant timestamp) {
+ throw new UnsupportedOperationException(
+ "outputWithTimestamp() is not available when processing KeyedWorkItems.");
+ }
+
+ @Override
+ public PaneInfo pane() {
+ throw new UnsupportedOperationException("pane() is not available when processing KeyedWorkItems.");
+ }
+
+ @Override
+ public BoundedWindow window() {
+ throw new UnsupportedOperationException(
+ "window() is not available when processing KeyedWorkItems.");
+ }
+
+ @Override
+ public WindowingInternals<KeyedWorkItem<K, VIN>, KV<K, VOUT>> windowingInternals() {
+ return new WindowingInternals<KeyedWorkItem<K, VIN>, KV<K, VOUT>>() {
+
+ @Override
+ public com.google.cloud.dataflow.sdk.util.state.StateInternals stateInternals() {
+ return stateInternals;
+ }
+
+ @Override
+ public void outputWindowedValue(KV<K, VOUT> output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
+ // TODO: No need to represent timestamp twice.
+ collector.setAbsoluteTimestamp(timestamp.getMillis());
+ collector.collect(WindowedValue.of(output, timestamp, windows, pane));
+
+ }
+
+ @Override
+ public TimerInternals timerInternals() {
+ return timerInternals;
+ }
+
+ @Override
+ public Collection<? extends BoundedWindow> windows() {
+ throw new UnsupportedOperationException("windows() is not available in Streaming mode.");
+ }
+
+ @Override
+ public PaneInfo pane() {
+ throw new UnsupportedOperationException("pane() is not available in Streaming mode.");
+ }
+
+ @Override
+ public <T> void writePCollectionViewData(TupleTag<?> tag, Iterable<WindowedValue<T>> data, Coder<T> elemCoder) throws IOException {
+ throw new RuntimeException("writePCollectionViewData() not available in Streaming mode.");
+ }
+
+ @Override
+ public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
+ throw new RuntimeException("sideInput() is not available in Streaming mode.");
+ }
+ };
+ }
+
+ @Override
+ public <T> T sideInput(PCollectionView<T> view) {
+ throw new RuntimeException("sideInput() is not supported in Streaming mode.");
+ }
+
+ @Override
+ public <T> void sideOutput(TupleTag<T> tag, T output) {
+ // This can happen when a user does not register side outputs but then
+ // outputs using a freshly created TupleTag. Side outputs are not
+ // supported when grouping by window, so fail fast.
+ throw new RuntimeException("sideOutput() is not available when grouping by window.");
+ }
+
+ @Override
+ public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+ sideOutput(tag, output);
+ }
+
+ @Override
+ protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) {
+ Accumulator acc = getRuntimeContext().getAccumulator(name);
+ if (acc != null) {
+ AccumulatorHelper.compareAccumulatorTypes(name,
+ SerializableFnAggregatorWrapper.class, acc.getClass());
+ return (Aggregator<AggInputT, AggOutputT>) acc;
+ }
+
+ SerializableFnAggregatorWrapper<AggInputT, AggOutputT> accumulator =
+ new SerializableFnAggregatorWrapper<>(combiner);
+ getRuntimeContext().addAccumulator(name, accumulator);
+ return accumulator;
+ }
+ }
+
+ ////////////// Checkpointing implementation ////////////////
+
+ @Override
+ public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
+ StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);
+ AbstractStateBackend.CheckpointStateOutputView out = getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp);
+ StateCheckpointWriter writer = StateCheckpointWriter.create(out);
+ Coder<K> keyCoder = inputKvCoder.getKeyCoder();
+
+ // checkpoint the timers
+ StateCheckpointUtils.encodeTimers(activeTimers, writer, keyCoder);
+
+ // checkpoint the state
+ StateCheckpointUtils.encodeState(perKeyStateInternals, writer, keyCoder);
+
+ // checkpoint the timerInternals
+ context.timerInternals.encodeTimerInternals(context, writer,
+ inputKvCoder, windowingStrategy.getWindowFn().windowCoder());
+
+ taskState.setOperatorState(out.closeAndGetHandle());
+ return taskState;
+ }
+
+ @Override
+ public void restoreState(StreamTaskState taskState, long recoveryTimestamp) throws Exception {
+ super.restoreState(taskState, recoveryTimestamp);
+
+ final ClassLoader userClassloader = getUserCodeClassloader();
+
+ Coder<? extends BoundedWindow> windowCoder = this.windowingStrategy.getWindowFn().windowCoder();
+ Coder<K> keyCoder = inputKvCoder.getKeyCoder();
+
+ @SuppressWarnings("unchecked")
+ StateHandle<DataInputView> inputState = (StateHandle<DataInputView>) taskState.getOperatorState();
+ DataInputView in = inputState.getState(userClassloader);
+ StateCheckpointReader reader = new StateCheckpointReader(in);
+
+ // restore the timers
+ this.activeTimers = StateCheckpointUtils.decodeTimers(reader, windowCoder, keyCoder);
+
+ // restore the state
+ this.perKeyStateInternals = StateCheckpointUtils.decodeState(
+ reader, windowingStrategy.getOutputTimeFn(), keyCoder, windowCoder, userClassloader);
+
+ // restore the timerInternals.
+ this.timerInternals.restoreTimerInternals(reader, inputKvCoder, windowCoder);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java
new file mode 100644
index 0000000..1a6a665
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming;
+
+import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
+import org.apache.beam.runners.flink.translation.types.VoidCoderTypeSerializer;
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.coders.KvCoder;
+import com.google.cloud.dataflow.sdk.coders.VoidCoder;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.values.KV;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+
+/**
+ * This class groups the elements by key. It assumes that the incoming stream
+ * is already composed of <code>[Key,Value]</code> pairs.
+ */
+public class FlinkGroupByKeyWrapper {
+
+ /**
+ * An auxiliary interface to work around the fact that Java anonymous classes cannot
+ * implement multiple interfaces.
+ */
+ private interface KeySelectorWithQueryableResultType<K, V> extends KeySelector<WindowedValue<KV<K, V>>, K>, ResultTypeQueryable<K> {
+ }
+
+ public static <K, V> KeyedStream<WindowedValue<KV<K, V>>, K> groupStreamByKey(DataStream<WindowedValue<KV<K, V>>> inputDataStream, KvCoder<K, V> inputKvCoder) {
+ final Coder<K> keyCoder = inputKvCoder.getKeyCoder();
+ final TypeInformation<K> keyTypeInfo = new CoderTypeInformation<>(keyCoder);
+ final boolean isKeyVoid = keyCoder instanceof VoidCoder;
+
+ return inputDataStream.keyBy(
+ new KeySelectorWithQueryableResultType<K, V>() {
+
+ @Override
+ public K getKey(WindowedValue<KV<K, V>> value) throws Exception {
+ return isKeyVoid ? (K) VoidCoderTypeSerializer.VoidValue.INSTANCE :
+ value.getValue().getKey();
+ }
+
+ @Override
+ public TypeInformation<K> getProducedType() {
+ return keyTypeInfo;
+ }
+ });
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java
new file mode 100644
index 0000000..df7f953
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming;
+
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.join.RawUnionValue;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.util.WindowingInternals;
+import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
+import com.google.cloud.dataflow.sdk.values.TupleTag;
+import com.google.common.base.Preconditions;
+import org.apache.flink.util.Collector;
+import org.joda.time.Instant;
+
+import java.util.Map;
+
+/**
+ * A wrapper for the {@link com.google.cloud.dataflow.sdk.transforms.ParDo.BoundMulti} Beam transformation.
+ */
+public class FlinkParDoBoundMultiWrapper<IN, OUT> extends FlinkAbstractParDoWrapper<IN, OUT, RawUnionValue> {
+
+ private final TupleTag<?> mainTag;
+ private final Map<TupleTag<?>, Integer> outputLabels;
+
+ public FlinkParDoBoundMultiWrapper(PipelineOptions options, WindowingStrategy<?, ?> windowingStrategy, DoFn<IN, OUT> doFn, TupleTag<?> mainTag, Map<TupleTag<?>, Integer> tagsToLabels) {
+ super(options, windowingStrategy, doFn);
+ this.mainTag = Preconditions.checkNotNull(mainTag);
+ this.outputLabels = Preconditions.checkNotNull(tagsToLabels);
+ }
+
+ @Override
+ public void outputWithTimestampHelper(WindowedValue<IN> inElement, OUT output, Instant timestamp, Collector<WindowedValue<RawUnionValue>> collector) {
+ checkTimestamp(inElement, timestamp);
+ Integer index = outputLabels.get(mainTag);
+ collector.collect(makeWindowedValue(
+ new RawUnionValue(index, output),
+ timestamp,
+ inElement.getWindows(),
+ inElement.getPane()));
+ }
+
+ @Override
+ public <T> void sideOutputWithTimestampHelper(WindowedValue<IN> inElement, T output, Instant timestamp, Collector<WindowedValue<RawUnionValue>> collector, TupleTag<T> tag) {
+ checkTimestamp(inElement, timestamp);
+ Integer index = outputLabels.get(tag);
+ if (index != null) {
+ collector.collect(makeWindowedValue(
+ new RawUnionValue(index, output),
+ timestamp,
+ inElement.getWindows(),
+ inElement.getPane()));
+ }
+ }
+
+ @Override
+ public WindowingInternals<IN, OUT> windowingInternalsHelper(WindowedValue<IN> inElement, Collector<WindowedValue<RawUnionValue>> outCollector) {
+ throw new RuntimeException("FlinkParDoBoundMultiWrapper is just an internal operator serving as " +
+ "an intermediate transformation for the ParDo.BoundMulti translation. windowingInternals() " +
+ "is not available in this class.");
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundWrapper.java
new file mode 100644
index 0000000..2ed5620
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundWrapper.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming;
+
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
+import com.google.cloud.dataflow.sdk.util.*;
+import com.google.cloud.dataflow.sdk.util.state.StateInternals;
+import com.google.cloud.dataflow.sdk.values.PCollectionView;
+import com.google.cloud.dataflow.sdk.values.TupleTag;
+import org.apache.flink.util.Collector;
+import org.joda.time.Instant;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * A wrapper for the {@link com.google.cloud.dataflow.sdk.transforms.ParDo.Bound} Beam transformation.
+ */
+public class FlinkParDoBoundWrapper<IN, OUT> extends FlinkAbstractParDoWrapper<IN, OUT, OUT> {
+
+ public FlinkParDoBoundWrapper(PipelineOptions options, WindowingStrategy<?, ?> windowingStrategy, DoFn<IN, OUT> doFn) {
+ super(options, windowingStrategy, doFn);
+ }
+
+ @Override
+ public void outputWithTimestampHelper(WindowedValue<IN> inElement, OUT output, Instant timestamp, Collector<WindowedValue<OUT>> collector) {
+ checkTimestamp(inElement, timestamp);
+ collector.collect(makeWindowedValue(
+ output,
+ timestamp,
+ inElement.getWindows(),
+ inElement.getPane()));
+ }
+
+ @Override
+ public <T> void sideOutputWithTimestampHelper(WindowedValue<IN> inElement, T output, Instant timestamp, Collector<WindowedValue<OUT>> outCollector, TupleTag<T> tag) {
+ // This can happen when a user does not register side outputs but then
+ // outputs using a freshly created TupleTag. Side outputs are not
+ // supported in ParDo.Bound(), so fail fast.
+ throw new RuntimeException("sideOutput() is not available in ParDo.Bound().");
+ }
+
+ @Override
+ public WindowingInternals<IN, OUT> windowingInternalsHelper(final WindowedValue<IN> inElement, final Collector<WindowedValue<OUT>> collector) {
+ return new WindowingInternals<IN, OUT>() {
+ @Override
+ public StateInternals stateInternals() {
+ throw new NullPointerException("StateInternals are not available for ParDo.Bound().");
+ }
+
+ @Override
+ public void outputWindowedValue(OUT output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
+ collector.collect(makeWindowedValue(output, timestamp, windows, pane));
+ }
+
+ @Override
+ public TimerInternals timerInternals() {
+ throw new NullPointerException("TimeInternals are not available for ParDo.Bound().");
+ }
+
+ @Override
+ public Collection<? extends BoundedWindow> windows() {
+ return inElement.getWindows();
+ }
+
+ @Override
+ public PaneInfo pane() {
+ return inElement.getPane();
+ }
+
+ @Override
+ public <T> void writePCollectionViewData(TupleTag<?> tag, Iterable<WindowedValue<T>> data, Coder<T> elemCoder) throws IOException {
+ throw new RuntimeException("writePCollectionViewData() not supported in Streaming mode.");
+ }
+
+ @Override
+ public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
+ throw new RuntimeException("sideInput() not implemented.");
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java
new file mode 100644
index 0000000..f6c243f
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.io;
+
+import org.apache.beam.runners.flink.translation.types.VoidCoderTypeSerializer;
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.util.Collector;
+import org.joda.time.Instant;
+
+import java.io.ByteArrayInputStream;
+import java.util.List;
+
+/**
+ * This flat map function decodes the given collection elements and turns them into
+ * WindowedValues, as required by the Flink runner.
+ */
+public class FlinkStreamingCreateFunction<IN, OUT> implements FlatMapFunction<IN, WindowedValue<OUT>> {
+
+ private final List<byte[]> elements;
+ private final Coder<OUT> coder;
+
+ public FlinkStreamingCreateFunction(List<byte[]> elements, Coder<OUT> coder) {
+ this.elements = elements;
+ this.coder = coder;
+ }
+
+ @Override
+ public void flatMap(IN value, Collector<WindowedValue<OUT>> out) throws Exception {
+
+ @SuppressWarnings("unchecked")
+ OUT voidValue = (OUT) VoidCoderTypeSerializer.VoidValue.INSTANCE;
+ for (byte[] element : elements) {
+ ByteArrayInputStream bai = new ByteArrayInputStream(element);
+ OUT outValue = coder.decode(bai, Coder.Context.OUTER);
+
+ if (outValue == null) {
+ out.collect(WindowedValue.of(voidValue, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
+ } else {
+ out.collect(WindowedValue.of(outValue, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
+ }
+ }
+
+ out.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java
new file mode 100644
index 0000000..2857efd
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.io;
+
+import org.apache.beam.runners.flink.FlinkPipelineRunner;
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.io.UnboundedSource;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.common.base.Preconditions;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+
+import javax.annotation.Nullable;
+import java.util.List;
+
+/**
+ * A wrapper that translates Flink sources implementing the {@link RichParallelSourceFunction}
+ * interface into unbounded Beam sources (see {@link UnboundedSource}).
+ */
+public class UnboundedFlinkSource<T, C extends UnboundedSource.CheckpointMark> extends UnboundedSource<T, C> {
+
+ private final PipelineOptions options;
+ private final RichParallelSourceFunction<T> flinkSource;
+
+ public UnboundedFlinkSource(PipelineOptions pipelineOptions, RichParallelSourceFunction<T> source) {
+ if (!pipelineOptions.getRunner().equals(FlinkPipelineRunner.class)) {
+ throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner.");
+ }
+ options = Preconditions.checkNotNull(pipelineOptions);
+ flinkSource = Preconditions.checkNotNull(source);
+ validate();
+ }
+
+ public RichParallelSourceFunction<T> getFlinkSource() {
+ return this.flinkSource;
+ }
+
+ @Override
+ public List<? extends UnboundedSource<T, C>> generateInitialSplits(int desiredNumSplits, PipelineOptions options) throws Exception {
+ throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner.");
+ }
+
+ @Override
+ public UnboundedReader<T> createReader(PipelineOptions options, @Nullable C checkpointMark) {
+ throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner.");
+ }
+
+ @Nullable
+ @Override
+ public Coder<C> getCheckpointMarkCoder() {
+ throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner.");
+ }
+
+
+ @Override
+ public void validate() {
+ Preconditions.checkNotNull(options);
+ Preconditions.checkNotNull(flinkSource);
+ if (!options.getRunner().equals(FlinkPipelineRunner.class)) {
+ throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner.");
+ }
+ }
+
+ @Override
+ public Coder<T> getDefaultOutputCoder() {
+ throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner.");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java
new file mode 100644
index 0000000..1389e9d
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.io;
+
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
+import com.google.cloud.dataflow.sdk.io.UnboundedSource;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Serializable;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.Collections;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * An example unbounded Beam source that reads input from a socket. This is used mainly for testing and debugging.
+ * */
+public class UnboundedSocketSource<C extends UnboundedSource.CheckpointMark> extends UnboundedSource<String, C> {
+
+ private static final Coder<String> DEFAULT_SOCKET_CODER = StringUtf8Coder.of();
+
+ private static final long serialVersionUID = 1L;
+
+ private static final int DEFAULT_CONNECTION_RETRY_SLEEP = 500;
+
+ private static final int CONNECTION_TIMEOUT_TIME = 0;
+
+ private final String hostname;
+ private final int port;
+ private final char delimiter;
+ private final long maxNumRetries;
+ private final long delayBetweenRetries;
+
+ public UnboundedSocketSource(String hostname, int port, char delimiter, long maxNumRetries) {
+ this(hostname, port, delimiter, maxNumRetries, DEFAULT_CONNECTION_RETRY_SLEEP);
+ }
+
+ public UnboundedSocketSource(String hostname, int port, char delimiter, long maxNumRetries, long delayBetweenRetries) {
+ this.hostname = hostname;
+ this.port = port;
+ this.delimiter = delimiter;
+ this.maxNumRetries = maxNumRetries;
+ this.delayBetweenRetries = delayBetweenRetries;
+ }
+
+ public String getHostname() {
+ return this.hostname;
+ }
+
+ public int getPort() {
+ return this.port;
+ }
+
+ public char getDelimiter() {
+ return this.delimiter;
+ }
+
+ public long getMaxNumRetries() {
+ return this.maxNumRetries;
+ }
+
+ public long getDelayBetweenRetries() {
+ return this.delayBetweenRetries;
+ }
+
+ @Override
+ public List<? extends UnboundedSource<String, C>> generateInitialSplits(int desiredNumSplits, PipelineOptions options) throws Exception {
+ return Collections.<UnboundedSource<String, C>>singletonList(this);
+ }
+
+ @Override
+ public UnboundedReader<String> createReader(PipelineOptions options, @Nullable C checkpointMark) {
+ return new UnboundedSocketReader(this);
+ }
+
+ @Nullable
+ @Override
+ public Coder<C> getCheckpointMarkCoder() {
+ // Flink and Dataflow have different checkpointing mechanisms.
+ // In our case we do not need a coder.
+ return null;
+ }
+
+ @Override
+ public void validate() {
+ checkArgument(port > 0 && port < 65536, "port is out of range");
+ checkArgument(maxNumRetries >= -1, "maxNumRetries must be zero or larger (num retries), or -1 (infinite retries)");
+ checkArgument(delayBetweenRetries >= 0, "delayBetweenRetries must be zero or positive");
+ }
+
+ @Override
+ public Coder<String> getDefaultOutputCoder() {
+ return DEFAULT_SOCKET_CODER;
+ }
+
+ public static class UnboundedSocketReader extends UnboundedSource.UnboundedReader<String> implements Serializable {
+
+ private static final long serialVersionUID = 7526472295622776147L;
+ private static final Logger LOG = LoggerFactory.getLogger(UnboundedSocketReader.class);
+
+ private final UnboundedSocketSource<?> source;
+
+ private Socket socket;
+ private BufferedReader reader;
+
+ private boolean isRunning;
+
+ private String currentRecord;
+
+ public UnboundedSocketReader(UnboundedSocketSource<?> source) {
+ this.source = source;
+ }
+
+ private void openConnection() throws IOException {
+ this.socket = new Socket();
+ this.socket.connect(new InetSocketAddress(this.source.getHostname(), this.source.getPort()), CONNECTION_TIMEOUT_TIME);
+ this.reader = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
+ this.isRunning = true;
+ }
+
+ @Override
+ public boolean start() throws IOException {
+ int attempt = 0;
+ while (!isRunning) {
+ try {
+ openConnection();
+ LOG.info("Connected to server socket " + this.source.getHostname() + ':' + this.source.getPort());
+
+ return advance();
+ } catch (IOException e) {
+ LOG.info("Could not connect to server socket {}:{}. Retrying in {} msecs...",
+ this.source.getHostname(), this.source.getPort(), this.source.getDelayBetweenRetries());
+
+ if (this.source.getMaxNumRetries() == -1 || attempt++ < this.source.getMaxNumRetries()) {
+ try {
+ Thread.sleep(this.source.getDelayBetweenRetries());
+ } catch (InterruptedException e1) {
+ // restore the interrupt status and give up instead of swallowing the interrupt
+ Thread.currentThread().interrupt();
+ break;
+ }
+ } else {
+ this.isRunning = false;
+ break;
+ }
+ }
+ }
+ LOG.error("Unable to connect to host " + this.source.getHostname() + " : " + this.source.getPort());
+ return false;
+ }
+
+ @Override
+ public boolean advance() throws IOException {
+ final StringBuilder buffer = new StringBuilder();
+ int data;
+ while (isRunning && (data = reader.read()) != -1) {
+ // check if the string is complete
+ if (data != this.source.getDelimiter()) {
+ buffer.append((char) data);
+ } else {
+ if (buffer.length() > 0 && buffer.charAt(buffer.length() - 1) == '\r') {
+ buffer.setLength(buffer.length() - 1);
+ }
+ this.currentRecord = buffer.toString();
+ buffer.setLength(0);
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public byte[] getCurrentRecordId() throws NoSuchElementException {
+ return new byte[0];
+ }
+
+ @Override
+ public String getCurrent() throws NoSuchElementException {
+ return this.currentRecord;
+ }
+
+ @Override
+ public Instant getCurrentTimestamp() throws NoSuchElementException {
+ return Instant.now();
+ }
+
+ @Override
+ public void close() throws IOException {
+ this.reader.close();
+ this.socket.close();
+ this.isRunning = false;
+ LOG.info("Closed connection to server socket at " + this.source.getHostname() + ":" + this.source.getPort() + ".");
+ }
+
+ @Override
+ public Instant getWatermark() {
+ return Instant.now();
+ }
+
+ @Override
+ public CheckpointMark getCheckpointMark() {
+ return null;
+ }
+
+ @Override
+ public UnboundedSource<String, ?> getCurrentSource() {
+ return this.source;
+ }
+ }
+}
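
Assuming the Pipeline p from the sketch above, the socket source can be consumed as
follows; the host, port and retry count are illustrative (for a quick test, open the
socket first with "nc -lk 9999"):

    // Newline-delimited text from localhost:9999, retrying the connection up to
    // three times with the default 500 ms delay between attempts.
    p.apply(Read.from(
        new UnboundedSocketSource<UnboundedSource.CheckpointMark>("localhost", 9999, '\n', 3)));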
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
new file mode 100644
index 0000000..97084cf
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.io;
+
+import com.google.cloud.dataflow.sdk.io.Read;
+import com.google.cloud.dataflow.sdk.io.UnboundedSource;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.operators.Triggerable;
+import org.joda.time.Instant;
+
+/**
+ * A wrapper for Beam's unbounded sources. This class wraps the source of a
+ * {@link com.google.cloud.dataflow.sdk.io.Read.Unbounded} transform so that it can be
+ * executed as a Flink source function.
+ *
+ * <p>
+ * For now we support only non-parallel, non-checkpointed sources.
+ * */
+public class UnboundedSourceWrapper<T> extends RichSourceFunction<WindowedValue<T>> implements Triggerable {
+
+ private final String name;
+ private final UnboundedSource.UnboundedReader<T> reader;
+
+ private StreamingRuntimeContext runtime = null;
+ private StreamSource.ManualWatermarkContext<WindowedValue<T>> context = null;
+
+ private volatile boolean isRunning = false;
+
+ public UnboundedSourceWrapper(PipelineOptions options, Read.Unbounded<T> transform) {
+ this.name = transform.getName();
+ this.reader = transform.getSource().createReader(options, null);
+ }
+
+ public String getName() {
+ return this.name;
+ }
+
+ WindowedValue<T> makeWindowedValue(T output, Instant timestamp) {
+ if (timestamp == null) {
+ timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
+ }
+ return WindowedValue.of(output, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
+ }
+
+ @Override
+ public void run(SourceContext<WindowedValue<T>> ctx) throws Exception {
+ if (!(ctx instanceof StreamSource.ManualWatermarkContext)) {
+ throw new RuntimeException("An UnboundedSourceWrapper requires a source context that " +
+ "supports manual watermark emission (StreamSource.ManualWatermarkContext), but " + this.name +
+ " was given a " + ctx.getClass().getName() + ". Consider writing a custom wrapper for this source.");
+ }
+
+ context = (StreamSource.ManualWatermarkContext<WindowedValue<T>>) ctx;
+ runtime = (StreamingRuntimeContext) getRuntimeContext();
+
+ this.isRunning = true;
+ boolean inputAvailable = reader.start();
+
+ setNextWatermarkTimer(this.runtime);
+
+ while (isRunning) {
+
+ while (!inputAvailable && isRunning) {
+ // wait a bit until we retry to pull more records
+ Thread.sleep(50);
+ inputAvailable = reader.advance();
+ }
+
+ if (inputAvailable) {
+
+ // get it and its timestamp from the source
+ T item = reader.getCurrent();
+ Instant timestamp = reader.getCurrentTimestamp();
+
+ // write it to the output collector
+ synchronized (ctx.getCheckpointLock()) {
+ context.collectWithTimestamp(makeWindowedValue(item, timestamp), timestamp.getMillis());
+ }
+
+ inputAvailable = reader.advance();
+ }
+
+ }
+ }
+
+ @Override
+ public void cancel() {
+ isRunning = false;
+ }
+
+ @Override
+ public void trigger(long timestamp) throws Exception {
+ if (this.isRunning) {
+ synchronized (context.getCheckpointLock()) {
+ long watermarkMillis = this.reader.getWatermark().getMillis();
+ context.emitWatermark(new Watermark(watermarkMillis));
+ }
+ setNextWatermarkTimer(this.runtime);
+ }
+ }
+
+ private void setNextWatermarkTimer(StreamingRuntimeContext runtime) {
+ if (this.isRunning) {
+ long watermarkInterval = runtime.getExecutionConfig().getAutoWatermarkInterval();
+ long timeToNextWatermark = getTimeToNextWatermark(watermarkInterval);
+ runtime.registerTimer(timeToNextWatermark, this);
+ }
+ }
+
+ private long getTimeToNextWatermark(long watermarkInterval) {
+ return System.currentTimeMillis() + watermarkInterval;
+ }
+}
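
To make the watermark mechanics concrete, here is a hypothetical sketch of how a
translator might wire the wrapper into a Flink job; the options and transform
variables are assumed to come from the translation context:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // The auto-watermark interval drives setNextWatermarkTimer(): every interval
    // milliseconds, trigger() emits reader.getWatermark() and re-registers itself.
    env.getConfig().setAutoWatermarkInterval(200);

    UnboundedSourceWrapper<String> wrapper = new UnboundedSourceWrapper<>(options, transform);
    env.addSource(wrapper, wrapper.getName());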
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/AbstractFlinkTimerInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/AbstractFlinkTimerInternals.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/AbstractFlinkTimerInternals.java
new file mode 100644
index 0000000..fc75948
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/AbstractFlinkTimerInternals.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.state;
+
+import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer;
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.coders.KvCoder;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
+import com.google.cloud.dataflow.sdk.util.TimerInternals;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.values.KV;
+import org.joda.time.Instant;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * An implementation of Beam's {@link TimerInternals} that also provides serialization
+ * functionality. The latter is used when snapshots of the current state are taken for
+ * fault tolerance.
+ * */
+public abstract class AbstractFlinkTimerInternals<K, VIN> implements TimerInternals, Serializable {
+ private Instant currentInputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
+ private Instant currentOutputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
+
+ public void setCurrentInputWatermark(Instant watermark) {
+ checkIfValidInputWatermark(watermark);
+ this.currentInputWatermark = watermark;
+ }
+
+ public void setCurrentOutputWatermark(Instant watermark) {
+ checkIfValidOutputWatermark(watermark);
+ this.currentOutputWatermark = watermark;
+ }
+
+ private void setCurrentInputWatermarkAfterRecovery(Instant watermark) {
+ if (!currentInputWatermark.isEqual(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
+ throw new RuntimeException("Explicitly setting the input watermark is only allowed on " +
+ "initialization after recovery from a node failure. Apparently this is not " +
+ "the case here as the watermark is already set.");
+ }
+ this.currentInputWatermark = watermark;
+ }
+
+ private void setCurrentOutputWatermarkAfterRecovery(Instant watermark) {
+ if (!currentOutputWatermark.isEqual(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
+ throw new RuntimeException("Explicitly setting the output watermark is only allowed on " +
+ "initialization after recovery from a node failure. Apparently this is not " +
+ "the case here as the watermark is already set.");
+ }
+ this.currentOutputWatermark = watermark;
+ }
+
+ @Override
+ public Instant currentProcessingTime() {
+ return Instant.now();
+ }
+
+ @Override
+ public Instant currentInputWatermarkTime() {
+ return currentInputWatermark;
+ }
+
+ @Nullable
+ @Override
+ public Instant currentSynchronizedProcessingTime() {
+ // TODO
+ return null;
+ }
+
+ @Override
+ public Instant currentOutputWatermarkTime() {
+ return currentOutputWatermark;
+ }
+
+ private void checkIfValidInputWatermark(Instant newWatermark) {
+ if (currentInputWatermark.isAfter(newWatermark)) {
+ throw new IllegalArgumentException(String.format(
+ "Cannot set current input watermark to %s. Newer watermarks " +
+ "must be no earlier than the current one (%s).",
+ newWatermark, currentInputWatermark));
+ }
+ }
+
+ private void checkIfValidOutputWatermark(Instant newWatermark) {
+ if (currentOutputWatermark.isAfter(newWatermark)) {
+ throw new IllegalArgumentException(String.format(
+ "Cannot set current output watermark to %s. Newer watermarks " +
+ "must be no earlier than the current one (%s).",
+ newWatermark, currentOutputWatermark));
+ }
+ }
+
+ public void encodeTimerInternals(DoFn.ProcessContext context,
+ StateCheckpointWriter writer,
+ KvCoder<K, VIN> kvCoder,
+ Coder<? extends BoundedWindow> windowCoder) throws IOException {
+ if (context == null) {
+ throw new RuntimeException("The Context has not been initialized.");
+ }
+
+ writer.setTimestamp(currentInputWatermark);
+ writer.setTimestamp(currentOutputWatermark);
+ }
+
+ public void restoreTimerInternals(StateCheckpointReader reader,
+ KvCoder<K, VIN> kvCoder,
+ Coder<? extends BoundedWindow> windowCoder) throws IOException {
+ setCurrentInputWatermarkAfterRecovery(reader.getTimestamp());
+ setCurrentOutputWatermarkAfterRecovery(reader.getTimestamp());
+ }
+}
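
For clarity, a minimal sketch of the snapshot round trip that the last two methods
implement, assuming the surrounding checkpointing code provides the process context,
writer, reader and coders:

    // On checkpoint: both watermarks are written, input first, then output.
    timerInternals.encodeTimerInternals(processContext, writer, kvCoder, windowCoder);

    // On recovery: read them back in the same order. This is only legal while both
    // watermarks still hold their initial BoundedWindow.TIMESTAMP_MIN_VALUE.
    timerInternals.restoreTimerInternals(reader, kvCoder, windowCoder);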