You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/04/27 03:08:47 UTC

[15/21] incubator-beam git commit: Reorganize Java packages in the sources of the Google Cloud Dataflow runner

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRunnerHooks.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRunnerHooks.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRunnerHooks.java
new file mode 100644
index 0000000..4d37966
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRunnerHooks.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow;
+
+import org.apache.beam.sdk.annotations.Experimental;
+
+import com.google.api.services.dataflow.model.Environment;
+
+/**
+ * An instance of this class can be passed to the
+ * {@link DataflowPipelineRunner} to add user-defined hooks to be
+ * invoked at various times during pipeline execution.
+ */
+@Experimental
+public class DataflowPipelineRunnerHooks {
+  /**
+   * Allows the user to modify the environment of their job before their job is submitted
+   * to the service for execution.
+   *
+   * @param environment The environment of the job. Users can make changes to this instance in
+   *     order to change the environment with which their job executes on the service.
+   */
+  public void modifyEnvironmentBeforeSubmission(Environment environment) {}
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
new file mode 100644
index 0000000..0f2d325
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -0,0 +1,1059 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow;
+
+import static org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray;
+import static org.apache.beam.sdk.util.StringUtils.byteArrayToJsonString;
+import static org.apache.beam.sdk.util.StringUtils.jsonStringToByteArray;
+import static org.apache.beam.sdk.util.Structs.addBoolean;
+import static org.apache.beam.sdk.util.Structs.addDictionary;
+import static org.apache.beam.sdk.util.Structs.addList;
+import static org.apache.beam.sdk.util.Structs.addLong;
+import static org.apache.beam.sdk.util.Structs.addObject;
+import static org.apache.beam.sdk.util.Structs.addString;
+import static org.apache.beam.sdk.util.Structs.getString;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import org.apache.beam.runners.dataflow.DataflowPipelineRunner.GroupByKeyAndSortValuesOnly;
+import org.apache.beam.runners.dataflow.internal.BigQueryIOTranslator;
+import org.apache.beam.runners.dataflow.internal.PubsubIOTranslator;
+import org.apache.beam.runners.dataflow.internal.ReadTranslator;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.Pipeline.PipelineVisitor;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.io.BigQueryIO;
+import org.apache.beam.sdk.io.PubsubIO;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.options.StreamingOptions;
+import org.apache.beam.sdk.runners.TransformTreeNode;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.AppliedCombineFn;
+import org.apache.beam.sdk.util.CloudObject;
+import org.apache.beam.sdk.util.DoFnInfo;
+import org.apache.beam.sdk.util.OutputReference;
+import org.apache.beam.sdk.util.PropertyNames;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TypedPValue;
+
+import com.google.api.services.dataflow.model.AutoscalingSettings;
+import com.google.api.services.dataflow.model.DataflowPackage;
+import com.google.api.services.dataflow.model.Disk;
+import com.google.api.services.dataflow.model.Environment;
+import com.google.api.services.dataflow.model.Job;
+import com.google.api.services.dataflow.model.Step;
+import com.google.api.services.dataflow.model.WorkerPool;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import javax.annotation.Nullable;
+
+/**
+ * {@link DataflowPipelineTranslator} knows how to translate {@link Pipeline} objects
+ * into Cloud Dataflow Service API {@link Job}s.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class DataflowPipelineTranslator {
+  // Must be kept in sync with their internal counterparts.
+  private static final Logger LOG = LoggerFactory.getLogger(DataflowPipelineTranslator.class);
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+
+  /**
+   * A map from {@link PTransform} subclass to the corresponding
+   * {@link TransformTranslator} to use to translate that transform.
+   *
+   * <p>A static map that contains system-wide defaults.
+   */
+  private static Map<Class, TransformTranslator> transformTranslators =
+      new HashMap<>();
+
+  /** Provided configuration options. */
+  private final DataflowPipelineOptions options;
+
+  /**
+   * Constructs a translator from the provided options.
+   *
+   * @param options Properties that configure the translator.
+   *
+   * @return The newly created translator.
+   */
+  public static DataflowPipelineTranslator fromOptions(
+      DataflowPipelineOptions options) {
+    return new DataflowPipelineTranslator(options);
+  }
+
+  private DataflowPipelineTranslator(DataflowPipelineOptions options) {
+    this.options = options;
+  }
+
+  /**
+   * Translates a {@link Pipeline} into a {@code JobSpecification}.
+   */
+  public JobSpecification translate(
+      Pipeline pipeline,
+      DataflowPipelineRunner runner,
+      List<DataflowPackage> packages) {
+
+    Translator translator = new Translator(pipeline, runner);
+    Job result = translator.translate(packages);
+    return new JobSpecification(result, Collections.unmodifiableMap(translator.stepNames));
+  }
+
+  /**
+   * The result of a job translation.
+   *
+   * <p>Used to pass the result {@link Job} and any state that was used to construct the job that
+   * may be of use to other classes (e.g. the {@link AppliedPTransform} to step name mapping).
+   */
+  public static class JobSpecification {
+    private final Job job;
+    private final Map<AppliedPTransform<?, ?, ?>, String> stepNames;
+
+    public JobSpecification(Job job, Map<AppliedPTransform<?, ?, ?>, String> stepNames) {
+      this.job = job;
+      this.stepNames = stepNames;
+    }
+
+    public Job getJob() {
+      return job;
+    }
+
+    /**
+     * Returns the mapping of {@link AppliedPTransform AppliedPTransforms} to the internal step
+     * name for that {@code AppliedPTransform}.
+     */
+    public Map<AppliedPTransform<?, ?, ?>, String> getStepNames() {
+      return stepNames;
+    }
+  }
+
+  /**
+   * Renders a {@link Job} as a string.
+   */
+  public static String jobToString(Job job) {
+    try {
+      return MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(job);
+    } catch (JsonProcessingException exc) {
+      throw new IllegalStateException("Failed to render Job as String.", exc);
+    }
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Records that instances of the specified PTransform class
+   * should be translated by default by the corresponding
+   * {@link TransformTranslator}.
+   */
+  public static <TransformT extends PTransform> void registerTransformTranslator(
+      Class<TransformT> transformClass,
+      TransformTranslator<? extends TransformT> transformTranslator) {
+    if (transformTranslators.put(transformClass, transformTranslator) != null) {
+      throw new IllegalArgumentException(
+          "defining multiple translators for " + transformClass);
+    }
+  }
+
+  /**
+   * Returns the {@link TransformTranslator} to use for instances of the
+   * specified PTransform class, or null if none registered.
+   */
+  public <TransformT extends PTransform>
+      TransformTranslator<TransformT> getTransformTranslator(Class<TransformT> transformClass) {
+    return transformTranslators.get(transformClass);
+  }
+
+  /**
+   * A {@link TransformTranslator} knows how to translate
+   * a particular subclass of {@link PTransform} for the
+   * Cloud Dataflow service. It does so by
+   * mutating the {@link TranslationContext}.
+   */
+  public interface TransformTranslator<TransformT extends PTransform> {
+    public void translate(TransformT transform,
+                          TranslationContext context);
+  }
+
+  /**
+   * The interface provided to registered callbacks for interacting
+   * with the {@link DataflowPipelineRunner}, including reading and writing the
+   * values of {@link PCollection}s and side inputs ({@link PCollectionView}s).
+   */
+  public interface TranslationContext {
+    /**
+     * Returns the configured pipeline options.
+     */
+    DataflowPipelineOptions getPipelineOptions();
+
+    /**
+     * Returns the input of the transform currently being translated.
+     */
+    <InputT extends PInput> InputT getInput(PTransform<InputT, ?> transform);
+
+    /**
+     * Returns the output of the transform currently being translated.
+     */
+    <OutputT extends POutput> OutputT getOutput(PTransform<?, OutputT> transform);
+
+    /**
+     * Returns the full name of the transform currently being translated.
+     */
+    String getFullName(PTransform<?, ?> transform);
+
+    /**
+     * Adds a step to the Dataflow workflow for the given transform, with
+     * the given Dataflow step type.
+     * This step becomes "current" for the purpose of {@link #addInput} and
+     * {@link #addOutput}.
+     */
+    public void addStep(PTransform<?, ?> transform, String type);
+
+    /**
+     * Adds a pre-defined step to the Dataflow workflow. The given PTransform should be
+     * consistent with the Step, in terms of input, output and coder types.
+     *
+     * <p>This is a low-level operation, when using this method it is up to
+     * the caller to ensure that names do not collide.
+     */
+    public void addStep(PTransform<?, ? extends PValue> transform, Step step);
+
+    /**
+     * Sets the encoding for the current Dataflow step.
+     */
+    public void addEncodingInput(Coder<?> value);
+
+    /**
+     * Adds an input with the given name and value to the current
+     * Dataflow step.
+     */
+    public void addInput(String name, Boolean value);
+
+    /**
+     * Adds an input with the given name and value to the current
+     * Dataflow step.
+     */
+    public void addInput(String name, String value);
+
+    /**
+     * Adds an input with the given name and value to the current
+     * Dataflow step.
+     */
+    public void addInput(String name, Long value);
+
+    /**
+     * Adds an input with the given name to the previously added Dataflow
+     * step, coming from the specified input PValue.
+     */
+    public void addInput(String name, PInput value);
+
+    /**
+     * Adds an input that is a dictionary of strings to objects.
+     */
+    public void addInput(String name, Map<String, Object> elements);
+
+    /**
+     * Adds an input that is a list of dictionaries of strings to objects.
+     */
+    public void addInput(String name, List<? extends Map<String, Object>> elements);
+
+    /**
+     * Adds an output with the given name to the previously added
+     * Dataflow step, producing the specified output {@code PValue},
+     * including its {@code Coder} if a {@code TypedPValue}.  If the
+     * {@code PValue} is a {@code PCollection}, wraps its coder inside
+     * a {@code WindowedValueCoder}.
+     */
+    public void addOutput(String name, PValue value);
+
+    /**
+     * Adds an output with the given name to the previously added
+     * Dataflow step, producing the specified output {@code PValue},
+     * including its {@code Coder} if a {@code TypedPValue}.  If the
+     * {@code PValue} is a {@code PCollection}, wraps its coder inside
+     * a {@code ValueOnlyCoder}.
+     */
+    public void addValueOnlyOutput(String name, PValue value);
+
+    /**
+     * Adds an output with the given name to the previously added
+     * CollectionToSingleton Dataflow step, consuming the specified
+     * input {@code PValue} and producing the specified output
+     * {@code PValue}.  This step requires special treatment for its
+     * output encoding.
+     */
+    public void addCollectionToSingletonOutput(String name,
+                                               PValue inputValue,
+                                               PValue outputValue);
+
+    /**
+     * Encode a PValue reference as an output reference.
+     */
+    public OutputReference asOutputReference(PValue value);
+  }
+
+
+  /////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Translates a Pipeline into the Dataflow representation.
+   */
+  class Translator implements PipelineVisitor, TranslationContext {
+    /** The Pipeline to translate. */
+    private final Pipeline pipeline;
+
+    /** The runner which will execute the pipeline. */
+    private final DataflowPipelineRunner runner;
+
+    /** The Cloud Dataflow Job representation. */
+    private final Job job = new Job();
+
+    /**
+     * Translator is stateful, as addInput and addOutput calls refer to the current step.
+     */
+    private Step currentStep;
+
+    /**
+     * A Map from AppliedPTransform to their unique Dataflow step names.
+     */
+    private final Map<AppliedPTransform<?, ?, ?>, String> stepNames = new HashMap<>();
+
+    /**
+     * A Map from PValues to their output names used by their producer
+     * Dataflow steps.
+     */
+    private final Map<POutput, String> outputNames = new HashMap<>();
+
+    /**
+     * A Map from PValues to the Coders used for them.
+     */
+    private final Map<POutput, Coder<?>> outputCoders = new HashMap<>();
+
+    /**
+     * The transform currently being applied.
+     */
+    private AppliedPTransform<?, ?, ?> currentTransform;
+
+    /**
+     * Constructs a Translator that will translate the specified
+     * Pipeline into Dataflow objects.
+     */
+    public Translator(Pipeline pipeline, DataflowPipelineRunner runner) {
+      this.pipeline = pipeline;
+      this.runner = runner;
+    }
+
+    /**
+     * Translates this Translator's pipeline into a Dataflow {@link Job}.
+     * @return a Job definition filled in with the type of job, the environment,
+     * and the job steps.
+     */
+    public Job translate(List<DataflowPackage> packages) {
+      // Lower-case with a fixed locale so the job name does not depend on the JVM default locale.
+      job.setName(options.getJobName().toLowerCase(java.util.Locale.ROOT));
+      Environment environment = new Environment();
+      job.setEnvironment(environment);
+
+      try {
+        environment.setSdkPipelineOptions(
+            MAPPER.readValue(MAPPER.writeValueAsBytes(options), Map.class));
+      } catch (IOException e) {
+        throw new IllegalArgumentException(
+            "PipelineOptions specified failed to serialize to JSON.", e);
+      }
+
+      WorkerPool workerPool = new WorkerPool();
+
+      if (options.getTeardownPolicy() != null) {
+        workerPool.setTeardownPolicy(options.getTeardownPolicy().getTeardownPolicyName());
+      }
+
+      if (options.isStreaming()) {
+        job.setType("JOB_TYPE_STREAMING");
+      } else {
+        job.setType("JOB_TYPE_BATCH");
+        workerPool.setDiskType(options.getWorkerDiskType());
+      }
+
+      if (options.getWorkerMachineType() != null) {
+        workerPool.setMachineType(options.getWorkerMachineType());
+      }
+
+      workerPool.setPackages(packages);
+      workerPool.setNumWorkers(options.getNumWorkers());
+
+      if (options.isStreaming()) {
+        // Use separate data disk for streaming.
+        Disk disk = new Disk();
+        disk.setDiskType(options.getWorkerDiskType());
+        workerPool.setDataDisks(Collections.singletonList(disk));
+      }
+      if (!Strings.isNullOrEmpty(options.getZone())) {
+        workerPool.setZone(options.getZone());
+      }
+      if (!Strings.isNullOrEmpty(options.getNetwork())) {
+        workerPool.setNetwork(options.getNetwork());
+      }
+      if (!Strings.isNullOrEmpty(options.getSubnetwork())) {
+        workerPool.setSubnetwork(options.getSubnetwork());
+      }
+      if (options.getDiskSizeGb() > 0) {
+        workerPool.setDiskSizeGb(options.getDiskSizeGb());
+      }
+      AutoscalingSettings settings = new AutoscalingSettings();
+      if (options.getAutoscalingAlgorithm() != null) {
+        settings.setAlgorithm(options.getAutoscalingAlgorithm().getAlgorithm());
+      }
+      settings.setMaxNumWorkers(options.getMaxNumWorkers());
+      workerPool.setAutoscalingSettings(settings);
+
+      List<WorkerPool> workerPools = new LinkedList<>();
+
+      workerPools.add(workerPool);
+      environment.setWorkerPools(workerPools);
+
+      pipeline.traverseTopologically(this);
+      return job;
+    }
+
+    @Override
+    public DataflowPipelineOptions getPipelineOptions() {
+      return options;
+    }
+
+    @Override
+    public <InputT extends PInput> InputT getInput(PTransform<InputT, ?> transform) {
+      return (InputT) getCurrentTransform(transform).getInput();
+    }
+
+    @Override
+    public <OutputT extends POutput> OutputT getOutput(PTransform<?, OutputT> transform) {
+      return (OutputT) getCurrentTransform(transform).getOutput();
+    }
+
+    @Override
+    public String getFullName(PTransform<?, ?> transform) {
+      return getCurrentTransform(transform).getFullName();
+    }
+
+    private AppliedPTransform<?, ?, ?> getCurrentTransform(PTransform<?, ?> transform) {
+      checkArgument(
+          currentTransform != null && currentTransform.getTransform() == transform,
+          "can only be called with current transform");
+      return currentTransform;
+    }
+
+    @Override
+    public void enterCompositeTransform(TransformTreeNode node) {
+    }
+
+    @Override
+    public void leaveCompositeTransform(TransformTreeNode node) {
+    }
+
+    @Override
+    public void visitTransform(TransformTreeNode node) {
+      PTransform<?, ?> transform = node.getTransform();
+      TransformTranslator translator =
+          getTransformTranslator(transform.getClass());
+      if (translator == null) {
+        throw new IllegalStateException(
+            "no translator registered for " + transform);
+      }
+      LOG.debug("Translating {}", transform);
+      currentTransform = AppliedPTransform.of(
+          node.getFullName(), node.getInput(), node.getOutput(), (PTransform) transform);
+      translator.translate(transform, this);
+      currentTransform = null;
+    }
+
+    @Override
+    public void visitValue(PValue value, TransformTreeNode producer) {
+      LOG.debug("Checking translation of {}", value);
+      if (value.getProducingTransformInternal() == null) {
+        throw new RuntimeException(
+            "internal error: expecting a PValue "
+            + "to have a producingTransform");
+      }
+      if (!producer.isCompositeNode()) {
+        // Primitive transforms are the only ones assigned step names.
+        asOutputReference(value);
+      }
+    }
+
+    @Override
+    public void addStep(PTransform<?, ?> transform, String type) {
+      String stepName = genStepName();
+      if (stepNames.put(getCurrentTransform(transform), stepName) != null) {
+        throw new IllegalArgumentException(
+            transform + " already has a name specified");
+      }
+      // Start the next "steps" list item.
+      List<Step> steps = job.getSteps();
+      if (steps == null) {
+        steps = new LinkedList<>();
+        job.setSteps(steps);
+      }
+
+      currentStep = new Step();
+      currentStep.setName(stepName);
+      currentStep.setKind(type);
+      steps.add(currentStep);
+      addInput(PropertyNames.USER_NAME, getFullName(transform));
+      addDisplayData(PropertyNames.DISPLAY_DATA, DisplayData.from(transform));
+    }
+
+    @Override
+    public void addStep(PTransform<?, ? extends PValue> transform, Step original) {
+      Step step = original.clone();
+      String stepName = step.getName();
+      if (stepNames.put(getCurrentTransform(transform), stepName) != null) {
+        throw new IllegalArgumentException(transform + " already has a name specified");
+      }
+
+      Map<String, Object> properties = step.getProperties();
+      if (properties != null) {
+        @Nullable List<Map<String, Object>> outputInfoList = null;
+        try {
+          // TODO: This should be done via a Structs accessor.
+          @Nullable List<Map<String, Object>> list =
+              (List<Map<String, Object>>) properties.get(PropertyNames.OUTPUT_INFO);
+          outputInfoList = list;
+        } catch (Exception e) {
+          throw new RuntimeException("Inconsistent dataflow pipeline translation", e);
+        }
+        if (outputInfoList != null && outputInfoList.size() > 0) {
+          Map<String, Object> firstOutputPort = outputInfoList.get(0);
+          @Nullable String name;
+          try {
+            name = getString(firstOutputPort, PropertyNames.OUTPUT_NAME);
+          } catch (Exception e) {
+            name = null;
+          }
+          if (name != null) {
+            registerOutputName(getOutput(transform), name);
+          }
+        }
+      }
+
+      List<Step> steps = job.getSteps();
+      if (steps == null) {
+        steps = new LinkedList<>();
+        job.setSteps(steps);
+      }
+      currentStep = step;
+      steps.add(step);
+    }
+
+    @Override
+    public void addEncodingInput(Coder<?> coder) {
+      CloudObject encoding = SerializableUtils.ensureSerializable(coder);
+      addObject(getProperties(), PropertyNames.ENCODING, encoding);
+    }
+
+    @Override
+    public void addInput(String name, Boolean value) {
+      addBoolean(getProperties(), name, value);
+    }
+
+    @Override
+    public void addInput(String name, String value) {
+      addString(getProperties(), name, value);
+    }
+
+    @Override
+    public void addInput(String name, Long value) {
+      addLong(getProperties(), name, value);
+    }
+
+    @Override
+    public void addInput(String name, Map<String, Object> elements) {
+      addDictionary(getProperties(), name, elements);
+    }
+
+    @Override
+    public void addInput(String name, List<? extends Map<String, Object>> elements) {
+      addList(getProperties(), name, elements);
+    }
+
+    @Override
+    public void addInput(String name, PInput value) {
+      if (value instanceof PValue) {
+        addInput(name, asOutputReference((PValue) value));
+      } else {
+        throw new IllegalStateException("Input must be a PValue");
+      }
+    }
+
+    @Override
+    public void addOutput(String name, PValue value) {
+      Coder<?> coder;
+      if (value instanceof TypedPValue) {
+        coder = ((TypedPValue<?>) value).getCoder();
+        if (value instanceof PCollection) {
+          // Wrap the PCollection element Coder inside a WindowedValueCoder.
+          coder = WindowedValue.getFullCoder(
+              coder,
+              ((PCollection<?>) value).getWindowingStrategy().getWindowFn().windowCoder());
+        }
+      } else {
+        // No output coder to encode.
+        coder = null;
+      }
+      addOutput(name, value, coder);
+    }
+
+    @Override
+    public void addValueOnlyOutput(String name, PValue value) {
+      Coder<?> coder;
+      if (value instanceof TypedPValue) {
+        coder = ((TypedPValue<?>) value).getCoder();
+        if (value instanceof PCollection) {
+          // Wrap the PCollection element Coder inside a ValueOnly
+          // WindowedValueCoder.
+          coder = WindowedValue.getValueOnlyCoder(coder);
+        }
+      } else {
+        // No output coder to encode.
+        coder = null;
+      }
+      addOutput(name, value, coder);
+    }
+
+    @Override
+    public void addCollectionToSingletonOutput(String name,
+                                               PValue inputValue,
+                                               PValue outputValue) {
+      Coder<?> inputValueCoder =
+          Preconditions.checkNotNull(outputCoders.get(inputValue));
+      // The inputValueCoder for the input PCollection should be some
+      // WindowedValueCoder of the input PCollection's element
+      // coder.
+      Preconditions.checkState(
+          inputValueCoder instanceof WindowedValue.WindowedValueCoder);
+      // The outputValueCoder for the output should be an
+      // IterableCoder of the inputValueCoder. This is a property
+      // of the backend "CollectionToSingleton" step.
+      Coder<?> outputValueCoder = IterableCoder.of(inputValueCoder);
+      addOutput(name, outputValue, outputValueCoder);
+    }
+
+    /**
+     * Adds an output with the given name to the previously added
+     * Dataflow step, producing the specified output {@code PValue}
+     * with the given {@code Coder} (if not {@code null}).
+     */
+    private void addOutput(String name, PValue value, Coder<?> valueCoder) {
+      registerOutputName(value, name);
+
+      Map<String, Object> properties = getProperties();
+      @Nullable List<Map<String, Object>> outputInfoList = null;
+      try {
+        // TODO: This should be done via a Structs accessor.
+        outputInfoList = (List<Map<String, Object>>) properties.get(PropertyNames.OUTPUT_INFO);
+      } catch (Exception e) {
+        throw new RuntimeException("Inconsistent dataflow pipeline translation", e);
+      }
+      if (outputInfoList == null) {
+        outputInfoList = new ArrayList<>();
+        // TODO: This should be done via a Structs accessor.
+        properties.put(PropertyNames.OUTPUT_INFO, outputInfoList);
+      }
+
+      Map<String, Object> outputInfo = new HashMap<>();
+      addString(outputInfo, PropertyNames.OUTPUT_NAME, name);
+      addString(outputInfo, PropertyNames.USER_NAME, value.getName());
+      if (value instanceof PCollection
+          && runner.doesPCollectionRequireIndexedFormat((PCollection<?>) value)) {
+        addBoolean(outputInfo, PropertyNames.USE_INDEXED_FORMAT, true);
+      }
+      if (valueCoder != null) {
+        // Verify that encoding can be decoded, in order to catch serialization
+        // failures as early as possible.
+        CloudObject encoding = SerializableUtils.ensureSerializable(valueCoder);
+        addObject(outputInfo, PropertyNames.ENCODING, encoding);
+        outputCoders.put(value, valueCoder);
+      }
+
+      outputInfoList.add(outputInfo);
+    }
+
+    private void addDisplayData(String name, DisplayData displayData) {
+      List<Map<String, Object>> list = MAPPER.convertValue(displayData, List.class);
+      addList(getProperties(), name, list);
+    }
+
+    @Override
+    public OutputReference asOutputReference(PValue value) {
+      AppliedPTransform<?, ?, ?> transform =
+          value.getProducingTransformInternal();
+      String stepName = stepNames.get(transform);
+      if (stepName == null) {
+        throw new IllegalArgumentException(transform + " doesn't have a name specified");
+      }
+
+      String outputName = outputNames.get(value);
+      if (outputName == null) {
+        throw new IllegalArgumentException(
+            "output " + value + " doesn't have a name specified");
+      }
+
+      return new OutputReference(stepName, outputName);
+    }
+
+    private Map<String, Object> getProperties() {
+      Map<String, Object> properties = currentStep.getProperties();
+      if (properties == null) {
+        properties = new HashMap<>();
+        currentStep.setProperties(properties);
+      }
+      return properties;
+    }
+
+    /**
+     * Returns a fresh Dataflow step name.
+     */
+    private String genStepName() {
+      return "s" + (stepNames.size() + 1);
+    }
+
+    /**
+     * Records the name of the given output PValue,
+     * within its producing transform.
+     */
+    private void registerOutputName(POutput value, String name) {
+      if (outputNames.put(value, name) != null) {
+        throw new IllegalArgumentException(
+            "output " + value + " already has a name specified");
+      }
+    }
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+
+  @Override
+  public String toString() {
+    return "DataflowPipelineTranslator#" + hashCode();
+  }
+
+
+  ///////////////////////////////////////////////////////////////////////////
+
+  static {
+    registerTransformTranslator(
+        View.CreatePCollectionView.class,
+        new TransformTranslator<View.CreatePCollectionView>() {
+          @Override
+          public void translate(
+              View.CreatePCollectionView transform,
+              TranslationContext context) {
+            translateTyped(transform, context);
+          }
+
+          private <ElemT, ViewT> void translateTyped(
+              View.CreatePCollectionView<ElemT, ViewT> transform,
+              TranslationContext context) {
+            context.addStep(transform, "CollectionToSingleton");
+            context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
+            context.addCollectionToSingletonOutput(
+                PropertyNames.OUTPUT,
+                context.getInput(transform),
+                context.getOutput(transform));
+          }
+        });
+
+    DataflowPipelineTranslator.registerTransformTranslator(
+        Combine.GroupedValues.class,
+        new DataflowPipelineTranslator.TransformTranslator<Combine.GroupedValues>() {
+          @Override
+          public void translate(
+              Combine.GroupedValues transform,
+              DataflowPipelineTranslator.TranslationContext context) {
+            translateHelper(transform, context);
+          }
+
+          private <K, InputT, OutputT> void translateHelper(
+              final Combine.GroupedValues<K, InputT, OutputT> transform,
+              DataflowPipelineTranslator.TranslationContext context) {
+            context.addStep(transform, "CombineValues");
+            translateInputs(context.getInput(transform), transform.getSideInputs(), context);
+
+            AppliedCombineFn<? super K, ? super InputT, ?, OutputT> fn =
+                transform.getAppliedFn(
+                    context.getInput(transform).getPipeline().getCoderRegistry(),
+                context.getInput(transform).getCoder(),
+                context.getInput(transform).getWindowingStrategy());
+
+            context.addEncodingInput(fn.getAccumulatorCoder());
+            context.addInput(
+                PropertyNames.SERIALIZED_FN,
+                byteArrayToJsonString(serializeToByteArray(fn)));
+            context.addOutput(PropertyNames.OUTPUT, context.getOutput(transform));
+          }
+        });
+
+    registerTransformTranslator(
+        Flatten.FlattenPCollectionList.class,
+        new TransformTranslator<Flatten.FlattenPCollectionList>() {
+          @Override
+          public void translate(
+              Flatten.FlattenPCollectionList transform,
+              TranslationContext context) {
+            flattenHelper(transform, context);
+          }
+
+          private <T> void flattenHelper(
+              Flatten.FlattenPCollectionList<T> transform,
+              TranslationContext context) {
+            context.addStep(transform, "Flatten");
+
+            List<OutputReference> inputs = new LinkedList<>();
+            for (PCollection<T> input : context.getInput(transform).getAll()) {
+              inputs.add(context.asOutputReference(input));
+            }
+            context.addInput(PropertyNames.INPUTS, inputs);
+            context.addOutput(PropertyNames.OUTPUT, context.getOutput(transform));
+          }
+        });
+
+    registerTransformTranslator(
+        GroupByKeyAndSortValuesOnly.class,
+        new TransformTranslator<GroupByKeyAndSortValuesOnly>() {
+          @Override
+          public void translate(
+              GroupByKeyAndSortValuesOnly transform,
+              TranslationContext context) {
+            groupByKeyAndSortValuesHelper(transform, context);
+          }
+
+          private <K1, K2, V> void groupByKeyAndSortValuesHelper(
+              GroupByKeyAndSortValuesOnly<K1, K2, V> transform,
+              TranslationContext context) {
+            context.addStep(transform, "GroupByKey");
+            context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
+            context.addOutput(PropertyNames.OUTPUT, context.getOutput(transform));
+            context.addInput(PropertyNames.SORT_VALUES, true);
+
+            // TODO: Add support for combiner lifting once the need arises.
+            context.addInput(
+                PropertyNames.DISALLOW_COMBINER_LIFTING, true);
+          }
+        });
+
+    registerTransformTranslator(
+        GroupByKey.class,
+        new TransformTranslator<GroupByKey>() {
+          @Override
+          public void translate(
+              GroupByKey transform,
+              TranslationContext context) {
+            groupByKeyHelper(transform, context);
+          }
+
+          private <K, V> void groupByKeyHelper(
+              GroupByKey<K, V> transform,
+              TranslationContext context) {
+            context.addStep(transform, "GroupByKey");
+            context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
+            context.addOutput(PropertyNames.OUTPUT, context.getOutput(transform));
+
+            WindowingStrategy<?, ?> windowingStrategy =
+                context.getInput(transform).getWindowingStrategy();
+            boolean isStreaming =
+                context.getPipelineOptions().as(StreamingOptions.class).isStreaming();
+            boolean disallowCombinerLifting =
+                !windowingStrategy.getWindowFn().isNonMerging()
+                || (isStreaming && !transform.fewKeys())
+                // TODO: Allow combiner lifting on the non-default trigger, as appropriate.
+                || !(windowingStrategy.getTrigger().getSpec() instanceof DefaultTrigger);
+            context.addInput(
+                PropertyNames.DISALLOW_COMBINER_LIFTING, disallowCombinerLifting);
+            context.addInput(
+                PropertyNames.SERIALIZED_FN,
+                byteArrayToJsonString(serializeToByteArray(windowingStrategy)));
+            context.addInput(
+                PropertyNames.IS_MERGING_WINDOW_FN,
+                !windowingStrategy.getWindowFn().isNonMerging());
+          }
+        });
+
+    registerTransformTranslator(
+        ParDo.BoundMulti.class,
+        new TransformTranslator<ParDo.BoundMulti>() {
+          @Override
+          public void translate(
+              ParDo.BoundMulti transform,
+              TranslationContext context) {
+            translateMultiHelper(transform, context);
+          }
+
+          private <InputT, OutputT> void translateMultiHelper(
+              ParDo.BoundMulti<InputT, OutputT> transform,
+              TranslationContext context) {
+            context.addStep(transform, "ParallelDo");
+            translateInputs(context.getInput(transform), transform.getSideInputs(), context);
+            translateFn(transform.getFn(), context.getInput(transform).getWindowingStrategy(),
+                transform.getSideInputs(), context.getInput(transform).getCoder(), context);
+            translateOutputs(context.getOutput(transform), context);
+          }
+        });
+
+    registerTransformTranslator(
+        ParDo.Bound.class,
+        new TransformTranslator<ParDo.Bound>() {
+          @Override
+          public void translate(
+              ParDo.Bound transform,
+              TranslationContext context) {
+            translateSingleHelper(transform, context);
+          }
+
+          private <InputT, OutputT> void translateSingleHelper(
+              ParDo.Bound<InputT, OutputT> transform,
+              TranslationContext context) {
+            context.addStep(transform, "ParallelDo");
+            translateInputs(context.getInput(transform), transform.getSideInputs(), context);
+            translateFn(
+                transform.getFn(),
+                context.getInput(transform).getWindowingStrategy(),
+                transform.getSideInputs(), context.getInput(transform).getCoder(), context);
+            context.addOutput(PropertyNames.OUTPUT, context.getOutput(transform));
+          }
+        });
+
+    registerTransformTranslator(
+        Window.Bound.class,
+        new DataflowPipelineTranslator.TransformTranslator<Window.Bound>() {
+          @Override
+          public void translate(
+              Window.Bound transform, TranslationContext context) {
+            translateHelper(transform, context);
+          }
+
+          private <T> void translateHelper(
+              Window.Bound<T> transform, TranslationContext context) {
+            context.addStep(transform, "Bucket");
+            context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
+            context.addOutput(PropertyNames.OUTPUT, context.getOutput(transform));
+
+            WindowingStrategy<?, ?> strategy = context.getOutput(transform).getWindowingStrategy();
+            byte[] serializedBytes = serializeToByteArray(strategy);
+            String serializedJson = byteArrayToJsonString(serializedBytes);
+            assert Arrays.equals(serializedBytes,
+                                 jsonStringToByteArray(serializedJson));
+            context.addInput(PropertyNames.SERIALIZED_FN, serializedJson);
+          }
+        });
+
+    ///////////////////////////////////////////////////////////////////////////
+    // IO Translation.
+
+    registerTransformTranslator(
+        BigQueryIO.Read.Bound.class, new BigQueryIOTranslator.ReadTranslator());
+
+    registerTransformTranslator(
+        PubsubIO.Read.Bound.class, new PubsubIOTranslator.ReadTranslator());
+    registerTransformTranslator(
+        DataflowPipelineRunner.StreamingPubsubIOWrite.class,
+        new PubsubIOTranslator.WriteTranslator());
+
+    registerTransformTranslator(Read.Bounded.class, new ReadTranslator());
+  }
+
+  private static void translateInputs(
+      PCollection<?> input,
+      List<PCollectionView<?>> sideInputs,
+      TranslationContext context) {
+    context.addInput(PropertyNames.PARALLEL_INPUT, input);
+    translateSideInputs(sideInputs, context);
+  }
+
+  // Used for ParDo and Combine.GroupedValues (reached through translateInputs).
+  private static void translateSideInputs(
+      List<PCollectionView<?>> sideInputs,
+      TranslationContext context) {
+    Map<String, Object> nonParInputs = new HashMap<>();
+
+    for (PCollectionView<?> view : sideInputs) {
+      nonParInputs.put(
+          view.getTagInternal().getId(),
+          context.asOutputReference(view));
+    }
+
+    context.addInput(PropertyNames.NON_PARALLEL_INPUTS, nonParInputs);
+  }
+
+  private static void translateFn(
+      DoFn fn,
+      WindowingStrategy windowingStrategy,
+      Iterable<PCollectionView<?>> sideInputs,
+      Coder inputCoder,
+      TranslationContext context) {
+    context.addInput(PropertyNames.USER_FN, fn.getClass().getName());
+    context.addInput(
+        PropertyNames.SERIALIZED_FN,
+        byteArrayToJsonString(serializeToByteArray(
+            new DoFnInfo(fn, windowingStrategy, sideInputs, inputCoder))));
+  }
+
+  private static void translateOutputs(
+      PCollectionTuple outputs,
+      TranslationContext context) {
+    for (Map.Entry<TupleTag<?>, PCollection<?>> entry
+             : outputs.getAll().entrySet()) {
+      TupleTag<?> tag = entry.getKey();
+      PCollection<?> output = entry.getValue();
+      context.addOutput(tag.getId(), output);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowServiceException.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowServiceException.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowServiceException.java
new file mode 100644
index 0000000..5cd8a4d
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowServiceException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow;
+
+import javax.annotation.Nullable;
+
+/**
+ * Signals there was an error retrieving information about a job from the Cloud Dataflow Service.
+ */
+public class DataflowServiceException extends DataflowJobException {
+  DataflowServiceException(DataflowPipelineJob job, String message) {
+    this(job, message, null);
+  }
+
+  DataflowServiceException(DataflowPipelineJob job, String message, @Nullable Throwable cause) {
+    super(job, message, cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java
new file mode 100644
index 0000000..1b18c2a
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.internal;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * A primitive {@link PTransform} that implements the {@link Window#into(WindowFn)}
+ * {@link PTransform}.
+ *
+ * For an application of {@link Window#into(WindowFn)} that changes the {@link WindowFn}, applies
+ * a primitive {@link PTransform} in the Dataflow service.
+ *
+ * For an application of {@link Window#into(WindowFn)} that does not change the {@link WindowFn},
+ * applies an identity {@link ParDo} and sets the windowing strategy of the output
+ * {@link PCollection}.
+ *
+ * For internal use only.
+ *
+ * @param <T> the type of input element
+ */
+public class AssignWindows<T> extends PTransform<PCollection<T>, PCollection<T>> {
+  private final Window.Bound<T> transform;
+
+  /**
+   * Builds an instance of this class from the overridden transform.
+   */
+  @SuppressWarnings("unused") // Used via reflection
+  public AssignWindows(Window.Bound<T> transform) {
+    this.transform = transform;
+  }
+
+  @Override
+  public PCollection<T> apply(PCollection<T> input) {
+    WindowingStrategy<?, ?> outputStrategy =
+        transform.getOutputStrategyInternal(input.getWindowingStrategy());
+    if (transform.getWindowFn() != null) {
+      // If the windowFn changed, we create a primitive, and run the AssignWindows operation here.
+      return PCollection.<T>createPrimitiveOutputInternal(
+                            input.getPipeline(), outputStrategy, input.isBounded());
+    } else {
+      // If the windowFn didn't change, we just run a pass-through transform and then set the
+      // new windowing strategy.
+      return input.apply(ParDo.named("Identity").of(new DoFn<T, T>() {
+        @Override
+        public void processElement(DoFn<T, T>.ProcessContext c) throws Exception {
+          c.output(c.element());
+        }
+      })).setWindowingStrategyInternal(outputStrategy);
+    }
+  }
+
+  @Override
+  public void validate(PCollection<T> input) {
+    transform.validate(input);
+  }
+
+  @Override
+  protected Coder<?> getDefaultOutputCoder(PCollection<T> input) {
+    return input.getCoder();
+  }
+
+  @Override
+  protected String getKindString() {
+    return "Window.Into()";
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/BigQueryIOTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/BigQueryIOTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/BigQueryIOTranslator.java
new file mode 100755
index 0000000..b344f0f
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/BigQueryIOTranslator.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.internal;
+
+import org.apache.beam.runners.dataflow.DataflowPipelineTranslator;
+import org.apache.beam.sdk.io.BigQueryIO;
+import org.apache.beam.sdk.util.PropertyNames;
+
+import com.google.api.services.bigquery.model.TableReference;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * BigQuery transform support code for the Dataflow backend.
+ */
+public class BigQueryIOTranslator {
+  private static final Logger LOG = LoggerFactory.getLogger(BigQueryIOTranslator.class);
+
+  /**
+   * Implements BigQueryIO Read translation for the Dataflow backend.
+   */
+  public static class ReadTranslator
+      implements DataflowPipelineTranslator.TransformTranslator<BigQueryIO.Read.Bound> {
+
+    @Override
+    public void translate(
+        BigQueryIO.Read.Bound transform, DataflowPipelineTranslator.TranslationContext context) {
+      // Describe the read as a "ParallelRead" step in "bigquery" format with Avro export.
+      context.addStep(transform, "ParallelRead");
+      context.addInput(PropertyNames.FORMAT, "bigquery");
+      context.addInput(PropertyNames.BIGQUERY_EXPORT_FORMAT, "FORMAT_AVRO");
+
+      if (transform.getQuery() != null) {
+        context.addInput(PropertyNames.BIGQUERY_QUERY, transform.getQuery());
+        context.addInput(PropertyNames.BIGQUERY_FLATTEN_RESULTS, transform.getFlattenResults());
+      } else {
+        TableReference table = transform.getTable();
+        if (table.getProjectId() == null) {
+          // If user does not specify a project we assume the table to be located in the project
+          // that owns the Dataflow job; warn with the dataset, table and assumed project.
+          String projectIdFromOptions = context.getPipelineOptions().getProject();
+          LOG.warn(String.format(BigQueryIO.SET_PROJECT_FROM_OPTIONS_WARNING,
+              table.getDatasetId(), table.getTableId(), projectIdFromOptions));
+          table.setProjectId(projectIdFromOptions);
+        }
+
+        context.addInput(PropertyNames.BIGQUERY_TABLE, table.getTableId());
+        context.addInput(PropertyNames.BIGQUERY_DATASET, table.getDatasetId());
+        if (table.getProjectId() != null) {
+          context.addInput(PropertyNames.BIGQUERY_PROJECT, table.getProjectId());
+        }
+      }
+      context.addValueOnlyOutput(PropertyNames.OUTPUT, context.getOutput(transform));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java
new file mode 100755
index 0000000..73e5da0
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.internal;
+
+import static org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray;
+import static org.apache.beam.sdk.util.Structs.addString;
+import static org.apache.beam.sdk.util.Structs.addStringList;
+
+import static com.google.api.client.util.Base64.encodeBase64String;
+import static com.google.common.base.Preconditions.checkArgument;
+
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.CloudObject;
+
+import com.google.api.services.dataflow.model.SourceMetadata;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.ByteString;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * A helper class for supporting sources defined as {@code Source}.
+ *
+ * <p>Provides a bridge between the high-level {@code Source} API and the
+ * low-level {@code CloudSource} class.
+ */
+public class CustomSources {
+  private static final String SERIALIZED_SOURCE = "serialized_source";
+  @VisibleForTesting static final String SERIALIZED_SOURCE_SPLITS = "serialized_source_splits";
+  /**
+   * The current limit on the size of a ReportWorkItemStatus RPC to Google Cloud Dataflow, which
+   * includes the initial splits, is 20 MB.
+   */
+  public static final long DATAFLOW_SPLIT_RESPONSE_API_SIZE_BYTES = 20 * (1 << 20);
+
+  private static final Logger LOG = LoggerFactory.getLogger(CustomSources.class);
+
+  private static final ByteString firstSplitKey = ByteString.copyFromUtf8("0000000000000001");
+
+  public static boolean isFirstUnboundedSourceSplit(ByteString splitKey) {
+    return splitKey.equals(firstSplitKey);
+  }
+
+  private static int getDesiredNumUnboundedSourceSplits(DataflowPipelineOptions options) {
+    if (options.getMaxNumWorkers() > 0) {
+      return options.getMaxNumWorkers();
+    } else if (options.getNumWorkers() > 0) {
+      return options.getNumWorkers() * 3;
+    } else {
+      return 20;
+    }
+  }
+
+  public static com.google.api.services.dataflow.model.Source serializeToCloudSource(
+      Source<?> source, PipelineOptions options) throws Exception {
+    com.google.api.services.dataflow.model.Source cloudSource =
+        new com.google.api.services.dataflow.model.Source();
+    // We ourselves act as the SourceFormat.
+    cloudSource.setSpec(CloudObject.forClass(CustomSources.class));
+    addString(
+        cloudSource.getSpec(), SERIALIZED_SOURCE, encodeBase64String(serializeToByteArray(source)));
+
+    SourceMetadata metadata = new SourceMetadata();
+    if (source instanceof BoundedSource) {
+      BoundedSource<?> boundedSource = (BoundedSource<?>) source;
+      try {
+        metadata.setProducesSortedKeys(boundedSource.producesSortedKeys(options));
+      } catch (Exception e) {
+        LOG.warn("Failed to check if the source produces sorted keys: " + source, e);
+      }
+
+      // Size estimation is best effort so we continue even if it fails here.
+      try {
+        metadata.setEstimatedSizeBytes(boundedSource.getEstimatedSizeBytes(options));
+      } catch (Exception e) {
+        LOG.warn("Size estimation of the source failed: " + source, e);
+      }
+    } else if (source instanceof UnboundedSource) {
+      UnboundedSource<?, ?> unboundedSource = (UnboundedSource<?, ?>) source;
+      metadata.setInfinite(true);
+      List<String> encodedSplits = new ArrayList<>();
+      int desiredNumSplits =
+          getDesiredNumUnboundedSourceSplits(options.as(DataflowPipelineOptions.class));
+      for (UnboundedSource<?, ?> split :
+          unboundedSource.generateInitialSplits(desiredNumSplits, options)) {
+        encodedSplits.add(encodeBase64String(serializeToByteArray(split)));
+      }
+      checkArgument(!encodedSplits.isEmpty(), "UnboundedSources must have at least one split");
+      addStringList(cloudSource.getSpec(), SERIALIZED_SOURCE_SPLITS, encodedSplits);
+    } else {
+      throw new IllegalArgumentException("Unexpected source kind: " + source.getClass());
+    }
+
+    cloudSource.setMetadata(metadata);
+    return cloudSource;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowAggregatorTransforms.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowAggregatorTransforms.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowAggregatorTransforms.java
new file mode 100755
index 0000000..7a08fde
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowAggregatorTransforms.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.internal;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+
+/**
+ * A mapping relating {@link Aggregator}s and the {@link PTransform} in which they are used.
+ */
+public class DataflowAggregatorTransforms {
+  private final Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorTransforms;
+  private final Multimap<PTransform<?, ?>, AppliedPTransform<?, ?, ?>> transformAppliedTransforms;
+  private final BiMap<AppliedPTransform<?, ?, ?>, String> appliedStepNames;
+
+  public DataflowAggregatorTransforms(
+      Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorTransforms,
+      Map<AppliedPTransform<?, ?, ?>, String> transformStepNames) {
+    this.aggregatorTransforms = aggregatorTransforms;
+    appliedStepNames = HashBiMap.create(transformStepNames);
+
+    transformAppliedTransforms = HashMultimap.create();
+    for (AppliedPTransform<?, ?, ?> appliedTransform : transformStepNames.keySet()) {
+      transformAppliedTransforms.put(appliedTransform.getTransform(), appliedTransform);
+    }
+  }
+
+  /**
+   * Returns true if the provided {@link Aggregator} is used in the constructing {@link Pipeline}.
+   */
+  public boolean contains(Aggregator<?, ?> aggregator) {
+    return aggregatorTransforms.containsKey(aggregator);
+  }
+
+  /**
+   * Gets the step names in which the {@link Aggregator} is used.
+   */
+  public Collection<String> getAggregatorStepNames(Aggregator<?, ?> aggregator) {
+    Collection<String> names = new HashSet<>();
+    Collection<PTransform<?, ?>> transforms = aggregatorTransforms.get(aggregator);
+    for (PTransform<?, ?> transform : transforms) {
+      for (AppliedPTransform<?, ?, ?> applied : transformAppliedTransforms.get(transform)) {
+        names.add(appliedStepNames.get(applied));
+      }
+    }
+    return names;
+  }
+
+  /**
+   * Gets the {@link AppliedPTransform} that was assigned the provided step name.
+   */
+  public AppliedPTransform<?, ?, ?> getAppliedTransformForStepName(String stepName) {
+    return appliedStepNames.inverse().get(stepName);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowMetricUpdateExtractor.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowMetricUpdateExtractor.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowMetricUpdateExtractor.java
new file mode 100755
index 0000000..8ab59fc
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowMetricUpdateExtractor.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.internal;
+
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.PTransform;
+
+import com.google.api.services.dataflow.model.MetricStructuredName;
+import com.google.api.services.dataflow.model.MetricUpdate;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Methods for extracting the values of an {@link Aggregator} from a collection of {@link
+ * MetricUpdate MetricUpdates}.
+ */
+public final class DataflowMetricUpdateExtractor {
+  private static final String STEP_NAME_CONTEXT_KEY = "step";
+  private static final String IS_TENTATIVE_KEY = "tentative";
+
+  private DataflowMetricUpdateExtractor() {
+    // Do not instantiate.
+  }
+
+  /**
+   * Extract the values of the provided {@link Aggregator} at each {@link PTransform} it was used in
+   * according to the provided {@link DataflowAggregatorTransforms} from the given list of {@link
+   * MetricUpdate MetricUpdates}.
+   */
+  public static <OutputT> Map<String, OutputT> fromMetricUpdates(Aggregator<?, OutputT> aggregator,
+      DataflowAggregatorTransforms aggregatorTransforms, List<MetricUpdate> metricUpdates) {
+    Map<String, OutputT> results = new HashMap<>();
+    if (metricUpdates == null) {
+      return results;
+    }
+
+    String aggregatorName = aggregator.getName();
+    Collection<String> aggregatorSteps = aggregatorTransforms.getAggregatorStepNames(aggregator);
+
+    for (MetricUpdate metricUpdate : metricUpdates) {
+      MetricStructuredName metricStructuredName = metricUpdate.getName();
+      Map<String, String> context = metricStructuredName.getContext();
+      if (metricStructuredName.getName().equals(aggregatorName) && context != null
+          && aggregatorSteps.contains(context.get(STEP_NAME_CONTEXT_KEY))) {
+        AppliedPTransform<?, ?, ?> transform =
+            aggregatorTransforms.getAppliedTransformForStepName(
+                context.get(STEP_NAME_CONTEXT_KEY));
+        String fullName = transform.getFullName();
+        // Prefer the tentative (fresher) value if it exists.
+        if (Boolean.parseBoolean(context.get(IS_TENTATIVE_KEY)) || !results.containsKey(fullName)) {
+          results.put(fullName, toValue(aggregator, metricUpdate));
+        }
+      }
+    }
+
+    return results;
+
+  }
+
+  private static <OutputT> OutputT toValue(
+      Aggregator<?, OutputT> aggregator, MetricUpdate metricUpdate) {
+    CombineFn<?, ?, OutputT> combineFn = aggregator.getCombineFn();
+    Class<? super OutputT> outputType = combineFn.getOutputType().getRawType();
+
+    if (outputType.equals(Long.class)) {
+      @SuppressWarnings("unchecked")
+      OutputT asLong = (OutputT) Long.valueOf(toNumber(metricUpdate).longValue());
+      return asLong;
+    }
+    if (outputType.equals(Integer.class)) {
+      @SuppressWarnings("unchecked")
+      OutputT asInt = (OutputT) Integer.valueOf(toNumber(metricUpdate).intValue());
+      return asInt;
+    }
+    if (outputType.equals(Double.class)) {
+      @SuppressWarnings("unchecked")
+      OutputT asDouble = (OutputT) Double.valueOf(toNumber(metricUpdate).doubleValue());
+      return asDouble;
+    }
+    throw new UnsupportedOperationException(
+        "Unsupported Output Type " + outputType + " in aggregator " + aggregator);
+  }
+
+  private static Number toNumber(MetricUpdate update) {
+    if (update.getScalar() instanceof Number) {
+      return (Number) update.getScalar();
+    }
+    throw new IllegalArgumentException(
+        "Metric Update " + update + " does not have a numeric scalar");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/PubsubIOTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/PubsubIOTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/PubsubIOTranslator.java
new file mode 100755
index 0000000..976f948
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/PubsubIOTranslator.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.internal;
+
+import org.apache.beam.runners.dataflow.DataflowPipelineRunner;
+import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TransformTranslator;
+import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TranslationContext;
+import org.apache.beam.sdk.io.PubsubIO;
+import org.apache.beam.sdk.util.PropertyNames;
+import org.apache.beam.sdk.util.WindowedValue;
+
+/**
+ * Pubsub transform support code for the Dataflow backend.
+ */
+public class PubsubIOTranslator {
+
+  /**
+   * Implements PubsubIO Read translation for the Dataflow backend.
+   */
+  public static class ReadTranslator<T> implements TransformTranslator<PubsubIO.Read.Bound<T>> {
+    @Override
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    public void translate(
+        PubsubIO.Read.Bound transform,
+        TranslationContext context) {
+      translateReadHelper(transform, context);
+    }
+
+    private <T> void translateReadHelper(
+        PubsubIO.Read.Bound<T> transform,
+        TranslationContext context) {
+      if (!context.getPipelineOptions().isStreaming()) {
+        throw new IllegalArgumentException(
+            "PubsubIO.Read can only be used with the Dataflow streaming runner.");
+      }
+
+      context.addStep(transform, "ParallelRead");
+      context.addInput(PropertyNames.FORMAT, "pubsub");
+      if (transform.getTopic() != null) {
+        context.addInput(PropertyNames.PUBSUB_TOPIC, transform.getTopic().asV1Beta1Path());
+      }
+      if (transform.getSubscription() != null) {
+        context.addInput(
+            PropertyNames.PUBSUB_SUBSCRIPTION, transform.getSubscription().asV1Beta1Path());
+      }
+      if (transform.getTimestampLabel() != null) {
+        context.addInput(PropertyNames.PUBSUB_TIMESTAMP_LABEL, transform.getTimestampLabel());
+      }
+      if (transform.getIdLabel() != null) {
+        context.addInput(PropertyNames.PUBSUB_ID_LABEL, transform.getIdLabel());
+      }
+      context.addValueOnlyOutput(PropertyNames.OUTPUT, context.getOutput(transform));
+    }
+  }
+
+  /**
+   * Implements PubsubIO Write translation for the Dataflow backend.
+   */
+  public static class WriteTranslator<T>
+      implements TransformTranslator<DataflowPipelineRunner.StreamingPubsubIOWrite<T>> {
+
+    @Override
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    public void translate(
+        DataflowPipelineRunner.StreamingPubsubIOWrite transform,
+        TranslationContext context) {
+      translateWriteHelper(transform, context);
+    }
+
+    private <T> void translateWriteHelper(
+        DataflowPipelineRunner.StreamingPubsubIOWrite<T> customTransform,
+        TranslationContext context) {
+      if (!context.getPipelineOptions().isStreaming()) {
+        throw new IllegalArgumentException(
+            "PubsubIO.Write is non-primitive for the Dataflow batch runner.");
+      }
+
+      PubsubIO.Write.Bound<T> transform = customTransform.getOverriddenTransform();
+
+      context.addStep(customTransform, "ParallelWrite");
+      context.addInput(PropertyNames.FORMAT, "pubsub");
+      context.addInput(PropertyNames.PUBSUB_TOPIC, transform.getTopic().asV1Beta1Path());
+      if (transform.getTimestampLabel() != null) {
+        context.addInput(PropertyNames.PUBSUB_TIMESTAMP_LABEL, transform.getTimestampLabel());
+      }
+      if (transform.getIdLabel() != null) {
+        context.addInput(PropertyNames.PUBSUB_ID_LABEL, transform.getIdLabel());
+      }
+      context.addEncodingInput(WindowedValue.getValueOnlyCoder(transform.getCoder()));
+      context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(customTransform));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java
new file mode 100755
index 0000000..373738a
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.internal;
+
+import static org.apache.beam.sdk.util.Structs.addBoolean;
+import static org.apache.beam.sdk.util.Structs.addDictionary;
+import static org.apache.beam.sdk.util.Structs.addLong;
+
+import org.apache.beam.runners.dataflow.DataflowPipelineTranslator;
+import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TransformTranslator;
+import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TranslationContext;
+import org.apache.beam.sdk.io.FileBasedSource;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.PropertyNames;
+import org.apache.beam.sdk.values.PValue;
+
+import com.google.api.services.dataflow.model.SourceMetadata;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Translator for the {@code Read} {@code PTransform} for the Dataflow back-end.
+ */
+public class ReadTranslator implements TransformTranslator<Read.Bounded<?>> {
+  @Override
+  public void translate(Read.Bounded<?> transform, TranslationContext context) {
+    translateReadHelper(transform.getSource(), transform, context);
+  }
+
+  public static <T> void translateReadHelper(Source<T> source,
+      PTransform<?, ? extends PValue> transform,
+      DataflowPipelineTranslator.TranslationContext context) {
+    try {
+      // TODO: Move this validation out of translation once IOChannelUtils is portable
+      // and can be reconstructed on the worker.
+      if (source instanceof FileBasedSource) {
+        String filePatternOrSpec = ((FileBasedSource<?>) source).getFileOrPatternSpec();
+        context.getPipelineOptions()
+               .getPathValidator()
+               .validateInputFilePatternSupported(filePatternOrSpec);
+      }
+
+      context.addStep(transform, "ParallelRead");
+      context.addInput(PropertyNames.FORMAT, PropertyNames.CUSTOM_SOURCE_FORMAT);
+      context.addInput(
+          PropertyNames.SOURCE_STEP_INPUT,
+          cloudSourceToDictionary(
+              CustomSources.serializeToCloudSource(source, context.getPipelineOptions())));
+      context.addValueOnlyOutput(PropertyNames.OUTPUT, context.getOutput(transform));
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  // Represents a cloud Source as a dictionary for encoding inside the {@code SOURCE_STEP_INPUT}
+  // property of CloudWorkflowStep.input.
+  private static Map<String, Object> cloudSourceToDictionary(
+      com.google.api.services.dataflow.model.Source source) {
+    // Do not translate encoding - the source's encoding is translated elsewhere
+    // to the step's output info.
+    Map<String, Object> res = new HashMap<>();
+    addDictionary(res, PropertyNames.SOURCE_SPEC, source.getSpec());
+    if (source.getMetadata() != null) {
+      addDictionary(res, PropertyNames.SOURCE_METADATA,
+          cloudSourceMetadataToDictionary(source.getMetadata()));
+    }
+    if (source.getDoesNotNeedSplitting() != null) {
+      addBoolean(
+          res, PropertyNames.SOURCE_DOES_NOT_NEED_SPLITTING, source.getDoesNotNeedSplitting());
+    }
+    return res;
+  }
+
+  private static Map<String, Object> cloudSourceMetadataToDictionary(SourceMetadata metadata) {
+    Map<String, Object> res = new HashMap<>();
+    if (metadata.getProducesSortedKeys() != null) {
+      addBoolean(res, PropertyNames.SOURCE_PRODUCES_SORTED_KEYS, metadata.getProducesSortedKeys());
+    }
+    if (metadata.getEstimatedSizeBytes() != null) {
+      addLong(res, PropertyNames.SOURCE_ESTIMATED_SIZE_BYTES, metadata.getEstimatedSizeBytes());
+    }
+    if (metadata.getInfinite() != null) {
+      addBoolean(res, PropertyNames.SOURCE_IS_INFINITE, metadata.getInfinite());
+    }
+    return res;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/package-info.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/package-info.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/package-info.java
new file mode 100755
index 0000000..f2e8459
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Implementation of the {@link org.apache.beam.runners.dataflow.DataflowPipelineRunner}.
+ */
+package org.apache.beam.runners.dataflow.internal;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/BlockingDataflowPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/BlockingDataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/BlockingDataflowPipelineOptions.java
new file mode 100644
index 0000000..7fa5ad6
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/BlockingDataflowPipelineOptions.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.options;
+
+import org.apache.beam.runners.dataflow.BlockingDataflowPipelineRunner;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.DefaultValueFactory;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.Hidden;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+import java.io.PrintStream;
+
+/**
+ * Options that are used to configure the {@link BlockingDataflowPipelineRunner}.
+ *
+ * <p>Extends {@link DataflowPipelineOptions} with the stream that job messages are written to.
+ */
+@Description("Configure options on the BlockingDataflowPipelineRunner.")
+public interface BlockingDataflowPipelineOptions extends DataflowPipelineOptions {
+  /**
+   * Output stream for job status messages.
+   *
+   * <p>The default value is supplied by {@link StandardOutputFactory}, which returns
+   * {@link System#out}. The property is annotated {@link Hidden} and {@link JsonIgnore}.
+   */
+  @Description("Where messages generated during execution of the Dataflow job will be output.")
+  @JsonIgnore
+  @Hidden
+  @Default.InstanceFactory(StandardOutputFactory.class)
+  PrintStream getJobMessageOutput();
+  void setJobMessageOutput(PrintStream value);
+
+  /**
+   * Returns a default of {@link System#out}.
+   *
+   * <p>Referenced by the {@link Default.InstanceFactory} annotation on
+   * {@link #getJobMessageOutput()}.
+   */
+  public static class StandardOutputFactory implements DefaultValueFactory<PrintStream> {
+    @Override
+    public PrintStream create(PipelineOptions options) {
+      // The supplied options are not consulted; the default is always standard output.
+      return System.out;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/CloudDebuggerOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/CloudDebuggerOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/CloudDebuggerOptions.java
new file mode 100644
index 0000000..dbfbb16
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/CloudDebuggerOptions.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.options;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.Hidden;
+
+import com.google.api.services.clouddebugger.v2.model.Debuggee;
+
+import javax.annotation.Nullable;
+
+/**
+ * Options for controlling Cloud Debugger.
+ *
+ * <p>The whole interface is annotated {@link Experimental} and {@link Hidden}.
+ */
+@Description("[Experimental] Used to configure the Cloud Debugger")
+@Experimental
+@Hidden
+public interface CloudDebuggerOptions {
+
+  /** Whether to enable the Cloud Debugger snapshot agent for the current job. */
+  @Description("Whether to enable the Cloud Debugger snapshot agent for the current job.")
+  boolean getEnableCloudDebugger();
+  void setEnableCloudDebugger(boolean enabled);
+
+  /**
+   * The Cloud Debugger debuggee to associate with. This should not be set directly.
+   *
+   * <p>The getter is annotated {@link Nullable}, so callers must handle a missing debuggee.
+   */
+  @Description("The Cloud Debugger debuggee to associate with. This should not be set directly.")
+  @Hidden
+  @Nullable Debuggee getDebuggee();
+  void setDebuggee(Debuggee debuggee);
+
+  /**
+   * The maximum cost (as a ratio of CPU time) allowed for evaluating conditional snapshots.
+   *
+   * <p>Defaults to {@code 0.01}. The option description asks for a value between 0 and 1; no
+   * range check is declared on this interface.
+   */
+  @Description(
+      "The maximum cost (as a ratio of CPU time) allowed for evaluating conditional snapshots. "
+      + "Should be a double between 0 and 1. "
+      + "Snapshots will be cancelled if evaluating conditions takes more than this ratio of time.")
+  @Default.Double(0.01)
+  double getMaxConditionCost();
+  void setMaxConditionCost(double maxConditionCost);
+}