You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2017/03/22 21:29:40 UTC

[1/3] beam git commit: BeamFileSystem implementation instead of IOChannelFactory

Repository: beam
Updated Branches:
  refs/heads/master 7c7bb8209 -> dd0f8d984


http://git-wip-us.apache.org/repos/asf/beam/blob/1bc240bc/sdks/python/apache_beam/io/localfilesystem.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/localfilesystem.py b/sdks/python/apache_beam/io/localfilesystem.py
new file mode 100644
index 0000000..46589b0
--- /dev/null
+++ b/sdks/python/apache_beam/io/localfilesystem.py
@@ -0,0 +1,236 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""Local File system implementation for accessing files on disk."""
+
+from __future__ import absolute_import
+
+import glob
+import os
+import shutil
+
+from apache_beam.io.filesystem import BeamIOError
+from apache_beam.io.filesystem import CompressedFile
+from apache_beam.io.filesystem import CompressionTypes
+from apache_beam.io.filesystem import FileMetadata
+from apache_beam.io.filesystem import FileSystem
+from apache_beam.io.filesystem import MatchResult
+
+
+class LocalFileSystem(FileSystem):
+  """A Local ``FileSystem`` implementation for accessing files on disk.
+  """
+
+  def mkdirs(self, path):
+    """Recursively create directories for the provided path.
+
+    Args:
+      path: string path of the directory structure that should be created
+
+    Raises:
+      IOError if leaf directory already exists.
+    """
+    try:
+      os.makedirs(path)
+    except OSError as err:
+      raise IOError(err)
+
+  def match(self, patterns, limits=None):
+    """Find all matching paths to the pattern provided.
+
+    Args:
+      patterns: list of string for the file path pattern to match against
+      limits: list of maximum number of responses that need to be fetched
+
+    Returns: list of ``MatchResult`` objects.
+
+    Raises:
+      ``BeamIOError`` if any of the pattern match operations fail
+    """
+    if limits is None:
+      limits = [None] * len(patterns)
+    else:
+      err_msg = "Patterns and limits should be equal in length"
+      assert len(patterns) == len(limits), err_msg
+
+    def _match(pattern, limit):
+      """Find all matching paths to the pattern provided.
+      """
+      files = glob.glob(pattern)
+      metadata = [FileMetadata(f, os.path.getsize(f)) for f in files[:limit]]
+      return MatchResult(pattern, metadata)
+
+    exceptions = {}
+    result = []
+    for pattern, limit in zip(patterns, limits):
+      try:
+        result.append(_match(pattern, limit))
+      except Exception as e:  # pylint: disable=broad-except
+        exceptions[pattern] = e
+
+    if exceptions:
+      raise BeamIOError("Match operation failed", exceptions)
+    return result
+
+  def _path_open(self, path, mode, mime_type='application/octet-stream',
+                 compression_type=CompressionTypes.AUTO):
+    """Helper functions to open a file in the provided mode.
+    """
+    compression_type = FileSystem._get_compression_type(path, compression_type)
+    raw_file = open(path, mode)
+    if compression_type == CompressionTypes.UNCOMPRESSED:
+      return raw_file
+    else:
+      return CompressedFile(raw_file, compression_type=compression_type)
+
+  def create(self, path, mime_type='application/octet-stream',
+             compression_type=CompressionTypes.AUTO):
+    """Returns a write channel for the given file path.
+
+    Args:
+      path: string path of the file object to be written to the system
+      mime_type: MIME type to specify the type of content in the file object
+      compression_type: Type of compression to be used for this object
+
+    Returns: file handle with a close function for the user to use
+    """
+    return self._path_open(path, 'wb', mime_type, compression_type)
+
+  def open(self, path, mime_type='application/octet-stream',
+           compression_type=CompressionTypes.AUTO):
+    """Returns a read channel for the given file path.
+
+    Args:
+      path: string path of the file object to be written to the system
+      mime_type: MIME type to specify the type of content in the file object
+      compression_type: Type of compression to be used for this object
+
+    Returns: file handle with a close function for the user to use
+    """
+    return self._path_open(path, 'rb', mime_type, compression_type)
+
+  def copy(self, source_file_names, destination_file_names):
+    """Recursively copy the file tree from the source to the destination
+
+    Args:
+      source_file_names: list of source file objects that needs to be copied
+      destination_file_names: list of destination of the new object
+
+    Raises:
+      ``BeamIOError`` if any of the copy operations fail
+    """
+    err_msg = ("source_file_names and destination_file_names should "
+               "be equal in length")
+    assert len(source_file_names) == len(destination_file_names), err_msg
+
+    def _copy_path(source, destination):
+      """Recursively copy the file tree from the source to the destination
+      """
+      try:
+        if os.path.exists(destination):
+          if os.path.isdir(destination):
+            shutil.rmtree(destination)
+          else:
+            os.remove(destination)
+        if os.path.isdir(source):
+          shutil.copytree(source, destination)
+        else:
+          shutil.copy2(source, destination)
+      except OSError as err:
+        raise IOError(err)
+
+    exceptions = {}
+    for source, destination in zip(source_file_names, destination_file_names):
+      try:
+        _copy_path(source, destination)
+      except Exception as e:  # pylint: disable=broad-except
+        exceptions[(source, destination)] = e
+
+    if exceptions:
+      raise BeamIOError("Copy operation failed", exceptions)
+
+  def rename(self, source_file_names, destination_file_names):
+    """Rename the files at the source list to the destination list.
+    Source and destination lists should be of the same size.
+
+    Args:
+      source_file_names: List of file paths that need to be moved
+      destination_file_names: List of destination_file_names for the files
+
+    Raises:
+      ``BeamIOError`` if any of the rename operations fail
+    """
+    err_msg = ("source_file_names and destination_file_names should "
+               "be equal in length")
+    assert len(source_file_names) == len(destination_file_names), err_msg
+
+    def _rename_file(source, destination):
+      """Rename a single file object"""
+      try:
+        os.rename(source, destination)
+      except OSError as err:
+        raise IOError(err)
+
+    exceptions = {}
+    for source, destination in zip(source_file_names, destination_file_names):
+      try:
+        _rename_file(source, destination)
+      except Exception as e:  # pylint: disable=broad-except
+        exceptions[(source, destination)] = e
+
+    if exceptions:
+      raise BeamIOError("Rename operation failed", exceptions)
+
+  def exists(self, path):
+    """Check if the provided path exists on the FileSystem.
+
+    Args:
+      path: string path that needs to be checked.
+
+    Returns: boolean flag indicating if path exists
+    """
+    return os.path.exists(path)
+
+  def delete(self, paths):
+    """Deletes files or directories at the provided paths.
+    Directories will be deleted recursively.
+
+    Args:
+      paths: list of paths that give the file objects to be deleted
+
+    Raises:
+      ``BeamIOError`` if any of the delete operations fail
+    """
+    def _delete_path(path):
+      """Recursively delete the file or directory at the provided path.
+      """
+      try:
+        if os.path.isdir(path):
+          shutil.rmtree(path)
+        else:
+          os.remove(path)
+      except OSError as err:
+        raise IOError(err)
+
+    exceptions = {}
+    for path in paths:
+      try:
+        _delete_path(path)
+      except Exception as e:  # pylint: disable=broad-except
+        exceptions[path] = e
+
+    if exceptions:
+      raise BeamIOError("Delete operation failed", exceptions)

http://git-wip-us.apache.org/repos/asf/beam/blob/1bc240bc/sdks/python/apache_beam/io/localfilesystem_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/localfilesystem_test.py b/sdks/python/apache_beam/io/localfilesystem_test.py
new file mode 100644
index 0000000..00059ef
--- /dev/null
+++ b/sdks/python/apache_beam/io/localfilesystem_test.py
@@ -0,0 +1,185 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for LocalFileSystem."""
+
+import unittest
+
+import filecmp
+import os
+import shutil
+import tempfile
+from apache_beam.io.filesystem import BeamIOError
+from apache_beam.io.localfilesystem import LocalFileSystem
+
+
+class LocalFileSystemTest(unittest.TestCase):
+
+  def setUp(self):
+    self.tmpdir = tempfile.mkdtemp()
+    self.fs = LocalFileSystem()
+
+  def tearDown(self):
+    shutil.rmtree(self.tmpdir)
+
+  def test_mkdirs(self):
+    path = os.path.join(self.tmpdir, 't1/t2')
+    self.fs.mkdirs(path)
+    self.assertTrue(os.path.isdir(path))
+
+  def test_mkdirs_failed(self):
+    path = os.path.join(self.tmpdir, 't1/t2')
+    self.fs.mkdirs(path)
+
+    # Check IOError if existing directory is created
+    with self.assertRaises(IOError):
+      self.fs.mkdirs(path)
+
+    with self.assertRaises(IOError):
+      self.fs.mkdirs(os.path.join(self.tmpdir, 't1'))
+
+  def test_match_file(self):
+    path = os.path.join(self.tmpdir, 'f1')
+    open(path, 'a').close()
+
+    # Match files in the temp directory
+    result = self.fs.match([path])[0]
+    files = [f.path for f in result.metadata_list]
+    self.assertEqual(files, [path])
+
+  def test_match_file_empty(self):
+    path = os.path.join(self.tmpdir, 'f2')  # Does not exist
+
+    # Match files in the temp directory
+    result = self.fs.match([path])[0]
+    files = [f.path for f in result.metadata_list]
+    self.assertEqual(files, [])
+
+  def test_match_file_exception(self):
+    # Match files with None so that it throws an exception
+    with self.assertRaises(BeamIOError) as error:
+      self.fs.match([None])
+    self.assertEqual(error.exception.message, 'Match operation failed')
+    self.assertEqual(error.exception.exception_details.keys(), [None])
+
+  def test_match_directory(self):
+    path1 = os.path.join(self.tmpdir, 'f1')
+    path2 = os.path.join(self.tmpdir, 'f2')
+    open(path1, 'a').close()
+    open(path2, 'a').close()
+
+    # Match both the files in the directory
+    path = os.path.join(self.tmpdir, '*')
+    result = self.fs.match([path])[0]
+    files = [f.path for f in result.metadata_list]
+    self.assertEqual(files, [path1, path2])
+
+  def test_match_directory(self):
+    result = self.fs.match([self.tmpdir])[0]
+    files = [f.path for f in result.metadata_list]
+    self.assertEqual(files, [self.tmpdir])
+
+  def test_copy(self):
+    path1 = os.path.join(self.tmpdir, 'f1')
+    path2 = os.path.join(self.tmpdir, 'f2')
+    with open(path1, 'a') as f:
+      f.write('Hello')
+
+    self.fs.copy([path1], [path2])
+    self.assertTrue(filecmp.cmp(path1, path2))
+
+  def test_copy_error(self):
+    path1 = os.path.join(self.tmpdir, 'f1')
+    path2 = os.path.join(self.tmpdir, 'f2')
+    with self.assertRaises(BeamIOError) as error:
+      self.fs.copy([path1], [path2])
+    self.assertEqual(error.exception.message, 'Copy operation failed')
+    self.assertEqual(error.exception.exception_details.keys(), [(path1, path2)])
+
+  def test_copy_directory(self):
+    path_t1 = os.path.join(self.tmpdir, 't1')
+    path_t2 = os.path.join(self.tmpdir, 't2')
+    self.fs.mkdirs(path_t1)
+    self.fs.mkdirs(path_t2)
+
+    path1 = os.path.join(path_t1, 'f1')
+    path2 = os.path.join(path_t2, 'f1')
+    with open(path1, 'a') as f:
+      f.write('Hello')
+
+    self.fs.copy([path_t1], [path_t2])
+    self.assertTrue(filecmp.cmp(path1, path2))
+
+  def test_rename(self):
+    path1 = os.path.join(self.tmpdir, 'f1')
+    path2 = os.path.join(self.tmpdir, 'f2')
+    with open(path1, 'a') as f:
+      f.write('Hello')
+
+    self.fs.rename([path1], [path2])
+    self.assertTrue(self.fs.exists(path2))
+    self.assertFalse(self.fs.exists(path1))
+
+  def test_rename_error(self):
+    path1 = os.path.join(self.tmpdir, 'f1')
+    path2 = os.path.join(self.tmpdir, 'f2')
+    with self.assertRaises(BeamIOError) as error:
+      self.fs.rename([path1], [path2])
+    self.assertEqual(error.exception.message, 'Rename operation failed')
+    self.assertEqual(error.exception.exception_details.keys(), [(path1, path2)])
+
+  def test_rename_directory(self):
+    path_t1 = os.path.join(self.tmpdir, 't1')
+    path_t2 = os.path.join(self.tmpdir, 't2')
+    self.fs.mkdirs(path_t1)
+
+    path1 = os.path.join(path_t1, 'f1')
+    path2 = os.path.join(path_t2, 'f1')
+    with open(path1, 'a') as f:
+      f.write('Hello')
+
+    self.fs.rename([path_t1], [path_t2])
+    self.assertTrue(self.fs.exists(path_t2))
+    self.assertFalse(self.fs.exists(path_t1))
+    self.assertTrue(self.fs.exists(path2))
+    self.assertFalse(self.fs.exists(path1))
+
+  def test_exists(self):
+    path1 = os.path.join(self.tmpdir, 'f1')
+    path2 = os.path.join(self.tmpdir, 'f2')
+    with open(path1, 'a') as f:
+      f.write('Hello')
+    self.assertTrue(self.fs.exists(path1))
+    self.assertFalse(self.fs.exists(path2))
+
+  def test_delete(self):
+    path1 = os.path.join(self.tmpdir, 'f1')
+
+    with open(path1, 'a') as f:
+      f.write('Hello')
+
+    self.assertTrue(self.fs.exists(path1))
+    self.fs.delete([path1])
+    self.assertFalse(self.fs.exists(path1))
+
+  def test_delete_error(self):
+    path1 = os.path.join(self.tmpdir, 'f1')
+    with self.assertRaises(BeamIOError) as error:
+      self.fs.delete([path1])
+    self.assertEqual(error.exception.message, 'Delete operation failed')
+    self.assertEqual(error.exception.exception_details.keys(), [path1])

http://git-wip-us.apache.org/repos/asf/beam/blob/1bc240bc/sdks/python/apache_beam/io/textio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/textio.py b/sdks/python/apache_beam/io/textio.py
index 5bb1a9d..8122fae 100644
--- a/sdks/python/apache_beam/io/textio.py
+++ b/sdks/python/apache_beam/io/textio.py
@@ -25,6 +25,7 @@ from apache_beam import coders
 from apache_beam.io import filebasedsource
 from apache_beam.io import fileio
 from apache_beam.io import iobase
+from apache_beam.io.filesystem import CompressionTypes
 from apache_beam.io.iobase import Read
 from apache_beam.io.iobase import Write
 from apache_beam.transforms import PTransform
@@ -271,7 +272,7 @@ class _TextSink(fileio.FileSink):
                num_shards=0,
                shard_name_template=None,
                coder=coders.ToStringCoder(),
-               compression_type=fileio.CompressionTypes.AUTO,
+               compression_type=CompressionTypes.AUTO,
                header=None):
     """Initialize a _TextSink.
 
@@ -355,7 +356,7 @@ class ReadFromText(PTransform):
       self,
       file_pattern=None,
       min_bundle_size=0,
-      compression_type=fileio.CompressionTypes.AUTO,
+      compression_type=CompressionTypes.AUTO,
       strip_trailing_newlines=True,
       coder=coders.StrUtf8Coder(),
       validate=True,
@@ -404,7 +405,7 @@ class WriteToText(PTransform):
                num_shards=0,
                shard_name_template=None,
                coder=coders.ToStringCoder(),
-               compression_type=fileio.CompressionTypes.AUTO,
+               compression_type=CompressionTypes.AUTO,
                header=None):
     """Initialize a WriteToText PTransform.
 

http://git-wip-us.apache.org/repos/asf/beam/blob/1bc240bc/sdks/python/apache_beam/io/textio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/textio_test.py b/sdks/python/apache_beam/io/textio_test.py
index 04cf44c..b3f4391 100644
--- a/sdks/python/apache_beam/io/textio_test.py
+++ b/sdks/python/apache_beam/io/textio_test.py
@@ -41,7 +41,7 @@ from apache_beam import coders
 from apache_beam.io.filebasedsource_test import EOL
 from apache_beam.io.filebasedsource_test import write_data
 from apache_beam.io.filebasedsource_test import write_pattern
-from apache_beam.io.fileio import CompressionTypes
+from apache_beam.io.filesystem import CompressionTypes
 
 from apache_beam.test_pipeline import TestPipeline
 

http://git-wip-us.apache.org/repos/asf/beam/blob/1bc240bc/sdks/python/apache_beam/io/tfrecordio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/tfrecordio.py b/sdks/python/apache_beam/io/tfrecordio.py
index 05c0a13..8b9d9ea 100644
--- a/sdks/python/apache_beam/io/tfrecordio.py
+++ b/sdks/python/apache_beam/io/tfrecordio.py
@@ -24,6 +24,7 @@ import struct
 from apache_beam import coders
 from apache_beam.io import filebasedsource
 from apache_beam.io import fileio
+from apache_beam.io.filesystem import CompressionTypes
 from apache_beam.io.iobase import Read
 from apache_beam.io.iobase import Write
 from apache_beam.transforms import PTransform
@@ -180,7 +181,7 @@ class ReadFromTFRecord(PTransform):
   def __init__(self,
                file_pattern,
                coder=coders.BytesCoder(),
-               compression_type=fileio.CompressionTypes.AUTO,
+               compression_type=CompressionTypes.AUTO,
                validate=True,
                **kwargs):
     """Initialize a ReadFromTFRecord transform.
@@ -239,7 +240,7 @@ class WriteToTFRecord(PTransform):
                file_name_suffix='',
                num_shards=0,
                shard_name_template=fileio.DEFAULT_SHARD_NAME_TEMPLATE,
-               compression_type=fileio.CompressionTypes.AUTO,
+               compression_type=CompressionTypes.AUTO,
                **kwargs):
     """Initialize WriteToTFRecord transform.
 

http://git-wip-us.apache.org/repos/asf/beam/blob/1bc240bc/sdks/python/apache_beam/io/tfrecordio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/tfrecordio_test.py b/sdks/python/apache_beam/io/tfrecordio_test.py
index df33fcb..49f9639 100644
--- a/sdks/python/apache_beam/io/tfrecordio_test.py
+++ b/sdks/python/apache_beam/io/tfrecordio_test.py
@@ -29,7 +29,7 @@ import unittest
 
 import apache_beam as beam
 from apache_beam import coders
-from apache_beam.io import fileio
+from apache_beam.io.filesystem import CompressionTypes
 from apache_beam.io.tfrecordio import _TFRecordSink
 from apache_beam.io.tfrecordio import _TFRecordSource
 from apache_beam.io.tfrecordio import _TFRecordUtil
@@ -175,7 +175,7 @@ class TestTFRecordSink(_TestCaseWithTempDirCleanUp):
         file_name_suffix='',
         num_shards=0,
         shard_name_template=None,
-        compression_type=fileio.CompressionTypes.UNCOMPRESSED)
+        compression_type=CompressionTypes.UNCOMPRESSED)
     self._write_lines(sink, path, ['foo'])
 
     with open(path, 'r') as f:
@@ -190,7 +190,7 @@ class TestTFRecordSink(_TestCaseWithTempDirCleanUp):
         file_name_suffix='',
         num_shards=0,
         shard_name_template=None,
-        compression_type=fileio.CompressionTypes.UNCOMPRESSED)
+        compression_type=CompressionTypes.UNCOMPRESSED)
     self._write_lines(sink, path, ['foo', 'bar'])
 
     with open(path, 'r') as f:
@@ -205,7 +205,7 @@ class TestWriteToTFRecord(TestTFRecordSink):
     with TestPipeline() as p:
       input_data = ['foo', 'bar']
       _ = p | beam.Create(input_data) | WriteToTFRecord(
-          file_path_prefix, compression_type=fileio.CompressionTypes.GZIP)
+          file_path_prefix, compression_type=CompressionTypes.GZIP)
 
     actual = []
     file_name = glob.glob(file_path_prefix + '-*')[0]
@@ -252,7 +252,7 @@ class TestTFRecordSource(_TestCaseWithTempDirCleanUp):
                     _TFRecordSource(
                         path,
                         coder=coders.BytesCoder(),
-                        compression_type=fileio.CompressionTypes.AUTO,
+                        compression_type=CompressionTypes.AUTO,
                         validate=True)))
       beam.assert_that(result, beam.equal_to(['foo']))
 
@@ -265,7 +265,7 @@ class TestTFRecordSource(_TestCaseWithTempDirCleanUp):
                     _TFRecordSource(
                         path,
                         coder=coders.BytesCoder(),
-                        compression_type=fileio.CompressionTypes.AUTO,
+                        compression_type=CompressionTypes.AUTO,
                         validate=True)))
       beam.assert_that(result, beam.equal_to(['foo', 'bar']))
 
@@ -278,7 +278,7 @@ class TestTFRecordSource(_TestCaseWithTempDirCleanUp):
                     _TFRecordSource(
                         path,
                         coder=coders.BytesCoder(),
-                        compression_type=fileio.CompressionTypes.GZIP,
+                        compression_type=CompressionTypes.GZIP,
                         validate=True)))
       beam.assert_that(result, beam.equal_to(['foo', 'bar']))
 
@@ -291,7 +291,7 @@ class TestTFRecordSource(_TestCaseWithTempDirCleanUp):
                     _TFRecordSource(
                         path,
                         coder=coders.BytesCoder(),
-                        compression_type=fileio.CompressionTypes.AUTO,
+                        compression_type=CompressionTypes.AUTO,
                         validate=True)))
       beam.assert_that(result, beam.equal_to(['foo', 'bar']))
 
@@ -304,7 +304,7 @@ class TestReadFromTFRecordSource(TestTFRecordSource):
     with TestPipeline() as p:
       result = (p
                 | ReadFromTFRecord(
-                    path, compression_type=fileio.CompressionTypes.GZIP))
+                    path, compression_type=CompressionTypes.GZIP))
       beam.assert_that(result, beam.equal_to(['foo', 'bar']))
 
   def test_process_gzip_auto(self):
@@ -313,7 +313,7 @@ class TestReadFromTFRecordSource(TestTFRecordSource):
     with TestPipeline() as p:
       result = (p
                 | ReadFromTFRecord(
-                    path, compression_type=fileio.CompressionTypes.AUTO))
+                    path, compression_type=CompressionTypes.AUTO))
       beam.assert_that(result, beam.equal_to(['foo', 'bar']))
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/1bc240bc/sdks/python/apache_beam/tests/pipeline_verifiers.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/tests/pipeline_verifiers.py b/sdks/python/apache_beam/tests/pipeline_verifiers.py
index 379a96f..0d6814e 100644
--- a/sdks/python/apache_beam/tests/pipeline_verifiers.py
+++ b/sdks/python/apache_beam/tests/pipeline_verifiers.py
@@ -26,7 +26,7 @@ import logging
 
 from hamcrest.core.base_matcher import BaseMatcher
 
-from apache_beam.io.fileio import ChannelFactory
+from apache_beam.io.filesystems_util import get_filesystem
 from apache_beam.runners.runner import PipelineState
 from apache_beam.tests import test_utils as utils
 from apache_beam.utils import retry
@@ -81,6 +81,7 @@ class FileChecksumMatcher(BaseMatcher):
 
   def __init__(self, file_path, expected_checksum):
     self.file_path = file_path
+    self.file_system = get_filesystem(self.file_path)
     self.expected_checksum = expected_checksum
 
   @retry.with_exponential_backoff(
@@ -89,11 +90,12 @@ class FileChecksumMatcher(BaseMatcher):
   def _read_with_retry(self):
     """Read path with retry if I/O failed"""
     read_lines = []
-    matched_path = ChannelFactory.glob(self.file_path)
+    match_result = self.file_system.match([self.file_path])[0]
+    matched_path = [f.path for f in match_result.metadata_list]
     if not matched_path:
       raise IOError('No such file or directory: %s' % self.file_path)
     for path in matched_path:
-      with ChannelFactory.open(path, 'r') as f:
+      with self.file_system.open(path, 'r') as f:
         for line in f:
           read_lines.append(line)
     return read_lines

http://git-wip-us.apache.org/repos/asf/beam/blob/1bc240bc/sdks/python/apache_beam/tests/pipeline_verifiers_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/tests/pipeline_verifiers_test.py b/sdks/python/apache_beam/tests/pipeline_verifiers_test.py
index 586af82..af8f441 100644
--- a/sdks/python/apache_beam/tests/pipeline_verifiers_test.py
+++ b/sdks/python/apache_beam/tests/pipeline_verifiers_test.py
@@ -24,16 +24,20 @@ import unittest
 from hamcrest import assert_that as hc_assert_that
 from mock import Mock, patch
 
-from apache_beam.io.fileio import ChannelFactory
+from apache_beam.io.localfilesystem import LocalFileSystem
 from apache_beam.runners.runner import PipelineState
 from apache_beam.runners.runner import PipelineResult
 from apache_beam.tests import pipeline_verifiers as verifiers
 from apache_beam.tests.test_utils import patch_retry
 
 try:
+  # pylint: disable=wrong-import-order, wrong-import-position
+  # pylint: disable=ungrouped-imports
   from apitools.base.py.exceptions import HttpError
+  from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem
 except ImportError:
   HttpError = None
+  GCSFileSystem = None
 
 
 class PipelineVerifiersTest(unittest.TestCase):
@@ -96,26 +100,26 @@ class PipelineVerifiersTest(unittest.TestCase):
                                               case['expected_checksum'])
       hc_assert_that(self._mock_result, matcher)
 
-  @patch.object(ChannelFactory, 'glob')
-  def test_file_checksum_matcher_read_failed(self, mock_glob):
-    mock_glob.side_effect = IOError('No file found.')
+  @patch.object(LocalFileSystem, 'match')
+  def test_file_checksum_matcher_read_failed(self, mock_match):
+    mock_match.side_effect = IOError('No file found.')
     matcher = verifiers.FileChecksumMatcher('dummy/path', Mock())
     with self.assertRaises(IOError):
       hc_assert_that(self._mock_result, matcher)
-    self.assertTrue(mock_glob.called)
-    self.assertEqual(verifiers.MAX_RETRIES + 1, mock_glob.call_count)
+    self.assertTrue(mock_match.called)
+    self.assertEqual(verifiers.MAX_RETRIES + 1, mock_match.call_count)
 
-  @patch.object(ChannelFactory, 'glob')
+  @patch.object(GCSFileSystem, 'match')
   @unittest.skipIf(HttpError is None, 'google-apitools is not installed')
