You are viewing a plain-text version of this content. The link to the canonical version did not survive the conversion to plain text.
Posted to commits@beam.apache.org by mx...@apache.org on 2016/09/09 14:12:27 UTC

[1/5] incubator-beam git commit: [BEAM-619] keep track of local split sources in UnboundedSourceWrapper

Repository: incubator-beam
Updated Branches:
  refs/heads/master a96ea98a4 -> b6205ffa3


[BEAM-619] keep track of local split sources in UnboundedSourceWrapper


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/145ad47d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/145ad47d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/145ad47d

Branch: refs/heads/master
Commit: 145ad47d9f945f816be7a91001cdf7cb3b6a7fac
Parents: be689df
Author: Maximilian Michels <mx...@apache.org>
Authored: Wed Sep 7 13:07:15 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Wed Sep 7 13:15:54 2016 +0200

----------------------------------------------------------------------
 .../streaming/io/UnboundedSourceWrapper.java    | 79 +++++++++++---------
 1 file changed, 43 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/145ad47d/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
index 8647322..2cd06ed 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
@@ -58,7 +58,7 @@ public class UnboundedSourceWrapper<
   private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceWrapper.class);
 
   /**
-   * Keep the options so that we can initialize the readers.
+   * Keep the options so that we can initialize the localReaders.
    */
   private final SerializedPipelineOptions serializedOptions;
 
@@ -72,13 +72,19 @@ public class UnboundedSourceWrapper<
    * The split sources. We split them in the constructor to ensure that all parallel
    * sources are consistent about the split sources.
    */
-  private List<? extends UnboundedSource<OutputT, CheckpointMarkT>> splitSources;
+  private final List<? extends UnboundedSource<OutputT, CheckpointMarkT>> splitSources;
 
   /**
+   * The local split sources. Assigned at runtime when the wrapper is executed in parallel.
+   */
+  private transient List<UnboundedSource<OutputT, CheckpointMarkT>> localSplitSources;
+
+  /**
+   * The local split readers. Assigned at runtime when the wrapper is executed in parallel.
    * Make it a field so that we can access it in {@link #trigger(long)} for
    * emitting watermarks.
    */
