Posted to commits@beam.apache.org by ro...@apache.org on 2017/01/25 20:38:23 UTC

[2/2] beam git commit: Clean up tests in pipeline_test.

Clean up tests in pipeline_test.

Notably, the runner_name parameter has been obsolete since the removal
of DiskCachedRunnerPipelineTest and is an inferior version of what
TestPipeline provides.
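
A minimal sketch of the simplified pattern the tests converge on, mirroring
the test_create body in the diff below. The TestPipeline import path is an
assumption based on the python-sdk branch layout of the time (later releases
moved it to apache_beam.testing.test_pipeline). TestPipeline resolves its
runner from the test options, defaulting to the direct runner, so no runner
name needs to be threaded through setUp:

    from apache_beam.test_pipeline import TestPipeline
    from apache_beam.transforms import Create
    from apache_beam.transforms.util import assert_that
    from apache_beam.transforms.util import equal_to

    pipeline = TestPipeline()  # no runner=... argument needed
    pcoll = pipeline | 'label1' >> Create([1, 2, 3])
    assert_that(pcoll, equal_to([1, 2, 3]))
    pipeline.run()

The bare Pipeline() constructions in the diff rely on the same default: with
no runner argument, the pipeline falls back to the direct runner.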


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/61d8d3f0
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/61d8d3f0
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/61d8d3f0

Branch: refs/heads/python-sdk
Commit: 61d8d3f0690142f6dc87b1484d3ebd148a706837
Parents: 9540cf1
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Sat Jan 21 21:07:39 2017 -0800
Committer: Robert Bradshaw <ro...@google.com>
Committed: Wed Jan 25 12:38:03 2017 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/pipeline_test.py | 57 ++++++++++-----------------
 1 file changed, 21 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/61d8d3f0/sdks/python/apache_beam/pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index 93b68d1..833293f 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -38,8 +38,8 @@ from apache_beam.transforms import Read
 from apache_beam.transforms import WindowInto
 from apache_beam.transforms.util import assert_that
 from apache_beam.transforms.util import equal_to
-from apache_beam.transforms.window import IntervalWindow
-from apache_beam.transforms.window import WindowFn
+from apache_beam.transforms.window import SlidingWindows
+from apache_beam.transforms.window import TimestampedValue
 from apache_beam.utils.timestamp import MIN_TIMESTAMP
 
 
@@ -70,9 +70,6 @@ class FakeSource(NativeSource):
 
 class PipelineTest(unittest.TestCase):
 
-  def setUp(self):
-    self.runner_name = 'DirectRunner'
-
   @staticmethod
   def custom_callable(pcoll):
     return pcoll | '+1' >> FlatMap(lambda x: [x + 1])
@@ -103,7 +100,7 @@ class PipelineTest(unittest.TestCase):
       self.leave_composite.append(transform_node)
 
   def test_create(self):
-    pipeline = TestPipeline(runner=self.runner_name)
+    pipeline = TestPipeline()
     pcoll = pipeline | 'label1' >> Create([1, 2, 3])
     assert_that(pcoll, equal_to([1, 2, 3]))
 
@@ -114,19 +111,19 @@ class PipelineTest(unittest.TestCase):
     pipeline.run()
 
   def test_create_singleton_pcollection(self):
-    pipeline = TestPipeline(runner=self.runner_name)
+    pipeline = TestPipeline()
     pcoll = pipeline | 'label' >> Create([[1, 2, 3]])
     assert_that(pcoll, equal_to([[1, 2, 3]]))
     pipeline.run()
 
   def test_read(self):
-    pipeline = TestPipeline(runner=self.runner_name)
+    pipeline = TestPipeline()
     pcoll = pipeline | 'read' >> Read(FakeSource([1, 2, 3]))
     assert_that(pcoll, equal_to([1, 2, 3]))
     pipeline.run()
 
   def test_visit_entire_graph(self):
-    pipeline = Pipeline(self.runner_name)
+    pipeline = Pipeline()
     pcoll1 = pipeline | 'pcoll' >> Create([1, 2, 3])
     pcoll2 = pcoll1 | 'do1' >> FlatMap(lambda x: [x + 1])
     pcoll3 = pcoll2 | 'do2' >> FlatMap(lambda x: [x + 1])
@@ -145,14 +142,14 @@ class PipelineTest(unittest.TestCase):
     self.assertEqual(visitor.leave_composite[0].transform, transform)
 
   def test_apply_custom_transform(self):
-    pipeline = TestPipeline(runner=self.runner_name)
+    pipeline = TestPipeline()
     pcoll = pipeline | 'pcoll' >> Create([1, 2, 3])
     result = pcoll | PipelineTest.CustomTransform()
     assert_that(result, equal_to([2, 3, 4]))
     pipeline.run()
 
   def test_reuse_custom_transform_instance(self):
-    pipeline = Pipeline(self.runner_name)
+    pipeline = Pipeline()
     pcoll1 = pipeline | 'pcoll1' >> Create([1, 2, 3])
     pcoll2 = pipeline | 'pcoll2' >> Create([4, 5, 6])
     transform = PipelineTest.CustomTransform()
@@ -167,7 +164,7 @@ class PipelineTest(unittest.TestCase):
         'pvalue | "label" >> transform')
 
   def test_reuse_cloned_custom_transform_instance(self):
-    pipeline = TestPipeline(runner=self.runner_name)
+    pipeline = TestPipeline()
     pcoll1 = pipeline | 'pc1' >> Create([1, 2, 3])
     pcoll2 = pipeline | 'pc2' >> Create([4, 5, 6])
     transform = PipelineTest.CustomTransform()
@@ -240,7 +237,7 @@ class PipelineTest(unittest.TestCase):
     def raise_exception(exn):
       raise exn
     with self.assertRaises(ValueError):
-      with Pipeline(self.runner_name) as p:
+      with Pipeline() as p:
         # pylint: disable=expression-not-assigned
         p | Create([ValueError]) | Map(raise_exception)
 
@@ -251,15 +248,12 @@ class PipelineTest(unittest.TestCase):
 
 class NewDoFnTest(unittest.TestCase):
 
-  def setUp(self):
-    self.runner_name = 'DirectRunner'
-
   def test_element(self):
     class TestDoFn(NewDoFn):
       def process(self, element):
         yield element + 10
 
-    pipeline = TestPipeline(runner=self.runner_name)
+    pipeline = TestPipeline()
     pcoll = pipeline | 'Create' >> Create([1, 2]) | 'Do' >> ParDo(TestDoFn())
     assert_that(pcoll, equal_to([11, 12]))
     pipeline.run()
@@ -269,7 +263,7 @@ class NewDoFnTest(unittest.TestCase):
       def process(self, element, context=NewDoFn.ContextParam):
         yield context.element + 10
 
-    pipeline = TestPipeline(runner=self.runner_name)
+    pipeline = TestPipeline()
     pcoll = pipeline | 'Create' >> Create([1, 2])| 'Do' >> ParDo(TestDoFn())
     assert_that(pcoll, equal_to([11, 12]))
     pipeline.run()
@@ -307,25 +301,16 @@ class NewDoFnTest(unittest.TestCase):
   def test_window_param(self):
     class TestDoFn(NewDoFn):
       def process(self, element, window=NewDoFn.WindowParam):
-        yield (float(window.start), float(window.end))
-
-    class TestWindowFn(WindowFn):
-      """Windowing function adding two disjoint windows to each element."""
-
-      def assign(self, assign_context):
-        _ = assign_context
-        return [IntervalWindow(10, 20), IntervalWindow(20, 30)]
+        yield (element, (float(window.start), float(window.end)))
 
-      def merge(self, existing_windows):
-        return existing_windows
-
-    pipeline = TestPipeline(runner=self.runner_name)
+    pipeline = TestPipeline()
     pcoll = (pipeline
-             | 'KVs' >> Create([(1, 10), (2, 20)])
-             | 'W' >> WindowInto(windowfn=TestWindowFn())
-             | 'Do' >> ParDo(TestDoFn()))
-    assert_that(pcoll, equal_to([(10.0, 20.0), (10.0, 20.0),
-                                 (20.0, 30.0), (20.0, 30.0)]))
+             | Create([1, 7])
+             | Map(lambda x: TimestampedValue(x, x))
+             | WindowInto(windowfn=SlidingWindows(10, 5))
+             | ParDo(TestDoFn()))
+    assert_that(pcoll, equal_to([(1, (-5, 5)), (1, (0, 10)),
+                                 (7, (0, 10)), (7, (5, 15))]))
     pipeline.run()
 
   def test_timestamp_param(self):
@@ -333,7 +318,7 @@ class NewDoFnTest(unittest.TestCase):
       def process(self, element, timestamp=NewDoFn.TimestampParam):
         yield timestamp
 
-    pipeline = TestPipeline(runner=self.runner_name)
+    pipeline = TestPipeline()
     pcoll = pipeline | 'Create' >> Create([1, 2]) | 'Do' >> ParDo(TestDoFn())
     assert_that(pcoll, equal_to([MIN_TIMESTAMP, MIN_TIMESTAMP]))
     pipeline.run()
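
For reference, the expected windows in the rewritten test_window_param follow
from sliding-window arithmetic: with SlidingWindows(10, 5) and the default
zero offset, an element at timestamp t is assigned to every window
[start, start + 10) whose start is a multiple of 5 and is at most t. A
plain-arithmetic sketch of that assignment (not Beam's own windowing code):

    def sliding_window_starts(t, size=10, period=5):
        # The last window still containing t starts at the greatest multiple
        # of `period` that is <= t; earlier starts step back by `period`
        # until the window no longer covers t.
        last_start = (t // period) * period
        return list(range(last_start, last_start - size, -period))

    # sliding_window_starts(1) -> [0, -5]  => windows (0, 10) and (-5, 5)
    # sliding_window_starts(7) -> [5, 0]   => windows (5, 15) and (0, 10)

So element 1 lands in (-5, 5) and (0, 10), and element 7 in (0, 10) and
(5, 15), matching the equal_to expectation asserted above.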