You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/08/19 16:21:05 UTC

[1/4] incubator-beam git commit: Add TestStream to the Testing package

Repository: incubator-beam
Updated Branches:
  refs/heads/master bfa3b70ab -> 8d31ca0ca


Add TestStream to the Testing package

This is a source suitable for use with tests that have interesting
triggering behavior. It is an Unbounded source that emits elements in
bundles, and advances the watermark and processing time appropriately.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c72d4fcd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c72d4fcd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c72d4fcd

Branch: refs/heads/master
Commit: c72d4fcd4ca68c00c7edc6094976228a7e999953
Parents: f15fab8
Author: Thomas Groh <tg...@google.com>
Authored: Mon Aug 15 19:43:28 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Fri Aug 19 09:04:18 2016 -0700

----------------------------------------------------------------------
 runners/direct-java/pom.xml                     |   3 +
 .../org/apache/beam/sdk/testing/TestStream.java | 326 +++++++++++++++++++
 .../apache/beam/sdk/testing/TestStreamTest.java | 169 ++++++++++
 3 files changed, 498 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c72d4fcd/runners/direct-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml
index e06883f..8b0f91d 100644
--- a/runners/direct-java/pom.xml
+++ b/runners/direct-java/pom.xml
@@ -85,6 +85,9 @@
                 <dependency>org.apache.beam:beam-sdks-java-core</dependency>
                 <dependency>org.apache.beam:beam-runners-java-core</dependency>
               </dependenciesToScan>
