Posted to commits@beam.apache.org by ka...@apache.org on 2020/11/02 13:35:22 UTC

[beam] branch master updated: [BEAM-5939] - Deduplicate constants (#13142)

This is an automated email from the ASF dual-hosted git repository.

kamilwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new f4bf9c5  [BEAM-5939] - Deduplicate constants (#13142)
f4bf9c5 is described below

commit f4bf9c509450b8c76ff286da2d771965221d6e85
Author: tszerszen <48...@users.noreply.github.com>
AuthorDate: Mon Nov 2 14:34:26 2020 +0100

    [BEAM-5939] - Deduplicate constants (#13142)
---
 .../apache_beam/runners/dataflow/internal/names.py | 15 ++---------
 sdks/python/apache_beam/runners/internal/names.py  |  5 ++++
 .../apache_beam/runners/portability/stager.py      |  7 ++---
 .../apache_beam/runners/portability/stager_test.py | 30 +++++++++++-----------
 4 files changed, 24 insertions(+), 33 deletions(-)
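In outline, the change keeps a single canonical copy of each constant in
apache_beam/runners/internal/names.py and re-exports from the legacy Dataflow
module wherever the old import path must keep working. A minimal sketch of the
pattern, condensed from the diff below (not the full modules):

    # runners/internal/names.py -- the single source of truth.
    STAGED_SDK_SOURCES_FILENAME = 'dataflow_python_sdk.tar'
    PICKLED_MAIN_SESSION_FILE = 'pickled_main_session'

    # runners/dataflow/internal/names.py -- re-export for the Dataflow legacy
    # worker, which still imports the constant from here.
    from apache_beam.runners.internal.names import PICKLED_MAIN_SESSION_FILE  # pylint: disable=unused-import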

diff --git a/sdks/python/apache_beam/runners/dataflow/internal/names.py b/sdks/python/apache_beam/runners/dataflow/internal/names.py
index 3fc9c73..3aba410 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/names.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/names.py
@@ -27,7 +27,8 @@ from __future__ import absolute_import
 # Standard file names used for staging files.
 from builtins import object
 
-DATAFLOW_SDK_TARBALL_FILE = 'dataflow_python_sdk.tar'
+# Referenced by Dataflow legacy worker.
+from apache_beam.runners.internal.names import PICKLED_MAIN_SESSION_FILE  # pylint: disable=unused-import
 
 # String constants related to sources framework
 SOURCE_FORMAT = 'custom_source'
@@ -45,18 +46,6 @@ BEAM_CONTAINER_VERSION = 'beam-master-20201005'
 # requires changes to SDK harness container or SDK harness launcher.
 BEAM_FNAPI_CONTAINER_VERSION = 'beam-master-20201005'
 
-# TODO(BEAM-5939): Remove these shared names once Dataflow worker is updated.
-PICKLED_MAIN_SESSION_FILE = 'pickled_main_session'
-STAGED_PIPELINE_FILENAME = 'pipeline.pb'
-STAGED_PIPELINE_URL_METADATA_FIELD = 'pipeline_url'
-
-# Package names for different distributions
-BEAM_PACKAGE_NAME = 'apache-beam'
-
-# SDK identifiers for different distributions
-BEAM_SDK_NAME = 'Apache Beam SDK for Python'
-# TODO(BEAM-5393): End duplicated constants (see above).
-
 DATAFLOW_CONTAINER_IMAGE_REPOSITORY = 'gcr.io/cloud-dataflow/v1beta3'
 
 
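Because the legacy module re-exports the shared constant, both import paths
resolve to the very same object. A quick way to confirm this (a hedged
snippet, assuming Beam is installed at this commit):

    from apache_beam.runners.internal.names import PICKLED_MAIN_SESSION_FILE as shared
    from apache_beam.runners.dataflow.internal.names import PICKLED_MAIN_SESSION_FILE as legacy

    # The re-export binds the same string object under both names.
    assert shared is legacy
    assert shared == 'pickled_main_session'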
diff --git a/sdks/python/apache_beam/runners/internal/names.py b/sdks/python/apache_beam/runners/internal/names.py
index c7d1b84..ff6010d 100644
--- a/sdks/python/apache_beam/runners/internal/names.py
+++ b/sdks/python/apache_beam/runners/internal/names.py
@@ -20,8 +20,13 @@
 # All constants are for internal use only; no backwards-compatibility
 # guarantees.
 
+# Current value is hardcoded in Dataflow internal infrastructure;
+# please don't change without a review from Dataflow maintainers.
+STAGED_SDK_SOURCES_FILENAME = 'dataflow_python_sdk.tar'
+
 PICKLED_MAIN_SESSION_FILE = 'pickled_main_session'
 STAGED_PIPELINE_FILENAME = "pipeline.pb"
+STAGED_PIPELINE_URL_METADATA_FIELD = 'pipeline_url'
 
 # Package names for different distributions
 BEAM_PACKAGE_NAME = 'apache-beam'
diff --git a/sdks/python/apache_beam/runners/portability/stager.py b/sdks/python/apache_beam/runners/portability/stager.py
index f1a820f..3aa8937 100644
--- a/sdks/python/apache_beam/runners/portability/stager.py
+++ b/sdks/python/apache_beam/runners/portability/stager.py
@@ -69,8 +69,6 @@ from apache_beam.options.pipeline_options import DebugOptions
 from apache_beam.options.pipeline_options import PipelineOptions  # pylint: disable=unused-import
 from apache_beam.options.pipeline_options import SetupOptions
 from apache_beam.options.pipeline_options import WorkerOptions
-# TODO(angoenka): Remove reference to dataflow internal names
-from apache_beam.runners.dataflow.internal.names import DATAFLOW_SDK_TARBALL_FILE
 from apache_beam.runners.internal import names
 from apache_beam.utils import processes
 from apache_beam.utils import retry
@@ -231,9 +229,8 @@ class Stager(object):
           # This branch is also used by internal tests running with the SDK
           # built at head.
           if os.path.isdir(setup_options.sdk_location):
-            # TODO(angoenka): remove reference to Dataflow
             sdk_path = os.path.join(
-                setup_options.sdk_location, DATAFLOW_SDK_TARBALL_FILE)
+                setup_options.sdk_location, names.STAGED_SDK_SOURCES_FILENAME)
           else:
             sdk_path = setup_options.sdk_location
 
@@ -613,7 +610,7 @@ class Stager(object):
       else:
         raise RuntimeError('Unrecognized SDK wheel file: %s' % sdk_location)
     else:
-      return DATAFLOW_SDK_TARBALL_FILE
+      return names.STAGED_SDK_SOURCES_FILENAME
 
   @staticmethod
   def _create_beam_sdk(sdk_remote_location, temp_dir):
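The stager.py hunks above switch the SDK path lookup to the shared constant.
The resolution logic, reduced to a standalone sketch (resolve_sdk_path is a
hypothetical helper name, not a function in the module):

    import os

    from apache_beam.runners.internal import names

    def resolve_sdk_path(sdk_location):
        # A directory is expected to contain the staged SDK tarball; anything
        # else is treated as a direct path to the tarball itself.
        if os.path.isdir(sdk_location):
            return os.path.join(sdk_location, names.STAGED_SDK_SOURCES_FILENAME)
        return sdk_location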
diff --git a/sdks/python/apache_beam/runners/portability/stager_test.py b/sdks/python/apache_beam/runners/portability/stager_test.py
index 5961632..ea676af 100644
--- a/sdks/python/apache_beam/runners/portability/stager_test.py
+++ b/sdks/python/apache_beam/runners/portability/stager_test.py
@@ -36,8 +36,7 @@ from apache_beam.io.filesystems import FileSystems
 from apache_beam.options.pipeline_options import DebugOptions
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.options.pipeline_options import SetupOptions
-from apache_beam.runners.dataflow.internal import names
-from apache_beam.runners.internal import names as shared_names
+from apache_beam.runners.internal import names
 from apache_beam.runners.portability import stager
 
 _LOGGER = logging.getLogger(__name__)
@@ -178,12 +177,12 @@ class StagerTest(unittest.TestCase):
     options.view_as(SetupOptions).save_main_session = True
     self.update_options(options)
 
-    self.assertEqual([shared_names.PICKLED_MAIN_SESSION_FILE],
+    self.assertEqual([names.PICKLED_MAIN_SESSION_FILE],
                      self.stager.create_and_stage_job_resources(
                          options, staging_location=staging_dir)[1])
     self.assertTrue(
         os.path.isfile(
-            os.path.join(staging_dir, shared_names.PICKLED_MAIN_SESSION_FILE)))
+            os.path.join(staging_dir, names.PICKLED_MAIN_SESSION_FILE)))
 
   def test_default_resources(self):
     staging_dir = self.make_temp_dir()
@@ -336,9 +335,10 @@ class StagerTest(unittest.TestCase):
       _, staged_resources = self.stager.create_and_stage_job_resources(
           options, temp_dir=self.make_temp_dir(), staging_location=staging_dir)
 
-    self.assertEqual([names.DATAFLOW_SDK_TARBALL_FILE], staged_resources)
+    self.assertEqual([names.STAGED_SDK_SOURCES_FILENAME], staged_resources)
 
-    with open(os.path.join(staging_dir, names.DATAFLOW_SDK_TARBALL_FILE)) as f:
+    with open(os.path.join(staging_dir,
+                           names.STAGED_SDK_SOURCES_FILENAME)) as f:
       self.assertEqual(f.read(), 'Package content.')
 
   def test_sdk_location_default_with_wheels(self):
@@ -355,7 +355,7 @@ class StagerTest(unittest.TestCase):
           options, temp_dir=self.make_temp_dir(), staging_location=staging_dir)
 
       self.assertEqual(len(staged_resources), 2)
-      self.assertEqual(staged_resources[0], names.DATAFLOW_SDK_TARBALL_FILE)
+      self.assertEqual(staged_resources[0], names.STAGED_SDK_SOURCES_FILENAME)
       # Exact name depends on the version of the SDK.
       self.assertTrue(staged_resources[1].endswith('whl'))
       for name in staged_resources:
@@ -366,17 +366,17 @@ class StagerTest(unittest.TestCase):
     staging_dir = self.make_temp_dir()
     sdk_location = self.make_temp_dir()
     self.create_temp_file(
-        os.path.join(sdk_location, names.DATAFLOW_SDK_TARBALL_FILE),
+        os.path.join(sdk_location, names.STAGED_SDK_SOURCES_FILENAME),
         'Package content.')
 
     options = PipelineOptions()
     self.update_options(options)
     options.view_as(SetupOptions).sdk_location = sdk_location
 
-    self.assertEqual([names.DATAFLOW_SDK_TARBALL_FILE],
+    self.assertEqual([names.STAGED_SDK_SOURCES_FILENAME],
                      self.stager.create_and_stage_job_resources(
                          options, staging_location=staging_dir)[1])
-    tarball_path = os.path.join(staging_dir, names.DATAFLOW_SDK_TARBALL_FILE)
+    tarball_path = os.path.join(staging_dir, names.STAGED_SDK_SOURCES_FILENAME)
     with open(tarball_path) as f:
       self.assertEqual(f.read(), 'Package content.')
 
@@ -391,10 +391,10 @@ class StagerTest(unittest.TestCase):
     self.update_options(options)
     options.view_as(SetupOptions).sdk_location = sdk_location
 
-    self.assertEqual([names.DATAFLOW_SDK_TARBALL_FILE],
+    self.assertEqual([names.STAGED_SDK_SOURCES_FILENAME],
                      self.stager.create_and_stage_job_resources(
                          options, staging_location=staging_dir)[1])
-    tarball_path = os.path.join(staging_dir, names.DATAFLOW_SDK_TARBALL_FILE)
+    tarball_path = os.path.join(staging_dir, names.STAGED_SDK_SOURCES_FILENAME)
     with open(tarball_path) as f:
       self.assertEqual(f.read(), 'Package content.')
 
@@ -445,7 +445,7 @@ class StagerTest(unittest.TestCase):
     self.update_options(options)
     options.view_as(SetupOptions).sdk_location = sdk_location
 
-    self.assertEqual([names.DATAFLOW_SDK_TARBALL_FILE],
+    self.assertEqual([names.STAGED_SDK_SOURCES_FILENAME],
                      self.stager.create_and_stage_job_resources(
                          options, staging_location=staging_dir)[1])
 
@@ -491,11 +491,11 @@ class StagerTest(unittest.TestCase):
     with mock.patch('apache_beam.runners.portability.stager_test'
                     '.stager.Stager._download_file',
                     staticmethod(file_download)):
-      self.assertEqual([names.DATAFLOW_SDK_TARBALL_FILE],
+      self.assertEqual([names.STAGED_SDK_SOURCES_FILENAME],
                        self.stager.create_and_stage_job_resources(
                            options, staging_location=staging_dir)[1])
 
-    tarball_path = os.path.join(staging_dir, names.DATAFLOW_SDK_TARBALL_FILE)
+    tarball_path = os.path.join(staging_dir, names.STAGED_SDK_SOURCES_FILENAME)
     with open(tarball_path) as f:
       self.assertEqual(f.read(), 'Package content.')
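
As a final sanity check of the rename (a hedged snippet, assuming a Python
environment with this commit checked out): the constant's value is unchanged,
only its name and home module moved.

    from apache_beam.runners.internal import names

    # The on-disk tarball name is hardcoded in Dataflow infrastructure and
    # must stay 'dataflow_python_sdk.tar'; only the Python constant moved.
    assert names.STAGED_SDK_SOURCES_FILENAME == 'dataflow_python_sdk.tar'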