You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2019/10/25 00:26:06 UTC

[beam] branch revert-9854-BEAM-8457 created (now 1d59072)

This is an automated email from the ASF dual-hosted git repository.

altay pushed a change to branch revert-9854-BEAM-8457
in repository https://gitbox.apache.org/repos/asf/beam.git.


      at 1d59072  Revert "Merge pull request #9854 from [BEAM-8457] Label Dataflow jobs from Notebook"

This branch includes the following new commits:

     new 1d59072  Revert "Merge pull request #9854 from [BEAM-8457] Label Dataflow jobs from Notebook"

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[beam] 01/01: Revert "Merge pull request #9854 from [BEAM-8457] Label Dataflow jobs from Notebook"

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch revert-9854-BEAM-8457
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 1d59072b835283c48e5d047074e67e2db2911171
Author: Ahmet Altay <aa...@gmail.com>
AuthorDate: Thu Oct 24 17:25:22 2019 -0700

    Revert "Merge pull request #9854 from [BEAM-8457] Label Dataflow jobs from Notebook"
    
    This reverts commit 1a8391da9222ab8d0493b0007bd60bdbeeb5e275.
---
 sdks/python/apache_beam/pipeline.py                | 48 ++++------------------
 .../runners/dataflow/dataflow_runner.py            | 10 -----
 .../runners/interactive/interactive_runner.py      |  2 +-
 3 files changed, 9 insertions(+), 51 deletions(-)

diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 5574a82..a776d30 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -171,10 +171,6 @@ class Pipeline(object):
     # If a transform is applied and the full label is already in the set
     # then the transform will have to be cloned with a new label.
     self.applied_labels = set()
-    # A boolean value indicating whether the pipeline is created in an
-    # interactive environment such as interactive notebooks. Initialized as
-    # None. The value is set ad hoc when `pipeline.run()` is invoked.
-    self.interactive = None
 
   @property
   @deprecated(since='First stable release',
@@ -399,56 +395,28 @@ class Pipeline(object):
     for override in replacements:
       self._check_replacement(override)
 
-  def run(self, test_runner_api=True, runner=None, options=None,
-          interactive=None):
-    """Runs the pipeline. Returns whatever our runner returns after running.
-
-    If another runner instance and options are provided, that runner will
-    execute the pipeline with the given options. If either of them is not set,
-    a ValueError is raised. The usage is similar to directly invoking
-    `runner.run_pipeline(pipeline, options)`.
-    Additionally, an interactive field can be set to override the pipeline's
-    self.interactive field to mark current pipeline as being initiated from an
-    interactive environment.
-    """
-    from apache_beam.runners.interactive import interactive_runner
-    if interactive:
-      self.interactive = interactive
-    elif isinstance(self.runner, interactive_runner.InteractiveRunner):
-      self.interactive = True
-    else:
-      self.interactive = False
-    runner_in_use = self.runner
-    options_in_use = self._options
-    if runner and options:
-      runner_in_use = runner
-      options_in_use = options
-    elif not runner and options:
-      raise ValueError('Parameter runner is not given when parameter options '
-                       'is given.')
-    elif not options and runner:
-      raise ValueError('Parameter options is not given when parameter runner '
-                       'is given.')
+  def run(self, test_runner_api=True):
+    """Runs the pipeline. Returns whatever our runner returns after running."""
+
     # When possible, invoke a round trip through the runner API.
     if test_runner_api and self._verify_runner_api_compatible():
       return Pipeline.from_runner_api(
           self.to_runner_api(use_fake_coders=True),
-          runner_in_use,
-          options_in_use).run(test_runner_api=False,
-                              interactive=self.interactive)
+          self.runner,
+          self._options).run(False)
 
-    if options_in_use.view_as(TypeOptions).runtime_type_check:
+    if self._options.view_as(TypeOptions).runtime_type_check:
       from apache_beam.typehints import typecheck
       self.visit(typecheck.TypeCheckVisitor())
 
-    if options_in_use.view_as(SetupOptions).save_main_session:
+    if self._options.view_as(SetupOptions).save_main_session:
       # If this option is chosen, verify we can pickle the main session early.
       tmpdir = tempfile.mkdtemp()
       try:
         pickler.dump_session(os.path.join(tmpdir, 'main_session.pickle'))
       finally:
         shutil.rmtree(tmpdir)
-    return runner_in_use.run_pipeline(self, options_in_use)
+    return self.runner.run_pipeline(self, self._options)
 
   def __enter__(self):
     return self
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index f57be74..4928550 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -364,16 +364,6 @@ class DataflowRunner(PipelineRunner):
 
   def run_pipeline(self, pipeline, options):
     """Remotely executes entire pipeline or parts reachable from node."""
-    # Label goog-dataflow-notebook if pipeline is initiated from interactive
-    # runner.
-    if pipeline.interactive:
-      notebook_version = ('goog-dataflow-notebook=' +
-                          beam.version.__version__.replace('.', '_'))
-      if options.view_as(GoogleCloudOptions).labels:
-        options.view_as(GoogleCloudOptions).labels.append(notebook_version)
-      else:
-        options.view_as(GoogleCloudOptions).labels = [notebook_version]
-
     # Import here to avoid adding the dependency for local running scenarios.
     try:
       # pylint: disable=wrong-import-order, wrong-import-position
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_runner.py b/sdks/python/apache_beam/runners/interactive/interactive_runner.py
index 56a3c18..94c0de7 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_runner.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_runner.py
@@ -146,7 +146,7 @@ class InteractiveRunner(runners.PipelineRunner):
         cache_manager=self._cache_manager,
         pipeline_graph_renderer=self._renderer)
     display.start_periodic_update()
-    result = pipeline_to_execute.run(interactive=True)
+    result = pipeline_to_execute.run()
     result.wait_until_finish()
     display.stop_periodic_update()