-  def test_file_checksum_matcher_service_error(self, mock_glob):
-    mock_glob.side_effect = HttpError(
+  def test_file_checksum_matcher_service_error(self, mock_match):
+    mock_match.side_effect = HttpError(
         response={'status': '404'}, url='', content='Not Found',
     )
     matcher = verifiers.FileChecksumMatcher('gs://dummy/path', Mock())
     with self.assertRaises(HttpError):
       hc_assert_that(self._mock_result, matcher)
-    self.assertTrue(mock_glob.called)
-    self.assertEqual(verifiers.MAX_RETRIES + 1, mock_glob.call_count)
+    self.assertTrue(mock_match.called)
+    self.assertEqual(verifiers.MAX_RETRIES + 1, mock_match.call_count)
 
 
 if __name__ == '__main__':


[2/3] beam git commit: BeamFileSystem implementation instead of IOChannelFactory

Posted by ch...@apache.org.
BeamFileSystem implementation instead of IOChannelFactory


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1bc240bc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1bc240bc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1bc240bc

Branch: refs/heads/master
Commit: 1bc240bc637883cb1c04253264534c9d545fdefa
Parents: 7c7bb82
Author: Sourabh Bajaj <so...@google.com>
Authored: Wed Mar 1 23:01:34 2017 -0800
Committer: Chamikara Jayalath <ch...@google.com>
Committed: Wed Mar 22 14:27:01 2017 -0700

----------------------------------------------------------------------
 .../apache_beam/examples/snippets/snippets.py   |   2 +-
 sdks/python/apache_beam/io/avroio.py            |   3 +-
 sdks/python/apache_beam/io/filebasedsource.py   |  83 +--
 .../apache_beam/io/filebasedsource_test.py      |  18 +-
 sdks/python/apache_beam/io/fileio.py            | 588 +++----------------
 sdks/python/apache_beam/io/fileio_test.py       | 102 +---
 sdks/python/apache_beam/io/filesystem.py        | 439 ++++++++++++++
 sdks/python/apache_beam/io/filesystems_util.py  |  31 +
 sdks/python/apache_beam/io/gcp/gcsfilesystem.py | 242 ++++++++
 .../apache_beam/io/gcp/gcsfilesystem_test.py    | 293 +++++++++
 sdks/python/apache_beam/io/gcp/gcsio.py         |  12 +-
 sdks/python/apache_beam/io/gcp/gcsio_test.py    |   9 +-
 sdks/python/apache_beam/io/iobase.py            |   4 +-
 sdks/python/apache_beam/io/localfilesystem.py   | 236 ++++++++
 .../apache_beam/io/localfilesystem_test.py      | 185 ++++++
 sdks/python/apache_beam/io/textio.py            |   7 +-
 sdks/python/apache_beam/io/textio_test.py       |   2 +-
 sdks/python/apache_beam/io/tfrecordio.py        |   5 +-
 sdks/python/apache_beam/io/tfrecordio_test.py   |  20 +-
 .../apache_beam/tests/pipeline_verifiers.py     |   8 +-
 .../tests/pipeline_verifiers_test.py            |  26 +-
 21 files changed, 1613 insertions(+), 702 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/1bc240bc/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 18f1506..5cb5ee5 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -856,7 +856,7 @@ def model_textio_compressed(renames, expected):
   # [START model_textio_write_compressed]
   lines = p | 'ReadFromText' >> beam.io.ReadFromText(
       '/path/to/input-*.csv.gz',
-      compression_type=beam.io.fileio.CompressionTypes.GZIP)
+      compression_type=beam.io.filesystem.CompressionTypes.GZIP)
   # [END model_textio_write_compressed]
 
   beam.assert_that(lines, beam.equal_to(expected))

http://git-wip-us.apache.org/repos/asf/beam/blob/1bc240bc/sdks/python/apache_beam/io/avroio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/avroio.py b/sdks/python/apache_beam/io/avroio.py
index 6fdd798..1c08c68 100644
--- a/sdks/python/apache_beam/io/avroio.py
+++ b/sdks/python/apache_beam/io/avroio.py
@@ -29,6 +29,7 @@ import apache_beam as beam
 from apache_beam.io import filebasedsource
 from apache_beam.io import fileio
 from apache_beam.io import iobase
+from apache_beam.io.filesystem import CompressionTypes
 from apache_beam.io.iobase import Read
 from apache_beam.transforms import PTransform
 
@@ -354,7 +355,7 @@ class _AvroSink(fileio.FileSink):
         mime_type=mime_type,
         # Compression happens at the block level using the supplied codec, and
         # not at the file level.
-        compression_type=fileio.CompressionTypes.UNCOMPRESSED)
+        compression_type=CompressionTypes.UNCOMPRESSED)
     self._schema = schema
     self._codec = codec
 

http://git-wip-us.apache.org/repos/asf/beam/blob/1bc240bc/sdks/python/apache_beam/io/filebasedsource.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py
index 582d673..a3e0667 100644
--- a/sdks/python/apache_beam/io/filebasedsource.py
+++ b/sdks/python/apache_beam/io/filebasedsource.py
@@ -25,16 +25,13 @@ for more details.
 For an example implementation of ``FileBasedSource`` see ``avroio.AvroSource``.
 """
 
-import random
-
 from apache_beam.internal import pickler
-from apache_beam.internal import util
 from apache_beam.io import concat_source
-from apache_beam.io import fileio
 from apache_beam.io import iobase
 from apache_beam.io import range_trackers
+from apache_beam.io.filesystem import CompressionTypes
+from apache_beam.io.filesystems_util import get_filesystem
 from apache_beam.transforms.display import DisplayDataItem
-
 MAX_NUM_THREADS_FOR_SIZE_ESTIMATION = 25
 
 
@@ -47,7 +44,7 @@ class FileBasedSource(iobase.BoundedSource):
   def __init__(self,
                file_pattern,
                min_bundle_size=0,
-               compression_type=fileio.CompressionTypes.AUTO,
+               compression_type=CompressionTypes.AUTO,
                splittable=True,
                validate=True):
     """Initializes ``FileBasedSource``.
@@ -81,14 +78,15 @@ class FileBasedSource(iobase.BoundedSource):
           (self.__class__.__name__, file_pattern))
 
     self._pattern = file_pattern
+    self._file_system = get_filesystem(file_pattern)
     self._concat_source = None
     self._min_bundle_size = min_bundle_size
-    if not fileio.CompressionTypes.is_valid_compression_type(compression_type):
+    if not CompressionTypes.is_valid_compression_type(compression_type):
       raise TypeError('compression_type must be CompressionType object but '
                       'was %s' % type(compression_type))
     self._compression_type = compression_type
-    if compression_type in (fileio.CompressionTypes.UNCOMPRESSED,
-                            fileio.CompressionTypes.AUTO):
+    if compression_type in (CompressionTypes.UNCOMPRESSED,
+                            CompressionTypes.AUTO):
       self._splittable = splittable
     else:
       # We can't split compressed files efficiently so turn off splitting.
@@ -105,9 +103,8 @@ class FileBasedSource(iobase.BoundedSource):
   def _get_concat_source(self):
     if self._concat_source is None:
       single_file_sources = []
-      file_names = [f for f in fileio.ChannelFactory.glob(self._pattern)]
-      sizes = FileBasedSource._estimate_sizes_of_files(file_names,
-                                                       self._pattern)
+      match_result = self._file_system.match([self._pattern])[0]
+      files_metadata = match_result.metadata_list
 
       # We create a reference for FileBasedSource that will be serialized along
       # with each _SingleFileSource. To prevent this FileBasedSource from having
@@ -115,23 +112,25 @@ class FileBasedSource(iobase.BoundedSource):
       # we clone it here.
       file_based_source_ref = pickler.loads(pickler.dumps(self))
 
-      for index, file_name in enumerate(file_names):
-        if sizes[index] == 0:
+      for file_metadata in files_metadata:
+        file_name = file_metadata.path
+        file_size = file_metadata.size_in_bytes
+        if file_size == 0:
           continue  # Ignoring empty file.
 
         # We determine splittability of this specific file.
         splittable = self.splittable
         if (splittable and
-            self._compression_type == fileio.CompressionTypes.AUTO):
-          compression_type = fileio.CompressionTypes.detect_compression_type(
+            self._compression_type == CompressionTypes.AUTO):
+          compression_type = CompressionTypes.detect_compression_type(
               file_name)
-          if compression_type != fileio.CompressionTypes.UNCOMPRESSED:
+          if compression_type != CompressionTypes.UNCOMPRESSED:
             splittable = False
 
         single_file_source = _SingleFileSource(
             file_based_source_ref, file_name,
             0,
-            sizes[index],
+            file_size,
             min_bundle_size=self._min_bundle_size,
             splittable=splittable)
         single_file_sources.append(single_file_source)
@@ -139,36 +138,16 @@ class FileBasedSource(iobase.BoundedSource):
     return self._concat_source
 
   def open_file(self, file_name):
-    return fileio.ChannelFactory.open(
-        file_name, 'rb', 'application/octet-stream',
+    return get_filesystem(file_name).open(
+        file_name, 'application/octet-stream',
         compression_type=self._compression_type)
 
-  @staticmethod
-  def _estimate_sizes_of_files(file_names, pattern=None):
-    """Returns the size of all the files as an ordered list based on the file
-    names that are provided here. If the pattern is specified here then we use
-    the size_of_files_in_glob method to get the size of files matching the glob
-    for performance improvements instead of getting the size one by one.
-    """
-    if not file_names:
-      return []
-    elif len(file_names) == 1:
-      return [fileio.ChannelFactory.size_in_bytes(file_names[0])]
-    else:
-      if pattern is None:
-        return util.run_using_threadpool(
-            fileio.ChannelFactory.size_in_bytes, file_names,
-            MAX_NUM_THREADS_FOR_SIZE_ESTIMATION)
-      else:
-        file_sizes = fileio.ChannelFactory.size_of_files_in_glob(pattern,
-                                                                 file_names)
-        return [file_sizes[f] for f in file_names]
-
   def _validate(self):
     """Validate if there are actual files in the specified glob pattern
     """
     # Limit the responses as we only want to check if something exists
-    if len(fileio.ChannelFactory.glob(self._pattern, limit=1)) <= 0:
+    match_result = self._file_system.match([self._pattern], limits=[1])[0]
+    if len(match_result.metadata_list) <= 0:
       raise IOError(
           'No files found based on the file pattern %s' % self._pattern)
 
@@ -180,24 +159,8 @@ class FileBasedSource(iobase.BoundedSource):
         stop_position=stop_position)
 
   def estimate_size(self):
-    file_names = [f for f in fileio.ChannelFactory.glob(self._pattern)]
-    # We're reading very few files so we can pass names file names to
-    # _estimate_sizes_of_files without pattern as otherwise we'll try to do
-    # optimization based on the pattern and might end up reading much more
-    # data than needed for a few files.
-    if (len(file_names) <=
-        FileBasedSource.MIN_NUMBER_OF_FILES_TO_STAT):
-      return sum(self._estimate_sizes_of_files(file_names))
-    else:
-      # Estimating size of a random sample.
-      # TODO: better support distributions where file sizes are not
-      # approximately equal.
-      sample_size = max(FileBasedSource.MIN_NUMBER_OF_FILES_TO_STAT,
-                        int(len(file_names) *
-                            FileBasedSource.MIN_FRACTION_OF_FILES_TO_STAT))
-      sample = random.sample(file_names, sample_size)
-      estimate = self._estimate_sizes_of_files(sample)
-      return int(sum(estimate) * (float(len(file_names)) / len(sample)))
+    match_result = self._file_system.match([self._pattern])[0]
+    return sum([f.size_in_bytes for f in match_result.metadata_list])
 
   def read(self, range_tracker):
     return self._get_concat_source().read(range_tracker)

http://git-wip-us.apache.org/repos/asf/beam/blob/1bc240bc/sdks/python/apache_beam/io/filebasedsource_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource_test.py b/sdks/python/apache_beam/io/filebasedsource_test.py
index 7481c4c..7b7ec8a 100644
--- a/sdks/python/apache_beam/io/filebasedsource_test.py
+++ b/sdks/python/apache_beam/io/filebasedsource_test.py
@@ -29,9 +29,9 @@ import hamcrest as hc
 
 import apache_beam as beam
 from apache_beam.io import filebasedsource
-from apache_beam.io import fileio
 from apache_beam.io import iobase
 from apache_beam.io import range_trackers
+from apache_beam.io.filesystem import CompressionTypes
 
 # importing following private classes for testing
 from apache_beam.io.concat_source import ConcatSource
@@ -409,7 +409,7 @@ class TestFileBasedSource(unittest.TestCase):
     pcoll = pipeline | 'Read' >> beam.Read(LineSource(
         filename,
         splittable=False,
-        compression_type=fileio.CompressionTypes.BZIP2))
+        compression_type=CompressionTypes.BZIP2))
     assert_that(pcoll, equal_to(lines))
     pipeline.run()
 
@@ -424,7 +424,7 @@ class TestFileBasedSource(unittest.TestCase):
     pcoll = pipeline | 'Read' >> beam.Read(LineSource(
         filename,
         splittable=False,
-        compression_type=fileio.CompressionTypes.GZIP))
+        compression_type=CompressionTypes.GZIP))
     assert_that(pcoll, equal_to(lines))
     pipeline.run()
 
@@ -442,7 +442,7 @@ class TestFileBasedSource(unittest.TestCase):
     pcoll = pipeline | 'Read' >> beam.Read(LineSource(
         file_pattern,
         splittable=False,
-        compression_type=fileio.CompressionTypes.BZIP2))
+        compression_type=CompressionTypes.BZIP2))
     assert_that(pcoll, equal_to(lines))
     pipeline.run()
 
@@ -461,7 +461,7 @@ class TestFileBasedSource(unittest.TestCase):
     pcoll = pipeline | 'Read' >> beam.Read(LineSource(
         file_pattern,
         splittable=False,
-        compression_type=fileio.CompressionTypes.GZIP))
+        compression_type=CompressionTypes.GZIP))
     assert_that(pcoll, equal_to(lines))
     pipeline.run()
 
@@ -475,7 +475,7 @@ class TestFileBasedSource(unittest.TestCase):
     pipeline = TestPipeline()
     pcoll = pipeline | 'Read' >> beam.Read(LineSource(
         filename,
-        compression_type=fileio.CompressionTypes.AUTO))
+        compression_type=CompressionTypes.AUTO))
     assert_that(pcoll, equal_to(lines))
     pipeline.run()
 
@@ -489,7 +489,7 @@ class TestFileBasedSource(unittest.TestCase):
     pipeline = TestPipeline()
     pcoll = pipeline | 'Read' >> beam.Read(LineSource(
         filename,
-        compression_type=fileio.CompressionTypes.AUTO))
+        compression_type=CompressionTypes.AUTO))
     assert_that(pcoll, equal_to(lines))
     pipeline.run()
 
@@ -508,7 +508,7 @@ class TestFileBasedSource(unittest.TestCase):
     pipeline = TestPipeline()
     pcoll = pipeline | 'Read' >> beam.Read(LineSource(
         file_pattern,
-        compression_type=fileio.CompressionTypes.AUTO))
+        compression_type=CompressionTypes.AUTO))
     assert_that(pcoll, equal_to(lines))
     pipeline.run()
 
@@ -530,7 +530,7 @@ class TestFileBasedSource(unittest.TestCase):
     pipeline = TestPipeline()
     pcoll = pipeline | 'Read' >> beam.Read(LineSource(
         file_pattern,
-        compression_type=fileio.CompressionTypes.AUTO))
+        compression_type=CompressionTypes.AUTO))
     assert_that(pcoll, equal_to(lines))
     pipeline.run()
 

