You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2019/05/23 20:57:42 UTC
[beam] branch master updated: [BEAM-7203] Signal Dataflow workers to use fastavro library on Python 3. (#8652)
This is an automated email from the ASF dual-hosted git repository.
chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 8d83576 [BEAM-7203] Signal Dataflow workers to use fastavro library on Python 3. (#8652)
8d83576 is described below
commit 8d83576df7a60f695fda55019b2965319de9a993
Author: tvalentyn <tv...@users.noreply.github.com>
AuthorDate: Thu May 23 13:57:18 2019 -0700
[BEAM-7203] Signal Dataflow workers to use fastavro library on Python 3. (#8652)
---
.../runners/dataflow/dataflow_runner.py | 9 ++++++++
.../runners/dataflow/dataflow_runner_test.py | 25 ++++++++++++++++++++++
2 files changed, 34 insertions(+)
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 1a037f6..ff058fc 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -25,6 +25,7 @@ from __future__ import division
import json
import logging
+import sys
import threading
import time
import traceback
@@ -431,6 +432,14 @@ class DataflowRunner(PipelineRunner):
else:
debug_options.add_experiment('use_staged_dataflow_worker_jar')
+ # Make Dataflow workers use FastAvro on Python 3 unless use_avro experiment
+ # is set. Note that use_avro is only interpreted by the Dataflow runner
+ # at job submission and is not interpreted by Dataflow service or workers,
+ # which by default use avro library unless use_fastavro experiment is set.
+ if sys.version_info[0] > 2 and (
+ not debug_options.lookup_experiment('use_avro')):
+ debug_options.add_experiment('use_fastavro')
+
self.job = apiclient.Job(options, self.proto_pipeline)
# Dataflow runner requires a KV type for GBK inputs, hence we enforce that
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
index c774b61..bbb2833 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
@@ -20,6 +20,7 @@
from __future__ import absolute_import
import json
+import sys
import unittest
from builtins import object
from builtins import range
@@ -444,6 +445,30 @@ class DataflowRunnerTest(unittest.TestCase):
self.assertIn('beam_fn_api', experiments_for_job)
self.assertIn('use_staged_dataflow_worker_jar', experiments_for_job)
+ def test_use_fastavro_experiment_is_added_on_py3_and_onwards(self):
+ remote_runner = DataflowRunner()
+
+ p = Pipeline(remote_runner, PipelineOptions(self.default_properties))
+ p | ptransform.Create([1]) # pylint: disable=expression-not-assigned
+ p.run()
+
+ self.assertEqual(
+ sys.version_info[0] > 2,
+ remote_runner.job.options.view_as(DebugOptions).lookup_experiment(
+ 'use_fastavro', False))
+
+ def test_use_fastavro_experiment_is_not_added_when_use_avro_is_present(self):
+ remote_runner = DataflowRunner()
+ self.default_properties.append('--experiment=use_avro')
+
+ p = Pipeline(remote_runner, PipelineOptions(self.default_properties))
+ p | ptransform.Create([1]) # pylint: disable=expression-not-assigned
+ p.run()
+
+ debug_options = remote_runner.job.options.view_as(DebugOptions)
+
+ self.assertFalse(debug_options.lookup_experiment('use_fastavro', False))
+
if __name__ == '__main__':
unittest.main()