You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@beam.apache.org by ch...@apache.org on 2017/04/10 21:19:20 UTC
[1/2] beam git commit: Create as custom source
Repository: beam
Updated Branches:
refs/heads/master 836e8e4aa -> fc1006500
Create as custom source
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/33d4a02b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/33d4a02b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/33d4a02b
Branch: refs/heads/master
Commit: 33d4a02bb28e5a1c09513dc3e7701b30df148943
Parents: 836e8e4
Author: Vikas Kedigehalli <vi...@google.com>
Authored: Mon Apr 3 10:01:45 2017 -0700
Committer: Chamikara Jayalath <ch...@google.com>
Committed: Mon Apr 10 14:17:15 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/internal/pickler.py | 22 ++--
sdks/python/apache_beam/pipeline.py | 4 +-
sdks/python/apache_beam/pipeline_test.py | 13 +-
.../runners/dataflow/dataflow_runner.py | 24 ----
.../runners/dataflow/dataflow_runner_test.py | 2 +-
.../consumer_tracking_pipeline_visitor_test.py | 22 ++--
.../runners/direct/transform_evaluator.py | 31 -----
sdks/python/apache_beam/transforms/core.py | 92 +++++++++++++-
.../apache_beam/transforms/create_test.py | 121 +++++++++++++++++++
.../apache_beam/transforms/ptransform_test.py | 14 +--
10 files changed, 254 insertions(+), 91 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/33d4a02b/sdks/python/apache_beam/internal/pickler.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/pickler.py b/sdks/python/apache_beam/internal/pickler.py
index 67f9fc3..a4ab7b9 100644
--- a/sdks/python/apache_beam/internal/pickler.py
+++ b/sdks/python/apache_beam/internal/pickler.py
@@ -181,12 +181,15 @@ logging.getLogger('dill').setLevel(logging.WARN)
# TODO(ccy): Currently, there are still instances of pickler.dumps() and
# pickler.loads() being used for data, which results in an unnecessary base64
# encoding. This should be cleaned up.
-def dumps(o):
+def dumps(o, enable_trace=True):
try:
s = dill.dumps(o)
- except Exception: # pylint: disable=broad-except
- dill.dill._trace(True) # pylint: disable=protected-access
- s = dill.dumps(o)
+ except Exception as e: # pylint: disable=broad-except
+ if enable_trace:
+ dill.dill._trace(True) # pylint: disable=protected-access
+ s = dill.dumps(o)
+ else:
+ raise e
finally:
dill.dill._trace(False) # pylint: disable=protected-access
@@ -199,7 +202,7 @@ def dumps(o):
return base64.b64encode(c)
-def loads(encoded):
+def loads(encoded, enable_trace=True):
c = base64.b64decode(encoded)
s = zlib.decompress(c)
@@ -207,9 +210,12 @@ def loads(encoded):
try:
return dill.loads(s)
- except Exception: # pylint: disable=broad-except
- dill.dill._trace(True) # pylint: disable=protected-access
- return dill.loads(s)
+ except Exception as e: # pylint: disable=broad-except
+ if enable_trace:
+ dill.dill._trace(True) # pylint: disable=protected-access
+ return dill.loads(s)
+ else:
+ raise e
finally:
dill.dill._trace(False) # pylint: disable=protected-access
http://git-wip-us.apache.org/repos/asf/beam/blob/33d4a02b/sdks/python/apache_beam/pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index b93167d..2ff9eb3 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -315,7 +315,9 @@ class Pipeline(object):
Visitor.ok = False
try:
# Transforms must be picklable.
- pickler.loads(pickler.dumps(transform_node.transform))
+ pickler.loads(pickler.dumps(transform_node.transform,
+ enable_trace=False),
+ enable_trace=False)
except Exception:
Visitor.ok = False
http://git-wip-us.apache.org/repos/asf/beam/blob/33d4a02b/sdks/python/apache_beam/pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index ba219bf..6314609 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -173,9 +173,9 @@ class PipelineTest(unittest.TestCase):
set(visitor.visited))
self.assertEqual(set(visitor.enter_composite),
set(visitor.leave_composite))
- self.assertEqual(2, len(visitor.enter_composite))
- self.assertEqual(visitor.enter_composite[1].transform, transform)
- self.assertEqual(visitor.leave_composite[0].transform, transform)
+ self.assertEqual(3, len(visitor.enter_composite))
+ self.assertEqual(visitor.enter_composite[2].transform, transform)
+ self.assertEqual(visitor.leave_composite[1].transform, transform)
def test_apply_custom_transform(self):
pipeline = TestPipeline()
@@ -280,9 +280,10 @@ class PipelineTest(unittest.TestCase):
# pylint: disable=expression-not-assigned
p | Create([ValueError]) | Map(raise_exception)
- def test_eager_pipeline(self):
- p = Pipeline('EagerRunner')
- self.assertEqual([1, 4, 9], p | Create([1, 2, 3]) | Map(lambda x: x*x))
+ # TODO(BEAM-1894).
+ # def test_eager_pipeline(self):
+ # p = Pipeline('EagerRunner')
+ # self.assertEqual([1, 4, 9], p | Create([1, 2, 3]) | Map(lambda x: x*x))
class DoFnTest(unittest.TestCase):
http://git-wip-us.apache.org/repos/asf/beam/blob/33d4a02b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index f0600bc..1a935c1 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -21,7 +21,6 @@ The runner will create a JSON description of the job graph and then submit it
to the Dataflow Service for remote execution by a worker.
"""
-import base64
import logging
import threading
import time
@@ -261,29 +260,6 @@ class DataflowRunner(PipelineRunner):
return step
- def run_Create(self, transform_node):
- transform = transform_node.transform
- step = self._add_step(TransformNames.CREATE_PCOLLECTION,
- transform_node.full_label, transform_node)
- # TODO(silviuc): Eventually use a coder based on typecoders.
- # Note that we base64-encode values here so that the service will accept
- # the values.
- element_coder = coders.PickleCoder()
- step.add_property(
- PropertyNames.ELEMENT,
- [base64.b64encode(element_coder.encode(v))
- for v in transform.value])
- # The service expects a WindowedValueCoder here, so we wrap the actual
- # encoding in a WindowedValueCoder.
- step.encoding = self._get_cloud_encoding(
- coders.WindowedValueCoder(element_coder))
- step.add_property(
- PropertyNames.OUTPUT_INFO,
- [{PropertyNames.USER_NAME: (
- '%s.%s' % (transform_node.full_label, PropertyNames.OUT)),
- PropertyNames.ENCODING: step.encoding,
- PropertyNames.OUTPUT_NAME: PropertyNames.OUT}])
-
def _add_singleton_step(self, label, full_label, tag, input_step):
"""Creates a CollectionToSingleton step used to handle ParDo side inputs."""
# Import here to avoid adding the dependency for local running scenarios.
http://git-wip-us.apache.org/repos/asf/beam/blob/33d4a02b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
index cc5928a..b9ed84d 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
@@ -142,7 +142,7 @@ class DataflowRunnerTest(unittest.TestCase):
steps = [step
for step in job_dict['steps']
if len(step['properties'].get('display_data', [])) > 0]
- step = steps[0]
+ step = steps[1]
disp_data = step['properties']['display_data']
disp_data = sorted(disp_data, key=lambda x: x['namespace']+x['key'])
nspace = SpecialParDo.__module__+ '.'
http://git-wip-us.apache.org/repos/asf/beam/blob/33d4a02b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py
index eb8b14b..154284b 100644
--- a/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py
+++ b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py
@@ -46,8 +46,6 @@ class ConsumerTrackingPipelineVisitorTest(unittest.TestCase):
self.visitor = ConsumerTrackingPipelineVisitor()
def test_root_transforms(self):
- root_create = Create([[1, 2, 3]])
-
class DummySource(iobase.BoundedSource):
pass
@@ -55,9 +53,8 @@ class ConsumerTrackingPipelineVisitorTest(unittest.TestCase):
root_flatten = Flatten(pipeline=self.pipeline)
pbegin = pvalue.PBegin(self.pipeline)
- pcoll_create = pbegin | 'create' >> root_create
- pbegin | 'read' >> root_read
- pcoll_create | FlatMap(lambda x: x)
+ pcoll_read = pbegin | 'read' >> root_read
+ pcoll_read | FlatMap(lambda x: x)
[] | 'flatten' >> root_flatten
self.pipeline.visit(self.visitor)
@@ -66,12 +63,12 @@ class ConsumerTrackingPipelineVisitorTest(unittest.TestCase):
[t.transform for t in self.visitor.root_transforms])
self.assertEqual(root_transforms, sorted(
- [root_read, root_create, root_flatten]))
+ [root_read, root_flatten]))
pbegin_consumers = sorted(
[c.transform for c in self.visitor.value_to_consumers[pbegin]])
- self.assertEqual(pbegin_consumers, sorted([root_read, root_create]))
- self.assertEqual(len(self.visitor.step_names), 4)
+ self.assertEqual(pbegin_consumers, sorted([root_read]))
+ self.assertEqual(len(self.visitor.step_names), 3)
def test_side_inputs(self):
@@ -88,10 +85,13 @@ class ConsumerTrackingPipelineVisitorTest(unittest.TestCase):
def process(self, element, negatives):
yield element
- root_create = Create([[-1, 2, 3]])
+ class DummySource(iobase.BoundedSource):
+ pass
+
+ root_read = Read(DummySource())
result = (self.pipeline
- | 'create' >> root_create
+ | 'read' >> root_read
| ParDo(SplitNumbersFn()).with_outputs('tag_negative',
main='positive'))
positive, negative = result
@@ -101,7 +101,7 @@ class ConsumerTrackingPipelineVisitorTest(unittest.TestCase):
root_transforms = sorted(
[t.transform for t in self.visitor.root_transforms])
- self.assertEqual(root_transforms, sorted([root_create]))
+ self.assertEqual(root_transforms, sorted([root_read]))
self.assertEqual(len(self.visitor.step_names), 3)
self.assertEqual(len(self.visitor.views), 1)
self.assertTrue(isinstance(self.visitor.views[0],
http://git-wip-us.apache.org/repos/asf/beam/blob/33d4a02b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index 0c35d99..662c61d 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -49,7 +49,6 @@ class TransformEvaluatorRegistry(object):
self._evaluation_context = evaluation_context
self._evaluators = {
io.Read: _BoundedReadEvaluator,
- core.Create: _CreateEvaluator,
core.Flatten: _FlattenEvaluator,
core.ParDo: _ParDoEvaluator,
core.GroupByKeyOnly: _GroupByKeyOnlyEvaluator,
@@ -233,36 +232,6 @@ class _FlattenEvaluator(_TransformEvaluator):
self._applied_ptransform, bundles, None, None, None, None)
-class _CreateEvaluator(_TransformEvaluator):
- """TransformEvaluator for Create transform."""
-
- def __init__(self, evaluation_context, applied_ptransform,
- input_committed_bundle, side_inputs, scoped_metrics_container):
- assert not input_committed_bundle
- assert not side_inputs
- super(_CreateEvaluator, self).__init__(
- evaluation_context, applied_ptransform, input_committed_bundle,
- side_inputs, scoped_metrics_container)
-
- def start_bundle(self):
- assert len(self._outputs) == 1
- output_pcollection = list(self._outputs)[0]
- self.bundle = self._evaluation_context.create_bundle(output_pcollection)
-
- def finish_bundle(self):
- bundles = []
- transform = self._applied_ptransform.transform
-
- assert transform.value is not None
- create_result = [GlobalWindows.windowed_value(v) for v in transform.value]
- for result in create_result:
- self.bundle.output(result)
- bundles.append(self.bundle)
-
- return TransformResult(
- self._applied_ptransform, bundles, None, None, None, None)
-
-
class _TaggedReceivers(dict):
"""Received ParDo output and redirect to the associated output bundle."""
http://git-wip-us.apache.org/repos/asf/beam/blob/33d4a02b/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 88fdec8..cf313d1 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -1088,7 +1088,6 @@ class GroupByKey(PTransform):
# This code path is only used in the local direct runner. For Dataflow
# runner execution, the GroupByKey transform is expanded on the service.
input_type = pcoll.element_type
-
if input_type is not None:
# Initialize type-hints used below to enforce type-checking and to pass
# downstream to further PTransforms.
@@ -1373,11 +1372,100 @@ class Create(PTransform):
def expand(self, pbegin):
assert isinstance(pbegin, pvalue.PBegin)
self.pipeline = pbegin.pipeline
- return pvalue.PCollection(self.pipeline)
+ ouput_type = (self.get_type_hints().simple_output_type(self.label) or
+ self.infer_output_type(None))
+ coder = typecoders.registry.get_coder(ouput_type)
+ source = self._create_source_from_iterable(self.value, coder)
+ return pbegin.pipeline | Read(source).with_output_types(ouput_type)
def get_windowing(self, unused_inputs):
return Windowing(GlobalWindows())
+ @staticmethod
+ def _create_source_from_iterable(values, coder):
+ return Create._create_source(map(coder.encode, values), coder)
+
+ @staticmethod
+ def _create_source(serialized_values, coder):
+ from apache_beam.io import iobase
+
+ class _CreateSource(iobase.BoundedSource):
+ def __init__(self, serialized_values, coder):
+ self._coder = coder
+ self._serialized_values = []
+ self._total_size = 0
+ self._serialized_values = serialized_values
+ self._total_size = sum(map(len, self._serialized_values))
+
+ def read(self, range_tracker):
+ start_position = range_tracker.start_position()
+ current_position = start_position
+
+ def split_points_unclaimed(stop_position):
+ if current_position >= stop_position:
+ return 0
+ else:
+ return stop_position - current_position - 1
+
+ range_tracker.set_split_points_unclaimed_callback(
+ split_points_unclaimed)
+ element_iter = iter(self._serialized_values[start_position:])
+ for i in range(start_position, range_tracker.stop_position()):
+ if not range_tracker.try_claim(i):
+ return
+ current_position = i
+ yield self._coder.decode(next(element_iter))
+
+ def split(self, desired_bundle_size, start_position=None,
+ stop_position=None):
+ from apache_beam.io import iobase
+
+ if len(self._serialized_values) < 2:
+ yield iobase.SourceBundle(
+ weight=0, source=self, start_position=0,
+ stop_position=len(self._serialized_values))
+ else:
+ if start_position is None:
+ start_position = 0
+ if stop_position is None:
+ stop_position = len(self._serialized_values)
+
+ avg_size_per_value = self._total_size / len(self._serialized_values)
+ num_values_per_split = max(
+ int(desired_bundle_size / avg_size_per_value), 1)
+
+ start = start_position
+ while start < stop_position:
+ end = min(start + num_values_per_split, stop_position)
+ remaining = stop_position - end
+ # Avoid having a too small bundle at the end.
+ if remaining < (num_values_per_split / 4):
+ end = stop_position
+
+ sub_source = Create._create_source(
+ self._serialized_values[start:end], self._coder)
+
+ yield iobase.SourceBundle(weight=(end - start),
+ source=sub_source,
+ start_position=0,
+ stop_position=(end - start))
+
+ start = end
+
+ def get_range_tracker(self, start_position, stop_position):
+ if start_position is None:
+ start_position = 0
+ if stop_position is None:
+ stop_position = len(self._serialized_values)
+
+ from apache_beam import io
+ return io.OffsetRangeTracker(start_position, stop_position)
+
+ def estimate_size(self):
+ return self._total_size
+
+ return _CreateSource(serialized_values, coder)
+
def Read(*args, **kwargs):
from apache_beam import io
http://git-wip-us.apache.org/repos/asf/beam/blob/33d4a02b/sdks/python/apache_beam/transforms/create_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/create_test.py b/sdks/python/apache_beam/transforms/create_test.py
new file mode 100644
index 0000000..f4b1f07
--- /dev/null
+++ b/sdks/python/apache_beam/transforms/create_test.py
@@ -0,0 +1,121 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for the Create and _CreateSource classes."""
+import unittest
+
+from apache_beam.io import source_test_utils
+
+from apache_beam import Create, assert_that, equal_to
+from apache_beam.coders import FastPrimitivesCoder
+from apache_beam.test_pipeline import TestPipeline
+
+
+class CreateTest(unittest.TestCase):
+ def setUp(self):
+ self.coder = FastPrimitivesCoder()
+
+ def test_create_transform(self):
+ with TestPipeline() as p:
+ assert_that(p | Create(range(10)), equal_to(range(10)))
+
+ def test_create_source_read(self):
+ self.check_read([], self.coder)
+ self.check_read([1], self.coder)
+ # multiple values.
+ self.check_read(range(10), self.coder)
+
+ def check_read(self, values, coder):
+ source = Create._create_source_from_iterable(values, coder)
+ read_values = source_test_utils.readFromSource(source)
+ self.assertEqual(sorted(values), sorted(read_values))
+
+ def test_create_source_read_with_initial_splits(self):
+ self.check_read_with_initial_splits([], self.coder, num_splits=2)
+ self.check_read_with_initial_splits([1], self.coder, num_splits=2)
+ values = range(8)
+ # multiple values with a single split.
+ self.check_read_with_initial_splits(values, self.coder, num_splits=1)
+ # multiple values with a single split with a large desired bundle size
+ self.check_read_with_initial_splits(values, self.coder, num_splits=0.5)
+ # multiple values with many splits.
+ self.check_read_with_initial_splits(values, self.coder, num_splits=3)
+ # multiple values with uneven sized splits.
+ self.check_read_with_initial_splits(values, self.coder, num_splits=4)
+ # multiple values with num splits equal to num values.
+ self.check_read_with_initial_splits(values, self.coder,
+ num_splits=len(values))
+ # multiple values with num splits greater than to num values.
+ self.check_read_with_initial_splits(values, self.coder, num_splits=30)
+
+ def check_read_with_initial_splits(self, values, coder, num_splits):
+ """A test that splits the given source into `num_splits` and verifies that
+ the data read from original source is equal to the union of the data read
+ from the split sources.
+ """
+ source = Create._create_source_from_iterable(values, coder)
+ desired_bundle_size = source._total_size / num_splits
+ splits = source.split(desired_bundle_size)
+ splits_info = [
+ (split.source, split.start_position, split.stop_position)
+ for split in splits]
+ source_test_utils.assertSourcesEqualReferenceSource((source, None, None),
+ splits_info)
+
+ def test_create_source_read_reentrant(self):
+ source = Create._create_source_from_iterable(range(9), self.coder)
+ source_test_utils.assertReentrantReadsSucceed((source, None, None))
+
+ def test_create_source_read_reentrant_with_initial_splits(self):
+ source = Create._create_source_from_iterable(range(24), self.coder)
+ for split in source.split(desired_bundle_size=5):
+ source_test_utils.assertReentrantReadsSucceed((split.source,
+ split.start_position,
+ split.stop_position))
+
+ def test_create_source_dynamic_splitting(self):
+ # 2 values
+ source = Create._create_source_from_iterable(range(2), self.coder)
+ source_test_utils.assertSplitAtFractionExhaustive(source)
+ # Multiple values.
+ source = Create._create_source_from_iterable(range(11), self.coder)
+ source_test_utils.assertSplitAtFractionExhaustive(
+ source, perform_multi_threaded_test=True)
+
+ def test_create_source_progress(self):
+ num_values = 10
+ source = Create._create_source_from_iterable(range(num_values), self.coder)
+ splits = [split for split in source.split(desired_bundle_size=100)]
+ assert len(splits) == 1
+ fraction_consumed_report = []
+ split_points_report = []
+ range_tracker = splits[0].source.get_range_tracker(
+ splits[0].start_position, splits[0].stop_position)
+ for _ in splits[0].source.read(range_tracker):
+ fraction_consumed_report.append(range_tracker.fraction_consumed())
+ split_points_report.append(range_tracker.split_points())
+
+ self.assertEqual(
+ [float(i) / num_values for i in range(num_values)],
+ fraction_consumed_report)
+
+ expected_split_points_report = [
+ ((i - 1), num_values - (i - 1))
+ for i in range(1, num_values + 1)]
+
+ self.assertEqual(
+ expected_split_points_report, split_points_report)
http://git-wip-us.apache.org/repos/asf/beam/blob/33d4a02b/sdks/python/apache_beam/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index 5889ab5..cb1dd77 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -58,7 +58,7 @@ class PTransformTest(unittest.TestCase):
pa = TestPipeline()
res = pa | 'ALabel' >> beam.Create([1, 2])
- self.assertEqual('AppliedPTransform(ALabel, Create)',
+ self.assertEqual('AppliedPTransform(ALabel/Read, Read)',
str(res.producer))
pc = TestPipeline()
@@ -66,7 +66,7 @@ class PTransformTest(unittest.TestCase):
inputs_tr = res.producer.transform
inputs_tr.inputs = ('ci',)
self.assertEqual(
- """<Create(PTransform) label=[Create] inputs=('ci',)>""",
+ """<Read(PTransform) label=[Read] inputs=('ci',)>""",
str(inputs_tr))
pd = TestPipeline()
@@ -74,12 +74,12 @@ class PTransformTest(unittest.TestCase):
side_tr = res.producer.transform
side_tr.side_inputs = (4,)
self.assertEqual(
- '<Create(PTransform) label=[Create] side_inputs=(4,)>',
+ '<Read(PTransform) label=[Read] side_inputs=(4,)>',
str(side_tr))
inputs_tr.side_inputs = ('cs',)
self.assertEqual(
- """<Create(PTransform) label=[Create] """
+ """<Read(PTransform) label=[Read] """
"""inputs=('ci',) side_inputs=('cs',)>""",
str(inputs_tr))
@@ -689,7 +689,7 @@ class PTransformLabelsTest(unittest.TestCase):
def check_label(self, ptransform, expected_label):
pipeline = TestPipeline()
pipeline | 'Start' >> beam.Create([('a', 1)]) | ptransform
- actual_label = sorted(pipeline.applied_labels - {'Start'})[0]
+ actual_label = sorted(pipeline.applied_labels - {'Start', 'Start/Read'})[0]
self.assertEqual(expected_label, re.sub(r'\d{3,}', '#', actual_label))
def test_default_labels(self):
@@ -707,7 +707,7 @@ class PTransformLabelsTest(unittest.TestCase):
self.check_label(beam.ParDo(MyDoFn()), r'ParDo(MyDoFn)')
- def test_lable_propogation(self):
+ def test_label_propogation(self):
self.check_label('TestMap' >> beam.Map(len), r'TestMap')
self.check_label('TestLambda' >> beam.Map(lambda x: x), r'TestLambda')
self.check_label('TestFlatMap' >> beam.FlatMap(list), r'TestFlatMap')
@@ -1058,7 +1058,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
# aliased to Tuple[int, str].
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | (beam.Create(range(5))
+ | (beam.Create([[1], [2]])
.with_output_types(typehints.Iterable[int]))
| 'T' >> beam.GroupByKey())
[2/2] beam git commit: This closes #2446
Posted by ch...@apache.org.
This closes #2446
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fc100650
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fc100650
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fc100650
Branch: refs/heads/master
Commit: fc1006500d14afd01a3a34527c0ea9dc94ac09e9
Parents: 836e8e4 33d4a02
Author: Chamikara Jayalath <ch...@google.com>
Authored: Mon Apr 10 14:18:30 2017 -0700
Committer: Chamikara Jayalath <ch...@google.com>
Committed: Mon Apr 10 14:18:30 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/internal/pickler.py | 22 ++--
sdks/python/apache_beam/pipeline.py | 4 +-
sdks/python/apache_beam/pipeline_test.py | 13 +-
.../runners/dataflow/dataflow_runner.py | 24 ----
.../runners/dataflow/dataflow_runner_test.py | 2 +-
.../consumer_tracking_pipeline_visitor_test.py | 22 ++--
.../runners/direct/transform_evaluator.py | 31 -----
sdks/python/apache_beam/transforms/core.py | 92 +++++++++++++-
.../apache_beam/transforms/create_test.py | 121 +++++++++++++++++++
.../apache_beam/transforms/ptransform_test.py | 14 +--
10 files changed, 254 insertions(+), 91 deletions(-)
----------------------------------------------------------------------