You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text version).
Posted to commits@beam.apache.org by ro...@apache.org on 2016/07/19 00:46:44 UTC

[1/2] incubator-beam git commit: Closes #676

Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 5daab7fb3 -> ad7c216f4


Closes #676


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ad7c216f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ad7c216f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ad7c216f

Branch: refs/heads/python-sdk
Commit: ad7c216f4949cb098ab7eacaefc68c22f6a16bad
Parents: 5daab7f e68eb05
Author: Robert Bradshaw <ro...@google.com>
Authored: Mon Jul 18 17:46:16 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Mon Jul 18 17:46:16 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/dataflow_test.py        |  5 +++--
 sdks/python/apache_beam/pipeline.py             | 22 +++-----------------
 sdks/python/apache_beam/pipeline_test.py        | 18 ----------------
 .../python/apache_beam/transforms/ptransform.py |  2 ++
 4 files changed, 8 insertions(+), 39 deletions(-)
----------------------------------------------------------------------



[2/2] incubator-beam git commit: Remove pipeline.apply(pvalue, callable)

Posted by ro...@apache.org.
Remove pipeline.apply(pvalue, callable)


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e68eb05e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e68eb05e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e68eb05e

Branch: refs/heads/python-sdk
Commit: e68eb05e5ffa366f83478807bbac3af83ea8cda5
Parents: 5daab7f
Author: Robert Bradshaw <ro...@google.com>
Authored: Mon Jul 18 10:23:24 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Mon Jul 18 17:46:16 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/dataflow_test.py        |  5 +++--
 sdks/python/apache_beam/pipeline.py             | 22 +++-----------------
 sdks/python/apache_beam/pipeline_test.py        | 18 ----------------
 .../python/apache_beam/transforms/ptransform.py |  2 ++
 4 files changed, 8 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e68eb05e/sdks/python/apache_beam/dataflow_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/dataflow_test.py b/sdks/python/apache_beam/dataflow_test.py
index c4933af..9bbb5ff 100644
--- a/sdks/python/apache_beam/dataflow_test.py
+++ b/sdks/python/apache_beam/dataflow_test.py
@@ -23,6 +23,7 @@ import logging
 import re
 import unittest
 
+import apache_beam as beam
 from apache_beam.pipeline import Pipeline
 from apache_beam.pvalue import AsDict
 from apache_beam.pvalue import AsIter as AllOf
@@ -51,8 +52,8 @@ class DataflowTest(unittest.TestCase):
 
   # TODO(silviuc): Figure out a nice way to specify labels for stages so that
   # internal steps get prepended with surorunding stage names.
-  @staticmethod
-  def Count(pcoll):  # pylint: disable=invalid-name
+  @beam.ptransform_fn
+  def Count(pcoll):  # pylint: disable=invalid-name, no-self-argument
     """A Count transform: v, ... => (v, n), ..."""
     return (pcoll
             | Map('AddCount', lambda x: (x, 1))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e68eb05e/sdks/python/apache_beam/pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 012d4d9..bc1feb2 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -47,7 +47,6 @@ import logging
 import os
 import shutil
 import tempfile
-import types
 
 from apache_beam import pvalue
 from apache_beam import typehints
@@ -187,20 +186,17 @@ class Pipeline(object):
     """Applies a custom transform using the pvalueish specified.
 
     Args:
-      transform: the PTranform (or callable) to apply.
+      transform: the PTranform to apply.
       pvalueish: the input for the PTransform (typically a PCollection).
 
     Raises:
       TypeError: if the transform object extracted from the argument list is
-        not a callable type or a descendant from PTransform.
+        not a PTransform.
       RuntimeError: if the transform object was already applied to this pipeline
         and needs to be cloned in order to apply again.
     """
     if not isinstance(transform, ptransform.PTransform):
-      if isinstance(transform, (type, types.ClassType)):
-        raise TypeError("%s is not a PTransform instance, did you mean %s()?"
-                        % (transform, transform.__name__))
-      transform = _CallableWrapperPTransform(transform)
+      raise TypeError("Expected a PTransform object, got %s" % transform)
 
     full_label = format_full_label(self._current_transform(), transform)
     if full_label in self.applied_labels:
@@ -286,18 +282,6 @@ class Pipeline(object):
     return pvalueish_result
 
 
-class _CallableWrapperPTransform(ptransform.PTransform):
-
-  def __init__(self, callee):
-    assert callable(callee)
-    super(_CallableWrapperPTransform, self).__init__(
-        label=getattr(callee, '__name__', 'Callable'))
-    self._callee = callee
-
-  def apply(self, *args, **kwargs):
-    return self._callee(*args, **kwargs)
-
-
 class PipelineVisitor(object):
   """Visitor pattern class used to traverse a DAG of transforms.
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e68eb05e/sdks/python/apache_beam/pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index 8598737..04cd2ee 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -32,7 +32,6 @@ from apache_beam.transforms import Create
 from apache_beam.transforms import FlatMap
 from apache_beam.transforms import Flatten
 from apache_beam.transforms import Map
-from apache_beam.transforms import GroupByKey
 from apache_beam.transforms import PTransform
 from apache_beam.transforms import Read
 from apache_beam.transforms.util import assert_that, equal_to
@@ -172,23 +171,6 @@ class PipelineTest(unittest.TestCase):
     assert_that(result2, equal_to([5, 6, 7]), label='r2')
     pipeline.run()
 
-  def test_apply_custom_callable(self):
-    pipeline = Pipeline(self.runner_name)
-    pcoll = pipeline | Create('pcoll', [1, 2, 3])
-    result = pcoll | PipelineTest.custom_callable
-    assert_that(result, equal_to([2, 3, 4]))
-    pipeline.run()
-
-  def test_apply_custom_callable_error(self):
-    pipeline = Pipeline(self.runner_name)
-    pcoll = pipeline | Create('pcoll', [1, 2, 3])
-    with self.assertRaises(TypeError) as cm:
-      pcoll | GroupByKey  # Note the missing ()'s
-    self.assertEqual(
-        cm.exception.message,
-        "<class 'apache_beam.transforms.core.GroupByKey'> is not "
-        "a PTransform instance, did you mean GroupByKey()?")
-
   def test_transform_no_super_init(self):
     class AddSuffix(PTransform):
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e68eb05e/sdks/python/apache_beam/transforms/ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index 04dd9b3..1457bec 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -601,6 +601,8 @@ class CallablePTransform(PTransform):
     # is called (and __call__ invoked) we will have all the information
     # needed to initialize the super class.
     self.fn = fn
+    self._args = ()
+    self._kwargs = {}
 
   def __call__(self, *args, **kwargs):
     if args and args[0] is None: