You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2016/11/08 18:19:08 UTC
[2/2] incubator-beam git commit: Update Watermarks even if a Reader is empty
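Update Watermarks even if a Reader is empty
This ensures that the pipeline continues to make progress even if a reader stops
producing elements. When a reader has no elements available, the residual shard it
leaves behind is now timestamped with the reader's current watermark
(reader.getWatermark()) instead of reusing the input element's timestamp. This lets
the watermark advance while the reader is idle.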
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/550978f6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/550978f6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/550978f6
Branch: refs/heads/master
Commit: 550978f630ee1e4424e981ddc5ff5e89aa8c797d
Parents: bfc527d
Author: Thomas Groh <tg...@google.com>
Authored: Mon Nov 7 12:59:06 2016 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Nov 8 10:17:13 2016 -0800
----------------------------------------------------------------------
.../direct/UnboundedReadEvaluatorFactory.java | 6 ++-
.../UnboundedReadEvaluatorFactoryTest.java | 47 +++++++++++++-------
2 files changed, 35 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/550978f6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
index e529088..fb09b3e 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
@@ -35,6 +35,7 @@ import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.WindowedValue;
@@ -143,12 +144,13 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
// If the reader had no elements available, but the shard is not done, reuse it later
resultBuilder.addUnprocessedElements(
Collections.<WindowedValue<?>>singleton(
- element.withValue(
+ WindowedValue.timestampedValueInGlobalWindow(
UnboundedSourceShard.of(
shard.getSource(),
shard.getDeduplicator(),
reader,
- shard.getCheckpoint()))));
+ shard.getCheckpoint()),
+ reader.getWatermark())));
}
} catch (IOException e) {
if (reader != null) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/550978f6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
index 9a7fec3..8d38275 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
@@ -49,7 +49,7 @@ import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.BigEndianLongCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.io.CountingInput;
+import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.io.CountingSource;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource;
@@ -63,11 +63,11 @@ import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.VarInt;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.hamcrest.Matchers;
import org.joda.time.DateTime;
-import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;
import org.junit.Before;
@@ -230,7 +230,7 @@ public class UnboundedReadEvaluatorFactoryTest {
TestPipeline p = TestPipeline.create();
// Read with a very slow rate so by the second read there are no more elements
PCollection<Long> pcollection =
- p.apply(CountingInput.unbounded().withRate(1L, Duration.standardDays(1)));
+ p.apply(Read.from(new TestUnboundedSource<>(VarLongCoder.of(), 1L)));
AppliedPTransform<?, ?, ?> sourceTransform = pcollection.getProducingTransformInternal();
when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle());
@@ -260,6 +260,7 @@ public class UnboundedReadEvaluatorFactoryTest {
(WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>>)
Iterables.getOnlyElement(result.getUnprocessedElements());
secondEvaluator.processElement(residual);
+
TransformResult secondResult = secondEvaluator.finishBundle();
// Sanity check that nothing was output (The test would have to run for more than a day to do
@@ -268,11 +269,14 @@ public class UnboundedReadEvaluatorFactoryTest {
secondOutput.commit(Instant.now()).getElements(),
Matchers.<WindowedValue<Long>>emptyIterable());
- // Test that even though the reader produced no outputs, there is still a residual shard.
- UnboundedSourceShard<Long, TestCheckpointMark> residualShard =
- (UnboundedSourceShard<Long, TestCheckpointMark>)
- Iterables.getOnlyElement(secondResult.getUnprocessedElements()).getValue();
- assertThat(residualShard.getExistingReader(), not(nullValue()));
+ // Test that even though the reader produced no outputs, there is still a residual shard with
+ // the updated watermark.
+ WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>> unprocessed =
+ (WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>>)
+ Iterables.getOnlyElement(secondResult.getUnprocessedElements());
+ assertThat(
+ unprocessed.getTimestamp(), Matchers.<ReadableInstant>greaterThan(residual.getTimestamp()));
+ assertThat(unprocessed.getValue().getExistingReader(), not(nullValue()));
}
@Test
@@ -377,6 +381,8 @@ public class UnboundedReadEvaluatorFactoryTest {
}
private static class TestUnboundedSource<T> extends UnboundedSource<T, TestCheckpointMark> {
+ private static int getWatermarkCalls = 0;
+
static int readerClosedCount;
static int readerAdvancedCount;
private final Coder<T> coder;
@@ -398,8 +404,8 @@ public class UnboundedReadEvaluatorFactoryTest {
@Override
public UnboundedSource.UnboundedReader<T> createReader(
- PipelineOptions options, TestCheckpointMark checkpointMark) {
- return new TestUnboundedReader(elems);
+ PipelineOptions options, @Nullable TestCheckpointMark checkpointMark) {
+ return new TestUnboundedReader(elems, checkpointMark == null ? -1 : checkpointMark.index);
}
@Override
@@ -425,9 +431,9 @@ public class UnboundedReadEvaluatorFactoryTest {
private final List<T> elems;
private int index;
- public TestUnboundedReader(List<T> elems) {
+ public TestUnboundedReader(List<T> elems, int startIndex) {
this.elems = elems;
- this.index = -1;
+ this.index = startIndex;
}
@Override
@@ -447,12 +453,13 @@ public class UnboundedReadEvaluatorFactoryTest {
@Override
public Instant getWatermark() {
- return Instant.now();
+ getWatermarkCalls++;
+ return new Instant(index + getWatermarkCalls);
}
@Override
public CheckpointMark getCheckpointMark() {
- return new TestCheckpointMark();
+ return new TestCheckpointMark(index);
}
@Override
@@ -488,6 +495,12 @@ public class UnboundedReadEvaluatorFactoryTest {
}
private static class TestCheckpointMark implements CheckpointMark {
+ final int index;
+
+ private TestCheckpointMark(int index) {
+ this.index = index;
+ }
+
@Override
public void finalizeCheckpoint() throws IOException {}
@@ -497,13 +510,15 @@ public class UnboundedReadEvaluatorFactoryTest {
TestCheckpointMark value,
OutputStream outStream,
org.apache.beam.sdk.coders.Coder.Context context)
- throws CoderException, IOException {}
+ throws CoderException, IOException {
+ VarInt.encode(value.index, outStream);
+ }
@Override
public TestCheckpointMark decode(
InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context)
throws CoderException, IOException {
- return new TestCheckpointMark();
+ return new TestCheckpointMark(VarInt.decodeInt(inStream));
}
}
}