You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2017/07/20 17:09:47 UTC

[20/28] beam git commit: Revert "[BEAM-2610] This closes #3553"

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java
index caad7f8..e7542cb 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java
@@ -24,7 +24,7 @@ import org.apache.beam.sdk.values.PCollectionView;
 
 /** A {@link DataflowRunner} marker class for creating a {@link PCollectionView}. */
 public class CreateDataflowView<ElemT, ViewT>
-    extends PTransform<PCollection<ElemT>, PCollection<ElemT>> {
+    extends PTransform<PCollection<ElemT>, PCollectionView<ViewT>> {
   public static <ElemT, ViewT> CreateDataflowView<ElemT, ViewT> of(PCollectionView<ViewT> view) {
     return new CreateDataflowView<>(view);
   }
@@ -36,10 +36,8 @@ public class CreateDataflowView<ElemT, ViewT>
   }
 
   @Override
-  public PCollection<ElemT> expand(PCollection<ElemT> input) {
-    return PCollection.<ElemT>createPrimitiveOutputInternal(
-            input.getPipeline(), input.getWindowingStrategy(), input.isBounded())
-        .setCoder(input.getCoder());
+  public PCollectionView<ViewT> expand(PCollection<ElemT> input) {
+    return view;
   }
 
   public PCollectionView<ViewT> getView() {

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index f1783de..8eaf61b 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -56,8 +56,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.annotation.Nullable;
-import org.apache.beam.runners.core.construction.SplittableParDo;
-import org.apache.beam.runners.core.construction.TransformInputs;
 import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
 import org.apache.beam.runners.dataflow.BatchViewOverrides.GroupByKeyAndSortValuesOnly;
 import org.apache.beam.runners.dataflow.DataflowRunner.CombineGroupedValues;
@@ -397,9 +395,7 @@ public class DataflowPipelineTranslator {
 
     @Override
     public <InputT extends PValue> InputT getInput(PTransform<InputT, ?> transform) {
-      return (InputT)
-          Iterables.getOnlyElement(
-              TransformInputs.nonAdditionalInputs(getCurrentTransform(transform)));
+      return (InputT) Iterables.getOnlyElement(getInputs(transform).values());
     }
 
     @Override
@@ -444,14 +440,6 @@ public class DataflowPipelineTranslator {
     public void visitValue(PValue value, TransformHierarchy.Node producer) {
       LOG.debug("Checking translation of {}", value);
       // Primitive transforms are the only ones assigned step names.
-      if (producer.getTransform() instanceof CreateDataflowView) {
-        // CreateDataflowView produces a dummy output (as it must be a primitive transform) but
-        // in the Dataflow Job graph produces only the view and not the output PCollection.
-        asOutputReference(
-            ((CreateDataflowView) producer.getTransform()).getView(),
-            producer.toAppliedPTransform(getPipeline()));
-        return;
-      }
       asOutputReference(value, producer.toAppliedPTransform(getPipeline()));
     }
 
@@ -477,7 +465,6 @@ public class DataflowPipelineTranslator {
       StepTranslator stepContext = new StepTranslator(this, step);
       stepContext.addInput(PropertyNames.USER_NAME, getFullName(transform));
       stepContext.addDisplayData(step, stepName, transform);
-      LOG.info("Adding {} as step {}", getCurrentTransform(transform).getFullName(), stepName);
       return stepContext;
     }
 
@@ -690,7 +677,7 @@ public class DataflowPipelineTranslator {
                 context.addStep(transform, "CollectionToSingleton");
             PCollection<ElemT> input = context.getInput(transform);
             stepContext.addInput(PropertyNames.PARALLEL_INPUT, input);
-            stepContext.addCollectionToSingletonOutput(input, transform.getView());
+            stepContext.addCollectionToSingletonOutput(input, context.getOutput(transform));
           }
         });
 
@@ -793,7 +780,6 @@ public class DataflowPipelineTranslator {
                 context.getPipelineOptions().as(StreamingOptions.class).isStreaming();
             boolean disallowCombinerLifting =
                 !windowingStrategy.getWindowFn().isNonMerging()
-                    || !windowingStrategy.getWindowFn().assignsToOneWindow()
                     || (isStreaming && !transform.fewKeys())
                     // TODO: Allow combiner lifting on the non-default trigger, as appropriate.
                     || !(windowingStrategy.getTrigger() instanceof DefaultTrigger);
@@ -888,45 +874,6 @@ public class DataflowPipelineTranslator {
     // IO Translation.
 
     registerTransformTranslator(Read.Bounded.class, new ReadTranslator());
-
-    ///////////////////////////////////////////////////////////////////////////
-    // Splittable DoFn translation.
-
-    registerTransformTranslator(
-        SplittableParDo.ProcessKeyedElements.class,
-        new TransformTranslator<SplittableParDo.ProcessKeyedElements>() {
-          @Override
-          public void translate(
-              SplittableParDo.ProcessKeyedElements transform, TranslationContext context) {
-            translateTyped(transform, context);
-          }
-
-          private <InputT, OutputT, RestrictionT> void translateTyped(
-              SplittableParDo.ProcessKeyedElements<InputT, OutputT, RestrictionT> transform,
-              TranslationContext context) {
-            StepTranslationContext stepContext =
-                context.addStep(transform, "SplittableProcessKeyed");
-
-            translateInputs(
-                stepContext, context.getInput(transform), transform.getSideInputs(), context);
-            BiMap<Long, TupleTag<?>> outputMap =
-                translateOutputs(context.getOutputs(transform), stepContext);
-            stepContext.addInput(
-                PropertyNames.SERIALIZED_FN,
-                byteArrayToJsonString(
-                    serializeToByteArray(
-                        DoFnInfo.forFn(
-                            transform.getFn(),
-                            transform.getInputWindowingStrategy(),
-                            transform.getSideInputs(),
-                            transform.getElementCoder(),
-                            outputMap.inverse().get(transform.getMainOutputTag()),
-                            outputMap))));
-            stepContext.addInput(
-                PropertyNames.RESTRICTION_CODER,
-                CloudObjects.asCloudObject(transform.getRestrictionCoder()));
-          }
-        });
   }
 
   private static void translateInputs(
@@ -973,11 +920,6 @@ public class DataflowPipelineTranslator {
               fn));
     }
 
-    if (signature.usesState() || signature.usesTimers()) {
-      DataflowRunner.verifyStateSupported(fn);
-      DataflowRunner.verifyStateSupportForWindowingStrategy(windowingStrategy);
-    }
-
     stepContext.addInput(PropertyNames.USER_FN, fn.getClass().getName());
     stepContext.addInput(
         PropertyNames.SERIALIZED_FN,

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 8935759..3e7c8ce 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -67,12 +67,11 @@ import org.apache.beam.runners.core.construction.ReplacementOutputs;
 import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
 import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource;
 import org.apache.beam.runners.core.construction.UnconsumedReads;
-import org.apache.beam.runners.core.construction.WriteFilesTranslation;
+import org.apache.beam.runners.dataflow.BatchViewOverrides.BatchCombineGloballyAsSingletonViewFactory;
 import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification;
 import org.apache.beam.runners.dataflow.StreamingViewOverrides.StreamingCreatePCollectionViewFactory;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
-import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
 import org.apache.beam.runners.dataflow.util.DataflowTemplateJob;
 import org.apache.beam.runners.dataflow.util.DataflowTransport;
 import org.apache.beam.runners.dataflow.util.MonitoringUtil;
@@ -93,7 +92,6 @@ import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.io.WriteFiles;
 import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
@@ -107,8 +105,6 @@ import org.apache.beam.sdk.runners.PTransformOverride;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.runners.TransformHierarchy.Node;
-import org.apache.beam.sdk.state.MapState;
-import org.apache.beam.sdk.state.SetState;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.Combine.GroupedValues;
 import org.apache.beam.sdk.transforms.Create;
@@ -121,8 +117,6 @@ import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.InstanceBuilder;
@@ -135,12 +129,10 @@ import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollection.IsBounded;
 import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PCollectionViews;
 import org.apache.beam.sdk.values.PDone;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.sdk.values.ValueWithRecordId;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.joda.time.DateTimeUtils;
@@ -333,24 +325,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
                 new StreamingFnApiCreateOverrideFactory()));
       }
       overridesBuilder
-          // Support Splittable DoFn for now only in streaming mode.
-          // The order of the following overrides is important because they are applied in order.
-
-          // By default Dataflow runner replaces single-output ParDo with a ParDoSingle override.
-          // However, we want a different expansion for single-output splittable ParDo.
-          .add(
-              PTransformOverride.of(
-                  PTransformMatchers.splittableParDoSingle(),
-                  new ReflectiveOneToOneOverrideFactory(
-                      SplittableParDoOverrides.ParDoSingleViaMulti.class, this)))
-          .add(
-              PTransformOverride.of(
-                  PTransformMatchers.splittableParDoMulti(),
-                  new SplittableParDoOverrides.SplittableParDoOverrideFactory()))
-          .add(
-              PTransformOverride.of(
-                  PTransformMatchers.writeWithRunnerDeterminedSharding(),
-                  new StreamingShardedWriteFactory(options)))
           .add(
               // Streaming Bounded Read is implemented in terms of Streaming Unbounded Read, and
               // must precede it
@@ -376,29 +350,34 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
               PTransformOverride.of(
                   PTransformMatchers.stateOrTimerParDoSingle(),
                   BatchStatefulParDoOverrides.singleOutputOverrideFactory()))
+
+          .add(
+              PTransformOverride.of(
+                  PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class),
+                  new BatchCombineGloballyAsSingletonViewFactory(this)))
           .add(
               PTransformOverride.of(
-                  PTransformMatchers.createViewWithViewFn(PCollectionViews.MapViewFn.class),
+                  PTransformMatchers.classEqualTo(View.AsMap.class),
                   new ReflectiveOneToOneOverrideFactory(
                       BatchViewOverrides.BatchViewAsMap.class, this)))
           .add(
               PTransformOverride.of(
-                  PTransformMatchers.createViewWithViewFn(PCollectionViews.MultimapViewFn.class),
+                  PTransformMatchers.classEqualTo(View.AsMultimap.class),
                   new ReflectiveOneToOneOverrideFactory(
                       BatchViewOverrides.BatchViewAsMultimap.class, this)))
           .add(
               PTransformOverride.of(
-                  PTransformMatchers.createViewWithViewFn(PCollectionViews.SingletonViewFn.class),
+                  PTransformMatchers.classEqualTo(View.AsSingleton.class),
                   new ReflectiveOneToOneOverrideFactory(
                       BatchViewOverrides.BatchViewAsSingleton.class, this)))
           .add(
               PTransformOverride.of(
-                  PTransformMatchers.createViewWithViewFn(PCollectionViews.ListViewFn.class),
+                  PTransformMatchers.classEqualTo(View.AsList.class),
                   new ReflectiveOneToOneOverrideFactory(
                       BatchViewOverrides.BatchViewAsList.class, this)))
           .add(
               PTransformOverride.of(
-                  PTransformMatchers.createViewWithViewFn(PCollectionViews.IterableViewFn.class),
+                  PTransformMatchers.classEqualTo(View.AsIterable.class),
                   new ReflectiveOneToOneOverrideFactory(
                       BatchViewOverrides.BatchViewAsIterable.class, this)));
     }
