You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/04/14 06:48:21 UTC

[34/74] [partial] incubator-beam git commit: Rename com/google/cloud/dataflow->org/apache/beam

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineRunner.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineRunner.java
deleted file mode 100644
index e961066..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineRunner.java
+++ /dev/null
@@ -1,256 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.testing;
-
-import com.google.api.services.dataflow.model.JobMessage;
-import com.google.api.services.dataflow.model.JobMetrics;
-import com.google.api.services.dataflow.model.MetricUpdate;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.PipelineResult.State;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.runners.DataflowJobExecutionException;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineJob;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
-import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.util.MonitoringUtil;
-import com.google.cloud.dataflow.sdk.util.MonitoringUtil.JobMessagesHandler;
-import com.google.cloud.dataflow.sdk.values.PInput;
-import com.google.cloud.dataflow.sdk.values.POutput;
-import com.google.common.base.Optional;
-import com.google.common.base.Throwables;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-/**
- * {@link TestDataflowPipelineRunner} is a pipeline runner that wraps a
- * {@link DataflowPipelineRunner} when running tests against the {@link TestPipeline}.
- *
- * @see TestPipeline
- */
-public class TestDataflowPipelineRunner extends PipelineRunner<DataflowPipelineJob> {
-  private static final String TENTATIVE_COUNTER = "tentative";
-  private static final Logger LOG = LoggerFactory.getLogger(TestDataflowPipelineRunner.class);
-
-  private final TestDataflowPipelineOptions options;
-  private final DataflowPipelineRunner runner;
-  private int expectedNumberOfAssertions = 0;
-
-  TestDataflowPipelineRunner(TestDataflowPipelineOptions options) {
-    this.options = options;
-    this.runner = DataflowPipelineRunner.fromOptions(options);
-  }
-
-  /**
-   * Constructs a runner from the provided options.
-   */
-  public static TestDataflowPipelineRunner fromOptions(
-      PipelineOptions options) {
-    TestDataflowPipelineOptions dataflowOptions = options.as(TestDataflowPipelineOptions.class);
-
-    return new TestDataflowPipelineRunner(dataflowOptions);
-  }
-
-  @Override
-  public DataflowPipelineJob run(Pipeline pipeline) {
-    return run(pipeline, runner);
-  }
-
-  DataflowPipelineJob run(Pipeline pipeline, DataflowPipelineRunner runner) {
-
-    final DataflowPipelineJob job;
-    try {
-      job = runner.run(pipeline);
-    } catch (DataflowJobExecutionException ex) {
-      throw new IllegalStateException("The dataflow failed.");
-    }
-
-    LOG.info("Running Dataflow job {} with {} expected assertions.",
-        job.getJobId(), expectedNumberOfAssertions);
-
-    CancelWorkflowOnError messageHandler = new CancelWorkflowOnError(
-        job, new MonitoringUtil.PrintHandler(options.getJobMessageOutput()));
-
-    try {
-      final Optional<Boolean> result;
-
-      if (options.isStreaming()) {
-        Future<Optional<Boolean>> resultFuture = options.getExecutorService().submit(
-            new Callable<Optional<Boolean>>() {
-          @Override
-          public Optional<Boolean> call() throws Exception {
-            try {
-              for (;;) {
-                Optional<Boolean> result = checkForSuccess(job);
-                if (result.isPresent()) {
-                  return result;
-                }
-                Thread.sleep(10000L);
-              }
-            } finally {
-              LOG.info("Cancelling Dataflow job {}", job.getJobId());
-              job.cancel();
-            }
-          }
-        });
-        State finalState = job.waitToFinish(10L, TimeUnit.MINUTES, messageHandler);
-        if (finalState == null || finalState == State.RUNNING) {
-          LOG.info("Dataflow job {} took longer than 10 minutes to complete, cancelling.",
-              job.getJobId());
-          job.cancel();
-        }
-        result = resultFuture.get();
-      } else {
-        job.waitToFinish(-1, TimeUnit.SECONDS, messageHandler);
-        result = checkForSuccess(job);
-      }
-      if (!result.isPresent()) {
-        throw new IllegalStateException(
-            "The dataflow did not output a success or failure metric.");
-      } else if (!result.get()) {
-        throw new AssertionError(messageHandler.getErrorMessage() == null ?
-            "The dataflow did not return a failure reason."
-            : messageHandler.getErrorMessage());
-      }
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new RuntimeException(e);
-    } catch (ExecutionException e) {
-      Throwables.propagateIfPossible(e.getCause());
-      throw new RuntimeException(e.getCause());
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-    return job;
-  }
-
-  @Override
-  public <OutputT extends POutput, InputT extends PInput> OutputT apply(
-      PTransform<InputT, OutputT> transform, InputT input) {
-    if (transform instanceof PAssert.OneSideInputAssert
-        || transform instanceof PAssert.TwoSideInputAssert) {
-      expectedNumberOfAssertions += 1;
-    }
-
-    return runner.apply(transform, input);
-  }
-
-  Optional<Boolean> checkForSuccess(DataflowPipelineJob job)
-      throws IOException {
-    State state = job.getState();
-    if (state == State.FAILED || state == State.CANCELLED) {
-      LOG.info("The pipeline failed");
-      return Optional.of(false);
-    }
-
-    JobMetrics metrics = job.getDataflowClient().projects().jobs()
-        .getMetrics(job.getProjectId(), job.getJobId()).execute();
-
-    if (metrics == null || metrics.getMetrics() == null) {
-      LOG.warn("Metrics not present for Dataflow job {}.", job.getJobId());
-    } else {
-      int successes = 0;
-      int failures = 0;
-      for (MetricUpdate metric : metrics.getMetrics()) {
-        if (metric.getName() == null || metric.getName().getContext() == null
-            || !metric.getName().getContext().containsKey(TENTATIVE_COUNTER)) {
-          // Don't double count using the non-tentative version of the metric.
-          continue;
-        }
-        if (PAssert.SUCCESS_COUNTER.equals(metric.getName().getName())) {
-          successes += ((BigDecimal) metric.getScalar()).intValue();
-        } else if (PAssert.FAILURE_COUNTER.equals(metric.getName().getName())) {
-          failures += ((BigDecimal) metric.getScalar()).intValue();
-        }
-      }
-
-      if (failures > 0) {
-        LOG.info("Found result while running Dataflow job {}. Found {} success, {} failures out of "
-            + "{} expected assertions.", job.getJobId(), successes, failures,
-            expectedNumberOfAssertions);
-        return Optional.of(false);
-      } else if (successes >= expectedNumberOfAssertions) {
-        LOG.info("Found result while running Dataflow job {}. Found {} success, {} failures out of "
-            + "{} expected assertions.", job.getJobId(), successes, failures,
-            expectedNumberOfAssertions);
-        return Optional.of(true);
-      }
-
-      LOG.info("Running Dataflow job {}. Found {} success, {} failures out of {} expected "
-          + "assertions.", job.getJobId(), successes, failures, expectedNumberOfAssertions);
-    }
-
-    return Optional.<Boolean>absent();
-  }
-
-  @Override
-  public String toString() {
-    return "TestDataflowPipelineRunner#" + options.getAppName();
-  }
-
-  /**
-   * Cancels the workflow on the first error message it sees.
-   *
-   * <p>Creates an error message representing the concatenation of all error messages seen.
-   */
-  private static class CancelWorkflowOnError implements JobMessagesHandler {
-    private final DataflowPipelineJob job;
-    private final JobMessagesHandler messageHandler;
-    private final StringBuffer errorMessage;
-    private CancelWorkflowOnError(DataflowPipelineJob job, JobMessagesHandler messageHandler) {
-      this.job = job;
-      this.messageHandler = messageHandler;
-      this.errorMessage = new StringBuffer();
-    }
-
-    @Override
-    public void process(List<JobMessage> messages) {
-      messageHandler.process(messages);
-      for (JobMessage message : messages) {
-        if (message.getMessageImportance() != null
-            && message.getMessageImportance().equals("JOB_MESSAGE_ERROR")) {
-          LOG.info("Dataflow job {} threw exception. Failure message was: {}",
-              job.getJobId(), message.getMessageText());
-          errorMessage.append(message.getMessageText());
-        }
-      }
-      if (errorMessage.length() > 0) {
-        LOG.info("Cancelling Dataflow job {}", job.getJobId());
-        try {
-          job.cancel();
-        } catch (Exception ignore) {
-          // The TestDataflowPipelineRunner will thrown an AssertionError with the job failure
-          // messages.
-        }
-      }
-    }
-
-    private String getErrorMessage() {
-      return errorMessage.toString();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/util/DataflowPathValidator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/util/DataflowPathValidator.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/util/DataflowPathValidator.java
deleted file mode 100644
index aeb864a..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/util/DataflowPathValidator.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath;
-import com.google.common.base.Preconditions;
-
-import java.io.IOException;
-
-/**
- * GCP implementation of {@link PathValidator}. Only GCS paths are allowed.
- */
-public class DataflowPathValidator implements PathValidator {
-
-  private DataflowPipelineOptions dataflowOptions;
-
-  DataflowPathValidator(DataflowPipelineOptions options) {
-    this.dataflowOptions = options;
-  }
-
-  public static DataflowPathValidator fromOptions(PipelineOptions options) {
-    return new DataflowPathValidator(options.as(DataflowPipelineOptions.class));
-  }
-
-  /**
-   * Validates the the input GCS path is accessible and that the path
-   * is well formed.
-   */
-  @Override
-  public String validateInputFilePatternSupported(String filepattern) {
-    GcsPath gcsPath = getGcsPath(filepattern);
-    Preconditions.checkArgument(
-        dataflowOptions.getGcsUtil().isGcsPatternSupported(gcsPath.getObject()));
-    String returnValue = verifyPath(filepattern);
-    verifyPathIsAccessible(filepattern, "Could not find file %s");
-    return returnValue;
-  }
-
-  /**
-   * Validates the the output GCS path is accessible and that the path
-   * is well formed.
-   */
-  @Override
-  public String validateOutputFilePrefixSupported(String filePrefix) {
-    String returnValue = verifyPath(filePrefix);
-    verifyPathIsAccessible(filePrefix, "Output path does not exist or is not writeable: %s");
-    return returnValue;
-  }
-
-  @Override
-  public String verifyPath(String path) {
-    GcsPath gcsPath = getGcsPath(path);
-    Preconditions.checkArgument(gcsPath.isAbsolute(),
-        "Must provide absolute paths for Dataflow");
-    Preconditions.checkArgument(!gcsPath.getObject().contains("//"),
-        "Dataflow Service does not allow objects with consecutive slashes");
-    return gcsPath.toResourceName();
-  }
-
-  private void verifyPathIsAccessible(String path, String errorMessage) {
-    GcsPath gcsPath = getGcsPath(path);
-    try {
-      Preconditions.checkArgument(dataflowOptions.getGcsUtil().bucketExists(gcsPath),
-        errorMessage, path);
-    } catch (IOException e) {
-      throw new RuntimeException(
-          String.format("Unable to verify that GCS bucket gs://%s exists.", gcsPath.getBucket()),
-          e);
-    }
-  }
-
-  private GcsPath getGcsPath(String path) {
-    try {
-      return GcsPath.fromUri(path);
-    } catch (IllegalArgumentException e) {
-      throw new IllegalArgumentException(String.format(
-          "%s expected a valid 'gs://' path but was given '%s'",
-          dataflowOptions.getRunner().getSimpleName(), path), e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/util/DataflowTransport.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/util/DataflowTransport.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/util/DataflowTransport.java
deleted file mode 100644
index 18e6654..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/util/DataflowTransport.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.util;
-
-import static com.google.cloud.dataflow.sdk.util.Transport.getJsonFactory;
-import static com.google.cloud.dataflow.sdk.util.Transport.getTransport;
-
-import com.google.api.client.auth.oauth2.Credential;
-import com.google.api.client.http.HttpRequestInitializer;
-import com.google.api.services.clouddebugger.v2.Clouddebugger;
-import com.google.api.services.dataflow.Dataflow;
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
-import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
-import com.google.common.collect.ImmutableList;
-
-import java.net.MalformedURLException;
-import java.net.URL;
-
-/**
- * Helpers for cloud communication.
- */
-public class DataflowTransport {
-
-
-  private static class ApiComponents {
-    public String rootUrl;
-    public String servicePath;
-
-    public ApiComponents(String root, String path) {
-      this.rootUrl = root;
-      this.servicePath = path;
-    }
-  }
-
-  private static ApiComponents apiComponentsFromUrl(String urlString) {
-    try {
-      URL url = new URL(urlString);
-      String rootUrl = url.getProtocol() + "://" + url.getHost() +
-          (url.getPort() > 0 ? ":" + url.getPort() : "");
-      return new ApiComponents(rootUrl, url.getPath());
-    } catch (MalformedURLException e) {
-      throw new RuntimeException("Invalid URL: " + urlString);
-    }
-  }
-
-  /**
-   * Returns a Google Cloud Dataflow client builder.
-   */
-  public static Dataflow.Builder newDataflowClient(DataflowPipelineOptions options) {
-    String servicePath = options.getDataflowEndpoint();
-    ApiComponents components;
-    if (servicePath.contains("://")) {
-      components = apiComponentsFromUrl(servicePath);
-    } else {
-      components = new ApiComponents(options.getApiRootUrl(), servicePath);
-    }
-
-    return new Dataflow.Builder(getTransport(),
-        getJsonFactory(),
-        chainHttpRequestInitializer(
-            options.getGcpCredential(),
-            // Do not log 404. It clutters the output and is possibly even required by the caller.
-            new RetryHttpRequestInitializer(ImmutableList.of(404))))
-        .setApplicationName(options.getAppName())
-        .setRootUrl(components.rootUrl)
-        .setServicePath(components.servicePath)
-        .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
-  }
-
-  public static Clouddebugger.Builder newClouddebuggerClient(DataflowPipelineOptions options) {
-    return new Clouddebugger.Builder(getTransport(),
-        getJsonFactory(),
-        chainHttpRequestInitializer(options.getGcpCredential(), new RetryHttpRequestInitializer()))
-        .setApplicationName(options.getAppName())
-        .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
-  }
-
-  /**
-   * Returns a Dataflow client that does not automatically retry failed
-   * requests.
-   */
-  public static Dataflow.Builder
-      newRawDataflowClient(DataflowPipelineOptions options) {
-    return newDataflowClient(options)
-        .setHttpRequestInitializer(options.getGcpCredential())
-        .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
-  }
-
-  private static HttpRequestInitializer chainHttpRequestInitializer(
-      Credential credential, HttpRequestInitializer httpRequestInitializer) {
-    if (credential == null) {
-      return httpRequestInitializer;
-    } else {
-      return new ChainingHttpRequestInitializer(credential, httpRequestInitializer);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/util/GcsStager.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/util/GcsStager.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/util/GcsStager.java
deleted file mode 100644
index 7307e83..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/util/GcsStager.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.api.services.dataflow.model.DataflowPackage;
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineDebugOptions;
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.common.base.Preconditions;
-
-import java.util.List;
-
-/**
- * Utility class for staging files to GCS.
- */
-public class GcsStager implements Stager {
-  private DataflowPipelineOptions options;
-
-  private GcsStager(DataflowPipelineOptions options) {
-    this.options = options;
-  }
-
-  public static GcsStager fromOptions(PipelineOptions options) {
-    return new GcsStager(options.as(DataflowPipelineOptions.class));
-  }
-
-  @Override
-  public List<DataflowPackage> stageFiles() {
-    Preconditions.checkNotNull(options.getStagingLocation());
-    List<String> filesToStage = options.getFilesToStage();
-    String windmillBinary =
-        options.as(DataflowPipelineDebugOptions.class).getOverrideWindmillBinary();
-    if (windmillBinary != null) {
-      filesToStage.add("windmill_main=" + windmillBinary);
-    }
-    return PackageUtil.stageClasspathElements(
-        options.getFilesToStage(), options.getStagingLocation());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/util/MonitoringUtil.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/util/MonitoringUtil.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/util/MonitoringUtil.java
deleted file mode 100644
index 2c06a92..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/util/MonitoringUtil.java
+++ /dev/null
@@ -1,235 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.util;
-
-import static com.google.cloud.dataflow.sdk.util.TimeUtil.fromCloudTime;
-
-import com.google.api.services.dataflow.Dataflow;
-import com.google.api.services.dataflow.Dataflow.Projects.Jobs.Messages;
-import com.google.api.services.dataflow.model.JobMessage;
-import com.google.api.services.dataflow.model.ListJobMessagesResponse;
-import com.google.cloud.dataflow.sdk.PipelineResult.State;
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
-import com.google.common.base.MoreObjects;
-import com.google.common.collect.ImmutableMap;
-
-import org.joda.time.Instant;
-
-import java.io.IOException;
-import java.io.PrintStream;
-import java.io.UnsupportedEncodingException;
-import java.net.URLEncoder;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-
-import javax.annotation.Nullable;
-
-/**
- * A helper class for monitoring jobs submitted to the service.
- */
-public final class MonitoringUtil {
-
-  private static final String GCLOUD_DATAFLOW_PREFIX = "gcloud alpha dataflow";
-  private static final String ENDPOINT_OVERRIDE_ENV_VAR =
-      "CLOUDSDK_API_ENDPOINT_OVERRIDES_DATAFLOW";
-
-  private static final Map<String, State> DATAFLOW_STATE_TO_JOB_STATE =
-      ImmutableMap
-          .<String, State>builder()
-          .put("JOB_STATE_UNKNOWN", State.UNKNOWN)
-          .put("JOB_STATE_STOPPED", State.STOPPED)
-          .put("JOB_STATE_RUNNING", State.RUNNING)
-          .put("JOB_STATE_DONE", State.DONE)
-          .put("JOB_STATE_FAILED", State.FAILED)
-          .put("JOB_STATE_CANCELLED", State.CANCELLED)
-          .put("JOB_STATE_UPDATED", State.UPDATED)
-          .build();
-
-  private String projectId;
-  private Messages messagesClient;
-
-  /**
-   * An interface that can be used for defining callbacks to receive a list
-   * of JobMessages containing monitoring information.
-   */
-  public interface JobMessagesHandler {
-    /** Process the rows. */
-    void process(List<JobMessage> messages);
-  }
-
-  /** A handler that prints monitoring messages to a stream. */
-  public static class PrintHandler implements JobMessagesHandler {
-    private PrintStream out;
-
-    /**
-     * Construct the handler.
-     *
-     * @param stream The stream to write the messages to.
-     */
-    public PrintHandler(PrintStream stream) {
-      out = stream;
-    }
-
-    @Override
-    public void process(List<JobMessage> messages) {
-      for (JobMessage message : messages) {
-        if (message.getMessageText() == null || message.getMessageText().isEmpty()) {
-          continue;
-        }
-        String importanceString = null;
-        if (message.getMessageImportance() == null) {
-          continue;
-        } else if (message.getMessageImportance().equals("JOB_MESSAGE_ERROR")) {
-          importanceString = "Error:   ";
-        } else if (message.getMessageImportance().equals("JOB_MESSAGE_WARNING")) {
-          importanceString = "Warning: ";
-        } else if (message.getMessageImportance().equals("JOB_MESSAGE_BASIC")) {
-          importanceString = "Basic:  ";
-        } else if (message.getMessageImportance().equals("JOB_MESSAGE_DETAILED")) {
-          importanceString = "Detail:  ";
-        } else {
-          // TODO: Remove filtering here once getJobMessages supports minimum
-          // importance.
-          continue;
-        }
-        @Nullable Instant time = TimeUtil.fromCloudTime(message.getTime());
-        if (time == null) {
-          out.print("UNKNOWN TIMESTAMP: ");
-        } else {
-          out.print(time + ": ");
-        }
-        if (importanceString != null) {
-          out.print(importanceString);
-        }
-        out.println(message.getMessageText());
-      }
-      out.flush();
-    }
-  }
-
-  /** Construct a helper for monitoring. */
-  public MonitoringUtil(String projectId, Dataflow dataflow) {
-    this(projectId, dataflow.projects().jobs().messages());
-  }
-
-  // @VisibleForTesting
-  MonitoringUtil(String projectId, Messages messagesClient) {
-    this.projectId = projectId;
-    this.messagesClient = messagesClient;
-  }
-
-  /**
-   * Comparator for sorting rows in increasing order based on timestamp.
-   */
-  public static class TimeStampComparator implements Comparator<JobMessage> {
-    @Override
-    public int compare(JobMessage o1, JobMessage o2) {
-      @Nullable Instant t1 = fromCloudTime(o1.getTime());
-      if (t1 == null) {
-        return -1;
-      }
-      @Nullable Instant t2 = fromCloudTime(o2.getTime());
-      if (t2 == null) {
-        return 1;
-      }
-      return t1.compareTo(t2);
-    }
-  }
-
-  /**
-   * Return job messages sorted in ascending order by timestamp.
-   * @param jobId The id of the job to get the messages for.
-   * @param startTimestampMs Return only those messages with a
-   *   timestamp greater than this value.
-   * @return collection of messages
-   * @throws IOException
-   */
-  public ArrayList<JobMessage> getJobMessages(
-      String jobId, long startTimestampMs) throws IOException {
-    // TODO: Allow filtering messages by importance
-    Instant startTimestamp = new Instant(startTimestampMs);
-    ArrayList<JobMessage> allMessages = new ArrayList<>();
-    String pageToken = null;
-    while (true) {
-      Messages.List listRequest = messagesClient.list(projectId, jobId);
-      if (pageToken != null) {
-        listRequest.setPageToken(pageToken);
-      }
-      ListJobMessagesResponse response = listRequest.execute();
-
-      if (response == null || response.getJobMessages() == null) {
-        return allMessages;
-      }
-
-      for (JobMessage m : response.getJobMessages()) {
-        @Nullable Instant timestamp = fromCloudTime(m.getTime());
-        if (timestamp == null) {
-          continue;
-        }
-        if (timestamp.isAfter(startTimestamp)) {
-          allMessages.add(m);
-        }
-      }
-
-      if (response.getNextPageToken() == null) {
-        break;
-      } else {
-        pageToken = response.getNextPageToken();
-      }
-    }
-
-    Collections.sort(allMessages, new TimeStampComparator());
-    return allMessages;
-  }
-
-  public static String getJobMonitoringPageURL(String projectName, String jobId) {
-    try {
-      // Project name is allowed in place of the project id: the user will be redirected to a URL
-      // that has the project name replaced with project id.
-      return String.format(
-          "https://console.developers.google.com/project/%s/dataflow/job/%s",
-          URLEncoder.encode(projectName, "UTF-8"),
-          URLEncoder.encode(jobId, "UTF-8"));
-    } catch (UnsupportedEncodingException e) {
-      // Should never happen.
-      throw new AssertionError("UTF-8 encoding is not supported by the environment", e);
-    }
-  }
-
-  public static String getGcloudCancelCommand(DataflowPipelineOptions options, String jobId) {
-
-    // If using a different Dataflow API than default, prefix command with an API override.
-    String dataflowApiOverridePrefix = "";
-    String apiUrl = options.getDataflowClient().getBaseUrl();
-    if (!apiUrl.equals(Dataflow.DEFAULT_BASE_URL)) {
-      dataflowApiOverridePrefix = String.format("%s=%s ", ENDPOINT_OVERRIDE_ENV_VAR, apiUrl);
-    }
-
-    // Assemble cancel command from optional prefix and project/job parameters.
-    return String.format("%s%s jobs --project=%s cancel %s",
-        dataflowApiOverridePrefix, GCLOUD_DATAFLOW_PREFIX, options.getProject(), jobId);
-  }
-
-  public static State toState(String stateName) {
-    return MoreObjects.firstNonNull(DATAFLOW_STATE_TO_JOB_STATE.get(stateName),
-        State.UNKNOWN);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/util/PackageUtil.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/util/PackageUtil.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/util/PackageUtil.java
deleted file mode 100644
index 0e234a8..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/util/PackageUtil.java
+++ /dev/null
@@ -1,328 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.api.client.util.BackOffUtils;
-import com.google.api.client.util.Sleeper;
-import com.google.api.services.dataflow.model.DataflowPackage;
-import com.google.cloud.hadoop.util.ApiErrorExtractor;
-import com.google.common.hash.Funnels;
-import com.google.common.hash.Hasher;
-import com.google.common.hash.Hashing;
-import com.google.common.io.CountingOutputStream;
-import com.google.common.io.Files;
-
-import com.fasterxml.jackson.core.Base64Variants;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.channels.Channels;
-import java.nio.channels.WritableByteChannel;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Objects;
-
-/** Helper routines for packages. */
-public class PackageUtil {
-  private static final Logger LOG = LoggerFactory.getLogger(PackageUtil.class);
-  /**
-   * A reasonable upper bound on the number of jars required to launch a Dataflow job.
-   */
-  public static final int SANE_CLASSPATH_SIZE = 1000;
-  /**
-   * The initial interval to use between package staging attempts.
-   */
-  private static final long INITIAL_BACKOFF_INTERVAL_MS = 5000L;
-  /**
-   * The maximum number of attempts when staging a file.
-   */
-  private static final int MAX_ATTEMPTS = 5;
-
-  /**
-   * Translates exceptions from API calls.
-   */
-  private static final ApiErrorExtractor ERROR_EXTRACTOR = new ApiErrorExtractor();
-
-  /**
-   * Creates a DataflowPackage containing information about how a classpath element should be
-   * staged, including the staging destination as well as its size and hash.
-   *
-   * @param classpathElement The local path for the classpath element.
-   * @param stagingPath The base location for staged classpath elements.
-   * @param overridePackageName If non-null, use the given value as the package name
-   *                            instead of generating one automatically.
-   * @return The package.
-   */
-  @Deprecated
-  public static DataflowPackage createPackage(File classpathElement,
-      String stagingPath, String overridePackageName) {
-    return createPackageAttributes(classpathElement, stagingPath, overridePackageName)
-        .getDataflowPackage();
-  }
-
-  /**
-   * Compute and cache the attributes of a classpath element that we will need to stage it.
-   *
-   * @param classpathElement the file or directory to be staged.
-   * @param stagingPath The base location for staged classpath elements.
-   * @param overridePackageName If non-null, use the given value as the package name
-   *                            instead of generating one automatically.
-   * @return a {@link PackageAttributes} that containing metadata about the object to be staged.
-   */
-  static PackageAttributes createPackageAttributes(File classpathElement,
-      String stagingPath, String overridePackageName) {
-    try {
-      boolean directory = classpathElement.isDirectory();
-
-      // Compute size and hash in one pass over file or directory.
-      Hasher hasher = Hashing.md5().newHasher();
-      OutputStream hashStream = Funnels.asOutputStream(hasher);
-      CountingOutputStream countingOutputStream = new CountingOutputStream(hashStream);
-
-      if (!directory) {
-        // Files are staged as-is.
-        Files.asByteSource(classpathElement).copyTo(countingOutputStream);
-      } else {
-        // Directories are recursively zipped.
-        ZipFiles.zipDirectory(classpathElement, countingOutputStream);
-      }
-
-      long size = countingOutputStream.getCount();
-      String hash = Base64Variants.MODIFIED_FOR_URL.encode(hasher.hash().asBytes());
-
-      // Create the DataflowPackage with staging name and location.
-      String uniqueName = getUniqueContentName(classpathElement, hash);
-      String resourcePath = IOChannelUtils.resolve(stagingPath, uniqueName);
-      DataflowPackage target = new DataflowPackage();
-      target.setName(overridePackageName != null ? overridePackageName : uniqueName);
-      target.setLocation(resourcePath);
-
-      return new PackageAttributes(size, hash, directory, target);
-    } catch (IOException e) {
-      throw new RuntimeException("Package setup failure for " + classpathElement, e);
-    }
-  }
-
-  /**
-   * Transfers the classpath elements to the staging location.
-   *
-   * @param classpathElements The elements to stage.
-   * @param stagingPath The base location to stage the elements to.
-   * @return A list of cloud workflow packages, each representing a classpath element.
-   */
-  public static List<DataflowPackage> stageClasspathElements(
-      Collection<String> classpathElements, String stagingPath) {
-    return stageClasspathElements(classpathElements, stagingPath, Sleeper.DEFAULT);
-  }
-
-  // Visible for testing.
-  static List<DataflowPackage> stageClasspathElements(
-      Collection<String> classpathElements, String stagingPath,
-      Sleeper retrySleeper) {
-    LOG.info("Uploading {} files from PipelineOptions.filesToStage to staging location to "
-        + "prepare for execution.", classpathElements.size());
-
-    if (classpathElements.size() > SANE_CLASSPATH_SIZE) {
-      LOG.warn("Your classpath contains {} elements, which Google Cloud Dataflow automatically "
-          + "copies to all workers. Having this many entries on your classpath may be indicative "
-          + "of an issue in your pipeline. You may want to consider trimming the classpath to "
-          + "necessary dependencies only, using --filesToStage pipeline option to override "
-          + "what files are being staged, or bundling several dependencies into one.",
-          classpathElements.size());
-    }
-
-    ArrayList<DataflowPackage> packages = new ArrayList<>();
-
-    if (stagingPath == null) {
-      throw new IllegalArgumentException(
-          "Can't stage classpath elements on because no staging location has been provided");
-    }
-
-    int numUploaded = 0;
-    int numCached = 0;
-    for (String classpathElement : classpathElements) {
-      String packageName = null;
-      if (classpathElement.contains("=")) {
-        String[] components = classpathElement.split("=", 2);
-        packageName = components[0];
-        classpathElement = components[1];
-      }
-
-      File file = new File(classpathElement);
-      if (!file.exists()) {
-        LOG.warn("Skipping non-existent classpath element {} that was specified.",
-            classpathElement);
-        continue;
-      }
-
-      PackageAttributes attributes = createPackageAttributes(file, stagingPath, packageName);
-
-      DataflowPackage workflowPackage = attributes.getDataflowPackage();
-      packages.add(workflowPackage);
-      String target = workflowPackage.getLocation();
-
-      // TODO: Should we attempt to detect the Mime type rather than
-      // always using MimeTypes.BINARY?
-      try {
-        try {
-          long remoteLength = IOChannelUtils.getSizeBytes(target);
-          if (remoteLength == attributes.getSize()) {
-            LOG.debug("Skipping classpath element already staged: {} at {}",
-                classpathElement, target);
-            numCached++;
-            continue;
-          }
-        } catch (FileNotFoundException expected) {
-          // If the file doesn't exist, it means we need to upload it.
-        }
-
-        // Upload file, retrying on failure.
-        AttemptBoundedExponentialBackOff backoff = new AttemptBoundedExponentialBackOff(
-            MAX_ATTEMPTS,
-            INITIAL_BACKOFF_INTERVAL_MS);
-        while (true) {
-          try {
-            LOG.debug("Uploading classpath element {} to {}", classpathElement, target);
-            try (WritableByteChannel writer = IOChannelUtils.create(target, MimeTypes.BINARY)) {
-              copyContent(classpathElement, writer);
-            }
-            numUploaded++;
-            break;
-          } catch (IOException e) {
-            if (ERROR_EXTRACTOR.accessDenied(e)) {
-              String errorMessage = String.format(
-                  "Uploaded failed due to permissions error, will NOT retry staging "
-                  + "of classpath %s. Please verify credentials are valid and that you have "
-                  + "write access to %s. Stale credentials can be resolved by executing "
-                  + "'gcloud auth login'.", classpathElement, target);
-              LOG.error(errorMessage);
-              throw new IOException(errorMessage, e);
-            } else if (!backoff.atMaxAttempts()) {
-              LOG.warn("Upload attempt failed, sleeping before retrying staging of classpath: {}",
-                  classpathElement, e);
-              BackOffUtils.next(retrySleeper, backoff);
-            } else {
-              // Rethrow last error, to be included as a cause in the catch below.
-              LOG.error("Upload failed, will NOT retry staging of classpath: {}",
-                  classpathElement, e);
-              throw e;
-            }
-          }
-        }
-      } catch (Exception e) {
-        throw new RuntimeException("Could not stage classpath element: " + classpathElement, e);
-      }
-    }
-
-    LOG.info("Uploading PipelineOptions.filesToStage complete: {} files newly uploaded, "
-        + "{} files cached",
-        numUploaded, numCached);
-
-    return packages;
-  }
-
-  /**
-   * Returns a unique name for a file with a given content hash.
-   *
-   * <p>Directory paths are removed. Example:
-   * <pre>
-   * dir="a/b/c/d", contentHash="f000" => d-f000.jar
-   * file="a/b/c/d.txt", contentHash="f000" => d-f000.txt
-   * file="a/b/c/d", contentHash="f000" => d-f000
-   * </pre>
-   */
-  static String getUniqueContentName(File classpathElement, String contentHash) {
-    String fileName = Files.getNameWithoutExtension(classpathElement.getAbsolutePath());
-    String fileExtension = Files.getFileExtension(classpathElement.getAbsolutePath());
-    if (classpathElement.isDirectory()) {
-      return fileName + "-" + contentHash + ".jar";
-    } else if (fileExtension.isEmpty()) {
-      return fileName + "-" + contentHash;
-    }
-    return fileName + "-" + contentHash + "." + fileExtension;
-  }
-
-  /**
-   * Copies the contents of the classpathElement to the output channel.
-   *
-   * <p>If the classpathElement is a directory, a Zip stream is constructed on the fly,
-   * otherwise the file contents are copied as-is.
-   *
-   * <p>The output channel is not closed.
-   */
-  private static void copyContent(String classpathElement, WritableByteChannel outputChannel)
-      throws IOException {
-    final File classpathElementFile = new File(classpathElement);
-    if (classpathElementFile.isDirectory()) {
-      ZipFiles.zipDirectory(classpathElementFile, Channels.newOutputStream(outputChannel));
-    } else {
-      Files.asByteSource(classpathElementFile).copyTo(Channels.newOutputStream(outputChannel));
-    }
-  }
-  /**
-   * Holds the metadata necessary to stage a file or confirm that a staged file has not changed.
-   */
-  static class PackageAttributes {
-    private final boolean directory;
-    private final long size;
-    private final String hash;
-    private DataflowPackage dataflowPackage;
-
-    public PackageAttributes(long size, String hash, boolean directory,
-        DataflowPackage dataflowPackage) {
-      this.size = size;
-      this.hash = Objects.requireNonNull(hash, "hash");
-      this.directory = directory;
-      this.dataflowPackage = Objects.requireNonNull(dataflowPackage, "dataflowPackage");
-    }
-
-    /**
-     * @return the dataflowPackage
-     */
-    public DataflowPackage getDataflowPackage() {
-      return dataflowPackage;
-    }
-
-    /**
-     * @return the directory
-     */
-    public boolean isDirectory() {
-      return directory;
-    }
-
-    /**
-     * @return the size
-     */
-    public long getSize() {
-      return size;
-    }
-
-    /**
-     * @return the hash
-     */
-    public String getHash() {
-      return hash;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/util/Stager.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/util/Stager.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/util/Stager.java
deleted file mode 100644
index f6c6a71..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/util/Stager.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.api.services.dataflow.model.DataflowPackage;
-
-import java.util.List;
-
-/**
- * Interface for staging files needed for running a Dataflow pipeline.
- */
-public interface Stager {
-  /* Stage files and return a list of packages. */
-  public List<DataflowPackage> stageFiles();
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/BlockingDataflowPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/BlockingDataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/BlockingDataflowPipelineOptions.java
new file mode 100644
index 0000000..6bbafdd
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/BlockingDataflowPipelineOptions.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.options;
+
+import com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+import java.io.PrintStream;
+
+/**
+ * Options that are used to configure the {@link BlockingDataflowPipelineRunner}.
+ */
+@Description("Configure options on the BlockingDataflowPipelineRunner.")
+public interface BlockingDataflowPipelineOptions extends DataflowPipelineOptions {
+  /**
+   * Output stream for job status messages.
+   */
+  @Description("Where messages generated during execution of the Dataflow job will be output.")
+  @JsonIgnore
+  @Hidden
+  @Default.InstanceFactory(StandardOutputFactory.class)
+  PrintStream getJobMessageOutput();
+  void setJobMessageOutput(PrintStream value);
+
+  /**
+   * Returns a default of {@link System#out}.
+   */
+  public static class StandardOutputFactory implements DefaultValueFactory<PrintStream> {
+    @Override
+    public PrintStream create(PipelineOptions options) {
+      return System.out;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/CloudDebuggerOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/CloudDebuggerOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/CloudDebuggerOptions.java
new file mode 100644
index 0000000..3f0503e
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/CloudDebuggerOptions.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.options;
+
+import org.apache.beam.sdk.annotations.Experimental;
+
+import com.google.api.services.clouddebugger.v2.model.Debuggee;
+
+import javax.annotation.Nullable;
+
+/**
+ * Options for controlling Cloud Debugger.
+ */
+@Description("[Experimental] Used to configure the Cloud Debugger")
+@Experimental
+@Hidden
+public interface CloudDebuggerOptions {
+
+  /** Whether to enable the Cloud Debugger snapshot agent for the current job. */
+  @Description("Whether to enable the Cloud Debugger snapshot agent for the current job.")
+  boolean getEnableCloudDebugger();
+  void setEnableCloudDebugger(boolean enabled);
+
+  /** The Cloud Debugger debuggee to associate with. This should not be set directly. */
+  @Description("The Cloud Debugger debuggee to associate with. This should not be set directly.")
+  @Hidden
+  @Nullable Debuggee getDebuggee();
+  void setDebuggee(Debuggee debuggee);
+
+  /** The maximum cost (as a ratio of CPU time) allowed for evaluating conditional snapshots. */
+  @Description(
+      "The maximum cost (as a ratio of CPU time) allowed for evaluating conditional snapshots. "
+      + "Should be a double between 0 and 1. "
+      + "Snapshots will be cancelled if evaluating conditions takes more than this ratio of time.")
+  @Default.Double(0.01)
+  double getMaxConditionCost();
+  void setMaxConditionCost(double maxConditionCost);
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowPipelineDebugOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowPipelineDebugOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowPipelineDebugOptions.java
new file mode 100644
index 0000000..1be93eb
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowPipelineDebugOptions.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.options;
+
+import org.apache.beam.sdk.annotations.Experimental;
+
+import com.google.api.services.dataflow.Dataflow;
+import com.google.cloud.dataflow.sdk.util.DataflowPathValidator;
+import com.google.cloud.dataflow.sdk.util.DataflowTransport;
+import com.google.cloud.dataflow.sdk.util.GcsStager;
+import com.google.cloud.dataflow.sdk.util.InstanceBuilder;
+import com.google.cloud.dataflow.sdk.util.PathValidator;
+import com.google.cloud.dataflow.sdk.util.Stager;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Internal. Options used to control execution of the Dataflow SDK for
+ * debugging and testing purposes.
+ */
+@Description("[Internal] Options used to control execution of the Dataflow SDK for "
+    + "debugging and testing purposes.")
+@Hidden
+public interface DataflowPipelineDebugOptions extends PipelineOptions {
+
+  /**
+   * The list of backend experiments to enable.
+   *
+   * <p>Dataflow provides a number of experimental features that can be enabled
+   * with this flag.
+   *
+   * <p>Please sync with the Dataflow team before enabling any experiments.
+   */
+  @Description("[Experimental] Dataflow provides a number of experimental features that can "
+      + "be enabled with this flag. Please sync with the Dataflow team before enabling any "
+      + "experiments.")
+  @Experimental
+  List<String> getExperiments();
+  void setExperiments(List<String> value);
+
+  /**
+   * The root URL for the Dataflow API. {@code dataflowEndpoint} can override this value
+   * if it contains an absolute URL, otherwise {@code apiRootUrl} will be combined with
+   * {@code dataflowEndpoint} to generate the full URL to communicate with the Dataflow API.
+   */
+  @Description("The root URL for the Dataflow API. dataflowEndpoint can override this "
+      + "value if it contains an absolute URL, otherwise apiRootUrl will be combined with "
+      + "dataflowEndpoint to generate the full URL to communicate with the Dataflow API.")
+  @Default.String(Dataflow.DEFAULT_ROOT_URL)
+  String getApiRootUrl();
+  void setApiRootUrl(String value);
+
+  /**
+   * Dataflow endpoint to use.
+   *
+   * <p>Defaults to the current version of the Google Cloud Dataflow
+   * API, at the time the current SDK version was released.
+   *
+   * <p>If the string contains "://", then this is treated as a URL,
+   * otherwise {@link #getApiRootUrl()} is used as the root
+   * URL.
+   */
+  @Description("The URL for the Dataflow API. If the string contains \"://\", this"
+      + " will be treated as the entire URL, otherwise will be treated relative to apiRootUrl.")
+  @Default.String(Dataflow.DEFAULT_SERVICE_PATH)
+  String getDataflowEndpoint();
+  void setDataflowEndpoint(String value);
+
+  /**
+   * The path to write the translated Dataflow job specification out to
+   * at job submission time. The Dataflow job specification will be represented in JSON
+   * format.
+   */
+  @Description("The path to write the translated Dataflow job specification out to "
+      + "at job submission time. The Dataflow job specification will be represented in JSON "
+      + "format.")
+  String getDataflowJobFile();
+  void setDataflowJobFile(String value);
+
+  /**
+   * The class of the validator that should be created and used to validate paths.
+   * If pathValidator has not been set explicitly, an instance of this class will be
+   * constructed and used as the path validator.
+   */
+  @Description("The class of the validator that should be created and used to validate paths. "
+      + "If pathValidator has not been set explicitly, an instance of this class will be "
+      + "constructed and used as the path validator.")
+  @Default.Class(DataflowPathValidator.class)
+  Class<? extends PathValidator> getPathValidatorClass();
+  void setPathValidatorClass(Class<? extends PathValidator> validatorClass);
+
+  /**
+   * The path validator instance that should be used to validate paths.
+   * If no path validator has been set explicitly, the default is to use the instance factory that
+   * constructs a path validator based upon the currently set pathValidatorClass.
+   */
+  @JsonIgnore
+  @Description("The path validator instance that should be used to validate paths. "
+      + "If no path validator has been set explicitly, the default is to use the instance factory "
+      + "that constructs a path validator based upon the currently set pathValidatorClass.")
+  @Default.InstanceFactory(PathValidatorFactory.class)
+  PathValidator getPathValidator();
+  void setPathValidator(PathValidator validator);
+
+  /**
+   * The class responsible for staging resources to be accessible by workers
+   * during job execution. If stager has not been set explicitly, an instance of this class
+   * will be created and used as the resource stager.
+   */
+  @Description("The class of the stager that should be created and used to stage resources. "
+      + "If stager has not been set explicitly, an instance of the this class will be created "
+      + "and used as the resource stager.")
+  @Default.Class(GcsStager.class)
+  Class<? extends Stager> getStagerClass();
+  void setStagerClass(Class<? extends Stager> stagerClass);
+
+  /**
+   * The resource stager instance that should be used to stage resources.
+   * If no stager has been set explicitly, the default is to use the instance factory
+   * that constructs a resource stager based upon the currently set stagerClass.
+   */
+  @JsonIgnore
+  @Description("The resource stager instance that should be used to stage resources. "
+      + "If no stager has been set explicitly, the default is to use the instance factory "
+      + "that constructs a resource stager based upon the currently set stagerClass.")
+  @Default.InstanceFactory(StagerFactory.class)
+  Stager getStager();
+  void setStager(Stager stager);
+
+  /**
+   * An instance of the Dataflow client. Defaults to creating a Dataflow client
+   * using the current set of options.
+   */
+  @JsonIgnore
+  @Description("An instance of the Dataflow client. Defaults to creating a Dataflow client "
+      + "using the current set of options.")
+  @Default.InstanceFactory(DataflowClientFactory.class)
+  Dataflow getDataflowClient();
+  void setDataflowClient(Dataflow value);
+
+  /** Returns the default Dataflow client built from the passed in PipelineOptions. */
+  public static class DataflowClientFactory implements DefaultValueFactory<Dataflow> {
+    @Override
+    public Dataflow create(PipelineOptions options) {
+        return DataflowTransport.newDataflowClient(
+            options.as(DataflowPipelineOptions.class)).build();
+    }
+  }
+
+  /**
+   * Whether to update the currently running pipeline with the same name as this one.
+   *
+   * @deprecated This property is replaced by {@link DataflowPipelineOptions#getUpdate()}
+   */
+  @Deprecated
+  @Description("If set, replace the existing pipeline with the name specified by --jobName with "
+      + "this pipeline, preserving state.")
+  boolean getUpdate();
+  @Deprecated
+  void setUpdate(boolean value);
+
+  /**
+   * Mapping of old PTranform names to new ones, specified as JSON
+   * <code>{"oldName":"newName",...}</code>. To mark a transform as deleted, make newName the
+   * empty string.
+   */
+  @JsonIgnore
+  @Description(
+      "Mapping of old PTranform names to new ones, specified as JSON "
+      + "{\"oldName\":\"newName\",...}. To mark a transform as deleted, make newName the empty "
+      + "string.")
+  Map<String, String> getTransformNameMapping();
+  void setTransformNameMapping(Map<String, String> value);
+
+  /**
+   * Custom windmill_main binary to use with the streaming runner.
+   */
+  @Description("Custom windmill_main binary to use with the streaming runner")
+  String getOverrideWindmillBinary();
+  void setOverrideWindmillBinary(String value);
+
+  /**
+   * Number of threads to use on the Dataflow worker harness. If left unspecified,
+   * the Dataflow service will compute an appropriate number of threads to use.
+   */
+  @Description("Number of threads to use on the Dataflow worker harness. If left unspecified, "
+      + "the Dataflow service will compute an appropriate number of threads to use.")
+  int getNumberOfWorkerHarnessThreads();
+  void setNumberOfWorkerHarnessThreads(int value);
+
+  /**
+   * If {@literal true}, save a heap dump before killing a thread or process which is GC
+   * thrashing or out of memory. The location of the heap file will either be echoed back
+   * to the user, or the user will be given the opportunity to download the heap file.
+   *
+   * <p>
+   * CAUTION: Heap dumps can of comparable size to the default boot disk. Consider increasing
+   * the boot disk size before setting this flag to true.
+   */
+  @Description("If {@literal true}, save a heap dump before killing a thread or process "
+      + "which is GC thrashing or out of memory.")
+  boolean getDumpHeapOnOOM();
+  void setDumpHeapOnOOM(boolean dumpHeapBeforeExit);
+
+  /**
+   * Creates a {@link PathValidator} object using the class specified in
+   * {@link #getPathValidatorClass()}.
+   */
+  public static class PathValidatorFactory implements DefaultValueFactory<PathValidator> {
+      @Override
+      public PathValidator create(PipelineOptions options) {
+      DataflowPipelineDebugOptions debugOptions = options.as(DataflowPipelineDebugOptions.class);
+      return InstanceBuilder.ofType(PathValidator.class)
+          .fromClass(debugOptions.getPathValidatorClass())
+          .fromFactoryMethod("fromOptions")
+          .withArg(PipelineOptions.class, options)
+          .build();
+    }
+  }
+
+  /**
+   * Creates a {@link Stager} object using the class specified in
+   * {@link #getStagerClass()}.
+   */
+  public static class StagerFactory implements DefaultValueFactory<Stager> {
+      @Override
+      public Stager create(PipelineOptions options) {
+      DataflowPipelineDebugOptions debugOptions = options.as(DataflowPipelineDebugOptions.class);
+      return InstanceBuilder.ofType(Stager.class)
+          .fromClass(debugOptions.getStagerClass())
+          .fromFactoryMethod("fromOptions")
+          .withArg(PipelineOptions.class, options)
+          .build();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowPipelineOptions.java
new file mode 100644
index 0000000..dbfafd1
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowPipelineOptions.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.options;
+
+import com.google.cloud.dataflow.sdk.runners.DataflowPipeline;
+import com.google.common.base.MoreObjects;
+
+import org.joda.time.DateTimeUtils;
+import org.joda.time.DateTimeZone;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+/**
+ * Options that can be used to configure the {@link DataflowPipeline}.
+ */
+@Description("Options that configure the Dataflow pipeline.")
+public interface DataflowPipelineOptions extends
+    PipelineOptions, GcpOptions, ApplicationNameOptions, DataflowPipelineDebugOptions,
+    DataflowPipelineWorkerPoolOptions, BigQueryOptions,
+    GcsOptions, StreamingOptions, CloudDebuggerOptions, DataflowWorkerLoggingOptions,
+    DataflowProfilingOptions, PubsubOptions {
+
+  @Description("Project id. Required when running a Dataflow in the cloud. "
+      + "See https://cloud.google.com/storage/docs/projects for further details.")
+  @Override
+  @Validation.Required
+  @Default.InstanceFactory(DefaultProjectFactory.class)
+  String getProject();
+  @Override
+  void setProject(String value);
+
+  /**
+   * GCS path for staging local files, e.g. gs://bucket/object
+   *
+   * <p>Must be a valid Cloud Storage URL, beginning with the prefix "gs://"
+   *
+   * <p>At least one of {@link PipelineOptions#getTempLocation()} or {@link #getStagingLocation()}
+   * must be set. If {@link #getStagingLocation()} is not set, then the Dataflow
+   * pipeline defaults to using {@link PipelineOptions#getTempLocation()}.
+   */
+  @Description("GCS path for staging local files, e.g. \"gs://bucket/object\". "
+      + "Must be a valid Cloud Storage URL, beginning with the prefix \"gs://\". "
+      + "At least one of stagingLocation or tempLocation must be set. If stagingLocation is unset, "
+      + "defaults to using tempLocation.")
+  String getStagingLocation();
+  void setStagingLocation(String value);
+
+  /**
+   * The Dataflow job name is used as an idempotence key within the Dataflow service.
+   * If there is an existing job that is currently active, another active job with the same
+   * name will not be able to be created. Defaults to using the ApplicationName-UserName-Date.
+   */
+  @Description("The Dataflow job name is used as an idempotence key within the Dataflow service. "
+      + "If there is an existing job that is currently active, another active job with the same "
+      + "name will not be able to be created. Defaults to using the ApplicationName-UserName-Date.")
+  @Default.InstanceFactory(JobNameFactory.class)
+  String getJobName();
+  void setJobName(String value);
+
+  /**
+   * Whether to update the currently running pipeline with the same name as this one.
+   */
+  @Override
+  @SuppressWarnings("deprecation") // base class member deprecated in favor of this one.
+  @Description(
+      "If set, replace the existing pipeline with the name specified by --jobName with "
+          + "this pipeline, preserving state.")
+  boolean getUpdate();
+  @Override
+  @SuppressWarnings("deprecation") // base class member deprecated in favor of this one.
+  void setUpdate(boolean value);
+
+  /**
+   * Returns a normalized job name constructed from {@link ApplicationNameOptions#getAppName()}, the
+   * local system user name (if available), and the current time. The normalization makes sure that
+   * the job name matches the required pattern of [a-z]([-a-z0-9]*[a-z0-9])? and length limit of 40
+   * characters.
+   *
+   * <p>This job name factory is only able to generate one unique name per second per application
+   * and user combination.
+   */
+  public static class JobNameFactory implements DefaultValueFactory<String> {
+    private static final DateTimeFormatter FORMATTER =
+        DateTimeFormat.forPattern("MMddHHmmss").withZone(DateTimeZone.UTC);
+
+    @Override
+    public String create(PipelineOptions options) {
+      String appName = options.as(ApplicationNameOptions.class).getAppName();
+      String normalizedAppName = appName == null || appName.length() == 0 ? "dataflow"
+          : appName.toLowerCase()
+                   .replaceAll("[^a-z0-9]", "0")
+                   .replaceAll("^[^a-z]", "a");
+      String userName = MoreObjects.firstNonNull(System.getProperty("user.name"), "");
+      String normalizedUserName = userName.toLowerCase()
+                                          .replaceAll("[^a-z0-9]", "0");
+      String datePart = FORMATTER.print(DateTimeUtils.currentTimeMillis());
+      return normalizedAppName + "-" + normalizedUserName + "-" + datePart;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowPipelineWorkerPoolOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowPipelineWorkerPoolOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowPipelineWorkerPoolOptions.java
new file mode 100644
index 0000000..44a9b00
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowPipelineWorkerPoolOptions.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.options;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+import java.util.List;
+
+/**
+ * Options that are used to configure the Dataflow pipeline worker pool.
+ */
+@Description("Options that are used to configure the Dataflow pipeline worker pool.")
+public interface DataflowPipelineWorkerPoolOptions extends PipelineOptions {
+  /**
+   * Number of workers to use when executing the Dataflow job. Note that selection of an autoscaling
+   * algorithm other then {@code NONE} will affect the size of the worker pool. If left unspecified,
+   * the Dataflow service will determine the number of workers.
+   */
+  @Description("Number of workers to use when executing the Dataflow job. Note that "
+      + "selection of an autoscaling algorithm other then \"NONE\" will affect the "
+      + "size of the worker pool. If left unspecified, the Dataflow service will "
+      + "determine the number of workers.")
+  int getNumWorkers();
+  void setNumWorkers(int value);
+
+  /**
+   * Type of autoscaling algorithm to use.
+   */
+  @Experimental(Experimental.Kind.AUTOSCALING)
+  public enum AutoscalingAlgorithmType {
+    /** Use numWorkers machines. Do not autoscale the worker pool. */
+    NONE("AUTOSCALING_ALGORITHM_NONE"),
+
+    @Deprecated
+    BASIC("AUTOSCALING_ALGORITHM_BASIC"),
+
+    /** Autoscale the workerpool based on throughput (up to maxNumWorkers). */
+    THROUGHPUT_BASED("AUTOSCALING_ALGORITHM_BASIC");
+
+    private final String algorithm;
+
+    private AutoscalingAlgorithmType(String algorithm) {
+      this.algorithm = algorithm;
+    }
+
+    /** Returns the string representation of this type. */
+    public String getAlgorithm() {
+      return this.algorithm;
+    }
+  }
+
+  /**
+   * [Experimental] The autoscaling algorithm to use for the workerpool.
+   *
+   * <ul>
+   *   <li>NONE: does not change the size of the worker pool.</li>
+   *   <li>BASIC: autoscale the worker pool size up to maxNumWorkers until the job completes.</li>
+   *   <li>THROUGHPUT_BASED: autoscale the workerpool based on throughput (up to maxNumWorkers).
+   *   </li>
+   * </ul>
+   */
+  @Description("[Experimental] The autoscaling algorithm to use for the workerpool. "
+      + "NONE: does not change the size of the worker pool. "
+      + "BASIC (deprecated): autoscale the worker pool size up to maxNumWorkers until the job "
+      + "completes. "
+      + "THROUGHPUT_BASED: autoscale the workerpool based on throughput (up to maxNumWorkers).")
+  @Experimental(Experimental.Kind.AUTOSCALING)
+  AutoscalingAlgorithmType getAutoscalingAlgorithm();
+  void setAutoscalingAlgorithm(AutoscalingAlgorithmType value);
+
+  /**
+   * The maximum number of workers to use for the workerpool. This options limits the size of the
+   * workerpool for the lifetime of the job, including
+   * <a href="https://cloud.google.com/dataflow/pipelines/updating-a-pipeline">pipeline updates</a>.
+   * If left unspecified, the Dataflow service will compute a ceiling.
+   */
+  @Description("The maximum number of workers to use for the workerpool. This options limits the "
+      + "size of the workerpool for the lifetime of the job, including pipeline updates. "
+      + "If left unspecified, the Dataflow service will compute a ceiling.")
+  int getMaxNumWorkers();
+  void setMaxNumWorkers(int value);
+
+  /**
+   * Remote worker disk size, in gigabytes, or 0 to use the default size.
+   */
+  @Description("Remote worker disk size, in gigabytes, or 0 to use the default size.")
+  int getDiskSizeGb();
+  void setDiskSizeGb(int value);
+
+  /**
+   * Docker container image that executes Dataflow worker harness, residing in Google Container
+   * Registry.
+   */
+  @Default.InstanceFactory(WorkerHarnessContainerImageFactory.class)
+  @Description("Docker container image that executes Dataflow worker harness, residing in Google "
+      + " Container Registry.")
+  @Hidden
+  String getWorkerHarnessContainerImage();
+  void setWorkerHarnessContainerImage(String value);
+
+  /**
+   * Returns the default Docker container image that executes Dataflow worker harness, residing in
+   * Google Container Registry.
+   */
+  public static class WorkerHarnessContainerImageFactory
+      implements DefaultValueFactory<String> {
+    @Override
+    public String create(PipelineOptions options) {
+      DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
+      if (dataflowOptions.isStreaming()) {
+        return DataflowPipelineRunner.STREAMING_WORKER_HARNESS_CONTAINER_IMAGE;
+      } else {
+        return DataflowPipelineRunner.BATCH_WORKER_HARNESS_CONTAINER_IMAGE;
+      }
+    }
+  }
+
+  /**
+   * GCE <a href="https://cloud.google.com/compute/docs/networking">network</a> for launching
+   * workers.
+   *
+   * <p>Default is up to the Dataflow service.
+   */
+  @Description("GCE network for launching workers. For more information, see the reference "
+      + "documentation https://cloud.google.com/compute/docs/networking. "
+      + "Default is up to the Dataflow service.")
+  String getNetwork();
+  void setNetwork(String value);
+
+  /**
+   * GCE <a href="https://cloud.google.com/compute/docs/networking">subnetwork</a> for launching
+   * workers.
+   *
+   * <p>Default is up to the Dataflow service. Expected format is
+   * regions/REGION/subnetworks/SUBNETWORK.
+   *
+   * <p>You may also need to specify network option.
+   */
+  @Description("GCE subnetwork for launching workers. For more information, see the reference "
+      + "documentation https://cloud.google.com/compute/docs/networking. "
+      + "Default is up to the Dataflow service.")
+  String getSubnetwork();
+  void setSubnetwork(String value);
+
+  /**
+   * GCE <a href="https://developers.google.com/compute/docs/zones"
+   * >availability zone</a> for launching workers.
+   *
+   * <p>Default is up to the Dataflow service.
+   */
+  @Description("GCE availability zone for launching workers. See "
+      + "https://developers.google.com/compute/docs/zones for a list of valid options. "
+      + "Default is up to the Dataflow service.")
+  String getZone();
+  void setZone(String value);
+
+  /**
+   * Machine type to create Dataflow worker VMs as.
+   *
+   * <p>See <a href="https://cloud.google.com/compute/docs/machine-types">GCE machine types</a>
+   * for a list of valid options.
+   *
+   * <p>If unset, the Dataflow service will choose a reasonable default.
+   */
+  @Description("Machine type to create Dataflow worker VMs as. See "
+      + "https://cloud.google.com/compute/docs/machine-types for a list of valid options. "
+      + "If unset, the Dataflow service will choose a reasonable default.")
+  String getWorkerMachineType();
+  void setWorkerMachineType(String value);
+
+  /**
+   * The policy for tearing down the workers spun up by the service.
+   */
+  public enum TeardownPolicy {
+    /**
+     * All VMs created for a Dataflow job are deleted when the job finishes, regardless of whether
+     * it fails or succeeds.
+     */
+    TEARDOWN_ALWAYS("TEARDOWN_ALWAYS"),
+    /**
+     * All VMs created for a Dataflow job are left running when the job finishes, regardless of
+     * whether it fails or succeeds.
+     */
+    TEARDOWN_NEVER("TEARDOWN_NEVER"),
+    /**
+     * All VMs created for a Dataflow job are deleted when the job succeeds, but are left running
+     * when it fails. (This is typically used for debugging failing jobs by SSHing into the
+     * workers.)
+     */
+    TEARDOWN_ON_SUCCESS("TEARDOWN_ON_SUCCESS");
+
+    private final String teardownPolicy;
+
+    private TeardownPolicy(String teardownPolicy) {
+      this.teardownPolicy = teardownPolicy;
+    }
+
+    public String getTeardownPolicyName() {
+      return this.teardownPolicy;
+    }
+  }
+
+  /**
+   * The teardown policy for the VMs.
+   *
+   * <p>If unset, the Dataflow service will choose a reasonable default.
+   */
+  @Description("The teardown policy for the VMs. If unset, the Dataflow service will "
+      + "choose a reasonable default.")
+  TeardownPolicy getTeardownPolicy();
+  void setTeardownPolicy(TeardownPolicy value);
+
+  /**
+   * List of local files to make available to workers.
+   *
+   * <p>Files are placed on the worker's classpath.
+   *
+   * <p>The default value is the list of jars from the main program's classpath.
+   */
+  @Description("Files to stage on GCS and make available to workers. "
+      + "Files are placed on the worker's classpath. "
+      + "The default value is all files from the classpath.")
+  @JsonIgnore
+  List<String> getFilesToStage();
+  void setFilesToStage(List<String> value);
+
+  /**
+   * Specifies what type of persistent disk should be used. The value should be a full or partial
+   * URL of a disk type resource, e.g., zones/us-central1-f/disks/pd-standard. For
+   * more information, see the
+   * <a href="https://cloud.google.com/compute/docs/reference/latest/diskTypes">API reference
+   * documentation for DiskTypes</a>.
+   */
+  @Description("Specifies what type of persistent disk should be used. The value should be a full "
+      + "or partial URL of a disk type resource, e.g., zones/us-central1-f/disks/pd-standard. For "
+      + "more information, see the API reference documentation for DiskTypes: "
+      + "https://cloud.google.com/compute/docs/reference/latest/diskTypes")
+  String getWorkerDiskType();
+  void setWorkerDiskType(String value);
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowProfilingOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowProfilingOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowProfilingOptions.java
new file mode 100644
index 0000000..3cd7b03
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowProfilingOptions.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.options;
+
+import org.apache.beam.sdk.annotations.Experimental;
+
+import java.util.HashMap;
+
+/**
+ * Options for controlling profiling of pipeline execution.
+ */
+@Description("[Experimental] Used to configure profiling of the Dataflow pipeline")
+@Experimental
+@Hidden
+public interface DataflowProfilingOptions {
+
+  @Description("Whether to periodically dump profiling information to local disk.\n"
+      + "WARNING: Enabling this option may fill local disk with profiling information.")
+  boolean getEnableProfilingAgent();
+  void setEnableProfilingAgent(boolean enabled);
+
+  @Description(
+      "[INTERNAL] Additional configuration for the profiling agent. Not typically necessary.")
+  @Hidden
+  DataflowProfilingAgentConfiguration getProfilingAgentConfiguration();
+  void setProfilingAgentConfiguration(DataflowProfilingAgentConfiguration configuration);
+
+  /**
+   * Configuration the for profiling agent.
+   */
+  public static class DataflowProfilingAgentConfiguration extends HashMap<String, Object> {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowWorkerHarnessOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowWorkerHarnessOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowWorkerHarnessOptions.java
new file mode 100644
index 0000000..7705b66
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowWorkerHarnessOptions.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.options;
+
+/**
+ * Options that are used exclusively within the Dataflow worker harness.
+ * These options have no effect at pipeline creation time.
+ */
+@Description("[Internal] Options that are used exclusively within the Dataflow worker harness. "
+    + "These options have no effect at pipeline creation time.")
+@Hidden
+public interface DataflowWorkerHarnessOptions extends DataflowPipelineOptions {
+  /**
+   * The identity of the worker running this pipeline.
+   */
+  @Description("The identity of the worker running this pipeline.")
+  String getWorkerId();
+  void setWorkerId(String value);
+
+  /**
+   * The identity of the Dataflow job.
+   */
+  @Description("The identity of the Dataflow job.")
+  String getJobId();
+  void setJobId(String value);
+
+  /**
+   * The size of the worker's in-memory cache, in megabytes.
+   *
+   * <p>Currently, this cache is used for storing read values of side inputs.
+   */
+  @Description("The size of the worker's in-memory cache, in megabytes.")
+  @Default.Integer(100)
+  Integer getWorkerCacheMb();
+  void setWorkerCacheMb(Integer value);
+}