-  private transient List<UnboundedSource.UnboundedReader<OutputT>> readers;
+  private transient List<UnboundedSource.UnboundedReader<OutputT>> localReaders;
 
   /**
    * Initialize here and not in run() to prevent races where we cancel a job before run() is
@@ -149,26 +155,15 @@ public class UnboundedSourceWrapper<
     int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
     int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
 
-    List<UnboundedSource<OutputT, CheckpointMarkT>> localSources = new ArrayList<>();
-
-    for (int i = 0; i < splitSources.size(); i++) {
-      if (i % numSubtasks == subtaskIndex) {
-        localSources.add(splitSources.get(i));
-      }
-    }
+    localSplitSources = new ArrayList<>();
+    localReaders = new ArrayList<>();
 
-    LOG.info("Unbounded Flink Source {}/{} is reading from sources: {}",
-        subtaskIndex,
-        numSubtasks,
-        localSources);
-
-    readers = new ArrayList<>();
     if (restoredState != null) {
 
       // restore the splitSources from the checkpoint to ensure consistent ordering
       // do it using a transform because otherwise we would have to do
       // unchecked casts
-      splitSources = Lists.transform(
+      localSplitSources = Lists.transform(
           restoredState,
           new Function<
               KV<? extends UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT>,
@@ -182,19 +177,31 @@ public class UnboundedSourceWrapper<
 
       for (KV<? extends UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT> restored:
           restoredState) {
-        readers.add(
+        localReaders.add(
             restored.getKey().createReader(
                 serializedOptions.getPipelineOptions(), restored.getValue()));
       }
       restoredState = null;
     } else {
-      // initialize readers from scratch
-      for (UnboundedSource<OutputT, CheckpointMarkT> source : localSources) {
-        readers.add(source.createReader(serializedOptions.getPipelineOptions(), null));
+      // initialize localReaders and localSplitSources from scratch
+      for (int i = 0; i < splitSources.size(); i++) {
+        if (i % numSubtasks == subtaskIndex) {
+          UnboundedSource<OutputT, CheckpointMarkT> source =
+              splitSources.get(i);
+          UnboundedSource.UnboundedReader<OutputT> reader =
+              source.createReader(serializedOptions.getPipelineOptions(), null);
+          localSplitSources.add(source);
+          localReaders.add(reader);
+        }
       }
     }
 
-    if (readers.size() == 0) {
+    LOG.info("Unbounded Flink Source {}/{} is reading from sources: {}",
+        subtaskIndex,
+        numSubtasks,
+        localSplitSources);
+
+    if (localReaders.size() == 0) {
       // do nothing, but still look busy ...
       // also, output a Long.MAX_VALUE watermark since we know that we're not
       // going to emit anything
@@ -218,9 +225,9 @@ public class UnboundedSourceWrapper<
           }
         }
       }
-    } else if (readers.size() == 1) {
+    } else if (localReaders.size() == 1) {
       // the easy case, we just read from one reader
-      UnboundedSource.UnboundedReader<OutputT> reader = readers.get(0);
+      UnboundedSource.UnboundedReader<OutputT> reader = localReaders.get(0);
 
       boolean dataAvailable = reader.start();
       if (dataAvailable) {
@@ -239,25 +246,25 @@ public class UnboundedSourceWrapper<
         }
       }
     } else {
-      // a bit more complicated, we are responsible for several readers
+      // a bit more complicated, we are responsible for several localReaders
       // loop through them and sleep if none of them had any data
 
-      int numReaders = readers.size();
+      int numReaders = localReaders.size();
       int currentReader = 0;
 
       // start each reader and emit data if immediately available
-      for (UnboundedSource.UnboundedReader<OutputT> reader : readers) {
+      for (UnboundedSource.UnboundedReader<OutputT> reader : localReaders) {
         boolean dataAvailable = reader.start();
         if (dataAvailable) {
           emitElement(ctx, reader);
         }
       }
 
-      // a flag telling us whether any of the readers had data
+      // a flag telling us whether any of the localReaders had data
       // if no reader had data, sleep for bit
       boolean hadData = false;
       while (isRunning) {
-        UnboundedSource.UnboundedReader<OutputT> reader = readers.get(currentReader);
+        UnboundedSource.UnboundedReader<OutputT> reader = localReaders.get(currentReader);
         boolean dataAvailable = reader.advance();
 
         if (dataAvailable) {
@@ -298,8 +305,8 @@ public class UnboundedSourceWrapper<
   @Override
   public void close() throws Exception {
     super.close();
-    if (readers != null) {
-      for (UnboundedSource.UnboundedReader<OutputT> reader: readers) {
+    if (localReaders != null) {
+      for (UnboundedSource.UnboundedReader<OutputT> reader: localReaders) {
         reader.close();
       }
     }
@@ -324,9 +331,9 @@ public class UnboundedSourceWrapper<
     List<KV<? extends UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT>> checkpoints =
         new ArrayList<>();
 
-    for (int i = 0; i < splitSources.size(); i++) {
-      UnboundedSource<OutputT, CheckpointMarkT> source = splitSources.get(i);
-      UnboundedSource.UnboundedReader<OutputT> reader = readers.get(i);
+    for (int i = 0; i < localSplitSources.size(); i++) {
+      UnboundedSource<OutputT, CheckpointMarkT> source = localSplitSources.get(i);
+      UnboundedSource.UnboundedReader<OutputT> reader = localReaders.get(i);
 
       @SuppressWarnings("unchecked")
       CheckpointMarkT mark = (CheckpointMarkT) reader.getCheckpointMark();
@@ -357,9 +364,9 @@ public class UnboundedSourceWrapper<
   public void trigger(long timestamp) throws Exception {
     if (this.isRunning) {
       synchronized (context.getCheckpointLock()) {
-        // find minimum watermark over all readers
+        // find minimum watermark over all localReaders
         long watermarkMillis = Long.MAX_VALUE;
-        for (UnboundedSource.UnboundedReader<OutputT> reader: readers) {
+        for (UnboundedSource.UnboundedReader<OutputT> reader: localReaders) {
           Instant watermark = reader.getWatermark();
           if (watermark != null) {
             watermarkMillis = Math.min(watermark.getMillis(), watermarkMillis);


[5/5] incubator-beam git commit: This closes #927

Posted by mx...@apache.org.
This closes #927


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b6205ffa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b6205ffa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b6205ffa

Branch: refs/heads/master
Commit: b6205ffa309af4e21ea2f63a211caae4961b81b1
Parents: c78db9a 4afd25a
Author: Maximilian Michels <mx...@apache.org>
Authored: Fri Sep 9 16:10:55 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Fri Sep 9 16:10:55 2016 +0200

----------------------------------------------------------------------
 .../streaming/io/UnboundedSourceWrapper.java    |  87 ++++++++------
 .../streaming/UnboundedSourceWrapperTest.java   | 113 +++++++------------
 2 files changed, 93 insertions(+), 107 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b6205ffa/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
----------------------------------------------------------------------


[2/5] incubator-beam git commit: [BEAM-333][flink] make bounded/unbounded sources stoppable

Posted by mx...@apache.org.
[BEAM-333][flink] make bounded/unbounded sources stoppable


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7e2820b0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7e2820b0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7e2820b0

Branch: refs/heads/master
Commit: 7e2820b06c19d958cbf7316ae28def7fe796a360
Parents: be689df
Author: Maximilian Michels <mx...@apache.org>
Authored: Tue Sep 6 16:38:43 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Fri Sep 9 16:06:42 2016 +0200

----------------------------------------------------------------------
 .../wrappers/streaming/io/BoundedSourceWrapper.java         | 9 ++++++++-
 .../wrappers/streaming/io/UnboundedSourceWrapper.java       | 8 +++++++-
 2 files changed, 15 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7e2820b0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
index 3cb93c0..df49a49 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
@@ -26,6 +26,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.flink.api.common.functions.StoppableFunction;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.watermark.Watermark;
@@ -37,7 +38,8 @@ import org.slf4j.LoggerFactory;
  * Wrapper for executing {@link BoundedSource UnboundedSources} as a Flink Source.
  */
 public class BoundedSourceWrapper<OutputT>
