You are viewing a plain-text version of this content. The canonical link for it (a hyperlink in the original page) is not preserved in this copy.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/07/19 00:46:44 UTC
[1/2] incubator-beam git commit: Closes #676
Repository: incubator-beam
Updated Branches:
refs/heads/python-sdk 5daab7fb3 -> ad7c216f4
Closes #676
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ad7c216f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ad7c216f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ad7c216f
Branch: refs/heads/python-sdk
Commit: ad7c216f4949cb098ab7eacaefc68c22f6a16bad
Parents: 5daab7f e68eb05
Author: Robert Bradshaw <ro...@google.com>
Authored: Mon Jul 18 17:46:16 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Mon Jul 18 17:46:16 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/dataflow_test.py | 5 +++--
sdks/python/apache_beam/pipeline.py | 22 +++-----------------
sdks/python/apache_beam/pipeline_test.py | 18 ----------------
.../python/apache_beam/transforms/ptransform.py | 2 ++
4 files changed, 8 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
[2/2] incubator-beam git commit: Remove pipeline.apply(pvalue, callable)
Posted by ro...@apache.org.
Remove pipeline.apply(pvalue, callable)
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e68eb05e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e68eb05e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e68eb05e
Branch: refs/heads/python-sdk
Commit: e68eb05e5ffa366f83478807bbac3af83ea8cda5
Parents: 5daab7f
Author: Robert Bradshaw <ro...@google.com>
Authored: Mon Jul 18 10:23:24 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Mon Jul 18 17:46:16 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/dataflow_test.py | 5 +++--
sdks/python/apache_beam/pipeline.py | 22 +++-----------------
sdks/python/apache_beam/pipeline_test.py | 18 ----------------
.../python/apache_beam/transforms/ptransform.py | 2 ++
4 files changed, 8 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e68eb05e/sdks/python/apache_beam/dataflow_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/dataflow_test.py b/sdks/python/apache_beam/dataflow_test.py
index c4933af..9bbb5ff 100644
--- a/sdks/python/apache_beam/dataflow_test.py
+++ b/sdks/python/apache_beam/dataflow_test.py
@@ -23,6 +23,7 @@ import logging
import re
import unittest
+import apache_beam as beam
from apache_beam.pipeline import Pipeline
from apache_beam.pvalue import AsDict
from apache_beam.pvalue import AsIter as AllOf
@@ -51,8 +52,8 @@ class DataflowTest(unittest.TestCase):
# TODO(silviuc): Figure out a nice way to specify labels for stages so that
# internal steps get prepended with surorunding stage names.
- @staticmethod
- def Count(pcoll): # pylint: disable=invalid-name
+ @beam.ptransform_fn
+ def Count(pcoll): # pylint: disable=invalid-name, no-self-argument
"""A Count transform: v, ... => (v, n), ..."""
return (pcoll
| Map('AddCount', lambda x: (x, 1))
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e68eb05e/sdks/python/apache_beam/pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 012d4d9..bc1feb2 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -47,7 +47,6 @@ import logging
import os
import shutil
import tempfile
-import types
from apache_beam import pvalue
from apache_beam import typehints
@@ -187,20 +186,17 @@ class Pipeline(object):
"""Applies a custom transform using the pvalueish specified.
Args:
- transform: the PTranform (or callable) to apply.
+ transform: the PTranform to apply.
pvalueish: the input for the PTransform (typically a PCollection).
Raises:
TypeError: if the transform object extracted from the argument list is
- not a callable type or a descendant from PTransform.
+ not a PTransform.
RuntimeError: if the transform object was already applied to this pipeline
and needs to be cloned in order to apply again.
"""
if not isinstance(transform, ptransform.PTransform):
- if isinstance(transform, (type, types.ClassType)):
- raise TypeError("%s is not a PTransform instance, did you mean %s()?"
- % (transform, transform.__name__))
- transform = _CallableWrapperPTransform(transform)
+ raise TypeError("Expected a PTransform object, got %s" % transform)
full_label = format_full_label(self._current_transform(), transform)
if full_label in self.applied_labels:
@@ -286,18 +282,6 @@ class Pipeline(object):
return pvalueish_result
-class _CallableWrapperPTransform(ptransform.PTransform):
-
- def __init__(self, callee):
- assert callable(callee)
- super(_CallableWrapperPTransform, self).__init__(
- label=getattr(callee, '__name__', 'Callable'))
- self._callee = callee
-
- def apply(self, *args, **kwargs):
- return self._callee(*args, **kwargs)
-
-
class PipelineVisitor(object):
"""Visitor pattern class used to traverse a DAG of transforms.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e68eb05e/sdks/python/apache_beam/pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index 8598737..04cd2ee 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -32,7 +32,6 @@ from apache_beam.transforms import Create
from apache_beam.transforms import FlatMap
from apache_beam.transforms import Flatten
from apache_beam.transforms import Map
-from apache_beam.transforms import GroupByKey
from apache_beam.transforms import PTransform
from apache_beam.transforms import Read
from apache_beam.transforms.util import assert_that, equal_to
@@ -172,23 +171,6 @@ class PipelineTest(unittest.TestCase):
assert_that(result2, equal_to([5, 6, 7]), label='r2')
pipeline.run()
- def test_apply_custom_callable(self):
- pipeline = Pipeline(self.runner_name)
- pcoll = pipeline | Create('pcoll', [1, 2, 3])
- result = pcoll | PipelineTest.custom_callable
- assert_that(result, equal_to([2, 3, 4]))
- pipeline.run()
-
- def test_apply_custom_callable_error(self):
- pipeline = Pipeline(self.runner_name)
- pcoll = pipeline | Create('pcoll', [1, 2, 3])
- with self.assertRaises(TypeError) as cm:
- pcoll | GroupByKey # Note the missing ()'s
- self.assertEqual(
- cm.exception.message,
- "<class 'apache_beam.transforms.core.GroupByKey'> is not "
- "a PTransform instance, did you mean GroupByKey()?")
-
def test_transform_no_super_init(self):
class AddSuffix(PTransform):
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e68eb05e/sdks/python/apache_beam/transforms/ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index 04dd9b3..1457bec 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -601,6 +601,8 @@ class CallablePTransform(PTransform):
# is called (and __call__ invoked) we will have all the information
# needed to initialize the super class.
self.fn = fn
+ self._args = ()
+ self._kwargs = {}
def __call__(self, *args, **kwargs):
if args and args[0] is None: