You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/05/04 19:53:39 UTC

[14/50] [abbrv] beam git commit: [BEAM-1316] Remove the usage of mock from ptransform tests

[BEAM-1316] Remove the usage of mock from ptransform tests


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ab55ef3b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ab55ef3b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ab55ef3b

Branch: refs/heads/gearpump-runner
Commit: ab55ef3b4248789aa4c9eaf4a6dab7262d673819
Parents: 87a12af
Author: Sourabh Bajaj <so...@google.com>
Authored: Tue May 2 11:48:19 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Tue May 2 13:59:09 2017 -0700

----------------------------------------------------------------------
 .../apache_beam/transforms/ptransform_test.py   | 60 +++++++++++++-------
 1 file changed, 40 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/ab55ef3b/sdks/python/apache_beam/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index 80c9768..46c340c 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -22,7 +22,6 @@ from __future__ import absolute_import
 import operator
 import re
 import unittest
-import mock
 
 import hamcrest as hc
 from nose.plugins.attrib import attr
@@ -47,16 +46,6 @@ from apache_beam.utils.pipeline_options import TypeOptions
 # Disable frequent lint warning due to pipe operator for chaining transforms.
 # pylint: disable=expression-not-assigned
 
-class MyDoFn(beam.DoFn):
-  def start_bundle(self):
-    pass
-
-  def process(self, element):
-    pass
-
-  def finish_bundle(self):
-    yield 'finish'
-
 
 class PTransformTest(unittest.TestCase):
   # Enable nose tests running in parallel
@@ -286,6 +275,13 @@ class PTransformTest(unittest.TestCase):
     self.assertStartswith(cm.exception.message, expected_error_prefix)
 
   def test_do_fn_with_finish(self):
+    class MyDoFn(beam.DoFn):
+      def process(self, element):
+        pass
+
+      def finish_bundle(self):
+        yield 'finish'
+
     pipeline = TestPipeline()
     pcoll = pipeline | 'Start' >> beam.Create([1, 2, 3])
     result = pcoll | 'Do' >> beam.ParDo(MyDoFn())
@@ -300,22 +296,46 @@ class PTransformTest(unittest.TestCase):
     assert_that(result, matcher())
     pipeline.run()
 
-  @mock.patch.object(MyDoFn, 'start_bundle')
-  def test_do_fn_with_start(self, mock_method):
-    mock_method.return_value = None
+  def test_do_fn_with_start(self):
+    class MyDoFn(beam.DoFn):
+      def __init__(self):
+        self.state = 'init'
+
+      def start_bundle(self):
+        self.state = 'started'
+        return None
+
+      def process(self, element):
+        if self.state == 'started':
+          yield 'started'
+        self.state = 'process'
+
     pipeline = TestPipeline()
-    pipeline | 'Start' >> beam.Create([1, 2, 3]) | 'Do' >> beam.ParDo(MyDoFn())
+    pcoll = pipeline | 'Start' >> beam.Create([1, 2, 3])
+    result = pcoll | 'Do' >> beam.ParDo(MyDoFn())
+
+    # May have many bundles, but each has a start and finish.
+    def  matcher():
+      def match(actual):
+        equal_to(['started'])(list(set(actual)))
+        equal_to([1])([actual.count('started')])
+      return match
+
+    assert_that(result, matcher())
     pipeline.run()
-    self.assertTrue(mock_method.called)
 
-  @mock.patch.object(MyDoFn, 'start_bundle')
-  def test_do_fn_with_start_error(self, mock_method):
-    mock_method.return_value = [1]
+  def test_do_fn_with_start_error(self):
+    class MyDoFn(beam.DoFn):
+      def start_bundle(self):
+        return [1]
+
+      def process(self, element):
+        pass
+
     pipeline = TestPipeline()
     pipeline | 'Start' >> beam.Create([1, 2, 3]) | 'Do' >> beam.ParDo(MyDoFn())
     with self.assertRaises(RuntimeError):
       pipeline.run()
-    self.assertTrue(mock_method.called)
 
   def test_filter(self):
     pipeline = TestPipeline()