+              <excludes>
+                <exclude>org/apache/beam/sdk/testing/TestStreamTest.java</exclude>
+              </excludes>
               <systemPropertyVariables>
                 <beamTestPipelineOptions>
                   [

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c72d4fcd/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
new file mode 100644
index 0000000..6d11f72
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.testing;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.DurationCoder;
+import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.StandardCoder;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.PropertyNames;
+import org.apache.beam.sdk.util.VarInt;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TimestampedValue.TimestampedValueCoder;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.joda.time.ReadableDuration;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A testing input that generates an unbounded {@link PCollection} of elements, advancing the
+ * watermark and processing time as elements are emitted. After all of the specified elements are
+ * emitted, ceases to produce output.
+ *
+ * <p>Each call to a {@link TestStream.Builder} method will only be reflected in the state of the
+ * {@link Pipeline} after each method before it has completed and no more progress can be made by
+ * the {@link Pipeline}. A {@link PipelineRunner} must ensure that no more progress can be made in
+ * the {@link Pipeline} before advancing the state of the {@link TestStream}.
+ */
+public final class TestStream<T> extends PTransform<PBegin, PCollection<T>> {
+  private final List<Event<T>> events;
+  private final Coder<T> coder;
+
+  /**
+   * Create a new {@link TestStream.Builder} with no elements and watermark equal to {@link
+   * BoundedWindow#TIMESTAMP_MIN_VALUE}.
+   */
+  public static <T> Builder<T> create(Coder<T> coder) {
+    return new Builder<>(coder);
+  }
+
+  private TestStream(Coder<T> coder, List<Event<T>> events) {
+    this.coder = coder;
+    this.events = checkNotNull(events);
+  }
+
+  public Coder<Event<T>> getEventCoder() {
+    return EventCoder.of(coder);
+  }
+
+  /**
+   * An incomplete {@link TestStream}. Elements added to this builder will be produced in sequence
+   * when the pipeline created by the {@link TestStream} is run.
+   */
+  public static class Builder<T> {
+    private final Coder<T> coder;
+    private final ImmutableList.Builder<Event<T>> events;
+    private Instant currentWatermark;
+
+    private Builder(Coder<T> coder) {
+      this.coder = coder;
+      events = ImmutableList.builder();
+
+      currentWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
+    }
+
+    /**
+     * Adds the specified elements to the source with timestamp equal to the current watermark.
+     *
+     * @return this {@link TestStream.Builder}
+     */
+    @SafeVarargs
+    public final Builder<T> addElements(T element, T... elements) {
+      TimestampedValue<T> firstElement = TimestampedValue.of(element, currentWatermark);
+      @SuppressWarnings("unchecked")
+      TimestampedValue<T>[] remainingElements = new TimestampedValue[elements.length];
+      for (int i = 0; i < elements.length; i++) {
+        remainingElements[i] = TimestampedValue.of(elements[i], currentWatermark);
+      }
+      return addElements(firstElement, remainingElements);
+    }
+
+    /**
+     * Adds the specified elements to the source with the provided timestamps.
+     *
+     * @return this {@link TestStream.Builder}
+     */
+    @SafeVarargs
+    public final Builder<T> addElements(
+        TimestampedValue<T> element, TimestampedValue<T>... elements) {
+      events.add(ElementEvent.add(element, elements));
+      return this;
+    }
+
+    /**
+     * Advance the watermark of this source to the specified instant.
+     *
+     * <p>The new watermark must be strictly later than the current watermark and strictly earlier
+     * than {@link BoundedWindow#TIMESTAMP_MAX_VALUE}.
+     *
+     * @return this {@link TestStream.Builder}
+     */
+    public Builder<T> advanceWatermarkTo(Instant newWatermark) {
+      checkArgument(
+          newWatermark.isAfter(currentWatermark), "The watermark must monotonically advance");
+      checkArgument(
+          newWatermark.isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE),
+          "The Watermark cannot progress beyond the maximum. Got: %s. Maximum: %s",
+          newWatermark,
+          BoundedWindow.TIMESTAMP_MAX_VALUE);
+      events.add(WatermarkEvent.<T>advanceTo(newWatermark));
+      currentWatermark = newWatermark;
+      return this;
+    }
+
+    /**
+     * Advance the processing time by the specified amount, which must be strictly positive.
+     *
+     * @return this {@link TestStream.Builder}
+     */
+    public Builder<T> advanceProcessingTime(Duration amount) {
+      checkArgument(
+          amount.getMillis() > 0,
+          "Must advance the processing time by a positive amount. Got: %s",
+          amount);
+      events.add(ProcessingTimeEvent.<T>advanceBy(amount));
+      return this;
+    }
+
+    /**
+     * Advance the watermark to infinity, completing this {@link TestStream}. Future calls to the
+     * same builder will not affect the returned {@link TestStream}.
+     */
+    public TestStream<T> advanceWatermarkToInfinity() {
+      events.add(WatermarkEvent.<T>advanceTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
+      return new TestStream<>(coder, events.build());
+    }
+  }
+
+  /**
+   * An event in a {@link TestStream}. A marker interface for all events that happen while
+   * evaluating a {@link TestStream}.
+   */
+  public interface Event<T> {
+    EventType getType();
+  }
+
+  /**
+   * The types of {@link Event} that are supported by {@link TestStream}.
+   */
+  public enum EventType {
+    ELEMENT,
+    WATERMARK,
+    PROCESSING_TIME
+  }
+
+  /** A {@link Event} that produces elements. */
+  @AutoValue
+  public abstract static class ElementEvent<T> implements Event<T> {
+    public abstract Iterable<TimestampedValue<T>> getElements();
+
+    @SafeVarargs
+    static <T> Event<T> add(TimestampedValue<T> element, TimestampedValue<T>... elements) {
+      return add(ImmutableList.<TimestampedValue<T>>builder().add(element).add(elements).build());
+    }
+
+    static <T> Event<T> add(Iterable<TimestampedValue<T>> elements) {
+      return new AutoValue_TestStream_ElementEvent<>(EventType.ELEMENT, elements);
+    }
+  }
+
+  /** A {@link Event} that advances the watermark. */
+  @AutoValue
+  public abstract static class WatermarkEvent<T> implements Event<T> {
+    public abstract Instant getWatermark();
+
+    static <T> Event<T> advanceTo(Instant newWatermark) {
+      return new AutoValue_TestStream_WatermarkEvent<>(EventType.WATERMARK, newWatermark);
+    }
+  }
+
+  /** A {@link Event} that advances the processing time clock. */
+  @AutoValue
+  public abstract static class ProcessingTimeEvent<T> implements Event<T> {
+    public abstract Duration getProcessingTimeAdvance();
+
+    static <T> Event<T> advanceBy(Duration amount) {
+      return new AutoValue_TestStream_ProcessingTimeEvent<>(EventType.PROCESSING_TIME, amount);
+    }
+  }
+
+  @Override
+  public PCollection<T> apply(PBegin input) {
+    return PCollection.<T>createPrimitiveOutputInternal(
+            input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED)
+        .setCoder(coder);
+  }
+
+  public List<Event<T>> getStreamEvents() {
+    return events;
+  }
+
+  /**
+   * A {@link Coder} that encodes and decodes {@link TestStream.Event Events}.
+   *
+   * @param <T> the type of elements in {@link ElementEvent ElementEvents} encoded and decoded by
+   *     this {@link EventCoder}
+   */
+  @VisibleForTesting
+  static final class EventCoder<T> extends StandardCoder<Event<T>> {
+    private static final Coder<ReadableDuration> DURATION_CODER = DurationCoder.of();
+    private static final Coder<Instant> INSTANT_CODER = InstantCoder.of();
+    private final Coder<T> valueCoder;
+    private final Coder<Iterable<TimestampedValue<T>>> elementCoder;
+
+    public static <T> EventCoder<T> of(Coder<T> valueCoder) {
+      return new EventCoder<>(valueCoder);
+    }
+
+    @JsonCreator
+    public static <T> EventCoder<T> of(
+        @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) List<? extends Coder<?>> components) {
+      checkArgument(
+          components.size() == 1,
+          "Was expecting exactly one component coder, got %s",
+          components.size());
+      return new EventCoder<>((Coder<T>) components.get(0));
+    }
+
+    private EventCoder(Coder<T> valueCoder) {
+      this.valueCoder = valueCoder;
+      this.elementCoder = IterableCoder.of(TimestampedValueCoder.of(valueCoder));
+    }
+
+    @Override
+    public void encode(
+        Event<T> value, OutputStream outStream, Context context)
+        throws IOException {
+      VarInt.encode(value.getType().ordinal(), outStream);
+      switch (value.getType()) {
+        case ELEMENT:
+          Iterable<TimestampedValue<T>> elems = ((ElementEvent<T>) value).getElements();
+          elementCoder.encode(elems, outStream, context);
+          break;
+        case WATERMARK:
+          Instant ts = ((WatermarkEvent<T>) value).getWatermark();
+          INSTANT_CODER.encode(ts, outStream, context);
+          break;
+        case PROCESSING_TIME:
+          Duration processingAdvance = ((ProcessingTimeEvent<T>) value).getProcessingTimeAdvance();
+          DURATION_CODER.encode(processingAdvance, outStream, context);
+          break;
+        default:
+          throw new AssertionError("Unreachable");
+      }
+    }
+
+    @Override
+    public Event<T> decode(
+        InputStream inStream, Context context) throws IOException {
+      switch (EventType.values()[VarInt.decodeInt(inStream)]) {
+        case ELEMENT:
+          Iterable<TimestampedValue<T>> elements = elementCoder.decode(inStream, context);
+          return ElementEvent.add(elements);
+        case WATERMARK:
+          return WatermarkEvent.advanceTo(INSTANT_CODER.decode(inStream, context));
+        case PROCESSING_TIME:
+          return ProcessingTimeEvent.advanceBy(
+              DURATION_CODER.decode(inStream, context).toDuration());
+        default:
+          throw new AssertionError("Unreachable");
+      }
+    }
+
+    @Override
+    public List<? extends Coder<?>> getCoderArguments() {
+      return Collections.singletonList(valueCoder);
+    }
+
+    @Override
+    public void verifyDeterministic() throws NonDeterministicException {
+      elementCoder.verifyDeterministic();
+      DURATION_CODER.verifyDeterministic();
+      INSTANT_CODER.verifyDeterministic();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c72d4fcd/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
new file mode 100644
index 0000000..09bccfa
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.testing;
+
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.junit.Assert.assertThat;
+
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.Serializable;
+
+/**
+ * Tests for {@link TestStream}.
+ */
+@RunWith(JUnit4.class)
+public class TestStreamTest implements Serializable {
+  @Test
+  @Category(NeedsRunner.class)
+  public void testLateDataAccumulating() {
+    Instant instant = new Instant(0);
+    TestStream<Integer> source = TestStream.create(VarIntCoder.of())
+        .addElements(TimestampedValue.of(1, instant),
+            TimestampedValue.of(2, instant),
+            TimestampedValue.of(3, instant))
+        .advanceWatermarkTo(instant.plus(Duration.standardMinutes(6)))
+        // These elements are late but within the allowed lateness
+        .addElements(TimestampedValue.of(4, instant), TimestampedValue.of(5, instant))
+        .advanceWatermarkTo(instant.plus(Duration.standardMinutes(20)))
+        // These elements are droppably late
+        .addElements(TimestampedValue.of(-1, instant),
+            TimestampedValue.of(-2, instant),
+            TimestampedValue.of(-3, instant))
+        .advanceWatermarkToInfinity();
+
+    TestPipeline p = TestPipeline.create();
+    PCollection<Integer> windowed = p
+        .apply(source)
+        .apply(Window.<Integer>into(FixedWindows.of(Duration.standardMinutes(5))).triggering(
+            AfterWatermark.pastEndOfWindow()
+                .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
+                    .plusDelayOf(Duration.standardMinutes(2)))
+                .withLateFirings(AfterPane.elementCountAtLeast(1)))
+            .accumulatingFiredPanes()
+            .withAllowedLateness(Duration.standardMinutes(5), ClosingBehavior.FIRE_ALWAYS));
+    PCollection<Integer> triggered = windowed.apply(WithKeys.<Integer, Integer>of(1))
+        .apply(GroupByKey.<Integer, Integer>create())
+        .apply(Values.<Iterable<Integer>>create())
+        .apply(Flatten.<Integer>iterables());
+    PCollection<Long> count = windowed.apply(Count.<Integer>globally().withoutDefaults());
+    PCollection<Integer> sum = windowed.apply(Sum.integersGlobally().withoutDefaults());
+
+    IntervalWindow window = new IntervalWindow(instant, instant.plus(Duration.standardMinutes(5L)));
+    PAssert.that(triggered)
+        .inFinalPane(window)
+        .containsInAnyOrder(1, 2, 3, 4, 5);
+    PAssert.that(triggered)
+        .inOnTimePane(window)
+        .containsInAnyOrder(1, 2, 3);
+    PAssert.that(count)
+        .inWindow(window)
+        .satisfies(new SerializableFunction<Iterable<Long>, Void>() {
+          @Override
+          public Void apply(Iterable<Long> input) {
+            for (Long count : input) {
+              assertThat(count, allOf(greaterThanOrEqualTo(3L), lessThanOrEqualTo(5L)));
+            }
+            return null;
+          }
+        });
+    PAssert.that(sum)
+        .inWindow(window)
+        .satisfies(new SerializableFunction<Iterable<Integer>, Void>() {
+          @Override
+          public Void apply(Iterable<Integer> input) {
+            for (Integer sum : input) {
+              assertThat(sum, allOf(greaterThanOrEqualTo(6), lessThanOrEqualTo(15)));
+            }
+            return null;
+          }
+        });
+
+    p.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testProcessingTimeTrigger() {
+    TestStream<Long> source = TestStream.create(VarLongCoder.of())
+        .addElements(TimestampedValue.of(1L, new Instant(1000L)),
+            TimestampedValue.of(2L, new Instant(2000L)))
+        .advanceProcessingTime(Duration.standardMinutes(12))
+        .addElements(TimestampedValue.of(3L, new Instant(3000L)))
+        .advanceProcessingTime(Duration.standardMinutes(6))
+        .advanceWatermarkToInfinity();
+
+    TestPipeline p = TestPipeline.create();
+    PCollection<Long> sum = p.apply(source)
+        .apply(Window.<Long>triggering(AfterWatermark.pastEndOfWindow()
+            .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
+                .plusDelayOf(Duration.standardMinutes(5)))).accumulatingFiredPanes()
+            .withAllowedLateness(Duration.ZERO))
+        .apply(Sum.longsGlobally());
+
+    PAssert.that(sum).inEarlyGlobalWindowPanes().containsInAnyOrder(3L, 6L);
+
+    p.run();
+  }
+
+  @Test
+  public void testEncodeDecode() throws Exception {
+    TestStream.Event<Integer> elems =
+        TestStream.ElementEvent.add(
+            TimestampedValue.of(1, new Instant()),
+            TimestampedValue.of(-10, new Instant()),
+            TimestampedValue.of(Integer.MAX_VALUE, new Instant()));
+    TestStream.Event<Integer> wm = TestStream.WatermarkEvent.advanceTo(new Instant(100));
+    TestStream.Event<Integer> procTime =
+        TestStream.ProcessingTimeEvent.advanceBy(Duration.millis(90548));
+
+    TestStream.EventCoder<Integer> coder = TestStream.EventCoder.of(VarIntCoder.of());
+
+    CoderProperties.coderSerializable(coder);
+    CoderProperties.coderDecodeEncodeEqual(coder, elems);
+    CoderProperties.coderDecodeEncodeEqual(coder, wm);
+    CoderProperties.coderDecodeEncodeEqual(coder, procTime);
+  }
+}


[3/4] incubator-beam git commit: Implement TestStream in the DirectRunner

Posted by lc...@apache.org.
Implement TestStream in the DirectRunner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a5ef9a96
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a5ef9a96
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a5ef9a96

Branch: refs/heads/master
Commit: a5ef9a9689147105854f12d3ea054b3887a94e24
Parents: c72d4fc
Author: Thomas Groh <tg...@google.com>
Authored: Mon Aug 15 19:45:58 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Fri Aug 19 09:04:19 2016 -0700

----------------------------------------------------------------------
 runners/direct-java/pom.xml                     |   3 -
 .../beam/runners/direct/DirectOptions.java      |  45 +---
 .../beam/runners/direct/DirectRunner.java       |  47 ++++-
 .../beam/runners/direct/EvaluationContext.java  |  10 +-
 .../FixedThreadPoolExecutorServiceFactory.java  |  45 ----
 .../beam/runners/direct/NanosOffsetClock.java   |  13 --
 .../direct/TestStreamEvaluatorFactory.java      | 204 +++++++++++++++++++
 .../direct/TransformEvaluatorRegistry.java      |  11 +
 .../direct/WriteWithShardingFactory.java        |   2 +-
 .../runners/direct/EvaluationContextTest.java   |   1 +
 .../org/apache/beam/sdk/testing/TestStream.java | 114 +++++++----
 .../apache/beam/sdk/testing/TestStreamTest.java | 159 +++++++++++++++
 12 files changed, 508 insertions(+), 146 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a5ef9a96/runners/direct-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml
index 8b0f91d..e06883f 100644
--- a/runners/direct-java/pom.xml
+++ b/runners/direct-java/pom.xml
@@ -85,9 +85,6 @@
                 <dependency>org.apache.beam:beam-sdks-java-core</dependency>
                 <dependency>org.apache.beam:beam-runners-java-core</dependency>
               </dependenciesToScan>
-              <excludes>
-                <exclude>org/apache/beam/sdk/testing/TestStreamTest.java</exclude>
-              </excludes>
               <systemPropertyVariables>
                 <beamTestPipelineOptions>
                   [

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a5ef9a96/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
index 3901c04..798fda4 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
@@ -17,59 +17,16 @@
  */
 package org.apache.beam.runners.direct;
 
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.ApplicationNameOptions;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.options.Hidden;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.Validation.Required;
-import org.apache.beam.sdk.transforms.PTransform;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 
 /**
  * Options that can be used to configure the {@link org.apache.beam.runners.direct.DirectRunner}.
  */
 public interface DirectOptions extends PipelineOptions, ApplicationNameOptions {
-  /**
-   * Gets the {@link ExecutorServiceFactory} to use to create instances of {@link ExecutorService}
-   * to execute {@link PTransform PTransforms}.
-   *
-   * <p>Note that {@link ExecutorService ExecutorServices} returned by the factory must ensure that
-   * it cannot enter a state in which it will not schedule additional pending work unless currently
-   * scheduled work completes, as this may cause the {@link Pipeline} to cease processing.
-   *
-   * <p>Defaults to a {@link FixedThreadPoolExecutorServiceFactory}, which produces instances of
-   * {@link Executors#newCachedThreadPool()}.
-   */
-  @JsonIgnore
-  @Required
-  @Hidden
-  @Default.InstanceFactory(FixedThreadPoolExecutorServiceFactory.class)
-  ExecutorServiceFactory getExecutorServiceFactory();
-
-  void setExecutorServiceFactory(ExecutorServiceFactory executorService);
-
-  /**
-   * Gets the {@link Clock} used by this pipeline. The clock is used in place of accessing the
-   * system time when time values are required by the evaluator.
-   */
-  @Default.InstanceFactory(NanosOffsetClock.Factory.class)
-  @JsonIgnore
-  @Required
-  @Hidden
-  @Description(
-      "The processing time source used by the pipeline. When the current time is "
-          + "needed by the evaluator, the result of clock#now() is used.")
-  Clock getClock();
-
-  void setClock(Clock clock);
-
-  @Default.Boolean(false)
+  @Default.Boolean(true)
   @Description(
       "If the pipeline should shut down producers which have reached the maximum "
           + "representable watermark. If this is set to true, a pipeline in which all PTransforms "

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a5ef9a96/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index f2b781e..68184de 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.direct;
 
 import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly;
 import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult;
+import org.apache.beam.runners.direct.TestStreamEvaluatorFactory.DirectTestStreamFactory;
 import org.apache.beam.runners.direct.ViewEvaluatorFactory.ViewOverrideFactory;
 import org.apache.beam.sdk.AggregatorRetrievalException;
 import org.apache.beam.sdk.AggregatorValues;
@@ -29,6 +30,7 @@ import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.io.Write;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.testing.TestStream;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.GroupByKey;
@@ -46,6 +48,7 @@ import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.PValue;
 
 import com.google.common.base.MoreObjects;
+import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
@@ -58,6 +61,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 /**
  * An In-Memory implementation of the Dataflow Programming Model. Supports Unbounded
@@ -76,8 +80,9 @@ public class DirectRunner
   private static Map<Class<? extends PTransform>, PTransformOverrideFactory>
       defaultTransformOverrides =
           ImmutableMap.<Class<? extends PTransform>, PTransformOverrideFactory>builder()
-              .put(GroupByKey.class, new DirectGroupByKeyOverrideFactory())
               .put(CreatePCollectionView.class, new ViewOverrideFactory())
+              .put(GroupByKey.class, new DirectGroupByKeyOverrideFactory())
+              .put(TestStream.class, new DirectTestStreamFactory())
               .put(Write.Bound.class, new WriteWithShardingFactory())
               .build();
 
@@ -175,6 +180,8 @@ public class DirectRunner
 
   ////////////////////////////////////////////////////////////////////////////////////////////////
   private final DirectOptions options;
+  private Supplier<ExecutorService> executorServiceSupplier = new FixedThreadPoolSupplier();
+  private Supplier<Clock> clockSupplier = new NanosOffsetClockSupplier();
 
   public static DirectRunner fromOptions(PipelineOptions options) {
     return new DirectRunner(options.as(DirectOptions.class));
@@ -191,6 +198,14 @@ public class DirectRunner
     return options;
   }
 
+  Supplier<Clock> getClockSupplier() {
+    return clockSupplier;
+  }
+
+  void setClockSupplier(Supplier<Clock> supplier) {
+    this.clockSupplier = supplier;
+  }
+
   @Override
   public <OutputT extends POutput, InputT extends PInput> OutputT apply(
       PTransform<InputT, OutputT> transform, InputT input) {
@@ -223,6 +238,7 @@ public class DirectRunner
     EvaluationContext context =
         EvaluationContext.create(
             getPipelineOptions(),
+            clockSupplier.get(),
             createBundleFactory(getPipelineOptions()),
             consumerTrackingVisitor.getRootTransforms(),
             consumerTrackingVisitor.getValueToConsumers(),
@@ -230,14 +246,15 @@ public class DirectRunner
             consumerTrackingVisitor.getViews());
 
     // independent executor service for each run
-    ExecutorService executorService =
-        context.getPipelineOptions().getExecutorServiceFactory().create();
+    ExecutorService executorService = executorServiceSupplier.get();
+
+    TransformEvaluatorRegistry registry = TransformEvaluatorRegistry.defaultRegistry();
     PipelineExecutor executor =
         ExecutorServiceParallelExecutor.create(
             executorService,
             consumerTrackingVisitor.getValueToConsumers(),
             keyedPValueVisitor.getKeyedPValues(),
-            TransformEvaluatorRegistry.defaultRegistry(),
+            registry,
             defaultModelEnforcements(options),
             context);
     executor.start(consumerTrackingVisitor.getRootTransforms());
@@ -392,4 +409,26 @@ public class DirectRunner
           "DirectPipelineResult does not support waitUntilFinish.");
     }
   }
+
+  /**
+   * A {@link Supplier} that creates a {@link ExecutorService} based on
+   * {@link Executors#newFixedThreadPool(int)}.
+   */
+  private static class FixedThreadPoolSupplier implements Supplier<ExecutorService> {
+    @Override
+    public ExecutorService get() {
+      return Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
+    }
+  }
+
+
+  /**
+   * A {@link Supplier} that creates a {@link NanosOffsetClock}.
+   *
+   * <p>Each call to {@link #get()} returns the result of {@link NanosOffsetClock#create()}.
+   */
+  private static class NanosOffsetClockSupplier implements Supplier<Clock> {
+    @Override
+    public Clock get() {
+      return NanosOffsetClock.create();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a5ef9a96/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
index 94f28e2..b9f159a 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
@@ -102,24 +102,26 @@ class EvaluationContext {
 
   public static EvaluationContext create(
       DirectOptions options,
+      Clock clock,
       BundleFactory bundleFactory,
       Collection<AppliedPTransform<?, ?, ?>> rootTransforms,
       Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers,
       Map<AppliedPTransform<?, ?, ?>, String> stepNames,
       Collection<PCollectionView<?>> views) {
     return new EvaluationContext(
-        options, bundleFactory, rootTransforms, valueToConsumers, stepNames, views);
+        options, clock, bundleFactory, rootTransforms, valueToConsumers, stepNames, views);
   }
 
   private EvaluationContext(
       DirectOptions options,
+      Clock clock,
       BundleFactory bundleFactory,
       Collection<AppliedPTransform<?, ?, ?>> rootTransforms,
       Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers,
       Map<AppliedPTransform<?, ?, ?>, String> stepNames,
       Collection<PCollectionView<?>> views) {
     this.options = checkNotNull(options);
-    this.clock = options.getClock();
+    this.clock = clock;
     this.bundleFactory = checkNotNull(bundleFactory);
     checkNotNull(rootTransforms);
     checkNotNull(valueToConsumers);
@@ -433,4 +435,8 @@ class EvaluationContext {
   public Instant now() {
     return clock.now();
   }
+
+  Clock getClock() {
+    return clock;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a5ef9a96/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FixedThreadPoolExecutorServiceFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FixedThreadPoolExecutorServiceFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FixedThreadPoolExecutorServiceFactory.java
deleted file mode 100644
index 74c4292..0000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FixedThreadPoolExecutorServiceFactory.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import org.apache.beam.sdk.options.DefaultValueFactory;
-import org.apache.beam.sdk.options.PipelineOptions;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-/**
- * A {@link ExecutorServiceFactory} that produces fixed thread pools via
- * {@link Executors#newFixedThreadPool(int)}, with the number of threads equal to the available
- * processors as provided by {@link Runtime#availableProcessors()}.
- */
-class FixedThreadPoolExecutorServiceFactory
-    implements DefaultValueFactory<ExecutorServiceFactory>, ExecutorServiceFactory {
-  private static final FixedThreadPoolExecutorServiceFactory INSTANCE =
-      new FixedThreadPoolExecutorServiceFactory();
-
-  @Override
-  public ExecutorServiceFactory create(PipelineOptions options) {
-    return INSTANCE;
-  }
-
-  @Override
-  public ExecutorService create() {
-    return Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a5ef9a96/runners/direct-java/src/main/java/org/apache/beam/runners/direct/NanosOffsetClock.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/NanosOffsetClock.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/NanosOffsetClock.java
index ffdee9d..77fa196 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/NanosOffsetClock.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/NanosOffsetClock.java
@@ -17,9 +17,6 @@
  */
 package org.apache.beam.runners.direct;
 
-import org.apache.beam.sdk.options.DefaultValueFactory;
-import org.apache.beam.sdk.options.PipelineOptions;
-
 import org.joda.time.Instant;
 
 import java.util.concurrent.TimeUnit;
@@ -46,14 +43,4 @@ public class NanosOffsetClock implements Clock {
         baseMillis + (TimeUnit.MILLISECONDS.convert(
             System.nanoTime() - nanosAtBaseMillis, TimeUnit.NANOSECONDS)));
   }
-
-  /**
-   * Creates instances of {@link NanosOffsetClock}.
-   */
-  public static class Factory implements DefaultValueFactory<Clock> {
-    @Override
-    public Clock create(PipelineOptions options) {
-      return new NanosOffsetClock();
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a5ef9a96/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
new file mode 100644
index 0000000..90a83b0
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.testing.TestStream.ElementEvent;
+import org.apache.beam.sdk.testing.TestStream.Event;
+import org.apache.beam.sdk.testing.TestStream.EventType;
+import org.apache.beam.sdk.testing.TestStream.ProcessingTimeEvent;
+import org.apache.beam.sdk.testing.TestStream.WatermarkEvent;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.TimestampedValue;
+
+import com.google.common.base.Supplier;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.annotation.Nullable;
+
+/**
+ * The {@link TransformEvaluatorFactory} for the {@link TestStream} primitive.
+ */
+class TestStreamEvaluatorFactory implements TransformEvaluatorFactory {
+  private final AtomicBoolean inUse = new AtomicBoolean(false);
+  private final AtomicReference<Evaluator<?>> evaluator = new AtomicReference<>();
+
+  /**
+   * Returns the evaluator for the provided {@link TestStream} application, or {@code null} if
+   * that evaluator is currently in use.
+   *
+   * <p>The {@code inputBundle} is not consulted.
+   */
+  @Nullable
+  @Override
+  public <InputT> TransformEvaluator<InputT> forApplication(
+      AppliedPTransform<?, ?, ?> application,
+      @Nullable CommittedBundle<?> inputBundle,
+      EvaluationContext evaluationContext) throws Exception {
+    // Drop the wildcards with a raw cast and let createEvaluator bind the type parameters.
+    AppliedPTransform testStreamApplication = (AppliedPTransform) application;
+    return createEvaluator(testStreamApplication, evaluationContext);
+  }
+
+  @Override
+  public void cleanup() throws Exception {} // no-op: this factory does nothing on cleanup
+
+  /**
+   * Lazily creates the single {@link Evaluator} held by this factory and hands it out to at most
+   * one caller at a time.
+   *
+   * @return the shared evaluator if the {@code inUse} flag could be claimed, otherwise
+   *     {@code null}
+   */
+  private <InputT, OutputT> TransformEvaluator<? super InputT> createEvaluator(
+      AppliedPTransform<PBegin, PCollection<OutputT>, TestStream<OutputT>> application,
+      EvaluationContext evaluationContext) {
+    boolean needsEvaluator = evaluator.get() == null;
+    if (needsEvaluator) {
+      // compareAndSet keeps whichever evaluator was installed first if two callers race here.
+      Evaluator<OutputT> fresh = new Evaluator<>(application, evaluationContext, inUse);
+      evaluator.compareAndSet(null, fresh);
+    }
+    // The flag is released at the end of Evaluator#finishBundle once an event was processed.
+    boolean claimed = inUse.compareAndSet(false, true);
+    return claimed ? evaluator.get() : null;
+  }
+
+  private static class Evaluator<T> implements TransformEvaluator<Object> {
+    private final AppliedPTransform<PBegin, PCollection<T>, TestStream<T>> application;
+    private final EvaluationContext context;
+    private final AtomicBoolean inUse;
+    private final List<Event<T>> events;
+    private int index;
+    private Instant currentWatermark;
+
+    private Evaluator(
+        AppliedPTransform<PBegin, PCollection<T>, TestStream<T>> application,
+        EvaluationContext context,
+        AtomicBoolean inUse) {
+      this.application = application;
+      this.context = context;
+      this.inUse = inUse;
+      this.events = application.getTransform().getEvents();
+      index = 0;
+      currentWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
+    }
+
+    @Override
+    public void processElement(WindowedValue<Object> element) throws Exception {
+    }
+
+    @Override
+    public TransformResult finishBundle() throws Exception {
+      if (index >= events.size()) {
+        return StepTransformResult.withHold(application, BoundedWindow.TIMESTAMP_MAX_VALUE).build();
+      }
+      Event<T> event = events.get(index);
+      if (event.getType().equals(EventType.WATERMARK)) {
+        currentWatermark = ((WatermarkEvent<T>) event).getWatermark();
+      }
+      StepTransformResult.Builder result =
+          StepTransformResult.withHold(application, currentWatermark);
+      if (event.getType().equals(EventType.ELEMENT)) {
+        UncommittedBundle<T> bundle = context.createRootBundle(application.getOutput());
+        for (TimestampedValue<T> elem : ((ElementEvent<T>) event).getElements()) {
+          bundle.add(WindowedValue.timestampedValueInGlobalWindow(elem.getValue(),
+              elem.getTimestamp()));
+        }
+        result.addOutput(bundle);
+      }
+      if (event.getType().equals(EventType.PROCESSING_TIME)) {
+        ((TestClock) context.getClock())
+            .advance(((ProcessingTimeEvent<T>) event).getProcessingTimeAdvance());
+      }
+      index++;
+      checkState(inUse.compareAndSet(true, false),
+          "The InUse flag of a %s was changed while the source evaluator was executing. "
+              + "%s cannot be split or evaluated in parallel.",
+          TestStream.class.getSimpleName(),
+          TestStream.class.getSimpleName());
+      return result.build();
+    }
+  }
+
+  /**
+   * A {@link Clock} whose time only moves when {@link #advance(Duration)} is called. It starts
+   * at {@link BoundedWindow#TIMESTAMP_MIN_VALUE}.
+   */
+  private static class TestClock implements Clock {
+    private final AtomicReference<Instant> currentTime =
+        new AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE);
+
+    /**
+     * Moves this clock forward by {@code amount}.
+     *
+     * <p>The update is retried until it is applied, so an advance is never silently dropped
+     * when another thread changes the time between the read and the write.
+     */
+    public void advance(Duration amount) {
+      Instant observed;
+      do {
+        observed = currentTime.get();
+      } while (!currentTime.compareAndSet(observed, observed.plus(amount)));
+    }
+
+    /** Returns the current time of this clock. */
+    @Override
+    public Instant now() {
+      return currentTime.get();
+    }
+  }
+
+  private static class TestClockSupplier implements Supplier<Clock> { // supplies TestClocks
+    @Override
+    public Clock get() {
+      return new TestClock(); // a separate clock instance for every call
+    }
+  }
+
+  static class DirectTestStreamFactory implements PTransformOverrideFactory {
+    @Override
+    public <InputT extends PInput, OutputT extends POutput> PTransform<InputT, OutputT> override(
+        PTransform<InputT, OutputT> transform) {
+      if (transform instanceof TestStream) {
+        return (PTransform<InputT, OutputT>)
+            new DirectTestStream<OutputT>((TestStream<OutputT>) transform);
+      }
+      return transform;
+    }
+
+    private static class DirectTestStream<T> extends PTransform<PBegin, PCollection<T>> {
+      private final TestStream<T> original;
+
+      private DirectTestStream(TestStream transform) {
+        this.original = transform;
+      }
+
+      @Override
+      public PCollection<T> apply(PBegin input) {
+        setup(input.getPipeline());
+        return PCollection.<T>createPrimitiveOutputInternal(
+                input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED)
+            .setCoder(original.getValueCoder());
+      }
+
+      private void setup(Pipeline p) {
+        PipelineRunner runner = p.getRunner();
+        checkState(runner instanceof DirectRunner,
+            "%s can only be used when running with the %s",
+            getClass().getSimpleName(),
+            DirectRunner.class.getSimpleName());
+        ((DirectRunner) runner).setClockSupplier(new TestClockSupplier());
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a5ef9a96/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
index b469237..c35e8b1 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
@@ -23,6 +23,7 @@ import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupAlsoByWindow;
 import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.testing.TestStream;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -61,6 +62,7 @@ class TransformEvaluatorRegistry implements TransformEvaluatorFactory {
             // Runner-specific primitives used in expansion of GroupByKey
             .put(DirectGroupByKeyOnly.class, new GroupByKeyOnlyEvaluatorFactory())
             .put(DirectGroupAlsoByWindow.class, new GroupAlsoByWindowEvaluatorFactory())
+            .put(TestStream.class, new TestStreamEvaluatorFactory())
             .build();
     return new TransformEvaluatorRegistry(primitives);
   }
@@ -117,4 +119,13 @@ class TransformEvaluatorRegistry implements TransformEvaluatorFactory {
       throw toThrow;
     }
   }
+
+  /**
+   * A factory to create Transform Evaluator Registries.
+   *
+   * <p>{@link #create()} returns the result of
+   * {@link TransformEvaluatorRegistry#defaultRegistry()}.
+   */
+  public static class Factory {
+    public TransformEvaluatorRegistry create() {
+      return TransformEvaluatorRegistry.defaultRegistry();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a5ef9a96/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
index c2157b8..1ab3403 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
@@ -66,7 +66,7 @@ class WriteWithShardingFactory implements PTransformOverrideFactory {
     return transform;
   }
 
-  private static class DynamicallyReshardedWrite <T> extends PTransform<PCollection<T>, PDone> {
+  private static class DynamicallyReshardedWrite<T> extends PTransform<PCollection<T>, PDone> {
     private final transient Write.Bound<T> original;
 
     private DynamicallyReshardedWrite(Bound<T> original) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a5ef9a96/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
index d4b5773..7ac0caa 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
@@ -117,6 +117,7 @@ public class EvaluationContextTest {
     context =
         EvaluationContext.create(
             runner.getPipelineOptions(),
+            NanosOffsetClock.create(),
             ImmutableListBundleFactory.create(),
             rootTransforms,
             valueToConsumers,

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a5ef9a96/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
index 6d11f72..e2eda32 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
@@ -32,10 +32,8 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.PropertyNames;
 import org.apache.beam.sdk.util.VarInt;
-import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollection.IsBounded;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.apache.beam.sdk.values.TimestampedValue.TimestampedValueCoder;
 
@@ -83,30 +81,30 @@ public final class TestStream<T> extends PTransform<PBegin, PCollection<T>> {
     this.events = checkNotNull(events);
   }
 
-  public Coder<Event<T>> getEventCoder() {
-    return EventCoder.of(coder);
-  }
-
   /**
    * An incomplete {@link TestStream}. Elements added to this builder will be produced in sequence
    * when the pipeline created by the {@link TestStream} is run.
    */
   public static class Builder<T> {
     private final Coder<T> coder;
-    private final ImmutableList.Builder<Event<T>> events;
-    private Instant currentWatermark;
+    private final ImmutableList<Event<T>> events;
+    private final Instant currentWatermark;
 
     private Builder(Coder<T> coder) {
-      this.coder = coder;
-      events = ImmutableList.builder();
+      this(coder, ImmutableList.<Event<T>>of(), BoundedWindow.TIMESTAMP_MIN_VALUE);
+    }
 
-      currentWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
+    private Builder(Coder<T> coder, ImmutableList<Event<T>> events, Instant currentWatermark) {
+      this.coder = coder;
+      this.events = events;
+      this.currentWatermark = currentWatermark;
     }
 
     /**
      * Adds the specified elements to the source with timestamp equal to the current watermark.
      *
-     * @return this {@link TestStream.Builder}
+     * @return A {@link TestStream.Builder} like this one that will add the provided elements
+     *         after all earlier events have completed.
      */
     @SafeVarargs
     public final Builder<T> addElements(T element, T... elements) {
@@ -122,22 +120,40 @@ public final class TestStream<T> extends PTransform<PBegin, PCollection<T>> {
     /**
      * Adds the specified elements to the source with the provided timestamps.
      *
-     * @return this {@link TestStream.Builder}
+     * @return A {@link TestStream.Builder} like this one that will add the provided elements
+     *         after all earlier events have completed.
      */
     @SafeVarargs
     public final Builder<T> addElements(
         TimestampedValue<T> element, TimestampedValue<T>... elements) {
-      events.add(ElementEvent.add(element, elements));
-      return this;
+      checkArgument(
+          element.getTimestamp().isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE),
+          "Elements must have timestamps before %s. Got: %s",
+          BoundedWindow.TIMESTAMP_MAX_VALUE,
+          element.getTimestamp());
+      for (TimestampedValue<T> multiElement : elements) {
+        checkArgument(
+            multiElement.getTimestamp().isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE),
+            "Elements must have timestamps before %s. Got: %s",
+            BoundedWindow.TIMESTAMP_MAX_VALUE,
+            multiElement.getTimestamp());
+      }
+      ImmutableList<Event<T>> newEvents =
+          ImmutableList.<Event<T>>builder()
+              .addAll(events)
+              .add(ElementEvent.add(element, elements))
+              .build();
+      return new Builder<T>(coder, newEvents, currentWatermark);
     }
 
     /**
      * Advance the watermark of this source to the specified instant.
      *
-     * <p>The watermark must advance monotonically and to at most {@link
-     * BoundedWindow#TIMESTAMP_MAX_VALUE}.
+     * <p>The watermark must advance monotonically and cannot advance to {@link
+     * BoundedWindow#TIMESTAMP_MAX_VALUE} or beyond.
      *
-     * @return this {@link TestStream.Builder}
+     * @return A {@link TestStream.Builder} like this one that will advance the watermark to the
+     *         specified point after all earlier events have completed.
      */
     public Builder<T> advanceWatermarkTo(Instant newWatermark) {
       checkArgument(
@@ -147,23 +163,30 @@ public final class TestStream<T> extends PTransform<PBegin, PCollection<T>> {
           "The Watermark cannot progress beyond the maximum. Got: %s. Maximum: %s",
           newWatermark,
           BoundedWindow.TIMESTAMP_MAX_VALUE);
-      events.add(WatermarkEvent.<T>advanceTo(newWatermark));
-      currentWatermark = newWatermark;
-      return this;
+      ImmutableList<Event<T>> newEvents = ImmutableList.<Event<T>>builder()
+          .addAll(events)
+          .add(WatermarkEvent.<T>advanceTo(newWatermark))
+          .build();
+      return new Builder<T>(coder, newEvents, newWatermark);
     }
 
     /**
      * Advance the processing time by the specified amount.
      *
-     * @return this {@link TestStream.Builder}
+     * @return A {@link TestStream.Builder} like this one that will advance the processing time by
+     *         the specified amount after all earlier events have completed.
      */
     public Builder<T> advanceProcessingTime(Duration amount) {
       checkArgument(
           amount.getMillis() > 0,
-          "Must advance the processing time by a positive amount. Got: ",
+          "Must advance the processing time by a positive amount. Got: %s",
           amount);
-      events.add(ProcessingTimeEvent.<T>advanceBy(amount));
-      return this;
+      ImmutableList<Event<T>> newEvents =
+          ImmutableList.<Event<T>>builder()
+              .addAll(events)
+              .add(ProcessingTimeEvent.<T>advanceBy(amount))
+              .build();
+      return new Builder<T>(coder, newEvents, currentWatermark);
     }
 
     /**
@@ -171,8 +194,12 @@ public final class TestStream<T> extends PTransform<PBegin, PCollection<T>> {
      * same builder will not affect the returned {@link TestStream}.
      */
     public TestStream<T> advanceWatermarkToInfinity() {
-      events.add(WatermarkEvent.<T>advanceTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
-      return new TestStream<>(coder, events.build());
+      ImmutableList<Event<T>> newEvents =
+          ImmutableList.<Event<T>>builder()
+              .addAll(events)
+              .add(WatermarkEvent.<T>advanceTo(BoundedWindow.TIMESTAMP_MAX_VALUE))
+              .build();
+      return new TestStream<>(coder, newEvents);
     }
   }
 
@@ -230,12 +257,30 @@ public final class TestStream<T> extends PTransform<PBegin, PCollection<T>> {
 
   @Override
   public PCollection<T> apply(PBegin input) {
-    return PCollection.<T>createPrimitiveOutputInternal(
-            input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED)
-        .setCoder(coder);
+    throw new IllegalStateException(
+        String.format(
+            "Pipeline Runner %s does not provide a required override for %s",
+            input.getPipeline().getRunner().getClass().getSimpleName(),
+            getClass().getSimpleName()));
+  }
+
+  public Coder<T> getValueCoder() {
+    return coder;
+  }
+
+  /**
+   * Returns a coder suitable for encoding {@link TestStream.Event}.
+   */
+  public Coder<Event<T>> getEventCoder() {
+    return EventCoder.of(coder);
   }
 
-  public List<Event<T>> getStreamEvents() {
+  /**
+   * Returns the sequence of {@link Event Events} in this {@link TestStream}.
+   *
+   * <p>For use by {@link PipelineRunner} authors.
+   */
+  public List<Event<T>> getEvents() {
     return events;
   }
 
@@ -243,7 +288,7 @@ public final class TestStream<T> extends PTransform<PBegin, PCollection<T>> {
    * A {@link Coder} that encodes and decodes {@link TestStream.Event Events}.
    *
    * @param <T> the type of elements in {@link ElementEvent ElementEvents} encoded and decoded by
-   *     this {@link EventCoder}
+   *            this {@link EventCoder}
    */
   @VisibleForTesting
   static final class EventCoder<T> extends StandardCoder<Event<T>> {
@@ -290,14 +335,15 @@ public final class TestStream<T> extends PTransform<PBegin, PCollection<T>> {
           DURATION_CODER.encode(processingAdvance, outStream, context);
           break;
         default:
-          throw new AssertionError("Unreachable");
+          throw new AssertionError("Unreachable: Unsupported Event Type " + value.getType());
       }
     }
 
     @Override
     public Event<T> decode(
         InputStream inStream, Context context) throws IOException {
-      switch (EventType.values()[VarInt.decodeInt(inStream)]) {
+      EventType eventType = EventType.values()[VarInt.decodeInt(inStream)];
+      switch (eventType) {
         case ELEMENT:
           Iterable<TimestampedValue<T>> elements = elementCoder.decode(inStream, context);
           return ElementEvent.add(elements);
@@ -307,7 +353,7 @@ public final class TestStream<T> extends PTransform<PBegin, PCollection<T>> {
           return ProcessingTimeEvent.advanceBy(
               DURATION_CODER.decode(inStream, context).toDuration());
         default:
-          throw new AssertionError("Unreachable");
+          throw new AssertionError("Unreachable: Unsupported Event Type " + eventType);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a5ef9a96/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
index 09bccfa..df37d7f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
@@ -23,8 +23,13 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.junit.Assert.assertThat;
 
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.TestStream.Builder;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
@@ -35,8 +40,12 @@ import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.transforms.windowing.AfterPane;
 import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
 import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.Never;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
 import org.apache.beam.sdk.values.PCollection;
@@ -44,8 +53,10 @@ import org.apache.beam.sdk.values.TimestampedValue;
 
 import org.joda.time.Duration;
 import org.joda.time.Instant;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
@@ -56,6 +67,8 @@ import java.io.Serializable;
  */
 @RunWith(JUnit4.class)
 public class TestStreamTest implements Serializable {
+  @Rule public transient ExpectedException thrown = ExpectedException.none();
+
   @Test
   @Category(NeedsRunner.class)
   public void testLateDataAccumulating() {
@@ -149,6 +162,152 @@ public class TestStreamTest implements Serializable {
   }
 
   @Test
+  @Category(NeedsRunner.class)
+  public void testDiscardingMode() {
+    TestStream<String> stream =
+        TestStream.create(StringUtf8Coder.of())
+            .advanceWatermarkTo(new Instant(0))
+            .addElements(
+                TimestampedValue.of("firstPane", new Instant(100)),
+                TimestampedValue.of("alsoFirstPane", new Instant(200)))
+            .addElements(TimestampedValue.of("onTimePane", new Instant(500)))
+            .advanceWatermarkTo(new Instant(1001L))
+            .addElements(
+                TimestampedValue.of("finalLatePane", new Instant(750)),
+                TimestampedValue.of("alsoFinalLatePane", new Instant(250)))
+            .advanceWatermarkToInfinity();
+
+    TestPipeline p = TestPipeline.create();
+    FixedWindows windowFn = FixedWindows.of(Duration.millis(1000L));
+    Duration allowedLateness = Duration.millis(5000L);
+    PCollection<String> values =
+        p.apply(stream)
+            .apply(
+                Window.<String>into(windowFn)
+                    .triggering(
+                        AfterWatermark.pastEndOfWindow()
+                            .withEarlyFirings(AfterPane.elementCountAtLeast(2))
+                            .withLateFirings(Never.ever()))
+                    .discardingFiredPanes()
+                    .withAllowedLateness(allowedLateness))
+            .apply(WithKeys.<Integer, String>of(1))
+            .apply(GroupByKey.<Integer, String>create())
+            .apply(Values.<Iterable<String>>create())
+            .apply(Flatten.<String>iterables());
+
+    IntervalWindow window = windowFn.assignWindow(new Instant(100));
+    PAssert.that(values)
+        .inWindow(window)
+        .containsInAnyOrder(
+            "firstPane", "alsoFirstPane", "onTimePane", "finalLatePane", "alsoFinalLatePane");
+    PAssert.that(values)
+        .inCombinedNonLatePanes(window)
+        .containsInAnyOrder("firstPane", "alsoFirstPane", "onTimePane");
+    PAssert.that(values).inOnTimePane(window).containsInAnyOrder("onTimePane");
+    PAssert.that(values)
+        .inFinalPane(window)
+        .containsInAnyOrder("finalLatePane", "alsoFinalLatePane");
+
+    p.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testFirstElementLate() {
+    Instant lateElementTimestamp = new Instant(-1_000_000);
+    TestStream<String> stream =
+        TestStream.create(StringUtf8Coder.of())
+            .advanceWatermarkTo(new Instant(0))
+            .addElements(TimestampedValue.of("late", lateElementTimestamp))
+            .addElements(TimestampedValue.of("onTime", new Instant(100)))
+            .advanceWatermarkToInfinity();
+
+    TestPipeline p = TestPipeline.create();
+    FixedWindows windowFn = FixedWindows.of(Duration.millis(1000L));
+    Duration allowedLateness = Duration.millis(5000L);
+    PCollection<String> values = p.apply(stream)
+        .apply(Window.<String>into(windowFn).triggering(DefaultTrigger.of())
+            .discardingFiredPanes()
+            .withAllowedLateness(allowedLateness))
+        .apply(WithKeys.<Integer, String>of(1))
+        .apply(GroupByKey.<Integer, String>create())
+        .apply(Values.<Iterable<String>>create())
+        .apply(Flatten.<String>iterables());
+
+    PAssert.that(values).inWindow(windowFn.assignWindow(lateElementTimestamp)).empty();
+    PAssert.that(values)
+        .inWindow(windowFn.assignWindow(new Instant(100)))
+        .containsInAnyOrder("onTime");
+
+    p.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testElementsAtAlmostPositiveInfinity() {
+    Instant endOfGlobalWindow = GlobalWindow.INSTANCE.maxTimestamp();
+    TestStream<String> stream = TestStream.create(StringUtf8Coder.of())
+        .addElements(TimestampedValue.of("foo", endOfGlobalWindow),
+            TimestampedValue.of("bar", endOfGlobalWindow))
+        .advanceWatermarkToInfinity();
+
+    TestPipeline p = TestPipeline.create();
+    FixedWindows windows = FixedWindows.of(Duration.standardHours(6));
+    PCollection<String> windowedValues = p.apply(stream)
+        .apply(Window.<String>into(windows))
+        .apply(WithKeys.<Integer, String>of(1))
+        .apply(GroupByKey.<Integer, String>create())
+        .apply(Values.<Iterable<String>>create())
+        .apply(Flatten.<String>iterables());
+
+    PAssert.that(windowedValues)
+        .inWindow(windows.assignWindow(GlobalWindow.INSTANCE.maxTimestamp()))
+        .containsInAnyOrder("foo", "bar");
+    p.run();
+  }
+
+  @Test
+  public void testElementAtPositiveInfinityThrows() {
+    Builder<Integer> stream =
+        TestStream.create(VarIntCoder.of())
+            .addElements(TimestampedValue.of(-1, BoundedWindow.TIMESTAMP_MAX_VALUE.minus(1L)));
+    thrown.expect(IllegalArgumentException.class);
+    stream.addElements(TimestampedValue.of(1, BoundedWindow.TIMESTAMP_MAX_VALUE));
+  }
+
+  @Test
+  public void testAdvanceWatermarkNonMonotonicThrows() {
+    Builder<Integer> stream =
+        TestStream.create(VarIntCoder.of())
+            .advanceWatermarkTo(new Instant(0L));
+    thrown.expect(IllegalArgumentException.class);
+    stream.advanceWatermarkTo(new Instant(-1L));
+  }
+
+  @Test
+  public void testAdvanceWatermarkEqualToPositiveInfinityThrows() {
+    Builder<Integer> stream =
+        TestStream.create(VarIntCoder.of())
+            .advanceWatermarkTo(BoundedWindow.TIMESTAMP_MAX_VALUE.minus(1L));
+    thrown.expect(IllegalArgumentException.class);
+    stream.advanceWatermarkTo(BoundedWindow.TIMESTAMP_MAX_VALUE);
+  }
+
+  @Test
+  public void testUnsupportedRunnerThrows() {
+    PipelineOptions opts = PipelineOptionsFactory.create();
+    opts.setRunner(CrashingRunner.class);
+
+    Pipeline p = Pipeline.create(opts);
+
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage("does not provide a required override");
+    thrown.expectMessage(TestStream.class.getSimpleName());
+    thrown.expectMessage(CrashingRunner.class.getSimpleName());
+    p.apply(TestStream.create(VarIntCoder.of()).advanceWatermarkToInfinity());
+  }
+
+  @Test
   public void testEncodeDecode() throws Exception {
     TestStream.Event<Integer> elems =
         TestStream.ElementEvent.add(



[2/4] incubator-beam git commit: Add inEarlyGlobalWindowPanes as a PAssert Extractor

Posted by lc...@apache.org.
Add inEarlyGlobalWindowPanes as a PAssert Extractor

This is for use in asserting the contents of speculative panes in the
global window.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f15fab8c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f15fab8c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f15fab8c

Branch: refs/heads/master
Commit: f15fab8ccdb3b40004583e8f7e4e32a0b8ba5121
Parents: bfa3b70
Author: Thomas Groh <tg...@google.com>
Authored: Thu Aug 11 15:46:10 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Fri Aug 19 09:04:18 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/testing/PAssert.java | 18 ++++++++++++++++++
 .../apache/beam/sdk/testing/PaneExtractors.java   | 18 ++++++++++++++++++
 2 files changed, 36 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f15fab8c/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
index e07ee3d..3f1a741 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
@@ -49,6 +49,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.Never;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
 import org.apache.beam.sdk.transforms.windowing.Trigger;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
@@ -176,6 +177,13 @@ public class PAssert {
     IterableAssert<T> inCombinedNonLatePanes(BoundedWindow window);
 
     /**
+     * Creates a new {@link IterableAssert} like this one, but with the assertion restricted to only
+     * run on panes in the {@link GlobalWindow} that were emitted before the {@link GlobalWindow}
+     * closed. These panes have {@link Timing#EARLY}.
+     */
+    IterableAssert<T> inEarlyGlobalWindowPanes();
+
+    /**
      * Asserts that the iterable in question contains the provided elements.
      *
      * @return the same {@link IterableAssert} builder for further assertions
@@ -381,6 +389,11 @@ public class PAssert {
       return withPane(window, PaneExtractors.<T>nonLatePanes());
     }
 
+    @Override
+    public IterableAssert<T> inEarlyGlobalWindowPanes() {
+      return withPane(GlobalWindow.INSTANCE, PaneExtractors.<T>earlyPanes());
+    }
+
     private PCollectionContentsAssert<T> withPane(
         BoundedWindow window,
         SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>> paneExtractor) {
@@ -557,6 +570,11 @@ public class PAssert {
       return withPanes(window, PaneExtractors.<Iterable<T>>nonLatePanes());
     }
 
+    @Override
+    public IterableAssert<T> inEarlyGlobalWindowPanes() {
+      return withPanes(GlobalWindow.INSTANCE, PaneExtractors.<Iterable<T>>earlyPanes());
+    }
+
     private PCollectionSingletonIterableAssert<T> withPanes(
         BoundedWindow window,
         SimpleFunction<Iterable<WindowedValue<Iterable<T>>>, Iterable<Iterable<T>>> paneExtractor) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f15fab8c/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PaneExtractors.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PaneExtractors.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PaneExtractors.java
index f699bfc..899612b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PaneExtractors.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PaneExtractors.java
@@ -59,6 +59,10 @@ final class PaneExtractors {
     return new ExtractNonLatePanes<>();
   }
 
+  static <T> SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>> earlyPanes() {
+    return new ExtractEarlyPanes<>();
+  }
+
   static <T> SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>> allPanes() {
     return new ExtractAllPanes<>();
   }
@@ -137,4 +141,18 @@ final class PaneExtractors {
       return outputs;
     }
   }
+
+  private static class ExtractEarlyPanes<T>
+      extends SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>> {
+    @Override
+    public Iterable<T> apply(Iterable<WindowedValue<T>> input) {
+      List<T> outputs = new ArrayList<>();
+      for (WindowedValue<T> value : input) {
+        if (value.getPane().getTiming() == PaneInfo.Timing.EARLY) {
+          outputs.add(value.getValue());
+        }
+      }
+      return outputs;
+    }
+  }
 }


[4/4] incubator-beam git commit: Add TestStream to the DirectRunner Package

Posted by lc...@apache.org.
Add TestStream to the DirectRunner Package

This closes #817


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8d31ca0c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8d31ca0c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8d31ca0c

Branch: refs/heads/master
Commit: 8d31ca0ca084c37dca5a436fa8b784622b25348d
Parents: bfa3b70 a5ef9a9
Author: Luke Cwik <lc...@google.com>
Authored: Fri Aug 19 09:06:08 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Fri Aug 19 09:06:08 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/direct/DirectOptions.java      |  45 +--
 .../beam/runners/direct/DirectRunner.java       |  47 ++-
 .../beam/runners/direct/EvaluationContext.java  |  10 +-
 .../FixedThreadPoolExecutorServiceFactory.java  |  45 ---
 .../beam/runners/direct/NanosOffsetClock.java   |  13 -
 .../direct/TestStreamEvaluatorFactory.java      | 204 ++++++++++
 .../direct/TransformEvaluatorRegistry.java      |  11 +
 .../direct/WriteWithShardingFactory.java        |   2 +-
 .../runners/direct/EvaluationContextTest.java   |   1 +
 .../org/apache/beam/sdk/testing/PAssert.java    |  18 +
 .../apache/beam/sdk/testing/PaneExtractors.java |  18 +
 .../org/apache/beam/sdk/testing/TestStream.java | 372 +++++++++++++++++++
 .../apache/beam/sdk/testing/TestStreamTest.java | 328 ++++++++++++++++
 13 files changed, 1005 insertions(+), 109 deletions(-)
----------------------------------------------------------------------