Posted to commits@beam.apache.org by ch...@apache.org on 2017/03/22 21:29:40 UTC
[1/3] beam git commit: BeamFileSystem implementation instead of IOChannelFactory
Repository: beam
Updated Branches:
refs/heads/master 7c7bb8209 -> dd0f8d984
http://git-wip-us.apache.org/repos/asf/beam/blob/1bc240bc/sdks/python/apache_beam/io/localfilesystem.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/localfilesystem.py b/sdks/python/apache_beam/io/localfilesystem.py
new file mode 100644
index 0000000..46589b0
--- /dev/null
+++ b/sdks/python/apache_beam/io/localfilesystem.py
@@ -0,0 +1,236 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""Local File system implementation for accessing files on disk."""
+
+from __future__ import absolute_import
+
+import glob
+import os
+import shutil
+
+from apache_beam.io.filesystem import BeamIOError
+from apache_beam.io.filesystem import CompressedFile
+from apache_beam.io.filesystem import CompressionTypes
+from apache_beam.io.filesystem import FileMetadata
+from apache_beam.io.filesystem import FileSystem
+from apache_beam.io.filesystem import MatchResult
+
+
+class LocalFileSystem(FileSystem):
+ """A Local ``FileSystem`` implementation for accessing files on disk.
+ """
+
+ def mkdirs(self, path):
+ """Recursively create directories for the provided path.
+
+ Args:
+ path: string path of the directory structure that should be created
+
+ Raises:
+ ``IOError`` if leaf directory already exists.
+ """
+ try:
+ os.makedirs(path)
+ except OSError as err:
+ raise IOError(err)
+
+ def match(self, patterns, limits=None):
+ """Find all matching paths to the pattern provided.
+
+ Args:
+ patterns: list of string for the file path pattern to match against
+ limits: list of maximum number of responses that need to be fetched
+
+ Returns: list of ``MatchResult`` objects.
+
+ Raises:
+ ``BeamIOError`` if any of the pattern match operations fail
+ """
+ if limits is None:
+ limits = [None] * len(patterns)
+ else:
+ err_msg = "Patterns and limits should be equal in length"
+ assert len(patterns) == len(limits), err_msg
+
+ def _match(pattern, limit):
+ """Find all matching paths to the pattern provided.
+ """
+ files = glob.glob(pattern)
+ metadata = [FileMetadata(f, os.path.getsize(f)) for f in files[:limit]]
+ return MatchResult(pattern, metadata)
+
+ exceptions = {}
+ result = []
+ for pattern, limit in zip(patterns, limits):
+ try:
+ result.append(_match(pattern, limit))
+ except Exception as e: # pylint: disable=broad-except
+ exceptions[pattern] = e
+
+ if exceptions:
+ raise BeamIOError("Match operation failed", exceptions)
+ return result
+
+ def _path_open(self, path, mode, mime_type='application/octet-stream',
+ compression_type=CompressionTypes.AUTO):
+ """Helper functions to open a file in the provided mode.
+ """
+ compression_type = FileSystem._get_compression_type(path, compression_type)
+ raw_file = open(path, mode)
+ if compression_type == CompressionTypes.UNCOMPRESSED:
+ return raw_file
+ else:
+ return CompressedFile(raw_file, compression_type=compression_type)
+
+ def create(self, path, mime_type='application/octet-stream',
+ compression_type=CompressionTypes.AUTO):
+ """Returns a write channel for the given file path.
+
+ Args:
+ path: string path of the file object to be written to the system
+ mime_type: MIME type to specify the type of content in the file object
+ compression_type: Type of compression to be used for this object
+
+ Returns: file handle with a close function for the user to use
+ """
+ return self._path_open(path, 'wb', mime_type, compression_type)
+
+ def open(self, path, mime_type='application/octet-stream',
+ compression_type=CompressionTypes.AUTO):
+ """Returns a read channel for the given file path.
+
+ Args:
+ path: string path of the file object to be read from the system
+ mime_type: MIME type to specify the type of content in the file object
+ compression_type: Type of compression to be used for this object
+
+ Returns: file handle with a close function for the user to use
+ """
+ return self._path_open(path, 'rb', mime_type, compression_type)
+
+ def copy(self, source_file_names, destination_file_names):
+ """Recursively copy the file tree from the source to the destination
+
+ Args:
+ source_file_names: list of source file paths that need to be copied
+ destination_file_names: list of destination paths for the new objects
+
+ Raises:
+ ``BeamIOError`` if any of the copy operations fail
+ """
+ err_msg = ("source_file_names and destination_file_names should "
+ "be equal in length")
+ assert len(source_file_names) == len(destination_file_names), err_msg
+
+ def _copy_path(source, destination):
+ """Recursively copy the file tree from the source to the destination
+ """
+ try:
+ if os.path.exists(destination):
+ if os.path.isdir(destination):
+ shutil.rmtree(destination)
+ else:
+ os.remove(destination)
+ if os.path.isdir(source):
+ shutil.copytree(source, destination)
+ else:
+ shutil.copy2(source, destination)
+ except OSError as err:
+ raise IOError(err)
+
+ exceptions = {}
+ for source, destination in zip(source_file_names, destination_file_names):
+ try:
+ _copy_path(source, destination)
+ except Exception as e: # pylint: disable=broad-except
+ exceptions[(source, destination)] = e
+
+ if exceptions:
+ raise BeamIOError("Copy operation failed", exceptions)
+
+ def rename(self, source_file_names, destination_file_names):
+ """Rename the files at the source list to the destination list.
+ Source and destination lists should be of the same size.
+
+ Args:
+ source_file_names: List of file paths that need to be moved
+ destination_file_names: List of destination paths for the files
+
+ Raises:
+ ``BeamIOError`` if any of the rename operations fail
+ """
+ err_msg = ("source_file_names and destination_file_names should "
+ "be equal in length")
+ assert len(source_file_names) == len(destination_file_names), err_msg
+
+ def _rename_file(source, destination):
+ """Rename a single file object"""
+ try:
+ os.rename(source, destination)
+ except OSError as err:
+ raise IOError(err)
+
+ exceptions = {}
+ for source, destination in zip(source_file_names, destination_file_names):
+ try:
+ _rename_file(source, destination)
+ except Exception as e: # pylint: disable=broad-except
+ exceptions[(source, destination)] = e
+
+ if exceptions:
+ raise BeamIOError("Rename operation failed", exceptions)
+
+ def exists(self, path):
+ """Check if the provided path exists on the FileSystem.
+
+ Args:
+ path: string path that needs to be checked.
+
+ Returns: boolean flag indicating if path exists
+ """
+ return os.path.exists(path)
+
+ def delete(self, paths):
+ """Deletes files or directories at the provided paths.
+ Directories will be deleted recursively.
+
+ Args:
+ paths: list of paths that give the file objects to be deleted
+
+ Raises:
+ ``BeamIOError`` if any of the delete operations fail
+ """
+ def _delete_path(path):
+ """Recursively delete the file or directory at the provided path.
+ """
+ try:
+ if os.path.isdir(path):
+ shutil.rmtree(path)
+ else:
+ os.remove(path)
+ except OSError as err:
+ raise IOError(err)
+
+ exceptions = {}
+ for path in paths:
+ try:
+ _delete_path(path)
+ except Exception as e: # pylint: disable=broad-except
+ exceptions[path] = e
+
+ if exceptions:
+ raise BeamIOError("Delete operation failed", exceptions)
http://git-wip-us.apache.org/repos/asf/beam/blob/1bc240bc/sdks/python/apache_beam/io/localfilesystem_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/localfilesystem_test.py b/sdks/python/apache_beam/io/localfilesystem_test.py
new file mode 100644
index 0000000..00059ef
--- /dev/null
+++ b/sdks/python/apache_beam/io/localfilesystem_test.py
@@ -0,0 +1,185 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for LocalFileSystem."""
+
+import unittest
+
+import filecmp
+import os
+import shutil
+import tempfile
+from apache_beam.io.filesystem import BeamIOError
+from apache_beam.io.localfilesystem import LocalFileSystem
+
+
+class LocalFileSystemTest(unittest.TestCase):
+
+ def setUp(self):
+ self.tmpdir = tempfile.mkdtemp()
+ self.fs = LocalFileSystem()
+
+ def tearDown(self):
+ shutil.rmtree(self.tmpdir)
+
+ def test_mkdirs(self):
+ path = os.path.join(self.tmpdir, 't1/t2')
+ self.fs.mkdirs(path)
+ self.assertTrue(os.path.isdir(path))
+
+ def test_mkdirs_failed(self):
+ path = os.path.join(self.tmpdir, 't1/t2')
+ self.fs.mkdirs(path)
+
+ # Check that IOError is raised if the directory already exists
+ with self.assertRaises(IOError):
+ self.fs.mkdirs(path)
+
+ with self.assertRaises(IOError):
+ self.fs.mkdirs(os.path.join(self.tmpdir, 't1'))
+
+ def test_match_file(self):
+ path = os.path.join(self.tmpdir, 'f1')
+ open(path, 'a').close()
+
+ # Match files in the temp directory
+ result = self.fs.match([path])[0]
+ files = [f.path for f in result.metadata_list]
+ self.assertEqual(files, [path])
+
+ def test_match_file_empty(self):
+ path = os.path.join(self.tmpdir, 'f2') # Does not exist
+
+ # Match files in the temp directory
+ result = self.fs.match([path])[0]
+ files = [f.path for f in result.metadata_list]
+ self.assertEqual(files, [])
+
+ def test_match_file_exception(self):
+ # Match files with None so that it throws an exception
+ with self.assertRaises(BeamIOError) as error:
+ self.fs.match([None])
+ self.assertEqual(error.exception.message, 'Match operation failed')
+ self.assertEqual(error.exception.exception_details.keys(), [None])
+
+ def test_match_glob(self):
+ path1 = os.path.join(self.tmpdir, 'f1')
+ path2 = os.path.join(self.tmpdir, 'f2')
+ open(path1, 'a').close()
+ open(path2, 'a').close()
+
+ # Match both the files in the directory
+ path = os.path.join(self.tmpdir, '*')
+ result = self.fs.match([path])[0]
+ files = [f.path for f in result.metadata_list]
+ self.assertEqual(files, [path1, path2])
+
+ def test_match_directory(self):
+ result = self.fs.match([self.tmpdir])[0]
+ files = [f.path for f in result.metadata_list]
+ self.assertEqual(files, [self.tmpdir])
+
+ def test_copy(self):
+ path1 = os.path.join(self.tmpdir, 'f1')
+ path2 = os.path.join(self.tmpdir, 'f2')
+ with open(path1, 'a') as f:
+ f.write('Hello')
+
+ self.fs.copy([path1], [path2])
+ self.assertTrue(filecmp.cmp(path1, path2))
+
+ def test_copy_error(self):
+ path1 = os.path.join(self.tmpdir, 'f1')
+ path2 = os.path.join(self.tmpdir, 'f2')
+ with self.assertRaises(BeamIOError) as error:
+ self.fs.copy([path1], [path2])
+ self.assertEqual(error.exception.message, 'Copy operation failed')
+ self.assertEqual(error.exception.exception_details.keys(), [(path1, path2)])
+
+ def test_copy_directory(self):
+ path_t1 = os.path.join(self.tmpdir, 't1')
+ path_t2 = os.path.join(self.tmpdir, 't2')
+ self.fs.mkdirs(path_t1)
+ self.fs.mkdirs(path_t2)
+
+ path1 = os.path.join(path_t1, 'f1')
+ path2 = os.path.join(path_t2, 'f1')
+ with open(path1, 'a') as f:
+ f.write('Hello')
+
+ self.fs.copy([path_t1], [path_t2])
+ self.assertTrue(filecmp.cmp(path1, path2))
+
+ def test_rename(self):
+ path1 = os.path.join(self.tmpdir, 'f1')
+ path2 = os.path.join(self.tmpdir, 'f2')
+ with open(path1, 'a') as f:
+ f.write('Hello')
+
+ self.fs.rename([path1], [path2])
+ self.assertTrue(self.fs.exists(path2))
+ self.assertFalse(self.fs.exists(path1))
+
+ def test_rename_error(self):
+ path1 = os.path.join(self.tmpdir, 'f1')
+ path2 = os.path.join(self.tmpdir, 'f2')
+ with self.assertRaises(BeamIOError) as error:
+ self.fs.rename([path1], [path2])
+ self.assertEqual(error.exception.message, 'Rename operation failed')
+ self.assertEqual(error.exception.exception_details.keys(), [(path1, path2)])
+
+ def test_rename_directory(self):
+ path_t1 = os.path.join(self.tmpdir, 't1')
+ path_t2 = os.path.join(self.tmpdir, 't2')
+ self.fs.mkdirs(path_t1)
+
+ path1 = os.path.join(path_t1, 'f1')
+ path2 = os.path.join(path_t2, 'f1')
+ with open(path1, 'a') as f:
+ f.write('Hello')
+
+ self.fs.rename([path_t1], [path_t2])
+ self.assertTrue(self.fs.exists(path_t2))
+ self.assertFalse(self.fs.exists(path_t1))
+ self.assertTrue(self.fs.exists(path2))
+ self.assertFalse(self.fs.exists(path1))
+
+ def test_exists(self):
+ path1 = os.path.join(self.tmpdir, 'f1')
+ path2 = os.path.join(self.tmpdir, 'f2')
+ with open(path1, 'a') as f:
+ f.write('Hello')
+ self.assertTrue(self.fs.exists(path1))
+ self.assertFalse(self.fs.exists(path2))
+
+ def test_delete(self):
+ path1 = os.path.join(self.tmpdir, 'f1')
+
+ with open(path1, 'a') as f:
+ f.write('Hello')
+
+ self.assertTrue(self.fs.exists(path1))
+ self.fs.delete([path1])
+ self.assertFalse(self.fs.exists(path1))
+
+ def test_delete_error(self):
+ path1 = os.path.join(self.tmpdir, 'f1')
+ with self.assertRaises(BeamIOError) as error:
+ self.fs.delete([path1])
+ self.assertEqual(error.exception.message, 'Delete operation failed')
+ self.assertEqual(error.exception.exception_details.keys(), [path1])
http://git-wip-us.apache.org/repos/asf/beam/blob/1bc240bc/sdks/python/apache_beam/io/textio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/textio.py b/sdks/python/apache_beam/io/textio.py
index 5bb1a9d..8122fae 100644
--- a/sdks/python/apache_beam/io/textio.py
+++ b/sdks/python/apache_beam/io/textio.py
@@ -25,6 +25,7 @@ from apache_beam import coders
from apache_beam.io import filebasedsource
from apache_beam.io import fileio
from apache_beam.io import iobase
+from apache_beam.io.filesystem import CompressionTypes
from apache_beam.io.iobase import Read
from apache_beam.io.iobase import Write
from apache_beam.transforms import PTransform
@@ -271,7 +272,7 @@ class _TextSink(fileio.FileSink):
num_shards=0,
shard_name_template=None,
coder=coders.ToStringCoder(),
- compression_type=fileio.CompressionTypes.AUTO,
+ compression_type=CompressionTypes.AUTO,
header=None):
"""Initialize a _TextSink.
@@ -355,7 +356,7 @@ class ReadFromText(PTransform):
self,
file_pattern=None,
min_bundle_size=0,
- compression_type=fileio.CompressionTypes.AUTO,
+ compression_type=CompressionTypes.AUTO,
strip_trailing_newlines=True,
coder=coders.StrUtf8Coder(),
validate=True,
@@ -404,7 +405,7 @@ class WriteToText(PTransform):
num_shards=0,
shard_name_template=None,
coder=coders.ToStringCoder(),
- compression_type=fileio.CompressionTypes.AUTO,
+ compression_type=CompressionTypes.AUTO,
header=None):
"""Initialize a WriteToText PTransform.
http://git-wip-us.apache.org/repos/asf/beam/blob/1bc240bc/sdks/python/apache_beam/io/textio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/textio_test.py b/sdks/python/apache_beam/io/textio_test.py
index 04cf44c..b3f4391 100644
--- a/sdks/python/apache_beam/io/textio_test.py
+++ b/sdks/python/apache_beam/io/textio_test.py
@@ -41,7 +41,7 @@ from apache_beam import coders
from apache_beam.io.filebasedsource_test import EOL
from apache_beam.io.filebasedsource_test import write_data
from apache_beam.io.filebasedsource_test import write_pattern
-from apache_beam.io.fileio import CompressionTypes
+from apache_beam.io.filesystem import CompressionTypes
from apache_beam.test_pipeline import TestPipeline
http://git-wip-us.apache.org/repos/asf/beam/blob/1bc240bc/sdks/python/apache_beam/io/tfrecordio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/tfrecordio.py b/sdks/python/apache_beam/io/tfrecordio.py
index 05c0a13..8b9d9ea 100644
--- a/sdks/python/apache_beam/io/tfrecordio.py
+++ b/sdks/python/apache_beam/io/tfrecordio.py
@@ -24,6 +24,7 @@ import struct
from apache_beam import coders
from apache_beam.io import filebasedsource
from apache_beam.io import fileio
+from apache_beam.io.filesystem import CompressionTypes
from apache_beam.io.iobase import Read
from apache_beam.io.iobase import Write
from apache_beam.transforms import PTransform
@@ -180,7 +181,7 @@ class ReadFromTFRecord(PTransform):
def __init__(self,
file_pattern,
coder=coders.BytesCoder(),
- compression_type=fileio.CompressionTypes.AUTO,
+ compression_type=CompressionTypes.AUTO,
validate=True,
**kwargs):
"""Initialize a ReadFromTFRecord transform.
@@ -239,7 +240,7 @@ class WriteToTFRecord(PTransform):
file_name_suffix='',
num_shards=0,
shard_name_template=fileio.DEFAULT_SHARD_NAME_TEMPLATE,
- compression_type=fileio.CompressionTypes.AUTO,
+ compression_type=CompressionTypes.AUTO,
**kwargs):
"""Initialize WriteToTFRecord transform.
http://git-wip-us.apache.org/repos/asf/beam/blob/1bc240bc/sdks/python/apache_beam/io/tfrecordio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/tfrecordio_test.py b/sdks/python/apache_beam/io/tfrecordio_test.py
index df33fcb..49f9639 100644
--- a/sdks/python/apache_beam/io/tfrecordio_test.py
+++ b/sdks/python/apache_beam/io/tfrecordio_test.py
@@ -29,7 +29,7 @@ import unittest
import apache_beam as beam
from apache_beam import coders
-from apache_beam.io import fileio
+from apache_beam.io.filesystem import CompressionTypes
from apache_beam.io.tfrecordio import _TFRecordSink
from apache_beam.io.tfrecordio import _TFRecordSource
from apache_beam.io.tfrecordio import _TFRecordUtil
@@ -175,7 +175,7 @@ class TestTFRecordSink(_TestCaseWithTempDirCleanUp):
file_name_suffix='',
num_shards=0,
shard_name_template=None,
- compression_type=fileio.CompressionTypes.UNCOMPRESSED)
+ compression_type=CompressionTypes.UNCOMPRESSED)
self._write_lines(sink, path, ['foo'])
with open(path, 'r') as f:
@@ -190,7 +190,7 @@ class TestTFRecordSink(_TestCaseWithTempDirCleanUp):
file_name_suffix='',
num_shards=0,
shard_name_template=None,
- compression_type=fileio.CompressionTypes.UNCOMPRESSED)
+ compression_type=CompressionTypes.UNCOMPRESSED)
self._write_lines(sink, path, ['foo', 'bar'])
with open(path, 'r') as f:
@@ -205,7 +205,7 @@ class TestWriteToTFRecord(TestTFRecordSink):
with TestPipeline() as p:
input_data = ['foo', 'bar']
_ = p | beam.Create(input_data) | WriteToTFRecord(
- file_path_prefix, compression_type=fileio.CompressionTypes.GZIP)
+ file_path_prefix, compression_type=CompressionTypes.GZIP)
actual = []
file_name = glob.glob(file_path_prefix + '-*')[0]
@@ -252,7 +252,7 @@ class TestTFRecordSource(_TestCaseWithTempDirCleanUp):
_TFRecordSource(
path,
coder=coders.BytesCoder(),
- compression_type=fileio.CompressionTypes.AUTO,
+ compression_type=CompressionTypes.AUTO,
validate=True)))
beam.assert_that(result, beam.equal_to(['foo']))
@@ -265,7 +265,7 @@ class TestTFRecordSource(_TestCaseWithTempDirCleanUp):
_TFRecordSource(
path,
coder=coders.BytesCoder(),
- compression_type=fileio.CompressionTypes.AUTO,
+ compression_type=CompressionTypes.AUTO,
validate=True)))
beam.assert_that(result, beam.equal_to(['foo', 'bar']))
@@ -278,7 +278,7 @@ class TestTFRecordSource(_TestCaseWithTempDirCleanUp):
_TFRecordSource(
path,
coder=coders.BytesCoder(),
- compression_type=fileio.CompressionTypes.GZIP,
+ compression_type=CompressionTypes.GZIP,
validate=True)))
beam.assert_that(result, beam.equal_to(['foo', 'bar']))
@@ -291,7 +291,7 @@ class TestTFRecordSource(_TestCaseWithTempDirCleanUp):
_TFRecordSource(
path,
coder=coders.BytesCoder(),
- compression_type=fileio.CompressionTypes.AUTO,
+ compression_type=CompressionTypes.AUTO,
validate=True)))
beam.assert_that(result, beam.equal_to(['foo', 'bar']))
@@ -304,7 +304,7 @@ class TestReadFromTFRecordSource(TestTFRecordSource):
with TestPipeline() as p:
result = (p
| ReadFromTFRecord(
- path, compression_type=fileio.CompressionTypes.GZIP))
+ path, compression_type=CompressionTypes.GZIP))
beam.assert_that(result, beam.equal_to(['foo', 'bar']))
def test_process_gzip_auto(self):
@@ -313,7 +313,7 @@ class TestReadFromTFRecordSource(TestTFRecordSource):
with TestPipeline() as p:
result = (p
| ReadFromTFRecord(
- path, compression_type=fileio.CompressionTypes.AUTO))
+ path, compression_type=CompressionTypes.AUTO))
beam.assert_that(result, beam.equal_to(['foo', 'bar']))
http://git-wip-us.apache.org/repos/asf/beam/blob/1bc240bc/sdks/python/apache_beam/tests/pipeline_verifiers.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/tests/pipeline_verifiers.py b/sdks/python/apache_beam/tests/pipeline_verifiers.py
index 379a96f..0d6814e 100644
--- a/sdks/python/apache_beam/tests/pipeline_verifiers.py
+++ b/sdks/python/apache_beam/tests/pipeline_verifiers.py
@@ -26,7 +26,7 @@ import logging
from hamcrest.core.base_matcher import BaseMatcher
-from apache_beam.io.fileio import ChannelFactory
+from apache_beam.io.filesystems_util import get_filesystem
from apache_beam.runners.runner import PipelineState
from apache_beam.tests import test_utils as utils
from apache_beam.utils import retry
@@ -81,6 +81,7 @@ class FileChecksumMatcher(BaseMatcher):
def __init__(self, file_path, expected_checksum):
self.file_path = file_path
+ self.file_system = get_filesystem(self.file_path)
self.expected_checksum = expected_checksum
@retry.with_exponential_backoff(
@@ -89,11 +90,12 @@ class FileChecksumMatcher(BaseMatcher):
def _read_with_retry(self):
"""Read path with retry if I/O failed"""
read_lines = []
- matched_path = ChannelFactory.glob(self.file_path)
+ match_result = self.file_system.match([self.file_path])[0]
+ matched_path = [f.path for f in match_result.metadata_list]
if not matched_path:
raise IOError('No such file or directory: %s' % self.file_path)
for path in matched_path:
- with ChannelFactory.open(path, 'r') as f:
+ with self.file_system.open(path) as f:
for line in f:
read_lines.append(line)
return read_lines
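The matcher now resolves and reads files through the FileSystem pair match()/open() instead of ChannelFactory.glob()/open(); a standalone sketch of that pattern, with the file pattern an illustrative assumption:

    from apache_beam.io.filesystems_util import get_filesystem

    file_pattern = '/tmp/output-*'  # illustrative
    fs = get_filesystem(file_pattern)
    match_result = fs.match([file_pattern])[0]
    matched_paths = [f.path for f in match_result.metadata_list]
    if not matched_paths:
        raise IOError('No such file or directory: %s' % file_pattern)
    read_lines = []
    for path in matched_paths:
        with fs.open(path) as f:  # read channel; compression auto-detected
            for line in f:        # uncompressed files iterate line by line
                read_lines.append(line)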
http://git-wip-us.apache.org/repos/asf/beam/blob/1bc240bc/sdks/python/apache_beam/tests/pipeline_verifiers_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/tests/pipeline_verifiers_test.py b/sdks/python/apache_beam/tests/pipeline_verifiers_test.py
index 586af82..af8f441 100644
--- a/sdks/python/apache_beam/tests/pipeline_verifiers_test.py
+++ b/sdks/python/apache_beam/tests/pipeline_verifiers_test.py
@@ -24,16 +24,20 @@ import unittest
from hamcrest import assert_that as hc_assert_that
from mock import Mock, patch
-from apache_beam.io.fileio import ChannelFactory
+from apache_beam.io.localfilesystem import LocalFileSystem
from apache_beam.runners.runner import PipelineState
from apache_beam.runners.runner import PipelineResult
from apache_beam.tests import pipeline_verifiers as verifiers
from apache_beam.tests.test_utils import patch_retry
try:
+ # pylint: disable=wrong-import-order, wrong-import-position
+ # pylint: disable=ungrouped-imports
from apitools.base.py.exceptions import HttpError
+ from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem
except ImportError:
HttpError = None
+ GCSFileSystem = None
class PipelineVerifiersTest(unittest.TestCase):
@@ -96,26 +100,26 @@ class PipelineVerifiersTest(unittest.TestCase):
case['expected_checksum'])
hc_assert_that(self._mock_result, matcher)
- @patch.object(ChannelFactory, 'glob')
- def test_file_checksum_matcher_read_failed(self, mock_glob):
- mock_glob.side_effect = IOError('No file found.')
+ @patch.object(LocalFileSystem, 'match')
+ def test_file_checksum_matcher_read_failed(self, mock_match):
+ mock_match.side_effect = IOError('No file found.')
matcher = verifiers.FileChecksumMatcher('dummy/path', Mock())
with self.assertRaises(IOError):
hc_assert_that(self._mock_result, matcher)
- self.assertTrue(mock_glob.called)
- self.assertEqual(verifiers.MAX_RETRIES + 1, mock_glob.call_count)
+ self.assertTrue(mock_match.called)
+ self.assertEqual(verifiers.MAX_RETRIES + 1, mock_match.call_count)
- @patch.object(ChannelFactory, 'glob')
+ @patch.object(GCSFileSystem, 'match')
@unittest.skipIf(HttpError is None, 'google-apitools is not installed')
- def test_file_checksum_matcher_service_error(self, mock_glob):
- mock_glob.side_effect = HttpError(
+ def test_file_checksum_matcher_service_error(self, mock_match):
+ mock_match.side_effect = HttpError(
response={'status': '404'}, url='', content='Not Found',
)
matcher = verifiers.FileChecksumMatcher('gs://dummy/path', Mock())
with self.assertRaises(HttpError):
hc_assert_that(self._mock_result, matcher)
- self.assertTrue(mock_glob.called)
- self.assertEqual(verifiers.MAX_RETRIES + 1, mock_glob.call_count)
+ self.assertTrue(mock_match.called)
+ self.assertEqual(verifiers.MAX_RETRIES + 1, mock_match.call_count)
if __name__ == '__main__':
[2/3] beam git commit: BeamFileSystem implementation instead of IOChannelFactory
Posted by ch...@apache.org.
BeamFileSystem implementation instead of IOChannelFactory
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1bc240bc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1bc240bc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1bc240bc
Branch: refs/heads/master
Commit: 1bc240bc637883cb1c04253264534c9d545fdefa
Parents: 7c7bb82
Author: Sourabh Bajaj <so...@google.com>
Authored: Wed Mar 1 23:01:34 2017 -0800
Committer: Chamikara Jayalath <ch...@google.com>
Committed: Wed Mar 22 14:27:01 2017 -0700
----------------------------------------------------------------------
.../apache_beam/examples/snippets/snippets.py | 2 +-
sdks/python/apache_beam/io/avroio.py | 3 +-
sdks/python/apache_beam/io/filebasedsource.py | 83 +--
.../apache_beam/io/filebasedsource_test.py | 18 +-
sdks/python/apache_beam/io/fileio.py | 588 +++----------------
sdks/python/apache_beam/io/fileio_test.py | 102 +---
sdks/python/apache_beam/io/filesystem.py | 439 ++++++++++++++
sdks/python/apache_beam/io/filesystems_util.py | 31 +
sdks/python/apache_beam/io/gcp/gcsfilesystem.py | 242 ++++++++
.../apache_beam/io/gcp/gcsfilesystem_test.py | 293 +++++++++
sdks/python/apache_beam/io/gcp/gcsio.py | 12 +-
sdks/python/apache_beam/io/gcp/gcsio_test.py | 9 +-
sdks/python/apache_beam/io/iobase.py | 4 +-
sdks/python/apache_beam/io/localfilesystem.py | 236 ++++++++
.../apache_beam/io/localfilesystem_test.py | 185 ++++++
sdks/python/apache_beam/io/textio.py | 7 +-
sdks/python/apache_beam/io/textio_test.py | 2 +-
sdks/python/apache_beam/io/tfrecordio.py | 5 +-
sdks/python/apache_beam/io/tfrecordio_test.py | 20 +-
.../apache_beam/tests/pipeline_verifiers.py | 8 +-
.../tests/pipeline_verifiers_test.py | 26 +-
21 files changed, 1613 insertions(+), 702 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/1bc240bc/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 18f1506..5cb5ee5 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -856,7 +856,7 @@ def model_textio_compressed(renames, expected):
# [START model_textio_write_compressed]
lines = p | 'ReadFromText' >> beam.io.ReadFromText(
'/path/to/input-*.csv.gz',
- compression_type=beam.io.fileio.CompressionTypes.GZIP)
+ compression_type=beam.io.filesystem.CompressionTypes.GZIP)
# [END model_textio_write_compressed]
beam.assert_that(lines, beam.equal_to(expected))
http://git-wip-us.apache.org/repos/asf/beam/blob/1bc240bc/sdks/python/apache_beam/io/avroio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/avroio.py b/sdks/python/apache_beam/io/avroio.py
index 6fdd798..1c08c68 100644
--- a/sdks/python/apache_beam/io/avroio.py
+++ b/sdks/python/apache_beam/io/avroio.py
@@ -29,6 +29,7 @@ import apache_beam as beam
from apache_beam.io import filebasedsource
from apache_beam.io import fileio
from apache_beam.io import iobase
+from apache_beam.io.filesystem import CompressionTypes
from apache_beam.io.iobase import Read
from apache_beam.transforms import PTransform
@@ -354,7 +355,7 @@ class _AvroSink(fileio.FileSink):
mime_type=mime_type,
# Compression happens at the block level using the supplied codec, and
# not at the file level.
- compression_type=fileio.CompressionTypes.UNCOMPRESSED)
+ compression_type=CompressionTypes.UNCOMPRESSED)
self._schema = schema
self._codec = codec
http://git-wip-us.apache.org/repos/asf/beam/blob/1bc240bc/sdks/python/apache_beam/io/filebasedsource.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py
index 582d673..a3e0667 100644
--- a/sdks/python/apache_beam/io/filebasedsource.py
+++ b/sdks/python/apache_beam/io/filebasedsource.py
@@ -25,16 +25,13 @@ for more details.
For an example implementation of ``FileBasedSource`` see ``avroio.AvroSource``.
"""
-import random
-
from apache_beam.internal import pickler
-from apache_beam.internal import util
from apache_beam.io import concat_source
-from apache_beam.io import fileio
from apache_beam.io import iobase
from apache_beam.io import range_trackers
+from apache_beam.io.filesystem import CompressionTypes
+from apache_beam.io.filesystems_util import get_filesystem
from apache_beam.transforms.display import DisplayDataItem
-
MAX_NUM_THREADS_FOR_SIZE_ESTIMATION = 25
@@ -47,7 +44,7 @@ class FileBasedSource(iobase.BoundedSource):
def __init__(self,
file_pattern,
min_bundle_size=0,
- compression_type=fileio.CompressionTypes.AUTO,
+ compression_type=CompressionTypes.AUTO,
splittable=True,
validate=True):
"""Initializes ``FileBasedSource``.
@@ -81,14 +78,15 @@ class FileBasedSource(iobase.BoundedSource):
(self.__class__.__name__, file_pattern))
self._pattern = file_pattern
+ self._file_system = get_filesystem(file_pattern)
self._concat_source = None
self._min_bundle_size = min_bundle_size
- if not fileio.CompressionTypes.is_valid_compression_type(compression_type):
+ if not CompressionTypes.is_valid_compression_type(compression_type):
raise TypeError('compression_type must be CompressionType object but '
'was %s' % type(compression_type))
self._compression_type = compression_type
- if compression_type in (fileio.CompressionTypes.UNCOMPRESSED,
- fileio.CompressionTypes.AUTO):
+ if compression_type in (CompressionTypes.UNCOMPRESSED,
+ CompressionTypes.AUTO):
self._splittable = splittable
else:
# We can't split compressed files efficiently so turn off splitting.
@@ -105,9 +103,8 @@ class FileBasedSource(iobase.BoundedSource):
def _get_concat_source(self):
if self._concat_source is None:
single_file_sources = []
- file_names = [f for f in fileio.ChannelFactory.glob(self._pattern)]
- sizes = FileBasedSource._estimate_sizes_of_files(file_names,
- self._pattern)
+ match_result = self._file_system.match([self._pattern])[0]
+ files_metadata = match_result.metadata_list
# We create a reference for FileBasedSource that will be serialized along
# with each _SingleFileSource. To prevent this FileBasedSource from having
@@ -115,23 +112,25 @@ class FileBasedSource(iobase.BoundedSource):
# we clone it here.
file_based_source_ref = pickler.loads(pickler.dumps(self))
- for index, file_name in enumerate(file_names):
- if sizes[index] == 0:
+ for file_metadata in files_metadata:
+ file_name = file_metadata.path
+ file_size = file_metadata.size_in_bytes
+ if file_size == 0:
continue # Ignoring empty file.
# We determine splittability of this specific file.
splittable = self.splittable
if (splittable and
- self._compression_type == fileio.CompressionTypes.AUTO):
- compression_type = fileio.CompressionTypes.detect_compression_type(
+ self._compression_type == CompressionTypes.AUTO):
+ compression_type = CompressionTypes.detect_compression_type(
file_name)
- if compression_type != fileio.CompressionTypes.UNCOMPRESSED:
+ if compression_type != CompressionTypes.UNCOMPRESSED:
splittable = False
single_file_source = _SingleFileSource(
file_based_source_ref, file_name,
0,
- sizes[index],
+ file_size,
min_bundle_size=self._min_bundle_size,
splittable=splittable)
single_file_sources.append(single_file_source)
@@ -139,36 +138,16 @@ class FileBasedSource(iobase.BoundedSource):
return self._concat_source
def open_file(self, file_name):
- return fileio.ChannelFactory.open(
- file_name, 'rb', 'application/octet-stream',
+ return get_filesystem(file_name).open(
+ file_name, 'application/octet-stream',
compression_type=self._compression_type)
- @staticmethod
- def _estimate_sizes_of_files(file_names, pattern=None):
- """Returns the size of all the files as an ordered list based on the file
- names that are provided here. If the pattern is specified here then we use
- the size_of_files_in_glob method to get the size of files matching the glob
- for performance improvements instead of getting the size one by one.
- """
- if not file_names:
- return []
- elif len(file_names) == 1:
- return [fileio.ChannelFactory.size_in_bytes(file_names[0])]
- else:
- if pattern is None:
- return util.run_using_threadpool(
- fileio.ChannelFactory.size_in_bytes, file_names,
- MAX_NUM_THREADS_FOR_SIZE_ESTIMATION)
- else:
- file_sizes = fileio.ChannelFactory.size_of_files_in_glob(pattern,
- file_names)
- return [file_sizes[f] for f in file_names]
-
def _validate(self):
"""Validate if there are actual files in the specified glob pattern
"""
# Limit the responses as we only want to check if something exists
- if len(fileio.ChannelFactory.glob(self._pattern, limit=1)) <= 0:
+ match_result = self._file_system.match([self._pattern], limits=[1])[0]
+ if len(match_result.metadata_list) <= 0:
raise IOError(
'No files found based on the file pattern %s' % self._pattern)
@@ -180,24 +159,8 @@ class FileBasedSource(iobase.BoundedSource):
stop_position=stop_position)
def estimate_size(self):
- file_names = [f for f in fileio.ChannelFactory.glob(self._pattern)]
- # We're reading very few files so we can pass names file names to
- # _estimate_sizes_of_files without pattern as otherwise we'll try to do
- # optimization based on the pattern and might end up reading much more
- # data than needed for a few files.
- if (len(file_names) <=
- FileBasedSource.MIN_NUMBER_OF_FILES_TO_STAT):
- return sum(self._estimate_sizes_of_files(file_names))
- else:
- # Estimating size of a random sample.
- # TODO: better support distributions where file sizes are not
- # approximately equal.
- sample_size = max(FileBasedSource.MIN_NUMBER_OF_FILES_TO_STAT,
- int(len(file_names) *
- FileBasedSource.MIN_FRACTION_OF_FILES_TO_STAT))
- sample = random.sample(file_names, sample_size)
- estimate = self._estimate_sizes_of_files(sample)
- return int(sum(estimate) * (float(len(file_names)) / len(sample)))
+ match_result = self._file_system.match([self._pattern])[0]
+ return sum([f.size_in_bytes for f in match_result.metadata_list])
def read(self, range_tracker):
return self._get_concat_source().read(range_tracker)
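With file sizes carried on the match metadata, estimate_size above no longer needs sampling or a thread pool; an equivalent standalone computation, with the pattern an illustrative assumption:

    from apache_beam.io.filesystems_util import get_filesystem

    pattern = '/tmp/input-*.txt'  # illustrative
    fs = get_filesystem(pattern)
    metadata_list = fs.match([pattern])[0].metadata_list
    # Total size in bytes of all files matching the pattern.
    total_size = sum(f.size_in_bytes for f in metadata_list)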
http://git-wip-us.apache.org/repos/asf/beam/blob/1bc240bc/sdks/python/apache_beam/io/filebasedsource_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource_test.py b/sdks/python/apache_beam/io/filebasedsource_test.py
index 7481c4c..7b7ec8a 100644
--- a/sdks/python/apache_beam/io/filebasedsource_test.py
+++ b/sdks/python/apache_beam/io/filebasedsource_test.py
@@ -29,9 +29,9 @@ import hamcrest as hc
import apache_beam as beam
from apache_beam.io import filebasedsource
-from apache_beam.io import fileio
from apache_beam.io import iobase
from apache_beam.io import range_trackers
+from apache_beam.io.filesystem import CompressionTypes
# importing following private classes for testing
from apache_beam.io.concat_source import ConcatSource
@@ -409,7 +409,7 @@ class TestFileBasedSource(unittest.TestCase):
pcoll = pipeline | 'Read' >> beam.Read(LineSource(
filename,
splittable=False,
- compression_type=fileio.CompressionTypes.BZIP2))
+ compression_type=CompressionTypes.BZIP2))
assert_that(pcoll, equal_to(lines))
pipeline.run()
@@ -424,7 +424,7 @@ class TestFileBasedSource(unittest.TestCase):
pcoll = pipeline | 'Read' >> beam.Read(LineSource(
filename,
splittable=False,
- compression_type=fileio.CompressionTypes.GZIP))
+ compression_type=CompressionTypes.GZIP))
assert_that(pcoll, equal_to(lines))
pipeline.run()
@@ -442,7 +442,7 @@ class TestFileBasedSource(unittest.TestCase):
pcoll = pipeline | 'Read' >> beam.Read(LineSource(
file_pattern,
splittable=False,
- compression_type=fileio.CompressionTypes.BZIP2))
+ compression_type=CompressionTypes.BZIP2))
assert_that(pcoll, equal_to(lines))
pipeline.run()
@@ -461,7 +461,7 @@ class TestFileBasedSource(unittest.TestCase):
pcoll = pipeline | 'Read' >> beam.Read(LineSource(
file_pattern,
splittable=False,
- compression_type=fileio.CompressionTypes.GZIP))
+ compression_type=CompressionTypes.GZIP))
assert_that(pcoll, equal_to(lines))
pipeline.run()
@@ -475,7 +475,7 @@ class TestFileBasedSource(unittest.TestCase):
pipeline = TestPipeline()
pcoll = pipeline | 'Read' >> beam.Read(LineSource(
filename,
- compression_type=fileio.CompressionTypes.AUTO))
+ compression_type=CompressionTypes.AUTO))
assert_that(pcoll, equal_to(lines))
pipeline.run()
@@ -489,7 +489,7 @@ class TestFileBasedSource(unittest.TestCase):
pipeline = TestPipeline()
pcoll = pipeline | 'Read' >> beam.Read(LineSource(
filename,
- compression_type=fileio.CompressionTypes.AUTO))
+ compression_type=CompressionTypes.AUTO))
assert_that(pcoll, equal_to(lines))
pipeline.run()
@@ -508,7 +508,7 @@ class TestFileBasedSource(unittest.TestCase):
pipeline = TestPipeline()
pcoll = pipeline | 'Read' >> beam.Read(LineSource(
file_pattern,
- compression_type=fileio.CompressionTypes.AUTO))
+ compression_type=CompressionTypes.AUTO))
assert_that(pcoll, equal_to(lines))
pipeline.run()
@@ -530,7 +530,7 @@ class TestFileBasedSource(unittest.TestCase):
pipeline = TestPipeline()
pcoll = pipeline | 'Read' >> beam.Read(LineSource(
file_pattern,
- compression_type=fileio.CompressionTypes.AUTO))
+ compression_type=CompressionTypes.AUTO))
assert_that(pcoll, equal_to(lines))
pipeline.run()
http://git-wip-us.apache.org/repos/asf/beam/blob/1bc240bc/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index 49a2082..0759ce4 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -18,146 +18,40 @@
from __future__ import absolute_import
-import bz2
-import cStringIO
-import glob
import logging
import os
import re
-import shutil
import time
-import zlib
from apache_beam.internal import util
from apache_beam.io import iobase
+from apache_beam.io.filesystem import BeamIOError
+from apache_beam.io.filesystem import CompressedFile as _CompressedFile
+from apache_beam.io.filesystem import CompressionTypes
+from apache_beam.io.filesystems_util import get_filesystem
from apache_beam.transforms.display import DisplayDataItem
-# TODO(sourabhbajaj): Fix the constant values after the new IO factory
-# Current constants are copy pasted from gcsio.py till we fix this.
-# Protect against environments where apitools library is not available.
-# pylint: disable=wrong-import-order, wrong-import-position
-try:
- from apache_beam.io.gcp import gcsio
- DEFAULT_READ_BUFFER_SIZE = gcsio.DEFAULT_READ_BUFFER_SIZE
- MAX_BATCH_OPERATION_SIZE = gcsio.MAX_BATCH_OPERATION_SIZE
-except ImportError:
- class FakeGcsIO(object):
- def __getattr__(self, attr):
- raise ImportError(
- 'Google Cloud Platform IO not available, '
- 'please install apache_beam[gcp]')
- gcsio = FakeGcsIO()
- DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024
- MAX_BATCH_OPERATION_SIZE = 100
-# pylint: enable=wrong-import-order, wrong-import-position
-
-
+MAX_BATCH_OPERATION_SIZE = 100
DEFAULT_SHARD_NAME_TEMPLATE = '-SSSSS-of-NNNNN'
-class _CompressionType(object):
- """Object representing single compression type."""
-
- def __init__(self, identifier):
- self.identifier = identifier
-
- def __eq__(self, other):
- return (isinstance(other, _CompressionType) and
- self.identifier == other.identifier)
-
- def __hash__(self):
- return hash(self.identifier)
-
- def __ne__(self, other):
- return not self.__eq__(other)
-
- def __str__(self):
- return self.identifier
-
- def __repr__(self):
- return '_CompressionType(%s)' % self.identifier
-
-
-class CompressionTypes(object):
- """Enum-like class representing known compression types."""
-
- # Detect compression based on filename extension.
- #
- # The following extensions are currently recognized by auto-detection:
- # .bz2 (implies BZIP2 as described below).
- # .gz (implies GZIP as described below)
- # Any non-recognized extension implies UNCOMPRESSED as described below.
- AUTO = _CompressionType('auto')
-
- # BZIP2 compression.
- BZIP2 = _CompressionType('bzip2')
-
- # GZIP compression (deflate with GZIP headers).
- GZIP = _CompressionType('gzip')
-
- # Uncompressed (i.e., may be split).
- UNCOMPRESSED = _CompressionType('uncompressed')
-
- @classmethod
- def is_valid_compression_type(cls, compression_type):
- """Returns true for valid compression types, false otherwise."""
- return isinstance(compression_type, _CompressionType)
-
- @classmethod
- def mime_type(cls, compression_type, default='application/octet-stream'):
- mime_types_by_compression_type = {
- cls.BZIP2: 'application/x-bz2',
- cls.GZIP: 'application/x-gzip',
- }
- return mime_types_by_compression_type.get(compression_type, default)
-
- @classmethod
- def detect_compression_type(cls, file_path):
- """Returns the compression type of a file (based on its suffix)."""
- compression_types_by_suffix = {'.bz2': cls.BZIP2, '.gz': cls.GZIP}
- lowercased_path = file_path.lower()
- for suffix, compression_type in compression_types_by_suffix.iteritems():
- if lowercased_path.endswith(suffix):
- return compression_type
- return cls.UNCOMPRESSED
-
-
+# TODO(sourabhbajaj): Remove this after BFS API is used everywhere
class ChannelFactory(object):
- # TODO: Generalize into extensible framework.
-
@staticmethod
def mkdir(path):
- if path.startswith('gs://'):
- return
- else:
- try:
- os.makedirs(path)
- except OSError as err:
- raise IOError(err)
+ bfs = get_filesystem(path)
+ bfs.mkdirs(path)
@staticmethod
def open(path,
mode,
mime_type='application/octet-stream',
compression_type=CompressionTypes.AUTO):
- if compression_type == CompressionTypes.AUTO:
- compression_type = CompressionTypes.detect_compression_type(path)
- elif not CompressionTypes.is_valid_compression_type(compression_type):
- raise TypeError('compression_type must be CompressionType object but '
- 'was %s' % type(compression_type))
-
- if path.startswith('gs://'):
- raw_file = gcsio.GcsIO().open(
- path,
- mode,
- mime_type=CompressionTypes.mime_type(compression_type, mime_type))
- else:
- raw_file = open(path, mode)
-
- if compression_type == CompressionTypes.UNCOMPRESSED:
- return raw_file
- else:
- return _CompressedFile(raw_file, compression_type=compression_type)
+ bfs = get_filesystem(path)
+ if mode == 'rb':
+ return bfs.open(path, mime_type, compression_type)
+ elif mode == 'wb':
+ return bfs.create(path, mime_type, compression_type)
@staticmethod
def is_compressed(fileobj):
@@ -165,358 +59,66 @@ class ChannelFactory(object):
@staticmethod
def rename(src, dest):
- if src.startswith('gs://'):
- if not dest.startswith('gs://'):
- raise ValueError('Destination %r must be GCS path.', dest)
- gcsio.GcsIO().rename(src, dest)
- else:
- try:
- os.rename(src, dest)
- except OSError as err:
- raise IOError(err)
+ bfs = get_filesystem(src)
+ bfs.rename([src], [dest])
@staticmethod
def rename_batch(src_dest_pairs):
- # Filter out local and GCS operations.
- local_src_dest_pairs = []
- gcs_src_dest_pairs = []
- for src, dest in src_dest_pairs:
- if src.startswith('gs://'):
- if not dest.startswith('gs://'):
- raise ValueError('Destination %r must be GCS path.', dest)
- gcs_src_dest_pairs.append((src, dest))
- else:
- local_src_dest_pairs.append((src, dest))
-
- # Execute local operations.
- exceptions = []
- for src, dest in local_src_dest_pairs:
- try:
- ChannelFactory.rename(src, dest)
- except Exception as e: # pylint: disable=broad-except
- exceptions.append((src, dest, e))
-
- # Execute GCS operations.
- exceptions += ChannelFactory._rename_gcs_batch(gcs_src_dest_pairs)
-
- return exceptions
-
- @staticmethod
- def _rename_gcs_batch(src_dest_pairs):
- # Prepare batches.
- gcs_batches = []
- gcs_current_batch = []
- for src, dest in src_dest_pairs:
- gcs_current_batch.append((src, dest))
- if len(gcs_current_batch) == MAX_BATCH_OPERATION_SIZE:
- gcs_batches.append(gcs_current_batch)
- gcs_current_batch = []
- if gcs_current_batch:
- gcs_batches.append(gcs_current_batch)
-
- # Execute GCS renames if any and return exceptions.
- exceptions = []
- for batch in gcs_batches:
- copy_statuses = gcsio.GcsIO().copy_batch(batch)
- copy_succeeded = []
- for src, dest, exception in copy_statuses:
- if exception:
- exceptions.append((src, dest, exception))
- else:
- copy_succeeded.append((src, dest))
- delete_batch = [src for src, dest in copy_succeeded]
- delete_statuses = gcsio.GcsIO().delete_batch(delete_batch)
- for i, (src, exception) in enumerate(delete_statuses):
- dest = copy_succeeded[i]
- if exception:
- exceptions.append((src, dest, exception))
- return exceptions
+ sources = [s for s, _ in src_dest_pairs]
+ destinations = [d for _, d in src_dest_pairs]
+ bfs = get_filesystem(sources[0])
+ try:
+ bfs.rename(sources, destinations)
+ return []
+ except BeamIOError as exp:
+ return [(s, d, e) for (s, d), e in exp.exception_details.iteritems()]
@staticmethod
def copytree(src, dest):
- if src.startswith('gs://'):
- if not dest.startswith('gs://'):
- raise ValueError('Destination %r must be GCS path.', dest)
- assert src.endswith('/'), src
- assert dest.endswith('/'), dest
- gcsio.GcsIO().copytree(src, dest)
- else:
- try:
- if os.path.exists(dest):
- shutil.rmtree(dest)
- shutil.copytree(src, dest)
- except OSError as err:
- raise IOError(err)
+ bfs = get_filesystem(src)
+ bfs.copy([src], [dest])
@staticmethod
def exists(path):
- if path.startswith('gs://'):
- return gcsio.GcsIO().exists(path)
- else:
- return os.path.exists(path)
+ bfs = get_filesystem(path)
+ return bfs.exists(path)
@staticmethod
def rmdir(path):
- if path.startswith('gs://'):
- gcs = gcsio.GcsIO()
- if not path.endswith('/'):
- path += '/'
- # TODO: Threadpool?
- for entry in gcs.glob(path + '*'):
- gcs.delete(entry)
- else:
- try:
- shutil.rmtree(path)
- except OSError as err:
- raise IOError(err)
+ bfs = get_filesystem(path)
+ bfs.delete([path])
@staticmethod
def rm(path):
- if path.startswith('gs://'):
- gcsio.GcsIO().delete(path)
- else:
- try:
- os.remove(path)
- except OSError as err:
- raise IOError(err)
+ bfs = get_filesystem(path)
+ bfs.delete([path])
@staticmethod
def glob(path, limit=None):
- if path.startswith('gs://'):
- return gcsio.GcsIO().glob(path, limit)
- else:
- files = glob.glob(path)
- return files[:limit]
+ bfs = get_filesystem(path)
+ match_result = bfs.match([path], [limit])[0]
+ return [f.path for f in match_result.metadata_list]
@staticmethod
def size_in_bytes(path):
- """Returns the size of a file in bytes.
-
- Args:
- path: a string that gives the path of a single file.
- """
- if path.startswith('gs://'):
- return gcsio.GcsIO().size(path)
- else:
- return os.path.getsize(path)
+ bfs = get_filesystem(path)
+ match_result = bfs.match([path])[0]
+ return match_result.metadata_list[0].size_in_bytes
@staticmethod
def size_of_files_in_glob(path, file_names=None):
- """Returns a map of file names to sizes.
-
- Args:
- path: a file path pattern that reads the size of all the files
- file_names: List of file names that we need size for, this is added to
- support eventually consistent sources where two expantions of glob
- might yield to different files.
- """
- if path.startswith('gs://'):
- file_sizes = gcsio.GcsIO().size_of_files_in_glob(path)
- if file_names is None:
- return file_sizes
- else:
- result = {}
- # We need to make sure we fetched the size for all the files as the
- # list API in GCS is eventually consistent so directly call size for
- # any files that may be missing.
- for file_name in file_names:
- if file_name in file_sizes:
- result[file_name] = file_sizes[file_name]
- else:
- result[file_name] = ChannelFactory.size_in_bytes(file_name)
- return result
- else:
- if file_names is None:
- file_names = ChannelFactory.glob(path)
- return {file_name: ChannelFactory.size_in_bytes(file_name)
- for file_name in file_names}
-
-
-class _CompressedFile(object):
- """Somewhat limited file wrapper for easier handling of compressed files."""
-
- # The bit mask to use for the wbits parameters of the zlib compressor and
- # decompressor objects.
- _gzip_mask = zlib.MAX_WBITS | 16 # Mask when using GZIP headers.
-
- def __init__(self,
- fileobj,
- compression_type=CompressionTypes.GZIP,
- read_size=DEFAULT_READ_BUFFER_SIZE):
- if not fileobj:
- raise ValueError('File object must be opened file but was at %s' %
- fileobj)
-
- if not CompressionTypes.is_valid_compression_type(compression_type):
- raise TypeError('compression_type must be CompressionType object but '
- 'was %s' % type(compression_type))
- if compression_type in (CompressionTypes.AUTO, CompressionTypes.UNCOMPRESSED
- ):
- raise ValueError(
- 'Cannot create object with unspecified or no compression')
-
- self._file = fileobj
- self._compression_type = compression_type
-
- if self._file.tell() != 0:
- raise ValueError('File object must be at position 0 but was %d' %
- self._file.tell())
- self._uncompressed_position = 0
-
- if self.readable():
- self._read_size = read_size
- self._read_buffer = cStringIO.StringIO()
- self._read_position = 0
- self._read_eof = False
-
- if self._compression_type == CompressionTypes.BZIP2:
- self._decompressor = bz2.BZ2Decompressor()
- else:
- assert self._compression_type == CompressionTypes.GZIP
- self._decompressor = zlib.decompressobj(self._gzip_mask)
- else:
- self._decompressor = None
-
- if self.writeable():
- if self._compression_type == CompressionTypes.BZIP2:
- self._compressor = bz2.BZ2Compressor()
- else:
- assert self._compression_type == CompressionTypes.GZIP
- self._compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION,
- zlib.DEFLATED, self._gzip_mask)
- else:
- self._compressor = None
-
- def readable(self):
- mode = self._file.mode
- return 'r' in mode or 'a' in mode
-
- def writeable(self):
- mode = self._file.mode
- return 'w' in mode or 'a' in mode
-
- def write(self, data):
- """Write data to file."""
- if not self._compressor:
- raise ValueError('compressor not initialized')
- self._uncompressed_position += len(data)
- compressed = self._compressor.compress(data)
- if compressed:
- self._file.write(compressed)
-
- def _fetch_to_internal_buffer(self, num_bytes):
- """Fetch up to num_bytes into the internal buffer."""
- if (not self._read_eof and self._read_position > 0 and
- (self._read_buffer.tell() - self._read_position) < num_bytes):
- # There aren't enough number of bytes to accommodate a read, so we
- # prepare for a possibly large read by clearing up all internal buffers
- # but without dropping any previous held data.
- self._read_buffer.seek(self._read_position)
- data = self._read_buffer.read()
- self._read_position = 0
- self._read_buffer.seek(0)
- self._read_buffer.truncate(0)
- self._read_buffer.write(data)
-
- while not self._read_eof and (self._read_buffer.tell() - self._read_position
- ) < num_bytes:
- # Continue reading from the underlying file object until enough bytes are
- # available, or EOF is reached.
- buf = self._file.read(self._read_size)
- if buf:
- decompressed = self._decompressor.decompress(buf)
- del buf # Free up some possibly large and no-longer-needed memory.
- self._read_buffer.write(decompressed)
- else:
- # EOF reached.
- # Verify completeness and no corruption and flush (if needed by
- # the underlying algorithm).
- if self._compression_type == CompressionTypes.BZIP2:
- # Having unused_data past end of stream would imply file corruption.
- assert not self._decompressor.unused_data, 'Possible file corruption.'
- try:
- # EOF implies that the underlying BZIP2 stream must also have
- # reached EOF. We expect this to raise an EOFError and we catch it
- # below. Any other kind of error though would be problematic.
- self._decompressor.decompress('dummy')
- assert False, 'Possible file corruption.'
- except EOFError:
- pass # All is as expected!
- else:
- self._read_buffer.write(self._decompressor.flush())
-
- # Record that we have hit the end of file, so we won't unnecessarily
- # repeat the completeness verification step above.
- self._read_eof = True
-
- def _read_from_internal_buffer(self, read_fn):
- """Read from the internal buffer by using the supplied read_fn."""
- self._read_buffer.seek(self._read_position)
- result = read_fn()
- self._read_position += len(result)
- self._uncompressed_position += len(result)
- self._read_buffer.seek(0, os.SEEK_END) # Allow future writes.
- return result
-
- def read(self, num_bytes):
- if not self._decompressor:
- raise ValueError('decompressor not initialized')
-
- self._fetch_to_internal_buffer(num_bytes)
- return self._read_from_internal_buffer(
- lambda: self._read_buffer.read(num_bytes))
-
- def readline(self):
- """Equivalent to standard file.readline(). Same return conventions apply."""
- if not self._decompressor:
- raise ValueError('decompressor not initialized')
-
- io = cStringIO.StringIO()
- while True:
- # Ensure that the internal buffer has at least half the read_size. Going
- # with half the _read_size (as opposed to a full _read_size) to ensure
- # that actual fetches are more evenly spread out, as opposed to having 2
- # consecutive reads at the beginning of a read.
- self._fetch_to_internal_buffer(self._read_size / 2)
- line = self._read_from_internal_buffer(
- lambda: self._read_buffer.readline())
- io.write(line)
- if line.endswith('\n') or not line:
- break # Newline or EOF reached.
-
- return io.getvalue()
-
- def closed(self):
- return not self._file or self._file.closed()
-
- def close(self):
- if self.readable():
- self._read_buffer.close()
-
- if self.writeable():
- self._file.write(self._compressor.flush())
+ bfs = get_filesystem(path)
+ match_result = bfs.match([path])[0]
+ part_files = {f.path: f.size_in_bytes for f in match_result.metadata_list}
- self._file.close()
+ specific_files = {}
+ if file_names is not None:
+ match_results = bfs.match(file_names)
+ for match_result in match_results:
+ for metadata in match_result.metadata_list:
+ specific_files[metadata.path] = metadata.size_in_bytes
- def flush(self):
- if self.writeable():
- self._file.write(self._compressor.flush())
- self._file.flush()
-
- @property
- def seekable(self):
- # TODO: Add support for seeking to a file position.
- return False
-
- def tell(self):
- """Returns current position in uncompressed file."""
- return self._uncompressed_position
-
- def __enter__(self):
- return self
-
- def __exit__(self, exception_type, exception_value, traceback):
- self.close()
+ part_files.update(specific_files)
+ return part_files
class FileSink(iobase.Sink):
@@ -570,6 +172,7 @@ class FileSink(iobase.Sink):
self.shard_name_format = self._template_to_format(shard_name_template)
self.compression_type = compression_type
self.mime_type = mime_type
+ self._file_system = get_filesystem(file_path_prefix)
def display_data(self):
return {'shards':
@@ -589,11 +192,8 @@ class FileSink(iobase.Sink):
The returned file handle is passed to ``write_[encoded_]record`` and
``close``.
"""
- return ChannelFactory.open(
- temp_path,
- 'wb',
- mime_type=self.mime_type,
- compression_type=self.compression_type)
+ return self._file_system.create(temp_path, self.mime_type,
+ self.compression_type)
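To make the open()/write_record/close contract concrete, here is a minimal
sketch of a FileSink subclass. It is not part of this patch, and the
constructor arguments are assumptions based on the fields set in __init__
above:

    from apache_beam import coders
    from apache_beam.io import fileio

    class PlainTextSink(fileio.FileSink):
      """Hypothetical sink that writes one text line per record."""

      def write_record(self, file_handle, value):
        # file_handle is the channel returned by open() above; depending on
        # compression_type it may be a CompressedFile wrapper.
        file_handle.write(str(value) + '\n')

    sink = PlainTextSink('/tmp/out', coder=coders.ToStringCoder(),
                         file_name_suffix='.txt')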
def write_record(self, file_handle, value):
"""Writes a single record go the file handle returned by ``open()``.
@@ -621,7 +221,7 @@ class FileSink(iobase.Sink):
def initialize_write(self):
tmp_dir = self.file_path_prefix + self.file_name_suffix + time.strftime(
'-temp-%Y-%m-%d_%H-%M-%S')
- ChannelFactory().mkdir(tmp_dir)
+ self._file_system.mkdirs(tmp_dir)
return tmp_dir
def open_writer(self, init_result, uid):
@@ -639,75 +239,79 @@ class FileSink(iobase.Sink):
min_threads = min(num_shards, FileSink._MAX_RENAME_THREADS)
num_threads = max(1, min_threads)
- rename_ops = []
+ source_files = []
+ destination_files = []
for shard_num, shard in enumerate(writer_results):
final_name = ''.join([
self.file_path_prefix, self.shard_name_format % dict(
shard_num=shard_num, num_shards=num_shards), self.file_name_suffix
])
- rename_ops.append((shard, final_name))
-
- batches = []
- current_batch = []
- for rename_op in rename_ops:
- current_batch.append(rename_op)
- if len(current_batch) == MAX_BATCH_OPERATION_SIZE:
- batches.append(current_batch)
- current_batch = []
- if current_batch:
- batches.append(current_batch)
+ source_files.append(shard)
+ destination_files.append(final_name)
+
+ source_file_batch = [source_files[i:i + MAX_BATCH_OPERATION_SIZE]
+ for i in xrange(0, len(source_files),
+ MAX_BATCH_OPERATION_SIZE)]
+ destination_file_batch = [destination_files[i:i + MAX_BATCH_OPERATION_SIZE]
+ for i in xrange(0, len(destination_files),
+ MAX_BATCH_OPERATION_SIZE)]
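The slicing above is the usual Python chunking idiom. In isolation (the batch
size here is made up):

    # Split a list into consecutive batches of at most `size` elements.
    def chunk(items, size):
      return [items[i:i + size] for i in xrange(0, len(items), size)]

    assert chunk(range(5), 2) == [[0, 1], [2, 3], [4]]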
logging.info(
'Starting finalize_write threads with num_shards: %d, '
'batches: %d, num_threads: %d',
- num_shards, len(batches), num_threads)
+ num_shards, len(source_file_batch), num_threads)
start_time = time.time()
# Use a thread pool for renaming operations.
def _rename_batch(batch):
"""_rename_batch executes batch rename operations."""
+ source_files, destination_files = batch
exceptions = []
- exception_infos = ChannelFactory.rename_batch(batch)
- for src, dest, exception in exception_infos:
- if exception:
- logging.warning('Rename not successful: %s -> %s, %s', src, dest,
- exception)
- should_report = True
- if isinstance(exception, IOError):
- # May have already been copied.
- try:
- if ChannelFactory.exists(dest):
- should_report = False
- except Exception as exists_e: # pylint: disable=broad-except
- logging.warning('Exception when checking if file %s exists: '
- '%s', dest, exists_e)
- if should_report:
- logging.warning(('Exception in _rename_batch. src: %s, '
- 'dest: %s, err: %s'), src, dest, exception)
- exceptions.append(exception)
- else:
- logging.debug('Rename successful: %s -> %s', src, dest)
- return exceptions
+ try:
+ self._file_system.rename(source_files, destination_files)
+ return exceptions
+ except BeamIOError as exp:
+ if exp.exception_details is None:
+ raise exp
+ for (src, dest), exception in exp.exception_details.iteritems():
+ if exception:
+ logging.warning('Rename not successful: %s -> %s, %s', src, dest,
+ exception)
+ should_report = True
+ if isinstance(exception, IOError):
+ # May have already been copied.
+ try:
+ if self._file_system.exists(dest):
+ should_report = False
+ except Exception as exists_e: # pylint: disable=broad-except
+ logging.warning('Exception when checking if file %s exists: '
+ '%s', dest, exists_e)
+ if should_report:
+ logging.warning(('Exception in _rename_batch. src: %s, '
+ 'dest: %s, err: %s'), src, dest, exception)
+ exceptions.append(exception)
+ else:
+ logging.debug('Rename successful: %s -> %s', src, dest)
+ return exceptions
exception_batches = util.run_using_threadpool(
- _rename_batch, batches, num_threads)
+ _rename_batch, zip(source_file_batch, destination_file_batch),
+ num_threads)
- all_exceptions = []
- for exceptions in exception_batches:
- if exceptions:
- all_exceptions += exceptions
+ all_exceptions = [e for exception_batch in exception_batches
+ for e in exception_batch]
if all_exceptions:
raise Exception('Encountered exceptions in finalize_write: %s' %
all_exceptions)
- for shard, final_name in rename_ops:
+ for final_name in destination_files:
yield final_name
logging.info('Renamed %d shards in %.2f seconds.', num_shards,
time.time() - start_time)
try:
- ChannelFactory.rmdir(init_result)
+ self._file_system.delete([init_result])
except IOError:
# May have already been removed.
pass
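The IOError tolerance above makes the cleanup idempotent: a retried bundle may
find that the temporary directory is already gone. Because BeamIOError
subclasses IOError, the same except clause also covers failures reported by
FileSystem.delete. A sketch of the pattern in isolation (tmp_dir is an assumed
path):

    from apache_beam.io.filesystems_util import get_filesystem

    def cleanup(tmp_dir):
      fs = get_filesystem(tmp_dir)
      try:
        fs.delete([tmp_dir])
      except IOError:
        pass  # Already removed, e.g. by a retry; treat as success.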
http://git-wip-us.apache.org/repos/asf/beam/blob/1bc240bc/sdks/python/apache_beam/io/fileio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py
index a963c67..6b7437d 100644
--- a/sdks/python/apache_beam/io/fileio_test.py
+++ b/sdks/python/apache_beam/io/fileio_test.py
@@ -16,7 +16,7 @@
# limitations under the License.
#
-"""Unit tests for local and GCS sources and sinks."""
+"""Unit tests for file sinks."""
import glob
import logging
@@ -26,60 +26,16 @@ import tempfile
import unittest
import hamcrest as hc
-import mock
import apache_beam as beam
from apache_beam import coders
from apache_beam.io import fileio
+from apache_beam.io.filesystem import CompressedFile
from apache_beam.test_pipeline import TestPipeline
from apache_beam.transforms.display import DisplayData
from apache_beam.transforms.display_test import DisplayDataItemMatcher
-class TestChannelFactory(unittest.TestCase):
-
- @mock.patch('apache_beam.io.fileio.gcsio')
- def test_size_of_files_in_glob_complete(self, *unused_args):
- # Prepare mocks.
- gcsio_mock = mock.MagicMock()
- fileio.gcsio.GcsIO = lambda: gcsio_mock
- file_names = ['gs://bucket/file1', 'gs://bucket/file2']
- gcsio_mock.size_of_files_in_glob.return_value = {
- 'gs://bucket/file1': 1,
- 'gs://bucket/file2': 2
- }
- expected_results = {
- 'gs://bucket/file1': 1,
- 'gs://bucket/file2': 2
- }
- self.assertEqual(
- fileio.ChannelFactory.size_of_files_in_glob(
- 'gs://bucket/*', file_names),
- expected_results)
- gcsio_mock.size_of_files_in_glob.assert_called_once_with('gs://bucket/*')
-
- @mock.patch('apache_beam.io.fileio.gcsio')
- def test_size_of_files_in_glob_incomplete(self, *unused_args):
- # Prepare mocks.
- gcsio_mock = mock.MagicMock()
- fileio.gcsio.GcsIO = lambda: gcsio_mock
- file_names = ['gs://bucket/file1', 'gs://bucket/file2']
- gcsio_mock.size_of_files_in_glob.return_value = {
- 'gs://bucket/file1': 1
- }
- gcsio_mock.size.return_value = 2
- expected_results = {
- 'gs://bucket/file1': 1,
- 'gs://bucket/file2': 2
- }
- self.assertEqual(
- fileio.ChannelFactory.size_of_files_in_glob(
- 'gs://bucket/*', file_names),
- expected_results)
- gcsio_mock.size_of_files_in_glob.assert_called_once_with('gs://bucket/*')
- gcsio_mock.size.assert_called_once_with('gs://bucket/file2')
-
-
# TODO: Refactor code so all io tests use the same
# TestCaseWithTempDirCleanup helper class.
class _TestCaseWithTempDirCleanUp(unittest.TestCase):
@@ -115,16 +71,16 @@ class _TestCaseWithTempDirCleanUp(unittest.TestCase):
class TestCompressedFile(_TestCaseWithTempDirCleanUp):
def test_seekable(self):
- readable = fileio._CompressedFile(open(self._create_temp_file(), 'r'))
+ readable = CompressedFile(open(self._create_temp_file(), 'r'))
self.assertFalse(readable.seekable)
- writeable = fileio._CompressedFile(open(self._create_temp_file(), 'w'))
+ writeable = CompressedFile(open(self._create_temp_file(), 'w'))
self.assertFalse(writeable.seekable)
def test_tell(self):
lines = ['line%d\n' % i for i in range(10)]
tmpfile = self._create_temp_file()
- writeable = fileio._CompressedFile(open(tmpfile, 'w'))
+ writeable = CompressedFile(open(tmpfile, 'w'))
current_offset = 0
for line in lines:
writeable.write(line)
@@ -132,7 +88,7 @@ class TestCompressedFile(_TestCaseWithTempDirCleanUp):
self.assertEqual(current_offset, writeable.tell())
writeable.close()
- readable = fileio._CompressedFile(open(tmpfile))
+ readable = CompressedFile(open(tmpfile))
current_offset = 0
while True:
line = readable.readline()
@@ -300,52 +256,6 @@ class TestFileSink(_TestCaseWithTempDirCleanUp):
with self.assertRaises(Exception):
list(sink.finalize_write(init_token, [res1, res2]))
- @mock.patch('apache_beam.io.fileio.ChannelFactory.rename')
- @mock.patch('apache_beam.io.fileio.gcsio')
- def test_rename_batch(self, *unused_args):
- # Prepare mocks.
- gcsio_mock = mock.MagicMock()
- fileio.gcsio.GcsIO = lambda: gcsio_mock
- fileio.ChannelFactory.rename = mock.MagicMock()
- to_rename = [
- ('gs://bucket/from1', 'gs://bucket/to1'),
- ('gs://bucket/from2', 'gs://bucket/to2'),
- ('/local/from1', '/local/to1'),
- ('gs://bucket/from3', 'gs://bucket/to3'),
- ('/local/from2', '/local/to2'),
- ]
- gcsio_mock.copy_batch.side_effect = [[
- ('gs://bucket/from1', 'gs://bucket/to1', None),
- ('gs://bucket/from2', 'gs://bucket/to2', None),
- ('gs://bucket/from3', 'gs://bucket/to3', None),
- ]]
- gcsio_mock.delete_batch.side_effect = [[
- ('gs://bucket/from1', None),
- ('gs://bucket/from2', None),
- ('gs://bucket/from3', None),
- ]]
-
- # Issue batch rename.
- fileio.ChannelFactory.rename_batch(to_rename)
-
- # Verify mocks.
- expected_local_rename_calls = [
- mock.call('/local/from1', '/local/to1'),
- mock.call('/local/from2', '/local/to2'),
- ]
- self.assertEqual(fileio.ChannelFactory.rename.call_args_list,
- expected_local_rename_calls)
- gcsio_mock.copy_batch.assert_called_once_with([
- ('gs://bucket/from1', 'gs://bucket/to1'),
- ('gs://bucket/from2', 'gs://bucket/to2'),
- ('gs://bucket/from3', 'gs://bucket/to3'),
- ])
- gcsio_mock.delete_batch.assert_called_once_with([
- 'gs://bucket/from1',
- 'gs://bucket/from2',
- 'gs://bucket/from3',
- ])
-
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
http://git-wip-us.apache.org/repos/asf/beam/blob/1bc240bc/sdks/python/apache_beam/io/filesystem.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filesystem.py b/sdks/python/apache_beam/io/filesystem.py
new file mode 100644
index 0000000..14493c0
--- /dev/null
+++ b/sdks/python/apache_beam/io/filesystem.py
@@ -0,0 +1,439 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""File system abstraction for file-based sources and sinks."""
+
+from __future__ import absolute_import
+
+import abc
+import bz2
+import cStringIO
+import os
+import zlib
+
+
+DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024
+
+
+class CompressionTypes(object):
+ """Enum-like class representing known compression types."""
+
+ # Detect compression based on filename extension.
+ #
+ # The following extensions are currently recognized by auto-detection:
+ # .bz2 (implies BZIP2 as described below).
+ # .gz (implies GZIP as described below)
+ # Any non-recognized extension implies UNCOMPRESSED as described below.
+ AUTO = 'auto'
+
+ # BZIP2 compression.
+ BZIP2 = 'bzip2'
+
+ # GZIP compression (deflate with GZIP headers).
+ GZIP = 'gzip'
+
+ # Uncompressed (i.e., may be split).
+ UNCOMPRESSED = 'uncompressed'
+
+ @classmethod
+ def is_valid_compression_type(cls, compression_type):
+ """Returns True for valid compression types, False otherwise."""
+ types = set([
+ CompressionTypes.AUTO,
+ CompressionTypes.BZIP2,
+ CompressionTypes.GZIP,
+ CompressionTypes.UNCOMPRESSED
+ ])
+ return compression_type in types
+
+ @classmethod
+ def mime_type(cls, compression_type, default='application/octet-stream'):
+ mime_types_by_compression_type = {
+ cls.BZIP2: 'application/x-bz2',
+ cls.GZIP: 'application/x-gzip',
+ }
+ return mime_types_by_compression_type.get(compression_type, default)
+
+ @classmethod
+ def detect_compression_type(cls, file_path):
+ """Returns the compression type of a file (based on its suffix)."""
+ compression_types_by_suffix = {'.bz2': cls.BZIP2, '.gz': cls.GZIP}
+ lowercased_path = file_path.lower()
+ for suffix, compression_type in compression_types_by_suffix.iteritems():
+ if lowercased_path.endswith(suffix):
+ return compression_type
+ return cls.UNCOMPRESSED
+
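A quick usage sketch of the suffix detection and MIME lookup defined above:

    from apache_beam.io.filesystem import CompressionTypes

    assert (CompressionTypes.detect_compression_type('/data/events.gz')
            == CompressionTypes.GZIP)
    assert (CompressionTypes.detect_compression_type('/data/events.csv')
            == CompressionTypes.UNCOMPRESSED)
    assert (CompressionTypes.mime_type(CompressionTypes.GZIP)
            == 'application/x-gzip')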
+
+class CompressedFile(object):
+ """Somewhat limited file wrapper for easier handling of compressed files."""
+
+ # The bit mask to use for the wbits parameters of the zlib compressor and
+ # decompressor objects.
+ _gzip_mask = zlib.MAX_WBITS | 16 # Mask when using GZIP headers.
+
+ def __init__(self,
+ fileobj,
+ compression_type=CompressionTypes.GZIP,
+ read_size=DEFAULT_READ_BUFFER_SIZE):
+ if not fileobj:
+ raise ValueError('File object must not be None')
+
+ if not CompressionTypes.is_valid_compression_type(compression_type):
+ raise TypeError('compression_type must be a CompressionTypes value but '
+ 'was %s' % type(compression_type))
+ if compression_type in (CompressionTypes.AUTO,
+ CompressionTypes.UNCOMPRESSED):
+ raise ValueError(
+ 'Cannot create object with unspecified or no compression')
+
+ self._file = fileobj
+ self._compression_type = compression_type
+
+ if self._file.tell() != 0:
+ raise ValueError('File object must be at position 0 but was %d' %
+ self._file.tell())
+ self._uncompressed_position = 0
+
+ if self.readable():
+ self._read_size = read_size
+ self._read_buffer = cStringIO.StringIO()
+ self._read_position = 0
+ self._read_eof = False
+
+ if self._compression_type == CompressionTypes.BZIP2:
+ self._decompressor = bz2.BZ2Decompressor()
+ else:
+ assert self._compression_type == CompressionTypes.GZIP
+ self._decompressor = zlib.decompressobj(self._gzip_mask)
+ else:
+ self._decompressor = None
+
+ if self.writeable():
+ if self._compression_type == CompressionTypes.BZIP2:
+ self._compressor = bz2.BZ2Compressor()
+ else:
+ assert self._compression_type == CompressionTypes.GZIP
+ self._compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION,
+ zlib.DEFLATED, self._gzip_mask)
+ else:
+ self._compressor = None
+
+ def readable(self):
+ mode = self._file.mode
+ return 'r' in mode or 'a' in mode
+
+ def writeable(self):
+ mode = self._file.mode
+ return 'w' in mode or 'a' in mode
+
+ def write(self, data):
+ """Write data to file."""
+ if not self._compressor:
+ raise ValueError('compressor not initialized')
+ self._uncompressed_position += len(data)
+ compressed = self._compressor.compress(data)
+ if compressed:
+ self._file.write(compressed)
+
+ def _fetch_to_internal_buffer(self, num_bytes):
+ """Fetch up to num_bytes into the internal buffer."""
+ if (not self._read_eof and self._read_position > 0 and
+ (self._read_buffer.tell() - self._read_position) < num_bytes):
+ # There aren't enough bytes buffered to satisfy the read, so prepare
+ # for a possibly large read by compacting the internal buffer without
+ # dropping any previously held data.
+ self._read_buffer.seek(self._read_position)
+ data = self._read_buffer.read()
+ self._read_position = 0
+ self._read_buffer.seek(0)
+ self._read_buffer.truncate(0)
+ self._read_buffer.write(data)
+
+ while (not self._read_eof and
+ self._read_buffer.tell() - self._read_position < num_bytes):
+ # Continue reading from the underlying file object until enough bytes are
+ # available, or EOF is reached.
+ buf = self._file.read(self._read_size)
+ if buf:
+ decompressed = self._decompressor.decompress(buf)
+ del buf # Free up some possibly large and no-longer-needed memory.
+ self._read_buffer.write(decompressed)
+ else:
+ # EOF reached.
+ # Verify completeness and no corruption and flush (if needed by
+ # the underlying algorithm).
+ if self._compression_type == CompressionTypes.BZIP2:
+ # Having unused_data past end of stream would imply file corruption.
+ assert not self._decompressor.unused_data, 'Possible file corruption.'
+ try:
+ # EOF implies that the underlying BZIP2 stream must also have
+ # reached EOF. We expect this to raise an EOFError and we catch it
+ # below. Any other kind of error though would be problematic.
+ self._decompressor.decompress('dummy')
+ assert False, 'Possible file corruption.'
+ except EOFError:
+ pass # All is as expected!
+ else:
+ self._read_buffer.write(self._decompressor.flush())
+
+ # Record that we have hit the end of file, so we won't unnecessarily
+ # repeat the completeness verification step above.
+ self._read_eof = True
+
+ def _read_from_internal_buffer(self, read_fn):
+ """Read from the internal buffer by using the supplied read_fn."""
+ self._read_buffer.seek(self._read_position)
+ result = read_fn()
+ self._read_position += len(result)
+ self._uncompressed_position += len(result)
+ self._read_buffer.seek(0, os.SEEK_END) # Allow future writes.
+ return result
+
+ def read(self, num_bytes):
+ if not self._decompressor:
+ raise ValueError('decompressor not initialized')
+
+ self._fetch_to_internal_buffer(num_bytes)
+ return self._read_from_internal_buffer(
+ lambda: self._read_buffer.read(num_bytes))
+
+ def readline(self):
+ """Equivalent to standard file.readline(). Same return conventions apply."""
+ if not self._decompressor:
+ raise ValueError('decompressor not initialized')
+
+ io = cStringIO.StringIO()
+ while True:
+ # Ensure that the internal buffer has at least half the read_size.
+ # Using half the _read_size (rather than a full _read_size) spreads
+ # actual fetches out more evenly, instead of issuing two consecutive
+ # reads at the start of a large read request.
+ self._fetch_to_internal_buffer(self._read_size / 2)
+ line = self._read_from_internal_buffer(
+ lambda: self._read_buffer.readline())
+ io.write(line)
+ if line.endswith('\n') or not line:
+ break # Newline or EOF reached.
+
+ return io.getvalue()
+
+ def closed(self):
+ # ``closed`` is an attribute on file objects, not a method.
+ return not self._file or self._file.closed
+
+ def close(self):
+ if self.readable():
+ self._read_buffer.close()
+
+ if self.writeable():
+ self._file.write(self._compressor.flush())
+
+ self._file.close()
+
+ def flush(self):
+ if self.writeable():
+ self._file.write(self._compressor.flush())
+ self._file.flush()
+
+ @property
+ def seekable(self):
+ # TODO: Add support for seeking to a file position.
+ return False
+
+ def tell(self):
+ """Returns current position in uncompressed file."""
+ return self._uncompressed_position
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exception_type, exception_value, traceback):
+ self.close()
+
+
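A round-trip sketch of CompressedFile as defined above (GZIP is the default
compression_type; the temporary path is illustrative):

    import gzip
    from apache_beam.io.filesystem import CompressedFile

    with CompressedFile(open('/tmp/demo.gz', 'w')) as writeable:
      writeable.write('hello\n')
      writeable.write('world\n')

    # close() flushed the compressor, so the bytes on disk are real GZIP data.
    assert gzip.open('/tmp/demo.gz').read() == 'hello\nworld\n'

    readable = CompressedFile(open('/tmp/demo.gz', 'r'))
    assert readable.readline() == 'hello\n'
    assert readable.read(100) == 'world\n'  # read() stops at EOF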
+class FileMetadata(object):
+ """Metadata about a file path that is the output of FileSystem.match
+ """
+ def __init__(self, path, size_in_bytes):
+ assert isinstance(path, basestring) and path, \
+ "Path should be a non-empty string"
+ assert isinstance(size_in_bytes, int) and size_in_bytes >= 0, \
+ "Size in bytes should be greater than or equal to zero"
+ self.path = path
+ self.size_in_bytes = size_in_bytes
+
+ def __eq__(self, other):
+ """Note: This is only used in tests where we verify that mock objects match.
+ """
+ return (isinstance(other, FileMetadata) and
+ self.path == other.path and
+ self.size_in_bytes == other.size_in_bytes)
+
+ def __hash__(self):
+ return hash((self.path, self.size_in_bytes))
+
+ def __ne__(self, other):
+ return not self.__eq__(other)
+
+ def __repr__(self):
+ return 'FileMetadata(%s, %s)' % (self.path, self.size_in_bytes)
+
+
+class MatchResult(object):
+ """Result from the ``FileSystem`` match operation which contains the list
+ of matched FileMetadata.
+ """
+ def __init__(self, pattern, metadata_list):
+ self.metadata_list = metadata_list
+ self.pattern = pattern
+
+
+class BeamIOError(IOError):
+ def __init__(self, msg, exception_details=None):
+ """Class representing the errors thrown in the batch file operations.
+
+ Args:
+ msg: Message string for the exception thrown
+ exception_details: Optional map from each failed input to the exception
+ raised for it in the batch. When provided, the caller can assume that
+ all errors in the filesystem operation have been reported. When the
+ details are missing, the operation may have failed at any point, so the
+ caller should use match to determine the current state of the system.
+ """
+ super(BeamIOError, self).__init__(msg)
+ self.exception_details = exception_details
+
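A sketch of how a caller consumes exception_details after a failed batch
rename. Here fs, sources and destinations are assumed to exist; rename keys
the map by (source, destination) pairs:

    from apache_beam.io.filesystem import BeamIOError

    try:
      fs.rename(sources, destinations)
    except BeamIOError as err:
      if err.exception_details is None:
        raise  # Failure point unknown; re-match to inspect the actual state.
      for (src, dest), inner in err.exception_details.iteritems():
        print 'rename %s -> %s failed: %s' % (src, dest, inner)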
+
+class FileSystem(object):
+ """A class that defines the functions that can be performed on a filesystem.
+
+ All methods are abstract and are meant for file system providers to
+ implement. Clients should use the get_filesystem function in
+ filesystems_util to obtain the correct file system for a given path scheme.
+ """
+ __metaclass__ = abc.ABCMeta
+
+ @staticmethod
+ def _get_compression_type(path, compression_type):
+ if compression_type == CompressionTypes.AUTO:
+ compression_type = CompressionTypes.detect_compression_type(path)
+ elif not CompressionTypes.is_valid_compression_type(compression_type):
+ raise TypeError('compression_type must be a CompressionTypes value but '
+ 'was %s' % type(compression_type))
+ return compression_type
+
+ @abc.abstractmethod
+ def mkdirs(self, path):
+ """Recursively create directories for the provided path.
+
+ Args:
+ path: string path of the directory structure that should be created
+
+ Raises:
+ IOError if leaf directory already exists.
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def match(self, patterns, limits=None):
+ """Find all matching paths to the patterns provided.
+
+ Args:
+ patterns: list of string for the file path pattern to match against
+ limits: list of maximum number of responses that need to be fetched
+
+ Returns: list of ``MatchResult`` objects.
+
+ Raises:
+ ``BeamIOError`` if any of the pattern match operations fail
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def create(self, path, mime_type, compression_type):
+ """Returns a write channel for the given file path.
+
+ Args:
+ path: string path of the file object to be written to the system
+ mime_type: MIME type to specify the type of content in the file object
+ compression_type: Type of compression to be used for this object
+
+ Returns: file handle with a close function for the user to use
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def open(self, path, mime_type, compression_type):
+ """Returns a read channel for the given file path.
+
+ Args:
+ path: string path of the file object to be written to the system
+ mime_type: MIME type to specify the type of content in the file object
+ compression_type: Type of compression to be used for this object
+
+ Returns: file handle with a close function for the user to use
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def copy(self, source_file_names, destination_file_names):
+ """Recursively copy the file tree from the source to the destination
+
+ Args:
+ source_file_names: list of source file objects that needs to be copied
+ destination_file_names: list of destination of the new object
+
+ Raises:
+ ``BeamIOError`` if any of the copy operations fail
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def rename(self, source_file_names, destination_file_names):
+ """Rename the files at the source list to the destination list.
+ Source and destination lists should be of the same size.
+
+ Args:
+ source_file_names: List of file paths that need to be moved
+ destination_file_names: List of destination_file_names for the files
+
+ Raises:
+ ``BeamIOError`` if any of the rename operations fail
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def exists(self, path):
+ """Check if the provided path exists on the FileSystem.
+
+ Args:
+ path: string path that needs to be checked.
+
+ Returns: boolean flag indicating if path exists
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def delete(self, paths):
+ """Deletes files or directories at the provided paths.
+ Directories will be deleted recursively.
+
+ Args:
+ paths: list of paths that give the file objects to be deleted
+
+ Raises:
+ ``BeamIOError`` if any of the delete operations fail
+ """
+ raise NotImplementedError
http://git-wip-us.apache.org/repos/asf/beam/blob/1bc240bc/sdks/python/apache_beam/io/filesystems_util.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filesystems_util.py b/sdks/python/apache_beam/io/filesystems_util.py
new file mode 100644
index 0000000..47c2361
--- /dev/null
+++ b/sdks/python/apache_beam/io/filesystems_util.py
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""Utility functions for getting the correct file systems for a file name"""
+
+from apache_beam.io.localfilesystem import LocalFileSystem
+
+
+# TODO(BEAM-1585): Add a mechanism to add user implemented file systems
+def get_filesystem(path):
+ """Function that returns the FileSystem class to use based on the path
+ provided in the input.
+ """
+ if path.startswith('gs://'):
+ from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem
+ return GCSFileSystem()
+ else:
+ return LocalFileSystem()
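Usage sketch: the path scheme selects the implementation, after which the
FileSystem API is uniform (the local glob is illustrative):

    from apache_beam.io.filesystems_util import get_filesystem
    from apache_beam.io.localfilesystem import LocalFileSystem

    fs = get_filesystem('/tmp/data-*.txt')
    assert isinstance(fs, LocalFileSystem)
    for match_result in fs.match(['/tmp/data-*.txt']):
      for metadata in match_result.metadata_list:
        print '%s: %d bytes' % (metadata.path, metadata.size_in_bytes)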
http://git-wip-us.apache.org/repos/asf/beam/blob/1bc240bc/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
new file mode 100644
index 0000000..5aef0ab
--- /dev/null
+++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
@@ -0,0 +1,242 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""GCS file system implementation for accessing files on GCS."""
+
+from __future__ import absolute_import
+
+from apache_beam.io.filesystem import BeamIOError
+from apache_beam.io.filesystem import CompressedFile
+from apache_beam.io.filesystem import CompressionTypes
+from apache_beam.io.filesystem import FileMetadata
+from apache_beam.io.filesystem import FileSystem
+from apache_beam.io.filesystem import MatchResult
+from apache_beam.io.gcp import gcsio
+
+
+class GCSFileSystem(FileSystem):
+ """A GCS ``FileSystem`` implementation for accessing files on GCS.
+ """
+
+ def mkdirs(self, path):
+ """Recursively create directories for the provided path.
+
+ GCS has no real directory hierarchy, so this is a no-op.
+
+ Args:
+ path: string path of the directory structure that should be created
+ """
+ pass
+
+ def match(self, patterns, limits=None):
+ """Find all matching paths to the pattern provided.
+
+ Args:
+ pattern: string for the file path pattern to match against
+ limit: Maximum number of responses that need to be fetched
+
+ Returns: list of ``MatchResult`` objects.
+
+ Raises:
+ ``BeamIOError`` if any of the pattern match operations fail
+ """
+ if limits is None:
+ limits = [None] * len(patterns)
+ else:
+ err_msg = "Patterns and limits should be equal in length"
+ assert len(patterns) == len(limits), err_msg
+
+ def _match(pattern, limit):
+ """Find all matching paths to the pattern provided.
+ """
+ if pattern.endswith('/'):
+ pattern += '*'
+ file_sizes = gcsio.GcsIO().size_of_files_in_glob(pattern)
+ metadata_list = [FileMetadata(path, size)
+ for path, size in file_sizes.iteritems()]
+ # Honor the per-pattern limit; a limit of None keeps the full list.
+ metadata_list = metadata_list[:limit]
+ return MatchResult(pattern, metadata_list)
+
+ exceptions = {}
+ result = []
+ for pattern, limit in zip(patterns, limits):
+ try:
+ result.append(_match(pattern, limit))
+ except Exception as e: # pylint: disable=broad-except
+ exceptions[pattern] = e
+
+ if exceptions:
+ raise BeamIOError("Match operation failed", exceptions)
+ return result
+
+ def _path_open(self, path, mode, mime_type='application/octet-stream',
+ compression_type=CompressionTypes.AUTO):
+ """Helper functions to open a file in the provided mode.
+ """
+ compression_type = FileSystem._get_compression_type(path, compression_type)
+ mime_type = CompressionTypes.mime_type(compression_type, mime_type)
+ raw_file = gcsio.GcsIO().open(path, mode, mime_type=mime_type)
+ if compression_type == CompressionTypes.UNCOMPRESSED:
+ return raw_file
+ else:
+ return CompressedFile(raw_file, compression_type=compression_type)
+
+ def create(self, path, mime_type='application/octet-stream',
+ compression_type=CompressionTypes.AUTO):
+ """Returns a write channel for the given file path.
+
+ Args:
+ path: string path of the file object to be written to the system
+ mime_type: MIME type to specify the type of content in the file object
+ compression_type: Type of compression to be used for this object
+
+ Returns: file handle with a close function for the user to use
+ """
+ return self._path_open(path, 'wb', mime_type, compression_type)
+
+ def open(self, path, mime_type='application/octet-stream',
+ compression_type=CompressionTypes.AUTO):
+ """Returns a read channel for the given file path.
+
+ Args:
+ path: string path of the file object to be written to the system
+ mime_type: MIME type to specify the type of content in the file object
+ compression_type: Type of compression to be used for this object
+
+ Returns: file handle with a close function for the user to use
+ """
+ return self._path_open(path, 'rb', mime_type, compression_type)
+
+ def copy(self, source_file_names, destination_file_names):
+ """Recursively copy the file tree from the source to the destination
+
+ Args:
+ source_file_names: list of source file objects that needs to be copied
+ destination_file_names: list of destination of the new object
+
+ Raises:
+ ``BeamIOError`` if any of the copy operations fail
+ """
+ err_msg = ("source_file_names and destination_file_names should "
+ "be equal in length")
+ assert len(source_file_names) == len(destination_file_names), err_msg
+
+ def _copy_path(source, destination):
+ """Recursively copy the file tree from the source to the destination
+ """
+ if not destination.startswith('gs://'):
+ raise ValueError('Destination %r must be a GCS path.' % destination)
+ # Use copytree if the path ends with '/', as it denotes a directory.
+ if source.endswith('/'):
+ gcsio.GcsIO().copytree(source, destination)
+ else:
+ gcsio.GcsIO().copy(source, destination)
+
+ exceptions = {}
+ for source, destination in zip(source_file_names, destination_file_names):
+ try:
+ _copy_path(source, destination)
+ except Exception as e: # pylint: disable=broad-except
+ exceptions[(source, destination)] = e
+
+ if exceptions:
+ raise BeamIOError("Copy operation failed", exceptions)
+
+ def rename(self, source_file_names, destination_file_names):
+ """Rename the files at the source list to the destination list.
+ Source and destination lists should be of the same size.
+
+ Args:
+ source_file_names: List of file paths that need to be moved
+ destination_file_names: List of destination_file_names for the files
+
+ Raises:
+ ``BeamIOError`` if any of the rename operations fail
+ """
+ err_msg = ("source_file_names and destination_file_names should "
+ "be equal in length")
+ assert len(source_file_names) == len(destination_file_names), err_msg
+
+ gcs_batches = []
+ gcs_current_batch = []
+ for src, dest in zip(source_file_names, destination_file_names):
+ gcs_current_batch.append((src, dest))
+ if len(gcs_current_batch) == gcsio.MAX_BATCH_OPERATION_SIZE:
+ gcs_batches.append(gcs_current_batch)
+ gcs_current_batch = []
+ if gcs_current_batch:
+ gcs_batches.append(gcs_current_batch)
+
+ # Execute GCS renames if any and return exceptions.
+ exceptions = {}
+ for batch in gcs_batches:
+ copy_statuses = gcsio.GcsIO().copy_batch(batch)
+ copy_succeeded = []
+ for src, dest, exception in copy_statuses:
+ if exception:
+ exceptions[(src, dest)] = exception
+ else:
+ copy_succeeded.append((src, dest))
+ delete_batch = [src for src, dest in copy_succeeded]
+ delete_statuses = gcsio.GcsIO().delete_batch(delete_batch)
+ for i, (src, exception) in enumerate(delete_statuses):
+ dest = copy_succeeded[i][1]
+ if exception:
+ exceptions[(src, dest)] = exception
+
+ if exceptions:
+ raise BeamIOError("Rename operation failed", exceptions)
+
+ def exists(self, path):
+ """Check if the provided path exists on the FileSystem.
+
+ Args:
+ path: string path that needs to be checked.
+
+ Returns: boolean flag indicating if path exists
+ """
+ return gcsio.GcsIO().exists(path)
+
+ def delete(self, paths):
+ """Deletes files or directories at the provided paths.
+ Directories will be deleted recursively.
+
+ Args:
+ paths: list of paths that give the file objects to be deleted
+ """
+ def _delete_path(path):
+ """Recursively delete the file or directory at the provided path.
+ """
+ if path.endswith('/'):
+ path_to_use = path + '*'
+ else:
+ path_to_use = path
+ match_result = self.match([path_to_use])[0]
+ statuses = gcsio.GcsIO().delete_batch(
+ [m.path for m in match_result.metadata_list])
+ failures = [e for (_, e) in statuses if e is not None]
+ if failures:
+ raise failures[0]
+
+ exceptions = {}
+ for path in paths:
+ try:
+ _delete_path(path)
+ except Exception as e: # pylint: disable=broad-except
+ exceptions[path] = e
+
+ if exceptions:
+ raise BeamIOError("Delete operation failed", exceptions)
http://git-wip-us.apache.org/repos/asf/beam/blob/1bc240bc/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
new file mode 100644
index 0000000..3fe5cce
--- /dev/null
+++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
@@ -0,0 +1,293 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for GCS File System."""
+
+import unittest
+
+import mock
+from apache_beam.io.filesystem import BeamIOError
+from apache_beam.io.filesystem import FileMetadata
+
+# Protect against environments where apitools library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+ from apache_beam.io.gcp import gcsfilesystem
+except ImportError:
+ gcsfilesystem = None
+# pylint: enable=wrong-import-order, wrong-import-position
+
+
+@unittest.skipIf(gcsfilesystem is None, 'GCP dependencies are not installed')
+class GCSFileSystemTest(unittest.TestCase):
+
+ @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
+ def test_match_multiples(self, mock_gcsio):
+ # Prepare mocks.
+ gcsio_mock = mock.MagicMock()
+ gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+ gcsio_mock.size_of_files_in_glob.return_value = {
+ 'gs://bucket/file1': 1,
+ 'gs://bucket/file2': 2
+ }
+ expected_results = set([
+ FileMetadata('gs://bucket/file1', 1),
+ FileMetadata('gs://bucket/file2', 2)
+ ])
+ file_system = gcsfilesystem.GCSFileSystem()
+ match_result = file_system.match(['gs://bucket/'])[0]
+ self.assertEqual(
+ set(match_result.metadata_list),
+ expected_results)
+ gcsio_mock.size_of_files_in_glob.assert_called_once_with('gs://bucket/*')
+
+ @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
+ def test_match_multiples_error(self, mock_gcsio):
+ # Prepare mocks.
+ gcsio_mock = mock.MagicMock()
+ gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+ exception = IOError('Failed')
+ gcsio_mock.size_of_files_in_glob.side_effect = exception
+ expected_results = {'gs://bucket/': exception}
+
+ file_system = gcsfilesystem.GCSFileSystem()
+ with self.assertRaises(BeamIOError) as error:
+ file_system.match(['gs://bucket/'])
+ self.assertEqual(error.exception.message, 'Match operation failed')
+ self.assertEqual(error.exception.exception_details, expected_results)
+ gcsio_mock.size_of_files_in_glob.assert_called_once_with('gs://bucket/*')
+
+ @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
+ def test_match_multiple_patterns(self, mock_gcsio):
+ # Prepare mocks.
+ gcsio_mock = mock.MagicMock()
+ gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+ gcsio_mock.size_of_files_in_glob.side_effect = [
+ {'gs://bucket/file1': 1},
+ {'gs://bucket/file2': 2},
+ ]
+ expected_results = [
+ [FileMetadata('gs://bucket/file1', 1)],
+ [FileMetadata('gs://bucket/file2', 2)]
+ ]
+ file_system = gcsfilesystem.GCSFileSystem()
+ result = file_system.match(['gs://bucket/file1*', 'gs://bucket/file2*'])
+ self.assertEqual(
+ [mr.metadata_list for mr in result],
+ expected_results)
+
+ @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
+ def test_create(self, mock_gcsio):
+ # Prepare mocks.
+ gcsio_mock = mock.MagicMock()
+ gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+ # Issue file copy
+ file_system = gcsfilesystem.GCSFileSystem()
+ _ = file_system.create('gs://bucket/from1', 'application/octet-stream')
+
+ gcsio_mock.open.assert_called_once_with(
+ 'gs://bucket/from1', 'wb', mime_type='application/octet-stream')
+
+ @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
+ def test_open(self, mock_gcsio):
+ # Prepare mocks.
+ gcsio_mock = mock.MagicMock()
+ gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+ # Issue file copy
+ file_system = gcsfilesystem.GCSFileSystem()
+ _ = file_system.open('gs://bucket/from1', 'application/octet-stream')
+
+ gcsio_mock.open.assert_called_once_with(
+ 'gs://bucket/from1', 'rb', mime_type='application/octet-stream')
+
+ @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
+ def test_copy_file(self, mock_gcsio):
+ # Prepare mocks.
+ gcsio_mock = mock.MagicMock()
+ gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+ sources = ['gs://bucket/from1']
+ destinations = ['gs://bucket/to1']
+
+ # Issue file copy
+ file_system = gcsfilesystem.GCSFileSystem()
+ file_system.copy(sources, destinations)
+
+ gcsio_mock.copy.assert_called_once_with(
+ 'gs://bucket/from1', 'gs://bucket/to1')
+
+ @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
+ def test_copy_file_error(self, mock_gcsio):
+ # Prepare mocks.
+ gcsio_mock = mock.MagicMock()
+ gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+ sources = ['gs://bucket/from1']
+ destinations = ['gs://bucket/to1']
+
+ exception = IOError('Failed')
+ gcsio_mock.copy.side_effect = exception
+
+ # Issue batch rename.
+ expected_results = {(s, d):exception for s, d in zip(sources, destinations)}
+
+ # Issue batch copy.
+ file_system = gcsfilesystem.GCSFileSystem()
+ with self.assertRaises(BeamIOError) as error:
+ file_system.copy(sources, destinations)
+ self.assertEqual(error.exception.message, 'Copy operation failed')
+ self.assertEqual(error.exception.exception_details, expected_results)
+
+ gcsio_mock.copy.assert_called_once_with(
+ 'gs://bucket/from1', 'gs://bucket/to1')
+
+ @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
+ def test_copy_tree(self, mock_gcsio):
+ # Prepare mocks.
+ gcsio_mock = mock.MagicMock()
+ gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+ sources = ['gs://bucket1/']
+ destinations = ['gs://bucket2/']
+
+ # Issue directory copy
+ file_system = gcsfilesystem.GCSFileSystem()
+ file_system.copy(sources, destinations)
+
+ gcsio_mock.copytree.assert_called_once_with(
+ 'gs://bucket1/', 'gs://bucket2/')
+
+ @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
+ def test_rename(self, mock_gcsio):
+ # Prepare mocks.
+ gcsio_mock = mock.MagicMock()
+ gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+ sources = [
+ 'gs://bucket/from1',
+ 'gs://bucket/from2',
+ 'gs://bucket/from3',
+ ]
+ destinations = [
+ 'gs://bucket/to1',
+ 'gs://bucket/to2',
+ 'gs://bucket/to3',
+ ]
+ gcsio_mock.copy_batch.side_effect = [[
+ ('gs://bucket/from1', 'gs://bucket/to1', None),
+ ('gs://bucket/from2', 'gs://bucket/to2', None),
+ ('gs://bucket/from3', 'gs://bucket/to3', None),
+ ]]
+ gcsio_mock.delete_batch.side_effect = [[
+ ('gs://bucket/from1', None),
+ ('gs://bucket/from2', None),
+ ('gs://bucket/from3', None),
+ ]]
+
+ # Issue batch rename.
+ file_system = gcsfilesystem.GCSFileSystem()
+ file_system.rename(sources, destinations)
+
+ gcsio_mock.copy_batch.assert_called_once_with([
+ ('gs://bucket/from1', 'gs://bucket/to1'),
+ ('gs://bucket/from2', 'gs://bucket/to2'),
+ ('gs://bucket/from3', 'gs://bucket/to3'),
+ ])
+ gcsio_mock.delete_batch.assert_called_once_with([
+ 'gs://bucket/from1',
+ 'gs://bucket/from2',
+ 'gs://bucket/from3',
+ ])
+
+ @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
+ def test_rename_error(self, mock_gcsio):
+ # Prepare mocks.
+ gcsio_mock = mock.MagicMock()
+ gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+ sources = [
+ 'gs://bucket/from1',
+ 'gs://bucket/from2',
+ 'gs://bucket/from3',
+ ]
+ destinations = [
+ 'gs://bucket/to1',
+ 'gs://bucket/to2',
+ 'gs://bucket/to3',
+ ]
+ exception = IOError('Failed')
+ gcsio_mock.delete_batch.side_effect = [[(f, exception) for f in sources]]
+ gcsio_mock.copy_batch.side_effect = [[
+ ('gs://bucket/from1', 'gs://bucket/to1', None),
+ ('gs://bucket/from2', 'gs://bucket/to2', None),
+ ('gs://bucket/from3', 'gs://bucket/to3', None),
+ ]]
+
+ # Issue batch rename.
+ expected_results = {(s, d):exception for s, d in zip(sources, destinations)}
+
+ # Issue batch rename.
+ file_system = gcsfilesystem.GCSFileSystem()
+ with self.assertRaises(BeamIOError) as error:
+ file_system.rename(sources, destinations)
+ self.assertEqual(error.exception.message, 'Rename operation failed')
+ self.assertEqual(error.exception.exception_details, expected_results)
+
+ gcsio_mock.copy_batch.assert_called_once_with([
+ ('gs://bucket/from1', 'gs://bucket/to1'),
+ ('gs://bucket/from2', 'gs://bucket/to2'),
+ ('gs://bucket/from3', 'gs://bucket/to3'),
+ ])
+ gcsio_mock.delete_batch.assert_called_once_with([
+ 'gs://bucket/from1',
+ 'gs://bucket/from2',
+ 'gs://bucket/from3',
+ ])
+
+ @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
+ def test_delete(self, mock_gcsio):
+ # Prepare mocks.
+ gcsio_mock = mock.MagicMock()
+ gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+ files = [
+ 'gs://bucket/from1',
+ 'gs://bucket/from2',
+ 'gs://bucket/from3',
+ ]
+
+ # Issue batch delete.
+ file_system = gcsfilesystem.GCSFileSystem()
+ file_system.delete(files)
+ gcsio_mock.delete_batch.assert_called()
+
+ @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
+ def test_delete_error(self, mock_gcsio):
+ # Prepare mocks.
+ gcsio_mock = mock.MagicMock()
+ gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+ exception = IOError('Failed')
+ gcsio_mock.delete_batch.side_effect = exception
+ files = [
+ 'gs://bucket/from1',
+ 'gs://bucket/from2',
+ 'gs://bucket/from3',
+ ]
+ expected_results = {f:exception for f in files}
+
+ # Issue batch delete.
+ file_system = gcsfilesystem.GCSFileSystem()
+ with self.assertRaises(BeamIOError) as error:
+ file_system.delete(files)
+ self.assertEqual(error.exception.message, 'Delete operation failed')
+ self.assertEqual(error.exception.exception_details, expected_results)
+ gcsio_mock.delete_batch.assert_called()
http://git-wip-us.apache.org/repos/asf/beam/blob/1bc240bc/sdks/python/apache_beam/io/gcp/gcsio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py
index 020c38f..285e272 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio.py
@@ -31,20 +31,20 @@ import re
import threading
import traceback
-import apitools.base.py.transfer as transfer
-from apitools.base.py.batch import BatchApiRequest
-from apitools.base.py.exceptions import HttpError
-
-from apache_beam.internal.gcp import auth
from apache_beam.utils import retry
# Issue a friendlier error message if the storage library is not available.
# TODO(silviuc): Remove this guard when storage is available everywhere.
try:
# pylint: disable=wrong-import-order, wrong-import-position
+ # pylint: disable=ungrouped-imports
+ import apitools.base.py.transfer as transfer
+ from apitools.base.py.batch import BatchApiRequest
+ from apitools.base.py.exceptions import HttpError
+ from apache_beam.internal.gcp import auth
from apache_beam.io.gcp.internal.clients import storage
except ImportError:
- raise RuntimeError(
+ raise ImportError(
'Google Cloud Storage I/O not supported for this execution environment '
'(could not import storage API client).')
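Raising ImportError rather than RuntimeError lets downstream modules apply the
standard optional-dependency guard, as the tests in this patch do:

    try:
      from apache_beam.io.gcp import gcsio
    except ImportError:
      gcsio = None  # GCP extras not installed; skip GCS-dependent paths.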
http://git-wip-us.apache.org/repos/asf/beam/blob/1bc240bc/sdks/python/apache_beam/io/gcp/gcsio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/gcsio_test.py b/sdks/python/apache_beam/io/gcp/gcsio_test.py
index a852689..c028f0d 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio_test.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio_test.py
@@ -28,12 +28,11 @@ import unittest
import httplib2
import mock
-from apache_beam.io import gcsio
-from apache_beam.io.gcp.internal.clients import storage
-
# Protect against environments where apitools library is not available.
# pylint: disable=wrong-import-order, wrong-import-position
try:
+ from apache_beam.io.gcp import gcsio
+ from apache_beam.io.gcp.internal.clients import storage
from apitools.base.py.exceptions import HttpError
except ImportError:
HttpError = None
@@ -295,7 +294,7 @@ class TestGCSIO(unittest.TestCase):
self.assertFalse(
gcsio.parse_gcs_path(file_name) in self.client.objects.files)
- @mock.patch('apache_beam.io.gcsio.BatchApiRequest')
+ @mock.patch('apache_beam.io.gcp.gcsio.BatchApiRequest')
def test_delete_batch(self, *unused_args):
gcsio.BatchApiRequest = FakeBatchApiRequest
file_name_pattern = 'gs://gcsio-test/delete_me_%d'
@@ -346,7 +345,7 @@ class TestGCSIO(unittest.TestCase):
self.assertRaises(IOError, self.gcs.copy, 'gs://gcsio-test/non-existent',
'gs://gcsio-test/non-existent-destination')
- @mock.patch('apache_beam.io.gcsio.BatchApiRequest')
+ @mock.patch('apache_beam.io.gcp.gcsio.BatchApiRequest')
def test_copy_batch(self, *unused_args):
gcsio.BatchApiRequest = FakeBatchApiRequest
from_name_pattern = 'gs://gcsio-test/copy_me_%d'
http://git-wip-us.apache.org/repos/asf/beam/blob/1bc240bc/sdks/python/apache_beam/io/iobase.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index 057f853..512824b 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -591,8 +591,8 @@ class Sink(HasDisplayData):
single record from the bundle and ``close()`` which is called once
at the end of writing a bundle.
- See also ``beam.io.fileio.FileSink`` which provides a simpler API for writing
- sinks that produce files.
+ See also ``apache_beam.io.fileio.FileSink`` which provides a simpler API
+ for writing sinks that produce files.
**Execution of the Write transform**
[3/3] beam git commit: This closes #2136
Posted by ch...@apache.org.
This closes #2136
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/dd0f8d98
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/dd0f8d98
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/dd0f8d98
Branch: refs/heads/master
Commit: dd0f8d984aa79c08867ae9f5e54ce48179499e19
Parents: 7c7bb82 1bc240b
Author: Chamikara Jayalath <ch...@google.com>
Authored: Wed Mar 22 14:28:53 2017 -0700
Committer: Chamikara Jayalath <ch...@google.com>
Committed: Wed Mar 22 14:28:53 2017 -0700
----------------------------------------------------------------------
.../apache_beam/examples/snippets/snippets.py | 2 +-
sdks/python/apache_beam/io/avroio.py | 3 +-
sdks/python/apache_beam/io/filebasedsource.py | 83 +--
.../apache_beam/io/filebasedsource_test.py | 18 +-
sdks/python/apache_beam/io/fileio.py | 588 +++----------------
sdks/python/apache_beam/io/fileio_test.py | 102 +---
sdks/python/apache_beam/io/filesystem.py | 439 ++++++++++++++
sdks/python/apache_beam/io/filesystems_util.py | 31 +
sdks/python/apache_beam/io/gcp/gcsfilesystem.py | 242 ++++++++
.../apache_beam/io/gcp/gcsfilesystem_test.py | 293 +++++++++
sdks/python/apache_beam/io/gcp/gcsio.py | 12 +-
sdks/python/apache_beam/io/gcp/gcsio_test.py | 9 +-
sdks/python/apache_beam/io/iobase.py | 4 +-
sdks/python/apache_beam/io/localfilesystem.py | 236 ++++++++
.../apache_beam/io/localfilesystem_test.py | 185 ++++++
sdks/python/apache_beam/io/textio.py | 7 +-
sdks/python/apache_beam/io/textio_test.py | 2 +-
sdks/python/apache_beam/io/tfrecordio.py | 5 +-
sdks/python/apache_beam/io/tfrecordio_test.py | 20 +-
.../apache_beam/tests/pipeline_verifiers.py | 8 +-
.../tests/pipeline_verifiers_test.py | 26 +-
21 files changed, 1613 insertions(+), 702 deletions(-)
----------------------------------------------------------------------