http://git-wip-us.apache.org/repos/asf/beam/blob/1bc240bc/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index 49a2082..0759ce4 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -18,146 +18,40 @@
 
 from __future__ import absolute_import
 
-import bz2
-import cStringIO
-import glob
 import logging
 import os
 import re
-import shutil
 import time
-import zlib
 
 from apache_beam.internal import util
 from apache_beam.io import iobase
+from apache_beam.io.filesystem import BeamIOError
+from apache_beam.io.filesystem import CompressedFile as _CompressedFile
+from apache_beam.io.filesystem import CompressionTypes
+from apache_beam.io.filesystems_util import get_filesystem
 from apache_beam.transforms.display import DisplayDataItem
 
-# TODO(sourabhbajaj): Fix the constant values after the new IO factory
-# Current constants are copy pasted from gcsio.py till we fix this.
-# Protect against environments where apitools library is not available.
-# pylint: disable=wrong-import-order, wrong-import-position
-try:
-  from apache_beam.io.gcp import gcsio
-  DEFAULT_READ_BUFFER_SIZE = gcsio.DEFAULT_READ_BUFFER_SIZE
-  MAX_BATCH_OPERATION_SIZE = gcsio.MAX_BATCH_OPERATION_SIZE
-except ImportError:
-  class FakeGcsIO(object):
-    def __getattr__(self, attr):
-      raise ImportError(
-          'Google Cloud Platform IO not available, '
-          'please install apache_beam[gcp]')
-  gcsio = FakeGcsIO()
-  DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024
-  MAX_BATCH_OPERATION_SIZE = 100
-# pylint: enable=wrong-import-order, wrong-import-position
-
-
+MAX_BATCH_OPERATION_SIZE = 100
 DEFAULT_SHARD_NAME_TEMPLATE = '-SSSSS-of-NNNNN'
 
 
-class _CompressionType(object):
-  """Object representing single compression type."""
-
-  def __init__(self, identifier):
-    self.identifier = identifier
-
-  def __eq__(self, other):
-    return (isinstance(other, _CompressionType) and
-            self.identifier == other.identifier)
-
-  def __hash__(self):
-    return hash(self.identifier)
-
-  def __ne__(self, other):
-    return not self.__eq__(other)
-
-  def __str__(self):
-    return self.identifier
-
-  def __repr__(self):
-    return '_CompressionType(%s)' % self.identifier
-
-
-class CompressionTypes(object):
-  """Enum-like class representing known compression types."""
-
-  # Detect compression based on filename extension.
-  #
-  # The following extensions are currently recognized by auto-detection:
-  #   .bz2 (implies BZIP2 as described below).
-  #   .gz  (implies GZIP as described below)
-  # Any non-recognized extension implies UNCOMPRESSED as described below.
-  AUTO = _CompressionType('auto')
-
-  # BZIP2 compression.
-  BZIP2 = _CompressionType('bzip2')
-
-  # GZIP compression (deflate with GZIP headers).
-  GZIP = _CompressionType('gzip')
-
-  # Uncompressed (i.e., may be split).
-  UNCOMPRESSED = _CompressionType('uncompressed')
-
-  @classmethod
-  def is_valid_compression_type(cls, compression_type):
-    """Returns true for valid compression types, false otherwise."""
-    return isinstance(compression_type, _CompressionType)
-
-  @classmethod
-  def mime_type(cls, compression_type, default='application/octet-stream'):
-    mime_types_by_compression_type = {
-        cls.BZIP2: 'application/x-bz2',
-        cls.GZIP: 'application/x-gzip',
-    }
-    return mime_types_by_compression_type.get(compression_type, default)
-
-  @classmethod
-  def detect_compression_type(cls, file_path):
-    """Returns the compression type of a file (based on its suffix)."""
-    compression_types_by_suffix = {'.bz2': cls.BZIP2, '.gz': cls.GZIP}
-    lowercased_path = file_path.lower()
-    for suffix, compression_type in compression_types_by_suffix.iteritems():
-      if lowercased_path.endswith(suffix):
-        return compression_type
-    return cls.UNCOMPRESSED
-
-
+# TODO(sourabhbajaj): Remove this after BFS API is used everywhere
 class ChannelFactory(object):
-  # TODO: Generalize into extensible framework.
-
   @staticmethod
   def mkdir(path):
-    if path.startswith('gs://'):
-      return
-    else:
-      try:
-        os.makedirs(path)
-      except OSError as err:
-        raise IOError(err)
+    bfs = get_filesystem(path)
+    bfs.mkdirs(path)
 
   @staticmethod
   def open(path,
            mode,
            mime_type='application/octet-stream',
            compression_type=CompressionTypes.AUTO):
-    if compression_type == CompressionTypes.AUTO:
-      compression_type = CompressionTypes.detect_compression_type(path)
-    elif not CompressionTypes.is_valid_compression_type(compression_type):
-      raise TypeError('compression_type must be CompressionType object but '
-                      'was %s' % type(compression_type))
-
-    if path.startswith('gs://'):
-      raw_file = gcsio.GcsIO().open(
-          path,
-          mode,
-          mime_type=CompressionTypes.mime_type(compression_type, mime_type))
-    else:
-      raw_file = open(path, mode)
-
-    if compression_type == CompressionTypes.UNCOMPRESSED:
-      return raw_file
-    else:
-      return _CompressedFile(raw_file, compression_type=compression_type)
+    if mode not in ('rb', 'wb'):
+      raise ValueError('Unsupported file mode: %r' % mode)
+    bfs = get_filesystem(path)
+    opener = bfs.open if mode == 'rb' else bfs.create
+    return opener(path, mime_type, compression_type)
 
   @staticmethod
   def is_compressed(fileobj):
@@ -165,358 +59,66 @@ class ChannelFactory(object):
 
   @staticmethod
   def rename(src, dest):
-    if src.startswith('gs://'):
-      if not dest.startswith('gs://'):
-        raise ValueError('Destination %r must be GCS path.', dest)
-      gcsio.GcsIO().rename(src, dest)
-    else:
-      try:
-        os.rename(src, dest)
-      except OSError as err:
-        raise IOError(err)
+    bfs = get_filesystem(src)
+    bfs.rename([src], [dest])
 
   @staticmethod
   def rename_batch(src_dest_pairs):
-    # Filter out local and GCS operations.
-    local_src_dest_pairs = []
-    gcs_src_dest_pairs = []
-    for src, dest in src_dest_pairs:
-      if src.startswith('gs://'):
-        if not dest.startswith('gs://'):
-          raise ValueError('Destination %r must be GCS path.', dest)
-        gcs_src_dest_pairs.append((src, dest))
-      else:
-        local_src_dest_pairs.append((src, dest))
-
-    # Execute local operations.
-    exceptions = []
-    for src, dest in local_src_dest_pairs:
-      try:
-        ChannelFactory.rename(src, dest)
-      except Exception as e:  # pylint: disable=broad-except
-        exceptions.append((src, dest, e))
-
-    # Execute GCS operations.
-    exceptions += ChannelFactory._rename_gcs_batch(gcs_src_dest_pairs)
-
-    return exceptions
-
-  @staticmethod
-  def _rename_gcs_batch(src_dest_pairs):
-    # Prepare batches.
-    gcs_batches = []
-    gcs_current_batch = []
-    for src, dest in src_dest_pairs:
-      gcs_current_batch.append((src, dest))
-      if len(gcs_current_batch) == MAX_BATCH_OPERATION_SIZE:
-        gcs_batches.append(gcs_current_batch)
-        gcs_current_batch = []
-    if gcs_current_batch:
-      gcs_batches.append(gcs_current_batch)
-
-    # Execute GCS renames if any and return exceptions.
-    exceptions = []
-    for batch in gcs_batches:
-      copy_statuses = gcsio.GcsIO().copy_batch(batch)
-      copy_succeeded = []
-      for src, dest, exception in copy_statuses:
-        if exception:
-          exceptions.append((src, dest, exception))
-        else:
-          copy_succeeded.append((src, dest))
-      delete_batch = [src for src, dest in copy_succeeded]
-      delete_statuses = gcsio.GcsIO().delete_batch(delete_batch)
-      for i, (src, exception) in enumerate(delete_statuses):
-        dest = copy_succeeded[i]
-        if exception:
-          exceptions.append((src, dest, exception))
-    return exceptions
+    sources = [s for s, _ in src_dest_pairs]
+    destinations = [d for _, d in src_dest_pairs]
+    try:
+      if sources:  # Nothing to rename (and no path to pick a filesystem by).
+        get_filesystem(sources[0]).rename(sources, destinations)
+      return []
+    except BeamIOError as exp:
+      return [(s, d, e) for (s, d), e in exp.exception_details.iteritems()]
 
   @staticmethod
   def copytree(src, dest):
-    if src.startswith('gs://'):
-      if not dest.startswith('gs://'):
-        raise ValueError('Destination %r must be GCS path.', dest)
-      assert src.endswith('/'), src
-      assert dest.endswith('/'), dest
-      gcsio.GcsIO().copytree(src, dest)
-    else:
-      try:
-        if os.path.exists(dest):
-          shutil.rmtree(dest)
-        shutil.copytree(src, dest)
-      except OSError as err:
-        raise IOError(err)
+    bfs = get_filesystem(src)
+    bfs.copy([src], [dest])
 
   @staticmethod
   def exists(path):
-    if path.startswith('gs://'):
-      return gcsio.GcsIO().exists(path)
-    else:
-      return os.path.exists(path)
+    bfs = get_filesystem(path)
+    return bfs.exists(path)
 
   @staticmethod
   def rmdir(path):
-    if path.startswith('gs://'):
-      gcs = gcsio.GcsIO()
-      if not path.endswith('/'):
-        path += '/'
-      # TODO: Threadpool?
-      for entry in gcs.glob(path + '*'):
-        gcs.delete(entry)
-    else:
-      try:
-        shutil.rmtree(path)
-      except OSError as err:
-        raise IOError(err)
+    bfs = get_filesystem(path)
+    bfs.delete([path])
 
   @staticmethod
   def rm(path):
-    if path.startswith('gs://'):
-      gcsio.GcsIO().delete(path)
-    else:
-      try:
-        os.remove(path)
-      except OSError as err:
-        raise IOError(err)
+    bfs = get_filesystem(path)
+    bfs.delete([path])
 
   @staticmethod
   def glob(path, limit=None):
-    if path.startswith('gs://'):
-      return gcsio.GcsIO().glob(path, limit)
-    else:
-      files = glob.glob(path)
-      return files[:limit]
+    bfs = get_filesystem(path)
+    match_result = bfs.match([path], [limit])[0]
+    return [f.path for f in match_result.metadata_list]
 
   @staticmethod
   def size_in_bytes(path):
-    """Returns the size of a file in bytes.
-
-    Args:
-      path: a string that gives the path of a single file.
-    """
-    if path.startswith('gs://'):
-      return gcsio.GcsIO().size(path)
-    else:
-      return os.path.getsize(path)
+    bfs = get_filesystem(path)
+    match_result = bfs.match([path])[0]
+    return [f.size_in_bytes for f in match_result.metadata_list][0]
 
   @staticmethod
   def size_of_files_in_glob(path, file_names=None):
