You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bo...@apache.org on 2019/10/31 05:18:24 UTC
[beam] branch master updated: Not inject pubsub into Impulse when in fnapi streaming mode.
This is an automated email from the ASF dual-hosted git repository.
boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new eb05f56 Not inject pubsub into Impulse when in fnapi streaming mode.
new 0573e10 Merge pull request #9932 from boyuanzz/windmill_create
eb05f56 is described below
commit eb05f5684e4a27ad363b507bbeef83773b8fc06c
Author: Boyuan Zhang <bo...@google.com>
AuthorDate: Tue Oct 29 18:18:51 2019 -0700
Not inject pubsub into Impulse when in fnapi streaming mode.
---
.../java/org/apache/beam/runners/dataflow/DataflowRunner.java | 2 +-
sdks/python/apache_beam/runners/dataflow/dataflow_runner.py | 8 +++++---
2 files changed, 6 insertions(+), 4 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 9f7394f..f6f78a8 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -1427,7 +1427,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
private static class ImpulseTranslator implements TransformTranslator<Impulse> {
@Override
public void translate(Impulse transform, TranslationContext context) {
- if (context.getPipelineOptions().isStreaming()) {
+ if (context.getPipelineOptions().isStreaming() && !context.isFnApi()) {
StepTranslationContext stepContext = context.addStep(transform, "ParallelRead");
stepContext.addInput(PropertyNames.FORMAT, "pubsub");
stepContext.addInput(PropertyNames.PUBSUB_SUBSCRIPTION, "_starting_signal/");
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 4928550..039eaf0 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -622,9 +622,12 @@ class DataflowRunner(PipelineRunner):
def run_Impulse(self, transform_node, options):
standard_options = options.view_as(StandardOptions)
+ debug_options = options.view_as(DebugOptions)
+ use_fn_api = (debug_options.experiments and
+ 'beam_fn_api' in debug_options.experiments)
step = self._add_step(
TransformNames.READ, transform_node.full_label, transform_node)
- if standard_options.streaming:
+ if standard_options.streaming and not use_fn_api:
step.add_property(PropertyNames.FORMAT, 'pubsub')
step.add_property(PropertyNames.PUBSUB_SUBSCRIPTION, '_starting_signal/')
else:
@@ -634,8 +637,7 @@ class DataflowRunner(PipelineRunner):
coders.coders.GlobalWindowCoder()).get_impl().encode_nested(
window.GlobalWindows.windowed_value(b''))
- from apache_beam.runners.dataflow.internal import apiclient
- if apiclient._use_fnapi(options):
+ if use_fn_api:
encoded_impulse_as_str = self.byte_array_to_json_string(
encoded_impulse_element)
else: