Posted to commits@beam.apache.org by da...@apache.org on 2016/04/14 06:48:21 UTC
[34/74] [partial] incubator-beam git commit: Rename com/google/cloud/dataflow->org/apache/beam
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineRunner.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineRunner.java
deleted file mode 100644
index e961066..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineRunner.java
+++ /dev/null
@@ -1,256 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.testing;
-
-import com.google.api.services.dataflow.model.JobMessage;
-import com.google.api.services.dataflow.model.JobMetrics;
-import com.google.api.services.dataflow.model.MetricUpdate;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.PipelineResult.State;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.runners.DataflowJobExecutionException;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineJob;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
-import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.util.MonitoringUtil;
-import com.google.cloud.dataflow.sdk.util.MonitoringUtil.JobMessagesHandler;
-import com.google.cloud.dataflow.sdk.values.PInput;
-import com.google.cloud.dataflow.sdk.values.POutput;
-import com.google.common.base.Optional;
-import com.google.common.base.Throwables;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-/**
- * {@link TestDataflowPipelineRunner} is a pipeline runner that wraps a
- * {@link DataflowPipelineRunner} when running tests against the {@link TestPipeline}.
- *
- * @see TestPipeline
- */
-public class TestDataflowPipelineRunner extends PipelineRunner<DataflowPipelineJob> {
- private static final String TENTATIVE_COUNTER = "tentative";
- private static final Logger LOG = LoggerFactory.getLogger(TestDataflowPipelineRunner.class);
-
- private final TestDataflowPipelineOptions options;
- private final DataflowPipelineRunner runner;
- private int expectedNumberOfAssertions = 0;
-
- TestDataflowPipelineRunner(TestDataflowPipelineOptions options) {
- this.options = options;
- this.runner = DataflowPipelineRunner.fromOptions(options);
- }
-
- /**
- * Constructs a runner from the provided options.
- */
- public static TestDataflowPipelineRunner fromOptions(
- PipelineOptions options) {
- TestDataflowPipelineOptions dataflowOptions = options.as(TestDataflowPipelineOptions.class);
-
- return new TestDataflowPipelineRunner(dataflowOptions);
- }
-
- @Override
- public DataflowPipelineJob run(Pipeline pipeline) {
- return run(pipeline, runner);
- }
-
- DataflowPipelineJob run(Pipeline pipeline, DataflowPipelineRunner runner) {
-
- final DataflowPipelineJob job;
- try {
- job = runner.run(pipeline);
- } catch (DataflowJobExecutionException ex) {
-      throw new IllegalStateException("The dataflow failed.", ex);
- }
-
- LOG.info("Running Dataflow job {} with {} expected assertions.",
- job.getJobId(), expectedNumberOfAssertions);
-
- CancelWorkflowOnError messageHandler = new CancelWorkflowOnError(
- job, new MonitoringUtil.PrintHandler(options.getJobMessageOutput()));
-
- try {
- final Optional<Boolean> result;
-
- if (options.isStreaming()) {
- Future<Optional<Boolean>> resultFuture = options.getExecutorService().submit(
- new Callable<Optional<Boolean>>() {
- @Override
- public Optional<Boolean> call() throws Exception {
- try {
- for (;;) {
- Optional<Boolean> result = checkForSuccess(job);
- if (result.isPresent()) {
- return result;
- }
- Thread.sleep(10000L);
- }
- } finally {
- LOG.info("Cancelling Dataflow job {}", job.getJobId());
- job.cancel();
- }
- }
- });
- State finalState = job.waitToFinish(10L, TimeUnit.MINUTES, messageHandler);
- if (finalState == null || finalState == State.RUNNING) {
- LOG.info("Dataflow job {} took longer than 10 minutes to complete, cancelling.",
- job.getJobId());
- job.cancel();
- }
- result = resultFuture.get();
- } else {
- job.waitToFinish(-1, TimeUnit.SECONDS, messageHandler);
- result = checkForSuccess(job);
- }
- if (!result.isPresent()) {
- throw new IllegalStateException(
- "The dataflow did not output a success or failure metric.");
- } else if (!result.get()) {
- throw new AssertionError(messageHandler.getErrorMessage() == null ?
- "The dataflow did not return a failure reason."
- : messageHandler.getErrorMessage());
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(e);
- } catch (ExecutionException e) {
- Throwables.propagateIfPossible(e.getCause());
- throw new RuntimeException(e.getCause());
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- return job;
- }
-
- @Override
- public <OutputT extends POutput, InputT extends PInput> OutputT apply(
- PTransform<InputT, OutputT> transform, InputT input) {
- if (transform instanceof PAssert.OneSideInputAssert
- || transform instanceof PAssert.TwoSideInputAssert) {
- expectedNumberOfAssertions += 1;
- }
-
- return runner.apply(transform, input);
- }
-
- Optional<Boolean> checkForSuccess(DataflowPipelineJob job)
- throws IOException {
- State state = job.getState();
- if (state == State.FAILED || state == State.CANCELLED) {
- LOG.info("The pipeline failed");
- return Optional.of(false);
- }
-
- JobMetrics metrics = job.getDataflowClient().projects().jobs()
- .getMetrics(job.getProjectId(), job.getJobId()).execute();
-
- if (metrics == null || metrics.getMetrics() == null) {
- LOG.warn("Metrics not present for Dataflow job {}.", job.getJobId());
- } else {
- int successes = 0;
- int failures = 0;
- for (MetricUpdate metric : metrics.getMetrics()) {
- if (metric.getName() == null || metric.getName().getContext() == null
- || !metric.getName().getContext().containsKey(TENTATIVE_COUNTER)) {
-          // Skip the non-tentative version of the metric to avoid double counting.
- continue;
- }
- if (PAssert.SUCCESS_COUNTER.equals(metric.getName().getName())) {
- successes += ((BigDecimal) metric.getScalar()).intValue();
- } else if (PAssert.FAILURE_COUNTER.equals(metric.getName().getName())) {
- failures += ((BigDecimal) metric.getScalar()).intValue();
- }
- }
-
- if (failures > 0) {
- LOG.info("Found result while running Dataflow job {}. Found {} success, {} failures out of "
- + "{} expected assertions.", job.getJobId(), successes, failures,
- expectedNumberOfAssertions);
- return Optional.of(false);
- } else if (successes >= expectedNumberOfAssertions) {
- LOG.info("Found result while running Dataflow job {}. Found {} success, {} failures out of "
- + "{} expected assertions.", job.getJobId(), successes, failures,
- expectedNumberOfAssertions);
- return Optional.of(true);
- }
-
- LOG.info("Running Dataflow job {}. Found {} success, {} failures out of {} expected "
- + "assertions.", job.getJobId(), successes, failures, expectedNumberOfAssertions);
- }
-
- return Optional.<Boolean>absent();
- }
-
- @Override
- public String toString() {
- return "TestDataflowPipelineRunner#" + options.getAppName();
- }
-
- /**
- * Cancels the workflow on the first error message it sees.
- *
- * <p>Creates an error message representing the concatenation of all error messages seen.
- */
- private static class CancelWorkflowOnError implements JobMessagesHandler {
- private final DataflowPipelineJob job;
- private final JobMessagesHandler messageHandler;
- private final StringBuffer errorMessage;
- private CancelWorkflowOnError(DataflowPipelineJob job, JobMessagesHandler messageHandler) {
- this.job = job;
- this.messageHandler = messageHandler;
- this.errorMessage = new StringBuffer();
- }
-
- @Override
- public void process(List<JobMessage> messages) {
- messageHandler.process(messages);
- for (JobMessage message : messages) {
- if (message.getMessageImportance() != null
- && message.getMessageImportance().equals("JOB_MESSAGE_ERROR")) {
- LOG.info("Dataflow job {} threw exception. Failure message was: {}",
- job.getJobId(), message.getMessageText());
- errorMessage.append(message.getMessageText());
- }
- }
- if (errorMessage.length() > 0) {
- LOG.info("Cancelling Dataflow job {}", job.getJobId());
- try {
- job.cancel();
- } catch (Exception ignore) {
-          // The TestDataflowPipelineRunner will throw an AssertionError with the job failure
- // messages.
- }
- }
- }
-
- private String getErrorMessage() {
- return errorMessage.toString();
- }
- }
-}
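
For context, a minimal sketch of how this runner is typically driven; the
project, bucket, and runner wiring below are illustrative placeholders, not
part of this file:

    TestDataflowPipelineOptions options =
        PipelineOptionsFactory.as(TestDataflowPipelineOptions.class);
    options.setProject("my-project");                     // hypothetical
    options.setStagingLocation("gs://my-bucket/staging"); // hypothetical
    options.setRunner(TestDataflowPipelineRunner.class);

    Pipeline p = Pipeline.create(options);
    // ... apply transforms and PAssert assertions here ...
    p.run(); // returns only after checkForSuccess(job) observes the counters
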
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/util/DataflowPathValidator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/util/DataflowPathValidator.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/util/DataflowPathValidator.java
deleted file mode 100644
index aeb864a..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/util/DataflowPathValidator.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath;
-import com.google.common.base.Preconditions;
-
-import java.io.IOException;
-
-/**
- * GCP implementation of {@link PathValidator}. Only GCS paths are allowed.
- */
-public class DataflowPathValidator implements PathValidator {
-
- private DataflowPipelineOptions dataflowOptions;
-
- DataflowPathValidator(DataflowPipelineOptions options) {
- this.dataflowOptions = options;
- }
-
- public static DataflowPathValidator fromOptions(PipelineOptions options) {
- return new DataflowPathValidator(options.as(DataflowPipelineOptions.class));
- }
-
- /**
-  * Validates that the input GCS path is accessible and that the path
- * is well formed.
- */
- @Override
- public String validateInputFilePatternSupported(String filepattern) {
- GcsPath gcsPath = getGcsPath(filepattern);
- Preconditions.checkArgument(
- dataflowOptions.getGcsUtil().isGcsPatternSupported(gcsPath.getObject()));
- String returnValue = verifyPath(filepattern);
- verifyPathIsAccessible(filepattern, "Could not find file %s");
- return returnValue;
- }
-
- /**
-  * Validates that the output GCS path is accessible and that the path
- * is well formed.
- */
- @Override
- public String validateOutputFilePrefixSupported(String filePrefix) {
- String returnValue = verifyPath(filePrefix);
- verifyPathIsAccessible(filePrefix, "Output path does not exist or is not writeable: %s");
- return returnValue;
- }
-
- @Override
- public String verifyPath(String path) {
- GcsPath gcsPath = getGcsPath(path);
- Preconditions.checkArgument(gcsPath.isAbsolute(),
- "Must provide absolute paths for Dataflow");
- Preconditions.checkArgument(!gcsPath.getObject().contains("//"),
- "Dataflow Service does not allow objects with consecutive slashes");
- return gcsPath.toResourceName();
- }
-
- private void verifyPathIsAccessible(String path, String errorMessage) {
- GcsPath gcsPath = getGcsPath(path);
- try {
- Preconditions.checkArgument(dataflowOptions.getGcsUtil().bucketExists(gcsPath),
- errorMessage, path);
- } catch (IOException e) {
- throw new RuntimeException(
- String.format("Unable to verify that GCS bucket gs://%s exists.", gcsPath.getBucket()),
- e);
- }
- }
-
- private GcsPath getGcsPath(String path) {
- try {
- return GcsPath.fromUri(path);
- } catch (IllegalArgumentException e) {
- throw new IllegalArgumentException(String.format(
- "%s expected a valid 'gs://' path but was given '%s'",
- dataflowOptions.getRunner().getSimpleName(), path), e);
- }
- }
-}
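
A minimal sketch of how this validator is exercised, assuming options with GCS
credentials already configured (the paths below are illustrative):

    DataflowPipelineOptions options =
        PipelineOptionsFactory.as(DataflowPipelineOptions.class);
    DataflowPathValidator validator = DataflowPathValidator.fromOptions(options);

    // Throws IllegalArgumentException for malformed or non-gs:// paths, and
    // fails if the bucket is missing or inaccessible.
    validator.validateInputFilePatternSupported("gs://my-bucket/input-*.txt");
    validator.validateOutputFilePrefixSupported("gs://my-bucket/output/part");
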
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/util/DataflowTransport.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/util/DataflowTransport.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/util/DataflowTransport.java
deleted file mode 100644
index 18e6654..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/util/DataflowTransport.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.util;
-
-import static com.google.cloud.dataflow.sdk.util.Transport.getJsonFactory;
-import static com.google.cloud.dataflow.sdk.util.Transport.getTransport;
-
-import com.google.api.client.auth.oauth2.Credential;
-import com.google.api.client.http.HttpRequestInitializer;
-import com.google.api.services.clouddebugger.v2.Clouddebugger;
-import com.google.api.services.dataflow.Dataflow;
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
-import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
-import com.google.common.collect.ImmutableList;
-
-import java.net.MalformedURLException;
-import java.net.URL;
-
-/**
- * Helpers for cloud communication.
- */
-public class DataflowTransport {
-
-
- private static class ApiComponents {
- public String rootUrl;
- public String servicePath;
-
- public ApiComponents(String root, String path) {
- this.rootUrl = root;
- this.servicePath = path;
- }
- }
-
- private static ApiComponents apiComponentsFromUrl(String urlString) {
- try {
- URL url = new URL(urlString);
- String rootUrl = url.getProtocol() + "://" + url.getHost() +
- (url.getPort() > 0 ? ":" + url.getPort() : "");
- return new ApiComponents(rootUrl, url.getPath());
- } catch (MalformedURLException e) {
- throw new RuntimeException("Invalid URL: " + urlString);
- }
- }
-
- /**
- * Returns a Google Cloud Dataflow client builder.
- */
- public static Dataflow.Builder newDataflowClient(DataflowPipelineOptions options) {
- String servicePath = options.getDataflowEndpoint();
- ApiComponents components;
- if (servicePath.contains("://")) {
- components = apiComponentsFromUrl(servicePath);
- } else {
- components = new ApiComponents(options.getApiRootUrl(), servicePath);
- }
-
- return new Dataflow.Builder(getTransport(),
- getJsonFactory(),
- chainHttpRequestInitializer(
- options.getGcpCredential(),
-        // Do not log 404. It clutters the output, and a 404 may even be expected by the caller.
- new RetryHttpRequestInitializer(ImmutableList.of(404))))
- .setApplicationName(options.getAppName())
- .setRootUrl(components.rootUrl)
- .setServicePath(components.servicePath)
- .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
- }
-
- public static Clouddebugger.Builder newClouddebuggerClient(DataflowPipelineOptions options) {
- return new Clouddebugger.Builder(getTransport(),
- getJsonFactory(),
- chainHttpRequestInitializer(options.getGcpCredential(), new RetryHttpRequestInitializer()))
- .setApplicationName(options.getAppName())
- .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
- }
-
- /**
- * Returns a Dataflow client that does not automatically retry failed
- * requests.
- */
-  public static Dataflow.Builder newRawDataflowClient(DataflowPipelineOptions options) {
- return newDataflowClient(options)
- .setHttpRequestInitializer(options.getGcpCredential())
- .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
- }
-
- private static HttpRequestInitializer chainHttpRequestInitializer(
- Credential credential, HttpRequestInitializer httpRequestInitializer) {
- if (credential == null) {
- return httpRequestInitializer;
- } else {
- return new ChainingHttpRequestInitializer(credential, httpRequestInitializer);
- }
- }
-}
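
A sketch of building a client with this helper (app name and project are
hypothetical values); note that an absolute URL in dataflowEndpoint overrides
apiRootUrl, per apiComponentsFromUrl above:

    DataflowPipelineOptions options =
        PipelineOptionsFactory.as(DataflowPipelineOptions.class);
    options.setAppName("MyApp");       // hypothetical
    options.setProject("my-project");  // hypothetical

    Dataflow dataflow = DataflowTransport.newDataflowClient(options).build();
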
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/util/GcsStager.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/util/GcsStager.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/util/GcsStager.java
deleted file mode 100644
index 7307e83..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/util/GcsStager.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.api.services.dataflow.model.DataflowPackage;
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineDebugOptions;
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.common.base.Preconditions;
-
-import java.util.List;
-
-/**
- * Utility class for staging files to GCS.
- */
-public class GcsStager implements Stager {
- private DataflowPipelineOptions options;
-
- private GcsStager(DataflowPipelineOptions options) {
- this.options = options;
- }
-
- public static GcsStager fromOptions(PipelineOptions options) {
- return new GcsStager(options.as(DataflowPipelineOptions.class));
- }
-
- @Override
- public List<DataflowPackage> stageFiles() {
- Preconditions.checkNotNull(options.getStagingLocation());
- List<String> filesToStage = options.getFilesToStage();
- String windmillBinary =
- options.as(DataflowPipelineDebugOptions.class).getOverrideWindmillBinary();
- if (windmillBinary != null) {
- filesToStage.add("windmill_main=" + windmillBinary);
- }
- return PackageUtil.stageClasspathElements(
-        filesToStage, options.getStagingLocation());
- }
-}
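
A minimal sketch of staging through this class, assuming a staging bucket the
caller can write to (the paths are illustrative):

    DataflowPipelineOptions options =
        PipelineOptionsFactory.as(DataflowPipelineOptions.class);
    options.setStagingLocation("gs://my-bucket/staging");             // hypothetical
    options.setFilesToStage(Arrays.asList("/path/to/pipeline.jar"));  // hypothetical

    List<DataflowPackage> staged = GcsStager.fromOptions(options).stageFiles();
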
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/util/MonitoringUtil.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/util/MonitoringUtil.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/util/MonitoringUtil.java
deleted file mode 100644
index 2c06a92..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/util/MonitoringUtil.java
+++ /dev/null
@@ -1,235 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.util;
-
-import static com.google.cloud.dataflow.sdk.util.TimeUtil.fromCloudTime;
-
-import com.google.api.services.dataflow.Dataflow;
-import com.google.api.services.dataflow.Dataflow.Projects.Jobs.Messages;
-import com.google.api.services.dataflow.model.JobMessage;
-import com.google.api.services.dataflow.model.ListJobMessagesResponse;
-import com.google.cloud.dataflow.sdk.PipelineResult.State;
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
-import com.google.common.base.MoreObjects;
-import com.google.common.collect.ImmutableMap;
-
-import org.joda.time.Instant;
-
-import java.io.IOException;
-import java.io.PrintStream;
-import java.io.UnsupportedEncodingException;
-import java.net.URLEncoder;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-
-import javax.annotation.Nullable;
-
-/**
- * A helper class for monitoring jobs submitted to the service.
- */
-public final class MonitoringUtil {
-
- private static final String GCLOUD_DATAFLOW_PREFIX = "gcloud alpha dataflow";
- private static final String ENDPOINT_OVERRIDE_ENV_VAR =
- "CLOUDSDK_API_ENDPOINT_OVERRIDES_DATAFLOW";
-
- private static final Map<String, State> DATAFLOW_STATE_TO_JOB_STATE =
- ImmutableMap
- .<String, State>builder()
- .put("JOB_STATE_UNKNOWN", State.UNKNOWN)
- .put("JOB_STATE_STOPPED", State.STOPPED)
- .put("JOB_STATE_RUNNING", State.RUNNING)
- .put("JOB_STATE_DONE", State.DONE)
- .put("JOB_STATE_FAILED", State.FAILED)
- .put("JOB_STATE_CANCELLED", State.CANCELLED)
- .put("JOB_STATE_UPDATED", State.UPDATED)
- .build();
-
- private String projectId;
- private Messages messagesClient;
-
- /**
- * An interface that can be used for defining callbacks to receive a list
- * of JobMessages containing monitoring information.
- */
- public interface JobMessagesHandler {
-    /** Process the messages. */
- void process(List<JobMessage> messages);
- }
-
- /** A handler that prints monitoring messages to a stream. */
- public static class PrintHandler implements JobMessagesHandler {
- private PrintStream out;
-
- /**
- * Construct the handler.
- *
- * @param stream The stream to write the messages to.
- */
- public PrintHandler(PrintStream stream) {
- out = stream;
- }
-
- @Override
- public void process(List<JobMessage> messages) {
- for (JobMessage message : messages) {
- if (message.getMessageText() == null || message.getMessageText().isEmpty()) {
- continue;
- }
- String importanceString = null;
- if (message.getMessageImportance() == null) {
- continue;
- } else if (message.getMessageImportance().equals("JOB_MESSAGE_ERROR")) {
- importanceString = "Error: ";
- } else if (message.getMessageImportance().equals("JOB_MESSAGE_WARNING")) {
- importanceString = "Warning: ";
- } else if (message.getMessageImportance().equals("JOB_MESSAGE_BASIC")) {
- importanceString = "Basic: ";
- } else if (message.getMessageImportance().equals("JOB_MESSAGE_DETAILED")) {
- importanceString = "Detail: ";
- } else {
- // TODO: Remove filtering here once getJobMessages supports minimum
- // importance.
- continue;
- }
- @Nullable Instant time = TimeUtil.fromCloudTime(message.getTime());
- if (time == null) {
- out.print("UNKNOWN TIMESTAMP: ");
- } else {
- out.print(time + ": ");
- }
- if (importanceString != null) {
- out.print(importanceString);
- }
- out.println(message.getMessageText());
- }
- out.flush();
- }
- }
-
- /** Construct a helper for monitoring. */
- public MonitoringUtil(String projectId, Dataflow dataflow) {
- this(projectId, dataflow.projects().jobs().messages());
- }
-
- // @VisibleForTesting
- MonitoringUtil(String projectId, Messages messagesClient) {
- this.projectId = projectId;
- this.messagesClient = messagesClient;
- }
-
- /**
-   * Comparator for sorting messages in increasing order based on timestamp.
- */
- public static class TimeStampComparator implements Comparator<JobMessage> {
- @Override
- public int compare(JobMessage o1, JobMessage o2) {
- @Nullable Instant t1 = fromCloudTime(o1.getTime());
- if (t1 == null) {
- return -1;
- }
- @Nullable Instant t2 = fromCloudTime(o2.getTime());
- if (t2 == null) {
- return 1;
- }
- return t1.compareTo(t2);
- }
- }
-
- /**
- * Return job messages sorted in ascending order by timestamp.
- * @param jobId The id of the job to get the messages for.
- * @param startTimestampMs Return only those messages with a
- * timestamp greater than this value.
- * @return collection of messages
-   * @throws IOException if the job messages could not be retrieved.
- */
- public ArrayList<JobMessage> getJobMessages(
- String jobId, long startTimestampMs) throws IOException {
- // TODO: Allow filtering messages by importance
- Instant startTimestamp = new Instant(startTimestampMs);
- ArrayList<JobMessage> allMessages = new ArrayList<>();
- String pageToken = null;
- while (true) {
- Messages.List listRequest = messagesClient.list(projectId, jobId);
- if (pageToken != null) {
- listRequest.setPageToken(pageToken);
- }
- ListJobMessagesResponse response = listRequest.execute();
-
- if (response == null || response.getJobMessages() == null) {
- return allMessages;
- }
-
- for (JobMessage m : response.getJobMessages()) {
- @Nullable Instant timestamp = fromCloudTime(m.getTime());
- if (timestamp == null) {
- continue;
- }
- if (timestamp.isAfter(startTimestamp)) {
- allMessages.add(m);
- }
- }
-
- if (response.getNextPageToken() == null) {
- break;
- } else {
- pageToken = response.getNextPageToken();
- }
- }
-
- Collections.sort(allMessages, new TimeStampComparator());
- return allMessages;
- }
-
- public static String getJobMonitoringPageURL(String projectName, String jobId) {
- try {
- // Project name is allowed in place of the project id: the user will be redirected to a URL
- // that has the project name replaced with project id.
- return String.format(
- "https://console.developers.google.com/project/%s/dataflow/job/%s",
- URLEncoder.encode(projectName, "UTF-8"),
- URLEncoder.encode(jobId, "UTF-8"));
- } catch (UnsupportedEncodingException e) {
- // Should never happen.
- throw new AssertionError("UTF-8 encoding is not supported by the environment", e);
- }
- }
-
- public static String getGcloudCancelCommand(DataflowPipelineOptions options, String jobId) {
-
- // If using a different Dataflow API than default, prefix command with an API override.
- String dataflowApiOverridePrefix = "";
- String apiUrl = options.getDataflowClient().getBaseUrl();
- if (!apiUrl.equals(Dataflow.DEFAULT_BASE_URL)) {
- dataflowApiOverridePrefix = String.format("%s=%s ", ENDPOINT_OVERRIDE_ENV_VAR, apiUrl);
- }
-
- // Assemble cancel command from optional prefix and project/job parameters.
- return String.format("%s%s jobs --project=%s cancel %s",
- dataflowApiOverridePrefix, GCLOUD_DATAFLOW_PREFIX, options.getProject(), jobId);
- }
-
- public static State toState(String stateName) {
- return MoreObjects.firstNonNull(DATAFLOW_STATE_TO_JOB_STATE.get(stateName),
- State.UNKNOWN);
- }
-}
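
A sketch of polling messages with this helper; the job id placeholder and the
zero start timestamp are illustrative:

    DataflowPipelineOptions options =
        PipelineOptionsFactory.as(DataflowPipelineOptions.class);
    Dataflow client = DataflowTransport.newDataflowClient(options).build();
    MonitoringUtil monitor = new MonitoringUtil("my-project", client);

    // All messages newer than the given epoch-millis timestamp, ascending by time.
    List<JobMessage> messages = monitor.getJobMessages("<job-id>", 0L);
    new MonitoringUtil.PrintHandler(System.out).process(messages);
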
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/util/PackageUtil.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/util/PackageUtil.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/util/PackageUtil.java
deleted file mode 100644
index 0e234a8..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/util/PackageUtil.java
+++ /dev/null
@@ -1,328 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.api.client.util.BackOffUtils;
-import com.google.api.client.util.Sleeper;
-import com.google.api.services.dataflow.model.DataflowPackage;
-import com.google.cloud.hadoop.util.ApiErrorExtractor;
-import com.google.common.hash.Funnels;
-import com.google.common.hash.Hasher;
-import com.google.common.hash.Hashing;
-import com.google.common.io.CountingOutputStream;
-import com.google.common.io.Files;
-
-import com.fasterxml.jackson.core.Base64Variants;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.channels.Channels;
-import java.nio.channels.WritableByteChannel;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Objects;
-
-/** Helper routines for packages. */
-public class PackageUtil {
- private static final Logger LOG = LoggerFactory.getLogger(PackageUtil.class);
- /**
- * A reasonable upper bound on the number of jars required to launch a Dataflow job.
- */
- public static final int SANE_CLASSPATH_SIZE = 1000;
- /**
- * The initial interval to use between package staging attempts.
- */
- private static final long INITIAL_BACKOFF_INTERVAL_MS = 5000L;
- /**
- * The maximum number of attempts when staging a file.
- */
- private static final int MAX_ATTEMPTS = 5;
-
- /**
- * Translates exceptions from API calls.
- */
- private static final ApiErrorExtractor ERROR_EXTRACTOR = new ApiErrorExtractor();
-
- /**
- * Creates a DataflowPackage containing information about how a classpath element should be
- * staged, including the staging destination as well as its size and hash.
- *
- * @param classpathElement The local path for the classpath element.
- * @param stagingPath The base location for staged classpath elements.
- * @param overridePackageName If non-null, use the given value as the package name
- * instead of generating one automatically.
- * @return The package.
- */
- @Deprecated
- public static DataflowPackage createPackage(File classpathElement,
- String stagingPath, String overridePackageName) {
- return createPackageAttributes(classpathElement, stagingPath, overridePackageName)
- .getDataflowPackage();
- }
-
- /**
- * Compute and cache the attributes of a classpath element that we will need to stage it.
- *
- * @param classpathElement the file or directory to be staged.
- * @param stagingPath The base location for staged classpath elements.
- * @param overridePackageName If non-null, use the given value as the package name
- * instead of generating one automatically.
-   * @return a {@link PackageAttributes} containing metadata about the object to be staged.
- */
- static PackageAttributes createPackageAttributes(File classpathElement,
- String stagingPath, String overridePackageName) {
- try {
- boolean directory = classpathElement.isDirectory();
-
- // Compute size and hash in one pass over file or directory.
- Hasher hasher = Hashing.md5().newHasher();
- OutputStream hashStream = Funnels.asOutputStream(hasher);
- CountingOutputStream countingOutputStream = new CountingOutputStream(hashStream);
-
- if (!directory) {
- // Files are staged as-is.
- Files.asByteSource(classpathElement).copyTo(countingOutputStream);
- } else {
- // Directories are recursively zipped.
- ZipFiles.zipDirectory(classpathElement, countingOutputStream);
- }
-
- long size = countingOutputStream.getCount();
- String hash = Base64Variants.MODIFIED_FOR_URL.encode(hasher.hash().asBytes());
-
- // Create the DataflowPackage with staging name and location.
- String uniqueName = getUniqueContentName(classpathElement, hash);
- String resourcePath = IOChannelUtils.resolve(stagingPath, uniqueName);
- DataflowPackage target = new DataflowPackage();
- target.setName(overridePackageName != null ? overridePackageName : uniqueName);
- target.setLocation(resourcePath);
-
- return new PackageAttributes(size, hash, directory, target);
- } catch (IOException e) {
- throw new RuntimeException("Package setup failure for " + classpathElement, e);
- }
- }
-
- /**
- * Transfers the classpath elements to the staging location.
- *
- * @param classpathElements The elements to stage.
- * @param stagingPath The base location to stage the elements to.
- * @return A list of cloud workflow packages, each representing a classpath element.
- */
- public static List<DataflowPackage> stageClasspathElements(
- Collection<String> classpathElements, String stagingPath) {
- return stageClasspathElements(classpathElements, stagingPath, Sleeper.DEFAULT);
- }
-
- // Visible for testing.
- static List<DataflowPackage> stageClasspathElements(
- Collection<String> classpathElements, String stagingPath,
- Sleeper retrySleeper) {
- LOG.info("Uploading {} files from PipelineOptions.filesToStage to staging location to "
- + "prepare for execution.", classpathElements.size());
-
- if (classpathElements.size() > SANE_CLASSPATH_SIZE) {
- LOG.warn("Your classpath contains {} elements, which Google Cloud Dataflow automatically "
- + "copies to all workers. Having this many entries on your classpath may be indicative "
- + "of an issue in your pipeline. You may want to consider trimming the classpath to "
- + "necessary dependencies only, using --filesToStage pipeline option to override "
- + "what files are being staged, or bundling several dependencies into one.",
- classpathElements.size());
- }
-
- ArrayList<DataflowPackage> packages = new ArrayList<>();
-
- if (stagingPath == null) {
- throw new IllegalArgumentException(
- "Can't stage classpath elements on because no staging location has been provided");
- }
-
- int numUploaded = 0;
- int numCached = 0;
- for (String classpathElement : classpathElements) {
- String packageName = null;
- if (classpathElement.contains("=")) {
- String[] components = classpathElement.split("=", 2);
- packageName = components[0];
- classpathElement = components[1];
- }
-
- File file = new File(classpathElement);
- if (!file.exists()) {
- LOG.warn("Skipping non-existent classpath element {} that was specified.",
- classpathElement);
- continue;
- }
-
- PackageAttributes attributes = createPackageAttributes(file, stagingPath, packageName);
-
- DataflowPackage workflowPackage = attributes.getDataflowPackage();
- packages.add(workflowPackage);
- String target = workflowPackage.getLocation();
-
- // TODO: Should we attempt to detect the Mime type rather than
- // always using MimeTypes.BINARY?
- try {
- try {
- long remoteLength = IOChannelUtils.getSizeBytes(target);
- if (remoteLength == attributes.getSize()) {
- LOG.debug("Skipping classpath element already staged: {} at {}",
- classpathElement, target);
- numCached++;
- continue;
- }
- } catch (FileNotFoundException expected) {
- // If the file doesn't exist, it means we need to upload it.
- }
-
- // Upload file, retrying on failure.
- AttemptBoundedExponentialBackOff backoff = new AttemptBoundedExponentialBackOff(
- MAX_ATTEMPTS,
- INITIAL_BACKOFF_INTERVAL_MS);
- while (true) {
- try {
- LOG.debug("Uploading classpath element {} to {}", classpathElement, target);
- try (WritableByteChannel writer = IOChannelUtils.create(target, MimeTypes.BINARY)) {
- copyContent(classpathElement, writer);
- }
- numUploaded++;
- break;
- } catch (IOException e) {
- if (ERROR_EXTRACTOR.accessDenied(e)) {
- String errorMessage = String.format(
- "Uploaded failed due to permissions error, will NOT retry staging "
- + "of classpath %s. Please verify credentials are valid and that you have "
- + "write access to %s. Stale credentials can be resolved by executing "
- + "'gcloud auth login'.", classpathElement, target);
- LOG.error(errorMessage);
- throw new IOException(errorMessage, e);
- } else if (!backoff.atMaxAttempts()) {
- LOG.warn("Upload attempt failed, sleeping before retrying staging of classpath: {}",
- classpathElement, e);
- BackOffUtils.next(retrySleeper, backoff);
- } else {
- // Rethrow last error, to be included as a cause in the catch below.
- LOG.error("Upload failed, will NOT retry staging of classpath: {}",
- classpathElement, e);
- throw e;
- }
- }
- }
- } catch (Exception e) {
- throw new RuntimeException("Could not stage classpath element: " + classpathElement, e);
- }
- }
-
- LOG.info("Uploading PipelineOptions.filesToStage complete: {} files newly uploaded, "
- + "{} files cached",
- numUploaded, numCached);
-
- return packages;
- }
-
- /**
- * Returns a unique name for a file with a given content hash.
- *
- * <p>Directory paths are removed. Example:
- * <pre>
- * dir="a/b/c/d", contentHash="f000" => d-f000.jar
- * file="a/b/c/d.txt", contentHash="f000" => d-f000.txt
- * file="a/b/c/d", contentHash="f000" => d-f000
- * </pre>
- */
- static String getUniqueContentName(File classpathElement, String contentHash) {
- String fileName = Files.getNameWithoutExtension(classpathElement.getAbsolutePath());
- String fileExtension = Files.getFileExtension(classpathElement.getAbsolutePath());
- if (classpathElement.isDirectory()) {
- return fileName + "-" + contentHash + ".jar";
- } else if (fileExtension.isEmpty()) {
- return fileName + "-" + contentHash;
- }
- return fileName + "-" + contentHash + "." + fileExtension;
- }
-
- /**
- * Copies the contents of the classpathElement to the output channel.
- *
- * <p>If the classpathElement is a directory, a Zip stream is constructed on the fly,
- * otherwise the file contents are copied as-is.
- *
- * <p>The output channel is not closed.
- */
- private static void copyContent(String classpathElement, WritableByteChannel outputChannel)
- throws IOException {
- final File classpathElementFile = new File(classpathElement);
- if (classpathElementFile.isDirectory()) {
- ZipFiles.zipDirectory(classpathElementFile, Channels.newOutputStream(outputChannel));
- } else {
- Files.asByteSource(classpathElementFile).copyTo(Channels.newOutputStream(outputChannel));
- }
-  }
-
-  /**
- * Holds the metadata necessary to stage a file or confirm that a staged file has not changed.
- */
- static class PackageAttributes {
- private final boolean directory;
- private final long size;
- private final String hash;
- private DataflowPackage dataflowPackage;
-
- public PackageAttributes(long size, String hash, boolean directory,
- DataflowPackage dataflowPackage) {
- this.size = size;
- this.hash = Objects.requireNonNull(hash, "hash");
- this.directory = directory;
- this.dataflowPackage = Objects.requireNonNull(dataflowPackage, "dataflowPackage");
- }
-
- /**
- * @return the dataflowPackage
- */
- public DataflowPackage getDataflowPackage() {
- return dataflowPackage;
- }
-
- /**
- * @return the directory
- */
- public boolean isDirectory() {
- return directory;
- }
-
- /**
- * @return the size
- */
- public long getSize() {
- return size;
- }
-
- /**
- * @return the hash
- */
- public String getHash() {
- return hash;
- }
- }
-}
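
A sketch of direct use; the "name=path" form supplies an explicit package name,
as parsed in stageClasspathElements above (the paths are illustrative):

    List<DataflowPackage> packages = PackageUtil.stageClasspathElements(
        Arrays.asList(
            "/path/to/deps/guava.jar",
            "my-renamed.jar=/path/to/deps/other.jar"),
        "gs://my-bucket/staging");
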
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/util/Stager.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/util/Stager.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/util/Stager.java
deleted file mode 100644
index f6c6a71..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/util/Stager.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.api.services.dataflow.model.DataflowPackage;
-
-import java.util.List;
-
-/**
- * Interface for staging files needed for running a Dataflow pipeline.
- */
-public interface Stager {
-  /** Stage files and return a list of packages. */
- public List<DataflowPackage> stageFiles();
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/BlockingDataflowPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/BlockingDataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/BlockingDataflowPipelineOptions.java
new file mode 100644
index 0000000..6bbafdd
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/BlockingDataflowPipelineOptions.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.options;
+
+import com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+import java.io.PrintStream;
+
+/**
+ * Options that are used to configure the {@link BlockingDataflowPipelineRunner}.
+ */
+@Description("Configure options on the BlockingDataflowPipelineRunner.")
+public interface BlockingDataflowPipelineOptions extends DataflowPipelineOptions {
+ /**
+ * Output stream for job status messages.
+ */
+ @Description("Where messages generated during execution of the Dataflow job will be output.")
+ @JsonIgnore
+ @Hidden
+ @Default.InstanceFactory(StandardOutputFactory.class)
+ PrintStream getJobMessageOutput();
+ void setJobMessageOutput(PrintStream value);
+
+ /**
+ * Returns a default of {@link System#out}.
+ */
+ public static class StandardOutputFactory implements DefaultValueFactory<PrintStream> {
+ @Override
+ public PrintStream create(PipelineOptions options) {
+ return System.out;
+ }
+ }
+}
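
A sketch of redirecting job messages away from the System.out default, e.g.
into a buffer for later inspection in a test:

    BlockingDataflowPipelineOptions options =
        PipelineOptionsFactory.as(BlockingDataflowPipelineOptions.class);
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    options.setJobMessageOutput(new PrintStream(buffer));
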
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/CloudDebuggerOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/CloudDebuggerOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/CloudDebuggerOptions.java
new file mode 100644
index 0000000..3f0503e
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/CloudDebuggerOptions.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.options;
+
+import org.apache.beam.sdk.annotations.Experimental;
+
+import com.google.api.services.clouddebugger.v2.model.Debuggee;
+
+import javax.annotation.Nullable;
+
+/**
+ * Options for controlling Cloud Debugger.
+ */
+@Description("[Experimental] Used to configure the Cloud Debugger")
+@Experimental
+@Hidden
+public interface CloudDebuggerOptions {
+
+ /** Whether to enable the Cloud Debugger snapshot agent for the current job. */
+ @Description("Whether to enable the Cloud Debugger snapshot agent for the current job.")
+ boolean getEnableCloudDebugger();
+ void setEnableCloudDebugger(boolean enabled);
+
+ /** The Cloud Debugger debuggee to associate with. This should not be set directly. */
+ @Description("The Cloud Debugger debuggee to associate with. This should not be set directly.")
+ @Hidden
+ @Nullable Debuggee getDebuggee();
+ void setDebuggee(Debuggee debuggee);
+
+ /** The maximum cost (as a ratio of CPU time) allowed for evaluating conditional snapshots. */
+ @Description(
+ "The maximum cost (as a ratio of CPU time) allowed for evaluating conditional snapshots. "
+ + "Should be a double between 0 and 1. "
+ + "Snapshots will be cancelled if evaluating conditions takes more than this ratio of time.")
+ @Default.Double(0.01)
+ double getMaxConditionCost();
+ void setMaxConditionCost(double maxConditionCost);
+}
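
A sketch of enabling the debugger through a concrete options type that mixes in
this interface (DataflowPipelineOptions does in this SDK); the cost budget
below is an illustrative value, not a recommendation:

    DataflowPipelineOptions options =
        PipelineOptionsFactory.as(DataflowPipelineOptions.class);
    options.setEnableCloudDebugger(true);
    options.setMaxConditionCost(0.05); // allow up to 5% of CPU for conditions
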
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowPipelineDebugOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowPipelineDebugOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowPipelineDebugOptions.java
new file mode 100644
index 0000000..1be93eb
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowPipelineDebugOptions.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.options;
+
+import org.apache.beam.sdk.annotations.Experimental;
+
+import com.google.api.services.dataflow.Dataflow;
+import com.google.cloud.dataflow.sdk.util.DataflowPathValidator;
+import com.google.cloud.dataflow.sdk.util.DataflowTransport;
+import com.google.cloud.dataflow.sdk.util.GcsStager;
+import com.google.cloud.dataflow.sdk.util.InstanceBuilder;
+import com.google.cloud.dataflow.sdk.util.PathValidator;
+import com.google.cloud.dataflow.sdk.util.Stager;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Internal. Options used to control execution of the Dataflow SDK for
+ * debugging and testing purposes.
+ */
+@Description("[Internal] Options used to control execution of the Dataflow SDK for "
+ + "debugging and testing purposes.")
+@Hidden
+public interface DataflowPipelineDebugOptions extends PipelineOptions {
+
+ /**
+ * The list of backend experiments to enable.
+ *
+ * <p>Dataflow provides a number of experimental features that can be enabled
+ * with this flag.
+ *
+ * <p>Please sync with the Dataflow team before enabling any experiments.
+ */
+ @Description("[Experimental] Dataflow provides a number of experimental features that can "
+ + "be enabled with this flag. Please sync with the Dataflow team before enabling any "
+ + "experiments.")
+ @Experimental
+ List<String> getExperiments();
+ void setExperiments(List<String> value);
+
+ /**
+ * The root URL for the Dataflow API. {@code dataflowEndpoint} can override this value
+ * if it contains an absolute URL, otherwise {@code apiRootUrl} will be combined with
+ * {@code dataflowEndpoint} to generate the full URL to communicate with the Dataflow API.
+ */
+ @Description("The root URL for the Dataflow API. dataflowEndpoint can override this "
+ + "value if it contains an absolute URL, otherwise apiRootUrl will be combined with "
+ + "dataflowEndpoint to generate the full URL to communicate with the Dataflow API.")
+ @Default.String(Dataflow.DEFAULT_ROOT_URL)
+ String getApiRootUrl();
+ void setApiRootUrl(String value);
+
+ /**
+ * Dataflow endpoint to use.
+ *
+ * <p>Defaults to the current version of the Google Cloud Dataflow
+ * API, at the time the current SDK version was released.
+ *
+ * <p>If the string contains "://", then this is treated as a URL,
+ * otherwise {@link #getApiRootUrl()} is used as the root
+ * URL.
+ */
+ @Description("The URL for the Dataflow API. If the string contains \"://\", this"
+ + " will be treated as the entire URL, otherwise will be treated relative to apiRootUrl.")
+ @Default.String(Dataflow.DEFAULT_SERVICE_PATH)
+ String getDataflowEndpoint();
+ void setDataflowEndpoint(String value);
+
+ /**
+ * The path to write the translated Dataflow job specification out to
+ * at job submission time. The Dataflow job specification will be represented in JSON
+ * format.
+ */
+ @Description("The path to write the translated Dataflow job specification out to "
+ + "at job submission time. The Dataflow job specification will be represented in JSON "
+ + "format.")
+ String getDataflowJobFile();
+ void setDataflowJobFile(String value);
+
+ /**
+ * The class of the validator that should be created and used to validate paths.
+ * If pathValidator has not been set explicitly, an instance of this class will be
+ * constructed and used as the path validator.
+ */
+ @Description("The class of the validator that should be created and used to validate paths. "
+ + "If pathValidator has not been set explicitly, an instance of this class will be "
+ + "constructed and used as the path validator.")
+ @Default.Class(DataflowPathValidator.class)
+ Class<? extends PathValidator> getPathValidatorClass();
+ void setPathValidatorClass(Class<? extends PathValidator> validatorClass);
+
+ /**
+ * The path validator instance that should be used to validate paths.
+ * If no path validator has been set explicitly, the default is to use the instance factory that
+ * constructs a path validator based upon the currently set pathValidatorClass.
+ */
+ @JsonIgnore
+ @Description("The path validator instance that should be used to validate paths. "
+ + "If no path validator has been set explicitly, the default is to use the instance factory "
+ + "that constructs a path validator based upon the currently set pathValidatorClass.")
+ @Default.InstanceFactory(PathValidatorFactory.class)
+ PathValidator getPathValidator();
+ void setPathValidator(PathValidator validator);
+
+ /**
+ * The class responsible for staging resources to be accessible by workers
+ * during job execution. If stager has not been set explicitly, an instance of this class
+ * will be created and used as the resource stager.
+ */
+ @Description("The class of the stager that should be created and used to stage resources. "
+ + "If stager has not been set explicitly, an instance of the this class will be created "
+ + "and used as the resource stager.")
+ @Default.Class(GcsStager.class)
+ Class<? extends Stager> getStagerClass();
+ void setStagerClass(Class<? extends Stager> stagerClass);
+
+ /**
+ * The resource stager instance that should be used to stage resources.
+ * If no stager has been set explicitly, the default is to use the instance factory
+ * that constructs a resource stager based upon the currently set stagerClass.
+ */
+ @JsonIgnore
+ @Description("The resource stager instance that should be used to stage resources. "
+ + "If no stager has been set explicitly, the default is to use the instance factory "
+ + "that constructs a resource stager based upon the currently set stagerClass.")
+ @Default.InstanceFactory(StagerFactory.class)
+ Stager getStager();
+ void setStager(Stager stager);
+
+ /**
+ * An instance of the Dataflow client. Defaults to creating a Dataflow client
+ * using the current set of options.
+ */
+ @JsonIgnore
+ @Description("An instance of the Dataflow client. Defaults to creating a Dataflow client "
+ + "using the current set of options.")
+ @Default.InstanceFactory(DataflowClientFactory.class)
+ Dataflow getDataflowClient();
+ void setDataflowClient(Dataflow value);
+
+ /** Returns the default Dataflow client built from the passed in PipelineOptions. */
+ public static class DataflowClientFactory implements DefaultValueFactory<Dataflow> {
+ @Override
+ public Dataflow create(PipelineOptions options) {
+ return DataflowTransport.newDataflowClient(
+ options.as(DataflowPipelineOptions.class)).build();
+ }
+ }
+
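
Because the getter is backed by a DefaultValueFactory, the client is built
lazily on first access when none was set explicitly. A sketch, assuming
application default credentials are available ("my-project" is a placeholder):

    DataflowPipelineOptions options =
        PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
    options.setProject("my-project");
    // No client was set, so DataflowClientFactory constructs one from the
    // current options on this first call:
    Dataflow client = options.getDataflowClient();
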
+ /**
+ * Whether to update the currently running pipeline with the same name as this one.
+ *
+ * @deprecated This property is replaced by {@link DataflowPipelineOptions#getUpdate()}
+ */
+ @Deprecated
+ @Description("If set, replace the existing pipeline with the name specified by --jobName with "
+ + "this pipeline, preserving state.")
+ boolean getUpdate();
+ @Deprecated
+ void setUpdate(boolean value);
+
+ /**
+ * Mapping of old PTransform names to new ones, specified as JSON
+ * <code>{"oldName":"newName",...}</code>. To mark a transform as deleted, make newName the
+ * empty string.
+ */
+ @JsonIgnore
+ @Description(
+ "Mapping of old PTranform names to new ones, specified as JSON "
+ + "{\"oldName\":\"newName\",...}. To mark a transform as deleted, make newName the empty "
+ + "string.")
+ Map<String, String> getTransformNameMapping();
+ void setTransformNameMapping(Map<String, String> value);
+
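
Continuing the options sketches above, a hedged example with hypothetical
transform names (Guava's ImmutableMap assumed on the classpath):

    Map<String, String> mapping = ImmutableMap.of(
        "CountWords", "CountWordsV2",  // renamed transform
        "ObsoleteTransform", "");      // empty string marks a deletion
    options.setTransformNameMapping(mapping);

The same mapping can be passed on the command line as JSON, e.g.
--transformNameMapping={"CountWords":"CountWordsV2","ObsoleteTransform":""}.
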
+ /**
+ * Custom windmill_main binary to use with the streaming runner.
+ */
+ @Description("Custom windmill_main binary to use with the streaming runner")
+ String getOverrideWindmillBinary();
+ void setOverrideWindmillBinary(String value);
+
+ /**
+ * Number of threads to use on the Dataflow worker harness. If left unspecified,
+ * the Dataflow service will compute an appropriate number of threads to use.
+ */
+ @Description("Number of threads to use on the Dataflow worker harness. If left unspecified, "
+ + "the Dataflow service will compute an appropriate number of threads to use.")
+ int getNumberOfWorkerHarnessThreads();
+ void setNumberOfWorkerHarnessThreads(int value);
+
+ /**
+ * If {@literal true}, save a heap dump before killing a thread or process that is GC
+ * thrashing or out of memory. The location of the heap file will either be echoed back
+ * to the user, or the user will be given the opportunity to download the heap file.
+ *
+ * <p>CAUTION: Heap dumps can be of comparable size to the default boot disk. Consider
+ * increasing the boot disk size before setting this flag to true.
+ */
+ @Description("If {@literal true}, save a heap dump before killing a thread or process "
+ + "which is GC thrashing or out of memory.")
+ boolean getDumpHeapOnOOM();
+ void setDumpHeapOnOOM(boolean dumpHeapBeforeExit);
+
+ /**
+ * Creates a {@link PathValidator} object using the class specified in
+ * {@link #getPathValidatorClass()}.
+ */
+ public static class PathValidatorFactory implements DefaultValueFactory<PathValidator> {
+ @Override
+ public PathValidator create(PipelineOptions options) {
+ DataflowPipelineDebugOptions debugOptions = options.as(DataflowPipelineDebugOptions.class);
+ return InstanceBuilder.ofType(PathValidator.class)
+ .fromClass(debugOptions.getPathValidatorClass())
+ .fromFactoryMethod("fromOptions")
+ .withArg(PipelineOptions.class, options)
+ .build();
+ }
+ }
+
+ /**
+ * Creates a {@link Stager} object using the class specified in
+ * {@link #getStagerClass()}.
+ */
+ public static class StagerFactory implements DefaultValueFactory<Stager> {
+ @Override
+ public Stager create(PipelineOptions options) {
+ DataflowPipelineDebugOptions debugOptions = options.as(DataflowPipelineDebugOptions.class);
+ return InstanceBuilder.ofType(Stager.class)
+ .fromClass(debugOptions.getStagerClass())
+ .fromFactoryMethod("fromOptions")
+ .withArg(PipelineOptions.class, options)
+ .build();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowPipelineOptions.java
new file mode 100644
index 0000000..dbfafd1
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowPipelineOptions.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.options;
+
+import com.google.cloud.dataflow.sdk.runners.DataflowPipeline;
+import com.google.common.base.MoreObjects;
+
+import org.joda.time.DateTimeUtils;
+import org.joda.time.DateTimeZone;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+/**
+ * Options that can be used to configure the {@link DataflowPipeline}.
+ */
+@Description("Options that configure the Dataflow pipeline.")
+public interface DataflowPipelineOptions extends
+ PipelineOptions, GcpOptions, ApplicationNameOptions, DataflowPipelineDebugOptions,
+ DataflowPipelineWorkerPoolOptions, BigQueryOptions,
+ GcsOptions, StreamingOptions, CloudDebuggerOptions, DataflowWorkerLoggingOptions,
+ DataflowProfilingOptions, PubsubOptions {
+
+ @Description("Project id. Required when running a Dataflow in the cloud. "
+ + "See https://cloud.google.com/storage/docs/projects for further details.")
+ @Override
+ @Validation.Required
+ @Default.InstanceFactory(DefaultProjectFactory.class)
+ String getProject();
+ @Override
+ void setProject(String value);
+
+ /**
+ * GCS path for staging local files, e.g. gs://bucket/object.
+ *
+ * <p>Must be a valid Cloud Storage URL, beginning with the prefix "gs://".
+ *
+ * <p>At least one of {@link PipelineOptions#getTempLocation()} or {@link #getStagingLocation()}
+ * must be set. If {@link #getStagingLocation()} is not set, then the Dataflow
+ * pipeline defaults to using {@link PipelineOptions#getTempLocation()}.
+ */
+ @Description("GCS path for staging local files, e.g. \"gs://bucket/object\". "
+ + "Must be a valid Cloud Storage URL, beginning with the prefix \"gs://\". "
+ + "At least one of stagingLocation or tempLocation must be set. If stagingLocation is unset, "
+ + "defaults to using tempLocation.")
+ String getStagingLocation();
+ void setStagingLocation(String value);
+
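
A minimal sketch of the common flag combination (project and bucket names are
placeholders); if stagingLocation is omitted, tempLocation is used instead:

    String[] flags = {
        "--project=my-project",
        "--stagingLocation=gs://my-bucket/staging",
        "--tempLocation=gs://my-bucket/temp"};
    DataflowPipelineOptions options =
        PipelineOptionsFactory.fromArgs(flags).as(DataflowPipelineOptions.class);
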
+ /**
+ * The Dataflow job name is used as an idempotence key within the Dataflow service.
+ * While a job with a given name is active, another job with the same name cannot
+ * be created. Defaults to ApplicationName-UserName-Date.
+ */
+ @Description("The Dataflow job name is used as an idempotence key within the Dataflow service. "
+ + "While a job with a given name is active, another job with the same name cannot be created. "
+ + "Defaults to ApplicationName-UserName-Date.")
+ @Default.InstanceFactory(JobNameFactory.class)
+ String getJobName();
+ void setJobName(String value);
+
+ /**
+ * Whether to update the currently running pipeline with the same name as this one.
+ */
+ @Override
+ @SuppressWarnings("deprecation") // base class member deprecated in favor of this one.
+ @Description(
+ "If set, replace the existing pipeline with the name specified by --jobName with "
+ + "this pipeline, preserving state.")
+ boolean getUpdate();
+ @Override
+ @SuppressWarnings("deprecation") // base class member deprecated in favor of this one.
+ void setUpdate(boolean value);
+
+ /**
+ * Returns a normalized job name constructed from {@link ApplicationNameOptions#getAppName()},
+ * the local system user name (if available), and the current time. The normalization ensures
+ * that the job name matches the required pattern [a-z]([-a-z0-9]*[a-z0-9])? and stays within
+ * the 40-character length limit.
+ *
+ * <p>This job name factory is only able to generate one unique name per second per application
+ * and user combination.
+ */
+ public static class JobNameFactory implements DefaultValueFactory<String> {
+ private static final DateTimeFormatter FORMATTER =
+ DateTimeFormat.forPattern("MMddHHmmss").withZone(DateTimeZone.UTC);
+
+ @Override
+ public String create(PipelineOptions options) {
+ String appName = options.as(ApplicationNameOptions.class).getAppName();
+ String normalizedAppName = appName == null || appName.length() == 0 ? "dataflow"
+ : appName.toLowerCase()
+ .replaceAll("[^a-z0-9]", "0")
+ .replaceAll("^[^a-z]", "a");
+ String userName = MoreObjects.firstNonNull(System.getProperty("user.name"), "");
+ String normalizedUserName = userName.toLowerCase()
+ .replaceAll("[^a-z0-9]", "0");
+ String datePart = FORMATTER.print(DateTimeUtils.currentTimeMillis());
+ return normalizedAppName + "-" + normalizedUserName + "-" + datePart;
+ }
+ }
+}
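
To make the normalization above concrete, a sketch with illustrative values
that mirrors the replaceAll chain in JobNameFactory:

    String appName = "WordCount";          // normalizes to "wordcount"
    String userName = "John.Doe";          // normalizes to "john0doe"
    String normalizedAppName = appName.toLowerCase()
        .replaceAll("[^a-z0-9]", "0")
        .replaceAll("^[^a-z]", "a");
    String normalizedUserName = userName.toLowerCase()
        .replaceAll("[^a-z0-9]", "0");
    // With a UTC timestamp of April 14, 06:48:21, the generated job name is
    // "wordcount-john0doe-0414064821".
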
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowPipelineWorkerPoolOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowPipelineWorkerPoolOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowPipelineWorkerPoolOptions.java
new file mode 100644
index 0000000..44a9b00
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowPipelineWorkerPoolOptions.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.options;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+import java.util.List;
+
+/**
+ * Options that are used to configure the Dataflow pipeline worker pool.
+ */
+@Description("Options that are used to configure the Dataflow pipeline worker pool.")
+public interface DataflowPipelineWorkerPoolOptions extends PipelineOptions {
+ /**
+ * Number of workers to use when executing the Dataflow job. Note that selection of an autoscaling
+ * algorithm other than {@code NONE} will affect the size of the worker pool. If left unspecified,
+ * the Dataflow service will determine the number of workers.
+ */
+ @Description("Number of workers to use when executing the Dataflow job. Note that "
+ + "selection of an autoscaling algorithm other then \"NONE\" will affect the "
+ + "size of the worker pool. If left unspecified, the Dataflow service will "
+ + "determine the number of workers.")
+ int getNumWorkers();
+ void setNumWorkers(int value);
+
+ /**
+ * Type of autoscaling algorithm to use.
+ */
+ @Experimental(Experimental.Kind.AUTOSCALING)
+ public enum AutoscalingAlgorithmType {
+ /** Use numWorkers machines. Do not autoscale the worker pool. */
+ NONE("AUTOSCALING_ALGORITHM_NONE"),
+
+ /** @deprecated Use {@link #THROUGHPUT_BASED} instead; both map to the same service setting. */
+ @Deprecated
+ BASIC("AUTOSCALING_ALGORITHM_BASIC"),
+
+ /** Autoscale the worker pool based on throughput (up to maxNumWorkers). */
+ THROUGHPUT_BASED("AUTOSCALING_ALGORITHM_BASIC");
+
+ private final String algorithm;
+
+ private AutoscalingAlgorithmType(String algorithm) {
+ this.algorithm = algorithm;
+ }
+
+ /** Returns the string representation of this type. */
+ public String getAlgorithm() {
+ return this.algorithm;
+ }
+ }
+
+ /**
+ * [Experimental] The autoscaling algorithm to use for the worker pool.
+ *
+ * <ul>
+ * <li>NONE: does not change the size of the worker pool.</li>
+ * <li>BASIC (deprecated): autoscale the worker pool size up to maxNumWorkers until the job
+ * completes.</li>
+ * <li>THROUGHPUT_BASED: autoscale the worker pool based on throughput (up to maxNumWorkers).
+ * </li>
+ * </ul>
+ */
+ @Description("[Experimental] The autoscaling algorithm to use for the worker pool. "
+ + "NONE: does not change the size of the worker pool. "
+ + "BASIC (deprecated): autoscale the worker pool size up to maxNumWorkers until the job "
+ + "completes. "
+ + "THROUGHPUT_BASED: autoscale the worker pool based on throughput (up to maxNumWorkers).")
+ @Experimental(Experimental.Kind.AUTOSCALING)
+ AutoscalingAlgorithmType getAutoscalingAlgorithm();
+ void setAutoscalingAlgorithm(AutoscalingAlgorithmType value);
+
+ /**
+ * The maximum number of workers to use for the worker pool. This option limits the size of the
+ * worker pool for the lifetime of the job, including
+ * <a href="https://cloud.google.com/dataflow/pipelines/updating-a-pipeline">pipeline updates</a>.
+ * If left unspecified, the Dataflow service will compute a ceiling.
+ */
+ @Description("The maximum number of workers to use for the worker pool. This option limits the "
+ + "size of the worker pool for the lifetime of the job, including pipeline updates. "
+ + "If left unspecified, the Dataflow service will compute a ceiling.")
+ int getMaxNumWorkers();
+ void setMaxNumWorkers(int value);
+
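
A hedged configuration sketch combining the two options (the ceiling of 20 is
a hypothetical value):

    DataflowPipelineWorkerPoolOptions options = PipelineOptionsFactory.create()
        .as(DataflowPipelineWorkerPoolOptions.class);
    options.setAutoscalingAlgorithm(
        DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType.THROUGHPUT_BASED);
    options.setMaxNumWorkers(20);  // autoscaling never exceeds this ceiling
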
+ /**
+ * Remote worker disk size, in gigabytes, or 0 to use the default size.
+ */
+ @Description("Remote worker disk size, in gigabytes, or 0 to use the default size.")
+ int getDiskSizeGb();
+ void setDiskSizeGb(int value);
+
+ /**
+ * Docker container image that executes the Dataflow worker harness, residing in Google
+ * Container Registry.
+ */
+ @Default.InstanceFactory(WorkerHarnessContainerImageFactory.class)
+ @Description("Docker container image that executes the Dataflow worker harness, residing in "
+ + "Google Container Registry.")
+ @Hidden
+ String getWorkerHarnessContainerImage();
+ void setWorkerHarnessContainerImage(String value);
+
+ /**
+ * Returns the default Docker container image that executes the Dataflow worker harness,
+ * residing in Google Container Registry.
+ */
+ public static class WorkerHarnessContainerImageFactory
+ implements DefaultValueFactory<String> {
+ @Override
+ public String create(PipelineOptions options) {
+ DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
+ if (dataflowOptions.isStreaming()) {
+ return DataflowPipelineRunner.STREAMING_WORKER_HARNESS_CONTAINER_IMAGE;
+ } else {
+ return DataflowPipelineRunner.BATCH_WORKER_HARNESS_CONTAINER_IMAGE;
+ }
+ }
+ }
+
+ /**
+ * GCE <a href="https://cloud.google.com/compute/docs/networking">network</a> for launching
+ * workers.
+ *
+ * <p>Default is up to the Dataflow service.
+ */
+ @Description("GCE network for launching workers. For more information, see the reference "
+ + "documentation https://cloud.google.com/compute/docs/networking. "
+ + "Default is up to the Dataflow service.")
+ String getNetwork();
+ void setNetwork(String value);
+
+ /**
+ * GCE <a href="https://cloud.google.com/compute/docs/networking">subnetwork</a> for launching
+ * workers.
+ *
+ * <p>Default is up to the Dataflow service. Expected format is
+ * regions/REGION/subnetworks/SUBNETWORK.
+ *
+ * <p>You may also need to set the network option.
+ */
+ @Description("GCE subnetwork for launching workers. For more information, see the reference "
+ + "documentation https://cloud.google.com/compute/docs/networking. "
+ + "Default is up to the Dataflow service.")
+ String getSubnetwork();
+ void setSubnetwork(String value);
+
+ /**
+ * GCE <a href="https://developers.google.com/compute/docs/zones"
+ * >availability zone</a> for launching workers.
+ *
+ * <p>Default is up to the Dataflow service.
+ */
+ @Description("GCE availability zone for launching workers. See "
+ + "https://developers.google.com/compute/docs/zones for a list of valid options. "
+ + "Default is up to the Dataflow service.")
+ String getZone();
+ void setZone(String value);
+
+ /**
+ * Machine type to use when creating Dataflow worker VMs.
+ *
+ * <p>See <a href="https://cloud.google.com/compute/docs/machine-types">GCE machine types</a>
+ * for a list of valid options.
+ *
+ * <p>If unset, the Dataflow service will choose a reasonable default.
+ */
+ @Description("Machine type to create Dataflow worker VMs as. See "
+ + "https://cloud.google.com/compute/docs/machine-types for a list of valid options. "
+ + "If unset, the Dataflow service will choose a reasonable default.")
+ String getWorkerMachineType();
+ void setWorkerMachineType(String value);
+
+ /**
+ * The policy for tearing down the workers spun up by the service.
+ */
+ public enum TeardownPolicy {
+ /**
+ * All VMs created for a Dataflow job are deleted when the job finishes, regardless of whether
+ * it fails or succeeds.
+ */
+ TEARDOWN_ALWAYS("TEARDOWN_ALWAYS"),
+ /**
+ * All VMs created for a Dataflow job are left running when the job finishes, regardless of
+ * whether it fails or succeeds.
+ */
+ TEARDOWN_NEVER("TEARDOWN_NEVER"),
+ /**
+ * All VMs created for a Dataflow job are deleted when the job succeeds, but are left running
+ * when it fails. (This is typically used for debugging failing jobs by SSHing into the
+ * workers.)
+ */
+ TEARDOWN_ON_SUCCESS("TEARDOWN_ON_SUCCESS");
+
+ private final String teardownPolicy;
+
+ private TeardownPolicy(String teardownPolicy) {
+ this.teardownPolicy = teardownPolicy;
+ }
+
+ public String getTeardownPolicyName() {
+ return this.teardownPolicy;
+ }
+ }
+
+ /**
+ * The teardown policy for the VMs.
+ *
+ * <p>If unset, the Dataflow service will choose a reasonable default.
+ */
+ @Description("The teardown policy for the VMs. If unset, the Dataflow service will "
+ + "choose a reasonable default.")
+ TeardownPolicy getTeardownPolicy();
+ void setTeardownPolicy(TeardownPolicy value);
+
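
Extending the worker pool sketch above, keeping VMs from failed runs for
debugging (they must then be deleted manually):

    options.setTeardownPolicy(
        DataflowPipelineWorkerPoolOptions.TeardownPolicy.TEARDOWN_ON_SUCCESS);
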
+ /**
+ * List of local files to make available to workers.
+ *
+ * <p>Files are placed on the worker's classpath.
+ *
+ * <p>The default value is the list of jars from the main program's classpath.
+ */
+ @Description("Files to stage on GCS and make available to workers. "
+ + "Files are placed on the worker's classpath. "
+ + "The default value is all files from the classpath.")
+ @JsonIgnore
+ List<String> getFilesToStage();
+ void setFilesToStage(List<String> value);
+
+ /**
+ * Specifies what type of persistent disk should be used. The value should be a full or partial
+ * URL of a disk type resource, e.g., zones/us-central1-f/disks/pd-standard. For
+ * more information, see the
+ * <a href="https://cloud.google.com/compute/docs/reference/latest/diskTypes">API reference
+ * documentation for DiskTypes</a>.
+ */
+ @Description("Specifies what type of persistent disk should be used. The value should be a full "
+ + "or partial URL of a disk type resource, e.g., zones/us-central1-f/disks/pd-standard. For "
+ + "more information, see the API reference documentation for DiskTypes: "
+ + "https://cloud.google.com/compute/docs/reference/latest/diskTypes")
+ String getWorkerDiskType();
+ void setWorkerDiskType(String value);
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowProfilingOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowProfilingOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowProfilingOptions.java
new file mode 100644
index 0000000..3cd7b03
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowProfilingOptions.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.options;
+
+import org.apache.beam.sdk.annotations.Experimental;
+
+import java.util.HashMap;
+
+/**
+ * Options for controlling profiling of pipeline execution.
+ */
+@Description("[Experimental] Used to configure profiling of the Dataflow pipeline")
+@Experimental
+@Hidden
+public interface DataflowProfilingOptions {
+
+ @Description("Whether to periodically dump profiling information to local disk.\n"
+ + "WARNING: Enabling this option may fill local disk with profiling information.")
+ boolean getEnableProfilingAgent();
+ void setEnableProfilingAgent(boolean enabled);
+
+ @Description(
+ "[INTERNAL] Additional configuration for the profiling agent. Not typically necessary.")
+ @Hidden
+ DataflowProfilingAgentConfiguration getProfilingAgentConfiguration();
+ void setProfilingAgentConfiguration(DataflowProfilingAgentConfiguration configuration);
+
+ /**
+ * Configuration for the profiling agent.
+ */
+ public static class DataflowProfilingAgentConfiguration extends HashMap<String, Object> {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowWorkerHarnessOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowWorkerHarnessOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowWorkerHarnessOptions.java
new file mode 100644
index 0000000..7705b66
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowWorkerHarnessOptions.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.options;
+
+/**
+ * Options that are used exclusively within the Dataflow worker harness.
+ * These options have no effect at pipeline creation time.
+ */
+@Description("[Internal] Options that are used exclusively within the Dataflow worker harness. "
+ + "These options have no effect at pipeline creation time.")
+@Hidden
+public interface DataflowWorkerHarnessOptions extends DataflowPipelineOptions {
+ /**
+ * The identity of the worker running this pipeline.
+ */
+ @Description("The identity of the worker running this pipeline.")
+ String getWorkerId();
+ void setWorkerId(String value);
+
+ /**
+ * The identity of the Dataflow job.
+ */
+ @Description("The identity of the Dataflow job.")
+ String getJobId();
+ void setJobId(String value);
+
+ /**
+ * The size of the worker's in-memory cache, in megabytes.
+ *
+ * <p>Currently, this cache is used for storing read values of side inputs.
+ */
+ @Description("The size of the worker's in-memory cache, in megabytes.")
+ @Default.Integer(100)
+ Integer getWorkerCacheMb();
+ void setWorkerCacheMb(Integer value);
+}