Posted to commits@beam.apache.org by ro...@apache.org on 2016/11/14 23:41:20 UTC

[1/2] incubator-beam git commit: [BEAM-852] Add validation to file based sources during create time

Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 15e78b28a -> 560fe79f8


[BEAM-852] Add validation to file based sources during create time


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/76ad2929
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/76ad2929
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/76ad2929

Branch: refs/heads/python-sdk
Commit: 76ad29296fd57e1eec97bf40d9cf3a1d54a63a3f
Parents: 15e78b2
Author: Sourabh Bajaj <so...@google.com>
Authored: Mon Nov 14 15:40:10 2016 -0800
Committer: Robert Bradshaw <ro...@google.com>
Committed: Mon Nov 14 15:40:10 2016 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/io/avroio.py            |  8 +++-
 sdks/python/apache_beam/io/bigquery.py          |  2 +-
 sdks/python/apache_beam/io/filebasedsource.py   | 16 +++++++-
 .../apache_beam/io/filebasedsource_test.py      | 41 ++++++++++++++------
 sdks/python/apache_beam/io/textio.py            | 13 +++++--
 5 files changed, 60 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
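
A minimal sketch of the user-visible change (not part of the commit; the
path below is hypothetical). With validate defaulting to True, a file
pattern that matches nothing now fails when the source is applied to the
pipeline, rather than when workers first try to read it:

    import apache_beam as beam
    from apache_beam.io.textio import ReadFromText

    p = beam.Pipeline()
    try:
      p | ReadFromText('/tmp/does-not-exist/*.txt')
    except IOError:
      # 'No files found based on the file pattern /tmp/does-not-exist/*.txt'
      pass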


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/76ad2929/sdks/python/apache_beam/io/avroio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/avroio.py b/sdks/python/apache_beam/io/avroio.py
index 53ed95a..e7e73dd 100644
--- a/sdks/python/apache_beam/io/avroio.py
+++ b/sdks/python/apache_beam/io/avroio.py
@@ -37,7 +37,7 @@ __all__ = ['ReadFromAvro', 'WriteToAvro']
 class ReadFromAvro(PTransform):
   """A ``PTransform`` for reading avro files."""
 
-  def __init__(self, file_pattern=None, min_bundle_size=0):
+  def __init__(self, file_pattern=None, min_bundle_size=0, validate=True):
     """Initializes ``ReadFromAvro``.
 
     Uses source '_AvroSource' to read a set of Avro files defined by a given
@@ -70,13 +70,17 @@ class ReadFromAvro(PTransform):
       file_pattern: the set of files to be read.
       min_bundle_size: the minimum size in bytes, to be considered when
                        splitting the input into bundles.
+      validate: flag to verify that the files exist at pipeline creation
+                time.
       **kwargs: Additional keyword arguments to be passed to the base class.
     """
     super(ReadFromAvro, self).__init__()
     self._args = (file_pattern, min_bundle_size)
+    self._validate = validate
 
   def apply(self, pvalue):
-    return pvalue.pipeline | Read(_AvroSource(*self._args))
+    return pvalue.pipeline | Read(_AvroSource(*self._args,
+                                              validate=self._validate))
 
 
 class _AvroUtils(object):
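
A usage sketch for the flag threaded through ReadFromAvro above (the bucket
path is hypothetical, not from the commit). Passing validate=False restores
the old deferred behavior, which is useful when the matching files will only
exist by the time the pipeline actually runs:

    import apache_beam as beam
    from apache_beam.io.avroio import ReadFromAvro

    p = beam.Pipeline()
    # Skip the create-time existence check; a bad pattern will instead
    # surface as an error when the source is read.
    records = p | ReadFromAvro('gs://my-bucket/input-*.avro', validate=False)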

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/76ad2929/sdks/python/apache_beam/io/bigquery.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/bigquery.py b/sdks/python/apache_beam/io/bigquery.py
index f0e88a6..8d7892a 100644
--- a/sdks/python/apache_beam/io/bigquery.py
+++ b/sdks/python/apache_beam/io/bigquery.py
@@ -65,7 +65,7 @@ input entails querying the table for all its rows. The coder argument on
 BigQuerySource controls the reading of the lines in the export files (i.e.,
 transform a JSON object into a PCollection element). The coder is not involved
 when the same table is read as a side input since there is no intermediate
-format involved.  We get the table rows directly from the BigQuery service with
+format involved. We get the table rows directly from the BigQuery service with
 a query.
 
 Users may provide a query to read from rather than reading all of a BigQuery
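
A hedged sketch of the query-based read the docstring goes on to describe
(the dataset and table names are hypothetical):

    import apache_beam as beam

    p = beam.Pipeline()
    rows = p | beam.io.Read(beam.io.BigQuerySource(
        query='SELECT year, mean_temp FROM samples.weather_stations'))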

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/76ad2929/sdks/python/apache_beam/io/filebasedsource.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py
index 58ad118..c7bc27e 100644
--- a/sdks/python/apache_beam/io/filebasedsource.py
+++ b/sdks/python/apache_beam/io/filebasedsource.py
@@ -50,7 +50,8 @@ class FileBasedSource(iobase.BoundedSource):
                file_pattern,
                min_bundle_size=0,
                compression_type=fileio.CompressionTypes.AUTO,
-               splittable=True):
+               splittable=True,
+               validate=True):
     """Initializes ``FileBasedSource``.
 
     Args:
@@ -68,10 +69,13 @@ class FileBasedSource(iobase.BoundedSource):
                   the file, for example, for compressed files where currently
                   it is not possible to efficiently read a data range without
                   decompressing the whole file.
+      validate: Boolean flag to verify that the files exist at pipeline
+                creation time.
     Raises:
       TypeError: when compression_type is not valid or if file_pattern is not a
                  string.
       ValueError: when compression and splittable files are specified.
+      IOError: when the file pattern specified yields an empty result.
     """
     if not isinstance(file_pattern, basestring):
       raise TypeError(
@@ -91,6 +95,8 @@ class FileBasedSource(iobase.BoundedSource):
     else:
       # We can't split compressed files efficiently so turn off splitting.
       self._splittable = False
+    if validate:
+      self._validate()
 
   def display_data(self):
     return {'filePattern': DisplayDataItem(self._pattern, label="File Pattern"),
@@ -133,7 +139,6 @@ class FileBasedSource(iobase.BoundedSource):
 
   @staticmethod
   def _estimate_sizes_in_parallel(file_names):
-
     if not file_names:
       return []
     elif len(file_names) == 1:
@@ -150,6 +155,13 @@ class FileBasedSource(iobase.BoundedSource):
       finally:
         pool.terminate()
 
+  def _validate(self):
+    """Validate if there are actual files in the specified glob pattern
+    """
+    if len(fileio.ChannelFactory.glob(self._pattern)) <= 0:
+      raise IOError(
+          'No files found based on the file pattern %s' % self._pattern)
+
   def split(
       self, desired_bundle_size=None, start_position=None, stop_position=None):
     return self._get_concat_source().split(
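
A sketch of the new check from a subclass's point of view (JsonLineSource
and the paths are hypothetical, not part of the commit). Subclasses only
implement read_records; the base constructor now globs the pattern up front
and raises IOError when nothing matches, unless validate=False:

    from apache_beam.io import filebasedsource

    class JsonLineSource(filebasedsource.FileBasedSource):
      def read_records(self, file_name, offset_range_tracker):
        # A real implementation would open file_name and yield records
        # for as long as offset_range_tracker.try_claim(...) succeeds.
        return iter([])

    JsonLineSource('/data/events/*.json')                  # may raise IOError
    JsonLineSource('/data/events/*.json', validate=False)  # check deferred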

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/76ad2929/sdks/python/apache_beam/io/filebasedsource_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource_test.py b/sdks/python/apache_beam/io/filebasedsource_test.py
index 7bc31fd..7f4d8d3 100644
--- a/sdks/python/apache_beam/io/filebasedsource_test.py
+++ b/sdks/python/apache_beam/io/filebasedsource_test.py
@@ -220,6 +220,26 @@ class TestFileBasedSource(unittest.TestCase):
     # environments with limited amount of resources.
     filebasedsource.MAX_NUM_THREADS_FOR_SIZE_ESTIMATION = 2
 
+  def test_validation_file_exists(self):
+    file_name, _ = write_data(10)
+    LineSource(file_name)
+
+  def test_validation_directory_non_empty(self):
+    temp_dir = tempfile.mkdtemp()
+    file_name, _ = write_data(10, directory=temp_dir)
+    LineSource(file_name)
+
+  def test_validation_failing(self):
+    no_files_found_error = 'No files found based on the file pattern*'
+    with self.assertRaisesRegexp(IOError, no_files_found_error):
+      LineSource('dummy_pattern')
+    with self.assertRaisesRegexp(IOError, no_files_found_error):
+      temp_dir = tempfile.mkdtemp()
+      LineSource(os.path.join(temp_dir, '*'))
+
+  def test_validation_file_missing_verification_disabled(self):
+    LineSource('dummy_pattern', validate=False)
+
   def test_fully_read_single_file(self):
     file_name, expected_data = write_data(10)
     assert len(expected_data) == 10
@@ -525,7 +545,7 @@ class TestSingleFileSource(unittest.TestCase):
     start_not_a_number_error = 'start_offset must be a number*'
     stop_not_a_number_error = 'stop_offset must be a number*'
     file_name = 'dummy_pattern'
-    fbs = LineSource(file_name)
+    fbs = LineSource(file_name, validate=False)
 
     with self.assertRaisesRegexp(TypeError, start_not_a_number_error):
       SingleFileSource(
@@ -545,7 +565,7 @@ class TestSingleFileSource(unittest.TestCase):
 
   def test_source_creation_display_data(self):
     file_name = 'dummy_pattern'
-    fbs = LineSource(file_name)
+    fbs = LineSource(file_name, validate=False)
     dd = DisplayData.create_from(fbs)
     expected_items = [
         DisplayDataItemMatcher('compression', 'auto'),
@@ -556,8 +576,7 @@ class TestSingleFileSource(unittest.TestCase):
   def test_source_creation_fails_if_start_lg_stop(self):
     start_larger_than_stop_error = (
         'start_offset must be smaller than stop_offset*')
-
-    fbs = LineSource('dummy_pattern')
+    fbs = LineSource('dummy_pattern', validate=False)
     SingleFileSource(
         fbs, file_name='dummy_file', start_offset=99, stop_offset=100)
     with self.assertRaisesRegexp(ValueError, start_larger_than_stop_error):
@@ -568,7 +587,7 @@ class TestSingleFileSource(unittest.TestCase):
           fbs, file_name='dummy_file', start_offset=100, stop_offset=100)
 
   def test_estimates_size(self):
-    fbs = LineSource('dummy_pattern')
+    fbs = LineSource('dummy_pattern', validate=False)
 
     # Should simply return stop_offset - start_offset
     source = SingleFileSource(
@@ -580,7 +599,7 @@ class TestSingleFileSource(unittest.TestCase):
     self.assertEquals(90, source.estimate_size())
 
   def test_read_range_at_beginning(self):
-    fbs = LineSource('dummy_pattern')
+    fbs = LineSource('dummy_pattern', validate=False)
 
     file_name, expected_data = write_data(10)
     assert len(expected_data) == 10
@@ -591,7 +610,7 @@ class TestSingleFileSource(unittest.TestCase):
     self.assertItemsEqual(expected_data[:4], read_data)
 
   def test_read_range_at_end(self):
-    fbs = LineSource('dummy_pattern')
+    fbs = LineSource('dummy_pattern', validate=False)
 
     file_name, expected_data = write_data(10)
     assert len(expected_data) == 10
@@ -602,7 +621,7 @@ class TestSingleFileSource(unittest.TestCase):
     self.assertItemsEqual(expected_data[-3:], read_data)
 
   def test_read_range_at_middle(self):
-    fbs = LineSource('dummy_pattern')
+    fbs = LineSource('dummy_pattern', validate=False)
 
     file_name, expected_data = write_data(10)
     assert len(expected_data) == 10
@@ -613,7 +632,7 @@ class TestSingleFileSource(unittest.TestCase):
     self.assertItemsEqual(expected_data[4:7], read_data)
 
   def test_produces_splits_desiredsize_large_than_size(self):
-    fbs = LineSource('dummy_pattern')
+    fbs = LineSource('dummy_pattern', validate=False)
 
     file_name, expected_data = write_data(10)
     assert len(expected_data) == 10
@@ -629,7 +648,7 @@ class TestSingleFileSource(unittest.TestCase):
     self.assertItemsEqual(expected_data, read_data)
 
   def test_produces_splits_desiredsize_smaller_than_size(self):
-    fbs = LineSource('dummy_pattern')
+    fbs = LineSource('dummy_pattern', validate=False)
 
     file_name, expected_data = write_data(10)
     assert len(expected_data) == 10
@@ -647,7 +666,7 @@ class TestSingleFileSource(unittest.TestCase):
     self.assertItemsEqual(expected_data, read_data)
 
   def test_produce_split_with_start_and_end_positions(self):
-    fbs = LineSource('dummy_pattern')
+    fbs = LineSource('dummy_pattern', validate=False)
 
     file_name, expected_data = write_data(10)
     assert len(expected_data) == 10

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/76ad2929/sdks/python/apache_beam/io/textio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/textio.py b/sdks/python/apache_beam/io/textio.py
index 01f6ef6..e031572 100644
--- a/sdks/python/apache_beam/io/textio.py
+++ b/sdks/python/apache_beam/io/textio.py
@@ -72,9 +72,10 @@ class _TextSource(filebasedsource.FileBasedSource):
 
   def __init__(self, file_pattern, min_bundle_size,
                compression_type, strip_trailing_newlines, coder,
-               buffer_size=DEFAULT_READ_BUFFER_SIZE):
+               buffer_size=DEFAULT_READ_BUFFER_SIZE, validate=True):
     super(_TextSource, self).__init__(file_pattern, min_bundle_size,
-                                      compression_type=compression_type)
+                                      compression_type=compression_type,
+                                      validate=validate)
 
     self._strip_trailing_newlines = strip_trailing_newlines
     self._compression_type = compression_type
@@ -206,7 +207,6 @@ class ReadFromText(PTransform):
 
   This implementation only supports reading text encoded using UTF-8 or ASCII.
   This does not support other encodings such as UTF-16 or UTF-32."""
-
   def __init__(
       self,
       file_pattern=None,
@@ -214,6 +214,7 @@ class ReadFromText(PTransform):
       compression_type=fileio.CompressionTypes.AUTO,
       strip_trailing_newlines=True,
       coder=coders.StrUtf8Coder(),
+      validate=True,
       **kwargs):
     """Initialize the ReadFromText transform.
 
@@ -230,15 +231,19 @@ class ReadFromText(PTransform):
       strip_trailing_newlines: Indicates whether this source should remove
                                the newline char in each line it reads before
                                decoding that line.
       coder: Coder used to decode each line.
+      validate: flag to verify that the files exist at pipeline creation
+                time.
     """
 
     super(ReadFromText, self).__init__(**kwargs)
     self._args = (file_pattern, min_bundle_size, compression_type,
                   strip_trailing_newlines, coder)
+    self._validate = validate
 
   def apply(self, pvalue):
-    return pvalue.pipeline | Read(_TextSource(*self._args))
+    return pvalue.pipeline | Read(_TextSource(*self._args,
+                                              validate=self._validate))
 
 
 class WriteToText(PTransform):
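
Putting the textio changes together, a short usage sketch (the staging path
is hypothetical): validate is stored separately from the positional _args
and handed to _TextSource as a keyword, so callers can combine it freely
with the other options:

    import apache_beam as beam
    from apache_beam.io.textio import ReadFromText

    p = beam.Pipeline()
    lines = p | ReadFromText('/tmp/staging/part-*.txt',
                             strip_trailing_newlines=True,
                             validate=False)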


[2/2] incubator-beam git commit: Closes #1220

Posted by ro...@apache.org.
Closes #1220


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/560fe79f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/560fe79f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/560fe79f

Branch: refs/heads/python-sdk
Commit: 560fe79f875b9d08b7256e6bf653a48b19f0ccb5
Parents: 15e78b2 76ad292
Author: Robert Bradshaw <ro...@google.com>
Authored: Mon Nov 14 15:41:00 2016 -0800
Committer: Robert Bradshaw <ro...@google.com>
Committed: Mon Nov 14 15:41:00 2016 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/io/avroio.py            |  8 +++-
 sdks/python/apache_beam/io/bigquery.py          |  2 +-
 sdks/python/apache_beam/io/filebasedsource.py   | 16 +++++++-
 .../apache_beam/io/filebasedsource_test.py      | 41 ++++++++++++++------
 sdks/python/apache_beam/io/textio.py            | 13 +++++--
 5 files changed, 60 insertions(+), 20 deletions(-)
----------------------------------------------------------------------