You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/11/23 06:52:00 UTC

[04/50] incubator-beam git commit: Block earlier in BoundedReadEvaluatorFactoryTest

Block earlier in BoundedReadEvaluatorFactoryTest

This ensures that the reader doesn't claim the split point, which in
turn ensures the dynamic split request will not be refused by the
OffsetBasedSource. If the split is refused, ...ProducesDynamicSplits
flakes: when the reader is faster than the splitter thread, it can run
past the point at which the splitter thread will attempt to split the
source, which causes the reader to read all of the elements.

Sleep within TestReader#advanceImpl if the reader is being dynamically
split, to ensure that the dynamic split fully completes before
continuing a call to advance.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4a8d32e5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4a8d32e5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4a8d32e5

Branch: refs/heads/gearpump-runner
Commit: 4a8d32e5d3726b851329d507a8d0392cc03f6e85
Parents: 1543ea9
Author: Thomas Groh <tg...@google.com>
Authored: Thu Nov 17 10:56:49 2016 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Thu Nov 17 14:37:47 2016 -0800

----------------------------------------------------------------------
 .../direct/BoundedReadEvaluatorFactoryTest.java | 26 +++++++++++---------
 1 file changed, 14 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4a8d32e5/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
index 9d8503a..e956c34 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
@@ -49,6 +49,7 @@ import org.apache.beam.sdk.io.CountingSource;
 import org.apache.beam.sdk.io.OffsetBasedSource;
 import org.apache.beam.sdk.io.OffsetBasedSource.OffsetBasedReader;
 import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.Source;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.SourceTestUtils;
@@ -142,9 +143,7 @@ public class BoundedReadEvaluatorFactoryTest {
         TestPipeline.create().apply(Read.from(new TestSource<>(VarLongCoder.of(), 5, elems)));
     AppliedPTransform<?, ?, ?> transform = read.getProducingTransformInternal();
     Collection<CommittedBundle<?>> unreadInputs =
-        new BoundedReadEvaluatorFactory.InputProvider(context)
-            .getInitialInputs(transform,
-                1);
+        new BoundedReadEvaluatorFactory.InputProvider(context).getInitialInputs(transform, 1);
 
     Collection<WindowedValue<?>> outputs = new ArrayList<>();
     int numIterations = 0;
@@ -155,8 +154,7 @@ public class BoundedReadEvaluatorFactoryTest {
 
       Collection<CommittedBundle<?>> newUnreadInputs = new ArrayList<>();
       for (CommittedBundle<?> shardBundle : unreadInputs) {
-        TransformEvaluator<?> evaluator =
-            factory.forApplication(transform, null);
+        TransformEvaluator<?> evaluator = factory.forApplication(transform, null);
         for (WindowedValue<?> shard : shardBundle.getElements()) {
           evaluator.processElement((WindowedValue) shard);
         }
@@ -178,8 +176,6 @@ public class BoundedReadEvaluatorFactoryTest {
       unreadInputs = newUnreadInputs;
     }
 
-    // We produced at least one split before we read 1000 elements, as we will attempt to split as
-    // quickly as possible.
     assertThat(numIterations, greaterThan(1));
     WindowedValue[] expectedValues = new WindowedValue[numElements];
     for (long i = 0L; i < numElements; i++) {
@@ -343,7 +339,7 @@ public class BoundedReadEvaluatorFactoryTest {
     private static boolean readerClosed;
     private final Coder<T> coder;
     private final T[] elems;
-    private final int awaitSplitIndex;
+    private final int firstSplitIndex;
 
     private transient CountDownLatch subrangesCompleted;
 
@@ -351,11 +347,11 @@ public class BoundedReadEvaluatorFactoryTest {
       this(coder, elems.length, elems);
     }
 
-    public TestSource(Coder<T> coder, int awaitSplitIndex, T... elems) {
+    public TestSource(Coder<T> coder, int firstSplitIndex, T... elems) {
       super(0L, elems.length, 1L);
       this.elems = elems;
       this.coder = coder;
-      this.awaitSplitIndex = awaitSplitIndex;
+      this.firstSplitIndex = firstSplitIndex;
       readerClosed = false;
 
       subrangesCompleted = new CountDownLatch(2);
@@ -380,7 +376,7 @@ public class BoundedReadEvaluatorFactoryTest {
     @Override
     public BoundedSource.BoundedReader<T> createReader(PipelineOptions options) throws IOException {
       subrangesCompleted = new CountDownLatch(2);
-      return new TestReader<>(this, awaitSplitIndex, subrangesCompleted);
+      return new TestReader<>(this, firstSplitIndex, subrangesCompleted);
     }
 
     @Override
@@ -405,6 +401,7 @@ public class BoundedReadEvaluatorFactoryTest {
   }
 
   private static class TestReader<T> extends OffsetBasedReader<T> {
+    private final Source<T> initialSource;
     private final int sleepIndex;
     private final CountDownLatch dynamicallySplit;
 
@@ -412,6 +409,7 @@ public class BoundedReadEvaluatorFactoryTest {
 
     TestReader(OffsetBasedSource<T> source, int sleepIndex, CountDownLatch dynamicallySplit) {
       super(source);
+      this.initialSource = source;
       this.sleepIndex = sleepIndex;
       this.dynamicallySplit = dynamicallySplit;
       this.index = -1;
@@ -434,9 +432,13 @@ public class BoundedReadEvaluatorFactoryTest {
 
     @Override
     public boolean advanceImpl() throws IOException {
-      if (index == sleepIndex) {
+      // Sleep before the sleep/split index is claimed so long as it will be claimed
+      if (index + 1 == sleepIndex && sleepIndex < getCurrentSource().elems.length) {
         try {
           dynamicallySplit.await();
+          while (initialSource.equals(getCurrentSource())) {
+            // Spin until the current source is updated
+          }
         } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
           throw new IOException(e);