You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2019/10/25 00:26:07 UTC
[beam] 01/01: Revert "Merge pull request #9854 from [BEAM-8457] Label Dataflow jobs from Notebook"
This is an automated email from the ASF dual-hosted git repository.
altay pushed a commit to branch revert-9854-BEAM-8457
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 1d59072b835283c48e5d047074e67e2db2911171
Author: Ahmet Altay <aa...@gmail.com>
AuthorDate: Thu Oct 24 17:25:22 2019 -0700
Revert "Merge pull request #9854 from [BEAM-8457] Label Dataflow jobs from Notebook"
This reverts commit 1a8391da9222ab8d0493b0007bd60bdbeeb5e275.
---
sdks/python/apache_beam/pipeline.py | 48 ++++------------------
.../runners/dataflow/dataflow_runner.py | 10 -----
.../runners/interactive/interactive_runner.py | 2 +-
3 files changed, 9 insertions(+), 51 deletions(-)
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 5574a82..a776d30 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -171,10 +171,6 @@ class Pipeline(object):
# If a transform is applied and the full label is already in the set
# then the transform will have to be cloned with a new label.
self.applied_labels = set()
- # A boolean value indicating whether the pipeline is created in an
- # interactive environment such as interactive notebooks. Initialized as
- # None. The value is set ad hoc when `pipeline.run()` is invoked.
- self.interactive = None
@property
@deprecated(since='First stable release',
@@ -399,56 +395,28 @@ class Pipeline(object):
for override in replacements:
self._check_replacement(override)
- def run(self, test_runner_api=True, runner=None, options=None,
- interactive=None):
- """Runs the pipeline. Returns whatever our runner returns after running.
-
- If another runner instance and options are provided, that runner will
- execute the pipeline with the given options. If either of them is not set,
- a ValueError is raised. The usage is similar to directly invoking
- `runner.run_pipeline(pipeline, options)`.
- Additionally, an interactive field can be set to override the pipeline's
- self.interactive field to mark current pipeline as being initiated from an
- interactive environment.
- """
- from apache_beam.runners.interactive import interactive_runner
- if interactive:
- self.interactive = interactive
- elif isinstance(self.runner, interactive_runner.InteractiveRunner):
- self.interactive = True
- else:
- self.interactive = False
- runner_in_use = self.runner
- options_in_use = self._options
- if runner and options:
- runner_in_use = runner
- options_in_use = options
- elif not runner and options:
- raise ValueError('Parameter runner is not given when parameter options '
- 'is given.')
- elif not options and runner:
- raise ValueError('Parameter options is not given when parameter runner '
- 'is given.')
+ def run(self, test_runner_api=True):
+ """Runs the pipeline. Returns whatever our runner returns after running."""
+
# When possible, invoke a round trip through the runner API.
if test_runner_api and self._verify_runner_api_compatible():
return Pipeline.from_runner_api(
self.to_runner_api(use_fake_coders=True),
- runner_in_use,
- options_in_use).run(test_runner_api=False,
- interactive=self.interactive)
+ self.runner,
+ self._options).run(False)
- if options_in_use.view_as(TypeOptions).runtime_type_check:
+ if self._options.view_as(TypeOptions).runtime_type_check:
from apache_beam.typehints import typecheck
self.visit(typecheck.TypeCheckVisitor())
- if options_in_use.view_as(SetupOptions).save_main_session:
+ if self._options.view_as(SetupOptions).save_main_session:
# If this option is chosen, verify we can pickle the main session early.
tmpdir = tempfile.mkdtemp()
try:
pickler.dump_session(os.path.join(tmpdir, 'main_session.pickle'))
finally:
shutil.rmtree(tmpdir)
- return runner_in_use.run_pipeline(self, options_in_use)
+ return self.runner.run_pipeline(self, self._options)
def __enter__(self):
return self
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index f57be74..4928550 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -364,16 +364,6 @@ class DataflowRunner(PipelineRunner):
def run_pipeline(self, pipeline, options):
"""Remotely executes entire pipeline or parts reachable from node."""
- # Label goog-dataflow-notebook if pipeline is initiated from interactive
- # runner.
- if pipeline.interactive:
- notebook_version = ('goog-dataflow-notebook=' +
- beam.version.__version__.replace('.', '_'))
- if options.view_as(GoogleCloudOptions).labels:
- options.view_as(GoogleCloudOptions).labels.append(notebook_version)
- else:
- options.view_as(GoogleCloudOptions).labels = [notebook_version]
-
# Import here to avoid adding the dependency for local running scenarios.
try:
# pylint: disable=wrong-import-order, wrong-import-position
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_runner.py b/sdks/python/apache_beam/runners/interactive/interactive_runner.py
index 56a3c18..94c0de7 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_runner.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_runner.py
@@ -146,7 +146,7 @@ class InteractiveRunner(runners.PipelineRunner):
cache_manager=self._cache_manager,
pipeline_graph_renderer=self._renderer)
display.start_periodic_update()
- result = pipeline_to_execute.run(interactive=True)
+ result = pipeline_to_execute.run()
result.wait_until_finish()
display.stop_periodic_update()