You are viewing a plain text version of this content. The canonical (HTML) link for it was provided in the original message but is not reproduced in this plain-text rendering.
Posted to commits@beam.apache.org by ch...@apache.org on 2019/05/23 20:57:42 UTC

[beam] branch master updated: [BEAM-7203] Signal Dataflow workers to use fastavro library on Python 3. (#8652)

This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d83576  [BEAM-7203] Signal Dataflow workers to use fastavro library on Python 3. (#8652)
8d83576 is described below

commit 8d83576df7a60f695fda55019b2965319de9a993
Author: tvalentyn <tv...@users.noreply.github.com>
AuthorDate: Thu May 23 13:57:18 2019 -0700

    [BEAM-7203] Signal Dataflow workers to use fastavro library on Python 3. (#8652)
---
 .../runners/dataflow/dataflow_runner.py            |  9 ++++++++
 .../runners/dataflow/dataflow_runner_test.py       | 25 ++++++++++++++++++++++
 2 files changed, 34 insertions(+)

diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 1a037f6..ff058fc 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -25,6 +25,7 @@ from __future__ import division
 
 import json
 import logging
+import sys
 import threading
 import time
 import traceback
@@ -431,6 +432,14 @@ class DataflowRunner(PipelineRunner):
       else:
         debug_options.add_experiment('use_staged_dataflow_worker_jar')
 
+    # Make Dataflow workers use FastAvro on Python 3 unless use_avro experiment
+    # is set. Note that use_avro is only interpreted by the Dataflow runner
+    # at job submission and is not interpreted by Dataflow service or workers,
+    # which by default use avro library unless use_fastavro experiment is set.
+    if sys.version_info[0] > 2 and (
+        not debug_options.lookup_experiment('use_avro')):
+      debug_options.add_experiment('use_fastavro')
+
     self.job = apiclient.Job(options, self.proto_pipeline)
 
     # Dataflow runner requires a KV type for GBK inputs, hence we enforce that
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
index c774b61..bbb2833 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
@@ -20,6 +20,7 @@
 from __future__ import absolute_import
 
 import json
+import sys
 import unittest
 from builtins import object
 from builtins import range
@@ -444,6 +445,30 @@ class DataflowRunnerTest(unittest.TestCase):
     self.assertIn('beam_fn_api', experiments_for_job)
     self.assertIn('use_staged_dataflow_worker_jar', experiments_for_job)
 
+  def test_use_fastavro_experiment_is_added_on_py3_and_onwards(self):
+    remote_runner = DataflowRunner()
+
+    p = Pipeline(remote_runner, PipelineOptions(self.default_properties))
+    p | ptransform.Create([1])  # pylint: disable=expression-not-assigned
+    p.run()
+
+    self.assertEqual(
+        sys.version_info[0] > 2,
+        remote_runner.job.options.view_as(DebugOptions).lookup_experiment(
+            'use_fastavro', False))
+
+  def test_use_fastavro_experiment_is_not_added_when_use_avro_is_present(self):
+    remote_runner = DataflowRunner()
+    self.default_properties.append('--experiment=use_avro')
+
+    p = Pipeline(remote_runner, PipelineOptions(self.default_properties))
+    p | ptransform.Create([1])  # pylint: disable=expression-not-assigned
+    p.run()
+
+    debug_options = remote_runner.job.options.view_as(DebugOptions)
+
+    self.assertFalse(debug_options.lookup_experiment('use_fastavro', False))
+
 
 if __name__ == '__main__':
   unittest.main()