You are viewing a plain text version of this content. The canonical (HTML) version is in the Apache commits mailing-list archive.
Posted to commits@beam.apache.org by mx...@apache.org on 2016/09/09 14:12:27 UTC
[1/5] incubator-beam git commit: [BEAM-619] keep track of local split
sources in UnboundedSourceWrapper
Repository: incubator-beam
Updated Branches:
refs/heads/master a96ea98a4 -> b6205ffa3
[BEAM-619] keep track of local split sources in UnboundedSourceWrapper
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/145ad47d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/145ad47d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/145ad47d
Branch: refs/heads/master
Commit: 145ad47d9f945f816be7a91001cdf7cb3b6a7fac
Parents: be689df
Author: Maximilian Michels <mx...@apache.org>
Authored: Wed Sep 7 13:07:15 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Wed Sep 7 13:15:54 2016 +0200
----------------------------------------------------------------------
.../streaming/io/UnboundedSourceWrapper.java | 79 +++++++++++---------
1 file changed, 43 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/145ad47d/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
index 8647322..2cd06ed 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
@@ -58,7 +58,7 @@ public class UnboundedSourceWrapper<
private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceWrapper.class);
/**
- * Keep the options so that we can initialize the readers.
+ * Keep the options so that we can initialize the localReaders.
*/
private final SerializedPipelineOptions serializedOptions;
@@ -72,13 +72,19 @@ public class UnboundedSourceWrapper<
* The split sources. We split them in the constructor to ensure that all parallel
* sources are consistent about the split sources.
*/
- private List<? extends UnboundedSource<OutputT, CheckpointMarkT>> splitSources;
+ private final List<? extends UnboundedSource<OutputT, CheckpointMarkT>> splitSources;
/**
+ * The local split sources. Assigned at runtime when the wrapper is executed in parallel.
+ */
+ private transient List<UnboundedSource<OutputT, CheckpointMarkT>> localSplitSources;
+
+ /**
+ * The local split readers. Assigned at runtime when the wrapper is executed in parallel.
* Make it a field so that we can access it in {@link #trigger(long)} for
* emitting watermarks.
*/
- private transient List<UnboundedSource.UnboundedReader<OutputT>> readers;
+ private transient List<UnboundedSource.UnboundedReader<OutputT>> localReaders;
/**
* Initialize here and not in run() to prevent races where we cancel a job before run() is
@@ -149,26 +155,15 @@ public class UnboundedSourceWrapper<
int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
- List<UnboundedSource<OutputT, CheckpointMarkT>> localSources = new ArrayList<>();
-
- for (int i = 0; i < splitSources.size(); i++) {
- if (i % numSubtasks == subtaskIndex) {
- localSources.add(splitSources.get(i));
- }
- }
+ localSplitSources = new ArrayList<>();
+ localReaders = new ArrayList<>();
- LOG.info("Unbounded Flink Source {}/{} is reading from sources: {}",
- subtaskIndex,
- numSubtasks,
- localSources);
-
- readers = new ArrayList<>();
if (restoredState != null) {
// restore the splitSources from the checkpoint to ensure consistent ordering
// do it using a transform because otherwise we would have to do
// unchecked casts
- splitSources = Lists.transform(
+ localSplitSources = Lists.transform(
restoredState,
new Function<
KV<? extends UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT>,
@@ -182,19 +177,31 @@ public class UnboundedSourceWrapper<
for (KV<? extends UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT> restored:
restoredState) {
- readers.add(
+ localReaders.add(
restored.getKey().createReader(
serializedOptions.getPipelineOptions(), restored.getValue()));
}
restoredState = null;
} else {
- // initialize readers from scratch
- for (UnboundedSource<OutputT, CheckpointMarkT> source : localSources) {
- readers.add(source.createReader(serializedOptions.getPipelineOptions(), null));
+ // initialize localReaders and localSplitSources from scratch
+ for (int i = 0; i < splitSources.size(); i++) {
+ if (i % numSubtasks == subtaskIndex) {
+ UnboundedSource<OutputT, CheckpointMarkT> source =
+ splitSources.get(i);
+ UnboundedSource.UnboundedReader<OutputT> reader =
+ source.createReader(serializedOptions.getPipelineOptions(), null);
+ localSplitSources.add(source);
+ localReaders.add(reader);
+ }
}
}
- if (readers.size() == 0) {
+ LOG.info("Unbounded Flink Source {}/{} is reading from sources: {}",
+ subtaskIndex,
+ numSubtasks,
+ localSplitSources);
+
+ if (localReaders.size() == 0) {
// do nothing, but still look busy ...
// also, output a Long.MAX_VALUE watermark since we know that we're not
// going to emit anything
@@ -218,9 +225,9 @@ public class UnboundedSourceWrapper<
}
}
}
- } else if (readers.size() == 1) {
+ } else if (localReaders.size() == 1) {
// the easy case, we just read from one reader
- UnboundedSource.UnboundedReader<OutputT> reader = readers.get(0);
+ UnboundedSource.UnboundedReader<OutputT> reader = localReaders.get(0);
boolean dataAvailable = reader.start();
if (dataAvailable) {
@@ -239,25 +246,25 @@ public class UnboundedSourceWrapper<
}
}
} else {
- // a bit more complicated, we are responsible for several readers
+ // a bit more complicated, we are responsible for several localReaders
// loop through them and sleep if none of them had any data
- int numReaders = readers.size();
+ int numReaders = localReaders.size();
int currentReader = 0;
// start each reader and emit data if immediately available
- for (UnboundedSource.UnboundedReader<OutputT> reader : readers) {
+ for (UnboundedSource.UnboundedReader<OutputT> reader : localReaders) {
boolean dataAvailable = reader.start();
if (dataAvailable) {
emitElement(ctx, reader);
}
}
- // a flag telling us whether any of the readers had data
+ // a flag telling us whether any of the localReaders had data
// if no reader had data, sleep for bit
boolean hadData = false;
while (isRunning) {
- UnboundedSource.UnboundedReader<OutputT> reader = readers.get(currentReader);
+ UnboundedSource.UnboundedReader<OutputT> reader = localReaders.get(currentReader);
boolean dataAvailable = reader.advance();
if (dataAvailable) {
@@ -298,8 +305,8 @@ public class UnboundedSourceWrapper<
@Override
public void close() throws Exception {
super.close();
- if (readers != null) {
- for (UnboundedSource.UnboundedReader<OutputT> reader: readers) {
+ if (localReaders != null) {
+ for (UnboundedSource.UnboundedReader<OutputT> reader: localReaders) {
reader.close();
}
}
@@ -324,9 +331,9 @@ public class UnboundedSourceWrapper<
List<KV<? extends UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT>> checkpoints =
new ArrayList<>();
- for (int i = 0; i < splitSources.size(); i++) {
- UnboundedSource<OutputT, CheckpointMarkT> source = splitSources.get(i);
- UnboundedSource.UnboundedReader<OutputT> reader = readers.get(i);
+ for (int i = 0; i < localSplitSources.size(); i++) {
+ UnboundedSource<OutputT, CheckpointMarkT> source = localSplitSources.get(i);
+ UnboundedSource.UnboundedReader<OutputT> reader = localReaders.get(i);
@SuppressWarnings("unchecked")
CheckpointMarkT mark = (CheckpointMarkT) reader.getCheckpointMark();
@@ -357,9 +364,9 @@ public class UnboundedSourceWrapper<
public void trigger(long timestamp) throws Exception {
if (this.isRunning) {
synchronized (context.getCheckpointLock()) {
- // find minimum watermark over all readers
+ // find minimum watermark over all localReaders
long watermarkMillis = Long.MAX_VALUE;
- for (UnboundedSource.UnboundedReader<OutputT> reader: readers) {
+ for (UnboundedSource.UnboundedReader<OutputT> reader: localReaders) {
Instant watermark = reader.getWatermark();
if (watermark != null) {
watermarkMillis = Math.min(watermark.getMillis(), watermarkMillis);
[5/5] incubator-beam git commit: This closes #927
Posted by mx...@apache.org.
This closes #927
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b6205ffa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b6205ffa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b6205ffa
Branch: refs/heads/master
Commit: b6205ffa309af4e21ea2f63a211caae4961b81b1
Parents: c78db9a 4afd25a
Author: Maximilian Michels <mx...@apache.org>
Authored: Fri Sep 9 16:10:55 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Fri Sep 9 16:10:55 2016 +0200
----------------------------------------------------------------------
.../streaming/io/UnboundedSourceWrapper.java | 87 ++++++++------
.../streaming/UnboundedSourceWrapperTest.java | 113 +++++++------------
2 files changed, 93 insertions(+), 107 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b6205ffa/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
----------------------------------------------------------------------
[2/5] incubator-beam git commit: [BEAM-333][flink] make
bounded/unbounded sources stoppable
Posted by mx...@apache.org.
[BEAM-333][flink] make bounded/unbounded sources stoppable
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7e2820b0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7e2820b0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7e2820b0
Branch: refs/heads/master
Commit: 7e2820b06c19d958cbf7316ae28def7fe796a360
Parents: be689df
Author: Maximilian Michels <mx...@apache.org>
Authored: Tue Sep 6 16:38:43 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Fri Sep 9 16:06:42 2016 +0200
----------------------------------------------------------------------
.../wrappers/streaming/io/BoundedSourceWrapper.java | 9 ++++++++-
.../wrappers/streaming/io/UnboundedSourceWrapper.java | 8 +++++++-
2 files changed, 15 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7e2820b0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
index 3cb93c0..df49a49 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
@@ -26,6 +26,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.flink.api.common.functions.StoppableFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.watermark.Watermark;
@@ -37,7 +38,8 @@ import org.slf4j.LoggerFactory;
* Wrapper for executing {@link BoundedSource UnboundedSources} as a Flink Source.
*/
public class BoundedSourceWrapper<OutputT>
- extends RichParallelSourceFunction<WindowedValue<OutputT>> {
+ extends RichParallelSourceFunction<WindowedValue<OutputT>>
+ implements StoppableFunction {
private static final Logger LOG = LoggerFactory.getLogger(BoundedSourceWrapper.class);
@@ -206,6 +208,11 @@ public class BoundedSourceWrapper<OutputT>
isRunning = false;
}
+ @Override
+ public void stop() {
+ this.isRunning = false;
+ }
+
/**
* Visible so that we can check this in tests. Must not be used for anything else.
*/
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7e2820b0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
index 8647322..debf52f 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
@@ -36,6 +36,7 @@ import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.apache.flink.api.common.functions.StoppableFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
@@ -53,7 +54,7 @@ import org.slf4j.LoggerFactory;
public class UnboundedSourceWrapper<
OutputT, CheckpointMarkT extends UnboundedSource.CheckpointMark>
extends RichParallelSourceFunction<WindowedValue<OutputT>>
- implements Triggerable, Checkpointed<byte[]> {
+ implements Triggerable, StoppableFunction, Checkpointed<byte[]> {
private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceWrapper.class);
@@ -311,6 +312,11 @@ public class UnboundedSourceWrapper<
}
@Override
+ public void stop() {
+ isRunning = false;
+ }
+
+ @Override
public byte[] snapshotState(long l, long l1) throws Exception {
if (checkpointCoder == null) {
[4/5] incubator-beam git commit: [BEAM-619] extend test case to be
parameterized
Posted by mx...@apache.org.
[BEAM-619] extend test case to be parameterized
- extend test case with number of tasks and splits parameters
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4afd25a7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4afd25a7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4afd25a7
Branch: refs/heads/master
Commit: 4afd25a7a85a24ff0212a4791661d3c5e105662b
Parents: 145ad47
Author: Maximilian Michels <mx...@apache.org>
Authored: Wed Sep 7 14:23:12 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Fri Sep 9 16:09:44 2016 +0200
----------------------------------------------------------------------
.../streaming/io/UnboundedSourceWrapper.java | 8 ++
.../streaming/UnboundedSourceWrapperTest.java | 113 +++++++------------
2 files changed, 50 insertions(+), 71 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4afd25a7/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
index 2cd06ed..a62a754 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
@@ -397,4 +397,12 @@ public class UnboundedSourceWrapper<
public List<? extends UnboundedSource<OutputT, CheckpointMarkT>> getSplitSources() {
return splitSources;
}
+
+ /**
+ * Visible so that we can check this in tests. Must not be used for anything else.
+ */
+ @VisibleForTesting
+ public List<? extends UnboundedSource<OutputT, CheckpointMarkT>> getLocalSplitSources() {
+ return localSplitSources;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4afd25a7/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
index 73124a9..0cc584e 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
@@ -23,6 +23,8 @@ import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
@@ -44,78 +46,43 @@ import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
/**
* Tests for {@link UnboundedSourceWrapper}.
*/
+@RunWith(Parameterized.class)
public class UnboundedSourceWrapperTest {
- /**
- * Creates a {@link UnboundedSourceWrapper} that has exactly one reader per source, since we
- * specify a parallelism of 1 and also at runtime tell the source that it has 1 parallel subtask.
- */
- @Test
- public void testWithOneReader() throws Exception {
- final int numElements = 20;
- final Object checkpointLock = new Object();
- PipelineOptions options = PipelineOptionsFactory.create();
-
- // this source will emit exactly NUM_ELEMENTS across all parallel readers,
- // afterwards it will stall. We check whether we also receive NUM_ELEMENTS
- // elements later.
- TestCountingSource source = new TestCountingSource(numElements);
- UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper =
- new UnboundedSourceWrapper<>(options, source, 1);
-
- assertEquals(1, flinkWrapper.getSplitSources().size());
-
- StreamSource<
- WindowedValue<KV<Integer, Integer>>,
- UnboundedSourceWrapper<
- KV<Integer, Integer>,
- TestCountingSource.CounterMark>> sourceOperator = new StreamSource<>(flinkWrapper);
-
- setupSourceOperator(sourceOperator);
-
-
- try {
- sourceOperator.run(checkpointLock,
- new Output<StreamRecord<WindowedValue<KV<Integer, Integer>>>>() {
- private int count = 0;
+ private final int numTasks;
+ private final int numSplits;
- @Override
- public void emitWatermark(Watermark watermark) {
- }
-
- @Override
- public void collect(
- StreamRecord<WindowedValue<KV<Integer, Integer>>> windowedValueStreamRecord) {
-
- count++;
- if (count >= numElements) {
- throw new SuccessException();
- }
- }
-
- @Override
- public void close() {
+ public UnboundedSourceWrapperTest(int numTasks, int numSplits) {
+ this.numTasks = numTasks;
+ this.numSplits = numSplits;
+ }
- }
- });
- } catch (SuccessException e) {
- // success
- } catch (Exception e) {
- fail("We caught " + e);
- }
+ @Parameterized.Parameters
+ public static Collection<Object[]> data() {
+ /*
+ * Parameters for initializing the tests:
+ * {numTasks, numSplits}
+ * The test currently assumes powers of two for some assertions.
+ */
+ return Arrays.asList(new Object[][] {
+ {1, 1}, {1, 2}, {1, 4},
+ {2, 1}, {2, 2}, {2, 4},
+ {4, 1}, {4, 2}, {4, 4}
+ });
}
/**
- * Creates a {@link UnboundedSourceWrapper} that has multiple readers per source, since we
- * specify a parallelism higher than 1 and at runtime tell the source that it has 1 parallel
- * this means that one source will manage multiple readers.
+ * Creates a {@link UnboundedSourceWrapper} that has one or multiple readers per source.
+ * If numSplits > numTasks, each parallel subtask manages multiple readers.
*/
@Test
- public void testWithMultipleReaders() throws Exception {
+ public void testReaders() throws Exception {
final int numElements = 20;
final Object checkpointLock = new Object();
PipelineOptions options = PipelineOptionsFactory.create();
@@ -125,9 +92,9 @@ public class UnboundedSourceWrapperTest {
// elements later.
TestCountingSource source = new TestCountingSource(numElements);
UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper =
- new UnboundedSourceWrapper<>(options, source, 4);
+ new UnboundedSourceWrapper<>(options, source, numSplits);
- assertEquals(4, flinkWrapper.getSplitSources().size());
+ assertEquals(numSplits, flinkWrapper.getSplitSources().size());
StreamSource<WindowedValue<
KV<Integer, Integer>>,
@@ -135,8 +102,7 @@ public class UnboundedSourceWrapperTest {
KV<Integer, Integer>,
TestCountingSource.CounterMark>> sourceOperator = new StreamSource<>(flinkWrapper);
- setupSourceOperator(sourceOperator);
-
+ setupSourceOperator(sourceOperator, numTasks);
try {
sourceOperator.run(checkpointLock,
@@ -163,6 +129,9 @@ public class UnboundedSourceWrapperTest {
}
});
} catch (SuccessException e) {
+
+ assertEquals(Math.max(1, numSplits / numTasks), flinkWrapper.getLocalSplitSources().size());
+
// success
return;
}
@@ -186,9 +155,9 @@ public class UnboundedSourceWrapperTest {
// elements later.
TestCountingSource source = new TestCountingSource(numElements);
UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper =
- new UnboundedSourceWrapper<>(options, source, 1);
+ new UnboundedSourceWrapper<>(options, source, numSplits);
- assertEquals(1, flinkWrapper.getSplitSources().size());
+ assertEquals(numSplits, flinkWrapper.getSplitSources().size());
StreamSource<
WindowedValue<KV<Integer, Integer>>,
@@ -196,7 +165,7 @@ public class UnboundedSourceWrapperTest {
KV<Integer, Integer>,
TestCountingSource.CounterMark>> sourceOperator = new StreamSource<>(flinkWrapper);
- setupSourceOperator(sourceOperator);
+ setupSourceOperator(sourceOperator, numTasks);
final Set<KV<Integer, Integer>> emittedElements = new HashSet<>();
@@ -241,9 +210,9 @@ public class UnboundedSourceWrapperTest {
TestCountingSource restoredSource = new TestCountingSource(numElements);
UnboundedSourceWrapper<
KV<Integer, Integer>, TestCountingSource.CounterMark> restoredFlinkWrapper =
- new UnboundedSourceWrapper<>(options, restoredSource, 1);
+ new UnboundedSourceWrapper<>(options, restoredSource, numSplits);
- assertEquals(1, restoredFlinkWrapper.getSplitSources().size());
+ assertEquals(numSplits, restoredFlinkWrapper.getSplitSources().size());
StreamSource<
WindowedValue<KV<Integer, Integer>>,
@@ -252,7 +221,7 @@ public class UnboundedSourceWrapperTest {
TestCountingSource.CounterMark>> restoredSourceOperator =
new StreamSource<>(restoredFlinkWrapper);
- setupSourceOperator(restoredSourceOperator);
+ setupSourceOperator(restoredSourceOperator, numTasks);
// restore snapshot
restoredFlinkWrapper.restoreState(snapshot);
@@ -289,6 +258,8 @@ public class UnboundedSourceWrapperTest {
readSecondBatchOfElements = true;
}
+ assertEquals(Math.max(1, numSplits / numTasks), flinkWrapper.getLocalSplitSources().size());
+
assertTrue("Did not successfully read second batch of elements.", readSecondBatchOfElements);
// verify that we saw all NUM_ELEMENTS elements
@@ -296,13 +267,13 @@ public class UnboundedSourceWrapperTest {
}
@SuppressWarnings("unchecked")
- private static <T> void setupSourceOperator(StreamSource<T, ?> operator) {
+ private static <T> void setupSourceOperator(StreamSource<T, ?> operator, int numSubTasks) {
ExecutionConfig executionConfig = new ExecutionConfig();
StreamConfig cfg = new StreamConfig(new Configuration());
cfg.setTimeCharacteristic(TimeCharacteristic.EventTime);
- Environment env = new DummyEnvironment("MockTwoInputTask", 1, 0);
+ Environment env = new DummyEnvironment("MockTwoInputTask", numSubTasks, 0);
StreamTask<?, ?> mockTask = mock(StreamTask.class);
when(mockTask.getName()).thenReturn("Mock Task");
[3/5] incubator-beam git commit: This closes #924
Posted by mx...@apache.org.
This closes #924
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c78db9ad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c78db9ad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c78db9ad
Branch: refs/heads/master
Commit: c78db9addf0b08b1b4a3ca4ec5e3e7f3a0899a02
Parents: a96ea98 7e2820b
Author: Maximilian Michels <mx...@apache.org>
Authored: Fri Sep 9 16:07:57 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Fri Sep 9 16:07:57 2016 +0200
----------------------------------------------------------------------
.../wrappers/streaming/io/BoundedSourceWrapper.java | 9 ++++++++-
.../wrappers/streaming/io/UnboundedSourceWrapper.java | 8 +++++++-
2 files changed, 15 insertions(+), 2 deletions(-)
----------------------------------------------------------------------