Posted to commits@beam.apache.org by ro...@apache.org on 2016/10/10 19:05:11 UTC

[2/6] incubator-beam git commit: Remove direct references to iobase.Native*

Remove direct references to iobase.Native*
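
In short: apache_beam.io.iobase had been re-exporting the Dataflow-native
classes (NativeSource, NativeSink, their readers/writers, and the
split/progress types) via a star import; this commit comments that re-export
out and points every call site at the defining module instead. A minimal
sketch of the resulting import pattern, with MySource as a hypothetical
subclass:

    # Before: the Native* names were reachable through apache_beam.io.iobase.
    #   from apache_beam.io import iobase
    #   class MySource(iobase.NativeSource): ...
    # After: reference the defining module directly.
    from apache_beam.runners.dataflow.native_io import iobase as dataflow_io

    class MySource(dataflow_io.NativeSource):
        """Hypothetical native source; only the base-class path changes."""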


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/60e271b5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/60e271b5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/60e271b5

Branch: refs/heads/python-sdk
Commit: 60e271b5fe6e42f241b20554ddafd410e87735eb
Parents: 807013a
Author: Robert Bradshaw <ro...@google.com>
Authored: Thu Oct 6 17:50:41 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Mon Oct 10 10:30:00 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/bigquery.py    |  9 ++--
 sdks/python/apache_beam/io/fileio.py      | 17 +++---
 sdks/python/apache_beam/io/fileio_test.py | 73 ++++++++++++++------------
 sdks/python/apache_beam/io/iobase.py      |  8 +--
 sdks/python/apache_beam/io/pubsub.py      |  5 +-
 sdks/python/apache_beam/pipeline_test.py  |  2 +-
 6 files changed, 60 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/60e271b5/sdks/python/apache_beam/io/bigquery.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/bigquery.py b/sdks/python/apache_beam/io/bigquery.py
index 5508eaa..60d85df 100644
--- a/sdks/python/apache_beam/io/bigquery.py
+++ b/sdks/python/apache_beam/io/bigquery.py
@@ -117,6 +117,7 @@ from apache_beam.internal import auth
 from apache_beam.internal.json_value import from_json_value
 from apache_beam.internal.json_value import to_json_value
 from apache_beam.io import iobase
+from apache_beam.runners.dataflow.native_io import iobase as dataflow_io
 from apache_beam.utils import retry
 from apache_beam.utils.options import GoogleCloudOptions
 
@@ -280,7 +281,7 @@ def _parse_table_reference(table, dataset=None, project=None):
 # BigQuerySource, BigQuerySink.
 
 
-class BigQuerySource(iobase.NativeSource):
+class BigQuerySource(dataflow_io.NativeSource):
   """A source based on a BigQuery table."""
 
   def __init__(self, table=None, dataset=None, project=None, query=None,
@@ -345,7 +346,7 @@ class BigQuerySource(iobase.NativeSource):
         source=self, test_bigquery_client=test_bigquery_client)
 
 
-class BigQuerySink(iobase.NativeSink):
+class BigQuerySink(dataflow_io.NativeSink):
   """A sink based on a BigQuery table."""
 
   def __init__(self, table, dataset=None, project=None, schema=None,
@@ -459,7 +460,7 @@ class BigQuerySink(iobase.NativeSink):
 # BigQueryReader, BigQueryWriter.
 
 
-class BigQueryReader(iobase.NativeSourceReader):
+class BigQueryReader(dataflow_io.NativeSourceReader):
   """A reader for a BigQuery source."""
 
   def __init__(self, source, test_bigquery_client=None):
@@ -516,7 +517,7 @@ class BigQueryReader(iobase.NativeSourceReader):
           yield row
 
 
-class BigQueryWriter(iobase.NativeSinkWriter):
+class BigQueryWriter(dataflow_io.NativeSinkWriter):
   """The sink writer for a BigQuerySink."""
 
   def __init__(self, sink, test_bigquery_client=None, buffer_size=None):
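
For downstream code the change is purely one of spelling: BigQuerySource and
BigQuerySink instances behave as before, only the module that defines their
base classes is now named explicitly. A hedged sketch of the updated
isinstance check (the table spec string is an assumption, not taken from this
diff):

    from apache_beam.io.bigquery import BigQuerySource
    from apache_beam.runners.dataflow.native_io import iobase as dataflow_io

    # 'project:dataset.table' is an assumed table spec format.
    source = BigQuerySource(table='project:dataset.table')
    assert isinstance(source, dataflow_io.NativeSource)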

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/60e271b5/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index 54ca891..c248f12 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -32,6 +32,7 @@ import weakref
 from apache_beam import coders
 from apache_beam.io import iobase
 from apache_beam.io import range_trackers
+from apache_beam.runners.dataflow.native_io import iobase as dataflow_io
 
 __all__ = ['TextFileSource', 'TextFileSink']
 
@@ -106,7 +107,7 @@ class CompressionTypes(object):
     return cls.UNCOMPRESSED
 
 
-class NativeFileSource(iobase.NativeSource):
+class NativeFileSource(dataflow_io.NativeSource):
   """A source implemented by Dataflow service from a GCS or local file or files.
 
   This class is to be only inherited by sources natively implemented by Cloud
@@ -185,7 +186,7 @@ class NativeFileSource(iobase.NativeSource):
     return NativeFileSourceReader(self)
 
 
-class NativeFileSourceReader(iobase.NativeSourceReader,
+class NativeFileSourceReader(dataflow_io.NativeSourceReader,
                              coders.observable.ObservableMixin):
   """The source reader for a NativeFileSource.
 
@@ -302,7 +303,7 @@ class NativeFileSourceReader(iobase.NativeSourceReader,
     raise NotImplementedError
 
   def get_progress(self):
-    return iobase.ReaderProgress(position=iobase.ReaderPosition(
+    return dataflow_io.ReaderProgress(position=dataflow_io.ReaderPosition(
         byte_offset=self.range_tracker.last_record_start))
 
   def request_dynamic_split(self, dynamic_split_request):
@@ -328,7 +329,7 @@ class NativeFileSourceReader(iobase.NativeSourceReader,
               'of work to be completed is out of the valid range (0, '
               '1). Requested: %r', dynamic_split_request)
           return
-        split_position = iobase.ReaderPosition()
+        split_position = dataflow_io.ReaderPosition()
         split_position.byte_offset = (
             self.range_tracker.position_at_fraction(percent_complete))
       else:
@@ -339,7 +340,7 @@ class NativeFileSourceReader(iobase.NativeSourceReader,
         return
 
     if self.range_tracker.try_split(split_position.byte_offset):
-      return iobase.DynamicSplitResultWithPosition(split_position)
+      return dataflow_io.DynamicSplitResultWithPosition(split_position)
     else:
       return
 
@@ -964,7 +965,7 @@ class TextFileSink(FileSink):
       file_handle.write('\n')
 
 
-class NativeFileSink(iobase.NativeSink):
+class NativeFileSink(dataflow_io.NativeSink):
   """A sink implemented by Dataflow service to a GCS or local file or files.
 
   This class is to be only inherited by sinks natively implemented by Cloud
@@ -1021,7 +1022,7 @@ class NativeFileSink(iobase.NativeSink):
             self.compression_type == other.compression_type)
 
 
-class NativeFileSinkWriter(iobase.NativeSinkWriter):
+class NativeFileSinkWriter(dataflow_io.NativeSinkWriter):
   """The sink writer for a NativeFileSink.
 
   This class is to be only inherited by sink writers natively implemented by
@@ -1127,7 +1128,7 @@ class TextFileReader(NativeFileSourceReader):
       yield False, self.source.coder.decode(line), delta_offset
 
 
-class TextMultiFileReader(iobase.NativeSourceReader):
+class TextMultiFileReader(dataflow_io.NativeSourceReader):
   """A reader for a multi-file text source."""
 
   def __init__(self, source):
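
The reader-side split machinery is renamed the same way: ReaderProgress,
ReaderPosition, DynamicSplitRequest, and DynamicSplitResultWithPosition are
now all spelled through dataflow_io. A hedged sketch of a split round-trip,
where 'reader' stands in for any open NativeFileSourceReader (the constructor
shapes follow the call sites in this file and in fileio_test.py below):

    from apache_beam.runners.dataflow.native_io import iobase as dataflow_io

    def split_at_half(reader):
        # Ask the reader to stop halfway through its current range;
        # 'reader' is a hypothetical NativeFileSourceReader instance.
        request = dataflow_io.DynamicSplitRequest(
            dataflow_io.ReaderProgress(percent_complete=0.5))
        result = reader.request_dynamic_split(request)
        if result is None:
            return None  # the reader declined the split
        # stop_position.byte_offset is the reader's new end offset.
        return result.stop_position.byte_offset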

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/60e271b5/sdks/python/apache_beam/io/fileio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py
index 7da0149..77d6c45 100644
--- a/sdks/python/apache_beam/io/fileio_test.py
+++ b/sdks/python/apache_beam/io/fileio_test.py
@@ -30,6 +30,7 @@ import apache_beam as beam
 from apache_beam import coders
 from apache_beam.io import fileio
 from apache_beam.io import iobase
+from apache_beam.runners.dataflow.native_io import iobase as dataflow_io
 
 # TODO: Add tests for file patterns (ie not just individual files) for both
 # uncompressed
@@ -316,7 +317,7 @@ class TestTextFileSource(unittest.TestCase):
     else:
       self.assertIsNotNone(actual_response.stop_position)
       self.assertIsInstance(actual_response.stop_position,
-                            iobase.ReaderPosition)
+                            dataflow_io.ReaderPosition)
       self.assertIsNotNone(actual_response.stop_position.byte_offset)
       self.assertEqual(expected_response.stop_position.byte_offset,
                        actual_response.stop_position.byte_offset)
@@ -338,8 +339,8 @@ class TestTextFileSource(unittest.TestCase):
       for percent_complete in percents_complete:
         self.try_splitting_reader_at(
             reader,
-            iobase.DynamicSplitRequest(
-                iobase.ReaderProgress(percent_complete=percent_complete)),
+            dataflow_io.DynamicSplitRequest(
+                dataflow_io.ReaderProgress(percent_complete=percent_complete)),
             None)
 
       # Cursor passed beginning of file.
@@ -349,8 +350,8 @@ class TestTextFileSource(unittest.TestCase):
       for percent_complete in percents_complete:
         self.try_splitting_reader_at(
             reader,
-            iobase.DynamicSplitRequest(
-                iobase.ReaderProgress(percent_complete=percent_complete)),
+            dataflow_io.DynamicSplitRequest(
+                dataflow_io.ReaderProgress(percent_complete=percent_complete)),
             None)
 
   def test_zlib_file_unsplittable(self):
@@ -368,8 +369,8 @@ class TestTextFileSource(unittest.TestCase):
       for percent_complete in percents_complete:
         self.try_splitting_reader_at(
             reader,
-            iobase.DynamicSplitRequest(
-                iobase.ReaderProgress(percent_complete=percent_complete)),
+            dataflow_io.DynamicSplitRequest(
+                dataflow_io.ReaderProgress(percent_complete=percent_complete)),
             None)
 
       # Cursor passed beginning of file.
@@ -379,8 +380,8 @@ class TestTextFileSource(unittest.TestCase):
       for percent_complete in percents_complete:
         self.try_splitting_reader_at(
             reader,
-            iobase.DynamicSplitRequest(
-                iobase.ReaderProgress(percent_complete=percent_complete)),
+            dataflow_io.DynamicSplitRequest(
+                dataflow_io.ReaderProgress(percent_complete=percent_complete)),
             None)
 
   def test_update_stop_position_for_percent_complete(self):
@@ -397,33 +398,35 @@ class TestTextFileSource(unittest.TestCase):
       # Splitting at end of the range should be unsuccessful
       self.try_splitting_reader_at(
           reader,
-          iobase.DynamicSplitRequest(iobase.ReaderProgress(percent_complete=0)),
+          dataflow_io.DynamicSplitRequest(
+              dataflow_io.ReaderProgress(percent_complete=0)),
           None)
       self.try_splitting_reader_at(
           reader,
-          iobase.DynamicSplitRequest(iobase.ReaderProgress(percent_complete=1)),
+          dataflow_io.DynamicSplitRequest(
+              dataflow_io.ReaderProgress(percent_complete=1)),
           None)
 
       # Splitting at positions on or before start offset of the last record
       self.try_splitting_reader_at(
           reader,
-          iobase.DynamicSplitRequest(
-              iobase.ReaderProgress(percent_complete=0.2)),
+          dataflow_io.DynamicSplitRequest(
+              dataflow_io.ReaderProgress(percent_complete=0.2)),
           None)
       self.try_splitting_reader_at(
           reader,
-          iobase.DynamicSplitRequest(
-              iobase.ReaderProgress(percent_complete=0.4)),
+          dataflow_io.DynamicSplitRequest(
+              dataflow_io.ReaderProgress(percent_complete=0.4)),
           None)
 
       # Splitting at a position after the start offset of the last record should
       # be successful
       self.try_splitting_reader_at(
           reader,
-          iobase.DynamicSplitRequest(
-              iobase.ReaderProgress(percent_complete=0.6)),
-          iobase.DynamicSplitResultWithPosition(
-              iobase.ReaderPosition(byte_offset=15)))
+          dataflow_io.DynamicSplitRequest(
+              dataflow_io.ReaderProgress(percent_complete=0.6)),
+          dataflow_io.DynamicSplitResultWithPosition(
+              dataflow_io.ReaderPosition(byte_offset=15)))
 
   def test_update_stop_position_percent_complete_for_position(self):
     lines = ['aaaa', 'bbbb', 'cccc', 'dddd', 'eeee']
@@ -439,28 +442,28 @@ class TestTextFileSource(unittest.TestCase):
       # Splitting at end of the range should be unsuccessful
       self.try_splitting_reader_at(
           reader,
-          iobase.DynamicSplitRequest(
-              iobase.ReaderProgress(position=iobase.ReaderPosition(
+          dataflow_io.DynamicSplitRequest(
+              dataflow_io.ReaderProgress(position=dataflow_io.ReaderPosition(
                   byte_offset=0))),
           None)
       self.try_splitting_reader_at(
           reader,
-          iobase.DynamicSplitRequest(
-              iobase.ReaderProgress(position=iobase.ReaderPosition(
+          dataflow_io.DynamicSplitRequest(
+              dataflow_io.ReaderProgress(position=dataflow_io.ReaderPosition(
                   byte_offset=25))),
           None)
 
       # Splitting at positions on or before start offset of the last record
       self.try_splitting_reader_at(
           reader,
-          iobase.DynamicSplitRequest(
-              iobase.ReaderProgress(position=iobase.ReaderPosition(
+          dataflow_io.DynamicSplitRequest(
+              dataflow_io.ReaderProgress(position=dataflow_io.ReaderPosition(
                   byte_offset=5))),
           None)
       self.try_splitting_reader_at(
           reader,
-          iobase.DynamicSplitRequest(
-              iobase.ReaderProgress(position=iobase.ReaderPosition(
+          dataflow_io.DynamicSplitRequest(
+              dataflow_io.ReaderProgress(position=dataflow_io.ReaderPosition(
                   byte_offset=10))),
           None)
 
@@ -468,11 +471,11 @@ class TestTextFileSource(unittest.TestCase):
       # be successful
       self.try_splitting_reader_at(
           reader,
-          iobase.DynamicSplitRequest(
-              iobase.ReaderProgress(position=iobase.ReaderPosition(
+          dataflow_io.DynamicSplitRequest(
+              dataflow_io.ReaderProgress(position=dataflow_io.ReaderPosition(
                   byte_offset=15))),
-          iobase.DynamicSplitResultWithPosition(
-              iobase.ReaderPosition(byte_offset=15)))
+          dataflow_io.DynamicSplitResultWithPosition(
+              dataflow_io.ReaderPosition(byte_offset=15)))
 
   def run_update_stop_position_exhaustive(self, lines, newline):
     """An exhaustive test for dynamic splitting.
@@ -551,13 +554,13 @@ class TestTextFileSource(unittest.TestCase):
       elif records_to_read == 0:
         expected_split_response = None  # unstarted
       else:
-        expected_split_response = iobase.DynamicSplitResultWithPosition(
-            stop_position=iobase.ReaderPosition(byte_offset=stop_offset))
+        expected_split_response = dataflow_io.DynamicSplitResultWithPosition(
+            stop_position=dataflow_io.ReaderPosition(byte_offset=stop_offset))
 
       split_response = self.try_splitting_reader_at(
           reader,
-          iobase.DynamicSplitRequest(progress=iobase.ReaderProgress(
-              iobase.ReaderPosition(byte_offset=stop_offset))),
+          dataflow_io.DynamicSplitRequest(progress=dataflow_io.ReaderProgress(
+              dataflow_io.ReaderPosition(byte_offset=stop_offset))),
           expected_split_response)
 
       # Reading remaining records from the updated reader.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/60e271b5/sdks/python/apache_beam/io/iobase.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index f070b39..ac20732 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -43,7 +43,7 @@ from apache_beam.transforms import core
 from apache_beam.transforms import ptransform
 from apache_beam.transforms import window
 
-from apache_beam.runners.dataflow.native_io.iobase import *
+#from apache_beam.runners.dataflow.native_io.iobase import *
 
 
 # Encapsulates information about a bundle of a source generated when method
@@ -704,10 +704,10 @@ class Write(ptransform.PTransform):
     self.sink = sink
 
   def apply(self, pcoll):
-    from apache_beam.runners.dataflow.native_io import iobase as native_iobase
-    if isinstance(self.sink, native_iobase.NativeSink):
+    from apache_beam.runners.dataflow.native_io import iobase as dataflow_io
+    if isinstance(self.sink, dataflow_io.NativeSink):
       # A native sink
-      return pcoll | 'native_write' >> native_iobase._NativeWrite(self.sink)
+      return pcoll | 'native_write' >> dataflow_io._NativeWrite(self.sink)
     elif isinstance(self.sink, Sink):
       # A custom sink
       return pcoll | WriteImpl(self.sink)
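
With the star re-export commented out above, apache_beam.io.iobase itself no
longer exposes the Native* names; Write.apply keeps its deferred,
function-local import of the Dataflow module, now under the dataflow_io
alias, and dispatches on the sink type. A minimal sketch of that dispatch,
mirroring the hunk above with a hypothetical sink argument:

    from apache_beam.io import iobase
    from apache_beam.runners.dataflow.native_io import iobase as dataflow_io

    def describe_sink(sink):
        # 'sink' is hypothetical; the branches mirror Write.apply above.
        if isinstance(sink, dataflow_io.NativeSink):
            return 'native write'  # routed through dataflow_io._NativeWrite
        elif isinstance(sink, iobase.Sink):
            return 'custom write'  # routed through iobase.WriteImpl
        raise ValueError('A sink must be a NativeSink or a Sink subclass.')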

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/60e271b5/sdks/python/apache_beam/io/pubsub.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/pubsub.py b/sdks/python/apache_beam/io/pubsub.py
index a55e7cd..1f5989a 100644
--- a/sdks/python/apache_beam/io/pubsub.py
+++ b/sdks/python/apache_beam/io/pubsub.py
@@ -24,9 +24,10 @@ from __future__ import absolute_import
 
 from apache_beam import coders
 from apache_beam.io import iobase
+from apache_beam.runners.dataflow.native_io import iobase as dataflow_io
 
 
-class PubSubSource(iobase.NativeSource):
+class PubSubSource(dataflow_io.NativeSource):
   """Source for reading from a given Cloud Pub/Sub topic.
 
   Attributes:
@@ -59,7 +60,7 @@ class PubSubSource(iobase.NativeSource):
         'PubSubSource is not supported in local execution.')
 
 
-class PubSubSink(iobase.NativeSink):
+class PubSubSink(dataflow_io.NativeSink):
   """Sink for writing to a given Cloud Pub/Sub topic."""
 
   def __init__(self, topic, coder=coders.StrUtf8Coder()):
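
The Pub/Sub pair follows the same pattern. A minimal hedged sketch (the
PubSubSink signature is taken from the hunk above; the topic string format is
an assumption):

    from apache_beam.io.pubsub import PubSubSink
    from apache_beam.runners.dataflow.native_io import iobase as dataflow_io

    # Topic path format assumed, not taken from this diff.
    sink = PubSubSink('projects/my-project/topics/my-topic')
    assert isinstance(sink, dataflow_io.NativeSink)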

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/60e271b5/sdks/python/apache_beam/pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index 8a0d246..8c1b3ba 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -21,12 +21,12 @@ import gc
 import logging
 import unittest
 
-from apache_beam.io.iobase import NativeSource
 from apache_beam.pipeline import Pipeline
 from apache_beam.pipeline import PipelineOptions
 from apache_beam.pipeline import PipelineVisitor
 from apache_beam.pvalue import AsIter
 from apache_beam.pvalue import SideOutputValue
+from apache_beam.runners.dataflow.native_io.iobase import NativeSource
 from apache_beam.transforms import CombinePerKey
 from apache_beam.transforms import Create
 from apache_beam.transforms import FlatMap