You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bo...@apache.org on 2021/04/23 01:22:05 UTC

[beam] branch master updated: add sdkContainerImage to Java WorkerPool PipelineOptions

This is an automated email from the ASF dual-hosted git repository.

boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 2300f45  add sdkContainerImage to Java WorkerPool PipelineOptions
     new 018f5e7  Merge pull request #14575 from [BEAM-12212] Adds  --sdkContainerImage as new Java Dataflow PipelineOption
2300f45 is described below

commit 2300f4547bbdc5c0e1cdcafa537d1a605bedc522
Author: Emily Ye <em...@google.com>
AuthorDate: Sun Apr 18 23:58:24 2021 -0700

    add sdkContainerImage to Java WorkerPool PipelineOptions
---
 .../org/apache/beam/gradle/BeamModulePlugin.groovy |  4 +-
 runners/google-cloud-dataflow-java/build.gradle    |  3 +-
 .../examples-streaming/build.gradle                |  4 +-
 .../beam/runners/dataflow/DataflowRunner.java      | 85 ++++++++++++++++++----
 .../beam/runners/dataflow/DataflowRunnerInfo.java  | 10 ++-
 .../options/DataflowPipelineWorkerPoolOptions.java | 38 +++++-----
 .../beam/runners/dataflow/dataflow.properties      |  1 +
 .../dataflow/DataflowPipelineTranslatorTest.java   | 46 ++++++++++--
 .../runners/dataflow/DataflowRunnerInfoTest.java   | 15 +++-
 .../beam/runners/dataflow/DataflowRunnerTest.java  | 68 ++++++++++++++---
 10 files changed, 214 insertions(+), 60 deletions(-)

diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 465061d..4c4d41b 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -1583,13 +1583,15 @@ class BeamModulePlugin implements Plugin<Project> {
             if (pipelineOptionsString.contains('use_runner_v2')) {
               def dockerImageName = project.project(':runners:google-cloud-dataflow-java').ext.dockerImageName
               allOptionsList.addAll([
-                "--workerHarnessContainerImage=${dockerImageName}",
+                "--sdkContainerImage=${dockerImageName}",
                 "--region=${dataflowRegion}"
               ])
             } else {
               def dataflowWorkerJar = project.findProperty('dataflowWorkerJar') ?:
                   project.project(":runners:google-cloud-dataflow-java:worker:legacy-worker").shadowJar.archivePath
               allOptionsList.addAll([
+                // Keep the legacy flag here so tests verify it still works
+                // for legacy pipelines.
                 '--workerHarnessContainerImage=',
                 "--dataflowWorkerJar=${dataflowWorkerJar}",
                 "--region=${dataflowRegion}"
diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle
index 80941db..a3db5a2 100644
--- a/runners/google-cloud-dataflow-java/build.gradle
+++ b/runners/google-cloud-dataflow-java/build.gradle
@@ -46,6 +46,7 @@ processResources {
     'dataflow.legacy_environment_major_version' : '8',
     'dataflow.fnapi_environment_major_version' : '8',
     'dataflow.container_version' : 'beam-master-20210419',
+    'dataflow.container_base_repository' : 'gcr.io/cloud-dataflow/v1beta3',
   ]
 }
 
@@ -147,7 +148,7 @@ def runnerV2PipelineOptions = [
   "--project=${dataflowProject}",
   "--region=${dataflowRegion}",
   "--tempRoot=${dataflowValidatesTempRoot}",
-  "--workerHarnessContainerImage=${dockerImageContainer}:${dockerTag}",
+  "--sdkContainerImage=${dockerImageContainer}:${dockerTag}",
   // TODO(BEAM-11779) remove shuffle_mode=appliance with runner v2 once issue is resolved.
   "--experiments=beam_fn_api,use_unified_worker,use_runner_v2,shuffle_mode=appliance",
 ]
diff --git a/runners/google-cloud-dataflow-java/examples-streaming/build.gradle b/runners/google-cloud-dataflow-java/examples-streaming/build.gradle
index fd8705d..bc5f584d 100644
--- a/runners/google-cloud-dataflow-java/examples-streaming/build.gradle
+++ b/runners/google-cloud-dataflow-java/examples-streaming/build.gradle
@@ -40,8 +40,8 @@ task windmillPreCommit(type: Test) {
   dependsOn ":runners:google-cloud-dataflow-java:worker:legacy-worker:shadowJar"
   def dataflowWorkerJar = project.findProperty('dataflowWorkerJar') ?: project(":runners:google-cloud-dataflow-java:worker:legacy-worker").shadowJar.archivePath
 
-  // Set workerHarnessContainerImage to empty to make Dataflow pick up the non-versioned container
-  // image, which handles a staged worker jar.
+  // Set workerHarnessContainerImage to empty to make Dataflow pick up the
+  // non-versioned container image, which handles a staged worker jar.
   def preCommitBeamTestPipelineOptions = [
      "--project=${gcpProject}",
      "--region=${gcpRegion}",
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 42d463e..cae1cf2 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -269,7 +269,8 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
           "Missing required pipeline options: " + Joiner.on(',').join(missing));
     }
 
-    validateWorkerSettings(PipelineOptionsValidator.validate(GcpOptions.class, options));
+    validateWorkerSettings(
+        PipelineOptionsValidator.validate(DataflowPipelineWorkerPoolOptions.class, options));
 
     PathValidator validator = dataflowOptions.getPathValidator();
     String gcpTempLocation;
@@ -401,8 +402,39 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     return Strings.isNullOrEmpty(endpoint) || Pattern.matches(ENDPOINT_REGEXP, endpoint);
   }
 
+  static void validateSdkContainerImageOptions(DataflowPipelineWorkerPoolOptions workerOptions) {
+    // Check against null - empty string value for workerHarnessContainerImage
+    // must be preserved for legacy dataflowWorkerJar to work.
+    String sdkContainerOption = workerOptions.getSdkContainerImage();
+    String workerHarnessOption = workerOptions.getWorkerHarnessContainerImage();
+    Preconditions.checkArgument(
+        sdkContainerOption == null
+            || workerHarnessOption == null
+            || sdkContainerOption.equals(workerHarnessOption),
+        "Cannot use legacy option workerHarnessContainerImage with sdkContainerImage. Prefer sdkContainerImage.");
+
+    // Default to new option, which may be null.
+    String containerImage = workerOptions.getSdkContainerImage();
+    if (workerOptions.getWorkerHarnessContainerImage() != null
+        && workerOptions.getSdkContainerImage() == null) {
+      // Set image to old option if old option was set but new option is not set.
+      LOG.warn(
+          "Prefer --sdkContainerImage over deprecated legacy option --workerHarnessContainerImage.");
+      containerImage = workerOptions.getWorkerHarnessContainerImage();
+    }
+
+    // Make sure both options have same value.
+    workerOptions.setSdkContainerImage(containerImage);
+    workerOptions.setWorkerHarnessContainerImage(containerImage);
+  }
+
   @VisibleForTesting
-  static void validateWorkerSettings(GcpOptions gcpOptions) {
+  static void validateWorkerSettings(DataflowPipelineWorkerPoolOptions workerOptions) {
+    DataflowPipelineOptions dataflowOptions = workerOptions.as(DataflowPipelineOptions.class);
+
+    validateSdkContainerImageOptions(workerOptions);
+
+    GcpOptions gcpOptions = workerOptions.as(GcpOptions.class);
     Preconditions.checkArgument(
         gcpOptions.getZone() == null || gcpOptions.getWorkerRegion() == null,
         "Cannot use option zone with workerRegion. Prefer either workerZone or workerRegion.");
@@ -413,7 +445,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
         gcpOptions.getWorkerRegion() == null || gcpOptions.getWorkerZone() == null,
         "workerRegion and workerZone options are mutually exclusive.");
 
-    DataflowPipelineOptions dataflowOptions = gcpOptions.as(DataflowPipelineOptions.class);
     boolean hasExperimentWorkerRegion = false;
     if (dataflowOptions.getExperiments() != null) {
       for (String experiment : dataflowOptions.getExperiments()) {
@@ -1092,9 +1123,9 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
 
     // Set the Docker container image that executes Dataflow worker harness, residing in Google
     // Container Registry. Translator is guaranteed to create a worker pool prior to this point.
-    String workerHarnessContainerImage = getContainerImageForJob(options);
+    String containerImage = getContainerImageForJob(options);
     for (WorkerPool workerPool : newJob.getEnvironment().getWorkerPools()) {
-      workerPool.setWorkerHarnessContainerImage(workerHarnessContainerImage);
+      workerPool.setWorkerHarnessContainerImage(containerImage);
     }
 
     configureSdkHarnessContainerImages(options, portablePipelineProto, newJob);
@@ -2175,21 +2206,45 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
 
   @VisibleForTesting
   static String getContainerImageForJob(DataflowPipelineOptions options) {
-    String workerHarnessContainerImage = options.getWorkerHarnessContainerImage();
+    String containerImage = options.getSdkContainerImage();
+
+    if (containerImage == null) {
+      // If not set, construct and return default image URL.
+      return getDefaultContainerImageUrl(options);
+    } else if (containerImage.contains("IMAGE")) {
+      // Replace placeholder with default image name
+      // TODO(emilymye): See if we can remove this placeholder
+      return containerImage.replace("IMAGE", getDefaultContainerImageNameForJob(options));
+    } else {
+      return containerImage;
+    }
+  }
+
+  /** Construct the default Dataflow container full image URL. */
+  static String getDefaultContainerImageUrl(DataflowPipelineOptions options) {
+    DataflowRunnerInfo dataflowRunnerInfo = DataflowRunnerInfo.getDataflowRunnerInfo();
+    return String.format(
+        "%s/%s:%s",
+        dataflowRunnerInfo.getContainerImageBaseRepository(),
+        getDefaultContainerImageNameForJob(options),
+        dataflowRunnerInfo.getContainerVersion());
+  }
 
+  /**
+   * Construct the default Dataflow container image name based on pipeline type and Environment Java
+   * version.
+   */
+  static String getDefaultContainerImageNameForJob(DataflowPipelineOptions options) {
     Environments.JavaVersion javaVersion = Environments.getJavaVersion();
-    String javaVersionId =
+    String legacyJavaVersionId =
         (javaVersion == Environments.JavaVersion.v8) ? "java" : javaVersion.toString();
-    if (!workerHarnessContainerImage.contains("IMAGE")) {
-      return workerHarnessContainerImage;
-    } else if (useUnifiedWorker(options)) {
-      return workerHarnessContainerImage.replace("IMAGE", "java");
+
+    if (useUnifiedWorker(options)) {
+      return "java";
     } else if (options.isStreaming()) {
-      return workerHarnessContainerImage.replace(
-          "IMAGE", String.format("beam-%s-streaming", javaVersionId));
+      return String.format("beam-%s-streaming", legacyJavaVersionId);
     } else {
-      return workerHarnessContainerImage.replace(
-          "IMAGE", String.format("beam-%s-batch", javaVersionId));
+      return String.format("beam-%s-batch", legacyJavaVersionId);
     }
   }
 
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunnerInfo.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunnerInfo.java
index f54643f..4c5b272 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunnerInfo.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunnerInfo.java
@@ -42,6 +42,7 @@ public final class DataflowRunnerInfo extends ReleaseInfo {
   private static final String LEGACY_ENVIRONMENT_MAJOR_VERSION_KEY =
       "legacy.environment.major.version";
   private static final String CONTAINER_VERSION_KEY = "container.version";
+  private static final String CONTAINER_BASE_REPOSITORY_KEY = "container.base_repository";
 
   private static class LazyInit {
     private static final DataflowRunnerInfo INSTANCE;
@@ -99,12 +100,19 @@ public final class DataflowRunnerInfo extends ReleaseInfo {
     return properties.get(FNAPI_ENVIRONMENT_MAJOR_VERSION_KEY);
   }
 
-  /** Provides the container version that will be used for constructing harness image paths. */
+  /** Provides the version/tag for constructing the container image path. */
   public String getContainerVersion() {
     checkState(properties.containsKey(CONTAINER_VERSION_KEY), "Unknown container version");
     return properties.get(CONTAINER_VERSION_KEY);
   }
 
+  /** Provides the base image repository for constructing the container image path. */
+  public String getContainerImageBaseRepository() {
+    checkState(
+        properties.containsKey(CONTAINER_BASE_REPOSITORY_KEY), "Unknown container base repository");
+    return properties.get(CONTAINER_BASE_REPOSITORY_KEY);
+  }
+
   @Override
   public Map<String, String> getProperties() {
     return ImmutableMap.copyOf((Map) properties);
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java
index 751bca6..1978b17 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java
@@ -18,16 +18,12 @@
 package org.apache.beam.runners.dataflow.options;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
-import org.apache.beam.runners.dataflow.DataflowRunnerInfo;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
-import org.apache.beam.sdk.options.Default;
-import org.apache.beam.sdk.options.DefaultValueFactory;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.FileStagingOptions;
 import org.apache.beam.sdk.options.Hidden;
-import org.apache.beam.sdk.options.PipelineOptions;
 import org.checkerframework.checker.nullness.qual.Nullable;
 
 /** Options that are used to configure the Dataflow pipeline worker pool. */
@@ -112,30 +108,32 @@ public interface DataflowPipelineWorkerPoolOptions extends GcpOptions, FileStagi
 
   void setDiskSizeGb(int value);
 
-  /**
-   * Docker container image that executes Dataflow worker harness, residing in Google Container
-   * Registry.
-   */
-  @Default.InstanceFactory(WorkerHarnessContainerImageFactory.class)
+  /**
+   * Container image used as Dataflow worker harness image.
+   *
+   * @deprecated Use {@link #getSdkContainerImage} instead.
+   */
   @Description(
-      "Docker container image that executes Dataflow worker harness, residing in Google "
-          + " Container Registry.")
+      "Container image used to configure a Dataflow worker. "
+          + "Can only be used for official Dataflow container images. "
+          + "Prefer using sdkContainerImage instead.")
+  @Deprecated
   @Hidden
   String getWorkerHarnessContainerImage();
 
+  /** @deprecated Use {@link #setSdkContainerImage} instead. */
+  @Deprecated
+  @Hidden
   void setWorkerHarnessContainerImage(String value);
 
   /**
-   * Returns the default Docker container image that executes Dataflow worker harness, residing in
-   * Google Container Registry.
+   * Container image used to configure SDK execution environment on worker. Used for custom
+   * containers on portable pipelines only.
    */
-  class WorkerHarnessContainerImageFactory implements DefaultValueFactory<String> {
-    @Override
-    public String create(PipelineOptions options) {
-      String containerVersion = DataflowRunnerInfo.getDataflowRunnerInfo().getContainerVersion();
-      return String.format("gcr.io/cloud-dataflow/v1beta3/IMAGE:%s", containerVersion);
-    }
-  }
+  @Description(
+      "Container image used to configure the SDK execution environment of "
+          + "pipeline code on a worker. For non-portable pipelines, can only be "
+          + "used for official Dataflow container images.")
+  String getSdkContainerImage();
+
+  void setSdkContainerImage(String value);
 
   /**
    * GCE <a href="https://cloud.google.com/compute/docs/networking">network</a> for launching
diff --git a/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties b/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties
index 7d4bdf0..58e92ee 100644
--- a/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties
+++ b/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties
@@ -19,3 +19,4 @@
 legacy.environment.major.version=@dataflow.legacy_environment_major_version@
 fnapi.environment.major.version=@dataflow.fnapi_environment_major_version@
 container.version=@dataflow.container_version@
+container.base_repository=@dataflow.container_base_repository@
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index a4834e2..c38037f 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -141,10 +141,10 @@ public class DataflowPipelineTranslatorTest implements Serializable {
   private SdkComponents createSdkComponents(PipelineOptions options) {
     SdkComponents sdkComponents = SdkComponents.create();
 
-    String workerHarnessContainerImageURL =
+    String containerImageURL =
         DataflowRunner.getContainerImageForJob(options.as(DataflowPipelineOptions.class));
     RunnerApi.Environment defaultEnvironmentForDataflow =
-        Environments.createDockerEnvironment(workerHarnessContainerImageURL);
+        Environments.createDockerEnvironment(containerImageURL);
 
     sdkComponents.registerEnvironment(defaultEnvironmentForDataflow);
     return sdkComponents;
@@ -1260,15 +1260,15 @@ public class DataflowPipelineTranslatorTest implements Serializable {
   }
 
   /**
-   * Tests that when {@link DataflowPipelineOptions#setWorkerHarnessContainerImage(String)} pipeline
-   * option is set, {@link DataflowRunner} sets that value as the {@link
-   * DockerPayload#getContainerImage()} of the default {@link Environment} used when generating the
-   * model pipeline proto.
+   * Tests that when (deprecated) {@link
+   * DataflowPipelineOptions#setWorkerHarnessContainerImage(String)} pipeline option is set, {@link
+   * DataflowRunner} sets that value as the {@link DockerPayload#getContainerImage()} of the default
+   * {@link Environment} used when generating the model pipeline proto.
    */
   @Test
   public void testSetWorkerHarnessContainerImageInPipelineProto() throws Exception {
     DataflowPipelineOptions options = buildPipelineOptions();
-    String containerImage = "gcr.io/IMAGE/foo";
+    String containerImage = "gcr.io/image:foo";
     options.as(DataflowPipelineOptions.class).setWorkerHarnessContainerImage(containerImage);
 
     Pipeline p = Pipeline.create(options);
@@ -1292,6 +1292,38 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     assertEquals(DataflowRunner.getContainerImageForJob(options), payload.getContainerImage());
   }
 
+  /**
+   * Tests that when {@link DataflowPipelineOptions#setSdkContainerImage(String)} pipeline option is
+   * set, {@link DataflowRunner} sets that value as the {@link DockerPayload#getContainerImage()} of
+   * the default {@link Environment} used when generating the model pipeline proto.
+   */
+  @Test
+  public void testSetSdkContainerImageInPipelineProto() throws Exception {
+    DataflowPipelineOptions options = buildPipelineOptions();
+    String containerImage = "gcr.io/image:foo";
+    options.as(DataflowPipelineOptions.class).setSdkContainerImage(containerImage);
+
+    Pipeline p = Pipeline.create(options);
+    SdkComponents sdkComponents = createSdkComponents(options);
+    RunnerApi.Pipeline proto = PipelineTranslation.toProto(p, sdkComponents, true);
+    JobSpecification specification =
+        DataflowPipelineTranslator.fromOptions(options)
+            .translate(
+                p,
+                proto,
+                sdkComponents,
+                DataflowRunner.fromOptions(options),
+                Collections.emptyList());
+    RunnerApi.Pipeline pipelineProto = specification.getPipelineProto();
+
+    assertEquals(1, pipelineProto.getComponents().getEnvironmentsCount());
+    Environment defaultEnvironment =
+        Iterables.getOnlyElement(pipelineProto.getComponents().getEnvironmentsMap().values());
+
+    DockerPayload payload = DockerPayload.parseFrom(defaultEnvironment.getPayload());
+    assertEquals(DataflowRunner.getContainerImageForJob(options), payload.getContainerImage());
+  }
+
   @Test
   public void testDataflowServiceOptionsSet() throws IOException {
     final List<String> dataflowServiceOptions =
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerInfoTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerInfoTest.java
index 83e617f..fbb8ed2 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerInfoTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerInfoTest.java
@@ -49,8 +49,19 @@ public class DataflowRunnerInfoTest {
         String.format("FnAPI environment major version number %s is not a number", version),
         version.matches("\\d+"));
 
-    // Validate container version does not contain a $ (indicating it was not filled in).
-    assertThat("container version invalid", info.getContainerVersion(), not(containsString("$")));
+    // Validate container version does not contain the property name (indicating it was not filled
+    // in).
+    assertThat(
+        "container version invalid",
+        info.getContainerVersion(),
+        not(containsString("dataflow.container_version")));
+
+    // Validate container base repository does not contain the property name
+    // (indicating it was not filled in).
+    assertThat(
+        "container repository invalid",
+        info.getContainerImageBaseRepository(),
+        not(containsString("dataflow.container_base_repository")));
 
     for (String property :
         new String[] {"java.vendor", "java.version", "os.arch", "os.name", "os.version"}) {
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index d7cf5ae..e3b426f 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -584,7 +584,8 @@ public class DataflowRunnerTest implements Serializable {
 
   @Test
   public void testZoneAndWorkerRegionMutuallyExclusive() {
-    GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class);
+    DataflowPipelineWorkerPoolOptions options =
+        PipelineOptionsFactory.as(DataflowPipelineWorkerPoolOptions.class);
     options.setZone("us-east1-b");
     options.setWorkerRegion("us-east1");
     assertThrows(
@@ -593,7 +594,8 @@ public class DataflowRunnerTest implements Serializable {
 
   @Test
   public void testZoneAndWorkerZoneMutuallyExclusive() {
-    GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class);
+    DataflowPipelineWorkerPoolOptions options =
+        PipelineOptionsFactory.as(DataflowPipelineWorkerPoolOptions.class);
     options.setZone("us-east1-b");
     options.setWorkerZone("us-east1-c");
     assertThrows(
@@ -602,7 +604,8 @@ public class DataflowRunnerTest implements Serializable {
 
   @Test
   public void testExperimentRegionAndWorkerRegionMutuallyExclusive() {
-    GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class);
+    DataflowPipelineWorkerPoolOptions options =
+        PipelineOptionsFactory.as(DataflowPipelineWorkerPoolOptions.class);
     DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
     ExperimentalOptions.addExperiment(dataflowOptions, "worker_region=us-west1");
     options.setWorkerRegion("us-east1");
@@ -612,7 +615,8 @@ public class DataflowRunnerTest implements Serializable {
 
   @Test
   public void testExperimentRegionAndWorkerZoneMutuallyExclusive() {
-    GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class);
+    DataflowPipelineWorkerPoolOptions options =
+        PipelineOptionsFactory.as(DataflowPipelineWorkerPoolOptions.class);
     DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
     ExperimentalOptions.addExperiment(dataflowOptions, "worker_region=us-west1");
     options.setWorkerZone("us-east1-b");
@@ -622,7 +626,8 @@ public class DataflowRunnerTest implements Serializable {
 
   @Test
   public void testWorkerRegionAndWorkerZoneMutuallyExclusive() {
-    GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class);
+    DataflowPipelineWorkerPoolOptions options =
+        PipelineOptionsFactory.as(DataflowPipelineWorkerPoolOptions.class);
     options.setWorkerRegion("us-east1");
     options.setWorkerZone("us-east1-b");
     assertThrows(
@@ -631,7 +636,8 @@ public class DataflowRunnerTest implements Serializable {
 
   @Test
   public void testZoneAliasWorkerZone() {
-    GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class);
+    DataflowPipelineWorkerPoolOptions options =
+        PipelineOptionsFactory.as(DataflowPipelineWorkerPoolOptions.class);
     options.setZone("us-east1-b");
     DataflowRunner.validateWorkerSettings(options);
     assertNull(options.getZone());
@@ -639,6 +645,28 @@ public class DataflowRunnerTest implements Serializable {
   }
 
   @Test
+  public void testAliasForLegacyWorkerHarnessContainerImage() {
+    DataflowPipelineWorkerPoolOptions options =
+        PipelineOptionsFactory.as(DataflowPipelineWorkerPoolOptions.class);
+    String testImage = "image.url:worker";
+    options.setWorkerHarnessContainerImage(testImage);
+    DataflowRunner.validateWorkerSettings(options);
+    assertEquals(testImage, options.getWorkerHarnessContainerImage());
+    assertEquals(testImage, options.getSdkContainerImage());
+  }
+
+  @Test
+  public void testAliasForSdkContainerImage() {
+    DataflowPipelineWorkerPoolOptions options =
+        PipelineOptionsFactory.as(DataflowPipelineWorkerPoolOptions.class);
+    String testImage = "image.url:sdk";
+    options.setSdkContainerImage("image.url:sdk");
+    DataflowRunner.validateWorkerSettings(options);
+    assertEquals(testImage, options.getWorkerHarnessContainerImage());
+    assertEquals(testImage, options.getSdkContainerImage());
+  }
+
+  @Test
   public void testRegionRequiredForServiceRunner() throws IOException {
     DataflowPipelineOptions options = buildPipelineOptions();
     options.setRegion(null);
@@ -1591,15 +1619,33 @@ public class DataflowRunnerTest implements Serializable {
   }
 
   @Test
-  public void testWorkerHarnessContainerImage() {
+  public void testGetContainerImageForJobFromOption() {
     DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
 
-    // default image set
-    options.setWorkerHarnessContainerImage("some-container");
-    assertThat(getContainerImageForJob(options), equalTo("some-container"));
+    String[] testCases = {
+      "some-container",
+
+      // It is important that empty string is preserved, as
+      // dataflowWorkerJar relies on being passed an empty value vs
+      // not providing the container image option at all.
+      "",
+    };
+
+    for (String testCase : testCases) {
+      // When image option is set, should use that exact image.
+      options.setSdkContainerImage(testCase);
+      assertThat(getContainerImageForJob(options), equalTo(testCase));
+    }
+  }
+
+  @Test
+  public void testGetContainerImageForJobFromOptionWithPlaceholder() {
+    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    // When image option contains placeholder, container image should
+    // have placeholder replaced with default image for job.
+    options.setSdkContainerImage("gcr.io/IMAGE/foo");
 
     // batch, legacy
-    options.setWorkerHarnessContainerImage("gcr.io/IMAGE/foo");
     options.setExperiments(null);
     options.setStreaming(false);
     System.setProperty("java.specification.version", "1.8");