You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/05/03 05:08:05 UTC
[1/2] incubator-beam git commit: Use CommittedResult in
InMemoryWatermarkManager
Repository: incubator-beam
Updated Branches:
refs/heads/master 69ec223e5 -> 34e05954d
Use CommittedResult in InMemoryWatermarkManager
This enable unprocessed elements to be handled in the Watermark manager
after they are added to the CommittedResult structure.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/dff82cae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/dff82cae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/dff82cae
Branch: refs/heads/master
Commit: dff82cae2f9d6393e4bdbb7fd527f58eb2cdaa01
Parents: 659cf2e
Author: Thomas Groh <tg...@google.com>
Authored: Thu Apr 28 13:42:36 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Mon May 2 12:54:02 2016 -0700
----------------------------------------------------------------------
.../direct/InMemoryWatermarkManager.java | 19 +-
.../direct/InProcessEvaluationContext.java | 6 +-
.../direct/InMemoryWatermarkManagerTest.java | 368 ++++++++++++-------
3 files changed, 252 insertions(+), 141 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dff82cae/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java
index 769457a..4d5a3a1 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java
@@ -800,18 +800,19 @@ public class InMemoryWatermarkManager {
* </pre>.
*
* @param completed the input that has completed
- * @param transform the transform that has completed processing the input
- * @param outputs the bundles the transform has output
+ * @param timerUpdate the timers that were added, removed, and completed as part of producing
+ * this update
+ * @param result the result that was produced by processing the input
* @param earliestHold the earliest watermark hold in the transform's state. {@code null} if there
* is no hold
*/
public void updateWatermarks(
@Nullable CommittedBundle<?> completed,
- AppliedPTransform<?, ?, ?> transform,
TimerUpdate timerUpdate,
- Iterable<? extends CommittedBundle<?>> outputs,
+ CommittedResult result,
@Nullable Instant earliestHold) {
- updatePending(completed, transform, timerUpdate, outputs);
+ AppliedPTransform<?, ?, ?> transform = result.getTransform();
+ updatePending(completed, timerUpdate, result);
TransformWatermarks transformWms = transformToWatermarks.get(transform);
transformWms.setEventTimeHold(completed == null ? null : completed.getKey(), earliestHold);
refreshWatermarks(transform);
@@ -846,15 +847,14 @@ public class InMemoryWatermarkManager {
*/
private void updatePending(
CommittedBundle<?> input,
- AppliedPTransform<?, ?, ?> transform,
TimerUpdate timerUpdate,
- Iterable<? extends CommittedBundle<?>> outputs) {
- TransformWatermarks completedTransform = transformToWatermarks.get(transform);
+ CommittedResult result) {
+ TransformWatermarks completedTransform = transformToWatermarks.get(result.getTransform());
// Newly pending elements must be added before completed elements are removed, as the two
// do not share a Mutex within this call and thus can be interleaved with external calls to
// refresh.
- for (CommittedBundle<?> bundle : outputs) {
+ for (CommittedBundle<?> bundle : result.getOutputs()) {
for (AppliedPTransform<?, ?, ?> consumer : consumers.get(bundle.getPCollection())) {
TransformWatermarks watermarks = transformToWatermarks.get(consumer);
watermarks.addPending(bundle);
@@ -865,7 +865,6 @@ public class InMemoryWatermarkManager {
if (input != null) {
completedTransform.removePending(input);
}
-
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dff82cae/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
index d9a7ff0..d4f891e 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
@@ -152,11 +152,11 @@ class InProcessEvaluationContext {
Iterable<? extends CommittedBundle<?>> committedBundles =
commitBundles(result.getOutputBundles());
// Update watermarks and timers
+ CommittedResult committedResult = CommittedResult.create(result, committedBundles);
watermarkManager.updateWatermarks(
completedBundle,
- result.getTransform(),
result.getTimerUpdate().withCompletedTimers(completedTimers),
- committedBundles,
+ committedResult,
result.getWatermarkHold());
fireAllAvailableCallbacks();
// Update counters
@@ -176,7 +176,7 @@ class InProcessEvaluationContext {
applicationStateInternals.remove(stepAndKey);
}
}
- return CommittedResult.create(result, committedBundles);
+ return committedResult;
}
private Iterable<? extends CommittedBundle<?>> commitBundles(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dff82cae/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java
index 2880ade..15cdf8a 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java
@@ -159,8 +159,11 @@ public class InMemoryWatermarkManagerTest implements Serializable {
@Test
public void getWatermarkForUpdatedSourceTransform() {
CommittedBundle<Integer> output = multiWindowedBundle(createdInts, 1);
- manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
- Collections.<CommittedBundle<?>>singleton(output), new Instant(8000L));
+ manager.updateWatermarks(null,
+ TimerUpdate.empty(),
+ result(createdInts.getProducingTransformInternal(),
+ Collections.<CommittedBundle<?>>singleton(output)),
+ new Instant(8000L));
TransformWatermarks updatedSourceWatermark =
manager.getWatermarks(createdInts.getProducingTransformInternal());
@@ -175,8 +178,10 @@ public class InMemoryWatermarkManagerTest implements Serializable {
public void getWatermarkForMultiInputTransform() {
CommittedBundle<Integer> secondPcollectionBundle = multiWindowedBundle(intsToFlatten, -1);
- manager.updateWatermarks(null, intsToFlatten.getProducingTransformInternal(),
- TimerUpdate.empty(), Collections.<CommittedBundle<?>>singleton(secondPcollectionBundle),
+ manager.updateWatermarks(null,
+ TimerUpdate.empty(),
+ result(intsToFlatten.getProducingTransformInternal(),
+ Collections.<CommittedBundle<?>>singleton(secondPcollectionBundle)),
BoundedWindow.TIMESTAMP_MAX_VALUE);
// We didn't do anything for the first source, so we shouldn't have progressed the watermark
@@ -205,13 +210,17 @@ public class InMemoryWatermarkManagerTest implements Serializable {
CommittedBundle<Integer> flattenedBundleSecondCreate = multiWindowedBundle(flattened, -1);
// We have finished processing the bundle from the second PCollection, but we haven't consumed
// anything from the first PCollection yet; so our watermark shouldn't advance
- manager.updateWatermarks(secondPcollectionBundle, flattened.getProducingTransformInternal(),
- TimerUpdate.empty(), Collections.<CommittedBundle<?>>singleton(flattenedBundleSecondCreate),
+ manager.updateWatermarks(secondPcollectionBundle,
+ TimerUpdate.empty(),
+ result(flattened.getProducingTransformInternal(),
+ Collections.<CommittedBundle<?>>singleton(flattenedBundleSecondCreate)),
null);
TransformWatermarks transformAfterProcessing =
manager.getWatermarks(flattened.getProducingTransformInternal());
- manager.updateWatermarks(secondPcollectionBundle, flattened.getProducingTransformInternal(),
- TimerUpdate.empty(), Collections.<CommittedBundle<?>>singleton(flattenedBundleSecondCreate),
+ manager.updateWatermarks(secondPcollectionBundle,
+ TimerUpdate.empty(),
+ result(flattened.getProducingTransformInternal(),
+ Collections.<CommittedBundle<?>>singleton(flattenedBundleSecondCreate)),
null);
assertThat(
transformAfterProcessing.getInputWatermark(),
@@ -225,8 +234,10 @@ public class InMemoryWatermarkManagerTest implements Serializable {
timestampedBundle(createdInts, TimestampedValue.<Integer>of(5, firstCollectionTimestamp));
// the source is done, but elements are still buffered. The source output watermark should be
// past the end of the global window
- manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
- Collections.<CommittedBundle<?>>singleton(firstPcollectionBundle),
+ manager.updateWatermarks(null,
+ TimerUpdate.empty(),
+ result(createdInts.getProducingTransformInternal(),
+ Collections.<CommittedBundle<?>>singleton(firstPcollectionBundle)),
new Instant(Long.MAX_VALUE));
TransformWatermarks firstSourceWatermarks =
manager.getWatermarks(createdInts.getProducingTransformInternal());
@@ -253,8 +264,10 @@ public class InMemoryWatermarkManagerTest implements Serializable {
CommittedBundle<?> completedFlattenBundle =
bundleFactory.createRootBundle(flattened).commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
- manager.updateWatermarks(firstPcollectionBundle, flattened.getProducingTransformInternal(),
- TimerUpdate.empty(), Collections.<CommittedBundle<?>>singleton(completedFlattenBundle),
+ manager.updateWatermarks(firstPcollectionBundle,
+ TimerUpdate.empty(),
+ result(flattened.getProducingTransformInternal(),
+ Collections.<CommittedBundle<?>>singleton(completedFlattenBundle)),
null);
TransformWatermarks afterConsumingAllInput =
manager.getWatermarks(flattened.getProducingTransformInternal());
@@ -275,8 +288,11 @@ public class InMemoryWatermarkManagerTest implements Serializable {
CommittedBundle<Integer> createdBundle = timestampedBundle(createdInts,
TimestampedValue.of(1, new Instant(1_000_000L)), TimestampedValue.of(2, new Instant(1234L)),
TimestampedValue.of(3, new Instant(-1000L)));
- manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
- Collections.<CommittedBundle<?>>singleton(createdBundle), new Instant(Long.MAX_VALUE));
+ manager.updateWatermarks(null,
+ TimerUpdate.empty(),
+ result(createdInts.getProducingTransformInternal(),
+ Collections.<CommittedBundle<?>>singleton(createdBundle)),
+ new Instant(Long.MAX_VALUE));
TransformWatermarks createdAfterProducing =
manager.getWatermarks(createdInts.getProducingTransformInternal());
assertThat(
@@ -287,8 +303,11 @@ public class InMemoryWatermarkManagerTest implements Serializable {
timestampedBundle(keyed, TimestampedValue.of(KV.of("MyKey", 1), new Instant(1_000_000L)),
TimestampedValue.of(KV.of("MyKey", 2), new Instant(1234L)),
TimestampedValue.of(KV.of("MyKey", 3), new Instant(-1000L)));
- manager.updateWatermarks(createdBundle, keyed.getProducingTransformInternal(),
- TimerUpdate.empty(), Collections.<CommittedBundle<?>>singleton(keyBundle), null);
+ manager.updateWatermarks(createdBundle,
+ TimerUpdate.empty(),
+ result(keyed.getProducingTransformInternal(),
+ Collections.<CommittedBundle<?>>singleton(keyBundle)),
+ null);
TransformWatermarks keyedWatermarks =
manager.getWatermarks(keyed.getProducingTransformInternal());
assertThat(
@@ -303,8 +322,11 @@ public class InMemoryWatermarkManagerTest implements Serializable {
CommittedBundle<Integer> filteredBundle =
timestampedBundle(filtered, TimestampedValue.of(2, new Instant(1234L)));
- manager.updateWatermarks(createdBundle, filtered.getProducingTransformInternal(),
- TimerUpdate.empty(), Collections.<CommittedBundle<?>>singleton(filteredBundle), null);
+ manager.updateWatermarks(createdBundle,
+ TimerUpdate.empty(),
+ result(filtered.getProducingTransformInternal(),
+ Collections.<CommittedBundle<?>>singleton(filteredBundle)),
+ null);
TransformWatermarks filteredProcessedWatermarks =
manager.getWatermarks(filtered.getProducingTransformInternal());
assertThat(
@@ -322,17 +344,23 @@ public class InMemoryWatermarkManagerTest implements Serializable {
@Test
public void updateWatermarkWithWatermarkHolds() {
CommittedBundle<Integer> createdBundle = timestampedBundle(createdInts,
- TimestampedValue.of(1, new Instant(1_000_000L)), TimestampedValue.of(2, new Instant(1234L)),
+ TimestampedValue.of(1, new Instant(1_000_000L)),
+ TimestampedValue.of(2, new Instant(1234L)),
TimestampedValue.of(3, new Instant(-1000L)));
- manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
- Collections.<CommittedBundle<?>>singleton(createdBundle), new Instant(Long.MAX_VALUE));
+ manager.updateWatermarks(null,
+ TimerUpdate.empty(),
+ result(createdInts.getProducingTransformInternal(),
+ Collections.<CommittedBundle<?>>singleton(createdBundle)),
+ new Instant(Long.MAX_VALUE));
- CommittedBundle<KV<String, Integer>> keyBundle =
- timestampedBundle(keyed, TimestampedValue.of(KV.of("MyKey", 1), new Instant(1_000_000L)),
- TimestampedValue.of(KV.of("MyKey", 2), new Instant(1234L)),
- TimestampedValue.of(KV.of("MyKey", 3), new Instant(-1000L)));
- manager.updateWatermarks(createdBundle, keyed.getProducingTransformInternal(),
- TimerUpdate.empty(), Collections.<CommittedBundle<?>>singleton(keyBundle),
+ CommittedBundle<KV<String, Integer>> keyBundle = timestampedBundle(keyed,
+ TimestampedValue.of(KV.of("MyKey", 1), new Instant(1_000_000L)),
+ TimestampedValue.of(KV.of("MyKey", 2), new Instant(1234L)),
+ TimestampedValue.of(KV.of("MyKey", 3), new Instant(-1000L)));
+ manager.updateWatermarks(createdBundle,
+ TimerUpdate.empty(),
+ result(keyed.getProducingTransformInternal(),
+ Collections.<CommittedBundle<?>>singleton(keyBundle)),
new Instant(500L));
TransformWatermarks keyedWatermarks =
manager.getWatermarks(keyed.getProducingTransformInternal());
@@ -358,40 +386,54 @@ public class InMemoryWatermarkManagerTest implements Serializable {
.add(WindowedValue.timestampedValueInGlobalWindow(2, new Instant(1234L)))
.commit(clock.now());
- manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
- ImmutableList.of(firstKeyBundle, secondKeyBundle), BoundedWindow.TIMESTAMP_MAX_VALUE);
+ manager.updateWatermarks(null,
+ TimerUpdate.empty(),
+ result(createdInts.getProducingTransformInternal(),
+ ImmutableList.of(firstKeyBundle, secondKeyBundle)),
+ BoundedWindow.TIMESTAMP_MAX_VALUE);
- manager.updateWatermarks(firstKeyBundle, filtered.getProducingTransformInternal(),
- TimerUpdate.empty(), Collections.<CommittedBundle<?>>emptyList(), new Instant(-1000L));
- manager.updateWatermarks(secondKeyBundle, filtered.getProducingTransformInternal(),
- TimerUpdate.empty(), Collections.<CommittedBundle<?>>emptyList(), new Instant(1234L));
+ manager.updateWatermarks(firstKeyBundle,
+ TimerUpdate.empty(),
+ result(filtered.getProducingTransformInternal(),
+ Collections.<CommittedBundle<?>>emptyList()),
+ new Instant(-1000L));
+ manager.updateWatermarks(secondKeyBundle,
+ TimerUpdate.empty(),
+ result(filtered.getProducingTransformInternal(),
+ Collections.<CommittedBundle<?>>emptyList()),
+ new Instant(1234L));
TransformWatermarks filteredWatermarks =
manager.getWatermarks(filtered.getProducingTransformInternal());
- assertThat(
- filteredWatermarks.getInputWatermark(),
+ assertThat(filteredWatermarks.getInputWatermark(),
not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
assertThat(filteredWatermarks.getOutputWatermark(), not(laterThan(new Instant(-1000L))));
CommittedBundle<Integer> fauxFirstKeyTimerBundle =
bundleFactory.createKeyedBundle(null, "Odd", createdInts).commit(clock.now());
- manager.updateWatermarks(fauxFirstKeyTimerBundle, filtered.getProducingTransformInternal(),
- TimerUpdate.empty(), Collections.<CommittedBundle<?>>emptyList(),
+ manager.updateWatermarks(fauxFirstKeyTimerBundle,
+ TimerUpdate.empty(),
+ result(filtered.getProducingTransformInternal(),
+ Collections.<CommittedBundle<?>>emptyList()),
BoundedWindow.TIMESTAMP_MAX_VALUE);
assertThat(filteredWatermarks.getOutputWatermark(), equalTo(new Instant(1234L)));
CommittedBundle<Integer> fauxSecondKeyTimerBundle =
bundleFactory.createKeyedBundle(null, "Even", createdInts).commit(clock.now());
- manager.updateWatermarks(fauxSecondKeyTimerBundle, filtered.getProducingTransformInternal(),
- TimerUpdate.empty(), Collections.<CommittedBundle<?>>emptyList(), new Instant(5678L));
+ manager.updateWatermarks(fauxSecondKeyTimerBundle,
+ TimerUpdate.empty(),
+ result(filtered.getProducingTransformInternal(),
+ Collections.<CommittedBundle<?>>emptyList()),
+ new Instant(5678L));
assertThat(filteredWatermarks.getOutputWatermark(), equalTo(new Instant(5678L)));
- manager.updateWatermarks(fauxSecondKeyTimerBundle, filtered.getProducingTransformInternal(),
- TimerUpdate.empty(), Collections.<CommittedBundle<?>>emptyList(),
+ manager.updateWatermarks(fauxSecondKeyTimerBundle,
+ TimerUpdate.empty(),
+ result(filtered.getProducingTransformInternal(),
+ Collections.<CommittedBundle<?>>emptyList()),
BoundedWindow.TIMESTAMP_MAX_VALUE);
- assertThat(
- filteredWatermarks.getOutputWatermark(),
+ assertThat(filteredWatermarks.getOutputWatermark(),
not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
}
@@ -403,16 +445,21 @@ public class InMemoryWatermarkManagerTest implements Serializable {
public void updateOutputWatermarkShouldBeMonotonic() {
CommittedBundle<?> firstInput =
bundleFactory.createRootBundle(createdInts).commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
- manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
- Collections.<CommittedBundle<?>>singleton(firstInput), new Instant(0L));
+ manager.updateWatermarks(null, TimerUpdate.empty(),
+ result(createdInts.getProducingTransformInternal(),
+ Collections.<CommittedBundle<?>>singleton(firstInput)),
+ new Instant(0L));
TransformWatermarks firstWatermarks =
manager.getWatermarks(createdInts.getProducingTransformInternal());
assertThat(firstWatermarks.getOutputWatermark(), equalTo(new Instant(0L)));
CommittedBundle<?> secondInput =
bundleFactory.createRootBundle(createdInts).commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
- manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
- Collections.<CommittedBundle<?>>singleton(secondInput), new Instant(-250L));
+ manager.updateWatermarks(null,
+ TimerUpdate.empty(),
+ result(createdInts.getProducingTransformInternal(),
+ Collections.<CommittedBundle<?>>singleton(secondInput)),
+ new Instant(-250L));
TransformWatermarks secondWatermarks =
manager.getWatermarks(createdInts.getProducingTransformInternal());
assertThat(secondWatermarks.getOutputWatermark(), not(earlierThan(new Instant(0L))));
@@ -425,17 +472,22 @@ public class InMemoryWatermarkManagerTest implements Serializable {
@Test
public void updateWatermarkWithHoldsShouldBeMonotonic() {
CommittedBundle<Integer> createdBundle = timestampedBundle(createdInts,
- TimestampedValue.of(1, new Instant(1_000_000L)), TimestampedValue.of(2, new Instant(1234L)),
- TimestampedValue.of(3, new Instant(-1000L)));
- manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
- Collections.<CommittedBundle<?>>singleton(createdBundle), new Instant(Long.MAX_VALUE));
+ TimestampedValue.of(1, new Instant(1_000_000L)),
+ TimestampedValue.of(2, new Instant(1234L)),
+ TimestampedValue.of(3, new Instant(-1000L))); manager.updateWatermarks(null,
+ TimerUpdate.empty(),
+ result(createdInts.getProducingTransformInternal(),
+ Collections.<CommittedBundle<?>>singleton(createdBundle)),
+ new Instant(Long.MAX_VALUE));
CommittedBundle<KV<String, Integer>> keyBundle =
timestampedBundle(keyed, TimestampedValue.of(KV.of("MyKey", 1), new Instant(1_000_000L)),
TimestampedValue.of(KV.of("MyKey", 2), new Instant(1234L)),
TimestampedValue.of(KV.of("MyKey", 3), new Instant(-1000L)));
- manager.updateWatermarks(createdBundle, keyed.getProducingTransformInternal(),
- TimerUpdate.empty(), Collections.<CommittedBundle<?>>singleton(keyBundle),
+ manager.updateWatermarks(createdBundle,
+ TimerUpdate.empty(),
+ result(keyed.getProducingTransformInternal(),
+ Collections.<CommittedBundle<?>>singleton(keyBundle)),
new Instant(500L));
TransformWatermarks keyedWatermarks =
manager.getWatermarks(keyed.getProducingTransformInternal());
@@ -462,16 +514,22 @@ public class InMemoryWatermarkManagerTest implements Serializable {
CommittedBundle<Integer> createdBundle = timestampedBundle(createdInts,
TimestampedValue.of(1, sourceWatermark), TimestampedValue.of(2, new Instant(1234L)));
- manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
- Collections.<CommittedBundle<?>>singleton(createdBundle), sourceWatermark);
+ manager.updateWatermarks(null,
+ TimerUpdate.empty(),
+ result(createdInts.getProducingTransformInternal(),
+ Collections.<CommittedBundle<?>>singleton(createdBundle)),
+ sourceWatermark);
CommittedBundle<KV<String, Integer>> keyBundle =
timestampedBundle(keyed, TimestampedValue.of(KV.of("MyKey", 1), sourceWatermark),
TimestampedValue.of(KV.of("MyKey", 2), new Instant(1234L)));
// Finish processing the on-time data. The watermarks should progress to be equal to the source
- manager.updateWatermarks(createdBundle, keyed.getProducingTransformInternal(),
- TimerUpdate.empty(), Collections.<CommittedBundle<?>>singleton(keyBundle), null);
+ manager.updateWatermarks(createdBundle,
+ TimerUpdate.empty(),
+ result(keyed.getProducingTransformInternal(),
+ Collections.<CommittedBundle<?>>singleton(keyBundle)),
+ null);
TransformWatermarks onTimeWatermarks =
manager.getWatermarks(keyed.getProducingTransformInternal());
assertThat(onTimeWatermarks.getInputWatermark(), equalTo(sourceWatermark));
@@ -481,8 +539,11 @@ public class InMemoryWatermarkManagerTest implements Serializable {
timestampedBundle(createdInts, TimestampedValue.of(3, new Instant(-1000L)));
// the late data arrives in a downstream PCollection after its watermark has advanced past it;
// we don't advance the watermark past the current watermark until we've consumed the late data
- manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
- Collections.<CommittedBundle<?>>singleton(lateDataBundle), new Instant(2_000_000L));
+ manager.updateWatermarks(null,
+ TimerUpdate.empty(),
+ result(createdInts.getProducingTransformInternal(),
+ Collections.<CommittedBundle<?>>singleton(lateDataBundle)),
+ new Instant(2_000_000L));
TransformWatermarks bufferedLateWm =
manager.getWatermarks(createdInts.getProducingTransformInternal());
assertThat(bufferedLateWm.getOutputWatermark(), equalTo(new Instant(2_000_000L)));
@@ -496,30 +557,31 @@ public class InMemoryWatermarkManagerTest implements Serializable {
CommittedBundle<KV<String, Integer>> lateKeyedBundle =
timestampedBundle(keyed, TimestampedValue.of(KV.of("MyKey", 3), new Instant(-1000L)));
- manager.updateWatermarks(lateDataBundle, keyed.getProducingTransformInternal(),
- TimerUpdate.empty(), Collections.<CommittedBundle<?>>singleton(lateKeyedBundle), null);
+ manager.updateWatermarks(lateDataBundle,
+ TimerUpdate.empty(),
+ result(keyed.getProducingTransformInternal(),
+ Collections.<CommittedBundle<?>>singleton(lateKeyedBundle)),
+ null);
}
public void updateWatermarkWithDifferentWindowedValueInstances() {
manager.updateWatermarks(
null,
- createdInts.getProducingTransformInternal(),
TimerUpdate.empty(),
+ result(createdInts.getProducingTransformInternal(),
Collections.<CommittedBundle<?>>singleton(
bundleFactory
.createRootBundle(createdInts)
.add(WindowedValue.valueInGlobalWindow(1))
- .commit(Instant.now())),
+ .commit(Instant.now()))),
BoundedWindow.TIMESTAMP_MAX_VALUE);
manager.updateWatermarks(
- bundleFactory
- .createRootBundle(createdInts)
+ bundleFactory.createRootBundle(createdInts)
.add(WindowedValue.valueInGlobalWindow(1))
.commit(Instant.now()),
- keyed.getProducingTransformInternal(),
TimerUpdate.empty(),
- Collections.<CommittedBundle<?>>emptyList(),
+ result(keyed.getProducingTransformInternal(), Collections.<CommittedBundle<?>>emptyList()),
null);
TransformWatermarks onTimeWatermarks =
manager.getWatermarks(keyed.getProducingTransformInternal());
@@ -533,8 +595,10 @@ public class InMemoryWatermarkManagerTest implements Serializable {
@Test
public void getWatermarksAfterOnlyEmptyOutput() {
CommittedBundle<Integer> emptyCreateOutput = multiWindowedBundle(createdInts);
- manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
- Collections.<CommittedBundle<?>>singleton(emptyCreateOutput),
+ manager.updateWatermarks(null,
+ TimerUpdate.empty(),
+ result(createdInts.getProducingTransformInternal(),
+ Collections.<CommittedBundle<?>>singleton(emptyCreateOutput)),
BoundedWindow.TIMESTAMP_MAX_VALUE);
TransformWatermarks updatedSourceWatermarks =
manager.getWatermarks(createdInts.getProducingTransformInternal());
@@ -560,12 +624,17 @@ public class InMemoryWatermarkManagerTest implements Serializable {
@Test
public void getWatermarksAfterHoldAndEmptyOutput() {
CommittedBundle<Integer> firstCreateOutput = multiWindowedBundle(createdInts, 1, 2);
- manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
- Collections.<CommittedBundle<?>>singleton(firstCreateOutput), new Instant(12_000L));
+ manager.updateWatermarks(null,
+ TimerUpdate.empty(),
+ result(createdInts.getProducingTransformInternal(),
+ Collections.<CommittedBundle<?>>singleton(firstCreateOutput)),
+ new Instant(12_000L));
CommittedBundle<Integer> firstFilterOutput = multiWindowedBundle(filtered);
- manager.updateWatermarks(firstCreateOutput, filtered.getProducingTransformInternal(),
- TimerUpdate.empty(), Collections.<CommittedBundle<?>>singleton(firstFilterOutput),
+ manager.updateWatermarks(firstCreateOutput,
+ TimerUpdate.empty(),
+ result(filtered.getProducingTransformInternal(),
+ Collections.<CommittedBundle<?>>singleton(firstFilterOutput)),
new Instant(10_000L));
TransformWatermarks firstFilterWatermarks =
manager.getWatermarks(filtered.getProducingTransformInternal());
@@ -573,8 +642,10 @@ public class InMemoryWatermarkManagerTest implements Serializable {
assertThat(firstFilterWatermarks.getOutputWatermark(), not(laterThan(new Instant(10_000L))));
CommittedBundle<Integer> emptyCreateOutput = multiWindowedBundle(createdInts);
- manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
- Collections.<CommittedBundle<?>>singleton(emptyCreateOutput),
+ manager.updateWatermarks(null,
+ TimerUpdate.empty(),
+ result(createdInts.getProducingTransformInternal(),
+ Collections.<CommittedBundle<?>>singleton(emptyCreateOutput)),
BoundedWindow.TIMESTAMP_MAX_VALUE);
TransformWatermarks updatedSourceWatermarks =
manager.getWatermarks(createdInts.getProducingTransformInternal());
@@ -613,8 +684,11 @@ public class InMemoryWatermarkManagerTest implements Serializable {
CommittedBundle<Integer> createOutput =
bundleFactory.createRootBundle(createdInts).commit(new Instant(1250L));
- manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
- Collections.<CommittedBundle<?>>singleton(createOutput), BoundedWindow.TIMESTAMP_MAX_VALUE);
+ manager.updateWatermarks(null,
+ TimerUpdate.empty(),
+ result(createdInts.getProducingTransformInternal(),
+ Collections.<CommittedBundle<?>>singleton(createOutput)),
+ BoundedWindow.TIMESTAMP_MAX_VALUE);
TransformWatermarks createAfterUpdate =
manager.getWatermarks(createdInts.getProducingTransformInternal());
assertThat(createAfterUpdate.getSynchronizedProcessingInputTime(), equalTo(clock.now()));
@@ -639,8 +713,10 @@ public class InMemoryWatermarkManagerTest implements Serializable {
CommittedBundle<?> filterOutputBundle =
bundleFactory.createRootBundle(intsToFlatten).commit(new Instant(1250L));
- manager.updateWatermarks(createOutput, filtered.getProducingTransformInternal(),
- TimerUpdate.empty(), Collections.<CommittedBundle<?>>singleton(filterOutputBundle),
+ manager.updateWatermarks(createOutput,
+ TimerUpdate.empty(),
+ result(filtered.getProducingTransformInternal(),
+ Collections.<CommittedBundle<?>>singleton(filterOutputBundle)),
BoundedWindow.TIMESTAMP_MAX_VALUE);
TransformWatermarks filterAfterConsumed =
manager.getWatermarks(filtered.getProducingTransformInternal());
@@ -661,8 +737,10 @@ public class InMemoryWatermarkManagerTest implements Serializable {
// @Test
public void getSynchronizedProcessingTimeOutputHeldToPendingTimers() {
CommittedBundle<Integer> createdBundle = multiWindowedBundle(createdInts, 1, 2, 4, 8);
- manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
- Collections.<CommittedBundle<?>>singleton(createdBundle), new Instant(1248L));
+ manager.updateWatermarks(null, TimerUpdate.empty(),
+ result(createdInts.getProducingTransformInternal(),
+ Collections.<CommittedBundle<?>>singleton(createdBundle)),
+ new Instant(1248L));
TransformWatermarks filteredWms =
manager.getWatermarks(filtered.getProducingTransformInternal());
@@ -678,8 +756,10 @@ public class InMemoryWatermarkManagerTest implements Serializable {
TimerData.of(StateNamespaces.global(), new Instant(4096L), TimeDomain.PROCESSING_TIME);
TimerUpdate timers =
TimerUpdate.builder("key").setTimer(pastTimer).setTimer(futureTimer).build();
- manager.updateWatermarks(createdBundle, filtered.getProducingTransformInternal(), timers,
- Collections.<CommittedBundle<?>>singleton(filteredBundle),
+ manager.updateWatermarks(createdBundle,
+ timers,
+ result(filtered.getProducingTransformInternal(),
+ Collections.<CommittedBundle<?>>singleton(filteredBundle)),
BoundedWindow.TIMESTAMP_MAX_VALUE);
Instant startTime = clock.now();
clock.set(startTime.plus(250L));
@@ -712,11 +792,11 @@ public class InMemoryWatermarkManagerTest implements Serializable {
bundleFactory.createKeyedBundle(null, "key", filteredTimesTwo)
.commit(filteredWms.getSynchronizedProcessingOutputTime());
// Complete the processing time timer
- manager.updateWatermarks(filteredTimerBundle, filtered.getProducingTransformInternal(),
+ manager.updateWatermarks(filteredTimerBundle,
TimerUpdate.builder("key")
- .withCompletedTimers(Collections.<TimerData>singleton(pastTimer))
- .build(),
- Collections.<CommittedBundle<?>>singleton(filteredTimerResult),
+ .withCompletedTimers(Collections.<TimerData>singleton(pastTimer)).build(),
+ result(filtered.getProducingTransformInternal(),
+ Collections.<CommittedBundle<?>>singleton(filteredTimerResult)),
BoundedWindow.TIMESTAMP_MAX_VALUE);
clock.set(startTime.plus(500L));
@@ -726,8 +806,10 @@ public class InMemoryWatermarkManagerTest implements Serializable {
filteredDoubledWms.getSynchronizedProcessingOutputTime(),
not(earlierThan(filteredTimerResult.getSynchronizedProcessingOutputWatermark())));
- manager.updateWatermarks(filteredTimerResult, filteredTimesTwo.getProducingTransformInternal(),
- TimerUpdate.empty(), Collections.<CommittedBundle<?>>emptyList(),
+ manager.updateWatermarks(filteredTimerResult,
+ TimerUpdate.empty(),
+ result(filteredTimesTwo.getProducingTransformInternal(),
+ Collections.<CommittedBundle<?>>emptyList()),
BoundedWindow.TIMESTAMP_MAX_VALUE);
assertThat(filteredDoubledWms.getSynchronizedProcessingOutputTime(), equalTo(clock.now()));
@@ -761,18 +843,23 @@ public class InMemoryWatermarkManagerTest implements Serializable {
CommittedBundle<Integer> createOutput =
bundleFactory.createRootBundle(createdInts).commit(new Instant(1250L));
- manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
- Collections.<CommittedBundle<?>>singleton(createOutput), BoundedWindow.TIMESTAMP_MAX_VALUE);
+ manager.updateWatermarks(null,
+ TimerUpdate.empty(),
+ result(createdInts.getProducingTransformInternal(),
+ Collections.<CommittedBundle<?>>singleton(createOutput)),
+ BoundedWindow.TIMESTAMP_MAX_VALUE);
TransformWatermarks createAfterUpdate =
manager.getWatermarks(createdInts.getProducingTransformInternal());
assertThat(createAfterUpdate.getSynchronizedProcessingInputTime(), not(laterThan(clock.now())));
- assertThat(
- createAfterUpdate.getSynchronizedProcessingOutputTime(), not(laterThan(clock.now())));
+ assertThat(createAfterUpdate.getSynchronizedProcessingOutputTime(),
+ not(laterThan(clock.now())));
CommittedBundle<Integer> createSecondOutput =
bundleFactory.createRootBundle(createdInts).commit(new Instant(750L));
- manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
- Collections.<CommittedBundle<?>>singleton(createSecondOutput),
+ manager.updateWatermarks(null,
+ TimerUpdate.empty(),
+ result(createdInts.getProducingTransformInternal(),
+ Collections.<CommittedBundle<?>>singleton(createSecondOutput)),
BoundedWindow.TIMESTAMP_MAX_VALUE);
assertThat(createAfterUpdate.getSynchronizedProcessingOutputTime(), equalTo(clock.now()));
@@ -781,16 +868,20 @@ public class InMemoryWatermarkManagerTest implements Serializable {
@Test
public void synchronizedProcessingInputTimeIsHeldToUpstreamProcessingTimeTimers() {
CommittedBundle<Integer> created = multiWindowedBundle(createdInts, 1, 2, 3);
- manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
- Collections.<CommittedBundle<?>>singleton(created), new Instant(40_900L));
+ manager.updateWatermarks(null,
+ TimerUpdate.empty(),
+ result(createdInts.getProducingTransformInternal(),
+ Collections.<CommittedBundle<?>>singleton(created)),
+ new Instant(40_900L));
CommittedBundle<Integer> filteredBundle = multiWindowedBundle(filtered, 2, 4);
Instant upstreamHold = new Instant(2048L);
TimerData upstreamProcessingTimer =
TimerData.of(StateNamespaces.global(), upstreamHold, TimeDomain.PROCESSING_TIME);
- manager.updateWatermarks(created, filtered.getProducingTransformInternal(),
+ manager.updateWatermarks(created,
TimerUpdate.builder("key").setTimer(upstreamProcessingTimer).build(),
- Collections.<CommittedBundle<?>>singleton(filteredBundle),
+ result(filtered.getProducingTransformInternal(),
+ Collections.<CommittedBundle<?>>singleton(filteredBundle)),
BoundedWindow.TIMESTAMP_MAX_VALUE);
TransformWatermarks downstreamWms =
@@ -806,11 +897,12 @@ public class InMemoryWatermarkManagerTest implements Serializable {
assertThat(downstreamWms.getSynchronizedProcessingInputTime(), equalTo(upstreamHold));
CommittedBundle<Integer> otherCreated = multiWindowedBundle(createdInts, 4, 8, 12);
- manager.updateWatermarks(otherCreated, filtered.getProducingTransformInternal(),
+ manager.updateWatermarks(otherCreated,
TimerUpdate.builder("key")
- .withCompletedTimers(Collections.singleton(upstreamProcessingTimer))
- .build(),
- Collections.<CommittedBundle<?>>emptyList(), BoundedWindow.TIMESTAMP_MAX_VALUE);
+ .withCompletedTimers(Collections.singleton(upstreamProcessingTimer)).build(),
+ result(filtered.getProducingTransformInternal(),
+ Collections.<CommittedBundle<?>>emptyList()),
+ BoundedWindow.TIMESTAMP_MAX_VALUE);
assertThat(downstreamWms.getSynchronizedProcessingInputTime(), not(earlierThan(clock.now())));
}
@@ -820,9 +912,9 @@ public class InMemoryWatermarkManagerTest implements Serializable {
CommittedBundle<Integer> created = multiWindowedBundle(createdInts, 1, 2, 3);
manager.updateWatermarks(
null,
- createdInts.getProducingTransformInternal(),
TimerUpdate.empty(),
- Collections.<CommittedBundle<?>>singleton(created),
+ result(createdInts.getProducingTransformInternal(),
+ Collections.<CommittedBundle<?>>singleton(created)),
new Instant(29_919_235L));
Instant upstreamHold = new Instant(2048L);
@@ -830,9 +922,9 @@ public class InMemoryWatermarkManagerTest implements Serializable {
bundleFactory.createKeyedBundle(created, "key", filtered).commit(upstreamHold);
manager.updateWatermarks(
created,
- filtered.getProducingTransformInternal(),
TimerUpdate.empty(),
- Collections.<CommittedBundle<?>>singleton(filteredBundle),
+ result(filtered.getProducingTransformInternal(),
+ Collections.<CommittedBundle<?>>singleton(filteredBundle)),
BoundedWindow.TIMESTAMP_MAX_VALUE);
TransformWatermarks downstreamWms =
@@ -852,8 +944,10 @@ public class InMemoryWatermarkManagerTest implements Serializable {
// Advance WM of keyed past the first timer, but ahead of the second and third
CommittedBundle<Integer> createdBundle = multiWindowedBundle(filtered);
- manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
- Collections.singleton(createdBundle), new Instant(1500L));
+ manager.updateWatermarks(null,
+ TimerUpdate.empty(),
+ result(createdInts.getProducingTransformInternal(), Collections.singleton(createdBundle)),
+ new Instant(1500L));
TimerData earliestTimer =
TimerData.of(StateNamespaces.global(), new Instant(1000), TimeDomain.EVENT_TIME);
@@ -869,11 +963,10 @@ public class InMemoryWatermarkManagerTest implements Serializable {
.setTimer(lastTimer)
.build();
- manager.updateWatermarks(
- createdBundle,
- filtered.getProducingTransformInternal(),
+ manager.updateWatermarks(createdBundle,
update,
- Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten)),
+ result(filtered.getProducingTransformInternal(),
+ Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten))),
new Instant(1000L));
Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> firstTransformFiredTimers =
@@ -886,8 +979,11 @@ public class InMemoryWatermarkManagerTest implements Serializable {
FiredTimers firstFired = firstFilteredTimers.get(key);
assertThat(firstFired.getTimers(TimeDomain.EVENT_TIME), contains(earliestTimer));
- manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
- Collections.<CommittedBundle<?>>emptyList(), new Instant(50_000L));
+ manager.updateWatermarks(null,
+ TimerUpdate.empty(),
+ result(createdInts.getProducingTransformInternal(),
+ Collections.<CommittedBundle<?>>emptyList()),
+ new Instant(50_000L));
Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> secondTransformFiredTimers =
manager.extractFiredTimers();
assertThat(
@@ -909,8 +1005,10 @@ public class InMemoryWatermarkManagerTest implements Serializable {
// Advance WM of keyed past the first timer, but ahead of the second and third
CommittedBundle<Integer> createdBundle = multiWindowedBundle(filtered);
- manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
- Collections.singleton(createdBundle), new Instant(1500L));
+ manager.updateWatermarks(null,
+ TimerUpdate.empty(),
+ result(createdInts.getProducingTransformInternal(), Collections.singleton(createdBundle)),
+ new Instant(1500L));
TimerData earliestTimer =
TimerData.of(StateNamespaces.global(), new Instant(999L), TimeDomain.PROCESSING_TIME);
@@ -928,9 +1026,9 @@ public class InMemoryWatermarkManagerTest implements Serializable {
manager.updateWatermarks(
createdBundle,
- filtered.getProducingTransformInternal(),
update,
- Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten)),
+ result(filtered.getProducingTransformInternal(),
+ Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten))),
new Instant(1000L));
Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> firstTransformFiredTimers =
@@ -944,8 +1042,11 @@ public class InMemoryWatermarkManagerTest implements Serializable {
assertThat(firstFired.getTimers(TimeDomain.PROCESSING_TIME), contains(earliestTimer));
clock.set(new Instant(50_000L));
- manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
- Collections.<CommittedBundle<?>>emptyList(), new Instant(50_000L));
+ manager.updateWatermarks(null,
+ TimerUpdate.empty(),
+ result(createdInts.getProducingTransformInternal(),
+ Collections.<CommittedBundle<?>>emptyList()),
+ new Instant(50_000L));
Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> secondTransformFiredTimers =
manager.extractFiredTimers();
assertThat(
@@ -967,8 +1068,10 @@ public class InMemoryWatermarkManagerTest implements Serializable {
// Advance WM of keyed past the first timer, but ahead of the second and third
CommittedBundle<Integer> createdBundle = multiWindowedBundle(filtered);
- manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
- Collections.singleton(createdBundle), new Instant(1500L));
+ manager.updateWatermarks(null,
+ TimerUpdate.empty(),
+ result(createdInts.getProducingTransformInternal(), Collections.singleton(createdBundle)),
+ new Instant(1500L));
TimerData earliestTimer = TimerData.of(
StateNamespaces.global(), new Instant(999L), TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
@@ -986,9 +1089,9 @@ public class InMemoryWatermarkManagerTest implements Serializable {
manager.updateWatermarks(
createdBundle,
- filtered.getProducingTransformInternal(),
update,
- Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten)),
+ result(filtered.getProducingTransformInternal(),
+ Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten))),
new Instant(1000L));
Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> firstTransformFiredTimers =
@@ -1003,8 +1106,11 @@ public class InMemoryWatermarkManagerTest implements Serializable {
firstFired.getTimers(TimeDomain.SYNCHRONIZED_PROCESSING_TIME), contains(earliestTimer));
clock.set(new Instant(50_000L));
- manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
- Collections.<CommittedBundle<?>>emptyList(), new Instant(50_000L));
+ manager.updateWatermarks(null,
+ TimerUpdate.empty(),
+ result(createdInts.getProducingTransformInternal(),
+ Collections.<CommittedBundle<?>>emptyList()),
+ new Instant(50_000L));
Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> secondTransformFiredTimers =
manager.extractFiredTimers();
assertThat(
@@ -1133,7 +1239,6 @@ public class InMemoryWatermarkManagerTest implements Serializable {
ReadableInstant instant = (ReadableInstant) item;
return instant.isAfter(shouldBeEarlier);
}
-
@Override
public void describeTo(Description description) {
description.appendText("later than ").appendValue(shouldBeEarlier);
@@ -1165,4 +1270,11 @@ public class InMemoryWatermarkManagerTest implements Serializable {
}
return bundle.commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
}
+
+ private final CommittedResult result(
+ AppliedPTransform<?, ?, ?> transform,
+ Iterable<? extends CommittedBundle<?>> bundles) {
+ return CommittedResult.create(StepTransformResult.withoutHold(transform)
+ .build(), bundles);
+ }
}
[2/2] incubator-beam git commit: This closes #265
Posted by ke...@apache.org.
This closes #265
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/34e05954
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/34e05954
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/34e05954
Branch: refs/heads/master
Commit: 34e05954dc70610cb519348dc6b7ced1934bf574
Parents: 69ec223 dff82ca
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon May 2 20:07:35 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon May 2 20:07:35 2016 -0700
----------------------------------------------------------------------
.../direct/InMemoryWatermarkManager.java | 19 +-
.../direct/InProcessEvaluationContext.java | 6 +-
.../direct/InMemoryWatermarkManagerTest.java | 368 ++++++++++++-------
3 files changed, 252 insertions(+), 141 deletions(-)
----------------------------------------------------------------------