You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/04/20 16:00:12 UTC
[1/2] beam git commit: Remove vestigial Read and Write from core.py
Repository: beam
Updated Branches:
refs/heads/master e26bfbe0a -> 4121ec490
Remove vestigial Read and Write from core.py
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c25380be
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c25380be
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c25380be
Branch: refs/heads/master
Commit: c25380be883405862e3620cecf3aa6b00945ec4b
Parents: e26bfbe
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Tue Apr 18 16:23:51 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Apr 20 08:59:51 2017 -0700
----------------------------------------------------------------------
.../examples/cookbook/bigquery_side_input.py | 2 +-
.../apache_beam/examples/cookbook/filters.py | 2 +-
sdks/python/apache_beam/io/concat_source_test.py | 2 +-
.../python/apache_beam/io/filebasedsource_test.py | 18 +++++++++---------
sdks/python/apache_beam/io/sources_test.py | 2 +-
sdks/python/apache_beam/io/tfrecordio_test.py | 8 ++++----
sdks/python/apache_beam/pipeline_test.py | 2 +-
sdks/python/apache_beam/transforms/core.py | 13 ++-----------
8 files changed, 20 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/c25380be/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
index 486cc88..f68c95d 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
@@ -101,7 +101,7 @@ def run(argv=None):
pcoll_corpus = p | 'read corpus' >> beam.io.Read(
beam.io.BigQuerySource(query=query_corpus))
- pcoll_word = p | 'read_words' >> beam.Read(
+ pcoll_word = p | 'read_words' >> beam.io.Read(
beam.io.BigQuerySource(query=query_word))
pcoll_ignore_corpus = p | 'create_ignore_corpus' >> beam.Create(
[ignore_corpus])
http://git-wip-us.apache.org/repos/asf/beam/blob/c25380be/sdks/python/apache_beam/examples/cookbook/filters.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/filters.py b/sdks/python/apache_beam/examples/cookbook/filters.py
index d13d823..374001c 100644
--- a/sdks/python/apache_beam/examples/cookbook/filters.py
+++ b/sdks/python/apache_beam/examples/cookbook/filters.py
@@ -88,7 +88,7 @@ def run(argv=None):
p = beam.Pipeline(argv=pipeline_args)
- input_data = p | beam.Read(beam.io.BigQuerySource(known_args.input))
+ input_data = p | beam.io.Read(beam.io.BigQuerySource(known_args.input))
# pylint: disable=expression-not-assigned
(filter_cold_days(input_data, known_args.month_filter)
http://git-wip-us.apache.org/repos/asf/beam/blob/c25380be/sdks/python/apache_beam/io/concat_source_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/concat_source_test.py b/sdks/python/apache_beam/io/concat_source_test.py
index 77d2647..7c16e63 100644
--- a/sdks/python/apache_beam/io/concat_source_test.py
+++ b/sdks/python/apache_beam/io/concat_source_test.py
@@ -214,7 +214,7 @@ class ConcatSourceTest(unittest.TestCase):
RangeSource(100, 1000),
])
pipeline = TestPipeline()
- pcoll = pipeline | beam.Read(source)
+ pcoll = pipeline | beam.io.Read(source)
assert_that(pcoll, equal_to(range(1000)))
pipeline.run()
http://git-wip-us.apache.org/repos/asf/beam/blob/c25380be/sdks/python/apache_beam/io/filebasedsource_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource_test.py b/sdks/python/apache_beam/io/filebasedsource_test.py
index e681f26..5318c4d 100644
--- a/sdks/python/apache_beam/io/filebasedsource_test.py
+++ b/sdks/python/apache_beam/io/filebasedsource_test.py
@@ -389,7 +389,7 @@ class TestFileBasedSource(unittest.TestCase):
def _run_source_test(self, pattern, expected_data, splittable=True):
pipeline = TestPipeline()
- pcoll = pipeline | 'Read' >> beam.Read(LineSource(
+ pcoll = pipeline | 'Read' >> beam.io.Read(LineSource(
pattern, splittable=splittable))
assert_that(pcoll, equal_to(expected_data))
pipeline.run()
@@ -429,7 +429,7 @@ class TestFileBasedSource(unittest.TestCase):
f.write('\n'.join(lines))
pipeline = TestPipeline()
- pcoll = pipeline | 'Read' >> beam.Read(LineSource(
+ pcoll = pipeline | 'Read' >> beam.io.Read(LineSource(
filename,
splittable=False,
compression_type=CompressionTypes.BZIP2))
@@ -444,7 +444,7 @@ class TestFileBasedSource(unittest.TestCase):
f.write('\n'.join(lines))
pipeline = TestPipeline()
- pcoll = pipeline | 'Read' >> beam.Read(LineSource(
+ pcoll = pipeline | 'Read' >> beam.io.Read(LineSource(
filename,
splittable=False,
compression_type=CompressionTypes.GZIP))
@@ -462,7 +462,7 @@ class TestFileBasedSource(unittest.TestCase):
compressobj.compress('\n'.join(c)) + compressobj.flush())
file_pattern = write_prepared_pattern(compressed_chunks)
pipeline = TestPipeline()
- pcoll = pipeline | 'Read' >> beam.Read(LineSource(
+ pcoll = pipeline | 'Read' >> beam.io.Read(LineSource(
file_pattern,
splittable=False,
compression_type=CompressionTypes.BZIP2))
@@ -481,7 +481,7 @@ class TestFileBasedSource(unittest.TestCase):
compressed_chunks.append(out.getvalue())
file_pattern = write_prepared_pattern(compressed_chunks)
pipeline = TestPipeline()
- pcoll = pipeline | 'Read' >> beam.Read(LineSource(
+ pcoll = pipeline | 'Read' >> beam.io.Read(LineSource(
file_pattern,
splittable=False,
compression_type=CompressionTypes.GZIP))
@@ -496,7 +496,7 @@ class TestFileBasedSource(unittest.TestCase):
f.write('\n'.join(lines))
pipeline = TestPipeline()
- pcoll = pipeline | 'Read' >> beam.Read(LineSource(
+ pcoll = pipeline | 'Read' >> beam.io.Read(LineSource(
filename,
compression_type=CompressionTypes.AUTO))
assert_that(pcoll, equal_to(lines))
@@ -510,7 +510,7 @@ class TestFileBasedSource(unittest.TestCase):
f.write('\n'.join(lines))
pipeline = TestPipeline()
- pcoll = pipeline | 'Read' >> beam.Read(LineSource(
+ pcoll = pipeline | 'Read' >> beam.io.Read(LineSource(
filename,
compression_type=CompressionTypes.AUTO))
assert_that(pcoll, equal_to(lines))
@@ -529,7 +529,7 @@ class TestFileBasedSource(unittest.TestCase):
file_pattern = write_prepared_pattern(
compressed_chunks, suffixes=['.gz']*len(chunks))
pipeline = TestPipeline()
- pcoll = pipeline | 'Read' >> beam.Read(LineSource(
+ pcoll = pipeline | 'Read' >> beam.io.Read(LineSource(
file_pattern,
compression_type=CompressionTypes.AUTO))
assert_that(pcoll, equal_to(lines))
@@ -551,7 +551,7 @@ class TestFileBasedSource(unittest.TestCase):
file_pattern = write_prepared_pattern(chunks_to_write,
suffixes=(['.gz', '']*3))
pipeline = TestPipeline()
- pcoll = pipeline | 'Read' >> beam.Read(LineSource(
+ pcoll = pipeline | 'Read' >> beam.io.Read(LineSource(
file_pattern,
compression_type=CompressionTypes.AUTO))
assert_that(pcoll, equal_to(lines))
http://git-wip-us.apache.org/repos/asf/beam/blob/c25380be/sdks/python/apache_beam/io/sources_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/sources_test.py b/sdks/python/apache_beam/io/sources_test.py
index dc0fd54..3f92756 100644
--- a/sdks/python/apache_beam/io/sources_test.py
+++ b/sdks/python/apache_beam/io/sources_test.py
@@ -100,7 +100,7 @@ class SourcesTest(unittest.TestCase):
def test_run_direct(self):
file_name = self._create_temp_file('aaaa\nbbbb\ncccc\ndddd')
pipeline = TestPipeline()
- pcoll = pipeline | beam.Read(LineSource(file_name))
+ pcoll = pipeline | beam.io.Read(LineSource(file_name))
assert_that(pcoll, equal_to(['aaaa', 'bbbb', 'cccc', 'dddd']))
pipeline.run()
http://git-wip-us.apache.org/repos/asf/beam/blob/c25380be/sdks/python/apache_beam/io/tfrecordio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/tfrecordio_test.py b/sdks/python/apache_beam/io/tfrecordio_test.py
index 49f9639..d8c706e 100644
--- a/sdks/python/apache_beam/io/tfrecordio_test.py
+++ b/sdks/python/apache_beam/io/tfrecordio_test.py
@@ -248,7 +248,7 @@ class TestTFRecordSource(_TestCaseWithTempDirCleanUp):
self._write_file(path, FOO_RECORD_BASE64)
with TestPipeline() as p:
result = (p
- | beam.Read(
+ | beam.io.Read(
_TFRecordSource(
path,
coder=coders.BytesCoder(),
@@ -261,7 +261,7 @@ class TestTFRecordSource(_TestCaseWithTempDirCleanUp):
self._write_file(path, FOO_BAR_RECORD_BASE64)
with TestPipeline() as p:
result = (p
- | beam.Read(
+ | beam.io.Read(
_TFRecordSource(
path,
coder=coders.BytesCoder(),
@@ -274,7 +274,7 @@ class TestTFRecordSource(_TestCaseWithTempDirCleanUp):
self._write_file_gzip(path, FOO_BAR_RECORD_BASE64)
with TestPipeline() as p:
result = (p
- | beam.Read(
+ | beam.io.Read(
_TFRecordSource(
path,
coder=coders.BytesCoder(),
@@ -287,7 +287,7 @@ class TestTFRecordSource(_TestCaseWithTempDirCleanUp):
self._write_file_gzip(path, FOO_BAR_RECORD_BASE64)
with TestPipeline() as p:
result = (p
- | beam.Read(
+ | beam.io.Read(
_TFRecordSource(
path,
coder=coders.BytesCoder(),
http://git-wip-us.apache.org/repos/asf/beam/blob/c25380be/sdks/python/apache_beam/pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index 6314609..05503bd 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -25,6 +25,7 @@ import unittest
# from nose.plugins.attrib import attr
import apache_beam as beam
+from apache_beam.io import Read
from apache_beam.metrics import Metrics
from apache_beam.pipeline import Pipeline
from apache_beam.pipeline import PipelineOptions
@@ -39,7 +40,6 @@ from apache_beam.transforms import Map
from apache_beam.transforms import DoFn
from apache_beam.transforms import ParDo
from apache_beam.transforms import PTransform
-from apache_beam.transforms import Read
from apache_beam.transforms import WindowInto
from apache_beam.transforms.util import assert_that
from apache_beam.transforms.util import equal_to
http://git-wip-us.apache.org/repos/asf/beam/blob/c25380be/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 9f66c39..4709056 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -1364,13 +1364,14 @@ class Create(PTransform):
return Union[[trivial_inference.instance_to_type(v) for v in self.value]]
def expand(self, pbegin):
+ from apache_beam.io import iobase
assert isinstance(pbegin, pvalue.PBegin)
self.pipeline = pbegin.pipeline
ouput_type = (self.get_type_hints().simple_output_type(self.label) or
self.infer_output_type(None))
coder = typecoders.registry.get_coder(ouput_type)
source = self._create_source_from_iterable(self.value, coder)
- return pbegin.pipeline | Read(source).with_output_types(ouput_type)
+ return pbegin.pipeline | iobase.Read(source).with_output_types(ouput_type)
def get_windowing(self, unused_inputs):
return Windowing(GlobalWindows())
@@ -1458,13 +1459,3 @@ class Create(PTransform):
return self._total_size
return _CreateSource(serialized_values, coder)
-
-
-def Read(*args, **kwargs):
- from apache_beam import io
- return io.Read(*args, **kwargs)
-
-
-def Write(*args, **kwargs):
- from apache_beam import io
- return io.Write(*args, **kwargs)
[2/2] beam git commit: Closes #2586
Posted by ro...@apache.org.
Closes #2586
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4121ec49
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4121ec49
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4121ec49
Branch: refs/heads/master
Commit: 4121ec490f2b3714cb668cc5811ba0bf88d610e5
Parents: e26bfbe c25380b
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Thu Apr 20 08:59:52 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Apr 20 08:59:52 2017 -0700
----------------------------------------------------------------------
.../examples/cookbook/bigquery_side_input.py | 2 +-
.../apache_beam/examples/cookbook/filters.py | 2 +-
sdks/python/apache_beam/io/concat_source_test.py | 2 +-
.../python/apache_beam/io/filebasedsource_test.py | 18 +++++++++---------
sdks/python/apache_beam/io/sources_test.py | 2 +-
sdks/python/apache_beam/io/tfrecordio_test.py | 8 ++++----
sdks/python/apache_beam/pipeline_test.py | 2 +-
sdks/python/apache_beam/transforms/core.py | 13 ++-----------
8 files changed, 20 insertions(+), 29 deletions(-)
----------------------------------------------------------------------