You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/08/19 16:21:07 UTC

[3/4] incubator-beam git commit: Implement TestStream in the DirectRunner

Implement TestStream in the DirectRunner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a5ef9a96
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a5ef9a96
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a5ef9a96

Branch: refs/heads/master
Commit: a5ef9a9689147105854f12d3ea054b3887a94e24
Parents: c72d4fc
Author: Thomas Groh <tg...@google.com>
Authored: Mon Aug 15 19:45:58 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Fri Aug 19 09:04:19 2016 -0700

----------------------------------------------------------------------
 runners/direct-java/pom.xml                     |   3 -
 .../beam/runners/direct/DirectOptions.java      |  45 +---
 .../beam/runners/direct/DirectRunner.java       |  47 ++++-
 .../beam/runners/direct/EvaluationContext.java  |  10 +-
 .../FixedThreadPoolExecutorServiceFactory.java  |  45 ----
 .../beam/runners/direct/NanosOffsetClock.java   |  13 --
 .../direct/TestStreamEvaluatorFactory.java      | 204 +++++++++++++++++++
 .../direct/TransformEvaluatorRegistry.java      |  11 +
 .../direct/WriteWithShardingFactory.java        |   2 +-
 .../runners/direct/EvaluationContextTest.java   |   1 +
 .../org/apache/beam/sdk/testing/TestStream.java | 114 +++++++----
 .../apache/beam/sdk/testing/TestStreamTest.java | 159 +++++++++++++++
 12 files changed, 508 insertions(+), 146 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a5ef9a96/runners/direct-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml
index 8b0f91d..e06883f 100644
--- a/runners/direct-java/pom.xml
+++ b/runners/direct-java/pom.xml
@@ -85,9 +85,6 @@
                 <dependency>org.apache.beam:beam-sdks-java-core</dependency>
                 <dependency>org.apache.beam:beam-runners-java-core</dependency>
               </dependenciesToScan>