@@ -1455,61 +1434,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
   }
 
   @VisibleForTesting
-  static class StreamingShardedWriteFactory<UserT, DestinationT, OutputT>
-      implements PTransformOverrideFactory<
-          PCollection<UserT>, PDone, WriteFiles<UserT, DestinationT, OutputT>> {
-    // We pick 10 as a a default, as it works well with the default number of workers started
-    // by Dataflow.
-    static final int DEFAULT_NUM_SHARDS = 10;
-    DataflowPipelineWorkerPoolOptions options;
-
-    StreamingShardedWriteFactory(PipelineOptions options) {
-      this.options = options.as(DataflowPipelineWorkerPoolOptions.class);
-    }
-
-    @Override
-    public PTransformReplacement<PCollection<UserT>, PDone> getReplacementTransform(
-        AppliedPTransform<PCollection<UserT>, PDone, WriteFiles<UserT, DestinationT, OutputT>>
-            transform) {
-      // By default, if numShards is not set WriteFiles will produce one file per bundle. In
-      // streaming, there are large numbers of small bundles, resulting in many tiny files.
-      // Instead we pick max workers * 2 to ensure full parallelism, but prevent too-many files.
-      // (current_num_workers * 2 might be a better choice, but that value is not easily available
-      // today).
-      // If the user does not set either numWorkers or maxNumWorkers, default to 10 shards.
-      int numShards;
-      if (options.getMaxNumWorkers() > 0) {
-        numShards = options.getMaxNumWorkers() * 2;
-      } else if (options.getNumWorkers() > 0) {
-        numShards = options.getNumWorkers() * 2;
-      } else {
-        numShards = DEFAULT_NUM_SHARDS;
-      }
-
-      try {
-        WriteFiles<UserT, DestinationT, OutputT> replacement =
-            WriteFiles.<UserT, DestinationT, OutputT>to(
-                WriteFilesTranslation.<UserT, DestinationT, OutputT>getSink(transform),
-                WriteFilesTranslation.<UserT, OutputT>getFormatFunction(transform));
-        if (WriteFilesTranslation.isWindowedWrites(transform)) {
-          replacement = replacement.withWindowedWrites();
-        }
-        return PTransformReplacement.of(
-            PTransformReplacements.getSingletonMainInput(transform),
-            replacement.withNumShards(numShards));
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-    }
-
-    @Override
-    public Map<PValue, ReplacementOutput> mapOutputs(Map<TupleTag<?>, PValue> outputs,
-                                                     PDone newOutput) {
-      return Collections.emptyMap();
-    }
-  }
-
-  @VisibleForTesting
   static String getContainerImageForJob(DataflowPipelineOptions options) {
     String workerHarnessContainerImage = options.getWorkerHarnessContainerImage();
     if (!workerHarnessContainerImage.contains("IMAGE")) {
@@ -1522,39 +1446,4 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
       return workerHarnessContainerImage.replace("IMAGE", "beam-java-batch");
     }
   }
-
-  static void verifyStateSupported(DoFn<?, ?> fn) {
-    DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass());
-
-    for (DoFnSignature.StateDeclaration stateDecl : signature.stateDeclarations().values()) {
-
-      // https://issues.apache.org/jira/browse/BEAM-1474
-      if (stateDecl.stateType().isSubtypeOf(TypeDescriptor.of(MapState.class))) {
-        throw new UnsupportedOperationException(String.format(
-            "%s does not currently support %s",
-            DataflowRunner.class.getSimpleName(),
-            MapState.class.getSimpleName()
-        ));
-      }
-
-      // https://issues.apache.org/jira/browse/BEAM-1479
-      if (stateDecl.stateType().isSubtypeOf(TypeDescriptor.of(SetState.class))) {
-        throw new UnsupportedOperationException(String.format(
-            "%s does not currently support %s",
-            DataflowRunner.class.getSimpleName(),
-            SetState.class.getSimpleName()
-        ));
-      }
-    }
-  }
-
-  static void verifyStateSupportForWindowingStrategy(WindowingStrategy strategy) {
-    // https://issues.apache.org/jira/browse/BEAM-2507
-    if (!strategy.getWindowFn().isNonMerging()) {
-      throw new UnsupportedOperationException(
-          String.format(
-              "%s does not currently support state or timers with merging windows",
-              DataflowRunner.class.getSimpleName()));
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/SplittableParDoOverrides.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/SplittableParDoOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/SplittableParDoOverrides.java
deleted file mode 100644
index fc010f8..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/SplittableParDoOverrides.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow;
-
-import java.util.Map;
-import org.apache.beam.runners.core.construction.ForwardingPTransform;
-import org.apache.beam.runners.core.construction.PTransformReplacements;
-import org.apache.beam.runners.core.construction.ReplacementOutputs;
-import org.apache.beam.runners.core.construction.SplittableParDo;
-import org.apache.beam.sdk.runners.AppliedPTransform;
-import org.apache.beam.sdk.runners.PTransformOverrideFactory;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TupleTagList;
-
-/** Transform overrides for supporting {@link SplittableParDo} in the Dataflow runner. */
-class SplittableParDoOverrides {
-  static class ParDoSingleViaMulti<InputT, OutputT>
-      extends ForwardingPTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
-    private final ParDo.SingleOutput<InputT, OutputT> original;
-
-    public ParDoSingleViaMulti(
-        DataflowRunner ignored, ParDo.SingleOutput<InputT, OutputT> original) {
-      this.original = original;
-    }
-
-    @Override
-    protected PTransform<PCollection<? extends InputT>, PCollection<OutputT>> delegate() {
-      return original;
-    }
-
-    @Override
-    public PCollection<OutputT> expand(PCollection<? extends InputT> input) {
-      TupleTag<OutputT> mainOutput = new TupleTag<>();
-      return input.apply(original.withOutputTags(mainOutput, TupleTagList.empty())).get(mainOutput);
-    }
-  }
-
-  static class SplittableParDoOverrideFactory<InputT, OutputT, RestrictionT>
-      implements PTransformOverrideFactory<
-          PCollection<InputT>, PCollectionTuple, ParDo.MultiOutput<InputT, OutputT>> {
-    @Override
-    public PTransformReplacement<PCollection<InputT>, PCollectionTuple> getReplacementTransform(
-        AppliedPTransform<PCollection<InputT>, PCollectionTuple, ParDo.MultiOutput<InputT, OutputT>>
-            appliedTransform) {
-      return PTransformReplacement.of(
-          PTransformReplacements.getSingletonMainInput(appliedTransform),
-          SplittableParDo.forJavaParDo(appliedTransform.getTransform()));
-    }
-
-    @Override
-    public Map<PValue, ReplacementOutput> mapOutputs(
-        Map<TupleTag<?>, PValue> outputs, PCollectionTuple newOutput) {
-      return ReplacementOutputs.tagged(outputs, newOutput);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java
index 1853248..6c385d7 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java
@@ -42,12 +42,12 @@ import org.apache.beam.sdk.values.PCollectionView;
 class StreamingViewOverrides {
   static class StreamingCreatePCollectionViewFactory<ElemT, ViewT>
       extends SingleInputOutputOverrideFactory<
-          PCollection<ElemT>, PCollection<ElemT>, CreatePCollectionView<ElemT, ViewT>> {
+          PCollection<ElemT>, PCollectionView<ViewT>, CreatePCollectionView<ElemT, ViewT>> {
     @Override
-    public PTransformReplacement<PCollection<ElemT>, PCollection<ElemT>>
+    public PTransformReplacement<PCollection<ElemT>, PCollectionView<ViewT>>
         getReplacementTransform(
             AppliedPTransform<
-                    PCollection<ElemT>, PCollection<ElemT>, CreatePCollectionView<ElemT, ViewT>>
+                    PCollection<ElemT>, PCollectionView<ViewT>, CreatePCollectionView<ElemT, ViewT>>
                 transform) {
       StreamingCreatePCollectionView<ElemT, ViewT> streamingView =
           new StreamingCreatePCollectionView<>(transform.getTransform().getView());
@@ -56,7 +56,7 @@ class StreamingViewOverrides {
     }
 
     private static class StreamingCreatePCollectionView<ElemT, ViewT>
-        extends PTransform<PCollection<ElemT>, PCollection<ElemT>> {
+        extends PTransform<PCollection<ElemT>, PCollectionView<ViewT>> {
       private final PCollectionView<ViewT> view;
 
       private StreamingCreatePCollectionView(PCollectionView<ViewT> view) {
@@ -64,7 +64,7 @@ class StreamingViewOverrides {
       }
 
       @Override
-      public PCollection<ElemT> expand(PCollection<ElemT> input) {
+      public PCollectionView<ViewT> expand(PCollection<ElemT> input) {
         return input
             .apply(Combine.globally(new Concatenate<ElemT>()).withoutDefaults())
             .apply(ParDo.of(StreamingPCollectionViewWriterFn.create(view, input.getCoder())))

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
index 06ed1e0..a7452b2 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
@@ -22,7 +22,6 @@ import java.util.Map;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.runners.dataflow.util.OutputReference;
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -37,8 +36,7 @@ import org.apache.beam.sdk.values.TupleTag;
  * A {@link TransformTranslator} knows how to translate a particular subclass of {@link PTransform}
  * for the Cloud Dataflow service. It does so by mutating the {@link TranslationContext}.
  */
-@Internal
-public interface TransformTranslator<TransformT extends PTransform> {
+interface TransformTranslator<TransformT extends PTransform> {
   void translate(TransformT transform, TranslationContext context);
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java
index 55e0c4e..f82f1f1 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java
@@ -63,5 +63,4 @@ public class PropertyNames {
   public static final String USES_KEYED_STATE = "uses_keyed_state";
   public static final String VALUE = "value";
   public static final String DISPLAY_DATA = "display_data";
-  public static final String RESTRICTION_CODER = "restriction_coder";
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/TimeUtil.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/TimeUtil.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/TimeUtil.java
index 172dc6e..bff379f 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/TimeUtil.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/TimeUtil.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.runners.dataflow.util;
 
-import com.google.common.base.Strings;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import javax.annotation.Nullable;
@@ -99,19 +98,26 @@ public final class TimeUtil {
     int hour = Integer.valueOf(matcher.group(4));
     int minute = Integer.valueOf(matcher.group(5));
     int second = Integer.valueOf(matcher.group(6));
-    int millis = computeMillis(matcher.group(7));
+    int millis = 0;
+
+    String frac = matcher.group(7);
+    if (frac != null) {
+      int fracs = Integer.valueOf(frac);
+      if (frac.length() == 3) {  // millisecond resolution
+        millis = fracs;
+      } else if (frac.length() == 6) {  // microsecond resolution
+        millis = fracs / 1000;
+      } else if (frac.length() == 9) {  // nanosecond resolution
+        millis = fracs / 1000000;
+      } else {
+        return null;
+      }
+    }
 
     return new DateTime(year, month, day, hour, minute, second, millis,
         ISOChronology.getInstanceUTC()).toInstant();
   }
 
-  private static int computeMillis(String frac) {
-    if (frac == null) {
-      return 0;
-    }
-    return Integer.valueOf(frac.length() > 3 ? frac.substring(0, 3) : Strings.padEnd(frac, 3, '0'));
-  }
-
   /**
    * Converts a {@link ReadableDuration} into a Dataflow API duration string.
    */

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 43b2788..89dc2d5 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -18,14 +18,11 @@
 package org.apache.beam.runners.dataflow;
 
 import static org.apache.beam.runners.dataflow.util.Structs.getString;
-import static org.apache.beam.sdk.util.StringUtils.jsonStringToByteArray;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasEntry;
 import static org.hamcrest.Matchers.hasKey;
-import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
@@ -69,22 +66,17 @@ import java.util.Set;
 import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
-import org.apache.beam.runners.dataflow.util.CloudObject;
-import org.apache.beam.runners.dataflow.util.CloudObjects;
-import org.apache.beam.runners.dataflow.util.DoFnInfo;
 import org.apache.beam.runners.dataflow.util.OutputReference;
 import org.apache.beam.runners.dataflow.util.PropertyNames;
 import org.apache.beam.runners.dataflow.util.Structs;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
 import org.apache.beam.sdk.extensions.gcp.storage.GcsPathValidator;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.io.range.OffsetRange;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.ValueProvider;
@@ -99,12 +91,7 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.GcsUtil;
-import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.gcsfs.GcsPath;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
@@ -113,8 +100,6 @@ import org.apache.beam.sdk.values.PDone;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
 import org.apache.beam.sdk.values.WindowingStrategy;
-import org.hamcrest.Matchers;
-import org.joda.time.Duration;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -911,68 +896,6 @@ public class DataflowPipelineTranslatorTest implements Serializable {
         not(equalTo("true")));
   }
 
-  /**
-   * Smoke test to fail fast if translation of a splittable ParDo
-   * in streaming breaks.
-   */
-  @Test
-  public void testStreamingSplittableParDoTranslation() throws Exception {
-    DataflowPipelineOptions options = buildPipelineOptions();
-    DataflowRunner runner = DataflowRunner.fromOptions(options);
-    options.setStreaming(true);
-    DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
-
-    Pipeline pipeline = Pipeline.create(options);
-
-    PCollection<String> windowedInput = pipeline
-        .apply(Create.of("a"))
-        .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))));
-    windowedInput.apply(ParDo.of(new TestSplittableFn()));
-
-    runner.replaceTransforms(pipeline);
-
-    Job job =
-        translator
-            .translate(
-                pipeline,
-                runner,
-                Collections.<DataflowPackage>emptyList())
-            .getJob();
-
-    // The job should contain a SplittableParDo.ProcessKeyedElements step, translated as
-    // "SplittableProcessKeyed".
-
-    List<Step> steps = job.getSteps();
-    Step processKeyedStep = null;
-    for (Step step : steps) {
-      if (step.getKind().equals("SplittableProcessKeyed")) {
-        assertNull(processKeyedStep);
-        processKeyedStep = step;
-      }
-    }
-    assertNotNull(processKeyedStep);
-
-    @SuppressWarnings({"unchecked", "rawtypes"})
-    DoFnInfo<String, Integer> fnInfo =
-        (DoFnInfo<String, Integer>)
-            SerializableUtils.deserializeFromByteArray(
-                jsonStringToByteArray(
-                    Structs.getString(
-                        processKeyedStep.getProperties(), PropertyNames.SERIALIZED_FN)),
-                "DoFnInfo");
-    assertThat(fnInfo.getDoFn(), instanceOf(TestSplittableFn.class));
-    assertThat(
-        fnInfo.getWindowingStrategy().getWindowFn(),
-        Matchers.<WindowFn>equalTo(FixedWindows.of(Duration.standardMinutes(1))));
-    Coder<?> restrictionCoder =
-        CloudObjects.coderFromCloudObject(
-            (CloudObject)
-                Structs.getObject(
-                    processKeyedStep.getProperties(), PropertyNames.RESTRICTION_CODER));
-
-    assertEquals(SerializableCoder.of(OffsetRange.class), restrictionCoder);
-  }
-
   @Test
   public void testToSingletonTranslationWithIsmSideInput() throws Exception {
     // A "change detector" test that makes sure the translation
@@ -997,15 +920,15 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     assertAllStepOutputsHaveUniqueIds(job);
 
     List<Step> steps = job.getSteps();
-    assertEquals(9, steps.size());
+    assertEquals(5, steps.size());
 
     @SuppressWarnings("unchecked")
     List<Map<String, Object>> toIsmRecordOutputs =
-        (List<Map<String, Object>>) steps.get(7).getProperties().get(PropertyNames.OUTPUT_INFO);
+        (List<Map<String, Object>>) steps.get(3).getProperties().get(PropertyNames.OUTPUT_INFO);
     assertTrue(
         Structs.getBoolean(Iterables.getOnlyElement(toIsmRecordOutputs), "use_indexed_format"));
 
-    Step collectionToSingletonStep = steps.get(8);
+    Step collectionToSingletonStep = steps.get(4);
     assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
   }
 
@@ -1167,16 +1090,4 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     assertTrue(String.format("Found duplicate output ids %s", outputIds),
         outputIds.size() == 0);
   }
-
-  private static class TestSplittableFn extends DoFn<String, Integer> {
-    @ProcessElement
-    public void process(ProcessContext c, OffsetRangeTracker tracker) {
-      // noop
-    }
-
-    @GetInitialRestriction
-    public OffsetRange getInitialRange(String element) {
-      return null;
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 94985f8..8f10b18 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -23,7 +23,6 @@ import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.startsWith;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -50,7 +49,6 @@ import com.google.common.collect.ImmutableList;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.io.Serializable;
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.nio.channels.FileChannel;
@@ -64,60 +62,36 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Pattern;
-import org.apache.beam.runners.dataflow.DataflowRunner.StreamingShardedWriteFactory;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
-import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.extensions.gcp.auth.NoopCredentialFactory;
 import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
 import org.apache.beam.sdk.extensions.gcp.storage.NoopPathValidator;
-import org.apache.beam.sdk.io.DynamicFileDestinations;
-import org.apache.beam.sdk.io.FileBasedSink;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.io.WriteFiles;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.options.StreamingOptions;
 import org.apache.beam.sdk.options.ValueProvider;
-import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
-import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.runners.TransformHierarchy.Node;
-import org.apache.beam.sdk.state.MapState;
-import org.apache.beam.sdk.state.SetState;
-import org.apache.beam.sdk.state.StateSpec;
-import org.apache.beam.sdk.state.StateSpecs;
-import org.apache.beam.sdk.state.ValueState;
 import org.apache.beam.sdk.testing.ExpectedLogs;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SerializableFunctions;
-import org.apache.beam.sdk.transforms.windowing.Sessions;
-import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.GcsUtil;
 import org.apache.beam.sdk.util.ReleaseInfo;
 import org.apache.beam.sdk.util.gcsfs.GcsPath;
-import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TimestampedValue;
-import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.hamcrest.Description;
 import org.hamcrest.Matchers;
 import org.hamcrest.TypeSafeMatcher;
-import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Before;
 import org.junit.Rule;
@@ -133,11 +107,9 @@ import org.mockito.stubbing.Answer;
 
 /**
  * Tests for the {@link DataflowRunner}.
- *
- * <p>Implements {@link Serializable} because it is caught in closures.
  */
 @RunWith(JUnit4.class)
-public class DataflowRunnerTest implements Serializable {
+public class DataflowRunnerTest {
 
   private static final String VALID_STAGING_BUCKET = "gs://valid-bucket/staging";
   private static final String VALID_TEMP_BUCKET = "gs://valid-bucket/temp";
@@ -147,12 +119,15 @@ public class DataflowRunnerTest implements Serializable {
   private static final String PROJECT_ID = "some-project";
   private static final String REGION_ID = "some-region-1";
 
-  @Rule public transient TemporaryFolder tmpFolder = new TemporaryFolder();
-  @Rule public transient ExpectedException thrown = ExpectedException.none();
-  @Rule public transient ExpectedLogs expectedLogs = ExpectedLogs.none(DataflowRunner.class);
+  @Rule
+  public TemporaryFolder tmpFolder = new TemporaryFolder();
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+  @Rule
+  public ExpectedLogs expectedLogs = ExpectedLogs.none(DataflowRunner.class);
 
-  private transient Dataflow.Projects.Locations.Jobs mockJobs;
-  private transient GcsUtil mockGcsUtil;
+  private Dataflow.Projects.Locations.Jobs mockJobs;
+  private GcsUtil mockGcsUtil;
 
   // Asserts that the given Job has all expected fields set.
   private static void assertValidJob(Job job) {
@@ -848,6 +823,7 @@ public class DataflowRunnerTest implements Serializable {
     DataflowRunner.fromOptions(options);
   }
 
+
   @Test
   public void testValidProfileLocation() throws IOException {
     DataflowPipelineOptions options = buildPipelineOptions();
@@ -1015,71 +991,6 @@ public class DataflowRunnerTest implements Serializable {
     assertTrue(transform.translated);
   }
 
-  private void verifyMapStateUnsupported(PipelineOptions options) throws Exception {
-    Pipeline p = Pipeline.create(options);
-    p.apply(Create.of(KV.of(13, 42)))
-        .apply(
-            ParDo.of(
-                new DoFn<KV<Integer, Integer>, Void>() {
-                  @StateId("fizzle")
-                  private final StateSpec<MapState<Void, Void>> voidState = StateSpecs.map();
-
-                  @ProcessElement
-                  public void process() {}
-                }));
-
-    thrown.expectMessage("MapState");
-    thrown.expect(UnsupportedOperationException.class);
-    p.run();
-  }
-
-  @Test
-  public void testMapStateUnsupportedInBatch() throws Exception {
-    PipelineOptions options = buildPipelineOptions();
-    options.as(StreamingOptions.class).setStreaming(false);
-    verifyMapStateUnsupported(options);
-  }
-
-  @Test
-  public void testMapStateUnsupportedInStreaming() throws Exception {
-    PipelineOptions options = buildPipelineOptions();
-    options.as(StreamingOptions.class).setStreaming(true);
-    verifyMapStateUnsupported(options);
-  }
-
-  private void verifySetStateUnsupported(PipelineOptions options) throws Exception {
-    Pipeline p = Pipeline.create(options);
-    p.apply(Create.of(KV.of(13, 42)))
-        .apply(
-            ParDo.of(
-                new DoFn<KV<Integer, Integer>, Void>() {
-                  @StateId("fizzle")
-                  private final StateSpec<SetState<Void>> voidState = StateSpecs.set();
-
-                  @ProcessElement
-                  public void process() {}
-                }));
-
-    thrown.expectMessage("SetState");
-    thrown.expect(UnsupportedOperationException.class);
-    p.run();
-  }
-
-  @Test
-  public void testSetStateUnsupportedInBatch() throws Exception {
-    PipelineOptions options = buildPipelineOptions();
-    options.as(StreamingOptions.class).setStreaming(false);
-    Pipeline p = Pipeline.create(options);
-    verifySetStateUnsupported(options);
-  }
-
-  @Test
-  public void testSetStateUnsupportedInStreaming() throws Exception {
-    PipelineOptions options = buildPipelineOptions();
-    options.as(StreamingOptions.class).setStreaming(true);
-    verifySetStateUnsupported(options);
-  }
-
   /** Records all the composite transforms visited within the Pipeline. */
   private static class CompositeTransformRecorder extends PipelineVisitor.Defaults {
     private List<PTransform<?, ?>> transforms = new ArrayList<>();
@@ -1136,8 +1047,8 @@ public class DataflowRunnerTest implements Serializable {
   }
 
   /**
-   * Tests that the {@link DataflowRunner} with {@code --templateLocation} returns normally when the
-   * runner is successfully run.
+   * Tests that the {@link DataflowRunner} with {@code --templateLocation} returns normally
+   * when the runner is successfully run.
    */
   @Test
   public void testTemplateRunnerFullCompletion() throws Exception {
@@ -1216,89 +1127,4 @@ public class DataflowRunnerTest implements Serializable {
     assertThat(
         getContainerImageForJob(options), equalTo("gcr.io/java/foo"));
   }
-
-  @Test
-  public void testStreamingWriteWithNoShardingReturnsNewTransform() {
-    PipelineOptions options = TestPipeline.testingPipelineOptions();
-    options.as(DataflowPipelineWorkerPoolOptions.class).setMaxNumWorkers(10);
-    testStreamingWriteOverride(options, 20);
-  }
-
-  @Test
-  public void testStreamingWriteWithNoShardingReturnsNewTransformMaxWorkersUnset() {
-    PipelineOptions options = TestPipeline.testingPipelineOptions();
-    testStreamingWriteOverride(options, StreamingShardedWriteFactory.DEFAULT_NUM_SHARDS);
-  }
-
-  private void verifyMergingStatefulParDoRejected(PipelineOptions options) throws Exception {
-    Pipeline p = Pipeline.create(options);
-
-    p.apply(Create.of(KV.of(13, 42)))
-        .apply(Window.<KV<Integer, Integer>>into(Sessions.withGapDuration(Duration.millis(1))))
-        .apply(ParDo.of(new DoFn<KV<Integer, Integer>, Void>() {
-          @StateId("fizzle")
-          private final StateSpec<ValueState<Void>> voidState = StateSpecs.value();
-
-          @ProcessElement
-          public void process() {}
-        }));
-
-    thrown.expectMessage("merging");
-    thrown.expect(UnsupportedOperationException.class);
-    p.run();
-  }
-
-  @Test
-  public void testMergingStatefulRejectedInStreaming() throws Exception {
-    PipelineOptions options = buildPipelineOptions();
-    options.as(StreamingOptions.class).setStreaming(true);
-    verifyMergingStatefulParDoRejected(options);
-  }
-
-  @Test
-  public void testMergingStatefulRejectedInBatch() throws Exception {
-    PipelineOptions options = buildPipelineOptions();
-    options.as(StreamingOptions.class).setStreaming(false);
-    verifyMergingStatefulParDoRejected(options);
-  }
-
-  private void testStreamingWriteOverride(PipelineOptions options, int expectedNumShards) {
-    TestPipeline p = TestPipeline.fromOptions(options);
-
-    StreamingShardedWriteFactory<Object, Void, Object> factory =
-        new StreamingShardedWriteFactory<>(p.getOptions());
-    WriteFiles<Object, Void, Object> original =
-        WriteFiles.to(new TestSink(tmpFolder.toString()), SerializableFunctions.identity());
-    PCollection<Object> objs = (PCollection) p.apply(Create.empty(VoidCoder.of()));
-    AppliedPTransform<PCollection<Object>, PDone, WriteFiles<Object, Void, Object>>
-        originalApplication =
-            AppliedPTransform.of(
-                "writefiles",
-                objs.expand(),
-                Collections.<TupleTag<?>, PValue>emptyMap(),
-                original,
-                p);
-
-    WriteFiles<Object, Void, Object> replacement =
-        (WriteFiles<Object, Void, Object>)
-            factory.getReplacementTransform(originalApplication).getTransform();
-    assertThat(replacement, not(equalTo((Object) original)));
-    assertThat(replacement.getNumShards().get(), equalTo(expectedNumShards));
-  }
-
-  private static class TestSink extends FileBasedSink<Object, Void> {
-    @Override
-    public void validate(PipelineOptions options) {}
-
-    TestSink(String tmpFolder) {
-      super(
-          StaticValueProvider.of(FileSystems.matchNewResource(tmpFolder, true)),
-          DynamicFileDestinations.constant(null));
-    }
-
-    @Override
-    public WriteOperation<Object, Void> createWriteOperation() {
-      throw new IllegalArgumentException("Should not be used");
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/TimeUtilTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/TimeUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/TimeUtilTest.java
index 1ac9fab..e0785d4 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/TimeUtilTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/TimeUtilTest.java
@@ -47,14 +47,8 @@ public final class TimeUtilTest {
     assertEquals(new Instant(1), fromCloudTime("1970-01-01T00:00:00.001001Z"));
     assertEquals(new Instant(1), fromCloudTime("1970-01-01T00:00:00.001000000Z"));
     assertEquals(new Instant(1), fromCloudTime("1970-01-01T00:00:00.001000001Z"));
-    assertEquals(new Instant(0), fromCloudTime("1970-01-01T00:00:00.0Z"));
-    assertEquals(new Instant(0), fromCloudTime("1970-01-01T00:00:00.00Z"));
-    assertEquals(new Instant(420), fromCloudTime("1970-01-01T00:00:00.42Z"));
-    assertEquals(new Instant(300), fromCloudTime("1970-01-01T00:00:00.3Z"));
-    assertEquals(new Instant(20), fromCloudTime("1970-01-01T00:00:00.02Z"));
     assertNull(fromCloudTime(""));
     assertNull(fromCloudTime("1970-01-01T00:00:00"));
-    assertNull(fromCloudTime("1970-01-01T00:00:00.1e3Z"));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/pom.xml
----------------------------------------------------------------------
diff --git a/runners/pom.xml b/runners/pom.xml
index b00ba9c..38aada8 100644
--- a/runners/pom.xml
+++ b/runners/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-parent</artifactId>
-    <version>2.2.0-SNAPSHOT</version>
+    <version>2.1.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index 7f70204..ddb4aca 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-runners-parent</artifactId>
-    <version>2.2.0-SNAPSHOT</version>
+    <version>2.1.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 
@@ -34,6 +34,8 @@
   <properties>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+    <spark.version>1.6.3</spark.version>
+    <hadoop.version>2.2.0</hadoop.version>
     <kafka.version>0.9.0.1</kafka.version>
     <jackson.version>2.4.4</jackson.version>
     <dropwizard.metrics.version>3.1.2</dropwizard.metrics.version>
@@ -101,27 +103,6 @@
                   <threadCount>4</threadCount>
                 </configuration>
               </execution>
-              <execution>
-                <id>streaming-tests</id>
-                <phase>test</phase>
-                <goals>
-                  <goal>test</goal>
-                </goals>
-                <configuration>
-                  <groups>
-                    org.apache.beam.runners.spark.StreamingTest
-                  </groups>
-                  <systemPropertyVariables>
-                    <beamTestPipelineOptions>
-                      [
-                      "--runner=TestSparkRunner",
-                      "--forceStreaming=true",
-                      "--enableSparkMetricSinks=true"
-                      ]
-                    </beamTestPipelineOptions>
-                  </systemPropertyVariables>
-                </configuration>
-              </execution>
             </executions>
           </plugin>
         </plugins>
@@ -133,33 +114,31 @@
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-core_2.10</artifactId>
+      <version>${spark.version}</version>
       <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-streaming_2.10</artifactId>
+      <version>${spark.version}</version>
       <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-network-common_2.10</artifactId>
+      <version>${spark.version}</version>
       <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
+      <version>${hadoop.version}</version>
       <scope>provided</scope>
-      <exclusions>
-        <!-- Fix build on JDK-9 -->
-        <exclusion>
-          <groupId>jdk.tools</groupId>
-          <artifactId>jdk.tools</artifactId>
-        </exclusion>
-      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-mapreduce-client-core</artifactId>
+      <version>${hadoop.version}</version>
       <scope>provided</scope>
     </dependency>
     <dependency>
@@ -216,11 +195,6 @@
     <dependency>
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-lang3</artifactId>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.commons</groupId>
-      <artifactId>commons-text</artifactId>
     </dependency>
     <dependency>
       <groupId>commons-io</groupId>
@@ -347,13 +321,6 @@
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
-
-    <dependency>
-      <groupId>org.apache.beam</groupId>
-      <artifactId>beam-runners-core-java</artifactId>
-      <type>test-jar</type>
-      <scope>test</scope>
-    </dependency>
   </dependencies>
 
   <build>
@@ -393,6 +360,27 @@
                 </systemPropertyVariables>
               </configuration>
             </execution>
+            <execution>
+              <id>streaming-tests</id>
+              <phase>test</phase>
+              <goals>
+                <goal>test</goal>
+              </goals>
+              <configuration>
+                <groups>
+                  org.apache.beam.runners.spark.StreamingTest
+                </groups>
+                <systemPropertyVariables>
+                  <beamTestPipelineOptions>
+                    [
+                    "--runner=TestSparkRunner",
+                    "--forceStreaming=true",
+                    "--enableSparkMetricSinks=true"
+                    ]
+                  </beamTestPipelineOptions>
+                </systemPropertyVariables>
+              </configuration>
+            </execution>
           </executions>
         </plugin>
         <plugin>

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java
index 6972acb..d75c955 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java
@@ -35,7 +35,8 @@ import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
-import org.apache.commons.text.WordUtils;
+import org.apache.commons.lang3.text.WordUtils;
+
 
 /**
  * Pipeline visitor for translating a Beam pipeline into equivalent Spark operations.

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index 595521f..9e2426e 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -26,7 +26,6 @@ import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import org.apache.beam.runners.core.construction.TransformInputs;
 import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator;
 import org.apache.beam.runners.spark.io.CreateStream;
 import org.apache.beam.runners.spark.metrics.AggregatorMetricSource;
@@ -171,7 +170,7 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
       }
 
       // register Watermarks listener to broadcast the advanced WMs.
-      jssc.addStreamingListener(new JavaStreamingListenerWrapper(new WatermarksListener()));
+      jssc.addStreamingListener(new JavaStreamingListenerWrapper(new WatermarksListener(jssc)));
 
       // The reason we call initAccumulators here even though it is called in
       // SparkRunnerStreamingContextFactory is because the factory is not called when resuming
@@ -360,12 +359,10 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
 
     protected boolean shouldDefer(TransformHierarchy.Node node) {
       // if the input is not a PCollection, or it is but with non merging windows, don't defer.
-      Collection<PValue> nonAdditionalInputs =
-          TransformInputs.nonAdditionalInputs(node.toAppliedPTransform(getPipeline()));
-      if (nonAdditionalInputs.size() != 1) {
+      if (node.getInputs().size() != 1) {
         return false;
       }
-      PValue input = Iterables.getOnlyElement(nonAdditionalInputs);
+      PValue input = Iterables.getOnlyElement(node.getInputs().values());
       if (!(input instanceof PCollection)
           || ((PCollection) input).getWindowingStrategy().getWindowFn().isNonMerging()) {
         return false;

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
index a13a3b1..eccee57 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
@@ -169,7 +169,7 @@ public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult> {
     result.waitUntilFinish(Duration.millis(batchDurationMillis));
     do {
       SparkTimerInternals sparkTimerInternals =
-          SparkTimerInternals.global(GlobalWatermarkHolder.get(batchDurationMillis));
+          SparkTimerInternals.global(GlobalWatermarkHolder.get());
       sparkTimerInternals.advanceWatermark();
       globalWatermark = sparkTimerInternals.currentInputWatermarkTime();
       // let another batch-interval period of execution, just to reason about WM propagation.

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
index 1385e07..be4f3f6 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
@@ -104,15 +104,13 @@ public class SparkGroupAlsoByWindowViaWindowSet {
 
   public static <K, InputT, W extends BoundedWindow>
       JavaDStream<WindowedValue<KV<K, Iterable<InputT>>>> groupAlsoByWindow(
-          final JavaDStream<WindowedValue<KV<K, Iterable<WindowedValue<InputT>>>>> inputDStream,
+          JavaDStream<WindowedValue<KV<K, Iterable<WindowedValue<InputT>>>>> inputDStream,
           final Coder<K> keyCoder,
           final Coder<WindowedValue<InputT>> wvCoder,
           final WindowingStrategy<?, W> windowingStrategy,
           final SparkRuntimeContext runtimeContext,
           final List<Integer> sourceIds) {
 
-    final long batchDurationMillis =
-        runtimeContext.getPipelineOptions().as(SparkPipelineOptions.class).getBatchIntervalMillis();
     final IterableCoder<WindowedValue<InputT>> itrWvCoder = IterableCoder.of(wvCoder);
     final Coder<InputT> iCoder = ((FullWindowedValueCoder<InputT>) wvCoder).getValueCoder();
     final Coder<? extends BoundedWindow> wCoder =
@@ -241,7 +239,7 @@ public class SparkGroupAlsoByWindowViaWindowSet {
 
                       SparkStateInternals<K> stateInternals;
                       SparkTimerInternals timerInternals = SparkTimerInternals.forStreamFromSources(
-                          sourceIds, GlobalWatermarkHolder.get(batchDurationMillis));
+                          sourceIds, GlobalWatermarkHolder.get());
                       // get state(internals) per key.
                       if (prevStateAndTimersOpt.isEmpty()) {
                         // no previous state.

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java
index a68da55..107915f 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java
@@ -34,6 +34,7 @@ import org.apache.beam.runners.spark.coders.CoderHelpers;
 import org.apache.beam.runners.spark.util.GlobalWatermarkHolder.SparkWatermarks;
 import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.spark.broadcast.Broadcast;
 import org.joda.time.Instant;
 
 
@@ -57,10 +58,10 @@ public class SparkTimerInternals implements TimerInternals {
   /** Build the {@link TimerInternals} according to the feeding streams. */
   public static SparkTimerInternals forStreamFromSources(
       List<Integer> sourceIds,
-      Map<Integer, SparkWatermarks> watermarks) {
-    // if watermarks are invalid for the specific ids, use defaults.
-    if (watermarks == null || watermarks.isEmpty()
-        || Collections.disjoint(sourceIds, watermarks.keySet())) {
+      @Nullable Broadcast<Map<Integer, SparkWatermarks>> broadcast) {
+    // if broadcast is invalid for the specific ids, use defaults.
+    if (broadcast == null || broadcast.getValue().isEmpty()
+        || Collections.disjoint(sourceIds, broadcast.getValue().keySet())) {
       return new SparkTimerInternals(
           BoundedWindow.TIMESTAMP_MIN_VALUE, BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(0));
     }
@@ -70,7 +71,7 @@ public class SparkTimerInternals implements TimerInternals {
     // synchronized processing time should clearly be synchronized.
     Instant synchronizedProcessingTime = null;
     for (Integer sourceId: sourceIds) {
-      SparkWatermarks sparkWatermarks = watermarks.get(sourceId);
+      SparkWatermarks sparkWatermarks = broadcast.getValue().get(sourceId);
       if (sparkWatermarks != null) {
         // keep slowest WMs.
         slowestLowWatermark = slowestLowWatermark.isBefore(sparkWatermarks.getLowWatermark())
@@ -93,9 +94,10 @@ public class SparkTimerInternals implements TimerInternals {
   }
 
   /** Build a global {@link TimerInternals} for all feeding streams.*/
-  public static SparkTimerInternals global(Map<Integer, SparkWatermarks> watermarks) {
-    return watermarks == null ? forStreamFromSources(Collections.<Integer>emptyList(), null)
-        : forStreamFromSources(Lists.newArrayList(watermarks.keySet()), watermarks);
+  public static SparkTimerInternals global(
+      @Nullable Broadcast<Map<Integer, SparkWatermarks>> broadcast) {
+    return broadcast == null ? forStreamFromSources(Collections.<Integer>emptyList(), null)
+        : forStreamFromSources(Lists.newArrayList(broadcast.getValue().keySet()), broadcast);
   }
 
   Collection<TimerData> getTimers() {

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
index 0c6c4d1..8102926 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
@@ -26,7 +26,6 @@ import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.Map;
 import java.util.Set;
-import org.apache.beam.runners.core.construction.TransformInputs;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.coders.CoderHelpers;
 import org.apache.beam.sdk.Pipeline;
@@ -104,8 +103,7 @@ public class EvaluationContext {
 
   public <T extends PValue> T getInput(PTransform<T, ?> transform) {
     @SuppressWarnings("unchecked")
-    T input =
-        (T) Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(getCurrentTransform()));
+    T input = (T) Iterables.getOnlyElement(getInputs(transform).values());
     return input;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index ac5e0cd..64aa35a 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -508,6 +508,50 @@ public final class TransformTranslator {
     };
   }
 
+  private static <T> TransformEvaluator<View.AsSingleton<T>> viewAsSingleton() {
+    return new TransformEvaluator<View.AsSingleton<T>>() {
+      @Override
+      public void evaluate(View.AsSingleton<T> transform, EvaluationContext context) {
+        Iterable<? extends WindowedValue<?>> iter =
+        context.getWindowedValues(context.getInput(transform));
+        PCollectionView<T> output = context.getOutput(transform);
+        Coder<Iterable<WindowedValue<?>>> coderInternal = output.getCoderInternal();
+
+        @SuppressWarnings("unchecked")
+        Iterable<WindowedValue<?>> iterCast =  (Iterable<WindowedValue<?>>) iter;
+
+        context.putPView(output, iterCast, coderInternal);
+      }
+
+      @Override
+      public String toNativeString() {
+        return "collect()";
+      }
+    };
+  }
+
+  private static <T> TransformEvaluator<View.AsIterable<T>> viewAsIter() {
+    return new TransformEvaluator<View.AsIterable<T>>() {
+      @Override
+      public void evaluate(View.AsIterable<T> transform, EvaluationContext context) {
+        Iterable<? extends WindowedValue<?>> iter =
+            context.getWindowedValues(context.getInput(transform));
+        PCollectionView<Iterable<T>> output = context.getOutput(transform);
+        Coder<Iterable<WindowedValue<?>>> coderInternal = output.getCoderInternal();
+
+        @SuppressWarnings("unchecked")
+        Iterable<WindowedValue<?>> iterCast =  (Iterable<WindowedValue<?>>) iter;
+
+        context.putPView(output, iterCast, coderInternal);
+      }
+
+      @Override
+      public String toNativeString() {
+        return "collect()";
+      }
+    };
+  }
+
   private static <ReadT, WriteT> TransformEvaluator<View.CreatePCollectionView<ReadT, WriteT>>
   createPCollView() {
     return new TransformEvaluator<View.CreatePCollectionView<ReadT, WriteT>>() {
@@ -516,7 +560,7 @@ public final class TransformTranslator {
                            EvaluationContext context) {
         Iterable<? extends WindowedValue<?>> iter =
             context.getWindowedValues(context.getInput(transform));
-        PCollectionView<WriteT> output = transform.getView();
+        PCollectionView<WriteT> output = context.getOutput(transform);
         Coder<Iterable<WindowedValue<?>>> coderInternal = output.getCoderInternal();
 
         @SuppressWarnings("unchecked")
@@ -601,8 +645,8 @@ public final class TransformTranslator {
     EVALUATORS.put(Combine.PerKey.class, combinePerKey());
     EVALUATORS.put(Flatten.PCollections.class, flattenPColl());
     EVALUATORS.put(Create.Values.class, create());
-//    EVALUATORS.put(View.AsSingleton.class, viewAsSingleton());
-//    EVALUATORS.put(View.AsIterable.class, viewAsIter());
+    EVALUATORS.put(View.AsSingleton.class, viewAsSingleton());
+    EVALUATORS.put(View.AsIterable.class, viewAsIter());
     EVALUATORS.put(View.CreatePCollectionView.class, createPCollView());
     EVALUATORS.put(Window.Assign.class, window());
     EVALUATORS.put(Reshuffle.class, reshuffle());

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java
index 2cb6f26..8b384d8 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java
@@ -21,43 +21,31 @@ package org.apache.beam.runners.spark.util;
 import static com.google.common.base.Preconditions.checkState;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.collect.Maps;
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import javax.annotation.Nonnull;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.spark.SparkEnv;
+import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.broadcast.Broadcast;
-import org.apache.spark.storage.BlockId;
-import org.apache.spark.storage.BlockManager;
-import org.apache.spark.storage.BlockResult;
-import org.apache.spark.storage.BlockStore;
-import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
 import org.apache.spark.streaming.api.java.JavaStreamingListener;
 import org.apache.spark.streaming.api.java.JavaStreamingListenerBatchCompleted;
 import org.joda.time.Instant;
-import scala.Option;
+
 
 /**
- * A {@link BlockStore} variable to hold the global watermarks for a micro-batch.
+ * A {@link Broadcast} variable to hold the global watermarks for a micro-batch.
  *
  * <p>For each source, holds a queue for the watermarks of each micro-batch that was read,
  * and advances the watermarks according to the queue (first-in-first-out).
  */
 public class GlobalWatermarkHolder {
-  private static final Map<Integer, Queue<SparkWatermarks>> sourceTimes = new HashMap<>();
-  private static final BlockId WATERMARKS_BLOCK_ID = BlockId.apply("broadcast_0WATERMARKS");
-
-  private static volatile Map<Integer, SparkWatermarks> driverWatermarks = null;
-  private static volatile LoadingCache<String, Map<Integer, SparkWatermarks>> watermarkCache = null;
+  // the broadcast is broadcasted to the workers.
+  private static volatile Broadcast<Map<Integer, SparkWatermarks>> broadcast = null;
+  // this should only live in the driver so transient.
+  private static final transient Map<Integer, Queue<SparkWatermarks>> sourceTimes = new HashMap<>();
 
   public static void add(int sourceId, SparkWatermarks sparkWatermarks) {
     Queue<SparkWatermarks> timesQueue = sourceTimes.get(sourceId);
@@ -83,48 +71,22 @@ public class GlobalWatermarkHolder {
    * Returns the {@link Broadcast} containing the {@link SparkWatermarks} mapped
    * to their sources.
    */
-  @SuppressWarnings("unchecked")
-  public static Map<Integer, SparkWatermarks> get(Long cacheInterval) {
-    if (driverWatermarks != null) {
-      // if we are executing in local mode simply return the local values.
-      return driverWatermarks;
-    } else {
-      if (watermarkCache == null) {
-        initWatermarkCache(cacheInterval);
-      }
-      try {
-        return watermarkCache.get("SINGLETON");
-      } catch (ExecutionException e) {
-        throw new RuntimeException(e);
-      }
-    }
-  }
-
-  private static synchronized void initWatermarkCache(Long batchDuration) {
-    if (watermarkCache == null) {
-      watermarkCache =
-          CacheBuilder.newBuilder()
-              // expire watermarks every half batch duration to ensure they update in every batch.
-              .expireAfterWrite(batchDuration / 2, TimeUnit.MILLISECONDS)
-              .build(new WatermarksLoader());
-    }
+  public static Broadcast<Map<Integer, SparkWatermarks>> get() {
+    return broadcast;
   }
 
   /**
    * Advances the watermarks to the next-in-line watermarks.
    * SparkWatermarks are monotonically increasing.
    */
-  @SuppressWarnings("unchecked")
-  public static void advance() {
-    synchronized (GlobalWatermarkHolder.class) {
-      BlockManager blockManager = SparkEnv.get().blockManager();
-
+  public static void advance(JavaSparkContext jsc) {
+    synchronized (GlobalWatermarkHolder.class){
       if (sourceTimes.isEmpty()) {
         return;
       }
 
       // update all sources' watermarks into the new broadcast.
-      Map<Integer, SparkWatermarks> newValues = new HashMap<>();
+      Map<Integer, SparkWatermarks> newBroadcast = new HashMap<>();
 
       for (Map.Entry<Integer, Queue<SparkWatermarks>> en: sourceTimes.entrySet()) {
         if (en.getValue().isEmpty()) {
@@ -137,22 +99,8 @@ public class GlobalWatermarkHolder {
         Instant currentLowWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
         Instant currentHighWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
         Instant currentSynchronizedProcessingTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
-
-        Option<BlockResult> currentOption = blockManager.getRemote(WATERMARKS_BLOCK_ID);
-        Map<Integer, SparkWatermarks> current;
-        if (currentOption.isDefined()) {
-          current = (Map<Integer, SparkWatermarks>) currentOption.get().data().next();
-        } else {
-          current = Maps.newHashMap();
-          blockManager.putSingle(
-              WATERMARKS_BLOCK_ID,
-              current,
-              StorageLevel.MEMORY_ONLY(),
-              true);
-        }
-
-        if (current.containsKey(sourceId)) {
-          SparkWatermarks currentTimes = current.get(sourceId);
+        if (broadcast != null && broadcast.getValue().containsKey(sourceId)) {
+          SparkWatermarks currentTimes = broadcast.getValue().get(sourceId);
           currentLowWatermark = currentTimes.getLowWatermark();
           currentHighWatermark = currentTimes.getHighWatermark();
           currentSynchronizedProcessingTime = currentTimes.getSynchronizedProcessingTime();
@@ -171,21 +119,20 @@ public class GlobalWatermarkHolder {
                 nextLowWatermark, nextHighWatermark));
         checkState(nextSynchronizedProcessingTime.isAfter(currentSynchronizedProcessingTime),
             "Synchronized processing time must advance.");
-        newValues.put(
+        newBroadcast.put(
             sourceId,
             new SparkWatermarks(
                 nextLowWatermark, nextHighWatermark, nextSynchronizedProcessingTime));
       }
 
       // update the watermarks broadcast only if something has changed.
-      if (!newValues.isEmpty()) {
-        driverWatermarks = newValues;
-        blockManager.removeBlock(WATERMARKS_BLOCK_ID, true);
-        blockManager.putSingle(
-            WATERMARKS_BLOCK_ID,
-            newValues,
-            StorageLevel.MEMORY_ONLY(),
-            true);
+      if (!newBroadcast.isEmpty()) {
+        if (broadcast != null) {
+          // for now this is blocking, we could make this asynchronous
+          // but it could slow down WM propagation.
+          broadcast.destroy();
+        }
+        broadcast = jsc.broadcast(newBroadcast);
       }
     }
   }
@@ -193,12 +140,7 @@ public class GlobalWatermarkHolder {
   @VisibleForTesting
   public static synchronized void clear() {
     sourceTimes.clear();
-    driverWatermarks = null;
-    SparkEnv sparkEnv = SparkEnv.get();
-    if (sparkEnv != null) {
-      BlockManager blockManager = sparkEnv.blockManager();
-      blockManager.removeBlock(WATERMARKS_BLOCK_ID, true);
-    }
+    broadcast = null;
   }
 
   /**
@@ -243,24 +185,15 @@ public class GlobalWatermarkHolder {
 
   /** Advance the WMs onBatchCompleted event. */
   public static class WatermarksListener extends JavaStreamingListener {
-    @Override
-    public void onBatchCompleted(JavaStreamingListenerBatchCompleted batchCompleted) {
-      GlobalWatermarkHolder.advance();
-    }
-  }
+    private final JavaStreamingContext jssc;
 
-  private static class WatermarksLoader extends CacheLoader<String, Map<Integer, SparkWatermarks>> {
+    public WatermarksListener(JavaStreamingContext jssc) {
+      this.jssc = jssc;
+    }
 
-    @SuppressWarnings("unchecked")
     @Override
-    public Map<Integer, SparkWatermarks> load(@Nonnull String key) throws Exception {
-      Option<BlockResult> blockResultOption =
-          SparkEnv.get().blockManager().getRemote(WATERMARKS_BLOCK_ID);
-      if (blockResultOption.isDefined()) {
-        return (Map<Integer, SparkWatermarks>) blockResultOption.get().data().next();
-      } else {
-        return Maps.newHashMap();
-      }
+    public void onBatchCompleted(JavaStreamingListenerBatchCompleted batchCompleted) {
+      GlobalWatermarkHolder.advance(jssc.sparkContext());
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/spark/src/test/java/org/apache/beam/runners/spark/GlobalWatermarkHolderTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/GlobalWatermarkHolderTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/GlobalWatermarkHolderTest.java
index 1708123..47a6e3f 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/GlobalWatermarkHolderTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/GlobalWatermarkHolderTest.java
@@ -65,17 +65,17 @@ public class GlobalWatermarkHolderTest {
             instant.plus(Duration.millis(5)),
             instant.plus(Duration.millis(5)),
             instant));
-    GlobalWatermarkHolder.advance();
+    GlobalWatermarkHolder.advance(jsc);
     // low < high.
     GlobalWatermarkHolder.add(1,
         new SparkWatermarks(
             instant.plus(Duration.millis(10)),
             instant.plus(Duration.millis(15)),
             instant.plus(Duration.millis(100))));
-    GlobalWatermarkHolder.advance();
+    GlobalWatermarkHolder.advance(jsc);
 
     // assert watermarks in Broadcast.
-    SparkWatermarks currentWatermarks = GlobalWatermarkHolder.get(0L).get(1);
+    SparkWatermarks currentWatermarks = GlobalWatermarkHolder.get().getValue().get(1);
     assertThat(currentWatermarks.getLowWatermark(), equalTo(instant.plus(Duration.millis(10))));
     assertThat(currentWatermarks.getHighWatermark(), equalTo(instant.plus(Duration.millis(15))));
     assertThat(currentWatermarks.getSynchronizedProcessingTime(),
@@ -93,7 +93,7 @@ public class GlobalWatermarkHolderTest {
             instant.plus(Duration.millis(25)),
             instant.plus(Duration.millis(20)),
             instant.plus(Duration.millis(200))));
-    GlobalWatermarkHolder.advance();
+    GlobalWatermarkHolder.advance(jsc);
   }
 
   @Test
@@ -106,7 +106,7 @@ public class GlobalWatermarkHolderTest {
             instant.plus(Duration.millis(5)),
             instant.plus(Duration.millis(10)),
             instant));
-    GlobalWatermarkHolder.advance();
+    GlobalWatermarkHolder.advance(jsc);
 
     thrown.expect(IllegalStateException.class);
     thrown.expectMessage("Synchronized processing time must advance.");
@@ -117,7 +117,7 @@ public class GlobalWatermarkHolderTest {
             instant.plus(Duration.millis(5)),
             instant.plus(Duration.millis(10)),
             instant));
-    GlobalWatermarkHolder.advance();
+    GlobalWatermarkHolder.advance(jsc);
   }
 
   @Test
@@ -136,15 +136,15 @@ public class GlobalWatermarkHolderTest {
             instant.plus(Duration.millis(6)),
             instant));
 
-    GlobalWatermarkHolder.advance();
+    GlobalWatermarkHolder.advance(jsc);
 
     // assert watermarks for source 1.
-    SparkWatermarks watermarksForSource1 = GlobalWatermarkHolder.get(0L).get(1);
+    SparkWatermarks watermarksForSource1 = GlobalWatermarkHolder.get().getValue().get(1);
     assertThat(watermarksForSource1.getLowWatermark(), equalTo(instant.plus(Duration.millis(5))));
     assertThat(watermarksForSource1.getHighWatermark(), equalTo(instant.plus(Duration.millis(10))));
 
     // assert watermarks for source 2.
-    SparkWatermarks watermarksForSource2 = GlobalWatermarkHolder.get(0L).get(2);
+    SparkWatermarks watermarksForSource2 = GlobalWatermarkHolder.get().getValue().get(2);
     assertThat(watermarksForSource2.getLowWatermark(), equalTo(instant.plus(Duration.millis(3))));
     assertThat(watermarksForSource2.getHighWatermark(), equalTo(instant.plus(Duration.millis(6))));
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
index 246eb81..64ff98c 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
@@ -52,6 +52,7 @@ import org.hamcrest.Matchers;
 import org.joda.time.Duration;
 import org.junit.Test;
 
+
 /**
  * Test {@link SparkRunnerDebugger} with different pipelines.
  */
@@ -84,20 +85,17 @@ public class SparkRunnerDebuggerTest {
         .apply(MapElements.via(new WordCount.FormatAsTextFn()))
         .apply(TextIO.write().to("!!PLACEHOLDER-OUTPUT-DIR!!").withNumShards(3).withSuffix(".txt"));
 
-    final String expectedPipeline =
-        "sparkContext.parallelize(Arrays.asList(...))\n"
-            + "_.mapPartitions("
-            + "new org.apache.beam.runners.spark.examples.WordCount$ExtractWordsFn())\n"
-            + "_.mapPartitions(new org.apache.beam.sdk.transforms.Count$PerElement$1())\n"
-            + "_.combineByKey(..., new org.apache.beam.sdk.transforms.Count$CountFn(), ...)\n"
-            + "_.groupByKey()\n"
-            + "_.map(new org.apache.beam.sdk.transforms.Sum$SumLongFn())\n"
-            + "_.mapPartitions(new org.apache.beam.runners.spark"
-            + ".SparkRunnerDebuggerTest$PlusOne())\n"
-            + "sparkContext.union(...)\n"
-            + "_.mapPartitions("
-            + "new org.apache.beam.runners.spark.examples.WordCount$FormatAsTextFn())\n"
-            + "_.<org.apache.beam.sdk.io.TextIO$Write>";
+    final String expectedPipeline = "sparkContext.parallelize(Arrays.asList(...))\n"
+        + "_.mapPartitions(new org.apache.beam.runners.spark.examples.WordCount$ExtractWordsFn())\n"
+        + "_.mapPartitions(new org.apache.beam.sdk.transforms.Count$PerElement$1())\n"
+        + "_.combineByKey(..., new org.apache.beam.sdk.transforms.Count$CountFn(), ...)\n"
+        + "_.groupByKey()\n"
+        + "_.map(new org.apache.beam.sdk.transforms.Sum$SumLongFn())\n"
+        + "_.mapPartitions(new org.apache.beam.runners.spark"
+        + ".SparkRunnerDebuggerTest$PlusOne())\n"
+        + "sparkContext.union(...)\n"
+        + "_.mapPartitions(new org.apache.beam.runners.spark.examples.WordCount$FormatAsTextFn())\n"
+        + "_.<org.apache.beam.sdk.io.AutoValue_TextIO_Write>";
 
     SparkRunnerDebugger.DebugSparkPipelineResult result =
         (SparkRunnerDebugger.DebugSparkPipelineResult) pipeline.run();