You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bo...@apache.org on 2019/10/31 05:18:24 UTC

[beam] branch master updated: Do not inject pubsub into Impulse in Fn API streaming mode.

This is an automated email from the ASF dual-hosted git repository.

boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new eb05f56  Do not inject pubsub into Impulse in Fn API streaming mode.
     new 0573e10  Merge pull request #9932 from boyuanzz/windmill_create
eb05f56 is described below

commit eb05f5684e4a27ad363b507bbeef83773b8fc06c
Author: Boyuan Zhang <bo...@google.com>
AuthorDate: Tue Oct 29 18:18:51 2019 -0700

    Do not inject pubsub into Impulse in Fn API streaming mode.
---
 .../java/org/apache/beam/runners/dataflow/DataflowRunner.java     | 2 +-
 sdks/python/apache_beam/runners/dataflow/dataflow_runner.py       | 8 +++++---
 2 files changed, 6 insertions(+), 4 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 9f7394f..f6f78a8 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -1427,7 +1427,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
   private static class ImpulseTranslator implements TransformTranslator<Impulse> {
     @Override
     public void translate(Impulse transform, TranslationContext context) {
-      if (context.getPipelineOptions().isStreaming()) {
+      if (context.getPipelineOptions().isStreaming() && !context.isFnApi()) {
         StepTranslationContext stepContext = context.addStep(transform, "ParallelRead");
         stepContext.addInput(PropertyNames.FORMAT, "pubsub");
         stepContext.addInput(PropertyNames.PUBSUB_SUBSCRIPTION, "_starting_signal/");
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 4928550..039eaf0 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -622,9 +622,12 @@ class DataflowRunner(PipelineRunner):
 
   def run_Impulse(self, transform_node, options):
     standard_options = options.view_as(StandardOptions)
+    debug_options = options.view_as(DebugOptions)
+    use_fn_api = (debug_options.experiments and
+                  'beam_fn_api' in debug_options.experiments)
     step = self._add_step(
         TransformNames.READ, transform_node.full_label, transform_node)
-    if standard_options.streaming:
+    if standard_options.streaming and not use_fn_api:
       step.add_property(PropertyNames.FORMAT, 'pubsub')
       step.add_property(PropertyNames.PUBSUB_SUBSCRIPTION, '_starting_signal/')
     else:
@@ -634,8 +637,7 @@ class DataflowRunner(PipelineRunner):
           coders.coders.GlobalWindowCoder()).get_impl().encode_nested(
               window.GlobalWindows.windowed_value(b''))
 
-      from apache_beam.runners.dataflow.internal import apiclient
-      if apiclient._use_fnapi(options):
+      if use_fn_api:
         encoded_impulse_as_str = self.byte_array_to_json_string(
             encoded_impulse_element)
       else: