Posted to commits@beam.apache.org by lc...@apache.org on 2016/04/12 01:42:43 UTC

[09/18] incubator-beam git commit: [BEAM-151] Move a large portion of the Dataflow runner to separate maven module

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/runners/google-cloud-dataflow-java/src/test/java/com/google/cloud/dataflow/sdk/util/PackageUtilTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/com/google/cloud/dataflow/sdk/util/PackageUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/com/google/cloud/dataflow/sdk/util/PackageUtilTest.java
new file mode 100644
index 0000000..d2c08c8
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/com/google/cloud/dataflow/sdk/util/PackageUtilTest.java
@@ -0,0 +1,483 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.util;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import com.google.api.client.googleapis.json.GoogleJsonError.ErrorInfo;
+import com.google.api.client.googleapis.json.GoogleJsonResponseException;
+import com.google.api.client.http.HttpRequest;
+import com.google.api.client.http.HttpResponse;
+import com.google.api.client.http.HttpStatusCodes;
+import com.google.api.client.http.HttpTransport;
+import com.google.api.client.http.LowLevelHttpRequest;
+import com.google.api.client.json.GenericJson;
+import com.google.api.client.json.Json;
+import com.google.api.client.json.JsonFactory;
+import com.google.api.client.json.jackson2.JacksonFactory;
+import com.google.api.client.testing.http.HttpTesting;
+import com.google.api.client.testing.http.MockHttpTransport;
+import com.google.api.client.testing.http.MockLowLevelHttpRequest;
+import com.google.api.client.testing.http.MockLowLevelHttpResponse;
+import com.google.api.services.dataflow.model.DataflowPackage;
+import com.google.cloud.dataflow.sdk.options.GcsOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.testing.ExpectedLogs;
+import com.google.cloud.dataflow.sdk.testing.FastNanoClockAndSleeper;
+import com.google.cloud.dataflow.sdk.util.PackageUtil.PackageAttributes;
+import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+import com.google.common.io.LineReader;
+
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.hamcrest.Matchers;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.channels.Channels;
+import java.nio.channels.Pipe;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+
+/** Tests for PackageUtil. */
+@RunWith(JUnit4.class)
+public class PackageUtilTest {
+  @Rule public ExpectedLogs logged = ExpectedLogs.none(PackageUtil.class);
+  @Rule
+  public TemporaryFolder tmpFolder = new TemporaryFolder();
+
+  @Rule
+  public FastNanoClockAndSleeper fastNanoClockAndSleeper = new FastNanoClockAndSleeper();
+
+  @Mock
+  GcsUtil mockGcsUtil;
+
+  // 128-bit hash, base64 encoded at 6 bits per character: ceil(128 / 6) = 22 characters
+  private static final String HASH_PATTERN = "[a-zA-Z0-9+-]{22}";
+
+  // Hamcrest matcher to assert a string matches a pattern
+  private static class RegexMatcher extends BaseMatcher<String> {
+    private final Pattern pattern;
+
+    public RegexMatcher(String regex) {
+      this.pattern = Pattern.compile(regex);
+    }
+
+    @Override
+    public boolean matches(Object o) {
+      if (!(o instanceof String)) {
+        return false;
+      }
+      return pattern.matcher((String) o).matches();
+    }
+
+    @Override
+    public void describeTo(Description description) {
+      description.appendText(String.format("matches regular expression %s", pattern));
+    }
+
+    public static RegexMatcher matches(String regex) {
+      return new RegexMatcher(regex);
+    }
+  }
+
+  @Before
+  public void setUp() {
+    MockitoAnnotations.initMocks(this);
+
+    GcsOptions pipelineOptions = PipelineOptionsFactory.as(GcsOptions.class);
+    pipelineOptions.setGcsUtil(mockGcsUtil);
+
+    IOChannelUtils.registerStandardIOFactories(pipelineOptions);
+  }
+
+  private File makeFileWithContents(String name, String contents) throws Exception {
+    File tmpFile = tmpFolder.newFile(name);
+    Files.write(contents, tmpFile, StandardCharsets.UTF_8);
+    tmpFile.setLastModified(0);  // required for determinism with directories
+    return tmpFile;
+  }
+
+  static final String STAGING_PATH = GcsPath.fromComponents("somebucket", "base/path").toString();
+  private static PackageAttributes makePackageAttributes(File file, String overridePackageName) {
+    return PackageUtil.createPackageAttributes(file, STAGING_PATH, overridePackageName);
+  }
+
+  @Test
+  public void testFileWithExtensionPackageNamingAndSize() throws Exception {
+    String contents = "This is a test!";
+    File tmpFile = makeFileWithContents("file.txt", contents);
+    PackageAttributes attr = makePackageAttributes(tmpFile, null);
+    DataflowPackage target = attr.getDataflowPackage();
+
+    assertThat(target.getName(), RegexMatcher.matches("file-" + HASH_PATTERN + ".txt"));
+    assertThat(target.getLocation(), equalTo(STAGING_PATH + '/' + target.getName()));
+    assertThat(attr.getSize(), equalTo((long) contents.length()));
+  }
+
+  @Test
+  public void testPackageNamingWithFileNoExtension() throws Exception {
+    File tmpFile = makeFileWithContents("file", "This is a test!");
+    DataflowPackage target = makePackageAttributes(tmpFile, null).getDataflowPackage();
+
+    assertThat(target.getName(), RegexMatcher.matches("file-" + HASH_PATTERN));
+    assertThat(target.getLocation(), equalTo(STAGING_PATH + '/' + target.getName()));
+  }
+
+  @Test
+  public void testPackageNamingWithDirectory() throws Exception {
+    File tmpDirectory = tmpFolder.newFolder("folder");
+    DataflowPackage target = makePackageAttributes(tmpDirectory, null).getDataflowPackage();
+
+    assertThat(target.getName(), RegexMatcher.matches("folder-" + HASH_PATTERN + ".jar"));
+    assertThat(target.getLocation(), equalTo(STAGING_PATH + '/' + target.getName()));
+  }
+
+  @Test
+  public void testPackageNamingWithFilesHavingSameContentsAndSameNames() throws Exception {
+    File tmpDirectory1 = tmpFolder.newFolder("folder1", "folderA");
+    makeFileWithContents("folder1/folderA/sameName", "This is a test!");
+    DataflowPackage target1 = makePackageAttributes(tmpDirectory1, null).getDataflowPackage();
+
+    File tmpDirectory2 = tmpFolder.newFolder("folder2", "folderA");
+    makeFileWithContents("folder2/folderA/sameName", "This is a test!");
+    DataflowPackage target2 = makePackageAttributes(tmpDirectory2, null).getDataflowPackage();
+
+    assertEquals(target1.getName(), target2.getName());
+    assertEquals(target1.getLocation(), target2.getLocation());
+  }
+
+  @Test
+  public void testPackageNamingWithFilesHavingSameContentsButDifferentNames() throws Exception {
+    File tmpDirectory1 = tmpFolder.newFolder("folder1", "folderA");
+    makeFileWithContents("folder1/folderA/uniqueName1", "This is a test!");
+    DataflowPackage target1 = makePackageAttributes(tmpDirectory1, null).getDataflowPackage();
+
+    File tmpDirectory2 = tmpFolder.newFolder("folder2", "folderA");
+    makeFileWithContents("folder2/folderA/uniqueName2", "This is a test!");
+    DataflowPackage target2 = makePackageAttributes(tmpDirectory2, null).getDataflowPackage();
+
+    assertNotEquals(target1.getName(), target2.getName());
+    assertNotEquals(target1.getLocation(), target2.getLocation());
+  }
+
+  @Test
+  public void testPackageNamingWithDirectoriesHavingSameContentsButDifferentNames()
+      throws Exception {
+    File tmpDirectory1 = tmpFolder.newFolder("folder1", "folderA");
+    tmpFolder.newFolder("folder1", "folderA", "uniqueName1");
+    DataflowPackage target1 = makePackageAttributes(tmpDirectory1, null).getDataflowPackage();
+
+    File tmpDirectory2 = tmpFolder.newFolder("folder2", "folderA");
+    tmpFolder.newFolder("folder2", "folderA", "uniqueName2");
+    DataflowPackage target2 = makePackageAttributes(tmpDirectory2, null).getDataflowPackage();
+
+    assertNotEquals(target1.getName(), target2.getName());
+    assertNotEquals(target1.getLocation(), target2.getLocation());
+  }
+
+  @Test
+  public void testPackageUploadWithLargeClasspathLogsWarning() throws Exception {
+    File tmpFile = makeFileWithContents("file.txt", "This is a test!");
+    // all files will be present and cached so no upload needed.
+    when(mockGcsUtil.fileSize(any(GcsPath.class))).thenReturn(tmpFile.length());
+
+    List<String> classpathElements = Lists.newLinkedList();
+    for (int i = 0; i < 1005; ++i) {
+      String eltName = "element" + i;
+      classpathElements.add(eltName + '=' + tmpFile.getAbsolutePath());
+    }
+
+    PackageUtil.stageClasspathElements(classpathElements, STAGING_PATH);
+
+    logged.verifyWarn("Your classpath contains 1005 elements, which Google Cloud Dataflow");
+  }
+
+  @Test
+  public void testPackageUploadWithFileSucceeds() throws Exception {
+    Pipe pipe = Pipe.open();
+    String contents = "This is a test!";
+    File tmpFile = makeFileWithContents("file.txt", contents);
+    when(mockGcsUtil.fileSize(any(GcsPath.class)))
+        .thenThrow(new FileNotFoundException("some/path"));
+    when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink());
+
+    List<DataflowPackage> targets = PackageUtil.stageClasspathElements(
+        ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH);
+    DataflowPackage target = Iterables.getOnlyElement(targets);
+
+    verify(mockGcsUtil).fileSize(any(GcsPath.class));
+    verify(mockGcsUtil).create(any(GcsPath.class), anyString());
+    verifyNoMoreInteractions(mockGcsUtil);
+
+    assertThat(target.getName(), RegexMatcher.matches("file-" + HASH_PATTERN + ".txt"));
+    assertThat(target.getLocation(), equalTo(STAGING_PATH + '/' + target.getName()));
+    assertThat(new LineReader(Channels.newReader(pipe.source(), "UTF-8")).readLine(),
+        equalTo(contents));
+  }
+
+  @Test
+  public void testPackageUploadWithDirectorySucceeds() throws Exception {
+    Pipe pipe = Pipe.open();
+    File tmpDirectory = tmpFolder.newFolder("folder");
+    tmpFolder.newFolder("folder", "empty_directory");
+    tmpFolder.newFolder("folder", "directory");
+    makeFileWithContents("folder/file.txt", "This is a test!");
+    makeFileWithContents("folder/directory/file.txt", "This is also a test!");
+
+    when(mockGcsUtil.fileSize(any(GcsPath.class)))
+        .thenThrow(new FileNotFoundException("some/path"));
+    when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink());
+
+    PackageUtil.stageClasspathElements(
+        ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH);
+
+    verify(mockGcsUtil).fileSize(any(GcsPath.class));
+    verify(mockGcsUtil).create(any(GcsPath.class), anyString());
+    verifyNoMoreInteractions(mockGcsUtil);
+
+    ZipInputStream inputStream = new ZipInputStream(Channels.newInputStream(pipe.source()));
+    List<String> zipEntryNames = new ArrayList<>();
+    for (ZipEntry entry = inputStream.getNextEntry(); entry != null;
+        entry = inputStream.getNextEntry()) {
+      zipEntryNames.add(entry.getName());
+    }
+
+    assertThat(zipEntryNames,
+        containsInAnyOrder("directory/file.txt", "empty_directory/", "file.txt"));
+  }
+
+  @Test
+  public void testPackageUploadWithEmptyDirectorySucceeds() throws Exception {
+    Pipe pipe = Pipe.open();
+    File tmpDirectory = tmpFolder.newFolder("folder");
+
+    when(mockGcsUtil.fileSize(any(GcsPath.class)))
+        .thenThrow(new FileNotFoundException("some/path"));
+    when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink());
+
+    List<DataflowPackage> targets = PackageUtil.stageClasspathElements(
+        ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH);
+    DataflowPackage target = Iterables.getOnlyElement(targets);
+
+    verify(mockGcsUtil).fileSize(any(GcsPath.class));
+    verify(mockGcsUtil).create(any(GcsPath.class), anyString());
+    verifyNoMoreInteractions(mockGcsUtil);
+
+    assertThat(target.getName(), RegexMatcher.matches("folder-" + HASH_PATTERN + ".jar"));
+    assertThat(target.getLocation(), equalTo(STAGING_PATH + '/' + target.getName()));
+    assertNull(new ZipInputStream(Channels.newInputStream(pipe.source())).getNextEntry());
+  }
+
+  @Test(expected = RuntimeException.class)
+  public void testPackageUploadFailsWhenIOExceptionThrown() throws Exception {
+    File tmpFile = makeFileWithContents("file.txt", "This is a test!");
+    when(mockGcsUtil.fileSize(any(GcsPath.class)))
+        .thenThrow(new FileNotFoundException("some/path"));
+    when(mockGcsUtil.create(any(GcsPath.class), anyString()))
+        .thenThrow(new IOException("Fake Exception: Upload error"));
+
+    try {
+      PackageUtil.stageClasspathElements(
+          ImmutableList.of(tmpFile.getAbsolutePath()),
+          STAGING_PATH, fastNanoClockAndSleeper);
+    } finally {
+      verify(mockGcsUtil).fileSize(any(GcsPath.class));
+      verify(mockGcsUtil, times(5)).create(any(GcsPath.class), anyString());
+      verifyNoMoreInteractions(mockGcsUtil);
+    }
+  }
+
+  @Test
+  public void testPackageUploadFailsWithPermissionsErrorGivesDetailedMessage() throws Exception {
+    File tmpFile = makeFileWithContents("file.txt", "This is a test!");
+    when(mockGcsUtil.fileSize(any(GcsPath.class)))
+        .thenThrow(new FileNotFoundException("some/path"));
+    when(mockGcsUtil.create(any(GcsPath.class), anyString()))
+        .thenThrow(new IOException("Failed to write to GCS path " + STAGING_PATH,
+            googleJsonResponseException(
+                HttpStatusCodes.STATUS_CODE_FORBIDDEN, "Permission denied", "Test message")));
+
+    try {
+      PackageUtil.stageClasspathElements(
+          ImmutableList.of(tmpFile.getAbsolutePath()),
+          STAGING_PATH, fastNanoClockAndSleeper);
+      fail("Expected RuntimeException");
+    } catch (RuntimeException e) {
+      assertTrue("Expected IOException containing detailed message.",
+          e.getCause() instanceof IOException);
+      assertThat(e.getCause().getMessage(),
+          Matchers.allOf(
+              Matchers.containsString("Uploaded failed due to permissions error"),
+              Matchers.containsString(
+                  "Stale credentials can be resolved by executing 'gcloud auth login'")));
+    } finally {
+      verify(mockGcsUtil).fileSize(any(GcsPath.class));
+      verify(mockGcsUtil).create(any(GcsPath.class), anyString());
+      verifyNoMoreInteractions(mockGcsUtil);
+    }
+  }
+
+  @Test
+  public void testPackageUploadEventuallySucceeds() throws Exception {
+    Pipe pipe = Pipe.open();
+    File tmpFile = makeFileWithContents("file.txt", "This is a test!");
+    when(mockGcsUtil.fileSize(any(GcsPath.class)))
+        .thenThrow(new FileNotFoundException("some/path"));
+    when(mockGcsUtil.create(any(GcsPath.class), anyString()))
+        .thenThrow(new IOException("Fake Exception: 410 Gone")) // First attempt fails
+        .thenReturn(pipe.sink());                               // second attempt succeeds
+
+    try {
+      PackageUtil.stageClasspathElements(
+          ImmutableList.of(tmpFile.getAbsolutePath()),
+          STAGING_PATH,
+          fastNanoClockAndSleeper);
+    } finally {
+      verify(mockGcsUtil).fileSize(any(GcsPath.class));
+      verify(mockGcsUtil, times(2)).create(any(GcsPath.class), anyString());
+      verifyNoMoreInteractions(mockGcsUtil);
+    }
+  }
+
+  @Test
+  public void testPackageUploadIsSkippedWhenFileAlreadyExists() throws Exception {
+    File tmpFile = makeFileWithContents("file.txt", "This is a test!");
+    when(mockGcsUtil.fileSize(any(GcsPath.class))).thenReturn(tmpFile.length());
+
+    PackageUtil.stageClasspathElements(
+        ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH);
+
+    verify(mockGcsUtil).fileSize(any(GcsPath.class));
+    verifyNoMoreInteractions(mockGcsUtil);
+  }
+
+  @Test
+  public void testPackageUploadIsNotSkippedWhenSizesAreDifferent() throws Exception {
+    Pipe pipe = Pipe.open();
+    File tmpDirectory = tmpFolder.newFolder("folder");
+    tmpFolder.newFolder("folder", "empty_directory");
+    tmpFolder.newFolder("folder", "directory");
+    makeFileWithContents("folder/file.txt", "This is a test!");
+    makeFileWithContents("folder/directory/file.txt", "This is also a test!");
+    when(mockGcsUtil.fileSize(any(GcsPath.class))).thenReturn(Long.MAX_VALUE);
+    when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink());
+
+    PackageUtil.stageClasspathElements(
+        ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH);
+
+    verify(mockGcsUtil).fileSize(any(GcsPath.class));
+    verify(mockGcsUtil).create(any(GcsPath.class), anyString());
+    verifyNoMoreInteractions(mockGcsUtil);
+  }
+
+  @Test
+  public void testPackageUploadWithExplicitPackageName() throws Exception {
+    Pipe pipe = Pipe.open();
+    File tmpFile = makeFileWithContents("file.txt", "This is a test!");
+    final String overriddenName = "alias.txt";
+
+    when(mockGcsUtil.fileSize(any(GcsPath.class)))
+        .thenThrow(new FileNotFoundException("some/path"));
+    when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink());
+
+    List<DataflowPackage> targets = PackageUtil.stageClasspathElements(
+        ImmutableList.of(overriddenName + "=" + tmpFile.getAbsolutePath()), STAGING_PATH);
+    DataflowPackage target = Iterables.getOnlyElement(targets);
+
+    verify(mockGcsUtil).fileSize(any(GcsPath.class));
+    verify(mockGcsUtil).create(any(GcsPath.class), anyString());
+    verifyNoMoreInteractions(mockGcsUtil);
+
+    assertThat(target.getName(), equalTo(overriddenName));
+    assertThat(target.getLocation(),
+        RegexMatcher.matches(STAGING_PATH + "/file-" + HASH_PATTERN + ".txt"));
+  }
+
+  @Test
+  public void testPackageUploadIsSkippedWithNonExistentResource() throws Exception {
+    String nonExistentFile =
+        IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "non-existent-file");
+    assertEquals(Collections.EMPTY_LIST, PackageUtil.stageClasspathElements(
+        ImmutableList.of(nonExistentFile), STAGING_PATH));
+  }
+
+  /**
+   * Builds a fake GoogleJsonResponseException for testing API error handling.
+   */
+  private static GoogleJsonResponseException googleJsonResponseException(
+      final int status, final String reason, final String message) throws IOException {
+    final JsonFactory jsonFactory = new JacksonFactory();
+    HttpTransport transport = new MockHttpTransport() {
+      @Override
+      public LowLevelHttpRequest buildRequest(String method, String url) throws IOException {
+        ErrorInfo errorInfo = new ErrorInfo();
+        errorInfo.setReason(reason);
+        errorInfo.setMessage(message);
+        errorInfo.setFactory(jsonFactory);
+        GenericJson error = new GenericJson();
+        error.set("code", status);
+        error.set("errors", Arrays.asList(errorInfo));
+        error.setFactory(jsonFactory);
+        GenericJson errorResponse = new GenericJson();
+        errorResponse.set("error", error);
+        errorResponse.setFactory(jsonFactory);
+        return new MockLowLevelHttpRequest().setResponse(
+            new MockLowLevelHttpResponse().setContent(errorResponse.toPrettyString())
+            .setContentType(Json.MEDIA_TYPE).setStatusCode(status));
+      }
+    };
+    HttpRequest request =
+        transport.createRequestFactory().buildGetRequest(HttpTesting.SIMPLE_GENERIC_URL);
+    request.setThrowExceptionOnExecuteError(false);
+    HttpResponse response = request.execute();
+    return GoogleJsonResponseException.from(jsonFactory, response);
+  }
+}
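
The naming assertions above (HASH_PATTERN and the testPackageNaming* cases) only pin down the
shape of a staged package name: the original base name, a 22-character content hash, and the
original extension (".jar" for directories), with identical contents always producing identical
names. Below is a minimal sketch of a naming scheme consistent with that shape, assuming an MD5
content hash and unpadded URL-safe base64; the actual PackageUtil logic may differ, and
StagedNameSketch is purely illustrative, not part of the SDK:

    import com.google.common.hash.Hashing;
    import com.google.common.io.BaseEncoding;
    import com.google.common.io.Files;

    import java.io.File;
    import java.io.IOException;

    class StagedNameSketch {
      /** Returns "name-<22-char content hash>[.ext]", the shape the tests above expect. */
      static String stagedName(File file) throws IOException {
        byte[] digest = Files.hash(file, Hashing.md5()).asBytes();            // 128-bit hash
        String hash = BaseEncoding.base64Url().omitPadding().encode(digest);  // 22 characters
        String base = Files.getNameWithoutExtension(file.getName());
        String extension = Files.getFileExtension(file.getName());
        return extension.isEmpty() ? base + "-" + hash : base + "-" + hash + "." + extension;
      }
    }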

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/sdks/java/core/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml
index 6595a21..aa72b11 100644
--- a/sdks/java/core/pom.xml
+++ b/sdks/java/core/pom.xml
@@ -370,20 +370,6 @@
 
   <dependencies>
     <dependency>
-      <groupId>com.google.apis</groupId>
-      <artifactId>google-api-services-dataflow</artifactId>
-      <version>${dataflow.version}</version>
-      <exclusions>
-        <!-- Exclude an old version of guava that is being pulled
-             in by a transitive dependency of google-api-client -->
-        <exclusion>
-          <groupId>com.google.guava</groupId>
-          <artifactId>guava-jdk5</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-
-    <dependency>
       <groupId>io.grpc</groupId>
       <artifactId>grpc-all</artifactId>
       <version>0.12.0</version>
@@ -429,20 +415,6 @@
 
     <dependency>
       <groupId>com.google.apis</groupId>
-      <artifactId>google-api-services-clouddebugger</artifactId>
-      <version>${clouddebugger.version}</version>
-      <exclusions>
-        <!-- Exclude an old version of guava that is being pulled
-             in by a transitive dependency of google-api-client -->
-        <exclusion>
-          <groupId>com.google.guava</groupId>
-          <artifactId>guava-jdk5</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-
-    <dependency>
-      <groupId>com.google.apis</groupId>
       <artifactId>google-api-services-pubsub</artifactId>
       <version>${pubsub.version}</version>
       <exclusions>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/BlockingDataflowPipelineOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/BlockingDataflowPipelineOptions.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/BlockingDataflowPipelineOptions.java
deleted file mode 100644
index 6bbafdd..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/BlockingDataflowPipelineOptions.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.options;
-
-import com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-
-import java.io.PrintStream;
-
-/**
- * Options that are used to configure the {@link BlockingDataflowPipelineRunner}.
- */
-@Description("Configure options on the BlockingDataflowPipelineRunner.")
-public interface BlockingDataflowPipelineOptions extends DataflowPipelineOptions {
-  /**
-   * Output stream for job status messages.
-   */
-  @Description("Where messages generated during execution of the Dataflow job will be output.")
-  @JsonIgnore
-  @Hidden
-  @Default.InstanceFactory(StandardOutputFactory.class)
-  PrintStream getJobMessageOutput();
-  void setJobMessageOutput(PrintStream value);
-
-  /**
-   * Returns a default of {@link System#out}.
-   */
-  public static class StandardOutputFactory implements DefaultValueFactory<PrintStream> {
-    @Override
-    public PrintStream create(PipelineOptions options) {
-      return System.out;
-    }
-  }
-}
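
Per the commit message, this interface moves to the new Dataflow runner module rather than
disappearing. A hypothetical usage sketch of the jobMessageOutput option documented above,
assuming the relocated interface keeps the same package and accessors; the log file path is
illustrative:

    import com.google.cloud.dataflow.sdk.options.BlockingDataflowPipelineOptions;
    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;

    import java.io.FileOutputStream;
    import java.io.PrintStream;

    public class JobMessageOutputExample {
      public static void main(String[] args) throws Exception {
        BlockingDataflowPipelineOptions options =
            PipelineOptionsFactory.as(BlockingDataflowPipelineOptions.class);
        // Default is System.out (via StandardOutputFactory); send job status
        // messages to a file instead.
        options.setJobMessageOutput(new PrintStream(new FileOutputStream("job-messages.log")));
      }
    }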

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/CloudDebuggerOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/CloudDebuggerOptions.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/CloudDebuggerOptions.java
deleted file mode 100644
index 6f1551d..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/CloudDebuggerOptions.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.options;
-
-import com.google.api.services.clouddebugger.v2.model.Debuggee;
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-
-import javax.annotation.Nullable;
-
-/**
- * Options for controlling Cloud Debugger.
- */
-@Description("[Experimental] Used to configure the Cloud Debugger")
-@Experimental
-@Hidden
-public interface CloudDebuggerOptions {
-
-  /** Whether to enable the Cloud Debugger snapshot agent for the current job. */
-  @Description("Whether to enable the Cloud Debugger snapshot agent for the current job.")
-  boolean getEnableCloudDebugger();
-  void setEnableCloudDebugger(boolean enabled);
-
-  /** The Cloud Debugger debuggee to associate with. This should not be set directly. */
-  @Description("The Cloud Debugger debuggee to associate with. This should not be set directly.")
-  @Hidden
-  @Nullable Debuggee getDebuggee();
-  void setDebuggee(Debuggee debuggee);
-
-  /** The maximum cost (as a ratio of CPU time) allowed for evaluating conditional snapshots. */
-  @Description(
-      "The maximum cost (as a ratio of CPU time) allowed for evaluating conditional snapshots. "
-      + "Should be a double between 0 and 1. "
-      + "Snapshots will be cancelled if evaluating conditions takes more than this ratio of time.")
-  @Default.Double(0.01)
-  double getMaxConditionCost();
-  void setMaxConditionCost(double maxConditionCost);
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineDebugOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineDebugOptions.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineDebugOptions.java
deleted file mode 100644
index 6231bd4..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineDebugOptions.java
+++ /dev/null
@@ -1,253 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.options;
-
-import com.google.api.services.dataflow.Dataflow;
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.util.DataflowPathValidator;
-import com.google.cloud.dataflow.sdk.util.DataflowTransport;
-import com.google.cloud.dataflow.sdk.util.GcsStager;
-import com.google.cloud.dataflow.sdk.util.InstanceBuilder;
-import com.google.cloud.dataflow.sdk.util.PathValidator;
-import com.google.cloud.dataflow.sdk.util.Stager;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * Internal. Options used to control execution of the Dataflow SDK for
- * debugging and testing purposes.
- */
-@Description("[Internal] Options used to control execution of the Dataflow SDK for "
-    + "debugging and testing purposes.")
-@Hidden
-public interface DataflowPipelineDebugOptions extends PipelineOptions {
-
-  /**
-   * The list of backend experiments to enable.
-   *
-   * <p>Dataflow provides a number of experimental features that can be enabled
-   * with this flag.
-   *
-   * <p>Please sync with the Dataflow team before enabling any experiments.
-   */
-  @Description("[Experimental] Dataflow provides a number of experimental features that can "
-      + "be enabled with this flag. Please sync with the Dataflow team before enabling any "
-      + "experiments.")
-  @Experimental
-  List<String> getExperiments();
-  void setExperiments(List<String> value);
-
-  /**
-   * The root URL for the Dataflow API. {@code dataflowEndpoint} can override this value
-   * if it contains an absolute URL, otherwise {@code apiRootUrl} will be combined with
-   * {@code dataflowEndpoint} to generate the full URL to communicate with the Dataflow API.
-   */
-  @Description("The root URL for the Dataflow API. dataflowEndpoint can override this "
-      + "value if it contains an absolute URL, otherwise apiRootUrl will be combined with "
-      + "dataflowEndpoint to generate the full URL to communicate with the Dataflow API.")
-  @Default.String(Dataflow.DEFAULT_ROOT_URL)
-  String getApiRootUrl();
-  void setApiRootUrl(String value);
-
-  /**
-   * Dataflow endpoint to use.
-   *
-   * <p>Defaults to the current version of the Google Cloud Dataflow
-   * API, at the time the current SDK version was released.
-   *
-   * <p>If the string contains "://", then this is treated as a URL,
-   * otherwise {@link #getApiRootUrl()} is used as the root
-   * URL.
-   */
-  @Description("The URL for the Dataflow API. If the string contains \"://\", this"
-      + " will be treated as the entire URL, otherwise will be treated relative to apiRootUrl.")
-  @Default.String(Dataflow.DEFAULT_SERVICE_PATH)
-  String getDataflowEndpoint();
-  void setDataflowEndpoint(String value);
-
-  /**
-   * The path to write the translated Dataflow job specification out to
-   * at job submission time. The Dataflow job specification will be represented in JSON
-   * format.
-   */
-  @Description("The path to write the translated Dataflow job specification out to "
-      + "at job submission time. The Dataflow job specification will be represented in JSON "
-      + "format.")
-  String getDataflowJobFile();
-  void setDataflowJobFile(String value);
-
-  /**
-   * The class of the validator that should be created and used to validate paths.
-   * If pathValidator has not been set explicitly, an instance of this class will be
-   * constructed and used as the path validator.
-   */
-  @Description("The class of the validator that should be created and used to validate paths. "
-      + "If pathValidator has not been set explicitly, an instance of this class will be "
-      + "constructed and used as the path validator.")
-  @Default.Class(DataflowPathValidator.class)
-  Class<? extends PathValidator> getPathValidatorClass();
-  void setPathValidatorClass(Class<? extends PathValidator> validatorClass);
-
-  /**
-   * The path validator instance that should be used to validate paths.
-   * If no path validator has been set explicitly, the default is to use the instance factory that
-   * constructs a path validator based upon the currently set pathValidatorClass.
-   */
-  @JsonIgnore
-  @Description("The path validator instance that should be used to validate paths. "
-      + "If no path validator has been set explicitly, the default is to use the instance factory "
-      + "that constructs a path validator based upon the currently set pathValidatorClass.")
-  @Default.InstanceFactory(PathValidatorFactory.class)
-  PathValidator getPathValidator();
-  void setPathValidator(PathValidator validator);
-
-  /**
-   * The class responsible for staging resources to be accessible by workers
-   * during job execution. If stager has not been set explicitly, an instance of this class
-   * will be created and used as the resource stager.
-   */
-  @Description("The class of the stager that should be created and used to stage resources. "
-      + "If stager has not been set explicitly, an instance of the this class will be created "
-      + "and used as the resource stager.")
-  @Default.Class(GcsStager.class)
-  Class<? extends Stager> getStagerClass();
-  void setStagerClass(Class<? extends Stager> stagerClass);
-
-  /**
-   * The resource stager instance that should be used to stage resources.
-   * If no stager has been set explicitly, the default is to use the instance factory
-   * that constructs a resource stager based upon the currently set stagerClass.
-   */
-  @JsonIgnore
-  @Description("The resource stager instance that should be used to stage resources. "
-      + "If no stager has been set explicitly, the default is to use the instance factory "
-      + "that constructs a resource stager based upon the currently set stagerClass.")
-  @Default.InstanceFactory(StagerFactory.class)
-  Stager getStager();
-  void setStager(Stager stager);
-
-  /**
-   * An instance of the Dataflow client. Defaults to creating a Dataflow client
-   * using the current set of options.
-   */
-  @JsonIgnore
-  @Description("An instance of the Dataflow client. Defaults to creating a Dataflow client "
-      + "using the current set of options.")
-  @Default.InstanceFactory(DataflowClientFactory.class)
-  Dataflow getDataflowClient();
-  void setDataflowClient(Dataflow value);
-
-  /** Returns the default Dataflow client built from the passed in PipelineOptions. */
-  public static class DataflowClientFactory implements DefaultValueFactory<Dataflow> {
-    @Override
-    public Dataflow create(PipelineOptions options) {
-        return DataflowTransport.newDataflowClient(
-            options.as(DataflowPipelineOptions.class)).build();
-    }
-  }
-
-  /**
-   * Whether to update the currently running pipeline with the same name as this one.
-   *
-   * @deprecated This property is replaced by {@link DataflowPipelineOptions#getUpdate()}
-   */
-  @Deprecated
-  @Description("If set, replace the existing pipeline with the name specified by --jobName with "
-      + "this pipeline, preserving state.")
-  boolean getUpdate();
-  @Deprecated
-  void setUpdate(boolean value);
-
-  /**
-   * Mapping of old PTransform names to new ones, specified as JSON
-   * <code>{"oldName":"newName",...}</code>. To mark a transform as deleted, make newName the
-   * empty string.
-   */
-  @JsonIgnore
-  @Description(
-      "Mapping of old PTranform names to new ones, specified as JSON "
-      + "{\"oldName\":\"newName\",...}. To mark a transform as deleted, make newName the empty "
-      + "string.")
-  Map<String, String> getTransformNameMapping();
-  void setTransformNameMapping(Map<String, String> value);
-
-  /**
-   * Custom windmill_main binary to use with the streaming runner.
-   */
-  @Description("Custom windmill_main binary to use with the streaming runner")
-  String getOverrideWindmillBinary();
-  void setOverrideWindmillBinary(String value);
-
-  /**
-   * Number of threads to use on the Dataflow worker harness. If left unspecified,
-   * the Dataflow service will compute an appropriate number of threads to use.
-   */
-  @Description("Number of threads to use on the Dataflow worker harness. If left unspecified, "
-      + "the Dataflow service will compute an appropriate number of threads to use.")
-  int getNumberOfWorkerHarnessThreads();
-  void setNumberOfWorkerHarnessThreads(int value);
-
-  /**
-   * If {@literal true}, save a heap dump before killing a thread or process which is GC
-   * thrashing or out of memory. The location of the heap file will either be echoed back
-   * to the user, or the user will be given the opportunity to download the heap file.
-   *
-   * <p>
-   * CAUTION: Heap dumps can be of comparable size to the default boot disk. Consider increasing
-   * the boot disk size before setting this flag to true.
-   */
-  @Description("If {@literal true}, save a heap dump before killing a thread or process "
-      + "which is GC thrashing or out of memory.")
-  boolean getDumpHeapOnOOM();
-  void setDumpHeapOnOOM(boolean dumpHeapBeforeExit);
-
-  /**
-   * Creates a {@link PathValidator} object using the class specified in
-   * {@link #getPathValidatorClass()}.
-   */
-  public static class PathValidatorFactory implements DefaultValueFactory<PathValidator> {
-      @Override
-      public PathValidator create(PipelineOptions options) {
-      DataflowPipelineDebugOptions debugOptions = options.as(DataflowPipelineDebugOptions.class);
-      return InstanceBuilder.ofType(PathValidator.class)
-          .fromClass(debugOptions.getPathValidatorClass())
-          .fromFactoryMethod("fromOptions")
-          .withArg(PipelineOptions.class, options)
-          .build();
-    }
-  }
-
-  /**
-   * Creates a {@link Stager} object using the class specified in
-   * {@link #getStagerClass()}.
-   */
-  public static class StagerFactory implements DefaultValueFactory<Stager> {
-      @Override
-      public Stager create(PipelineOptions options) {
-      DataflowPipelineDebugOptions debugOptions = options.as(DataflowPipelineDebugOptions.class);
-      return InstanceBuilder.ofType(Stager.class)
-          .fromClass(debugOptions.getStagerClass())
-          .fromFactoryMethod("fromOptions")
-          .withArg(PipelineOptions.class, options)
-          .build();
-    }
-  }
-}
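
A hypothetical sketch of two of the debug options documented above: the transform-rename map
consulted when updating a running pipeline, and the experiments list. The transform names and
the experiment name are illustrative, not real defaults:

    import com.google.cloud.dataflow.sdk.options.DataflowPipelineDebugOptions;
    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
    import com.google.common.collect.ImmutableList;
    import com.google.common.collect.ImmutableMap;

    public class DebugOptionsExample {
      public static void main(String[] args) {
        DataflowPipelineDebugOptions debugOptions =
            PipelineOptionsFactory.as(DataflowPipelineDebugOptions.class);
        // Rename "OldCount" to "NewCount" and mark "DroppedStep" as deleted.
        debugOptions.setTransformNameMapping(
            ImmutableMap.of("OldCount", "NewCount", "DroppedStep", ""));
        // Backend experiments; sync with the Dataflow team before enabling any.
        debugOptions.setExperiments(ImmutableList.of("some_experiment"));
      }
    }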

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineOptions.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineOptions.java
deleted file mode 100644
index dbfafd1..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineOptions.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.options;
-
-import com.google.cloud.dataflow.sdk.runners.DataflowPipeline;
-import com.google.common.base.MoreObjects;
-
-import org.joda.time.DateTimeUtils;
-import org.joda.time.DateTimeZone;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
-
-/**
- * Options that can be used to configure the {@link DataflowPipeline}.
- */
-@Description("Options that configure the Dataflow pipeline.")
-public interface DataflowPipelineOptions extends
-    PipelineOptions, GcpOptions, ApplicationNameOptions, DataflowPipelineDebugOptions,
-    DataflowPipelineWorkerPoolOptions, BigQueryOptions,
-    GcsOptions, StreamingOptions, CloudDebuggerOptions, DataflowWorkerLoggingOptions,
-    DataflowProfilingOptions, PubsubOptions {
-
-  @Description("Project id. Required when running a Dataflow in the cloud. "
-      + "See https://cloud.google.com/storage/docs/projects for further details.")
-  @Override
-  @Validation.Required
-  @Default.InstanceFactory(DefaultProjectFactory.class)
-  String getProject();
-  @Override
-  void setProject(String value);
-
-  /**
-   * GCS path for staging local files, e.g. gs://bucket/object
-   *
-   * <p>Must be a valid Cloud Storage URL, beginning with the prefix "gs://"
-   *
-   * <p>At least one of {@link PipelineOptions#getTempLocation()} or {@link #getStagingLocation()}
-   * must be set. If {@link #getStagingLocation()} is not set, then the Dataflow
-   * pipeline defaults to using {@link PipelineOptions#getTempLocation()}.
-   */
-  @Description("GCS path for staging local files, e.g. \"gs://bucket/object\". "
-      + "Must be a valid Cloud Storage URL, beginning with the prefix \"gs://\". "
-      + "At least one of stagingLocation or tempLocation must be set. If stagingLocation is unset, "
-      + "defaults to using tempLocation.")
-  String getStagingLocation();
-  void setStagingLocation(String value);
-
-  /**
-   * The Dataflow job name is used as an idempotence key within the Dataflow service.
-   * If there is an existing job that is currently active, another active job with the same
-   * name will not be able to be created. Defaults to using the ApplicationName-UserName-Date.
-   */
-  @Description("The Dataflow job name is used as an idempotence key within the Dataflow service. "
-      + "If there is an existing job that is currently active, another active job with the same "
-      + "name will not be able to be created. Defaults to using the ApplicationName-UserName-Date.")
-  @Default.InstanceFactory(JobNameFactory.class)
-  String getJobName();
-  void setJobName(String value);
-
-  /**
-   * Whether to update the currently running pipeline with the same name as this one.
-   */
-  @Override
-  @SuppressWarnings("deprecation") // base class member deprecated in favor of this one.
-  @Description(
-      "If set, replace the existing pipeline with the name specified by --jobName with "
-          + "this pipeline, preserving state.")
-  boolean getUpdate();
-  @Override
-  @SuppressWarnings("deprecation") // base class member deprecated in favor of this one.
-  void setUpdate(boolean value);
-
-  /**
-   * Returns a normalized job name constructed from {@link ApplicationNameOptions#getAppName()}, the
-   * local system user name (if available), and the current time. The normalization makes sure that
-   * the job name matches the required pattern of [a-z]([-a-z0-9]*[a-z0-9])? and length limit of 40
-   * characters.
-   *
-   * <p>This job name factory is only able to generate one unique name per second per application
-   * and user combination.
-   */
-  public static class JobNameFactory implements DefaultValueFactory<String> {
-    private static final DateTimeFormatter FORMATTER =
-        DateTimeFormat.forPattern("MMddHHmmss").withZone(DateTimeZone.UTC);
-
-    @Override
-    public String create(PipelineOptions options) {
-      String appName = options.as(ApplicationNameOptions.class).getAppName();
-      String normalizedAppName = appName == null || appName.length() == 0 ? "dataflow"
-          : appName.toLowerCase()
-                   .replaceAll("[^a-z0-9]", "0")
-                   .replaceAll("^[^a-z]", "a");
-      String userName = MoreObjects.firstNonNull(System.getProperty("user.name"), "");
-      String normalizedUserName = userName.toLowerCase()
-                                          .replaceAll("[^a-z0-9]", "0");
-      String datePart = FORMATTER.print(DateTimeUtils.currentTimeMillis());
-      return normalizedAppName + "-" + normalizedUserName + "-" + datePart;
-    }
-  }
-}
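
As a worked example of the JobNameFactory normalization above: an appName of "WordCount!"
lowercases to "wordcount!" and then to "wordcount0" (characters outside [a-z0-9] become '0'),
a user name of "Jane.Doe" normalizes to "jane0doe", and a submission at 2016-04-12 01:42:43 UTC
gives the date part "0412014243", so the generated job name is "wordcount0-jane0doe-0412014243".
That name matches [a-z]([-a-z0-9]*[a-z0-9])? and is well under the 40-character limit; the app
name, user name, and timestamp here are illustrative.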

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineWorkerPoolOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineWorkerPoolOptions.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineWorkerPoolOptions.java
deleted file mode 100644
index 0c6428f..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineWorkerPoolOptions.java
+++ /dev/null
@@ -1,258 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.options;
-
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-
-import java.util.List;
-
-/**
- * Options that are used to configure the Dataflow pipeline worker pool.
- */
-@Description("Options that are used to configure the Dataflow pipeline worker pool.")
-public interface DataflowPipelineWorkerPoolOptions extends PipelineOptions {
-  /**
-   * Number of workers to use when executing the Dataflow job. Note that selection of an autoscaling
-   * algorithm other than {@code NONE} will affect the size of the worker pool. If left unspecified,
-   * the Dataflow service will determine the number of workers.
-   */
-  @Description("Number of workers to use when executing the Dataflow job. Note that "
-      + "selection of an autoscaling algorithm other then \"NONE\" will affect the "
-      + "size of the worker pool. If left unspecified, the Dataflow service will "
-      + "determine the number of workers.")
-  int getNumWorkers();
-  void setNumWorkers(int value);
-
-  /**
-   * Type of autoscaling algorithm to use.
-   */
-  @Experimental(Experimental.Kind.AUTOSCALING)
-  public enum AutoscalingAlgorithmType {
-    /** Use numWorkers machines. Do not autoscale the worker pool. */
-    NONE("AUTOSCALING_ALGORITHM_NONE"),
-
-    @Deprecated
-    BASIC("AUTOSCALING_ALGORITHM_BASIC"),
-
-    /** Autoscale the workerpool based on throughput (up to maxNumWorkers). */
-    THROUGHPUT_BASED("AUTOSCALING_ALGORITHM_BASIC");
-
-    private final String algorithm;
-
-    private AutoscalingAlgorithmType(String algorithm) {
-      this.algorithm = algorithm;
-    }
-
-    /** Returns the string representation of this type. */
-    public String getAlgorithm() {
-      return this.algorithm;
-    }
-  }
-
-  /**
-   * [Experimental] The autoscaling algorithm to use for the workerpool.
-   *
-   * <ul>
-   *   <li>NONE: does not change the size of the worker pool.</li>
-   *   <li>BASIC: autoscale the worker pool size up to maxNumWorkers until the job completes.</li>
-   *   <li>THROUGHPUT_BASED: autoscale the workerpool based on throughput (up to maxNumWorkers).
-   *   </li>
-   * </ul>
-   */
-  @Description("[Experimental] The autoscaling algorithm to use for the workerpool. "
-      + "NONE: does not change the size of the worker pool. "
-      + "BASIC (deprecated): autoscale the worker pool size up to maxNumWorkers until the job "
-      + "completes. "
-      + "THROUGHPUT_BASED: autoscale the workerpool based on throughput (up to maxNumWorkers).")
-  @Experimental(Experimental.Kind.AUTOSCALING)
-  AutoscalingAlgorithmType getAutoscalingAlgorithm();
-  void setAutoscalingAlgorithm(AutoscalingAlgorithmType value);
-
-  /**
-   * The maximum number of workers to use for the workerpool. This option limits the size of the
-   * workerpool for the lifetime of the job, including
-   * <a href="https://cloud.google.com/dataflow/pipelines/updating-a-pipeline">pipeline updates</a>.
-   * If left unspecified, the Dataflow service will compute a ceiling.
-   */
-  @Description("The maximum number of workers to use for the workerpool. This options limits the "
-      + "size of the workerpool for the lifetime of the job, including pipeline updates. "
-      + "If left unspecified, the Dataflow service will compute a ceiling.")
-  int getMaxNumWorkers();
-  void setMaxNumWorkers(int value);
-
-  /**
-   * Remote worker disk size, in gigabytes, or 0 to use the default size.
-   */
-  @Description("Remote worker disk size, in gigabytes, or 0 to use the default size.")
-  int getDiskSizeGb();
-  void setDiskSizeGb(int value);
-
-  /**
-   * Docker container image that executes Dataflow worker harness, residing in Google Container
-   * Registry.
-   */
-  @Default.InstanceFactory(WorkerHarnessContainerImageFactory.class)
-  @Description("Docker container image that executes Dataflow worker harness, residing in Google "
-      + " Container Registry.")
-  @Hidden
-  String getWorkerHarnessContainerImage();
-  void setWorkerHarnessContainerImage(String value);
-
-  /**
-   * Returns the default Docker container image that executes Dataflow worker harness, residing in
-   * Google Container Registry.
-   */
-  public static class WorkerHarnessContainerImageFactory
-      implements DefaultValueFactory<String> {
-    @Override
-    public String create(PipelineOptions options) {
-      DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
-      if (dataflowOptions.isStreaming()) {
-        return DataflowPipelineRunner.STREAMING_WORKER_HARNESS_CONTAINER_IMAGE;
-      } else {
-        return DataflowPipelineRunner.BATCH_WORKER_HARNESS_CONTAINER_IMAGE;
-      }
-    }
-  }
-
-  /**
-   * GCE <a href="https://cloud.google.com/compute/docs/networking">network</a> for launching
-   * workers.
-   *
-   * <p>Default is up to the Dataflow service.
-   */
-  @Description("GCE network for launching workers. For more information, see the reference "
-      + "documentation https://cloud.google.com/compute/docs/networking. "
-      + "Default is up to the Dataflow service.")
-  String getNetwork();
-  void setNetwork(String value);
-
-  /**
-   * GCE <a href="https://cloud.google.com/compute/docs/networking">subnetwork</a> for launching
-   * workers.
-   *
-   * <p>Default is up to the Dataflow service. Expected format is
-   * regions/REGION/subnetworks/SUBNETWORK.
-   *
-   * <p>You may also need to specify network option.
-   */
-  @Description("GCE subnetwork for launching workers. For more information, see the reference "
-      + "documentation https://cloud.google.com/compute/docs/networking. "
-      + "Default is up to the Dataflow service.")
-  String getSubnetwork();
-  void setSubnetwork(String value);
-
-  /**
-   * GCE <a href="https://developers.google.com/compute/docs/zones"
-   * >availability zone</a> for launching workers.
-   *
-   * <p>Default is up to the Dataflow service.
-   */
-  @Description("GCE availability zone for launching workers. See "
-      + "https://developers.google.com/compute/docs/zones for a list of valid options. "
-      + "Default is up to the Dataflow service.")
-  String getZone();
-  void setZone(String value);
-
-  /**
-   * Machine type to create Dataflow worker VMs as.
-   *
-   * <p>See <a href="https://cloud.google.com/compute/docs/machine-types">GCE machine types</a>
-   * for a list of valid options.
-   *
-   * <p>If unset, the Dataflow service will choose a reasonable default.
-   */
-  @Description("Machine type to create Dataflow worker VMs as. See "
-      + "https://cloud.google.com/compute/docs/machine-types for a list of valid options. "
-      + "If unset, the Dataflow service will choose a reasonable default.")
-  String getWorkerMachineType();
-  void setWorkerMachineType(String value);
-
-  /**
-   * The policy for tearing down the workers spun up by the service.
-   */
-  public enum TeardownPolicy {
-    /**
-     * All VMs created for a Dataflow job are deleted when the job finishes, regardless of whether
-     * it fails or succeeds.
-     */
-    TEARDOWN_ALWAYS("TEARDOWN_ALWAYS"),
-    /**
-     * All VMs created for a Dataflow job are left running when the job finishes, regardless of
-     * whether it fails or succeeds.
-     */
-    TEARDOWN_NEVER("TEARDOWN_NEVER"),
-    /**
-     * All VMs created for a Dataflow job are deleted when the job succeeds, but are left running
-     * when it fails. (This is typically used for debugging failing jobs by SSHing into the
-     * workers.)
-     */
-    TEARDOWN_ON_SUCCESS("TEARDOWN_ON_SUCCESS");
-
-    private final String teardownPolicy;
-
-    private TeardownPolicy(String teardownPolicy) {
-      this.teardownPolicy = teardownPolicy;
-    }
-
-    public String getTeardownPolicyName() {
-      return this.teardownPolicy;
-    }
-  }
-
-  /**
-   * The teardown policy for the VMs.
-   *
-   * <p>If unset, the Dataflow service will choose a reasonable default.
-   */
-  @Description("The teardown policy for the VMs. If unset, the Dataflow service will "
-      + "choose a reasonable default.")
-  TeardownPolicy getTeardownPolicy();
-  void setTeardownPolicy(TeardownPolicy value);
-
-  /**
-   * List of local files to make available to workers.
-   *
-   * <p>Files are placed on the worker's classpath.
-   *
-   * <p>The default value is the list of jars from the main program's classpath.
-   */
-  @Description("Files to stage on GCS and make available to workers. "
-      + "Files are placed on the worker's classpath. "
-      + "The default value is all files from the classpath.")
-  @JsonIgnore
-  List<String> getFilesToStage();
-  void setFilesToStage(List<String> value);
-
-  /**
-   * Specifies what type of persistent disk should be used. The value should be a full or partial
-   * URL of a disk type resource, e.g., zones/us-central1-f/diskTypes/pd-standard. For
-   * more information, see the
-   * <a href="https://cloud.google.com/compute/docs/reference/latest/diskTypes">API reference
-   * documentation for DiskTypes</a>.
-   */
-  @Description("Specifies what type of persistent disk should be used. The value should be a full "
-      + "or partial URL of a disk type resource, e.g., zones/us-central1-f/disks/pd-standard. For "
-      + "more information, see the API reference documentation for DiskTypes: "
-      + "https://cloud.google.com/compute/docs/reference/latest/diskTypes")
-  String getWorkerDiskType();
-  void setWorkerDiskType(String value);
-}
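
The worker-pool settings deleted above are ordinary PipelineOptions properties, so each
getter corresponds to a command-line flag of the same name (getZone() to --zone,
getWorkerMachineType() to --workerMachineType, and so on). A minimal sketch of parsing
them, assuming DataflowPipelineOptions mixes these worker-pool options in; the class name
and all flag values below are illustrative:

    import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;

    public class WorkerPoolFlagsExample {
      public static void main(String[] unused) {
        // Illustrative values; each flag maps to a getter on the options interface above.
        String[] args = {
            "--network=default",
            "--subnetwork=regions/us-central1/subnetworks/default",
            "--zone=us-central1-f",
            "--workerMachineType=n1-standard-4",
            "--workerDiskType=zones/us-central1-f/diskTypes/pd-standard"
        };
        DataflowPipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
        // The getters below come from the worker-pool options shown above (assumed to be
        // mixed into DataflowPipelineOptions).
        System.out.println(options.getZone() + " / " + options.getWorkerMachineType());
      }
    }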

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowProfilingOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowProfilingOptions.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowProfilingOptions.java
deleted file mode 100644
index f14b04d..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowProfilingOptions.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.options;
-
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-
-import java.util.HashMap;
-
-/**
- * Options for controlling profiling of pipeline execution.
- */
-@Description("[Experimental] Used to configure profiling of the Dataflow pipeline")
-@Experimental
-@Hidden
-public interface DataflowProfilingOptions {
-
-  @Description("Whether to periodically dump profiling information to local disk.\n"
-      + "WARNING: Enabling this option may fill local disk with profiling information.")
-  boolean getEnableProfilingAgent();
-  void setEnableProfilingAgent(boolean enabled);
-
-  @Description(
-      "[INTERNAL] Additional configuration for the profiling agent. Not typically necessary.")
-  @Hidden
-  DataflowProfilingAgentConfiguration getProfilingAgentConfiguration();
-  void setProfilingAgentConfiguration(DataflowProfilingAgentConfiguration configuration);
-
-  /**
-   * Configuration for the profiling agent.
-   */
-  public static class DataflowProfilingAgentConfiguration extends HashMap<String, Object> {
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowWorkerHarnessOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowWorkerHarnessOptions.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowWorkerHarnessOptions.java
deleted file mode 100644
index 7705b66..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowWorkerHarnessOptions.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.options;
-
-/**
- * Options that are used exclusively within the Dataflow worker harness.
- * These options have no effect at pipeline creation time.
- */
-@Description("[Internal] Options that are used exclusively within the Dataflow worker harness. "
-    + "These options have no effect at pipeline creation time.")
-@Hidden
-public interface DataflowWorkerHarnessOptions extends DataflowPipelineOptions {
-  /**
-   * The identity of the worker running this pipeline.
-   */
-  @Description("The identity of the worker running this pipeline.")
-  String getWorkerId();
-  void setWorkerId(String value);
-
-  /**
-   * The identity of the Dataflow job.
-   */
-  @Description("The identity of the Dataflow job.")
-  String getJobId();
-  void setJobId(String value);
-
-  /**
-   * The size of the worker's in-memory cache, in megabytes.
-   *
-   * <p>Currently, this cache is used for storing read values of side inputs.
-   */
-  @Description("The size of the worker's in-memory cache, in megabytes.")
-  @Default.Integer(100)
-  Integer getWorkerCacheMb();
-  void setWorkerCacheMb(Integer value);
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowWorkerLoggingOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowWorkerLoggingOptions.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowWorkerLoggingOptions.java
deleted file mode 100644
index ebd42d9..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowWorkerLoggingOptions.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.options;
-
-import com.google.common.base.Preconditions;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Options that are used to control logging configuration on the Dataflow worker.
- */
-@Description("Options that are used to control logging configuration on the Dataflow worker.")
-public interface DataflowWorkerLoggingOptions extends PipelineOptions {
-  /**
-   * The set of log levels that can be used on the Dataflow worker.
-   */
-  public enum Level {
-    DEBUG, ERROR, INFO, TRACE, WARN
-  }
-
-  /**
-   * This option controls the default log level of all loggers without a log level override.
-   */
-  @Description("Controls the default log level of all loggers without a log level override.")
-  @Default.Enum("INFO")
-  Level getDefaultWorkerLogLevel();
-  void setDefaultWorkerLogLevel(Level level);
-
-  /**
-   * This option controls the log levels for specifically named loggers.
-   *
-   * <p>Later options with equivalent names override earlier options.
-   *
-   * <p>See {@link WorkerLogLevelOverrides} for more information on how to configure logging
-   * on a per {@link Class}, {@link Package}, or name basis. If used from the command line,
-   * the expected format is {"Name":"Level",...}; see {@link WorkerLogLevelOverrides#from}
-   * for further details.
-   */
-  @Description("This option controls the log levels for specifically named loggers. "
-      + "The expected format is {\"Name\":\"Level\",...}. The Dataflow worker uses "
-      + "java.util.logging, which supports a logging hierarchy based off of names that are '.' "
-      + "separated. For example, by specifying the value {\"a.b.c.Foo\":\"DEBUG\"}, the logger "
-      + "for the class 'a.b.c.Foo' will be configured to output logs at the DEBUG level. "
-      + "Similarly, by specifying the value {\"a.b.c\":\"WARN\"}, all loggers underneath the "
-      + "'a.b.c' package will be configured to output logs at the WARN level. Also, note that "
-      + "when multiple overrides are specified, the exact name followed by the closest parent "
-      + "takes precedence.")
-  WorkerLogLevelOverrides getWorkerLogLevelOverrides();
-  void setWorkerLogLevelOverrides(WorkerLogLevelOverrides value);
-
-  /**
-   * Defines a log level override for a specific class, package, or name.
-   *
-   * <p>{@code java.util.logging} is used on the Dataflow worker harness and supports
-   * a logging hierarchy based off of names that are "." separated. It is a common
-   * pattern to have the logger for a given class share the same name as the class itself.
-   * Given the classes {@code a.b.c.Foo}, {@code a.b.c.Xyz}, and {@code a.b.Bar}, with
-   * loggers named {@code "a.b.c.Foo"}, {@code "a.b.c.Xyz"}, and {@code "a.b.Bar"} respectively,
-   * we can override the log levels:
-   * <ul>
-   *    <li>for {@code Foo} by specifying the name {@code "a.b.c.Foo"} or the {@link Class}
-   *    representing {@code a.b.c.Foo}.
-   *    <li>for {@code Foo}, {@code Xyz}, and {@code Bar} by specifying the name {@code "a.b"} or
-   *    the {@link Package} representing {@code a.b}.
-   *    <li>for {@code Foo} and {@code Bar} by specifying both of their names or classes.
-   * </ul>
-   * Note that when multiple overrides are specified, the exact name followed by the closest
-   * parent takes precedence.
-   */
-  public static class WorkerLogLevelOverrides extends HashMap<String, Level> {
-    /**
-     * Overrides the default log level for the passed in class.
-     *
-     * <p>This is equivalent to calling
-     * {@link #addOverrideForName(String, DataflowWorkerLoggingOptions.Level)}
-     * and passing in the {@link Class#getName() class name}.
-     */
-    public WorkerLogLevelOverrides addOverrideForClass(Class<?> klass, Level level) {
-      Preconditions.checkNotNull(klass, "Expected class to be not null.");
-      addOverrideForName(klass.getName(), level);
-      return this;
-    }
-
-    /**
-     * Overrides the default log level for the passed in package.
-     *
-     * <p>This is equivalent to calling
-     * {@link #addOverrideForName(String, DataflowWorkerLoggingOptions.Level)}
-     * and passing in the {@link Package#getName() package name}.
-     */
-    public WorkerLogLevelOverrides addOverrideForPackage(Package pkg, Level level) {
-      Preconditions.checkNotNull(pkg, "Expected package to be not null.");
-      addOverrideForName(pkg.getName(), level);
-      return this;
-    }
-
-    /**
-     * Overrides the default log level for the passed in name.
-     *
-     * <p>Note that because of the hierarchical nature of logger names, this will
-     * override the log level of all loggers that have the passed in name or
-     * a parent logger that has the passed in name.
-     */
-    public WorkerLogLevelOverrides addOverrideForName(String name, Level level) {
-      Preconditions.checkNotNull(name, "Expected name to be not null.");
-      Preconditions.checkNotNull(level,
-          "Expected level to be one of %s.", Arrays.toString(Level.values()));
-      put(name, level);
-      return this;
-    }
-
-    /**
-     * Expects a map keyed by logger {@code Name}s with values representing {@code Level}s.
-     * The {@code Name} generally represents the fully qualified Java
-     * {@link Class#getName() class name}, or fully qualified Java
-     * {@link Package#getName() package name}, or custom logger name. The {@code Level}
-     * represents the log level and must be one of {@link Level}.
-     */
-    @JsonCreator
-    public static WorkerLogLevelOverrides from(Map<String, String> values) {
-      Preconditions.checkNotNull(values, "Expected values to be not null.");
-      WorkerLogLevelOverrides overrides = new WorkerLogLevelOverrides();
-      for (Map.Entry<String, String> entry : values.entrySet()) {
-        try {
-          overrides.addOverrideForName(entry.getKey(), Level.valueOf(entry.getValue()));
-        } catch (IllegalArgumentException e) {
-          throw new IllegalArgumentException(String.format(
-              "Unsupported log level '%s' requested for %s. Must be one of %s.",
-              entry.getValue(), entry.getKey(), Arrays.toString(Level.values())));
-        }
-
-      }
-      return overrides;
-    }
-  }
-}
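
The override semantics described above (exact logger name first, then the closest parent)
can be configured programmatically; a minimal sketch with illustrative logger names and an
illustrative class name:

    import com.google.cloud.dataflow.sdk.options.DataflowWorkerLoggingOptions;
    import com.google.cloud.dataflow.sdk.options.DataflowWorkerLoggingOptions.Level;
    import com.google.cloud.dataflow.sdk.options.DataflowWorkerLoggingOptions.WorkerLogLevelOverrides;
    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;

    public class WorkerLoggingExample {
      public static void main(String[] args) {
        DataflowWorkerLoggingOptions logging =
            PipelineOptionsFactory.fromArgs(args).as(DataflowWorkerLoggingOptions.class);
        // Loggers without an explicit override fall back to the default level.
        logging.setDefaultWorkerLogLevel(Level.INFO);
        // a.b.c.Foo logs at DEBUG; everything else under a.b logs at WARN.
        logging.setWorkerLogLevelOverrides(
            new WorkerLogLevelOverrides()
                .addOverrideForName("a.b.c.Foo", Level.DEBUG)
                .addOverrideForName("a.b", Level.WARN));
      }
    }

From the command line, the same configuration would typically be passed as
--workerLogLevelOverrides={"a.b.c.Foo":"DEBUG","a.b":"WARN"}, which is parsed by
WorkerLogLevelOverrides.from(Map).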

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/BlockingDataflowPipelineRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/BlockingDataflowPipelineRunner.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/BlockingDataflowPipelineRunner.java
deleted file mode 100644
index 8903181..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/BlockingDataflowPipelineRunner.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.runners;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.PipelineResult.State;
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.options.BlockingDataflowPipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsValidator;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.util.MonitoringUtil;
-import com.google.cloud.dataflow.sdk.values.PInput;
-import com.google.cloud.dataflow.sdk.values.POutput;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-
-import javax.annotation.Nullable;
-
-/**
- * A {@link PipelineRunner} that's like {@link DataflowPipelineRunner}
- * but that waits for the launched job to finish.
- *
- * <p>Prints out job status updates and console messages while it waits.
- *
- * <p>Returns the final job state, or throws an exception if the job
- * fails or cannot be monitored.
- *
- * <h3>Permissions</h3>
- * When reading from a Dataflow source or writing to a Dataflow sink using
- * {@code BlockingDataflowPipelineRunner}, the Google cloud services account and the Google compute
- * engine service account of the GCP project running the Dataflow Job will need access to the
- * corresponding source/sink.
- *
- * <p>Please see <a href="https://cloud.google.com/dataflow/security-and-permissions">Google Cloud
- * Dataflow Security and Permissions</a> for more details.
- */
-public class BlockingDataflowPipelineRunner extends
-    PipelineRunner<DataflowPipelineJob> {
-  private static final Logger LOG = LoggerFactory.getLogger(BlockingDataflowPipelineRunner.class);
-
-  // Defaults to an infinite wait period.
-  // TODO: make this configurable after removal of option map.
-  private static final long BUILTIN_JOB_TIMEOUT_SEC = -1L;
-
-  private final DataflowPipelineRunner dataflowPipelineRunner;
-  private final BlockingDataflowPipelineOptions options;
-
-  protected BlockingDataflowPipelineRunner(
-      DataflowPipelineRunner internalRunner,
-      BlockingDataflowPipelineOptions options) {
-    this.dataflowPipelineRunner = internalRunner;
-    this.options = options;
-  }
-
-  /**
-   * Constructs a runner from the provided options.
-   */
-  public static BlockingDataflowPipelineRunner fromOptions(
-      PipelineOptions options) {
-    BlockingDataflowPipelineOptions dataflowOptions =
-        PipelineOptionsValidator.validate(BlockingDataflowPipelineOptions.class, options);
-    DataflowPipelineRunner dataflowPipelineRunner =
-        DataflowPipelineRunner.fromOptions(dataflowOptions);
-
-    return new BlockingDataflowPipelineRunner(dataflowPipelineRunner, dataflowOptions);
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @throws DataflowJobExecutionException if there is an exception during job execution.
-   * @throws DataflowServiceException if there is an exception retrieving information about the job.
-   */
-  @Override
-  public DataflowPipelineJob run(Pipeline p) {
-    final DataflowPipelineJob job = dataflowPipelineRunner.run(p);
-
-    // We ignore the potential race condition here (Ctrl-C after job submission but before the
-    // shutdown hook is registered). Even if we tried to do something smarter (e.g., SettableFuture)
-    // the run method (which produces the job) could fail or be Ctrl-C'd before it had returned a
-    // job. The display of the command to cancel the job is best-effort anyway -- RPCs could fail,
-    // etc. If the user wants to verify the job was cancelled they should look at the job status.
-    Thread shutdownHook = new Thread() {
-      @Override
-      public void run() {
-        LOG.warn("Job is already running in Google Cloud Platform, Ctrl-C will not cancel it.\n"
-            + "To cancel the job in the cloud, run:\n> {}",
-            MonitoringUtil.getGcloudCancelCommand(options, job.getJobId()));
-      }
-    };
-
-    try {
-      Runtime.getRuntime().addShutdownHook(shutdownHook);
-
-      @Nullable
-      State result;
-      try {
-        result = job.waitToFinish(
-            BUILTIN_JOB_TIMEOUT_SEC, TimeUnit.SECONDS,
-            new MonitoringUtil.PrintHandler(options.getJobMessageOutput()));
-      } catch (IOException | InterruptedException ex) {
-        if (ex instanceof InterruptedException) {
-          Thread.currentThread().interrupt();
-        }
-        LOG.debug("Exception caught while retrieving status for job {}", job.getJobId(), ex);
-        throw new DataflowServiceException(
-            job, "Exception caught while retrieving status for job " + job.getJobId(), ex);
-      }
-
-      if (result == null) {
-        throw new DataflowServiceException(
-            job, "Timed out while retrieving status for job " + job.getJobId());
-      }
-
-      LOG.info("Job finished with status {}", result);
-      if (!result.isTerminal()) {
-        throw new IllegalStateException("Expected terminal state for job " + job.getJobId()
-            + ", got " + result);
-      }
-
-      if (result == State.DONE) {
-        return job;
-      } else if (result == State.UPDATED) {
-        DataflowPipelineJob newJob = job.getReplacedByJob();
-        LOG.info("Job {} has been updated and is running as the new job with id {}."
-            + "To access the updated job on the Dataflow monitoring console, please navigate to {}",
-            job.getJobId(),
-            newJob.getJobId(),
-            MonitoringUtil.getJobMonitoringPageURL(newJob.getProjectId(), newJob.getJobId()));
-        throw new DataflowJobUpdatedException(
-            job,
-            String.format("Job %s updated; new job is %s.", job.getJobId(), newJob.getJobId()),
-            newJob);
-      } else if (result == State.CANCELLED) {
-        String message = String.format("Job %s cancelled by user", job.getJobId());
-        LOG.info(message);
-        throw new DataflowJobCancelledException(job, message);
-      } else {
-        throw new DataflowJobExecutionException(job, "Job " + job.getJobId()
-            + " failed with status " + result);
-      }
-    } finally {
-      Runtime.getRuntime().removeShutdownHook(shutdownHook);
-    }
-  }
-
-  @Override
-  public <OutputT extends POutput, InputT extends PInput> OutputT apply(
-      PTransform<InputT, OutputT> transform, InputT input) {
-    return dataflowPipelineRunner.apply(transform, input);
-  }
-
-  /**
-   * Sets callbacks to invoke during execution. See {@link DataflowPipelineRunnerHooks}.
-   */
-  @Experimental
-  public void setHooks(DataflowPipelineRunnerHooks hooks) {
-    this.dataflowPipelineRunner.setHooks(hooks);
-  }
-
-  @Override
-  public String toString() {
-    return "BlockingDataflowPipelineRunner#" + options.getJobName();
-  }
-}
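
Since the blocking runner converts each terminal state into an exception, callers typically
wrap run() in a try/catch. A minimal sketch, assuming the usual options/runner wiring; the
class name is illustrative and the pipeline body is elided:

    import com.google.cloud.dataflow.sdk.Pipeline;
    import com.google.cloud.dataflow.sdk.options.BlockingDataflowPipelineOptions;
    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
    import com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner;
    import com.google.cloud.dataflow.sdk.runners.DataflowJobCancelledException;
    import com.google.cloud.dataflow.sdk.runners.DataflowJobExecutionException;
    import com.google.cloud.dataflow.sdk.runners.DataflowJobUpdatedException;

    public class BlockingRunnerExample {
      public static void main(String[] args) {
        BlockingDataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(BlockingDataflowPipelineOptions.class);
        options.setRunner(BlockingDataflowPipelineRunner.class);

        Pipeline p = Pipeline.create(options);
        // ... build the pipeline here ...

        try {
          p.run();
          System.out.println("Job finished with state DONE");
        } catch (DataflowJobCancelledException e) {
          // Thrown when the job reaches CANCELLED; the job handle is available via getJob().
          System.out.println("Cancelled: " + e.getJob().getJobId());
        } catch (DataflowJobUpdatedException e) {
          // Thrown when the job was replaced by an updated job.
          System.out.println("Updated: " + e.getMessage());
        } catch (DataflowJobExecutionException e) {
          // Thrown when the job reaches a failed terminal state.
          System.err.println("Failed: " + e.getMessage());
        }
      }
    }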

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobAlreadyExistsException.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobAlreadyExistsException.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobAlreadyExistsException.java
deleted file mode 100644
index 4a4f100..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobAlreadyExistsException.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.runners;
-
-/**
- * An exception that is thrown if the unique job name constraint of the Dataflow
- * service is broken because an existing job with the same job name is currently active.
- * The {@link DataflowPipelineJob} contained within this exception contains information
- * about the pre-existing job.
- */
-public class DataflowJobAlreadyExistsException extends DataflowJobException {
-  /**
-   * Create a new {@code DataflowJobAlreadyExistsException} with the specified {@link
-   * DataflowPipelineJob} and message.
-   */
-  public DataflowJobAlreadyExistsException(
-      DataflowPipelineJob job, String message) {
-    super(job, message, null);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobAlreadyUpdatedException.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobAlreadyUpdatedException.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobAlreadyUpdatedException.java
deleted file mode 100644
index 1f52c6a..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobAlreadyUpdatedException.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.runners;
-
-/**
- * An exception that is thrown if the existing job has already been updated within the Dataflow
- * service and is no longer able to be updated. The {@link DataflowPipelineJob} contained within
- * this exception contains information about the pre-existing updated job.
- */
-public class DataflowJobAlreadyUpdatedException extends DataflowJobException {
-  /**
-   * Create a new {@code DataflowJobAlreadyUpdatedException} with the specified {@link
-   * DataflowPipelineJob} and message.
-   */
-  public DataflowJobAlreadyUpdatedException(
-      DataflowPipelineJob job, String message) {
-    super(job, message, null);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobCancelledException.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobCancelledException.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobCancelledException.java
deleted file mode 100644
index 495ca5a..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobCancelledException.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.runners;
-
-/**
- * Signals that a job run by a {@link BlockingDataflowPipelineRunner} was cancelled during execution.
- */
-public class DataflowJobCancelledException extends DataflowJobException {
-  /**
-   * Create a new {@code DataflowJobCancelledException} with the specified {@link
-   * DataflowPipelineJob} and message.
-   */
-  public DataflowJobCancelledException(DataflowPipelineJob job, String message) {
-    super(job, message, null);
-  }
-
-  /**
-   * Create a new {@code DataflowJobCancelledException} with the specified {@link
-   * DataflowPipelineJob}, message, and cause.
-   */
-  public DataflowJobCancelledException(DataflowPipelineJob job, String message, Throwable cause) {
-    super(job, message, cause);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobException.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobException.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobException.java
deleted file mode 100644
index a22d13c..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobException.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.runners;
-
-import java.util.Objects;
-
-import javax.annotation.Nullable;
-
-/**
- * A {@link RuntimeException} that contains information about a {@link DataflowPipelineJob}.
- */
-public abstract class DataflowJobException extends RuntimeException {
-  private final DataflowPipelineJob job;
-
-  DataflowJobException(DataflowPipelineJob job, String message, @Nullable Throwable cause) {
-    super(message, cause);
-    this.job = Objects.requireNonNull(job);
-  }
-
-  /**
-   * Returns the failed job.
-   */
-  public DataflowPipelineJob getJob() {
-    return job;
-  }
-}