You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/08/24 19:52:48 UTC
[10/17] incubator-beam git commit: [BEAM-102] Add Side Inputs in
Flink Streaming Runner
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dfbdc6c2/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index e273132..092a226 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -17,9 +17,13 @@
*/
package org.apache.beam.runners.flink.translation.wrappers.streaming;
+import avro.shaded.com.google.common.base.Preconditions;
+
+import org.apache.beam.runners.core.SideInputHandler;
import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine;
@@ -27,9 +31,13 @@ import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.DoFnRunner;
import org.apache.beam.sdk.util.DoFnRunners;
import org.apache.beam.sdk.util.ExecutionContext;
+import org.apache.beam.sdk.util.NullSideInputReader;
+import org.apache.beam.sdk.util.PushbackSideInputDoFnRunner;
+import org.apache.beam.sdk.util.SideInputReader;
import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
@@ -37,15 +45,36 @@ import org.apache.beam.sdk.util.state.StateInternals;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
+import com.google.common.collect.Iterables;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.VoidSerializer;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.KvStateSnapshot;
+import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
import java.io.IOException;
import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -58,7 +87,8 @@ import java.util.Map;
*/
public class DoFnOperator<InputT, FnOutputT, OutputT>
extends AbstractStreamOperator<OutputT>
- implements OneInputStreamOperator<WindowedValue<InputT>, OutputT> {
+ implements OneInputStreamOperator<WindowedValue<InputT>, OutputT>,
+ TwoInputStreamOperator<WindowedValue<InputT>, RawUnionValue, OutputT> {
protected OldDoFn<InputT, FnOutputT> doFn;
protected final SerializedPipelineOptions serializedOptions;
@@ -66,7 +96,8 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
protected final TupleTag<FnOutputT> mainOutputTag;
protected final List<TupleTag<?>> sideOutputTags;
- protected final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;
+ protected final Collection<PCollectionView<?>> sideInputs;
+ protected final Map<Integer, PCollectionView<?>> sideInputTagMapping;
protected final boolean hasSideInputs;
@@ -74,25 +105,36 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
protected final OutputManagerFactory<OutputT> outputManagerFactory;
- protected transient DoFnRunner<InputT, FnOutputT> doFnRunner;
+ protected transient PushbackSideInputDoFnRunner<InputT, FnOutputT> pushbackDoFnRunner;
- /**
- * To keep track of the current watermark so that we can immediately fire if a trigger
- * registers an event time callback for a timestamp that lies in the past.
- */
- protected transient long currentWatermark = Long.MIN_VALUE;
+ protected transient SideInputHandler sideInputHandler;
+
+ protected transient long currentInputWatermark;
+
+ protected transient long currentOutputWatermark;
+
+ private transient AbstractStateBackend sideInputStateBackend;
+
+ private final ReducingStateDescriptor<Long> pushedBackWatermarkDescriptor;
+
+ private final ListStateDescriptor<WindowedValue<InputT>> pushedBackDescriptor;
+
+ private transient Map<String, KvStateSnapshot<?, ?, ?, ?, ?>> restoredSideInputState;
public DoFnOperator(
OldDoFn<InputT, FnOutputT> doFn,
+ TypeInformation<WindowedValue<InputT>> inputType,
TupleTag<FnOutputT> mainOutputTag,
List<TupleTag<?>> sideOutputTags,
OutputManagerFactory<OutputT> outputManagerFactory,
WindowingStrategy<?, ?> windowingStrategy,
- Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
+ Map<Integer, PCollectionView<?>> sideInputTagMapping,
+ Collection<PCollectionView<?>> sideInputs,
PipelineOptions options) {
this.doFn = doFn;
this.mainOutputTag = mainOutputTag;
this.sideOutputTags = sideOutputTags;
+ this.sideInputTagMapping = sideInputTagMapping;
this.sideInputs = sideInputs;
this.serializedOptions = new SerializedPipelineOptions(options);
this.windowingStrategy = windowingStrategy;
@@ -100,6 +142,15 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
this.hasSideInputs = !sideInputs.isEmpty();
+ this.pushedBackWatermarkDescriptor =
+ new ReducingStateDescriptor<>(
+ "pushed-back-elements-watermark-hold",
+ new LongMinReducer(),
+ LongSerializer.INSTANCE);
+
+ this.pushedBackDescriptor =
+ new ListStateDescriptor<>("pushed-back-values", inputType);
+
setChainingStrategy(ChainingStrategy.ALWAYS);
}
@@ -119,7 +170,10 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
this.doFn = getDoFn();
- Aggregator.AggregatorFactory aggregatorFactory = new Aggregator.AggregatorFactory() {
+ currentInputWatermark = Long.MIN_VALUE;
+ currentOutputWatermark = currentInputWatermark;
+
+ Aggregator.AggregatorFactory aggregatorFactory = new Aggregator.AggregatorFactory() {
@Override
public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn(
Class<?> fnClass,
@@ -134,10 +188,42 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
}
};
- doFnRunner = DoFnRunners.createDefault(
+ SideInputReader sideInputReader = NullSideInputReader.of(sideInputs);
+ if (!sideInputs.isEmpty()) {
+ String operatorIdentifier =
+ this.getClass().getSimpleName() + "_"
+ + getRuntimeContext().getIndexOfThisSubtask() + "_sideInput";
+
+ sideInputStateBackend = this
+ .getContainingTask()
+ .createStateBackend(operatorIdentifier,
+ new GenericTypeInfo<>(ByteBuffer.class).createSerializer(new ExecutionConfig()));
+
+ Preconditions.checkState(
+ sideInputStateBackend != null,
+        "Side input state backend cannot be null");
+
+ if (restoredSideInputState != null) {
+ @SuppressWarnings("unchecked,rawtypes")
+ HashMap<String, KvStateSnapshot> castRestored = (HashMap) restoredSideInputState;
+ sideInputStateBackend.injectKeyValueStateSnapshots(castRestored, 0L);
+ restoredSideInputState = null;
+ }
+
+ sideInputStateBackend.setCurrentKey(
+ ByteBuffer.wrap(CoderUtils.encodeToByteArray(VoidCoder.of(), null)));
+
+ StateInternals<Void> sideInputStateInternals =
+ new FlinkStateInternals<>(sideInputStateBackend, VoidCoder.of());
+
+ sideInputHandler = new SideInputHandler(sideInputs, sideInputStateInternals);
+ sideInputReader = sideInputHandler;
+ }
+
+ DoFnRunner<InputT, FnOutputT> doFnRunner = DoFnRunners.createDefault(
serializedOptions.getPipelineOptions(),
doFn,
- null,
+ sideInputReader,
outputManagerFactory.create(output),
mainOutputTag,
sideOutputTags,
@@ -145,25 +231,177 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
aggregatorFactory,
windowingStrategy);
- doFnRunner.startBundle();
+ pushbackDoFnRunner =
+ PushbackSideInputDoFnRunner.create(doFnRunner, sideInputs, sideInputHandler);
+
doFn.setup();
}
@Override
public void close() throws Exception {
super.close();
- doFnRunner.finishBundle();
doFn.teardown();
}
+ protected final long getPushbackWatermarkHold() {
+ // if we don't have side inputs we never hold the watermark
+ if (sideInputs.isEmpty()) {
+ return Long.MAX_VALUE;
+ }
+
+ try {
+ Long result = sideInputStateBackend.getPartitionedState(
+ null,
+ VoidSerializer.INSTANCE,
+ pushedBackWatermarkDescriptor).get();
+ return result != null ? result : Long.MAX_VALUE;
+ } catch (Exception e) {
+ throw new RuntimeException("Error retrieving pushed back watermark state.", e);
+ }
+ }
+
@Override
- public final void processElement(StreamRecord<WindowedValue<InputT>> streamRecord) throws Exception {
- doFnRunner.processElement(streamRecord.getValue());
+ public final void processElement(
+ StreamRecord<WindowedValue<InputT>> streamRecord) throws Exception {
+ pushbackDoFnRunner.startBundle();
+ pushbackDoFnRunner.processElement(streamRecord.getValue());
+ pushbackDoFnRunner.finishBundle();
+ }
+
+ @Override
+ public final void processElement1(
+ StreamRecord<WindowedValue<InputT>> streamRecord) throws Exception {
+ pushbackDoFnRunner.startBundle();
+ Iterable<WindowedValue<InputT>> justPushedBack =
+ pushbackDoFnRunner.processElementInReadyWindows(streamRecord.getValue());
+
+ ListState<WindowedValue<InputT>> pushedBack =
+ sideInputStateBackend.getPartitionedState(
+ null,
+ VoidSerializer.INSTANCE,
+ pushedBackDescriptor);
+
+ ReducingState<Long> pushedBackWatermark =
+ sideInputStateBackend.getPartitionedState(
+ null,
+ VoidSerializer.INSTANCE,
+ pushedBackWatermarkDescriptor);
+
+ for (WindowedValue<InputT> pushedBackValue : justPushedBack) {
+ pushedBackWatermark.add(pushedBackValue.getTimestamp().getMillis());
+ pushedBack.add(pushedBackValue);
+ }
+ pushbackDoFnRunner.finishBundle();
+ }
+
+ @Override
+ public final void processElement2(
+ StreamRecord<RawUnionValue> streamRecord) throws Exception {
+ pushbackDoFnRunner.startBundle();
+
+ @SuppressWarnings("unchecked")
+ WindowedValue<Iterable<?>> value =
+ (WindowedValue<Iterable<?>>) streamRecord.getValue().getValue();
+
+ PCollectionView<?> sideInput = sideInputTagMapping.get(streamRecord.getValue().getUnionTag());
+ sideInputHandler.addSideInputValue(sideInput, value);
+
+ ListState<WindowedValue<InputT>> pushedBack =
+ sideInputStateBackend.getPartitionedState(
+ null,
+ VoidSerializer.INSTANCE,
+ pushedBackDescriptor);
+
+ List<WindowedValue<InputT>> newPushedBack = new ArrayList<>();
+ for (WindowedValue<InputT> elem: pushedBack.get()) {
+
+ // we need to set the correct key in case the operator is
+ // a (keyed) window operator
+ setKeyContextElement1(new StreamRecord<>(elem));
+
+ Iterable<WindowedValue<InputT>> justPushedBack =
+ pushbackDoFnRunner.processElementInReadyWindows(elem);
+ Iterables.addAll(newPushedBack, justPushedBack);
+ }
+
+
+ ReducingState<Long> pushedBackWatermark =
+ sideInputStateBackend.getPartitionedState(
+ null,
+ VoidSerializer.INSTANCE,
+ pushedBackWatermarkDescriptor);
+
+ pushedBack.clear();
+ pushedBackWatermark.clear();
+ for (WindowedValue<InputT> pushedBackValue : newPushedBack) {
+ pushedBackWatermark.add(pushedBackValue.getTimestamp().getMillis());
+ pushedBack.add(pushedBackValue);
+ }
+
+ pushbackDoFnRunner.finishBundle();
+
+ // maybe output a new watermark
+ processWatermark1(new Watermark(currentInputWatermark));
}
@Override
public void processWatermark(Watermark mark) throws Exception {
- output.emitWatermark(mark);
+ processWatermark1(mark);
+ }
+
+ @Override
+ public void processWatermark1(Watermark mark) throws Exception {
+ this.currentInputWatermark = mark.getTimestamp();
+ long potentialOutputWatermark =
+ Math.min(getPushbackWatermarkHold(), currentInputWatermark);
+ if (potentialOutputWatermark > currentOutputWatermark) {
+ currentOutputWatermark = potentialOutputWatermark;
+ output.emitWatermark(new Watermark(currentOutputWatermark));
+ }
+ }
+
+ @Override
+ public void processWatermark2(Watermark mark) throws Exception {
+ // ignore watermarks from the side-input input
+ }
+
+ @Override
+ public StreamTaskState snapshotOperatorState(
+ long checkpointId,
+ long timestamp) throws Exception {
+
+ StreamTaskState streamTaskState = super.snapshotOperatorState(checkpointId, timestamp);
+
+ if (sideInputStateBackend != null) {
+ // we have to manually checkpoint the side-input state backend and store
+ // the handle in the "user state" of the task state
+ HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> sideInputSnapshot =
+ sideInputStateBackend.snapshotPartitionedState(checkpointId, timestamp);
+
+ if (sideInputSnapshot != null) {
+ @SuppressWarnings("unchecked,rawtypes")
+ StateHandle<Serializable> sideInputStateHandle =
+ (StateHandle) sideInputStateBackend.checkpointStateSerializable(
+ sideInputSnapshot, checkpointId, timestamp);
+
+ streamTaskState.setFunctionState(sideInputStateHandle);
+ }
+ }
+
+ return streamTaskState;
+ }
+
+ @Override
+ public void restoreState(StreamTaskState state, long recoveryTimestamp) throws Exception {
+ super.restoreState(state, recoveryTimestamp);
+
+ @SuppressWarnings("unchecked,rawtypes")
+ StateHandle<HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>>> sideInputStateHandle =
+ (StateHandle) state.getFunctionState();
+
+ if (sideInputStateHandle != null) {
+ restoredSideInputState = sideInputStateHandle.getState(getUserCodeClassloader());
+ }
}
/**
@@ -223,6 +461,16 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
}
/**
+ * For determining the pushback watermark in a {@link ReducingStateDescriptor}.
+ */
+ private static class LongMinReducer implements ReduceFunction<Long> {
+ @Override
+ public Long reduce(Long a, Long b) throws Exception {
+ return Math.min(a, b);
+ }
+ }
+
+ /**
* {@link StepContext} for running {@link DoFn DoFns} on Flink. This does not allow
* accessing state or timer internals.
*/
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dfbdc6c2/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
index 0a279cc..73c1eed 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
@@ -38,6 +38,7 @@ import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.runtime.state.AbstractStateBackend;
@@ -52,6 +53,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
+import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
@@ -71,12 +73,6 @@ import javax.annotation.Nullable;
public class WindowDoFnOperator<K, InputT, OutputT>
extends DoFnOperator<KeyedWorkItem<K, InputT>, KV<K, OutputT>, WindowedValue<KV<K, OutputT>>> {
- /**
- * To keep track of the current watermark so that we can immediately fire if a trigger
- * registers an event time callback for a timestamp that lies in the past.
- */
- private transient long currentWatermark = Long.MIN_VALUE;
-
private final Coder<K> keyCoder;
private final TimerInternals.TimerDataCoder timerCoder;
@@ -89,19 +85,23 @@ public class WindowDoFnOperator<K, InputT, OutputT>
public WindowDoFnOperator(
SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> systemReduceFn,
+ TypeInformation<WindowedValue<KeyedWorkItem<K, InputT>>> inputType,
TupleTag<KV<K, OutputT>> mainOutputTag,
List<TupleTag<?>> sideOutputTags,
OutputManagerFactory<WindowedValue<KV<K, OutputT>>> outputManagerFactory,
WindowingStrategy<?, ?> windowingStrategy,
- Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
+ Map<Integer, PCollectionView<?>> sideInputTagMapping,
+ Collection<PCollectionView<?>> sideInputs,
PipelineOptions options,
Coder<K> keyCoder) {
super(
null,
+ inputType,
mainOutputTag,
sideOutputTags,
outputManagerFactory,
windowingStrategy,
+ sideInputTagMapping,
sideInputs,
options);
@@ -184,21 +184,33 @@ public class WindowDoFnOperator<K, InputT, OutputT>
@Override
public void processWatermark(Watermark mark) throws Exception {
- this.currentWatermark = mark.getTimestamp();
+ processWatermark1(mark);
+ }
+
+ @Override
+ public void processWatermark1(Watermark mark) throws Exception {
+ pushbackDoFnRunner.startBundle();
+
+ this.currentInputWatermark = mark.getTimestamp();
+
+ // hold back by the pushed back values waiting for side inputs
+ long actualInputWatermark = Math.min(getPushbackWatermarkHold(), mark.getTimestamp());
boolean fire;
do {
Tuple2<ByteBuffer, TimerInternals.TimerData> timer = watermarkTimersQueue.peek();
- if (timer != null && timer.f1.getTimestamp().getMillis() <= mark.getTimestamp()) {
+ if (timer != null && timer.f1.getTimestamp().getMillis() < actualInputWatermark) {
fire = true;
+ System.out.println("FIRING: " + timer);
+
watermarkTimersQueue.remove();
watermarkTimers.remove(timer);
setKeyContext(timer.f0);
- doFnRunner.processElement(WindowedValue.valueInGlobalWindow(
+ pushbackDoFnRunner.processElement(WindowedValue.valueInGlobalWindow(
KeyedWorkItems.<K, InputT>timersWorkItem(
stateInternals.getKey(),
Collections.singletonList(timer.f1))));
@@ -210,9 +222,16 @@ public class WindowDoFnOperator<K, InputT, OutputT>
Instant watermarkHold = stateInternals.watermarkHold();
- long outputWatermark = Math.min(currentWatermark, watermarkHold.getMillis());
+ long combinedWatermarkHold = Math.min(watermarkHold.getMillis(), getPushbackWatermarkHold());
+
+ long potentialOutputWatermark = Math.min(currentInputWatermark, combinedWatermarkHold);
+
+ if (potentialOutputWatermark > currentOutputWatermark) {
+ currentOutputWatermark = potentialOutputWatermark;
+ output.emitWatermark(new Watermark(currentOutputWatermark));
+ }
+ pushbackDoFnRunner.finishBundle();
- output.emitWatermark(new Watermark(outputWatermark));
}
@Override
@@ -311,13 +330,13 @@ public class WindowDoFnOperator<K, InputT, OutputT>
@Override
public Instant currentInputWatermarkTime() {
- return new Instant(currentWatermark);
+ return new Instant(Math.min(currentInputWatermark, getPushbackWatermarkHold()));
}
@Nullable
@Override
public Instant currentOutputWatermarkTime() {
- return new Instant(currentWatermark);
+ return new Instant(currentOutputWatermark);
}
};
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dfbdc6c2/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
new file mode 100644
index 0000000..9d983b0
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.io;
+
+import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Wrapper for executing {@link BoundedSource BoundedSources} as a Flink Source.
+ */
+public class BoundedSourceWrapper<OutputT>
+ extends RichParallelSourceFunction<WindowedValue<OutputT>> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BoundedSourceWrapper.class);
+
+ /**
+ * Keep the options so that we can initialize the readers.
+ */
+ private final SerializedPipelineOptions serializedOptions;
+
+ /**
+ * The split sources. We split them in the constructor to ensure that all parallel
+ * sources are consistent about the split sources.
+ */
+ private List<? extends BoundedSource<OutputT>> splitSources;
+
+ /**
+ * Make it a field so that we can access it in {@link #close()}.
+ */
+ private transient List<BoundedSource.BoundedReader<OutputT>> readers;
+
+ /**
+ * Initialize here and not in run() to prevent races where we cancel a job before run() is
+ * ever called or run() is called after cancel().
+ */
+ private volatile boolean isRunning = true;
+
+ @SuppressWarnings("unchecked")
+ public BoundedSourceWrapper(
+ PipelineOptions pipelineOptions,
+ BoundedSource<OutputT> source,
+ int parallelism) throws Exception {
+ this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
+
+ long desiredBundleSize = source.getEstimatedSizeBytes(pipelineOptions) / parallelism;
+
+ // get the splits early. we assume that the generated splits are stable,
+ // this is necessary so that the mapping of state to source is correct
+ // when restoring
+ splitSources = source.splitIntoBundles(desiredBundleSize, pipelineOptions);
+ }
+
+ @Override
+ public void run(SourceContext<WindowedValue<OutputT>> ctx) throws Exception {
+ if (!(ctx instanceof StreamSource.ManualWatermarkContext)) {
+ throw new RuntimeException(
+ "Cannot emit watermarks, this hints at a misconfiguration/bug.");
+ }
+
+ // figure out which split sources we're responsible for
+ int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
+ int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
+
+ List<BoundedSource<OutputT>> localSources = new ArrayList<>();
+
+ for (int i = 0; i < splitSources.size(); i++) {
+ if (i % numSubtasks == subtaskIndex) {
+ localSources.add(splitSources.get(i));
+ }
+ }
+
+ LOG.info("Bounded Flink Source {}/{} is reading from sources: {}",
+ subtaskIndex,
+ numSubtasks,
+ localSources);
+
+ readers = new ArrayList<>();
+ // initialize readers from scratch
+ for (BoundedSource<OutputT> source : localSources) {
+ readers.add(source.createReader(serializedOptions.getPipelineOptions()));
+ }
+
+ if (readers.size() == 1) {
+ // the easy case, we just read from one reader
+ BoundedSource.BoundedReader<OutputT> reader = readers.get(0);
+
+ boolean dataAvailable = reader.start();
+ if (dataAvailable) {
+ emitElement(ctx, reader);
+ }
+
+ while (isRunning) {
+ dataAvailable = reader.advance();
+
+ if (dataAvailable) {
+ emitElement(ctx, reader);
+ } else {
+ break;
+ }
+ }
+ } else {
+ // a bit more complicated, we are responsible for several readers
+ // loop through them and sleep if none of them had any data
+
+ int currentReader = 0;
+
+ // start each reader and emit data if immediately available
+ for (BoundedSource.BoundedReader<OutputT> reader : readers) {
+ boolean dataAvailable = reader.start();
+ if (dataAvailable) {
+ emitElement(ctx, reader);
+ }
+ }
+
+ // a flag telling us whether any of the readers had data
+ // if no reader had data, sleep for bit
+ boolean hadData = false;
+ while (isRunning && !readers.isEmpty()) {
+ BoundedSource.BoundedReader<OutputT> reader = readers.get(currentReader);
+ boolean dataAvailable = reader.advance();
+
+ if (dataAvailable) {
+ emitElement(ctx, reader);
+ hadData = true;
+ } else {
+ readers.remove(currentReader);
+ currentReader--;
+ if (readers.isEmpty()) {
+ break;
+ }
+ }
+
+ currentReader = (currentReader + 1) % readers.size();
+ if (currentReader == 0 && !hadData) {
+ Thread.sleep(50);
+ } else if (currentReader == 0) {
+ hadData = false;
+ }
+ }
+
+ }
+
+ // emit final Long.MAX_VALUE watermark, just to be sure
+ ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
+ }
+
+ /**
+ * Emit the current element from the given Reader. The reader is guaranteed to have data.
+ */
+ private void emitElement(
+ SourceContext<WindowedValue<OutputT>> ctx,
+ BoundedSource.BoundedReader<OutputT> reader) {
+ // make sure that reader state update and element emission are atomic
+ // with respect to snapshots
+ synchronized (ctx.getCheckpointLock()) {
+
+ OutputT item = reader.getCurrent();
+ Instant timestamp = reader.getCurrentTimestamp();
+
+ WindowedValue<OutputT> windowedValue =
+ WindowedValue.of(item, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
+ ctx.collectWithTimestamp(windowedValue, timestamp.getMillis());
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ if (readers != null) {
+ for (BoundedSource.BoundedReader<OutputT> reader: readers) {
+ reader.close();
+ }
+ }
+ }
+
+ @Override
+ public void cancel() {
+ isRunning = false;
+ }
+
+ /**
+ * Visible so that we can check this in tests. Must not be used for anything else.
+ */
+ @VisibleForTesting
+ public List<? extends BoundedSource<OutputT>> getSplitSources() {
+ return splitSources;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dfbdc6c2/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java
deleted file mode 100644
index 0d72f65..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming.io;
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.WindowedValue;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.util.Collector;
-import org.joda.time.Instant;
-
-import java.io.ByteArrayInputStream;
-import java.util.List;
-
-/**
- * This flat map function bootstraps from collection elements and turns them into WindowedValues
- * (as required by the Flink runner).
- */
-public class FlinkStreamingCreateFunction<IN, OUT> implements FlatMapFunction<IN, WindowedValue<OUT>> {
-
- private final List<byte[]> elements;
- private final Coder<OUT> coder;
-
- public FlinkStreamingCreateFunction(List<byte[]> elements, Coder<OUT> coder) {
- this.elements = elements;
- this.coder = coder;
- }
-
- @Override
- public void flatMap(IN value, Collector<WindowedValue<OUT>> out) throws Exception {
-
- for (byte[] element : elements) {
- ByteArrayInputStream bai = new ByteArrayInputStream(element);
- OUT outValue = coder.decode(bai, Coder.Context.OUTER);
-
- out.collect(WindowedValue.of(outValue, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dfbdc6c2/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
new file mode 100644
index 0000000..fb1b1e8
--- /dev/null
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.streaming;
+
+import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
+import static org.junit.Assert.assertThat;
+
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
+import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PCollectionViewTesting;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+
+import com.google.common.collect.ImmutableList;
+
+import org.apache.flink.shaded.com.google.common.base.Function;
+import org.apache.flink.shaded.com.google.common.base.Predicate;
+import org.apache.flink.shaded.com.google.common.collect.FluentIterable;
+import org.apache.flink.shaded.com.google.common.collect.ImmutableMap;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.Collections;
+import java.util.HashMap;
+import javax.annotation.Nullable;
+
+/**
+ * Tests for {@link DoFnOperator}.
+ */
+@RunWith(JUnit4.class)
public class DoFnOperatorTest {

  // Views and windowing strategies for the (currently ignored) side-input test
  // below. The two views use different fixed-window sizes so side-input windows
  // and main-input windows do not trivially line up.
  private static final long WINDOW_MSECS_1 = 100;
  private static final long WINDOW_MSECS_2 = 500;

  private WindowingStrategy<Object, IntervalWindow> windowingStrategy1 =
      WindowingStrategy.of(FixedWindows.of(new Duration(WINDOW_MSECS_1)));

  private PCollectionView<Iterable<String>> view1 = PCollectionViewTesting.testingView(
      new TupleTag<Iterable<WindowedValue<String>>>() {},
      new PCollectionViewTesting.IdentityViewFn<String>(),
      StringUtf8Coder.of(),
      windowingStrategy1);

  private WindowingStrategy<Object, IntervalWindow> windowingStrategy2 =
      WindowingStrategy.of(FixedWindows.of(new Duration(WINDOW_MSECS_2)));

  private PCollectionView<Iterable<String>> view2 = PCollectionViewTesting.testingView(
      new TupleTag<Iterable<WindowedValue<String>>>() {},
      new PCollectionViewTesting.IdentityViewFn<String>(),
      StringUtf8Coder.of(),
      windowingStrategy2);

  /**
   * Verifies that a {@link DoFnOperator} with only a main output passes an element
   * through unchanged, using Flink's one-input operator test harness.
   */
  @Test
  @SuppressWarnings("unchecked")
  public void testSingleOutput() throws Exception {

    WindowedValue.ValueOnlyWindowedValueCoder<String> windowedValueCoder =
        WindowedValue.getValueOnlyCoder(StringUtf8Coder.of());

    CoderTypeInformation<WindowedValue<String>> coderTypeInfo =
        new CoderTypeInformation<>(windowedValueCoder);

    TupleTag<String> outputTag = new TupleTag<>("main-output");

    DoFnOperator<String, String, String> doFnOperator = new DoFnOperator<>(
        new IdentityDoFn<String>(),
        coderTypeInfo,
        outputTag,
        Collections.<TupleTag<?>>emptyList(),
        new DoFnOperator.DefaultOutputManagerFactory(),
        WindowingStrategy.globalDefault(),
        new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
        Collections.<PCollectionView<?>>emptyList(), /* side inputs */
        PipelineOptionsFactory.as(FlinkPipelineOptions.class));

    OneInputStreamOperatorTestHarness<WindowedValue<String>, String> testHarness =
        new OneInputStreamOperatorTestHarness<>(doFnOperator);

    testHarness.open();

    testHarness.processElement(new StreamRecord<>(WindowedValue.valueInGlobalWindow("Hello")));

    assertThat(
        this.<String>stripStreamRecordFromWindowedValue(testHarness.getOutput()),
        contains(WindowedValue.valueInGlobalWindow("Hello")));

    testHarness.close();
  }

  /**
   * Verifies that a {@link DoFnOperator} with two side outputs routes elements
   * to main and side outputs as {@link RawUnionValue}s tagged according to the
   * supplied output mapping.
   */
  @Test
  @SuppressWarnings("unchecked")
  public void testMultiOutputOutput() throws Exception {

    WindowedValue.ValueOnlyWindowedValueCoder<String> windowedValueCoder =
        WindowedValue.getValueOnlyCoder(StringUtf8Coder.of());

    CoderTypeInformation<WindowedValue<String>> coderTypeInfo =
        new CoderTypeInformation<>(windowedValueCoder);

    TupleTag<String> mainOutput = new TupleTag<>("main-output");
    TupleTag<String> sideOutput1 = new TupleTag<>("side-output-1");
    TupleTag<String> sideOutput2 = new TupleTag<>("side-output-2");
    // Union tags: 1 = main output, 2 and 3 = the two side outputs.
    ImmutableMap<TupleTag<?>, Integer> outputMapping = ImmutableMap.<TupleTag<?>, Integer>builder()
        .put(mainOutput, 1)
        .put(sideOutput1, 2)
        .put(sideOutput2, 3)
        .build();

    DoFnOperator<String, String, RawUnionValue> doFnOperator = new DoFnOperator<>(
        new MultiOutputDoFn(sideOutput1, sideOutput2),
        coderTypeInfo,
        mainOutput,
        ImmutableList.<TupleTag<?>>of(sideOutput1, sideOutput2),
        new DoFnOperator.MultiOutputOutputManagerFactory(outputMapping),
        WindowingStrategy.globalDefault(),
        new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
        Collections.<PCollectionView<?>>emptyList(), /* side inputs */
        PipelineOptionsFactory.as(FlinkPipelineOptions.class));

    OneInputStreamOperatorTestHarness<WindowedValue<String>, RawUnionValue> testHarness =
        new OneInputStreamOperatorTestHarness<>(doFnOperator);

    testHarness.open();

    testHarness.processElement(new StreamRecord<>(WindowedValue.valueInGlobalWindow("one")));
    testHarness.processElement(new StreamRecord<>(WindowedValue.valueInGlobalWindow("two")));
    testHarness.processElement(new StreamRecord<>(WindowedValue.valueInGlobalWindow("hello")));

    assertThat(
        this.stripStreamRecordFromRawUnion(testHarness.getOutput()),
        contains(
            new RawUnionValue(2, WindowedValue.valueInGlobalWindow("side: one")),
            new RawUnionValue(3, WindowedValue.valueInGlobalWindow("side: two")),
            new RawUnionValue(1, WindowedValue.valueInGlobalWindow("got: hello")),
            new RawUnionValue(2, WindowedValue.valueInGlobalWindow("got: hello")),
            new RawUnionValue(3, WindowedValue.valueInGlobalWindow("got: hello"))));

    testHarness.close();
  }

  /**
   * For now, this test doesn't work because {@link TwoInputStreamOperatorTestHarness} is not
   * sufficiently well equipped to handle more complex operators that require a state backend.
   * We have to revisit this once we update to a newer version of Flink and also add some more
   * tests that verify pushback behaviour and watermark hold behaviour.
   *
   * <p>The behaviour that we would test here is also exercised by the
   * {@link org.apache.beam.sdk.testing.RunnableOnService} tests, so the code is not untested.
   */
  @Test
  @Ignore
  @SuppressWarnings("unchecked")
  public void testSideInputs() throws Exception {

    WindowedValue.ValueOnlyWindowedValueCoder<String> windowedValueCoder =
        WindowedValue.getValueOnlyCoder(StringUtf8Coder.of());

    CoderTypeInformation<WindowedValue<String>> coderTypeInfo =
        new CoderTypeInformation<>(windowedValueCoder);

    TupleTag<String> outputTag = new TupleTag<>("main-output");

    // Union tags 1 and 2 identify which view an incoming side-input element belongs to.
    ImmutableMap<Integer, PCollectionView<?>> sideInputMapping =
        ImmutableMap.<Integer, PCollectionView<?>>builder()
            .put(1, view1)
            .put(2, view2)
            .build();

    DoFnOperator<String, String, String> doFnOperator = new DoFnOperator<>(
        new IdentityDoFn<String>(),
        coderTypeInfo,
        outputTag,
        Collections.<TupleTag<?>>emptyList(),
        new DoFnOperator.DefaultOutputManagerFactory(),
        WindowingStrategy.globalDefault(),
        sideInputMapping, /* side-input mapping */
        ImmutableList.<PCollectionView<?>>of(view1, view2), /* side inputs */
        PipelineOptionsFactory.as(FlinkPipelineOptions.class));

    TwoInputStreamOperatorTestHarness<WindowedValue<String>, RawUnionValue, String> testHarness =
        new TwoInputStreamOperatorTestHarness<>(doFnOperator);

    testHarness.open();

    IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new Instant(100));

    // push in some side-input elements
    testHarness.processElement2(
        new StreamRecord<>(
            new RawUnionValue(
                1,
                valuesInWindow(ImmutableList.of("hello", "ciao"), new Instant(0), firstWindow))));

    testHarness.processElement2(
        new StreamRecord<>(
            new RawUnionValue(
                2,
                valuesInWindow(ImmutableList.of("foo", "bar"), new Instant(0), firstWindow))));

    // push in a regular elements
    testHarness.processElement1(new StreamRecord<>(WindowedValue.valueInGlobalWindow("Hello")));

    assertThat(
        this.<String>stripStreamRecordFromWindowedValue(testHarness.getOutput()),
        contains(WindowedValue.valueInGlobalWindow("Hello")));

    testHarness.close();
  }

  /**
   * Unwraps the harness output: keeps only {@link StreamRecord}s carrying a
   * {@link WindowedValue} and returns the contained values.
   */
  private <T> Iterable<WindowedValue<T>> stripStreamRecordFromWindowedValue(
      Iterable<Object> input) {

    return FluentIterable.from(input).filter(new Predicate<Object>() {
      @Override
      public boolean apply(@Nullable Object o) {
        return o instanceof StreamRecord && ((StreamRecord) o).getValue() instanceof WindowedValue;
      }
    }).transform(new Function<Object, WindowedValue<T>>() {
      @Nullable
      @Override
      @SuppressWarnings({"unchecked", "rawtypes"})
      public WindowedValue<T> apply(@Nullable Object o) {
        if (o instanceof StreamRecord && ((StreamRecord) o).getValue() instanceof WindowedValue) {
          return (WindowedValue) ((StreamRecord) o).getValue();
        }
        // The preceding filter guarantees the instanceof check above succeeds.
        throw new RuntimeException("unreachable");
      }
    });
  }

  /**
   * Unwraps the harness output: keeps only {@link StreamRecord}s carrying a
   * {@link RawUnionValue} and returns the contained values.
   */
  private Iterable<RawUnionValue> stripStreamRecordFromRawUnion(Iterable<Object> input) {
    return FluentIterable.from(input).filter(new Predicate<Object>() {
      @Override
      public boolean apply(@Nullable Object o) {
        return o instanceof StreamRecord && ((StreamRecord) o).getValue() instanceof RawUnionValue;
      }
    }).transform(new Function<Object, RawUnionValue>() {
      @Nullable
      @Override
      @SuppressWarnings({"unchecked", "rawtypes"})
      public RawUnionValue apply(@Nullable Object o) {
        if (o instanceof StreamRecord && ((StreamRecord) o).getValue() instanceof RawUnionValue) {
          return (RawUnionValue) ((StreamRecord) o).getValue();
        }
        // The preceding filter guarantees the instanceof check above succeeds.
        throw new RuntimeException("unreachable");
      }
    });
  }

  /**
   * Routes "one" to side output 1, "two" to side output 2, and everything else
   * to the main output as well as both side outputs.
   */
  private static class MultiOutputDoFn extends OldDoFn<String, String> {
    private TupleTag<String> sideOutput1;
    private TupleTag<String> sideOutput2;

    public MultiOutputDoFn(TupleTag<String> sideOutput1, TupleTag<String> sideOutput2) {
      this.sideOutput1 = sideOutput1;
      this.sideOutput2 = sideOutput2;
    }

    @Override
    public void processElement(ProcessContext c) throws Exception {
      if (c.element().equals("one")) {
        c.sideOutput(sideOutput1, "side: one");
      } else if (c.element().equals("two")) {
        c.sideOutput(sideOutput2, "side: two");
      } else {
        c.output("got: " + c.element());
        c.sideOutput(sideOutput1, "got: " + c.element());
        c.sideOutput(sideOutput2, "got: " + c.element());
      }
    }
  }

  /** A {@link OldDoFn} that emits every input element unchanged. */
  private static class IdentityDoFn<T> extends OldDoFn<T, T> {
    @Override
    public void processElement(OldDoFn<T, T>.ProcessContext c) throws Exception {
      c.output(c.element());
    }
  }

  /** Wraps a bag of values into a single {@link WindowedValue} in the given window. */
  @SuppressWarnings({"unchecked", "rawtypes"})
  private WindowedValue<Iterable<?>> valuesInWindow(
      Iterable<?> values, Instant timestamp, BoundedWindow window) {
    return (WindowedValue) WindowedValue.of(values, timestamp, window, PaneInfo.NO_FIRING);
  }

  // NOTE(review): currently unused; presumably kept for future pushback/watermark
  // tests mentioned in the testSideInputs() javadoc — confirm before removing.
  @SuppressWarnings({"unchecked", "rawtypes"})
  private <T> WindowedValue<T> valueInWindow(
      T value, Instant timestamp, BoundedWindow window) {
    return WindowedValue.of(value, timestamp, window, PaneInfo.NO_FIRING);
  }


}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dfbdc6c2/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
index f5a52f5..1122179 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
@@ -91,7 +91,7 @@ public class UnboundedSourceWrapperTest {
@Override
public void collect(
- StreamRecord<WindowedValue<KV<Integer,Integer>>> windowedValueStreamRecord) {
+ StreamRecord<WindowedValue<KV<Integer, Integer>>> windowedValueStreamRecord) {
count++;
if (count >= NUM_ELEMENTS) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dfbdc6c2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/RawUnionValue.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/RawUnionValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/RawUnionValue.java
index 159b100..07bfe69 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/RawUnionValue.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/RawUnionValue.java
@@ -48,4 +48,29 @@ public class RawUnionValue {
public String toString() {
return unionTag + ":" + value;
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ RawUnionValue that = (RawUnionValue) o;
+
+ if (unionTag != that.unionTag) {
+ return false;
+ }
+ return value != null ? value.equals(that.value) : that.value == null;
+
+ }
+
+ @Override
+ public int hashCode() {
+ int result = unionTag;
+ result = 31 * result + (value != null ? value.hashCode() : 0);
+ return result;
+ }
}