-              <excludes>
-                <exclude>org/apache/beam/sdk/testing/TestStreamTest.java</exclude>
-              </excludes>
               <systemPropertyVariables>
                 <beamTestPipelineOptions>
                   [

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a5ef9a96/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
index 3901c04..798fda4 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
@@ -17,59 +17,16 @@
  */
 package org.apache.beam.runners.direct;
 
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.ApplicationNameOptions;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.options.Hidden;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.Validation.Required;
-import org.apache.beam.sdk.transforms.PTransform;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 
 /**
  * Options that can be used to configure the {@link org.apache.beam.runners.direct.DirectRunner}.
  */
 public interface DirectOptions extends PipelineOptions, ApplicationNameOptions {
-  /**
-   * Gets the {@link ExecutorServiceFactory} to use to create instances of {@link ExecutorService}
-   * to execute {@link PTransform PTransforms}.
-   *
-   * <p>Note that {@link ExecutorService ExecutorServices} returned by the factory must ensure that
-   * it cannot enter a state in which it will not schedule additional pending work unless currently
-   * scheduled work completes, as this may cause the {@link Pipeline} to cease processing.
-   *
-   * <p>Defaults to a {@link FixedThreadPoolExecutorServiceFactory}, which produces instances of
-   * {@link Executors#newCachedThreadPool()}.
-   */
-  @JsonIgnore
-  @Required
-  @Hidden
-  @Default.InstanceFactory(FixedThreadPoolExecutorServiceFactory.class)
-  ExecutorServiceFactory getExecutorServiceFactory();
-
-  void setExecutorServiceFactory(ExecutorServiceFactory executorService);
-
-  /**
-   * Gets the {@link Clock} used by this pipeline. The clock is used in place of accessing the
-   * system time when time values are required by the evaluator.
-   */
-  @Default.InstanceFactory(NanosOffsetClock.Factory.class)
-  @JsonIgnore
-  @Required
-  @Hidden
-  @Description(
-      "The processing time source used by the pipeline. When the current time is "
-          + "needed by the evaluator, the result of clock#now() is used.")
-  Clock getClock();
-
-  void setClock(Clock clock);
-
-  @Default.Boolean(false)
+  @Default.Boolean(true)
   @Description(
       "If the pipeline should shut down producers which have reached the maximum "
           + "representable watermark. If this is set to true, a pipeline in which all PTransforms "

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a5ef9a96/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index f2b781e..68184de 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.direct;
 
 import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly;
 import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult;
+import org.apache.beam.runners.direct.TestStreamEvaluatorFactory.DirectTestStreamFactory;
 import org.apache.beam.runners.direct.ViewEvaluatorFactory.ViewOverrideFactory;
 import org.apache.beam.sdk.AggregatorRetrievalException;
 import org.apache.beam.sdk.AggregatorValues;
@@ -29,6 +30,7 @@ import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.io.Write;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.testing.TestStream;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.GroupByKey;
@@ -46,6 +48,7 @@ import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.PValue;
 
 import com.google.common.base.MoreObjects;
+import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
@@ -58,6 +61,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 /**
  * An In-Memory implementation of the Dataflow Programming Model. Supports Unbounded
@@ -76,8 +80,9 @@ public class DirectRunner
   private static Map<Class<? extends PTransform>, PTransformOverrideFactory>
       defaultTransformOverrides =
           ImmutableMap.<Class<? extends PTransform>, PTransformOverrideFactory>builder()
-              .put(GroupByKey.class, new DirectGroupByKeyOverrideFactory())
               .put(CreatePCollectionView.class, new ViewOverrideFactory())
+              .put(GroupByKey.class, new DirectGroupByKeyOverrideFactory())
+              .put(TestStream.class, new DirectTestStreamFactory())
               .put(Write.Bound.class, new WriteWithShardingFactory())
               .build();
 
@@ -175,6 +180,8 @@ public class DirectRunner
 
   ////////////////////////////////////////////////////////////////////////////////////////////////
   private final DirectOptions options;
+  private Supplier<ExecutorService> executorServiceSupplier = new FixedThreadPoolSupplier();
+  private Supplier<Clock> clockSupplier = new NanosOffsetClockSupplier();
 
   public static DirectRunner fromOptions(PipelineOptions options) {
     return new DirectRunner(options.as(DirectOptions.class));
@@ -191,6 +198,14 @@ public class DirectRunner
     return options;
   }
 
+  Supplier<Clock> getClockSupplier() {
+    return clockSupplier;
+  }
+
+  void setClockSupplier(Supplier<Clock> supplier) {
+    this.clockSupplier = supplier;
+  }
+
   @Override
   public <OutputT extends POutput, InputT extends PInput> OutputT apply(
       PTransform<InputT, OutputT> transform, InputT input) {
@@ -223,6 +238,7 @@ public class DirectRunner
     EvaluationContext context =
         EvaluationContext.create(
             getPipelineOptions(),
+            clockSupplier.get(),
             createBundleFactory(getPipelineOptions()),
             consumerTrackingVisitor.getRootTransforms(),
             consumerTrackingVisitor.getValueToConsumers(),
@@ -230,14 +246,15 @@ public class DirectRunner
             consumerTrackingVisitor.getViews());
 
     // independent executor service for each run
-    ExecutorService executorService =
-        context.getPipelineOptions().getExecutorServiceFactory().create();
+    ExecutorService executorService = executorServiceSupplier.get();
+
+    TransformEvaluatorRegistry registry = TransformEvaluatorRegistry.defaultRegistry();
     PipelineExecutor executor =
         ExecutorServiceParallelExecutor.create(
             executorService,
             consumerTrackingVisitor.getValueToConsumers(),
             keyedPValueVisitor.getKeyedPValues(),
-            TransformEvaluatorRegistry.defaultRegistry(),
+            registry,
             defaultModelEnforcements(options),
             context);
     executor.start(consumerTrackingVisitor.getRootTransforms());
@@ -392,4 +409,26 @@ public class DirectRunner
           "DirectPipelineResult does not support waitUntilFinish.");
     }
   }
+
+  /**
+   * A {@link Supplier} that creates an {@link ExecutorService} based on
+   * {@link Executors#newFixedThreadPool(int)}.
+   */
+  private static class FixedThreadPoolSupplier implements Supplier<ExecutorService> {
+    @Override
+    public ExecutorService get() {
+      return Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
+    }
+  }
+
+
+  /**
+   * A {@link Supplier} that creates a {@link NanosOffsetClock}.
+   */
+  private static class NanosOffsetClockSupplier implements Supplier<Clock> {
+    @Override
+    public Clock get() {
+      return NanosOffsetClock.create();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a5ef9a96/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
index 94f28e2..b9f159a 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
@@ -102,24 +102,26 @@ class EvaluationContext {
 
   public static EvaluationContext create(
       DirectOptions options,
+      Clock clock,
       BundleFactory bundleFactory,
       Collection<AppliedPTransform<?, ?, ?>> rootTransforms,
       Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers,
       Map<AppliedPTransform<?, ?, ?>, String> stepNames,
       Collection<PCollectionView<?>> views) {
     return new EvaluationContext(
-        options, bundleFactory, rootTransforms, valueToConsumers, stepNames, views);
+        options, clock, bundleFactory, rootTransforms, valueToConsumers, stepNames, views);
   }
 
   private EvaluationContext(
       DirectOptions options,
+      Clock clock,
       BundleFactory bundleFactory,
       Collection<AppliedPTransform<?, ?, ?>> rootTransforms,
       Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers,
       Map<AppliedPTransform<?, ?, ?>, String> stepNames,
       Collection<PCollectionView<?>> views) {
     this.options = checkNotNull(options);
-    this.clock = options.getClock();
+    this.clock = clock;
     this.bundleFactory = checkNotNull(bundleFactory);
     checkNotNull(rootTransforms);
     checkNotNull(valueToConsumers);
@@ -433,4 +435,8 @@ class EvaluationContext {
   public Instant now() {
     return clock.now();
   }
+
+  Clock getClock() {
+    return clock;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a5ef9a96/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FixedThreadPoolExecutorServiceFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FixedThreadPoolExecutorServiceFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FixedThreadPoolExecutorServiceFactory.java
deleted file mode 100644
index 74c4292..0000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FixedThreadPoolExecutorServiceFactory.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import org.apache.beam.sdk.options.DefaultValueFactory;
-import org.apache.beam.sdk.options.PipelineOptions;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-/**
- * A {@link ExecutorServiceFactory} that produces fixed thread pools via
- * {@link Executors#newFixedThreadPool(int)}, with the number of threads equal to the available
- * processors as provided by {@link Runtime#availableProcessors()}.
- */
-class FixedThreadPoolExecutorServiceFactory
-    implements DefaultValueFactory<ExecutorServiceFactory>, ExecutorServiceFactory {
-  private static final FixedThreadPoolExecutorServiceFactory INSTANCE =
-      new FixedThreadPoolExecutorServiceFactory();
-
-  @Override
-  public ExecutorServiceFactory create(PipelineOptions options) {
-    return INSTANCE;
-  }
-
-  @Override
-  public ExecutorService create() {
-    return Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a5ef9a96/runners/direct-java/src/main/java/org/apache/beam/runners/direct/NanosOffsetClock.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/NanosOffsetClock.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/NanosOffsetClock.java
index ffdee9d..77fa196 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/NanosOffsetClock.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/NanosOffsetClock.java
@@ -17,9 +17,6 @@
  */
 package org.apache.beam.runners.direct;
 
-import org.apache.beam.sdk.options.DefaultValueFactory;
-import org.apache.beam.sdk.options.PipelineOptions;
-
 import org.joda.time.Instant;
 
 import java.util.concurrent.TimeUnit;
@@ -46,14 +43,4 @@ public class NanosOffsetClock implements Clock {
         baseMillis + (TimeUnit.MILLISECONDS.convert(
             System.nanoTime() - nanosAtBaseMillis, TimeUnit.NANOSECONDS)));
   }
-
-  /**
-   * Creates instances of {@link NanosOffsetClock}.
-   */
-  public static class Factory implements DefaultValueFactory<Clock> {
-    @Override
-    public Clock create(PipelineOptions options) {
-      return new NanosOffsetClock();
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a5ef9a96/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
new file mode 100644
index 0000000..90a83b0
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.testing.TestStream.ElementEvent;
+import org.apache.beam.sdk.testing.TestStream.Event;
+import org.apache.beam.sdk.testing.TestStream.EventType;
+import org.apache.beam.sdk.testing.TestStream.ProcessingTimeEvent;
+import org.apache.beam.sdk.testing.TestStream.WatermarkEvent;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.TimestampedValue;
+
+import com.google.common.base.Supplier;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.annotation.Nullable;
+
+/**
+ * The {@link TransformEvaluatorFactory} for the {@link TestStream} primitive.
+ */
+class TestStreamEvaluatorFactory implements TransformEvaluatorFactory {
+  private final AtomicBoolean inUse = new AtomicBoolean(false);
+  private final AtomicReference<Evaluator<?>> evaluator = new AtomicReference<>();
+
+  @Nullable
+  @Override
+  public <InputT> TransformEvaluator<InputT> forApplication(
+      AppliedPTransform<?, ?, ?> application,
+      @Nullable CommittedBundle<?> inputBundle,
+      EvaluationContext evaluationContext) throws Exception {
+    return createEvaluator((AppliedPTransform) application, evaluationContext);
+  }
+
+  @Override
+  public void cleanup() throws Exception {}
+
+  private <InputT, OutputT> TransformEvaluator<? super InputT> createEvaluator(
+      AppliedPTransform<PBegin, PCollection<OutputT>, TestStream<OutputT>> application,
+      EvaluationContext evaluationContext) {
+    if (evaluator.get() == null) {
+      Evaluator<OutputT> createdEvaluator = new Evaluator<>(application, evaluationContext, inUse);
+      evaluator.compareAndSet(null, createdEvaluator);
+    }
+    if (inUse.compareAndSet(false, true)) {
+      return evaluator.get();
+    } else {
+      return null;
+    }
+  }
+
+  private static class Evaluator<T> implements TransformEvaluator<Object> {
+    private final AppliedPTransform<PBegin, PCollection<T>, TestStream<T>> application;
+    private final EvaluationContext context;
+    private final AtomicBoolean inUse;
+    private final List<Event<T>> events;
+    private int index;
+    private Instant currentWatermark;
+
+    private Evaluator(
+        AppliedPTransform<PBegin, PCollection<T>, TestStream<T>> application,
+        EvaluationContext context,
+        AtomicBoolean inUse) {
+      this.application = application;
+      this.context = context;
+      this.inUse = inUse;
+      this.events = application.getTransform().getEvents();
+      index = 0;
+      currentWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
+    }
+
+    @Override
+    public void processElement(WindowedValue<Object> element) throws Exception {
+    }
+
+    @Override
+    public TransformResult finishBundle() throws Exception {
+      if (index >= events.size()) {
+        return StepTransformResult.withHold(application, BoundedWindow.TIMESTAMP_MAX_VALUE).build();
+      }
+      Event<T> event = events.get(index);
+      if (event.getType().equals(EventType.WATERMARK)) {
+        currentWatermark = ((WatermarkEvent<T>) event).getWatermark();
+      }
+      StepTransformResult.Builder result =
+          StepTransformResult.withHold(application, currentWatermark);
+      if (event.getType().equals(EventType.ELEMENT)) {
+        UncommittedBundle<T> bundle = context.createRootBundle(application.getOutput());
+        for (TimestampedValue<T> elem : ((ElementEvent<T>) event).getElements()) {
+          bundle.add(WindowedValue.timestampedValueInGlobalWindow(elem.getValue(),
+              elem.getTimestamp()));
+        }
+        result.addOutput(bundle);
+      }
+      if (event.getType().equals(EventType.PROCESSING_TIME)) {
+        ((TestClock) context.getClock())
+            .advance(((ProcessingTimeEvent<T>) event).getProcessingTimeAdvance());
+      }
+      index++;
+      checkState(inUse.compareAndSet(true, false),
+          "The InUse flag of a %s was changed while the source evaluator was executing. "
+              + "%s cannot be split or evaluated in parallel.",
+          TestStream.class.getSimpleName(),
+          TestStream.class.getSimpleName());
+      return result.build();
+    }
+  }
+
+  private static class TestClock implements Clock {
+    private final AtomicReference<Instant> currentTime =
+        new AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE);
+
+    public void advance(Duration amount) {
+      Instant now = currentTime.get();
+      currentTime.compareAndSet(now, now.plus(amount));
+    }
+
+    @Override
+    public Instant now() {
+      return currentTime.get();
+    }
+  }
+
+  private static class TestClockSupplier implements Supplier<Clock> {
+    @Override
+    public Clock get() {
+      return new TestClock();
+    }
+  }
+
+  static class DirectTestStreamFactory implements PTransformOverrideFactory {
+    @Override
+    public <InputT extends PInput, OutputT extends POutput> PTransform<InputT, OutputT> override(
+        PTransform<InputT, OutputT> transform) {
+      if (transform instanceof TestStream) {
+        return (PTransform<InputT, OutputT>)
+            new DirectTestStream<OutputT>((TestStream<OutputT>) transform);
+      }
+      return transform;
+    }
+
+    private static class DirectTestStream<T> extends PTransform<PBegin, PCollection<T>> {
+      private final TestStream<T> original;
+
+      private DirectTestStream(TestStream transform) {
+        this.original = transform;
+      }
+
+      @Override
+      public PCollection<T> apply(PBegin input) {
+        setup(input.getPipeline());
+        return PCollection.<T>createPrimitiveOutputInternal(
+                input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED)
+            .setCoder(original.getValueCoder());
+      }
+
+      private void setup(Pipeline p) {
+        PipelineRunner runner = p.getRunner();
+        checkState(runner instanceof DirectRunner,
+            "%s can only be used when running with the %s",
+            getClass().getSimpleName(),
+            DirectRunner.class.getSimpleName());
+        ((DirectRunner) runner).setClockSupplier(new TestClockSupplier());
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a5ef9a96/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
index b469237..c35e8b1 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
@@ -23,6 +23,7 @@ import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupAlsoByWindow;
 import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.testing.TestStream;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -61,6 +62,7 @@ class TransformEvaluatorRegistry implements TransformEvaluatorFactory {
             // Runner-specific primitives used in expansion of GroupByKey
             .put(DirectGroupByKeyOnly.class, new GroupByKeyOnlyEvaluatorFactory())
             .put(DirectGroupAlsoByWindow.class, new GroupAlsoByWindowEvaluatorFactory())
+            .put(TestStream.class, new TestStreamEvaluatorFactory())
             .build();
     return new TransformEvaluatorRegistry(primitives);
   }
@@ -117,4 +119,13 @@ class TransformEvaluatorRegistry implements TransformEvaluatorFactory {
       throw toThrow;
     }
   }
+
+  /**
+   * A factory to create Transform Evaluator Registries.
+   */
+  public static class Factory {
+    public TransformEvaluatorRegistry create() {
+      return TransformEvaluatorRegistry.defaultRegistry();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a5ef9a96/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
index c2157b8..1ab3403 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
@@ -66,7 +66,7 @@ class WriteWithShardingFactory implements PTransformOverrideFactory {
     return transform;
   }
 
-  private static class DynamicallyReshardedWrite <T> extends PTransform<PCollection<T>, PDone> {
+  private static class DynamicallyReshardedWrite<T> extends PTransform<PCollection<T>, PDone> {
     private final transient Write.Bound<T> original;
 
     private DynamicallyReshardedWrite(Bound<T> original) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a5ef9a96/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
index d4b5773..7ac0caa 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
@@ -117,6 +117,7 @@ public class EvaluationContextTest {
     context =
         EvaluationContext.create(
             runner.getPipelineOptions(),
+            NanosOffsetClock.create(),
             ImmutableListBundleFactory.create(),
             rootTransforms,
             valueToConsumers,

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a5ef9a96/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
index 6d11f72..e2eda32 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
@@ -32,10 +32,8 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.PropertyNames;
 import org.apache.beam.sdk.util.VarInt;
-import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollection.IsBounded;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.apache.beam.sdk.values.TimestampedValue.TimestampedValueCoder;
 
@@ -83,30 +81,30 @@ public final class TestStream<T> extends PTransform<PBegin, PCollection<T>> {
     this.events = checkNotNull(events);
   }
 
-  public Coder<Event<T>> getEventCoder() {
-    return EventCoder.of(coder);
-  }
-
   /**
    * An incomplete {@link TestStream}. Elements added to this builder will be produced in sequence
    * when the pipeline created by the {@link TestStream} is run.
    */
   public static class Builder<T> {
     private final Coder<T> coder;
-    private final ImmutableList.Builder<Event<T>> events;
-    private Instant currentWatermark;
+    private final ImmutableList<Event<T>> events;
+    private final Instant currentWatermark;
 
     private Builder(Coder<T> coder) {
-      this.coder = coder;
-      events = ImmutableList.builder();
+      this(coder, ImmutableList.<Event<T>>of(), BoundedWindow.TIMESTAMP_MIN_VALUE);
+    }
 
-      currentWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
+    private Builder(Coder<T> coder, ImmutableList<Event<T>> events, Instant currentWatermark) {
+      this.coder = coder;
+      this.events = events;
+      this.currentWatermark = currentWatermark;
     }
 
     /**
      * Adds the specified elements to the source with timestamp equal to the current watermark.
      *
-     * @return this {@link TestStream.Builder}
+     * @return A {@link TestStream.Builder} like this one that will add the provided elements
+     *         after all earlier events have completed.
      */
     @SafeVarargs
     public final Builder<T> addElements(T element, T... elements) {
@@ -122,22 +120,40 @@ public final class TestStream<T> extends PTransform<PBegin, PCollection<T>> {
     /**
      * Adds the specified elements to the source with the provided timestamps.
      *
-     * @return this {@link TestStream.Builder}
+     * @return A {@link TestStream.Builder} like this one that will add the provided elements
+     *         after all earlier events have completed.
      */
     @SafeVarargs
     public final Builder<T> addElements(
         TimestampedValue<T> element, TimestampedValue<T>... elements) {
-      events.add(ElementEvent.add(element, elements));
-      return this;
+      checkArgument(
+          element.getTimestamp().isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE),
+          "Elements must have timestamps before %s. Got: %s",
+          BoundedWindow.TIMESTAMP_MAX_VALUE,
+          element.getTimestamp());
+      for (TimestampedValue<T> multiElement : elements) {
+        checkArgument(
+            multiElement.getTimestamp().isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE),
+            "Elements must have timestamps before %s. Got: %s",
+            BoundedWindow.TIMESTAMP_MAX_VALUE,
+            multiElement.getTimestamp());
+      }
+      ImmutableList<Event<T>> newEvents =
+          ImmutableList.<Event<T>>builder()
+              .addAll(events)
+              .add(ElementEvent.add(element, elements))
+              .build();
+      return new Builder<T>(coder, newEvents, currentWatermark);
     }
 
     /**
      * Advance the watermark of this source to the specified instant.
      *
-     * <p>The watermark must advance monotonically and to at most {@link
-     * BoundedWindow#TIMESTAMP_MAX_VALUE}.
+     * <p>The watermark must advance monotonically and cannot advance to {@link
+     * BoundedWindow#TIMESTAMP_MAX_VALUE} or beyond.
      *
-     * @return this {@link TestStream.Builder}
+     * @return A {@link TestStream.Builder} like this one that will advance the watermark to the
+     *         specified point after all earlier events have completed.
      */
     public Builder<T> advanceWatermarkTo(Instant newWatermark) {
       checkArgument(
@@ -147,23 +163,30 @@ public final class TestStream<T> extends PTransform<PBegin, PCollection<T>> {
           "The Watermark cannot progress beyond the maximum. Got: %s. Maximum: %s",
           newWatermark,
           BoundedWindow.TIMESTAMP_MAX_VALUE);
-      events.add(WatermarkEvent.<T>advanceTo(newWatermark));
-      currentWatermark = newWatermark;
-      return this;
+      ImmutableList<Event<T>> newEvents = ImmutableList.<Event<T>>builder()
+          .addAll(events)
+          .add(WatermarkEvent.<T>advanceTo(newWatermark))
+          .build();
+      return new Builder<T>(coder, newEvents, newWatermark);
     }
 
     /**
      * Advance the processing time by the specified amount.
      *
-     * @return this {@link TestStream.Builder}
+     * @return A {@link TestStream.Builder} like this one that will advance the processing time by
+     *         the specified amount after all earlier events have completed.
      */
     public Builder<T> advanceProcessingTime(Duration amount) {
       checkArgument(
           amount.getMillis() > 0,
-          "Must advance the processing time by a positive amount. Got: ",
+          "Must advance the processing time by a positive amount. Got: %s",
           amount);
-      events.add(ProcessingTimeEvent.<T>advanceBy(amount));
-      return this;
+      ImmutableList<Event<T>> newEvents =
+          ImmutableList.<Event<T>>builder()
+              .addAll(events)
+              .add(ProcessingTimeEvent.<T>advanceBy(amount))
+              .build();
+      return new Builder<T>(coder, newEvents, currentWatermark);
     }
 
     /**
@@ -171,8 +194,12 @@ public final class TestStream<T> extends PTransform<PBegin, PCollection<T>> {
      * same builder will not affect the returned {@link TestStream}.
      */
     public TestStream<T> advanceWatermarkToInfinity() {
-      events.add(WatermarkEvent.<T>advanceTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
-      return new TestStream<>(coder, events.build());
+      ImmutableList<Event<T>> newEvents =
+          ImmutableList.<Event<T>>builder()
+              .addAll(events)
+              .add(WatermarkEvent.<T>advanceTo(BoundedWindow.TIMESTAMP_MAX_VALUE))
+              .build();
+      return new TestStream<>(coder, newEvents);
     }
   }
 
@@ -230,12 +257,30 @@ public final class TestStream<T> extends PTransform<PBegin, PCollection<T>> {
 
   @Override
   public PCollection<T> apply(PBegin input) {
-    return PCollection.<T>createPrimitiveOutputInternal(
-            input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED)
-        .setCoder(coder);
+    throw new IllegalStateException(
+        String.format(
+            "Pipeline Runner %s does not provide a required override for %s",
+            input.getPipeline().getRunner().getClass().getSimpleName(),
+            getClass().getSimpleName()));
+  }
+
+  public Coder<T> getValueCoder() {
+    return coder;
+  }
+
+  /**
+   * Returns a coder suitable for encoding {@link TestStream.Event}.
+   */
+  public Coder<Event<T>> getEventCoder() {
+    return EventCoder.of(coder);
   }
 
-  public List<Event<T>> getStreamEvents() {
+  /**
+   * Returns the sequence of {@link Event Events} in this {@link TestStream}.
+   *
+   * <p>For use by {@link PipelineRunner} authors.
+   */
+  public List<Event<T>> getEvents() {
     return events;
   }
 
@@ -243,7 +288,7 @@ public final class TestStream<T> extends PTransform<PBegin, PCollection<T>> {
    * A {@link Coder} that encodes and decodes {@link TestStream.Event Events}.
    *
    * @param <T> the type of elements in {@link ElementEvent ElementEvents} encoded and decoded by
-   *     this {@link EventCoder}
+   *            this {@link EventCoder}
    */
   @VisibleForTesting
   static final class EventCoder<T> extends StandardCoder<Event<T>> {
@@ -290,14 +335,15 @@ public final class TestStream<T> extends PTransform<PBegin, PCollection<T>> {
           DURATION_CODER.encode(processingAdvance, outStream, context);
           break;
         default:
-          throw new AssertionError("Unreachable");
+          throw new AssertionError("Unreachable: Unsupported Event Type " + value.getType());
       }
     }
 
     @Override
     public Event<T> decode(
         InputStream inStream, Context context) throws IOException {
-      switch (EventType.values()[VarInt.decodeInt(inStream)]) {
+      EventType eventType = EventType.values()[VarInt.decodeInt(inStream)];
+      switch (eventType) {
         case ELEMENT:
           Iterable<TimestampedValue<T>> elements = elementCoder.decode(inStream, context);
           return ElementEvent.add(elements);
@@ -307,7 +353,7 @@ public final class TestStream<T> extends PTransform<PBegin, PCollection<T>> {
           return ProcessingTimeEvent.advanceBy(
               DURATION_CODER.decode(inStream, context).toDuration());
         default:
-          throw new AssertionError("Unreachable");
+          throw new AssertionError("Unreachable: Unsupported Event Type " + eventType);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a5ef9a96/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
index 09bccfa..df37d7f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
@@ -23,8 +23,13 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.junit.Assert.assertThat;
 
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.TestStream.Builder;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
@@ -35,8 +40,12 @@ import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.transforms.windowing.AfterPane;
 import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
 import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.Never;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
 import org.apache.beam.sdk.values.PCollection;
@@ -44,8 +53,10 @@ import org.apache.beam.sdk.values.TimestampedValue;
 
 import org.joda.time.Duration;
 import org.joda.time.Instant;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
@@ -56,6 +67,8 @@ import java.io.Serializable;
  */
 @RunWith(JUnit4.class)
 public class TestStreamTest implements Serializable {
+  @Rule public transient ExpectedException thrown = ExpectedException.none();
+
   @Test
   @Category(NeedsRunner.class)
   public void testLateDataAccumulating() {
@@ -149,6 +162,152 @@ public class TestStreamTest implements Serializable {
   }
 
   @Test
+  @Category(NeedsRunner.class)
+  public void testDiscardingMode() {
+    TestStream<String> stream =
+        TestStream.create(StringUtf8Coder.of())
+            .advanceWatermarkTo(new Instant(0))
+            .addElements(
+                TimestampedValue.of("firstPane", new Instant(100)),
+                TimestampedValue.of("alsoFirstPane", new Instant(200)))
+            .addElements(TimestampedValue.of("onTimePane", new Instant(500)))
+            .advanceWatermarkTo(new Instant(1001L))
+            .addElements(
+                TimestampedValue.of("finalLatePane", new Instant(750)),
+                TimestampedValue.of("alsoFinalLatePane", new Instant(250)))
+            .advanceWatermarkToInfinity();
+
+    TestPipeline p = TestPipeline.create();
+    FixedWindows windowFn = FixedWindows.of(Duration.millis(1000L));
+    Duration allowedLateness = Duration.millis(5000L);
+    PCollection<String> values =
+        p.apply(stream)
+            .apply(
+                Window.<String>into(windowFn)
+                    .triggering(
+                        AfterWatermark.pastEndOfWindow()
+                            .withEarlyFirings(AfterPane.elementCountAtLeast(2))
+                            .withLateFirings(Never.ever()))
+                    .discardingFiredPanes()
+                    .withAllowedLateness(allowedLateness))
+            .apply(WithKeys.<Integer, String>of(1))
+            .apply(GroupByKey.<Integer, String>create())
+            .apply(Values.<Iterable<String>>create())
+            .apply(Flatten.<String>iterables());
+
+    IntervalWindow window = windowFn.assignWindow(new Instant(100));
+    PAssert.that(values)
+        .inWindow(window)
+        .containsInAnyOrder(
+            "firstPane", "alsoFirstPane", "onTimePane", "finalLatePane", "alsoFinalLatePane");
+    PAssert.that(values)
+        .inCombinedNonLatePanes(window)
+        .containsInAnyOrder("firstPane", "alsoFirstPane", "onTimePane");
+    PAssert.that(values).inOnTimePane(window).containsInAnyOrder("onTimePane");
+    PAssert.that(values)
+        .inFinalPane(window)
+        .containsInAnyOrder("finalLatePane", "alsoFinalLatePane");
+
+    p.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testFirstElementLate() {
+    Instant lateElementTimestamp = new Instant(-1_000_000);
+    TestStream<String> stream =
+        TestStream.create(StringUtf8Coder.of())
+            .advanceWatermarkTo(new Instant(0))
+            .addElements(TimestampedValue.of("late", lateElementTimestamp))
+            .addElements(TimestampedValue.of("onTime", new Instant(100)))
+            .advanceWatermarkToInfinity();
+
+    TestPipeline p = TestPipeline.create();
+    FixedWindows windowFn = FixedWindows.of(Duration.millis(1000L));
+    Duration allowedLateness = Duration.millis(5000L);
+    PCollection<String> values = p.apply(stream)
+        .apply(Window.<String>into(windowFn).triggering(DefaultTrigger.of())
+            .discardingFiredPanes()
+            .withAllowedLateness(allowedLateness))
+        .apply(WithKeys.<Integer, String>of(1))
+        .apply(GroupByKey.<Integer, String>create())
+        .apply(Values.<Iterable<String>>create())
+        .apply(Flatten.<String>iterables());
+
+    PAssert.that(values).inWindow(windowFn.assignWindow(lateElementTimestamp)).empty();
+    PAssert.that(values)
+        .inWindow(windowFn.assignWindow(new Instant(100)))
+        .containsInAnyOrder("onTime");
+
+    p.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testElementsAtAlmostPositiveInfinity() {
+    Instant endOfGlobalWindow = GlobalWindow.INSTANCE.maxTimestamp();
+    TestStream<String> stream = TestStream.create(StringUtf8Coder.of())
+        .addElements(TimestampedValue.of("foo", endOfGlobalWindow),
+            TimestampedValue.of("bar", endOfGlobalWindow))
+        .advanceWatermarkToInfinity();
+
+    TestPipeline p = TestPipeline.create();
+    FixedWindows windows = FixedWindows.of(Duration.standardHours(6));
+    PCollection<String> windowedValues = p.apply(stream)
+        .apply(Window.<String>into(windows))
+        .apply(WithKeys.<Integer, String>of(1))
+        .apply(GroupByKey.<Integer, String>create())
+        .apply(Values.<Iterable<String>>create())
+        .apply(Flatten.<String>iterables());
+
+    PAssert.that(windowedValues)
+        .inWindow(windows.assignWindow(GlobalWindow.INSTANCE.maxTimestamp()))
+        .containsInAnyOrder("foo", "bar");
+    p.run();
+  }
+
+  @Test
+  public void testElementAtPositiveInfinityThrows() {
+    Builder<Integer> stream =
+        TestStream.create(VarIntCoder.of())
+            .addElements(TimestampedValue.of(-1, BoundedWindow.TIMESTAMP_MAX_VALUE.minus(1L)));
+    thrown.expect(IllegalArgumentException.class);
+    stream.addElements(TimestampedValue.of(1, BoundedWindow.TIMESTAMP_MAX_VALUE));
+  }
+
+  @Test
+  public void testAdvanceWatermarkNonMonotonicThrows() {
+    Builder<Integer> stream =
+        TestStream.create(VarIntCoder.of())
+            .advanceWatermarkTo(new Instant(0L));
+    thrown.expect(IllegalArgumentException.class);
+    stream.advanceWatermarkTo(new Instant(-1L));
+  }
+
+  @Test
+  public void testAdvanceWatermarkEqualToPositiveInfinityThrows() {
+    Builder<Integer> stream =
+        TestStream.create(VarIntCoder.of())
+            .advanceWatermarkTo(BoundedWindow.TIMESTAMP_MAX_VALUE.minus(1L));
+    thrown.expect(IllegalArgumentException.class);
+    stream.advanceWatermarkTo(BoundedWindow.TIMESTAMP_MAX_VALUE);
+  }
+
+  @Test
+  public void testUnsupportedRunnerThrows() {
+    PipelineOptions opts = PipelineOptionsFactory.create();
+    opts.setRunner(CrashingRunner.class);
+
+    Pipeline p = Pipeline.create(opts);
+
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage("does not provide a required override");
+    thrown.expectMessage(TestStream.class.getSimpleName());
+    thrown.expectMessage(CrashingRunner.class.getSimpleName());
+    p.apply(TestStream.create(VarIntCoder.of()).advanceWatermarkToInfinity());
+  }
+
+  @Test
   public void testEncodeDecode() throws Exception {
     TestStream.Event<Integer> elems =
         TestStream.ElementEvent.add(