-    extends RichParallelSourceFunction<WindowedValue<OutputT>> {
+    extends RichParallelSourceFunction<WindowedValue<OutputT>>
+    implements StoppableFunction {
 
   private static final Logger LOG = LoggerFactory.getLogger(BoundedSourceWrapper.class);
 
@@ -206,6 +208,11 @@ public class BoundedSourceWrapper<OutputT>
     isRunning = false;
   }
 
+  @Override
+  public void stop() {
+    this.isRunning = false;
+  }
+
   /**
    * Visible so that we can check this in tests. Must not be used for anything else.
    */

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7e2820b0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
index 8647322..debf52f 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
@@ -36,6 +36,7 @@ import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.apache.flink.api.common.functions.StoppableFunction;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
@@ -53,7 +54,7 @@ import org.slf4j.LoggerFactory;
 public class UnboundedSourceWrapper<
     OutputT, CheckpointMarkT extends UnboundedSource.CheckpointMark>
     extends RichParallelSourceFunction<WindowedValue<OutputT>>
-    implements Triggerable, Checkpointed<byte[]> {
+    implements Triggerable, StoppableFunction, Checkpointed<byte[]> {
 
   private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceWrapper.class);
 
@@ -311,6 +312,11 @@ public class UnboundedSourceWrapper<
   }
 
   @Override
+  public void stop() {
+    isRunning = false;
+  }
+
+  @Override
   public byte[] snapshotState(long l, long l1) throws Exception {
 
     if (checkpointCoder == null) {


[4/5] incubator-beam git commit: [BEAM-619] extend test case to be parameterized

Posted by mx...@apache.org.
[BEAM-619] extend test case to be parameterized

- extend test case with number of tasks and splits parameters


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4afd25a7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4afd25a7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4afd25a7

Branch: refs/heads/master
Commit: 4afd25a7a85a24ff0212a4791661d3c5e105662b
Parents: 145ad47
Author: Maximilian Michels <mx...@apache.org>
Authored: Wed Sep 7 14:23:12 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Fri Sep 9 16:09:44 2016 +0200

----------------------------------------------------------------------
 .../streaming/io/UnboundedSourceWrapper.java    |   8 ++
 .../streaming/UnboundedSourceWrapperTest.java   | 113 +++++++------------
 2 files changed, 50 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4afd25a7/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
index 2cd06ed..a62a754 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
@@ -397,4 +397,12 @@ public class UnboundedSourceWrapper<
   public List<? extends UnboundedSource<OutputT, CheckpointMarkT>> getSplitSources() {
     return splitSources;
   }
+
+  /**
+   * Visible so that we can check this in tests. Must not be used for anything else.
+   */
+  @VisibleForTesting
+  public List<? extends UnboundedSource<OutputT, CheckpointMarkT>> getLocalSplitSources() {
+    return localSplitSources;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4afd25a7/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
index 73124a9..0cc584e 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
@@ -23,6 +23,8 @@ import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
@@ -44,78 +46,43 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 /**
  * Tests for {@link UnboundedSourceWrapper}.
  */
+@RunWith(Parameterized.class)
 public class UnboundedSourceWrapperTest {
 
-  /**
-   * Creates a {@link UnboundedSourceWrapper} that has exactly one reader per source, since we
-   * specify a parallelism of 1 and also at runtime tell the source that it has 1 parallel subtask.
-   */
-  @Test
-  public void testWithOneReader() throws Exception {
-    final int numElements = 20;
-    final Object checkpointLock = new Object();
-    PipelineOptions options = PipelineOptionsFactory.create();
-
-    // this source will emit exactly NUM_ELEMENTS across all parallel readers,
-    // afterwards it will stall. We check whether we also receive NUM_ELEMENTS
-    // elements later.
-    TestCountingSource source = new TestCountingSource(numElements);
-    UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper =
-        new UnboundedSourceWrapper<>(options, source, 1);
-
-    assertEquals(1, flinkWrapper.getSplitSources().size());
-
-    StreamSource<
-        WindowedValue<KV<Integer, Integer>>,
-        UnboundedSourceWrapper<
-            KV<Integer, Integer>,
-            TestCountingSource.CounterMark>> sourceOperator = new StreamSource<>(flinkWrapper);
-
-    setupSourceOperator(sourceOperator);
-
-
-    try {
-      sourceOperator.run(checkpointLock,
-          new Output<StreamRecord<WindowedValue<KV<Integer, Integer>>>>() {
-            private int count = 0;
+  private final int numTasks;
+  private final int numSplits;
 
-            @Override
-            public void emitWatermark(Watermark watermark) {
-            }
-
-            @Override
-            public void collect(
-                StreamRecord<WindowedValue<KV<Integer, Integer>>> windowedValueStreamRecord) {
-
-              count++;
-              if (count >= numElements) {
-                throw new SuccessException();
-              }
-            }
-
-            @Override
-            public void close() {
+  public UnboundedSourceWrapperTest(int numTasks, int numSplits) {
+    this.numTasks = numTasks;
+    this.numSplits = numSplits;
+  }
 
-            }
-          });
-    } catch (SuccessException e) {
-      // success
-    } catch (Exception e) {
-      fail("We caught " + e);
-    }
+  @Parameterized.Parameters
+  public static Collection<Object[]> data() {
+    /*
+     * Parameters for initializing the tests:
+     * {numTasks, numSplits}
+     * The test currently assumes powers of two for some assertions.
+     */
+    return Arrays.asList(new Object[][] {
+      {1, 1}, {1, 2}, {1, 4},
+      {2, 1}, {2, 2}, {2, 4},
+      {4, 1}, {4, 2}, {4, 4}
+    });
   }
 
   /**
-   * Creates a {@link UnboundedSourceWrapper} that has multiple readers per source, since we
-   * specify a parallelism higher than 1 and at runtime tell the source that it has 1 parallel
-   * this means that one source will manage multiple readers.
+   * Creates a {@link UnboundedSourceWrapper} that has one or multiple readers per source.
+   * If numSplits > numTasks, at least one parallel source instance will manage multiple readers.
    */
   @Test
-  public void testWithMultipleReaders() throws Exception {
+  public void testReaders() throws Exception {
     final int numElements = 20;
     final Object checkpointLock = new Object();
     PipelineOptions options = PipelineOptionsFactory.create();
@@ -125,9 +92,9 @@ public class UnboundedSourceWrapperTest {
     // elements later.
     TestCountingSource source = new TestCountingSource(numElements);
     UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper =
-        new UnboundedSourceWrapper<>(options, source, 4);
+        new UnboundedSourceWrapper<>(options, source, numSplits);
 
-    assertEquals(4, flinkWrapper.getSplitSources().size());
+    assertEquals(numSplits, flinkWrapper.getSplitSources().size());
 
     StreamSource<WindowedValue<
         KV<Integer, Integer>>,
@@ -135,8 +102,7 @@ public class UnboundedSourceWrapperTest {
             KV<Integer, Integer>,
             TestCountingSource.CounterMark>> sourceOperator = new StreamSource<>(flinkWrapper);
 
-    setupSourceOperator(sourceOperator);
-
+    setupSourceOperator(sourceOperator, numTasks);
 
     try {
       sourceOperator.run(checkpointLock,
@@ -163,6 +129,9 @@ public class UnboundedSourceWrapperTest {
             }
           });
     } catch (SuccessException e) {
+
+      assertEquals(Math.max(1, numSplits / numTasks), flinkWrapper.getLocalSplitSources().size());
+
       // success
       return;
     }
@@ -186,9 +155,9 @@ public class UnboundedSourceWrapperTest {
     // elements later.
     TestCountingSource source = new TestCountingSource(numElements);
     UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper =
-        new UnboundedSourceWrapper<>(options, source, 1);
+        new UnboundedSourceWrapper<>(options, source, numSplits);
 
-    assertEquals(1, flinkWrapper.getSplitSources().size());
+    assertEquals(numSplits, flinkWrapper.getSplitSources().size());
 
     StreamSource<
         WindowedValue<KV<Integer, Integer>>,
@@ -196,7 +165,7 @@ public class UnboundedSourceWrapperTest {
             KV<Integer, Integer>,
             TestCountingSource.CounterMark>> sourceOperator = new StreamSource<>(flinkWrapper);
 
-    setupSourceOperator(sourceOperator);
+    setupSourceOperator(sourceOperator, numTasks);
 
     final Set<KV<Integer, Integer>> emittedElements = new HashSet<>();
 
@@ -241,9 +210,9 @@ public class UnboundedSourceWrapperTest {
     TestCountingSource restoredSource = new TestCountingSource(numElements);
     UnboundedSourceWrapper<
         KV<Integer, Integer>, TestCountingSource.CounterMark> restoredFlinkWrapper =
-        new UnboundedSourceWrapper<>(options, restoredSource, 1);
+        new UnboundedSourceWrapper<>(options, restoredSource, numSplits);
 
-    assertEquals(1, restoredFlinkWrapper.getSplitSources().size());
+    assertEquals(numSplits, restoredFlinkWrapper.getSplitSources().size());
 
     StreamSource<
         WindowedValue<KV<Integer, Integer>>,
@@ -252,7 +221,7 @@ public class UnboundedSourceWrapperTest {
             TestCountingSource.CounterMark>> restoredSourceOperator =
         new StreamSource<>(restoredFlinkWrapper);
 
-    setupSourceOperator(restoredSourceOperator);
+    setupSourceOperator(restoredSourceOperator, numTasks);
 
     // restore snapshot
     restoredFlinkWrapper.restoreState(snapshot);
@@ -289,6 +258,8 @@ public class UnboundedSourceWrapperTest {
       readSecondBatchOfElements = true;
     }
 
+    assertEquals(Math.max(1, numSplits / numTasks), flinkWrapper.getLocalSplitSources().size());
+
     assertTrue("Did not successfully read second batch of elements.", readSecondBatchOfElements);
 
     // verify that we saw all NUM_ELEMENTS elements
@@ -296,13 +267,13 @@ public class UnboundedSourceWrapperTest {
   }
 
   @SuppressWarnings("unchecked")
-  private static <T> void setupSourceOperator(StreamSource<T, ?> operator) {
+  private static <T> void setupSourceOperator(StreamSource<T, ?> operator, int numSubTasks) {
     ExecutionConfig executionConfig = new ExecutionConfig();
     StreamConfig cfg = new StreamConfig(new Configuration());
 
     cfg.setTimeCharacteristic(TimeCharacteristic.EventTime);
 
-    Environment env = new DummyEnvironment("MockTwoInputTask", 1, 0);
+    Environment env = new DummyEnvironment("MockTwoInputTask", numSubTasks, 0);
 
     StreamTask<?, ?> mockTask = mock(StreamTask.class);
     when(mockTask.getName()).thenReturn("Mock Task");


[3/5] incubator-beam git commit: This closes #924

Posted by mx...@apache.org.
This closes #924


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c78db9ad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c78db9ad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c78db9ad

Branch: refs/heads/master
Commit: c78db9addf0b08b1b4a3ca4ec5e3e7f3a0899a02
Parents: a96ea98 7e2820b
Author: Maximilian Michels <mx...@apache.org>
Authored: Fri Sep 9 16:07:57 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Fri Sep 9 16:07:57 2016 +0200

----------------------------------------------------------------------
 .../wrappers/streaming/io/BoundedSourceWrapper.java         | 9 ++++++++-
 .../wrappers/streaming/io/UnboundedSourceWrapper.java       | 8 +++++++-
 2 files changed, 15 insertions(+), 2 deletions(-)
----------------------------------------------------------------------