You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by re...@apache.org on 2023/06/07 21:45:25 UTC

[beam] branch master updated: Merge pull request #27047: Enable pubsub dynamic destinations by default

This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new c3b330c66c6 Merge pull request #27047: Enable pubsub dynamic destinations by default
c3b330c66c6 is described below

commit c3b330c66c6e098a3a3a9505d3b4eaddc9ccb9c0
Author: Reuven Lax <re...@google.com>
AuthorDate: Wed Jun 7 14:45:18 2023 -0700

    Merge pull request #27047: Enable pubsub dynamic destinations by default
---
 .../java/org/apache/beam/runners/dataflow/DataflowRunner.java    | 9 +--------
 .../beam/runners/dataflow/options/DataflowPipelineOptions.java   | 7 -------
 .../org/apache/beam/runners/dataflow/DataflowRunnerTest.java     | 1 -
 3 files changed, 1 insertion(+), 16 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 2951b48bab4..60ebe8d8846 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -1916,14 +1916,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
               ((NestedValueProvider) overriddenTransform.getTopicProvider()).propertyName());
         }
       } else {
-        DataflowPipelineOptions options =
-            input.getPipeline().getOptions().as(DataflowPipelineOptions.class);
-        if (options.getEnableDynamicPubsubDestinations()) {
-          stepContext.addInput(PropertyNames.PUBSUB_DYNAMIC_DESTINATIONS, true);
-        } else {
-          throw new RuntimeException(
-              "Dynamic Pubsub destinations not yet supported. Topic must be set.");
-        }
+        stepContext.addInput(PropertyNames.PUBSUB_DYNAMIC_DESTINATIONS, true);
       }
       if (overriddenTransform.getTimestampAttribute() != null) {
         stepContext.addInput(
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
index bf1027cdd3a..f87af28ca61 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
@@ -153,13 +153,6 @@ public interface DataflowPipelineOptions
 
   void setDataflowWorkerJar(String dataflowWorkerJafr);
 
-  // Disable this support for now until the Dataflow backend fully supports this option.
-  @Description("Whether to allow dynamic pubsub destinations. Temporary option: will be removed.")
-  @Default.Boolean(false)
-  Boolean getEnableDynamicPubsubDestinations();
-
-  void setEnableDynamicPubsubDestinations(Boolean enable);
-
   /** Set of available Flexible Resource Scheduling goals. */
   enum FlexResourceSchedulingGoal {
     /** No goal specified. */
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index d16afeb91e0..84f1069f592 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -2505,7 +2505,6 @@ public class DataflowRunnerTest implements Serializable {
     PipelineOptions options = buildPipelineOptions();
     DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
     dataflowOptions.setStreaming(true);
-    dataflowOptions.setEnableDynamicPubsubDestinations(true);
     Pipeline p = Pipeline.create(options);
 
     List<PubsubMessage> testValues =