You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/04/21 17:53:10 UTC
[26/50] [abbrv] beam git commit: [BEAM-1441] Remove deprecated ChannelFactory
[BEAM-1441] Remove deprecated ChannelFactory
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/97c66784
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/97c66784
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/97c66784
Branch: refs/heads/gearpump-runner
Commit: 97c667846b566c312ceaadc66fb14fde1dfa7ebe
Parents: 8319369
Author: Sourabh Bajaj <so...@google.com>
Authored: Fri Apr 14 14:45:16 2017 -0700
Committer: chamikara@google.com <ch...@google.com>
Committed: Wed Apr 19 09:56:28 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/io/fileio.py | 90 -------------------------------
1 file changed, 90 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/97c66784/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index 8ee5198..f61289e 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -27,7 +27,6 @@ import time
from apache_beam.internal import util
from apache_beam.io import iobase
from apache_beam.io.filesystem import BeamIOError
-from apache_beam.io.filesystem import CompressedFile as _CompressedFile
from apache_beam.io.filesystem import CompressionTypes
from apache_beam.io.filesystems_util import get_filesystem
from apache_beam.transforms.display import DisplayDataItem
@@ -38,95 +37,6 @@ from apache_beam.utils.value_provider import check_accessible
DEFAULT_SHARD_NAME_TEMPLATE = '-SSSSS-of-NNNNN'
-# TODO(sourabhbajaj): Remove this after BFS API is used everywhere
-class ChannelFactory(object):
- @staticmethod
- def mkdir(path):
- bfs = get_filesystem(path)
- return bfs.mkdirs(path)
-
- @staticmethod
- def open(path,
- mode,
- mime_type='application/octet-stream',
- compression_type=CompressionTypes.AUTO):
- bfs = get_filesystem(path)
- if mode == 'rb':
- return bfs.open(path, mime_type, compression_type)
- elif mode == 'wb':
- return bfs.create(path, mime_type, compression_type)
-
- @staticmethod
- def is_compressed(fileobj):
- return isinstance(fileobj, _CompressedFile)
-
- @staticmethod
- def rename(src, dest):
- bfs = get_filesystem(src)
- return bfs.rename([src], [dest])
-
- @staticmethod
- def rename_batch(src_dest_pairs):
- sources = [s for s, _ in src_dest_pairs]
- destinations = [d for _, d in src_dest_pairs]
- if not sources:
- return []
- bfs = get_filesystem(sources[0])
- try:
- bfs.rename(sources, destinations)
- return []
- except BeamIOError as exp:
- return [(s, d, e) for (s, d), e in exp.exception_details.iteritems()]
-
- @staticmethod
- def copytree(src, dest):
- bfs = get_filesystem(src)
- return bfs.copy([src], [dest])
-
- @staticmethod
- def exists(path):
- bfs = get_filesystem(path)
- return bfs.exists(path)
-
- @staticmethod
- def rmdir(path):
- bfs = get_filesystem(path)
- return bfs.delete([path])
-
- @staticmethod
- def rm(path):
- bfs = get_filesystem(path)
- return bfs.delete([path])
-
- @staticmethod
- def glob(path, limit=None):
- bfs = get_filesystem(path)
- match_result = bfs.match([path], [limit])[0]
- return [f.path for f in match_result.metadata_list]
-
- @staticmethod
- def size_in_bytes(path):
- bfs = get_filesystem(path)
- match_result = bfs.match([path])[0]
- return [f.size_in_bytes for f in match_result.metadata_list][0]
-
- @staticmethod
- def size_of_files_in_glob(path, file_names=None):
- bfs = get_filesystem(path)
- match_result = bfs.match([path])[0]
- part_files = {f.path:f.size_in_bytes for f in match_result.metadata_list}
-
- if file_names is not None:
- specific_files = {}
- match_results = bfs.match(file_names)
- for match_result in match_results:
- for metadata in match_result.metadata_list:
- specific_files[metadata.path] = metadata.size_in_bytes
-
- part_files.update(specific_files)
- return part_files
-
-
class FileSink(iobase.Sink):
"""A sink to a GCS or local files.