You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/03/24 03:47:58 UTC

[34/67] [partial] incubator-beam git commit: Directory reorganization

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunnerHooks.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunnerHooks.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunnerHooks.java
deleted file mode 100644
index b9a0293..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunnerHooks.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.runners;
-
-import com.google.api.services.dataflow.model.Environment;
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-
-/**
- * An instance of this class can be passed to the
- * {@link DataflowPipelineRunner} to add user defined hooks to be
- * invoked at various times during pipeline execution.
- */
-@Experimental
-public class DataflowPipelineRunnerHooks {
-  /**
-   * Allows the user to modify the environment of their job before their job is submitted
-   * to the service for execution.
-   *
-   * @param environment The environment of the job. Users can make change to this instance in order
-   *     to change the environment with which their job executes on the service.
-   */
-  public void modifyEnvironmentBeforeSubmission(Environment environment) {}
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java
deleted file mode 100644
index 155c454..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java
+++ /dev/null
@@ -1,1104 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.runners;
-
-import static com.google.cloud.dataflow.sdk.util.CoderUtils.encodeToByteArray;
-import static com.google.cloud.dataflow.sdk.util.SerializableUtils.serializeToByteArray;
-import static com.google.cloud.dataflow.sdk.util.StringUtils.byteArrayToJsonString;
-import static com.google.cloud.dataflow.sdk.util.StringUtils.jsonStringToByteArray;
-import static com.google.cloud.dataflow.sdk.util.Structs.addBoolean;
-import static com.google.cloud.dataflow.sdk.util.Structs.addDictionary;
-import static com.google.cloud.dataflow.sdk.util.Structs.addList;
-import static com.google.cloud.dataflow.sdk.util.Structs.addLong;
-import static com.google.cloud.dataflow.sdk.util.Structs.addObject;
-import static com.google.cloud.dataflow.sdk.util.Structs.addString;
-import static com.google.cloud.dataflow.sdk.util.Structs.getString;
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.api.services.dataflow.model.AutoscalingSettings;
-import com.google.api.services.dataflow.model.DataflowPackage;
-import com.google.api.services.dataflow.model.Disk;
-import com.google.api.services.dataflow.model.Environment;
-import com.google.api.services.dataflow.model.Job;
-import com.google.api.services.dataflow.model.Step;
-import com.google.api.services.dataflow.model.WorkerPool;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.Pipeline.PipelineVisitor;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.CoderException;
-import com.google.cloud.dataflow.sdk.coders.IterableCoder;
-import com.google.cloud.dataflow.sdk.io.BigQueryIO;
-import com.google.cloud.dataflow.sdk.io.PubsubIO;
-import com.google.cloud.dataflow.sdk.io.Read;
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
-import com.google.cloud.dataflow.sdk.options.StreamingOptions;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner.GroupByKeyAndSortValuesOnly;
-import com.google.cloud.dataflow.sdk.runners.dataflow.BigQueryIOTranslator;
-import com.google.cloud.dataflow.sdk.runners.dataflow.PubsubIOTranslator;
-import com.google.cloud.dataflow.sdk.runners.dataflow.ReadTranslator;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.transforms.Combine;
-import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.Flatten;
-import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.View;
-import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
-import com.google.cloud.dataflow.sdk.transforms.windowing.DefaultTrigger;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
-import com.google.cloud.dataflow.sdk.util.AppliedCombineFn;
-import com.google.cloud.dataflow.sdk.util.CloudObject;
-import com.google.cloud.dataflow.sdk.util.DoFnInfo;
-import com.google.cloud.dataflow.sdk.util.OutputReference;
-import com.google.cloud.dataflow.sdk.util.PropertyNames;
-import com.google.cloud.dataflow.sdk.util.SerializableUtils;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PCollectionTuple;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-import com.google.cloud.dataflow.sdk.values.PInput;
-import com.google.cloud.dataflow.sdk.values.POutput;
-import com.google.cloud.dataflow.sdk.values.PValue;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-import com.google.cloud.dataflow.sdk.values.TypedPValue;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import javax.annotation.Nullable;
-
-/**
- * {@link DataflowPipelineTranslator} knows how to translate {@link Pipeline} objects
- * into Cloud Dataflow Service API {@link Job}s.
- */
-@SuppressWarnings({"rawtypes", "unchecked"})
-public class DataflowPipelineTranslator {
-  // Must be kept in sync with their internal counterparts.
-  private static final Logger LOG = LoggerFactory.getLogger(DataflowPipelineTranslator.class);
-  private static final ObjectMapper MAPPER = new ObjectMapper();
-
-  /**
-   * A map from {@link PTransform} subclass to the corresponding
-   * {@link TransformTranslator} to use to translate that transform.
-   *
-   * <p>A static map that contains system-wide defaults.
-   */
-  private static Map<Class, TransformTranslator> transformTranslators =
-      new HashMap<>();
-
-  /** Provided configuration options. */
-  private final DataflowPipelineOptions options;
-
-  /**
-   * Constructs a translator from the provided options.
-   *
-   * @param options Properties that configure the translator.
-   *
-   * @return The newly created translator.
-   */
-  public static DataflowPipelineTranslator fromOptions(
-      DataflowPipelineOptions options) {
-    return new DataflowPipelineTranslator(options);
-  }
-
-  private DataflowPipelineTranslator(DataflowPipelineOptions options) {
-    this.options = options;
-  }
-
-  /**
-   * Translates a {@link Pipeline} into a {@code JobSpecification}.
-   */
-  public JobSpecification translate(
-      Pipeline pipeline,
-      DataflowPipelineRunner runner,
-      List<DataflowPackage> packages) {
-
-    Translator translator = new Translator(pipeline, runner);
-    Job result = translator.translate(packages);
-    return new JobSpecification(result, Collections.unmodifiableMap(translator.stepNames));
-  }
-
-  /**
-   * The result of a job translation.
-   *
-   * <p>Used to pass the result {@link Job} and any state that was used to construct the job that
-   * may be of use to other classes (eg the {@link PTransform} to StepName mapping).
-   */
-  public static class JobSpecification {
-    private final Job job;
-    private final Map<AppliedPTransform<?, ?, ?>, String> stepNames;
-
-    public JobSpecification(Job job, Map<AppliedPTransform<?, ?, ?>, String> stepNames) {
-      this.job = job;
-      this.stepNames = stepNames;
-    }
-
-    public Job getJob() {
-      return job;
-    }
-
-    /**
-     * Returns the mapping of {@link AppliedPTransform AppliedPTransforms} to the internal step
-     * name for that {@code AppliedPTransform}.
-     */
-    public Map<AppliedPTransform<?, ?, ?>, String> getStepNames() {
-      return stepNames;
-    }
-  }
-
-  /**
-   * Renders a {@link Job} as a string.
-   */
-  public static String jobToString(Job job) {
-    try {
-      return MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(job);
-    } catch (JsonProcessingException exc) {
-      throw new IllegalStateException("Failed to render Job as String.", exc);
-    }
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * Records that instances of the specified PTransform class
-   * should be translated by default by the corresponding
-   * {@link TransformTranslator}.
-   */
-  public static <TransformT extends PTransform> void registerTransformTranslator(
-      Class<TransformT> transformClass,
-      TransformTranslator<? extends TransformT> transformTranslator) {
-    if (transformTranslators.put(transformClass, transformTranslator) != null) {
-      throw new IllegalArgumentException(
-          "defining multiple translators for " + transformClass);
-    }
-  }
-
-  /**
-   * Returns the {@link TransformTranslator} to use for instances of the
-   * specified PTransform class, or null if none registered.
-   */
-  public <TransformT extends PTransform>
-      TransformTranslator<TransformT> getTransformTranslator(Class<TransformT> transformClass) {
-    return transformTranslators.get(transformClass);
-  }
-
-  /**
-   * A {@link TransformTranslator} knows how to translate
-   * a particular subclass of {@link PTransform} for the
-   * Cloud Dataflow service. It does so by
-   * mutating the {@link TranslationContext}.
-   */
-  public interface TransformTranslator<TransformT extends PTransform> {
-    public void translate(TransformT transform,
-                          TranslationContext context);
-  }
-
-  /**
-   * The interface provided to registered callbacks for interacting
-   * with the {@link DataflowPipelineRunner}, including reading and writing the
-   * values of {@link PCollection}s and side inputs ({@link PCollectionView}s).
-   */
-  public interface TranslationContext {
-    /**
-     * Returns the configured pipeline options.
-     */
-    DataflowPipelineOptions getPipelineOptions();
-
-    /**
-     * Returns the input of the currently being translated transform.
-     */
-    <InputT extends PInput> InputT getInput(PTransform<InputT, ?> transform);
-
-    /**
-     * Returns the output of the currently being translated transform.
-     */
-    <OutputT extends POutput> OutputT getOutput(PTransform<?, OutputT> transform);
-
-    /**
-     * Returns the full name of the currently being translated transform.
-     */
-    String getFullName(PTransform<?, ?> transform);
-
-    /**
-     * Adds a step to the Dataflow workflow for the given transform, with
-     * the given Dataflow step type.
-     * This step becomes "current" for the purpose of {@link #addInput} and
-     * {@link #addOutput}.
-     */
-    public void addStep(PTransform<?, ?> transform, String type);
-
-    /**
-     * Adds a pre-defined step to the Dataflow workflow. The given PTransform should be
-     * consistent with the Step, in terms of input, output and coder types.
-     *
-     * <p>This is a low-level operation, when using this method it is up to
-     * the caller to ensure that names do not collide.
-     */
-    public void addStep(PTransform<?, ? extends PValue> transform, Step step);
-
-    /**
-     * Sets the encoding for the current Dataflow step.
-     */
-    public void addEncodingInput(Coder<?> value);
-
-    /**
-     * Adds an input with the given name and value to the current
-     * Dataflow step.
-     */
-    public void addInput(String name, Boolean value);
-
-    /**
-     * Adds an input with the given name and value to the current
-     * Dataflow step.
-     */
-    public void addInput(String name, String value);
-
-    /**
-     * Adds an input with the given name and value to the current
-     * Dataflow step.
-     */
-    public void addInput(String name, Long value);
-
-    /**
-     * Adds an input with the given name to the previously added Dataflow
-     * step, coming from the specified input PValue.
-     */
-    public void addInput(String name, PInput value);
-
-    /**
-     * Adds an input that is a dictionary of strings to objects.
-     */
-    public void addInput(String name, Map<String, Object> elements);
-
-    /**
-     * Adds an input that is a list of objects.
-     */
-    public void addInput(String name, List<? extends Map<String, Object>> elements);
-
-    /**
-     * Adds an output with the given name to the previously added
-     * Dataflow step, producing the specified output {@code PValue},
-     * including its {@code Coder} if a {@code TypedPValue}.  If the
-     * {@code PValue} is a {@code PCollection}, wraps its coder inside
-     * a {@code WindowedValueCoder}.
-     */
-    public void addOutput(String name, PValue value);
-
-    /**
-     * Adds an output with the given name to the previously added
-     * Dataflow step, producing the specified output {@code PValue},
-     * including its {@code Coder} if a {@code TypedPValue}.  If the
-     * {@code PValue} is a {@code PCollection}, wraps its coder inside
-     * a {@code ValueOnlyCoder}.
-     */
-    public void addValueOnlyOutput(String name, PValue value);
-
-    /**
-     * Adds an output with the given name to the previously added
-     * CollectionToSingleton Dataflow step, consuming the specified
-     * input {@code PValue} and producing the specified output
-     * {@code PValue}.  This step requires special treatment for its
-     * output encoding.
-     */
-    public void addCollectionToSingletonOutput(String name,
-                                               PValue inputValue,
-                                               PValue outputValue);
-
-    /**
-     * Encode a PValue reference as an output reference.
-     */
-    public OutputReference asOutputReference(PValue value);
-  }
-
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * Translates a Pipeline into the Dataflow representation.
-   */
-  class Translator implements PipelineVisitor, TranslationContext {
-    /** The Pipeline to translate. */
-    private final Pipeline pipeline;
-
-    /** The runner which will execute the pipeline. */
-    private final DataflowPipelineRunner runner;
-
-    /** The Cloud Dataflow Job representation. */
-    private final Job job = new Job();
-
-    /**
-     * Translator is stateful, as addProperty calls refer to the current step.
-     */
-    private Step currentStep;
-
-    /**
-     * A Map from AppliedPTransform to their unique Dataflow step names.
-     */
-    private final Map<AppliedPTransform<?, ?, ?>, String> stepNames = new HashMap<>();
-
-    /**
-     * A Map from PValues to their output names used by their producer
-     * Dataflow steps.
-     */
-    private final Map<POutput, String> outputNames = new HashMap<>();
-
-    /**
-     * A Map from PValues to the Coders used for them.
-     */
-    private final Map<POutput, Coder<?>> outputCoders = new HashMap<>();
-
-    /**
-     * The transform currently being applied.
-     */
-    private AppliedPTransform<?, ?, ?> currentTransform;
-
-    /**
-     * Constructs a Translator that will translate the specified
-     * Pipeline into Dataflow objects.
-     */
-    public Translator(Pipeline pipeline, DataflowPipelineRunner runner) {
-      this.pipeline = pipeline;
-      this.runner = runner;
-    }
-
-    /**
-     * Translates this Translator's pipeline onto its writer.
-     * @return a Job definition filled in with the type of job, the environment,
-     * and the job steps.
-     */
-    public Job translate(List<DataflowPackage> packages) {
-      job.setName(options.getJobName().toLowerCase());
-
-      Environment environment = new Environment();
-      job.setEnvironment(environment);
-
-      try {
-        environment.setSdkPipelineOptions(
-            MAPPER.readValue(MAPPER.writeValueAsBytes(options), Map.class));
-      } catch (IOException e) {
-        throw new IllegalArgumentException(
-            "PipelineOptions specified failed to serialize to JSON.", e);
-      }
-
-      WorkerPool workerPool = new WorkerPool();
-
-      if (options.getTeardownPolicy() != null) {
-        workerPool.setTeardownPolicy(options.getTeardownPolicy().getTeardownPolicyName());
-      }
-
-      if (options.isStreaming()) {
-        job.setType("JOB_TYPE_STREAMING");
-      } else {
-        job.setType("JOB_TYPE_BATCH");
-        workerPool.setDiskType(options.getWorkerDiskType());
-      }
-
-      if (options.getWorkerMachineType() != null) {
-        workerPool.setMachineType(options.getWorkerMachineType());
-      }
-
-      workerPool.setPackages(packages);
-      workerPool.setNumWorkers(options.getNumWorkers());
-
-      if (options.isStreaming()) {
-        // Use separate data disk for streaming.
-        Disk disk = new Disk();
-        disk.setDiskType(options.getWorkerDiskType());
-        workerPool.setDataDisks(Collections.singletonList(disk));
-      }
-      if (!Strings.isNullOrEmpty(options.getZone())) {
-        workerPool.setZone(options.getZone());
-      }
-      if (!Strings.isNullOrEmpty(options.getNetwork())) {
-        workerPool.setNetwork(options.getNetwork());
-      }
-      if (!Strings.isNullOrEmpty(options.getSubnetwork())) {
-        workerPool.setSubnetwork(options.getSubnetwork());
-      }
-      if (options.getDiskSizeGb() > 0) {
-        workerPool.setDiskSizeGb(options.getDiskSizeGb());
-      }
-      AutoscalingSettings settings = new AutoscalingSettings();
-      if (options.getAutoscalingAlgorithm() != null) {
-        settings.setAlgorithm(options.getAutoscalingAlgorithm().getAlgorithm());
-      }
-      settings.setMaxNumWorkers(options.getMaxNumWorkers());
-      workerPool.setAutoscalingSettings(settings);
-
-      List<WorkerPool> workerPools = new LinkedList<>();
-
-      workerPools.add(workerPool);
-      environment.setWorkerPools(workerPools);
-
-      pipeline.traverseTopologically(this);
-      return job;
-    }
-
-    @Override
-    public DataflowPipelineOptions getPipelineOptions() {
-      return options;
-    }
-
-    @Override
-    public <InputT extends PInput> InputT getInput(PTransform<InputT, ?> transform) {
-      return (InputT) getCurrentTransform(transform).getInput();
-    }
-
-    @Override
-    public <OutputT extends POutput> OutputT getOutput(PTransform<?, OutputT> transform) {
-      return (OutputT) getCurrentTransform(transform).getOutput();
-    }
-
-    @Override
-    public String getFullName(PTransform<?, ?> transform) {
-      return getCurrentTransform(transform).getFullName();
-    }
-
-    private AppliedPTransform<?, ?, ?> getCurrentTransform(PTransform<?, ?> transform) {
-      checkArgument(
-          currentTransform != null && currentTransform.getTransform() == transform,
-          "can only be called with current transform");
-      return currentTransform;
-    }
-
-    @Override
-    public void enterCompositeTransform(TransformTreeNode node) {
-    }
-
-    @Override
-    public void leaveCompositeTransform(TransformTreeNode node) {
-    }
-
-    @Override
-    public void visitTransform(TransformTreeNode node) {
-      PTransform<?, ?> transform = node.getTransform();
-      TransformTranslator translator =
-          getTransformTranslator(transform.getClass());
-      if (translator == null) {
-        throw new IllegalStateException(
-            "no translator registered for " + transform);
-      }
-      LOG.debug("Translating {}", transform);
-      currentTransform = AppliedPTransform.of(
-          node.getFullName(), node.getInput(), node.getOutput(), (PTransform) transform);
-      translator.translate(transform, this);
-      currentTransform = null;
-    }
-
-    @Override
-    public void visitValue(PValue value, TransformTreeNode producer) {
-      LOG.debug("Checking translation of {}", value);
-      if (value.getProducingTransformInternal() == null) {
-        throw new RuntimeException(
-            "internal error: expecting a PValue "
-            + "to have a producingTransform");
-      }
-      if (!producer.isCompositeNode()) {
-        // Primitive transforms are the only ones assigned step names.
-        asOutputReference(value);
-      }
-    }
-
-    @Override
-    public void addStep(PTransform<?, ?> transform, String type) {
-      String stepName = genStepName();
-      if (stepNames.put(getCurrentTransform(transform), stepName) != null) {
-        throw new IllegalArgumentException(
-            transform + " already has a name specified");
-      }
-      // Start the next "steps" list item.
-      List<Step> steps = job.getSteps();
-      if (steps == null) {
-        steps = new LinkedList<>();
-        job.setSteps(steps);
-      }
-
-      currentStep = new Step();
-      currentStep.setName(stepName);
-      currentStep.setKind(type);
-      steps.add(currentStep);
-      addInput(PropertyNames.USER_NAME, getFullName(transform));
-      addDisplayData(PropertyNames.DISPLAY_DATA, DisplayData.from(transform));
-    }
-
-    @Override
-    public void addStep(PTransform<?, ? extends PValue> transform, Step original) {
-      Step step = original.clone();
-      String stepName = step.getName();
-      if (stepNames.put(getCurrentTransform(transform), stepName) != null) {
-        throw new IllegalArgumentException(transform + " already has a name specified");
-      }
-
-      Map<String, Object> properties = step.getProperties();
-      if (properties != null) {
-        @Nullable List<Map<String, Object>> outputInfoList = null;
-        try {
-          // TODO: This should be done via a Structs accessor.
-          @Nullable List<Map<String, Object>> list =
-              (List<Map<String, Object>>) properties.get(PropertyNames.OUTPUT_INFO);
-          outputInfoList = list;
-        } catch (Exception e) {
-          throw new RuntimeException("Inconsistent dataflow pipeline translation", e);
-        }
-        if (outputInfoList != null && outputInfoList.size() > 0) {
-          Map<String, Object> firstOutputPort = outputInfoList.get(0);
-          @Nullable String name;
-          try {
-            name = getString(firstOutputPort, PropertyNames.OUTPUT_NAME);
-          } catch (Exception e) {
-            name = null;
-          }
-          if (name != null) {
-            registerOutputName(getOutput(transform), name);
-          }
-        }
-      }
-
-      List<Step> steps = job.getSteps();
-      if (steps == null) {
-        steps = new LinkedList<>();
-        job.setSteps(steps);
-      }
-      currentStep = step;
-      steps.add(step);
-    }
-
-    @Override
-    public void addEncodingInput(Coder<?> coder) {
-      CloudObject encoding = SerializableUtils.ensureSerializable(coder);
-      addObject(getProperties(), PropertyNames.ENCODING, encoding);
-    }
-
-    @Override
-    public void addInput(String name, Boolean value) {
-      addBoolean(getProperties(), name, value);
-    }
-
-    @Override
-    public void addInput(String name, String value) {
-      addString(getProperties(), name, value);
-    }
-
-    @Override
-    public void addInput(String name, Long value) {
-      addLong(getProperties(), name, value);
-    }
-
-    @Override
-    public void addInput(String name, Map<String, Object> elements) {
-      addDictionary(getProperties(), name, elements);
-    }
-
-    @Override
-    public void addInput(String name, List<? extends Map<String, Object>> elements) {
-      addList(getProperties(), name, elements);
-    }
-
-    @Override
-    public void addInput(String name, PInput value) {
-      if (value instanceof PValue) {
-        addInput(name, asOutputReference((PValue) value));
-      } else {
-        throw new IllegalStateException("Input must be a PValue");
-      }
-    }
-
-    @Override
-    public void addOutput(String name, PValue value) {
-      Coder<?> coder;
-      if (value instanceof TypedPValue) {
-        coder = ((TypedPValue<?>) value).getCoder();
-        if (value instanceof PCollection) {
-          // Wrap the PCollection element Coder inside a WindowedValueCoder.
-          coder = WindowedValue.getFullCoder(
-              coder,
-              ((PCollection<?>) value).getWindowingStrategy().getWindowFn().windowCoder());
-        }
-      } else {
-        // No output coder to encode.
-        coder = null;
-      }
-      addOutput(name, value, coder);
-    }
-
-    @Override
-    public void addValueOnlyOutput(String name, PValue value) {
-      Coder<?> coder;
-      if (value instanceof TypedPValue) {
-        coder = ((TypedPValue<?>) value).getCoder();
-        if (value instanceof PCollection) {
-          // Wrap the PCollection element Coder inside a ValueOnly
-          // WindowedValueCoder.
-          coder = WindowedValue.getValueOnlyCoder(coder);
-        }
-      } else {
-        // No output coder to encode.
-        coder = null;
-      }
-      addOutput(name, value, coder);
-    }
-
-    @Override
-    public void addCollectionToSingletonOutput(String name,
-                                               PValue inputValue,
-                                               PValue outputValue) {
-      Coder<?> inputValueCoder =
-          Preconditions.checkNotNull(outputCoders.get(inputValue));
-      // The inputValueCoder for the input PCollection should be some
-      // WindowedValueCoder of the input PCollection's element
-      // coder.
-      Preconditions.checkState(
-          inputValueCoder instanceof WindowedValue.WindowedValueCoder);
-      // The outputValueCoder for the output should be an
-      // IterableCoder of the inputValueCoder. This is a property
-      // of the backend "CollectionToSingleton" step.
-      Coder<?> outputValueCoder = IterableCoder.of(inputValueCoder);
-      addOutput(name, outputValue, outputValueCoder);
-    }
-
-    /**
-     * Adds an output with the given name to the previously added
-     * Dataflow step, producing the specified output {@code PValue}
-     * with the given {@code Coder} (if not {@code null}).
-     */
-    private void addOutput(String name, PValue value, Coder<?> valueCoder) {
-      registerOutputName(value, name);
-
-      Map<String, Object> properties = getProperties();
-      @Nullable List<Map<String, Object>> outputInfoList = null;
-      try {
-        // TODO: This should be done via a Structs accessor.
-        outputInfoList = (List<Map<String, Object>>) properties.get(PropertyNames.OUTPUT_INFO);
-      } catch (Exception e) {
-        throw new RuntimeException("Inconsistent dataflow pipeline translation", e);
-      }
-      if (outputInfoList == null) {
-        outputInfoList = new ArrayList<>();
-        // TODO: This should be done via a Structs accessor.
-        properties.put(PropertyNames.OUTPUT_INFO, outputInfoList);
-      }
-
-      Map<String, Object> outputInfo = new HashMap<>();
-      addString(outputInfo, PropertyNames.OUTPUT_NAME, name);
-      addString(outputInfo, PropertyNames.USER_NAME, value.getName());
-      if (value instanceof PCollection
-          && runner.doesPCollectionRequireIndexedFormat((PCollection<?>) value)) {
-        addBoolean(outputInfo, PropertyNames.USE_INDEXED_FORMAT, true);
-      }
-      if (valueCoder != null) {
-        // Verify that encoding can be decoded, in order to catch serialization
-        // failures as early as possible.
-        CloudObject encoding = SerializableUtils.ensureSerializable(valueCoder);
-        addObject(outputInfo, PropertyNames.ENCODING, encoding);
-        outputCoders.put(value, valueCoder);
-      }
-
-      outputInfoList.add(outputInfo);
-    }
-
-    private void addDisplayData(String name, DisplayData displayData) {
-      List<Map<String, Object>> serializedItems = Lists.newArrayList();
-      for (DisplayData.Item item : displayData.items()) {
-        serializedItems.add(MAPPER.convertValue(item, Map.class));
-      }
-
-      addList(getProperties(), name, serializedItems);
-    }
-
-    @Override
-    public OutputReference asOutputReference(PValue value) {
-      AppliedPTransform<?, ?, ?> transform =
-          value.getProducingTransformInternal();
-      String stepName = stepNames.get(transform);
-      if (stepName == null) {
-        throw new IllegalArgumentException(transform + " doesn't have a name specified");
-      }
-
-      String outputName = outputNames.get(value);
-      if (outputName == null) {
-        throw new IllegalArgumentException(
-            "output " + value + " doesn't have a name specified");
-      }
-
-      return new OutputReference(stepName, outputName);
-    }
-
-    private Map<String, Object> getProperties() {
-      Map<String, Object> properties = currentStep.getProperties();
-      if (properties == null) {
-        properties = new HashMap<>();
-        currentStep.setProperties(properties);
-      }
-      return properties;
-    }
-
-    /**
-     * Returns a fresh Dataflow step name.
-     */
-    private String genStepName() {
-      return "s" + (stepNames.size() + 1);
-    }
-
-    /**
-     * Records the name of the given output PValue,
-     * within its producing transform.
-     */
-    private void registerOutputName(POutput value, String name) {
-      if (outputNames.put(value, name) != null) {
-        throw new IllegalArgumentException(
-            "output " + value + " already has a name specified");
-      }
-    }
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  @Override
-  public String toString() {
-    return "DataflowPipelineTranslator#" + hashCode();
-  }
-
-
-  ///////////////////////////////////////////////////////////////////////////
-
-  static {
-    registerTransformTranslator(
-        View.CreatePCollectionView.class,
-        new TransformTranslator<View.CreatePCollectionView>() {
-          @Override
-          public void translate(
-              View.CreatePCollectionView transform,
-              TranslationContext context) {
-            translateTyped(transform, context);
-          }
-
-          private <ElemT, ViewT> void translateTyped(
-              View.CreatePCollectionView<ElemT, ViewT> transform,
-              TranslationContext context) {
-            context.addStep(transform, "CollectionToSingleton");
-            context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
-            context.addCollectionToSingletonOutput(
-                PropertyNames.OUTPUT,
-                context.getInput(transform),
-                context.getOutput(transform));
-          }
-        });
-
-    DataflowPipelineTranslator.registerTransformTranslator(
-        Combine.GroupedValues.class,
-        new DataflowPipelineTranslator.TransformTranslator<Combine.GroupedValues>() {
-          @Override
-          public void translate(
-              Combine.GroupedValues transform,
-              DataflowPipelineTranslator.TranslationContext context) {
-            translateHelper(transform, context);
-          }
-
-          private <K, InputT, OutputT> void translateHelper(
-              final Combine.GroupedValues<K, InputT, OutputT> transform,
-              DataflowPipelineTranslator.TranslationContext context) {
-            context.addStep(transform, "CombineValues");
-            translateInputs(context.getInput(transform), transform.getSideInputs(), context);
-
-            AppliedCombineFn<? super K, ? super InputT, ?, OutputT> fn =
-                transform.getAppliedFn(
-                    context.getInput(transform).getPipeline().getCoderRegistry(),
-                context.getInput(transform).getCoder(),
-                context.getInput(transform).getWindowingStrategy());
-
-            context.addEncodingInput(fn.getAccumulatorCoder());
-            context.addInput(
-                PropertyNames.SERIALIZED_FN,
-                byteArrayToJsonString(serializeToByteArray(fn)));
-            context.addOutput(PropertyNames.OUTPUT, context.getOutput(transform));
-          }
-        });
-
-    registerTransformTranslator(
-        Create.Values.class,
-        new TransformTranslator<Create.Values>() {
-          @Override
-          public void translate(
-              Create.Values transform,
-              TranslationContext context) {
-            createHelper(transform, context);
-          }
-
-          private <T> void createHelper(
-              Create.Values<T> transform,
-              TranslationContext context) {
-            context.addStep(transform, "CreateCollection");
-
-            Coder<T> coder = context.getOutput(transform).getCoder();
-            List<CloudObject> elements = new LinkedList<>();
-            for (T elem : transform.getElements()) {
-              byte[] encodedBytes;
-              try {
-                encodedBytes = encodeToByteArray(coder, elem);
-              } catch (CoderException exn) {
-                // TODO: Put in better element printing:
-                // truncate if too long.
-                throw new IllegalArgumentException(
-                    "Unable to encode element '" + elem + "' of transform '" + transform
-                    + "' using coder '" + coder + "'.",
-                    exn);
-              }
-              String encodedJson = byteArrayToJsonString(encodedBytes);
-              assert Arrays.equals(encodedBytes,
-                                   jsonStringToByteArray(encodedJson));
-              elements.add(CloudObject.forString(encodedJson));
-            }
-            context.addInput(PropertyNames.ELEMENT, elements);
-            context.addValueOnlyOutput(PropertyNames.OUTPUT, context.getOutput(transform));
-          }
-        });
-
-    registerTransformTranslator(
-        Flatten.FlattenPCollectionList.class,
-        new TransformTranslator<Flatten.FlattenPCollectionList>() {
-          @Override
-          public void translate(
-              Flatten.FlattenPCollectionList transform,
-              TranslationContext context) {
-            flattenHelper(transform, context);
-          }
-
-          private <T> void flattenHelper(
-              Flatten.FlattenPCollectionList<T> transform,
-              TranslationContext context) {
-            context.addStep(transform, "Flatten");
-
-            List<OutputReference> inputs = new LinkedList<>();
-            for (PCollection<T> input : context.getInput(transform).getAll()) {
-              inputs.add(context.asOutputReference(input));
-            }
-            context.addInput(PropertyNames.INPUTS, inputs);
-            context.addOutput(PropertyNames.OUTPUT, context.getOutput(transform));
-          }
-        });
-
-    registerTransformTranslator(
-        GroupByKeyAndSortValuesOnly.class,
-        new TransformTranslator<GroupByKeyAndSortValuesOnly>() {
-          @Override
-          public void translate(
-              GroupByKeyAndSortValuesOnly transform,
-              TranslationContext context) {
-            groupByKeyAndSortValuesHelper(transform, context);
-          }
-
-          private <K1, K2, V> void groupByKeyAndSortValuesHelper(
-              GroupByKeyAndSortValuesOnly<K1, K2, V> transform,
-              TranslationContext context) {
-            context.addStep(transform, "GroupByKey");
-            context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
-            context.addOutput(PropertyNames.OUTPUT, context.getOutput(transform));
-            context.addInput(PropertyNames.SORT_VALUES, true);
-
-            // TODO: Add support for combiner lifting once the need arises.
-            context.addInput(
-                PropertyNames.DISALLOW_COMBINER_LIFTING, true);
-          }
-        });
-
-    registerTransformTranslator(
-        GroupByKey.class,
-        new TransformTranslator<GroupByKey>() {
-          @Override
-          public void translate(
-              GroupByKey transform,
-              TranslationContext context) {
-            groupByKeyHelper(transform, context);
-          }
-
-          private <K, V> void groupByKeyHelper(
-              GroupByKey<K, V> transform,
-              TranslationContext context) {
-            context.addStep(transform, "GroupByKey");
-            context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
-            context.addOutput(PropertyNames.OUTPUT, context.getOutput(transform));
-
-            WindowingStrategy<?, ?> windowingStrategy =
-                context.getInput(transform).getWindowingStrategy();
-            boolean isStreaming =
-                context.getPipelineOptions().as(StreamingOptions.class).isStreaming();
-            boolean disallowCombinerLifting =
-                !windowingStrategy.getWindowFn().isNonMerging()
-                || (isStreaming && !transform.fewKeys())
-                // TODO: Allow combiner lifting on the non-default trigger, as appropriate.
-                || !(windowingStrategy.getTrigger().getSpec() instanceof DefaultTrigger);
-            context.addInput(
-                PropertyNames.DISALLOW_COMBINER_LIFTING, disallowCombinerLifting);
-            context.addInput(
-                PropertyNames.SERIALIZED_FN,
-                byteArrayToJsonString(serializeToByteArray(windowingStrategy)));
-            context.addInput(
-                PropertyNames.IS_MERGING_WINDOW_FN,
-                !windowingStrategy.getWindowFn().isNonMerging());
-          }
-        });
-
-    registerTransformTranslator(
-        ParDo.BoundMulti.class,
-        new TransformTranslator<ParDo.BoundMulti>() {
-          @Override
-          public void translate(
-              ParDo.BoundMulti transform,
-              TranslationContext context) {
-            translateMultiHelper(transform, context);
-          }
-
-          private <InputT, OutputT> void translateMultiHelper(
-              ParDo.BoundMulti<InputT, OutputT> transform,
-              TranslationContext context) {
-            context.addStep(transform, "ParallelDo");
-            translateInputs(context.getInput(transform), transform.getSideInputs(), context);
-            translateFn(transform.getFn(), context.getInput(transform).getWindowingStrategy(),
-                transform.getSideInputs(), context.getInput(transform).getCoder(), context);
-            translateOutputs(context.getOutput(transform), context);
-          }
-        });
-
-    registerTransformTranslator(
-        ParDo.Bound.class,
-        new TransformTranslator<ParDo.Bound>() {
-          @Override
-          public void translate(
-              ParDo.Bound transform,
-              TranslationContext context) {
-            translateSingleHelper(transform, context);
-          }
-
-          private <InputT, OutputT> void translateSingleHelper(
-              ParDo.Bound<InputT, OutputT> transform,
-              TranslationContext context) {
-            context.addStep(transform, "ParallelDo");
-            translateInputs(context.getInput(transform), transform.getSideInputs(), context);
-            translateFn(
-                transform.getFn(),
-                context.getInput(transform).getWindowingStrategy(),
-                transform.getSideInputs(), context.getInput(transform).getCoder(), context);
-            context.addOutput(PropertyNames.OUTPUT, context.getOutput(transform));
-          }
-        });
-
-    registerTransformTranslator(
-        Window.Bound.class,
-        new DataflowPipelineTranslator.TransformTranslator<Window.Bound>() {
-          @Override
-          public void translate(
-              Window.Bound transform, TranslationContext context) {
-            translateHelper(transform, context);
-          }
-
-          private <T> void translateHelper(
-              Window.Bound<T> transform, TranslationContext context) {
-            context.addStep(transform, "Bucket");
-            context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
-            context.addOutput(PropertyNames.OUTPUT, context.getOutput(transform));
-
-            WindowingStrategy<?, ?> strategy = context.getOutput(transform).getWindowingStrategy();
-            byte[] serializedBytes = serializeToByteArray(strategy);
-            String serializedJson = byteArrayToJsonString(serializedBytes);
-            assert Arrays.equals(serializedBytes,
-                                 jsonStringToByteArray(serializedJson));
-            context.addInput(PropertyNames.SERIALIZED_FN, serializedJson);
-          }
-        });
-
-    ///////////////////////////////////////////////////////////////////////////
-    // IO Translation.
-
-    registerTransformTranslator(
-        BigQueryIO.Read.Bound.class, new BigQueryIOTranslator.ReadTranslator());
-    registerTransformTranslator(
-        BigQueryIO.Write.Bound.class, new BigQueryIOTranslator.WriteTranslator());
-
-    registerTransformTranslator(
-        PubsubIO.Read.Bound.class, new PubsubIOTranslator.ReadTranslator());
-    registerTransformTranslator(
-        DataflowPipelineRunner.StreamingPubsubIOWrite.class,
-        new PubsubIOTranslator.WriteTranslator());
-
-    registerTransformTranslator(Read.Bounded.class, new ReadTranslator());
-  }
-
-  private static void translateInputs(
-      PCollection<?> input,
-      List<PCollectionView<?>> sideInputs,
-      TranslationContext context) {
-    context.addInput(PropertyNames.PARALLEL_INPUT, input);
-    translateSideInputs(sideInputs, context);
-  }
-
-  // Used for ParDo
-  private static void translateSideInputs(
-      List<PCollectionView<?>> sideInputs,
-      TranslationContext context) {
-    Map<String, Object> nonParInputs = new HashMap<>();
-
-    for (PCollectionView<?> view : sideInputs) {
-      nonParInputs.put(
-          view.getTagInternal().getId(),
-          context.asOutputReference(view));
-    }
-
-    context.addInput(PropertyNames.NON_PARALLEL_INPUTS, nonParInputs);
-  }
-
-  private static void translateFn(
-      DoFn fn,
-      WindowingStrategy windowingStrategy,
-      Iterable<PCollectionView<?>> sideInputs,
-      Coder inputCoder,
-      TranslationContext context) {
-    context.addInput(PropertyNames.USER_FN, fn.getClass().getName());
-    context.addInput(
-        PropertyNames.SERIALIZED_FN,
-        byteArrayToJsonString(serializeToByteArray(
-            new DoFnInfo(fn, windowingStrategy, sideInputs, inputCoder))));
-  }
-
-  private static void translateOutputs(
-      PCollectionTuple outputs,
-      TranslationContext context) {
-    for (Map.Entry<TupleTag<?>, PCollection<?>> entry
-             : outputs.getAll().entrySet()) {
-      TupleTag<?> tag = entry.getKey();
-      PCollection<?> output = entry.getValue();
-      context.addOutput(tag.getId(), output);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowServiceException.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowServiceException.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowServiceException.java
deleted file mode 100644
index 6e8301b..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowServiceException.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.runners;
-
-import javax.annotation.Nullable;
-
-/**
- * Signals there was an error retrieving information about a job from the Cloud Dataflow Service.
- */
-public class DataflowServiceException extends DataflowJobException {
-  DataflowServiceException(DataflowPipelineJob job, String message) {
-    this(job, message, null);
-  }
-
-  DataflowServiceException(DataflowPipelineJob job, String message, @Nullable Throwable cause) {
-    super(job, message, cause);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipeline.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipeline.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipeline.java
deleted file mode 100644
index 5217a90..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipeline.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.runners;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.options.DirectPipelineOptions;
-
-/**
- * A {@link DirectPipeline} is a {@link Pipeline} that returns
- * {@link DirectPipelineRunner.EvaluationResults} when it is
- * {@link com.google.cloud.dataflow.sdk.Pipeline#run()}.
- */
-public class DirectPipeline extends Pipeline {
-
-  /**
-   * Creates and returns a new DirectPipeline instance for tests.
-   */
-  public static DirectPipeline createForTest() {
-    DirectPipelineRunner runner = DirectPipelineRunner.createForTest();
-    return new DirectPipeline(runner, runner.getPipelineOptions());
-  }
-
-  private DirectPipeline(DirectPipelineRunner runner, DirectPipelineOptions options) {
-    super(runner, options);
-  }
-
-  @Override
-  public DirectPipelineRunner.EvaluationResults run() {
-    return (DirectPipelineRunner.EvaluationResults) super.run();
-  }
-
-  @Override
-  public DirectPipelineRunner getRunner() {
-    return (DirectPipelineRunner) super.getRunner();
-  }
-
-  @Override
-  public String toString() {
-    return "DirectPipeline#" + hashCode();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRegistrar.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRegistrar.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRegistrar.java
deleted file mode 100644
index f2dd40c..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRegistrar.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.runners;
-
-import com.google.auto.service.AutoService;
-import com.google.cloud.dataflow.sdk.options.DirectPipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar;
-import com.google.common.collect.ImmutableList;
-
-/**
- * Contains the {@link PipelineOptionsRegistrar} and {@link PipelineRunnerRegistrar} for
- * the {@link DirectPipeline}.
- */
-public class DirectPipelineRegistrar {
-  private DirectPipelineRegistrar() { }
-
-  /**
-   * Register the {@link DirectPipelineRunner}.
-   */
-  @AutoService(PipelineRunnerRegistrar.class)
-  public static class Runner implements PipelineRunnerRegistrar {
-    @Override
-    public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
-      return ImmutableList.<Class<? extends PipelineRunner<?>>>of(DirectPipelineRunner.class);
-    }
-  }
-
-  /**
-   * Register the {@link DirectPipelineOptions}.
-   */
-  @AutoService(PipelineOptionsRegistrar.class)
-  public static class Options implements PipelineOptionsRegistrar {
-    @Override
-    public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
-      return ImmutableList.<Class<? extends PipelineOptions>>of(DirectPipelineOptions.class);
-    }
-  }
-}