You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2017/05/09 20:12:46 UTC
[2/3] beam git commit: [BEAM-1925] validate DoFn at pipeline creation time
[BEAM-1925] validate DoFn at pipeline creation time
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/15ef41c0
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/15ef41c0
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/15ef41c0
Branch: refs/heads/release-2.0.0
Commit: 15ef41c007ec5a5847d397627d25a2e2e7bee4a2
Parents: 9010314
Author: Sourabh Bajaj <so...@google.com>
Authored: Mon May 8 13:33:15 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Tue May 9 13:11:59 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/runners/common.py | 25 ++++-----
sdks/python/apache_beam/runners/common_test.py | 58 ++++++++++++++++++++
sdks/python/apache_beam/transforms/core.py | 9 ++-
.../apache_beam/transforms/ptransform_test.py | 1 -
4 files changed, 78 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/15ef41c0/sdks/python/apache_beam/runners/common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index 045c109..74c61ab 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -95,17 +95,20 @@ class DoFnSignature(object):
self._validate()
def _validate(self):
+ self._validate_process()
self._validate_bundle_method(self.start_bundle_method)
self._validate_bundle_method(self.finish_bundle_method)
- def _validate_bundle_method(self, method_wrapper):
- # Here we use the fact that every DoFn parameter defined in core.DoFn has
- # the value that is the same as the name of the parameter and ends with
- # string 'Param'.
- unsupported_dofn_params = [i for i in core.DoFn.__dict__ if
- i.endswith('Param')]
+ def _validate_process(self):
+ """Validate that none of the DoFnParameters are repeated in the function
+ """
+ for param in core.DoFn.DoFnParams:
+ assert self.process_method.defaults.count(param) <= 1
- for param in unsupported_dofn_params:
+ def _validate_bundle_method(self, method_wrapper):
+ """Validate that none of the DoFnParameters are used in the function
+ """
+ for param in core.DoFn.DoFnParams:
assert param not in method_wrapper.defaults
@@ -156,18 +159,14 @@ class DoFnInvoker(object):
def invoke_start_bundle(self):
"""Invokes the DoFn.start_bundle() method.
"""
- args_for_start_bundle = self.signature.start_bundle_method.defaults
self.output_processor.start_bundle_outputs(
- self.signature.start_bundle_method.method_value(
- *args_for_start_bundle))
+ self.signature.start_bundle_method.method_value())
def invoke_finish_bundle(self):
"""Invokes the DoFn.finish_bundle() method.
"""
- args_for_finish_bundle = self.signature.finish_bundle_method.defaults
self.output_processor.finish_bundle_outputs(
- self.signature.finish_bundle_method.method_value(
- *args_for_finish_bundle))
+ self.signature.finish_bundle_method.method_value())
class SimpleInvoker(DoFnInvoker):
http://git-wip-us.apache.org/repos/asf/beam/blob/15ef41c0/sdks/python/apache_beam/runners/common_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/common_test.py b/sdks/python/apache_beam/runners/common_test.py
new file mode 100644
index 0000000..62a6955
--- /dev/null
+++ b/sdks/python/apache_beam/runners/common_test.py
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import unittest
+
+from apache_beam.transforms.core import DoFn
+from apache_beam.runners.common import DoFnSignature
+
+
+class DoFnSignatureTest(unittest.TestCase):
+
+ def test_dofn_validate_process_error(self):
+ class MyDoFn(DoFn):
+ def process(self, element, w1=DoFn.WindowParam, w2=DoFn.WindowParam):
+ pass
+
+ with self.assertRaises(AssertionError):
+ DoFnSignature(MyDoFn())
+
+ def test_dofn_validate_start_bundle_error(self):
+ class MyDoFn(DoFn):
+ def process(self, element):
+ pass
+
+ def start_bundle(self, w1=DoFn.WindowParam):
+ pass
+
+ with self.assertRaises(AssertionError):
+ DoFnSignature(MyDoFn())
+
+ def test_dofn_validate_finish_bundle_error(self):
+ class MyDoFn(DoFn):
+ def process(self, element):
+ pass
+
+ def finish_bundle(self, w1=DoFn.WindowParam):
+ pass
+
+ with self.assertRaises(AssertionError):
+ DoFnSignature(MyDoFn())
+
+
+if __name__ == '__main__':
+ unittest.main()
http://git-wip-us.apache.org/repos/asf/beam/blob/15ef41c0/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 7ca1632..e37a387 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -29,7 +29,8 @@ from apache_beam.coders import typecoders
from apache_beam.internal import util
from apache_beam.runners.api import beam_runner_api_pb2
from apache_beam.transforms import ptransform
-from apache_beam.transforms.display import HasDisplayData, DisplayDataItem
+from apache_beam.transforms.display import DisplayDataItem
+from apache_beam.transforms.display import HasDisplayData
from apache_beam.transforms.ptransform import PTransform
from apache_beam.transforms.ptransform import PTransformWithSideInputs
from apache_beam.transforms.window import MIN_TIMESTAMP
@@ -131,6 +132,8 @@ class DoFn(WithTypeHints, HasDisplayData):
TimestampParam = 'TimestampParam'
WindowParam = 'WindowParam'
+ DoFnParams = [ElementParam, SideInputParam, TimestampParam, WindowParam]
+
@staticmethod
def from_callable(fn):
return CallableWrapperDoFn(fn)
@@ -596,6 +599,10 @@ class ParDo(PTransformWithSideInputs):
if not isinstance(self.fn, DoFn):
raise TypeError('ParDo must be called with a DoFn instance.')
+ # Validate the DoFn by creating a DoFnSignature
+ from apache_beam.runners.common import DoFnSignature
+ DoFnSignature(self.fn)
+
def default_type_hints(self):
return self.fn.get_type_hints()
http://git-wip-us.apache.org/repos/asf/beam/blob/15ef41c0/sdks/python/apache_beam/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index b8b0733..e712661 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -303,7 +303,6 @@ class PTransformTest(unittest.TestCase):
def start_bundle(self):
self.state = 'started'
- return None
def process(self, element):
if self.state == 'started':