You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/04/12 01:42:40 UTC

[06/18] incubator-beam git commit: [BEAM-151] Move a large portion of the Dataflow runner to separate maven module

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunnerHooks.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunnerHooks.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunnerHooks.java
deleted file mode 100644
index 2cf77a0..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunnerHooks.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.runners;
-
-import com.google.api.services.dataflow.model.Environment;
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-
-/**
- * An instance of this class can be passed to the
- * {@link DataflowPipelineRunner} to add user defined hooks to be
- * invoked at various times during pipeline execution.
- */
-@Experimental
-public class DataflowPipelineRunnerHooks {
-  /**
-   * Allows the user to modify the environment of their job before their job is submitted
-   * to the service for execution.
-   *
-   * @param environment The environment of the job. Users can make change to this instance in order
-   *     to change the environment with which their job executes on the service.
-   */
-  public void modifyEnvironmentBeforeSubmission(Environment environment) {}
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java
deleted file mode 100644
index 5bff46c..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java
+++ /dev/null
@@ -1,1105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.runners;
-
-import static com.google.cloud.dataflow.sdk.util.CoderUtils.encodeToByteArray;
-import static com.google.cloud.dataflow.sdk.util.SerializableUtils.serializeToByteArray;
-import static com.google.cloud.dataflow.sdk.util.StringUtils.byteArrayToJsonString;
-import static com.google.cloud.dataflow.sdk.util.StringUtils.jsonStringToByteArray;
-import static com.google.cloud.dataflow.sdk.util.Structs.addBoolean;
-import static com.google.cloud.dataflow.sdk.util.Structs.addDictionary;
-import static com.google.cloud.dataflow.sdk.util.Structs.addList;
-import static com.google.cloud.dataflow.sdk.util.Structs.addLong;
-import static com.google.cloud.dataflow.sdk.util.Structs.addObject;
-import static com.google.cloud.dataflow.sdk.util.Structs.addString;
-import static com.google.cloud.dataflow.sdk.util.Structs.getString;
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.api.services.dataflow.model.AutoscalingSettings;
-import com.google.api.services.dataflow.model.DataflowPackage;
-import com.google.api.services.dataflow.model.Disk;
-import com.google.api.services.dataflow.model.Environment;
-import com.google.api.services.dataflow.model.Job;
-import com.google.api.services.dataflow.model.Step;
-import com.google.api.services.dataflow.model.WorkerPool;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.Pipeline.PipelineVisitor;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.CoderException;
-import com.google.cloud.dataflow.sdk.coders.IterableCoder;
-import com.google.cloud.dataflow.sdk.io.BigQueryIO;
-import com.google.cloud.dataflow.sdk.io.PubsubIO;
-import com.google.cloud.dataflow.sdk.io.Read;
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
-import com.google.cloud.dataflow.sdk.options.StreamingOptions;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner.GroupByKeyAndSortValuesOnly;
-import com.google.cloud.dataflow.sdk.runners.dataflow.BigQueryIOTranslator;
-import com.google.cloud.dataflow.sdk.runners.dataflow.PubsubIOTranslator;
-import com.google.cloud.dataflow.sdk.runners.dataflow.ReadTranslator;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.transforms.Combine;
-import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.Flatten;
-import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.View;
-import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
-import com.google.cloud.dataflow.sdk.transforms.windowing.DefaultTrigger;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
-import com.google.cloud.dataflow.sdk.util.AppliedCombineFn;
-import com.google.cloud.dataflow.sdk.util.CloudObject;
-import com.google.cloud.dataflow.sdk.util.DoFnInfo;
-import com.google.cloud.dataflow.sdk.util.OutputReference;
-import com.google.cloud.dataflow.sdk.util.PropertyNames;
-import com.google.cloud.dataflow.sdk.util.SerializableUtils;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PCollectionTuple;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-import com.google.cloud.dataflow.sdk.values.PInput;
-import com.google.cloud.dataflow.sdk.values.POutput;
-import com.google.cloud.dataflow.sdk.values.PValue;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-import com.google.cloud.dataflow.sdk.values.TypedPValue;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import javax.annotation.Nullable;
-
-/**
- * {@link DataflowPipelineTranslator} knows how to translate {@link Pipeline} objects
- * into Cloud Dataflow Service API {@link Job}s.
- */
-@SuppressWarnings({"rawtypes", "unchecked"})
-public class DataflowPipelineTranslator {
-  // Must be kept in sync with their internal counterparts.
-  private static final Logger LOG = LoggerFactory.getLogger(DataflowPipelineTranslator.class);
-  private static final ObjectMapper MAPPER = new ObjectMapper();
-
-  /**
-   * A map from {@link PTransform} subclass to the corresponding
-   * {@link TransformTranslator} to use to translate that transform.
-   *
-   * <p>A static map that contains system-wide defaults.
-   */
-  private static Map<Class, TransformTranslator> transformTranslators =
-      new HashMap<>();
-
-  /** Provided configuration options. */
-  private final DataflowPipelineOptions options;
-
-  /**
-   * Constructs a translator from the provided options.
-   *
-   * @param options Properties that configure the translator.
-   *
-   * @return The newly created translator.
-   */
-  public static DataflowPipelineTranslator fromOptions(
-      DataflowPipelineOptions options) {
-    return new DataflowPipelineTranslator(options);
-  }
-
-  /** Private on purpose: instances are obtained through {@link #fromOptions}. */
-  private DataflowPipelineTranslator(DataflowPipelineOptions options) {
-    this.options = options;
-  }
-
-  /**
-   * Translates a {@link Pipeline} into a {@code JobSpecification}.
-   *
-   * <p>The returned specification carries the translated {@link Job} together with an
-   * unmodifiable view of the step names assigned while translating.
-   */
-  public JobSpecification translate(
-      Pipeline pipeline,
-      DataflowPipelineRunner runner,
-      List<DataflowPackage> packages) {
-    Translator pipelineTranslator = new Translator(pipeline, runner);
-    Job translatedJob = pipelineTranslator.translate(packages);
-    Map<AppliedPTransform<?, ?, ?>, String> readOnlyStepNames =
-        Collections.unmodifiableMap(pipelineTranslator.stepNames);
-    return new JobSpecification(translatedJob, readOnlyStepNames);
-  }
-
-  /**
-   * The outcome of translating a pipeline.
-   *
-   * <p>Bundles the resulting {@link Job} with state gathered while constructing it that other
-   * classes may find useful (eg the {@link PTransform} to StepName mapping).
-   */
-  public static class JobSpecification {
-    private final Job job;
-    private final Map<AppliedPTransform<?, ?, ?>, String> stepNames;
-
-    /** Creates a specification holding the given job and step name mapping. */
-    public JobSpecification(Job job, Map<AppliedPTransform<?, ?, ?>, String> stepNames) {
-      this.stepNames = stepNames;
-      this.job = job;
-    }
-
-    /** Returns the translated {@link Job}. */
-    public Job getJob() {
-      return this.job;
-    }
-
-    /**
-     * Returns, for each {@link AppliedPTransform}, the internal step name that was assigned to
-     * that {@code AppliedPTransform}.
-     */
-    public Map<AppliedPTransform<?, ?, ?>, String> getStepNames() {
-      return this.stepNames;
-    }
-  }
-
-  /**
-   * Renders a {@link Job} as a pretty-printed string using the shared {@code MAPPER}.
-   *
-   * @throws IllegalStateException if the job cannot be serialized
-   */
-  public static String jobToString(Job job) {
-    final String rendered;
-    try {
-      rendered = MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(job);
-    } catch (JsonProcessingException e) {
-      throw new IllegalStateException("Failed to render Job as String.", e);
-    }
-    return rendered;
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * Records that instances of the specified PTransform class
-   * should be translated by default by the corresponding
-   * {@link TransformTranslator}.
-   *
-   * <p>A class may be registered only once. A rejected duplicate registration leaves the
-   * previously registered translator in place.
-   *
-   * @param transformClass the PTransform class to register a translator for
-   * @param transformTranslator the translator to use for that class
-   * @throws IllegalArgumentException if a translator is already registered for the class
-   */
-  public static <TransformT extends PTransform> void registerTransformTranslator(
-      Class<TransformT> transformClass,
-      TransformTranslator<? extends TransformT> transformTranslator) {
-    // Check before mutating the static registry, so that a rejected duplicate can never
-    // overwrite the translator that is already registered.
-    if (transformTranslators.get(transformClass) != null) {
-      throw new IllegalArgumentException(
-          "defining multiple translators for " + transformClass);
-    }
-    transformTranslators.put(transformClass, transformTranslator);
-  }
-
-  /**
-   * Looks up the {@link TransformTranslator} registered for instances of the
-   * specified PTransform class; yields null if none is registered.
-   */
-  public <TransformT extends PTransform>
-      TransformTranslator<TransformT> getTransformTranslator(Class<TransformT> transformClass) {
-    TransformTranslator<TransformT> registered = transformTranslators.get(transformClass);
-    return registered;
-  }
-
-  /**
-   * Translation strategy for one particular subclass of {@link PTransform}
-   * targeting the Cloud Dataflow service. An implementation performs the
-   * translation by mutating the {@link TranslationContext} it is given.
-   */
-  public interface TransformTranslator<TransformT extends PTransform> {
-    /** Translates {@code transform} by mutating {@code context}. */
-    void translate(TransformT transform, TranslationContext context);
-  }
-
-  /**
-   * The interface handed to registered callbacks so they can interact
-   * with the {@link DataflowPipelineRunner}, including reading and writing the
-   * values of {@link PCollection}s and side inputs ({@link PCollectionView}s).
-   */
-  public interface TranslationContext {
-    /** Returns the configured pipeline options. */
-    DataflowPipelineOptions getPipelineOptions();
-
-    /** Returns the input of the transform currently being translated. */
-    <InputT extends PInput> InputT getInput(PTransform<InputT, ?> transform);
-
-    /** Returns the output of the transform currently being translated. */
-    <OutputT extends POutput> OutputT getOutput(PTransform<?, OutputT> transform);
-
-    /** Returns the full name of the transform currently being translated. */
-    String getFullName(PTransform<?, ?> transform);
-
-    /**
-     * Adds a step of the given Dataflow step type to the Dataflow workflow
-     * for the given transform.
-     * The new step becomes "current" for the purpose of {@link #addInput} and
-     * {@link #addOutput}.
-     */
-    void addStep(PTransform<?, ?> transform, String type);
-
-    /**
-     * Adds a pre-defined step to the Dataflow workflow. The given PTransform should be
-     * consistent with the Step, in terms of input, output and coder types.
-     *
-     * <p>This is a low-level operation: a caller using this method is responsible
-     * for ensuring that names do not collide.
-     */
-    void addStep(PTransform<?, ? extends PValue> transform, Step step);
-
-    /** Sets the encoding for the current Dataflow step. */
-    void addEncodingInput(Coder<?> value);
-
-    /** Adds a Boolean input with the given name and value to the current Dataflow step. */
-    void addInput(String name, Boolean value);
-
-    /** Adds a String input with the given name and value to the current Dataflow step. */
-    void addInput(String name, String value);
-
-    /** Adds a Long input with the given name and value to the current Dataflow step. */
-    void addInput(String name, Long value);
-
-    /**
-     * Adds an input with the given name to the previously added Dataflow
-     * step, coming from the specified input PValue.
-     */
-    void addInput(String name, PInput value);
-
-    /** Adds an input that is a dictionary of strings to objects. */
-    void addInput(String name, Map<String, Object> elements);
-
-    /** Adds an input that is a list of objects. */
-    void addInput(String name, List<? extends Map<String, Object>> elements);
-
-    /**
-     * Adds an output with the given name to the previously added
-     * Dataflow step, producing the specified output {@code PValue},
-     * including its {@code Coder} if a {@code TypedPValue}.  If the
-     * {@code PValue} is a {@code PCollection}, its coder is wrapped inside
-     * a {@code WindowedValueCoder}.
-     */
-    void addOutput(String name, PValue value);
-
-    /**
-     * Adds an output with the given name to the previously added
-     * Dataflow step, producing the specified output {@code PValue},
-     * including its {@code Coder} if a {@code TypedPValue}.  If the
-     * {@code PValue} is a {@code PCollection}, its coder is wrapped inside
-     * a {@code ValueOnlyCoder}.
-     */
-    void addValueOnlyOutput(String name, PValue value);
-
-    /**
-     * Adds an output with the given name to the previously added
-     * CollectionToSingleton Dataflow step, consuming the specified
-     * input {@code PValue} and producing the specified output
-     * {@code PValue}.  The output encoding of this step requires
-     * special treatment.
-     */
-    void addCollectionToSingletonOutput(String name, PValue inputValue, PValue outputValue);
-
-    /** Encodes a PValue reference as an output reference. */
-    OutputReference asOutputReference(PValue value);
-  }
-
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * Translates a Pipeline into the Dataflow representation.
-   */
-  class Translator implements PipelineVisitor, TranslationContext {
-    /** The Pipeline to translate. */
-    private final Pipeline pipeline;
-
-    /** The runner which will execute the pipeline. */
-    private final DataflowPipelineRunner runner;
-
-    /** The Cloud Dataflow Job representation. */
-    private final Job job = new Job();
-
-    /**
-     * Translator is stateful, as addProperty calls refer to the current step.
-     */
-    private Step currentStep;
-
-    /**
-     * A Map from AppliedPTransform to their unique Dataflow step names.
-     */
-    private final Map<AppliedPTransform<?, ?, ?>, String> stepNames = new HashMap<>();
-
-    /**
-     * A Map from PValues to their output names used by their producer
-     * Dataflow steps.
-     */
-    private final Map<POutput, String> outputNames = new HashMap<>();
-
-    /**
-     * A Map from PValues to the Coders used for them.
-     */
-    private final Map<POutput, Coder<?>> outputCoders = new HashMap<>();
-
-    /**
-     * The transform currently being applied.
-     */
-    private AppliedPTransform<?, ?, ?> currentTransform;
-
-    /**
-     * Creates a Translator for the specified Pipeline, remembering the
-     * runner that is passed in alongside it.
-     */
-    public Translator(Pipeline pipeline, DataflowPipelineRunner runner) {
-      this.runner = runner;
-      this.pipeline = pipeline;
-    }
-
-    /**
-     * Translates this Translator's pipeline into a {@link Job}.
-     *
-     * <p>Populates the job name, type, environment and a single worker pool from the pipeline
-     * options, then traverses the pipeline topologically with this Translator as the visitor
-     * so that the job steps are added.
-     *
-     * @param packages the packages to set on the worker pool
-     * @return a Job definition filled in with the type of job, the environment,
-     * and the job steps.
-     * @throws IllegalArgumentException if the pipeline options fail to serialize to JSON
-     */
-    public Job translate(List<DataflowPackage> packages) {
-      // Lower-case with a fixed locale so the resulting job name does not depend on the
-      // default locale of the JVM (some locales do not map "I" to "i").
-      job.setName(options.getJobName().toLowerCase(java.util.Locale.ROOT));
-
-      Environment environment = new Environment();
-      job.setEnvironment(environment);
-
-      try {
-        // Round-trip the options through JSON to obtain their Map representation.
-        environment.setSdkPipelineOptions(
-            MAPPER.readValue(MAPPER.writeValueAsBytes(options), Map.class));
-      } catch (IOException e) {
-        throw new IllegalArgumentException(
-            "PipelineOptions specified failed to serialize to JSON.", e);
-      }
-
-      WorkerPool workerPool = new WorkerPool();
-
-      if (options.getTeardownPolicy() != null) {
-        workerPool.setTeardownPolicy(options.getTeardownPolicy().getTeardownPolicyName());
-      }
-
-      if (options.isStreaming()) {
-        job.setType("JOB_TYPE_STREAMING");
-      } else {
-        job.setType("JOB_TYPE_BATCH");
-        // In batch mode the disk type is set directly on the worker pool.
-        workerPool.setDiskType(options.getWorkerDiskType());
-      }
-
-      if (options.getWorkerMachineType() != null) {
-        workerPool.setMachineType(options.getWorkerMachineType());
-      }
-
-      workerPool.setPackages(packages);
-      workerPool.setNumWorkers(options.getNumWorkers());
-
-      if (options.isStreaming()) {
-        // Use separate data disk for streaming.
-        Disk disk = new Disk();
-        disk.setDiskType(options.getWorkerDiskType());
-        workerPool.setDataDisks(Collections.singletonList(disk));
-      }
-      // Optional placement and sizing settings are only set when configured.
-      if (!Strings.isNullOrEmpty(options.getZone())) {
-        workerPool.setZone(options.getZone());
-      }
-      if (!Strings.isNullOrEmpty(options.getNetwork())) {
-        workerPool.setNetwork(options.getNetwork());
-      }
-      if (!Strings.isNullOrEmpty(options.getSubnetwork())) {
-        workerPool.setSubnetwork(options.getSubnetwork());
-      }
-      if (options.getDiskSizeGb() > 0) {
-        workerPool.setDiskSizeGb(options.getDiskSizeGb());
-      }
-      AutoscalingSettings settings = new AutoscalingSettings();
-      if (options.getAutoscalingAlgorithm() != null) {
-        settings.setAlgorithm(options.getAutoscalingAlgorithm().getAlgorithm());
-      }
-      settings.setMaxNumWorkers(options.getMaxNumWorkers());
-      workerPool.setAutoscalingSettings(settings);
-
-      List<WorkerPool> workerPools = new LinkedList<>();
-
-      workerPools.add(workerPool);
-      environment.setWorkerPools(workerPools);
-
-      // Visiting the pipeline adds the steps to the job.
-      pipeline.traverseTopologically(this);
-      return job;
-    }
-
-    /** Returns the options of the enclosing {@link DataflowPipelineTranslator}. */
-    @Override
-    public DataflowPipelineOptions getPipelineOptions() {
-      return DataflowPipelineTranslator.this.options;
-    }
-
-    /** Returns the input of {@code transform}, which must be the current transform. */
-    @Override
-    public <InputT extends PInput> InputT getInput(PTransform<InputT, ?> transform) {
-      AppliedPTransform<?, ?, ?> applied = getCurrentTransform(transform);
-      return (InputT) applied.getInput();
-    }
-
-    /** Returns the output of {@code transform}, which must be the current transform. */
-    @Override
-    public <OutputT extends POutput> OutputT getOutput(PTransform<?, OutputT> transform) {
-      AppliedPTransform<?, ?, ?> applied = getCurrentTransform(transform);
-      return (OutputT) applied.getOutput();
-    }
-
-    /** Returns the full name of {@code transform}, which must be the current transform. */
-    @Override
-    public String getFullName(PTransform<?, ?> transform) {
-      AppliedPTransform<?, ?, ?> applied = getCurrentTransform(transform);
-      return applied.getFullName();
-    }
-
-    /**
-     * Returns the transform currently being applied, after verifying (by identity) that it
-     * wraps the given {@code transform}.
-     */
-    private AppliedPTransform<?, ?, ?> getCurrentTransform(PTransform<?, ?> transform) {
-      boolean isCurrent =
-          currentTransform != null && currentTransform.getTransform() == transform;
-      checkArgument(isCurrent, "can only be called with current transform");
-      return currentTransform;
-    }
-
-    @Override
-    public void enterCompositeTransform(TransformTreeNode node) {
-      // Nothing to do when entering a composite transform.
-    }
-
-    @Override
-    public void leaveCompositeTransform(TransformTreeNode node) {
-      // Nothing to do when leaving a composite transform.
-    }
-
-    /**
-     * Translates the transform of the visited node using the translator registered for the
-     * transform's class.
-     *
-     * @throws IllegalStateException if no translator is registered for the transform's class
-     */
-    @Override
-    public void visitTransform(TransformTreeNode node) {
-      PTransform<?, ?> transform = node.getTransform();
-      TransformTranslator translator =
-          getTransformTranslator(transform.getClass());
-      if (translator == null) {
-        throw new IllegalStateException(
-            "no translator registered for " + transform);
-      }
-      LOG.debug("Translating {}", transform);
-      currentTransform = AppliedPTransform.of(
-          node.getFullName(), node.getInput(), node.getOutput(), (PTransform) transform);
-      try {
-        translator.translate(transform, this);
-      } finally {
-        // Always clear the marker, so a translator that throws cannot leave a stale
-        // "current" transform behind.
-        currentTransform = null;
-      }
-    }
-
-    @Override
-    public void visitValue(PValue value, TransformTreeNode producer) {
-      LOG.debug("Checking translation of {}", value);
-      if (value.getProducingTransformInternal() == null) {
-        throw new RuntimeException(
-            "internal error: expecting a PValue "
-            + "to have a producingTransform");
-      }
-      if (!producer.isCompositeNode()) {
-        // Primitive transforms are the only ones assigned step names.
-        asOutputReference(value);
-      }
-    }
-
-    /**
-     * Creates a new step of the given kind with a freshly generated name, appends it to the
-     * job, makes it the current step and records the transform's user name and display data.
-     */
-    @Override
-    public void addStep(PTransform<?, ?> transform, String type) {
-      String stepName = genStepName();
-      String previousName = stepNames.put(getCurrentTransform(transform), stepName);
-      if (previousName != null) {
-        throw new IllegalArgumentException(
-            transform + " already has a name specified");
-      }
-
-      // Lazily create the job's list of steps.
-      List<Step> jobSteps = job.getSteps();
-      if (jobSteps == null) {
-        jobSteps = new LinkedList<>();
-        job.setSteps(jobSteps);
-      }
-
-      // Start the next "steps" list item.
-      Step newStep = new Step();
-      newStep.setName(stepName);
-      newStep.setKind(type);
-      currentStep = newStep;
-      jobSteps.add(newStep);
-
-      addInput(PropertyNames.USER_NAME, getFullName(transform));
-      addDisplayData(PropertyNames.DISPLAY_DATA, DisplayData.from(transform));
-    }
-
-    /**
-     * Adds a clone of the given pre-defined step to the job and makes it the current step.
-     *
-     * <p>If the step's properties list at least one output and the first one has a readable
-     * name, that name is registered for the output of the transform.
-     */
-    @Override
-    public void addStep(PTransform<?, ? extends PValue> transform, Step original) {
-      Step step = original.clone();
-      String stepName = step.getName();
-      if (stepNames.put(getCurrentTransform(transform), stepName) != null) {
-        throw new IllegalArgumentException(transform + " already has a name specified");
-      }
-
-      Map<String, Object> stepProperties = step.getProperties();
-      if (stepProperties != null) {
-        @Nullable List<Map<String, Object>> outputs;
-        try {
-          // TODO: This should be done via a Structs accessor.
-          outputs = (List<Map<String, Object>>) stepProperties.get(PropertyNames.OUTPUT_INFO);
-        } catch (Exception e) {
-          throw new RuntimeException("Inconsistent dataflow pipeline translation", e);
-        }
-        if (outputs != null && !outputs.isEmpty()) {
-          // Fetched outside the try below so that only the name lookup is best-effort.
-          Map<String, Object> firstOutputPort = outputs.get(0);
-          @Nullable String firstOutputName;
-          try {
-            firstOutputName = getString(firstOutputPort, PropertyNames.OUTPUT_NAME);
-          } catch (Exception e) {
-            // An unreadable name is treated the same as no name.
-            firstOutputName = null;
-          }
-          if (firstOutputName != null) {
-            registerOutputName(getOutput(transform), firstOutputName);
-          }
-        }
-      }
-
-      // Lazily create the job's list of steps.
-      List<Step> jobSteps = job.getSteps();
-      if (jobSteps == null) {
-        jobSteps = new LinkedList<>();
-        job.setSteps(jobSteps);
-      }
-      jobSteps.add(step);
-      currentStep = step;
-    }
-
-    /** Stores the serializable form of {@code coder} as the encoding of the current step. */
-    @Override
-    public void addEncodingInput(Coder<?> coder) {
-      Map<String, Object> stepProperties = getProperties();
-      CloudObject serializedCoder = SerializableUtils.ensureSerializable(coder);
-      addObject(stepProperties, PropertyNames.ENCODING, serializedCoder);
-    }
-
-    @Override
-    public void addInput(String name, Boolean value) {
-      addBoolean(getProperties(), name, value);
-    }
-
-    @Override
-    public void addInput(String name, String value) {
-      addString(getProperties(), name, value);
-    }
-
-    @Override
-    public void addInput(String name, Long value) {
-      addLong(getProperties(), name, value);
-    }
-
-    @Override
-    public void addInput(String name, Map<String, Object> elements) {
-      addDictionary(getProperties(), name, elements);
-    }
-
-    @Override
-    public void addInput(String name, List<? extends Map<String, Object>> elements) {
-      addList(getProperties(), name, elements);
-    }
-
-    /**
-     * Adds an input referring to the output that produced {@code value}.
-     *
-     * @throws IllegalStateException if {@code value} is not a {@link PValue}
-     */
-    @Override
-    public void addInput(String name, PInput value) {
-      if (!(value instanceof PValue)) {
-        throw new IllegalStateException("Input must be a PValue");
-      }
-      addInput(name, asOutputReference((PValue) value));
-    }
-
-    /**
-     * Adds an output for {@code value}, choosing its coder: none unless the value is a
-     * {@code TypedPValue}, and the full windowed-value coder when it is a {@code PCollection}.
-     */
-    @Override
-    public void addOutput(String name, PValue value) {
-      // Stays null when there is no output coder to encode.
-      Coder<?> coder = null;
-      if (value instanceof TypedPValue) {
-        coder = ((TypedPValue<?>) value).getCoder();
-        if (value instanceof PCollection) {
-          PCollection<?> collection = (PCollection<?>) value;
-          // Wrap the PCollection element Coder inside a WindowedValueCoder.
-          coder = WindowedValue.getFullCoder(
-              coder,
-              collection.getWindowingStrategy().getWindowFn().windowCoder());
-        }
-      }
-      addOutput(name, value, coder);
-    }
-
-    /**
-     * Adds an output for {@code value}, choosing its coder: none unless the value is a
-     * {@code TypedPValue}, and the value-only coder when it is a {@code PCollection}.
-     */
-    @Override
-    public void addValueOnlyOutput(String name, PValue value) {
-      // Stays null when there is no output coder to encode.
-      Coder<?> coder = null;
-      if (value instanceof TypedPValue) {
-        coder = ((TypedPValue<?>) value).getCoder();
-        if (value instanceof PCollection) {
-          // Wrap the PCollection element Coder inside a ValueOnly
-          // WindowedValueCoder.
-          coder = WindowedValue.getValueOnlyCoder(coder);
-        }
-      }
-      addOutput(name, value, coder);
-    }
-
-    @Override
-    public void addCollectionToSingletonOutput(String name,
-                                               PValue inputValue,
-                                               PValue outputValue) {
-      Coder<?> inputValueCoder =
-          Preconditions.checkNotNull(outputCoders.get(inputValue));
-      // The inputValueCoder for the input PCollection should be some
-      // WindowedValueCoder of the input PCollection's element
-      // coder.
-      Preconditions.checkState(
-          inputValueCoder instanceof WindowedValue.WindowedValueCoder);
-      // The outputValueCoder for the output should be an
-      // IterableCoder of the inputValueCoder. This is a property
-      // of the backend "CollectionToSingleton" step.
-      Coder<?> outputValueCoder = IterableCoder.of(inputValueCoder);
-      addOutput(name, outputValue, outputValueCoder);
-    }
-
-    /**
-     * Registers the output name for {@code value} and appends an output description to the
-     * output info list of the current Dataflow step. The description carries the given
-     * {@code Coder} as its encoding when the coder is not {@code null}.
-     */
-    private void addOutput(String name, PValue value, Coder<?> valueCoder) {
-      registerOutputName(value, name);
-
-      Map<String, Object> stepProperties = getProperties();
-      @Nullable List<Map<String, Object>> outputs;
-      try {
-        // TODO: This should be done via a Structs accessor.
-        outputs = (List<Map<String, Object>>) stepProperties.get(PropertyNames.OUTPUT_INFO);
-      } catch (Exception e) {
-        throw new RuntimeException("Inconsistent dataflow pipeline translation", e);
-      }
-      if (outputs == null) {
-        // First output of this step: create the list and attach it to the properties.
-        outputs = new ArrayList<>();
-        // TODO: This should be done via a Structs accessor.
-        stepProperties.put(PropertyNames.OUTPUT_INFO, outputs);
-      }
-
-      Map<String, Object> description = new HashMap<>();
-      addString(description, PropertyNames.OUTPUT_NAME, name);
-      addString(description, PropertyNames.USER_NAME, value.getName());
-      if (value instanceof PCollection) {
-        if (runner.doesPCollectionRequireIndexedFormat((PCollection<?>) value)) {
-          addBoolean(description, PropertyNames.USE_INDEXED_FORMAT, true);
-        }
-      }
-      if (valueCoder != null) {
-        // Verify that encoding can be decoded, in order to catch serialization
-        // failures as early as possible.
-        CloudObject serializedCoder = SerializableUtils.ensureSerializable(valueCoder);
-        addObject(description, PropertyNames.ENCODING, serializedCoder);
-        outputCoders.put(value, valueCoder);
-      }
-
-      outputs.add(description);
-    }
-
-    /**
-     * Converts every item of {@code displayData} to a Map with the shared {@code MAPPER} and
-     * adds the resulting list to the current step under the given name.
-     */
-    private void addDisplayData(String name, DisplayData displayData) {
-      List<Map<String, Object>> itemsAsMaps = new ArrayList<>();
-      for (DisplayData.Item displayItem : displayData.items()) {
-        Map<String, Object> itemAsMap = MAPPER.convertValue(displayItem, Map.class);
-        itemsAsMaps.add(itemAsMap);
-      }
-      addList(getProperties(), name, itemsAsMaps);
-    }
-
-    /**
-     * Builds the reference (step name plus output name) identifying {@code value}.
-     *
-     * @throws IllegalArgumentException if the producing transform has no step name, or the
-     *     value has no output name
-     */
-    @Override
-    public OutputReference asOutputReference(PValue value) {
-      AppliedPTransform<?, ?, ?> producer = value.getProducingTransformInternal();
-      String producerStepName = stepNames.get(producer);
-      String registeredOutputName = outputNames.get(value);
-
-      // The missing step name is reported first, then the missing output name.
-      if (producerStepName == null) {
-        throw new IllegalArgumentException(producer + " doesn't have a name specified");
-      }
-      if (registeredOutputName == null) {
-        throw new IllegalArgumentException(
-            "output " + value + " doesn't have a name specified");
-      }
-      return new OutputReference(producerStepName, registeredOutputName);
-    }
-
-    /** Returns the properties of the current step, creating and attaching them if absent. */
-    private Map<String, Object> getProperties() {
-      Map<String, Object> existing = currentStep.getProperties();
-      if (existing != null) {
-        return existing;
-      }
-      Map<String, Object> created = new HashMap<>();
-      currentStep.setProperties(created);
-      return created;
-    }
-
-    /**
-     * Returns a fresh Dataflow step name.
-     */
-    private String genStepName() {
-      return "s" + (stepNames.size() + 1);
-    }
-
-    /**
-     * Records the name of the given output PValue,
-     * within its producing transform.
-     */
-    private void registerOutputName(POutput value, String name) {
-      if (outputNames.put(value, name) != null) {
-        throw new IllegalArgumentException(
-            "output " + value + " already has a name specified");
-      }
-    }
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  @Override
-  public String toString() {
-    return "DataflowPipelineTranslator#" + hashCode();
-  }
-
-
-  ///////////////////////////////////////////////////////////////////////////
-
-  static {
-    registerTransformTranslator(
-        View.CreatePCollectionView.class,
-        new TransformTranslator<View.CreatePCollectionView>() {
-          @Override
-          public void translate(
-              View.CreatePCollectionView transform,
-              TranslationContext context) {
-            translateTyped(transform, context);
-          }
-
-          private <ElemT, ViewT> void translateTyped(
-              View.CreatePCollectionView<ElemT, ViewT> transform,
-              TranslationContext context) {
-            context.addStep(transform, "CollectionToSingleton");
-            context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
-            context.addCollectionToSingletonOutput(
-                PropertyNames.OUTPUT,
-                context.getInput(transform),
-                context.getOutput(transform));
-          }
-        });
-
-    DataflowPipelineTranslator.registerTransformTranslator(
-        Combine.GroupedValues.class,
-        new DataflowPipelineTranslator.TransformTranslator<Combine.GroupedValues>() {
-          @Override
-          public void translate(
-              Combine.GroupedValues transform,
-              DataflowPipelineTranslator.TranslationContext context) {
-            translateHelper(transform, context);
-          }
-
-          private <K, InputT, OutputT> void translateHelper(
-              final Combine.GroupedValues<K, InputT, OutputT> transform,
-              DataflowPipelineTranslator.TranslationContext context) {
-            context.addStep(transform, "CombineValues");
-            translateInputs(context.getInput(transform), transform.getSideInputs(), context);
-
-            AppliedCombineFn<? super K, ? super InputT, ?, OutputT> fn =
-                transform.getAppliedFn(
-                    context.getInput(transform).getPipeline().getCoderRegistry(),
-                context.getInput(transform).getCoder(),
-                context.getInput(transform).getWindowingStrategy());
-
-            context.addEncodingInput(fn.getAccumulatorCoder());
-            context.addInput(
-                PropertyNames.SERIALIZED_FN,
-                byteArrayToJsonString(serializeToByteArray(fn)));
-            context.addOutput(PropertyNames.OUTPUT, context.getOutput(transform));
-          }
-        });
-
-    registerTransformTranslator(
-        Create.Values.class,
-        new TransformTranslator<Create.Values>() {
-          @Override
-          public void translate(
-              Create.Values transform,
-              TranslationContext context) {
-            createHelper(transform, context);
-          }
-
-          private <T> void createHelper(
-              Create.Values<T> transform,
-              TranslationContext context) {
-            context.addStep(transform, "CreateCollection");
-
-            Coder<T> coder = context.getOutput(transform).getCoder();
-            List<CloudObject> elements = new LinkedList<>();
-            for (T elem : transform.getElements()) {
-              byte[] encodedBytes;
-              try {
-                encodedBytes = encodeToByteArray(coder, elem);
-              } catch (CoderException exn) {
-                // TODO: Put in better element printing:
-                // truncate if too long.
-                throw new IllegalArgumentException(
-                    "Unable to encode element '" + elem + "' of transform '" + transform
-                    + "' using coder '" + coder + "'.",
-                    exn);
-              }
-              String encodedJson = byteArrayToJsonString(encodedBytes);
-              assert Arrays.equals(encodedBytes,
-                                   jsonStringToByteArray(encodedJson));
-              elements.add(CloudObject.forString(encodedJson));
-            }
-            context.addInput(PropertyNames.ELEMENT, elements);
-            context.addValueOnlyOutput(PropertyNames.OUTPUT, context.getOutput(transform));
-          }
-        });
-
-    registerTransformTranslator(
-        Flatten.FlattenPCollectionList.class,
-        new TransformTranslator<Flatten.FlattenPCollectionList>() {
-          @Override
-          public void translate(
-              Flatten.FlattenPCollectionList transform,
-              TranslationContext context) {
-            flattenHelper(transform, context);
-          }
-
-          private <T> void flattenHelper(
-              Flatten.FlattenPCollectionList<T> transform,
-              TranslationContext context) {
-            context.addStep(transform, "Flatten");
-
-            List<OutputReference> inputs = new LinkedList<>();
-            for (PCollection<T> input : context.getInput(transform).getAll()) {
-              inputs.add(context.asOutputReference(input));
-            }
-            context.addInput(PropertyNames.INPUTS, inputs);
-            context.addOutput(PropertyNames.OUTPUT, context.getOutput(transform));
-          }
-        });
-
-    registerTransformTranslator(
-        GroupByKeyAndSortValuesOnly.class,
-        new TransformTranslator<GroupByKeyAndSortValuesOnly>() {
-          @Override
-          public void translate(
-              GroupByKeyAndSortValuesOnly transform,
-              TranslationContext context) {
-            groupByKeyAndSortValuesHelper(transform, context);
-          }
-
-          private <K1, K2, V> void groupByKeyAndSortValuesHelper(
-              GroupByKeyAndSortValuesOnly<K1, K2, V> transform,
-              TranslationContext context) {
-            context.addStep(transform, "GroupByKey");
-            context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
-            context.addOutput(PropertyNames.OUTPUT, context.getOutput(transform));
-            context.addInput(PropertyNames.SORT_VALUES, true);
-
-            // TODO: Add support for combiner lifting once the need arises.
-            context.addInput(
-                PropertyNames.DISALLOW_COMBINER_LIFTING, true);
-          }
-        });
-
-    registerTransformTranslator(
-        GroupByKey.class,
-        new TransformTranslator<GroupByKey>() {
-          @Override
-          public void translate(
-              GroupByKey transform,
-              TranslationContext context) {
-            groupByKeyHelper(transform, context);
-          }
-
-          private <K, V> void groupByKeyHelper(
-              GroupByKey<K, V> transform,
-              TranslationContext context) {
-            context.addStep(transform, "GroupByKey");
-            context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
-            context.addOutput(PropertyNames.OUTPUT, context.getOutput(transform));
-
-            WindowingStrategy<?, ?> windowingStrategy =
-                context.getInput(transform).getWindowingStrategy();
-            boolean isStreaming =
-                context.getPipelineOptions().as(StreamingOptions.class).isStreaming();
-            boolean disallowCombinerLifting =
-                !windowingStrategy.getWindowFn().isNonMerging()
-                || (isStreaming && !transform.fewKeys())
-                // TODO: Allow combiner lifting on the non-default trigger, as appropriate.
-                || !(windowingStrategy.getTrigger().getSpec() instanceof DefaultTrigger);
-            context.addInput(
-                PropertyNames.DISALLOW_COMBINER_LIFTING, disallowCombinerLifting);
-            context.addInput(
-                PropertyNames.SERIALIZED_FN,
-                byteArrayToJsonString(serializeToByteArray(windowingStrategy)));
-            context.addInput(
-                PropertyNames.IS_MERGING_WINDOW_FN,
-                !windowingStrategy.getWindowFn().isNonMerging());
-          }
-        });
-
-    registerTransformTranslator(
-        ParDo.BoundMulti.class,
-        new TransformTranslator<ParDo.BoundMulti>() {
-          @Override
-          public void translate(
-              ParDo.BoundMulti transform,
-              TranslationContext context) {
-            translateMultiHelper(transform, context);
-          }
-
-          private <InputT, OutputT> void translateMultiHelper(
-              ParDo.BoundMulti<InputT, OutputT> transform,
-              TranslationContext context) {
-            context.addStep(transform, "ParallelDo");
-            translateInputs(context.getInput(transform), transform.getSideInputs(), context);
-            translateFn(transform.getFn(), context.getInput(transform).getWindowingStrategy(),
-                transform.getSideInputs(), context.getInput(transform).getCoder(), context);
-            translateOutputs(context.getOutput(transform), context);
-          }
-        });
-
-    registerTransformTranslator(
-        ParDo.Bound.class,
-        new TransformTranslator<ParDo.Bound>() {
-          @Override
-          public void translate(
-              ParDo.Bound transform,
-              TranslationContext context) {
-            translateSingleHelper(transform, context);
-          }
-
-          private <InputT, OutputT> void translateSingleHelper(
-              ParDo.Bound<InputT, OutputT> transform,
-              TranslationContext context) {
-            context.addStep(transform, "ParallelDo");
-            translateInputs(context.getInput(transform), transform.getSideInputs(), context);
-            translateFn(
-                transform.getFn(),
-                context.getInput(transform).getWindowingStrategy(),
-                transform.getSideInputs(), context.getInput(transform).getCoder(), context);
-            context.addOutput(PropertyNames.OUTPUT, context.getOutput(transform));
-          }
-        });
-
-    registerTransformTranslator(
-        Window.Bound.class,
-        new DataflowPipelineTranslator.TransformTranslator<Window.Bound>() {
-          @Override
-          public void translate(
-              Window.Bound transform, TranslationContext context) {
-            translateHelper(transform, context);
-          }
-
-          private <T> void translateHelper(
-              Window.Bound<T> transform, TranslationContext context) {
-            context.addStep(transform, "Bucket");
-            context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
-            context.addOutput(PropertyNames.OUTPUT, context.getOutput(transform));
-
-            WindowingStrategy<?, ?> strategy = context.getOutput(transform).getWindowingStrategy();
-            byte[] serializedBytes = serializeToByteArray(strategy);
-            String serializedJson = byteArrayToJsonString(serializedBytes);
-            assert Arrays.equals(serializedBytes,
-                                 jsonStringToByteArray(serializedJson));
-            context.addInput(PropertyNames.SERIALIZED_FN, serializedJson);
-          }
-        });
-
-    ///////////////////////////////////////////////////////////////////////////
-    // IO Translation.
-
-    registerTransformTranslator(
-        BigQueryIO.Read.Bound.class, new BigQueryIOTranslator.ReadTranslator());
-    registerTransformTranslator(
-        BigQueryIO.Write.Bound.class, new BigQueryIOTranslator.WriteTranslator());
-
-    registerTransformTranslator(
-        PubsubIO.Read.Bound.class, new PubsubIOTranslator.ReadTranslator());
-    registerTransformTranslator(
-        DataflowPipelineRunner.StreamingPubsubIOWrite.class,
-        new PubsubIOTranslator.WriteTranslator());
-
-    registerTransformTranslator(Read.Bounded.class, new ReadTranslator());
-  }
-
-  private static void translateInputs(
-      PCollection<?> input,
-      List<PCollectionView<?>> sideInputs,
-      TranslationContext context) {
-    context.addInput(PropertyNames.PARALLEL_INPUT, input);
-    translateSideInputs(sideInputs, context);
-  }
-
-  // Used for ParDo
-  private static void translateSideInputs(
-      List<PCollectionView<?>> sideInputs,
-      TranslationContext context) {
-    Map<String, Object> nonParInputs = new HashMap<>();
-
-    for (PCollectionView<?> view : sideInputs) {
-      nonParInputs.put(
-          view.getTagInternal().getId(),
-          context.asOutputReference(view));
-    }
-
-    context.addInput(PropertyNames.NON_PARALLEL_INPUTS, nonParInputs);
-  }
-
-  private static void translateFn(
-      DoFn fn,
-      WindowingStrategy windowingStrategy,
-      Iterable<PCollectionView<?>> sideInputs,
-      Coder inputCoder,
-      TranslationContext context) {
-    context.addInput(PropertyNames.USER_FN, fn.getClass().getName());
-    context.addInput(
-        PropertyNames.SERIALIZED_FN,
-        byteArrayToJsonString(serializeToByteArray(
-            new DoFnInfo(fn, windowingStrategy, sideInputs, inputCoder))));
-  }
-
-  private static void translateOutputs(
-      PCollectionTuple outputs,
-      TranslationContext context) {
-    for (Map.Entry<TupleTag<?>, PCollection<?>> entry
-             : outputs.getAll().entrySet()) {
-      TupleTag<?> tag = entry.getKey();
-      PCollection<?> output = entry.getValue();
-      context.addOutput(tag.getId(), output);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowServiceException.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowServiceException.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowServiceException.java
deleted file mode 100644
index 8b9f3f4..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowServiceException.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.runners;
-
-import javax.annotation.Nullable;
-
-/**
- * Signals there was an error retrieving information about a job from the Cloud Dataflow Service.
- */
-public class DataflowServiceException extends DataflowJobException {
-  DataflowServiceException(DataflowPipelineJob job, String message) {
-    this(job, message, null);
-  }
-
-  DataflowServiceException(DataflowPipelineJob job, String message, @Nullable Throwable cause) {
-    super(job, message, cause);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/AssignWindows.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/AssignWindows.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/AssignWindows.java
deleted file mode 100644
index 974c3a9..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/AssignWindows.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.runners.dataflow;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
-import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
-import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-
-/**
- * A primitive {@link PTransform} that implements the {@link Window#into(WindowFn)}
- * {@link PTransform}.
- *
- * For an application of {@link Window#into(WindowFn)} that changes the {@link WindowFn}, applies
- * a primitive {@link PTransform} in the Dataflow service.
- *
- * For an application of {@link Window#into(WindowFn)} that does not change the {@link WindowFn},
- * applies an identity {@link ParDo} and sets the windowing strategy of the output
- * {@link PCollection}.
- *
- * For internal use only.
- *
- * @param <T> the type of input element
- */
-public class AssignWindows<T> extends PTransform<PCollection<T>, PCollection<T>> {
-  private final Window.Bound<T> transform;
-
-  /**
-   * Builds an instance of this class from the overriden transform.
-   */
-  @SuppressWarnings("unused") // Used via reflection
-  public AssignWindows(Window.Bound<T> transform) {
-    this.transform = transform;
-  }
-
-  @Override
-  public PCollection<T> apply(PCollection<T> input) {
-    WindowingStrategy<?, ?> outputStrategy =
-        transform.getOutputStrategyInternal(input.getWindowingStrategy());
-    if (transform.getWindowFn() != null) {
-      // If the windowFn changed, we create a primitive, and run the AssignWindows operation here.
-      return PCollection.<T>createPrimitiveOutputInternal(
-                            input.getPipeline(), outputStrategy, input.isBounded());
-    } else {
-      // If the windowFn didn't change, we just run a pass-through transform and then set the
-      // new windowing strategy.
-      return input.apply(ParDo.named("Identity").of(new DoFn<T, T>() {
-        @Override
-        public void processElement(DoFn<T, T>.ProcessContext c) throws Exception {
-          c.output(c.element());
-        }
-      })).setWindowingStrategyInternal(outputStrategy);
-    }
-  }
-
-  @Override
-  public void validate(PCollection<T> input) {
-    transform.validate(input);
-  }
-
-  @Override
-  protected Coder<?> getDefaultOutputCoder(PCollection<T> input) {
-    return input.getCoder();
-  }
-
-  @Override
-  protected String getKindString() {
-    return "Window.Into()";
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/BigQueryIOTranslator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/BigQueryIOTranslator.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/BigQueryIOTranslator.java
deleted file mode 100644
index 203ce4f..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/BigQueryIOTranslator.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.runners.dataflow;
-
-import com.google.api.client.json.JsonFactory;
-import com.google.api.services.bigquery.model.TableReference;
-import com.google.cloud.dataflow.sdk.coders.TableRowJsonCoder;
-import com.google.cloud.dataflow.sdk.io.BigQueryIO;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator;
-import com.google.cloud.dataflow.sdk.util.PropertyNames;
-import com.google.cloud.dataflow.sdk.util.Transport;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-/**
- * BigQuery transform support code for the Dataflow backend.
- */
-public class BigQueryIOTranslator {
-  private static final JsonFactory JSON_FACTORY = Transport.getJsonFactory();
-  private static final Logger LOG = LoggerFactory.getLogger(BigQueryIOTranslator.class);
-
-  /**
-   * Implements BigQueryIO Read translation for the Dataflow backend.
-   */
-  public static class ReadTranslator
-      implements DataflowPipelineTranslator.TransformTranslator<BigQueryIO.Read.Bound> {
-
-    @Override
-    public void translate(
-        BigQueryIO.Read.Bound transform, DataflowPipelineTranslator.TranslationContext context) {
-      // Actual translation.
-      context.addStep(transform, "ParallelRead");
-      context.addInput(PropertyNames.FORMAT, "bigquery");
-      context.addInput(PropertyNames.BIGQUERY_EXPORT_FORMAT, "FORMAT_AVRO");
-
-      if (transform.getQuery() != null) {
-        context.addInput(PropertyNames.BIGQUERY_QUERY, transform.getQuery());
-        context.addInput(PropertyNames.BIGQUERY_FLATTEN_RESULTS, transform.getFlattenResults());
-      } else {
-        TableReference table = transform.getTable();
-        if (table.getProjectId() == null) {
-          // If user does not specify a project we assume the table to be located in the project
-          // that owns the Dataflow job.
-          String projectIdFromOptions = context.getPipelineOptions().getProject();
-          LOG.warn(String.format(BigQueryIO.SET_PROJECT_FROM_OPTIONS_WARNING, table.getDatasetId(),
-              table.getDatasetId(), table.getTableId(), projectIdFromOptions));
-          table.setProjectId(projectIdFromOptions);
-        }
-
-        context.addInput(PropertyNames.BIGQUERY_TABLE, table.getTableId());
-        context.addInput(PropertyNames.BIGQUERY_DATASET, table.getDatasetId());
-        if (table.getProjectId() != null) {
-          context.addInput(PropertyNames.BIGQUERY_PROJECT, table.getProjectId());
-        }
-      }
-      context.addValueOnlyOutput(PropertyNames.OUTPUT, context.getOutput(transform));
-    }
-  }
-
-  /**
-   * Implements BigQueryIO Write translation for the Dataflow backend.
-   */
-  public static class WriteTranslator
-      implements DataflowPipelineTranslator.TransformTranslator<BigQueryIO.Write.Bound> {
-
-    @Override
-    public void translate(BigQueryIO.Write.Bound transform,
-                          DataflowPipelineTranslator.TranslationContext context) {
-      if (context.getPipelineOptions().isStreaming()) {
-        // Streaming is handled by the streaming runner.
-        throw new AssertionError(
-            "BigQueryIO is specified to use streaming write in batch mode.");
-      }
-
-      TableReference table = transform.getTable();
-
-      // Actual translation.
-      context.addStep(transform, "ParallelWrite");
-      context.addInput(PropertyNames.FORMAT, "bigquery");
-      context.addInput(PropertyNames.BIGQUERY_TABLE,
-                       table.getTableId());
-      context.addInput(PropertyNames.BIGQUERY_DATASET,
-                       table.getDatasetId());
-      if (table.getProjectId() != null) {
-        context.addInput(PropertyNames.BIGQUERY_PROJECT, table.getProjectId());
-      }
-      if (transform.getSchema() != null) {
-        try {
-          context.addInput(PropertyNames.BIGQUERY_SCHEMA,
-                           JSON_FACTORY.toString(transform.getSchema()));
-        } catch (IOException exn) {
-          throw new IllegalArgumentException("Invalid table schema.", exn);
-        }
-      }
-      context.addInput(
-          PropertyNames.BIGQUERY_CREATE_DISPOSITION,
-          transform.getCreateDisposition().name());
-      context.addInput(
-          PropertyNames.BIGQUERY_WRITE_DISPOSITION,
-          transform.getWriteDisposition().name());
-      // Set sink encoding to TableRowJsonCoder.
-      context.addEncodingInput(
-          WindowedValue.getValueOnlyCoder(TableRowJsonCoder.of()));
-      context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/CustomSources.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/CustomSources.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/CustomSources.java
deleted file mode 100644
index 3473c41..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/CustomSources.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.runners.dataflow;
-
-import static com.google.api.client.util.Base64.encodeBase64String;
-import static com.google.cloud.dataflow.sdk.util.SerializableUtils.serializeToByteArray;
-import static com.google.cloud.dataflow.sdk.util.Structs.addString;
-import static com.google.cloud.dataflow.sdk.util.Structs.addStringList;
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.api.services.dataflow.model.SourceMetadata;
-import com.google.cloud.dataflow.sdk.io.BoundedSource;
-import com.google.cloud.dataflow.sdk.io.Source;
-import com.google.cloud.dataflow.sdk.io.UnboundedSource;
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.util.CloudObject;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.ByteString;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-
-
-/**
- * A helper class for supporting sources defined as {@code Source}.
- *
- * <p>Provides a bridge between the high-level {@code Source} API and the
- * low-level {@code CloudSource} class.
- */
-public class CustomSources {
-  private static final String SERIALIZED_SOURCE = "serialized_source";
-  @VisibleForTesting static final String SERIALIZED_SOURCE_SPLITS = "serialized_source_splits";
-  /**
-   * The current limit on the size of a ReportWorkItemStatus RPC to Google Cloud Dataflow, which
-   * includes the initial splits, is 20 MB.
-   */
-  public static final long DATAFLOW_SPLIT_RESPONSE_API_SIZE_BYTES = 20 * (1 << 20);
-
-  private static final Logger LOG = LoggerFactory.getLogger(CustomSources.class);
-
-  private static final ByteString firstSplitKey = ByteString.copyFromUtf8("0000000000000001");
-
-  public static boolean isFirstUnboundedSourceSplit(ByteString splitKey) {
-    return splitKey.equals(firstSplitKey);
-  }
-
-  private static int getDesiredNumUnboundedSourceSplits(DataflowPipelineOptions options) {
-    if (options.getMaxNumWorkers() > 0) {
-      return options.getMaxNumWorkers();
-    } else if (options.getNumWorkers() > 0) {
-      return options.getNumWorkers() * 3;
-    } else {
-      return 20;
-    }
-  }
-
-  public static com.google.api.services.dataflow.model.Source serializeToCloudSource(
-      Source<?> source, PipelineOptions options) throws Exception {
-    com.google.api.services.dataflow.model.Source cloudSource =
-        new com.google.api.services.dataflow.model.Source();
-    // We ourselves act as the SourceFormat.
-    cloudSource.setSpec(CloudObject.forClass(CustomSources.class));
-    addString(
-        cloudSource.getSpec(), SERIALIZED_SOURCE, encodeBase64String(serializeToByteArray(source)));
-
-    SourceMetadata metadata = new SourceMetadata();
-    if (source instanceof BoundedSource) {
-      BoundedSource<?> boundedSource = (BoundedSource<?>) source;
-      try {
-        metadata.setProducesSortedKeys(boundedSource.producesSortedKeys(options));
-      } catch (Exception e) {
-        LOG.warn("Failed to check if the source produces sorted keys: " + source, e);
-      }
-
-      // Size estimation is best effort so we continue even if it fails here.
-      try {
-        metadata.setEstimatedSizeBytes(boundedSource.getEstimatedSizeBytes(options));
-      } catch (Exception e) {
-        LOG.warn("Size estimation of the source failed: " + source, e);
-      }
-    } else if (source instanceof UnboundedSource) {
-      UnboundedSource<?, ?> unboundedSource = (UnboundedSource<?, ?>) source;
-      metadata.setInfinite(true);
-      List<String> encodedSplits = new ArrayList<>();
-      int desiredNumSplits =
-          getDesiredNumUnboundedSourceSplits(options.as(DataflowPipelineOptions.class));
-      for (UnboundedSource<?, ?> split :
-          unboundedSource.generateInitialSplits(desiredNumSplits, options)) {
-        encodedSplits.add(encodeBase64String(serializeToByteArray(split)));
-      }
-      checkArgument(!encodedSplits.isEmpty(), "UnboundedSources must have at least one split");
-      addStringList(cloudSource.getSpec(), SERIALIZED_SOURCE_SPLITS, encodedSplits);
-    } else {
-      throw new IllegalArgumentException("Unexpected source kind: " + source.getClass());
-    }
-
-    cloudSource.setMetadata(metadata);
-    return cloudSource;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/DataflowAggregatorTransforms.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/DataflowAggregatorTransforms.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/DataflowAggregatorTransforms.java
deleted file mode 100644
index 5c1dd95..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/DataflowAggregatorTransforms.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.runners.dataflow;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.transforms.Aggregator;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.common.collect.BiMap;
-import com.google.common.collect.HashBiMap;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Map;
-
-/**
- * A mapping relating {@link Aggregator}s and the {@link PTransform} in which they are used.
- */
-public class DataflowAggregatorTransforms {
-  private final Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorTransforms;
-  private final Multimap<PTransform<?, ?>, AppliedPTransform<?, ?, ?>> transformAppliedTransforms;
-  private final BiMap<AppliedPTransform<?, ?, ?>, String> appliedStepNames;
-
-  public DataflowAggregatorTransforms(
-      Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorTransforms,
-      Map<AppliedPTransform<?, ?, ?>, String> transformStepNames) {
-    this.aggregatorTransforms = aggregatorTransforms;
-    appliedStepNames = HashBiMap.create(transformStepNames);
-
-    transformAppliedTransforms = HashMultimap.create();
-    for (AppliedPTransform<?, ?, ?> appliedTransform : transformStepNames.keySet()) {
-      transformAppliedTransforms.put(appliedTransform.getTransform(), appliedTransform);
-    }
-  }
-
-  /**
-   * Returns true if the provided {@link Aggregator} is used in the constructing {@link Pipeline}.
-   */
-  public boolean contains(Aggregator<?, ?> aggregator) {
-    return aggregatorTransforms.containsKey(aggregator);
-  }
-
-  /**
-   * Gets the step names in which the {@link Aggregator} is used.
-   */
-  public Collection<String> getAggregatorStepNames(Aggregator<?, ?> aggregator) {
-    Collection<String> names = new HashSet<>();
-    Collection<PTransform<?, ?>> transforms = aggregatorTransforms.get(aggregator);
-    for (PTransform<?, ?> transform : transforms) {
-      for (AppliedPTransform<?, ?, ?> applied : transformAppliedTransforms.get(transform)) {
-        names.add(appliedStepNames.get(applied));
-      }
-    }
-    return names;
-  }
-
-  /**
-   * Gets the {@link PTransform} that was assigned the provided step name.
-   */
-  public AppliedPTransform<?, ?, ?> getAppliedTransformForStepName(String stepName) {
-    return appliedStepNames.inverse().get(stepName);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/DataflowMetricUpdateExtractor.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/DataflowMetricUpdateExtractor.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/DataflowMetricUpdateExtractor.java
deleted file mode 100644
index 8de1038..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/DataflowMetricUpdateExtractor.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.runners.dataflow;
-
-import com.google.api.services.dataflow.model.MetricStructuredName;
-import com.google.api.services.dataflow.model.MetricUpdate;
-import com.google.cloud.dataflow.sdk.transforms.Aggregator;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Methods for extracting the values of an {@link Aggregator} from a collection of {@link
- * MetricUpdate MetricUpdates}.
- */
-public final class DataflowMetricUpdateExtractor {
-  private static final String STEP_NAME_CONTEXT_KEY = "step";
-  private static final String IS_TENTATIVE_KEY = "tentative";
-
-  private DataflowMetricUpdateExtractor() {
-    // Do not instantiate.
-  }
-
-  /**
-   * Extract the values of the provided {@link Aggregator} at each {@link PTransform} it was used in
-   * according to the provided {@link DataflowAggregatorTransforms} from the given list of {@link
-   * MetricUpdate MetricUpdates}.
-   */
-  public static <OutputT> Map<String, OutputT> fromMetricUpdates(Aggregator<?, OutputT> aggregator,
-      DataflowAggregatorTransforms aggregatorTransforms, List<MetricUpdate> metricUpdates) {
-    Map<String, OutputT> results = new HashMap<>();
-    if (metricUpdates == null) {
-      return results;
-    }
-
-    String aggregatorName = aggregator.getName();
-    Collection<String> aggregatorSteps = aggregatorTransforms.getAggregatorStepNames(aggregator);
-
-    for (MetricUpdate metricUpdate : metricUpdates) {
-      MetricStructuredName metricStructuredName = metricUpdate.getName();
-      Map<String, String> context = metricStructuredName.getContext();
-      if (metricStructuredName.getName().equals(aggregatorName) && context != null
-          && aggregatorSteps.contains(context.get(STEP_NAME_CONTEXT_KEY))) {
-        AppliedPTransform<?, ?, ?> transform =
-            aggregatorTransforms.getAppliedTransformForStepName(
-                context.get(STEP_NAME_CONTEXT_KEY));
-        String fullName = transform.getFullName();
-        // Prefer the tentative (fresher) value if it exists.
-        if (Boolean.parseBoolean(context.get(IS_TENTATIVE_KEY)) || !results.containsKey(fullName)) {
-          results.put(fullName, toValue(aggregator, metricUpdate));
-        }
-      }
-    }
-
-    return results;
-
-  }
-
-  private static <OutputT> OutputT toValue(
-      Aggregator<?, OutputT> aggregator, MetricUpdate metricUpdate) {
-    CombineFn<?, ?, OutputT> combineFn = aggregator.getCombineFn();
-    Class<? super OutputT> outputType = combineFn.getOutputType().getRawType();
-
-    if (outputType.equals(Long.class)) {
-      @SuppressWarnings("unchecked")
-      OutputT asLong = (OutputT) Long.valueOf(toNumber(metricUpdate).longValue());
-      return asLong;
-    }
-    if (outputType.equals(Integer.class)) {
-      @SuppressWarnings("unchecked")
-      OutputT asInt = (OutputT) Integer.valueOf(toNumber(metricUpdate).intValue());
-      return asInt;
-    }
-    if (outputType.equals(Double.class)) {
-      @SuppressWarnings("unchecked")
-      OutputT asDouble = (OutputT) Double.valueOf(toNumber(metricUpdate).doubleValue());
-      return asDouble;
-    }
-    throw new UnsupportedOperationException(
-        "Unsupported Output Type " + outputType + " in aggregator " + aggregator);
-  }
-
-  private static Number toNumber(MetricUpdate update) {
-    if (update.getScalar() instanceof Number) {
-      return (Number) update.getScalar();
-    }
-    throw new IllegalArgumentException(
-        "Metric Update " + update + " does not have a numeric scalar");
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/PubsubIOTranslator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/PubsubIOTranslator.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/PubsubIOTranslator.java
deleted file mode 100644
index b54d5c6..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/PubsubIOTranslator.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.runners.dataflow;
-
-import com.google.cloud.dataflow.sdk.io.PubsubIO;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator.TransformTranslator;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator.TranslationContext;
-import com.google.cloud.dataflow.sdk.util.PropertyNames;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-
-/**
- * Pubsub transform support code for the Dataflow backend.
- */
-public class PubsubIOTranslator {
-
-  /**
-   * Implements PubsubIO Read translation for the Dataflow backend.
-   */
-  public static class ReadTranslator<T> implements TransformTranslator<PubsubIO.Read.Bound<T>> {
-    @Override
-    @SuppressWarnings({"rawtypes", "unchecked"})
-    public void translate(
-        PubsubIO.Read.Bound transform,
-        TranslationContext context) {
-      translateReadHelper(transform, context);
-    }
-
-    private <T> void translateReadHelper(
-        PubsubIO.Read.Bound<T> transform,
-        TranslationContext context) {
-      if (!context.getPipelineOptions().isStreaming()) {
-        throw new IllegalArgumentException(
-            "PubsubIO.Read can only be used with the Dataflow streaming runner.");
-      }
-
-      context.addStep(transform, "ParallelRead");
-      context.addInput(PropertyNames.FORMAT, "pubsub");
-      if (transform.getTopic() != null) {
-        context.addInput(PropertyNames.PUBSUB_TOPIC, transform.getTopic().asV1Beta1Path());
-      }
-      if (transform.getSubscription() != null) {
-        context.addInput(
-            PropertyNames.PUBSUB_SUBSCRIPTION, transform.getSubscription().asV1Beta1Path());
-      }
-      if (transform.getTimestampLabel() != null) {
-        context.addInput(PropertyNames.PUBSUB_TIMESTAMP_LABEL, transform.getTimestampLabel());
-      }
-      if (transform.getIdLabel() != null) {
-        context.addInput(PropertyNames.PUBSUB_ID_LABEL, transform.getIdLabel());
-      }
-      context.addValueOnlyOutput(PropertyNames.OUTPUT, context.getOutput(transform));
-    }
-  }
-
-  /**
-   * Implements PubsubIO Write translation for the Dataflow backend.
-   */
-  public static class WriteTranslator<T>
-      implements TransformTranslator<DataflowPipelineRunner.StreamingPubsubIOWrite<T>> {
-
-    @Override
-    @SuppressWarnings({"rawtypes", "unchecked"})
-    public void translate(
-        DataflowPipelineRunner.StreamingPubsubIOWrite transform,
-        TranslationContext context) {
-      translateWriteHelper(transform, context);
-    }
-
-    private <T> void translateWriteHelper(
-        DataflowPipelineRunner.StreamingPubsubIOWrite<T> customTransform,
-        TranslationContext context) {
-      if (!context.getPipelineOptions().isStreaming()) {
-        throw new IllegalArgumentException(
-            "PubsubIO.Write is non-primitive for the Dataflow batch runner.");
-      }
-
-      PubsubIO.Write.Bound<T> transform = customTransform.getOverriddenTransform();
-
-      context.addStep(customTransform, "ParallelWrite");
-      context.addInput(PropertyNames.FORMAT, "pubsub");
-      context.addInput(PropertyNames.PUBSUB_TOPIC, transform.getTopic().asV1Beta1Path());
-      if (transform.getTimestampLabel() != null) {
-        context.addInput(PropertyNames.PUBSUB_TIMESTAMP_LABEL, transform.getTimestampLabel());
-      }
-      if (transform.getIdLabel() != null) {
-        context.addInput(PropertyNames.PUBSUB_ID_LABEL, transform.getIdLabel());
-      }
-      context.addEncodingInput(WindowedValue.getValueOnlyCoder(transform.getCoder()));
-      context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(customTransform));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/ReadTranslator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/ReadTranslator.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/ReadTranslator.java
deleted file mode 100644
index 4998af3..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/ReadTranslator.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.runners.dataflow;
-
-import static com.google.cloud.dataflow.sdk.util.Structs.addBoolean;
-import static com.google.cloud.dataflow.sdk.util.Structs.addDictionary;
-import static com.google.cloud.dataflow.sdk.util.Structs.addLong;
-
-import com.google.api.services.dataflow.model.SourceMetadata;
-import com.google.cloud.dataflow.sdk.io.FileBasedSource;
-import com.google.cloud.dataflow.sdk.io.Read;
-import com.google.cloud.dataflow.sdk.io.Source;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator.TransformTranslator;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator.TranslationContext;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.util.PropertyNames;
-import com.google.cloud.dataflow.sdk.values.PValue;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Translator for the {@code Read} {@code PTransform} for the Dataflow back-end.
- */
-public class ReadTranslator implements TransformTranslator<Read.Bounded<?>> {
-  @Override
-  public void translate(Read.Bounded<?> transform, TranslationContext context) {
-    translateReadHelper(transform.getSource(), transform, context);
-  }
-
-  public static <T> void translateReadHelper(Source<T> source,
-      PTransform<?, ? extends PValue> transform,
-      DataflowPipelineTranslator.TranslationContext context) {
-    try {
-      // TODO: Move this validation out of translation once IOChannelUtils is portable
-      // and can be reconstructed on the worker.
-      if (source instanceof FileBasedSource) {
-        String filePatternOrSpec = ((FileBasedSource<?>) source).getFileOrPatternSpec();
-        context.getPipelineOptions()
-               .getPathValidator()
-               .validateInputFilePatternSupported(filePatternOrSpec);
-      }
-
-      context.addStep(transform, "ParallelRead");
-      context.addInput(PropertyNames.FORMAT, PropertyNames.CUSTOM_SOURCE_FORMAT);
-      context.addInput(
-          PropertyNames.SOURCE_STEP_INPUT,
-          cloudSourceToDictionary(
-              CustomSources.serializeToCloudSource(source, context.getPipelineOptions())));
-      context.addValueOnlyOutput(PropertyNames.OUTPUT, context.getOutput(transform));
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  // Represents a cloud Source as a dictionary for encoding inside the {@code SOURCE_STEP_INPUT}
-  // property of CloudWorkflowStep.input.
-  private static Map<String, Object> cloudSourceToDictionary(
-      com.google.api.services.dataflow.model.Source source) {
-    // Do not translate encoding - the source's encoding is translated elsewhere
-    // to the step's output info.
-    Map<String, Object> res = new HashMap<>();
-    addDictionary(res, PropertyNames.SOURCE_SPEC, source.getSpec());
-    if (source.getMetadata() != null) {
-      addDictionary(res, PropertyNames.SOURCE_METADATA,
-          cloudSourceMetadataToDictionary(source.getMetadata()));
-    }
-    if (source.getDoesNotNeedSplitting() != null) {
-      addBoolean(
-          res, PropertyNames.SOURCE_DOES_NOT_NEED_SPLITTING, source.getDoesNotNeedSplitting());
-    }
-    return res;
-  }
-
-  private static Map<String, Object> cloudSourceMetadataToDictionary(SourceMetadata metadata) {
-    Map<String, Object> res = new HashMap<>();
-    if (metadata.getProducesSortedKeys() != null) {
-      addBoolean(res, PropertyNames.SOURCE_PRODUCES_SORTED_KEYS, metadata.getProducesSortedKeys());
-    }
-    if (metadata.getEstimatedSizeBytes() != null) {
-      addLong(res, PropertyNames.SOURCE_ESTIMATED_SIZE_BYTES, metadata.getEstimatedSizeBytes());
-    }
-    if (metadata.getInfinite() != null) {
-      addBoolean(res, PropertyNames.SOURCE_IS_INFINITE, metadata.getInfinite());
-    }
-    return res;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/package-info.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/package-info.java
deleted file mode 100644
index d1484ce..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Implementation of the {@link com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner}.
- */
-package com.google.cloud.dataflow.sdk.runners.dataflow;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineOptions.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineOptions.java
deleted file mode 100644
index 2f920a6..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineOptions.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.testing;
-
-import com.google.cloud.dataflow.sdk.options.BlockingDataflowPipelineOptions;
-
-/**
- * A set of options used to configure the {@link TestPipeline}.
- */
-public interface TestDataflowPipelineOptions extends BlockingDataflowPipelineOptions {
-
-}