Posted to commits@beam.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/01/24 00:41:00 UTC

[jira] [Commented] (BEAM-3099) Implement HDFS FileSystem for Python SDK

    [ https://issues.apache.org/jira/browse/BEAM-3099?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16336649#comment-16336649 ] 

ASF GitHub Bot commented on BEAM-3099:
--------------------------------------

chamikaramj closed pull request #4465: [BEAM-3099] Pass PipelineOptions to FileSystem constructor.
URL: https://github.com/apache/beam/pull/4465
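
In short, this change makes every FileSystem implementation take the
pipeline options at construction time instead of being instantiated with
no arguments. A minimal sketch of the new contract, using a hypothetical
DummyFileSystem subclass (the name is illustrative only):

  from apache_beam.io.filesystem import FileSystem

  class DummyFileSystem(FileSystem):
    """Hypothetical subclass illustrating the new constructor contract."""

    def __init__(self, pipeline_options):
      # Implementations now receive the PipelineOptions and forward them
      # to the base class.
      super(DummyFileSystem, self).__init__(pipeline_options)
      self._options = pipeline_options  # kept for scheme-specific config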
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/sdks/python/apache_beam/io/filesystem.py b/sdks/python/apache_beam/io/filesystem.py
index 0efdb0ef751..424462ab30c 100644
--- a/sdks/python/apache_beam/io/filesystem.py
+++ b/sdks/python/apache_beam/io/filesystem.py
@@ -431,6 +431,12 @@ class FileSystem(BeamPlugin):
   __metaclass__ = abc.ABCMeta
   CHUNK_SIZE = 1  # Chunk size in the batch operations
 
+  def __init__(self, pipeline_options):
+    """
+    Args:
+      pipeline_options: Instance of ``PipelineOptions``.
+    """
+
   @staticmethod
   def _get_compression_type(path, compression_type):
     if compression_type == CompressionTypes.AUTO:
diff --git a/sdks/python/apache_beam/io/filesystems.py b/sdks/python/apache_beam/io/filesystems.py
index 0c82a7e25dc..dad4e5f9f27 100644
--- a/sdks/python/apache_beam/io/filesystems.py
+++ b/sdks/python/apache_beam/io/filesystems.py
@@ -42,6 +42,17 @@ class FileSystems(object):
   """
   URI_SCHEMA_PATTERN = re.compile('(?P<scheme>[a-zA-Z][-a-zA-Z0-9+.]*)://.*')
 
+  _pipeline_options = None
+
+  @classmethod
+  def set_options(cls, pipeline_options):
+    """Set filesystem options.
+
+    Args:
+      pipeline_options: Instance of ``PipelineOptions``.
+    """
+    cls._pipeline_options = pipeline_options
+
   @staticmethod
   def get_scheme(path):
     match_result = FileSystems.URI_SCHEMA_PATTERN.match(path.strip())
@@ -60,7 +71,7 @@ def get_filesystem(path):
       if len(systems) == 0:
         raise ValueError('Unable to get the Filesystem for path %s' % path)
       elif len(systems) == 1:
-        return systems[0]()
+        return systems[0](pipeline_options=FileSystems._pipeline_options)
       else:
         raise ValueError('Found more than one filesystem for path %s' % path)
     except ValueError:
diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
index c174e48778e..bc55b08f7dd 100644
--- a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
+++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
@@ -24,6 +24,7 @@
 
 from apache_beam.io.filesystem import BeamIOError
 from apache_beam.io.filesystem import FileMetadata
+from apache_beam.options.pipeline_options import PipelineOptions
 
 # Protect against environments where apitools library is not available.
 # pylint: disable=wrong-import-order, wrong-import-position
@@ -37,39 +38,40 @@
 @unittest.skipIf(gcsfilesystem is None, 'GCP dependencies are not installed')
 class GCSFileSystemTest(unittest.TestCase):
 
+  def setUp(self):
+    pipeline_options = PipelineOptions()
+    self.fs = gcsfilesystem.GCSFileSystem(pipeline_options=pipeline_options)
+
   def test_scheme(self):
-    file_system = gcsfilesystem.GCSFileSystem()
-    self.assertEqual(file_system.scheme(), 'gs')
+    self.assertEqual(self.fs.scheme(), 'gs')
     self.assertEqual(gcsfilesystem.GCSFileSystem.scheme(), 'gs')
 
   def test_join(self):
-    file_system = gcsfilesystem.GCSFileSystem()
     self.assertEqual('gs://bucket/path/to/file',
-                     file_system.join('gs://bucket/path', 'to', 'file'))
+                     self.fs.join('gs://bucket/path', 'to', 'file'))
     self.assertEqual('gs://bucket/path/to/file',
-                     file_system.join('gs://bucket/path', 'to/file'))
+                     self.fs.join('gs://bucket/path', 'to/file'))
     self.assertEqual('gs://bucket/path/to/file',
-                     file_system.join('gs://bucket/path', '/to/file'))
+                     self.fs.join('gs://bucket/path', '/to/file'))
     self.assertEqual('gs://bucket/path/to/file',
-                     file_system.join('gs://bucket/path/', 'to', 'file'))
+                     self.fs.join('gs://bucket/path/', 'to', 'file'))
     self.assertEqual('gs://bucket/path/to/file',
-                     file_system.join('gs://bucket/path/', 'to/file'))
+                     self.fs.join('gs://bucket/path/', 'to/file'))
     self.assertEqual('gs://bucket/path/to/file',
-                     file_system.join('gs://bucket/path/', '/to/file'))
+                     self.fs.join('gs://bucket/path/', '/to/file'))
     with self.assertRaises(ValueError):
-      file_system.join('/bucket/path/', '/to/file')
+      self.fs.join('/bucket/path/', '/to/file')
 
   def test_split(self):
-    file_system = gcsfilesystem.GCSFileSystem()
     self.assertEqual(('gs://foo/bar', 'baz'),
-                     file_system.split('gs://foo/bar/baz'))
+                     self.fs.split('gs://foo/bar/baz'))
     self.assertEqual(('gs://foo', ''),
-                     file_system.split('gs://foo/'))
+                     self.fs.split('gs://foo/'))
     self.assertEqual(('gs://foo', ''),
-                     file_system.split('gs://foo'))
+                     self.fs.split('gs://foo'))
 
     with self.assertRaises(ValueError):
-      file_system.split('/no/gcs/prefix')
+      self.fs.split('/no/gcs/prefix')
 
   @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
   def test_match_multiples(self, mock_gcsio):
@@ -84,8 +86,7 @@ def test_match_multiples(self, mock_gcsio):
         FileMetadata('gs://bucket/file1', 1),
         FileMetadata('gs://bucket/file2', 2)
     ])
-    file_system = gcsfilesystem.GCSFileSystem()
-    match_result = file_system.match(['gs://bucket/'])[0]
+    match_result = self.fs.match(['gs://bucket/'])[0]
     self.assertEqual(
         set(match_result.metadata_list),
         expected_results)
@@ -104,8 +105,7 @@ def test_match_multiples_limit(self, mock_gcsio):
     expected_results = set([
         FileMetadata('gs://bucket/file1', 1)
     ])
-    file_system = gcsfilesystem.GCSFileSystem()
-    match_result = file_system.match(['gs://bucket/'], [limit])[0]
+    match_result = self.fs.match(['gs://bucket/'], [limit])[0]
     self.assertEqual(
         set(match_result.metadata_list),
         expected_results)
@@ -124,10 +124,9 @@ def test_match_multiples_error(self, mock_gcsio):
     gcsio_mock.size_of_files_in_glob.side_effect = exception
     expected_results = {'gs://bucket/': exception}
 
-    file_system = gcsfilesystem.GCSFileSystem()
     with self.assertRaisesRegexp(BeamIOError,
                                  r'^Match operation failed') as error:
-      file_system.match(['gs://bucket/'])
+      self.fs.match(['gs://bucket/'])
     self.assertEqual(error.exception.exception_details, expected_results)
     gcsio_mock.size_of_files_in_glob.assert_called_once_with(
         'gs://bucket/*', None)
@@ -145,8 +144,7 @@ def test_match_multiple_patterns(self, mock_gcsio):
         [FileMetadata('gs://bucket/file1', 1)],
         [FileMetadata('gs://bucket/file2', 2)]
     ]
-    file_system = gcsfilesystem.GCSFileSystem()
-    result = file_system.match(['gs://bucket/file1*', 'gs://bucket/file2*'])
+    result = self.fs.match(['gs://bucket/file1*', 'gs://bucket/file2*'])
     self.assertEqual(
         [mr.metadata_list for mr in result],
         expected_results)
@@ -157,8 +155,7 @@ def test_create(self, mock_gcsio):
     gcsio_mock = mock.MagicMock()
     gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
     # Issue file copy
-    file_system = gcsfilesystem.GCSFileSystem()
-    _ = file_system.create('gs://bucket/from1', 'application/octet-stream')
+    _ = self.fs.create('gs://bucket/from1', 'application/octet-stream')
 
     gcsio_mock.open.assert_called_once_with(
         'gs://bucket/from1', 'wb', mime_type='application/octet-stream')
@@ -169,8 +166,7 @@ def test_open(self, mock_gcsio):
     gcsio_mock = mock.MagicMock()
     gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
     # Issue file copy
-    file_system = gcsfilesystem.GCSFileSystem()
-    _ = file_system.open('gs://bucket/from1', 'application/octet-stream')
+    _ = self.fs.open('gs://bucket/from1', 'application/octet-stream')
 
     gcsio_mock.open.assert_called_once_with(
         'gs://bucket/from1', 'rb', mime_type='application/octet-stream')
@@ -184,8 +180,7 @@ def test_copy_file(self, mock_gcsio):
     destinations = ['gs://bucket/to1']
 
     # Issue file copy
-    file_system = gcsfilesystem.GCSFileSystem()
-    file_system.copy(sources, destinations)
+    self.fs.copy(sources, destinations)
 
     gcsio_mock.copy.assert_called_once_with(
         'gs://bucket/from1', 'gs://bucket/to1')
@@ -205,10 +200,9 @@ def test_copy_file_error(self, mock_gcsio):
     expected_results = {(s, d):exception for s, d in zip(sources, destinations)}
 
     # Issue batch copy.
-    file_system = gcsfilesystem.GCSFileSystem()
     with self.assertRaisesRegexp(BeamIOError,
                                  r'^Copy operation failed') as error:
-      file_system.copy(sources, destinations)
+      self.fs.copy(sources, destinations)
     self.assertEqual(error.exception.exception_details, expected_results)
 
     gcsio_mock.copy.assert_called_once_with(
@@ -223,8 +217,7 @@ def test_copy_tree(self, mock_gcsio):
     destinations = ['gs://bucket2/']
 
     # Issue directory copy
-    file_system = gcsfilesystem.GCSFileSystem()
-    file_system.copy(sources, destinations)
+    self.fs.copy(sources, destinations)
 
     gcsio_mock.copytree.assert_called_once_with(
         'gs://bucket1/', 'gs://bucket2/')
@@ -256,8 +249,7 @@ def test_rename(self, mock_gcsio):
     ]]
 
     # Issue batch rename.
-    file_system = gcsfilesystem.GCSFileSystem()
-    file_system.rename(sources, destinations)
+    self.fs.rename(sources, destinations)
 
     gcsio_mock.copy_batch.assert_called_once_with([
         ('gs://bucket/from1', 'gs://bucket/to1'),
@@ -297,10 +289,9 @@ def test_rename_error(self, mock_gcsio):
     expected_results = {(s, d):exception for s, d in zip(sources, destinations)}
 
     # Issue batch rename.
-    file_system = gcsfilesystem.GCSFileSystem()
     with self.assertRaisesRegexp(BeamIOError,
                                  r'^Rename operation failed') as error:
-      file_system.rename(sources, destinations)
+      self.fs.rename(sources, destinations)
     self.assertEqual(error.exception.exception_details, expected_results)
 
     gcsio_mock.copy_batch.assert_called_once_with([
@@ -326,8 +317,7 @@ def test_delete(self, mock_gcsio):
     ]
 
     # Issue batch delete.
-    file_system = gcsfilesystem.GCSFileSystem()
-    file_system.delete(files)
+    self.fs.delete(files)
     gcsio_mock.delete_batch.assert_called()
 
   @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
@@ -345,9 +335,8 @@ def test_delete_error(self, mock_gcsio):
     expected_results = {f:exception for f in files}
 
     # Issue batch delete.
-    file_system = gcsfilesystem.GCSFileSystem()
     with self.assertRaisesRegexp(BeamIOError,
                                  r'^Delete operation failed') as error:
-      file_system.delete(files)
+      self.fs.delete(files)
     self.assertEqual(error.exception.exception_details, expected_results)
     gcsio_mock.delete_batch.assert_called()
diff --git a/sdks/python/apache_beam/io/hadoopfilesystem.py b/sdks/python/apache_beam/io/hadoopfilesystem.py
index 9aad7f09dd1..a761068f413 100644
--- a/sdks/python/apache_beam/io/hadoopfilesystem.py
+++ b/sdks/python/apache_beam/io/hadoopfilesystem.py
@@ -52,12 +52,12 @@ class HadoopFileSystem(FileSystem):
   Uses client library :class:`hdfs3.core.HDFileSystem`.
   """
 
-  def __init__(self):
+  def __init__(self, pipeline_options):
     """Initializes a connection to HDFS.
 
     Connection configuration is done using :doc:`hdfs`.
     """
-    super(HadoopFileSystem, self).__init__()
+    super(HadoopFileSystem, self).__init__(pipeline_options)
     self._hdfs_client = HDFileSystem()
 
   @classmethod
diff --git a/sdks/python/apache_beam/io/hadoopfilesystem_test.py b/sdks/python/apache_beam/io/hadoopfilesystem_test.py
index af5bca86344..8a1c0f1b3d2 100644
--- a/sdks/python/apache_beam/io/hadoopfilesystem_test.py
+++ b/sdks/python/apache_beam/io/hadoopfilesystem_test.py
@@ -25,6 +25,7 @@
 
 from apache_beam.io import hadoopfilesystem
 from apache_beam.io.filesystem import BeamIOError
+from apache_beam.options.pipeline_options import PipelineOptions
 
 
 class FakeFile(StringIO.StringIO):
@@ -166,7 +167,8 @@ class HadoopFileSystemTest(unittest.TestCase):
   def setUp(self):
     self._fake_hdfs = FakeHdfs()
     hadoopfilesystem.HDFileSystem = lambda *args, **kwargs: self._fake_hdfs
-    self.fs = hadoopfilesystem.HadoopFileSystem()
+    pipeline_options = PipelineOptions()
+    self.fs = hadoopfilesystem.HadoopFileSystem(pipeline_options)
     self.tmpdir = 'hdfs://test_dir'
 
     for filename in ['old_file1', 'old_file2']:
diff --git a/sdks/python/apache_beam/io/localfilesystem_test.py b/sdks/python/apache_beam/io/localfilesystem_test.py
index 9bc1a0774c5..29b68f61c34 100644
--- a/sdks/python/apache_beam/io/localfilesystem_test.py
+++ b/sdks/python/apache_beam/io/localfilesystem_test.py
@@ -28,6 +28,7 @@
 
 from apache_beam.io import localfilesystem
 from apache_beam.io.filesystem import BeamIOError
+from apache_beam.options.pipeline_options import PipelineOptions
 
 
 def _gen_fake_join(separator):
@@ -56,7 +57,8 @@ class LocalFileSystemTest(unittest.TestCase):
 
   def setUp(self):
     self.tmpdir = tempfile.mkdtemp()
-    self.fs = localfilesystem.LocalFileSystem()
+    pipeline_options = PipelineOptions()
+    self.fs = localfilesystem.LocalFileSystem(pipeline_options)
 
   def tearDown(self):
     shutil.rmtree(self.tmpdir)
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 4c7d6012261..d3d1932901d 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -55,6 +55,7 @@
 
 from apache_beam import pvalue
 from apache_beam.internal import pickler
+from apache_beam.io.filesystems import FileSystems
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.options.pipeline_options import SetupOptions
 from apache_beam.options.pipeline_options import StandardOptions
@@ -124,6 +125,8 @@ def __init__(self, runner=None, options=None, argv=None):
     else:
       self._options = PipelineOptions([])
 
+    FileSystems.set_options(self._options)
+
     if runner is None:
       runner = self._options.view_as(StandardOptions).runner
       if runner is None:
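
Taken together, the wiring introduced above works roughly as follows (a
minimal sketch based on the diff; the local path is a placeholder):

  from apache_beam.io.filesystems import FileSystems
  from apache_beam.options.pipeline_options import PipelineOptions

  # Pipeline.__init__ now publishes its options to the FileSystems
  # registry; calling set_options directly mimics that step.
  FileSystems.set_options(PipelineOptions([]))

  # get_filesystem() finds the FileSystem subclass registered for the
  # path's scheme and passes the stored options to its constructor.
  fs = FileSystems.get_filesystem('/tmp/example')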


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Implement HDFS FileSystem for Python SDK
> ----------------------------------------
>
>                 Key: BEAM-3099
>                 URL: https://issues.apache.org/jira/browse/BEAM-3099
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-py-core
>            Reporter: Chamikara Jayalath
>            Assignee: Udi Meiri
>            Priority: Major
>
> Currently the Java SDK has HDFS support but the Python SDK does not. With current portability efforts, other runners may soon be able to use the Python SDK. Having HDFS support will allow these runners to execute large-scale jobs without using GCS.
> The following post suggests some libraries that can be used to connect to HDFS from Python:
> http://wesmckinney.com/blog/python-hdfs-interfaces/
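
For context, the HadoopFileSystem above wraps hdfs3, one of the client
libraries surveyed in that post. A minimal, hypothetical connection
sketch (host, port, and path are placeholders):

  from hdfs3 import HDFileSystem

  # Connect to the namenode and read a single file.
  hdfs = HDFileSystem(host='namenode.example.com', port=8020)
  with hdfs.open('/user/beam/example.txt', 'rb') as f:
    data = f.read()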



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)