Posted to commits@beam.apache.org by ke...@apache.org on 2017/01/06 20:07:56 UTC

[1/5] beam git commit: Add explicit translation builder for a Step in the Dataflow translator

Repository: beam
Updated Branches:
  refs/heads/master b4d870272 -> a6caa82a6


Add explicit translation builder for a Step in the Dataflow translator

Previously, there was always a "current" step: the most recently
created step. This made it cumbersome or impossible to do things like
translate one primitive transform into a small subgraph of steps. Thus
we added hacks like CreatePCollectionView, which is not actually part
of the model at all; in fact, we should be able to add the needed
CollectionToSingleton steps simply by looking at the side inputs of a
ParDo node.
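
Concretely, addStep now returns a StepTranslationContext scoped to the
step it just created. The following is a sketch distilled from the
GroupByKey hunk below, not code added verbatim by this commit:

    // Before: translators mutated an implicit "current" step, so every
    // addInput/addOutput call silently targeted whichever step had been
    // created last.
    context.addStep(transform, "GroupByKey");
    context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
    context.addOutput(context.getOutput(transform));

    // After: each step gets its own explicit builder, so inputs and
    // outputs cannot attach to the wrong step, and translating one
    // transform into several steps becomes tractable.
    StepTranslationContext stepContext = context.addStep(transform, "GroupByKey");
    stepContext.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
    stepContext.addOutput(context.getOutput(transform));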


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f04537cc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f04537cc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f04537cc

Branch: refs/heads/master
Commit: f04537ccbc2897ea4337941d5ca8121432daef43
Parents: b4d8702
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Dec 21 14:34:27 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Jan 6 11:24:49 2017 -0800

----------------------------------------------------------------------
 .../dataflow/DataflowPipelineTranslator.java    | 313 ++++++++++---------
 .../beam/runners/dataflow/DataflowRunner.java   |  60 ++--
 .../dataflow/internal/ReadTranslator.java       |   9 +-
 .../runners/dataflow/DataflowRunnerTest.java    |   5 +-
 4 files changed, 196 insertions(+), 191 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/f04537cc/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 8d2b076..2385fa1 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -213,14 +213,12 @@ public class DataflowPipelineTranslator {
   }
 
   /**
-   * A {@link TransformTranslator} knows how to translate
-   * a particular subclass of {@link PTransform} for the
-   * Cloud Dataflow service. It does so by
-   * mutating the {@link TranslationContext}.
+   * A {@link TransformTranslator} knows how to translate a particular subclass of {@link
+   * PTransform} for the Cloud Dataflow service. It does so by mutating the {@link
+   * TranslationContext}.
    */
   public interface TransformTranslator<TransformT extends PTransform> {
-    void translate(TransformT transform,
-                          TranslationContext context);
+    void translate(TransformT transform, TranslationContext context);
   }
 
   /**
@@ -252,10 +250,8 @@ public class DataflowPipelineTranslator {
     /**
      * Adds a step to the Dataflow workflow for the given transform, with
      * the given Dataflow step type.
-     * This step becomes "current" for the purpose of {@link #addInput} and
-     * {@link #addOutput}.
      */
-    void addStep(PTransform<?, ?> transform, String type);
+    StepTranslationContext addStep(PTransform<?, ?> transform, String type);
 
     /**
      * Adds a pre-defined step to the Dataflow workflow. The given PTransform should be
@@ -264,8 +260,14 @@ public class DataflowPipelineTranslator {
      * <p>This is a low-level operation, when using this method it is up to
      * the caller to ensure that names do not collide.
      */
-    void addStep(PTransform<?, ? extends PValue> transform, Step step);
+    Step addStep(PTransform<?, ? extends PValue> transform, Step step);
+    /**
+     * Encode a PValue reference as an output reference.
+     */
+    OutputReference asOutputReference(PValue value);
+  }
 
+  public interface StepTranslationContext {
     /**
      * Sets the encoding for the current Dataflow step.
      */
@@ -330,12 +332,7 @@ public class DataflowPipelineTranslator {
      * output encoding.  Returns a pipeline level unique id.
      */
     long addCollectionToSingletonOutput(PValue inputValue,
-                                               PValue outputValue);
-
-    /**
-     * Encode a PValue reference as an output reference.
-     */
-    OutputReference asOutputReference(PValue value);
+        PValue outputValue);
   }
 
 
@@ -343,6 +340,8 @@ public class DataflowPipelineTranslator {
 
   /**
    * Translates a Pipeline into the Dataflow representation.
+   *
+   * <p>For internal use only.
    */
   class Translator extends PipelineVisitor.Defaults implements TranslationContext {
     /**
@@ -368,11 +367,6 @@ public class DataflowPipelineTranslator {
     private final Job job = new Job();
 
     /**
-     * Translator is stateful, as addProperty calls refer to the current step.
-     */
-    private Step currentStep;
-
-    /**
      * A Map from AppliedPTransform to their unique Dataflow step names.
      */
     private final Map<AppliedPTransform<?, ?, ?>, String> stepNames = new HashMap<>();
@@ -546,7 +540,7 @@ public class DataflowPipelineTranslator {
     }
 
     @Override
-    public void addStep(PTransform<?, ?> transform, String type) {
+    public StepTranslator addStep(PTransform<?, ?> transform, String type) {
       String stepName = genStepName();
       if (stepNames.put(getCurrentTransform(transform), stepName) != null) {
         throw new IllegalArgumentException(
@@ -559,16 +553,19 @@ public class DataflowPipelineTranslator {
         job.setSteps(steps);
       }
 
-      currentStep = new Step();
-      currentStep.setName(stepName);
-      currentStep.setKind(type);
-      steps.add(currentStep);
-      addInput(PropertyNames.USER_NAME, getFullName(transform));
-      addDisplayData(stepName, transform);
+      Step step = new Step();
+      step.setName(stepName);
+      step.setKind(type);
+      steps.add(step);
+
+      StepTranslator stepContext = new StepTranslator(this, step);
+      stepContext.addInput(PropertyNames.USER_NAME, getFullName(transform));
+      stepContext.addDisplayData(step, stepName, transform);
+      return stepContext;
     }
 
     @Override
-    public void addStep(PTransform<?, ? extends PValue> transform, Step original) {
+    public Step addStep(PTransform<?, ? extends PValue> transform, Step original) {
       Step step = original.clone();
       String stepName = step.getName();
       if (stepNames.put(getCurrentTransform(transform), stepName) != null) {
@@ -605,8 +602,59 @@ public class DataflowPipelineTranslator {
         steps = new LinkedList<>();
         job.setSteps(steps);
       }
-      currentStep = step;
       steps.add(step);
+      return step;
+    }
+
+    @Override
+    public OutputReference asOutputReference(PValue value) {
+      AppliedPTransform<?, ?, ?> transform =
+          value.getProducingTransformInternal();
+      String stepName = stepNames.get(transform);
+      if (stepName == null) {
+        throw new IllegalArgumentException(transform + " doesn't have a name specified");
+      }
+
+      String outputName = outputNames.get(value);
+      if (outputName == null) {
+        throw new IllegalArgumentException(
+            "output " + value + " doesn't have a name specified");
+      }
+
+      return new OutputReference(stepName, outputName);
+    }
+
+    /**
+     * Returns a fresh Dataflow step name.
+     */
+    private String genStepName() {
+      return "s" + (stepNames.size() + 1);
+    }
+
+    /**
+     * Records the name of the given output PValue,
+     * within its producing transform.
+     */
+    private void registerOutputName(POutput value, String name) {
+      if (outputNames.put(value, name) != null) {
+        throw new IllegalArgumentException(
+            "output " + value + " already has a name specified");
+      }
+    }
+  }
+
+  static class StepTranslator implements StepTranslationContext {
+
+    private final Translator translator;
+    private final Step step;
+
+    private StepTranslator(Translator translator, Step step) {
+      this.translator = translator;
+      this.step = step;
+    }
+
+    private Map<String, Object> getProperties() {
+      return DataflowPipelineTranslator.getProperties(step);
     }
 
     @Override
@@ -643,7 +691,7 @@ public class DataflowPipelineTranslator {
     @Override
     public void addInput(String name, PInput value) {
       if (value instanceof PValue) {
-        addInput(name, asOutputReference((PValue) value));
+        addInput(name, translator.asOutputReference((PValue) value));
       } else {
         throw new IllegalStateException("Input must be a PValue");
       }
@@ -685,10 +733,10 @@ public class DataflowPipelineTranslator {
     }
 
     @Override
-    public long addCollectionToSingletonOutput(PValue inputValue,
-                                               PValue outputValue) {
+    public long addCollectionToSingletonOutput(
+        PValue inputValue, PValue outputValue) {
       Coder<?> inputValueCoder =
-          checkNotNull(outputCoders.get(inputValue));
+          checkNotNull(translator.outputCoders.get(inputValue));
       // The inputValueCoder for the input PCollection should be some
       // WindowedValueCoder of the input PCollection's element
       // coder.
@@ -707,8 +755,8 @@ public class DataflowPipelineTranslator {
      * with the given {@code Coder} (if not {@code null}).
      */
     private long addOutput(PValue value, Coder<?> valueCoder) {
-      long id = idGenerator.get();
-      registerOutputName(value, Long.toString(id));
+      long id = translator.idGenerator.get();
+      translator.registerOutputName(value, Long.toString(id));
 
       Map<String, Object> properties = getProperties();
       @Nullable List<Map<String, Object>> outputInfoList = null;
@@ -728,7 +776,7 @@ public class DataflowPipelineTranslator {
       addString(outputInfo, PropertyNames.OUTPUT_NAME, Long.toString(id));
       addString(outputInfo, PropertyNames.USER_NAME, value.getName());
       if (value instanceof PCollection
-          && runner.doesPCollectionRequireIndexedFormat((PCollection<?>) value)) {
+          && translator.runner.doesPCollectionRequireIndexedFormat((PCollection<?>) value)) {
         addBoolean(outputInfo, PropertyNames.USE_INDEXED_FORMAT, true);
       }
       if (valueCoder != null) {
@@ -736,63 +784,19 @@ public class DataflowPipelineTranslator {
         // failures as early as possible.
         CloudObject encoding = SerializableUtils.ensureSerializable(valueCoder);
         addObject(outputInfo, PropertyNames.ENCODING, encoding);
-        outputCoders.put(value, valueCoder);
+        translator.outputCoders.put(value, valueCoder);
       }
 
       outputInfoList.add(outputInfo);
       return id;
     }
 
-    private void addDisplayData(String stepName, HasDisplayData hasDisplayData) {
+    private void addDisplayData(Step step, String stepName, HasDisplayData hasDisplayData) {
       DisplayData displayData = DisplayData.from(hasDisplayData);
       List<Map<String, Object>> list = MAPPER.convertValue(displayData, List.class);
       addList(getProperties(), PropertyNames.DISPLAY_DATA, list);
     }
 
-    @Override
-    public OutputReference asOutputReference(PValue value) {
-      AppliedPTransform<?, ?, ?> transform =
-          value.getProducingTransformInternal();
-      String stepName = stepNames.get(transform);
-      if (stepName == null) {
-        throw new IllegalArgumentException(transform + " doesn't have a name specified");
-      }
-
-      String outputName = outputNames.get(value);
-      if (outputName == null) {
-        throw new IllegalArgumentException(
-            "output " + value + " doesn't have a name specified");
-      }
-
-      return new OutputReference(stepName, outputName);
-    }
-
-    private Map<String, Object> getProperties() {
-      Map<String, Object> properties = currentStep.getProperties();
-      if (properties == null) {
-        properties = new HashMap<>();
-        currentStep.setProperties(properties);
-      }
-      return properties;
-    }
-
-    /**
-     * Returns a fresh Dataflow step name.
-     */
-    private String genStepName() {
-      return "s" + (stepNames.size() + 1);
-    }
-
-    /**
-     * Records the name of the given output PValue,
-     * within its producing transform.
-     */
-    private void registerOutputName(POutput value, String name) {
-      if (outputNames.put(value, name) != null) {
-        throw new IllegalArgumentException(
-            "output " + value + " already has a name specified");
-      }
-    }
   }
 
   /////////////////////////////////////////////////////////////////////////////
@@ -802,6 +806,14 @@ public class DataflowPipelineTranslator {
     return "DataflowPipelineTranslator#" + hashCode();
   }
 
+  private static Map<String, Object> getProperties(Step step) {
+    Map<String, Object> properties = step.getProperties();
+    if (properties == null) {
+      properties = new HashMap<>();
+      step.setProperties(properties);
+    }
+    return properties;
+  }
 
   ///////////////////////////////////////////////////////////////////////////
 
@@ -810,20 +822,17 @@ public class DataflowPipelineTranslator {
         View.CreatePCollectionView.class,
         new TransformTranslator<View.CreatePCollectionView>() {
           @Override
-          public void translate(
-              View.CreatePCollectionView transform,
-              TranslationContext context) {
+          public void translate(View.CreatePCollectionView transform, TranslationContext context) {
             translateTyped(transform, context);
           }
 
           private <ElemT, ViewT> void translateTyped(
-              View.CreatePCollectionView<ElemT, ViewT> transform,
-              TranslationContext context) {
-            context.addStep(transform, "CollectionToSingleton");
-            context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
-            context.addCollectionToSingletonOutput(
-                context.getInput(transform),
-                context.getOutput(transform));
+              View.CreatePCollectionView<ElemT, ViewT> transform, TranslationContext context) {
+            StepTranslationContext stepContext =
+                context.addStep(transform, "CollectionToSingleton");
+            stepContext.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
+            stepContext.addCollectionToSingletonOutput(
+                context.getInput(transform), context.getOutput(transform));
           }
         });
 
@@ -839,21 +848,21 @@ public class DataflowPipelineTranslator {
 
           private <K, InputT, OutputT> void translateHelper(
               final Combine.GroupedValues<K, InputT, OutputT> transform,
-              DataflowPipelineTranslator.TranslationContext context) {
-            context.addStep(transform, "CombineValues");
-            translateInputs(context.getInput(transform), transform.getSideInputs(), context);
+              TranslationContext context) {
+            StepTranslationContext stepContext = context.addStep(transform, "CombineValues");
+            translateInputs(
+                stepContext, context.getInput(transform), transform.getSideInputs(), context);
 
             AppliedCombineFn<? super K, ? super InputT, ?, OutputT> fn =
                 transform.getAppliedFn(
                     context.getInput(transform).getPipeline().getCoderRegistry(),
-                context.getInput(transform).getCoder(),
-                context.getInput(transform).getWindowingStrategy());
+                    context.getInput(transform).getCoder(),
+                    context.getInput(transform).getWindowingStrategy());
 
-            context.addEncodingInput(fn.getAccumulatorCoder());
-            context.addInput(
-                PropertyNames.SERIALIZED_FN,
-                byteArrayToJsonString(serializeToByteArray(fn)));
-            context.addOutput(context.getOutput(transform));
+            stepContext.addEncodingInput(fn.getAccumulatorCoder());
+            stepContext.addInput(
+                PropertyNames.SERIALIZED_FN, byteArrayToJsonString(serializeToByteArray(fn)));
+            stepContext.addOutput(context.getOutput(transform));
           }
         });
 
@@ -870,14 +879,14 @@ public class DataflowPipelineTranslator {
           private <T> void flattenHelper(
               Flatten.FlattenPCollectionList<T> transform,
               TranslationContext context) {
-            context.addStep(transform, "Flatten");
+            StepTranslationContext stepContext = context.addStep(transform, "Flatten");
 
             List<OutputReference> inputs = new LinkedList<>();
             for (PCollection<T> input : context.getInput(transform).getAll()) {
               inputs.add(context.asOutputReference(input));
             }
-            context.addInput(PropertyNames.INPUTS, inputs);
-            context.addOutput(context.getOutput(transform));
+            stepContext.addInput(PropertyNames.INPUTS, inputs);
+            stepContext.addOutput(context.getOutput(transform));
           }
         });
 
@@ -885,23 +894,19 @@ public class DataflowPipelineTranslator {
         GroupByKeyAndSortValuesOnly.class,
         new TransformTranslator<GroupByKeyAndSortValuesOnly>() {
           @Override
-          public void translate(
-              GroupByKeyAndSortValuesOnly transform,
-              TranslationContext context) {
+          public void translate(GroupByKeyAndSortValuesOnly transform, TranslationContext context) {
             groupByKeyAndSortValuesHelper(transform, context);
           }
 
           private <K1, K2, V> void groupByKeyAndSortValuesHelper(
-              GroupByKeyAndSortValuesOnly<K1, K2, V> transform,
-              TranslationContext context) {
-            context.addStep(transform, "GroupByKey");
-            context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
-            context.addOutput(context.getOutput(transform));
-            context.addInput(PropertyNames.SORT_VALUES, true);
+              GroupByKeyAndSortValuesOnly<K1, K2, V> transform, TranslationContext context) {
+            StepTranslationContext stepContext = context.addStep(transform, "GroupByKey");
+            stepContext.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
+            stepContext.addOutput(context.getOutput(transform));
+            stepContext.addInput(PropertyNames.SORT_VALUES, true);
 
             // TODO: Add support for combiner lifting once the need arises.
-            context.addInput(
-                PropertyNames.DISALLOW_COMBINER_LIFTING, true);
+            stepContext.addInput(PropertyNames.DISALLOW_COMBINER_LIFTING, true);
           }
         });
 
@@ -918,9 +923,9 @@ public class DataflowPipelineTranslator {
           private <K, V> void groupByKeyHelper(
               GroupByKey<K, V> transform,
               TranslationContext context) {
-            context.addStep(transform, "GroupByKey");
-            context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
-            context.addOutput(context.getOutput(transform));
+            StepTranslationContext stepContext = context.addStep(transform, "GroupByKey");
+            stepContext.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
+            stepContext.addOutput(context.getOutput(transform));
 
             WindowingStrategy<?, ?> windowingStrategy =
                 context.getInput(transform).getWindowingStrategy();
@@ -931,12 +936,12 @@ public class DataflowPipelineTranslator {
                 || (isStreaming && !transform.fewKeys())
                 // TODO: Allow combiner lifting on the non-default trigger, as appropriate.
                 || !(windowingStrategy.getTrigger() instanceof DefaultTrigger);
-            context.addInput(
+            stepContext.addInput(
                 PropertyNames.DISALLOW_COMBINER_LIFTING, disallowCombinerLifting);
-            context.addInput(
+            stepContext.addInput(
                 PropertyNames.SERIALIZED_FN,
                 byteArrayToJsonString(serializeToByteArray(windowingStrategy)));
-            context.addInput(
+            stepContext.addInput(
                 PropertyNames.IS_MERGING_WINDOW_FN,
                 !windowingStrategy.getWindowFn().isNonMerging());
           }
@@ -946,22 +951,21 @@ public class DataflowPipelineTranslator {
         ParDo.BoundMulti.class,
         new TransformTranslator<ParDo.BoundMulti>() {
           @Override
-          public void translate(
-              ParDo.BoundMulti transform,
-              TranslationContext context) {
+          public void translate(ParDo.BoundMulti transform, TranslationContext context) {
             translateMultiHelper(transform, context);
           }
 
           private <InputT, OutputT> void translateMultiHelper(
-              ParDo.BoundMulti<InputT, OutputT> transform,
-              TranslationContext context) {
-            rejectStatefulDoFn(transform.getFn());
+              ParDo.BoundMulti<InputT, OutputT> transform, TranslationContext context) {
+            DataflowPipelineTranslator.rejectStatefulDoFn(transform.getFn());
 
-            context.addStep(transform, "ParallelDo");
-            translateInputs(context.getInput(transform), transform.getSideInputs(), context);
+            StepTranslationContext stepContext = context.addStep(transform, "ParallelDo");
+            translateInputs(
+                stepContext, context.getInput(transform), transform.getSideInputs(), context);
             BiMap<Long, TupleTag<?>> outputMap =
-                translateOutputs(context.getOutput(transform), context);
+                translateOutputs(context.getOutput(transform), stepContext);
             translateFn(
+                stepContext,
                 transform.getFn(),
                 context.getInput(transform).getWindowingStrategy(),
                 transform.getSideInputs(),
@@ -976,30 +980,28 @@ public class DataflowPipelineTranslator {
         ParDo.Bound.class,
         new TransformTranslator<ParDo.Bound>() {
           @Override
-          public void translate(
-              ParDo.Bound transform,
-              TranslationContext context) {
+          public void translate(ParDo.Bound transform, TranslationContext context) {
             translateSingleHelper(transform, context);
           }
 
           private <InputT, OutputT> void translateSingleHelper(
-              ParDo.Bound<InputT, OutputT> transform,
-              TranslationContext context) {
+              ParDo.Bound<InputT, OutputT> transform, TranslationContext context) {
             rejectStatefulDoFn(transform.getFn());
 
-            context.addStep(transform, "ParallelDo");
-            translateInputs(context.getInput(transform), transform.getSideInputs(), context);
-            long mainOutput = context.addOutput(context.getOutput(transform));
+            StepTranslationContext stepContext = context.addStep(transform, "ParallelDo");
+            translateInputs(
+                stepContext, context.getInput(transform), transform.getSideInputs(), context);
+            long mainOutput = stepContext.addOutput(context.getOutput(transform));
             translateFn(
+                stepContext,
                 transform.getFn(),
                 context.getInput(transform).getWindowingStrategy(),
                 transform.getSideInputs(),
                 context.getInput(transform).getCoder(),
                 context,
                 mainOutput,
-                ImmutableMap.<Long, TupleTag<?>>of(mainOutput,
-                  new TupleTag<>(PropertyNames.OUTPUT)));
-
+                ImmutableMap.<Long, TupleTag<?>>of(
+                    mainOutput, new TupleTag<>(PropertyNames.OUTPUT)));
           }
         });
 
@@ -1014,16 +1016,16 @@ public class DataflowPipelineTranslator {
 
           private <T> void translateHelper(
               Window.Bound<T> transform, TranslationContext context) {
-            context.addStep(transform, "Bucket");
-            context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
-            context.addOutput(context.getOutput(transform));
+            StepTranslationContext stepContext = context.addStep(transform, "Bucket");
+            stepContext.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
+            stepContext.addOutput(context.getOutput(transform));
 
             WindowingStrategy<?, ?> strategy = context.getOutput(transform).getWindowingStrategy();
             byte[] serializedBytes = serializeToByteArray(strategy);
             String serializedJson = byteArrayToJsonString(serializedBytes);
             assert Arrays.equals(serializedBytes,
                                  jsonStringToByteArray(serializedJson));
-            context.addInput(PropertyNames.SERIALIZED_FN, serializedJson);
+            stepContext.addInput(PropertyNames.SERIALIZED_FN, serializedJson);
           }
         });
 
@@ -1046,15 +1048,17 @@ public class DataflowPipelineTranslator {
   }
 
   private static void translateInputs(
+      StepTranslationContext stepContext,
       PCollection<?> input,
       List<PCollectionView<?>> sideInputs,
       TranslationContext context) {
-    context.addInput(PropertyNames.PARALLEL_INPUT, input);
-    translateSideInputs(sideInputs, context);
+    stepContext.addInput(PropertyNames.PARALLEL_INPUT, input);
+    translateSideInputs(stepContext, sideInputs, context);
   }
 
   // Used for ParDo
   private static void translateSideInputs(
+      StepTranslationContext stepContext,
       List<PCollectionView<?>> sideInputs,
       TranslationContext context) {
     Map<String, Object> nonParInputs = new HashMap<>();
@@ -1065,10 +1069,11 @@ public class DataflowPipelineTranslator {
           context.asOutputReference(view));
     }
 
-    context.addInput(PropertyNames.NON_PARALLEL_INPUTS, nonParInputs);
+    stepContext.addInput(PropertyNames.NON_PARALLEL_INPUTS, nonParInputs);
   }
 
   private static void translateFn(
+      StepTranslationContext stepContext,
       DoFn fn,
       WindowingStrategy windowingStrategy,
       Iterable<PCollectionView<?>> sideInputs,
@@ -1076,8 +1081,8 @@ public class DataflowPipelineTranslator {
       TranslationContext context,
       long mainOutput,
       Map<Long, TupleTag<?>> outputMap) {
-    context.addInput(PropertyNames.USER_FN, fn.getClass().getName());
-    context.addInput(
+    stepContext.addInput(PropertyNames.USER_FN, fn.getClass().getName());
+    stepContext.addInput(
         PropertyNames.SERIALIZED_FN,
         byteArrayToJsonString(
             serializeToByteArray(
@@ -1087,13 +1092,13 @@ public class DataflowPipelineTranslator {
 
   private static BiMap<Long, TupleTag<?>> translateOutputs(
       PCollectionTuple outputs,
-      TranslationContext context) {
+      StepTranslationContext stepContext) {
     ImmutableBiMap.Builder<Long, TupleTag<?>> mapBuilder = ImmutableBiMap.builder();
     for (Map.Entry<TupleTag<?>, PCollection<?>> entry
              : outputs.getAll().entrySet()) {
       TupleTag<?> tag = entry.getKey();
       PCollection<?> output = entry.getValue();
-      mapBuilder.put(context.addOutput(output), tag);
+      mapBuilder.put(stepContext.addOutput(output), tag);
     }
     return mapBuilder.build();
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/f04537cc/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 03e5dfc..d2c1e66 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -33,6 +33,7 @@ import com.google.api.services.clouddebugger.v2.model.RegisterDebuggeeResponse;
 import com.google.api.services.dataflow.model.DataflowPackage;
 import com.google.api.services.dataflow.model.Job;
 import com.google.api.services.dataflow.model.ListJobsResponse;
+import com.google.api.services.dataflow.model.Step;
 import com.google.api.services.dataflow.model.WorkerPool;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
@@ -72,6 +73,7 @@ import java.util.SortedSet;
 import java.util.TreeSet;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification;
+import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.StepTranslationContext;
 import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TransformTranslator;
 import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TranslationContext;
 import org.apache.beam.runners.dataflow.internal.AssignWindows;
@@ -2116,50 +2118,46 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
   }
 
-  /**
-   * Rewrite {@link StreamingPubsubIORead} to the appropriate internal node.
-   */
-  private static class StreamingPubsubIOReadTranslator<T> implements
-      TransformTranslator<StreamingPubsubIORead<T>> {
+  /** Rewrite {@link StreamingPubsubIORead} to the appropriate internal node. */
+  private static class StreamingPubsubIOReadTranslator<T>
+      implements TransformTranslator<StreamingPubsubIORead<T>> {
     @Override
-    public void translate(
-        StreamingPubsubIORead<T> transform,
-        TranslationContext context) {
-      checkArgument(context.getPipelineOptions().isStreaming(),
-                    "StreamingPubsubIORead is only for streaming pipelines.");
+    public void translate(StreamingPubsubIORead<T> transform, TranslationContext context) {
+      checkArgument(
+          context.getPipelineOptions().isStreaming(),
+          "StreamingPubsubIORead is only for streaming pipelines.");
       PubsubUnboundedSource<T> overriddenTransform = transform.getOverriddenTransform();
-      context.addStep(transform, "ParallelRead");
-      context.addInput(PropertyNames.FORMAT, "pubsub");
+      StepTranslationContext stepContext = context.addStep(transform, "ParallelRead");
+      stepContext.addInput(PropertyNames.FORMAT, "pubsub");
       if (overriddenTransform.getTopicProvider() != null) {
         if (overriddenTransform.getTopicProvider().isAccessible()) {
-          context.addInput(
+          stepContext.addInput(
               PropertyNames.PUBSUB_TOPIC, overriddenTransform.getTopic().getV1Beta1Path());
         } else {
-          context.addInput(
+          stepContext.addInput(
               PropertyNames.PUBSUB_TOPIC_OVERRIDE,
               ((NestedValueProvider) overriddenTransform.getTopicProvider()).propertyName());
         }
       }
       if (overriddenTransform.getSubscriptionProvider() != null) {
         if (overriddenTransform.getSubscriptionProvider().isAccessible()) {
-          context.addInput(
+          stepContext.addInput(
               PropertyNames.PUBSUB_SUBSCRIPTION,
               overriddenTransform.getSubscription().getV1Beta1Path());
         } else {
-          context.addInput(
+          stepContext.addInput(
               PropertyNames.PUBSUB_SUBSCRIPTION_OVERRIDE,
-              ((NestedValueProvider) overriddenTransform.getSubscriptionProvider())
-              .propertyName());
+              ((NestedValueProvider) overriddenTransform.getSubscriptionProvider()).propertyName());
         }
       }
       if (overriddenTransform.getTimestampLabel() != null) {
-        context.addInput(PropertyNames.PUBSUB_TIMESTAMP_LABEL,
-                         overriddenTransform.getTimestampLabel());
+        stepContext.addInput(
+            PropertyNames.PUBSUB_TIMESTAMP_LABEL, overriddenTransform.getTimestampLabel());
       }
       if (overriddenTransform.getIdLabel() != null) {
-        context.addInput(PropertyNames.PUBSUB_ID_LABEL, overriddenTransform.getIdLabel());
+        stepContext.addInput(PropertyNames.PUBSUB_ID_LABEL, overriddenTransform.getIdLabel());
       }
-      context.addValueOnlyOutput(context.getOutput(transform));
+      stepContext.addValueOnlyOutput(context.getOutput(transform));
     }
   }
 
@@ -2211,26 +2209,26 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
       checkArgument(context.getPipelineOptions().isStreaming(),
                     "StreamingPubsubIOWrite is only for streaming pipelines.");
       PubsubUnboundedSink<T> overriddenTransform = transform.getOverriddenTransform();
-      context.addStep(transform, "ParallelWrite");
-      context.addInput(PropertyNames.FORMAT, "pubsub");
+      StepTranslationContext stepContext = context.addStep(transform, "ParallelWrite");
+      stepContext.addInput(PropertyNames.FORMAT, "pubsub");
       if (overriddenTransform.getTopicProvider().isAccessible()) {
-        context.addInput(
+        stepContext.addInput(
             PropertyNames.PUBSUB_TOPIC, overriddenTransform.getTopic().getV1Beta1Path());
       } else {
-        context.addInput(
+        stepContext.addInput(
             PropertyNames.PUBSUB_TOPIC_OVERRIDE,
             ((NestedValueProvider) overriddenTransform.getTopicProvider()).propertyName());
       }
       if (overriddenTransform.getTimestampLabel() != null) {
-        context.addInput(PropertyNames.PUBSUB_TIMESTAMP_LABEL,
-                         overriddenTransform.getTimestampLabel());
+        stepContext.addInput(
+            PropertyNames.PUBSUB_TIMESTAMP_LABEL, overriddenTransform.getTimestampLabel());
       }
       if (overriddenTransform.getIdLabel() != null) {
-        context.addInput(PropertyNames.PUBSUB_ID_LABEL, overriddenTransform.getIdLabel());
+        stepContext.addInput(PropertyNames.PUBSUB_ID_LABEL, overriddenTransform.getIdLabel());
       }
-      context.addEncodingInput(
+      stepContext.addEncodingInput(
           WindowedValue.getValueOnlyCoder(overriddenTransform.getElementCoder()));
-      context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
+      stepContext.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/f04537cc/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java
index 84950f7..1a5a9a5 100755
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java
@@ -25,6 +25,7 @@ import com.google.api.services.dataflow.model.SourceMetadata;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.beam.runners.dataflow.DataflowPipelineTranslator;
+import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.StepTranslationContext;
 import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TransformTranslator;
 import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TranslationContext;
 import org.apache.beam.sdk.io.FileBasedSource;
@@ -60,13 +61,13 @@ public class ReadTranslator implements TransformTranslator<Read.Bounded<?>> {
         }
       }
 
-      context.addStep(transform, "ParallelRead");
-      context.addInput(PropertyNames.FORMAT, PropertyNames.CUSTOM_SOURCE_FORMAT);
-      context.addInput(
+      StepTranslationContext stepContext = context.addStep(transform, "ParallelRead");
+      stepContext.addInput(PropertyNames.FORMAT, PropertyNames.CUSTOM_SOURCE_FORMAT);
+      stepContext.addInput(
           PropertyNames.SOURCE_STEP_INPUT,
           cloudSourceToDictionary(
               CustomSources.serializeToCloudSource(source, context.getPipelineOptions())));
-      context.addValueOnlyOutput(context.getOutput(transform));
+      stepContext.addValueOnlyOutput(context.getOutput(transform));
     } catch (Exception e) {
       throw new RuntimeException(e);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/f04537cc/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 21d575a..a19fd8c 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -61,6 +61,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.regex.Pattern;
+import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.StepTranslationContext;
 import org.apache.beam.runners.dataflow.DataflowRunner.BatchViewAsList;
 import org.apache.beam.runners.dataflow.DataflowRunner.BatchViewAsMap;
 import org.apache.beam.runners.dataflow.DataflowRunner.BatchViewAsMultimap;
@@ -998,8 +999,8 @@ public class DataflowRunnerTest {
 
             // Note: This is about the minimum needed to fake out a
             // translation. This obviously isn't a real translation.
-            context.addStep(transform, "TestTranslate");
-            context.addOutput(context.getOutput(transform));
+            StepTranslationContext stepContext = context.addStep(transform, "TestTranslate");
+            stepContext.addOutput(context.getOutput(transform));
           }
         });
 


[2/5] beam git commit: Move some pieces of Dataflow translator to top level

Posted by ke...@apache.org.
Move some pieces of Dataflow translator to top level
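
For translators outside the runner's translator class, the visible effect
is an import change; a sketch distilled from the ReadTranslator hunk below:

    // Before: the interfaces were nested in DataflowPipelineTranslator.
    import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TransformTranslator;
    import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TranslationContext;

    // After: TransformTranslator is a top-level interface, with
    // TranslationContext and StepTranslationContext nested inside it.
    import org.apache.beam.runners.dataflow.TransformTranslator;
    import org.apache.beam.runners.dataflow.TransformTranslator.TranslationContext;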


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5d2cb3e2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5d2cb3e2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5d2cb3e2

Branch: refs/heads/master
Commit: 5d2cb3e2310dbf7046785e9e8f6403b854b2dd03
Parents: f04537c
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Jan 5 16:51:23 2017 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Jan 6 11:36:51 2017 -0800

----------------------------------------------------------------------
 .../dataflow/DataflowPipelineTranslator.java    | 134 +------------------
 .../beam/runners/dataflow/DataflowRunner.java   |   8 +-
 .../runners/dataflow/TransformTranslator.java   | 123 +++++++++++++++++
 .../dataflow/internal/ReadTranslator.java       |   7 +-
 .../DataflowPipelineTranslatorTest.java         |   3 +-
 .../runners/dataflow/DataflowRunnerTest.java    |   5 +-
 6 files changed, 137 insertions(+), 143 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/5d2cb3e2/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 2385fa1..e9cf6f4 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -56,6 +56,8 @@ import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.dataflow.DataflowRunner.GroupByKeyAndSortValuesOnly;
+import org.apache.beam.runners.dataflow.TransformTranslator.StepTranslationContext;
+import org.apache.beam.runners.dataflow.TransformTranslator.TranslationContext;
 import org.apache.beam.runners.dataflow.internal.ReadTranslator;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.runners.dataflow.util.DoFnInfo;
@@ -69,6 +71,7 @@ import org.apache.beam.sdk.options.StreamingOptions;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Combine.GroupedValues;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
@@ -80,6 +83,7 @@ import org.apache.beam.sdk.transforms.display.HasDisplayData;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
 import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.Window.Bound;
 import org.apache.beam.sdk.util.AppliedCombineFn;
 import org.apache.beam.sdk.util.CloudObject;
 import org.apache.beam.sdk.util.PropertyNames;
@@ -212,130 +216,6 @@ public class DataflowPipelineTranslator {
     return transformTranslators.get(transformClass);
   }
 
-  /**
-   * A {@link TransformTranslator} knows how to translate a particular subclass of {@link
-   * PTransform} for the Cloud Dataflow service. It does so by mutating the {@link
-   * TranslationContext}.
-   */
-  public interface TransformTranslator<TransformT extends PTransform> {
-    void translate(TransformT transform, TranslationContext context);
-  }
-
-  /**
-   * The interface provided to registered callbacks for interacting
-   * with the {@link DataflowRunner}, including reading and writing the
-   * values of {@link PCollection}s and side inputs ({@link PCollectionView}s).
-   */
-  public interface TranslationContext {
-    /**
-     * Returns the configured pipeline options.
-     */
-    DataflowPipelineOptions getPipelineOptions();
-
-    /**
-     * Returns the input of the currently being translated transform.
-     */
-    <InputT extends PInput> InputT getInput(PTransform<InputT, ?> transform);
-
-    /**
-     * Returns the output of the currently being translated transform.
-     */
-    <OutputT extends POutput> OutputT getOutput(PTransform<?, OutputT> transform);
-
-    /**
-     * Returns the full name of the currently being translated transform.
-     */
-    String getFullName(PTransform<?, ?> transform);
-
-    /**
-     * Adds a step to the Dataflow workflow for the given transform, with
-     * the given Dataflow step type.
-     */
-    StepTranslationContext addStep(PTransform<?, ?> transform, String type);
-
-    /**
-     * Adds a pre-defined step to the Dataflow workflow. The given PTransform should be
-     * consistent with the Step, in terms of input, output and coder types.
-     *
-     * <p>This is a low-level operation, when using this method it is up to
-     * the caller to ensure that names do not collide.
-     */
-    Step addStep(PTransform<?, ? extends PValue> transform, Step step);
-    /**
-     * Encode a PValue reference as an output reference.
-     */
-    OutputReference asOutputReference(PValue value);
-  }
-
-  public interface StepTranslationContext {
-    /**
-     * Sets the encoding for the current Dataflow step.
-     */
-    void addEncodingInput(Coder<?> value);
-
-    /**
-     * Adds an input with the given name and value to the current
-     * Dataflow step.
-     */
-    void addInput(String name, Boolean value);
-
-    /**
-     * Adds an input with the given name and value to the current
-     * Dataflow step.
-     */
-    void addInput(String name, String value);
-
-    /**
-     * Adds an input with the given name and value to the current
-     * Dataflow step.
-     */
-    void addInput(String name, Long value);
-
-    /**
-     * Adds an input with the given name to the previously added Dataflow
-     * step, coming from the specified input PValue.
-     */
-    void addInput(String name, PInput value);
-
-    /**
-     * Adds an input that is a dictionary of strings to objects.
-     */
-    void addInput(String name, Map<String, Object> elements);
-
-    /**
-     * Adds an input that is a list of objects.
-     */
-    void addInput(String name, List<? extends Map<String, Object>> elements);
-
-    /**
-     * Adds an output to the previously added Dataflow step,
-     * producing the specified output {@code PValue},
-     * including its {@code Coder} if a {@code TypedPValue}.  If the
-     * {@code PValue} is a {@code PCollection}, wraps its coder inside
-     * a {@code WindowedValueCoder}.  Returns a pipeline level unique id.
-     */
-    long addOutput(PValue value);
-
-    /**
-     * Adds an output to the previously added Dataflow step,
-     * producing the specified output {@code PValue},
-     * including its {@code Coder} if a {@code TypedPValue}.  If the
-     * {@code PValue} is a {@code PCollection}, wraps its coder inside
-     * a {@code ValueOnlyCoder}.  Returns a pipeline level unique id.
-     */
-    long addValueOnlyOutput(PValue value);
-
-    /**
-     * Adds an output to the previously added CollectionToSingleton Dataflow step,
-     * consuming the specified input {@code PValue} and producing the specified output
-     * {@code PValue}.  This step requires special treatment for its
-     * output encoding.  Returns a pipeline level unique id.
-     */
-    long addCollectionToSingletonOutput(PValue inputValue,
-        PValue outputValue);
-  }
-
-
   /////////////////////////////////////////////////////////////////////////////
 
   /**
@@ -838,11 +718,11 @@ public class DataflowPipelineTranslator {
 
     DataflowPipelineTranslator.registerTransformTranslator(
         Combine.GroupedValues.class,
-        new DataflowPipelineTranslator.TransformTranslator<Combine.GroupedValues>() {
+        new TransformTranslator<GroupedValues>() {
           @Override
           public void translate(
               Combine.GroupedValues transform,
-              DataflowPipelineTranslator.TranslationContext context) {
+              TranslationContext context) {
             translateHelper(transform, context);
           }
 
@@ -1007,7 +887,7 @@ public class DataflowPipelineTranslator {
 
     registerTransformTranslator(
         Window.Bound.class,
-        new DataflowPipelineTranslator.TransformTranslator<Window.Bound>() {
+        new TransformTranslator<Bound>() {
           @Override
           public void translate(
               Window.Bound transform, TranslationContext context) {

http://git-wip-us.apache.org/repos/asf/beam/blob/5d2cb3e2/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index d2c1e66..9da7d24 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -33,7 +33,6 @@ import com.google.api.services.clouddebugger.v2.model.RegisterDebuggeeResponse;
 import com.google.api.services.dataflow.model.DataflowPackage;
 import com.google.api.services.dataflow.model.Job;
 import com.google.api.services.dataflow.model.ListJobsResponse;
-import com.google.api.services.dataflow.model.Step;
 import com.google.api.services.dataflow.model.WorkerPool;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
@@ -73,9 +72,6 @@ import java.util.SortedSet;
 import java.util.TreeSet;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification;
-import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.StepTranslationContext;
-import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TransformTranslator;
-import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TranslationContext;
 import org.apache.beam.runners.dataflow.internal.AssignWindows;
 import org.apache.beam.runners.dataflow.internal.DataflowAggregatorTransforms;
 import org.apache.beam.runners.dataflow.internal.DataflowUnboundedReadFromBoundedSource;
@@ -2315,10 +2311,10 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
 
     private static class ReadWithIdsTranslator
-        implements DataflowPipelineTranslator.TransformTranslator<ReadWithIds<?>> {
+        implements TransformTranslator<ReadWithIds<?>> {
       @Override
       public void translate(ReadWithIds<?> transform,
-          DataflowPipelineTranslator.TranslationContext context) {
+          TranslationContext context) {
         ReadTranslator.translateReadHelper(transform.getSource(), transform, context);
       }
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/5d2cb3e2/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
new file mode 100644
index 0000000..2aa8327
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow;
+
+import com.google.api.services.dataflow.model.Step;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.runners.dataflow.util.OutputReference;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+
+/**
+ * A {@link TransformTranslator} knows how to translate a particular subclass of {@link PTransform}
+ * for the Cloud Dataflow service. It does so by mutating the {@link TranslationContext}.
+ */
+public interface TransformTranslator<TransformT extends PTransform> {
+  void translate(TransformT transform, TranslationContext context);
+
+  /**
+   * The interface provided to registered callbacks for interacting with the {@link DataflowRunner},
+   * including reading and writing the values of {@link PCollection}s and side inputs.
+   */
+  interface TranslationContext {
+    /** Returns the configured pipeline options. */
+    DataflowPipelineOptions getPipelineOptions();
+
+    /** Returns the input of the currently being translated transform. */
+    <InputT extends PInput> InputT getInput(PTransform<InputT, ?> transform);
+
+    /** Returns the output of the currently being translated transform. */
+    <OutputT extends POutput> OutputT getOutput(PTransform<?, OutputT> transform);
+
+    /** Returns the full name of the currently being translated transform. */
+    String getFullName(PTransform<?, ?> transform);
+
+    /**
+     * Adds a step to the Dataflow workflow for the given transform, with the given Dataflow step
+     * type.
+     */
+    StepTranslationContext addStep(PTransform<?, ?> transform, String type);
+
+    /**
+     * Adds a pre-defined step to the Dataflow workflow. The given PTransform should be consistent
+     * with the Step, in terms of input, output and coder types.
+     *
+     * <p>This is a low-level operation, when using this method it is up to the caller to ensure
+     * that names do not collide.
+     */
+    Step addStep(PTransform<?, ? extends PValue> transform, Step step);
+    /** Encode a PValue reference as an output reference. */
+    OutputReference asOutputReference(PValue value);
+  }
+
+  /** The interface for a {@link TransformTranslator} to build a Dataflow step. */
+  interface StepTranslationContext {
+    /** Sets the encoding for this Dataflow step. */
+    void addEncodingInput(Coder<?> value);
+
+    /** Adds an input with the given name and value to this Dataflow step. */
+    void addInput(String name, Boolean value);
+
+    /** Adds an input with the given name and value to this Dataflow step. */
+    void addInput(String name, String value);
+
+    /** Adds an input with the given name and value to this Dataflow step. */
+    void addInput(String name, Long value);
+
+    /**
+     * Adds an input with the given name to this Dataflow step, coming from the specified input
+     * PValue.
+     */
+    void addInput(String name, PInput value);
+
+    /** Adds an input that is a dictionary of strings to objects. */
+    void addInput(String name, Map<String, Object> elements);
+
+    /** Adds an input that is a list of objects. */
+    void addInput(String name, List<? extends Map<String, Object>> elements);
+
+    /**
+     * Adds an output to this Dataflow step, producing the specified output {@code PValue},
+     * including its {@code Coder} if a {@code TypedPValue}. If the {@code PValue} is a {@code
+     * PCollection}, wraps its coder inside a {@code WindowedValueCoder}. Returns a pipeline level
+     * unique id.
+     */
+    long addOutput(PValue value);
+
+    /**
+     * Adds an output to this Dataflow step, producing the specified output {@code PValue},
+     * including its {@code Coder} if a {@code TypedPValue}. If the {@code PValue} is a {@code
+     * PCollection}, wraps its coder inside a {@code ValueOnlyCoder}. Returns a pipeline level
+     * unique id.
+     */
+    long addValueOnlyOutput(PValue value);
+
+    /**
+     * Adds an output to this {@code CollectionToSingleton} Dataflow step, consuming the specified
+     * input {@code PValue} and producing the specified output {@code PValue}. This step requires
+     * special treatment for its output encoding. Returns a pipeline level unique id.
+     */
+    long addCollectionToSingletonOutput(PValue inputValue, PValue outputValue);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/5d2cb3e2/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java
index 1a5a9a5..a15a2a3 100755
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java
@@ -24,10 +24,7 @@ import static org.apache.beam.sdk.util.Structs.addLong;
 import com.google.api.services.dataflow.model.SourceMetadata;
 import java.util.HashMap;
 import java.util.Map;
-import org.apache.beam.runners.dataflow.DataflowPipelineTranslator;
-import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.StepTranslationContext;
-import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TransformTranslator;
-import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TranslationContext;
+import org.apache.beam.runners.dataflow.TransformTranslator;
 import org.apache.beam.sdk.io.FileBasedSource;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.Source;
@@ -47,7 +44,7 @@ public class ReadTranslator implements TransformTranslator<Read.Bounded<?>> {
 
   public static <T> void translateReadHelper(Source<T> source,
       PTransform<?, ? extends PValue> transform,
-      DataflowPipelineTranslator.TranslationContext context) {
+      TranslationContext context) {
     try {
       // TODO: Move this validation out of translation once IOChannelUtils is portable
       // and can be reconstructed on the worker.

http://git-wip-us.apache.org/repos/asf/beam/blob/5d2cb3e2/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index ab82941..84b585a 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -56,7 +56,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification;
-import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TranslationContext;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
 import org.apache.beam.runners.dataflow.util.OutputReference;
@@ -566,7 +565,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
    * {@link TranslationContext#addStep} and remaps the input port reference.
    */
   private static class EmbeddedTranslator
-      implements DataflowPipelineTranslator.TransformTranslator<EmbeddedTransform> {
+      implements TransformTranslator<EmbeddedTransform> {
     @Override public void translate(EmbeddedTransform transform, TranslationContext context) {
       addObject(transform.step.getProperties(), PropertyNames.PARALLEL_INPUT,
           context.asOutputReference(context.getInput(transform)));

http://git-wip-us.apache.org/repos/asf/beam/blob/5d2cb3e2/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index a19fd8c..4fff1c6 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -61,7 +61,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.regex.Pattern;
-import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.StepTranslationContext;
 import org.apache.beam.runners.dataflow.DataflowRunner.BatchViewAsList;
 import org.apache.beam.runners.dataflow.DataflowRunner.BatchViewAsMap;
 import org.apache.beam.runners.dataflow.DataflowRunner.BatchViewAsMultimap;
@@ -989,12 +988,12 @@ public class DataflowRunnerTest {
 
     DataflowPipelineTranslator.registerTransformTranslator(
         TestTransform.class,
-        new DataflowPipelineTranslator.TransformTranslator<TestTransform>() {
+        new TransformTranslator<TestTransform>() {
           @SuppressWarnings("unchecked")
           @Override
           public void translate(
               TestTransform transform,
-              DataflowPipelineTranslator.TranslationContext context) {
+              TranslationContext context) {
             transform.translated = true;
 
             // Note: This is about the minimum needed to fake out a


[5/5] beam git commit: This closes #1678: Refactor Dataflow translator to decouple steps

Posted by ke...@apache.org.
This closes #1678: Refactor Dataflow translator to decouple steps

  Reduce visibility of many Dataflow runner internals
  Move some pieces of Dataflow translator to top level
  Add explicit translation builder for a Step to in Dataflow translator


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a6caa82a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a6caa82a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a6caa82a

Branch: refs/heads/master
Commit: a6caa82a621a0a9c1cd2cd92c26304e9f79546bb
Parents: b4d8702 33907f8
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Jan 6 11:37:07 2017 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Jan 6 11:37:07 2017 -0800

----------------------------------------------------------------------
 .../beam/runners/dataflow/AssignWindows.java    |  89 +++
 .../dataflow/DataflowAggregatorTransforms.java  |  79 +++
 .../dataflow/DataflowMetricUpdateExtractor.java | 109 ++++
 .../runners/dataflow/DataflowPipelineJob.java   |   2 -
 .../dataflow/DataflowPipelineTranslator.java    | 424 ++++++--------
 .../beam/runners/dataflow/DataflowRunner.java   |  68 +--
 .../DataflowUnboundedReadFromBoundedSource.java | 547 +++++++++++++++++++
 .../beam/runners/dataflow/ReadTranslator.java   | 102 ++++
 .../runners/dataflow/TransformTranslator.java   | 123 +++++
 .../dataflow/internal/AssignWindows.java        |  89 ---
 .../internal/DataflowAggregatorTransforms.java  |  79 ---
 .../internal/DataflowMetricUpdateExtractor.java | 109 ----
 .../DataflowUnboundedReadFromBoundedSource.java | 547 -------------------
 .../dataflow/internal/ReadTranslator.java       | 104 ----
 .../dataflow/DataflowPipelineJobTest.java       |   1 -
 .../DataflowPipelineTranslatorTest.java         |   3 +-
 .../runners/dataflow/DataflowRunnerTest.java    |   8 +-
 ...aflowUnboundedReadFromBoundedSourceTest.java |  79 +++
 ...aflowUnboundedReadFromBoundedSourceTest.java |  79 ---
 19 files changed, 1316 insertions(+), 1325 deletions(-)
----------------------------------------------------------------------



[4/5] beam git commit: Reduce visibility of many Dataflow runner internals

Posted by ke...@apache.org.
Reduce visibility of many Dataflow runner internals


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/33907f89
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/33907f89
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/33907f89

Branch: refs/heads/master
Commit: 33907f8908238199b166070bc1e12796af32829a
Parents: 5d2cb3e
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Jan 5 17:15:52 2017 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Jan 6 11:36:51 2017 -0800

----------------------------------------------------------------------
 .../beam/runners/dataflow/AssignWindows.java    |  89 +++
 .../dataflow/DataflowAggregatorTransforms.java  |  79 +++
 .../dataflow/DataflowMetricUpdateExtractor.java | 109 ++++
 .../runners/dataflow/DataflowPipelineJob.java   |   2 -
 .../dataflow/DataflowPipelineTranslator.java    |   3 +-
 .../beam/runners/dataflow/DataflowRunner.java   |   4 -
 .../DataflowUnboundedReadFromBoundedSource.java | 547 +++++++++++++++++++
 .../beam/runners/dataflow/ReadTranslator.java   | 102 ++++
 .../runners/dataflow/TransformTranslator.java   |   2 +-
 .../dataflow/internal/AssignWindows.java        |  89 ---
 .../internal/DataflowAggregatorTransforms.java  |  79 ---
 .../internal/DataflowMetricUpdateExtractor.java | 109 ----
 .../DataflowUnboundedReadFromBoundedSource.java | 547 -------------------
 .../dataflow/internal/ReadTranslator.java       | 102 ----
 .../dataflow/DataflowPipelineJobTest.java       |   1 -
 ...aflowUnboundedReadFromBoundedSourceTest.java |  79 +++
 ...aflowUnboundedReadFromBoundedSourceTest.java |  79 ---
 17 files changed, 1007 insertions(+), 1015 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/33907f89/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java
new file mode 100644
index 0000000..880cd26
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * A primitive {@link PTransform} that implements the {@link Window#into(WindowFn)}
+ * {@link PTransform}.
+ *
+ * <p>For an application of {@link Window#into(WindowFn)} that changes the {@link WindowFn}, this
+ * transform applies a primitive {@link PTransform} in the Dataflow service.
+ *
+ * <p>For an application of {@link Window#into(WindowFn)} that does not change the
+ * {@link WindowFn}, this transform applies an identity {@link ParDo} and sets the windowing
+ * strategy of the output {@link PCollection}.
+ *
+ * <p>For internal use only.
+ *
+ * @param <T> the type of input element
+ */
+class AssignWindows<T> extends PTransform<PCollection<T>, PCollection<T>> {
+  private final Window.Bound<T> transform;
+
+  /**
+   * Builds an instance of this class from the overridden transform.
+   */
+  @SuppressWarnings("unused") // Used via reflection
+  public AssignWindows(Window.Bound<T> transform) {
+    this.transform = transform;
+  }
+
+  @Override
+  public PCollection<T> expand(PCollection<T> input) {
+    WindowingStrategy<?, ?> outputStrategy =
+        transform.getOutputStrategyInternal(input.getWindowingStrategy());
+    if (transform.getWindowFn() != null) {
+      // If the windowFn changed, we create a primitive, and run the AssignWindows operation here.
+      return PCollection.<T>createPrimitiveOutputInternal(
+                            input.getPipeline(), outputStrategy, input.isBounded());
+    } else {
+      // If the windowFn didn't change, we just run a pass-through transform and then set the
+      // new windowing strategy.
+      return input.apply("Identity", ParDo.of(new DoFn<T, T>() {
+        @ProcessElement
+        public void processElement(ProcessContext c) throws Exception {
+          c.output(c.element());
+        }
+      })).setWindowingStrategyInternal(outputStrategy);
+    }
+  }
+
+  @Override
+  public void validate(PCollection<T> input) {
+    transform.validate(input);
+  }
+
+  @Override
+  protected Coder<?> getDefaultOutputCoder(PCollection<T> input) {
+    return input.getCoder();
+  }
+
+  @Override
+  protected String getKindString() {
+    return "Window.Into()";
+  }
+}
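
For context, a sketch of what triggers each branch of expand() above. The
fragment is illustrative only, assuming an existing PCollection<String> named
"input"; Window and FixedWindows are from the Beam SDK, Duration from Joda
time:

    // Changes the WindowFn, so the primitive branch of expand() is taken;
    // a triggering-only change would take the identity-ParDo branch instead.
    PCollection<String> windowed =
        input.apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))));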

http://git-wip-us.apache.org/repos/asf/beam/blob/33907f89/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowAggregatorTransforms.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowAggregatorTransforms.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowAggregatorTransforms.java
new file mode 100755
index 0000000..0198cca
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowAggregatorTransforms.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+
+/**
+ * A mapping relating {@link Aggregator}s and the {@link PTransform} in which they are used.
+ */
+class DataflowAggregatorTransforms {
+  private final Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorTransforms;
+  private final Multimap<PTransform<?, ?>, AppliedPTransform<?, ?, ?>> transformAppliedTransforms;
+  private final BiMap<AppliedPTransform<?, ?, ?>, String> appliedStepNames;
+
+  public DataflowAggregatorTransforms(
+      Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorTransforms,
+      Map<AppliedPTransform<?, ?, ?>, String> transformStepNames) {
+    this.aggregatorTransforms = aggregatorTransforms;
+    appliedStepNames = HashBiMap.create(transformStepNames);
+
+    transformAppliedTransforms = HashMultimap.create();
+    for (AppliedPTransform<?, ?, ?> appliedTransform : transformStepNames.keySet()) {
+      transformAppliedTransforms.put(appliedTransform.getTransform(), appliedTransform);
+    }
+  }
+
+  /**
+   * Returns true if the provided {@link Aggregator} is used in the {@link Pipeline} under
+   * construction.
+   */
+  public boolean contains(Aggregator<?, ?> aggregator) {
+    return aggregatorTransforms.containsKey(aggregator);
+  }
+
+  /**
+   * Gets the step names in which the {@link Aggregator} is used.
+   */
+  public Collection<String> getAggregatorStepNames(Aggregator<?, ?> aggregator) {
+    Collection<String> names = new HashSet<>();
+    Collection<PTransform<?, ?>> transforms = aggregatorTransforms.get(aggregator);
+    for (PTransform<?, ?> transform : transforms) {
+      for (AppliedPTransform<?, ?, ?> applied : transformAppliedTransforms.get(transform)) {
+        names.add(appliedStepNames.get(applied));
+      }
+    }
+    return names;
+  }
+
+  /**
+   * Gets the {@link AppliedPTransform} that was assigned the provided step name.
+   */
+  public AppliedPTransform<?, ?, ?> getAppliedTransformForStepName(String stepName) {
+    return appliedStepNames.inverse().get(stepName);
+  }
+}
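
A sketch of intended usage (the variable names are hypothetical; the
constructor and accessors are those defined above):

    // aggregatorMap and stepNames would be collected while translating the pipeline.
    DataflowAggregatorTransforms aggregatorTransforms =
        new DataflowAggregatorTransforms(aggregatorMap, stepNames);
    // Later, look up the steps where an aggregator ran to fetch its metric values.
    Collection<String> stepsUsingAggregator =
        aggregatorTransforms.getAggregatorStepNames(myAggregator);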

http://git-wip-us.apache.org/repos/asf/beam/blob/33907f89/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetricUpdateExtractor.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetricUpdateExtractor.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetricUpdateExtractor.java
new file mode 100755
index 0000000..f725c46
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetricUpdateExtractor.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow;
+
+import com.google.api.services.dataflow.model.MetricStructuredName;
+import com.google.api.services.dataflow.model.MetricUpdate;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.PTransform;
+
+/**
+ * Methods for extracting the values of an {@link Aggregator} from a collection of {@link
+ * MetricUpdate MetricUpdates}.
+ */
+final class DataflowMetricUpdateExtractor {
+  private static final String STEP_NAME_CONTEXT_KEY = "step";
+  private static final String IS_TENTATIVE_KEY = "tentative";
+
+  private DataflowMetricUpdateExtractor() {
+    // Do not instantiate.
+  }
+
+  /**
+   * Extracts the value of the provided {@link Aggregator} at each {@link PTransform} in which it
+   * was used, according to the provided {@link DataflowAggregatorTransforms}, from the given list
+   * of {@link MetricUpdate MetricUpdates}.
+   */
+  public static <OutputT> Map<String, OutputT> fromMetricUpdates(Aggregator<?, OutputT> aggregator,
+      DataflowAggregatorTransforms aggregatorTransforms, List<MetricUpdate> metricUpdates) {
+    Map<String, OutputT> results = new HashMap<>();
+    if (metricUpdates == null) {
+      return results;
+    }
+
+    String aggregatorName = aggregator.getName();
+    Collection<String> aggregatorSteps = aggregatorTransforms.getAggregatorStepNames(aggregator);
+
+    for (MetricUpdate metricUpdate : metricUpdates) {
+      MetricStructuredName metricStructuredName = metricUpdate.getName();
+      Map<String, String> context = metricStructuredName.getContext();
+      if (metricStructuredName.getName().equals(aggregatorName) && context != null
+          && aggregatorSteps.contains(context.get(STEP_NAME_CONTEXT_KEY))) {
+        AppliedPTransform<?, ?, ?> transform =
+            aggregatorTransforms.getAppliedTransformForStepName(
+                context.get(STEP_NAME_CONTEXT_KEY));
+        String fullName = transform.getFullName();
+        // Prefer the tentative (fresher) value if it exists.
+        if (Boolean.parseBoolean(context.get(IS_TENTATIVE_KEY)) || !results.containsKey(fullName)) {
+          results.put(fullName, toValue(aggregator, metricUpdate));
+        }
+      }
+    }
+
+    return results;
+  }
+
+  private static <OutputT> OutputT toValue(
+      Aggregator<?, OutputT> aggregator, MetricUpdate metricUpdate) {
+    CombineFn<?, ?, OutputT> combineFn = aggregator.getCombineFn();
+    Class<? super OutputT> outputType = combineFn.getOutputType().getRawType();
+
+    if (outputType.equals(Long.class)) {
+      @SuppressWarnings("unchecked")
+      OutputT asLong = (OutputT) Long.valueOf(toNumber(metricUpdate).longValue());
+      return asLong;
+    }
+    if (outputType.equals(Integer.class)) {
+      @SuppressWarnings("unchecked")
+      OutputT asInt = (OutputT) Integer.valueOf(toNumber(metricUpdate).intValue());
+      return asInt;
+    }
+    if (outputType.equals(Double.class)) {
+      @SuppressWarnings("unchecked")
+      OutputT asDouble = (OutputT) Double.valueOf(toNumber(metricUpdate).doubleValue());
+      return asDouble;
+    }
+    throw new UnsupportedOperationException(
+        "Unsupported Output Type " + outputType + " in aggregator " + aggregator);
+  }
+
+  private static Number toNumber(MetricUpdate update) {
+    if (update.getScalar() instanceof Number) {
+      return (Number) update.getScalar();
+    }
+    throw new IllegalArgumentException(
+        "Metric Update " + update + " does not have a numeric scalar");
+  }
+}
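
A sketch of how this extractor is meant to be driven (hypothetical variable
names; fromMetricUpdates is the method defined above):

    // metricUpdates would come from the Dataflow job metrics API; the result
    // maps each transform's full name to the aggregator's value at that step.
    Map<String, Long> valuesByTransform =
        DataflowMetricUpdateExtractor.fromMetricUpdates(
            myLongAggregator, aggregatorTransforms, metricUpdates);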

http://git-wip-us.apache.org/repos/asf/beam/blob/33907f89/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
index 00c88f9..0da7137 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
@@ -35,8 +35,6 @@ import java.net.SocketTimeoutException;
 import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
-import org.apache.beam.runners.dataflow.internal.DataflowAggregatorTransforms;
-import org.apache.beam.runners.dataflow.internal.DataflowMetricUpdateExtractor;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.runners.dataflow.util.MonitoringUtil;
 import org.apache.beam.sdk.AggregatorRetrievalException;

http://git-wip-us.apache.org/repos/asf/beam/blob/33907f89/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index e9cf6f4..8e5901e 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -58,7 +58,6 @@ import javax.annotation.Nullable;
 import org.apache.beam.runners.dataflow.DataflowRunner.GroupByKeyAndSortValuesOnly;
 import org.apache.beam.runners.dataflow.TransformTranslator.StepTranslationContext;
 import org.apache.beam.runners.dataflow.TransformTranslator.TranslationContext;
-import org.apache.beam.runners.dataflow.internal.ReadTranslator;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.runners.dataflow.util.DoFnInfo;
 import org.apache.beam.runners.dataflow.util.OutputReference;
@@ -106,7 +105,7 @@ import org.slf4j.LoggerFactory;
  * into Cloud Dataflow Service API {@link Job}s.
  */
 @SuppressWarnings({"rawtypes", "unchecked"})
-public class DataflowPipelineTranslator {
+class DataflowPipelineTranslator {
   // Must be kept in sync with their internal counterparts.
   private static final Logger LOG = LoggerFactory.getLogger(DataflowPipelineTranslator.class);
   private static final ObjectMapper MAPPER = new ObjectMapper();

http://git-wip-us.apache.org/repos/asf/beam/blob/33907f89/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 9da7d24..9ff856a 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -72,14 +72,10 @@ import java.util.SortedSet;
 import java.util.TreeSet;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification;
-import org.apache.beam.runners.dataflow.internal.AssignWindows;
-import org.apache.beam.runners.dataflow.internal.DataflowAggregatorTransforms;
-import org.apache.beam.runners.dataflow.internal.DataflowUnboundedReadFromBoundedSource;
 import org.apache.beam.runners.dataflow.internal.IsmFormat;
 import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecord;
 import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecordCoder;
 import org.apache.beam.runners.dataflow.internal.IsmFormat.MetadataKeyCoder;
-import org.apache.beam.runners.dataflow.internal.ReadTranslator;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;

http://git-wip-us.apache.org/repos/asf/beam/blob/33907f89/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowUnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowUnboundedReadFromBoundedSource.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowUnboundedReadFromBoundedSource.java
new file mode 100644
index 0000000..cfb5ebc
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowUnboundedReadFromBoundedSource.java
@@ -0,0 +1,547 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.StandardCoder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.NameUtils;
+import org.apache.beam.sdk.util.PropertyNames;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link PTransform} that converts a {@link BoundedSource} into an {@link UnboundedSource}.
+ *
+ * <p>The {@link BoundedSource} is read directly, without calling
+ * {@link BoundedSource#splitIntoBundles}, and element timestamps are propagated. While any
+ * elements remain, the watermark is held at the beginning of time,
+ * {@link BoundedWindow#TIMESTAMP_MIN_VALUE}; after all elements have been produced, it advances to
+ * the end of time, {@link BoundedWindow#TIMESTAMP_MAX_VALUE}.
+ *
+ * <p>Checkpoints are created by calling {@link BoundedReader#splitAtFraction} on the inner
+ * {@link BoundedSource}. Sources that cannot be split are read entirely into memory, so this
+ * transform does not work well with large, unsplittable sources.
+ *
+ * <p>This transform is intended to be used by a runner during pipeline translation to convert
+ * a Read.Bounded into a Read.Unbounded.
+ *
+ * @deprecated This class is copied from the Beam runners-core module in order to avoid a
+ * pipeline-construction-time dependency. It should be replaced in the Dataflow worker by an
+ * execution-time dependency.
+ */
+@Deprecated
+class DataflowUnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PCollection<T>> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DataflowUnboundedReadFromBoundedSource.class);
+
+  private final BoundedSource<T> source;
+
+  /**
+   * Constructs a {@link PTransform} that performs an unbounded read from a {@link BoundedSource}.
+   */
+  public DataflowUnboundedReadFromBoundedSource(BoundedSource<T> source) {
+    this.source = source;
+  }
+
+  @Override
+  public PCollection<T> expand(PBegin input) {
+    return input.getPipeline().apply(
+        Read.from(new BoundedToUnboundedSourceAdapter<>(source)));
+  }
+
+  @Override
+  protected Coder<T> getDefaultOutputCoder() {
+    return source.getDefaultOutputCoder();
+  }
+
+  @Override
+  public String getKindString() {
+    return String.format("Read(%s)", NameUtils.approximateSimpleName(source, "AnonymousSource"));
+  }
+
+  @Override
+  public void populateDisplayData(DisplayData.Builder builder) {
+    // We explicitly do not register base-class data, instead we use the delegate inner source.
+    builder
+        .add(DisplayData.item("source", source.getClass()))
+        .include("source", source);
+  }
+
+  /**
+   * A {@code BoundedSource} to {@code UnboundedSource} adapter.
+   */
+  @VisibleForTesting
+  public static class BoundedToUnboundedSourceAdapter<T>
+      extends UnboundedSource<T, BoundedToUnboundedSourceAdapter.Checkpoint<T>> {
+
+    private BoundedSource<T> boundedSource;
+
+    public BoundedToUnboundedSourceAdapter(BoundedSource<T> boundedSource) {
+      this.boundedSource = boundedSource;
+    }
+
+    @Override
+    public void validate() {
+      boundedSource.validate();
+    }
+
+    @Override
+    public List<BoundedToUnboundedSourceAdapter<T>> generateInitialSplits(
+        int desiredNumSplits, PipelineOptions options) throws Exception {
+      try {
+        long desiredBundleSize = boundedSource.getEstimatedSizeBytes(options) / desiredNumSplits;
+        if (desiredBundleSize <= 0) {
+          LOG.warn("BoundedSource {} cannot estimate its size, skips the initial splits.",
+              boundedSource);
+          return ImmutableList.of(this);
+        }
+        List<? extends BoundedSource<T>> splits =
+            boundedSource.splitIntoBundles(desiredBundleSize, options);
+        if (splits == null) {
+          LOG.warn("BoundedSource cannot split {}, skips the initial splits.", boundedSource);
+          return ImmutableList.of(this);
+        }
+        return Lists.transform(
+            splits,
+            new Function<BoundedSource<T>, BoundedToUnboundedSourceAdapter<T>>() {
+              @Override
+              public BoundedToUnboundedSourceAdapter<T> apply(BoundedSource<T> input) {
+                return new BoundedToUnboundedSourceAdapter<>(input);
+              }});
+      } catch (Exception e) {
+        LOG.warn("Exception while splitting {}, skips the initial splits.", boundedSource, e);
+        return ImmutableList.of(this);
+      }
+    }
+
+    @Override
+    public Reader createReader(PipelineOptions options, Checkpoint<T> checkpoint)
+        throws IOException {
+      if (checkpoint == null) {
+        return new Reader(null /* residualElements */, boundedSource, options);
+      } else {
+        return new Reader(checkpoint.residualElements, checkpoint.residualSource, options);
+      }
+    }
+
+    @Override
+    public Coder<T> getDefaultOutputCoder() {
+      return boundedSource.getDefaultOutputCoder();
+    }
+
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    @Override
+    public Coder<Checkpoint<T>> getCheckpointMarkCoder() {
+      return new CheckpointCoder<>(boundedSource.getDefaultOutputCoder());
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder.add(DisplayData.item("source", boundedSource.getClass()));
+      builder.include("source", boundedSource);
+    }
+
+    @VisibleForTesting
+    static class Checkpoint<T> implements UnboundedSource.CheckpointMark {
+      private final @Nullable List<TimestampedValue<T>> residualElements;
+      private final @Nullable BoundedSource<T> residualSource;
+
+      public Checkpoint(
+          @Nullable List<TimestampedValue<T>> residualElements,
+          @Nullable BoundedSource<T> residualSource) {
+        this.residualElements = residualElements;
+        this.residualSource = residualSource;
+      }
+
+      @Override
+      public void finalizeCheckpoint() {}
+
+      @VisibleForTesting
+      @Nullable List<TimestampedValue<T>> getResidualElements() {
+        return residualElements;
+      }
+
+      @VisibleForTesting
+      @Nullable BoundedSource<T> getResidualSource() {
+        return residualSource;
+      }
+    }
+
+    @VisibleForTesting
+    static class CheckpointCoder<T> extends StandardCoder<Checkpoint<T>> {
+
+      @JsonCreator
+      public static CheckpointCoder<?> of(
+          @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
+          List<Coder<?>> components) {
+        checkArgument(components.size() == 1,
+            "Expecting 1 component, got %s", components.size());
+        return new CheckpointCoder<>(components.get(0));
+      }
+
+      // The coder for a list of residual elements and their timestamps
+      private final Coder<List<TimestampedValue<T>>> elemsCoder;
+      // The coder from the BoundedReader for coding each element
+      private final Coder<T> elemCoder;
+      // The nullable and serializable coder for the BoundedSource.
+      @SuppressWarnings("rawtypes")
+      private final Coder<BoundedSource> sourceCoder;
+
+      CheckpointCoder(Coder<T> elemCoder) {
+        this.elemsCoder = NullableCoder.of(
+            ListCoder.of(TimestampedValue.TimestampedValueCoder.of(elemCoder)));
+        this.elemCoder = elemCoder;
+        this.sourceCoder = NullableCoder.of(SerializableCoder.of(BoundedSource.class));
+      }
+
+      @Override
+      public void encode(Checkpoint<T> value, OutputStream outStream, Context context)
+          throws CoderException, IOException {
+        elemsCoder.encode(value.residualElements, outStream, context.nested());
+        sourceCoder.encode(value.residualSource, outStream, context);
+      }
+
+      @SuppressWarnings("unchecked")
+      @Override
+      public Checkpoint<T> decode(InputStream inStream, Context context)
+          throws CoderException, IOException {
+        return new Checkpoint<>(
+            elemsCoder.decode(inStream, context.nested()),
+            sourceCoder.decode(inStream, context));
+      }
+
+      @Override
+      public List<Coder<?>> getCoderArguments() {
+        return Arrays.<Coder<?>>asList(elemCoder);
+      }
+
+      @Override
+      public void verifyDeterministic() throws NonDeterministicException {
+        throw new NonDeterministicException(this,
+            "CheckpointCoder uses Java Serialization, which may be non-deterministic.");
+      }
+    }
+
+    /**
+     * An {@code UnboundedReader<T>} that wraps a {@code BoundedSource<T>} into
+     * {@link ResidualElements} and {@link ResidualSource}.
+     *
+     * <p>In the initial state, {@link ResidualElements} is null and {@link ResidualSource} contains
+     * the {@code BoundedSource<T>}. After the first checkpoint, the {@code BoundedSource<T>} will
+     * be split into {@link ResidualElements} and {@link ResidualSource}.
+     */
+    @VisibleForTesting
+    class Reader extends UnboundedReader<T> {
+      private ResidualElements residualElements;
+      private @Nullable ResidualSource residualSource;
+      private final PipelineOptions options;
+      private boolean done;
+
+      Reader(
+          @Nullable List<TimestampedValue<T>> residualElementsList,
+          @Nullable BoundedSource<T> residualSource,
+          PipelineOptions options) {
+        init(residualElementsList, residualSource, options);
+        this.options = checkNotNull(options, "options");
+        this.done = false;
+      }
+
+      private void init(
+          @Nullable List<TimestampedValue<T>> residualElementsList,
+          @Nullable BoundedSource<T> residualSource,
+          PipelineOptions options) {
+        this.residualElements = residualElementsList == null
+            ? new ResidualElements(Collections.<TimestampedValue<T>>emptyList())
+            : new ResidualElements(residualElementsList);
+        this.residualSource =
+            residualSource == null ? null : new ResidualSource(residualSource, options);
+      }
+
+      @Override
+      public boolean start() throws IOException {
+        return advance();
+      }
+
+      @Override
+      public boolean advance() throws IOException {
+        if (residualElements.advance()) {
+          return true;
+        } else if (residualSource != null && residualSource.advance()) {
+          return true;
+        } else {
+          done = true;
+          return false;
+        }
+      }
+
+      @Override
+      public void close() throws IOException {
+        if (residualSource != null) {
+          residualSource.close();
+        }
+      }
+
+      @Override
+      public T getCurrent() throws NoSuchElementException {
+        if (residualElements.hasCurrent()) {
+          return residualElements.getCurrent();
+        } else if (residualSource != null) {
+          return residualSource.getCurrent();
+        } else {
+          throw new NoSuchElementException();
+        }
+      }
+
+      @Override
+      public Instant getCurrentTimestamp() throws NoSuchElementException {
+        if (residualElements.hasCurrent()) {
+          return residualElements.getCurrentTimestamp();
+        } else if (residualSource != null) {
+          return residualSource.getCurrentTimestamp();
+        } else {
+          throw new NoSuchElementException();
+        }
+      }
+
+      @Override
+      public Instant getWatermark() {
+        return done ? BoundedWindow.TIMESTAMP_MAX_VALUE : BoundedWindow.TIMESTAMP_MIN_VALUE;
+      }
+
+      /**
+       * {@inheritDoc}
+       *
+       * <p>If only part of the {@link ResidualElements} has been consumed, the new checkpoint
+       * contains the remaining elements in {@link ResidualElements} and the unchanged
+       * {@link ResidualSource}.
+       *
+       * <p>If all {@link ResidualElements} and part of the {@link ResidualSource} have been
+       * consumed, the new checkpoint is produced by splitting the {@link ResidualSource} into a
+       * new {@link ResidualElements} and a new {@link ResidualSource}: the new
+       * {@link ResidualSource} is the portion split off from the current source, and the new
+       * {@link ResidualElements} contains the elements remaining in the current source after the
+       * split. For an unsplittable source, all remaining elements are placed into the
+       * {@link ResidualElements}.
+       */
+      @Override
+      public Checkpoint<T> getCheckpointMark() {
+        Checkpoint<T> newCheckpoint;
+        if (!residualElements.done()) {
+          // Part of residualElements has been consumed.
+          // Checkpoint the remaining elements and the residualSource.
+          newCheckpoint = new Checkpoint<>(
+              residualElements.getRestElements(),
+              residualSource == null ? null : residualSource.getSource());
+        } else if (residualSource != null) {
+          newCheckpoint = residualSource.getCheckpointMark();
+        } else {
+          newCheckpoint = new Checkpoint<>(null /* residualElements */, null /* residualSource */);
+        }
+        // Re-initialize since the residualElements and the residualSource might be
+        // consumed or split by checkpointing.
+        init(newCheckpoint.residualElements, newCheckpoint.residualSource, options);
+        return newCheckpoint;
+      }
+
+      @Override
+      public BoundedToUnboundedSourceAdapter<T> getCurrentSource() {
+        return BoundedToUnboundedSourceAdapter.this;
+      }
+    }
+
+    private class ResidualElements {
+      private final List<TimestampedValue<T>> elementsList;
+      private @Nullable Iterator<TimestampedValue<T>> elementsIterator;
+      private @Nullable TimestampedValue<T> currentT;
+      private boolean hasCurrent;
+      private boolean done;
+
+      ResidualElements(List<TimestampedValue<T>> residualElementsList) {
+        this.elementsList = checkNotNull(residualElementsList, "residualElementsList");
+        this.elementsIterator = null;
+        this.currentT = null;
+        this.hasCurrent = false;
+        this.done = false;
+      }
+
+      public boolean advance() {
+        if (elementsIterator == null) {
+          elementsIterator = elementsList.iterator();
+        }
+        if (elementsIterator.hasNext()) {
+          currentT = elementsIterator.next();
+          hasCurrent = true;
+          return true;
+        } else {
+          done = true;
+          hasCurrent = false;
+          return false;
+        }
+      }
+
+      boolean hasCurrent() {
+        return hasCurrent;
+      }
+
+      boolean done() {
+        return done;
+      }
+
+      TimestampedValue<T> getCurrentTimestampedValue() {
+        if (!hasCurrent) {
+          throw new NoSuchElementException();
+        }
+        return currentT;
+      }
+
+      T getCurrent() {
+        return getCurrentTimestampedValue().getValue();
+      }
+
+      Instant getCurrentTimestamp() {
+        return getCurrentTimestampedValue().getTimestamp();
+      }
+
+      List<TimestampedValue<T>> getRestElements() {
+        if (elementsIterator == null) {
+          return elementsList;
+        } else {
+          List<TimestampedValue<T>> newResidualElements = Lists.newArrayList();
+          while (elementsIterator.hasNext()) {
+            newResidualElements.add(elementsIterator.next());
+          }
+          return newResidualElements;
+        }
+      }
+    }
+
+    private class ResidualSource {
+      private BoundedSource<T> residualSource;
+      private PipelineOptions options;
+      private @Nullable BoundedReader<T> reader;
+      private boolean closed;
+
+      public ResidualSource(BoundedSource<T> residualSource, PipelineOptions options) {
+        this.residualSource = checkNotNull(residualSource, "residualSource");
+        this.options = checkNotNull(options, "options");
+        this.reader = null;
+        this.closed = false;
+      }
+
+      private boolean advance() throws IOException {
+        if (reader == null && !closed) {
+          reader = residualSource.createReader(options);
+          return reader.start();
+        } else {
+          return reader.advance();
+        }
+      }
+
+      T getCurrent() throws NoSuchElementException {
+        if (reader == null) {
+          throw new NoSuchElementException();
+        }
+        return reader.getCurrent();
+      }
+
+      Instant getCurrentTimestamp() throws NoSuchElementException {
+        if (reader == null) {
+          throw new NoSuchElementException();
+        }
+        return reader.getCurrentTimestamp();
+      }
+
+      void close() throws IOException {
+        if (reader != null) {
+          reader.close();
+          reader = null;
+        }
+        closed = true;
+      }
+
+      BoundedSource<T> getSource() {
+        return residualSource;
+      }
+
+      Checkpoint<T> getCheckpointMark() {
+        if (reader == null) {
+          // Reader hasn't started, checkpoint the residualSource.
+          return new Checkpoint<>(null /* residualElements */, residualSource);
+        } else {
+          // Part of residualSource has been consumed.
+          // Split the residualSource and track the elements remaining in the current source.
+          BoundedSource<T> residualSplit = null;
+          Double fractionConsumed = reader.getFractionConsumed();
+          if (fractionConsumed != null && 0 <= fractionConsumed && fractionConsumed <= 1) {
+            double fractionRest = 1 - fractionConsumed;
+            int splitAttempts = 8;
+            for (int i = 0; i < splitAttempts && residualSplit == null; ++i) {
+              double fractionToSplit = fractionConsumed + fractionRest * i / splitAttempts;
+              residualSplit = reader.splitAtFraction(fractionToSplit);
+            }
+          }
+          List<TimestampedValue<T>> newResidualElements = Lists.newArrayList();
+          try {
+            while (advance()) {
+              newResidualElements.add(
+                  TimestampedValue.of(reader.getCurrent(), reader.getCurrentTimestamp()));
+            }
+          } catch (IOException e) {
+            throw new RuntimeException("Failed to read elements from the bounded reader.", e);
+          }
+          return new Checkpoint<>(newResidualElements, residualSplit);
+        }
+      }
+    }
+  }
+}
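
A sketch of how a runner might substitute this transform during translation
(illustrative only, assuming an existing BoundedSource<String> named
"boundedSource" and a Pipeline "p"):

    // In streaming mode the runner swaps Read.Bounded for this transform,
    // which reads the bounded source through the unbounded-source adapter.
    PCollection<String> elements =
        p.apply(new DataflowUnboundedReadFromBoundedSource<>(boundedSource));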

http://git-wip-us.apache.org/repos/asf/beam/blob/33907f89/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java
new file mode 100755
index 0000000..ed03b53
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow;
+
+import static org.apache.beam.sdk.util.Structs.addBoolean;
+import static org.apache.beam.sdk.util.Structs.addDictionary;
+import static org.apache.beam.sdk.util.Structs.addLong;
+
+import com.google.api.services.dataflow.model.SourceMetadata;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.runners.dataflow.internal.CustomSources;
+import org.apache.beam.sdk.io.FileBasedSource;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.PropertyNames;
+import org.apache.beam.sdk.values.PValue;
+
+/**
+ * Translator for the {@code Read} {@code PTransform} for the Dataflow back-end.
+ */
+class ReadTranslator implements TransformTranslator<Read.Bounded<?>> {
+  @Override
+  public void translate(Read.Bounded<?> transform, TranslationContext context) {
+    translateReadHelper(transform.getSource(), transform, context);
+  }
+
+  public static <T> void translateReadHelper(Source<T> source,
+      PTransform<?, ? extends PValue> transform,
+      TranslationContext context) {
+    try {
+      // TODO: Move this validation out of translation once IOChannelUtils is portable
+      // and can be reconstructed on the worker.
+      if (source instanceof FileBasedSource) {
+        ValueProvider<String> filePatternOrSpec =
+            ((FileBasedSource<?>) source).getFileOrPatternSpecProvider();
+        if (filePatternOrSpec.isAccessible()) {
+          context.getPipelineOptions()
+              .getPathValidator()
+              .validateInputFilePatternSupported(filePatternOrSpec.get());
+        }
+      }
+
+      StepTranslationContext stepContext = context.addStep(transform, "ParallelRead");
+      stepContext.addInput(PropertyNames.FORMAT, PropertyNames.CUSTOM_SOURCE_FORMAT);
+      stepContext.addInput(
+          PropertyNames.SOURCE_STEP_INPUT,
+          cloudSourceToDictionary(
+              CustomSources.serializeToCloudSource(source, context.getPipelineOptions())));
+      stepContext.addValueOnlyOutput(context.getOutput(transform));
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  // Represents a cloud Source as a dictionary for encoding inside the {@code SOURCE_STEP_INPUT}
+  // property of CloudWorkflowStep.input.
+  private static Map<String, Object> cloudSourceToDictionary(
+      com.google.api.services.dataflow.model.Source source) {
+    // Do not translate encoding - the source's encoding is translated elsewhere
+    // to the step's output info.
+    Map<String, Object> res = new HashMap<>();
+    addDictionary(res, PropertyNames.SOURCE_SPEC, source.getSpec());
+    if (source.getMetadata() != null) {
+      addDictionary(res, PropertyNames.SOURCE_METADATA,
+          cloudSourceMetadataToDictionary(source.getMetadata()));
+    }
+    if (source.getDoesNotNeedSplitting() != null) {
+      addBoolean(
+          res, PropertyNames.SOURCE_DOES_NOT_NEED_SPLITTING, source.getDoesNotNeedSplitting());
+    }
+    return res;
+  }
+
+  private static Map<String, Object> cloudSourceMetadataToDictionary(SourceMetadata metadata) {
+    Map<String, Object> res = new HashMap<>();
+    if (metadata.getEstimatedSizeBytes() != null) {
+      addLong(res, PropertyNames.SOURCE_ESTIMATED_SIZE_BYTES, metadata.getEstimatedSizeBytes());
+    }
+    if (metadata.getInfinite() != null) {
+      addBoolean(res, PropertyNames.SOURCE_IS_INFINITE, metadata.getInfinite());
+    }
+    return res;
+  }
+}
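
Translators like this one are wired up through registerTransformTranslator, as
in the DataflowRunnerTest hunk above; a sketch of the registration for this
class would be:

    // Register the translator for the Read.Bounded primitive.
    DataflowPipelineTranslator.registerTransformTranslator(
        Read.Bounded.class, new ReadTranslator());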

http://git-wip-us.apache.org/repos/asf/beam/blob/33907f89/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
index 2aa8327..fb883a7 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
@@ -33,7 +33,7 @@ import org.apache.beam.sdk.values.PValue;
  * A {@link TransformTranslator} knows how to translate a particular subclass of {@link PTransform}
  * for the Cloud Dataflow service. It does so by mutating the {@link TranslationContext}.
  */
-public interface TransformTranslator<TransformT extends PTransform> {
+interface TransformTranslator<TransformT extends PTransform> {
   void translate(TransformT transform, TranslationContext context);
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/33907f89/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java
deleted file mode 100644
index 27fe13d..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.internal;
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.PCollection;
-
-/**
- * A primitive {@link PTransform} that implements the {@link Window#into(WindowFn)}
- * {@link PTransform}.
- *
- * <p>For an application of {@link Window#into(WindowFn)} that changes the {@link WindowFn}, this
- * transform applies a primitive {@link PTransform} in the Dataflow service.
- *
- * <p>For an application of {@link Window#into(WindowFn)} that does not change the
- * {@link WindowFn}, this transform applies an identity {@link ParDo} and sets the windowing
- * strategy of the output {@link PCollection}.
- *
- * <p>For internal use only.
- *
- * @param <T> the type of input element
- */
-public class AssignWindows<T> extends PTransform<PCollection<T>, PCollection<T>> {
-  private final Window.Bound<T> transform;
-
-  /**
-   * Builds an instance of this class from the overridden transform.
-   */
-  @SuppressWarnings("unused") // Used via reflection
-  public AssignWindows(Window.Bound<T> transform) {
-    this.transform = transform;
-  }
-
-  @Override
-  public PCollection<T> expand(PCollection<T> input) {
-    WindowingStrategy<?, ?> outputStrategy =
-        transform.getOutputStrategyInternal(input.getWindowingStrategy());
-    if (transform.getWindowFn() != null) {
-      // If the windowFn changed, we create a primitive, and run the AssignWindows operation here.
-      return PCollection.<T>createPrimitiveOutputInternal(
-                            input.getPipeline(), outputStrategy, input.isBounded());
-    } else {
-      // If the windowFn didn't change, we just run a pass-through transform and then set the
-      // new windowing strategy.
-      return input.apply("Identity", ParDo.of(new DoFn<T, T>() {
-        @ProcessElement
-        public void processElement(ProcessContext c) throws Exception {
-          c.output(c.element());
-        }
-      })).setWindowingStrategyInternal(outputStrategy);
-    }
-  }
-
-  @Override
-  public void validate(PCollection<T> input) {
-    transform.validate(input);
-  }
-
-  @Override
-  protected Coder<?> getDefaultOutputCoder(PCollection<T> input) {
-    return input.getCoder();
-  }
-
-  @Override
-  protected String getKindString() {
-    return "Window.Into()";
-  }
-}

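The two cases in the removed javadoc correspond roughly to the pipeline snippets
below (a sketch; the input PCollection and the FixedWindows configuration are
illustrative):

  // Case 1: Window.into changes the WindowFn, so translation emits a
  // primitive window-assignment step.
  input.apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))));

  // Case 2: a Window application that leaves the WindowFn unchanged (for
  // example, one that only adjusts triggering) becomes an identity ParDo
  // that carries the updated WindowingStrategy onto its output PCollection.
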
http://git-wip-us.apache.org/repos/asf/beam/blob/33907f89/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowAggregatorTransforms.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowAggregatorTransforms.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowAggregatorTransforms.java
deleted file mode 100755
index fb78973..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowAggregatorTransforms.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.internal;
-
-import com.google.common.collect.BiMap;
-import com.google.common.collect.HashBiMap;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Map;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.PTransform;
-
-/**
- * A mapping relating {@link Aggregator}s and the {@link PTransform} in which they are used.
- */
-public class DataflowAggregatorTransforms {
-  private final Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorTransforms;
-  private final Multimap<PTransform<?, ?>, AppliedPTransform<?, ?, ?>> transformAppliedTransforms;
-  private final BiMap<AppliedPTransform<?, ?, ?>, String> appliedStepNames;
-
-  public DataflowAggregatorTransforms(
-      Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorTransforms,
-      Map<AppliedPTransform<?, ?, ?>, String> transformStepNames) {
-    this.aggregatorTransforms = aggregatorTransforms;
-    appliedStepNames = HashBiMap.create(transformStepNames);
-
-    transformAppliedTransforms = HashMultimap.create();
-    for (AppliedPTransform<?, ?, ?> appliedTransform : transformStepNames.keySet()) {
-      transformAppliedTransforms.put(appliedTransform.getTransform(), appliedTransform);
-    }
-  }
-
-  /**
-   * Returns true if the provided {@link Aggregator} is used in the {@link Pipeline} under construction.
-   */
-  public boolean contains(Aggregator<?, ?> aggregator) {
-    return aggregatorTransforms.containsKey(aggregator);
-  }
-
-  /**
-   * Gets the step names in which the {@link Aggregator} is used.
-   */
-  public Collection<String> getAggregatorStepNames(Aggregator<?, ?> aggregator) {
-    Collection<String> names = new HashSet<>();
-    Collection<PTransform<?, ?>> transforms = aggregatorTransforms.get(aggregator);
-    for (PTransform<?, ?> transform : transforms) {
-      for (AppliedPTransform<?, ?, ?> applied : transformAppliedTransforms.get(transform)) {
-        names.add(appliedStepNames.get(applied));
-      }
-    }
-    return names;
-  }
-
-  /**
-   * Gets the {@link AppliedPTransform} that was assigned the provided step name.
-   */
-  public AppliedPTransform<?, ?, ?> getAppliedTransformForStepName(String stepName) {
-    return appliedStepNames.inverse().get(stepName);
-  }
-}

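A minimal sketch of how this mapping was consumed, assuming a hypothetical
aggregator recordsProcessed and the two constructor maps built during translation:

  DataflowAggregatorTransforms aggregatorTransforms =
      new DataflowAggregatorTransforms(aggregatorMap, transformStepNames);
  if (aggregatorTransforms.contains(recordsProcessed)) {
    for (String stepName
        : aggregatorTransforms.getAggregatorStepNames(recordsProcessed)) {
      // stepName is the Dataflow service's step name; the applied transform
      // recovers the user-visible full name.
      AppliedPTransform<?, ?, ?> applied =
          aggregatorTransforms.getAppliedTransformForStepName(stepName);
    }
  }
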
http://git-wip-us.apache.org/repos/asf/beam/blob/33907f89/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowMetricUpdateExtractor.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowMetricUpdateExtractor.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowMetricUpdateExtractor.java
deleted file mode 100755
index d715437..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowMetricUpdateExtractor.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.internal;
-
-import com.google.api.services.dataflow.model.MetricStructuredName;
-import com.google.api.services.dataflow.model.MetricUpdate;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.PTransform;
-
-/**
- * Methods for extracting the values of an {@link Aggregator} from a collection of {@link
- * MetricUpdate MetricUpdates}.
- */
-public final class DataflowMetricUpdateExtractor {
-  private static final String STEP_NAME_CONTEXT_KEY = "step";
-  private static final String IS_TENTATIVE_KEY = "tentative";
-
-  private DataflowMetricUpdateExtractor() {
-    // Do not instantiate.
-  }
-
-  /**
-   * Extracts the values of the provided {@link Aggregator} at each {@link PTransform} in which it
-   * was used, according to the provided {@link DataflowAggregatorTransforms}, from the given list
-   * of {@link MetricUpdate MetricUpdates}.
-   */
-  public static <OutputT> Map<String, OutputT> fromMetricUpdates(Aggregator<?, OutputT> aggregator,
-      DataflowAggregatorTransforms aggregatorTransforms, List<MetricUpdate> metricUpdates) {
-    Map<String, OutputT> results = new HashMap<>();
-    if (metricUpdates == null) {
-      return results;
-    }
-
-    String aggregatorName = aggregator.getName();
-    Collection<String> aggregatorSteps = aggregatorTransforms.getAggregatorStepNames(aggregator);
-
-    for (MetricUpdate metricUpdate : metricUpdates) {
-      MetricStructuredName metricStructuredName = metricUpdate.getName();
-      Map<String, String> context = metricStructuredName.getContext();
-      if (metricStructuredName.getName().equals(aggregatorName) && context != null
-          && aggregatorSteps.contains(context.get(STEP_NAME_CONTEXT_KEY))) {
-        AppliedPTransform<?, ?, ?> transform =
-            aggregatorTransforms.getAppliedTransformForStepName(
-                context.get(STEP_NAME_CONTEXT_KEY));
-        String fullName = transform.getFullName();
-        // Prefer the tentative (fresher) value if it exists.
-        if (Boolean.parseBoolean(context.get(IS_TENTATIVE_KEY)) || !results.containsKey(fullName)) {
-          results.put(fullName, toValue(aggregator, metricUpdate));
-        }
-      }
-    }
-
-    return results;
-  }
-
-  private static <OutputT> OutputT toValue(
-      Aggregator<?, OutputT> aggregator, MetricUpdate metricUpdate) {
-    CombineFn<?, ?, OutputT> combineFn = aggregator.getCombineFn();
-    Class<? super OutputT> outputType = combineFn.getOutputType().getRawType();
-
-    if (outputType.equals(Long.class)) {
-      @SuppressWarnings("unchecked")
-      OutputT asLong = (OutputT) Long.valueOf(toNumber(metricUpdate).longValue());
-      return asLong;
-    }
-    if (outputType.equals(Integer.class)) {
-      @SuppressWarnings("unchecked")
-      OutputT asInt = (OutputT) Integer.valueOf(toNumber(metricUpdate).intValue());
-      return asInt;
-    }
-    if (outputType.equals(Double.class)) {
-      @SuppressWarnings("unchecked")
-      OutputT asDouble = (OutputT) Double.valueOf(toNumber(metricUpdate).doubleValue());
-      return asDouble;
-    }
-    throw new UnsupportedOperationException(
-        "Unsupported Output Type " + outputType + " in aggregator " + aggregator);
-  }
-
-  private static Number toNumber(MetricUpdate update) {
-    if (update.getScalar() instanceof Number) {
-      return (Number) update.getScalar();
-    }
-    throw new IllegalArgumentException(
-        "Metric Update " + update + " does not have a numeric scalar");
-  }
-}

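For orientation, a sketch of extracting an aggregator's values from a job's
metric updates; myAggregator (assumed to produce Long values),
aggregatorTransforms, and metricUpdates are hypothetical:

  Map<String, Long> valuesByTransform =
      DataflowMetricUpdateExtractor.fromMetricUpdates(
          myAggregator, aggregatorTransforms, metricUpdates);
  // Keys are full transform names; a tentative (fresher) value replaces a
  // committed one when both are present.
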
http://git-wip-us.apache.org/repos/asf/beam/blob/33907f89/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
deleted file mode 100644
index a2ae799..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
+++ /dev/null
@@ -1,547 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.internal;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.ListCoder;
-import org.apache.beam.sdk.coders.NullableCoder;
-import org.apache.beam.sdk.coders.SerializableCoder;
-import org.apache.beam.sdk.coders.StandardCoder;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.NameUtils;
-import org.apache.beam.sdk.util.PropertyNames;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TimestampedValue;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * {@link PTransform} that converts a {@link BoundedSource} into an {@link UnboundedSource}.
- *
- * <p>{@link BoundedSource} is read directly without calling {@link BoundedSource#splitIntoBundles},
- * and element timestamps are propagated. While any elements remain, the watermark is the beginning
- * of time {@link BoundedWindow#TIMESTAMP_MIN_VALUE}, and after all elements have been produced
- * the watermark goes to the end of time {@link BoundedWindow#TIMESTAMP_MAX_VALUE}.
- *
- * <p>Checkpoints are created by calling {@link BoundedReader#splitAtFraction} on the inner
- * {@link BoundedSource}. Sources that cannot be split are read entirely into memory, so this
- * transform does not work well with large, unsplittable sources.
- *
- * <p>This transform is intended to be used by a runner during pipeline translation to convert
- * a Read.Bounded into a Read.Unbounded.
- *
- * @deprecated This class is copied from the Beam runners-core module in order to avoid a
- * pipeline construction-time dependency. It should be replaced in the Dataflow worker with an
- * execution-time dependency.
- */
-@Deprecated
-public class DataflowUnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PCollection<T>> {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(DataflowUnboundedReadFromBoundedSource.class);
-
-  private final BoundedSource<T> source;
-
-  /**
-   * Constructs a {@link PTransform} that performs an unbounded read from a {@link BoundedSource}.
-   */
-  public DataflowUnboundedReadFromBoundedSource(BoundedSource<T> source) {
-    this.source = source;
-  }
-
-  @Override
-  public PCollection<T> expand(PBegin input) {
-    return input.getPipeline().apply(
-        Read.from(new BoundedToUnboundedSourceAdapter<>(source)));
-  }
-
-  @Override
-  protected Coder<T> getDefaultOutputCoder() {
-    return source.getDefaultOutputCoder();
-  }
-
-  @Override
-  public String getKindString() {
-    return String.format("Read(%s)", NameUtils.approximateSimpleName(source, "AnonymousSource"));
-  }
-
-  @Override
-  public void populateDisplayData(DisplayData.Builder builder) {
-    // We explicitly do not register base-class data; instead we use the delegate inner source.
-    builder
-        .add(DisplayData.item("source", source.getClass()))
-        .include("source", source);
-  }
-
-  /**
-   * A {@code BoundedSource} to {@code UnboundedSource} adapter.
-   */
-  @VisibleForTesting
-  public static class BoundedToUnboundedSourceAdapter<T>
-      extends UnboundedSource<T, BoundedToUnboundedSourceAdapter.Checkpoint<T>> {
-
-    private BoundedSource<T> boundedSource;
-
-    public BoundedToUnboundedSourceAdapter(BoundedSource<T> boundedSource) {
-      this.boundedSource = boundedSource;
-    }
-
-    @Override
-    public void validate() {
-      boundedSource.validate();
-    }
-
-    @Override
-    public List<BoundedToUnboundedSourceAdapter<T>> generateInitialSplits(
-        int desiredNumSplits, PipelineOptions options) throws Exception {
-      try {
-        long desiredBundleSize = boundedSource.getEstimatedSizeBytes(options) / desiredNumSplits;
-        if (desiredBundleSize <= 0) {
-          LOG.warn("BoundedSource {} cannot estimate its size, skips the initial splits.",
-              boundedSource);
-          return ImmutableList.of(this);
-        }
-        List<? extends BoundedSource<T>> splits =
-            boundedSource.splitIntoBundles(desiredBundleSize, options);
-        if (splits == null) {
-          LOG.warn("BoundedSource cannot split {}, skips the initial splits.", boundedSource);
-          return ImmutableList.of(this);
-        }
-        return Lists.transform(
-            splits,
-            new Function<BoundedSource<T>, BoundedToUnboundedSourceAdapter<T>>() {
-              @Override
-              public BoundedToUnboundedSourceAdapter<T> apply(BoundedSource<T> input) {
-                return new BoundedToUnboundedSourceAdapter<>(input);
-              }});
-      } catch (Exception e) {
-        LOG.warn("Exception while splitting {}, skips the initial splits.", boundedSource, e);
-        return ImmutableList.of(this);
-      }
-    }
-
-    @Override
-    public Reader createReader(PipelineOptions options, Checkpoint<T> checkpoint)
-        throws IOException {
-      if (checkpoint == null) {
-        return new Reader(null /* residualElements */, boundedSource, options);
-      } else {
-        return new Reader(checkpoint.residualElements, checkpoint.residualSource, options);
-      }
-    }
-
-    @Override
-    public Coder<T> getDefaultOutputCoder() {
-      return boundedSource.getDefaultOutputCoder();
-    }
-
-    @SuppressWarnings({"rawtypes", "unchecked"})
-    @Override
-    public Coder<Checkpoint<T>> getCheckpointMarkCoder() {
-      return new CheckpointCoder<>(boundedSource.getDefaultOutputCoder());
-    }
-
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      super.populateDisplayData(builder);
-      builder.add(DisplayData.item("source", boundedSource.getClass()));
-      builder.include("source", boundedSource);
-    }
-
-    @VisibleForTesting
-    static class Checkpoint<T> implements UnboundedSource.CheckpointMark {
-      private final @Nullable List<TimestampedValue<T>> residualElements;
-      private final @Nullable BoundedSource<T> residualSource;
-
-      public Checkpoint(
-          @Nullable List<TimestampedValue<T>> residualElements,
-          @Nullable BoundedSource<T> residualSource) {
-        this.residualElements = residualElements;
-        this.residualSource = residualSource;
-      }
-
-      @Override
-      public void finalizeCheckpoint() {}
-
-      @VisibleForTesting
-      @Nullable List<TimestampedValue<T>> getResidualElements() {
-        return residualElements;
-      }
-
-      @VisibleForTesting
-      @Nullable BoundedSource<T> getResidualSource() {
-        return residualSource;
-      }
-    }
-
-    @VisibleForTesting
-    static class CheckpointCoder<T> extends StandardCoder<Checkpoint<T>> {
-
-      @JsonCreator
-      public static CheckpointCoder<?> of(
-          @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
-          List<Coder<?>> components) {
-        checkArgument(components.size() == 1,
-            "Expecting 1 components, got %s", components.size());
-        return new CheckpointCoder<>(components.get(0));
-      }
-
-      // The coder for a list of residual elements and their timestamps
-      private final Coder<List<TimestampedValue<T>>> elemsCoder;
-      // The coder from the BoundedReader for coding each element
-      private final Coder<T> elemCoder;
-      // The nullable and serializable coder for the BoundedSource.
-      @SuppressWarnings("rawtypes")
-      private final Coder<BoundedSource> sourceCoder;
-
-      CheckpointCoder(Coder<T> elemCoder) {
-        this.elemsCoder = NullableCoder.of(
-            ListCoder.of(TimestampedValue.TimestampedValueCoder.of(elemCoder)));
-        this.elemCoder = elemCoder;
-        this.sourceCoder = NullableCoder.of(SerializableCoder.of(BoundedSource.class));
-      }
-
-      @Override
-      public void encode(Checkpoint<T> value, OutputStream outStream, Context context)
-          throws CoderException, IOException {
-        elemsCoder.encode(value.residualElements, outStream, context.nested());
-        sourceCoder.encode(value.residualSource, outStream, context);
-      }
-
-      @SuppressWarnings("unchecked")
-      @Override
-      public Checkpoint<T> decode(InputStream inStream, Context context)
-          throws CoderException, IOException {
-        return new Checkpoint<>(
-            elemsCoder.decode(inStream, context.nested()),
-            sourceCoder.decode(inStream, context));
-      }
-
-      @Override
-      public List<Coder<?>> getCoderArguments() {
-        return Arrays.<Coder<?>>asList(elemCoder);
-      }
-
-      @Override
-      public void verifyDeterministic() throws NonDeterministicException {
-        throw new NonDeterministicException(this,
-            "CheckpointCoder uses Java Serialization, which may be non-deterministic.");
-      }
-    }
-
-    /**
-     * An {@code UnboundedReader<T>} that wraps a {@code BoundedSource<T>} into
-     * {@link ResidualElements} and {@link ResidualSource}.
-     *
-     * <p>In the initial state, the {@link ResidualElements} is empty and the
-     * {@link ResidualSource} contains the entire {@code BoundedSource<T>}. After the first
-     * checkpoint, the {@code BoundedSource<T>} will be split into {@link ResidualElements}
-     * and {@link ResidualSource}.
-     */
-    @VisibleForTesting
-    class Reader extends UnboundedReader<T> {
-      private ResidualElements residualElements;
-      private @Nullable ResidualSource residualSource;
-      private final PipelineOptions options;
-      private boolean done;
-
-      Reader(
-          @Nullable List<TimestampedValue<T>> residualElementsList,
-          @Nullable BoundedSource<T> residualSource,
-          PipelineOptions options) {
-        init(residualElementsList, residualSource, options);
-        this.options = checkNotNull(options, "options");
-        this.done = false;
-      }
-
-      private void init(
-          @Nullable List<TimestampedValue<T>> residualElementsList,
-          @Nullable BoundedSource<T> residualSource,
-          PipelineOptions options) {
-        this.residualElements = residualElementsList == null
-            ? new ResidualElements(Collections.<TimestampedValue<T>>emptyList())
-                : new ResidualElements(residualElementsList);
-        this.residualSource =
-            residualSource == null ? null : new ResidualSource(residualSource, options);
-      }
-
-      @Override
-      public boolean start() throws IOException {
-        return advance();
-      }
-
-      @Override
-      public boolean advance() throws IOException {
-        if (residualElements.advance()) {
-          return true;
-        } else if (residualSource != null && residualSource.advance()) {
-          return true;
-        } else {
-          done = true;
-          return false;
-        }
-      }
-
-      @Override
-      public void close() throws IOException {
-        if (residualSource != null) {
-          residualSource.close();
-        }
-      }
-
-      @Override
-      public T getCurrent() throws NoSuchElementException {
-        if (residualElements.hasCurrent()) {
-          return residualElements.getCurrent();
-        } else if (residualSource != null) {
-          return residualSource.getCurrent();
-        } else {
-          throw new NoSuchElementException();
-        }
-      }
-
-      @Override
-      public Instant getCurrentTimestamp() throws NoSuchElementException {
-        if (residualElements.hasCurrent()) {
-          return residualElements.getCurrentTimestamp();
-        } else if (residualSource != null) {
-          return residualSource.getCurrentTimestamp();
-        } else {
-          throw new NoSuchElementException();
-        }
-      }
-
-      @Override
-      public Instant getWatermark() {
-        return done ? BoundedWindow.TIMESTAMP_MAX_VALUE : BoundedWindow.TIMESTAMP_MIN_VALUE;
-      }
-
-      /**
-       * {@inheritDoc}
-       *
-       * <p>If only part of the {@link ResidualElements} has been consumed, the new
-       * checkpoint contains the remaining elements of the {@link ResidualElements} and
-       * the {@link ResidualSource}.
-       *
-       * <p>If all {@link ResidualElements} and part of the {@link ResidualSource} have been
-       * consumed, the new checkpoint is created by splitting the {@link ResidualSource} into a
-       * new {@link ResidualElements} and {@link ResidualSource}: the new {@link ResidualSource}
-       * is the source split off from the current source, and the new {@link ResidualElements}
-       * holds the elements that remain in the current source after the split. For an
-       * unsplittable source, all remaining elements are placed in the {@link ResidualElements}.
-       */
-      @Override
-      public Checkpoint<T> getCheckpointMark() {
-        Checkpoint<T> newCheckpoint;
-        if (!residualElements.done()) {
-          // Part of the residualElements has been consumed.
-          // Checkpoint the remaining elements and the residualSource.
-          newCheckpoint = new Checkpoint<>(
-              residualElements.getRestElements(),
-              residualSource == null ? null : residualSource.getSource());
-        } else if (residualSource != null) {
-          newCheckpoint = residualSource.getCheckpointMark();
-        } else {
-          newCheckpoint = new Checkpoint<>(null /* residualElements */, null /* residualSource */);
-        }
-        // Re-initialize since the residualElements and the residualSource might be
-        // consumed or split by checkpointing.
-        init(newCheckpoint.residualElements, newCheckpoint.residualSource, options);
-        return newCheckpoint;
-      }
-
-      @Override
-      public BoundedToUnboundedSourceAdapter<T> getCurrentSource() {
-        return BoundedToUnboundedSourceAdapter.this;
-      }
-    }
-
-    private class ResidualElements {
-      private final List<TimestampedValue<T>> elementsList;
-      private @Nullable Iterator<TimestampedValue<T>> elementsIterator;
-      private @Nullable TimestampedValue<T> currentT;
-      private boolean hasCurrent;
-      private boolean done;
-
-      ResidualElements(List<TimestampedValue<T>> residualElementsList) {
-        this.elementsList = checkNotNull(residualElementsList, "residualElementsList");
-        this.elementsIterator = null;
-        this.currentT = null;
-        this.hasCurrent = false;
-        this.done = false;
-      }
-
-      public boolean advance() {
-        if (elementsIterator == null) {
-          elementsIterator = elementsList.iterator();
-        }
-        if (elementsIterator.hasNext()) {
-          currentT = elementsIterator.next();
-          hasCurrent = true;
-          return true;
-        } else {
-          done = true;
-          hasCurrent = false;
-          return false;
-        }
-      }
-
-      boolean hasCurrent() {
-        return hasCurrent;
-      }
-
-      boolean done() {
-        return done;
-      }
-
-      TimestampedValue<T> getCurrentTimestampedValue() {
-        if (!hasCurrent) {
-          throw new NoSuchElementException();
-        }
-        return currentT;
-      }
-
-      T getCurrent() {
-        return getCurrentTimestampedValue().getValue();
-      }
-
-      Instant getCurrentTimestamp() {
-        return getCurrentTimestampedValue().getTimestamp();
-      }
-
-      List<TimestampedValue<T>> getRestElements() {
-        if (elementsIterator == null) {
-          return elementsList;
-        } else {
-          List<TimestampedValue<T>> newResidualElements = Lists.newArrayList();
-          while (elementsIterator.hasNext()) {
-            newResidualElements.add(elementsIterator.next());
-          }
-          return newResidualElements;
-        }
-      }
-    }
-
-    private class ResidualSource {
-      private BoundedSource<T> residualSource;
-      private PipelineOptions options;
-      private @Nullable BoundedReader<T> reader;
-      private boolean closed;
-
-      public ResidualSource(BoundedSource<T> residualSource, PipelineOptions options) {
-        this.residualSource = checkNotNull(residualSource, "residualSource");
-        this.options = checkNotNull(options, "options");
-        this.reader = null;
-        this.closed = false;
-      }
-
-      private boolean advance() throws IOException {
-        if (closed) {
-          // Once closed there is nothing left to read; guard against a null reader.
-          return false;
-        }
-        if (reader == null) {
-          reader = residualSource.createReader(options);
-          return reader.start();
-        }
-        return reader.advance();
-      }
-
-      T getCurrent() throws NoSuchElementException {
-        if (reader == null) {
-          throw new NoSuchElementException();
-        }
-        return reader.getCurrent();
-      }
-
-      Instant getCurrentTimestamp() throws NoSuchElementException {
-        if (reader == null) {
-          throw new NoSuchElementException();
-        }
-        return reader.getCurrentTimestamp();
-      }
-
-      void close() throws IOException {
-        if (reader != null) {
-          reader.close();
-          reader = null;
-        }
-        closed = true;
-      }
-
-      BoundedSource<T> getSource() {
-        return residualSource;
-      }
-
-      Checkpoint<T> getCheckpointMark() {
-        if (reader == null) {
-          // The reader hasn't started; checkpoint the residualSource itself.
-          return new Checkpoint<>(null /* residualElements */, residualSource);
-        } else {
-          // Part of the residualSource has been consumed.
-          // Split the residualSource and track the new residualElements of the current source.
-          BoundedSource<T> residualSplit = null;
-          Double fractionConsumed = reader.getFractionConsumed();
-          if (fractionConsumed != null && 0 <= fractionConsumed && fractionConsumed <= 1) {
-            double fractionRest = 1 - fractionConsumed;
-            int splitAttempts = 8;
-            for (int i = 0; i < splitAttempts && residualSplit == null; ++i) {
-              double fractionToSplit = fractionConsumed + fractionRest * i / splitAttempts;
-              residualSplit = reader.splitAtFraction(fractionToSplit);
-            }
-          }
-          List<TimestampedValue<T>> newResidualElements = Lists.newArrayList();
-          try {
-            while (advance()) {
-              newResidualElements.add(
-                  TimestampedValue.of(reader.getCurrent(), reader.getCurrentTimestamp()));
-            }
-          } catch (IOException e) {
-            throw new RuntimeException("Failed to read elements from the bounded reader.", e);
-          }
-          return new Checkpoint<>(newResidualElements, residualSplit);
-        }
-      }
-    }
-  }
-}

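A minimal usage sketch, assuming a hypothetical BoundedSource<String> named
mySource; the wrapper is applied at the pipeline root like any other root
transform:

  PCollection<String> lines =
      pipeline.apply(new DataflowUnboundedReadFromBoundedSource<>(mySource));
  // The watermark stays at TIMESTAMP_MIN_VALUE while elements remain and
  // jumps to TIMESTAMP_MAX_VALUE once the wrapped source is exhausted.
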
http://git-wip-us.apache.org/repos/asf/beam/blob/33907f89/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java
deleted file mode 100755
index a15a2a3..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.internal;
-
-import static org.apache.beam.sdk.util.Structs.addBoolean;
-import static org.apache.beam.sdk.util.Structs.addDictionary;
-import static org.apache.beam.sdk.util.Structs.addLong;
-
-import com.google.api.services.dataflow.model.SourceMetadata;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.beam.runners.dataflow.TransformTranslator;
-import org.apache.beam.sdk.io.FileBasedSource;
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.io.Source;
-import org.apache.beam.sdk.options.ValueProvider;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.PropertyNames;
-import org.apache.beam.sdk.values.PValue;
-
-/**
- * Translator for the {@code Read} {@code PTransform} for the Dataflow back-end.
- */
-public class ReadTranslator implements TransformTranslator<Read.Bounded<?>> {
-  @Override
-  public void translate(Read.Bounded<?> transform, TranslationContext context) {
-    translateReadHelper(transform.getSource(), transform, context);
-  }
-
-  public static <T> void translateReadHelper(Source<T> source,
-      PTransform<?, ? extends PValue> transform,
-      TranslationContext context) {
-    try {
-      // TODO: Move this validation out of translation once IOChannelUtils is portable
-      // and can be reconstructed on the worker.
-      if (source instanceof FileBasedSource) {
-        ValueProvider<String> filePatternOrSpec =
-            ((FileBasedSource<?>) source).getFileOrPatternSpecProvider();
-        if (filePatternOrSpec.isAccessible()) {
-          context.getPipelineOptions()
-              .getPathValidator()
-              .validateInputFilePatternSupported(filePatternOrSpec.get());
-        }
-      }
-
-      StepTranslationContext stepContext = context.addStep(transform, "ParallelRead");
-      stepContext.addInput(PropertyNames.FORMAT, PropertyNames.CUSTOM_SOURCE_FORMAT);
-      stepContext.addInput(
-          PropertyNames.SOURCE_STEP_INPUT,
-          cloudSourceToDictionary(
-              CustomSources.serializeToCloudSource(source, context.getPipelineOptions())));
-      stepContext.addValueOnlyOutput(context.getOutput(transform));
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  // Represents a cloud Source as a dictionary for encoding inside the {@code SOURCE_STEP_INPUT}
-  // property of CloudWorkflowStep.input.
-  private static Map<String, Object> cloudSourceToDictionary(
-      com.google.api.services.dataflow.model.Source source) {
-    // Do not translate encoding - the source's encoding is translated elsewhere
-    // to the step's output info.
-    Map<String, Object> res = new HashMap<>();
-    addDictionary(res, PropertyNames.SOURCE_SPEC, source.getSpec());
-    if (source.getMetadata() != null) {
-      addDictionary(res, PropertyNames.SOURCE_METADATA,
-          cloudSourceMetadataToDictionary(source.getMetadata()));
-    }
-    if (source.getDoesNotNeedSplitting() != null) {
-      addBoolean(
-          res, PropertyNames.SOURCE_DOES_NOT_NEED_SPLITTING, source.getDoesNotNeedSplitting());
-    }
-    return res;
-  }
-
-  private static Map<String, Object> cloudSourceMetadataToDictionary(SourceMetadata metadata) {
-    Map<String, Object> res = new HashMap<>();
-    if (metadata.getEstimatedSizeBytes() != null) {
-      addLong(res, PropertyNames.SOURCE_ESTIMATED_SIZE_BYTES, metadata.getEstimatedSizeBytes());
-    }
-    if (metadata.getInfinite() != null) {
-      addBoolean(res, PropertyNames.SOURCE_IS_INFINITE, metadata.getInfinite());
-    }
-    return res;
-  }
-}

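For reference, the ParallelRead step emitted by translateReadHelper carries
roughly the following property shape (symbolic PropertyNames constants rather
than literal values; the nesting mirrors cloudSourceToDictionary above):

  FORMAT                -> CUSTOM_SOURCE_FORMAT
  SOURCE_STEP_INPUT     -> { SOURCE_SPEC: <serialized source>,
                             SOURCE_METADATA: { SOURCE_ESTIMATED_SIZE_BYTES,
                                                SOURCE_IS_INFINITE },
                             SOURCE_DOES_NOT_NEED_SPLITTING }
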
http://git-wip-us.apache.org/repos/asf/beam/blob/33907f89/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
index 6999e03..d5d7aa9 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
@@ -54,7 +54,6 @@ import java.math.BigDecimal;
 import java.net.SocketTimeoutException;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
-import org.apache.beam.runners.dataflow.internal.DataflowAggregatorTransforms;
 import org.apache.beam.runners.dataflow.testing.TestDataflowPipelineOptions;
 import org.apache.beam.runners.dataflow.util.MonitoringUtil;
 import org.apache.beam.sdk.AggregatorRetrievalException;


[3/5] beam git commit: Reduce visibility of many Dataflow runner internals

Posted by ke...@apache.org.
http://git-wip-us.apache.org/repos/asf/beam/blob/33907f89/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowUnboundedReadFromBoundedSourceTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowUnboundedReadFromBoundedSourceTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowUnboundedReadFromBoundedSourceTest.java
new file mode 100644
index 0000000..c479332
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowUnboundedReadFromBoundedSourceTest.java
@@ -0,0 +1,79 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.beam.runners.dataflow;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link DataflowUnboundedReadFromBoundedSource}.
+ */
+@RunWith(JUnit4.class)
+public class DataflowUnboundedReadFromBoundedSourceTest {
+  @Test
+  public void testKind() {
+    DataflowUnboundedReadFromBoundedSource<?> read =
+        new DataflowUnboundedReadFromBoundedSource<>(new NoopNamedSource());
+
+    assertEquals("Read(NoopNamedSource)", read.getKindString());
+  }
+
+  @Test
+  public void testKindAnonymousSource() {
+    NoopNamedSource anonSource = new NoopNamedSource() {};
+    DataflowUnboundedReadFromBoundedSource<?> read =
+        new DataflowUnboundedReadFromBoundedSource<>(anonSource);
+
+    assertEquals("Read(AnonymousSource)", read.getKindString());
+  }
+
+  /** Source implementation only useful for its identity. */
+  static class NoopNamedSource extends BoundedSource<String> {
+    @Override
+    public List<? extends BoundedSource<String>> splitIntoBundles(long desiredBundleSizeBytes,
+        PipelineOptions options) throws Exception {
+      return null;
+    }
+    @Override
+    public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
+      return 0;
+    }
+    @Override
+    public BoundedReader<String> createReader(
+        PipelineOptions options) throws IOException {
+      return null;
+    }
+    @Override
+    public void validate() {}
+    @Override
+    public Coder<String> getDefaultOutputCoder() {
+      return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/33907f89/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSourceTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSourceTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSourceTest.java
deleted file mode 100644
index d38428b..0000000
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSourceTest.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.beam.runners.dataflow.internal;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.util.List;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link DataflowUnboundedReadFromBoundedSource}.
- */
-@RunWith(JUnit4.class)
-public class DataflowUnboundedReadFromBoundedSourceTest {
-  @Test
-  public void testKind() {
-    DataflowUnboundedReadFromBoundedSource<?> read =
-        new DataflowUnboundedReadFromBoundedSource<>(new NoopNamedSource());
-
-    assertEquals("Read(NoopNamedSource)", read.getKindString());
-  }
-
-  @Test
-  public void testKindAnonymousSource() {
-    NoopNamedSource anonSource = new NoopNamedSource() {};
-    DataflowUnboundedReadFromBoundedSource<?> read =
-        new DataflowUnboundedReadFromBoundedSource<>(anonSource);
-
-    assertEquals("Read(AnonymousSource)", read.getKindString());
-  }
-
-  /** Source implementation only useful for its identity. */
-  static class NoopNamedSource extends BoundedSource<String> {
-    @Override
-    public List<? extends BoundedSource<String>> splitIntoBundles(long desiredBundleSizeBytes,
-        PipelineOptions options) throws Exception {
-      return null;
-    }
-    @Override
-    public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
-      return 0;
-    }
-    @Override
-    public BoundedReader<String> createReader(
-        PipelineOptions options) throws IOException {
-      return null;
-    }
-    @Override
-    public void validate() {}
-    @Override
-    public Coder<String> getDefaultOutputCoder() {
-      return null;
-    }
-  }
-}