You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ka...@apache.org on 2020/11/02 13:35:22 UTC
[beam] branch master updated: [BEAM-5939] - Deduplicate constants (#13142)
This is an automated email from the ASF dual-hosted git repository.
kamilwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new f4bf9c5 [BEAM-5939] - Deduplicate constants (#13142)
f4bf9c5 is described below
commit f4bf9c509450b8c76ff286da2d771965221d6e85
Author: tszerszen <48...@users.noreply.github.com>
AuthorDate: Mon Nov 2 14:34:26 2020 +0100
[BEAM-5939] - Deduplicate constants (#13142)
---
.../apache_beam/runners/dataflow/internal/names.py | 15 ++---------
sdks/python/apache_beam/runners/internal/names.py | 5 ++++
.../apache_beam/runners/portability/stager.py | 7 ++---
.../apache_beam/runners/portability/stager_test.py | 30 +++++++++++-----------
4 files changed, 24 insertions(+), 33 deletions(-)
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/names.py b/sdks/python/apache_beam/runners/dataflow/internal/names.py
index 3fc9c73..3aba410 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/names.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/names.py
@@ -27,7 +27,8 @@ from __future__ import absolute_import
# Standard file names used for staging files.
from builtins import object
-DATAFLOW_SDK_TARBALL_FILE = 'dataflow_python_sdk.tar'
+# Referenced by Dataflow legacy worker.
+from apache_beam.runners.internal.names import PICKLED_MAIN_SESSION_FILE # pylint: disable=unused-import
# String constants related to sources framework
SOURCE_FORMAT = 'custom_source'
@@ -45,18 +46,6 @@ BEAM_CONTAINER_VERSION = 'beam-master-20201005'
# requires changes to SDK harness container or SDK harness launcher.
BEAM_FNAPI_CONTAINER_VERSION = 'beam-master-20201005'
-# TODO(BEAM-5939): Remove these shared names once Dataflow worker is updated.
-PICKLED_MAIN_SESSION_FILE = 'pickled_main_session'
-STAGED_PIPELINE_FILENAME = 'pipeline.pb'
-STAGED_PIPELINE_URL_METADATA_FIELD = 'pipeline_url'
-
-# Package names for different distributions
-BEAM_PACKAGE_NAME = 'apache-beam'
-
-# SDK identifiers for different distributions
-BEAM_SDK_NAME = 'Apache Beam SDK for Python'
-# TODO(BEAM-5393): End duplicated constants (see above).
-
DATAFLOW_CONTAINER_IMAGE_REPOSITORY = 'gcr.io/cloud-dataflow/v1beta3'
diff --git a/sdks/python/apache_beam/runners/internal/names.py b/sdks/python/apache_beam/runners/internal/names.py
index c7d1b84..ff6010d 100644
--- a/sdks/python/apache_beam/runners/internal/names.py
+++ b/sdks/python/apache_beam/runners/internal/names.py
@@ -20,8 +20,13 @@
# All constants are for internal use only; no backwards-compatibility
# guarantees.
+# Current value is hardcoded in Dataflow internal infrastructure;
+# please don't change without a review from Dataflow maintainers.
+STAGED_SDK_SOURCES_FILENAME = 'dataflow_python_sdk.tar'
+
PICKLED_MAIN_SESSION_FILE = 'pickled_main_session'
STAGED_PIPELINE_FILENAME = "pipeline.pb"
+STAGED_PIPELINE_URL_METADATA_FIELD = 'pipeline_url'
# Package names for different distributions
BEAM_PACKAGE_NAME = 'apache-beam'
diff --git a/sdks/python/apache_beam/runners/portability/stager.py b/sdks/python/apache_beam/runners/portability/stager.py
index f1a820f..3aa8937 100644
--- a/sdks/python/apache_beam/runners/portability/stager.py
+++ b/sdks/python/apache_beam/runners/portability/stager.py
@@ -69,8 +69,6 @@ from apache_beam.options.pipeline_options import DebugOptions
from apache_beam.options.pipeline_options import PipelineOptions # pylint: disable=unused-import
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import WorkerOptions
-# TODO(angoenka): Remove reference to dataflow internal names
-from apache_beam.runners.dataflow.internal.names import DATAFLOW_SDK_TARBALL_FILE
from apache_beam.runners.internal import names
from apache_beam.utils import processes
from apache_beam.utils import retry
@@ -231,9 +229,8 @@ class Stager(object):
# This branch is also used by internal tests running with the SDK
# built at head.
if os.path.isdir(setup_options.sdk_location):
- # TODO(angoenka): remove reference to Dataflow
sdk_path = os.path.join(
- setup_options.sdk_location, DATAFLOW_SDK_TARBALL_FILE)
+ setup_options.sdk_location, names.STAGED_SDK_SOURCES_FILENAME)
else:
sdk_path = setup_options.sdk_location
@@ -613,7 +610,7 @@ class Stager(object):
else:
raise RuntimeError('Unrecognized SDK wheel file: %s' % sdk_location)
else:
- return DATAFLOW_SDK_TARBALL_FILE
+ return names.STAGED_SDK_SOURCES_FILENAME
@staticmethod
def _create_beam_sdk(sdk_remote_location, temp_dir):
diff --git a/sdks/python/apache_beam/runners/portability/stager_test.py b/sdks/python/apache_beam/runners/portability/stager_test.py
index 5961632..ea676af 100644
--- a/sdks/python/apache_beam/runners/portability/stager_test.py
+++ b/sdks/python/apache_beam/runners/portability/stager_test.py
@@ -36,8 +36,7 @@ from apache_beam.io.filesystems import FileSystems
from apache_beam.options.pipeline_options import DebugOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
-from apache_beam.runners.dataflow.internal import names
-from apache_beam.runners.internal import names as shared_names
+from apache_beam.runners.internal import names
from apache_beam.runners.portability import stager
_LOGGER = logging.getLogger(__name__)
@@ -178,12 +177,12 @@ class StagerTest(unittest.TestCase):
options.view_as(SetupOptions).save_main_session = True
self.update_options(options)
- self.assertEqual([shared_names.PICKLED_MAIN_SESSION_FILE],
+ self.assertEqual([names.PICKLED_MAIN_SESSION_FILE],
self.stager.create_and_stage_job_resources(
options, staging_location=staging_dir)[1])
self.assertTrue(
os.path.isfile(
- os.path.join(staging_dir, shared_names.PICKLED_MAIN_SESSION_FILE)))
+ os.path.join(staging_dir, names.PICKLED_MAIN_SESSION_FILE)))
def test_default_resources(self):
staging_dir = self.make_temp_dir()
@@ -336,9 +335,10 @@ class StagerTest(unittest.TestCase):
_, staged_resources = self.stager.create_and_stage_job_resources(
options, temp_dir=self.make_temp_dir(), staging_location=staging_dir)
- self.assertEqual([names.DATAFLOW_SDK_TARBALL_FILE], staged_resources)
+ self.assertEqual([names.STAGED_SDK_SOURCES_FILENAME], staged_resources)
- with open(os.path.join(staging_dir, names.DATAFLOW_SDK_TARBALL_FILE)) as f:
+ with open(os.path.join(staging_dir,
+ names.STAGED_SDK_SOURCES_FILENAME)) as f:
self.assertEqual(f.read(), 'Package content.')
def test_sdk_location_default_with_wheels(self):
@@ -355,7 +355,7 @@ class StagerTest(unittest.TestCase):
options, temp_dir=self.make_temp_dir(), staging_location=staging_dir)
self.assertEqual(len(staged_resources), 2)
- self.assertEqual(staged_resources[0], names.DATAFLOW_SDK_TARBALL_FILE)
+ self.assertEqual(staged_resources[0], names.STAGED_SDK_SOURCES_FILENAME)
# Exact name depends on the version of the SDK.
self.assertTrue(staged_resources[1].endswith('whl'))
for name in staged_resources:
@@ -366,17 +366,17 @@ class StagerTest(unittest.TestCase):
staging_dir = self.make_temp_dir()
sdk_location = self.make_temp_dir()
self.create_temp_file(
- os.path.join(sdk_location, names.DATAFLOW_SDK_TARBALL_FILE),
+ os.path.join(sdk_location, names.STAGED_SDK_SOURCES_FILENAME),
'Package content.')
options = PipelineOptions()
self.update_options(options)
options.view_as(SetupOptions).sdk_location = sdk_location
- self.assertEqual([names.DATAFLOW_SDK_TARBALL_FILE],
+ self.assertEqual([names.STAGED_SDK_SOURCES_FILENAME],
self.stager.create_and_stage_job_resources(
options, staging_location=staging_dir)[1])
- tarball_path = os.path.join(staging_dir, names.DATAFLOW_SDK_TARBALL_FILE)
+ tarball_path = os.path.join(staging_dir, names.STAGED_SDK_SOURCES_FILENAME)
with open(tarball_path) as f:
self.assertEqual(f.read(), 'Package content.')
@@ -391,10 +391,10 @@ class StagerTest(unittest.TestCase):
self.update_options(options)
options.view_as(SetupOptions).sdk_location = sdk_location
- self.assertEqual([names.DATAFLOW_SDK_TARBALL_FILE],
+ self.assertEqual([names.STAGED_SDK_SOURCES_FILENAME],
self.stager.create_and_stage_job_resources(
options, staging_location=staging_dir)[1])
- tarball_path = os.path.join(staging_dir, names.DATAFLOW_SDK_TARBALL_FILE)
+ tarball_path = os.path.join(staging_dir, names.STAGED_SDK_SOURCES_FILENAME)
with open(tarball_path) as f:
self.assertEqual(f.read(), 'Package content.')
@@ -445,7 +445,7 @@ class StagerTest(unittest.TestCase):
self.update_options(options)
options.view_as(SetupOptions).sdk_location = sdk_location
- self.assertEqual([names.DATAFLOW_SDK_TARBALL_FILE],
+ self.assertEqual([names.STAGED_SDK_SOURCES_FILENAME],
self.stager.create_and_stage_job_resources(
options, staging_location=staging_dir)[1])
@@ -491,11 +491,11 @@ class StagerTest(unittest.TestCase):
with mock.patch('apache_beam.runners.portability.stager_test'
'.stager.Stager._download_file',
staticmethod(file_download)):
- self.assertEqual([names.DATAFLOW_SDK_TARBALL_FILE],
+ self.assertEqual([names.STAGED_SDK_SOURCES_FILENAME],
self.stager.create_and_stage_job_resources(
options, staging_location=staging_dir)[1])
- tarball_path = os.path.join(staging_dir, names.DATAFLOW_SDK_TARBALL_FILE)
+ tarball_path = os.path.join(staging_dir, names.STAGED_SDK_SOURCES_FILENAME)
with open(tarball_path) as f:
self.assertEqual(f.read(), 'Package content.')