-    """Returns a map of file names to sizes.
-
-    Args:
-      path: a file path pattern that reads the size of all the files
-      file_names: List of file names that we need size for, this is added to
-        support eventually consistent sources where two expantions of glob
-        might yield to different files.
-    """
-    if path.startswith('gs://'):
-      file_sizes = gcsio.GcsIO().size_of_files_in_glob(path)
-      if file_names is None:
-        return file_sizes
-      else:
-        result = {}
-        # We need to make sure we fetched the size for all the files as the
-        # list API in GCS is eventually consistent so directly call size for
-        # any files that may be missing.
-        for file_name in file_names:
-          if file_name in file_sizes:
-            result[file_name] = file_sizes[file_name]
-          else:
-            result[file_name] = ChannelFactory.size_in_bytes(file_name)
-        return result
-    else:
-      if file_names is None:
-        file_names = ChannelFactory.glob(path)
-      return {file_name: ChannelFactory.size_in_bytes(file_name)
-              for file_name in file_names}
-
-
-class _CompressedFile(object):
-  """Somewhat limited file wrapper for easier handling of compressed files."""
-
-  # The bit mask to use for the wbits parameters of the zlib compressor and
-  # decompressor objects.
-  _gzip_mask = zlib.MAX_WBITS | 16  # Mask when using GZIP headers.
-
-  def __init__(self,
-               fileobj,
-               compression_type=CompressionTypes.GZIP,
-               read_size=DEFAULT_READ_BUFFER_SIZE):
-    if not fileobj:
-      raise ValueError('File object must be opened file but was at %s' %
-                       fileobj)
-
-    if not CompressionTypes.is_valid_compression_type(compression_type):
-      raise TypeError('compression_type must be CompressionType object but '
-                      'was %s' % type(compression_type))
-    if compression_type in (CompressionTypes.AUTO, CompressionTypes.UNCOMPRESSED
-                           ):
-      raise ValueError(
-          'Cannot create object with unspecified or no compression')
-
-    self._file = fileobj
-    self._compression_type = compression_type
-
-    if self._file.tell() != 0:
-      raise ValueError('File object must be at position 0 but was %d' %
-                       self._file.tell())
-    self._uncompressed_position = 0
-
-    if self.readable():
-      self._read_size = read_size
-      self._read_buffer = cStringIO.StringIO()
-      self._read_position = 0
-      self._read_eof = False
-
-      if self._compression_type == CompressionTypes.BZIP2:
-        self._decompressor = bz2.BZ2Decompressor()
-      else:
-        assert self._compression_type == CompressionTypes.GZIP
-        self._decompressor = zlib.decompressobj(self._gzip_mask)
-    else:
-      self._decompressor = None
-
-    if self.writeable():
-      if self._compression_type == CompressionTypes.BZIP2:
-        self._compressor = bz2.BZ2Compressor()
-      else:
-        assert self._compression_type == CompressionTypes.GZIP
-        self._compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION,
-                                            zlib.DEFLATED, self._gzip_mask)
-    else:
-      self._compressor = None
-
-  def readable(self):
-    mode = self._file.mode
-    return 'r' in mode or 'a' in mode
-
-  def writeable(self):
-    mode = self._file.mode
-    return 'w' in mode or 'a' in mode
-
-  def write(self, data):
-    """Write data to file."""
-    if not self._compressor:
-      raise ValueError('compressor not initialized')
-    self._uncompressed_position += len(data)
-    compressed = self._compressor.compress(data)
-    if compressed:
-      self._file.write(compressed)
-
-  def _fetch_to_internal_buffer(self, num_bytes):
-    """Fetch up to num_bytes into the internal buffer."""
-    if (not self._read_eof and self._read_position > 0 and
-        (self._read_buffer.tell() - self._read_position) < num_bytes):
-      # There aren't enough number of bytes to accommodate a read, so we
-      # prepare for a possibly large read by clearing up all internal buffers
-      # but without dropping any previous held data.
-      self._read_buffer.seek(self._read_position)
-      data = self._read_buffer.read()
-      self._read_position = 0
-      self._read_buffer.seek(0)
-      self._read_buffer.truncate(0)
-      self._read_buffer.write(data)
-
-    while not self._read_eof and (self._read_buffer.tell() - self._read_position
-                                 ) < num_bytes:
-      # Continue reading from the underlying file object until enough bytes are
-      # available, or EOF is reached.
-      buf = self._file.read(self._read_size)
-      if buf:
-        decompressed = self._decompressor.decompress(buf)
-        del buf  # Free up some possibly large and no-longer-needed memory.
-        self._read_buffer.write(decompressed)
-      else:
-        # EOF reached.
-        # Verify completeness and no corruption and flush (if needed by
-        # the underlying algorithm).
-        if self._compression_type == CompressionTypes.BZIP2:
-          # Having unused_data past end of stream would imply file corruption.
-          assert not self._decompressor.unused_data, 'Possible file corruption.'
-          try:
-            # EOF implies that the underlying BZIP2 stream must also have
-            # reached EOF. We expect this to raise an EOFError and we catch it
-            # below. Any other kind of error though would be problematic.
-            self._decompressor.decompress('dummy')
-            assert False, 'Possible file corruption.'
-          except EOFError:
-            pass  # All is as expected!
-        else:
-          self._read_buffer.write(self._decompressor.flush())
-
-        # Record that we have hit the end of file, so we won't unnecessarily
-        # repeat the completeness verification step above.
-        self._read_eof = True
-
-  def _read_from_internal_buffer(self, read_fn):
-    """Read from the internal buffer by using the supplied read_fn."""
-    self._read_buffer.seek(self._read_position)
-    result = read_fn()
-    self._read_position += len(result)
-    self._uncompressed_position += len(result)
-    self._read_buffer.seek(0, os.SEEK_END)  # Allow future writes.
-    return result
-
-  def read(self, num_bytes):
-    if not self._decompressor:
-      raise ValueError('decompressor not initialized')
-
-    self._fetch_to_internal_buffer(num_bytes)
-    return self._read_from_internal_buffer(
-        lambda: self._read_buffer.read(num_bytes))
-
-  def readline(self):
-    """Equivalent to standard file.readline(). Same return conventions apply."""
-    if not self._decompressor:
-      raise ValueError('decompressor not initialized')
-
-    io = cStringIO.StringIO()
-    while True:
-      # Ensure that the internal buffer has at least half the read_size. Going
-      # with half the _read_size (as opposed to a full _read_size) to ensure
-      # that actual fetches are more evenly spread out, as opposed to having 2
-      # consecutive reads at the beginning of a read.
-      self._fetch_to_internal_buffer(self._read_size / 2)
-      line = self._read_from_internal_buffer(
-          lambda: self._read_buffer.readline())
-      io.write(line)
-      if line.endswith('\n') or not line:
-        break  # Newline or EOF reached.
-
-    return io.getvalue()
-
-  def closed(self):
-    return not self._file or self._file.closed()
-
-  def close(self):
-    if self.readable():
-      self._read_buffer.close()
-
-    if self.writeable():
-      self._file.write(self._compressor.flush())
+    bfs = get_filesystem(path)
+    match_result = bfs.match([path])[0]
+    part_files = {f.path:f.size_in_bytes for f in match_result.metadata_list}
 
-    self._file.close()
+    if file_names is not None:
+      # Also size the explicitly requested files; they may be absent from the
+      # glob expansion above.
+      for match_result in bfs.match(file_names):
+        for metadata in match_result.metadata_list:
+          part_files[metadata.path] = metadata.size_in_bytes
 
-  def flush(self):
-    if self.writeable():
-      self._file.write(self._compressor.flush())
-    self._file.flush()
-
-  @property
-  def seekable(self):
-    # TODO: Add support for seeking to a file position.
-    return False
-
-  def tell(self):
-    """Returns current position in uncompressed file."""
-    return self._uncompressed_position
-
-  def __enter__(self):
-    return self
-
-  def __exit__(self, exception_type, exception_value, traceback):
-    self.close()
+    return part_files
 
 
 class FileSink(iobase.Sink):
@@ -570,6 +172,7 @@ class FileSink(iobase.Sink):
     self.shard_name_format = self._template_to_format(shard_name_template)
     self.compression_type = compression_type
     self.mime_type = mime_type
+    self._file_system = get_filesystem(file_path_prefix)
 
   def display_data(self):
     return {'shards':
@@ -589,11 +192,8 @@ class FileSink(iobase.Sink):
     The returned file handle is passed to ``write_[encoded_]record`` and
     ``close``.
     """
-    return ChannelFactory.open(
-        temp_path,
-        'wb',
-        mime_type=self.mime_type,
-        compression_type=self.compression_type)
+    return self._file_system.create(temp_path, self.mime_type,
+                                    self.compression_type)
 
   def write_record(self, file_handle, value):
     """Writes a single record go the file handle returned by ``open()``.
@@ -621,7 +221,7 @@ class FileSink(iobase.Sink):
   def initialize_write(self):
     tmp_dir = self.file_path_prefix + self.file_name_suffix + time.strftime(
         '-temp-%Y-%m-%d_%H-%M-%S')
-    ChannelFactory().mkdir(tmp_dir)
+    self._file_system.mkdirs(tmp_dir)
     return tmp_dir
 
   def open_writer(self, init_result, uid):
@@ -639,75 +239,79 @@ class FileSink(iobase.Sink):
     min_threads = min(num_shards, FileSink._MAX_RENAME_THREADS)
     num_threads = max(1, min_threads)
 
-    rename_ops = []
+    source_files = []
+    destination_files = []
     for shard_num, shard in enumerate(writer_results):
       final_name = ''.join([
           self.file_path_prefix, self.shard_name_format % dict(
               shard_num=shard_num, num_shards=num_shards), self.file_name_suffix
       ])
-      rename_ops.append((shard, final_name))
-
-    batches = []
-    current_batch = []
-    for rename_op in rename_ops:
-      current_batch.append(rename_op)
-      if len(current_batch) == MAX_BATCH_OPERATION_SIZE:
-        batches.append(current_batch)
-        current_batch = []
-    if current_batch:
-      batches.append(current_batch)
+      source_files.append(shard)
+      destination_files.append(final_name)
+
+    source_file_batch = [source_files[i:i + MAX_BATCH_OPERATION_SIZE]
+                         for i in xrange(0, len(source_files),
+                                         MAX_BATCH_OPERATION_SIZE)]
+    destination_file_batch = [destination_files[i:i + MAX_BATCH_OPERATION_SIZE]
+                              for i in xrange(0, len(destination_files),
+                                              MAX_BATCH_OPERATION_SIZE)]
 
     logging.info(
         'Starting finalize_write threads with num_shards: %d, '
         'batches: %d, num_threads: %d',
-        num_shards, len(batches), num_threads)
+        num_shards, len(source_file_batch), num_threads)
     start_time = time.time()
 
     # Use a thread pool for renaming operations.
     def _rename_batch(batch):
       """_rename_batch executes batch rename operations."""
+      source_files, destination_files = batch
       exceptions = []
-      exception_infos = ChannelFactory.rename_batch(batch)
-      for src, dest, exception in exception_infos:
-        if exception:
-          logging.warning('Rename not successful: %s -> %s, %s', src, dest,
-                          exception)
-          should_report = True
-          if isinstance(exception, IOError):
-            # May have already been copied.
-            try:
-              if ChannelFactory.exists(dest):
-                should_report = False
-            except Exception as exists_e:  # pylint: disable=broad-except
-              logging.warning('Exception when checking if file %s exists: '
-                              '%s', dest, exists_e)
-          if should_report:
-            logging.warning(('Exception in _rename_batch. src: %s, '
-                             'dest: %s, err: %s'), src, dest, exception)
-            exceptions.append(exception)
-        else:
-          logging.debug('Rename successful: %s -> %s', src, dest)
-      return exceptions
+      try:
+        self._file_system.rename(source_files, destination_files)
+        return exceptions
+      except BeamIOError as exp:
+        if exp.exception_details is None:
+          raise exp
+        for (src, dest), exception in exp.exception_details.iteritems():
+          if exception:
+            logging.warning('Rename not successful: %s -> %s, %s', src, dest,
+                            exception)
+            should_report = True
+            if isinstance(exception, IOError):
+              # May have already been copied.
+              try:
+                if self._file_system.exists(dest):
+                  should_report = False
+              except Exception as exists_e:  # pylint: disable=broad-except
+                logging.warning('Exception when checking if file %s exists: '
+                                '%s', dest, exists_e)
+            if should_report:
+              logging.warning(('Exception in _rename_batch. src: %s, '
+                               'dest: %s, err: %s'), src, dest, exception)
+              exceptions.append(exception)
+          else:
+            logging.debug('Rename successful: %s -> %s', src, dest)
+        return exceptions
 
     exception_batches = util.run_using_threadpool(
-        _rename_batch, batches, num_threads)
+        _rename_batch, zip(source_file_batch, destination_file_batch),
+        num_threads)
 
-    all_exceptions = []
-    for exceptions in exception_batches:
-      if exceptions:
-        all_exceptions += exceptions
+    all_exceptions = [e for exception_batch in exception_batches
+                      for e in exception_batch]
     if all_exceptions:
       raise Exception('Encountered exceptions in finalize_write: %s',
                       all_exceptions)
 
-    for shard, final_name in rename_ops:
+    for final_name in destination_files:
       yield final_name
 
     logging.info('Renamed %d shards in %.2f seconds.', num_shards,
                  time.time() - start_time)
 
     try:
-      ChannelFactory.rmdir(init_result)
+      self._file_system.delete([init_result])
     except IOError:
       # May have already been removed.
       pass

http://git-wip-us.apache.org/repos/asf/beam/blob/1bc240bc/sdks/python/apache_beam/io/fileio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py
index a963c67..6b7437d 100644
--- a/sdks/python/apache_beam/io/fileio_test.py
+++ b/sdks/python/apache_beam/io/fileio_test.py
@@ -16,7 +16,7 @@
 # limitations under the License.
 #
 
-"""Unit tests for local and GCS sources and sinks."""
+"""Unit tests for file sinks."""
 
 import glob
 import logging
@@ -26,60 +26,16 @@ import tempfile
 import unittest
 
 import hamcrest as hc
-import mock
 
 import apache_beam as beam
 from apache_beam import coders
 from apache_beam.io import fileio
+from apache_beam.io.filesystem import CompressedFile
 from apache_beam.test_pipeline import TestPipeline
 from apache_beam.transforms.display import DisplayData
 from apache_beam.transforms.display_test import DisplayDataItemMatcher
 
 
-class TestChannelFactory(unittest.TestCase):
-
-  @mock.patch('apache_beam.io.fileio.gcsio')
-  def test_size_of_files_in_glob_complete(self, *unused_args):
-    # Prepare mocks.
-    gcsio_mock = mock.MagicMock()
-    fileio.gcsio.GcsIO = lambda: gcsio_mock
-    file_names = ['gs://bucket/file1', 'gs://bucket/file2']
-    gcsio_mock.size_of_files_in_glob.return_value = {
-        'gs://bucket/file1': 1,
-        'gs://bucket/file2': 2
-    }
-    expected_results = {
-        'gs://bucket/file1': 1,
-        'gs://bucket/file2': 2
-    }
-    self.assertEqual(
-        fileio.ChannelFactory.size_of_files_in_glob(
-            'gs://bucket/*', file_names),
-        expected_results)
-    gcsio_mock.size_of_files_in_glob.assert_called_once_with('gs://bucket/*')
-
-  @mock.patch('apache_beam.io.fileio.gcsio')
-  def test_size_of_files_in_glob_incomplete(self, *unused_args):
-    # Prepare mocks.
-    gcsio_mock = mock.MagicMock()
-    fileio.gcsio.GcsIO = lambda: gcsio_mock
-    file_names = ['gs://bucket/file1', 'gs://bucket/file2']
-    gcsio_mock.size_of_files_in_glob.return_value = {
-        'gs://bucket/file1': 1
-    }
-    gcsio_mock.size.return_value = 2
-    expected_results = {
-        'gs://bucket/file1': 1,
-        'gs://bucket/file2': 2
-    }
-    self.assertEqual(
-        fileio.ChannelFactory.size_of_files_in_glob(
-            'gs://bucket/*', file_names),
-        expected_results)
-    gcsio_mock.size_of_files_in_glob.assert_called_once_with('gs://bucket/*')
-    gcsio_mock.size.assert_called_once_with('gs://bucket/file2')
-
-
 # TODO: Refactor code so all io tests are using same library
 # TestCaseWithTempDirCleanup class.
 class _TestCaseWithTempDirCleanUp(unittest.TestCase):
@@ -115,16 +71,16 @@ class _TestCaseWithTempDirCleanUp(unittest.TestCase):
 class TestCompressedFile(_TestCaseWithTempDirCleanUp):
 
   def test_seekable(self):
-    readable = fileio._CompressedFile(open(self._create_temp_file(), 'r'))
+    readable = CompressedFile(open(self._create_temp_file(), 'r'))
     self.assertFalse(readable.seekable)
 
-    writeable = fileio._CompressedFile(open(self._create_temp_file(), 'w'))
+    writeable = CompressedFile(open(self._create_temp_file(), 'w'))
     self.assertFalse(writeable.seekable)
 
   def test_tell(self):
     lines = ['line%d\n' % i for i in range(10)]
     tmpfile = self._create_temp_file()
