You are viewing a plain-text version of this content. The canonical link to it was a hyperlink in the original page and is not preserved in this text version.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/07/20 01:58:10 UTC

[1/2] incubator-beam git commit: Closes #686

Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk b75c0c0d4 -> ce29ac7ab


Closes #686


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ce29ac7a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ce29ac7a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ce29ac7a

Branch: refs/heads/python-sdk
Commit: ce29ac7abd3bdcfe3103fe694cb33171295bf35e
Parents: b75c0c0 e3b5302
Author: Dan Halperin <dh...@google.com>
Authored: Tue Jul 19 18:58:03 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Jul 19 18:58:03 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/iobase.py            |  6 ++++++
 .../apache_beam/runners/dataflow_runner.py      | 18 +++++++++--------
 sdks/python/apache_beam/runners/runner_test.py  | 21 ++++++++++++++++++++
 .../python/apache_beam/transforms/ptransform.py | 21 ++++++++++++++++++++
 4 files changed, 58 insertions(+), 8 deletions(-)
----------------------------------------------------------------------



[2/2] incubator-beam git commit: Check type of coder for step feeding into GroupByKey in Dataflow runner

Posted by dh...@apache.org.
Check type of coder for step feeding into GroupByKey in Dataflow runner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e3b53028
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e3b53028
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e3b53028

Branch: refs/heads/python-sdk
Commit: e3b53028457f3a1969bd3bd0a98d5def50de9335
Parents: b75c0c0
Author: Charles Chen <cc...@google.com>
Authored: Mon Jul 18 17:54:34 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Jul 19 18:58:03 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/iobase.py            |  6 ++++++
 .../apache_beam/runners/dataflow_runner.py      | 18 +++++++++--------
 sdks/python/apache_beam/runners/runner_test.py  | 21 ++++++++++++++++++++
 .../python/apache_beam/transforms/ptransform.py | 21 ++++++++++++++++++++
 4 files changed, 58 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3b53028/sdks/python/apache_beam/io/iobase.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index 71ef46b..de5e9d4 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -938,6 +938,12 @@ class Read(ptransform.PTransform):
   def get_windowing(self, unused_inputs):
     return core.Windowing(window.GlobalWindows())
 
+  def _infer_output_coder(self, input_type=None, input_coder=None):
+    if isinstance(self.source, BoundedSource):
+      return self.source.default_output_coder()
+    else:
+      return self.source.coder
+
 
 class Write(ptransform.PTransform):
   """A ``PTransform`` that writes to a sink.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3b53028/sdks/python/apache_beam/runners/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow_runner.py
index 4fafc9f..f794c8b 100644
--- a/sdks/python/apache_beam/runners/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow_runner.py
@@ -30,7 +30,6 @@ import time
 from apache_beam import coders
 from apache_beam import pvalue
 from apache_beam.internal import pickler
-from apache_beam.io import iobase
 from apache_beam.pvalue import PCollectionView
 from apache_beam.runners.runner import PipelineResult
 from apache_beam.runners.runner import PipelineRunner
@@ -326,7 +325,15 @@ class DataflowPipelineRunner(PipelineRunner):
           PropertyNames.OUTPUT_NAME: PropertyNames.OUT}])
 
   def apply_GroupByKey(self, transform, pcoll):
-    coder = self._get_coder(pcoll.element_type or typehints.Any, None)
+    # Infer coder of parent.
+    #
+    # TODO(ccy): make Coder inference and checking less specialized and more
+    # comprehensive.
+    parent = pcoll.producer
+    if parent:
+      coder = parent.transform._infer_output_coder()  # pylint: disable=protected-access
+    if not coder:
+      coder = self._get_coder(pcoll.element_type or typehints.Any, None)
     if not coder.is_kv_coder():
       raise ValueError(('Coder for the GroupByKey operation "%s" is not a '
                         'key-value coder: %s.') % (transform.label,
@@ -525,16 +532,11 @@ class DataflowPipelineRunner(PipelineRunner):
     else:
       step.add_property(PropertyNames.FORMAT, transform.source.format)
 
-    if isinstance(transform.source, iobase.BoundedSource):
-      coder = transform.source.default_output_coder()
-    else:
-      coder = transform.source.coder
-
     # Wrap coder in WindowedValueCoder: this is necessary as the encoding of a
     # step should be the type of value outputted by each step.  Read steps
     # automatically wrap output values in a WindowedValue wrapper, if necessary.
     # This is also necessary for proper encoding for size estimation.
-    coder = coders.WindowedValueCoder(coder)
+    coder = coders.WindowedValueCoder(transform._infer_output_coder())  # pylint: disable=protected-access
 
     step.encoding = self._get_cloud_encoding(coder)
     step.add_property(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3b53028/sdks/python/apache_beam/runners/runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner_test.py b/sdks/python/apache_beam/runners/runner_test.py
index d2e70d7..2863756 100644
--- a/sdks/python/apache_beam/runners/runner_test.py
+++ b/sdks/python/apache_beam/runners/runner_test.py
@@ -24,6 +24,8 @@ caching and clearing values that are not tested elsewhere.
 
 import unittest
 
+import apache_beam as beam
+
 from apache_beam.internal import apiclient
 from apache_beam.pipeline import Pipeline
 from apache_beam.runners import create_runner
@@ -64,6 +66,25 @@ class RunnerTest(unittest.TestCase):
     remote_runner.job = apiclient.Job(p.options)
     super(DataflowPipelineRunner, remote_runner).run(p)
 
+  def test_no_group_by_key_directly_after_bigquery(self):
+    remote_runner = DataflowPipelineRunner()
+    p = Pipeline(remote_runner,
+                 options=PipelineOptions([
+                     '--dataflow_endpoint=ignored',
+                     '--job_name=test-job',
+                     '--project=test-project',
+                     '--staging_location=ignored',
+                     '--temp_location=/dev/null',
+                     '--no_auth=True'
+                 ]))
+    rows = p | beam.io.Read('read',
+                            beam.io.BigQuerySource('dataset.faketable'))
+    with self.assertRaises(ValueError,
+                           msg=('Coder for the GroupByKey operation'
+                                '"GroupByKey" is not a key-value coder: '
+                                'RowAsDictJsonCoder')):
+      unused_invalid = rows | beam.GroupByKey()
+
 
 if __name__ == '__main__':
   unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3b53028/sdks/python/apache_beam/transforms/ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index bde05b5..da8b671 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -330,6 +330,27 @@ class PTransform(WithTypeHints):
                 input_or_output.title(), self.label, at_context, hint,
                 pvalue_.element_type))
 
+  def _infer_output_coder(self, input_type=None, input_coder=None):
+    """Returns the output coder to use for output of this transform.
+
+    Note: this API is experimental and is subject to change; please do not rely
+    on behavior induced by this method.
+
+    The Coder returned here should not be wrapped in a WindowedValueCoder
+    wrapper.
+
+    Args:
+      input_type: An instance of an allowed built-in type, a custom class, or a
+        typehints.TypeConstraint for the input type, or None if not available.
+      input_coder: Coder object for encoding input to this PTransform, or None
+        if not available.
+
+    Returns:
+      Coder object for encoding output of this PTransform or None if unknown.
+    """
+    # TODO(ccy): further refine this API.
+    return None
+
   def clone(self, new_label):
     """Clones the current transform instance under a new label."""
     transform = copy.copy(self)