You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/01/19 21:26:23 UTC

[2/4] beam git commit: update labels in iobase

update labels in iobase


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0db60e40
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0db60e40
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0db60e40

Branch: refs/heads/python-sdk
Commit: 0db60e40dc5a1e1983504e4535eab0140eb97c0e
Parents: 03e18f0
Author: Ahmet Altay <al...@google.com>
Authored: Wed Jan 18 18:06:10 2017 -0800
Committer: Ahmet Altay <al...@google.com>
Committed: Wed Jan 18 18:06:10 2017 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/io/iobase.py | 22 ++++++++++++----------
 1 file changed, 12 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/0db60e40/sdks/python/apache_beam/io/iobase.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index 8fb5238..93421a6 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -748,8 +748,8 @@ class WriteImpl(ptransform.PTransform):
 
   def expand(self, pcoll):
     do_once = pcoll.pipeline | 'DoOnce' >> core.Create([None])
-    init_result_coll = do_once | core.Map(
-        'initialize_write', lambda _, sink: sink.initialize_write(), self.sink)
+    init_result_coll = do_once | 'initialize_write' >> core.Map(
+        lambda _, sink: sink.initialize_write(), self.sink)
     if getattr(self.sink, 'num_shards', 0):
       min_shards = self.sink.num_shards
       if min_shards == 1:
@@ -759,18 +759,20 @@ class WriteImpl(ptransform.PTransform):
       write_result_coll = (keyed_pcoll
                            | core.WindowInto(window.GlobalWindows())
                            | core.GroupByKey()
-                           | core.Map('write_bundles',
-                                      _write_keyed_bundle, self.sink,
-                                      AsSingleton(init_result_coll)))
+                           | 'write_bundles' >> core.Map(
+                               _write_keyed_bundle, self.sink,
+                               AsSingleton(init_result_coll)))
     else:
       min_shards = 1
-      write_result_coll = (pcoll | core.ParDo('write_bundles',
-                                              _WriteBundleDoFn(), self.sink,
-                                              AsSingleton(init_result_coll))
-                           | core.Map('pair', lambda x: (None, x))
+      write_result_coll = (pcoll
+                           | 'write_bundles' >>
+                           core.ParDo(
+                               _WriteBundleDoFn(), self.sink,
+                               AsSingleton(init_result_coll))
+                           | 'pair' >> core.Map(lambda x: (None, x))
                            | core.WindowInto(window.GlobalWindows())
                            | core.GroupByKey()
-                           | core.FlatMap('extract', lambda x: x[1]))
+                           | 'extract' >> core.FlatMap(lambda x: x[1]))
     return do_once | core.FlatMap(
         'finalize_write',
         _finalize_write,