-    writeable = fileio._CompressedFile(open(tmpfile, 'w'))
+    writeable = CompressedFile(open(tmpfile, 'w'))
     current_offset = 0
     for line in lines:
       writeable.write(line)
@@ -132,7 +88,7 @@ class TestCompressedFile(_TestCaseWithTempDirCleanUp):
       self.assertEqual(current_offset, writeable.tell())
 
     writeable.close()
-    readable = fileio._CompressedFile(open(tmpfile))
+    readable = CompressedFile(open(tmpfile))
     current_offset = 0
     while True:
       line = readable.readline()
@@ -300,52 +256,6 @@ class TestFileSink(_TestCaseWithTempDirCleanUp):
     with self.assertRaises(Exception):
       list(sink.finalize_write(init_token, [res1, res2]))
 
-  @mock.patch('apache_beam.io.fileio.ChannelFactory.rename')
-  @mock.patch('apache_beam.io.fileio.gcsio')
-  def test_rename_batch(self, *unused_args):
-    # Prepare mocks.
-    gcsio_mock = mock.MagicMock()
-    fileio.gcsio.GcsIO = lambda: gcsio_mock
-    fileio.ChannelFactory.rename = mock.MagicMock()
-    to_rename = [
-        ('gs://bucket/from1', 'gs://bucket/to1'),
-        ('gs://bucket/from2', 'gs://bucket/to2'),
-        ('/local/from1', '/local/to1'),
-        ('gs://bucket/from3', 'gs://bucket/to3'),
-        ('/local/from2', '/local/to2'),
-    ]
-    gcsio_mock.copy_batch.side_effect = [[
-        ('gs://bucket/from1', 'gs://bucket/to1', None),
-        ('gs://bucket/from2', 'gs://bucket/to2', None),
-        ('gs://bucket/from3', 'gs://bucket/to3', None),
-    ]]
-    gcsio_mock.delete_batch.side_effect = [[
-        ('gs://bucket/from1', None),
-        ('gs://bucket/from2', None),
-        ('gs://bucket/from3', None),
-    ]]
-
-    # Issue batch rename.
-    fileio.ChannelFactory.rename_batch(to_rename)
-
-    # Verify mocks.
-    expected_local_rename_calls = [
-        mock.call('/local/from1', '/local/to1'),
-        mock.call('/local/from2', '/local/to2'),
-    ]
-    self.assertEqual(fileio.ChannelFactory.rename.call_args_list,
-                     expected_local_rename_calls)
-    gcsio_mock.copy_batch.assert_called_once_with([
-        ('gs://bucket/from1', 'gs://bucket/to1'),
-        ('gs://bucket/from2', 'gs://bucket/to2'),
-        ('gs://bucket/from3', 'gs://bucket/to3'),
-    ])
-    gcsio_mock.delete_batch.assert_called_once_with([
-        'gs://bucket/from1',
-        'gs://bucket/from2',
-        'gs://bucket/from3',
-    ])
-
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)

http://git-wip-us.apache.org/repos/asf/beam/blob/1bc240bc/sdks/python/apache_beam/io/filesystem.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filesystem.py b/sdks/python/apache_beam/io/filesystem.py
new file mode 100644
index 0000000..14493c0
--- /dev/null
+++ b/sdks/python/apache_beam/io/filesystem.py
@@ -0,0 +1,439 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""File system abstraction for file-based sources and sinks."""
+
+from __future__ import absolute_import
+
+import abc
+import bz2
+import cStringIO
+import os
+import zlib
+
+
+DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024
+
+
+class CompressionTypes(object):
+  """Enum-like holder of the known compression type constants."""
+
+  # Choose the compression scheme from the file name extension.
+  #
+  # Extensions recognized by auto-detection:
+  #   .bz2 -> BZIP2 (see below).
+  #   .gz  -> GZIP (see below).
+  # Every other extension is treated as UNCOMPRESSED (see below).
+  AUTO = 'auto'
+
+  # BZIP2 compression.
+  BZIP2 = 'bzip2'
+
+  # GZIP compression (deflate with GZIP headers).
+  GZIP = 'gzip'
+
+  # No compression (i.e., may be split).
+  UNCOMPRESSED = 'uncompressed'
+
+  @classmethod
+  def is_valid_compression_type(cls, compression_type):
+    """Tells whether ``compression_type`` is one of the constants above."""
+    known_types = frozenset((
+        CompressionTypes.AUTO,
+        CompressionTypes.BZIP2,
+        CompressionTypes.GZIP,
+        CompressionTypes.UNCOMPRESSED,
+    ))
+    return compression_type in known_types
+
+  @classmethod
+  def mime_type(cls, compression_type, default='application/octet-stream'):
+    """Returns the MIME type for ``compression_type``, else ``default``."""
+    known_mime_types = {cls.GZIP: 'application/x-gzip',
+                        cls.BZIP2: 'application/x-bz2'}
+    return known_mime_types.get(compression_type, default)
+
+  @classmethod
+  def detect_compression_type(cls, file_path):
+    """Returns the compression type implied by the suffix of ``file_path``."""
+    path = file_path.lower()
+    suffix_to_type = {'.bz2': cls.BZIP2, '.gz': cls.GZIP}
+    # The suffixes are mutually exclusive, so iteration order is irrelevant.
+    for suffix, detected in suffix_to_type.iteritems():
+      if path.endswith(suffix):
+        return detected
+    return cls.UNCOMPRESSED
+
+
+class CompressedFile(object):
+  """Somewhat limited file wrapper for easier handling of compressed files."""
+
+  # The bit mask to use for the wbits parameters of the zlib compressor and
+  # decompressor objects.
+  _gzip_mask = zlib.MAX_WBITS | 16  # Mask when using GZIP headers.
+
+  def __init__(self,
+               fileobj,
+               compression_type=CompressionTypes.GZIP,
+               read_size=DEFAULT_READ_BUFFER_SIZE):
+    if not fileobj:
+      raise ValueError('File object must not be None')
+
+    if not CompressionTypes.is_valid_compression_type(compression_type):
+      raise TypeError('compression_type must be CompressionType object but '
+                      'was %s' % type(compression_type))
+    if compression_type in (CompressionTypes.AUTO, CompressionTypes.UNCOMPRESSED
+                           ):
+      raise ValueError(
+          'Cannot create object with unspecified or no compression')
+
+    self._file = fileobj
+    self._compression_type = compression_type
+
+    if self._file.tell() != 0:
+      raise ValueError('File object must be at position 0 but was %d' %
+                       self._file.tell())
+    self._uncompressed_position = 0
+
+    if self.readable():
+      self._read_size = read_size
+      self._read_buffer = cStringIO.StringIO()
+      self._read_position = 0
+      self._read_eof = False
+
+      if self._compression_type == CompressionTypes.BZIP2:
+        self._decompressor = bz2.BZ2Decompressor()
+      else:
+        assert self._compression_type == CompressionTypes.GZIP
+        self._decompressor = zlib.decompressobj(self._gzip_mask)
+    else:
+      self._decompressor = None
+
+    if self.writeable():
+      if self._compression_type == CompressionTypes.BZIP2:
+        self._compressor = bz2.BZ2Compressor()
+      else:
+        assert self._compression_type == CompressionTypes.GZIP
+        self._compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION,
+                                            zlib.DEFLATED, self._gzip_mask)
+    else:
+      self._compressor = None
+
+  def readable(self):
+    mode = self._file.mode
+    return 'r' in mode or 'a' in mode
+
+  def writeable(self):
+    mode = self._file.mode
+    return 'w' in mode or 'a' in mode
+
+  def write(self, data):
+    """Write data to file."""
+    if not self._compressor:
+      raise ValueError('compressor not initialized')
+    self._uncompressed_position += len(data)
+    compressed = self._compressor.compress(data)
+    if compressed:
+      self._file.write(compressed)
+
+  def _fetch_to_internal_buffer(self, num_bytes):
+    """Fetch up to num_bytes into the internal buffer."""
+    if (not self._read_eof and self._read_position > 0 and
+        (self._read_buffer.tell() - self._read_position) < num_bytes):
+      # There aren't enough number of bytes to accommodate a read, so we
+      # prepare for a possibly large read by clearing up all internal buffers
+      # but without dropping any previous held data.
+      self._read_buffer.seek(self._read_position)
+      data = self._read_buffer.read()
+      self._read_position = 0
+      self._read_buffer.seek(0)
+      self._read_buffer.truncate(0)
+      self._read_buffer.write(data)
+
+    while not self._read_eof and (self._read_buffer.tell() - self._read_position
+                                 ) < num_bytes:
+      # Continue reading from the underlying file object until enough bytes are
+      # available, or EOF is reached.
+      buf = self._file.read(self._read_size)
+      if buf:
+        decompressed = self._decompressor.decompress(buf)
+        del buf  # Free up some possibly large and no-longer-needed memory.
+        self._read_buffer.write(decompressed)
+      else:
+        # EOF reached.
+        # Verify completeness and no corruption and flush (if needed by
+        # the underlying algorithm).
+        if self._compression_type == CompressionTypes.BZIP2:
+          # Having unused_data past end of stream would imply file corruption.
+          assert not self._decompressor.unused_data, 'Possible file corruption.'
+          try:
+            # EOF implies that the underlying BZIP2 stream must also have
+            # reached EOF. We expect this to raise an EOFError and we catch it
+            # below. Any other kind of error though would be problematic.
+            self._decompressor.decompress('dummy')
+            assert False, 'Possible file corruption.'
+          except EOFError:
+            pass  # All is as expected!
+        else:
+          self._read_buffer.write(self._decompressor.flush())
+
+        # Record that we have hit the end of file, so we won't unnecessarily
+        # repeat the completeness verification step above.
+        self._read_eof = True
+
+  def _read_from_internal_buffer(self, read_fn):
+    """Read from the internal buffer by using the supplied read_fn."""
+    self._read_buffer.seek(self._read_position)
+    result = read_fn()
+    self._read_position += len(result)
+    self._uncompressed_position += len(result)
+    self._read_buffer.seek(0, os.SEEK_END)  # Allow future writes.
+    return result
+
+  def read(self, num_bytes):
+    if not self._decompressor:
+      raise ValueError('decompressor not initialized')
+
+    self._fetch_to_internal_buffer(num_bytes)
+    return self._read_from_internal_buffer(
+        lambda: self._read_buffer.read(num_bytes))
+
+  def readline(self):
+    """Equivalent to standard file.readline(). Same return conventions apply."""
+    if not self._decompressor:
+      raise ValueError('decompressor not initialized')
+
+    io = cStringIO.StringIO()
+    while True:
+      # Ensure that the internal buffer has at least half the read_size. Going
+      # with half the _read_size (as opposed to a full _read_size) to ensure
+      # that actual fetches are more evenly spread out, as opposed to having 2
+      # consecutive reads at the beginning of a read.
+      self._fetch_to_internal_buffer(self._read_size // 2)
+      line = self._read_from_internal_buffer(self._read_buffer.readline)
+      io.write(line)
+      if line.endswith('\n') or not line:
+        break  # Newline or EOF reached.
+
+    return io.getvalue()
+
+  def closed(self):
+    """Returns True if the wrapped file object is missing or closed."""
+    return not self._file or self._file.closed
+
+  def close(self):
+    if self.readable():
+      self._read_buffer.close()
+
+    if self.writeable():
+      self._file.write(self._compressor.flush())
+
+    self._file.close()
+
+  def flush(self):
+    if self.writeable():
+      self._file.write(self._compressor.flush())
+    self._file.flush()
+
+  @property
+  def seekable(self):
+    # TODO: Add support for seeking to a file position.
+    return False
+
+  def tell(self):
+    """Returns current position in uncompressed file."""
+    return self._uncompressed_position
+
+  def __enter__(self):
+    return self
+
+  def __exit__(self, exception_type, exception_value, traceback):
+    self.close()
+
+
+class FileMetadata(object):
+  """Metadata (path and size in bytes) of a file, as output by
+  FileSystem.match."""
+  def __init__(self, path, size_in_bytes):
+    assert isinstance(path, basestring) and path, "Path should be a string"
+    # Accept ``long`` too: in Python 2 integers above sys.maxint are longs.
+    assert isinstance(size_in_bytes, (int, long)) and size_in_bytes >= 0, \
+        "Size of bytes should be greater than equal to zero"
+    self.path = path
+    self.size_in_bytes = size_in_bytes
+
+  def __eq__(self, other):
+    """Note: Only used in tests to verify that mock objects match."""
+    return (isinstance(other, FileMetadata) and
+            self.path == other.path and
+            self.size_in_bytes == other.size_in_bytes)
+
+  def __hash__(self):
+    return hash((self.path, self.size_in_bytes))
+
+  def __ne__(self, other):
+    return not self.__eq__(other)
+
+  def __repr__(self):
+    return 'FileMetadata(%s, %s)' % (self.path, self.size_in_bytes)
+
+
+class MatchResult(object):
+  """Holds a pattern together with the list of ``FileMetadata`` objects that
+  the ``FileSystem`` match operation found for it.
+  """
+  def __init__(self, pattern, metadata_list):
+    self.pattern = pattern
+    self.metadata_list = metadata_list
+
+
+class BeamIOError(IOError):
+  def __init__(self, msg, exception_details=None):
+    """Error raised by the batch file operations of a file system.
+
+    Args:
+      msg: Message string for the exception thrown
+      exception_details: Optional map of individual input to exception for
+        the failed operations of the batch. When it is given, the user can
+        assume that every error of the operation has been reported. When it
+        is missing, the operation may have failed anywhere, so the user
+        should use match to determine the current state of the system.
+    """
+    self.exception_details = exception_details
+    super(BeamIOError, self).__init__(msg)
+
+
+class FileSystem(object):
+  """Abstract interface for the operations supported on a file system.
+
+  File system providers implement the abstract methods below. Clients obtain
+  the implementation matching the scheme of a path through the
+  ``get_filesystem`` function of the ``filesystems_util`` module.
+  """
+  __metaclass__ = abc.ABCMeta
+
+  @staticmethod
+  def _get_compression_type(path, compression_type):
+    if compression_type == CompressionTypes.AUTO:
+      return CompressionTypes.detect_compression_type(path)
+    if CompressionTypes.is_valid_compression_type(compression_type):
+      return compression_type
+    raise TypeError('compression_type must be CompressionType object but '
+                    'was %s' % type(compression_type))
+
+  @abc.abstractmethod
+  def mkdirs(self, path):
+    """Recursively creates the directories for the provided path.
+
+    Args:
+      path: string path of the directory structure to create
+
+    Raises:
+      IOError if the leaf directory already exists.
+    """
+    raise NotImplementedError
+
+  @abc.abstractmethod
+  def match(self, patterns, limits=None):
+    """Finds all the paths matching the provided patterns.
+
+    Args:
+      patterns: list of file path pattern strings to match against
+      limits: list of maximum number of responses that need to be fetched
+
+    Returns: list of ``MatchResult`` objects.
+
+    Raises:
+      ``BeamIOError`` if any of the pattern match operations fail
+    """
+    raise NotImplementedError
+
+  @abc.abstractmethod
+  def create(self, path, mime_type, compression_type):
+    """Returns a write channel for the file at the given path.
+
+    Args:
+      path: string path of the file object to be written to the system
+      mime_type: MIME type describing the content of the file object
+      compression_type: type of compression to be used for this object
+
+    Returns: file handle with a close function for the user to use
+    """
+    raise NotImplementedError
+
+  @abc.abstractmethod
+  def open(self, path, mime_type, compression_type):
+    """Returns a read channel for the file at the given path.
+
+    Args:
+      path: string path of the file object to be read from the system
+      mime_type: MIME type describing the content of the file object
+      compression_type: type of compression to be used for this object
+
+    Returns: file handle with a close function for the user to use
+    """
+    raise NotImplementedError
+
+  @abc.abstractmethod
+  def copy(self, source_file_names, destination_file_names):
+    """Recursively copies the file tree from the source to the destination.
+
+    Args:
+      source_file_names: list of source file objects that need to be copied
+      destination_file_names: list of destinations for the new objects
+
+    Raises:
+      ``BeamIOError`` if any of the copy operations fail
+    """
+    raise NotImplementedError
+
+  @abc.abstractmethod
+  def rename(self, source_file_names, destination_file_names):
+    """Renames the files of the source list to those of the destination list.
+    Source and destination lists should be of the same size.
+
+    Args:
+      source_file_names: list of file paths that need to be moved
+      destination_file_names: list of destination file names for the files
+
+    Raises:
+      ``BeamIOError`` if any of the rename operations fail
+    """
+    raise NotImplementedError
+
+  @abc.abstractmethod
+  def exists(self, path):
+    """Checks whether the provided path exists on the FileSystem.
+
+    Args:
+      path: string path that needs to be checked.
+
+    Returns: boolean flag indicating if path exists
+    """
+    raise NotImplementedError
+
+  @abc.abstractmethod
+  def delete(self, paths):
+    """Deletes the files or directories at the provided paths.
+    Directories will be deleted recursively.
+
+    Args:
+      paths: list of paths of the file objects to be deleted
+
+    Raises:
+      ``BeamIOError`` if any of the delete operations fail
+    """
+    raise NotImplementedError

http://git-wip-us.apache.org/repos/asf/beam/blob/1bc240bc/sdks/python/apache_beam/io/filesystems_util.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filesystems_util.py b/sdks/python/apache_beam/io/filesystems_util.py
new file mode 100644
index 0000000..47c2361
--- /dev/null
+++ b/sdks/python/apache_beam/io/filesystems_util.py
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""Utility functions for getting the correct file systems for a file name"""
+
+from apache_beam.io.localfilesystem import LocalFileSystem
+
+
+# TODO(BEAM-1585): Add a mechanism to add user implemented file systems
+def get_filesystem(path):
+  """Returns a ``GCSFileSystem`` instance for ``gs://`` paths and a
+  ``LocalFileSystem`` instance for every other path.
+  """
+  if not path.startswith('gs://'):
+    return LocalFileSystem()
+  # Imported here so the GCS module is only loaded for ``gs://`` paths.
+  from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem
+  return GCSFileSystem()

