You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/08/19 16:21:05 UTC
[1/4] incubator-beam git commit: Add TestStream to the Testing package
Repository: incubator-beam
Updated Branches:
refs/heads/master bfa3b70ab -> 8d31ca0ca
Add TestStream to the Testing package
This is a source suitable for use with tests that have interesting
triggering behavior. It is an Unbounded source that emits elements in
bundles, and advances the watermark and processing time appropriately.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c72d4fcd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c72d4fcd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c72d4fcd
Branch: refs/heads/master
Commit: c72d4fcd4ca68c00c7edc6094976228a7e999953
Parents: f15fab8
Author: Thomas Groh <tg...@google.com>
Authored: Mon Aug 15 19:43:28 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Fri Aug 19 09:04:18 2016 -0700
----------------------------------------------------------------------
runners/direct-java/pom.xml | 3 +
.../org/apache/beam/sdk/testing/TestStream.java | 326 +++++++++++++++++++
.../apache/beam/sdk/testing/TestStreamTest.java | 169 ++++++++++
3 files changed, 498 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c72d4fcd/runners/direct-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml
index e06883f..8b0f91d 100644
--- a/runners/direct-java/pom.xml
+++ b/runners/direct-java/pom.xml
@@ -85,6 +85,9 @@
<dependency>org.apache.beam:beam-sdks-java-core</dependency>
<dependency>org.apache.beam:beam-runners-java-core</dependency>
</dependenciesToScan>
+ <excludes>
+ <exclude>org/apache/beam/sdk/testing/TestStreamTest.java</exclude>
+ </excludes>
<systemPropertyVariables>
<beamTestPipelineOptions>
[
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c72d4fcd/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
new file mode 100644
index 0000000..6d11f72
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.testing;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.DurationCoder;
+import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.StandardCoder;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.PropertyNames;
+import org.apache.beam.sdk.util.VarInt;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TimestampedValue.TimestampedValueCoder;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.joda.time.ReadableDuration;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A testing input that generates an unbounded {@link PCollection} of elements, advancing the
+ * watermark and processing time as elements are emitted. After all of the specified elements are
+ * emitted, ceases to produce output.
+ *
+ * <p>Each call to a {@link TestStream.Builder} method will only be reflected in the state of the
+ * {@link Pipeline} after each method before it has completed and no more progress can be made by
+ * the {@link Pipeline}. A {@link PipelineRunner} must ensure that no more progress can be made in
+ * the {@link Pipeline} before advancing the state of the {@link TestStream}.
+ */
+public final class TestStream<T> extends PTransform<PBegin, PCollection<T>> {
+ private final List<Event<T>> events;
+ private final Coder<T> coder;
+
+ /**
+ * Create a new {@link TestStream.Builder} with no elements and watermark equal to {@link
+ * BoundedWindow#TIMESTAMP_MIN_VALUE}.
+ */
+ public static <T> Builder<T> create(Coder<T> coder) {
+ return new Builder<>(coder);
+ }
+
+ private TestStream(Coder<T> coder, List<Event<T>> events) {
+ this.coder = coder;
+ this.events = checkNotNull(events);
+ }
+
+ public Coder<Event<T>> getEventCoder() {
+ return EventCoder.of(coder);
+ }
+
+ /**
+ * An incomplete {@link TestStream}. Elements added to this builder will be produced in sequence
+ * when the pipeline created by the {@link TestStream} is run.
+ */
+ public static class Builder<T> {
+ private final Coder<T> coder;
+ private final ImmutableList.Builder<Event<T>> events;
+ private Instant currentWatermark;
+
+ private Builder(Coder<T> coder) {
+ this.coder = coder;
+ events = ImmutableList.builder();
+
+ currentWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
+ }
+
+ /**
+ * Adds the specified elements to the source with timestamp equal to the current watermark.
+ *
+ * @return this {@link TestStream.Builder}
+ */
+ @SafeVarargs
+ public final Builder<T> addElements(T element, T... elements) {
+ TimestampedValue<T> firstElement = TimestampedValue.of(element, currentWatermark);
+ @SuppressWarnings("unchecked")
+ TimestampedValue<T>[] remainingElements = new TimestampedValue[elements.length];
+ for (int i = 0; i < elements.length; i++) {
+ remainingElements[i] = TimestampedValue.of(elements[i], currentWatermark);
+ }
+ return addElements(firstElement, remainingElements);
+ }
+
+ /**
+ * Adds the specified elements to the source with the provided timestamps.
+ *
+ * @return this {@link TestStream.Builder}
+ */
+ @SafeVarargs
+ public final Builder<T> addElements(
+ TimestampedValue<T> element, TimestampedValue<T>... elements) {
+ events.add(ElementEvent.add(element, elements));
+ return this;
+ }
+
+ /**
+ * Advance the watermark of this source to the specified instant.
+ *
+ * <p>The new watermark must be strictly after the current watermark and strictly before {@link
+ * BoundedWindow#TIMESTAMP_MAX_VALUE}; use {@link #advanceWatermarkToInfinity()} for the maximum.
+ *
+ * @return this {@link TestStream.Builder}
+ */
+ public Builder<T> advanceWatermarkTo(Instant newWatermark) {
+ checkArgument(
+ newWatermark.isAfter(currentWatermark), "The watermark must monotonically advance");
+ checkArgument(
+ newWatermark.isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE),
+ "The Watermark cannot progress beyond the maximum. Got: %s. Maximum: %s",
+ newWatermark,
+ BoundedWindow.TIMESTAMP_MAX_VALUE);
+ events.add(WatermarkEvent.<T>advanceTo(newWatermark));
+ currentWatermark = newWatermark;
+ return this;
+ }
+
+ /**
+ * Advance the processing time by the specified amount, which must be strictly positive.
+ *
+ * @return this {@link TestStream.Builder}
+ */
+ public Builder<T> advanceProcessingTime(Duration amount) {
+ checkArgument(
+ amount.getMillis() > 0,
+ "Must advance the processing time by a positive amount. Got: %s",
+ amount);
+ events.add(ProcessingTimeEvent.<T>advanceBy(amount));
+ return this;
+ }
+
+ /**
+ * Advance the watermark to infinity, completing this {@link TestStream}. Future calls to the
+ * same builder will not affect the returned {@link TestStream}.
+ */
+ public TestStream<T> advanceWatermarkToInfinity() {
+ events.add(WatermarkEvent.<T>advanceTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
+ return new TestStream<>(coder, events.build());
+ }
+ }
+
+ /**
+ * An event in a {@link TestStream}. The common interface for all events that happen while
+ * evaluating a {@link TestStream}; {@link #getType()} reports which kind of event this is.
+ */
+ public interface Event<T> {
+ EventType getType();
+ }
+
+ /**
+ * The types of {@link Event} that are supported by {@link TestStream}.
+ */
+ public enum EventType {
+ ELEMENT,
+ WATERMARK,
+ PROCESSING_TIME
+ }
+
+ /** An {@link Event} that produces elements. */
+ @AutoValue
+ public abstract static class ElementEvent<T> implements Event<T> {
+ public abstract Iterable<TimestampedValue<T>> getElements();
+
+ @SafeVarargs
+ static <T> Event<T> add(TimestampedValue<T> element, TimestampedValue<T>... elements) {
+ return add(ImmutableList.<TimestampedValue<T>>builder().add(element).add(elements).build());
+ }
+
+ static <T> Event<T> add(Iterable<TimestampedValue<T>> elements) {
+ return new AutoValue_TestStream_ElementEvent<>(EventType.ELEMENT, elements);
+ }
+ }
+
+ /** An {@link Event} that advances the watermark. */
+ @AutoValue
+ public abstract static class WatermarkEvent<T> implements Event<T> {
+ public abstract Instant getWatermark();
+
+ static <T> Event<T> advanceTo(Instant newWatermark) {
+ return new AutoValue_TestStream_WatermarkEvent<>(EventType.WATERMARK, newWatermark);
+ }
+ }
+
+ /** An {@link Event} that advances the processing time clock. */
+ @AutoValue
+ public abstract static class ProcessingTimeEvent<T> implements Event<T> {
+ public abstract Duration getProcessingTimeAdvance();
+
+ static <T> Event<T> advanceBy(Duration amount) {
+ return new AutoValue_TestStream_ProcessingTimeEvent<>(EventType.PROCESSING_TIME, amount);
+ }
+ }
+
+ @Override
+ public PCollection<T> apply(PBegin input) {
+ return PCollection.<T>createPrimitiveOutputInternal(
+ input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED)
+ .setCoder(coder);
+ }
+
+ public List<Event<T>> getStreamEvents() {
+ return events;
+ }
+
+ /**
+ * A {@link Coder} that encodes and decodes {@link TestStream.Event Events}.
+ *
+ * @param <T> the type of elements in {@link ElementEvent ElementEvents} encoded and decoded by
+ * this {@link EventCoder}
+ */
+ @VisibleForTesting
+ static final class EventCoder<T> extends StandardCoder<Event<T>> {
+ private static final Coder<ReadableDuration> DURATION_CODER = DurationCoder.of();
+ private static final Coder<Instant> INSTANT_CODER = InstantCoder.of();
+ private final Coder<T> valueCoder;
+ private final Coder<Iterable<TimestampedValue<T>>> elementCoder;
+
+ public static <T> EventCoder<T> of(Coder<T> valueCoder) {
+ return new EventCoder<>(valueCoder);
+ }
+
+ @JsonCreator
+ public static <T> EventCoder<T> of(
+ @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) List<? extends Coder<?>> components) {
+ checkArgument(
+ components.size() == 1,
+ "Was expecting exactly one component coder, got %s",
+ components.size());
+ return new EventCoder<>((Coder<T>) components.get(0));
+ }
+
+ private EventCoder(Coder<T> valueCoder) {
+ this.valueCoder = valueCoder;
+ this.elementCoder = IterableCoder.of(TimestampedValueCoder.of(valueCoder));
+ }
+
+ @Override
+ public void encode(
+ Event<T> value, OutputStream outStream, Context context)
+ throws IOException {
+ VarInt.encode(value.getType().ordinal(), outStream);
+ switch (value.getType()) {
+ case ELEMENT:
+ Iterable<TimestampedValue<T>> elems = ((ElementEvent<T>) value).getElements();
+ elementCoder.encode(elems, outStream, context);
+ break;
+ case WATERMARK:
+ Instant ts = ((WatermarkEvent<T>) value).getWatermark();
+ INSTANT_CODER.encode(ts, outStream, context);
+ break;
+ case PROCESSING_TIME:
+ Duration processingAdvance = ((ProcessingTimeEvent<T>) value).getProcessingTimeAdvance();
+ DURATION_CODER.encode(processingAdvance, outStream, context);
+ break;
+ default:
+ throw new AssertionError("Unreachable");
+ }
+ }
+
+ @Override
+ public Event<T> decode(
+ InputStream inStream, Context context) throws IOException {
+ switch (EventType.values()[VarInt.decodeInt(inStream)]) {
+ case ELEMENT:
+ Iterable<TimestampedValue<T>> elements = elementCoder.decode(inStream, context);
+ return ElementEvent.add(elements);
+ case WATERMARK:
+ return WatermarkEvent.advanceTo(INSTANT_CODER.decode(inStream, context));
+ case PROCESSING_TIME:
+ return ProcessingTimeEvent.advanceBy(
+ DURATION_CODER.decode(inStream, context).toDuration());
+ default:
+ throw new AssertionError("Unreachable");
+ }
+ }
+
+ @Override
+ public List<? extends Coder<?>> getCoderArguments() {
+ return Collections.singletonList(valueCoder);
+ }
+
+ @Override
+ public void verifyDeterministic() throws NonDeterministicException {
+ elementCoder.verifyDeterministic();
+ DURATION_CODER.verifyDeterministic();
+ INSTANT_CODER.verifyDeterministic();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c72d4fcd/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
new file mode 100644
index 0000000..09bccfa
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.testing;
+
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.junit.Assert.assertThat;
+
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.Serializable;
+
+/**
+ * Tests for {@link TestStream}.
+ */
+@RunWith(JUnit4.class)
+public class TestStreamTest implements Serializable {
+ @Test
+ @Category(NeedsRunner.class)
+ public void testLateDataAccumulating() {
+ Instant instant = new Instant(0);
+ TestStream<Integer> source = TestStream.create(VarIntCoder.of())
+ .addElements(TimestampedValue.of(1, instant),
+ TimestampedValue.of(2, instant),
+ TimestampedValue.of(3, instant))
+ .advanceWatermarkTo(instant.plus(Duration.standardMinutes(6)))
+ // These elements are late but within the allowed lateness
+ .addElements(TimestampedValue.of(4, instant), TimestampedValue.of(5, instant))
+ .advanceWatermarkTo(instant.plus(Duration.standardMinutes(20)))
+ // These elements are droppably late
+ .addElements(TimestampedValue.of(-1, instant),
+ TimestampedValue.of(-2, instant),
+ TimestampedValue.of(-3, instant))
+ .advanceWatermarkToInfinity();
+
+ TestPipeline p = TestPipeline.create();
+ PCollection<Integer> windowed = p
+ .apply(source)
+ .apply(Window.<Integer>into(FixedWindows.of(Duration.standardMinutes(5))).triggering(
+ AfterWatermark.pastEndOfWindow()
+ .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
+ .plusDelayOf(Duration.standardMinutes(2)))
+ .withLateFirings(AfterPane.elementCountAtLeast(1)))
+ .accumulatingFiredPanes()
+ .withAllowedLateness(Duration.standardMinutes(5), ClosingBehavior.FIRE_ALWAYS));
+ PCollection<Integer> triggered = windowed.apply(WithKeys.<Integer, Integer>of(1))
+ .apply(GroupByKey.<Integer, Integer>create())
+ .apply(Values.<Iterable<Integer>>create())
+ .apply(Flatten.<Integer>iterables());
+ PCollection<Long> count = windowed.apply(Count.<Integer>globally().withoutDefaults());
+ PCollection<Integer> sum = windowed.apply(Sum.integersGlobally().withoutDefaults());
+
+ IntervalWindow window = new IntervalWindow(instant, instant.plus(Duration.standardMinutes(5L)));
+ PAssert.that(triggered)
+ .inFinalPane(window)
+ .containsInAnyOrder(1, 2, 3, 4, 5);
+ PAssert.that(triggered)
+ .inOnTimePane(window)
+ .containsInAnyOrder(1, 2, 3);
+ PAssert.that(count)
+ .inWindow(window)
+ .satisfies(new SerializableFunction<Iterable<Long>, Void>() {
+ @Override
+ public Void apply(Iterable<Long> input) {
+ for (Long count : input) {
+ assertThat(count, allOf(greaterThanOrEqualTo(3L), lessThanOrEqualTo(5L)));
+ }
+ return null;
+ }
+ });
+ PAssert.that(sum)
+ .inWindow(window)
+ .satisfies(new SerializableFunction<Iterable<Integer>, Void>() {
+ @Override
+ public Void apply(Iterable<Integer> input) {
+ for (Integer sum : input) {
+ assertThat(sum, allOf(greaterThanOrEqualTo(6), lessThanOrEqualTo(15)));
+ }
+ return null;
+ }
+ });
+
+ p.run();
+ }
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void testProcessingTimeTrigger() {
+ TestStream<Long> source = TestStream.create(VarLongCoder.of())
+ .addElements(TimestampedValue.of(1L, new Instant(1000L)),
+ TimestampedValue.of(2L, new Instant(2000L)))
+ .advanceProcessingTime(Duration.standardMinutes(12))
+ .addElements(TimestampedValue.of(3L, new Instant(3000L)))
+ .advanceProcessingTime(Duration.standardMinutes(6))
+ .advanceWatermarkToInfinity();
+
+ TestPipeline p = TestPipeline.create();
+ PCollection<Long> sum = p.apply(source)
+ .apply(Window.<Long>triggering(AfterWatermark.pastEndOfWindow()
+ .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
+ .plusDelayOf(Duration.standardMinutes(5)))).accumulatingFiredPanes()
+ .withAllowedLateness(Duration.ZERO))
+ .apply(Sum.longsGlobally());
+
+ PAssert.that(sum).inEarlyGlobalWindowPanes().containsInAnyOrder(3L, 6L);
+
+ p.run();
+ }
+
+ @Test
+ public void testEncodeDecode() throws Exception {
+ TestStream.Event<Integer> elems =
+ TestStream.ElementEvent.add(
+ TimestampedValue.of(1, new Instant()),
+ TimestampedValue.of(-10, new Instant()),
+ TimestampedValue.of(Integer.MAX_VALUE, new Instant()));
+ TestStream.Event<Integer> wm = TestStream.WatermarkEvent.advanceTo(new Instant(100));
+ TestStream.Event<Integer> procTime =
+ TestStream.ProcessingTimeEvent.advanceBy(Duration.millis(90548));
+
+ TestStream.EventCoder<Integer> coder = TestStream.EventCoder.of(VarIntCoder.of());
+
+ CoderProperties.coderSerializable(coder);
+ CoderProperties.coderDecodeEncodeEqual(coder, elems);
+ CoderProperties.coderDecodeEncodeEqual(coder, wm);
+ CoderProperties.coderDecodeEncodeEqual(coder, procTime);
+ }
+}
[3/4] incubator-beam git commit: Implement TestStream in the
DirectRunner
Posted by lc...@apache.org.
Implement TestStream in the DirectRunner
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a5ef9a96
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a5ef9a96
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a5ef9a96
Branch: refs/heads/master
Commit: a5ef9a9689147105854f12d3ea054b3887a94e24
Parents: c72d4fc
Author: Thomas Groh <tg...@google.com>
Authored: Mon Aug 15 19:45:58 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Fri Aug 19 09:04:19 2016 -0700
----------------------------------------------------------------------
runners/direct-java/pom.xml | 3 -
.../beam/runners/direct/DirectOptions.java | 45 +---
.../beam/runners/direct/DirectRunner.java | 47 ++++-
.../beam/runners/direct/EvaluationContext.java | 10 +-
.../FixedThreadPoolExecutorServiceFactory.java | 45 ----
.../beam/runners/direct/NanosOffsetClock.java | 13 --
.../direct/TestStreamEvaluatorFactory.java | 204 +++++++++++++++++++
.../direct/TransformEvaluatorRegistry.java | 11 +
.../direct/WriteWithShardingFactory.java | 2 +-
.../runners/direct/EvaluationContextTest.java | 1 +
.../org/apache/beam/sdk/testing/TestStream.java | 114 +++++++----
.../apache/beam/sdk/testing/TestStreamTest.java | 159 +++++++++++++++
12 files changed, 508 insertions(+), 146 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a5ef9a96/runners/direct-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml
index 8b0f91d..e06883f 100644
--- a/runners/direct-java/pom.xml
+++ b/runners/direct-java/pom.xml
@@ -85,9 +85,6 @@
<dependency>org.apache.beam:beam-sdks-java-core</dependency>
<dependency>org.apache.beam:beam-runners-java-core</dependency>
</dependenciesToScan>
- <excludes>
- <exclude>org/apache/beam/sdk/testing/TestStreamTest.java</exclude>
- </excludes>
<systemPropertyVariables>
<beamTestPipelineOptions>
[
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a5ef9a96/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
index 3901c04..798fda4 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
@@ -17,59 +17,16 @@
*/
package org.apache.beam.runners.direct;
-import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.ApplicationNameOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.options.Hidden;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.Validation.Required;
-import org.apache.beam.sdk.transforms.PTransform;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
/**
* Options that can be used to configure the {@link org.apache.beam.runners.direct.DirectRunner}.
*/
public interface DirectOptions extends PipelineOptions, ApplicationNameOptions {
- /**
- * Gets the {@link ExecutorServiceFactory} to use to create instances of {@link ExecutorService}
- * to execute {@link PTransform PTransforms}.
- *
- * <p>Note that {@link ExecutorService ExecutorServices} returned by the factory must ensure that
- * it cannot enter a state in which it will not schedule additional pending work unless currently
- * scheduled work completes, as this may cause the {@link Pipeline} to cease processing.
- *
- * <p>Defaults to a {@link FixedThreadPoolExecutorServiceFactory}, which produces instances of
- * {@link Executors#newCachedThreadPool()}.
- */
- @JsonIgnore
- @Required
- @Hidden
- @Default.InstanceFactory(FixedThreadPoolExecutorServiceFactory.class)
- ExecutorServiceFactory getExecutorServiceFactory();
-
- void setExecutorServiceFactory(ExecutorServiceFactory executorService);
-
- /**
- * Gets the {@link Clock} used by this pipeline. The clock is used in place of accessing the
- * system time when time values are required by the evaluator.
- */
- @Default.InstanceFactory(NanosOffsetClock.Factory.class)
- @JsonIgnore
- @Required
- @Hidden
- @Description(
- "The processing time source used by the pipeline. When the current time is "
- + "needed by the evaluator, the result of clock#now() is used.")
- Clock getClock();
-
- void setClock(Clock clock);
-
- @Default.Boolean(false)
+ @Default.Boolean(true)
@Description(
"If the pipeline should shut down producers which have reached the maximum "
+ "representable watermark. If this is set to true, a pipeline in which all PTransforms "
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a5ef9a96/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index f2b781e..68184de 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.direct;
import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly;
import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult;
+import org.apache.beam.runners.direct.TestStreamEvaluatorFactory.DirectTestStreamFactory;
import org.apache.beam.runners.direct.ViewEvaluatorFactory.ViewOverrideFactory;
import org.apache.beam.sdk.AggregatorRetrievalException;
import org.apache.beam.sdk.AggregatorValues;
@@ -29,6 +30,7 @@ import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.io.Write;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.GroupByKey;
@@ -46,6 +48,7 @@ import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.PValue;
import com.google.common.base.MoreObjects;
+import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
@@ -58,6 +61,7 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
/**
* An In-Memory implementation of the Dataflow Programming Model. Supports Unbounded
@@ -76,8 +80,9 @@ public class DirectRunner
private static Map<Class<? extends PTransform>, PTransformOverrideFactory>
defaultTransformOverrides =
ImmutableMap.<Class<? extends PTransform>, PTransformOverrideFactory>builder()
- .put(GroupByKey.class, new DirectGroupByKeyOverrideFactory())
.put(CreatePCollectionView.class, new ViewOverrideFactory())
+ .put(GroupByKey.class, new DirectGroupByKeyOverrideFactory())
+ .put(TestStream.class, new DirectTestStreamFactory())
.put(Write.Bound.class, new WriteWithShardingFactory())
.build();
@@ -175,6 +180,8 @@ public class DirectRunner
////////////////////////////////////////////////////////////////////////////////////////////////
private final DirectOptions options;
+ private Supplier<ExecutorService> executorServiceSupplier = new FixedThreadPoolSupplier();
+ private Supplier<Clock> clockSupplier = new NanosOffsetClockSupplier();
public static DirectRunner fromOptions(PipelineOptions options) {
return new DirectRunner(options.as(DirectOptions.class));
@@ -191,6 +198,14 @@ public class DirectRunner
return options;
}
+ Supplier<Clock> getClockSupplier() {
+ return clockSupplier;
+ }
+
+ void setClockSupplier(Supplier<Clock> supplier) {
+ this.clockSupplier = supplier;
+ }
+
@Override
public <OutputT extends POutput, InputT extends PInput> OutputT apply(
PTransform<InputT, OutputT> transform, InputT input) {
@@ -223,6 +238,7 @@ public class DirectRunner
EvaluationContext context =
EvaluationContext.create(
getPipelineOptions(),
+ clockSupplier.get(),
createBundleFactory(getPipelineOptions()),
consumerTrackingVisitor.getRootTransforms(),
consumerTrackingVisitor.getValueToConsumers(),
@@ -230,14 +246,15 @@ public class DirectRunner
consumerTrackingVisitor.getViews());
// independent executor service for each run
- ExecutorService executorService =
- context.getPipelineOptions().getExecutorServiceFactory().create();
+ ExecutorService executorService = executorServiceSupplier.get();
+
+ TransformEvaluatorRegistry registry = TransformEvaluatorRegistry.defaultRegistry();
PipelineExecutor executor =
ExecutorServiceParallelExecutor.create(
executorService,
consumerTrackingVisitor.getValueToConsumers(),
keyedPValueVisitor.getKeyedPValues(),
- TransformEvaluatorRegistry.defaultRegistry(),
+ registry,
defaultModelEnforcements(options),
context);
executor.start(consumerTrackingVisitor.getRootTransforms());
@@ -392,4 +409,26 @@ public class DirectRunner
"DirectPipelineResult does not support waitUntilFinish.");
}
}
+
+ /**
+ * A {@link Supplier} that creates an {@link ExecutorService} based on
+ * {@link Executors#newFixedThreadPool(int)}.
+ */
+ private static class FixedThreadPoolSupplier implements Supplier<ExecutorService> {
+ @Override
+ public ExecutorService get() {
+ return Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
+ }
+ }
+
+
+ /**
+ * A {@link Supplier} that creates a {@link NanosOffsetClock}.
+ */
+ private static class NanosOffsetClockSupplier implements Supplier<Clock> {
+ @Override
+ public Clock get() {
+ return NanosOffsetClock.create();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a5ef9a96/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
index 94f28e2..b9f159a 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
@@ -102,24 +102,26 @@ class EvaluationContext {
public static EvaluationContext create(
DirectOptions options,
+ Clock clock,
BundleFactory bundleFactory,
Collection<AppliedPTransform<?, ?, ?>> rootTransforms,
Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers,
Map<AppliedPTransform<?, ?, ?>, String> stepNames,
Collection<PCollectionView<?>> views) {
return new EvaluationContext(
- options, bundleFactory, rootTransforms, valueToConsumers, stepNames, views);
+ options, clock, bundleFactory, rootTransforms, valueToConsumers, stepNames, views);
}
private EvaluationContext(
DirectOptions options,
+ Clock clock,
BundleFactory bundleFactory,
Collection<AppliedPTransform<?, ?, ?>> rootTransforms,
Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers,
Map<AppliedPTransform<?, ?, ?>, String> stepNames,
Collection<PCollectionView<?>> views) {
this.options = checkNotNull(options);
- this.clock = options.getClock();
+ this.clock = clock;
this.bundleFactory = checkNotNull(bundleFactory);
checkNotNull(rootTransforms);
checkNotNull(valueToConsumers);
@@ -433,4 +435,8 @@ class EvaluationContext {
public Instant now() {
return clock.now();
}
+
+ Clock getClock() {
+ return clock; // the same Clock instance that now() reads from
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a5ef9a96/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FixedThreadPoolExecutorServiceFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FixedThreadPoolExecutorServiceFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FixedThreadPoolExecutorServiceFactory.java
deleted file mode 100644
index 74c4292..0000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FixedThreadPoolExecutorServiceFactory.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import org.apache.beam.sdk.options.DefaultValueFactory;
-import org.apache.beam.sdk.options.PipelineOptions;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-/**
- * A {@link ExecutorServiceFactory} that produces fixed thread pools via
- * {@link Executors#newFixedThreadPool(int)}, with the number of threads equal to the available
- * processors as provided by {@link Runtime#availableProcessors()}.
- */
-class FixedThreadPoolExecutorServiceFactory
- implements DefaultValueFactory<ExecutorServiceFactory>, ExecutorServiceFactory {
- private static final FixedThreadPoolExecutorServiceFactory INSTANCE =
- new FixedThreadPoolExecutorServiceFactory();
-
- @Override
- public ExecutorServiceFactory create(PipelineOptions options) {
- return INSTANCE;
- }
-
- @Override
- public ExecutorService create() {
- return Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a5ef9a96/runners/direct-java/src/main/java/org/apache/beam/runners/direct/NanosOffsetClock.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/NanosOffsetClock.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/NanosOffsetClock.java
index ffdee9d..77fa196 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/NanosOffsetClock.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/NanosOffsetClock.java
@@ -17,9 +17,6 @@
*/
package org.apache.beam.runners.direct;
-import org.apache.beam.sdk.options.DefaultValueFactory;
-import org.apache.beam.sdk.options.PipelineOptions;
-
import org.joda.time.Instant;
import java.util.concurrent.TimeUnit;
@@ -46,14 +43,4 @@ public class NanosOffsetClock implements Clock {
baseMillis + (TimeUnit.MILLISECONDS.convert(
System.nanoTime() - nanosAtBaseMillis, TimeUnit.NANOSECONDS)));
}
-
- /**
- * Creates instances of {@link NanosOffsetClock}.
- */
- public static class Factory implements DefaultValueFactory<Clock> {
- @Override
- public Clock create(PipelineOptions options) {
- return new NanosOffsetClock();
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a5ef9a96/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
new file mode 100644
index 0000000..90a83b0
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.testing.TestStream.ElementEvent;
+import org.apache.beam.sdk.testing.TestStream.Event;
+import org.apache.beam.sdk.testing.TestStream.EventType;
+import org.apache.beam.sdk.testing.TestStream.ProcessingTimeEvent;
+import org.apache.beam.sdk.testing.TestStream.WatermarkEvent;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.TimestampedValue;
+
+import com.google.common.base.Supplier;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.annotation.Nullable;
+
+/**
+ * The {@link TransformEvaluatorFactory} for the {@link TestStream} primitive.
+ */
+class TestStreamEvaluatorFactory implements TransformEvaluatorFactory {
+ private final AtomicBoolean inUse = new AtomicBoolean(false);
+ private final AtomicReference<Evaluator<?>> evaluator = new AtomicReference<>();
+
+ @Nullable
+ @Override
+ public <InputT> TransformEvaluator<InputT> forApplication(
+ AppliedPTransform<?, ?, ?> application,
+ @Nullable CommittedBundle<?> inputBundle,
+ EvaluationContext evaluationContext) throws Exception {
+ return createEvaluator((AppliedPTransform) application, evaluationContext); // may be null
+ }
+
+ @Override
+ public void cleanup() throws Exception {} // intentionally empty: no cleanup work is performed
+
+ private <InputT, OutputT> TransformEvaluator<? super InputT> createEvaluator(
+ AppliedPTransform<PBegin, PCollection<OutputT>, TestStream<OutputT>> application,
+ EvaluationContext evaluationContext) {
+ if (evaluator.get() == null) { // lazily create the single evaluator held by this factory
+ Evaluator<OutputT> createdEvaluator = new Evaluator<>(application, evaluationContext, inUse);
+ evaluator.compareAndSet(null, createdEvaluator); // no-op if another thread set it first
+ }
+ if (inUse.compareAndSet(false, true)) { // claim the evaluator; finishBundle releases it
+ return evaluator.get();
+ } else {
+ return null; // the evaluator is already handed out and has not finished its bundle
+ }
+ }
+
+ private static class Evaluator<T> implements TransformEvaluator<Object> {
+ private final AppliedPTransform<PBegin, PCollection<T>, TestStream<T>> application;
+ private final EvaluationContext context;
+ private final AtomicBoolean inUse; // the factory's flag; cleared at the end of finishBundle
+ private final List<Event<T>> events;
+ private int index; // position in events of the next event to handle
+ private Instant currentWatermark; // watermark taken from the most recent WATERMARK event
+
+ private Evaluator(
+ AppliedPTransform<PBegin, PCollection<T>, TestStream<T>> application,
+ EvaluationContext context,
+ AtomicBoolean inUse) {
+ this.application = application;
+ this.context = context;
+ this.inUse = inUse;
+ this.events = application.getTransform().getEvents();
+ index = 0;
+ currentWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
+ }
+
+ @Override
+ public void processElement(WindowedValue<Object> element) throws Exception {
+ } // intentionally empty; all work happens in finishBundle
+
+ @Override
+ public TransformResult finishBundle() throws Exception {
+ if (index >= events.size()) { // all events handled; NOTE(review): inUse is not cleared here
+ return StepTransformResult.withHold(application, BoundedWindow.TIMESTAMP_MAX_VALUE).build();
+ }
+ Event<T> event = events.get(index); // exactly one event is handled per call
+ if (event.getType().equals(EventType.WATERMARK)) {
+ currentWatermark = ((WatermarkEvent<T>) event).getWatermark();
+ }
+ StepTransformResult.Builder result =
+ StepTransformResult.withHold(application, currentWatermark);
+ if (event.getType().equals(EventType.ELEMENT)) {
+ UncommittedBundle<T> bundle = context.createRootBundle(application.getOutput());
+ for (TimestampedValue<T> elem : ((ElementEvent<T>) event).getElements()) {
+ bundle.add(WindowedValue.timestampedValueInGlobalWindow(elem.getValue(),
+ elem.getTimestamp()));
+ }
+ result.addOutput(bundle);
+ }
+ if (event.getType().equals(EventType.PROCESSING_TIME)) {
+ ((TestClock) context.getClock()) // NOTE(review): assumes the context clock is a TestClock
+ .advance(((ProcessingTimeEvent<T>) event).getProcessingTimeAdvance());
+ }
+ index++;
+ checkState(inUse.compareAndSet(true, false), // release the evaluator for the next call
+ "The InUse flag of a %s was changed while the source evaluator was executing. "
+ + "%s cannot be split or evaluated in parallel.",
+ TestStream.class.getSimpleName(),
+ TestStream.class.getSimpleName());
+ return result.build();
+ }
+ }
+
+ private static class TestClock implements Clock { // a Clock that moves only when advance() is called
+ private final AtomicReference<Instant> currentTime =
+ new AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE);
+
+ public void advance(Duration amount) { // adds amount to the current time, atomically
+ Instant now; // retry the CAS until it succeeds so a concurrent advance is never dropped
+ do { now = currentTime.get(); } while (!currentTime.compareAndSet(now, now.plus(amount)));
+ }
+
+ @Override
+ public Instant now() {
+ return currentTime.get();
+ }
+ }
+
+ private static class TestClockSupplier implements Supplier<Clock> {
+ @Override
+ public Clock get() {
+ return new TestClock(); // a fresh clock, starting at TIMESTAMP_MIN_VALUE, on every call
+ }
+ }
+
+ static class DirectTestStreamFactory implements PTransformOverrideFactory {
+ @Override
+ public <InputT extends PInput, OutputT extends POutput> PTransform<InputT, OutputT> override(
+ PTransform<InputT, OutputT> transform) {
+ if (transform instanceof TestStream) { // only TestStream is replaced
+ return (PTransform<InputT, OutputT>)
+ new DirectTestStream<OutputT>((TestStream<OutputT>) transform);
+ }
+ return transform; // every other transform is returned unchanged
+ }
+
+ private static class DirectTestStream<T> extends PTransform<PBegin, PCollection<T>> {
+ private final TestStream<T> original;
+
+ private DirectTestStream(TestStream<T> transform) { // typed so the assignment is checked
+ this.original = transform;
+ }
+
+ @Override
+ public PCollection<T> apply(PBegin input) {
+ setup(input.getPipeline()); // installs the test clock supplier on the runner first
+ return PCollection.<T>createPrimitiveOutputInternal(
+ input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED)
+ .setCoder(original.getValueCoder());
+ }
+
+ private void setup(Pipeline p) {
+ PipelineRunner runner = p.getRunner();
+ checkState(runner instanceof DirectRunner, // guards the DirectRunner cast below
+ "%s can only be used when running with the %s",
+ getClass().getSimpleName(),
+ DirectRunner.class.getSimpleName());
+ ((DirectRunner) runner).setClockSupplier(new TestClockSupplier());
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a5ef9a96/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
index b469237..c35e8b1 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
@@ -23,6 +23,7 @@ import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupAlsoByWindow;
import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly;
import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList;
import org.apache.beam.sdk.transforms.PTransform;
@@ -61,6 +62,7 @@ class TransformEvaluatorRegistry implements TransformEvaluatorFactory {
// Runner-specific primitives used in expansion of GroupByKey
.put(DirectGroupByKeyOnly.class, new GroupByKeyOnlyEvaluatorFactory())
.put(DirectGroupAlsoByWindow.class, new GroupAlsoByWindowEvaluatorFactory())
+ .put(TestStream.class, new TestStreamEvaluatorFactory())
.build();
return new TransformEvaluatorRegistry(primitives);
}
@@ -117,4 +119,13 @@ class TransformEvaluatorRegistry implements TransformEvaluatorFactory {
throw toThrow;
}
}
+
+ /**
+ * A factory that creates registries via {@link TransformEvaluatorRegistry#defaultRegistry()}.
+ */
+ public static class Factory {
+ public TransformEvaluatorRegistry create() {
+ return TransformEvaluatorRegistry.defaultRegistry();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a5ef9a96/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
index c2157b8..1ab3403 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
@@ -66,7 +66,7 @@ class WriteWithShardingFactory implements PTransformOverrideFactory {
return transform;
}
- private static class DynamicallyReshardedWrite <T> extends PTransform<PCollection<T>, PDone> {
+ private static class DynamicallyReshardedWrite<T> extends PTransform<PCollection<T>, PDone> {
private final transient Write.Bound<T> original;
private DynamicallyReshardedWrite(Bound<T> original) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a5ef9a96/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
index d4b5773..7ac0caa 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
@@ -117,6 +117,7 @@ public class EvaluationContextTest {
context =
EvaluationContext.create(
runner.getPipelineOptions(),
+ NanosOffsetClock.create(),
ImmutableListBundleFactory.create(),
rootTransforms,
valueToConsumers,
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a5ef9a96/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
index 6d11f72..e2eda32 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
@@ -32,10 +32,8 @@ import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.PropertyNames;
import org.apache.beam.sdk.util.VarInt;
-import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollection.IsBounded;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TimestampedValue.TimestampedValueCoder;
@@ -83,30 +81,30 @@ public final class TestStream<T> extends PTransform<PBegin, PCollection<T>> {
this.events = checkNotNull(events);
}
- public Coder<Event<T>> getEventCoder() {
- return EventCoder.of(coder);
- }
-
/**
* An incomplete {@link TestStream}. Elements added to this builder will be produced in sequence
* when the pipeline created by the {@link TestStream} is run.
*/
public static class Builder<T> {
private final Coder<T> coder;
- private final ImmutableList.Builder<Event<T>> events;
- private Instant currentWatermark;
+ private final ImmutableList<Event<T>> events;
+ private final Instant currentWatermark;
private Builder(Coder<T> coder) {
- this.coder = coder;
- events = ImmutableList.builder();
+ this(coder, ImmutableList.<Event<T>>of(), BoundedWindow.TIMESTAMP_MIN_VALUE);
+ }
- currentWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
+ private Builder(Coder<T> coder, ImmutableList<Event<T>> events, Instant currentWatermark) {
+ this.coder = coder;
+ this.events = events;
+ this.currentWatermark = currentWatermark;
}
/**
* Adds the specified elements to the source with timestamp equal to the current watermark.
*
- * @return this {@link TestStream.Builder}
+ * @return A {@link TestStream.Builder} like this one that will add the provided elements
+ * after all earlier events have completed.
*/
@SafeVarargs
public final Builder<T> addElements(T element, T... elements) {
@@ -122,22 +120,40 @@ public final class TestStream<T> extends PTransform<PBegin, PCollection<T>> {
/**
* Adds the specified elements to the source with the provided timestamps.
*
- * @return this {@link TestStream.Builder}
+ * @return A {@link TestStream.Builder} like this one that will add the provided elements
+ * after all earlier events have completed.
*/
@SafeVarargs
public final Builder<T> addElements(
TimestampedValue<T> element, TimestampedValue<T>... elements) {
- events.add(ElementEvent.add(element, elements));
- return this;
+ checkArgument(
+ element.getTimestamp().isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE),
+ "Elements must have timestamps before %s. Got: %s",
+ BoundedWindow.TIMESTAMP_MAX_VALUE,
+ element.getTimestamp());
+ for (TimestampedValue<T> multiElement : elements) {
+ checkArgument(
+ multiElement.getTimestamp().isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE),
+ "Elements must have timestamps before %s. Got: %s",
+ BoundedWindow.TIMESTAMP_MAX_VALUE,
+ multiElement.getTimestamp());
+ }
+ ImmutableList<Event<T>> newEvents =
+ ImmutableList.<Event<T>>builder()
+ .addAll(events)
+ .add(ElementEvent.add(element, elements))
+ .build();
+ return new Builder<T>(coder, newEvents, currentWatermark);
}
/**
* Advance the watermark of this source to the specified instant.
*
- * <p>The watermark must advance monotonically and to at most {@link
- * BoundedWindow#TIMESTAMP_MAX_VALUE}.
+ * <p>The watermark must advance monotonically and cannot advance to {@link
+ * BoundedWindow#TIMESTAMP_MAX_VALUE} or beyond.
*
- * @return this {@link TestStream.Builder}
+ * @return A {@link TestStream.Builder} like this one that will advance the watermark to the
+ * specified point after all earlier events have completed.
*/
public Builder<T> advanceWatermarkTo(Instant newWatermark) {
checkArgument(
@@ -147,23 +163,30 @@ public final class TestStream<T> extends PTransform<PBegin, PCollection<T>> {
"The Watermark cannot progress beyond the maximum. Got: %s. Maximum: %s",
newWatermark,
BoundedWindow.TIMESTAMP_MAX_VALUE);
- events.add(WatermarkEvent.<T>advanceTo(newWatermark));
- currentWatermark = newWatermark;
- return this;
+ ImmutableList<Event<T>> newEvents = ImmutableList.<Event<T>>builder()
+ .addAll(events)
+ .add(WatermarkEvent.<T>advanceTo(newWatermark))
+ .build();
+ return new Builder<T>(coder, newEvents, newWatermark);
}
/**
* Advance the processing time by the specified amount.
*
- * @return this {@link TestStream.Builder}
+ * @return A {@link TestStream.Builder} like this one that will advance the processing time by
+ * the specified amount after all earlier events have completed.
*/
public Builder<T> advanceProcessingTime(Duration amount) {
checkArgument(
amount.getMillis() > 0,
"Must advance the processing time by a positive amount. Got: ",
amount);
- events.add(ProcessingTimeEvent.<T>advanceBy(amount));
- return this;
+ ImmutableList<Event<T>> newEvents =
+ ImmutableList.<Event<T>>builder()
+ .addAll(events)
+ .add(ProcessingTimeEvent.<T>advanceBy(amount))
+ .build();
+ return new Builder<T>(coder, newEvents, currentWatermark);
}
/**
@@ -171,8 +194,12 @@ public final class TestStream<T> extends PTransform<PBegin, PCollection<T>> {
* same builder will not affect the returned {@link TestStream}.
*/
public TestStream<T> advanceWatermarkToInfinity() {
- events.add(WatermarkEvent.<T>advanceTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
- return new TestStream<>(coder, events.build());
+ ImmutableList<Event<T>> newEvents =
+ ImmutableList.<Event<T>>builder()
+ .addAll(events)
+ .add(WatermarkEvent.<T>advanceTo(BoundedWindow.TIMESTAMP_MAX_VALUE))
+ .build();
+ return new TestStream<>(coder, newEvents);
}
}
@@ -230,12 +257,30 @@ public final class TestStream<T> extends PTransform<PBegin, PCollection<T>> {
@Override
public PCollection<T> apply(PBegin input) {
- return PCollection.<T>createPrimitiveOutputInternal(
- input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED)
- .setCoder(coder);
+ throw new IllegalStateException(
+ String.format(
+ "Pipeline Runner %s does not provide a required override for %s",
+ input.getPipeline().getRunner().getClass().getSimpleName(),
+ getClass().getSimpleName()));
+ }
+
+ public Coder<T> getValueCoder() {
+ return coder;
+ }
+
+ /**
+ * Returns a coder suitable for encoding {@link TestStream.Event}.
+ */
+ public Coder<Event<T>> getEventCoder() {
+ return EventCoder.of(coder);
}
- public List<Event<T>> getStreamEvents() {
+ /**
+ * Returns the sequence of {@link Event Events} in this {@link TestStream}.
+ *
+ * <p>For use by {@link PipelineRunner} authors.
+ */
+ public List<Event<T>> getEvents() {
return events;
}
@@ -243,7 +288,7 @@ public final class TestStream<T> extends PTransform<PBegin, PCollection<T>> {
* A {@link Coder} that encodes and decodes {@link TestStream.Event Events}.
*
* @param <T> the type of elements in {@link ElementEvent ElementEvents} encoded and decoded by
- * this {@link EventCoder}
+ * this {@link EventCoder}
*/
@VisibleForTesting
static final class EventCoder<T> extends StandardCoder<Event<T>> {
@@ -290,14 +335,15 @@ public final class TestStream<T> extends PTransform<PBegin, PCollection<T>> {
DURATION_CODER.encode(processingAdvance, outStream, context);
break;
default:
- throw new AssertionError("Unreachable");
+ throw new AssertionError("Unreachable: Unsupported Event Type " + value.getType());
}
}
@Override
public Event<T> decode(
InputStream inStream, Context context) throws IOException {
- switch (EventType.values()[VarInt.decodeInt(inStream)]) {
+ EventType eventType = EventType.values()[VarInt.decodeInt(inStream)];
+ switch (eventType) {
case ELEMENT:
Iterable<TimestampedValue<T>> elements = elementCoder.decode(inStream, context);
return ElementEvent.add(elements);
@@ -307,7 +353,7 @@ public final class TestStream<T> extends PTransform<PBegin, PCollection<T>> {
return ProcessingTimeEvent.advanceBy(
DURATION_CODER.decode(inStream, context).toDuration());
default:
- throw new AssertionError("Unreachable");
+ throw new AssertionError("Unreachable: Unsupported Event Type " + eventType);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a5ef9a96/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
index 09bccfa..df37d7f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
@@ -23,8 +23,13 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.junit.Assert.assertThat;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.TestStream.Builder;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
@@ -35,8 +40,12 @@ import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.Never;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
import org.apache.beam.sdk.values.PCollection;
@@ -44,8 +53,10 @@ import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -56,6 +67,8 @@ import java.io.Serializable;
*/
@RunWith(JUnit4.class)
public class TestStreamTest implements Serializable {
+ @Rule public transient ExpectedException thrown = ExpectedException.none();
+
@Test
@Category(NeedsRunner.class)
public void testLateDataAccumulating() {
@@ -149,6 +162,152 @@ public class TestStreamTest implements Serializable {
}
@Test
+ @Category(NeedsRunner.class)
+ public void testDiscardingMode() {
+ TestStream<String> stream =
+ TestStream.create(StringUtf8Coder.of())
+ .advanceWatermarkTo(new Instant(0))
+ .addElements(
+ TimestampedValue.of("firstPane", new Instant(100)),
+ TimestampedValue.of("alsoFirstPane", new Instant(200)))
+ .addElements(TimestampedValue.of("onTimePane", new Instant(500)))
+ .advanceWatermarkTo(new Instant(1001L)) // beyond the 1000 ms window size used below
+ .addElements(
+ TimestampedValue.of("finalLatePane", new Instant(750)),
+ TimestampedValue.of("alsoFinalLatePane", new Instant(250)))
+ .advanceWatermarkToInfinity();
+
+ TestPipeline p = TestPipeline.create();
+ FixedWindows windowFn = FixedWindows.of(Duration.millis(1000L));
+ Duration allowedLateness = Duration.millis(5000L);
+ PCollection<String> values =
+ p.apply(stream)
+ .apply(
+ Window.<String>into(windowFn)
+ .triggering(
+ AfterWatermark.pastEndOfWindow()
+ .withEarlyFirings(AfterPane.elementCountAtLeast(2))
+ .withLateFirings(Never.ever()))
+ .discardingFiredPanes()
+ .withAllowedLateness(allowedLateness))
+ .apply(WithKeys.<Integer, String>of(1))
+ .apply(GroupByKey.<Integer, String>create())
+ .apply(Values.<Iterable<String>>create())
+ .apply(Flatten.<String>iterables());
+
+ IntervalWindow window = windowFn.assignWindow(new Instant(100)); // window of all five elements
+ PAssert.that(values)
+ .inWindow(window)
+ .containsInAnyOrder(
+ "firstPane", "alsoFirstPane", "onTimePane", "finalLatePane", "alsoFinalLatePane");
+ PAssert.that(values)
+ .inCombinedNonLatePanes(window)
+ .containsInAnyOrder("firstPane", "alsoFirstPane", "onTimePane");
+ PAssert.that(values).inOnTimePane(window).containsInAnyOrder("onTimePane"); // early pair absent
+ PAssert.that(values)
+ .inFinalPane(window)
+ .containsInAnyOrder("finalLatePane", "alsoFinalLatePane");
+
+ p.run();
+ }
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void testFirstElementLate() {
+ Instant lateElementTimestamp = new Instant(-1_000_000); // 1000 s before the watermark of 0
+ TestStream<String> stream =
+ TestStream.create(StringUtf8Coder.of())
+ .advanceWatermarkTo(new Instant(0))
+ .addElements(TimestampedValue.of("late", lateElementTimestamp))
+ .addElements(TimestampedValue.of("onTime", new Instant(100)))
+ .advanceWatermarkToInfinity();
+
+ TestPipeline p = TestPipeline.create();
+ FixedWindows windowFn = FixedWindows.of(Duration.millis(1000L));
+ Duration allowedLateness = Duration.millis(5000L);
+ PCollection<String> values = p.apply(stream)
+ .apply(Window.<String>into(windowFn).triggering(DefaultTrigger.of())
+ .discardingFiredPanes()
+ .withAllowedLateness(allowedLateness))
+ .apply(WithKeys.<Integer, String>of(1))
+ .apply(GroupByKey.<Integer, String>create())
+ .apply(Values.<Iterable<String>>create())
+ .apply(Flatten.<String>iterables());
+
+ PAssert.that(values).inWindow(windowFn.assignWindow(lateElementTimestamp)).empty(); // no output
+ PAssert.that(values)
+ .inWindow(windowFn.assignWindow(new Instant(100)))
+ .containsInAnyOrder("onTime");
+
+ p.run();
+ }
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void testElementsAtAlmostPositiveInfinity() {
+ Instant endOfGlobalWindow = GlobalWindow.INSTANCE.maxTimestamp(); // timestamp of both elements
+ TestStream<String> stream = TestStream.create(StringUtf8Coder.of())
+ .addElements(TimestampedValue.of("foo", endOfGlobalWindow),
+ TimestampedValue.of("bar", endOfGlobalWindow))
+ .advanceWatermarkToInfinity();
+
+ TestPipeline p = TestPipeline.create();
+ FixedWindows windows = FixedWindows.of(Duration.standardHours(6));
+ PCollection<String> windowedValues = p.apply(stream)
+ .apply(Window.<String>into(windows))
+ .apply(WithKeys.<Integer, String>of(1))
+ .apply(GroupByKey.<Integer, String>create())
+ .apply(Values.<Iterable<String>>create())
+ .apply(Flatten.<String>iterables());
+
+ PAssert.that(windowedValues) // both elements must appear in the fixed window holding that timestamp
+ .inWindow(windows.assignWindow(GlobalWindow.INSTANCE.maxTimestamp()))
+ .containsInAnyOrder("foo", "bar");
+ p.run();
+ }
+
+ @Test
+ public void testElementAtPositiveInfinityThrows() {
+ Builder<Integer> stream =
+ TestStream.create(VarIntCoder.of())
+ .addElements(TimestampedValue.of(-1, BoundedWindow.TIMESTAMP_MAX_VALUE.minus(1L)));
+ thrown.expect(IllegalArgumentException.class); // set after the setup call, which must succeed
+ stream.addElements(TimestampedValue.of(1, BoundedWindow.TIMESTAMP_MAX_VALUE)); // at max: rejected
+ }
+
+ @Test
+ public void testAdvanceWatermarkNonMonotonicThrows() {
+ Builder<Integer> stream =
+ TestStream.create(VarIntCoder.of())
+ .advanceWatermarkTo(new Instant(0L));
+ thrown.expect(IllegalArgumentException.class);
+ stream.advanceWatermarkTo(new Instant(-1L)); // moving the watermark back from 0 to -1 must fail
+ }
+
+ @Test
+ public void testAdvanceWatermarkEqualToPositiveInfinityThrows() {
+ Builder<Integer> stream =
+ TestStream.create(VarIntCoder.of())
+ .advanceWatermarkTo(BoundedWindow.TIMESTAMP_MAX_VALUE.minus(1L));
+ thrown.expect(IllegalArgumentException.class);
+ stream.advanceWatermarkTo(BoundedWindow.TIMESTAMP_MAX_VALUE); // the maximum itself is rejected
+ }
+
+ @Test
+ public void testUnsupportedRunnerThrows() {
+ PipelineOptions opts = PipelineOptionsFactory.create();
+ opts.setRunner(CrashingRunner.class);
+
+ Pipeline p = Pipeline.create(opts);
+
+ thrown.expect(IllegalStateException.class);
+ thrown.expectMessage("does not provide a required override");
+ thrown.expectMessage(TestStream.class.getSimpleName());
+ thrown.expectMessage(CrashingRunner.class.getSimpleName());
+ p.apply(TestStream.create(VarIntCoder.of()).advanceWatermarkToInfinity()); // apply is what throws
+ }
+
+ @Test
public void testEncodeDecode() throws Exception {
TestStream.Event<Integer> elems =
TestStream.ElementEvent.add(
[2/4] incubator-beam git commit: Add inEarlyPanesInGlobalWindow as a
PAssert Extractor
Posted by lc...@apache.org.
Add inEarlyPanesInGlobalWindow as a PAssert Extractor
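This is for use in asserting the contents of speculative panes (those with
Timing.EARLY) in the global window. The assertion is exposed on PAssert's
IterableAssert as inEarlyGlobalWindowPanes().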
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f15fab8c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f15fab8c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f15fab8c
Branch: refs/heads/master
Commit: f15fab8ccdb3b40004583e8f7e4e32a0b8ba5121
Parents: bfa3b70
Author: Thomas Groh <tg...@google.com>
Authored: Thu Aug 11 15:46:10 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Fri Aug 19 09:04:18 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/testing/PAssert.java | 18 ++++++++++++++++++
.../apache/beam/sdk/testing/PaneExtractors.java | 18 ++++++++++++++++++
2 files changed, 36 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f15fab8c/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
index e07ee3d..3f1a741 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
@@ -49,6 +49,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Never;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
@@ -176,6 +177,13 @@ public class PAssert {
IterableAssert<T> inCombinedNonLatePanes(BoundedWindow window);
/**
+ * Creates a new {@link IterableAssert} like this one, but with the assertion restricted to only
+ * run on panes in the {@link GlobalWindow} that were emitted before the {@link GlobalWindow}
+ * closed. These panes have {@link Timing#EARLY}.
+ */
+ IterableAssert<T> inEarlyGlobalWindowPanes();
+
+ /**
* Asserts that the iterable in question contains the provided elements.
*
* @return the same {@link IterableAssert} builder for further assertions
@@ -381,6 +389,11 @@ public class PAssert {
return withPane(window, PaneExtractors.<T>nonLatePanes());
}
+ @Override
+ public IterableAssert<T> inEarlyGlobalWindowPanes() {
+ return withPane(GlobalWindow.INSTANCE, PaneExtractors.<T>earlyPanes());
+ }
+
private PCollectionContentsAssert<T> withPane(
BoundedWindow window,
SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>> paneExtractor) {
@@ -557,6 +570,11 @@ public class PAssert {
return withPanes(window, PaneExtractors.<Iterable<T>>nonLatePanes());
}
+ @Override
+ public IterableAssert<T> inEarlyGlobalWindowPanes() {
+ return withPanes(GlobalWindow.INSTANCE, PaneExtractors.<Iterable<T>>earlyPanes());
+ }
+
private PCollectionSingletonIterableAssert<T> withPanes(
BoundedWindow window,
SimpleFunction<Iterable<WindowedValue<Iterable<T>>>, Iterable<Iterable<T>>> paneExtractor) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f15fab8c/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PaneExtractors.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PaneExtractors.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PaneExtractors.java
index f699bfc..899612b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PaneExtractors.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PaneExtractors.java
@@ -59,6 +59,10 @@ final class PaneExtractors {
return new ExtractNonLatePanes<>();
}
+ static <T> SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>> earlyPanes() {
+ return new ExtractEarlyPanes<>();
+ }
+
static <T> SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>> allPanes() {
return new ExtractAllPanes<>();
}
@@ -137,4 +141,18 @@ final class PaneExtractors {
return outputs;
}
}
+
+ private static class ExtractEarlyPanes<T>
+ extends SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>> {
+ @Override
+ public Iterable<T> apply(Iterable<WindowedValue<T>> input) {
+ List<T> outputs = new ArrayList<>();
+ for (WindowedValue<T> value : input) {
+ if (value.getPane().getTiming() == PaneInfo.Timing.EARLY) {
+ outputs.add(value.getValue());
+ }
+ }
+ return outputs;
+ }
+ }
}
[4/4] incubator-beam git commit: Add TestStream to the DirectRunner
Package
Posted by lc...@apache.org.
Add TestStream to the DirectRunner Package
This closes #817
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8d31ca0c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8d31ca0c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8d31ca0c
Branch: refs/heads/master
Commit: 8d31ca0ca084c37dca5a436fa8b784622b25348d
Parents: bfa3b70 a5ef9a9
Author: Luke Cwik <lc...@google.com>
Authored: Fri Aug 19 09:06:08 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Fri Aug 19 09:06:08 2016 -0700
----------------------------------------------------------------------
.../beam/runners/direct/DirectOptions.java | 45 +--
.../beam/runners/direct/DirectRunner.java | 47 ++-
.../beam/runners/direct/EvaluationContext.java | 10 +-
.../FixedThreadPoolExecutorServiceFactory.java | 45 ---
.../beam/runners/direct/NanosOffsetClock.java | 13 -
.../direct/TestStreamEvaluatorFactory.java | 204 ++++++++++
.../direct/TransformEvaluatorRegistry.java | 11 +
.../direct/WriteWithShardingFactory.java | 2 +-
.../runners/direct/EvaluationContextTest.java | 1 +
.../org/apache/beam/sdk/testing/PAssert.java | 18 +
.../apache/beam/sdk/testing/PaneExtractors.java | 18 +
.../org/apache/beam/sdk/testing/TestStream.java | 372 +++++++++++++++++++
.../apache/beam/sdk/testing/TestStreamTest.java | 328 ++++++++++++++++
13 files changed, 1005 insertions(+), 109 deletions(-)
----------------------------------------------------------------------