You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by re...@apache.org on 2023/06/07 21:45:25 UTC
[beam] branch master updated: Merge pull request #27047: Enable pubsub dynamic destinations by default
This is an automated email from the ASF dual-hosted git repository.
reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new c3b330c66c6 Merge pull request #27047: Enable pubsub dynamic destinations by default
c3b330c66c6 is described below
commit c3b330c66c6e098a3a3a9505d3b4eaddc9ccb9c0
Author: Reuven Lax <re...@google.com>
AuthorDate: Wed Jun 7 14:45:18 2023 -0700
Merge pull request #27047: Enable pubsub dynamic destinations by default
---
.../java/org/apache/beam/runners/dataflow/DataflowRunner.java | 9 +--------
.../beam/runners/dataflow/options/DataflowPipelineOptions.java | 7 -------
.../org/apache/beam/runners/dataflow/DataflowRunnerTest.java | 1 -
3 files changed, 1 insertion(+), 16 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 2951b48bab4..60ebe8d8846 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -1916,14 +1916,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
((NestedValueProvider) overriddenTransform.getTopicProvider()).propertyName());
}
} else {
- DataflowPipelineOptions options =
- input.getPipeline().getOptions().as(DataflowPipelineOptions.class);
- if (options.getEnableDynamicPubsubDestinations()) {
- stepContext.addInput(PropertyNames.PUBSUB_DYNAMIC_DESTINATIONS, true);
- } else {
- throw new RuntimeException(
- "Dynamic Pubsub destinations not yet supported. Topic must be set.");
- }
+ stepContext.addInput(PropertyNames.PUBSUB_DYNAMIC_DESTINATIONS, true);
}
if (overriddenTransform.getTimestampAttribute() != null) {
stepContext.addInput(
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
index bf1027cdd3a..f87af28ca61 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
@@ -153,13 +153,6 @@ public interface DataflowPipelineOptions
void setDataflowWorkerJar(String dataflowWorkerJafr);
- // Disable this support for now until the Dataflow backend fully supports this option.
- @Description("Whether to allow dynamic pubsub destinations. Temporary option: will be removed.")
- @Default.Boolean(false)
- Boolean getEnableDynamicPubsubDestinations();
-
- void setEnableDynamicPubsubDestinations(Boolean enable);
-
/** Set of available Flexible Resource Scheduling goals. */
enum FlexResourceSchedulingGoal {
/** No goal specified. */
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index d16afeb91e0..84f1069f592 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -2505,7 +2505,6 @@ public class DataflowRunnerTest implements Serializable {
PipelineOptions options = buildPipelineOptions();
DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
dataflowOptions.setStreaming(true);
- dataflowOptions.setEnableDynamicPubsubDestinations(true);
Pipeline p = Pipeline.create(options);
List<PubsubMessage> testValues =