http://git-wip-us.apache.org/repos/asf/beam/blob/1bc240bc/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
new file mode 100644
index 0000000..5aef0ab
--- /dev/null
+++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
@@ -0,0 +1,242 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""GCS file system implementation for accessing files on GCS."""
+
+from __future__ import absolute_import
+
+from apache_beam.io.filesystem import BeamIOError
+from apache_beam.io.filesystem import CompressedFile
+from apache_beam.io.filesystem import CompressionTypes
+from apache_beam.io.filesystem import FileMetadata
+from apache_beam.io.filesystem import FileSystem
+from apache_beam.io.filesystem import MatchResult
+from apache_beam.io.gcp import gcsio
+
+
+class GCSFileSystem(FileSystem):
+  """A GCS ``FileSystem`` implementation for accessing files on GCS.
+  """
+
+  def mkdirs(self, path):
+    """Accept a directory path; this implementation performs no action.
+
+    Args:
+      path: string path of the directory structure that should be created
+
+    Note:
+      The body is a no-op: nothing is created and no exception is raised.
+    """
+    pass
+
+  def match(self, patterns, limits=None):
+    """Find all matching paths to the patterns provided.
+
+    Args:
+      patterns: list of string file path patterns to match against
+      limits: list with the maximum number of results to keep per pattern
+
+    Returns: list of ``MatchResult`` objects.
+
+    Raises:
+      ``BeamIOError`` if any of the pattern match operations fail
+    """
+    if limits is None:
+      limits = [None] * len(patterns)
+    else:
+      err_msg = "Patterns and limits should be equal in length"
+      assert len(patterns) == len(limits), err_msg
+
+    def _match(pattern, limit):
+      """Match one pattern, keeping at most ``limit`` results (None = all).
+      """
+      if pattern.endswith('/'):  # a trailing '/' means "everything under it"
+        pattern += '*'
+      file_sizes = gcsio.GcsIO().size_of_files_in_glob(pattern)
+      metadata_list = [FileMetadata(path, size)
+                       for path, size in file_sizes.iteritems()]
+      return MatchResult(pattern, metadata_list[:limit])  # [:None] keeps all
+
+    exceptions = {}
+    result = []
+    for pattern, limit in zip(patterns, limits):
+      try:
+        result.append(_match(pattern, limit))
+      except Exception as e:  # pylint: disable=broad-except
+        exceptions[pattern] = e
+
+    if exceptions:
+      raise BeamIOError("Match operation failed", exceptions)
+    return result
+
+  def _path_open(self, path, mode, mime_type='application/octet-stream',
+                 compression_type=CompressionTypes.AUTO):
+    """Helper function to open a file in the provided mode.
+    """
+    compression_type = FileSystem._get_compression_type(path, compression_type)
+    mime_type = CompressionTypes.mime_type(compression_type, mime_type)
+    raw_file = gcsio.GcsIO().open(path, mode, mime_type=mime_type)
+    if compression_type == CompressionTypes.UNCOMPRESSED:
+      return raw_file
+    else:
+      return CompressedFile(raw_file, compression_type=compression_type)
+
+  def create(self, path, mime_type='application/octet-stream',
+             compression_type=CompressionTypes.AUTO):
+    """Returns a write channel for the given file path.
+
+    Args:
+      path: string path of the file object to be written to the system
+      mime_type: MIME type to specify the type of content in the file object
+      compression_type: Type of compression to be used for this object
+
+    Returns: file handle with a close function for the user to use
+    """
+    return self._path_open(path, 'wb', mime_type, compression_type)
+
+  def open(self, path, mime_type='application/octet-stream',
+           compression_type=CompressionTypes.AUTO):
+    """Returns a read channel for the given file path.
+
+    Args:
+      path: string path of the file object to be read from the system
+      mime_type: MIME type to specify the type of content in the file object
+      compression_type: Type of compression to be used for this object
+
+    Returns: file handle with a close function for the user to use
+    """
+    return self._path_open(path, 'rb', mime_type, compression_type)
+
+  def copy(self, source_file_names, destination_file_names):
+    """Recursively copy the file tree from the source to the destination
+
+    Args:
+      source_file_names: list of source file objects that needs to be copied
+      destination_file_names: list of destination of the new object
+
+    Raises:
+      ``BeamIOError`` if any copy fails or a destination is not a GCS path
+    """
+    err_msg = ("source_file_names and destination_file_names should "
+               "be equal in length")
+    assert len(source_file_names) == len(destination_file_names), err_msg
+
+    def _copy_path(source, destination):
+      """Copy one source to one GCS destination (tree copy for 'dir/' paths).
+      """
+      if not destination.startswith('gs://'):
+        raise ValueError('Destination %r must be GCS path.' % destination)
+      # Use copy_tree if the path ends with / as it is a directory
+      if source.endswith('/'):
+        gcsio.GcsIO().copytree(source, destination)
+      else:
+        gcsio.GcsIO().copy(source, destination)
+
+    exceptions = {}
+    for source, destination in zip(source_file_names, destination_file_names):
+      try:
+        _copy_path(source, destination)
+      except Exception as e:  # pylint: disable=broad-except
+        exceptions[(source, destination)] = e
+
+    if exceptions:
+      raise BeamIOError("Copy operation failed", exceptions)
+
+  def rename(self, source_file_names, destination_file_names):
+    """Rename the files at the source list to the destination list.
+    Source and destination lists should be of the same size.
+
+    Args:
+      source_file_names: List of file paths that need to be moved
+      destination_file_names: List of destination_file_names for the files
+
+    Raises:
+      ``BeamIOError`` if any of the rename operations fail
+    """
+    err_msg = ("source_file_names and destination_file_names should "
+               "be equal in length")
+    assert len(source_file_names) == len(destination_file_names), err_msg
+
+    gcs_batches = []
+    gcs_current_batch = []
+    for src, dest in zip(source_file_names, destination_file_names):
+      gcs_current_batch.append((src, dest))
+      if len(gcs_current_batch) == gcsio.MAX_BATCH_OPERATION_SIZE:
+        gcs_batches.append(gcs_current_batch)
+        gcs_current_batch = []
+    if gcs_current_batch:
+      gcs_batches.append(gcs_current_batch)
+
+    # Execute GCS renames if any and return exceptions.
+    exceptions = {}
+    for batch in gcs_batches:
+      copy_statuses = gcsio.GcsIO().copy_batch(batch)
+      copy_succeeded = []
+      for src, dest, exception in copy_statuses:
+        if exception:
+          exceptions[(src, dest)] = exception
+        else:
+          copy_succeeded.append((src, dest))
+      delete_batch = [src for src, dest in copy_succeeded]
+      delete_statuses = gcsio.GcsIO().delete_batch(delete_batch)
+      for i, (src, exception) in enumerate(delete_statuses):
+        dest = copy_succeeded[i][1]
+        if exception:
+          exceptions[(src, dest)] = exception
+
+    if exceptions:
+      raise BeamIOError("Rename operation failed", exceptions)
+
+  def exists(self, path):
+    """Check if the provided path exists on the FileSystem.
+
+    Args:
+      path: string path that needs to be checked.
+
+    Returns: boolean flag indicating if path exists
+    """
+    return gcsio.GcsIO().exists(path)
+
+  def delete(self, paths):
+    """Deletes files or directories at the provided paths.
+    Directories will be deleted recursively.
+
+    Args:
+      paths: list of paths that give the file objects to be deleted
+    """
+    def _delete_path(path):
+      """Recursively delete the file or directory at the provided path.
+      """
+      if path.endswith('/'):
+        path_to_use = path + '*'
+      else:
+        path_to_use = path
+      match_result = self.match([path_to_use])[0]
+      statuses = gcsio.GcsIO().delete_batch(
+          [m.path for m in match_result.metadata_list])
+      failures = [e for (_, e) in statuses if e is not None]
+      if failures:
+        raise failures[0]
+
+    exceptions = {}
+    for path in paths:
+      try:
+        _delete_path(path)
+      except Exception as e:  # pylint: disable=broad-except
+        exceptions[path] = e
+
+    if exceptions:
+      raise BeamIOError("Delete operation failed", exceptions)

http://git-wip-us.apache.org/repos/asf/beam/blob/1bc240bc/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
new file mode 100644
index 0000000..3fe5cce
--- /dev/null
+++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
@@ -0,0 +1,293 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for GCS File System."""
+
+import unittest
+
+import mock
+from apache_beam.io.filesystem import BeamIOError
+from apache_beam.io.filesystem import FileMetadata
+
+# Protect against environments where apitools library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from apache_beam.io.gcp import gcsfilesystem
+except ImportError:
+  gcsfilesystem = None
+# pylint: enable=wrong-import-order, wrong-import-position
+
+
+@unittest.skipIf(gcsfilesystem is None, 'GCP dependencies are not installed')
+class GCSFileSystemTest(unittest.TestCase):
+
+  @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
+  def test_match_multiples(self, mock_gcsio):
+    # Prepare mocks.
+    gcsio_mock = mock.MagicMock()
+    gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+    gcsio_mock.size_of_files_in_glob.return_value = {
+        'gs://bucket/file1': 1,
+        'gs://bucket/file2': 2
+    }
+    expected_results = set([
+        FileMetadata('gs://bucket/file1', 1),
+        FileMetadata('gs://bucket/file2', 2)
+    ])
+    file_system = gcsfilesystem.GCSFileSystem()
+    match_result = file_system.match(['gs://bucket/'])[0]
+    self.assertEqual(
+        set(match_result.metadata_list),
+        expected_results)
+    gcsio_mock.size_of_files_in_glob.assert_called_once_with('gs://bucket/*')
+
+  @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
+  def test_match_multiples_error(self, mock_gcsio):
+    # Prepare mocks.
+    gcsio_mock = mock.MagicMock()
+    gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+    exception = IOError('Failed')
+    gcsio_mock.size_of_files_in_glob.side_effect = exception
+    expected_results = {'gs://bucket/': exception}
+
+    file_system = gcsfilesystem.GCSFileSystem()
+    with self.assertRaises(BeamIOError) as error:
+      file_system.match(['gs://bucket/'])
+    self.assertEqual(error.exception.message, 'Match operation failed')
+    self.assertEqual(error.exception.exception_details, expected_results)
+    gcsio_mock.size_of_files_in_glob.assert_called_once_with('gs://bucket/*')
+
+  @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
+  def test_match_multiple_patterns(self, mock_gcsio):
+    # Prepare mocks.
+    gcsio_mock = mock.MagicMock()
+    gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+    gcsio_mock.size_of_files_in_glob.side_effect = [
+        {'gs://bucket/file1': 1},
+        {'gs://bucket/file2': 2},
+    ]
+    expected_results = [
+        [FileMetadata('gs://bucket/file1', 1)],
+        [FileMetadata('gs://bucket/file2', 2)]
+    ]
+    file_system = gcsfilesystem.GCSFileSystem()
+    result = file_system.match(['gs://bucket/file1*', 'gs://bucket/file2*'])
+    self.assertEqual(
+        [mr.metadata_list for mr in result],
+        expected_results)
+
+  @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
+  def test_create(self, mock_gcsio):
+    # Prepare mocks.
+    gcsio_mock = mock.MagicMock()
+    gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+    # Issue file create
+    file_system = gcsfilesystem.GCSFileSystem()
+    _ = file_system.create('gs://bucket/from1', 'application/octet-stream')
+
+    gcsio_mock.open.assert_called_once_with(
+        'gs://bucket/from1', 'wb', mime_type='application/octet-stream')
+
+  @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
+  def test_open(self, mock_gcsio):
+    # Prepare mocks.
+    gcsio_mock = mock.MagicMock()
+    gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+    # Issue file open
+    file_system = gcsfilesystem.GCSFileSystem()
+    _ = file_system.open('gs://bucket/from1', 'application/octet-stream')
+
+    gcsio_mock.open.assert_called_once_with(
+        'gs://bucket/from1', 'rb', mime_type='application/octet-stream')
+
+  @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
+  def test_copy_file(self, mock_gcsio):
+    # Prepare mocks.
+    gcsio_mock = mock.MagicMock()
+    gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+    sources = ['gs://bucket/from1']
+    destinations = ['gs://bucket/to1']
+
+    # Issue file copy
+    file_system = gcsfilesystem.GCSFileSystem()
+    file_system.copy(sources, destinations)
+
+    gcsio_mock.copy.assert_called_once_with(
+        'gs://bucket/from1', 'gs://bucket/to1')
+
+  @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
+  def test_copy_file_error(self, mock_gcsio):
+    # Prepare mocks.
+    gcsio_mock = mock.MagicMock()
+    gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+    sources = ['gs://bucket/from1']
+    destinations = ['gs://bucket/to1']
+
+    exception = IOError('Failed')
+    gcsio_mock.copy.side_effect = exception
+
+    # Expected error details, keyed by (source, destination).
+    expected_results = {(s, d):exception for s, d in zip(sources, destinations)}
+
+    # Issue batch copy.
+    file_system = gcsfilesystem.GCSFileSystem()
+    with self.assertRaises(BeamIOError) as error:
+      file_system.copy(sources, destinations)
+    self.assertEqual(error.exception.message, 'Copy operation failed')
+    self.assertEqual(error.exception.exception_details, expected_results)
+
+    gcsio_mock.copy.assert_called_once_with(
+        'gs://bucket/from1', 'gs://bucket/to1')
+
+  @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
+  def test_copy_tree(self, mock_gcsio):
+    # Prepare mocks.
+    gcsio_mock = mock.MagicMock()
+    gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+    sources = ['gs://bucket1/']
+    destinations = ['gs://bucket2/']
+
+    # Issue directory copy
+    file_system = gcsfilesystem.GCSFileSystem()
+    file_system.copy(sources, destinations)
+
+    gcsio_mock.copytree.assert_called_once_with(
+        'gs://bucket1/', 'gs://bucket2/')
+
+  @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
+  def test_rename(self, mock_gcsio):
+    # Prepare mocks.
+    gcsio_mock = mock.MagicMock()
+    gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+    sources = [
+        'gs://bucket/from1',
+        'gs://bucket/from2',
+        'gs://bucket/from3',
+    ]
+    destinations = [
+        'gs://bucket/to1',
+        'gs://bucket/to2',
+        'gs://bucket/to3',
+    ]
+    gcsio_mock.copy_batch.side_effect = [[
+        ('gs://bucket/from1', 'gs://bucket/to1', None),
+        ('gs://bucket/from2', 'gs://bucket/to2', None),
+        ('gs://bucket/from3', 'gs://bucket/to3', None),
+    ]]
+    gcsio_mock.delete_batch.side_effect = [[
+        ('gs://bucket/from1', None),
+        ('gs://bucket/from2', None),
+        ('gs://bucket/from3', None),
+    ]]
+
+    # Issue batch rename.
+    file_system = gcsfilesystem.GCSFileSystem()
+    file_system.rename(sources, destinations)
+
+    gcsio_mock.copy_batch.assert_called_once_with([
+        ('gs://bucket/from1', 'gs://bucket/to1'),
+        ('gs://bucket/from2', 'gs://bucket/to2'),
+        ('gs://bucket/from3', 'gs://bucket/to3'),
+    ])
+    gcsio_mock.delete_batch.assert_called_once_with([
+        'gs://bucket/from1',
+        'gs://bucket/from2',
+        'gs://bucket/from3',
+    ])
+
+  @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
+  def test_rename_error(self, mock_gcsio):
+    # Prepare mocks.
+    gcsio_mock = mock.MagicMock()
+    gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+    sources = [
+        'gs://bucket/from1',
+        'gs://bucket/from2',
+        'gs://bucket/from3',
+    ]
+    destinations = [
+        'gs://bucket/to1',
+        'gs://bucket/to2',
+        'gs://bucket/to3',
+    ]
+    exception = IOError('Failed')
+    gcsio_mock.delete_batch.side_effect = [[(f, exception) for f in sources]]
+    gcsio_mock.copy_batch.side_effect = [[
+        ('gs://bucket/from1', 'gs://bucket/to1', None),
+        ('gs://bucket/from2', 'gs://bucket/to2', None),
+        ('gs://bucket/from3', 'gs://bucket/to3', None),
+    ]]
+
+    # Expected error details, keyed by (source, destination).
+    expected_results = {(s, d):exception for s, d in zip(sources, destinations)}
+
+    # Issue batch rename.
+    file_system = gcsfilesystem.GCSFileSystem()
+    with self.assertRaises(BeamIOError) as error:
+      file_system.rename(sources, destinations)
+    self.assertEqual(error.exception.message, 'Rename operation failed')
+    self.assertEqual(error.exception.exception_details, expected_results)
+
+    gcsio_mock.copy_batch.assert_called_once_with([
+        ('gs://bucket/from1', 'gs://bucket/to1'),
+        ('gs://bucket/from2', 'gs://bucket/to2'),
+        ('gs://bucket/from3', 'gs://bucket/to3'),
+    ])
+    gcsio_mock.delete_batch.assert_called_once_with([
+        'gs://bucket/from1',
+        'gs://bucket/from2',
+        'gs://bucket/from3',
+    ])
+
+  @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
+  def test_delete(self, mock_gcsio):
+    # Prepare mocks.
+    gcsio_mock = mock.MagicMock()
+    gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+    files = [
+        'gs://bucket/from1',
+        'gs://bucket/from2',
+        'gs://bucket/from3',
+    ]
+
+    # Issue batch delete.
+    file_system = gcsfilesystem.GCSFileSystem()
+    file_system.delete(files)
+    gcsio_mock.delete_batch.assert_called()
+
+  @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
+  def test_delete_error(self, mock_gcsio):
+    # Prepare mocks.
+    gcsio_mock = mock.MagicMock()
+    gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+    exception = IOError('Failed')
+    gcsio_mock.delete_batch.side_effect = exception
+    files = [
+        'gs://bucket/from1',
+        'gs://bucket/from2',
+        'gs://bucket/from3',
+    ]
+    expected_results = {f:exception for f in files}
+
+    # Issue batch delete.
+    file_system = gcsfilesystem.GCSFileSystem()
+    with self.assertRaises(BeamIOError) as error:
+      file_system.delete(files)
+    self.assertEqual(error.exception.message, 'Delete operation failed')
+    self.assertEqual(error.exception.exception_details, expected_results)
+    gcsio_mock.delete_batch.assert_called()

