You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/01/19 21:26:23 UTC
[2/4] beam git commit: update labels in iobase
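Update labels in iobase: in WriteImpl.expand, replace step labels passed as the first positional argument of core.Map / core.ParDo / core.FlatMap with the 'label' >> transform syntax.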
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0db60e40
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0db60e40
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0db60e40
Branch: refs/heads/python-sdk
Commit: 0db60e40dc5a1e1983504e4535eab0140eb97c0e
Parents: 03e18f0
Author: Ahmet Altay <al...@google.com>
Authored: Wed Jan 18 18:06:10 2017 -0800
Committer: Ahmet Altay <al...@google.com>
Committed: Wed Jan 18 18:06:10 2017 -0800
----------------------------------------------------------------------
sdks/python/apache_beam/io/iobase.py | 22 ++++++++++++----------
1 file changed, 12 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/0db60e40/sdks/python/apache_beam/io/iobase.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index 8fb5238..93421a6 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -748,8 +748,8 @@ class WriteImpl(ptransform.PTransform):
def expand(self, pcoll):
do_once = pcoll.pipeline | 'DoOnce' >> core.Create([None])
- init_result_coll = do_once | core.Map(
- 'initialize_write', lambda _, sink: sink.initialize_write(), self.sink)
+ init_result_coll = do_once | 'initialize_write' >> core.Map(
+ lambda _, sink: sink.initialize_write(), self.sink)
if getattr(self.sink, 'num_shards', 0):
min_shards = self.sink.num_shards
if min_shards == 1:
@@ -759,18 +759,20 @@ class WriteImpl(ptransform.PTransform):
write_result_coll = (keyed_pcoll
| core.WindowInto(window.GlobalWindows())
| core.GroupByKey()
- | core.Map('write_bundles',
- _write_keyed_bundle, self.sink,
- AsSingleton(init_result_coll)))
+ | 'write_bundles' >> core.Map(
+ _write_keyed_bundle, self.sink,
+ AsSingleton(init_result_coll)))
else:
min_shards = 1
- write_result_coll = (pcoll | core.ParDo('write_bundles',
- _WriteBundleDoFn(), self.sink,
- AsSingleton(init_result_coll))
- | core.Map('pair', lambda x: (None, x))
+ write_result_coll = (pcoll
+ | 'write_bundles' >>
+ core.ParDo(
+ _WriteBundleDoFn(), self.sink,
+ AsSingleton(init_result_coll))
+ | 'pair' >> core.Map(lambda x: (None, x))
| core.WindowInto(window.GlobalWindows())
| core.GroupByKey()
- | core.FlatMap('extract', lambda x: x[1]))
+ | 'extract' >> core.FlatMap(lambda x: x[1]))
return do_once | core.FlatMap(
'finalize_write',
_finalize_write,