Posted to commits@beam.apache.org by al...@apache.org on 2017/07/08 01:29:55 UTC

[1/2] beam git commit: Set the type of batch jobs to FNAPI_BATCH when beam_fn_api experiment is specified.

Repository: beam
Updated Branches:
  refs/heads/master 56e4251de -> dba514079


Set the type of batch jobs to FNAPI_BATCH when beam_fn_api experiment is specified.
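
For context, a minimal example of submitting a batch pipeline with this experiment enabled; the runner flags below (project, temp location) and the pipeline itself are placeholders, not part of this commit:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Placeholder GCP settings; the flag relevant to this commit is
# --experiments=beam_fn_api.
options = PipelineOptions([
    '--runner=DataflowRunner',
    '--project=my-project',
    '--temp_location=gs://my-bucket/tmp',
    '--experiments=beam_fn_api',
])

# A trivial batch pipeline. With beam_fn_api in --experiments, the job type
# sent to the Dataflow service is FNAPI_BATCH instead of PYTHON_BATCH.
with beam.Pipeline(options=options) as p:
  _ = p | beam.Create([1, 2, 3]) | beam.Map(lambda x: x * x)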


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1fa3bfe9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1fa3bfe9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1fa3bfe9

Branch: refs/heads/master
Commit: 1fa3bfe92bc59a85bfcf12c47c68206757ce238a
Parents: 56e4251
Author: Valentyn Tymofieiev <va...@google.com>
Authored: Fri Jul 7 15:14:56 2017 -0700
Committer: Valentyn Tymofieiev <va...@google.com>
Committed: Fri Jul 7 15:14:56 2017 -0700

----------------------------------------------------------------------
 .../runners/dataflow/dataflow_runner.py         | 16 ++---------
 .../runners/dataflow/internal/apiclient.py      | 29 ++++++++++++++++++--
 .../runners/dataflow/internal/apiclient_test.py |  5 +---
 3 files changed, 29 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/1fa3bfe9/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 57bcc5e..059e139 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -46,8 +46,8 @@ from apache_beam.runners.runner import PipelineRunner
 from apache_beam.runners.runner import PipelineState
 from apache_beam.transforms.display import DisplayData
 from apache_beam.typehints import typehints
-from apache_beam.options.pipeline_options import StandardOptions
 from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.options.pipeline_options import StandardOptions
 from apache_beam.options.pipeline_options import TestOptions
 from apache_beam.utils.plugin import BeamPlugin
 
@@ -65,12 +65,6 @@ class DataflowRunner(PipelineRunner):
   if blocking is set to False.
   """
 
-  # Environment version information. It is passed to the service during a
-  # a job submission and is used by the service to establish what features
-  # are expected by the workers.
-  BATCH_ENVIRONMENT_MAJOR_VERSION = '6'
-  STREAMING_ENVIRONMENT_MAJOR_VERSION = '1'
-
   # A list of PTransformOverride objects to be applied before running a pipeline
   # using DataflowRunner.
   # Currently this only works for overrides where the input and output types do
@@ -268,15 +262,9 @@ class DataflowRunner(PipelineRunner):
     if test_options.dry_run:
       return None
 
-    standard_options = pipeline._options.view_as(StandardOptions)
-    if standard_options.streaming:
-      job_version = DataflowRunner.STREAMING_ENVIRONMENT_MAJOR_VERSION
-    else:
-      job_version = DataflowRunner.BATCH_ENVIRONMENT_MAJOR_VERSION
-
     # Get a Dataflow API client and set its options
     self.dataflow_client = apiclient.DataflowApplicationClient(
-        pipeline._options, job_version)
+        pipeline._options)
 
     # Create the job
     result = DataflowPipelineResult(

http://git-wip-us.apache.org/repos/asf/beam/blob/1fa3bfe9/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index edac9d7..33dfe19 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -49,6 +49,13 @@ from apache_beam.options.pipeline_options import StandardOptions
 from apache_beam.options.pipeline_options import WorkerOptions
 
 
+# Environment version information. It is passed to the service during a
+# a job submission and is used by the service to establish what features
+# are expected by the workers.
+_LEGACY_ENVIRONMENT_MAJOR_VERSION = '6'
+_FNAPI_ENVIRONMENT_MAJOR_VERSION = '1'
+
+
 class Step(object):
   """Wrapper for a dataflow Step protobuf."""
 
@@ -146,7 +153,10 @@ class Environment(object):
     if self.standard_options.streaming:
       job_type = 'FNAPI_STREAMING'
     else:
-      job_type = 'PYTHON_BATCH'
+      if _use_fnapi(options):
+        job_type = 'FNAPI_BATCH'
+      else:
+        job_type = 'PYTHON_BATCH'
     self.proto.version.additionalProperties.extend([
         dataflow.Environment.VersionValue.AdditionalProperty(
             key='job_type',
@@ -360,11 +370,16 @@ class Job(object):
 class DataflowApplicationClient(object):
   """A Dataflow API client used by application code to create and query jobs."""
 
-  def __init__(self, options, environment_version):
+  def __init__(self, options):
     """Initializes a Dataflow API client object."""
     self.standard_options = options.view_as(StandardOptions)
     self.google_cloud_options = options.view_as(GoogleCloudOptions)
-    self.environment_version = environment_version
+
+    if _use_fnapi(options):
+      self.environment_version = _FNAPI_ENVIRONMENT_MAJOR_VERSION
+    else:
+      self.environment_version = _LEGACY_ENVIRONMENT_MAJOR_VERSION
+
     if self.google_cloud_options.no_auth:
       credentials = None
     else:
@@ -706,6 +721,14 @@ def translate_mean(accumulator, metric_update):
     metric_update.kind = None
 
 
+def _use_fnapi(pipeline_options):
+  standard_options = pipeline_options.view_as(StandardOptions)
+  debug_options = pipeline_options.view_as(DebugOptions)
+
+  return standard_options.streaming or (
+      debug_options.experiments and 'beam_fn_api' in debug_options.experiments)
+
+
 # To enable a counter on the service, add it to this dictionary.
 metric_translations = {
     cy_combiners.CountCombineFn: ('sum', translate_scalar),

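For reference, a small standalone sketch (not part of the commit) of how the experiment flag feeds the _use_fnapi check above and the resulting job type, using the standard option views:

from apache_beam.options.pipeline_options import DebugOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions

options = PipelineOptions(['--experiments=beam_fn_api'])
debug_options = options.view_as(DebugOptions)
standard_options = options.view_as(StandardOptions)

# Same condition as _use_fnapi: streaming jobs always take the Fn API path;
# batch jobs take it only when beam_fn_api appears in --experiments.
use_fnapi = standard_options.streaming or (
    debug_options.experiments and 'beam_fn_api' in debug_options.experiments)

if standard_options.streaming:
  job_type = 'FNAPI_STREAMING'
elif use_fnapi:
  job_type = 'FNAPI_BATCH'
else:
  job_type = 'PYTHON_BATCH'

print(job_type)  # Prints FNAPI_BATCH for these options.
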
http://git-wip-us.apache.org/repos/asf/beam/blob/1fa3bfe9/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
index 55211f7..407ffcf 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
@@ -22,7 +22,6 @@ from mock import Mock
 from apache_beam.metrics.cells import DistributionData
 from apache_beam.options.pipeline_options import PipelineOptions
 
-from apache_beam.runners.dataflow.dataflow_runner import DataflowRunner
 from apache_beam.runners.dataflow.internal.clients import dataflow
 
 # Protect against environments where apitools library is not available.
@@ -40,9 +39,7 @@ class UtilTest(unittest.TestCase):
   @unittest.skip("Enable once BEAM-1080 is fixed.")
   def test_create_application_client(self):
     pipeline_options = PipelineOptions()
-    apiclient.DataflowApplicationClient(
-        pipeline_options,
-        DataflowRunner.BATCH_ENVIRONMENT_MAJOR_VERSION)
+    apiclient.DataflowApplicationClient(pipeline_options)
 
   def test_set_network(self):
     pipeline_options = PipelineOptions(

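A hypothetical unit test (not added by this commit) could exercise the new _use_fnapi helper directly inside UtilTest; it avoids constructing the API client, so it only inspects the options and never touches the service. The test name is illustrative:

  def test_use_fnapi_with_experiment(self):
    pipeline_options = PipelineOptions(['--experiments=beam_fn_api'])
    self.assertTrue(apiclient._use_fnapi(pipeline_options))

    pipeline_options = PipelineOptions([])
    self.assertFalse(apiclient._use_fnapi(pipeline_options))
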

[2/2] beam git commit: This closes #3522

Posted by al...@apache.org.
This closes #3522


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/dba51407
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/dba51407
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/dba51407

Branch: refs/heads/master
Commit: dba5140792969182b85e449eb2f5630bd45710ca
Parents: 56e4251 1fa3bfe
Author: Ahmet Altay <al...@google.com>
Authored: Fri Jul 7 18:29:30 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Fri Jul 7 18:29:30 2017 -0700

----------------------------------------------------------------------
 .../runners/dataflow/dataflow_runner.py         | 16 ++---------
 .../runners/dataflow/internal/apiclient.py      | 29 ++++++++++++++++++--
 .../runners/dataflow/internal/apiclient_test.py |  5 +---
 3 files changed, 29 insertions(+), 21 deletions(-)
----------------------------------------------------------------------