You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2016/11/08 02:05:15 UTC

[1/2] incubator-beam git commit: Revert "Update Watermarks even if a Reader is empty"

Repository: incubator-beam
Updated Branches:
  refs/heads/master 339dee954 -> 99505e125


Revert "Update Watermarks even if a Reader is empty"

This reverts commit ff7fe07be96de393b763e7b3d213734040aa3795.

Updated test appears to be broken.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e2856fbf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e2856fbf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e2856fbf

Branch: refs/heads/master
Commit: e2856fbf076d34b7856391eafdfcfeb71bc6d7b2
Parents: 339dee9
Author: Thomas Groh <tg...@google.com>
Authored: Mon Nov 7 18:02:44 2016 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Mon Nov 7 18:02:44 2016 -0800

----------------------------------------------------------------------
 .../direct/UnboundedReadEvaluatorFactory.java    |  6 ++----
 .../UnboundedReadEvaluatorFactoryTest.java       | 19 ++++++-------------
 2 files changed, 8 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2856fbf/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
index fb09b3e..e529088 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
@@ -35,7 +35,6 @@ import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
 import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -144,13 +143,12 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
           // If the reader had no elements available, but the shard is not done, reuse it later
           resultBuilder.addUnprocessedElements(
               Collections.<WindowedValue<?>>singleton(
-                  WindowedValue.timestampedValueInGlobalWindow(
+                  element.withValue(
                       UnboundedSourceShard.of(
                           shard.getSource(),
                           shard.getDeduplicator(),
                           reader,
-                          shard.getCheckpoint()),
-                      reader.getWatermark())));
+                          shard.getCheckpoint()))));
         }
       } catch (IOException e) {
         if (reader != null) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2856fbf/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
index 18c7cec..9a7fec3 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
@@ -260,7 +260,6 @@ public class UnboundedReadEvaluatorFactoryTest {
         (WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>>)
             Iterables.getOnlyElement(result.getUnprocessedElements());
     secondEvaluator.processElement(residual);
-
     TransformResult secondResult = secondEvaluator.finishBundle();
 
     // Sanity check that nothing was output (The test would have to run for more than a day to do
@@ -269,14 +268,11 @@ public class UnboundedReadEvaluatorFactoryTest {
         secondOutput.commit(Instant.now()).getElements(),
         Matchers.<WindowedValue<Long>>emptyIterable());
 
-    // Test that even though the reader produced no outputs, there is still a residual shard with
-    // the updated watermark.
-    WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>> unprocessed =
-        (WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>>)
-            Iterables.getOnlyElement(secondResult.getUnprocessedElements());
-    assertThat(
-        unprocessed.getTimestamp(), Matchers.<ReadableInstant>greaterThan(residual.getTimestamp()));
-    assertThat(unprocessed.getValue().getExistingReader(), not(nullValue()));
+    // Test that even though the reader produced no outputs, there is still a residual shard.
+    UnboundedSourceShard<Long, TestCheckpointMark> residualShard =
+        (UnboundedSourceShard<Long, TestCheckpointMark>)
+            Iterables.getOnlyElement(secondResult.getUnprocessedElements()).getValue();
+    assertThat(residualShard.getExistingReader(), not(nullValue()));
   }
 
   @Test
@@ -381,8 +377,6 @@ public class UnboundedReadEvaluatorFactoryTest {
   }
 
   private static class TestUnboundedSource<T> extends UnboundedSource<T, TestCheckpointMark> {
-    private static int getWatermarkCalls = 0;
-
     static int readerClosedCount;
     static int readerAdvancedCount;
     private final Coder<T> coder;
@@ -453,8 +447,7 @@ public class UnboundedReadEvaluatorFactoryTest {
 
       @Override
       public Instant getWatermark() {
-        getWatermarkCalls++;
-        return new Instant(index + getWatermarkCalls);
+        return Instant.now();
       }
 
       @Override


[2/2] incubator-beam git commit: This closes #1303

Posted by tg...@apache.org.
This closes #1303


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/99505e12
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/99505e12
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/99505e12

Branch: refs/heads/master
Commit: 99505e1256082824aebab3da26128a1e52fd7c17
Parents: 339dee9 e2856fb
Author: Thomas Groh <tg...@google.com>
Authored: Mon Nov 7 18:04:51 2016 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Mon Nov 7 18:04:51 2016 -0800

----------------------------------------------------------------------
 .../direct/UnboundedReadEvaluatorFactory.java    |  6 ++----
 .../UnboundedReadEvaluatorFactoryTest.java       | 19 ++++++-------------
 2 files changed, 8 insertions(+), 17 deletions(-)
----------------------------------------------------------------------