Posted to commits@beam.apache.org by ch...@apache.org on 2017/04/10 21:19:20 UTC

[1/2] beam git commit: Create as custom source

Repository: beam
Updated Branches:
  refs/heads/master 836e8e4aa -> fc1006500


Create as custom source


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/33d4a02b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/33d4a02b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/33d4a02b

Branch: refs/heads/master
Commit: 33d4a02bb28e5a1c09513dc3e7701b30df148943
Parents: 836e8e4
Author: Vikas Kedigehalli <vi...@google.com>
Authored: Mon Apr 3 10:01:45 2017 -0700
Committer: Chamikara Jayalath <ch...@google.com>
Committed: Mon Apr 10 14:17:15 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/internal/pickler.py     |  22 ++--
 sdks/python/apache_beam/pipeline.py             |   4 +-
 sdks/python/apache_beam/pipeline_test.py        |  13 +-
 .../runners/dataflow/dataflow_runner.py         |  24 ----
 .../runners/dataflow/dataflow_runner_test.py    |   2 +-
 .../consumer_tracking_pipeline_visitor_test.py  |  22 ++--
 .../runners/direct/transform_evaluator.py       |  31 -----
 sdks/python/apache_beam/transforms/core.py      |  92 +++++++++++++-
 .../apache_beam/transforms/create_test.py       | 121 +++++++++++++++++++
 .../apache_beam/transforms/ptransform_test.py   |  14 +--
 10 files changed, 254 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/33d4a02b/sdks/python/apache_beam/internal/pickler.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/pickler.py b/sdks/python/apache_beam/internal/pickler.py
index 67f9fc3..a4ab7b9 100644
--- a/sdks/python/apache_beam/internal/pickler.py
+++ b/sdks/python/apache_beam/internal/pickler.py
@@ -181,12 +181,15 @@ logging.getLogger('dill').setLevel(logging.WARN)
 # TODO(ccy): Currently, there are still instances of pickler.dumps() and
 # pickler.loads() being used for data, which results in an unnecessary base64
 # encoding.  This should be cleaned up.
-def dumps(o):
+def dumps(o, enable_trace=True):
   try:
     s = dill.dumps(o)
-  except Exception:          # pylint: disable=broad-except
-    dill.dill._trace(True)   # pylint: disable=protected-access
-    s = dill.dumps(o)
+  except Exception as e:      # pylint: disable=broad-except
+    if enable_trace:
+      dill.dill._trace(True)  # pylint: disable=protected-access
+      s = dill.dumps(o)
+    else:
+      raise e
   finally:
     dill.dill._trace(False)  # pylint: disable=protected-access
 
@@ -199,7 +202,7 @@ def dumps(o):
   return base64.b64encode(c)
 
 
-def loads(encoded):
+def loads(encoded, enable_trace=True):
   c = base64.b64decode(encoded)
 
   s = zlib.decompress(c)
@@ -207,9 +210,12 @@ def loads(encoded):
 
   try:
     return dill.loads(s)
-  except Exception:          # pylint: disable=broad-except
-    dill.dill._trace(True)   # pylint: disable=protected-access
-    return dill.loads(s)
+  except Exception as e:          # pylint: disable=broad-except
+    if enable_trace:
+      dill.dill._trace(True)   # pylint: disable=protected-access
+      return dill.loads(s)
+    else:
+      raise e
   finally:
     dill.dill._trace(False)  # pylint: disable=protected-access
 

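A minimal sketch of the new enable_trace flag in pickler.dumps/loads
(names are from the diff above; the failing class is hypothetical):
with enable_trace=False a pickling failure propagates immediately
instead of being retried under dill's verbose tracing.

    from apache_beam.internal import pickler

    class NotPicklable(object):
      # Hypothetical class; any object dill cannot serialize behaves
      # the same way.
      def __getstate__(self):
        raise TypeError('this object refuses to be pickled')

    try:
      # With enable_trace=False the first failure is re-raised directly,
      # without turning on dill.dill._trace and retrying.
      pickler.dumps(NotPicklable(), enable_trace=False)
    except Exception as e:  # pylint: disable=broad-except
      print('pickling failed fast: %s' % e)
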
http://git-wip-us.apache.org/repos/asf/beam/blob/33d4a02b/sdks/python/apache_beam/pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index b93167d..2ff9eb3 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -315,7 +315,9 @@ class Pipeline(object):
           Visitor.ok = False
         try:
           # Transforms must be picklable.
-          pickler.loads(pickler.dumps(transform_node.transform))
+          pickler.loads(pickler.dumps(transform_node.transform,
+                                      enable_trace=False),
+                        enable_trace=False)
         except Exception:
           Visitor.ok = False
 

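In effect, the visitor's picklability check now reduces to the sketch
below (a paraphrase, not code from the commit; transform_is_picklable
is a hypothetical helper). Disabling the trace keeps a failed
round-trip from emitting dill's debug output while the visitor simply
records the failure.

    from apache_beam.internal import pickler

    def transform_is_picklable(transform):
      try:
        pickler.loads(pickler.dumps(transform, enable_trace=False),
                      enable_trace=False)
        return True
      except Exception:  # pylint: disable=broad-except
        return False
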
http://git-wip-us.apache.org/repos/asf/beam/blob/33d4a02b/sdks/python/apache_beam/pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index ba219bf..6314609 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -173,9 +173,9 @@ class PipelineTest(unittest.TestCase):
                      set(visitor.visited))
     self.assertEqual(set(visitor.enter_composite),
                      set(visitor.leave_composite))
-    self.assertEqual(2, len(visitor.enter_composite))
-    self.assertEqual(visitor.enter_composite[1].transform, transform)
-    self.assertEqual(visitor.leave_composite[0].transform, transform)
+    self.assertEqual(3, len(visitor.enter_composite))
+    self.assertEqual(visitor.enter_composite[2].transform, transform)
+    self.assertEqual(visitor.leave_composite[1].transform, transform)
 
   def test_apply_custom_transform(self):
     pipeline = TestPipeline()
@@ -280,9 +280,10 @@ class PipelineTest(unittest.TestCase):
         # pylint: disable=expression-not-assigned
         p | Create([ValueError]) | Map(raise_exception)
 
-  def test_eager_pipeline(self):
-    p = Pipeline('EagerRunner')
-    self.assertEqual([1, 4, 9], p | Create([1, 2, 3]) | Map(lambda x: x*x))
+  # TODO(BEAM-1894).
+  # def test_eager_pipeline(self):
+  #   p = Pipeline('EagerRunner')
+  #   self.assertEqual([1, 4, 9], p | Create([1, 2, 3]) | Map(lambda x: x*x))
 
 
 class DoFnTest(unittest.TestCase):

http://git-wip-us.apache.org/repos/asf/beam/blob/33d4a02b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index f0600bc..1a935c1 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -21,7 +21,6 @@ The runner will create a JSON description of the job graph and then submit it
 to the Dataflow Service for remote execution by a worker.
 """
 
-import base64
 import logging
 import threading
 import time
@@ -261,29 +260,6 @@ class DataflowRunner(PipelineRunner):
 
     return step
 
-  def run_Create(self, transform_node):
-    transform = transform_node.transform
-    step = self._add_step(TransformNames.CREATE_PCOLLECTION,
-                          transform_node.full_label, transform_node)
-    # TODO(silviuc): Eventually use a coder based on typecoders.
-    # Note that we base64-encode values here so that the service will accept
-    # the values.
-    element_coder = coders.PickleCoder()
-    step.add_property(
-        PropertyNames.ELEMENT,
-        [base64.b64encode(element_coder.encode(v))
-         for v in transform.value])
-    # The service expects a WindowedValueCoder here, so we wrap the actual
-    # encoding in a WindowedValueCoder.
-    step.encoding = self._get_cloud_encoding(
-        coders.WindowedValueCoder(element_coder))
-    step.add_property(
-        PropertyNames.OUTPUT_INFO,
-        [{PropertyNames.USER_NAME: (
-            '%s.%s' % (transform_node.full_label, PropertyNames.OUT)),
-          PropertyNames.ENCODING: step.encoding,
-          PropertyNames.OUTPUT_NAME: PropertyNames.OUT}])
-
   def _add_singleton_step(self, label, full_label, tag, input_step):
     """Creates a CollectionToSingleton step used to handle ParDo side inputs."""
     # Import here to avoid adding the dependency for local running scenarios.

http://git-wip-us.apache.org/repos/asf/beam/blob/33d4a02b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
index cc5928a..b9ed84d 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
@@ -142,7 +142,7 @@ class DataflowRunnerTest(unittest.TestCase):
     steps = [step
              for step in job_dict['steps']
              if len(step['properties'].get('display_data', [])) > 0]
-    step = steps[0]
+    step = steps[1]
     disp_data = step['properties']['display_data']
     disp_data = sorted(disp_data, key=lambda x: x['namespace']+x['key'])
     nspace = SpecialParDo.__module__ + '.'

http://git-wip-us.apache.org/repos/asf/beam/blob/33d4a02b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py
index eb8b14b..154284b 100644
--- a/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py
+++ b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py
@@ -46,8 +46,6 @@ class ConsumerTrackingPipelineVisitorTest(unittest.TestCase):
     self.visitor = ConsumerTrackingPipelineVisitor()
 
   def test_root_transforms(self):
-    root_create = Create([[1, 2, 3]])
-
     class DummySource(iobase.BoundedSource):
       pass
 
@@ -55,9 +53,8 @@ class ConsumerTrackingPipelineVisitorTest(unittest.TestCase):
     root_flatten = Flatten(pipeline=self.pipeline)
 
     pbegin = pvalue.PBegin(self.pipeline)
-    pcoll_create = pbegin | 'create' >> root_create
-    pbegin | 'read' >> root_read
-    pcoll_create | FlatMap(lambda x: x)
+    pcoll_read = pbegin | 'read' >> root_read
+    pcoll_read | FlatMap(lambda x: x)
     [] | 'flatten' >> root_flatten
 
     self.pipeline.visit(self.visitor)
@@ -66,12 +63,12 @@ class ConsumerTrackingPipelineVisitorTest(unittest.TestCase):
         [t.transform for t in self.visitor.root_transforms])
 
     self.assertEqual(root_transforms, sorted(
-        [root_read, root_create, root_flatten]))
+        [root_read, root_flatten]))
 
     pbegin_consumers = sorted(
         [c.transform for c in self.visitor.value_to_consumers[pbegin]])
-    self.assertEqual(pbegin_consumers, sorted([root_read, root_create]))
-    self.assertEqual(len(self.visitor.step_names), 4)
+    self.assertEqual(pbegin_consumers, sorted([root_read]))
+    self.assertEqual(len(self.visitor.step_names), 3)
 
   def test_side_inputs(self):
 
@@ -88,10 +85,13 @@ class ConsumerTrackingPipelineVisitorTest(unittest.TestCase):
       def process(self, element, negatives):
         yield element
 
-    root_create = Create([[-1, 2, 3]])
+    class DummySource(iobase.BoundedSource):
+      pass
+
+    root_read = Read(DummySource())
 
     result = (self.pipeline
-              | 'create' >> root_create
+              | 'read' >> root_read
               | ParDo(SplitNumbersFn()).with_outputs('tag_negative',
                                                      main='positive'))
     positive, negative = result
@@ -101,7 +101,7 @@ class ConsumerTrackingPipelineVisitorTest(unittest.TestCase):
 
     root_transforms = sorted(
         [t.transform for t in self.visitor.root_transforms])
-    self.assertEqual(root_transforms, sorted([root_create]))
+    self.assertEqual(root_transforms, sorted([root_read]))
     self.assertEqual(len(self.visitor.step_names), 3)
     self.assertEqual(len(self.visitor.views), 1)
     self.assertTrue(isinstance(self.visitor.views[0],

http://git-wip-us.apache.org/repos/asf/beam/blob/33d4a02b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index 0c35d99..662c61d 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -49,7 +49,6 @@ class TransformEvaluatorRegistry(object):
     self._evaluation_context = evaluation_context
     self._evaluators = {
         io.Read: _BoundedReadEvaluator,
-        core.Create: _CreateEvaluator,
         core.Flatten: _FlattenEvaluator,
         core.ParDo: _ParDoEvaluator,
         core.GroupByKeyOnly: _GroupByKeyOnlyEvaluator,
@@ -233,36 +232,6 @@ class _FlattenEvaluator(_TransformEvaluator):
         self._applied_ptransform, bundles, None, None, None, None)
 
 
-class _CreateEvaluator(_TransformEvaluator):
-  """TransformEvaluator for Create transform."""
-
-  def __init__(self, evaluation_context, applied_ptransform,
-               input_committed_bundle, side_inputs, scoped_metrics_container):
-    assert not input_committed_bundle
-    assert not side_inputs
-    super(_CreateEvaluator, self).__init__(
-        evaluation_context, applied_ptransform, input_committed_bundle,
-        side_inputs, scoped_metrics_container)
-
-  def start_bundle(self):
-    assert len(self._outputs) == 1
-    output_pcollection = list(self._outputs)[0]
-    self.bundle = self._evaluation_context.create_bundle(output_pcollection)
-
-  def finish_bundle(self):
-    bundles = []
-    transform = self._applied_ptransform.transform
-
-    assert transform.value is not None
-    create_result = [GlobalWindows.windowed_value(v) for v in transform.value]
-    for result in create_result:
-      self.bundle.output(result)
-    bundles.append(self.bundle)
-
-    return TransformResult(
-        self._applied_ptransform, bundles, None, None, None, None)
-
-
 class _TaggedReceivers(dict):
   """Received ParDo output and redirect to the associated output bundle."""
 

http://git-wip-us.apache.org/repos/asf/beam/blob/33d4a02b/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 88fdec8..cf313d1 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -1088,7 +1088,6 @@ class GroupByKey(PTransform):
     # This code path is only used in the local direct runner.  For Dataflow
     # runner execution, the GroupByKey transform is expanded on the service.
     input_type = pcoll.element_type
-
     if input_type is not None:
       # Initialize type-hints used below to enforce type-checking and to pass
       # downstream to further PTransforms.
@@ -1373,11 +1372,100 @@ class Create(PTransform):
   def expand(self, pbegin):
     assert isinstance(pbegin, pvalue.PBegin)
     self.pipeline = pbegin.pipeline
-    return pvalue.PCollection(self.pipeline)
+    output_type = (self.get_type_hints().simple_output_type(self.label) or
+                   self.infer_output_type(None))
+    coder = typecoders.registry.get_coder(output_type)
+    source = self._create_source_from_iterable(self.value, coder)
+    return pbegin.pipeline | Read(source).with_output_types(output_type)
 
   def get_windowing(self, unused_inputs):
     return Windowing(GlobalWindows())
 
+  @staticmethod
+  def _create_source_from_iterable(values, coder):
+    return Create._create_source(map(coder.encode, values), coder)
+
+  @staticmethod
+  def _create_source(serialized_values, coder):
+    from apache_beam.io import iobase
+
+    class _CreateSource(iobase.BoundedSource):
+      def __init__(self, serialized_values, coder):
+        self._coder = coder
+        self._serialized_values = serialized_values
+        self._total_size = sum(map(len, self._serialized_values))
+
+      def read(self, range_tracker):
+        start_position = range_tracker.start_position()
+        current_position = start_position
+
+        def split_points_unclaimed(stop_position):
+          if current_position >= stop_position:
+            return 0
+          else:
+            return stop_position - current_position - 1
+
+        range_tracker.set_split_points_unclaimed_callback(
+            split_points_unclaimed)
+        element_iter = iter(self._serialized_values[start_position:])
+        for i in range(start_position, range_tracker.stop_position()):
+          if not range_tracker.try_claim(i):
+            return
+          current_position = i
+          yield self._coder.decode(next(element_iter))
+
+      def split(self, desired_bundle_size, start_position=None,
+                stop_position=None):
+        from apache_beam.io import iobase
+
+        if len(self._serialized_values) < 2:
+          yield iobase.SourceBundle(
+              weight=0, source=self, start_position=0,
+              stop_position=len(self._serialized_values))
+        else:
+          if start_position is None:
+            start_position = 0
+          if stop_position is None:
+            stop_position = len(self._serialized_values)
+
+          avg_size_per_value = self._total_size / len(self._serialized_values)
+          num_values_per_split = max(
+              int(desired_bundle_size / avg_size_per_value), 1)
+
+          start = start_position
+          while start < stop_position:
+            end = min(start + num_values_per_split, stop_position)
+            remaining = stop_position - end
+            # Avoid having a too small bundle at the end.
+            if remaining < (num_values_per_split / 4):
+              end = stop_position
+
+            sub_source = Create._create_source(
+                self._serialized_values[start:end], self._coder)
+
+            yield iobase.SourceBundle(weight=(end - start),
+                                      source=sub_source,
+                                      start_position=0,
+                                      stop_position=(end - start))
+
+            start = end
+
+      def get_range_tracker(self, start_position, stop_position):
+        if start_position is None:
+          start_position = 0
+        if stop_position is None:
+          stop_position = len(self._serialized_values)
+
+        from apache_beam import io
+        return io.OffsetRangeTracker(start_position, stop_position)
+
+      def estimate_size(self):
+        return self._total_size
+
+    return _CreateSource(serialized_values, coder)
+
 
 def Read(*args, **kwargs):
   from apache_beam import io

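A hedged usage sketch of the source that Create.expand now builds,
using only names that appear in this commit. split() derives a bundle
size from the average encoded value size, and the
remaining < num_values_per_split / 4 guard folds a too-small tail
bundle into its predecessor.

    from apache_beam import Create
    from apache_beam.coders import FastPrimitivesCoder

    coder = FastPrimitivesCoder()
    source = Create._create_source_from_iterable(range(4), coder)

    # Read everything back through the source's own range tracker.
    tracker = source.get_range_tracker(None, None)
    print(list(source.read(tracker)))  # [0, 1, 2, 3]

    # Initial splitting: ask for bundles of roughly half the total size.
    for bundle in source.split(source._total_size / 2):
      print('%d values: [%d, %d)' % (
          bundle.weight, bundle.start_position, bundle.stop_position))
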
http://git-wip-us.apache.org/repos/asf/beam/blob/33d4a02b/sdks/python/apache_beam/transforms/create_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/create_test.py b/sdks/python/apache_beam/transforms/create_test.py
new file mode 100644
index 0000000..f4b1f07
--- /dev/null
+++ b/sdks/python/apache_beam/transforms/create_test.py
@@ -0,0 +1,121 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for the Create and _CreateSource classes."""
+import unittest
+
+from apache_beam.io import source_test_utils
+
+from apache_beam import Create, assert_that, equal_to
+from apache_beam.coders import FastPrimitivesCoder
+from apache_beam.test_pipeline import TestPipeline
+
+
+class CreateTest(unittest.TestCase):
+  def setUp(self):
+    self.coder = FastPrimitivesCoder()
+
+  def test_create_transform(self):
+    with TestPipeline() as p:
+      assert_that(p | Create(range(10)), equal_to(range(10)))
+
+  def test_create_source_read(self):
+    self.check_read([], self.coder)
+    self.check_read([1], self.coder)
+    # multiple values.
+    self.check_read(range(10), self.coder)
+
+  def check_read(self, values, coder):
+    source = Create._create_source_from_iterable(values, coder)
+    read_values = source_test_utils.readFromSource(source)
+    self.assertEqual(sorted(values), sorted(read_values))
+
+  def test_create_source_read_with_initial_splits(self):
+    self.check_read_with_initial_splits([], self.coder, num_splits=2)
+    self.check_read_with_initial_splits([1], self.coder, num_splits=2)
+    values = range(8)
+    # multiple values with a single split.
+    self.check_read_with_initial_splits(values, self.coder, num_splits=1)
+    # multiple values with a single split and a large desired bundle size.
+    self.check_read_with_initial_splits(values, self.coder, num_splits=0.5)
+    # multiple values with many splits.
+    self.check_read_with_initial_splits(values, self.coder, num_splits=3)
+    # multiple values with uneven sized splits.
+    self.check_read_with_initial_splits(values, self.coder, num_splits=4)
+    # multiple values with num splits equal to num values.
+    self.check_read_with_initial_splits(values, self.coder,
+                                        num_splits=len(values))
+    # multiple values with num splits greater than num values.
+    self.check_read_with_initial_splits(values, self.coder, num_splits=30)
+
+  def check_read_with_initial_splits(self, values, coder, num_splits):
+    """A test that splits the given source into `num_splits` and verifies that
+    the data read from original source is equal to the union of the data read
+    from the split sources.
+    """
+    source = Create._create_source_from_iterable(values, coder)
+    desired_bundle_size = source._total_size / num_splits
+    splits = source.split(desired_bundle_size)
+    splits_info = [
+        (split.source, split.start_position, split.stop_position)
+        for split in splits]
+    source_test_utils.assertSourcesEqualReferenceSource((source, None, None),
+                                                        splits_info)
+
+  def test_create_source_read_reentrant(self):
+    source = Create._create_source_from_iterable(range(9), self.coder)
+    source_test_utils.assertReentrantReadsSucceed((source, None, None))
+
+  def test_create_source_read_reentrant_with_initial_splits(self):
+    source = Create._create_source_from_iterable(range(24), self.coder)
+    for split in source.split(desired_bundle_size=5):
+      source_test_utils.assertReentrantReadsSucceed((split.source,
+                                                     split.start_position,
+                                                     split.stop_position))
+
+  def test_create_source_dynamic_splitting(self):
+    # 2 values
+    source = Create._create_source_from_iterable(range(2), self.coder)
+    source_test_utils.assertSplitAtFractionExhaustive(source)
+    # Multiple values.
+    source = Create._create_source_from_iterable(range(11), self.coder)
+    source_test_utils.assertSplitAtFractionExhaustive(
+        source, perform_multi_threaded_test=True)
+
+  def test_create_source_progress(self):
+    num_values = 10
+    source = Create._create_source_from_iterable(range(num_values), self.coder)
+    splits = [split for split in source.split(desired_bundle_size=100)]
+    assert len(splits) == 1
+    fraction_consumed_report = []
+    split_points_report = []
+    range_tracker = splits[0].source.get_range_tracker(
+        splits[0].start_position, splits[0].stop_position)
+    for _ in splits[0].source.read(range_tracker):
+      fraction_consumed_report.append(range_tracker.fraction_consumed())
+      split_points_report.append(range_tracker.split_points())
+
+    self.assertEqual(
+        [float(i) / num_values for i in range(num_values)],
+        fraction_consumed_report)
+
+    expected_split_points_report = [
+        ((i - 1), num_values - (i - 1))
+        for i in range(1, num_values + 1)]
+
+    self.assertEqual(
+        expected_split_points_report, split_points_report)

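To make the arithmetic in check_read_with_initial_splits concrete, a
small sketch (assuming the eight values encode to roughly equal
sizes): with num_splits=3, desired_bundle_size is a third of the total
size, the per-value average divides into it int(8 / 3) = 2 times, so
the source yields four bundles of two values each.

    from apache_beam import Create
    from apache_beam.coders import FastPrimitivesCoder

    coder = FastPrimitivesCoder()
    source = Create._create_source_from_iterable(range(8), coder)
    bundles = list(source.split(source._total_size / 3))
    print([b.weight for b in bundles])  # [2, 2, 2, 2]
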
http://git-wip-us.apache.org/repos/asf/beam/blob/33d4a02b/sdks/python/apache_beam/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index 5889ab5..cb1dd77 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -58,7 +58,7 @@ class PTransformTest(unittest.TestCase):
 
     pa = TestPipeline()
     res = pa | 'ALabel' >> beam.Create([1, 2])
-    self.assertEqual('AppliedPTransform(ALabel, Create)',
+    self.assertEqual('AppliedPTransform(ALabel/Read, Read)',
                      str(res.producer))
 
     pc = TestPipeline()
@@ -66,7 +66,7 @@ class PTransformTest(unittest.TestCase):
     inputs_tr = res.producer.transform
     inputs_tr.inputs = ('ci',)
     self.assertEqual(
-        """<Create(PTransform) label=[Create] inputs=('ci',)>""",
+        """<Read(PTransform) label=[Read] inputs=('ci',)>""",
         str(inputs_tr))
 
     pd = TestPipeline()
@@ -74,12 +74,12 @@ class PTransformTest(unittest.TestCase):
     side_tr = res.producer.transform
     side_tr.side_inputs = (4,)
     self.assertEqual(
-        '<Create(PTransform) label=[Create] side_inputs=(4,)>',
+        '<Read(PTransform) label=[Read] side_inputs=(4,)>',
         str(side_tr))
 
     inputs_tr.side_inputs = ('cs',)
     self.assertEqual(
-        """<Create(PTransform) label=[Create] """
+        """<Read(PTransform) label=[Read] """
         """inputs=('ci',) side_inputs=('cs',)>""",
         str(inputs_tr))
 
@@ -689,7 +689,7 @@ class PTransformLabelsTest(unittest.TestCase):
   def check_label(self, ptransform, expected_label):
     pipeline = TestPipeline()
     pipeline | 'Start' >> beam.Create([('a', 1)]) | ptransform
-    actual_label = sorted(pipeline.applied_labels - {'Start'})[0]
+    actual_label = sorted(pipeline.applied_labels - {'Start', 'Start/Read'})[0]
     self.assertEqual(expected_label, re.sub(r'\d{3,}', '#', actual_label))
 
   def test_default_labels(self):
@@ -707,7 +707,7 @@ class PTransformLabelsTest(unittest.TestCase):
 
     self.check_label(beam.ParDo(MyDoFn()), r'ParDo(MyDoFn)')
 
-  def test_lable_propogation(self):
+  def test_label_propagation(self):
     self.check_label('TestMap' >> beam.Map(len), r'TestMap')
     self.check_label('TestLambda' >> beam.Map(lambda x: x), r'TestLambda')
     self.check_label('TestFlatMap' >> beam.FlatMap(list), r'TestFlatMap')
@@ -1058,7 +1058,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     # aliased to Tuple[int, str].
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | (beam.Create(range(5))
+       | (beam.Create([[1], [2]])
           .with_output_types(typehints.Iterable[int]))
        | 'T' >> beam.GroupByKey())
 

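The updated assertions reflect that Create is now a composite whose
output is produced by an inner Read; a minimal repro of the first
assertion above (imports as used elsewhere in this module):

    import apache_beam as beam
    from apache_beam.test_pipeline import TestPipeline

    pa = TestPipeline()
    res = pa | 'ALabel' >> beam.Create([1, 2])
    # The producer is the Read step nested under the Create label.
    print(str(res.producer))  # AppliedPTransform(ALabel/Read, Read)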

[2/2] beam git commit: This closes #2446

Posted by ch...@apache.org.
This closes #2446


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fc100650
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fc100650
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fc100650

Branch: refs/heads/master
Commit: fc1006500d14afd01a3a34527c0ea9dc94ac09e9
Parents: 836e8e4 33d4a02
Author: Chamikara Jayalath <ch...@google.com>
Authored: Mon Apr 10 14:18:30 2017 -0700
Committer: Chamikara Jayalath <ch...@google.com>
Committed: Mon Apr 10 14:18:30 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/internal/pickler.py     |  22 ++--
 sdks/python/apache_beam/pipeline.py             |   4 +-
 sdks/python/apache_beam/pipeline_test.py        |  13 +-
 .../runners/dataflow/dataflow_runner.py         |  24 ----
 .../runners/dataflow/dataflow_runner_test.py    |   2 +-
 .../consumer_tracking_pipeline_visitor_test.py  |  22 ++--
 .../runners/direct/transform_evaluator.py       |  31 -----
 sdks/python/apache_beam/transforms/core.py      |  92 +++++++++++++-
 .../apache_beam/transforms/create_test.py       | 121 +++++++++++++++++++
 .../apache_beam/transforms/ptransform_test.py   |  14 +--
 10 files changed, 254 insertions(+), 91 deletions(-)
----------------------------------------------------------------------