Posted to commits@beam.apache.org by lc...@apache.org on 2016/04/12 01:42:43 UTC
[09/18] incubator-beam git commit: [BEAM-151] Move a large portion of the Dataflow runner to separate maven module
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/runners/google-cloud-dataflow-java/src/test/java/com/google/cloud/dataflow/sdk/util/PackageUtilTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/com/google/cloud/dataflow/sdk/util/PackageUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/com/google/cloud/dataflow/sdk/util/PackageUtilTest.java
new file mode 100644
index 0000000..d2c08c8
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/com/google/cloud/dataflow/sdk/util/PackageUtilTest.java
@@ -0,0 +1,483 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.util;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import com.google.api.client.googleapis.json.GoogleJsonError.ErrorInfo;
+import com.google.api.client.googleapis.json.GoogleJsonResponseException;
+import com.google.api.client.http.HttpRequest;
+import com.google.api.client.http.HttpResponse;
+import com.google.api.client.http.HttpStatusCodes;
+import com.google.api.client.http.HttpTransport;
+import com.google.api.client.http.LowLevelHttpRequest;
+import com.google.api.client.json.GenericJson;
+import com.google.api.client.json.Json;
+import com.google.api.client.json.JsonFactory;
+import com.google.api.client.json.jackson2.JacksonFactory;
+import com.google.api.client.testing.http.HttpTesting;
+import com.google.api.client.testing.http.MockHttpTransport;
+import com.google.api.client.testing.http.MockLowLevelHttpRequest;
+import com.google.api.client.testing.http.MockLowLevelHttpResponse;
+import com.google.api.services.dataflow.model.DataflowPackage;
+import com.google.cloud.dataflow.sdk.options.GcsOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.testing.ExpectedLogs;
+import com.google.cloud.dataflow.sdk.testing.FastNanoClockAndSleeper;
+import com.google.cloud.dataflow.sdk.util.PackageUtil.PackageAttributes;
+import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+import com.google.common.io.LineReader;
+
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.hamcrest.Matchers;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.channels.Channels;
+import java.nio.channels.Pipe;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+
+/** Tests for PackageUtil. */
+@RunWith(JUnit4.class)
+public class PackageUtilTest {
+ @Rule public ExpectedLogs logged = ExpectedLogs.none(PackageUtil.class);
+ @Rule
+ public TemporaryFolder tmpFolder = new TemporaryFolder();
+
+ @Rule
+ public FastNanoClockAndSleeper fastNanoClockAndSleeper = new FastNanoClockAndSleeper();
+
+ @Mock
+ GcsUtil mockGcsUtil;
+
+ // A 128-bit hash, base64 encoded at 6 bits per character, is 21.33 characters, rounded up to 22
+ private static final String HASH_PATTERN = "[a-zA-Z0-9+-]{22}";
+
+ // Hamcrest matcher to assert a string matches a pattern
+ private static class RegexMatcher extends BaseMatcher<String> {
+ private final Pattern pattern;
+
+ public RegexMatcher(String regex) {
+ this.pattern = Pattern.compile(regex);
+ }
+
+ @Override
+ public boolean matches(Object o) {
+ if (!(o instanceof String)) {
+ return false;
+ }
+ return pattern.matcher((String) o).matches();
+ }
+
+ @Override
+ public void describeTo(Description description) {
+ description.appendText(String.format("matches regular expression %s", pattern));
+ }
+
+ public static RegexMatcher matches(String regex) {
+ return new RegexMatcher(regex);
+ }
+ }
+
+ @Before
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+
+ GcsOptions pipelineOptions = PipelineOptionsFactory.as(GcsOptions.class);
+ pipelineOptions.setGcsUtil(mockGcsUtil);
+
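+ // Register the standard IO factories against these options so that gs:// paths
+ // resolve through the mocked GcsUtil instead of touching real Cloud Storage.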
+ IOChannelUtils.registerStandardIOFactories(pipelineOptions);
+ }
+
+ private File makeFileWithContents(String name, String contents) throws Exception {
+ File tmpFile = tmpFolder.newFile(name);
+ Files.write(contents, tmpFile, StandardCharsets.UTF_8);
+ tmpFile.setLastModified(0); // required for determinism with directories
+ return tmpFile;
+ }
+
+ static final String STAGING_PATH = GcsPath.fromComponents("somebucket", "base/path").toString();
+ private static PackageAttributes makePackageAttributes(File file, String overridePackageName) {
+ return PackageUtil.createPackageAttributes(file, STAGING_PATH, overridePackageName);
+ }
+
+ @Test
+ public void testFileWithExtensionPackageNamingAndSize() throws Exception {
+ String contents = "This is a test!";
+ File tmpFile = makeFileWithContents("file.txt", contents);
+ PackageAttributes attr = makePackageAttributes(tmpFile, null);
+ DataflowPackage target = attr.getDataflowPackage();
+
+ assertThat(target.getName(), RegexMatcher.matches("file-" + HASH_PATTERN + ".txt"));
+ assertThat(target.getLocation(), equalTo(STAGING_PATH + '/' + target.getName()));
+ assertThat(attr.getSize(), equalTo((long) contents.length()));
+ }
+
+ @Test
+ public void testPackageNamingWithFileNoExtension() throws Exception {
+ File tmpFile = makeFileWithContents("file", "This is a test!");
+ DataflowPackage target = makePackageAttributes(tmpFile, null).getDataflowPackage();
+
+ assertThat(target.getName(), RegexMatcher.matches("file-" + HASH_PATTERN));
+ assertThat(target.getLocation(), equalTo(STAGING_PATH + '/' + target.getName()));
+ }
+
+ @Test
+ public void testPackageNamingWithDirectory() throws Exception {
+ File tmpDirectory = tmpFolder.newFolder("folder");
+ DataflowPackage target = makePackageAttributes(tmpDirectory, null).getDataflowPackage();
+
+ assertThat(target.getName(), RegexMatcher.matches("folder-" + HASH_PATTERN + ".jar"));
+ assertThat(target.getLocation(), equalTo(STAGING_PATH + '/' + target.getName()));
+ }
+
+ @Test
+ public void testPackageNamingWithFilesHavingSameContentsAndSameNames() throws Exception {
+ File tmpDirectory1 = tmpFolder.newFolder("folder1", "folderA");
+ makeFileWithContents("folder1/folderA/sameName", "This is a test!");
+ DataflowPackage target1 = makePackageAttributes(tmpDirectory1, null).getDataflowPackage();
+
+ File tmpDirectory2 = tmpFolder.newFolder("folder2", "folderA");
+ makeFileWithContents("folder2/folderA/sameName", "This is a test!");
+ DataflowPackage target2 = makePackageAttributes(tmpDirectory2, null).getDataflowPackage();
+
+ assertEquals(target1.getName(), target2.getName());
+ assertEquals(target1.getLocation(), target2.getLocation());
+ }
+
+ @Test
+ public void testPackageNamingWithFilesHavingSameContentsButDifferentNames() throws Exception {
+ File tmpDirectory1 = tmpFolder.newFolder("folder1", "folderA");
+ makeFileWithContents("folder1/folderA/uniqueName1", "This is a test!");
+ DataflowPackage target1 = makePackageAttributes(tmpDirectory1, null).getDataflowPackage();
+
+ File tmpDirectory2 = tmpFolder.newFolder("folder2", "folderA");
+ makeFileWithContents("folder2/folderA/uniqueName2", "This is a test!");
+ DataflowPackage target2 = makePackageAttributes(tmpDirectory2, null).getDataflowPackage();
+
+ assertNotEquals(target1.getName(), target2.getName());
+ assertNotEquals(target1.getLocation(), target2.getLocation());
+ }
+
+ @Test
+ public void testPackageNamingWithDirectoriesHavingSameContentsButDifferentNames()
+ throws Exception {
+ File tmpDirectory1 = tmpFolder.newFolder("folder1", "folderA");
+ tmpFolder.newFolder("folder1", "folderA", "uniqueName1");
+ DataflowPackage target1 = makePackageAttributes(tmpDirectory1, null).getDataflowPackage();
+
+ File tmpDirectory2 = tmpFolder.newFolder("folder2", "folderA");
+ tmpFolder.newFolder("folder2", "folderA", "uniqueName2");
+ DataflowPackage target2 = makePackageAttributes(tmpDirectory2, null).getDataflowPackage();
+
+ assertNotEquals(target1.getName(), target2.getName());
+ assertNotEquals(target1.getLocation(), target2.getLocation());
+ }
+
+ @Test
+ public void testPackageUploadWithLargeClasspathLogsWarning() throws Exception {
+ File tmpFile = makeFileWithContents("file.txt", "This is a test!");
+ // The mock reports a matching remote size for every file, so staging is skipped entirely.
+ when(mockGcsUtil.fileSize(any(GcsPath.class))).thenReturn(tmpFile.length());
+
+ List<String> classpathElements = Lists.newLinkedList();
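+ // Stage the same local file under 1005 distinct package names, which exceeds
+ // the element count at which PackageUtil warns about an oversized classpath.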
+ for (int i = 0; i < 1005; ++i) {
+ String eltName = "element" + i;
+ classpathElements.add(eltName + '=' + tmpFile.getAbsolutePath());
+ }
+
+ PackageUtil.stageClasspathElements(classpathElements, STAGING_PATH);
+
+ logged.verifyWarn("Your classpath contains 1005 elements, which Google Cloud Dataflow");
+ }
+
+ @Test
+ public void testPackageUploadWithFileSucceeds() throws Exception {
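+ // A local Pipe stands in for the GCS upload channel: the mocked create() hands
+ // back the sink, and the test reads the staged bytes back from the source.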
+ Pipe pipe = Pipe.open();
+ String contents = "This is a test!";
+ File tmpFile = makeFileWithContents("file.txt", contents);
+ when(mockGcsUtil.fileSize(any(GcsPath.class)))
+ .thenThrow(new FileNotFoundException("some/path"));
+ when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink());
+
+ List<DataflowPackage> targets = PackageUtil.stageClasspathElements(
+ ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH);
+ DataflowPackage target = Iterables.getOnlyElement(targets);
+
+ verify(mockGcsUtil).fileSize(any(GcsPath.class));
+ verify(mockGcsUtil).create(any(GcsPath.class), anyString());
+ verifyNoMoreInteractions(mockGcsUtil);
+
+ assertThat(target.getName(), RegexMatcher.matches("file-" + HASH_PATTERN + ".txt"));
+ assertThat(target.getLocation(), equalTo(STAGING_PATH + '/' + target.getName()));
+ assertThat(new LineReader(Channels.newReader(pipe.source(), "UTF-8")).readLine(),
+ equalTo(contents));
+ }
+
+ @Test
+ public void testPackageUploadWithDirectorySucceeds() throws Exception {
+ Pipe pipe = Pipe.open();
+ File tmpDirectory = tmpFolder.newFolder("folder");
+ tmpFolder.newFolder("folder", "empty_directory");
+ tmpFolder.newFolder("folder", "directory");
+ makeFileWithContents("folder/file.txt", "This is a test!");
+ makeFileWithContents("folder/directory/file.txt", "This is also a test!");
+
+ when(mockGcsUtil.fileSize(any(GcsPath.class)))
+ .thenThrow(new FileNotFoundException("some/path"));
+ when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink());
+
+ PackageUtil.stageClasspathElements(
+ ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH);
+
+ verify(mockGcsUtil).fileSize(any(GcsPath.class));
+ verify(mockGcsUtil).create(any(GcsPath.class), anyString());
+ verifyNoMoreInteractions(mockGcsUtil);
+
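+ // Read back the zip that was streamed through the pipe and collect its entry names.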
+ ZipInputStream inputStream = new ZipInputStream(Channels.newInputStream(pipe.source()));
+ List<String> zipEntryNames = new ArrayList<>();
+ for (ZipEntry entry = inputStream.getNextEntry(); entry != null;
+ entry = inputStream.getNextEntry()) {
+ zipEntryNames.add(entry.getName());
+ }
+
+ assertThat(zipEntryNames,
+ containsInAnyOrder("directory/file.txt", "empty_directory/", "file.txt"));
+ }
+
+ @Test
+ public void testPackageUploadWithEmptyDirectorySucceeds() throws Exception {
+ Pipe pipe = Pipe.open();
+ File tmpDirectory = tmpFolder.newFolder("folder");
+
+ when(mockGcsUtil.fileSize(any(GcsPath.class)))
+ .thenThrow(new FileNotFoundException("some/path"));
+ when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink());
+
+ List<DataflowPackage> targets = PackageUtil.stageClasspathElements(
+ ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH);
+ DataflowPackage target = Iterables.getOnlyElement(targets);
+
+ verify(mockGcsUtil).fileSize(any(GcsPath.class));
+ verify(mockGcsUtil).create(any(GcsPath.class), anyString());
+ verifyNoMoreInteractions(mockGcsUtil);
+
+ assertThat(target.getName(), RegexMatcher.matches("folder-" + HASH_PATTERN + ".jar"));
+ assertThat(target.getLocation(), equalTo(STAGING_PATH + '/' + target.getName()));
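+ // Zipping an empty directory should produce an archive with no entries at all.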
+ assertNull(new ZipInputStream(Channels.newInputStream(pipe.source())).getNextEntry());
+ }
+
+ @Test(expected = RuntimeException.class)
+ public void testPackageUploadFailsWhenIOExceptionThrown() throws Exception {
+ File tmpFile = makeFileWithContents("file.txt", "This is a test!");
+ when(mockGcsUtil.fileSize(any(GcsPath.class)))
+ .thenThrow(new FileNotFoundException("some/path"));
+ when(mockGcsUtil.create(any(GcsPath.class), anyString()))
+ .thenThrow(new IOException("Fake Exception: Upload error"));
+
+ try {
+ PackageUtil.stageClasspathElements(
+ ImmutableList.of(tmpFile.getAbsolutePath()),
+ STAGING_PATH, fastNanoClockAndSleeper);
+ } finally {
+ verify(mockGcsUtil).fileSize(any(GcsPath.class));
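+ // Five create() calls in total: the initial attempt plus PackageUtil's retries
+ // before the upload is finally abandoned.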
+ verify(mockGcsUtil, times(5)).create(any(GcsPath.class), anyString());
+ verifyNoMoreInteractions(mockGcsUtil);
+ }
+ }
+
+ @Test
+ public void testPackageUploadFailsWithPermissionsErrorGivesDetailedMessage() throws Exception {
+ File tmpFile = makeFileWithContents("file.txt", "This is a test!");
+ when(mockGcsUtil.fileSize(any(GcsPath.class)))
+ .thenThrow(new FileNotFoundException("some/path"));
+ when(mockGcsUtil.create(any(GcsPath.class), anyString()))
+ .thenThrow(new IOException("Failed to write to GCS path " + STAGING_PATH,
+ googleJsonResponseException(
+ HttpStatusCodes.STATUS_CODE_FORBIDDEN, "Permission denied", "Test message")));
+
+ try {
+ PackageUtil.stageClasspathElements(
+ ImmutableList.of(tmpFile.getAbsolutePath()),
+ STAGING_PATH, fastNanoClockAndSleeper);
+ fail("Expected RuntimeException");
+ } catch (RuntimeException e) {
+ assertTrue("Expected IOException containing detailed message.",
+ e.getCause() instanceof IOException);
+ assertThat(e.getCause().getMessage(),
+ Matchers.allOf(
+ Matchers.containsString("Uploaded failed due to permissions error"),
+ Matchers.containsString(
+ "Stale credentials can be resolved by executing 'gcloud auth login'")));
+ } finally {
+ verify(mockGcsUtil).fileSize(any(GcsPath.class));
+ verify(mockGcsUtil).create(any(GcsPath.class), anyString());
+ verifyNoMoreInteractions(mockGcsUtil);
+ }
+ }
+
+ @Test
+ public void testPackageUploadEventuallySucceeds() throws Exception {
+ Pipe pipe = Pipe.open();
+ File tmpFile = makeFileWithContents("file.txt", "This is a test!");
+ when(mockGcsUtil.fileSize(any(GcsPath.class)))
+ .thenThrow(new FileNotFoundException("some/path"));
+ when(mockGcsUtil.create(any(GcsPath.class), anyString()))
+ .thenThrow(new IOException("Fake Exception: 410 Gone")) // First attempt fails
+ .thenReturn(pipe.sink()); // second attempt succeeds
+
+ try {
+ PackageUtil.stageClasspathElements(
+ ImmutableList.of(tmpFile.getAbsolutePath()),
+ STAGING_PATH,
+ fastNanoClockAndSleeper);
+ } finally {
+ verify(mockGcsUtil).fileSize(any(GcsPath.class));
+ verify(mockGcsUtil, times(2)).create(any(GcsPath.class), anyString());
+ verifyNoMoreInteractions(mockGcsUtil);
+ }
+ }
+
+ @Test
+ public void testPackageUploadIsSkippedWhenFileAlreadyExists() throws Exception {
+ File tmpFile = makeFileWithContents("file.txt", "This is a test!");
+ when(mockGcsUtil.fileSize(any(GcsPath.class))).thenReturn(tmpFile.length());
+
+ PackageUtil.stageClasspathElements(
+ ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH);
+
+ verify(mockGcsUtil).fileSize(any(GcsPath.class));
+ verifyNoMoreInteractions(mockGcsUtil);
+ }
+
+ @Test
+ public void testPackageUploadIsNotSkippedWhenSizesAreDifferent() throws Exception {
+ Pipe pipe = Pipe.open();
+ File tmpDirectory = tmpFolder.newFolder("folder");
+ tmpFolder.newFolder("folder", "empty_directory");
+ tmpFolder.newFolder("folder", "directory");
+ makeFileWithContents("folder/file.txt", "This is a test!");
+ makeFileWithContents("folder/directory/file.txt", "This is also a test!");
+ when(mockGcsUtil.fileSize(any(GcsPath.class))).thenReturn(Long.MAX_VALUE);
+ when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink());
+
+ PackageUtil.stageClasspathElements(
+ ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH);
+
+ verify(mockGcsUtil).fileSize(any(GcsPath.class));
+ verify(mockGcsUtil).create(any(GcsPath.class), anyString());
+ verifyNoMoreInteractions(mockGcsUtil);
+ }
+
+ @Test
+ public void testPackageUploadWithExplicitPackageName() throws Exception {
+ Pipe pipe = Pipe.open();
+ File tmpFile = makeFileWithContents("file.txt", "This is a test!");
+ final String overriddenName = "alias.txt";
+
+ when(mockGcsUtil.fileSize(any(GcsPath.class)))
+ .thenThrow(new FileNotFoundException("some/path"));
+ when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink());
+
+ List<DataflowPackage> targets = PackageUtil.stageClasspathElements(
+ ImmutableList.of(overriddenName + "=" + tmpFile.getAbsolutePath()), STAGING_PATH);
+ DataflowPackage target = Iterables.getOnlyElement(targets);
+
+ verify(mockGcsUtil).fileSize(any(GcsPath.class));
+ verify(mockGcsUtil).create(any(GcsPath.class), anyString());
+ verifyNoMoreInteractions(mockGcsUtil);
+
+ assertThat(target.getName(), equalTo(overriddenName));
+ assertThat(target.getLocation(),
+ RegexMatcher.matches(STAGING_PATH + "/file-" + HASH_PATTERN + ".txt"));
+ }
+
+ @Test
+ public void testPackageUploadIsSkippedWithNonExistentResource() throws Exception {
+ String nonExistentFile =
+ IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "non-existent-file");
+ assertEquals(Collections.EMPTY_LIST, PackageUtil.stageClasspathElements(
+ ImmutableList.of(nonExistentFile), STAGING_PATH));
+ }
+
+ /**
+ * Builds a fake GoogleJsonResponseException for testing API error handling.
+ */
+ private static GoogleJsonResponseException googleJsonResponseException(
+ final int status, final String reason, final String message) throws IOException {
+ final JsonFactory jsonFactory = new JacksonFactory();
+ HttpTransport transport = new MockHttpTransport() {
+ @Override
+ public LowLevelHttpRequest buildRequest(String method, String url) throws IOException {
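+ // Assemble the JSON error body that a Google API endpoint would return, so that
+ // GoogleJsonResponseException.from() can parse a realistic error response below.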
+ ErrorInfo errorInfo = new ErrorInfo();
+ errorInfo.setReason(reason);
+ errorInfo.setMessage(message);
+ errorInfo.setFactory(jsonFactory);
+ GenericJson error = new GenericJson();
+ error.set("code", status);
+ error.set("errors", Arrays.asList(errorInfo));
+ error.setFactory(jsonFactory);
+ GenericJson errorResponse = new GenericJson();
+ errorResponse.set("error", error);
+ errorResponse.setFactory(jsonFactory);
+ return new MockLowLevelHttpRequest().setResponse(
+ new MockLowLevelHttpResponse().setContent(errorResponse.toPrettyString())
+ .setContentType(Json.MEDIA_TYPE).setStatusCode(status));
+ }
+ };
+ HttpRequest request =
+ transport.createRequestFactory().buildGetRequest(HttpTesting.SIMPLE_GENERIC_URL);
+ request.setThrowExceptionOnExecuteError(false);
+ HttpResponse response = request.execute();
+ return GoogleJsonResponseException.from(jsonFactory, response);
+ }
+}
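For reference, the entry point exercised throughout these tests is
PackageUtil.stageClasspathElements. A minimal sketch of a call site, using an
illustrative staging bucket and classpath entries rather than values from this
commit:

    List<String> classpath = ImmutableList.of(
        "/path/to/app.jar",                   // staged under a content-hash name
        "alias.txt=/path/to/settings.txt");   // "name=path" overrides the name
    List<DataflowPackage> staged =
        PackageUtil.stageClasspathElements(classpath, "gs://mybucket/staging");

Each returned DataflowPackage reports the staged package name and its GCS
location, as the assertions above verify.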
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/sdks/java/core/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml
index 6595a21..aa72b11 100644
--- a/sdks/java/core/pom.xml
+++ b/sdks/java/core/pom.xml
@@ -370,20 +370,6 @@
<dependencies>
<dependency>
- <groupId>com.google.apis</groupId>
- <artifactId>google-api-services-dataflow</artifactId>
- <version>${dataflow.version}</version>
- <exclusions>
- <!-- Exclude an old version of guava that is being pulled
- in by a transitive dependency of google-api-client -->
- <exclusion>
- <groupId>com.google.guava</groupId>
- <artifactId>guava-jdk5</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-all</artifactId>
<version>0.12.0</version>
@@ -429,20 +415,6 @@
<dependency>
<groupId>com.google.apis</groupId>
- <artifactId>google-api-services-clouddebugger</artifactId>
- <version>${clouddebugger.version}</version>
- <exclusions>
- <!-- Exclude an old version of guava that is being pulled
- in by a transitive dependency of google-api-client -->
- <exclusion>
- <groupId>com.google.guava</groupId>
- <artifactId>guava-jdk5</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>com.google.apis</groupId>
<artifactId>google-api-services-pubsub</artifactId>
<version>${pubsub.version}</version>
<exclusions>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/BlockingDataflowPipelineOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/BlockingDataflowPipelineOptions.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/BlockingDataflowPipelineOptions.java
deleted file mode 100644
index 6bbafdd..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/BlockingDataflowPipelineOptions.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.options;
-
-import com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-
-import java.io.PrintStream;
-
-/**
- * Options that are used to configure the {@link BlockingDataflowPipelineRunner}.
- */
-@Description("Configure options on the BlockingDataflowPipelineRunner.")
-public interface BlockingDataflowPipelineOptions extends DataflowPipelineOptions {
- /**
- * Output stream for job status messages.
- */
- @Description("Where messages generated during execution of the Dataflow job will be output.")
- @JsonIgnore
- @Hidden
- @Default.InstanceFactory(StandardOutputFactory.class)
- PrintStream getJobMessageOutput();
- void setJobMessageOutput(PrintStream value);
-
- /**
- * Returns a default of {@link System#out}.
- */
- public static class StandardOutputFactory implements DefaultValueFactory<PrintStream> {
- @Override
- public PrintStream create(PipelineOptions options) {
- return System.out;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/CloudDebuggerOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/CloudDebuggerOptions.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/CloudDebuggerOptions.java
deleted file mode 100644
index 6f1551d..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/CloudDebuggerOptions.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.options;
-
-import com.google.api.services.clouddebugger.v2.model.Debuggee;
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-
-import javax.annotation.Nullable;
-
-/**
- * Options for controlling Cloud Debugger.
- */
-@Description("[Experimental] Used to configure the Cloud Debugger")
-@Experimental
-@Hidden
-public interface CloudDebuggerOptions {
-
- /** Whether to enable the Cloud Debugger snapshot agent for the current job. */
- @Description("Whether to enable the Cloud Debugger snapshot agent for the current job.")
- boolean getEnableCloudDebugger();
- void setEnableCloudDebugger(boolean enabled);
-
- /** The Cloud Debugger debuggee to associate with. This should not be set directly. */
- @Description("The Cloud Debugger debuggee to associate with. This should not be set directly.")
- @Hidden
- @Nullable Debuggee getDebuggee();
- void setDebuggee(Debuggee debuggee);
-
- /** The maximum cost (as a ratio of CPU time) allowed for evaluating conditional snapshots. */
- @Description(
- "The maximum cost (as a ratio of CPU time) allowed for evaluating conditional snapshots. "
- + "Should be a double between 0 and 1. "
- + "Snapshots will be cancelled if evaluating conditions takes more than this ratio of time.")
- @Default.Double(0.01)
- double getMaxConditionCost();
- void setMaxConditionCost(double maxConditionCost);
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineDebugOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineDebugOptions.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineDebugOptions.java
deleted file mode 100644
index 6231bd4..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineDebugOptions.java
+++ /dev/null
@@ -1,253 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.options;
-
-import com.google.api.services.dataflow.Dataflow;
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.util.DataflowPathValidator;
-import com.google.cloud.dataflow.sdk.util.DataflowTransport;
-import com.google.cloud.dataflow.sdk.util.GcsStager;
-import com.google.cloud.dataflow.sdk.util.InstanceBuilder;
-import com.google.cloud.dataflow.sdk.util.PathValidator;
-import com.google.cloud.dataflow.sdk.util.Stager;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * Internal. Options used to control execution of the Dataflow SDK for
- * debugging and testing purposes.
- */
-@Description("[Internal] Options used to control execution of the Dataflow SDK for "
- + "debugging and testing purposes.")
-@Hidden
-public interface DataflowPipelineDebugOptions extends PipelineOptions {
-
- /**
- * The list of backend experiments to enable.
- *
- * <p>Dataflow provides a number of experimental features that can be enabled
- * with this flag.
- *
- * <p>Please sync with the Dataflow team before enabling any experiments.
- */
- @Description("[Experimental] Dataflow provides a number of experimental features that can "
- + "be enabled with this flag. Please sync with the Dataflow team before enabling any "
- + "experiments.")
- @Experimental
- List<String> getExperiments();
- void setExperiments(List<String> value);
-
- /**
- * The root URL for the Dataflow API. {@code dataflowEndpoint} can override this value
- * if it contains an absolute URL, otherwise {@code apiRootUrl} will be combined with
- * {@code dataflowEndpoint} to generate the full URL to communicate with the Dataflow API.
- */
- @Description("The root URL for the Dataflow API. dataflowEndpoint can override this "
- + "value if it contains an absolute URL, otherwise apiRootUrl will be combined with "
- + "dataflowEndpoint to generate the full URL to communicate with the Dataflow API.")
- @Default.String(Dataflow.DEFAULT_ROOT_URL)
- String getApiRootUrl();
- void setApiRootUrl(String value);
-
- /**
- * Dataflow endpoint to use.
- *
- * <p>Defaults to the current version of the Google Cloud Dataflow
- * API, at the time the current SDK version was released.
- *
- * <p>If the string contains "://", then this is treated as a URL,
- * otherwise {@link #getApiRootUrl()} is used as the root
- * URL.
- */
- @Description("The URL for the Dataflow API. If the string contains \"://\", this"
- + " will be treated as the entire URL, otherwise will be treated relative to apiRootUrl.")
- @Default.String(Dataflow.DEFAULT_SERVICE_PATH)
- String getDataflowEndpoint();
- void setDataflowEndpoint(String value);
-
- /**
- * The path to write the translated Dataflow job specification out to
- * at job submission time. The Dataflow job specification will be represented in JSON
- * format.
- */
- @Description("The path to write the translated Dataflow job specification out to "
- + "at job submission time. The Dataflow job specification will be represented in JSON "
- + "format.")
- String getDataflowJobFile();
- void setDataflowJobFile(String value);
-
- /**
- * The class of the validator that should be created and used to validate paths.
- * If pathValidator has not been set explicitly, an instance of this class will be
- * constructed and used as the path validator.
- */
- @Description("The class of the validator that should be created and used to validate paths. "
- + "If pathValidator has not been set explicitly, an instance of this class will be "
- + "constructed and used as the path validator.")
- @Default.Class(DataflowPathValidator.class)
- Class<? extends PathValidator> getPathValidatorClass();
- void setPathValidatorClass(Class<? extends PathValidator> validatorClass);
-
- /**
- * The path validator instance that should be used to validate paths.
- * If no path validator has been set explicitly, the default is to use the instance factory that
- * constructs a path validator based upon the currently set pathValidatorClass.
- */
- @JsonIgnore
- @Description("The path validator instance that should be used to validate paths. "
- + "If no path validator has been set explicitly, the default is to use the instance factory "
- + "that constructs a path validator based upon the currently set pathValidatorClass.")
- @Default.InstanceFactory(PathValidatorFactory.class)
- PathValidator getPathValidator();
- void setPathValidator(PathValidator validator);
-
- /**
- * The class responsible for staging resources to be accessible by workers
- * during job execution. If stager has not been set explicitly, an instance of this class
- * will be created and used as the resource stager.
- */
- @Description("The class of the stager that should be created and used to stage resources. "
- + "If stager has not been set explicitly, an instance of the this class will be created "
- + "and used as the resource stager.")
- @Default.Class(GcsStager.class)
- Class<? extends Stager> getStagerClass();
- void setStagerClass(Class<? extends Stager> stagerClass);
-
- /**
- * The resource stager instance that should be used to stage resources.
- * If no stager has been set explicitly, the default is to use the instance factory
- * that constructs a resource stager based upon the currently set stagerClass.
- */
- @JsonIgnore
- @Description("The resource stager instance that should be used to stage resources. "
- + "If no stager has been set explicitly, the default is to use the instance factory "
- + "that constructs a resource stager based upon the currently set stagerClass.")
- @Default.InstanceFactory(StagerFactory.class)
- Stager getStager();
- void setStager(Stager stager);
-
- /**
- * An instance of the Dataflow client. Defaults to creating a Dataflow client
- * using the current set of options.
- */
- @JsonIgnore
- @Description("An instance of the Dataflow client. Defaults to creating a Dataflow client "
- + "using the current set of options.")
- @Default.InstanceFactory(DataflowClientFactory.class)
- Dataflow getDataflowClient();
- void setDataflowClient(Dataflow value);
-
- /** Returns the default Dataflow client built from the passed in PipelineOptions. */
- public static class DataflowClientFactory implements DefaultValueFactory<Dataflow> {
- @Override
- public Dataflow create(PipelineOptions options) {
- return DataflowTransport.newDataflowClient(
- options.as(DataflowPipelineOptions.class)).build();
- }
- }
-
- /**
- * Whether to update the currently running pipeline with the same name as this one.
- *
- * @deprecated This property is replaced by {@link DataflowPipelineOptions#getUpdate()}
- */
- @Deprecated
- @Description("If set, replace the existing pipeline with the name specified by --jobName with "
- + "this pipeline, preserving state.")
- boolean getUpdate();
- @Deprecated
- void setUpdate(boolean value);
-
- /**
- * Mapping of old PTransform names to new ones, specified as JSON
- * <code>{"oldName":"newName",...}</code>. To mark a transform as deleted, make newName the
- * empty string.
- */
- @JsonIgnore
- @Description(
- "Mapping of old PTranform names to new ones, specified as JSON "
- + "{\"oldName\":\"newName\",...}. To mark a transform as deleted, make newName the empty "
- + "string.")
- Map<String, String> getTransformNameMapping();
- void setTransformNameMapping(Map<String, String> value);
-
- /**
- * Custom windmill_main binary to use with the streaming runner.
- */
- @Description("Custom windmill_main binary to use with the streaming runner")
- String getOverrideWindmillBinary();
- void setOverrideWindmillBinary(String value);
-
- /**
- * Number of threads to use on the Dataflow worker harness. If left unspecified,
- * the Dataflow service will compute an appropriate number of threads to use.
- */
- @Description("Number of threads to use on the Dataflow worker harness. If left unspecified, "
- + "the Dataflow service will compute an appropriate number of threads to use.")
- int getNumberOfWorkerHarnessThreads();
- void setNumberOfWorkerHarnessThreads(int value);
-
- /**
- * If {@literal true}, save a heap dump before killing a thread or process which is GC
- * thrashing or out of memory. The location of the heap file will either be echoed back
- * to the user, or the user will be given the opportunity to download the heap file.
- *
- * <p>
- * CAUTION: Heap dumps can be of comparable size to the default boot disk. Consider increasing
- * the boot disk size before setting this flag to true.
- */
- @Description("If {@literal true}, save a heap dump before killing a thread or process "
- + "which is GC thrashing or out of memory.")
- boolean getDumpHeapOnOOM();
- void setDumpHeapOnOOM(boolean dumpHeapBeforeExit);
-
- /**
- * Creates a {@link PathValidator} object using the class specified in
- * {@link #getPathValidatorClass()}.
- */
- public static class PathValidatorFactory implements DefaultValueFactory<PathValidator> {
- @Override
- public PathValidator create(PipelineOptions options) {
- DataflowPipelineDebugOptions debugOptions = options.as(DataflowPipelineDebugOptions.class);
- return InstanceBuilder.ofType(PathValidator.class)
- .fromClass(debugOptions.getPathValidatorClass())
- .fromFactoryMethod("fromOptions")
- .withArg(PipelineOptions.class, options)
- .build();
- }
- }
-
- /**
- * Creates a {@link Stager} object using the class specified in
- * {@link #getStagerClass()}.
- */
- public static class StagerFactory implements DefaultValueFactory<Stager> {
- @Override
- public Stager create(PipelineOptions options) {
- DataflowPipelineDebugOptions debugOptions = options.as(DataflowPipelineDebugOptions.class);
- return InstanceBuilder.ofType(Stager.class)
- .fromClass(debugOptions.getStagerClass())
- .fromFactoryMethod("fromOptions")
- .withArg(PipelineOptions.class, options)
- .build();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineOptions.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineOptions.java
deleted file mode 100644
index dbfafd1..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineOptions.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.options;
-
-import com.google.cloud.dataflow.sdk.runners.DataflowPipeline;
-import com.google.common.base.MoreObjects;
-
-import org.joda.time.DateTimeUtils;
-import org.joda.time.DateTimeZone;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
-
-/**
- * Options that can be used to configure the {@link DataflowPipeline}.
- */
-@Description("Options that configure the Dataflow pipeline.")
-public interface DataflowPipelineOptions extends
- PipelineOptions, GcpOptions, ApplicationNameOptions, DataflowPipelineDebugOptions,
- DataflowPipelineWorkerPoolOptions, BigQueryOptions,
- GcsOptions, StreamingOptions, CloudDebuggerOptions, DataflowWorkerLoggingOptions,
- DataflowProfilingOptions, PubsubOptions {
-
- @Description("Project id. Required when running a Dataflow in the cloud. "
- + "See https://cloud.google.com/storage/docs/projects for further details.")
- @Override
- @Validation.Required
- @Default.InstanceFactory(DefaultProjectFactory.class)
- String getProject();
- @Override
- void setProject(String value);
-
- /**
- * GCS path for staging local files, e.g. gs://bucket/object
- *
- * <p>Must be a valid Cloud Storage URL, beginning with the prefix "gs://"
- *
- * <p>At least one of {@link PipelineOptions#getTempLocation()} or {@link #getStagingLocation()}
- * must be set. If {@link #getStagingLocation()} is not set, then the Dataflow
- * pipeline defaults to using {@link PipelineOptions#getTempLocation()}.
- */
- @Description("GCS path for staging local files, e.g. \"gs://bucket/object\". "
- + "Must be a valid Cloud Storage URL, beginning with the prefix \"gs://\". "
- + "At least one of stagingLocation or tempLocation must be set. If stagingLocation is unset, "
- + "defaults to using tempLocation.")
- String getStagingLocation();
- void setStagingLocation(String value);
-
- /**
- * The Dataflow job name is used as an idempotence key within the Dataflow service.
- * If there is an existing job that is currently active, another active job with the same
- * name will not be able to be created. Defaults to using the ApplicationName-UserName-Date.
- */
- @Description("The Dataflow job name is used as an idempotence key within the Dataflow service. "
- + "If there is an existing job that is currently active, another active job with the same "
- + "name will not be able to be created. Defaults to using the ApplicationName-UserName-Date.")
- @Default.InstanceFactory(JobNameFactory.class)
- String getJobName();
- void setJobName(String value);
-
- /**
- * Whether to update the currently running pipeline with the same name as this one.
- */
- @Override
- @SuppressWarnings("deprecation") // base class member deprecated in favor of this one.
- @Description(
- "If set, replace the existing pipeline with the name specified by --jobName with "
- + "this pipeline, preserving state.")
- boolean getUpdate();
- @Override
- @SuppressWarnings("deprecation") // base class member deprecated in favor of this one.
- void setUpdate(boolean value);
-
- /**
- * Returns a normalized job name constructed from {@link ApplicationNameOptions#getAppName()}, the
- * local system user name (if available), and the current time. The normalization makes sure that
- * the job name matches the required pattern of [a-z]([-a-z0-9]*[a-z0-9])? and length limit of 40
- * characters.
- *
- * <p>This job name factory is only able to generate one unique name per second per application
- * and user combination.
- */
- public static class JobNameFactory implements DefaultValueFactory<String> {
- private static final DateTimeFormatter FORMATTER =
- DateTimeFormat.forPattern("MMddHHmmss").withZone(DateTimeZone.UTC);
-
- @Override
- public String create(PipelineOptions options) {
- String appName = options.as(ApplicationNameOptions.class).getAppName();
- String normalizedAppName = appName == null || appName.length() == 0 ? "dataflow"
- : appName.toLowerCase()
- .replaceAll("[^a-z0-9]", "0")
- .replaceAll("^[^a-z]", "a");
- String userName = MoreObjects.firstNonNull(System.getProperty("user.name"), "");
- String normalizedUserName = userName.toLowerCase()
- .replaceAll("[^a-z0-9]", "0");
- String datePart = FORMATTER.print(DateTimeUtils.currentTimeMillis());
- return normalizedAppName + "-" + normalizedUserName + "-" + datePart;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineWorkerPoolOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineWorkerPoolOptions.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineWorkerPoolOptions.java
deleted file mode 100644
index 0c6428f..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineWorkerPoolOptions.java
+++ /dev/null
@@ -1,258 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.options;
-
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-
-import java.util.List;
-
-/**
- * Options that are used to configure the Dataflow pipeline worker pool.
- */
-@Description("Options that are used to configure the Dataflow pipeline worker pool.")
-public interface DataflowPipelineWorkerPoolOptions extends PipelineOptions {
- /**
- * Number of workers to use when executing the Dataflow job. Note that selection of an autoscaling
- * algorithm other than {@code NONE} will affect the size of the worker pool. If left unspecified,
- * the Dataflow service will determine the number of workers.
- */
- @Description("Number of workers to use when executing the Dataflow job. Note that "
- + "selection of an autoscaling algorithm other then \"NONE\" will affect the "
- + "size of the worker pool. If left unspecified, the Dataflow service will "
- + "determine the number of workers.")
- int getNumWorkers();
- void setNumWorkers(int value);
-
- /**
- * Type of autoscaling algorithm to use.
- */
- @Experimental(Experimental.Kind.AUTOSCALING)
- public enum AutoscalingAlgorithmType {
- /** Use numWorkers machines. Do not autoscale the worker pool. */
- NONE("AUTOSCALING_ALGORITHM_NONE"),
-
- @Deprecated
- BASIC("AUTOSCALING_ALGORITHM_BASIC"),
-
- /** Autoscale the workerpool based on throughput (up to maxNumWorkers). */
- THROUGHPUT_BASED("AUTOSCALING_ALGORITHM_BASIC");
-
- private final String algorithm;
-
- private AutoscalingAlgorithmType(String algorithm) {
- this.algorithm = algorithm;
- }
-
- /** Returns the string representation of this type. */
- public String getAlgorithm() {
- return this.algorithm;
- }
- }
-
- /**
- * [Experimental] The autoscaling algorithm to use for the workerpool.
- *
- * <ul>
- * <li>NONE: does not change the size of the worker pool.</li>
- * <li>BASIC: autoscale the worker pool size up to maxNumWorkers until the job completes.</li>
- * <li>THROUGHPUT_BASED: autoscale the workerpool based on throughput (up to maxNumWorkers).
- * </li>
- * </ul>
- */
- @Description("[Experimental] The autoscaling algorithm to use for the workerpool. "
- + "NONE: does not change the size of the worker pool. "
- + "BASIC (deprecated): autoscale the worker pool size up to maxNumWorkers until the job "
- + "completes. "
- + "THROUGHPUT_BASED: autoscale the workerpool based on throughput (up to maxNumWorkers).")
- @Experimental(Experimental.Kind.AUTOSCALING)
- AutoscalingAlgorithmType getAutoscalingAlgorithm();
- void setAutoscalingAlgorithm(AutoscalingAlgorithmType value);
-
- /**
- * The maximum number of workers to use for the workerpool. This option limits the size of the
- * workerpool for the lifetime of the job, including
- * <a href="https://cloud.google.com/dataflow/pipelines/updating-a-pipeline">pipeline updates</a>.
- * If left unspecified, the Dataflow service will compute a ceiling.
- */
- @Description("The maximum number of workers to use for the workerpool. This options limits the "
- + "size of the workerpool for the lifetime of the job, including pipeline updates. "
- + "If left unspecified, the Dataflow service will compute a ceiling.")
- int getMaxNumWorkers();
- void setMaxNumWorkers(int value);
-
- /**
- * Remote worker disk size, in gigabytes, or 0 to use the default size.
- */
- @Description("Remote worker disk size, in gigabytes, or 0 to use the default size.")
- int getDiskSizeGb();
- void setDiskSizeGb(int value);
-
- /**
- * Docker container image that executes Dataflow worker harness, residing in Google Container
- * Registry.
- */
- @Default.InstanceFactory(WorkerHarnessContainerImageFactory.class)
- @Description("Docker container image that executes Dataflow worker harness, residing in Google "
- + " Container Registry.")
- @Hidden
- String getWorkerHarnessContainerImage();
- void setWorkerHarnessContainerImage(String value);
-
- /**
- * Returns the default Docker container image that executes Dataflow worker harness, residing in
- * Google Container Registry.
- */
- public static class WorkerHarnessContainerImageFactory
- implements DefaultValueFactory<String> {
- @Override
- public String create(PipelineOptions options) {
- DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
- if (dataflowOptions.isStreaming()) {
- return DataflowPipelineRunner.STREAMING_WORKER_HARNESS_CONTAINER_IMAGE;
- } else {
- return DataflowPipelineRunner.BATCH_WORKER_HARNESS_CONTAINER_IMAGE;
- }
- }
- }
-
- /**
- * GCE <a href="https://cloud.google.com/compute/docs/networking">network</a> for launching
- * workers.
- *
- * <p>Default is up to the Dataflow service.
- */
- @Description("GCE network for launching workers. For more information, see the reference "
- + "documentation https://cloud.google.com/compute/docs/networking. "
- + "Default is up to the Dataflow service.")
- String getNetwork();
- void setNetwork(String value);
-
- /**
- * GCE <a href="https://cloud.google.com/compute/docs/networking">subnetwork</a> for launching
- * workers.
- *
- * <p>Default is up to the Dataflow service. Expected format is
- * regions/REGION/subnetworks/SUBNETWORK.
- *
- * <p>You may also need to specify the network option.
- */
- @Description("GCE subnetwork for launching workers. For more information, see the reference "
- + "documentation https://cloud.google.com/compute/docs/networking. "
- + "Default is up to the Dataflow service.")
- String getSubnetwork();
- void setSubnetwork(String value);
-
- /**
- * GCE <a href="https://developers.google.com/compute/docs/zones"
- * >availability zone</a> for launching workers.
- *
- * <p>Default is up to the Dataflow service.
- */
- @Description("GCE availability zone for launching workers. See "
- + "https://developers.google.com/compute/docs/zones for a list of valid options. "
- + "Default is up to the Dataflow service.")
- String getZone();
- void setZone(String value);
-
- /**
- * Machine type to create Dataflow worker VMs as.
- *
- * <p>See <a href="https://cloud.google.com/compute/docs/machine-types">GCE machine types</a>
- * for a list of valid options.
- *
- * <p>If unset, the Dataflow service will choose a reasonable default.
- */
- @Description("Machine type to create Dataflow worker VMs as. See "
- + "https://cloud.google.com/compute/docs/machine-types for a list of valid options. "
- + "If unset, the Dataflow service will choose a reasonable default.")
- String getWorkerMachineType();
- void setWorkerMachineType(String value);
-
- /**
- * The policy for tearing down the workers spun up by the service.
- */
- public enum TeardownPolicy {
- /**
- * All VMs created for a Dataflow job are deleted when the job finishes, regardless of whether
- * it fails or succeeds.
- */
- TEARDOWN_ALWAYS("TEARDOWN_ALWAYS"),
- /**
- * All VMs created for a Dataflow job are left running when the job finishes, regardless of
- * whether it fails or succeeds.
- */
- TEARDOWN_NEVER("TEARDOWN_NEVER"),
- /**
- * All VMs created for a Dataflow job are deleted when the job succeeds, but are left running
- * when it fails. (This is typically used for debugging failing jobs by SSHing into the
- * workers.)
- */
- TEARDOWN_ON_SUCCESS("TEARDOWN_ON_SUCCESS");
-
- private final String teardownPolicy;
-
- private TeardownPolicy(String teardownPolicy) {
- this.teardownPolicy = teardownPolicy;
- }
-
- public String getTeardownPolicyName() {
- return this.teardownPolicy;
- }
- }
-
- /**
- * The teardown policy for the VMs.
- *
- * <p>If unset, the Dataflow service will choose a reasonable default.
- */
- @Description("The teardown policy for the VMs. If unset, the Dataflow service will "
- + "choose a reasonable default.")
- TeardownPolicy getTeardownPolicy();
- void setTeardownPolicy(TeardownPolicy value);
-
- /**
- * List of local files to make available to workers.
- *
- * <p>Files are placed on the worker's classpath.
- *
- * <p>The default value is the list of jars from the main program's classpath.
- */
- @Description("Files to stage on GCS and make available to workers. "
- + "Files are placed on the worker's classpath. "
- + "The default value is all files from the classpath.")
- @JsonIgnore
- List<String> getFilesToStage();
- void setFilesToStage(List<String> value);
-
- /**
- * Specifies what type of persistent disk should be used. The value should be a full or partial
- * URL of a disk type resource, e.g., zones/us-central1-f/disks/pd-standard. For
- * more information, see the
- * <a href="https://cloud.google.com/compute/docs/reference/latest/diskTypes">API reference
- * documentation for DiskTypes</a>.
- */
- @Description("Specifies what type of persistent disk should be used. The value should be a full "
- + "or partial URL of a disk type resource, e.g., zones/us-central1-f/disks/pd-standard. For "
- + "more information, see the API reference documentation for DiskTypes: "
- + "https://cloud.google.com/compute/docs/reference/latest/diskTypes")
- String getWorkerDiskType();
- void setWorkerDiskType(String value);
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowProfilingOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowProfilingOptions.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowProfilingOptions.java
deleted file mode 100644
index f14b04d..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowProfilingOptions.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.options;
-
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-
-import java.util.HashMap;
-
-/**
- * Options for controlling profiling of pipeline execution.
- */
-@Description("[Experimental] Used to configure profiling of the Dataflow pipeline")
-@Experimental
-@Hidden
-public interface DataflowProfilingOptions {
-
- @Description("Whether to periodically dump profiling information to local disk.\n"
- + "WARNING: Enabling this option may fill local disk with profiling information.")
- boolean getEnableProfilingAgent();
- void setEnableProfilingAgent(boolean enabled);
-
- @Description(
- "[INTERNAL] Additional configuration for the profiling agent. Not typically necessary.")
- @Hidden
- DataflowProfilingAgentConfiguration getProfilingAgentConfiguration();
- void setProfilingAgentConfiguration(DataflowProfilingAgentConfiguration configuration);
-
- /**
- * Configuration for the profiling agent.
- */
- public static class DataflowProfilingAgentConfiguration extends HashMap<String, Object> {
- }
-}
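
Note that DataflowProfilingOptions does not itself extend PipelineOptions, so it has to be mixed into a PipelineOptions sub-interface before the factory can materialize it. A minimal sketch (MyOptions is an illustrative name, not part of the SDK):

    import com.google.cloud.dataflow.sdk.options.DataflowProfilingOptions;
    import com.google.cloud.dataflow.sdk.options.PipelineOptions;
    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;

    public class ProfilingOptionsSketch {
      // Illustrative mix-in; any PipelineOptions sub-interface would do.
      public interface MyOptions extends PipelineOptions, DataflowProfilingOptions {}

      public static void main(String[] args) {
        MyOptions options =
            PipelineOptionsFactory.fromArgs(new String[] {"--enableProfilingAgent=true"})
                .as(MyOptions.class);
        // Per the warning above, enabling this may fill local disk with profiles.
        System.out.println("Profiling enabled: " + options.getEnableProfilingAgent());
      }
    }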
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowWorkerHarnessOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowWorkerHarnessOptions.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowWorkerHarnessOptions.java
deleted file mode 100644
index 7705b66..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowWorkerHarnessOptions.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.options;
-
-/**
- * Options that are used exclusively within the Dataflow worker harness.
- * These options have no effect at pipeline creation time.
- */
-@Description("[Internal] Options that are used exclusively within the Dataflow worker harness. "
- + "These options have no effect at pipeline creation time.")
-@Hidden
-public interface DataflowWorkerHarnessOptions extends DataflowPipelineOptions {
- /**
- * The identity of the worker running this pipeline.
- */
- @Description("The identity of the worker running this pipeline.")
- String getWorkerId();
- void setWorkerId(String value);
-
- /**
- * The identity of the Dataflow job.
- */
- @Description("The identity of the Dataflow job.")
- String getJobId();
- void setJobId(String value);
-
- /**
- * The size of the worker's in-memory cache, in megabytes.
- *
- * <p>Currently, this cache is used for storing read values of side inputs.
- */
- @Description("The size of the worker's in-memory cache, in megabytes.")
- @Default.Integer(100)
- Integer getWorkerCacheMb();
- void setWorkerCacheMb(Integer value);
-}
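
These options are populated by the service for the worker harness rather than by pipeline authors; a short sketch of how the harness side might read them (the value printed is the declared default):

    import com.google.cloud.dataflow.sdk.options.DataflowWorkerHarnessOptions;
    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;

    public class WorkerHarnessOptionsSketch {
      public static void main(String[] args) {
        DataflowWorkerHarnessOptions options =
            PipelineOptionsFactory.as(DataflowWorkerHarnessOptions.class);
        // Falls back to @Default.Integer(100) when the service has not set it.
        int cacheMb = options.getWorkerCacheMb();
        System.out.println("Side-input cache: " + cacheMb + " MB");
      }
    }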
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowWorkerLoggingOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowWorkerLoggingOptions.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowWorkerLoggingOptions.java
deleted file mode 100644
index ebd42d9..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowWorkerLoggingOptions.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.options;
-
-import com.google.common.base.Preconditions;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Options that are used to control logging configuration on the Dataflow worker.
- */
-@Description("Options that are used to control logging configuration on the Dataflow worker.")
-public interface DataflowWorkerLoggingOptions extends PipelineOptions {
- /**
- * The set of log levels that can be used on the Dataflow worker.
- */
- public enum Level {
- DEBUG, ERROR, INFO, TRACE, WARN
- }
-
- /**
- * This option controls the default log level of all loggers without a log level override.
- */
- @Description("Controls the default log level of all loggers without a log level override.")
- @Default.Enum("INFO")
- Level getDefaultWorkerLogLevel();
- void setDefaultWorkerLogLevel(Level level);
-
- /**
- * This option controls the log levels for specifically named loggers.
- *
- * <p>Later options with equivalent names override earlier options.
- *
- * <p>See {@link WorkerLogLevelOverrides} for more information on how to configure logging
- * on a per {@link Class}, {@link Package}, or name basis. If used from the command line,
- * the expected format is {"Name":"Level",...}; see {@link WorkerLogLevelOverrides#from}
- * for further details.
- */
- @Description("This option controls the log levels for specifically named loggers. "
- + "The expected format is {\"Name\":\"Level\",...}. The Dataflow worker uses "
- + "java.util.logging, which supports a logging hierarchy based off of names that are '.' "
- + "separated. For example, by specifying the value {\"a.b.c.Foo\":\"DEBUG\"}, the logger "
- + "for the class 'a.b.c.Foo' will be configured to output logs at the DEBUG level. "
- + "Similarly, by specifying the value {\"a.b.c\":\"WARN\"}, all loggers underneath the "
- + "'a.b.c' package will be configured to output logs at the WARN level. Also, note that "
- + "when multiple overrides are specified, the exact name followed by the closest parent "
- + "takes precedence.")
- WorkerLogLevelOverrides getWorkerLogLevelOverrides();
- void setWorkerLogLevelOverrides(WorkerLogLevelOverrides value);
-
- /**
- * Defines a log level override for a specific class, package, or name.
- *
- * <p>{@code java.util.logging} is used on the Dataflow worker harness and supports
- * a logging hierarchy based on names that are "." separated. It is a common
- * pattern to have the logger for a given class share the same name as the class itself.
- * Given the classes {@code a.b.c.Foo}, {@code a.b.c.Xyz}, and {@code a.b.Bar}, with
- * loggers named {@code "a.b.c.Foo"}, {@code "a.b.c.Xyz"}, and {@code "a.b.Bar"} respectively,
- * we can override the log levels:
- * <ul>
- * <li>for {@code Foo} by specifying the name {@code "a.b.c.Foo"} or the {@link Class}
- * representing {@code a.b.c.Foo}.
- * <li>for {@code Foo}, {@code Xyz}, and {@code Bar} by specifying the name {@code "a.b"} or
- * the {@link Package} representing {@code a.b}.
- * <li>for {@code Foo} and {@code Bar} by specifying both of their names or classes.
- * </ul>
- * Note that when multiple overrides are specified, the override for the exact name takes
- * precedence, followed by that of the closest parent.
- */
- public static class WorkerLogLevelOverrides extends HashMap<String, Level> {
- /**
- * Overrides the default log level for the passed in class.
- *
- * <p>This is equivalent to calling
- * {@link #addOverrideForName(String, DataflowWorkerLoggingOptions.Level)}
- * and passing in the {@link Class#getName() class name}.
- */
- public WorkerLogLevelOverrides addOverrideForClass(Class<?> klass, Level level) {
- Preconditions.checkNotNull(klass, "Expected class to be not null.");
- addOverrideForName(klass.getName(), level);
- return this;
- }
-
- /**
- * Overrides the default log level for the passed in package.
- *
- * <p>This is equivalent to calling
- * {@link #addOverrideForName(String, DataflowWorkerLoggingOptions.Level)}
- * and passing in the {@link Package#getName() package name}.
- */
- public WorkerLogLevelOverrides addOverrideForPackage(Package pkg, Level level) {
- Preconditions.checkNotNull(pkg, "Expected package to be not null.");
- addOverrideForName(pkg.getName(), level);
- return this;
- }
-
- /**
- * Overrides the default log level for the passed in name.
- *
- * <p>Note that because of the hierarchical nature of logger names, this will
- * override the log level of all loggers that have the passed in name or
- * a parent logger that has the passed in name.
- */
- public WorkerLogLevelOverrides addOverrideForName(String name, Level level) {
- Preconditions.checkNotNull(name, "Expected name to be not null.");
- Preconditions.checkNotNull(level,
- "Expected level to be one of %s.", Arrays.toString(Level.values()));
- put(name, level);
- return this;
- }
-
- /**
- * Expects a map keyed by logger {@code Name}s with values representing {@code Level}s.
- * The {@code Name} generally represents the fully qualified Java
- * {@link Class#getName() class name}, or fully qualified Java
- * {@link Package#getName() package name}, or custom logger name. The {@code Level}
- * represents the log level and must be one of {@link Level}.
- */
- @JsonCreator
- public static WorkerLogLevelOverrides from(Map<String, String> values) {
- Preconditions.checkNotNull(values, "Expected values to be not null.");
- WorkerLogLevelOverrides overrides = new WorkerLogLevelOverrides();
- for (Map.Entry<String, String> entry : values.entrySet()) {
- try {
- overrides.addOverrideForName(entry.getKey(), Level.valueOf(entry.getValue()));
- } catch (IllegalArgumentException e) {
- throw new IllegalArgumentException(String.format(
- "Unsupported log level '%s' requested for %s. Must be one of %s.",
- entry.getValue(), entry.getKey(), Arrays.toString(Level.values())));
- }
- }
- return overrides;
- }
- }
-}
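
A minimal sketch of the override precedence described above: the override for the exact class name wins over the one for its enclosing package. All logger names below are illustrative.

    import com.google.cloud.dataflow.sdk.options.DataflowWorkerLoggingOptions;
    import com.google.cloud.dataflow.sdk.options.DataflowWorkerLoggingOptions.Level;
    import com.google.cloud.dataflow.sdk.options.DataflowWorkerLoggingOptions.WorkerLogLevelOverrides;
    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;

    public class WorkerLoggingSketch {
      public static void main(String[] args) {
        DataflowWorkerLoggingOptions options =
            PipelineOptionsFactory.as(DataflowWorkerLoggingOptions.class);
        options.setDefaultWorkerLogLevel(Level.WARN);
        // "a.b.c.Foo" logs at DEBUG; other loggers under "a.b.c" stay at ERROR.
        options.setWorkerLogLevelOverrides(
            new WorkerLogLevelOverrides()
                .addOverrideForName("a.b.c", Level.ERROR)
                .addOverrideForName("a.b.c.Foo", Level.DEBUG));
      }
    }

From the command line, the equivalent JSON form handled by WorkerLogLevelOverrides#from would be --workerLogLevelOverrides={"a.b.c":"ERROR","a.b.c.Foo":"DEBUG"}.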
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/BlockingDataflowPipelineRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/BlockingDataflowPipelineRunner.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/BlockingDataflowPipelineRunner.java
deleted file mode 100644
index 8903181..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/BlockingDataflowPipelineRunner.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.runners;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.PipelineResult.State;
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.options.BlockingDataflowPipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsValidator;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.util.MonitoringUtil;
-import com.google.cloud.dataflow.sdk.values.PInput;
-import com.google.cloud.dataflow.sdk.values.POutput;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-
-import javax.annotation.Nullable;
-
-/**
- * A {@link PipelineRunner} that's like {@link DataflowPipelineRunner}
- * but that waits for the launched job to finish.
- *
- * <p>Prints out job status updates and console messages while it waits.
- *
- * <p>Returns the final job state, or throws an exception if the job
- * fails or cannot be monitored.
- *
- * <h3>Permissions</h3>
- * When reading from a Dataflow source or writing to a Dataflow sink using
- * {@code BlockingDataflowPipelineRunner}, the Google Cloud services account and the Google
- * Compute Engine service account of the GCP project running the Dataflow job will need access
- * to the corresponding source/sink.
- *
- * <p>Please see <a href="https://cloud.google.com/dataflow/security-and-permissions">Google Cloud
- * Dataflow Security and Permissions</a> for more details.
- */
-public class BlockingDataflowPipelineRunner extends
- PipelineRunner<DataflowPipelineJob> {
- private static final Logger LOG = LoggerFactory.getLogger(BlockingDataflowPipelineRunner.class);
-
- // Defaults to an infinite wait period.
- // TODO: make this configurable after removal of option map.
- private static final long BUILTIN_JOB_TIMEOUT_SEC = -1L;
-
- private final DataflowPipelineRunner dataflowPipelineRunner;
- private final BlockingDataflowPipelineOptions options;
-
- protected BlockingDataflowPipelineRunner(
- DataflowPipelineRunner internalRunner,
- BlockingDataflowPipelineOptions options) {
- this.dataflowPipelineRunner = internalRunner;
- this.options = options;
- }
-
- /**
- * Constructs a runner from the provided options.
- */
- public static BlockingDataflowPipelineRunner fromOptions(
- PipelineOptions options) {
- BlockingDataflowPipelineOptions dataflowOptions =
- PipelineOptionsValidator.validate(BlockingDataflowPipelineOptions.class, options);
- DataflowPipelineRunner dataflowPipelineRunner =
- DataflowPipelineRunner.fromOptions(dataflowOptions);
-
- return new BlockingDataflowPipelineRunner(dataflowPipelineRunner, dataflowOptions);
- }
-
- /**
- * {@inheritDoc}
- *
- * @throws DataflowJobExecutionException if there is an exception during job execution.
- * @throws DataflowServiceException if there is an exception retrieving information about the job.
- */
- @Override
- public DataflowPipelineJob run(Pipeline p) {
- final DataflowPipelineJob job = dataflowPipelineRunner.run(p);
-
- // We ignore the potential race condition here (Ctrl-C after job submission but before the
- // shutdown hook is registered). Even if we tried something smarter (e.g., a SettableFuture),
- // the run method (which produces the job) could fail or be Ctrl-C'd before it had returned a
- // job. Displaying the command to cancel the job is best-effort anyway -- RPCs could fail, etc.
- // If the user wants to verify that the job was cancelled, they should look at the job status.
- Thread shutdownHook = new Thread() {
- @Override
- public void run() {
- LOG.warn("Job is already running in Google Cloud Platform, Ctrl-C will not cancel it.\n"
- + "To cancel the job in the cloud, run:\n> {}",
- MonitoringUtil.getGcloudCancelCommand(options, job.getJobId()));
- }
- };
-
- try {
- Runtime.getRuntime().addShutdownHook(shutdownHook);
-
- @Nullable
- State result;
- try {
- result = job.waitToFinish(
- BUILTIN_JOB_TIMEOUT_SEC, TimeUnit.SECONDS,
- new MonitoringUtil.PrintHandler(options.getJobMessageOutput()));
- } catch (IOException | InterruptedException ex) {
- if (ex instanceof InterruptedException) {
- Thread.currentThread().interrupt();
- }
- LOG.debug("Exception caught while retrieving status for job {}", job.getJobId(), ex);
- throw new DataflowServiceException(
- job, "Exception caught while retrieving status for job " + job.getJobId(), ex);
- }
-
- if (result == null) {
- throw new DataflowServiceException(
- job, "Timed out while retrieving status for job " + job.getJobId());
- }
-
- LOG.info("Job finished with status {}", result);
- if (!result.isTerminal()) {
- throw new IllegalStateException("Expected terminal state for job " + job.getJobId()
- + ", got " + result);
- }
-
- if (result == State.DONE) {
- return job;
- } else if (result == State.UPDATED) {
- DataflowPipelineJob newJob = job.getReplacedByJob();
- LOG.info("Job {} has been updated and is running as the new job with id {}."
- + "To access the updated job on the Dataflow monitoring console, please navigate to {}",
- job.getJobId(),
- newJob.getJobId(),
- MonitoringUtil.getJobMonitoringPageURL(newJob.getProjectId(), newJob.getJobId()));
- throw new DataflowJobUpdatedException(
- job,
- String.format("Job %s updated; new job is %s.", job.getJobId(), newJob.getJobId()),
- newJob);
- } else if (result == State.CANCELLED) {
- String message = String.format("Job %s cancelled by user", job.getJobId());
- LOG.info(message);
- throw new DataflowJobCancelledException(job, message);
- } else {
- throw new DataflowJobExecutionException(job, "Job " + job.getJobId()
- + " failed with status " + result);
- }
- } finally {
- Runtime.getRuntime().removeShutdownHook(shutdownHook);
- }
- }
-
- @Override
- public <OutputT extends POutput, InputT extends PInput> OutputT apply(
- PTransform<InputT, OutputT> transform, InputT input) {
- return dataflowPipelineRunner.apply(transform, input);
- }
-
- /**
- * Sets callbacks to invoke during execution. See {@link DataflowPipelineRunnerHooks}.
- */
- @Experimental
- public void setHooks(DataflowPipelineRunnerHooks hooks) {
- this.dataflowPipelineRunner.setHooks(hooks);
- }
-
- @Override
- public String toString() {
- return "BlockingDataflowPipelineRunner#" + options.getJobName();
- }
-}
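
A sketch of the typical usage implied by the class javadoc, assuming the usual --project and --stagingLocation flags arrive via args:

    import com.google.cloud.dataflow.sdk.Pipeline;
    import com.google.cloud.dataflow.sdk.options.BlockingDataflowPipelineOptions;
    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
    import com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner;

    public class BlockingRunnerSketch {
      public static void main(String[] args) {
        BlockingDataflowPipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).withValidation()
                .as(BlockingDataflowPipelineOptions.class);
        options.setRunner(BlockingDataflowPipelineRunner.class);
        Pipeline p = Pipeline.create(options);
        // ... apply transforms here ...
        // run() blocks until a terminal state; non-DONE outcomes surface as the
        // DataflowJob*Exception types in the files below.
        p.run();
      }
    }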
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobAlreadyExistsException.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobAlreadyExistsException.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobAlreadyExistsException.java
deleted file mode 100644
index 4a4f100..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobAlreadyExistsException.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.runners;
-
-/**
- * An exception that is thrown if the unique job name constraint of the Dataflow
- * service is broken because an existing job with the same job name is currently active.
- * The {@link DataflowPipelineJob} contained within this exception contains information
- * about the pre-existing job.
- */
-public class DataflowJobAlreadyExistsException extends DataflowJobException {
- /**
- * Create a new {@code DataflowJobAlreadyExistsException} with the specified {@link
- * DataflowPipelineJob} and message.
- */
- public DataflowJobAlreadyExistsException(
- DataflowPipelineJob job, String message) {
- super(job, message, null);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobAlreadyUpdatedException.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobAlreadyUpdatedException.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobAlreadyUpdatedException.java
deleted file mode 100644
index 1f52c6a..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobAlreadyUpdatedException.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.runners;
-
-/**
- * An exception that is thrown if the existing job has already been updated within the Dataflow
- * service and can no longer be updated. The {@link DataflowPipelineJob} contained within
- * this exception contains information about the pre-existing updated job.
- */
-public class DataflowJobAlreadyUpdatedException extends DataflowJobException {
- /**
- * Create a new {@code DataflowJobAlreadyUpdatedException} with the specified {@link
- * DataflowPipelineJob} and message.
- */
- public DataflowJobAlreadyUpdatedException(
- DataflowPipelineJob job, String message) {
- super(job, message, null);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobCancelledException.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobCancelledException.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobCancelledException.java
deleted file mode 100644
index 495ca5a..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobCancelledException.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.runners;
-
-/**
- * Signals that a job run by a {@link BlockingDataflowPipelineRunner} was cancelled during execution.
- */
-public class DataflowJobCancelledException extends DataflowJobException {
- /**
- * Create a new {@code DataflowJobCancelledException} with the specified {@link
- * DataflowPipelineJob} and message.
- */
- public DataflowJobCancelledException(DataflowPipelineJob job, String message) {
- super(job, message, null);
- }
-
- /**
- * Create a new {@code DataflowJobCancelledException} with the specified {@link
- * DataflowPipelineJob}, message, and cause.
- */
- public DataflowJobCancelledException(DataflowPipelineJob job, String message, Throwable cause) {
- super(job, message, cause);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobException.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobException.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobException.java
deleted file mode 100644
index a22d13c..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowJobException.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.runners;
-
-import java.util.Objects;
-
-import javax.annotation.Nullable;
-
-/**
- * A {@link RuntimeException} that contains information about a {@link DataflowPipelineJob}.
- */
-public abstract class DataflowJobException extends RuntimeException {
- private final DataflowPipelineJob job;
-
- DataflowJobException(DataflowPipelineJob job, String message, @Nullable Throwable cause) {
- super(message, cause);
- this.job = Objects.requireNonNull(job);
- }
-
- /**
- * Returns the job associated with this exception.
- */
- public DataflowPipelineJob getJob() {
- return job;
- }
-}
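
Since every subclass carries the DataflowPipelineJob, callers can recover job details from any of them. A minimal sketch (runAndReport is an illustrative helper, not SDK API):

    import com.google.cloud.dataflow.sdk.Pipeline;
    import com.google.cloud.dataflow.sdk.runners.DataflowJobException;

    public class JobExceptionSketch {
      // Illustrative helper; assumes the pipeline is configured for the
      // BlockingDataflowPipelineRunner, whose run() throws these exceptions.
      static void runAndReport(Pipeline pipeline) {
        try {
          pipeline.run();
        } catch (DataflowJobException e) {
          // getJob() exposes the job for follow-up (status lookup, cancellation).
          System.err.println(
              "Job " + e.getJob().getJobId() + " did not complete: " + e.getMessage());
        }
      }
    }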