You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2017/05/05 21:23:11 UTC
[1/2] beam git commit: [BEAM-1585] Filesystems should be picked using the provided scheme
Repository: beam
Updated Branches:
refs/heads/master 50262be1f -> 1ecb111d3
[BEAM-1585] Filesystems should be picked using the provided scheme
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/cbe182ee
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/cbe182ee
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/cbe182ee
Branch: refs/heads/master
Commit: cbe182eee203c92073a25bbcd39f86f8440a5fc2
Parents: 50262be
Author: Sourabh Bajaj <so...@google.com>
Authored: Fri May 5 11:00:58 2017 -0700
Committer: chamikara@google.com <ch...@google.com>
Committed: Fri May 5 14:22:49 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/io/fileio_test.py | 3 +-
sdks/python/apache_beam/io/filesystem.py | 16 +++++++++
sdks/python/apache_beam/io/filesystems.py | 36 ++++++++++++++++++--
sdks/python/apache_beam/io/filesystems_test.py | 15 +++++++-
sdks/python/apache_beam/io/filesystems_util.py | 36 --------------------
sdks/python/apache_beam/io/gcp/gcsfilesystem.py | 6 ++++
.../apache_beam/io/gcp/gcsfilesystem_test.py | 5 +++
sdks/python/apache_beam/io/localfilesystem.py | 5 +++
.../apache_beam/io/localfilesystem_test.py | 4 +++
9 files changed, 84 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/cbe182ee/sdks/python/apache_beam/io/fileio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py
index 13778d5..3f7211e 100644
--- a/sdks/python/apache_beam/io/fileio_test.py
+++ b/sdks/python/apache_beam/io/fileio_test.py
@@ -31,7 +31,6 @@ import mock
import apache_beam as beam
from apache_beam import coders
from apache_beam.io import fileio
-from apache_beam.io.filesystem import BeamIOError
from apache_beam.test_pipeline import TestPipeline
from apache_beam.transforms.display import DisplayData
from apache_beam.transforms.display_test import DisplayDataItemMatcher
@@ -216,7 +215,7 @@ class TestFileSink(_TestCaseWithTempDirCleanUp):
self.run_temp_dir_check(
'gs://aaa/bbb', 'gs://aaa/bbb/', 'gs://aaa', 'gs://aaa/', 'gs://',
'/')
- except BeamIOError:
+ except ValueError:
logging.debug('Ignoring test since GCP module is not installed')
@mock.patch('apache_beam.io.localfilesystem.os')
http://git-wip-us.apache.org/repos/asf/beam/blob/cbe182ee/sdks/python/apache_beam/io/filesystem.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filesystem.py b/sdks/python/apache_beam/io/filesystem.py
index ff8af03..3d35f3e 100644
--- a/sdks/python/apache_beam/io/filesystem.py
+++ b/sdks/python/apache_beam/io/filesystem.py
@@ -425,6 +425,22 @@ class FileSystem(object):
'was %s' % type(compression_type))
return compression_type
+ @classmethod
+ def get_all_subclasses(cls):
+ """Get all the subclasses of the FileSystem class
+ """
+ all_subclasses = []
+ for subclass in cls.__subclasses__():
+ all_subclasses.append(subclass)
+ all_subclasses.extend(subclass.get_all_subclasses())
+ return all_subclasses
+
+ @classmethod
+ def scheme(cls):
+ """URI scheme for the FileSystem
+ """
+ raise NotImplementedError
+
@abc.abstractmethod
def join(self, basepath, *paths):
"""Join two or more pathname components for the filesystem
http://git-wip-us.apache.org/repos/asf/beam/blob/cbe182ee/sdks/python/apache_beam/io/filesystems.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filesystems.py b/sdks/python/apache_beam/io/filesystems.py
index 225a857..29f0644 100644
--- a/sdks/python/apache_beam/io/filesystems.py
+++ b/sdks/python/apache_beam/io/filesystems.py
@@ -17,24 +17,54 @@
"""FileSystems interface class for accessing the correct filesystem"""
+import re
+
from apache_beam.io.filesystem import BeamIOError
from apache_beam.io.filesystem import CompressionTypes
-from apache_beam.io.filesystems_util import get_filesystem
+from apache_beam.io.filesystem import FileSystem
+
+# All filesystem implementations should be imported here
+# pylint: disable=wrong-import-position, unused-import
+from apache_beam.io.localfilesystem import LocalFileSystem
+
+try:
+ from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem
+except ImportError:
+ pass
+# pylint: enable=wrong-import-position, unused-import
class FileSystems(object):
"""A class that defines the functions that can be performed on a filesystem.
All methods are static and access the underlying registered filesystems.
"""
+ URI_SCHEMA_PATTERN = re.compile('(?P<scheme>[a-zA-Z][-a-zA-Z0-9+.]*)://.*')
+
+ @staticmethod
+ def get_scheme(path):
+ match_result = FileSystems.URI_SCHEMA_PATTERN.match(path.strip())
+ if match_result is None:
+ return None
+ return match_result.groupdict()['scheme']
@staticmethod
def get_filesystem(path):
"""Get the correct filesystem for the specified path
"""
try:
- return get_filesystem(path)
+ path_scheme = FileSystems.get_scheme(path)
+ systems = [fs for fs in FileSystem.get_all_subclasses()
+ if fs.scheme() == path_scheme]
+ if len(systems) == 0:
+ raise ValueError('Unable to get the Filesystem for path %s' % path)
+ elif len(systems) == 1:
+ return systems[0]()
+ else:
+ raise ValueError('Found more than one filesystem for path %s' % path)
+ except ValueError:
+ raise
except Exception as e:
- raise BeamIOError('Enable to get the Filesystem', {path: e})
+ raise BeamIOError('Unable to get the Filesystem', {path: e})
@staticmethod
def join(basepath, *paths):
http://git-wip-us.apache.org/repos/asf/beam/blob/cbe182ee/sdks/python/apache_beam/io/filesystems_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filesystems_test.py b/sdks/python/apache_beam/io/filesystems_test.py
index 9165586..9a6f013 100644
--- a/sdks/python/apache_beam/io/filesystems_test.py
+++ b/sdks/python/apache_beam/io/filesystems_test.py
@@ -48,6 +48,19 @@ class FileSystemsTest(unittest.TestCase):
def tearDown(self):
shutil.rmtree(self.tmpdir)
+ def test_get_scheme(self):
+ self.assertIsNone(FileSystems.get_scheme('/abc/cdf'))
+ self.assertIsNone(FileSystems.get_scheme('c:\\abc\cdf')) # pylint: disable=anomalous-backslash-in-string
+ self.assertEqual(FileSystems.get_scheme('gs://abc/cdf'), 'gs')
+
+ def test_get_filesystem(self):
+ self.assertTrue(isinstance(FileSystems.get_filesystem('/tmp'),
+ localfilesystem.LocalFileSystem))
+ self.assertTrue(isinstance(FileSystems.get_filesystem('c:\\abc\def'), # pylint: disable=anomalous-backslash-in-string
+ localfilesystem.LocalFileSystem))
+ with self.assertRaises(ValueError):
+ FileSystems.get_filesystem('error://abc/def')
+
@mock.patch('apache_beam.io.localfilesystem.os')
def test_unix_path_join(self, *unused_mocks):
# Test joining of Unix paths.
@@ -110,7 +123,7 @@ class FileSystemsTest(unittest.TestCase):
with self.assertRaises(BeamIOError) as error:
FileSystems.match([None])
self.assertTrue(
- error.exception.message.startswith('Enable to get the Filesystem'))
+ error.exception.message.startswith('Unable to get the Filesystem'))
self.assertEqual(error.exception.exception_details.keys(), [None])
def test_match_directory(self):
http://git-wip-us.apache.org/repos/asf/beam/blob/cbe182ee/sdks/python/apache_beam/io/filesystems_util.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filesystems_util.py b/sdks/python/apache_beam/io/filesystems_util.py
deleted file mode 100644
index 6d21298..0000000
--- a/sdks/python/apache_beam/io/filesystems_util.py
+++ /dev/null
@@ -1,36 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-"""Utility functions for getting the correct file systems for a file name"""
-
-from apache_beam.io.localfilesystem import LocalFileSystem
-
-
-# TODO(BEAM-1585): Add a mechanism to add user implemented file systems
-def get_filesystem(path):
- """Function that returns the FileSystem class to use based on the path
- provided in the input.
- """
- if path.startswith('gs://'):
- try:
- from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem
- except ImportError:
- raise ImportError(
- 'Google Cloud Platform IO not available, '
- 'please install apache_beam[gcp]')
- return GCSFileSystem()
- else:
- return LocalFileSystem()
http://git-wip-us.apache.org/repos/asf/beam/blob/cbe182ee/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
index ba00f50..dc71fce 100644
--- a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
+++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
@@ -34,6 +34,12 @@ class GCSFileSystem(FileSystem):
CHUNK_SIZE = gcsio.MAX_BATCH_OPERATION_SIZE # Chuck size in batch operations
GCS_PREFIX = 'gs://'
+ @classmethod
+ def scheme(cls):
+ """URI scheme for the FileSystem
+ """
+ return 'gs'
+
def join(self, basepath, *paths):
"""Join two or more pathname components for the filesystem
http://git-wip-us.apache.org/repos/asf/beam/blob/cbe182ee/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
index 5fb9a62..923fc7d 100644
--- a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
+++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
@@ -36,6 +36,11 @@ except ImportError:
@unittest.skipIf(gcsfilesystem is None, 'GCP dependencies are not installed')
class GCSFileSystemTest(unittest.TestCase):
+ def test_scheme(self):
+ file_system = gcsfilesystem.GCSFileSystem()
+ self.assertEqual(file_system.scheme(), 'gs')
+ self.assertEqual(gcsfilesystem.GCSFileSystem.scheme(), 'gs')
+
def test_join(self):
file_system = gcsfilesystem.GCSFileSystem()
self.assertEqual('gs://bucket/path/to/file',
http://git-wip-us.apache.org/repos/asf/beam/blob/cbe182ee/sdks/python/apache_beam/io/localfilesystem.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/localfilesystem.py b/sdks/python/apache_beam/io/localfilesystem.py
index bc7c576..c670704 100644
--- a/sdks/python/apache_beam/io/localfilesystem.py
+++ b/sdks/python/apache_beam/io/localfilesystem.py
@@ -33,6 +33,11 @@ from apache_beam.io.filesystem import MatchResult
class LocalFileSystem(FileSystem):
"""A Local ``FileSystem`` implementation for accessing files on disk.
"""
+ @classmethod
+ def scheme(cls):
+ """URI scheme for the FileSystem
+ """
+ return None
def join(self, basepath, *paths):
"""Join two or more pathname components for the filesystem
http://git-wip-us.apache.org/repos/asf/beam/blob/cbe182ee/sdks/python/apache_beam/io/localfilesystem_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/localfilesystem_test.py b/sdks/python/apache_beam/io/localfilesystem_test.py
index 986dcaf..04cf5b7 100644
--- a/sdks/python/apache_beam/io/localfilesystem_test.py
+++ b/sdks/python/apache_beam/io/localfilesystem_test.py
@@ -61,6 +61,10 @@ class LocalFileSystemTest(unittest.TestCase):
def tearDown(self):
shutil.rmtree(self.tmpdir)
+ def test_scheme(self):
+ self.assertIsNone(self.fs.scheme())
+ self.assertIsNone(localfilesystem.LocalFileSystem.scheme())
+
@mock.patch('apache_beam.io.localfilesystem.os')
def test_unix_path_join(self, *unused_mocks):
# Test joining of Unix paths.
[2/2] beam git commit: This closes #2807
Posted by ch...@apache.org.
This closes #2807
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1ecb111d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1ecb111d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1ecb111d
Branch: refs/heads/master
Commit: 1ecb111d325f4fbac3cba82f3358038b6d4ed5b2
Parents: 50262be cbe182e
Author: chamikara@google.com <ch...@google.com>
Authored: Fri May 5 14:22:56 2017 -0700
Committer: chamikara@google.com <ch...@google.com>
Committed: Fri May 5 14:22:56 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/io/fileio_test.py | 3 +-
sdks/python/apache_beam/io/filesystem.py | 16 +++++++++
sdks/python/apache_beam/io/filesystems.py | 36 ++++++++++++++++++--
sdks/python/apache_beam/io/filesystems_test.py | 15 +++++++-
sdks/python/apache_beam/io/filesystems_util.py | 36 --------------------
sdks/python/apache_beam/io/gcp/gcsfilesystem.py | 6 ++++
.../apache_beam/io/gcp/gcsfilesystem_test.py | 5 +++
sdks/python/apache_beam/io/localfilesystem.py | 5 +++
.../apache_beam/io/localfilesystem_test.py | 4 +++
9 files changed, 84 insertions(+), 42 deletions(-)
----------------------------------------------------------------------