You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2017/01/30 23:03:34 UTC
[27/50] [abbrv] beam git commit: Fix read/write display data
Fix read/write display data
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e4eda3c3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e4eda3c3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e4eda3c3
Branch: refs/heads/master
Commit: e4eda3c335b5767bdaf40b56b2dd5d67d7348f20
Parents: c6420df
Author: Pablo <pa...@google.com>
Authored: Fri Jan 13 11:25:36 2017 -0800
Committer: Robert Bradshaw <ro...@google.com>
Committed: Thu Jan 26 14:51:56 2017 -0800
----------------------------------------------------------------------
sdks/python/apache_beam/io/avroio_test.py | 6 ----
sdks/python/apache_beam/io/fileio.py | 10 ++++++-
sdks/python/apache_beam/io/fileio_test.py | 2 --
sdks/python/apache_beam/io/iobase.py | 38 +++++++++++++++++---------
sdks/python/apache_beam/io/textio.py | 25 +++++++++--------
sdks/python/apache_beam/io/textio_test.py | 30 --------------------
6 files changed, 47 insertions(+), 64 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/e4eda3c3/sdks/python/apache_beam/io/avroio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/avroio_test.py b/sdks/python/apache_beam/io/avroio_test.py
index aed468d..d2fb1d1 100644
--- a/sdks/python/apache_beam/io/avroio_test.py
+++ b/sdks/python/apache_beam/io/avroio_test.py
@@ -196,9 +196,6 @@ class TestAvro(unittest.TestCase):
'file_pattern',
'some_avro_sink-%(shard_num)05d-of-%(num_shards)05d.end'),
DisplayDataItemMatcher(
- 'shards',
- 0),
- DisplayDataItemMatcher(
'codec',
'null'),
DisplayDataItemMatcher(
@@ -219,9 +216,6 @@ class TestAvro(unittest.TestCase):
'file_pattern',
'some_avro_sink-%(shard_num)05d-of-%(num_shards)05d'),
DisplayDataItemMatcher(
- 'shards',
- 0),
- DisplayDataItemMatcher(
'codec',
'deflate'),
DisplayDataItemMatcher(
http://git-wip-us.apache.org/repos/asf/beam/blob/e4eda3c3/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index 52f31c6..f67dca9 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -547,7 +547,8 @@ class FileSink(iobase.Sink):
def display_data(self):
return {'shards':
- DisplayDataItem(self.num_shards, label='Number of Shards'),
+ DisplayDataItem(self.num_shards,
+ label='Number of Shards').drop_if_default(0),
'compression':
DisplayDataItem(str(self.compression_type)),
'file_pattern':
@@ -787,6 +788,13 @@ class TextFileSink(FileSink):
'\'textio.WriteToText()\' instead of directly '
'instantiating a TextFileSink object.')
+ def display_data(self):
+ dd_parent = super(TextFileSink, self).display_data()
+ dd_parent['append_newline'] = DisplayDataItem(
+ self.append_trailing_newlines,
+ label='Append Trailing New Lines')
+ return dd_parent
+
def write_encoded_record(self, file_handle, encoded_value):
"""Writes a single encoded record."""
file_handle.write(encoded_value)
http://git-wip-us.apache.org/repos/asf/beam/blob/e4eda3c3/sdks/python/apache_beam/io/fileio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py
index ad77dc5..6c33f53 100644
--- a/sdks/python/apache_beam/io/fileio_test.py
+++ b/sdks/python/apache_beam/io/fileio_test.py
@@ -142,8 +142,6 @@ class TestFileSink(unittest.TestCase):
dd = DisplayData.create_from(sink)
expected_items = [
DisplayDataItemMatcher(
- 'shards', 0),
- DisplayDataItemMatcher(
'compression', 'auto'),
DisplayDataItemMatcher(
'file_pattern',
http://git-wip-us.apache.org/repos/asf/beam/blob/e4eda3c3/sdks/python/apache_beam/io/iobase.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index 12af3b6..1266ed3 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -759,16 +759,15 @@ class WriteImpl(ptransform.PTransform):
write_result_coll = (keyed_pcoll
| core.WindowInto(window.GlobalWindows())
| core.GroupByKey()
- | 'WriteBundles' >> core.Map(
- _write_keyed_bundle, self.sink,
+ | 'WriteBundles' >> core.ParDo(
+ _WriteKeyedBundleDoFn(self.sink),
AsSingleton(init_result_coll)))
else:
min_shards = 1
write_result_coll = (pcoll
| 'WriteBundles' >>
- core.ParDo(
- _WriteBundleDoFn(), self.sink,
- AsSingleton(init_result_coll))
+ core.ParDo(_WriteBundleDoFn(self.sink),
+ AsSingleton(init_result_coll))
| 'Pair' >> core.Map(lambda x: (None, x))
| core.WindowInto(window.GlobalWindows())
| core.GroupByKey()
@@ -788,12 +787,16 @@ class _WriteBundleDoFn(core.DoFn):
Opens a writer at the first element and closes the writer at finish_bundle().
"""
- def __init__(self):
+ def __init__(self, sink):
self.writer = None
+ self.sink = sink
- def process(self, context, sink, init_result):
+ def display_data(self):
+ return {'sink_dd': self.sink}
+
+ def process(self, context, init_result):
if self.writer is None:
- self.writer = sink.open_writer(init_result, str(uuid.uuid4()))
+ self.writer = self.sink.open_writer(init_result, str(uuid.uuid4()))
self.writer.write(context.element)
def finish_bundle(self, context, *args, **kwargs):
@@ -801,11 +804,20 @@ class _WriteBundleDoFn(core.DoFn):
yield window.TimestampedValue(self.writer.close(), window.MAX_TIMESTAMP)
-def _write_keyed_bundle(bundle, sink, init_result):
- writer = sink.open_writer(init_result, str(uuid.uuid4()))
- for element in bundle[1]: # values
- writer.write(element)
- return window.TimestampedValue(writer.close(), window.MAX_TIMESTAMP)
+class _WriteKeyedBundleDoFn(core.DoFn):
+
+ def __init__(self, sink):
+ self.sink = sink
+
+ def display_data(self):
+ return {'sink_dd': self.sink}
+
+ def process(self, context, init_result):
+ bundle = context.element
+ writer = self.sink.open_writer(init_result, str(uuid.uuid4()))
+ for element in bundle[1]: # values
+ writer.write(element)
+ return [window.TimestampedValue(writer.close(), window.MAX_TIMESTAMP)]
def _finalize_write(_, sink, init_result, write_results, min_shards):
http://git-wip-us.apache.org/repos/asf/beam/blob/e4eda3c3/sdks/python/apache_beam/io/textio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/textio.py b/sdks/python/apache_beam/io/textio.py
index 4cdab12..0a593df 100644
--- a/sdks/python/apache_beam/io/textio.py
+++ b/sdks/python/apache_beam/io/textio.py
@@ -83,6 +83,19 @@ class _TextSource(filebasedsource.FileBasedSource):
self._coder = coder
self._buffer_size = buffer_size
+ def display_data(self):
+ parent_dd = super(_TextSource, self).display_data()
+ parent_dd['strip_newline'] = DisplayDataItem(
+ self._strip_trailing_newlines,
+ label='Strip Trailing New Lines')
+ parent_dd['buffer_size'] = DisplayDataItem(
+ self._buffer_size,
+ label='Buffer Size')
+ parent_dd['coder'] = DisplayDataItem(
+ self._coder.__class__,
+ label='Coder')
+ return parent_dd
+
def read_records(self, file_name, range_tracker):
start_offset = range_tracker.start_position()
read_buffer = _TextSource.ReadBuffer('', 0)
@@ -252,11 +265,6 @@ class ReadFromText(PTransform):
def expand(self, pvalue):
return pvalue.pipeline | Read(self._source)
- def display_data(self):
- return {'source_dd': self._source,
- 'strip_newline': DisplayDataItem(self._strip_trailing_newlines,
- label='Strip Trailing New Lines')}
-
class WriteToText(PTransform):
"""A PTransform for writing to text files."""
@@ -302,16 +310,9 @@ class WriteToText(PTransform):
compression.
"""
- self._append_trailing_newlines = append_trailing_newlines
self._sink = _TextSink(file_path_prefix, file_name_suffix,
append_trailing_newlines, num_shards,
shard_name_template, coder, compression_type)
def expand(self, pcoll):
return pcoll | Write(self._sink)
-
- def display_data(self):
- return {'sink_dd': self._sink,
- 'append_newline': DisplayDataItem(
- self._append_trailing_newlines,
- label='Append Trailing New Lines')}
http://git-wip-us.apache.org/repos/asf/beam/blob/e4eda3c3/sdks/python/apache_beam/io/textio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/textio_test.py b/sdks/python/apache_beam/io/textio_test.py
index 4b85584..07c6d9c 100644
--- a/sdks/python/apache_beam/io/textio_test.py
+++ b/sdks/python/apache_beam/io/textio_test.py
@@ -25,8 +25,6 @@ import os
import tempfile
import unittest
-import hamcrest as hc
-
import apache_beam as beam
import apache_beam.io.source_test_utils as source_test_utils
@@ -45,9 +43,6 @@ from apache_beam.io.fileio import CompressionTypes
from apache_beam.test_pipeline import TestPipeline
-from apache_beam.transforms.display import DisplayData
-from apache_beam.transforms.display_test import DisplayDataItemMatcher
-
from apache_beam.transforms.util import assert_that
from apache_beam.transforms.util import equal_to
@@ -294,15 +289,6 @@ class TextSourceTest(unittest.TestCase):
splits[0].source, splits[0].start_position, splits[0].stop_position,
perform_multi_threaded_test=False)
- def test_read_display_data(self):
- read = ReadFromText('prefix', validate=False)
- dd = DisplayData.create_from(read)
- expected_items = [
- DisplayDataItemMatcher('compression', 'auto'),
- DisplayDataItemMatcher('file_pattern', 'prefix'),
- DisplayDataItemMatcher('strip_newline', True)]
- hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
-
def test_dataflow_single_file(self):
file_name, expected_data = write_data(5)
assert len(expected_data) == 5
@@ -506,22 +492,6 @@ class TextSinkTest(unittest.TestCase):
with gzip.GzipFile(self.path, 'r') as f:
self.assertEqual(f.read().splitlines(), [])
- def test_write_display_data(self):
- write = WriteToText('prefix')
- dd = DisplayData.create_from(write)
- expected_items = [
- DisplayDataItemMatcher(
- 'append_newline', True),
- DisplayDataItemMatcher(
- 'compression', 'auto'),
- DisplayDataItemMatcher(
- 'shards', 0),
- DisplayDataItemMatcher(
- 'file_pattern',
- '{}{}'.format('prefix',
- '-%(shard_num)05d-of-%(num_shards)05d'))]
- hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
-
def test_write_dataflow(self):
pipeline = TestPipeline()
pcoll = pipeline | beam.core.Create(self.lines)