You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2016/11/08 02:05:15 UTC
[1/2] incubator-beam git commit: Revert "Update Watermarks even if a
Reader is empty"
Repository: incubator-beam
Updated Branches:
refs/heads/master 339dee954 -> 99505e125
Revert "Update Watermarks even if a Reader is empty"
This reverts commit ff7fe07be96de393b763e7b3d213734040aa3795.
Updated test appears to be broken.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e2856fbf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e2856fbf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e2856fbf
Branch: refs/heads/master
Commit: e2856fbf076d34b7856391eafdfcfeb71bc6d7b2
Parents: 339dee9
Author: Thomas Groh <tg...@google.com>
Authored: Mon Nov 7 18:02:44 2016 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Mon Nov 7 18:02:44 2016 -0800
----------------------------------------------------------------------
.../direct/UnboundedReadEvaluatorFactory.java | 6 ++----
.../UnboundedReadEvaluatorFactoryTest.java | 19 ++++++-------------
2 files changed, 8 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2856fbf/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
index fb09b3e..e529088 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
@@ -35,7 +35,6 @@ import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.WindowedValue;
@@ -144,13 +143,12 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
// If the reader had no elements available, but the shard is not done, reuse it later
resultBuilder.addUnprocessedElements(
Collections.<WindowedValue<?>>singleton(
- WindowedValue.timestampedValueInGlobalWindow(
+ element.withValue(
UnboundedSourceShard.of(
shard.getSource(),
shard.getDeduplicator(),
reader,
- shard.getCheckpoint()),
- reader.getWatermark())));
+ shard.getCheckpoint()))));
}
} catch (IOException e) {
if (reader != null) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2856fbf/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
index 18c7cec..9a7fec3 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
@@ -260,7 +260,6 @@ public class UnboundedReadEvaluatorFactoryTest {
(WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>>)
Iterables.getOnlyElement(result.getUnprocessedElements());
secondEvaluator.processElement(residual);
-
TransformResult secondResult = secondEvaluator.finishBundle();
// Sanity check that nothing was output (The test would have to run for more than a day to do
@@ -269,14 +268,11 @@ public class UnboundedReadEvaluatorFactoryTest {
secondOutput.commit(Instant.now()).getElements(),
Matchers.<WindowedValue<Long>>emptyIterable());
- // Test that even though the reader produced no outputs, there is still a residual shard with
- // the updated watermark.
- WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>> unprocessed =
- (WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>>)
- Iterables.getOnlyElement(secondResult.getUnprocessedElements());
- assertThat(
- unprocessed.getTimestamp(), Matchers.<ReadableInstant>greaterThan(residual.getTimestamp()));
- assertThat(unprocessed.getValue().getExistingReader(), not(nullValue()));
+ // Test that even though the reader produced no outputs, there is still a residual shard.
+ UnboundedSourceShard<Long, TestCheckpointMark> residualShard =
+ (UnboundedSourceShard<Long, TestCheckpointMark>)
+ Iterables.getOnlyElement(secondResult.getUnprocessedElements()).getValue();
+ assertThat(residualShard.getExistingReader(), not(nullValue()));
}
@Test
@@ -381,8 +377,6 @@ public class UnboundedReadEvaluatorFactoryTest {
}
private static class TestUnboundedSource<T> extends UnboundedSource<T, TestCheckpointMark> {
- private static int getWatermarkCalls = 0;
-
static int readerClosedCount;
static int readerAdvancedCount;
private final Coder<T> coder;
@@ -453,8 +447,7 @@ public class UnboundedReadEvaluatorFactoryTest {
@Override
public Instant getWatermark() {
- getWatermarkCalls++;
- return new Instant(index + getWatermarkCalls);
+ return Instant.now();
}
@Override
[2/2] incubator-beam git commit: This closes #1303
Posted by tg...@apache.org.
This closes #1303
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/99505e12
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/99505e12
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/99505e12
Branch: refs/heads/master
Commit: 99505e1256082824aebab3da26128a1e52fd7c17
Parents: 339dee9 e2856fb
Author: Thomas Groh <tg...@google.com>
Authored: Mon Nov 7 18:04:51 2016 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Mon Nov 7 18:04:51 2016 -0800
----------------------------------------------------------------------
.../direct/UnboundedReadEvaluatorFactory.java | 6 ++----
.../UnboundedReadEvaluatorFactoryTest.java | 19 ++++++-------------
2 files changed, 8 insertions(+), 17 deletions(-)
----------------------------------------------------------------------