You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/04/14 06:48:25 UTC

[38/74] [partial] incubator-beam git commit: Rename com/google/cloud/dataflow->org/apache/beam

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java
new file mode 100644
index 0000000..b907ae7
--- /dev/null
+++ b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.examples.complete.game;
+
+import com.google.cloud.dataflow.examples.complete.game.UserScore.ExtractAndSumScore;
+import com.google.cloud.dataflow.examples.complete.game.UserScore.GameActionInfo;
+import com.google.cloud.dataflow.examples.complete.game.UserScore.ParseEventFn;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
+import com.google.cloud.dataflow.sdk.testing.PAssert;
+import com.google.cloud.dataflow.sdk.testing.RunnableOnService;
+import com.google.cloud.dataflow.sdk.testing.TestPipeline;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.DoFnTester;
+import com.google.cloud.dataflow.sdk.transforms.MapElements;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Tests of UserScore.
+ */
+@RunWith(JUnit4.class)
+public class UserScoreTest implements Serializable {
+
+  static final String[] GAME_EVENTS_ARRAY = new String[] {
+    "user0_MagentaKangaroo,MagentaKangaroo,3,1447955630000,2015-11-19 09:53:53.444",
+    "user13_ApricotQuokka,ApricotQuokka,15,1447955630000,2015-11-19 09:53:53.444",
+    "user6_AmberNumbat,AmberNumbat,11,1447955630000,2015-11-19 09:53:53.444",
+    "user7_AlmondWallaby,AlmondWallaby,15,1447955630000,2015-11-19 09:53:53.444",
+    "user7_AndroidGreenKookaburra,AndroidGreenKookaburra,12,1447955630000,2015-11-19 09:53:53.444",
+    "user6_AliceBlueDingo,AliceBlueDingo,4,xxxxxxx,2015-11-19 09:53:53.444",
+    "user7_AndroidGreenKookaburra,AndroidGreenKookaburra,11,1447955630000,2015-11-19 09:53:53.444",
+    "THIS IS A PARSE ERROR,2015-11-19 09:53:53.444",
+    "user19_BisqueBilby,BisqueBilby,6,1447955630000,2015-11-19 09:53:53.444",
+    "user19_BisqueBilby,BisqueBilby,8,1447955630000,2015-11-19 09:53:53.444"
+  };
+
+    static final String[] GAME_EVENTS_ARRAY2 = new String[] {
+    "user6_AliceBlueDingo,AliceBlueDingo,4,xxxxxxx,2015-11-19 09:53:53.444",
+    "THIS IS A PARSE ERROR,2015-11-19 09:53:53.444",
+    "user13_BisqueBilby,BisqueBilby,xxx,1447955630000,2015-11-19 09:53:53.444"
+  };
+
+  static final List<String> GAME_EVENTS = Arrays.asList(GAME_EVENTS_ARRAY);
+  static final List<String> GAME_EVENTS2 = Arrays.asList(GAME_EVENTS_ARRAY2);
+
+  static final List<KV<String, Integer>> USER_SUMS = Arrays.asList(
+      KV.of("user0_MagentaKangaroo", 3), KV.of("user13_ApricotQuokka", 15),
+      KV.of("user6_AmberNumbat", 11), KV.of("user7_AlmondWallaby", 15),
+      KV.of("user7_AndroidGreenKookaburra", 23),
+      KV.of("user19_BisqueBilby", 14));
+
+  static final List<KV<String, Integer>> TEAM_SUMS = Arrays.asList(
+      KV.of("MagentaKangaroo", 3), KV.of("ApricotQuokka", 15),
+      KV.of("AmberNumbat", 11), KV.of("AlmondWallaby", 15),
+      KV.of("AndroidGreenKookaburra", 23),
+      KV.of("BisqueBilby", 14));
+
+  /** Test the ParseEventFn DoFn. */
+  @Test
+  public void testParseEventFn() {
+    DoFnTester<String, GameActionInfo> parseEventFn =
+        DoFnTester.of(new ParseEventFn());
+
+    List<GameActionInfo> results = parseEventFn.processBatch(GAME_EVENTS_ARRAY);
+    Assert.assertEquals(results.size(), 8);
+    Assert.assertEquals(results.get(0).getUser(), "user0_MagentaKangaroo");
+    Assert.assertEquals(results.get(0).getTeam(), "MagentaKangaroo");
+    Assert.assertEquals(results.get(0).getScore(), new Integer(3));
+  }
+
+  /** Tests ExtractAndSumScore("user"). */
+  @Test
+  @Category(RunnableOnService.class)
+  public void testUserScoreSums() throws Exception {
+    Pipeline p = TestPipeline.create();
+
+    PCollection<String> input = p.apply(Create.of(GAME_EVENTS).withCoder(StringUtf8Coder.of()));
+
+    PCollection<KV<String, Integer>> output = input
+      .apply(ParDo.of(new ParseEventFn()))
+      // Extract and sum username/score pairs from the event data.
+      .apply("ExtractUserScore", new ExtractAndSumScore("user"));
+
+    // Check the user score sums.
+    PAssert.that(output).containsInAnyOrder(USER_SUMS);
+
+    p.run();
+  }
+
+  /** Tests ExtractAndSumScore("team"). */
+  @Test
+  @Category(RunnableOnService.class)
+  public void testTeamScoreSums() throws Exception {
+    Pipeline p = TestPipeline.create();
+
+    PCollection<String> input = p.apply(Create.of(GAME_EVENTS).withCoder(StringUtf8Coder.of()));
+
+    PCollection<KV<String, Integer>> output = input
+      .apply(ParDo.of(new ParseEventFn()))
+      // Extract and sum teamname/score pairs from the event data.
+      .apply("ExtractTeamScore", new ExtractAndSumScore("team"));
+
+    // Check the team score sums.
+    PAssert.that(output).containsInAnyOrder(TEAM_SUMS);
+
+    p.run();
+  }
+
+  /** Test that bad input data is dropped appropriately. */
+  @Test
+  @Category(RunnableOnService.class)
+  public void testUserScoresBadInput() throws Exception {
+    Pipeline p = TestPipeline.create();
+
+    PCollection<String> input = p.apply(Create.of(GAME_EVENTS2).withCoder(StringUtf8Coder.of()));
+
+    PCollection<KV<String, Integer>> extract = input
+      .apply(ParDo.of(new ParseEventFn()))
+      .apply(
+          MapElements.via((GameActionInfo gInfo) -> KV.of(gInfo.getUser(), gInfo.getScore()))
+          .withOutputType(new TypeDescriptor<KV<String, Integer>>() {}));
+
+    PAssert.that(extract).empty();
+
+    p.run();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/BlockingDataflowPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/BlockingDataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/BlockingDataflowPipelineOptions.java
deleted file mode 100644
index 6bbafdd..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/BlockingDataflowPipelineOptions.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.options;
-
-import com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-
-import java.io.PrintStream;
-
-/**
- * Options that are used to configure the {@link BlockingDataflowPipelineRunner}.
- */
-@Description("Configure options on the BlockingDataflowPipelineRunner.")
-public interface BlockingDataflowPipelineOptions extends DataflowPipelineOptions {
-  /**
-   * Output stream for job status messages.
-   */
-  @Description("Where messages generated during execution of the Dataflow job will be output.")
-  @JsonIgnore
-  @Hidden
-  @Default.InstanceFactory(StandardOutputFactory.class)
-  PrintStream getJobMessageOutput();
-  void setJobMessageOutput(PrintStream value);
-
-  /**
-   * Returns a default of {@link System#out}.
-   */
-  public static class StandardOutputFactory implements DefaultValueFactory<PrintStream> {
-    @Override
-    public PrintStream create(PipelineOptions options) {
-      return System.out;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/CloudDebuggerOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/CloudDebuggerOptions.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/CloudDebuggerOptions.java
deleted file mode 100644
index 3f0503e..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/CloudDebuggerOptions.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.options;
-
-import org.apache.beam.sdk.annotations.Experimental;
-
-import com.google.api.services.clouddebugger.v2.model.Debuggee;
-
-import javax.annotation.Nullable;
-
-/**
- * Options for controlling Cloud Debugger.
- */
-@Description("[Experimental] Used to configure the Cloud Debugger")
-@Experimental
-@Hidden
-public interface CloudDebuggerOptions {
-
-  /** Whether to enable the Cloud Debugger snapshot agent for the current job. */
-  @Description("Whether to enable the Cloud Debugger snapshot agent for the current job.")
-  boolean getEnableCloudDebugger();
-  void setEnableCloudDebugger(boolean enabled);
-
-  /** The Cloud Debugger debuggee to associate with. This should not be set directly. */
-  @Description("The Cloud Debugger debuggee to associate with. This should not be set directly.")
-  @Hidden
-  @Nullable Debuggee getDebuggee();
-  void setDebuggee(Debuggee debuggee);
-
-  /** The maximum cost (as a ratio of CPU time) allowed for evaluating conditional snapshots. */
-  @Description(
-      "The maximum cost (as a ratio of CPU time) allowed for evaluating conditional snapshots. "
-      + "Should be a double between 0 and 1. "
-      + "Snapshots will be cancelled if evaluating conditions takes more than this ratio of time.")
-  @Default.Double(0.01)
-  double getMaxConditionCost();
-  void setMaxConditionCost(double maxConditionCost);
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineDebugOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineDebugOptions.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineDebugOptions.java
deleted file mode 100644
index 1be93eb..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineDebugOptions.java
+++ /dev/null
@@ -1,254 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.options;
-
-import org.apache.beam.sdk.annotations.Experimental;
-
-import com.google.api.services.dataflow.Dataflow;
-import com.google.cloud.dataflow.sdk.util.DataflowPathValidator;
-import com.google.cloud.dataflow.sdk.util.DataflowTransport;
-import com.google.cloud.dataflow.sdk.util.GcsStager;
-import com.google.cloud.dataflow.sdk.util.InstanceBuilder;
-import com.google.cloud.dataflow.sdk.util.PathValidator;
-import com.google.cloud.dataflow.sdk.util.Stager;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * Internal. Options used to control execution of the Dataflow SDK for
- * debugging and testing purposes.
- */
-@Description("[Internal] Options used to control execution of the Dataflow SDK for "
-    + "debugging and testing purposes.")
-@Hidden
-public interface DataflowPipelineDebugOptions extends PipelineOptions {
-
-  /**
-   * The list of backend experiments to enable.
-   *
-   * <p>Dataflow provides a number of experimental features that can be enabled
-   * with this flag.
-   *
-   * <p>Please sync with the Dataflow team before enabling any experiments.
-   */
-  @Description("[Experimental] Dataflow provides a number of experimental features that can "
-      + "be enabled with this flag. Please sync with the Dataflow team before enabling any "
-      + "experiments.")
-  @Experimental
-  List<String> getExperiments();
-  void setExperiments(List<String> value);
-
-  /**
-   * The root URL for the Dataflow API. {@code dataflowEndpoint} can override this value
-   * if it contains an absolute URL, otherwise {@code apiRootUrl} will be combined with
-   * {@code dataflowEndpoint} to generate the full URL to communicate with the Dataflow API.
-   */
-  @Description("The root URL for the Dataflow API. dataflowEndpoint can override this "
-      + "value if it contains an absolute URL, otherwise apiRootUrl will be combined with "
-      + "dataflowEndpoint to generate the full URL to communicate with the Dataflow API.")
-  @Default.String(Dataflow.DEFAULT_ROOT_URL)
-  String getApiRootUrl();
-  void setApiRootUrl(String value);
-
-  /**
-   * Dataflow endpoint to use.
-   *
-   * <p>Defaults to the current version of the Google Cloud Dataflow
-   * API, at the time the current SDK version was released.
-   *
-   * <p>If the string contains "://", then this is treated as a URL,
-   * otherwise {@link #getApiRootUrl()} is used as the root
-   * URL.
-   */
-  @Description("The URL for the Dataflow API. If the string contains \"://\", this"
-      + " will be treated as the entire URL, otherwise will be treated relative to apiRootUrl.")
-  @Default.String(Dataflow.DEFAULT_SERVICE_PATH)
-  String getDataflowEndpoint();
-  void setDataflowEndpoint(String value);
-
-  /**
-   * The path to write the translated Dataflow job specification out to
-   * at job submission time. The Dataflow job specification will be represented in JSON
-   * format.
-   */
-  @Description("The path to write the translated Dataflow job specification out to "
-      + "at job submission time. The Dataflow job specification will be represented in JSON "
-      + "format.")
-  String getDataflowJobFile();
-  void setDataflowJobFile(String value);
-
-  /**
-   * The class of the validator that should be created and used to validate paths.
-   * If pathValidator has not been set explicitly, an instance of this class will be
-   * constructed and used as the path validator.
-   */
-  @Description("The class of the validator that should be created and used to validate paths. "
-      + "If pathValidator has not been set explicitly, an instance of this class will be "
-      + "constructed and used as the path validator.")
-  @Default.Class(DataflowPathValidator.class)
-  Class<? extends PathValidator> getPathValidatorClass();
-  void setPathValidatorClass(Class<? extends PathValidator> validatorClass);
-
-  /**
-   * The path validator instance that should be used to validate paths.
-   * If no path validator has been set explicitly, the default is to use the instance factory that
-   * constructs a path validator based upon the currently set pathValidatorClass.
-   */
-  @JsonIgnore
-  @Description("The path validator instance that should be used to validate paths. "
-      + "If no path validator has been set explicitly, the default is to use the instance factory "
-      + "that constructs a path validator based upon the currently set pathValidatorClass.")
-  @Default.InstanceFactory(PathValidatorFactory.class)
-  PathValidator getPathValidator();
-  void setPathValidator(PathValidator validator);
-
-  /**
-   * The class responsible for staging resources to be accessible by workers
-   * during job execution. If stager has not been set explicitly, an instance of this class
-   * will be created and used as the resource stager.
-   */
-  @Description("The class of the stager that should be created and used to stage resources. "
-      + "If stager has not been set explicitly, an instance of the this class will be created "
-      + "and used as the resource stager.")
-  @Default.Class(GcsStager.class)
-  Class<? extends Stager> getStagerClass();
-  void setStagerClass(Class<? extends Stager> stagerClass);
-
-  /**
-   * The resource stager instance that should be used to stage resources.
-   * If no stager has been set explicitly, the default is to use the instance factory
-   * that constructs a resource stager based upon the currently set stagerClass.
-   */
-  @JsonIgnore
-  @Description("The resource stager instance that should be used to stage resources. "
-      + "If no stager has been set explicitly, the default is to use the instance factory "
-      + "that constructs a resource stager based upon the currently set stagerClass.")
-  @Default.InstanceFactory(StagerFactory.class)
-  Stager getStager();
-  void setStager(Stager stager);
-
-  /**
-   * An instance of the Dataflow client. Defaults to creating a Dataflow client
-   * using the current set of options.
-   */
-  @JsonIgnore
-  @Description("An instance of the Dataflow client. Defaults to creating a Dataflow client "
-      + "using the current set of options.")
-  @Default.InstanceFactory(DataflowClientFactory.class)
-  Dataflow getDataflowClient();
-  void setDataflowClient(Dataflow value);
-
-  /** Returns the default Dataflow client built from the passed in PipelineOptions. */
-  public static class DataflowClientFactory implements DefaultValueFactory<Dataflow> {
-    @Override
-    public Dataflow create(PipelineOptions options) {
-        return DataflowTransport.newDataflowClient(
-            options.as(DataflowPipelineOptions.class)).build();
-    }
-  }
-
-  /**
-   * Whether to update the currently running pipeline with the same name as this one.
-   *
-   * @deprecated This property is replaced by {@link DataflowPipelineOptions#getUpdate()}
-   */
-  @Deprecated
-  @Description("If set, replace the existing pipeline with the name specified by --jobName with "
-      + "this pipeline, preserving state.")
-  boolean getUpdate();
-  @Deprecated
-  void setUpdate(boolean value);
-
-  /**
-   * Mapping of old PTranform names to new ones, specified as JSON
-   * <code>{"oldName":"newName",...}</code>. To mark a transform as deleted, make newName the
-   * empty string.
-   */
-  @JsonIgnore
-  @Description(
-      "Mapping of old PTranform names to new ones, specified as JSON "
-      + "{\"oldName\":\"newName\",...}. To mark a transform as deleted, make newName the empty "
-      + "string.")
-  Map<String, String> getTransformNameMapping();
-  void setTransformNameMapping(Map<String, String> value);
-
-  /**
-   * Custom windmill_main binary to use with the streaming runner.
-   */
-  @Description("Custom windmill_main binary to use with the streaming runner")
-  String getOverrideWindmillBinary();
-  void setOverrideWindmillBinary(String value);
-
-  /**
-   * Number of threads to use on the Dataflow worker harness. If left unspecified,
-   * the Dataflow service will compute an appropriate number of threads to use.
-   */
-  @Description("Number of threads to use on the Dataflow worker harness. If left unspecified, "
-      + "the Dataflow service will compute an appropriate number of threads to use.")
-  int getNumberOfWorkerHarnessThreads();
-  void setNumberOfWorkerHarnessThreads(int value);
-
-  /**
-   * If {@literal true}, save a heap dump before killing a thread or process which is GC
-   * thrashing or out of memory. The location of the heap file will either be echoed back
-   * to the user, or the user will be given the opportunity to download the heap file.
-   *
-   * <p>
-   * CAUTION: Heap dumps can of comparable size to the default boot disk. Consider increasing
-   * the boot disk size before setting this flag to true.
-   */
-  @Description("If {@literal true}, save a heap dump before killing a thread or process "
-      + "which is GC thrashing or out of memory.")
-  boolean getDumpHeapOnOOM();
-  void setDumpHeapOnOOM(boolean dumpHeapBeforeExit);
-
-  /**
-   * Creates a {@link PathValidator} object using the class specified in
-   * {@link #getPathValidatorClass()}.
-   */
-  public static class PathValidatorFactory implements DefaultValueFactory<PathValidator> {
-      @Override
-      public PathValidator create(PipelineOptions options) {
-      DataflowPipelineDebugOptions debugOptions = options.as(DataflowPipelineDebugOptions.class);
-      return InstanceBuilder.ofType(PathValidator.class)
-          .fromClass(debugOptions.getPathValidatorClass())
-          .fromFactoryMethod("fromOptions")
-          .withArg(PipelineOptions.class, options)
-          .build();
-    }
-  }
-
-  /**
-   * Creates a {@link Stager} object using the class specified in
-   * {@link #getStagerClass()}.
-   */
-  public static class StagerFactory implements DefaultValueFactory<Stager> {
-      @Override
-      public Stager create(PipelineOptions options) {
-      DataflowPipelineDebugOptions debugOptions = options.as(DataflowPipelineDebugOptions.class);
-      return InstanceBuilder.ofType(Stager.class)
-          .fromClass(debugOptions.getStagerClass())
-          .fromFactoryMethod("fromOptions")
-          .withArg(PipelineOptions.class, options)
-          .build();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineOptions.java
deleted file mode 100644
index dbfafd1..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineOptions.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.options;
-
-import com.google.cloud.dataflow.sdk.runners.DataflowPipeline;
-import com.google.common.base.MoreObjects;
-
-import org.joda.time.DateTimeUtils;
-import org.joda.time.DateTimeZone;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
-
-/**
- * Options that can be used to configure the {@link DataflowPipeline}.
- */
-@Description("Options that configure the Dataflow pipeline.")
-public interface DataflowPipelineOptions extends
-    PipelineOptions, GcpOptions, ApplicationNameOptions, DataflowPipelineDebugOptions,
-    DataflowPipelineWorkerPoolOptions, BigQueryOptions,
-    GcsOptions, StreamingOptions, CloudDebuggerOptions, DataflowWorkerLoggingOptions,
-    DataflowProfilingOptions, PubsubOptions {
-
-  @Description("Project id. Required when running a Dataflow in the cloud. "
-      + "See https://cloud.google.com/storage/docs/projects for further details.")
-  @Override
-  @Validation.Required
-  @Default.InstanceFactory(DefaultProjectFactory.class)
-  String getProject();
-  @Override
-  void setProject(String value);
-
-  /**
-   * GCS path for staging local files, e.g. gs://bucket/object
-   *
-   * <p>Must be a valid Cloud Storage URL, beginning with the prefix "gs://"
-   *
-   * <p>At least one of {@link PipelineOptions#getTempLocation()} or {@link #getStagingLocation()}
-   * must be set. If {@link #getStagingLocation()} is not set, then the Dataflow
-   * pipeline defaults to using {@link PipelineOptions#getTempLocation()}.
-   */
-  @Description("GCS path for staging local files, e.g. \"gs://bucket/object\". "
-      + "Must be a valid Cloud Storage URL, beginning with the prefix \"gs://\". "
-      + "At least one of stagingLocation or tempLocation must be set. If stagingLocation is unset, "
-      + "defaults to using tempLocation.")
-  String getStagingLocation();
-  void setStagingLocation(String value);
-
-  /**
-   * The Dataflow job name is used as an idempotence key within the Dataflow service.
-   * If there is an existing job that is currently active, another active job with the same
-   * name will not be able to be created. Defaults to using the ApplicationName-UserName-Date.
-   */
-  @Description("The Dataflow job name is used as an idempotence key within the Dataflow service. "
-      + "If there is an existing job that is currently active, another active job with the same "
-      + "name will not be able to be created. Defaults to using the ApplicationName-UserName-Date.")
-  @Default.InstanceFactory(JobNameFactory.class)
-  String getJobName();
-  void setJobName(String value);
-
-  /**
-   * Whether to update the currently running pipeline with the same name as this one.
-   */
-  @Override
-  @SuppressWarnings("deprecation") // base class member deprecated in favor of this one.
-  @Description(
-      "If set, replace the existing pipeline with the name specified by --jobName with "
-          + "this pipeline, preserving state.")
-  boolean getUpdate();
-  @Override
-  @SuppressWarnings("deprecation") // base class member deprecated in favor of this one.
-  void setUpdate(boolean value);
-
-  /**
-   * Returns a normalized job name constructed from {@link ApplicationNameOptions#getAppName()}, the
-   * local system user name (if available), and the current time. The normalization makes sure that
-   * the job name matches the required pattern of [a-z]([-a-z0-9]*[a-z0-9])? and length limit of 40
-   * characters.
-   *
-   * <p>This job name factory is only able to generate one unique name per second per application
-   * and user combination.
-   */
-  public static class JobNameFactory implements DefaultValueFactory<String> {
-    private static final DateTimeFormatter FORMATTER =
-        DateTimeFormat.forPattern("MMddHHmmss").withZone(DateTimeZone.UTC);
-
-    @Override
-    public String create(PipelineOptions options) {
-      String appName = options.as(ApplicationNameOptions.class).getAppName();
-      String normalizedAppName = appName == null || appName.length() == 0 ? "dataflow"
-          : appName.toLowerCase()
-                   .replaceAll("[^a-z0-9]", "0")
-                   .replaceAll("^[^a-z]", "a");
-      String userName = MoreObjects.firstNonNull(System.getProperty("user.name"), "");
-      String normalizedUserName = userName.toLowerCase()
-                                          .replaceAll("[^a-z0-9]", "0");
-      String datePart = FORMATTER.print(DateTimeUtils.currentTimeMillis());
-      return normalizedAppName + "-" + normalizedUserName + "-" + datePart;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineWorkerPoolOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineWorkerPoolOptions.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineWorkerPoolOptions.java
deleted file mode 100644
index 44a9b00..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineWorkerPoolOptions.java
+++ /dev/null
@@ -1,258 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.options;
-
-import org.apache.beam.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-
-import java.util.List;
-
-/**
- * Options that are used to configure the Dataflow pipeline worker pool.
- */
-@Description("Options that are used to configure the Dataflow pipeline worker pool.")
-public interface DataflowPipelineWorkerPoolOptions extends PipelineOptions {
-  /**
-   * Number of workers to use when executing the Dataflow job. Note that selection of an autoscaling
-   * algorithm other then {@code NONE} will affect the size of the worker pool. If left unspecified,
-   * the Dataflow service will determine the number of workers.
-   */
-  @Description("Number of workers to use when executing the Dataflow job. Note that "
-      + "selection of an autoscaling algorithm other then \"NONE\" will affect the "
-      + "size of the worker pool. If left unspecified, the Dataflow service will "
-      + "determine the number of workers.")
-  int getNumWorkers();
-  void setNumWorkers(int value);
-
-  /**
-   * Type of autoscaling algorithm to use.
-   */
-  @Experimental(Experimental.Kind.AUTOSCALING)
-  public enum AutoscalingAlgorithmType {
-    /** Use numWorkers machines. Do not autoscale the worker pool. */
-    NONE("AUTOSCALING_ALGORITHM_NONE"),
-
-    @Deprecated
-    BASIC("AUTOSCALING_ALGORITHM_BASIC"),
-
-    /** Autoscale the workerpool based on throughput (up to maxNumWorkers). */
-    THROUGHPUT_BASED("AUTOSCALING_ALGORITHM_BASIC");
-
-    private final String algorithm;
-
-    private AutoscalingAlgorithmType(String algorithm) {
-      this.algorithm = algorithm;
-    }
-
-    /** Returns the string representation of this type. */
-    public String getAlgorithm() {
-      return this.algorithm;
-    }
-  }
-
-  /**
-   * [Experimental] The autoscaling algorithm to use for the workerpool.
-   *
-   * <ul>
-   *   <li>NONE: does not change the size of the worker pool.</li>
-   *   <li>BASIC: autoscale the worker pool size up to maxNumWorkers until the job completes.</li>
-   *   <li>THROUGHPUT_BASED: autoscale the workerpool based on throughput (up to maxNumWorkers).
-   *   </li>
-   * </ul>
-   */
-  @Description("[Experimental] The autoscaling algorithm to use for the workerpool. "
-      + "NONE: does not change the size of the worker pool. "
-      + "BASIC (deprecated): autoscale the worker pool size up to maxNumWorkers until the job "
-      + "completes. "
-      + "THROUGHPUT_BASED: autoscale the workerpool based on throughput (up to maxNumWorkers).")
-  @Experimental(Experimental.Kind.AUTOSCALING)
-  AutoscalingAlgorithmType getAutoscalingAlgorithm();
-  void setAutoscalingAlgorithm(AutoscalingAlgorithmType value);
-
-  /**
-   * The maximum number of workers to use for the workerpool. This options limits the size of the
-   * workerpool for the lifetime of the job, including
-   * <a href="https://cloud.google.com/dataflow/pipelines/updating-a-pipeline">pipeline updates</a>.
-   * If left unspecified, the Dataflow service will compute a ceiling.
-   */
-  @Description("The maximum number of workers to use for the workerpool. This options limits the "
-      + "size of the workerpool for the lifetime of the job, including pipeline updates. "
-      + "If left unspecified, the Dataflow service will compute a ceiling.")
-  int getMaxNumWorkers();
-  void setMaxNumWorkers(int value);
-
-  /**
-   * Remote worker disk size, in gigabytes, or 0 to use the default size.
-   */
-  @Description("Remote worker disk size, in gigabytes, or 0 to use the default size.")
-  int getDiskSizeGb();
-  void setDiskSizeGb(int value);
-
-  /**
-   * Docker container image that executes Dataflow worker harness, residing in Google Container
-   * Registry.
-   */
-  @Default.InstanceFactory(WorkerHarnessContainerImageFactory.class)
-  @Description("Docker container image that executes Dataflow worker harness, residing in Google "
-      + " Container Registry.")
-  @Hidden
-  String getWorkerHarnessContainerImage();
-  void setWorkerHarnessContainerImage(String value);
-
-  /**
-   * Returns the default Docker container image that executes Dataflow worker harness, residing in
-   * Google Container Registry.
-   */
-  public static class WorkerHarnessContainerImageFactory
-      implements DefaultValueFactory<String> {
-    @Override
-    public String create(PipelineOptions options) {
-      DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
-      if (dataflowOptions.isStreaming()) {
-        return DataflowPipelineRunner.STREAMING_WORKER_HARNESS_CONTAINER_IMAGE;
-      } else {
-        return DataflowPipelineRunner.BATCH_WORKER_HARNESS_CONTAINER_IMAGE;
-      }
-    }
-  }
-
-  /**
-   * GCE <a href="https://cloud.google.com/compute/docs/networking">network</a> for launching
-   * workers.
-   *
-   * <p>Default is up to the Dataflow service.
-   */
-  @Description("GCE network for launching workers. For more information, see the reference "
-      + "documentation https://cloud.google.com/compute/docs/networking. "
-      + "Default is up to the Dataflow service.")
-  String getNetwork();
-  void setNetwork(String value);
-
-  /**
-   * GCE <a href="https://cloud.google.com/compute/docs/networking">subnetwork</a> for launching
-   * workers.
-   *
-   * <p>Default is up to the Dataflow service. Expected format is
-   * regions/REGION/subnetworks/SUBNETWORK.
-   *
-   * <p>You may also need to specify network option.
-   */
-  @Description("GCE subnetwork for launching workers. For more information, see the reference "
-      + "documentation https://cloud.google.com/compute/docs/networking. "
-      + "Default is up to the Dataflow service.")
-  String getSubnetwork();
-  void setSubnetwork(String value);
-
-  /**
-   * GCE <a href="https://developers.google.com/compute/docs/zones"
-   * >availability zone</a> for launching workers.
-   *
-   * <p>Default is up to the Dataflow service.
-   */
-  @Description("GCE availability zone for launching workers. See "
-      + "https://developers.google.com/compute/docs/zones for a list of valid options. "
-      + "Default is up to the Dataflow service.")
-  String getZone();
-  void setZone(String value);
-
-  /**
-   * Machine type to create Dataflow worker VMs as.
-   *
-   * <p>See <a href="https://cloud.google.com/compute/docs/machine-types">GCE machine types</a>
-   * for a list of valid options.
-   *
-   * <p>If unset, the Dataflow service will choose a reasonable default.
-   */
-  @Description("Machine type to create Dataflow worker VMs as. See "
-      + "https://cloud.google.com/compute/docs/machine-types for a list of valid options. "
-      + "If unset, the Dataflow service will choose a reasonable default.")
-  String getWorkerMachineType();
-  void setWorkerMachineType(String value);
-
-  /**
-   * The policy for tearing down the workers spun up by the service.
-   */
-  public enum TeardownPolicy {
-    /**
-     * All VMs created for a Dataflow job are deleted when the job finishes, regardless of whether
-     * it fails or succeeds.
-     */
-    TEARDOWN_ALWAYS("TEARDOWN_ALWAYS"),
-    /**
-     * All VMs created for a Dataflow job are left running when the job finishes, regardless of
-     * whether it fails or succeeds.
-     */
-    TEARDOWN_NEVER("TEARDOWN_NEVER"),
-    /**
-     * All VMs created for a Dataflow job are deleted when the job succeeds, but are left running
-     * when it fails. (This is typically used for debugging failing jobs by SSHing into the
-     * workers.)
-     */
-    TEARDOWN_ON_SUCCESS("TEARDOWN_ON_SUCCESS");
-
-    private final String teardownPolicy;
-
-    private TeardownPolicy(String teardownPolicy) {
-      this.teardownPolicy = teardownPolicy;
-    }
-
-    public String getTeardownPolicyName() {
-      return this.teardownPolicy;
-    }
-  }
-
-  /**
-   * The teardown policy for the VMs.
-   *
-   * <p>If unset, the Dataflow service will choose a reasonable default.
-   */
-  @Description("The teardown policy for the VMs. If unset, the Dataflow service will "
-      + "choose a reasonable default.")
-  TeardownPolicy getTeardownPolicy();
-  void setTeardownPolicy(TeardownPolicy value);
-
-  /**
-   * List of local files to make available to workers.
-   *
-   * <p>Files are placed on the worker's classpath.
-   *
-   * <p>The default value is the list of jars from the main program's classpath.
-   */
-  @Description("Files to stage on GCS and make available to workers. "
-      + "Files are placed on the worker's classpath. "
-      + "The default value is all files from the classpath.")
-  @JsonIgnore
-  List<String> getFilesToStage();
-  void setFilesToStage(List<String> value);
-
-  /**
-   * Specifies what type of persistent disk should be used. The value should be a full or partial
-   * URL of a disk type resource, e.g., zones/us-central1-f/disks/pd-standard. For
-   * more information, see the
-   * <a href="https://cloud.google.com/compute/docs/reference/latest/diskTypes">API reference
-   * documentation for DiskTypes</a>.
-   */
-  @Description("Specifies what type of persistent disk should be used. The value should be a full "
-      + "or partial URL of a disk type resource, e.g., zones/us-central1-f/disks/pd-standard. For "
-      + "more information, see the API reference documentation for DiskTypes: "
-      + "https://cloud.google.com/compute/docs/reference/latest/diskTypes")
-  String getWorkerDiskType();
-  void setWorkerDiskType(String value);
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowProfilingOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowProfilingOptions.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowProfilingOptions.java
deleted file mode 100644
index 3cd7b03..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowProfilingOptions.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.options;
-
-import org.apache.beam.sdk.annotations.Experimental;
-
-import java.util.HashMap;
-
-/**
- * Options for controlling profiling of pipeline execution.
- */
-@Description("[Experimental] Used to configure profiling of the Dataflow pipeline")
-@Experimental
-@Hidden
-public interface DataflowProfilingOptions {
-
-  @Description("Whether to periodically dump profiling information to local disk.\n"
-      + "WARNING: Enabling this option may fill local disk with profiling information.")
-  boolean getEnableProfilingAgent();
-  void setEnableProfilingAgent(boolean enabled);
-
-  @Description(
-      "[INTERNAL] Additional configuration for the profiling agent. Not typically necessary.")
-  @Hidden
-  DataflowProfilingAgentConfiguration getProfilingAgentConfiguration();
-  void setProfilingAgentConfiguration(DataflowProfilingAgentConfiguration configuration);
-
-  /**
-   * Configuration the for profiling agent.
-   */
-  public static class DataflowProfilingAgentConfiguration extends HashMap<String, Object> {
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowWorkerHarnessOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowWorkerHarnessOptions.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowWorkerHarnessOptions.java
deleted file mode 100644
index 7705b66..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowWorkerHarnessOptions.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.options;
-
-/**
- * Options that are used exclusively within the Dataflow worker harness.
- * These options have no effect at pipeline creation time.
- */
-@Description("[Internal] Options that are used exclusively within the Dataflow worker harness. "
-    + "These options have no effect at pipeline creation time.")
-@Hidden
-public interface DataflowWorkerHarnessOptions extends DataflowPipelineOptions {
-  /**
-   * The identity of the worker running this pipeline.
-   */
-  @Description("The identity of the worker running this pipeline.")
-  String getWorkerId();
-  void setWorkerId(String value);
-
-  /**
-   * The identity of the Dataflow job.
-   */
-  @Description("The identity of the Dataflow job.")
-  String getJobId();
-  void setJobId(String value);
-
-  /**
-   * The size of the worker's in-memory cache, in megabytes.
-   *
-   * <p>Currently, this cache is used for storing read values of side inputs.
-   */
-  @Description("The size of the worker's in-memory cache, in megabytes.")
-  @Default.Integer(100)
-  Integer getWorkerCacheMb();
-  void setWorkerCacheMb(Integer value);
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowWorkerLoggingOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowWorkerLoggingOptions.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowWorkerLoggingOptions.java
deleted file mode 100644
index ebd42d9..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowWorkerLoggingOptions.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.options;
-
-import com.google.common.base.Preconditions;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Options that are used to control logging configuration on the Dataflow worker.
- */
-@Description("Options that are used to control logging configuration on the Dataflow worker.")
-public interface DataflowWorkerLoggingOptions extends PipelineOptions {
-  /**
-   * The set of log levels that can be used on the Dataflow worker.
-   */
-  public enum Level {
-    DEBUG, ERROR, INFO, TRACE, WARN
-  }
-
-  /**
-   * This option controls the default log level of all loggers without a log level override.
-   */
-  @Description("Controls the default log level of all loggers without a log level override.")
-  @Default.Enum("INFO")
-  Level getDefaultWorkerLogLevel();
-  void setDefaultWorkerLogLevel(Level level);
-
-  /**
-   * This option controls the log levels for specifically named loggers.
-   *
-   * <p>Later options with equivalent names override earlier options.
-   *
-   * <p>See {@link WorkerLogLevelOverrides} for more information on how to configure logging
-   * on a per {@link Class}, {@link Package}, or name basis. If used from the command line,
-   * the expected format is {"Name":"Level",...}, further details on
-   * {@link WorkerLogLevelOverrides#from}.
-   */
-  @Description("This option controls the log levels for specifically named loggers. "
-      + "The expected format is {\"Name\":\"Level\",...}. The Dataflow worker uses "
-      + "java.util.logging, which supports a logging hierarchy based off of names that are '.' "
-      + "separated. For example, by specifying the value {\"a.b.c.Foo\":\"DEBUG\"}, the logger "
-      + "for the class 'a.b.c.Foo' will be configured to output logs at the DEBUG level. "
-      + "Similarly, by specifying the value {\"a.b.c\":\"WARN\"}, all loggers underneath the "
-      + "'a.b.c' package will be configured to output logs at the WARN level. Also, note that "
-      + "when multiple overrides are specified, the exact name followed by the closest parent "
-      + "takes precedence.")
-  WorkerLogLevelOverrides getWorkerLogLevelOverrides();
-  void setWorkerLogLevelOverrides(WorkerLogLevelOverrides value);
-
-  /**
-   * Defines a log level override for a specific class, package, or name.
-   *
-   * <p>{@code java.util.logging} is used on the Dataflow worker harness and supports
-   * a logging hierarchy based off of names that are "." separated. It is a common
-   * pattern to have the logger for a given class share the same name as the class itself.
-   * Given the classes {@code a.b.c.Foo}, {@code a.b.c.Xyz}, and {@code a.b.Bar}, with
-   * loggers named {@code "a.b.c.Foo"}, {@code "a.b.c.Xyz"}, and {@code "a.b.Bar"} respectively,
-   * we can override the log levels:
-   * <ul>
-   *    <li>for {@code Foo} by specifying the name {@code "a.b.c.Foo"} or the {@link Class}
-   *    representing {@code a.b.c.Foo}.
-   *    <li>for {@code Foo}, {@code Xyz}, and {@code Bar} by specifying the name {@code "a.b"} or
-   *    the {@link Package} representing {@code a.b}.
-   *    <li>for {@code Foo} and {@code Bar} by specifying both of their names or classes.
-   * </ul>
-   * Note that by specifying multiple overrides, the exact name followed by the closest parent
-   * takes precedence.
-   */
-  public static class WorkerLogLevelOverrides extends HashMap<String, Level> {
-    /**
-     * Overrides the default log level for the passed in class.
-     *
-     * <p>This is equivalent to calling
-     * {@link #addOverrideForName(String, DataflowWorkerLoggingOptions.Level)}
-     * and passing in the {@link Class#getName() class name}.
-     */
-    public WorkerLogLevelOverrides addOverrideForClass(Class<?> klass, Level level) {
-      Preconditions.checkNotNull(klass, "Expected class to be not null.");
-      addOverrideForName(klass.getName(), level);
-      return this;
-    }
-
-    /**
-     * Overrides the default log level for the passed in package.
-     *
-     * <p>This is equivalent to calling
-     * {@link #addOverrideForName(String, DataflowWorkerLoggingOptions.Level)}
-     * and passing in the {@link Package#getName() package name}.
-     */
-    public WorkerLogLevelOverrides addOverrideForPackage(Package pkg, Level level) {
-      Preconditions.checkNotNull(pkg, "Expected package to be not null.");
-      addOverrideForName(pkg.getName(), level);
-      return this;
-    }
-
-    /**
-     * Overrides the default log level for the passed in name.
-     *
-     * <p>Note that because of the hierarchical nature of logger names, this will
-     * override the log level of all loggers that have the passed in name or
-     * a parent logger that has the passed in name.
-     */
-    public WorkerLogLevelOverrides addOverrideForName(String name, Level level) {
-      Preconditions.checkNotNull(name, "Expected name to be not null.");
-      Preconditions.checkNotNull(level,
-          "Expected level to be one of %s.", Arrays.toString(Level.values()));
-      put(name, level);
-      return this;
-    }
-
-    /**
-     * Expects a map keyed by logger {@code Name}s with values representing {@code Level}s.
-     * The {@code Name} generally represents the fully qualified Java
-     * {@link Class#getName() class name}, or fully qualified Java
-     * {@link Package#getName() package name}, or custom logger name. The {@code Level}
-     * represents the log level and must be one of {@link Level}.
-     */
-    @JsonCreator
-    public static WorkerLogLevelOverrides from(Map<String, String> values) {
-      Preconditions.checkNotNull(values, "Expected values to be not null.");
-      WorkerLogLevelOverrides overrides = new WorkerLogLevelOverrides();
-      for (Map.Entry<String, String> entry : values.entrySet()) {
-        try {
-          overrides.addOverrideForName(entry.getKey(), Level.valueOf(entry.getValue()));
-        } catch (IllegalArgumentException e) {
-          throw new IllegalArgumentException(String.format(
-              "Unsupported log level '%s' requested for %s. Must be one of %s.",
-              entry.getValue(), entry.getKey(), Arrays.toString(Level.values())));
-        }
-
-      }
-      return overrides;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/BlockingDataflowPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/BlockingDataflowPipelineRunner.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/BlockingDataflowPipelineRunner.java
deleted file mode 100644
index 9096c2b..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/BlockingDataflowPipelineRunner.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.runners;
-
-import org.apache.beam.sdk.annotations.Experimental;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.PipelineResult.State;
-import com.google.cloud.dataflow.sdk.options.BlockingDataflowPipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsValidator;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.util.MonitoringUtil;
-import com.google.cloud.dataflow.sdk.values.PInput;
-import com.google.cloud.dataflow.sdk.values.POutput;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-
-import javax.annotation.Nullable;
-
-/**
- * A {@link PipelineRunner} that's like {@link DataflowPipelineRunner}
- * but that waits for the launched job to finish.
- *
- * <p>Prints out job status updates and console messages while it waits.
- *
- * <p>Returns the final job state, or throws an exception if the job
- * fails or cannot be monitored.
- *
- * <p><h3>Permissions</h3>
- * When reading from a Dataflow source or writing to a Dataflow sink using
- * {@code BlockingDataflowPipelineRunner}, the Google cloud services account and the Google compute
- * engine service account of the GCP project running the Dataflow Job will need access to the
- * corresponding source/sink.
- *
- * <p>Please see <a href="https://cloud.google.com/dataflow/security-and-permissions">Google Cloud
- * Dataflow Security and Permissions</a> for more details.
- */
-public class BlockingDataflowPipelineRunner extends
-    PipelineRunner<DataflowPipelineJob> {
-  private static final Logger LOG = LoggerFactory.getLogger(BlockingDataflowPipelineRunner.class);
-
-  // Defaults to an infinite wait period.
-  // TODO: make this configurable after removal of option map.
-  private static final long BUILTIN_JOB_TIMEOUT_SEC = -1L;
-
-  private final DataflowPipelineRunner dataflowPipelineRunner;
-  private final BlockingDataflowPipelineOptions options;
-
-  protected BlockingDataflowPipelineRunner(
-      DataflowPipelineRunner internalRunner,
-      BlockingDataflowPipelineOptions options) {
-    this.dataflowPipelineRunner = internalRunner;
-    this.options = options;
-  }
-
-  /**
-   * Constructs a runner from the provided options.
-   */
-  public static BlockingDataflowPipelineRunner fromOptions(
-      PipelineOptions options) {
-    BlockingDataflowPipelineOptions dataflowOptions =
-        PipelineOptionsValidator.validate(BlockingDataflowPipelineOptions.class, options);
-    DataflowPipelineRunner dataflowPipelineRunner =
-        DataflowPipelineRunner.fromOptions(dataflowOptions);
-
-    return new BlockingDataflowPipelineRunner(dataflowPipelineRunner, dataflowOptions);
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @throws DataflowJobExecutionException if there is an exception during job execution.
-   * @throws DataflowServiceException if there is an exception retrieving information about the job.
-   */
-  @Override
-  public DataflowPipelineJob run(Pipeline p) {
-    final DataflowPipelineJob job = dataflowPipelineRunner.run(p);
-
-    // We ignore the potential race condition here (Ctrl-C after job submission but before the
-    // shutdown hook is registered). Even if we tried to do something smarter (eg., SettableFuture)
-    // the run method (which produces the job) could fail or be Ctrl-C'd before it had returned a
-    // job. The display of the command to cancel the job is best-effort anyways -- RPC's could fail,
-    // etc. If the user wants to verify the job was cancelled they should look at the job status.
-    Thread shutdownHook = new Thread() {
-      @Override
-      public void run() {
-        LOG.warn("Job is already running in Google Cloud Platform, Ctrl-C will not cancel it.\n"
-            + "To cancel the job in the cloud, run:\n> {}",
-            MonitoringUtil.getGcloudCancelCommand(options, job.getJobId()));
-      }
-    };
-
-    try {
-      Runtime.getRuntime().addShutdownHook(shutdownHook);
-
-      @Nullable
-      State result;
-      try {
-        result = job.waitToFinish(
-            BUILTIN_JOB_TIMEOUT_SEC, TimeUnit.SECONDS,
-            new MonitoringUtil.PrintHandler(options.getJobMessageOutput()));
-      } catch (IOException | InterruptedException ex) {
-        if (ex instanceof InterruptedException) {
-          Thread.currentThread().interrupt();
-        }
-        LOG.debug("Exception caught while retrieving status for job {}", job.getJobId(), ex);
-        throw new DataflowServiceException(
-            job, "Exception caught while retrieving status for job " + job.getJobId(), ex);
-      }
-
-      if (result == null) {
-        throw new DataflowServiceException(
-            job, "Timed out while retrieving status for job " + job.getJobId());
-      }
-
-      LOG.info("Job finished with status {}", result);
-      if (!result.isTerminal()) {
-        throw new IllegalStateException("Expected terminal state for job " + job.getJobId()
-            + ", got " + result);
-      }
-
-      if (result == State.DONE) {
-        return job;
-      } else if (result == State.UPDATED) {
-        DataflowPipelineJob newJob = job.getReplacedByJob();
-        LOG.info("Job {} has been updated and is running as the new job with id {}."
-            + "To access the updated job on the Dataflow monitoring console, please navigate to {}",
-            job.getJobId(),
-            newJob.getJobId(),
-            MonitoringUtil.getJobMonitoringPageURL(newJob.getProjectId(), newJob.getJobId()));
-        throw new DataflowJobUpdatedException(
-            job,
-            String.format("Job %s updated; new job is %s.", job.getJobId(), newJob.getJobId()),
-            newJob);
-      } else if (result == State.CANCELLED) {
-        String message = String.format("Job %s cancelled by user", job.getJobId());
-        LOG.info(message);
-        throw new DataflowJobCancelledException(job, message);
-      } else {
-        throw new DataflowJobExecutionException(job, "Job " + job.getJobId()
-            + " failed with status " + result);
-      }
-    } finally {
-      Runtime.getRuntime().removeShutdownHook(shutdownHook);
-    }
-  }
-
-  @Override
-  public <OutputT extends POutput, InputT extends PInput> OutputT apply(
-      PTransform<InputT, OutputT> transform, InputT input) {
-    return dataflowPipelineRunner.apply(transform, input);
-  }
-
-  /**
-   * Sets callbacks to invoke during execution. See {@link DataflowPipelineRunnerHooks}.
-   */
-  @Experimental
-  public void setHooks(DataflowPipelineRunnerHooks hooks) {
-    this.dataflowPipelineRunner.setHooks(hooks);
-  }
-
-  @Override
-  public String toString() {
-    return "BlockingDataflowPipelineRunner#" + options.getJobName();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobAlreadyExistsException.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobAlreadyExistsException.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobAlreadyExistsException.java
deleted file mode 100644
index 4a4f100..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobAlreadyExistsException.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.runners;
-
-/**
- * An exception that is thrown if the unique job name constraint of the Dataflow
- * service is broken because an existing job with the same job name is currently active.
- * The {@link DataflowPipelineJob} contained within this exception contains information
- * about the pre-existing job.
- */
-public class DataflowJobAlreadyExistsException extends DataflowJobException {
-  /**
-   * Create a new {@code DataflowJobAlreadyExistsException} with the specified {@link
-   * DataflowPipelineJob} and message.
-   */
-  public DataflowJobAlreadyExistsException(
-      DataflowPipelineJob job, String message) {
-    super(job, message, null);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobAlreadyUpdatedException.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobAlreadyUpdatedException.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobAlreadyUpdatedException.java
deleted file mode 100644
index 1f52c6a..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobAlreadyUpdatedException.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.runners;
-
-/**
- * An exception that is thrown if the existing job has already been updated within the Dataflow
- * service and is no longer able to be updated. The {@link DataflowPipelineJob} contained within
- * this exception contains information about the pre-existing updated job.
- */
-public class DataflowJobAlreadyUpdatedException extends DataflowJobException {
-  /**
-   * Create a new {@code DataflowJobAlreadyUpdatedException} with the specified {@link
-   * DataflowPipelineJob} and message.
-   */
-  public DataflowJobAlreadyUpdatedException(
-      DataflowPipelineJob job, String message) {
-    super(job, message, null);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobCancelledException.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobCancelledException.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobCancelledException.java
deleted file mode 100644
index 495ca5a..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobCancelledException.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.runners;
-
-/**
- * Signals that a job run by a {@link BlockingDataflowPipelineRunner} was updated during execution.
- */
-public class DataflowJobCancelledException extends DataflowJobException {
-  /**
-   * Create a new {@code DataflowJobAlreadyUpdatedException} with the specified {@link
-   * DataflowPipelineJob} and message.
-   */
-  public DataflowJobCancelledException(DataflowPipelineJob job, String message) {
-    super(job, message, null);
-  }
-
-  /**
-   * Create a new {@code DataflowJobAlreadyUpdatedException} with the specified {@link
-   * DataflowPipelineJob}, message, and cause.
-   */
-  public DataflowJobCancelledException(DataflowPipelineJob job, String message, Throwable cause) {
-    super(job, message, cause);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobException.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobException.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobException.java
deleted file mode 100644
index a22d13c..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobException.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.runners;
-
-import java.util.Objects;
-
-import javax.annotation.Nullable;
-
-/**
- * A {@link RuntimeException} that contains information about a {@link DataflowPipelineJob}.
- */
-public abstract class DataflowJobException extends RuntimeException {
-  private final DataflowPipelineJob job;
-
-  DataflowJobException(DataflowPipelineJob job, String message, @Nullable Throwable cause) {
-    super(message, cause);
-    this.job = Objects.requireNonNull(job);
-  }
-
-  /**
-   * Returns the failed job.
-   */
-  public DataflowPipelineJob getJob() {
-    return job;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobExecutionException.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobExecutionException.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobExecutionException.java
deleted file mode 100644
index 3dbb007..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobExecutionException.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.runners;
-
-import javax.annotation.Nullable;
-
-/**
- * Signals that a job run by a {@link BlockingDataflowPipelineRunner} fails during execution, and
- * provides access to the failed job.
- */
-public class DataflowJobExecutionException extends DataflowJobException {
-  DataflowJobExecutionException(DataflowPipelineJob job, String message) {
-    this(job, message, null);
-  }
-
-  DataflowJobExecutionException(
-      DataflowPipelineJob job, String message, @Nullable Throwable cause) {
-    super(job, message, cause);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobUpdatedException.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobUpdatedException.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobUpdatedException.java
deleted file mode 100644
index 72deb45..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobUpdatedException.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.runners;
-
-/**
- * Signals that a job run by a {@link BlockingDataflowPipelineRunner} was updated during execution.
- */
-public class DataflowJobUpdatedException extends DataflowJobException {
-  private DataflowPipelineJob replacedByJob;
-
-  /**
-   * Create a new {@code DataflowJobUpdatedException} with the specified original {@link
-   * DataflowPipelineJob}, message, and replacement {@link DataflowPipelineJob}.
-   */
-  public DataflowJobUpdatedException(
-      DataflowPipelineJob job, String message, DataflowPipelineJob replacedByJob) {
-    this(job, message, replacedByJob, null);
-  }
-
-  /**
-   * Create a new {@code DataflowJobUpdatedException} with the specified original {@link
-   * DataflowPipelineJob}, message, replacement {@link DataflowPipelineJob}, and cause.
-   */
-  public DataflowJobUpdatedException(
-      DataflowPipelineJob job, String message, DataflowPipelineJob replacedByJob, Throwable cause) {
-    super(job, message, cause);
-    this.replacedByJob = replacedByJob;
-  }
-
-  /**
-   * The new job that replaces the job terminated with this exception.
-   */
-  public DataflowPipelineJob getReplacedByJob() {
-    return replacedByJob;
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipeline.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipeline.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipeline.java
deleted file mode 100644
index 7b97c7d..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipeline.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.runners;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-
-/**
- * A {@link DataflowPipeline} is a {@link Pipeline} that returns a
- * {@link DataflowPipelineJob} when it is
- * {@link com.google.cloud.dataflow.sdk.Pipeline#run()}.
- *
- * <p>This is not intended for use by users of Cloud Dataflow.
- * Instead, use {@link Pipeline#create(PipelineOptions)} to initialize a
- * {@link Pipeline}.
- */
-public class DataflowPipeline extends Pipeline {
-
-  /**
-   * Creates and returns a new {@link DataflowPipeline} instance for tests.
-   */
-  public static DataflowPipeline create(DataflowPipelineOptions options) {
-    return new DataflowPipeline(options);
-  }
-
-  private DataflowPipeline(DataflowPipelineOptions options) {
-    super(DataflowPipelineRunner.fromOptions(options), options);
-  }
-
-  @Override
-  public DataflowPipelineJob run() {
-    return (DataflowPipelineJob) super.run();
-  }
-
-  @Override
-  public DataflowPipelineRunner getRunner() {
-    return (DataflowPipelineRunner) super.getRunner();
-  }
-
-  @Override
-  public String toString() {
-    return "DataflowPipeline#" + getOptions().as(DataflowPipelineOptions.class).getJobName();
-  }
-}