You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/04/14 06:48:25 UTC
[38/74] [partial] incubator-beam git commit: Rename
com/google/cloud/dataflow->org/apache/beam
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java
new file mode 100644
index 0000000..b907ae7
--- /dev/null
+++ b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.examples.complete.game;
+
+import com.google.cloud.dataflow.examples.complete.game.UserScore.ExtractAndSumScore;
+import com.google.cloud.dataflow.examples.complete.game.UserScore.GameActionInfo;
+import com.google.cloud.dataflow.examples.complete.game.UserScore.ParseEventFn;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
+import com.google.cloud.dataflow.sdk.testing.PAssert;
+import com.google.cloud.dataflow.sdk.testing.RunnableOnService;
+import com.google.cloud.dataflow.sdk.testing.TestPipeline;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.DoFnTester;
+import com.google.cloud.dataflow.sdk.transforms.MapElements;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Tests of {@link UserScore}: event parsing and per-user / per-team score summation.
+ */
+@RunWith(JUnit4.class)
+public class UserScoreTest implements Serializable {
+
+  static final String[] GAME_EVENTS_ARRAY = new String[] {
+    "user0_MagentaKangaroo,MagentaKangaroo,3,1447955630000,2015-11-19 09:53:53.444",
+    "user13_ApricotQuokka,ApricotQuokka,15,1447955630000,2015-11-19 09:53:53.444",
+    "user6_AmberNumbat,AmberNumbat,11,1447955630000,2015-11-19 09:53:53.444",
+    "user7_AlmondWallaby,AlmondWallaby,15,1447955630000,2015-11-19 09:53:53.444",
+    "user7_AndroidGreenKookaburra,AndroidGreenKookaburra,12,1447955630000,2015-11-19 09:53:53.444",
+    "user6_AliceBlueDingo,AliceBlueDingo,4,xxxxxxx,2015-11-19 09:53:53.444", // malformed, dropped
+    "user7_AndroidGreenKookaburra,AndroidGreenKookaburra,11,1447955630000,2015-11-19 09:53:53.444",
+    "THIS IS A PARSE ERROR,2015-11-19 09:53:53.444", // malformed, dropped
+    "user19_BisqueBilby,BisqueBilby,6,1447955630000,2015-11-19 09:53:53.444",
+    "user19_BisqueBilby,BisqueBilby,8,1447955630000,2015-11-19 09:53:53.444"
+  };
+
+  static final String[] GAME_EVENTS_ARRAY2 = new String[] { // every line here is malformed
+    "user6_AliceBlueDingo,AliceBlueDingo,4,xxxxxxx,2015-11-19 09:53:53.444",
+    "THIS IS A PARSE ERROR,2015-11-19 09:53:53.444",
+    "user13_BisqueBilby,BisqueBilby,xxx,1447955630000,2015-11-19 09:53:53.444"
+  };
+
+  static final List<String> GAME_EVENTS = Arrays.asList(GAME_EVENTS_ARRAY);
+  static final List<String> GAME_EVENTS2 = Arrays.asList(GAME_EVENTS_ARRAY2);
+
+  static final List<KV<String, Integer>> USER_SUMS = Arrays.asList(
+      KV.of("user0_MagentaKangaroo", 3), KV.of("user13_ApricotQuokka", 15),
+      KV.of("user6_AmberNumbat", 11), KV.of("user7_AlmondWallaby", 15),
+      KV.of("user7_AndroidGreenKookaburra", 23), // 12 + 11
+      KV.of("user19_BisqueBilby", 14)); // 6 + 8
+
+  static final List<KV<String, Integer>> TEAM_SUMS = Arrays.asList(
+      KV.of("MagentaKangaroo", 3), KV.of("ApricotQuokka", 15),
+      KV.of("AmberNumbat", 11), KV.of("AlmondWallaby", 15),
+      KV.of("AndroidGreenKookaburra", 23),
+      KV.of("BisqueBilby", 14));
+
+  /** Tests that ParseEventFn parses well-formed lines and drops the two malformed ones. */
+  @Test
+  public void testParseEventFn() {
+    DoFnTester<String, GameActionInfo> parseEventFn =
+        DoFnTester.of(new ParseEventFn());
+
+    List<GameActionInfo> results = parseEventFn.processBatch(GAME_EVENTS_ARRAY);
+    Assert.assertEquals(8, results.size());
+    Assert.assertEquals("user0_MagentaKangaroo", results.get(0).getUser());
+    Assert.assertEquals("MagentaKangaroo", results.get(0).getTeam());
+    Assert.assertEquals(Integer.valueOf(3), results.get(0).getScore());
+  }
+
+  /** Tests that ExtractAndSumScore("user") sums the scores of each user. */
+  @Test
+  @Category(RunnableOnService.class)
+  public void testUserScoreSums() throws Exception {
+    Pipeline p = TestPipeline.create();
+
+    PCollection<String> input = p.apply(Create.of(GAME_EVENTS).withCoder(StringUtf8Coder.of()));
+
+    PCollection<KV<String, Integer>> output = input
+        .apply(ParDo.of(new ParseEventFn()))
+        // Extract and sum username/score pairs from the event data.
+        .apply("ExtractUserScore", new ExtractAndSumScore("user"));
+
+    // Check the user score sums.
+    PAssert.that(output).containsInAnyOrder(USER_SUMS);
+
+    p.run();
+  }
+
+  /** Tests that ExtractAndSumScore("team") sums the scores of each team. */
+  @Test
+  @Category(RunnableOnService.class)
+  public void testTeamScoreSums() throws Exception {
+    Pipeline p = TestPipeline.create();
+
+    PCollection<String> input = p.apply(Create.of(GAME_EVENTS).withCoder(StringUtf8Coder.of()));
+
+    PCollection<KV<String, Integer>> output = input
+        .apply(ParDo.of(new ParseEventFn()))
+        // Extract and sum teamname/score pairs from the event data.
+        .apply("ExtractTeamScore", new ExtractAndSumScore("team"));
+
+    // Check the team score sums.
+    PAssert.that(output).containsInAnyOrder(TEAM_SUMS);
+
+    p.run();
+  }
+
+  /** Tests that input consisting only of malformed lines produces no output at all. */
+  @Test
+  @Category(RunnableOnService.class)
+  public void testUserScoresBadInput() throws Exception {
+    Pipeline p = TestPipeline.create();
+
+    PCollection<String> input = p.apply(Create.of(GAME_EVENTS2).withCoder(StringUtf8Coder.of()));
+
+    PCollection<KV<String, Integer>> extract = input
+        .apply(ParDo.of(new ParseEventFn()))
+        .apply(
+            MapElements.via((GameActionInfo gInfo) -> KV.of(gInfo.getUser(), gInfo.getScore()))
+                .withOutputType(new TypeDescriptor<KV<String, Integer>>() {}));
+
+    PAssert.that(extract).empty();
+
+    p.run();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/BlockingDataflowPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/BlockingDataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/BlockingDataflowPipelineOptions.java
deleted file mode 100644
index 6bbafdd..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/BlockingDataflowPipelineOptions.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.options;
-
-import com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-
-import java.io.PrintStream;
-
-/**
- * Options that are used to configure the {@link BlockingDataflowPipelineRunner}.
- */
-@Description("Configure options on the BlockingDataflowPipelineRunner.")
-public interface BlockingDataflowPipelineOptions extends DataflowPipelineOptions {
- /**
- * Output stream for job status messages; defaults to {@link System#out} (see StandardOutputFactory).
- */
- @Description("Where messages generated during execution of the Dataflow job will be output.")
- @JsonIgnore
- @Hidden
- @Default.InstanceFactory(StandardOutputFactory.class)
- PrintStream getJobMessageOutput();
- void setJobMessageOutput(PrintStream value);
-
- /**
- * Default-value factory for {@code jobMessageOutput}; always returns {@link System#out}.
- */
- public static class StandardOutputFactory implements DefaultValueFactory<PrintStream> {
- @Override
- public PrintStream create(PipelineOptions options) {
- return System.out;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/CloudDebuggerOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/CloudDebuggerOptions.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/CloudDebuggerOptions.java
deleted file mode 100644
index 3f0503e..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/CloudDebuggerOptions.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.options;
-
-import org.apache.beam.sdk.annotations.Experimental;
-
-import com.google.api.services.clouddebugger.v2.model.Debuggee;
-
-import javax.annotation.Nullable;
-
-/**
- * Experimental, hidden options for controlling the Cloud Debugger snapshot agent.
- */
-@Description("[Experimental] Used to configure the Cloud Debugger")
-@Experimental
-@Hidden
-public interface CloudDebuggerOptions {
-
- /** Whether to enable the Cloud Debugger snapshot agent for the current job. */
- @Description("Whether to enable the Cloud Debugger snapshot agent for the current job.")
- boolean getEnableCloudDebugger();
- void setEnableCloudDebugger(boolean enabled);
-
- /** The Cloud Debugger debuggee to associate with; may be null. This should not be set directly. */
- @Description("The Cloud Debugger debuggee to associate with. This should not be set directly.")
- @Hidden
- @Nullable Debuggee getDebuggee();
- void setDebuggee(Debuggee debuggee);
-
- /** Maximum cost (ratio of CPU time, 0 to 1) for evaluating conditional snapshots; default 0.01. */
- @Description(
- "The maximum cost (as a ratio of CPU time) allowed for evaluating conditional snapshots. "
- + "Should be a double between 0 and 1. "
- + "Snapshots will be cancelled if evaluating conditions takes more than this ratio of time.")
- @Default.Double(0.01)
- double getMaxConditionCost();
- void setMaxConditionCost(double maxConditionCost);
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineDebugOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineDebugOptions.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineDebugOptions.java
deleted file mode 100644
index 1be93eb..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineDebugOptions.java
+++ /dev/null
@@ -1,254 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.options;
-
-import org.apache.beam.sdk.annotations.Experimental;
-
-import com.google.api.services.dataflow.Dataflow;
-import com.google.cloud.dataflow.sdk.util.DataflowPathValidator;
-import com.google.cloud.dataflow.sdk.util.DataflowTransport;
-import com.google.cloud.dataflow.sdk.util.GcsStager;
-import com.google.cloud.dataflow.sdk.util.InstanceBuilder;
-import com.google.cloud.dataflow.sdk.util.PathValidator;
-import com.google.cloud.dataflow.sdk.util.Stager;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * Internal. Options used to control execution of the Dataflow SDK for
- * debugging and testing purposes.
- */
-@Description("[Internal] Options used to control execution of the Dataflow SDK for "
- + "debugging and testing purposes.")
-@Hidden
-public interface DataflowPipelineDebugOptions extends PipelineOptions {
-
- /**
- * The list of backend experiments to enable.
- *
- * <p>Dataflow provides a number of experimental features that can be enabled
- * with this flag.
- *
- * <p>Please sync with the Dataflow team before enabling any experiments.
- */
- @Description("[Experimental] Dataflow provides a number of experimental features that can "
- + "be enabled with this flag. Please sync with the Dataflow team before enabling any "
- + "experiments.")
- @Experimental
- List<String> getExperiments();
- void setExperiments(List<String> value);
-
- /**
- * The root URL for the Dataflow API. {@code dataflowEndpoint} can override this value
- * if it contains an absolute URL, otherwise {@code apiRootUrl} will be combined with
- * {@code dataflowEndpoint} to generate the full URL to communicate with the Dataflow API.
- */
- @Description("The root URL for the Dataflow API. dataflowEndpoint can override this "
- + "value if it contains an absolute URL, otherwise apiRootUrl will be combined with "
- + "dataflowEndpoint to generate the full URL to communicate with the Dataflow API.")
- @Default.String(Dataflow.DEFAULT_ROOT_URL)
- String getApiRootUrl();
- void setApiRootUrl(String value);
-
- /**
- * Dataflow endpoint to use.
- *
- * <p>Defaults to the current version of the Google Cloud Dataflow
- * API, at the time the current SDK version was released.
- *
- * <p>If the string contains "://", then this is treated as a URL,
- * otherwise {@link #getApiRootUrl()} is used as the root
- * URL.
- */
- @Description("The URL for the Dataflow API. If the string contains \"://\", this"
- + " will be treated as the entire URL, otherwise will be treated relative to apiRootUrl.")
- @Default.String(Dataflow.DEFAULT_SERVICE_PATH)
- String getDataflowEndpoint();
- void setDataflowEndpoint(String value);
-
- /**
- * The path to write the translated Dataflow job specification out to
- * at job submission time. The Dataflow job specification will be represented in JSON
- * format.
- */
- @Description("The path to write the translated Dataflow job specification out to "
- + "at job submission time. The Dataflow job specification will be represented in JSON "
- + "format.")
- String getDataflowJobFile();
- void setDataflowJobFile(String value);
-
- /**
- * The class of the validator that should be created and used to validate paths.
- * If pathValidator has not been set explicitly, an instance of this class will be
- * constructed and used as the path validator.
- */
- @Description("The class of the validator that should be created and used to validate paths. "
- + "If pathValidator has not been set explicitly, an instance of this class will be "
- + "constructed and used as the path validator.")
- @Default.Class(DataflowPathValidator.class)
- Class<? extends PathValidator> getPathValidatorClass();
- void setPathValidatorClass(Class<? extends PathValidator> validatorClass);
-
- /**
- * The path validator instance that should be used to validate paths.
- * If no path validator has been set explicitly, the default is to use the instance factory that
- * constructs a path validator based upon the currently set pathValidatorClass.
- */
- @JsonIgnore
- @Description("The path validator instance that should be used to validate paths. "
- + "If no path validator has been set explicitly, the default is to use the instance factory "
- + "that constructs a path validator based upon the currently set pathValidatorClass.")
- @Default.InstanceFactory(PathValidatorFactory.class)
- PathValidator getPathValidator();
- void setPathValidator(PathValidator validator);
-
- /**
- * The class responsible for staging resources to be accessible by workers
- * during job execution. If stager has not been set explicitly, an instance of this class
- * will be created and used as the resource stager.
- */
- @Description("The class of the stager that should be created and used to stage resources. "
- + "If stager has not been set explicitly, an instance of the this class will be created "
- + "and used as the resource stager.")
- @Default.Class(GcsStager.class)
- Class<? extends Stager> getStagerClass();
- void setStagerClass(Class<? extends Stager> stagerClass);
-
- /**
- * The resource stager instance that should be used to stage resources.
- * If no stager has been set explicitly, the default is to use the instance factory
- * that constructs a resource stager based upon the currently set stagerClass.
- */
- @JsonIgnore
- @Description("The resource stager instance that should be used to stage resources. "
- + "If no stager has been set explicitly, the default is to use the instance factory "
- + "that constructs a resource stager based upon the currently set stagerClass.")
- @Default.InstanceFactory(StagerFactory.class)
- Stager getStager();
- void setStager(Stager stager);
-
- /**
- * An instance of the Dataflow client. Defaults to creating a Dataflow client
- * using the current set of options.
- */
- @JsonIgnore
- @Description("An instance of the Dataflow client. Defaults to creating a Dataflow client "
- + "using the current set of options.")
- @Default.InstanceFactory(DataflowClientFactory.class)
- Dataflow getDataflowClient();
- void setDataflowClient(Dataflow value);
-
- /** Default-value factory that builds a Dataflow client from the options viewed as DataflowPipelineOptions. */
- public static class DataflowClientFactory implements DefaultValueFactory<Dataflow> {
- @Override
- public Dataflow create(PipelineOptions options) {
- return DataflowTransport.newDataflowClient(
- options.as(DataflowPipelineOptions.class)).build();
- }
- }
-
- /**
- * Whether to update the currently running pipeline with the same name as this one.
- *
- * @deprecated This property is replaced by {@link DataflowPipelineOptions#getUpdate()}
- */
- @Deprecated
- @Description("If set, replace the existing pipeline with the name specified by --jobName with "
- + "this pipeline, preserving state.")
- boolean getUpdate();
- @Deprecated
- void setUpdate(boolean value);
-
- /**
- * Mapping of old PTransform names to new ones, specified as JSON
- * <code>{"oldName":"newName",...}</code>. To mark a transform as deleted, make newName the
- * empty string.
- */
- @JsonIgnore
- @Description(
- "Mapping of old PTranform names to new ones, specified as JSON "
- + "{\"oldName\":\"newName\",...}. To mark a transform as deleted, make newName the empty "
- + "string.")
- Map<String, String> getTransformNameMapping();
- void setTransformNameMapping(Map<String, String> value);
-
- /**
- * Custom windmill_main binary to use with the streaming runner.
- */
- @Description("Custom windmill_main binary to use with the streaming runner")
- String getOverrideWindmillBinary();
- void setOverrideWindmillBinary(String value);
-
- /**
- * Number of threads to use on the Dataflow worker harness. If left unspecified,
- * the Dataflow service will compute an appropriate number of threads to use.
- */
- @Description("Number of threads to use on the Dataflow worker harness. If left unspecified, "
- + "the Dataflow service will compute an appropriate number of threads to use.")
- int getNumberOfWorkerHarnessThreads();
- void setNumberOfWorkerHarnessThreads(int value);
-
- /**
- * If {@literal true}, save a heap dump before killing a thread or process which is GC
- * thrashing or out of memory. The location of the heap file will either be echoed back
- * to the user, or the user will be given the opportunity to download the heap file.
- *
- * <p>
- * CAUTION: Heap dumps can be of comparable size to the default boot disk. Consider increasing
- * the boot disk size before setting this flag to true.
- */
- @Description("If {@literal true}, save a heap dump before killing a thread or process "
- + "which is GC thrashing or out of memory.")
- boolean getDumpHeapOnOOM();
- void setDumpHeapOnOOM(boolean dumpHeapBeforeExit);
-
- /**
- * Creates a {@link PathValidator} object using the class specified in
- * {@link #getPathValidatorClass()}.
- */
- public static class PathValidatorFactory implements DefaultValueFactory<PathValidator> {
- @Override
- public PathValidator create(PipelineOptions options) {
- DataflowPipelineDebugOptions debugOptions = options.as(DataflowPipelineDebugOptions.class);
- return InstanceBuilder.ofType(PathValidator.class)
- .fromClass(debugOptions.getPathValidatorClass())
- .fromFactoryMethod("fromOptions")
- .withArg(PipelineOptions.class, options)
- .build();
- }
- }
-
- /**
- * Creates a {@link Stager} object using the class specified in
- * {@link #getStagerClass()}.
- */
- public static class StagerFactory implements DefaultValueFactory<Stager> {
- @Override
- public Stager create(PipelineOptions options) {
- DataflowPipelineDebugOptions debugOptions = options.as(DataflowPipelineDebugOptions.class);
- return InstanceBuilder.ofType(Stager.class)
- .fromClass(debugOptions.getStagerClass())
- .fromFactoryMethod("fromOptions")
- .withArg(PipelineOptions.class, options)
- .build();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineOptions.java
deleted file mode 100644
index dbfafd1..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineOptions.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.options;
-
-import com.google.cloud.dataflow.sdk.runners.DataflowPipeline;
-import com.google.common.base.MoreObjects;
-
-import org.joda.time.DateTimeUtils;
-import org.joda.time.DateTimeZone;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
-
-/**
- * Options that can be used to configure the {@link DataflowPipeline}.
- */
-@Description("Options that configure the Dataflow pipeline.")
-public interface DataflowPipelineOptions extends
- PipelineOptions, GcpOptions, ApplicationNameOptions, DataflowPipelineDebugOptions,
- DataflowPipelineWorkerPoolOptions, BigQueryOptions,
- GcsOptions, StreamingOptions, CloudDebuggerOptions, DataflowWorkerLoggingOptions,
- DataflowProfilingOptions, PubsubOptions {
-
- @Description("Project id. Required when running a Dataflow in the cloud. "
- + "See https://cloud.google.com/storage/docs/projects for further details.")
- @Override
- @Validation.Required
- @Default.InstanceFactory(DefaultProjectFactory.class)
- String getProject();
- @Override
- void setProject(String value);
-
- /**
- * GCS path for staging local files, e.g. gs://bucket/object
- *
- * <p>Must be a valid Cloud Storage URL, beginning with the prefix "gs://"
- *
- * <p>At least one of {@link PipelineOptions#getTempLocation()} or {@link #getStagingLocation()}
- * must be set. If {@link #getStagingLocation()} is not set, then the Dataflow
- * pipeline defaults to using {@link PipelineOptions#getTempLocation()}.
- */
- @Description("GCS path for staging local files, e.g. \"gs://bucket/object\". "
- + "Must be a valid Cloud Storage URL, beginning with the prefix \"gs://\". "
- + "At least one of stagingLocation or tempLocation must be set. If stagingLocation is unset, "
- + "defaults to using tempLocation.")
- String getStagingLocation();
- void setStagingLocation(String value);
-
- /**
- * The Dataflow job name is used as an idempotence key within the Dataflow service.
- * If there is an existing job that is currently active, another active job with the same
- * name will not be able to be created. Defaults to using the ApplicationName-UserName-Date.
- */
- @Description("The Dataflow job name is used as an idempotence key within the Dataflow service. "
- + "If there is an existing job that is currently active, another active job with the same "
- + "name will not be able to be created. Defaults to using the ApplicationName-UserName-Date.")
- @Default.InstanceFactory(JobNameFactory.class)
- String getJobName();
- void setJobName(String value);
-
- /**
- * Whether to update the currently running pipeline with the same name as this one.
- */
- @Override
- @SuppressWarnings("deprecation") // base class member deprecated in favor of this one.
- @Description(
- "If set, replace the existing pipeline with the name specified by --jobName with "
- + "this pipeline, preserving state.")
- boolean getUpdate();
- @Override
- @SuppressWarnings("deprecation") // base class member deprecated in favor of this one.
- void setUpdate(boolean value);
-
- /**
- * Returns a job name built from {@link ApplicationNameOptions#getAppName()} ("dataflow" if unset),
- * the local system user name (if available), and the current UTC time formatted as MMddHHmmss.
- * Names are lowercased, other characters become '0', and the app name is forced to start with a
- * letter, so names match [a-z]([-a-z0-9]*[a-z0-9])?. NOTE(review): the 40-char limit is not enforced.
- *
- * <p>This job name factory is only able to generate one unique name per second per application
- * and user combination.
- */
- public static class JobNameFactory implements DefaultValueFactory<String> {
- private static final DateTimeFormatter FORMATTER =
- DateTimeFormat.forPattern("MMddHHmmss").withZone(DateTimeZone.UTC);
-
- @Override
- public String create(PipelineOptions options) {
- String appName = options.as(ApplicationNameOptions.class).getAppName();
- String normalizedAppName = appName == null || appName.length() == 0 ? "dataflow"
- : appName.toLowerCase()
- .replaceAll("[^a-z0-9]", "0")
- .replaceAll("^[^a-z]", "a");
- String userName = MoreObjects.firstNonNull(System.getProperty("user.name"), "");
- String normalizedUserName = userName.toLowerCase()
- .replaceAll("[^a-z0-9]", "0");
- String datePart = FORMATTER.print(DateTimeUtils.currentTimeMillis());
- return normalizedAppName + "-" + normalizedUserName + "-" + datePart;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineWorkerPoolOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineWorkerPoolOptions.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineWorkerPoolOptions.java
deleted file mode 100644
index 44a9b00..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineWorkerPoolOptions.java
+++ /dev/null
@@ -1,258 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.options;
-
-import org.apache.beam.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-
-import java.util.List;
-
-/**
- * Options that are used to configure the Dataflow pipeline worker pool.
- */
-@Description("Options that are used to configure the Dataflow pipeline worker pool.")
-public interface DataflowPipelineWorkerPoolOptions extends PipelineOptions {
- /**
- * Number of workers to use when executing the Dataflow job. Note that selection of an autoscaling
- * algorithm other than {@code NONE} will affect the size of the worker pool. If left unspecified,
- * the Dataflow service will determine the number of workers.
- */
- @Description("Number of workers to use when executing the Dataflow job. Note that "
- + "selection of an autoscaling algorithm other than \"NONE\" will affect the "
- + "size of the worker pool. If left unspecified, the Dataflow service will "
- + "determine the number of workers.")
- int getNumWorkers();
- void setNumWorkers(int value);
-
- /**
- * Type of autoscaling algorithm to use.
- */
- @Experimental(Experimental.Kind.AUTOSCALING)
- public enum AutoscalingAlgorithmType {
- /** Use numWorkers machines. Do not autoscale the worker pool. */
- NONE("AUTOSCALING_ALGORITHM_NONE"),
- /** @deprecated Use {@link #THROUGHPUT_BASED}, which maps to the same service algorithm. */
- @Deprecated
- BASIC("AUTOSCALING_ALGORITHM_BASIC"),
-
- /** Autoscale the workerpool based on throughput (up to maxNumWorkers). */
- THROUGHPUT_BASED("AUTOSCALING_ALGORITHM_BASIC");
-
- private final String algorithm;
-
- private AutoscalingAlgorithmType(String algorithm) {
- this.algorithm = algorithm;
- }
-
- /** Returns the string representation of this type. */
- public String getAlgorithm() {
- return this.algorithm;
- }
- }
-
- /**
- * [Experimental] The autoscaling algorithm to use for the workerpool.
- *
- * <ul>
- * <li>NONE: does not change the size of the worker pool.</li>
- * <li>BASIC: autoscale the worker pool size up to maxNumWorkers until the job completes.</li>
- * <li>THROUGHPUT_BASED: autoscale the workerpool based on throughput (up to maxNumWorkers).
- * </li>
- * </ul>
- */
- @Description("[Experimental] The autoscaling algorithm to use for the workerpool. "
- + "NONE: does not change the size of the worker pool. "
- + "BASIC (deprecated): autoscale the worker pool size up to maxNumWorkers until the job "
- + "completes. "
- + "THROUGHPUT_BASED: autoscale the workerpool based on throughput (up to maxNumWorkers).")
- @Experimental(Experimental.Kind.AUTOSCALING)
- AutoscalingAlgorithmType getAutoscalingAlgorithm();
- void setAutoscalingAlgorithm(AutoscalingAlgorithmType value);
-
- /**
- * The maximum number of workers to use for the workerpool. This option limits the size of the
- * workerpool for the lifetime of the job, including
- * <a href="https://cloud.google.com/dataflow/pipelines/updating-a-pipeline">pipeline updates</a>.
- * If left unspecified, the Dataflow service will compute a ceiling.
- */
- @Description("The maximum number of workers to use for the workerpool. This option limits the "
- + "size of the workerpool for the lifetime of the job, including pipeline updates. "
- + "If left unspecified, the Dataflow service will compute a ceiling.")
- int getMaxNumWorkers();
- void setMaxNumWorkers(int value);
-
- /**
- * Remote worker disk size, in gigabytes, or 0 to use the default size.
- */
- @Description("Remote worker disk size, in gigabytes, or 0 to use the default size.")
- int getDiskSizeGb();
- void setDiskSizeGb(int value);
-
- /**
- * Docker container image that executes Dataflow worker harness, residing in Google Container
- * Registry.
- */
- @Default.InstanceFactory(WorkerHarnessContainerImageFactory.class)
- @Description("Docker container image that executes Dataflow worker harness, residing in Google "
- + "Container Registry.")
- @Hidden
- String getWorkerHarnessContainerImage();
- void setWorkerHarnessContainerImage(String value);
-
- /**
- * Returns the default Docker container image that executes Dataflow worker harness, residing in
- * Google Container Registry: the streaming image for streaming pipelines, else the batch image.
- */
- public static class WorkerHarnessContainerImageFactory
- implements DefaultValueFactory<String> {
- @Override
- public String create(PipelineOptions options) {
- DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
- if (dataflowOptions.isStreaming()) {
- return DataflowPipelineRunner.STREAMING_WORKER_HARNESS_CONTAINER_IMAGE;
- } else {
- return DataflowPipelineRunner.BATCH_WORKER_HARNESS_CONTAINER_IMAGE;
- }
- }
- }
-
- /**
- * GCE <a href="https://cloud.google.com/compute/docs/networking">network</a> for launching
- * workers.
- *
- * <p>Default is up to the Dataflow service.
- */
- @Description("GCE network for launching workers. For more information, see the reference "
- + "documentation https://cloud.google.com/compute/docs/networking. "
- + "Default is up to the Dataflow service.")
- String getNetwork();
- void setNetwork(String value);
-
- /**
- * GCE <a href="https://cloud.google.com/compute/docs/networking">subnetwork</a> for launching
- * workers.
- *
- * <p>Default is up to the Dataflow service. Expected format is
- * regions/REGION/subnetworks/SUBNETWORK.
- *
- * <p>You may also need to specify the network option.
- */
- @Description("GCE subnetwork for launching workers. For more information, see the reference "
- + "documentation https://cloud.google.com/compute/docs/networking. "
- + "Default is up to the Dataflow service.")
- String getSubnetwork();
- void setSubnetwork(String value);
-
- /**
- * GCE <a href="https://developers.google.com/compute/docs/zones"
- * >availability zone</a> for launching workers.
- *
- * <p>Default is up to the Dataflow service.
- */
- @Description("GCE availability zone for launching workers. See "
- + "https://developers.google.com/compute/docs/zones for a list of valid options. "
- + "Default is up to the Dataflow service.")
- String getZone();
- void setZone(String value);
-
- /**
- * Machine type to create Dataflow worker VMs as.
- *
- * <p>See <a href="https://cloud.google.com/compute/docs/machine-types">GCE machine types</a>
- * for a list of valid options.
- *
- * <p>If unset, the Dataflow service will choose a reasonable default.
- */
- @Description("Machine type to create Dataflow worker VMs as. See "
- + "https://cloud.google.com/compute/docs/machine-types for a list of valid options. "
- + "If unset, the Dataflow service will choose a reasonable default.")
- String getWorkerMachineType();
- void setWorkerMachineType(String value);
-
- /**
- * The policy for tearing down the workers spun up by the service.
- */
- public enum TeardownPolicy {
- /**
- * All VMs created for a Dataflow job are deleted when the job finishes, regardless of whether
- * it fails or succeeds.
- */
- TEARDOWN_ALWAYS("TEARDOWN_ALWAYS"),
- /**
- * All VMs created for a Dataflow job are left running when the job finishes, regardless of
- * whether it fails or succeeds.
- */
- TEARDOWN_NEVER("TEARDOWN_NEVER"),
- /**
- * All VMs created for a Dataflow job are deleted when the job succeeds, but are left running
- * when it fails. (This is typically used for debugging failing jobs by SSHing into the
- * workers.)
- */
- TEARDOWN_ON_SUCCESS("TEARDOWN_ON_SUCCESS");
-
- private final String teardownPolicy;
-
- private TeardownPolicy(String teardownPolicy) {
- this.teardownPolicy = teardownPolicy;
- }
- /** Returns the string name of this teardown policy. */
- public String getTeardownPolicyName() {
- return this.teardownPolicy;
- }
- }
-
- /**
- * The teardown policy for the VMs.
- *
- * <p>If unset, the Dataflow service will choose a reasonable default.
- */
- @Description("The teardown policy for the VMs. If unset, the Dataflow service will "
- + "choose a reasonable default.")
- TeardownPolicy getTeardownPolicy();
- void setTeardownPolicy(TeardownPolicy value);
-
- /**
- * List of local files to make available to workers.
- *
- * <p>Files are placed on the worker's classpath.
- *
- * <p>The default value is the list of jars from the main program's classpath.
- */
- @Description("Files to stage on GCS and make available to workers. "
- + "Files are placed on the worker's classpath. "
- + "The default value is all files from the classpath.")
- @JsonIgnore
- List<String> getFilesToStage();
- void setFilesToStage(List<String> value);
-
- /**
- * Specifies what type of persistent disk should be used. The value should be a full or partial
- * URL of a disk type resource, e.g., zones/us-central1-f/disks/pd-standard. For
- * more information, see the
- * <a href="https://cloud.google.com/compute/docs/reference/latest/diskTypes">API reference
- * documentation for DiskTypes</a>.
- */
- @Description("Specifies what type of persistent disk should be used. The value should be a full "
- + "or partial URL of a disk type resource, e.g., zones/us-central1-f/disks/pd-standard. For "
- + "more information, see the API reference documentation for DiskTypes: "
- + "https://cloud.google.com/compute/docs/reference/latest/diskTypes")
- String getWorkerDiskType();
- void setWorkerDiskType(String value);
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowProfilingOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowProfilingOptions.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowProfilingOptions.java
deleted file mode 100644
index 3cd7b03..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowProfilingOptions.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.options;
-
-import org.apache.beam.sdk.annotations.Experimental;
-
-import java.util.HashMap;
-
-/**
- * Options that control profiling of a pipeline while it executes.
- */
-@Experimental
-@Hidden
-@Description("[Experimental] Used to configure profiling of the Dataflow pipeline")
-public interface DataflowProfilingOptions {
-
- /** Whether profiling information is periodically dumped to the local disk. */
- @Description("Whether to periodically dump profiling information to local disk.\n"
- + "WARNING: Enabling this option may fill local disk with profiling information.")
- boolean getEnableProfilingAgent();
- void setEnableProfilingAgent(boolean value);
-
- /** Additional configuration for the profiling agent; not typically necessary. */
- @Hidden
- @Description(
- "[INTERNAL] Additional configuration for the profiling agent. Not typically necessary.")
- DataflowProfilingAgentConfiguration getProfilingAgentConfiguration();
- void setProfilingAgentConfiguration(DataflowProfilingAgentConfiguration config);
-
- /** Free-form configuration for the profiling agent, keyed by {@code String}. */
- public static class DataflowProfilingAgentConfiguration extends HashMap<String, Object> {}
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowWorkerHarnessOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowWorkerHarnessOptions.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowWorkerHarnessOptions.java
deleted file mode 100644
index 7705b66..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowWorkerHarnessOptions.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.options;
-
-/**
- * Options consumed only inside the Dataflow worker harness; setting them while the
- * pipeline is being created has no effect.
- */
-@Hidden
-@Description("[Internal] Options that are used exclusively within the Dataflow worker harness. "
- + "These options have no effect at pipeline creation time.")
-public interface DataflowWorkerHarnessOptions extends DataflowPipelineOptions {
- /**
- * Identifier of the worker that is running this pipeline.
- *
- * <p>Set from within the worker harness.
- */
- @Description("The identity of the worker running this pipeline.")
- String getWorkerId();
- void setWorkerId(String workerId);
-
- /**
- * Identifier of the Dataflow job.
- *
- * <p>Set from within the worker harness.
- */
- @Description("The identity of the Dataflow job.")
- String getJobId();
- void setJobId(String jobId);
-
- /**
- * Size of the worker's in-memory cache, in megabytes; defaults to 100.
- *
- * <p>At present the cache holds values read from side inputs.
- */
- @Default.Integer(100)
- @Description("The size of the worker's in-memory cache, in megabytes.")
- Integer getWorkerCacheMb();
- void setWorkerCacheMb(Integer cacheMb);
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowWorkerLoggingOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowWorkerLoggingOptions.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowWorkerLoggingOptions.java
deleted file mode 100644
index ebd42d9..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowWorkerLoggingOptions.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.options;
-
-import com.google.common.base.Preconditions;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Options that are used to control logging configuration on the Dataflow worker.
- */
-@Description("Options that are used to control logging configuration on the Dataflow worker.")
-public interface DataflowWorkerLoggingOptions extends PipelineOptions {
- /**
- * The set of log levels that can be used on the Dataflow worker.
- */
- public enum Level {
- DEBUG, ERROR, INFO, TRACE, WARN
- }
-
- /**
- * Default log level for all loggers without a log level override; defaults to {@code INFO}.
- */
- @Description("Controls the default log level of all loggers without a log level override.")
- @Default.Enum("INFO")
- Level getDefaultWorkerLogLevel();
- void setDefaultWorkerLogLevel(Level level);
-
- /**
- * This option controls the log levels for specifically named loggers.
- *
- * <p>Later options with equivalent names override earlier options.
- *
- * <p>See {@link WorkerLogLevelOverrides} for more information on how to configure logging
- * on a per {@link Class}, {@link Package}, or name basis. If used from the command line,
- * the expected format is {@code {"Name":"Level",...}}; further details are in
- * {@link WorkerLogLevelOverrides#from}.
- */
- @Description("This option controls the log levels for specifically named loggers. "
- + "The expected format is {\"Name\":\"Level\",...}. The Dataflow worker uses "
- + "java.util.logging, which supports a logging hierarchy based off of names that are '.' "
- + "separated. For example, by specifying the value {\"a.b.c.Foo\":\"DEBUG\"}, the logger "
- + "for the class 'a.b.c.Foo' will be configured to output logs at the DEBUG level. "
- + "Similarly, by specifying the value {\"a.b.c\":\"WARN\"}, all loggers underneath the "
- + "'a.b.c' package will be configured to output logs at the WARN level. Also, note that "
- + "when multiple overrides are specified, the exact name followed by the closest parent "
- + "takes precedence.")
- WorkerLogLevelOverrides getWorkerLogLevelOverrides();
- void setWorkerLogLevelOverrides(WorkerLogLevelOverrides value);
-
- /**
- * Defines a log level override for a specific class, package, or name.
- *
- * <p>{@code java.util.logging} is used on the Dataflow worker harness and supports
- * a logging hierarchy based off of names that are "." separated. It is a common
- * pattern to have the logger for a given class share the same name as the class itself.
- * Given the classes {@code a.b.c.Foo}, {@code a.b.c.Xyz}, and {@code a.b.Bar}, with
- * loggers named {@code "a.b.c.Foo"}, {@code "a.b.c.Xyz"}, and {@code "a.b.Bar"} respectively,
- * we can override the log levels:
- * <ul>
- * <li>for {@code Foo} by specifying the name {@code "a.b.c.Foo"} or the {@link Class}
- * representing {@code a.b.c.Foo}.
- * <li>for {@code Foo}, {@code Xyz}, and {@code Bar} by specifying the name {@code "a.b"} or
- * the {@link Package} representing {@code a.b}.
- * <li>for {@code Foo} and {@code Bar} by specifying both of their names or classes.
- * </ul>
- * <p>Note that when multiple overrides are specified, the exact name followed by the closest
- * parent takes precedence.
- */
- public static class WorkerLogLevelOverrides extends HashMap<String, Level> {
- /**
- * Overrides the default log level for the passed in class.
- *
- * <p>This is equivalent to calling
- * {@link #addOverrideForName(String, DataflowWorkerLoggingOptions.Level)}
- * and passing in the {@link Class#getName() class name}.
- */
- public WorkerLogLevelOverrides addOverrideForClass(Class<?> klass, Level level) {
- Preconditions.checkNotNull(klass, "Expected class to be not null.");
- addOverrideForName(klass.getName(), level);
- return this;
- }
-
- /**
- * Overrides the default log level for the passed in package.
- *
- * <p>This is equivalent to calling
- * {@link #addOverrideForName(String, DataflowWorkerLoggingOptions.Level)}
- * and passing in the {@link Package#getName() package name}.
- */
- public WorkerLogLevelOverrides addOverrideForPackage(Package pkg, Level level) {
- Preconditions.checkNotNull(pkg, "Expected package to be not null.");
- addOverrideForName(pkg.getName(), level);
- return this;
- }
-
- /**
- * Overrides the default log level for the passed in name.
- *
- * <p>Note that because of the hierarchical nature of logger names, this will
- * override the log level of all loggers that have the passed in name or
- * a parent logger that has the passed in name.
- */
- public WorkerLogLevelOverrides addOverrideForName(String name, Level level) {
- Preconditions.checkNotNull(name, "Expected name to be not null.");
- Preconditions.checkNotNull(level,
- "Expected level to be one of %s.", Arrays.toString(Level.values()));
- put(name, level);
- return this;
- }
-
- /**
- * Expects a map keyed by logger {@code Name}s with values representing {@code Level}s.
- * The {@code Name} generally represents the fully qualified Java
- * {@link Class#getName() class name}, or fully qualified Java
- * {@link Package#getName() package name}, or custom logger name. The {@code Level}
- * represents the log level and must be one of {@link Level}.
- * @throws IllegalArgumentException if a value is not the exact name of a {@link Level}.
- */
- @JsonCreator
- public static WorkerLogLevelOverrides from(Map<String, String> values) {
- Preconditions.checkNotNull(values, "Expected values to be not null.");
- WorkerLogLevelOverrides overrides = new WorkerLogLevelOverrides();
- for (Map.Entry<String, String> entry : values.entrySet()) {
- try {
- overrides.addOverrideForName(entry.getKey(), Level.valueOf(entry.getValue()));
- } catch (IllegalArgumentException e) {
- throw new IllegalArgumentException(String.format(
- "Unsupported log level '%s' requested for %s. Must be one of %s.",
- entry.getValue(), entry.getKey(), Arrays.toString(Level.values())), e);
- }
- }
- return overrides;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/BlockingDataflowPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/BlockingDataflowPipelineRunner.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/BlockingDataflowPipelineRunner.java
deleted file mode 100644
index 9096c2b..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/BlockingDataflowPipelineRunner.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.runners;
-
-import org.apache.beam.sdk.annotations.Experimental;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.PipelineResult.State;
-import com.google.cloud.dataflow.sdk.options.BlockingDataflowPipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsValidator;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.util.MonitoringUtil;
-import com.google.cloud.dataflow.sdk.values.PInput;
-import com.google.cloud.dataflow.sdk.values.POutput;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-
-import javax.annotation.Nullable;
-
-/**
- * A {@link PipelineRunner} that's like {@link DataflowPipelineRunner}
- * but that waits for the launched job to finish.
- *
- * <p>Prints out job status updates and console messages while it waits.
- *
- * <p>Returns the job once it finishes successfully, or throws an exception if the job
- * fails, is cancelled or updated, or cannot be monitored.
- *
- * <h3>Permissions</h3>
- * <p>When reading from a Dataflow source or writing to a Dataflow sink using
- * {@code BlockingDataflowPipelineRunner}, the Google cloud services account and the Google compute
- * engine service account of the GCP project running the Dataflow Job will need access to the
- * corresponding source/sink.
- *
- * <p>Please see <a href="https://cloud.google.com/dataflow/security-and-permissions">Google Cloud
- * Dataflow Security and Permissions</a> for more details.
- */
-public class BlockingDataflowPipelineRunner extends
- PipelineRunner<DataflowPipelineJob> {
- private static final Logger LOG = LoggerFactory.getLogger(BlockingDataflowPipelineRunner.class);
-
- // Defaults to an infinite wait period.
- // TODO: make this configurable after removal of option map.
- private static final long BUILTIN_JOB_TIMEOUT_SEC = -1L;
-
- private final DataflowPipelineRunner dataflowPipelineRunner;
- private final BlockingDataflowPipelineOptions options;
-
- protected BlockingDataflowPipelineRunner(
- DataflowPipelineRunner internalRunner,
- BlockingDataflowPipelineOptions options) {
- this.dataflowPipelineRunner = internalRunner;
- this.options = options;
- }
-
- /**
- * Constructs a runner from the provided options.
- */
- public static BlockingDataflowPipelineRunner fromOptions(
- PipelineOptions options) {
- BlockingDataflowPipelineOptions dataflowOptions =
- PipelineOptionsValidator.validate(BlockingDataflowPipelineOptions.class, options);
- DataflowPipelineRunner dataflowPipelineRunner =
- DataflowPipelineRunner.fromOptions(dataflowOptions);
-
- return new BlockingDataflowPipelineRunner(dataflowPipelineRunner, dataflowOptions);
- }
-
- /**
- * {@inheritDoc}
- *
- * @throws DataflowJobExecutionException if there is an exception during job execution.
- * @throws DataflowServiceException if there is an exception retrieving information about the job.
- */
- @Override
- public DataflowPipelineJob run(Pipeline p) {
- final DataflowPipelineJob job = dataflowPipelineRunner.run(p);
-
- // We ignore the potential race condition here (Ctrl-C after job submission but before the
- // shutdown hook is registered). Even if we tried to do something smarter (eg., SettableFuture)
- // the run method (which produces the job) could fail or be Ctrl-C'd before it had returned a
- // job. The display of the command to cancel the job is best-effort anyways -- RPC's could fail,
- // etc. If the user wants to verify the job was cancelled they should look at the job status.
- Thread shutdownHook = new Thread() {
- @Override
- public void run() {
- LOG.warn("Job is already running in Google Cloud Platform, Ctrl-C will not cancel it.\n"
- + "To cancel the job in the cloud, run:\n> {}",
- MonitoringUtil.getGcloudCancelCommand(options, job.getJobId()));
- }
- };
-
- try {
- Runtime.getRuntime().addShutdownHook(shutdownHook);
-
- @Nullable
- State result;
- try {
- result = job.waitToFinish(
- BUILTIN_JOB_TIMEOUT_SEC, TimeUnit.SECONDS,
- new MonitoringUtil.PrintHandler(options.getJobMessageOutput()));
- } catch (IOException | InterruptedException ex) {
- if (ex instanceof InterruptedException) {
- Thread.currentThread().interrupt();
- }
- LOG.debug("Exception caught while retrieving status for job {}", job.getJobId(), ex);
- throw new DataflowServiceException(
- job, "Exception caught while retrieving status for job " + job.getJobId(), ex);
- }
-
- if (result == null) {
- throw new DataflowServiceException(
- job, "Timed out while retrieving status for job " + job.getJobId());
- }
-
- LOG.info("Job finished with status {}", result);
- if (!result.isTerminal()) {
- throw new IllegalStateException("Expected terminal state for job " + job.getJobId()
- + ", got " + result);
- }
-
- if (result == State.DONE) {
- return job;
- } else if (result == State.UPDATED) {
- DataflowPipelineJob newJob = job.getReplacedByJob();
- LOG.info("Job {} has been updated and is running as the new job with id {}. "
- + "To access the updated job on the Dataflow monitoring console, please navigate to {}",
- job.getJobId(),
- newJob.getJobId(),
- MonitoringUtil.getJobMonitoringPageURL(newJob.getProjectId(), newJob.getJobId()));
- throw new DataflowJobUpdatedException(
- job,
- String.format("Job %s updated; new job is %s.", job.getJobId(), newJob.getJobId()),
- newJob);
- } else if (result == State.CANCELLED) {
- String message = String.format("Job %s cancelled by user", job.getJobId());
- LOG.info(message);
- throw new DataflowJobCancelledException(job, message);
- } else {
- throw new DataflowJobExecutionException(job, "Job " + job.getJobId()
- + " failed with status " + result);
- }
- } finally {
- Runtime.getRuntime().removeShutdownHook(shutdownHook);
- }
- }
-
- @Override
- public <OutputT extends POutput, InputT extends PInput> OutputT apply(
- PTransform<InputT, OutputT> transform, InputT input) {
- return dataflowPipelineRunner.apply(transform, input);
- }
-
- /**
- * Sets callbacks to invoke during execution. See {@link DataflowPipelineRunnerHooks}.
- */
- @Experimental
- public void setHooks(DataflowPipelineRunnerHooks hooks) {
- this.dataflowPipelineRunner.setHooks(hooks);
- }
-
- @Override
- public String toString() {
- return "BlockingDataflowPipelineRunner#" + options.getJobName();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobAlreadyExistsException.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobAlreadyExistsException.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobAlreadyExistsException.java
deleted file mode 100644
index 4a4f100..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobAlreadyExistsException.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.runners;
-
-/**
- * Thrown when submitting a job would violate the Dataflow service's unique-job-name constraint,
- * because a job with the same name is currently active. The {@link DataflowPipelineJob} carried
- * by this exception describes that pre-existing job.
- */
-public class DataflowJobAlreadyExistsException extends DataflowJobException {
- /**
- * Creates a {@code DataflowJobAlreadyExistsException} for the given pre-existing
- * {@link DataflowPipelineJob}, with the given detail message and no cause.
- */
- public DataflowJobAlreadyExistsException(DataflowPipelineJob job, String message) {
- super(job, message, null);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobAlreadyUpdatedException.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobAlreadyUpdatedException.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobAlreadyUpdatedException.java
deleted file mode 100644
index 1f52c6a..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobAlreadyUpdatedException.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.runners;
-
-/**
- * Thrown when the existing job has already been updated within the Dataflow service and can no
- * longer be updated. The {@link DataflowPipelineJob} carried by this exception describes the
- * pre-existing, already-updated job.
- */
-public class DataflowJobAlreadyUpdatedException extends DataflowJobException {
- /**
- * Creates a {@code DataflowJobAlreadyUpdatedException} for the given already-updated
- * {@link DataflowPipelineJob}, with the given detail message and no cause.
- */
- public DataflowJobAlreadyUpdatedException(DataflowPipelineJob job, String message) {
- super(job, message, null);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobCancelledException.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobCancelledException.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobCancelledException.java
deleted file mode 100644
index 495ca5a..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobCancelledException.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.runners;
-
-/**
- * Signals that a job run by a {@link BlockingDataflowPipelineRunner} was cancelled during
- * execution. The {@link DataflowPipelineJob} carried by this exception is the cancelled job.
- */
-public class DataflowJobCancelledException extends DataflowJobException {
- /**
- * Create a new {@code DataflowJobCancelledException} with the specified {@link
- * DataflowPipelineJob} and message, and no cause.
- */
- public DataflowJobCancelledException(DataflowPipelineJob job, String message) {
- this(job, message, null);
- }
-
- /**
- * Create a new {@code DataflowJobCancelledException} with the specified {@link
- * DataflowPipelineJob}, message, and cause.
- *
- * @param job the cancelled job
- * @param message the detail message
- * @param cause the underlying cause, or {@code null} if there is none
- */
- public DataflowJobCancelledException(DataflowPipelineJob job, String message, Throwable cause) {
- super(job, message, cause);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobException.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobException.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobException.java
deleted file mode 100644
index a22d13c..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobException.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.runners;
-
-import java.util.Objects;
-
-import javax.annotation.Nullable;
-
-/**
- * A {@link RuntimeException} that contains information about a {@link DataflowPipelineJob}.
- *
- * <p>Subclasses describe specific outcomes, for example a job that failed, was cancelled,
- * was updated, or already exists.
- */
-public abstract class DataflowJobException extends RuntimeException {
- private final DataflowPipelineJob job;
-
- /**
- * Creates an exception that refers to {@code job}.
- *
- * @param job the job this exception refers to; must not be {@code null}
- * @param message the detail message
- * @param cause the underlying cause, or {@code null} if there is none
- * @throws NullPointerException if {@code job} is {@code null}
- */
- DataflowJobException(DataflowPipelineJob job, String message, @Nullable Throwable cause) {
- super(message, cause);
- // Name the offending argument so a null job is easy to diagnose.
- this.job = Objects.requireNonNull(job, "job");
- }
-
- /**
- * Returns the job this exception refers to. Depending on the subclass, this is not
- * necessarily a failed job; it may be a cancelled, updated, or pre-existing one.
- */
- public DataflowPipelineJob getJob() {
- return job;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobExecutionException.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobExecutionException.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobExecutionException.java
deleted file mode 100644
index 3dbb007..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobExecutionException.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.runners;
-
-import javax.annotation.Nullable;
-
-/**
- * Signals that a job run by a {@link BlockingDataflowPipelineRunner} failed during execution.
- * The failed job is available through {@link #getJob()}.
- */
-public class DataflowJobExecutionException extends DataflowJobException {
- /** Creates an exception for the failed {@code job} with the given message and no cause. */
- DataflowJobExecutionException(DataflowPipelineJob job, String message) {
- super(job, message, null);
- }
-
- /** Creates an exception for the failed {@code job} with the given message and optional cause. */
- DataflowJobExecutionException(
- DataflowPipelineJob job, String message, @Nullable Throwable cause) {
- super(job, message, cause);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobUpdatedException.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobUpdatedException.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobUpdatedException.java
deleted file mode 100644
index 72deb45..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobUpdatedException.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.runners;
-
-/**
- * Signals that a job run by a {@link BlockingDataflowPipelineRunner} was updated during execution.
- * The original job is available through {@link #getJob()} and its replacement through
- * {@link #getReplacedByJob()}.
- */
-public class DataflowJobUpdatedException extends DataflowJobException {
- // Assigned once in the constructor; exception state should not change after construction.
- private final DataflowPipelineJob replacedByJob;
-
- /**
- * Create a new {@code DataflowJobUpdatedException} with the specified original {@link
- * DataflowPipelineJob}, message, and replacement {@link DataflowPipelineJob}.
- */
- public DataflowJobUpdatedException(
- DataflowPipelineJob job, String message, DataflowPipelineJob replacedByJob) {
- this(job, message, replacedByJob, null);
- }
-
- /**
- * Create a new {@code DataflowJobUpdatedException} with the specified original {@link
- * DataflowPipelineJob}, message, replacement {@link DataflowPipelineJob}, and cause.
- *
- * @param job the original job that was updated
- * @param message the detail message
- * @param replacedByJob the new job that replaces {@code job}
- * @param cause the underlying cause, or {@code null} if there is none
- */
- public DataflowJobUpdatedException(
- DataflowPipelineJob job, String message, DataflowPipelineJob replacedByJob, Throwable cause) {
- super(job, message, cause);
- this.replacedByJob = replacedByJob;
- }
-
- /**
- * The new job that replaces the job terminated with this exception.
- */
- public DataflowPipelineJob getReplacedByJob() {
- return replacedByJob;
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipeline.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipeline.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipeline.java
deleted file mode 100644
index 7b97c7d..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipeline.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.runners;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-
-/**
- * A {@link Pipeline} whose {@link com.google.cloud.dataflow.sdk.Pipeline#run() run()} method
- * returns a {@link DataflowPipelineJob}.
- *
- * <p>This class is not intended for use by users of Cloud Dataflow. Call
- * {@link Pipeline#create(PipelineOptions)} to obtain a {@link Pipeline} instead.
- */
-public class DataflowPipeline extends Pipeline {
-
- /**
- * Builds a new {@link DataflowPipeline}, backed by a {@link DataflowPipelineRunner} created
- * from {@code options}. Intended for tests.
- */
- public static DataflowPipeline create(DataflowPipelineOptions options) {
- DataflowPipeline pipeline = new DataflowPipeline(options);
- return pipeline;
- }
-
- private DataflowPipeline(DataflowPipelineOptions options) {
- super(DataflowPipelineRunner.fromOptions(options), options);
- }
-
- /** Runs the pipeline and returns the result narrowed to a {@link DataflowPipelineJob}. */
- @Override
- public DataflowPipelineJob run() {
- DataflowPipelineJob job = (DataflowPipelineJob) super.run();
- return job;
- }
-
- /** Returns the runner narrowed to a {@link DataflowPipelineRunner}. */
- @Override
- public DataflowPipelineRunner getRunner() {
- DataflowPipelineRunner runner = (DataflowPipelineRunner) super.getRunner();
- return runner;
- }
-
- @Override
- public String toString() {
- DataflowPipelineOptions dataflowOptions = getOptions().as(DataflowPipelineOptions.class);
- return "DataflowPipeline#" + dataflowOptions.getJobName();
- }
-}