You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/07/20 01:58:10 UTC
[1/2] incubator-beam git commit: Closes #686
Repository: incubator-beam
Updated Branches:
refs/heads/python-sdk b75c0c0d4 -> ce29ac7ab
Closes #686
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ce29ac7a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ce29ac7a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ce29ac7a
Branch: refs/heads/python-sdk
Commit: ce29ac7abd3bdcfe3103fe694cb33171295bf35e
Parents: b75c0c0 e3b5302
Author: Dan Halperin <dh...@google.com>
Authored: Tue Jul 19 18:58:03 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Jul 19 18:58:03 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/io/iobase.py | 6 ++++++
.../apache_beam/runners/dataflow_runner.py | 18 +++++++++--------
sdks/python/apache_beam/runners/runner_test.py | 21 ++++++++++++++++++++
.../python/apache_beam/transforms/ptransform.py | 21 ++++++++++++++++++++
4 files changed, 58 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
[2/2] incubator-beam git commit: Check type of coder for step feeding into GroupByKey in Dataflow runner
Posted by dh...@apache.org.
Check type of coder for step feeding into GroupByKey in Dataflow runner
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e3b53028
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e3b53028
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e3b53028
Branch: refs/heads/python-sdk
Commit: e3b53028457f3a1969bd3bd0a98d5def50de9335
Parents: b75c0c0
Author: Charles Chen <cc...@google.com>
Authored: Mon Jul 18 17:54:34 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Jul 19 18:58:03 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/io/iobase.py | 6 ++++++
.../apache_beam/runners/dataflow_runner.py | 18 +++++++++--------
sdks/python/apache_beam/runners/runner_test.py | 21 ++++++++++++++++++++
.../python/apache_beam/transforms/ptransform.py | 21 ++++++++++++++++++++
4 files changed, 58 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3b53028/sdks/python/apache_beam/io/iobase.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index 71ef46b..de5e9d4 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -938,6 +938,12 @@ class Read(ptransform.PTransform):
def get_windowing(self, unused_inputs):
return core.Windowing(window.GlobalWindows())
+ def _infer_output_coder(self, input_type=None, input_coder=None):
+ if isinstance(self.source, BoundedSource):
+ return self.source.default_output_coder()
+ else:
+ return self.source.coder
+
class Write(ptransform.PTransform):
"""A ``PTransform`` that writes to a sink.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3b53028/sdks/python/apache_beam/runners/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow_runner.py
index 4fafc9f..f794c8b 100644
--- a/sdks/python/apache_beam/runners/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow_runner.py
@@ -30,7 +30,6 @@ import time
from apache_beam import coders
from apache_beam import pvalue
from apache_beam.internal import pickler
-from apache_beam.io import iobase
from apache_beam.pvalue import PCollectionView
from apache_beam.runners.runner import PipelineResult
from apache_beam.runners.runner import PipelineRunner
@@ -326,7 +325,15 @@ class DataflowPipelineRunner(PipelineRunner):
PropertyNames.OUTPUT_NAME: PropertyNames.OUT}])
def apply_GroupByKey(self, transform, pcoll):
- coder = self._get_coder(pcoll.element_type or typehints.Any, None)
+ # Infer coder of parent.
+ #
+ # TODO(ccy): make Coder inference and checking less specialized and more
+ # comprehensive.
+ parent = pcoll.producer
+ if parent:
+ coder = parent.transform._infer_output_coder() # pylint: disable=protected-access
+ if not coder:
+ coder = self._get_coder(pcoll.element_type or typehints.Any, None)
if not coder.is_kv_coder():
raise ValueError(('Coder for the GroupByKey operation "%s" is not a '
'key-value coder: %s.') % (transform.label,
@@ -525,16 +532,11 @@ class DataflowPipelineRunner(PipelineRunner):
else:
step.add_property(PropertyNames.FORMAT, transform.source.format)
- if isinstance(transform.source, iobase.BoundedSource):
- coder = transform.source.default_output_coder()
- else:
- coder = transform.source.coder
-
# Wrap coder in WindowedValueCoder: this is necessary as the encoding of a
# step should be the type of value outputted by each step. Read steps
# automatically wrap output values in a WindowedValue wrapper, if necessary.
# This is also necessary for proper encoding for size estimation.
- coder = coders.WindowedValueCoder(coder)
+ coder = coders.WindowedValueCoder(transform._infer_output_coder()) # pylint: disable=protected-access
step.encoding = self._get_cloud_encoding(coder)
step.add_property(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3b53028/sdks/python/apache_beam/runners/runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner_test.py b/sdks/python/apache_beam/runners/runner_test.py
index d2e70d7..2863756 100644
--- a/sdks/python/apache_beam/runners/runner_test.py
+++ b/sdks/python/apache_beam/runners/runner_test.py
@@ -24,6 +24,8 @@ caching and clearing values that are not tested elsewhere.
import unittest
+import apache_beam as beam
+
from apache_beam.internal import apiclient
from apache_beam.pipeline import Pipeline
from apache_beam.runners import create_runner
@@ -64,6 +66,25 @@ class RunnerTest(unittest.TestCase):
remote_runner.job = apiclient.Job(p.options)
super(DataflowPipelineRunner, remote_runner).run(p)
+ def test_no_group_by_key_directly_after_bigquery(self):
+ remote_runner = DataflowPipelineRunner()
+ p = Pipeline(remote_runner,
+ options=PipelineOptions([
+ '--dataflow_endpoint=ignored',
+ '--job_name=test-job',
+ '--project=test-project',
+ '--staging_location=ignored',
+ '--temp_location=/dev/null',
+ '--no_auth=True'
+ ]))
+ rows = p | beam.io.Read('read',
+ beam.io.BigQuerySource('dataset.faketable'))
+ with self.assertRaises(ValueError,
+ msg=('Coder for the GroupByKey operation'
+ '"GroupByKey" is not a key-value coder: '
+ 'RowAsDictJsonCoder')):
+ unused_invalid = rows | beam.GroupByKey()
+
if __name__ == '__main__':
unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3b53028/sdks/python/apache_beam/transforms/ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index bde05b5..da8b671 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -330,6 +330,27 @@ class PTransform(WithTypeHints):
input_or_output.title(), self.label, at_context, hint,
pvalue_.element_type))
+ def _infer_output_coder(self, input_type=None, input_coder=None):
+ """Returns the output coder to use for output of this transform.
+
+ Note: this API is experimental and is subject to change; please do not rely
+ on behavior induced by this method.
+
+ The Coder returned here should not be wrapped in a WindowedValueCoder
+ wrapper.
+
+ Args:
+ input_type: An instance of an allowed built-in type, a custom class, or a
+ typehints.TypeConstraint for the input type, or None if not available.
+ input_coder: Coder object for encoding input to this PTransform, or None
+ if not available.
+
+ Returns:
+ Coder object for encoding output of this PTransform or None if unknown.
+ """
+ # TODO(ccy): further refine this API.
+ return None
+
def clone(self, new_label):
"""Clones the current transform instance under a new label."""
transform = copy.copy(self)