You are viewing a plain text version of this content. The canonical link for it is available in the original mailing-list archive.
Posted to commits@beam.apache.org by tg...@apache.org on 2016/11/08 18:19:08 UTC

[2/2] incubator-beam git commit: Update Watermarks even if a Reader is empty

Update Watermarks even if a Reader is empty

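When a reader has no elements available but its shard is not done, the
residual shard is now re-emitted with the reader's current watermark as its
timestamp. This ensures that the pipeline continues to make progress even if
a reader stops producing elements.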


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/550978f6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/550978f6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/550978f6

Branch: refs/heads/master
Commit: 550978f630ee1e4424e981ddc5ff5e89aa8c797d
Parents: bfc527d
Author: Thomas Groh <tg...@google.com>
Authored: Mon Nov 7 12:59:06 2016 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Nov 8 10:17:13 2016 -0800

----------------------------------------------------------------------
 .../direct/UnboundedReadEvaluatorFactory.java   |  6 ++-
 .../UnboundedReadEvaluatorFactoryTest.java      | 47 +++++++++++++-------
 2 files changed, 35 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/550978f6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
index e529088..fb09b3e 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
@@ -35,6 +35,7 @@ import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
 import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -143,12 +144,13 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
           // If the reader had no elements available, but the shard is not done, reuse it later
           resultBuilder.addUnprocessedElements(
               Collections.<WindowedValue<?>>singleton(
-                  element.withValue(
+                  WindowedValue.timestampedValueInGlobalWindow(
                       UnboundedSourceShard.of(
                           shard.getSource(),
                           shard.getDeduplicator(),
                           reader,
-                          shard.getCheckpoint()))));
+                          shard.getCheckpoint()),
+                      reader.getWatermark())));
         }
       } catch (IOException e) {
         if (reader != null) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/550978f6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
index 9a7fec3..8d38275 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
@@ -49,7 +49,7 @@ import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.BigEndianLongCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.io.CountingInput;
+import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.io.CountingSource;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.UnboundedSource;
@@ -63,11 +63,11 @@ import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.VarInt;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 import org.hamcrest.Matchers;
 import org.joda.time.DateTime;
-import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.joda.time.ReadableInstant;
 import org.junit.Before;
@@ -230,7 +230,7 @@ public class UnboundedReadEvaluatorFactoryTest {
     TestPipeline p = TestPipeline.create();
     // Read with a very slow rate so by the second read there are no more elements
     PCollection<Long> pcollection =
-        p.apply(CountingInput.unbounded().withRate(1L, Duration.standardDays(1)));
+        p.apply(Read.from(new TestUnboundedSource<>(VarLongCoder.of(), 1L)));
     AppliedPTransform<?, ?, ?> sourceTransform = pcollection.getProducingTransformInternal();
 
     when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle());
@@ -260,6 +260,7 @@ public class UnboundedReadEvaluatorFactoryTest {
         (WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>>)
             Iterables.getOnlyElement(result.getUnprocessedElements());
     secondEvaluator.processElement(residual);
+
     TransformResult secondResult = secondEvaluator.finishBundle();
 
     // Sanity check that nothing was output (The test would have to run for more than a day to do
@@ -268,11 +269,14 @@ public class UnboundedReadEvaluatorFactoryTest {
         secondOutput.commit(Instant.now()).getElements(),
         Matchers.<WindowedValue<Long>>emptyIterable());
 
-    // Test that even though the reader produced no outputs, there is still a residual shard.
-    UnboundedSourceShard<Long, TestCheckpointMark> residualShard =
-        (UnboundedSourceShard<Long, TestCheckpointMark>)
-            Iterables.getOnlyElement(secondResult.getUnprocessedElements()).getValue();
-    assertThat(residualShard.getExistingReader(), not(nullValue()));
+    // Test that even though the reader produced no outputs, there is still a residual shard with
+    // the updated watermark.
+    WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>> unprocessed =
+        (WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>>)
+            Iterables.getOnlyElement(secondResult.getUnprocessedElements());
+    assertThat(
+        unprocessed.getTimestamp(), Matchers.<ReadableInstant>greaterThan(residual.getTimestamp()));
+    assertThat(unprocessed.getValue().getExistingReader(), not(nullValue()));
   }
 
   @Test
@@ -377,6 +381,8 @@ public class UnboundedReadEvaluatorFactoryTest {
   }
 
   private static class TestUnboundedSource<T> extends UnboundedSource<T, TestCheckpointMark> {
+    private static int getWatermarkCalls = 0;
+
     static int readerClosedCount;
     static int readerAdvancedCount;
     private final Coder<T> coder;
@@ -398,8 +404,8 @@ public class UnboundedReadEvaluatorFactoryTest {
 
     @Override
     public UnboundedSource.UnboundedReader<T> createReader(
-        PipelineOptions options, TestCheckpointMark checkpointMark) {
-      return new TestUnboundedReader(elems);
+        PipelineOptions options, @Nullable TestCheckpointMark checkpointMark) {
+      return new TestUnboundedReader(elems, checkpointMark == null ? -1 : checkpointMark.index);
     }
 
     @Override
@@ -425,9 +431,9 @@ public class UnboundedReadEvaluatorFactoryTest {
       private final List<T> elems;
       private int index;
 
-      public TestUnboundedReader(List<T> elems) {
+      public TestUnboundedReader(List<T> elems, int startIndex) {
         this.elems = elems;
-        this.index = -1;
+        this.index = startIndex;
       }
 
       @Override
@@ -447,12 +453,13 @@ public class UnboundedReadEvaluatorFactoryTest {
 
       @Override
       public Instant getWatermark() {
-        return Instant.now();
+        getWatermarkCalls++;
+        return new Instant(index + getWatermarkCalls);
       }
 
       @Override
       public CheckpointMark getCheckpointMark() {
-        return new TestCheckpointMark();
+        return new TestCheckpointMark(index);
       }
 
       @Override
@@ -488,6 +495,12 @@ public class UnboundedReadEvaluatorFactoryTest {
   }
 
   private static class TestCheckpointMark implements CheckpointMark {
+    final int index;
+
+    private TestCheckpointMark(int index) {
+      this.index = index;
+    }
+
     @Override
     public void finalizeCheckpoint() throws IOException {}
 
@@ -497,13 +510,15 @@ public class UnboundedReadEvaluatorFactoryTest {
           TestCheckpointMark value,
           OutputStream outStream,
           org.apache.beam.sdk.coders.Coder.Context context)
-          throws CoderException, IOException {}
+          throws CoderException, IOException {
+        VarInt.encode(value.index, outStream);
+      }
 
       @Override
       public TestCheckpointMark decode(
           InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context)
           throws CoderException, IOException {
-        return new TestCheckpointMark();
+        return new TestCheckpointMark(VarInt.decodeInt(inStream));
       }
     }
   }