You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/01/25 20:38:22 UTC
[1/2] beam git commit: Closes #1811
Repository: beam
Updated Branches:
refs/heads/python-sdk 9540cf176 -> 592422059
Closes #1811
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/59242205
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/59242205
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/59242205
Branch: refs/heads/python-sdk
Commit: 592422059e21bf72fc7b4842d6fd6df000a7d2a7
Parents: 9540cf1 61d8d3f
Author: Robert Bradshaw <ro...@google.com>
Authored: Wed Jan 25 12:38:03 2017 -0800
Committer: Robert Bradshaw <ro...@google.com>
Committed: Wed Jan 25 12:38:03 2017 -0800
----------------------------------------------------------------------
sdks/python/apache_beam/pipeline_test.py | 57 ++++++++++-----------------
1 file changed, 21 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
[2/2] beam git commit: Cleanup tests in pipeline_test.
Posted by ro...@apache.org.
Clean up tests in pipeline_test.
Notably, the runner_name parameter has been obsolete since the removal
of DiskCachedRunnerPipelineTest, and it is an inferior version of what
TestPipeline provides.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/61d8d3f0
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/61d8d3f0
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/61d8d3f0
Branch: refs/heads/python-sdk
Commit: 61d8d3f0690142f6dc87b1484d3ebd148a706837
Parents: 9540cf1
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Sat Jan 21 21:07:39 2017 -0800
Committer: Robert Bradshaw <ro...@google.com>
Committed: Wed Jan 25 12:38:03 2017 -0800
----------------------------------------------------------------------
sdks/python/apache_beam/pipeline_test.py | 57 ++++++++++-----------------
1 file changed, 21 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/61d8d3f0/sdks/python/apache_beam/pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index 93b68d1..833293f 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -38,8 +38,8 @@ from apache_beam.transforms import Read
from apache_beam.transforms import WindowInto
from apache_beam.transforms.util import assert_that
from apache_beam.transforms.util import equal_to
-from apache_beam.transforms.window import IntervalWindow
-from apache_beam.transforms.window import WindowFn
+from apache_beam.transforms.window import SlidingWindows
+from apache_beam.transforms.window import TimestampedValue
from apache_beam.utils.timestamp import MIN_TIMESTAMP
@@ -70,9 +70,6 @@ class FakeSource(NativeSource):
class PipelineTest(unittest.TestCase):
- def setUp(self):
- self.runner_name = 'DirectRunner'
-
@staticmethod
def custom_callable(pcoll):
return pcoll | '+1' >> FlatMap(lambda x: [x + 1])
@@ -103,7 +100,7 @@ class PipelineTest(unittest.TestCase):
self.leave_composite.append(transform_node)
def test_create(self):
- pipeline = TestPipeline(runner=self.runner_name)
+ pipeline = TestPipeline()
pcoll = pipeline | 'label1' >> Create([1, 2, 3])
assert_that(pcoll, equal_to([1, 2, 3]))
@@ -114,19 +111,19 @@ class PipelineTest(unittest.TestCase):
pipeline.run()
def test_create_singleton_pcollection(self):
- pipeline = TestPipeline(runner=self.runner_name)
+ pipeline = TestPipeline()
pcoll = pipeline | 'label' >> Create([[1, 2, 3]])
assert_that(pcoll, equal_to([[1, 2, 3]]))
pipeline.run()
def test_read(self):
- pipeline = TestPipeline(runner=self.runner_name)
+ pipeline = TestPipeline()
pcoll = pipeline | 'read' >> Read(FakeSource([1, 2, 3]))
assert_that(pcoll, equal_to([1, 2, 3]))
pipeline.run()
def test_visit_entire_graph(self):
- pipeline = Pipeline(self.runner_name)
+ pipeline = Pipeline()
pcoll1 = pipeline | 'pcoll' >> Create([1, 2, 3])
pcoll2 = pcoll1 | 'do1' >> FlatMap(lambda x: [x + 1])
pcoll3 = pcoll2 | 'do2' >> FlatMap(lambda x: [x + 1])
@@ -145,14 +142,14 @@ class PipelineTest(unittest.TestCase):
self.assertEqual(visitor.leave_composite[0].transform, transform)
def test_apply_custom_transform(self):
- pipeline = TestPipeline(runner=self.runner_name)
+ pipeline = TestPipeline()
pcoll = pipeline | 'pcoll' >> Create([1, 2, 3])
result = pcoll | PipelineTest.CustomTransform()
assert_that(result, equal_to([2, 3, 4]))
pipeline.run()
def test_reuse_custom_transform_instance(self):
- pipeline = Pipeline(self.runner_name)
+ pipeline = Pipeline()
pcoll1 = pipeline | 'pcoll1' >> Create([1, 2, 3])
pcoll2 = pipeline | 'pcoll2' >> Create([4, 5, 6])
transform = PipelineTest.CustomTransform()
@@ -167,7 +164,7 @@ class PipelineTest(unittest.TestCase):
'pvalue | "label" >> transform')
def test_reuse_cloned_custom_transform_instance(self):
- pipeline = TestPipeline(runner=self.runner_name)
+ pipeline = TestPipeline()
pcoll1 = pipeline | 'pc1' >> Create([1, 2, 3])
pcoll2 = pipeline | 'pc2' >> Create([4, 5, 6])
transform = PipelineTest.CustomTransform()
@@ -240,7 +237,7 @@ class PipelineTest(unittest.TestCase):
def raise_exception(exn):
raise exn
with self.assertRaises(ValueError):
- with Pipeline(self.runner_name) as p:
+ with Pipeline() as p:
# pylint: disable=expression-not-assigned
p | Create([ValueError]) | Map(raise_exception)
@@ -251,15 +248,12 @@ class PipelineTest(unittest.TestCase):
class NewDoFnTest(unittest.TestCase):
- def setUp(self):
- self.runner_name = 'DirectRunner'
-
def test_element(self):
class TestDoFn(NewDoFn):
def process(self, element):
yield element + 10
- pipeline = TestPipeline(runner=self.runner_name)
+ pipeline = TestPipeline()
pcoll = pipeline | 'Create' >> Create([1, 2]) | 'Do' >> ParDo(TestDoFn())
assert_that(pcoll, equal_to([11, 12]))
pipeline.run()
@@ -269,7 +263,7 @@ class NewDoFnTest(unittest.TestCase):
def process(self, element, context=NewDoFn.ContextParam):
yield context.element + 10
- pipeline = TestPipeline(runner=self.runner_name)
+ pipeline = TestPipeline()
pcoll = pipeline | 'Create' >> Create([1, 2])| 'Do' >> ParDo(TestDoFn())
assert_that(pcoll, equal_to([11, 12]))
pipeline.run()
@@ -307,25 +301,16 @@ class NewDoFnTest(unittest.TestCase):
def test_window_param(self):
class TestDoFn(NewDoFn):
def process(self, element, window=NewDoFn.WindowParam):
- yield (float(window.start), float(window.end))
-
- class TestWindowFn(WindowFn):
- """Windowing function adding two disjoint windows to each element."""
-
- def assign(self, assign_context):
- _ = assign_context
- return [IntervalWindow(10, 20), IntervalWindow(20, 30)]
+ yield (element, (float(window.start), float(window.end)))
- def merge(self, existing_windows):
- return existing_windows
-
- pipeline = TestPipeline(runner=self.runner_name)
+ pipeline = TestPipeline()
pcoll = (pipeline
- | 'KVs' >> Create([(1, 10), (2, 20)])
- | 'W' >> WindowInto(windowfn=TestWindowFn())
- | 'Do' >> ParDo(TestDoFn()))
- assert_that(pcoll, equal_to([(10.0, 20.0), (10.0, 20.0),
- (20.0, 30.0), (20.0, 30.0)]))
+ | Create([1, 7])
+ | Map(lambda x: TimestampedValue(x, x))
+ | WindowInto(windowfn=SlidingWindows(10, 5))
+ | ParDo(TestDoFn()))
+ assert_that(pcoll, equal_to([(1, (-5, 5)), (1, (0, 10)),
+ (7, (0, 10)), (7, (5, 15))]))
pipeline.run()
def test_timestamp_param(self):
@@ -333,7 +318,7 @@ class NewDoFnTest(unittest.TestCase):
def process(self, element, timestamp=NewDoFn.TimestampParam):
yield timestamp
- pipeline = TestPipeline(runner=self.runner_name)
+ pipeline = TestPipeline()
pcoll = pipeline | 'Create' >> Create([1, 2]) | 'Do' >> ParDo(TestDoFn())
assert_that(pcoll, equal_to([MIN_TIMESTAMP, MIN_TIMESTAMP]))
pipeline.run()