You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2016/05/10 20:27:02 UTC
[1/2] incubator-beam git commit: [BEAM-103][BEAM-130] Make Flink Source Parallel and Checkpointed
Repository: incubator-beam
Updated Branches:
refs/heads/master 874ddef05 -> 4020e3645
[BEAM-103][BEAM-130] Make Flink Source Parallel and Checkpointed
This also changes the default of the parallelism option of FlinkPipelineOptions
(getParallelism/setParallelism): instead of -1, it now defaults to the default
parallelism set in the Flink configuration, or to 1 if that is not set.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/336e90fa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/336e90fa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/336e90fa
Branch: refs/heads/master
Commit: 336e90fa795885e670ff3a523a6fefa6033c2663
Parents: 874ddef
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue May 3 13:35:35 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue May 10 22:20:08 2016 +0200
----------------------------------------------------------------------
.../flink/DefaultParallelismFactory.java | 39 +++
.../runners/flink/FlinkPipelineOptions.java | 2 +-
.../FlinkStreamingTransformTranslators.java | 18 +-
.../streaming/io/UnboundedSourceWrapper.java | 337 ++++++++++++++++---
.../flink/streaming/TestCountingSource.java | 256 ++++++++++++++
.../flink/streaming/UnboundedSourceITCase.java | 208 ------------
.../streaming/UnboundedSourceWrapperTest.java | 324 ++++++++++++++++++
7 files changed, 916 insertions(+), 268 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/336e90fa/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/DefaultParallelismFactory.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/DefaultParallelismFactory.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/DefaultParallelismFactory.java
new file mode 100644
index 0000000..e512db0
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/DefaultParallelismFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import org.apache.beam.sdk.options.DefaultValueFactory;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.GlobalConfiguration;
+
+/**
+ * {@link DefaultValueFactory} for getting a default value for the parallelism option
+ * on {@link FlinkPipelineOptions}.
+ *
+ * <p>This will return either the default value from {@link GlobalConfiguration} or {@code 1}.
+ * A valid {@link GlobalConfiguration} is only available if the program is executed by the Flink
+ * run scripts.
+ */
+public class DefaultParallelismFactory implements DefaultValueFactory<Integer> {
+ @Override
+ public Integer create(PipelineOptions options) {
+ return GlobalConfiguration.getInteger(ConfigConstants.DEFAULT_PARALLELISM_KEY, 1);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/336e90fa/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
index 8c82abd..fd86bc9 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
@@ -70,7 +70,7 @@ public interface FlinkPipelineOptions extends PipelineOptions, ApplicationNameOp
void setFlinkMaster(String value);
@Description("The degree of parallelism to be used when distributing operations onto workers.")
- @Default.Integer(-1)
+ @Default.InstanceFactory(DefaultParallelismFactory.class)
Integer getParallelism();
void setParallelism(Integer value);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/336e90fa/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
index 618727d..2778d5c 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
@@ -36,6 +36,7 @@ import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.Sink;
import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.Write;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Create;
@@ -284,8 +285,17 @@ public class FlinkStreamingTransformTranslators {
}
}).assignTimestampsAndWatermarks(new IngestionTimeExtractor<WindowedValue<T>>());
} else {
- source = context.getExecutionEnvironment()
- .addSource(new UnboundedSourceWrapper<>(context.getPipelineOptions(), transform));
+ try {
+ transform.getSource();
+ UnboundedSourceWrapper<T, ?> sourceWrapper =
+ new UnboundedSourceWrapper<>(
+ context.getPipelineOptions(),
+ transform.getSource(),
+ context.getExecutionEnvironment().getParallelism());
+ source = context.getExecutionEnvironment().addSource(sourceWrapper).name(transform.getName());
+ } catch (Exception e) {
+ throw new RuntimeException("Error while translating UnboundedSource: " + transform.getSource(), e);
+ }
}
context.setOutputDataStream(output, source);
@@ -310,7 +320,9 @@ public class FlinkStreamingTransformTranslators {
FlinkParDoBoundWrapper<IN, OUT> doFnWrapper = new FlinkParDoBoundWrapper<>(
context.getPipelineOptions(), windowingStrategy, transform.getFn());
DataStream<WindowedValue<IN>> inputDataStream = context.getInputDataStream(context.getInput(transform));
- SingleOutputStreamOperator<WindowedValue<OUT>> outDataStream = inputDataStream.flatMap(doFnWrapper)
+ SingleOutputStreamOperator<WindowedValue<OUT>> outDataStream = inputDataStream
+ .flatMap(doFnWrapper)
+ .name(transform.getName())
.returns(outputWindowedValueCoder);
context.setOutputDataStream(context.getOutput(transform), outDataStream);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/336e90fa/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
index 9d15a33..b816e2a 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
@@ -18,107 +18,286 @@
package org.apache.beam.runners.flink.translation.wrappers.streaming.io;
import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
-import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TypeDescriptor;
-import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+
+import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.util.ArrayList;
+import java.util.List;
/**
- * A wrapper for Beam's unbounded sources. This class wraps around a source implementing the
- * {@link org.apache.beam.sdk.io.Read.Unbounded} interface.
- *
- * For now we support non-parallel sources, checkpointing is WIP.
- * */
-public class UnboundedSourceWrapper<T> extends RichSourceFunction<WindowedValue<T>> implements Triggerable {
+ * Wrapper for executing {@link UnboundedSource UnboundedSources} as a Flink Source.
+ */
+public class UnboundedSourceWrapper<
+ OutputT, CheckpointMarkT extends UnboundedSource.CheckpointMark>
+ extends RichParallelSourceFunction<WindowedValue<OutputT>>
+ implements Triggerable, Checkpointed<byte[]> {
- private final String name;
- private final UnboundedSource<T, ?> source;
+ private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceWrapper.class);
- private StreamingRuntimeContext runtime = null;
- private StreamSource.ManualWatermarkContext<WindowedValue<T>> context = null;
+ /**
+ * Keep the options so that we can initialize the readers.
+ */
+ private final SerializedPipelineOptions serializedOptions;
- private volatile boolean isRunning = false;
+ /**
+ * For snapshot and restore.
+ */
+ private final ListCoder<
+ KV<? extends UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT>> checkpointCoder;
- private final SerializedPipelineOptions serializedOptions;
+ /**
+ * The split sources. We split them in the constructor to ensure that all parallel
+ * sources are consistent about the split sources.
+ */
+ private List<? extends UnboundedSource<OutputT, CheckpointMarkT>> splitSources;
- /** Instantiated during runtime **/
- private transient UnboundedSource.UnboundedReader<T> reader;
+ /**
+ * Make it a field so that we can access it in {@link #trigger(long)} for
+ * emitting watermarks.
+ */
+ private transient List<UnboundedSource.UnboundedReader<OutputT>> readers;
- public UnboundedSourceWrapper(PipelineOptions pipelineOptions, Read.Unbounded<T> transform) {
- this.name = transform.getName();
- this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
- this.source = transform.getSource();
- }
+ /**
+ * Initialize here and not in run() to prevent races where we cancel a job before run() is
+ * ever called or run() is called after cancel().
+ */
+ private volatile boolean isRunning = true;
- public String getName() {
- return this.name;
- }
+ /**
+ * Make it a field so that we can access it in {@link #trigger(long)} for registering new
+ * triggers.
+ */
+ private transient StreamingRuntimeContext runtimeContext;
- WindowedValue<T> makeWindowedValue(T output, Instant timestamp) {
- if (timestamp == null) {
- timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
+ /**
+ * Make it a field so that we can access it in {@link #trigger(long)} for emitting
+ * watermarks.
+ */
+ private transient StreamSource.ManualWatermarkContext<WindowedValue<OutputT>> context;
+
+ /**
+ * When restoring from a snapshot, {@link #restoreState(byte[])} puts the restored
+ * sources/checkpoint marks here. They are turned into readers (and this field is reset to
+ * {@code null}) in {@link #run}, not in {@link #open(Configuration)}.
+ */
+ private transient List<
+ KV<? extends UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT>> restoredState;
+
+ /**
+ * Creates a wrapper for the given source and eagerly splits it into {@code parallelism}
+ * desired splits, so that every parallel instance sees the same list of split sources.
+ *
+ * @param pipelineOptions options, kept in serialized form so readers can be created later
+ * @param source the unbounded source to wrap
+ * @param parallelism desired number of splits, passed to {@code generateInitialSplits}
+ * @throws Exception if one of the calls made here fails (e.g. splitting the source)
+ */
+ @SuppressWarnings("unchecked")
+ public UnboundedSourceWrapper(
+ PipelineOptions pipelineOptions,
+ UnboundedSource<OutputT, CheckpointMarkT> source,
+ int parallelism) throws Exception {
+ this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
+
+ // deduping is only warned about, not performed
+ if (source.requiresDeduping()) {
+ LOG.warn("Source {} requires deduping but Flink runner doesn't support this yet.", source);
}
- return WindowedValue.of(output, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
+
+ // snapshots are encoded as a list of (source, checkpoint mark) pairs
+ Coder<CheckpointMarkT> checkpointMarkCoder = source.getCheckpointMarkCoder();
+ Coder<? extends UnboundedSource<OutputT, CheckpointMarkT>> sourceCoder =
+ SerializableCoder.of(new TypeDescriptor<UnboundedSource<OutputT, CheckpointMarkT>>() {});
+
+ // the raw ListCoder cast bridges the wildcard source type; it is the unchecked
+ // conversion covered by the @SuppressWarnings on this constructor
+ checkpointCoder = (ListCoder) ListCoder.of(KvCoder.of(sourceCoder, checkpointMarkCoder));
+
+ // get the splits early. we assume that the generated splits are stable,
+ // this is necessary so that the mapping of state to source is correct
+ // when restoring
+ splitSources = source.generateInitialSplits(parallelism, pipelineOptions);
}
@Override
- public void run(SourceContext<WindowedValue<T>> ctx) throws Exception {
+ public void run(SourceContext<WindowedValue<OutputT>> ctx) throws Exception {
+ // Reads from this subtask's share of the split sources (or from the readers
+ // restored from a snapshot) and emits their elements while isRunning is set.
if (!(ctx instanceof StreamSource.ManualWatermarkContext)) {
throw new RuntimeException(
- "We assume that all sources in Dataflow are EventTimeSourceFunction. " +
- "Apparently " + this.name + " is not. " +
- "Probably you should consider writing your own Wrapper for this source.");
+ "Cannot emit watermarks, this hints at a misconfiguration/bug.");
}
- context = (StreamSource.ManualWatermarkContext<WindowedValue<T>>) ctx;
- runtime = (StreamingRuntimeContext) getRuntimeContext();
+ context = (StreamSource.ManualWatermarkContext<WindowedValue<OutputT>>) ctx;
+ runtimeContext = (StreamingRuntimeContext) getRuntimeContext();
- isRunning = true;
+ // figure out which split sources we're responsible for
+ int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
+ int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
- reader = source.createReader(serializedOptions.getPipelineOptions(), null);
+ List<UnboundedSource<OutputT, CheckpointMarkT>> localSources = new ArrayList<>();
- boolean inputAvailable = reader.start();
+ for (int i = 0; i < splitSources.size(); i++) {
+ if (i % numSubtasks == subtaskIndex) {
+ localSources.add(splitSources.get(i));
+ }
+ }
+
+ LOG.info("Unbounded Flink Source {}/{} is reading from sources: {}",
+ subtaskIndex,
+ numSubtasks,
+ localSources);
+
+ readers = new ArrayList<>();
+ if (restoredState != null) {
+
+ // restore the splitSources from the checkpoint to ensure consistent ordering
+ // do it using a transform because otherwise we would have to do
+ // unchecked casts
+ splitSources = Lists.transform(
+ restoredState,
+ new Function<
+ KV<? extends UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT>,
+ UnboundedSource<OutputT, CheckpointMarkT>>() {
+ @Override
+ public UnboundedSource<OutputT, CheckpointMarkT> apply(
+ KV<? extends UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT> input) {
+ return input.getKey();
+ }
+ });
+
+ for (KV<? extends UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT> restored:
+ restoredState) {
+ readers.add(
+ restored.getKey().createReader(
+ serializedOptions.getPipelineOptions(), restored.getValue()));
+ }
+ restoredState = null;
+ } else {
+ // initialize readers from scratch
+ for (UnboundedSource<OutputT, CheckpointMarkT> source : localSources) {
+ readers.add(source.createReader(serializedOptions.getPipelineOptions(), null));
+ }
+ }
+
+ if (readers.size() == 0) {
+ // do nothing, but still look busy ...
+ // also, output a Long.MAX_VALUE watermark since we know that we're not
+ // going to emit anything
+ // we can't return here since Flink requires that all operators stay up,
+ // otherwise checkpointing would not work correctly anymore
+ ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
- setNextWatermarkTimer(this.runtime);
+ // wait until this is canceled
+ final Object waitLock = new Object();
+ while (isRunning) {
+ try {
+ // Flink will interrupt us at some point
+ //noinspection SynchronizationOnLocalVariableOrMethodParameter
+ synchronized (waitLock) {
+ waitLock.wait();
+ }
+ } catch (InterruptedException e) {
+ if (!isRunning) {
+ // restore the interrupted state, and fall through the loop
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ } else if (readers.size() == 1) {
+ // the easy case, we just read from one reader
+ UnboundedSource.UnboundedReader<OutputT> reader = readers.get(0);
+ boolean dataAvailable = reader.start();
+ if (dataAvailable) {
+ emitElement(ctx, reader);
+ }
- try {
+ setNextWatermarkTimer(this.runtimeContext);
while (isRunning) {
+ dataAvailable = reader.advance();
- if (!inputAvailable && isRunning) {
- // wait a bit until we retry to pull more records
+ if (dataAvailable) {
+ emitElement(ctx, reader);
+ } else {
Thread.sleep(50);
- inputAvailable = reader.advance();
}
+ }
+ } else {
+ // a bit more complicated, we are responsible for several readers
+ // loop through them and sleep if none of them had any data
- if (inputAvailable) {
+ int numReaders = readers.size();
+ int currentReader = 0;
- // get it and its timestamp from the source
- T item = reader.getCurrent();
- Instant timestamp = reader.getCurrentTimestamp();
+ // start each reader and emit data if immediately available
+ for (UnboundedSource.UnboundedReader<OutputT> reader : readers) {
+ boolean dataAvailable = reader.start();
+ if (dataAvailable) {
+ emitElement(ctx, reader);
+ }
+ }
+
+ // register the first watermark timer, as in the single-reader case;
+ // without it trigger() is never called and no watermarks are emitted
+ setNextWatermarkTimer(this.runtimeContext);
+
- // write it to the output collector
- synchronized (ctx.getCheckpointLock()) {
- context.collectWithTimestamp(makeWindowedValue(item, timestamp), timestamp.getMillis());
- }
+ // a flag telling us whether any of the readers had data
+ // if no reader had data, sleep for a bit
+ boolean hadData = false;
+ while (isRunning) {
+ UnboundedSource.UnboundedReader<OutputT> reader = readers.get(currentReader);
+ boolean dataAvailable = reader.advance();
- inputAvailable = reader.advance();
+ if (dataAvailable) {
+ emitElement(ctx, reader);
+ hadData = true;
+ }
+
+ currentReader = (currentReader + 1) % numReaders;
+ if (currentReader == 0 && !hadData) {
+ Thread.sleep(50);
+ } else if (currentReader == 0) {
+ hadData = false;
}
}
- } finally {
- reader.close();
+ }
+ }
+
+ /**
+ * Emit the current element from the given Reader. The reader is guaranteed to have data.
+ */
+ private void emitElement(
+ SourceContext<WindowedValue<OutputT>> ctx,
+ UnboundedSource.UnboundedReader<OutputT> reader) {
+ // make sure that reader state update and element emission are atomic
+ // with respect to snapshots
+ synchronized (ctx.getCheckpointLock()) {
+
+ OutputT item = reader.getCurrent();
+ Instant timestamp = reader.getCurrentTimestamp();
+
+ WindowedValue<OutputT> windowedValue =
+ WindowedValue.of(item, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
+ ctx.collectWithTimestamp(windowedValue, timestamp.getMillis());
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ if (readers != null) {
+ for (UnboundedSource.UnboundedReader<OutputT> reader: readers) {
+ reader.close();
+ }
}
}
@@ -128,13 +307,52 @@ public class UnboundedSourceWrapper<T> extends RichSourceFunction<WindowedValue<
}
@Override
+ public byte[] snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+ // We checkpoint each reader's source along with its CheckpointMarkT to ensure that
+ // we have a correct mapping of checkpoints to sources when restoring.
+ // The source is taken from the reader itself: readers only holds this subtask's
+ // readers, while splitSources can hold the splits of every subtask, so indexing both
+ // with the same i would pair marks with the wrong sources or run out of bounds.
+ List<KV<? extends UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT>> checkpoints =
+ new ArrayList<>();
+
+ for (UnboundedSource.UnboundedReader<OutputT> reader : readers) {
+ @SuppressWarnings("unchecked")
+ UnboundedSource<OutputT, CheckpointMarkT> source =
+ (UnboundedSource<OutputT, CheckpointMarkT>) reader.getCurrentSource();
+ @SuppressWarnings("unchecked")
+ CheckpointMarkT mark = (CheckpointMarkT) reader.getCheckpointMark();
+ KV<UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT> kv = KV.of(source, mark);
+ checkpoints.add(kv);
+ }
+
+ try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+ checkpointCoder.encode(checkpoints, baos, Coder.Context.OUTER);
+ return baos.toByteArray();
+ }
+ }
+
+ /**
+ * Decodes a snapshot produced by {@link #snapshotState(long, long)}. No readers are created
+ * here; the decoded sources/checkpoint marks are stored in {@code restoredState} and picked up
+ * by {@link #run}.
+ */
+ @Override
+ public void restoreState(byte[] state) throws Exception {
+ try (ByteArrayInputStream stateStream = new ByteArrayInputStream(state)) {
+ this.restoredState = checkpointCoder.decode(stateStream, Coder.Context.OUTER);
+ }
+ }
+
+ /**
+ * Watermark timer callback: emits the minimum watermark reported by this subtask's readers
+ * and registers the next timer.
+ */
+ @Override
public void trigger(long timestamp) throws Exception {
if (this.isRunning) {
synchronized (context.getCheckpointLock()) {
- long watermarkMillis = this.reader.getWatermark().getMillis();
+ // find minimum watermark over all readers; readers that report no (null)
+ // watermark are ignored, and if none reports one we emit nothing instead of
+ // jumping straight to Long.MAX_VALUE (the end of time)
+ long watermarkMillis = Long.MAX_VALUE;
+ boolean haveWatermark = false;
+ for (UnboundedSource.UnboundedReader<OutputT> reader: readers) {
+ Instant watermark = reader.getWatermark();
+ if (watermark != null) {
+ watermarkMillis = Math.min(watermark.getMillis(), watermarkMillis);
+ haveWatermark = true;
+ }
+ }
- context.emitWatermark(new Watermark(watermarkMillis));
+ if (haveWatermark) {
+ context.emitWatermark(new Watermark(watermarkMillis));
+ }
}
- setNextWatermarkTimer(this.runtime);
+ setNextWatermarkTimer(this.runtimeContext);
}
}
@@ -150,4 +368,11 @@ public class UnboundedSourceWrapper<T> extends RichSourceFunction<WindowedValue<
return System.currentTimeMillis() + watermarkInterval;
}
+ /**
+ * Returns the split sources: the splits generated in the constructor or, once {@link #run}
+ * has consumed restored state, the sources read back from the snapshot. The internal list
+ * is returned as-is (no copy).
+ *
+ * <p>Visible so that we can check this in tests. Must not be used for anything else.
+ */
+ @VisibleForTesting
+ public List<? extends UnboundedSource<OutputT, CheckpointMarkT>> getSplitSources() {
+ return splitSources;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/336e90fa/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java
new file mode 100644
index 0000000..3ced02e
--- /dev/null
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.streaming;
+
+import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.DelegateCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.values.KV;
+
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import javax.annotation.Nullable;
+
+/**
+ * An unbounded source for testing the unbounded sources framework code.
+ *
+ * <p>Each split of this source produces records of the form KV(split_id, i), where i counts
+ * up from 0 to {@code numMessagesPerShard - 1}. Each record has a timestamp of i, and the
+ * watermark accurately tracks these timestamps. The reader returns false from
+ * {@code advance} once its split is exhausted. When dedup is enabled, the reader occasionally
+ * repeats the previous record to simulate a source that requires deduping.
+ */
+public class TestCountingSource
+    extends UnboundedSource<KV<Integer, Integer>, TestCountingSource.CounterMark> {
+  private static final Logger LOG = LoggerFactory.getLogger(TestCountingSource.class);
+
+  /** If non-null, receives the {@code current} value of every finalized {@link CounterMark}. */
+  private static List<Integer> finalizeTracker;
+  private final int numMessagesPerShard;
+  private final int shardNumber;
+  private final boolean dedup;
+  private final boolean throwOnFirstSnapshot;
+  private final boolean allowSplitting;
+
+  /**
+   * We allow an exception to be thrown from getCheckpointMark at most once. This must be
+   * static since the entire TestCountingSource instance may be re-serialized when the
+   * pipeline recovers and retries.
+   */
+  private static boolean thrown = false;
+
+  /** Sets the list that records finalized checkpoint marks; {@code null} disables tracking. */
+  public static void setFinalizeTracker(List<Integer> finalizeTracker) {
+    TestCountingSource.finalizeTracker = finalizeTracker;
+  }
+
+  /**
+   * Creates a source for shard 0 that emits {@code numMessagesPerShard} records per split,
+   * without dedup, without a failing snapshot, and with splitting allowed.
+   */
+  public TestCountingSource(int numMessagesPerShard) {
+    this(numMessagesPerShard, 0, false, false, true);
+  }
+
+  /** Returns a copy of this source whose readers occasionally emit duplicate records. */
+  public TestCountingSource withDedup() {
+    return new TestCountingSource(
+        numMessagesPerShard, shardNumber, true, throwOnFirstSnapshot, allowSplitting);
+  }
+
+  /** Returns a copy of this source with the given shard number; other settings are kept. */
+  private TestCountingSource withShardNumber(int shardNumber) {
+    return new TestCountingSource(
+        numMessagesPerShard, shardNumber, dedup, throwOnFirstSnapshot, allowSplitting);
+  }
+
+  /** Returns a copy of this source whose first getCheckpointMark call may throw. */
+  public TestCountingSource withThrowOnFirstSnapshot(boolean throwOnFirstSnapshot) {
+    return new TestCountingSource(
+        numMessagesPerShard, shardNumber, dedup, throwOnFirstSnapshot, allowSplitting);
+  }
+
+  /** Returns a copy of this source that always generates a single initial split. */
+  public TestCountingSource withoutSplitting() {
+    return new TestCountingSource(
+        numMessagesPerShard, shardNumber, dedup, throwOnFirstSnapshot, false);
+  }
+
+  private TestCountingSource(int numMessagesPerShard, int shardNumber, boolean dedup,
+      boolean throwOnFirstSnapshot, boolean allowSplitting) {
+    this.numMessagesPerShard = numMessagesPerShard;
+    this.shardNumber = shardNumber;
+    this.dedup = dedup;
+    this.throwOnFirstSnapshot = throwOnFirstSnapshot;
+    this.allowSplitting = allowSplitting;
+  }
+
+  public int getShardNumber() {
+    return shardNumber;
+  }
+
+  /**
+   * Returns {@code desiredNumSplits} copies of this source numbered 0..n-1, or a single copy
+   * (shard 0) when splitting is disabled.
+   */
+  @Override
+  public List<TestCountingSource> generateInitialSplits(
+      int desiredNumSplits, PipelineOptions options) {
+    List<TestCountingSource> splits = new ArrayList<>();
+    int numSplits = allowSplitting ? desiredNumSplits : 1;
+    for (int i = 0; i < numSplits; i++) {
+      splits.add(withShardNumber(i));
+    }
+    return splits;
+  }
+
+  /**
+   * Checkpoint mark holding the last value a reader produced. Static, because it does not need
+   * a reference to the enclosing source.
+   */
+  static class CounterMark implements UnboundedSource.CheckpointMark {
+    int current;
+
+    public CounterMark(int current) {
+      this.current = current;
+    }
+
+    @Override
+    public void finalizeCheckpoint() {
+      if (finalizeTracker != null) {
+        finalizeTracker.add(current);
+      }
+    }
+  }
+
+  /** Encodes a {@link CounterMark} as its {@code current} value using a VarInt coder. */
+  @Override
+  public Coder<CounterMark> getCheckpointMarkCoder() {
+    return DelegateCoder.of(
+        VarIntCoder.of(),
+        new DelegateCoder.CodingFunction<CounterMark, Integer>() {
+          @Override
+          public Integer apply(CounterMark input) {
+            return input.current;
+          }
+        },
+        new DelegateCoder.CodingFunction<Integer, CounterMark>() {
+          @Override
+          public CounterMark apply(Integer input) {
+            return new CounterMark(input);
+          }
+        });
+  }
+
+  @Override
+  public boolean requiresDeduping() {
+    return dedup;
+  }
+
+  /**
+   * Public only so that the checkpoint can be conveyed from {@link #getCheckpointMark()} to
+   * {@link TestCountingSource#createReader(PipelineOptions, CounterMark)} without cast.
+   */
+  public class CountingSourceReader extends UnboundedReader<KV<Integer, Integer>> {
+    private int current;
+
+    public CountingSourceReader(int startingPoint) {
+      this.current = startingPoint;
+    }
+
+    @Override
+    public boolean start() {
+      return advance();
+    }
+
+    @Override
+    public boolean advance() {
+      if (current >= numMessagesPerShard - 1) {
+        return false;
+      }
+      // If testing dedup, occasionally insert a duplicate value;
+      if (current >= 0 && dedup && ThreadLocalRandom.current().nextInt(5) == 0) {
+        return true;
+      }
+      current++;
+      return true;
+    }
+
+    @Override
+    public KV<Integer, Integer> getCurrent() {
+      return KV.of(shardNumber, current);
+    }
+
+    @Override
+    public Instant getCurrentTimestamp() {
+      return new Instant(current);
+    }
+
+    /** The record id is the encoded KV(shard, value), so duplicates share the same id. */
+    @Override
+    public byte[] getCurrentRecordId() {
+      try {
+        return encodeToByteArray(KvCoder.of(VarIntCoder.of(), VarIntCoder.of()), getCurrent());
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    @Override
+    public void close() {}
+
+    @Override
+    public TestCountingSource getCurrentSource() {
+      return TestCountingSource.this;
+    }
+
+    @Override
+    public Instant getWatermark() {
+      // The watermark is a promise about future elements, and the timestamps of elements are
+      // strictly increasing for this source.
+      return new Instant(current + 1);
+    }
+
+    @Override
+    public CounterMark getCheckpointMark() {
+      if (throwOnFirstSnapshot && !thrown) {
+        thrown = true;
+        LOG.error("Throwing exception while checkpointing counter");
+        throw new RuntimeException("failed during checkpoint");
+      }
+      // The checkpoint can assume all records read, including the current, have
+      // been committed.
+      return new CounterMark(current);
+    }
+
+    /** Returns a fixed backlog estimate. */
+    @Override
+    public long getSplitBacklogBytes() {
+      return 7L;
+    }
+  }
+
+  /**
+   * Creates a reader that starts after {@code checkpointMark.current}, or from the beginning
+   * when {@code checkpointMark} is {@code null}.
+   */
+  @Override
+  public CountingSourceReader createReader(
+      PipelineOptions options, @Nullable CounterMark checkpointMark) {
+    if (checkpointMark == null) {
+      LOG.debug("creating reader");
+    } else {
+      LOG.debug("restoring reader from checkpoint with current = {}", checkpointMark.current);
+    }
+    return new CountingSourceReader(checkpointMark != null ? checkpointMark.current : -1);
+  }
+
+  @Override
+  public void validate() {}
+
+  @Override
+  public Coder<KV<Integer, Integer>> getDefaultOutputCoder() {
+    return KvCoder.of(VarIntCoder.of(), VarIntCoder.of());
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/336e90fa/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceITCase.java
deleted file mode 100644
index 8a5de15..0000000
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceITCase.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.streaming;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.fail;
-
-import org.apache.beam.runners.flink.FlinkTestPipeline;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.windowing.AfterPane;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.PCollection;
-
-import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableList;
-
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
-import org.joda.time.Instant;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-import javax.annotation.Nullable;
-
-
-/**
- * Integration test that runs a streaming pipeline reading from {@link RangeReadSource} and
- * writing the elements as text to {@link #resultPath}.
- *
- * <p>The source itself compares the written output with {@link #EXPECTED_RESULT}. On a match it
- * ends the job by throwing an {@link IOException}, and the test expects {@link Pipeline#run()}
- * to fail with that exception's message.
- */
-public class UnboundedSourceITCase extends StreamingProgramTestBase {
-
- // Output location; assigned in preSubmit() and also read by RangeReadReader#advance().
- protected static String resultPath;
-
- public UnboundedSourceITCase() {
- }
-
- // Expected output lines: the values 1..9 produced by RangeReadSource(1, 10).
- static final String[] EXPECTED_RESULT = new String[]{
- "1", "2", "3", "4", "5", "6", "7", "8", "9"};
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
- }
-
- @Override
- protected void testProgram() throws Exception {
- runProgram(resultPath);
- }
-
- private static void runProgram(String resultPath) {
-
- Pipeline p = FlinkTestPipeline.createForStreaming();
-
- // NOTE(review): the source produces only 9 elements, fewer than the 10 this trigger waits
- // for. No GroupByKey is visible in this pipeline, so the trigger presumably does not gate
- // the text output — confirm.
- PCollection<String> result = p
- .apply(Read.from(new RangeReadSource(1, 10)))
- .apply(Window.<Integer>into(new GlobalWindows())
- .triggering(AfterPane.elementCountAtLeast(10))
- .discardingFiredPanes())
- .apply(ParDo.of(new DoFn<Integer, String>() {
- @Override
- public void processElement(ProcessContext c) throws Exception {
- c.output(c.element().toString());
- }
- }));
-
- result.apply(TextIO.Write.to(resultPath));
-
- // The job must not finish normally. fail() throws an AssertionError, which the catch below
- // does not intercept. The reader's IOException is expected two causes deep.
- try {
- p.run();
- fail();
- } catch(Exception e) {
- assertEquals("The source terminates as expected.", e.getCause().getCause().getMessage());
- }
- }
-
-
- // Unbounded source over the integers [from, to). getCheckpointMark() and
- // getCheckpointMarkCoder() both return null, and checkpoint marks passed in are ignored.
- private static class RangeReadSource extends UnboundedSource<Integer, UnboundedSource.CheckpointMark> {
-
- final int from;
- final int to;
-
- RangeReadSource(int from, int to) {
- this.from = from;
- this.to = to;
- }
-
-
- // Always a single split (this source), regardless of desiredNumSplits.
- @Override
- public List<? extends UnboundedSource<Integer, CheckpointMark>> generateInitialSplits(
- int desiredNumSplits, PipelineOptions options) throws Exception {
- return ImmutableList.of(this);
- }
-
- // The checkpoint mark is ignored; every reader starts again at `from`.
- @Override
- public UnboundedReader<Integer> createReader(PipelineOptions options, @Nullable CheckpointMark checkpointMark) {
- return new RangeReadReader(options);
- }
-
- @Nullable
- @Override
- public Coder<CheckpointMark> getCheckpointMarkCoder() {
- return null;
- }
-
- @Override
- public void validate() {
- }
-
- @Override
- public Coder<Integer> getDefaultOutputCoder() {
- return BigEndianIntegerCoder.of();
- }
-
- private class RangeReadReader extends UnboundedReader<Integer> {
-
- private int current;
-
- // Starts at 0 and is incremented once per advance(), so it trails `current` by `from`.
- private long watermark;
-
- public RangeReadReader(PipelineOptions options) {
- assertNotNull(options);
- current = from;
- }
-
- // The first element is `from`, already set by the constructor.
- @Override
- public boolean start() throws IOException {
- return true;
- }
-
- @Override
- public boolean advance() throws IOException {
- current++;
- watermark++;
-
- // Past the end of the range: check whether the pipeline output is complete yet.
- if (current >= to) {
- try {
- compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
- throw new IOException("The source terminates as expected.");
- } catch (IOException e) {
- // pass on the exception to terminate the source (this also rethrows any IOException
- // raised by the comparison itself)
- throw e;
- } catch (Throwable t) {
- // expected here from the file check: the output does not match yet, so swallow the
- // error and report no data; presumably advance() is polled again later
- }
- }
- return current < to;
- }
-
- @Override
- public Integer getCurrent() throws NoSuchElementException {
- return current;
- }
-
- // Event time equals the element value.
- @Override
- public Instant getCurrentTimestamp() throws NoSuchElementException {
- return new Instant(current);
- }
-
- @Override
- public void close() throws IOException {
- }
-
- @Override
- public Instant getWatermark() {
- return new Instant(watermark);
- }
-
- @Override
- public CheckpointMark getCheckpointMark() {
- return null;
- }
-
- @Override
- public UnboundedSource<Integer, ?> getCurrentSource() {
- return RangeReadSource.this;
- }
- }
- }
-}
-
-
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/336e90fa/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
new file mode 100644
index 0000000..f5a52f5
--- /dev/null
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.streaming;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Tests for {@link UnboundedSourceWrapper}.
+ */
+public class UnboundedSourceWrapperTest {
+
+  /**
+   * Number of elements each {@link TestCountingSource} is constructed with. The source emits
+   * exactly this many elements across all parallel readers and then stalls; the tests count the
+   * elements they receive against this number.
+   */
+  private static final int NUM_ELEMENTS = 20;
+
+  /**
+   * Creates a {@link UnboundedSourceWrapper} that has exactly one reader per source, since we
+   * specify a parallelism of 1 and also at runtime tell the source that it has 1 parallel subtask.
+   */
+  @Test
+  public void testWithOneReader() throws Exception {
+    final Object checkpointLock = new Object();
+    PipelineOptions options = PipelineOptionsFactory.create();
+
+    TestCountingSource source = new TestCountingSource(NUM_ELEMENTS);
+    UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper =
+        new UnboundedSourceWrapper<>(options, source, 1);
+
+    assertEquals(1, flinkWrapper.getSplitSources().size());
+
+    StreamSource<
+        WindowedValue<KV<Integer, Integer>>,
+        UnboundedSourceWrapper<
+            KV<Integer, Integer>,
+            TestCountingSource.CounterMark>> sourceOperator = new StreamSource<>(flinkWrapper);
+
+    setupSourceOperator(sourceOperator);
+
+    // The output throws SuccessException once NUM_ELEMENTS elements have arrived. Any other
+    // exception propagates with its stack trace, and returning normally means too few outputs.
+    try {
+      sourceOperator.run(checkpointLock, new LimitingOutput(NUM_ELEMENTS));
+    } catch (SuccessException e) {
+      // success
+      return;
+    }
+    fail("Read terminated without producing expected number of outputs");
+  }
+
+  /**
+   * Creates a {@link UnboundedSourceWrapper} that has multiple readers per source, since we
+   * specify a parallelism higher than 1 and at runtime tell the source that it has 1 parallel
+   * subtask; this means that one source will manage multiple readers.
+   */
+  @Test
+  public void testWithMultipleReaders() throws Exception {
+    final Object checkpointLock = new Object();
+    PipelineOptions options = PipelineOptionsFactory.create();
+
+    TestCountingSource source = new TestCountingSource(NUM_ELEMENTS);
+    UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper =
+        new UnboundedSourceWrapper<>(options, source, 4);
+
+    assertEquals(4, flinkWrapper.getSplitSources().size());
+
+    StreamSource<
+        WindowedValue<KV<Integer, Integer>>,
+        UnboundedSourceWrapper<
+            KV<Integer, Integer>,
+            TestCountingSource.CounterMark>> sourceOperator = new StreamSource<>(flinkWrapper);
+
+    setupSourceOperator(sourceOperator);
+
+    try {
+      sourceOperator.run(checkpointLock, new LimitingOutput(NUM_ELEMENTS));
+    } catch (SuccessException e) {
+      // success
+      return;
+    }
+    fail("Read terminated without producing expected number of outputs");
+  }
+
+  /**
+   * Verify that snapshot/restore work as expected. We bring up a source and cancel
+   * after seeing a certain number of elements. Then we snapshot that source,
+   * bring up a completely new source that we restore from the snapshot and verify
+   * that we see all expected elements in the end.
+   */
+  @Test
+  public void testRestore() throws Exception {
+    final Object checkpointLock = new Object();
+    PipelineOptions options = PipelineOptionsFactory.create();
+
+    TestCountingSource source = new TestCountingSource(NUM_ELEMENTS);
+    UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper =
+        new UnboundedSourceWrapper<>(options, source, 1);
+
+    assertEquals(1, flinkWrapper.getSplitSources().size());
+
+    StreamSource<
+        WindowedValue<KV<Integer, Integer>>,
+        UnboundedSourceWrapper<
+            KV<Integer, Integer>,
+            TestCountingSource.CounterMark>> sourceOperator = new StreamSource<>(flinkWrapper);
+
+    setupSourceOperator(sourceOperator);
+
+    // Shared across both runs so the final check covers the elements of both halves.
+    final Set<KV<Integer, Integer>> emittedElements = new HashSet<>();
+
+    boolean readFirstBatchOfElements = false;
+    try {
+      sourceOperator.run(checkpointLock, new LimitingOutput(NUM_ELEMENTS / 2, emittedElements));
+    } catch (SuccessException e) {
+      // success
+      readFirstBatchOfElements = true;
+    }
+
+    assertTrue("Did not successfully read first batch of elements.", readFirstBatchOfElements);
+
+    // draw a snapshot
+    byte[] snapshot = flinkWrapper.snapshotState(0, 0);
+
+    // create a completely new source but restore from the snapshot
+    TestCountingSource restoredSource = new TestCountingSource(NUM_ELEMENTS);
+    UnboundedSourceWrapper<
+        KV<Integer, Integer>, TestCountingSource.CounterMark> restoredFlinkWrapper =
+        new UnboundedSourceWrapper<>(options, restoredSource, 1);
+
+    assertEquals(1, restoredFlinkWrapper.getSplitSources().size());
+
+    StreamSource<
+        WindowedValue<KV<Integer, Integer>>,
+        UnboundedSourceWrapper<
+            KV<Integer, Integer>,
+            TestCountingSource.CounterMark>> restoredSourceOperator =
+        new StreamSource<>(restoredFlinkWrapper);
+
+    setupSourceOperator(restoredSourceOperator);
+
+    // restore snapshot
+    restoredFlinkWrapper.restoreState(snapshot);
+
+    boolean readSecondBatchOfElements = false;
+
+    // run again and verify that we see the other elements
+    try {
+      restoredSourceOperator.run(
+          checkpointLock, new LimitingOutput(NUM_ELEMENTS / 2, emittedElements));
+    } catch (SuccessException e) {
+      // success
+      readSecondBatchOfElements = true;
+    }
+
+    assertTrue("Did not successfully read second batch of elements.", readSecondBatchOfElements);
+
+    // verify that we saw all NUM_ELEMENTS distinct elements; assertEquals reports the actual count
+    assertEquals(NUM_ELEMENTS, emittedElements.size());
+  }
+
+  /**
+   * Sets up {@code operator} against a mocked {@link StreamTask} that uses event time and a
+   * {@link DummyEnvironment} presenting a single parallel subtask.
+   */
+  @SuppressWarnings("unchecked")
+  private static <T> void setupSourceOperator(StreamSource<T, ?> operator) {
+    ExecutionConfig executionConfig = new ExecutionConfig();
+    StreamConfig cfg = new StreamConfig(new Configuration());
+
+    cfg.setTimeCharacteristic(TimeCharacteristic.EventTime);
+
+    Environment env = new DummyEnvironment("MockTwoInputTask", 1, 0);
+
+    StreamTask<?, ?> mockTask = mock(StreamTask.class);
+    when(mockTask.getName()).thenReturn("Mock Task");
+    when(mockTask.getCheckpointLock()).thenReturn(new Object());
+    when(mockTask.getConfiguration()).thenReturn(cfg);
+    when(mockTask.getEnvironment()).thenReturn(env);
+    when(mockTask.getExecutionConfig()).thenReturn(executionConfig);
+    when(mockTask.getAccumulatorMap())
+        .thenReturn(Collections.<String, Accumulator<?, ?>>emptyMap());
+
+    operator.setup(mockTask, cfg, (Output<StreamRecord<T>>) mock(Output.class));
+  }
+
+  /**
+   * {@link Output} that adds every received value to a set and throws {@link SuccessException}
+   * once it has received {@code limit} elements. Watermarks are ignored.
+   */
+  private static class LimitingOutput
+      implements Output<StreamRecord<WindowedValue<KV<Integer, Integer>>>> {
+
+    private final int limit;
+    private final Set<KV<Integer, Integer>> sink;
+    private int count = 0;
+
+    /** Creates an output that collects into a private set the caller never inspects. */
+    LimitingOutput(int limit) {
+      this(limit, new HashSet<KV<Integer, Integer>>());
+    }
+
+    /** Creates an output that collects into {@code sink}, which the caller can inspect later. */
+    LimitingOutput(int limit, Set<KV<Integer, Integer>> sink) {
+      this.limit = limit;
+      this.sink = sink;
+    }
+
+    @Override
+    public void emitWatermark(Watermark watermark) {
+    }
+
+    @Override
+    public void collect(StreamRecord<WindowedValue<KV<Integer, Integer>>> record) {
+      sink.add(record.getValue().getValue());
+      count++;
+      if (count >= limit) {
+        // Unwinds out of StreamSource#run(); the tests treat this as success.
+        throw new SuccessException();
+      }
+    }
+
+    @Override
+    public void close() {
+    }
+  }
+
+  /**
+   * A special {@link RuntimeException} that we throw to signal that the test was successful.
+   */
+  private static class SuccessException extends RuntimeException {}
+}
[2/2] incubator-beam git commit: This Closes #274
Posted by al...@apache.org.
This Closes #274
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4020e364
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4020e364
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4020e364
Branch: refs/heads/master
Commit: 4020e364598befbd68458f1c6eada6d90a986358
Parents: 874ddef 336e90f
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue May 10 22:20:47 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue May 10 22:20:47 2016 +0200
----------------------------------------------------------------------
.../flink/DefaultParallelismFactory.java | 39 +++
.../runners/flink/FlinkPipelineOptions.java | 2 +-
.../FlinkStreamingTransformTranslators.java | 18 +-
.../streaming/io/UnboundedSourceWrapper.java | 337 ++++++++++++++++---
.../flink/streaming/TestCountingSource.java | 256 ++++++++++++++
.../flink/streaming/UnboundedSourceITCase.java | 208 ------------
.../streaming/UnboundedSourceWrapperTest.java | 324 ++++++++++++++++++
7 files changed, 916 insertions(+), 268 deletions(-)
----------------------------------------------------------------------