http://git-wip-us.apache.org/repos/asf/beam/blob/1bc240bc/sdks/python/apache_beam/io/gcp/gcsio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py
index 020c38f..285e272 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio.py
@@ -31,20 +31,20 @@ import re
 import threading
 import traceback
 
-import apitools.base.py.transfer as transfer
-from apitools.base.py.batch import BatchApiRequest
-from apitools.base.py.exceptions import HttpError
-
-from apache_beam.internal.gcp import auth
 from apache_beam.utils import retry
 
 # Issue a friendlier error message if the storage library is not available.
 # TODO(silviuc): Remove this guard when storage is available everywhere.
 try:
   # pylint: disable=wrong-import-order, wrong-import-position
+  # pylint: disable=ungrouped-imports
+  import apitools.base.py.transfer as transfer
+  from apitools.base.py.batch import BatchApiRequest
+  from apitools.base.py.exceptions import HttpError
+  from apache_beam.internal.gcp import auth
   from apache_beam.io.gcp.internal.clients import storage
 except ImportError:
-  raise RuntimeError(
+  raise ImportError(
       'Google Cloud Storage I/O not supported for this execution environment '
       '(could not import storage API client).')
 

http://git-wip-us.apache.org/repos/asf/beam/blob/1bc240bc/sdks/python/apache_beam/io/gcp/gcsio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/gcsio_test.py b/sdks/python/apache_beam/io/gcp/gcsio_test.py
index a852689..c028f0d 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio_test.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio_test.py
@@ -28,12 +28,11 @@ import unittest
 import httplib2
 import mock
 
-from apache_beam.io import gcsio
-from apache_beam.io.gcp.internal.clients import storage
-
 # Protect against environments where apitools library is not available.
 # pylint: disable=wrong-import-order, wrong-import-position
 try:
+  from apache_beam.io.gcp import gcsio
+  from apache_beam.io.gcp.internal.clients import storage
   from apitools.base.py.exceptions import HttpError
 except ImportError:
   HttpError = None
@@ -295,7 +294,7 @@ class TestGCSIO(unittest.TestCase):
     self.assertFalse(
         gcsio.parse_gcs_path(file_name) in self.client.objects.files)
 
-  @mock.patch('apache_beam.io.gcsio.BatchApiRequest')
+  @mock.patch('apache_beam.io.gcp.gcsio.BatchApiRequest')
   def test_delete_batch(self, *unused_args):
     gcsio.BatchApiRequest = FakeBatchApiRequest
     file_name_pattern = 'gs://gcsio-test/delete_me_%d'
@@ -346,7 +345,7 @@ class TestGCSIO(unittest.TestCase):
     self.assertRaises(IOError, self.gcs.copy, 'gs://gcsio-test/non-existent',
                       'gs://gcsio-test/non-existent-destination')
 
-  @mock.patch('apache_beam.io.gcsio.BatchApiRequest')
+  @mock.patch('apache_beam.io.gcp.gcsio.BatchApiRequest')
   def test_copy_batch(self, *unused_args):
     gcsio.BatchApiRequest = FakeBatchApiRequest
     from_name_pattern = 'gs://gcsio-test/copy_me_%d'

http://git-wip-us.apache.org/repos/asf/beam/blob/1bc240bc/sdks/python/apache_beam/io/iobase.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index 057f853..512824b 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -591,8 +591,8 @@ class Sink(HasDisplayData):
   single record from the bundle and ``close()`` which is called once
   at the end of writing a bundle.
 
-  See also ``beam.io.fileio.FileSink`` which provides a simpler API for writing
-  sinks that produce files.
+  See also ``apache_beam.io.fileio.FileSink`` which provides a simpler API
+  for writing sinks that produce files.
 
   **Execution of the Write transform**
 


[3/3] beam git commit: This closes #2136

Posted by ch...@apache.org.
This closes #2136


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/dd0f8d98
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/dd0f8d98
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/dd0f8d98

Branch: refs/heads/master
Commit: dd0f8d984aa79c08867ae9f5e54ce48179499e19
Parents: 7c7bb82 1bc240b
Author: Chamikara Jayalath <ch...@google.com>
Authored: Wed Mar 22 14:28:53 2017 -0700
Committer: Chamikara Jayalath <ch...@google.com>
Committed: Wed Mar 22 14:28:53 2017 -0700

----------------------------------------------------------------------
 .../apache_beam/examples/snippets/snippets.py   |   2 +-
 sdks/python/apache_beam/io/avroio.py            |   3 +-
 sdks/python/apache_beam/io/filebasedsource.py   |  83 +--
 .../apache_beam/io/filebasedsource_test.py      |  18 +-
 sdks/python/apache_beam/io/fileio.py            | 588 +++----------------
 sdks/python/apache_beam/io/fileio_test.py       | 102 +---
 sdks/python/apache_beam/io/filesystem.py        | 439 ++++++++++++++
 sdks/python/apache_beam/io/filesystems_util.py  |  31 +
 sdks/python/apache_beam/io/gcp/gcsfilesystem.py | 242 ++++++++
 .../apache_beam/io/gcp/gcsfilesystem_test.py    | 293 +++++++++
 sdks/python/apache_beam/io/gcp/gcsio.py         |  12 +-
 sdks/python/apache_beam/io/gcp/gcsio_test.py    |   9 +-
 sdks/python/apache_beam/io/iobase.py            |   4 +-
 sdks/python/apache_beam/io/localfilesystem.py   | 236 ++++++++
 .../apache_beam/io/localfilesystem_test.py      | 185 ++++++
 sdks/python/apache_beam/io/textio.py            |   7 +-
 sdks/python/apache_beam/io/textio_test.py       |   2 +-
 sdks/python/apache_beam/io/tfrecordio.py        |   5 +-
 sdks/python/apache_beam/io/tfrecordio_test.py   |  20 +-
 .../apache_beam/tests/pipeline_verifiers.py     |   8 +-
 .../tests/pipeline_verifiers_test.py            |  26 +-
 21 files changed, 1613 insertions(+), 702 deletions(-)
----------------------------------------------------------------------