Posted to commits@beam.apache.org by bo...@apache.org on 2021/04/23 01:22:05 UTC
[beam] branch master updated: add sdkContainerImage to Java WorkerPool PipelineOptions
This is an automated email from the ASF dual-hosted git repository.
boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 2300f45 add sdkContainerImage to Java WorkerPool PipelineOptions
new 018f5e7 Merge pull request #14575 from [BEAM-12212] Adds --sdkContainerImage as new Java Dataflow PipelineOption
2300f45 is described below
commit 2300f4547bbdc5c0e1cdcafa537d1a605bedc522
Author: Emily Ye <em...@google.com>
AuthorDate: Sun Apr 18 23:58:24 2021 -0700
add sdkContainerImage to Java WorkerPool PipelineOptions
---
.../org/apache/beam/gradle/BeamModulePlugin.groovy | 4 +-
runners/google-cloud-dataflow-java/build.gradle | 3 +-
.../examples-streaming/build.gradle | 4 +-
.../beam/runners/dataflow/DataflowRunner.java | 85 ++++++++++++++++++----
.../beam/runners/dataflow/DataflowRunnerInfo.java | 10 ++-
.../options/DataflowPipelineWorkerPoolOptions.java | 38 +++++-----
.../beam/runners/dataflow/dataflow.properties | 1 +
.../dataflow/DataflowPipelineTranslatorTest.java | 46 ++++++++++--
.../runners/dataflow/DataflowRunnerInfoTest.java | 15 +++-
.../beam/runners/dataflow/DataflowRunnerTest.java | 68 ++++++++++++++---
10 files changed, 214 insertions(+), 60 deletions(-)
diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 465061d..4c4d41b 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -1583,13 +1583,15 @@ class BeamModulePlugin implements Plugin<Project> {
if (pipelineOptionsString.contains('use_runner_v2')) {
def dockerImageName = project.project(':runners:google-cloud-dataflow-java').ext.dockerImageName
allOptionsList.addAll([
- "--workerHarnessContainerImage=${dockerImageName}",
+ "--sdkContainerImage=${dockerImageName}",
"--region=${dataflowRegion}"
])
} else {
def dataflowWorkerJar = project.findProperty('dataflowWorkerJar') ?:
project.project(":runners:google-cloud-dataflow-java:worker:legacy-worker").shadowJar.archivePath
allOptionsList.addAll([
+ // Keep the legacy flag here so the tests verify it still works for
+ // legacy pipelines.
'--workerHarnessContainerImage=',
"--dataflowWorkerJar=${dataflowWorkerJar}",
"--region=${dataflowRegion}"
diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle
index 80941db..a3db5a2 100644
--- a/runners/google-cloud-dataflow-java/build.gradle
+++ b/runners/google-cloud-dataflow-java/build.gradle
@@ -46,6 +46,7 @@ processResources {
'dataflow.legacy_environment_major_version' : '8',
'dataflow.fnapi_environment_major_version' : '8',
'dataflow.container_version' : 'beam-master-20210419',
+ 'dataflow.container_base_repository' : 'gcr.io/cloud-dataflow/v1beta3',
]
}
@@ -147,7 +148,7 @@ def runnerV2PipelineOptions = [
"--project=${dataflowProject}",
"--region=${dataflowRegion}",
"--tempRoot=${dataflowValidatesTempRoot}",
- "--workerHarnessContainerImage=${dockerImageContainer}:${dockerTag}",
+ "--sdkContainerImage=${dockerImageContainer}:${dockerTag}",
// TODO(BEAM-11779) remove shuffle_mode=appliance with runner v2 once issue is resolved.
"--experiments=beam_fn_api,use_unified_worker,use_runner_v2,shuffle_mode=appliance",
]
diff --git a/runners/google-cloud-dataflow-java/examples-streaming/build.gradle b/runners/google-cloud-dataflow-java/examples-streaming/build.gradle
index fd8705d..bc5f584d 100644
--- a/runners/google-cloud-dataflow-java/examples-streaming/build.gradle
+++ b/runners/google-cloud-dataflow-java/examples-streaming/build.gradle
@@ -40,8 +40,8 @@ task windmillPreCommit(type: Test) {
dependsOn ":runners:google-cloud-dataflow-java:worker:legacy-worker:shadowJar"
def dataflowWorkerJar = project.findProperty('dataflowWorkerJar') ?: project(":runners:google-cloud-dataflow-java:worker:legacy-worker").shadowJar.archivePath
- // Set workerHarnessContainerImage to empty to make Dataflow pick up the non-versioned container
- // image, which handles a staged worker jar.
+ // Set workerHarnessContainerImage to empty to make Dataflow pick up the
+ // non-versioned container image, which handles a staged worker jar.
def preCommitBeamTestPipelineOptions = [
"--project=${gcpProject}",
"--region=${gcpRegion}",
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 42d463e..cae1cf2 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -269,7 +269,8 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
"Missing required pipeline options: " + Joiner.on(',').join(missing));
}
- validateWorkerSettings(PipelineOptionsValidator.validate(GcpOptions.class, options));
+ validateWorkerSettings(
+ PipelineOptionsValidator.validate(DataflowPipelineWorkerPoolOptions.class, options));
PathValidator validator = dataflowOptions.getPathValidator();
String gcpTempLocation;
@@ -401,8 +402,39 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
return Strings.isNullOrEmpty(endpoint) || Pattern.matches(ENDPOINT_REGEXP, endpoint);
}
+ static void validateSdkContainerImageOptions(DataflowPipelineWorkerPoolOptions workerOptions) {
+ // Check against null: an empty string value for workerHarnessContainerImage
+ // must be preserved for the legacy dataflowWorkerJar to work.
+ String sdkContainerOption = workerOptions.getSdkContainerImage();
+ String workerHarnessOption = workerOptions.getWorkerHarnessContainerImage();
+ Preconditions.checkArgument(
+ sdkContainerOption == null
+ || workerHarnessOption == null
+ || sdkContainerOption.equals(workerHarnessOption),
+ "Cannot use legacy option workerHarnessContainerImage with sdkContainerImage. Prefer sdkContainerImage.");
+
+ // Default to new option, which may be null.
+ String containerImage = workerOptions.getSdkContainerImage();
+ if (workerOptions.getWorkerHarnessContainerImage() != null
+ && workerOptions.getSdkContainerImage() == null) {
+ // Set image to old option if old option was set but new option is not set.
+ LOG.warn(
+ "Prefer --sdkContainerImage over deprecated legacy option --workerHarnessContainerImage.");
+ containerImage = workerOptions.getWorkerHarnessContainerImage();
+ }
+
+ // Make sure both options hold the same value.
+ workerOptions.setSdkContainerImage(containerImage);
+ workerOptions.setWorkerHarnessContainerImage(containerImage);
+ }
+
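(A minimal, test-style sketch of the aliasing behavior this method establishes, assuming the code lives in the org.apache.beam.runners.dataflow package since validateWorkerSettings is package-private; it mirrors testAliasForLegacyWorkerHarnessContainerImage further down in this commit.)

    @Test
    public void legacyOptionAliasSketch() {
      DataflowPipelineWorkerPoolOptions options =
          PipelineOptionsFactory.as(DataflowPipelineWorkerPoolOptions.class);
      // Only the deprecated option is set; validation logs a warning and
      // mirrors the value into sdkContainerImage so both options agree.
      options.setWorkerHarnessContainerImage("image.url:worker");
      DataflowRunner.validateWorkerSettings(options);
      assertEquals("image.url:worker", options.getSdkContainerImage());
      assertEquals("image.url:worker", options.getWorkerHarnessContainerImage());
    }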
@VisibleForTesting
- static void validateWorkerSettings(GcpOptions gcpOptions) {
+ static void validateWorkerSettings(DataflowPipelineWorkerPoolOptions workerOptions) {
+ DataflowPipelineOptions dataflowOptions = workerOptions.as(DataflowPipelineOptions.class);
+
+ validateSdkContainerImageOptions(workerOptions);
+
+ GcpOptions gcpOptions = workerOptions.as(GcpOptions.class);
Preconditions.checkArgument(
gcpOptions.getZone() == null || gcpOptions.getWorkerRegion() == null,
"Cannot use option zone with workerRegion. Prefer either workerZone or workerRegion.");
@@ -413,7 +445,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
gcpOptions.getWorkerRegion() == null || gcpOptions.getWorkerZone() == null,
"workerRegion and workerZone options are mutually exclusive.");
- DataflowPipelineOptions dataflowOptions = gcpOptions.as(DataflowPipelineOptions.class);
boolean hasExperimentWorkerRegion = false;
if (dataflowOptions.getExperiments() != null) {
for (String experiment : dataflowOptions.getExperiments()) {
@@ -1092,9 +1123,9 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
// Set the Docker container image that executes Dataflow worker harness, residing in Google
// Container Registry. Translator is guaranteed to create a worker pool prior to this point.
- String workerHarnessContainerImage = getContainerImageForJob(options);
+ String containerImage = getContainerImageForJob(options);
for (WorkerPool workerPool : newJob.getEnvironment().getWorkerPools()) {
- workerPool.setWorkerHarnessContainerImage(workerHarnessContainerImage);
+ workerPool.setWorkerHarnessContainerImage(containerImage);
}
configureSdkHarnessContainerImages(options, portablePipelineProto, newJob);
@@ -2175,21 +2206,45 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
@VisibleForTesting
static String getContainerImageForJob(DataflowPipelineOptions options) {
- String workerHarnessContainerImage = options.getWorkerHarnessContainerImage();
+ String containerImage = options.getSdkContainerImage();
+
+ if (containerImage == null) {
+ // If not set, construct and return default image URL.
+ return getDefaultContainerImageUrl(options);
+ } else if (containerImage.contains("IMAGE")) {
+ // Replace placeholder with default image name
+ // TODO(emilymye): See if we can remove this placeholder
+ return containerImage.replace("IMAGE", getDefaultContainerImageNameForJob(options));
+ } else {
+ return containerImage;
+ }
+ }
+
+ /** Construct the default Dataflow container full image URL. */
+ static String getDefaultContainerImageUrl(DataflowPipelineOptions options) {
+ DataflowRunnerInfo dataflowRunnerInfo = DataflowRunnerInfo.getDataflowRunnerInfo();
+ return String.format(
+ "%s/%s:%s",
+ dataflowRunnerInfo.getContainerImageBaseRepository(),
+ getDefaultContainerImageNameForJob(options),
+ dataflowRunnerInfo.getContainerVersion());
+ }
+ /**
+ * Construct the default Dataflow container image name based on the pipeline type and the
+ * environment's Java version.
+ */
+ static String getDefaultContainerImageNameForJob(DataflowPipelineOptions options) {
Environments.JavaVersion javaVersion = Environments.getJavaVersion();
- String javaVersionId =
+ String legacyJavaVersionId =
(javaVersion == Environments.JavaVersion.v8) ? "java" : javaVersion.toString();
- if (!workerHarnessContainerImage.contains("IMAGE")) {
- return workerHarnessContainerImage;
- } else if (useUnifiedWorker(options)) {
- return workerHarnessContainerImage.replace("IMAGE", "java");
+
+ if (useUnifiedWorker(options)) {
+ return "java";
} else if (options.isStreaming()) {
- return workerHarnessContainerImage.replace(
- "IMAGE", String.format("beam-%s-streaming", javaVersionId));
+ return String.format("beam-%s-streaming", legacyJavaVersionId);
} else {
- return workerHarnessContainerImage.replace(
- "IMAGE", String.format("beam-%s-batch", javaVersionId));
+ return String.format("beam-%s-batch", legacyJavaVersionId);
}
}
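(A sketch of how the defaults above resolve, assuming this commit's property values from build.gradle, namely base repository gcr.io/cloud-dataflow/v1beta3 and container version beam-master-20210419, a Java 8 JVM, and package-private access to getContainerImageForJob.)

    DataflowPipelineOptions options =
        PipelineOptionsFactory.as(DataflowPipelineOptions.class);
    options.setStreaming(false); // legacy batch: no use_runner_v2 experiment
    // sdkContainerImage is unset, so the default URL is assembled as
    // <base repository>/<image name>:<container version>, i.e.
    // "gcr.io/cloud-dataflow/v1beta3/beam-java-batch:beam-master-20210419".
    String defaultImage = DataflowRunner.getContainerImageForJob(options);

    // The "IMAGE" placeholder is still honored; the default image name is
    // substituted, so this yields "gcr.io/beam-java-batch/foo".
    options.setSdkContainerImage("gcr.io/IMAGE/foo");
    String substituted = DataflowRunner.getContainerImageForJob(options);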
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunnerInfo.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunnerInfo.java
index f54643f..4c5b272 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunnerInfo.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunnerInfo.java
@@ -42,6 +42,7 @@ public final class DataflowRunnerInfo extends ReleaseInfo {
private static final String LEGACY_ENVIRONMENT_MAJOR_VERSION_KEY =
"legacy.environment.major.version";
private static final String CONTAINER_VERSION_KEY = "container.version";
+ private static final String CONTAINER_BASE_REPOSITORY_KEY = "container.base_repository";
private static class LazyInit {
private static final DataflowRunnerInfo INSTANCE;
@@ -99,12 +100,19 @@ public final class DataflowRunnerInfo extends ReleaseInfo {
return properties.get(FNAPI_ENVIRONMENT_MAJOR_VERSION_KEY);
}
- /** Provides the container version that will be used for constructing harness image paths. */
+ /** Provides the version/tag for constructing the container image path. */
public String getContainerVersion() {
checkState(properties.containsKey(CONTAINER_VERSION_KEY), "Unknown container version");
return properties.get(CONTAINER_VERSION_KEY);
}
+ /** Provides the base repository for constructing the container image path. */
+ public String getContainerImageBaseRepository() {
+ checkState(
+ properties.containsKey(CONTAINER_BASE_REPOSITORY_KEY), "Unknown container base repository");
+ return properties.get(CONTAINER_BASE_REPOSITORY_KEY);
+ }
+
@Override
public Map<String, String> getProperties() {
return ImmutableMap.copyOf((Map) properties);
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java
index 751bca6..1978b17 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java
@@ -18,16 +18,12 @@
package org.apache.beam.runners.dataflow.options;
import com.fasterxml.jackson.annotation.JsonIgnore;
-import org.apache.beam.runners.dataflow.DataflowRunnerInfo;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
-import org.apache.beam.sdk.options.Default;
-import org.apache.beam.sdk.options.DefaultValueFactory;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.FileStagingOptions;
import org.apache.beam.sdk.options.Hidden;
-import org.apache.beam.sdk.options.PipelineOptions;
import org.checkerframework.checker.nullness.qual.Nullable;
/** Options that are used to configure the Dataflow pipeline worker pool. */
@@ -112,30 +108,32 @@ public interface DataflowPipelineWorkerPoolOptions extends GcpOptions, FileStagi
void setDiskSizeGb(int value);
- /**
- * Docker container image that executes Dataflow worker harness, residing in Google Container
- * Registry.
- */
- @Default.InstanceFactory(WorkerHarnessContainerImageFactory.class)
+ /**
+ * Container image used as the Dataflow worker harness image.
+ *
+ * @deprecated Use {@link #getSdkContainerImage} instead.
+ */
@Description(
- "Docker container image that executes Dataflow worker harness, residing in Google "
- + " Container Registry.")
+ "Container image used to configure a Dataflow worker. "
+ + "Can only be used for official Dataflow container images. "
+ + "Prefer using sdkContainerImage instead.")
+ @Deprecated
@Hidden
String getWorkerHarnessContainerImage();
+ /** @deprecated Use {@link #setSdkContainerImage} instead. */
+ @Deprecated
+ @Hidden
void setWorkerHarnessContainerImage(String value);
/**
- * Returns the default Docker container image that executes Dataflow worker harness, residing in
- * Google Container Registry.
+ * Container image used to configure SDK execution environment on worker. Used for custom
+ * containers on portable pipelines only.
*/
- class WorkerHarnessContainerImageFactory implements DefaultValueFactory<String> {
- @Override
- public String create(PipelineOptions options) {
- String containerVersion = DataflowRunnerInfo.getDataflowRunnerInfo().getContainerVersion();
- return String.format("gcr.io/cloud-dataflow/v1beta3/IMAGE:%s", containerVersion);
- }
- }
+ @Description(
+ "Container image used to configure the SDK execution environment of "
+ + "pipeline code on a worker. For non-portable pipelines, can only be "
+ + "used for official Dataflow container images.")
+ String getSdkContainerImage();
+
+ void setSdkContainerImage(String value);
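(A usage sketch for the new option; the project and image names below are illustrative, not from the commit.)

    String[] args = {
      "--runner=DataflowRunner",
      "--project=my-project",                               // hypothetical project
      "--experiments=use_runner_v2",
      "--sdkContainerImage=gcr.io/my-project/my-sdk:latest" // hypothetical custom image
    };
    DataflowPipelineWorkerPoolOptions opts =
        PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineWorkerPoolOptions.class);
    // opts.getSdkContainerImage() returns "gcr.io/my-project/my-sdk:latest"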
/**
* GCE <a href="https://cloud.google.com/compute/docs/networking">network</a> for launching
diff --git a/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties b/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties
index 7d4bdf0..58e92ee 100644
--- a/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties
+++ b/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties
@@ -19,3 +19,4 @@
legacy.environment.major.version=@dataflow.legacy_environment_major_version@
fnapi.environment.major.version=@dataflow.fnapi_environment_major_version@
container.version=@dataflow.container_version@
+container.base_repository=@dataflow.container_base_repository@
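(Once Gradle's processResources substitutes the @...@ tokens, the lines shown above resolve as follows, using the values set in build.gradle earlier in this commit.)

    legacy.environment.major.version=8
    fnapi.environment.major.version=8
    container.version=beam-master-20210419
    container.base_repository=gcr.io/cloud-dataflow/v1beta3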
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index a4834e2..c38037f 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -141,10 +141,10 @@ public class DataflowPipelineTranslatorTest implements Serializable {
private SdkComponents createSdkComponents(PipelineOptions options) {
SdkComponents sdkComponents = SdkComponents.create();
- String workerHarnessContainerImageURL =
+ String containerImageURL =
DataflowRunner.getContainerImageForJob(options.as(DataflowPipelineOptions.class));
RunnerApi.Environment defaultEnvironmentForDataflow =
- Environments.createDockerEnvironment(workerHarnessContainerImageURL);
+ Environments.createDockerEnvironment(containerImageURL);
sdkComponents.registerEnvironment(defaultEnvironmentForDataflow);
return sdkComponents;
@@ -1260,15 +1260,15 @@ public class DataflowPipelineTranslatorTest implements Serializable {
}
/**
- * Tests that when {@link DataflowPipelineOptions#setWorkerHarnessContainerImage(String)} pipeline
- * option is set, {@link DataflowRunner} sets that value as the {@link
- * DockerPayload#getContainerImage()} of the default {@link Environment} used when generating the
- * model pipeline proto.
+ * Tests that when (deprecated) {@link
+ * DataflowPipelineOptions#setWorkerHarnessContainerImage(String)} pipeline option is set, {@link
+ * DataflowRunner} sets that value as the {@link DockerPayload#getContainerImage()} of the default
+ * {@link Environment} used when generating the model pipeline proto.
*/
@Test
public void testSetWorkerHarnessContainerImageInPipelineProto() throws Exception {
DataflowPipelineOptions options = buildPipelineOptions();
- String containerImage = "gcr.io/IMAGE/foo";
+ String containerImage = "gcr.io/image:foo";
options.as(DataflowPipelineOptions.class).setWorkerHarnessContainerImage(containerImage);
Pipeline p = Pipeline.create(options);
@@ -1292,6 +1292,38 @@ public class DataflowPipelineTranslatorTest implements Serializable {
assertEquals(DataflowRunner.getContainerImageForJob(options), payload.getContainerImage());
}
+ /**
+ * Tests that when {@link DataflowPipelineOptions#setSdkContainerImage(String)} pipeline option is
+ * set, {@link DataflowRunner} sets that value as the {@link DockerPayload#getContainerImage()} of
+ * the default {@link Environment} used when generating the model pipeline proto.
+ */
+ @Test
+ public void testSetSdkContainerImageInPipelineProto() throws Exception {
+ DataflowPipelineOptions options = buildPipelineOptions();
+ String containerImage = "gcr.io/image:foo";
+ options.as(DataflowPipelineOptions.class).setSdkContainerImage(containerImage);
+
+ Pipeline p = Pipeline.create(options);
+ SdkComponents sdkComponents = createSdkComponents(options);
+ RunnerApi.Pipeline proto = PipelineTranslation.toProto(p, sdkComponents, true);
+ JobSpecification specification =
+ DataflowPipelineTranslator.fromOptions(options)
+ .translate(
+ p,
+ proto,
+ sdkComponents,
+ DataflowRunner.fromOptions(options),
+ Collections.emptyList());
+ RunnerApi.Pipeline pipelineProto = specification.getPipelineProto();
+
+ assertEquals(1, pipelineProto.getComponents().getEnvironmentsCount());
+ Environment defaultEnvironment =
+ Iterables.getOnlyElement(pipelineProto.getComponents().getEnvironmentsMap().values());
+
+ DockerPayload payload = DockerPayload.parseFrom(defaultEnvironment.getPayload());
+ assertEquals(DataflowRunner.getContainerImageForJob(options), payload.getContainerImage());
+ }
+
@Test
public void testDataflowServiceOptionsSet() throws IOException {
final List<String> dataflowServiceOptions =
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerInfoTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerInfoTest.java
index 83e617f..fbb8ed2 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerInfoTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerInfoTest.java
@@ -49,8 +49,19 @@ public class DataflowRunnerInfoTest {
String.format("FnAPI environment major version number %s is not a number", version),
version.matches("\\d+"));
- // Validate container version does not contain a $ (indicating it was not filled in).
- assertThat("container version invalid", info.getContainerVersion(), not(containsString("$")));
+ // Validate container version does not contain the property name (indicating it was not filled
+ // in).
+ assertThat(
+ "container version invalid",
+ info.getContainerVersion(),
+ not(containsString("dataflow.container_version")));
+
+ // Validate container base repository does not contain the property name
+ // (indicating it was not filled in).
+ assertThat(
+ "container repository invalid",
+ info.getContainerImageBaseRepository(),
+ not(containsString("dataflow.container_base_repository")));
for (String property :
new String[] {"java.vendor", "java.version", "os.arch", "os.name", "os.version"}) {
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index d7cf5ae..e3b426f 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -584,7 +584,8 @@ public class DataflowRunnerTest implements Serializable {
@Test
public void testZoneAndWorkerRegionMutuallyExclusive() {
- GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class);
+ DataflowPipelineWorkerPoolOptions options =
+ PipelineOptionsFactory.as(DataflowPipelineWorkerPoolOptions.class);
options.setZone("us-east1-b");
options.setWorkerRegion("us-east1");
assertThrows(
@@ -593,7 +594,8 @@ public class DataflowRunnerTest implements Serializable {
@Test
public void testZoneAndWorkerZoneMutuallyExclusive() {
- GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class);
+ DataflowPipelineWorkerPoolOptions options =
+ PipelineOptionsFactory.as(DataflowPipelineWorkerPoolOptions.class);
options.setZone("us-east1-b");
options.setWorkerZone("us-east1-c");
assertThrows(
@@ -602,7 +604,8 @@ public class DataflowRunnerTest implements Serializable {
@Test
public void testExperimentRegionAndWorkerRegionMutuallyExclusive() {
- GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class);
+ DataflowPipelineWorkerPoolOptions options =
+ PipelineOptionsFactory.as(DataflowPipelineWorkerPoolOptions.class);
DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
ExperimentalOptions.addExperiment(dataflowOptions, "worker_region=us-west1");
options.setWorkerRegion("us-east1");
@@ -612,7 +615,8 @@ public class DataflowRunnerTest implements Serializable {
@Test
public void testExperimentRegionAndWorkerZoneMutuallyExclusive() {
- GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class);
+ DataflowPipelineWorkerPoolOptions options =
+ PipelineOptionsFactory.as(DataflowPipelineWorkerPoolOptions.class);
DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
ExperimentalOptions.addExperiment(dataflowOptions, "worker_region=us-west1");
options.setWorkerZone("us-east1-b");
@@ -622,7 +626,8 @@ public class DataflowRunnerTest implements Serializable {
@Test
public void testWorkerRegionAndWorkerZoneMutuallyExclusive() {
- GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class);
+ DataflowPipelineWorkerPoolOptions options =
+ PipelineOptionsFactory.as(DataflowPipelineWorkerPoolOptions.class);
options.setWorkerRegion("us-east1");
options.setWorkerZone("us-east1-b");
assertThrows(
@@ -631,7 +636,8 @@ public class DataflowRunnerTest implements Serializable {
@Test
public void testZoneAliasWorkerZone() {
- GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class);
+ DataflowPipelineWorkerPoolOptions options =
+ PipelineOptionsFactory.as(DataflowPipelineWorkerPoolOptions.class);
options.setZone("us-east1-b");
DataflowRunner.validateWorkerSettings(options);
assertNull(options.getZone());
@@ -639,6 +645,28 @@ public class DataflowRunnerTest implements Serializable {
}
@Test
+ public void testAliasForLegacyWorkerHarnessContainerImage() {
+ DataflowPipelineWorkerPoolOptions options =
+ PipelineOptionsFactory.as(DataflowPipelineWorkerPoolOptions.class);
+ String testImage = "image.url:worker";
+ options.setWorkerHarnessContainerImage(testImage);
+ DataflowRunner.validateWorkerSettings(options);
+ assertEquals(testImage, options.getWorkerHarnessContainerImage());
+ assertEquals(testImage, options.getSdkContainerImage());
+ }
+
+ @Test
+ public void testAliasForSdkContainerImage() {
+ DataflowPipelineWorkerPoolOptions options =
+ PipelineOptionsFactory.as(DataflowPipelineWorkerPoolOptions.class);
+ String testImage = "image.url:sdk";
+ options.setSdkContainerImage("image.url:sdk");
+ DataflowRunner.validateWorkerSettings(options);
+ assertEquals(testImage, options.getWorkerHarnessContainerImage());
+ assertEquals(testImage, options.getSdkContainerImage());
+ }
+
+ @Test
public void testRegionRequiredForServiceRunner() throws IOException {
DataflowPipelineOptions options = buildPipelineOptions();
options.setRegion(null);
@@ -1591,15 +1619,33 @@ public class DataflowRunnerTest implements Serializable {
}
@Test
- public void testWorkerHarnessContainerImage() {
+ public void testGetContainerImageForJobFromOption() {
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
- // default image set
- options.setWorkerHarnessContainerImage("some-container");
- assertThat(getContainerImageForJob(options), equalTo("some-container"));
+ String[] testCases = {
+ "some-container",
+
+ // It is important that empty string is preserved, as
+ // dataflowWorkerJar relies on being passed an empty value vs
+ // not providing the container image option at all.
+ "",
+ };
+
+ for (String testCase : testCases) {
+ // When image option is set, should use that exact image.
+ options.setSdkContainerImage(testCase);
+ assertThat(getContainerImageForJob(options), equalTo(testCase));
+ }
+ }
+
+ @Test
+ public void testGetContainerImageForJobFromOptionWithPlaceholder() {
+ DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+ // When image option contains placeholder, container image should
+ // have placeholder replaced with default image for job.
+ options.setSdkContainerImage("gcr.io/IMAGE/foo");
// batch, legacy
- options.setWorkerHarnessContainerImage("gcr.io/IMAGE/foo");
options.setExperiments(null);
options.setStreaming(false);
System.setProperty("java.specification.version", "1.8");