Posted to commits@beam.apache.org by al...@apache.org on 2017/05/12 01:20:17 UTC
[1/2] beam git commit: CP #3111 Rename filesink to filebasedsink
Repository: beam
Updated Branches:
refs/heads/release-2.0.0 da2476d0b -> 905ebccf2
CP #3111 Rename filesink to filebasedsink
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/717ab8c1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/717ab8c1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/717ab8c1
Branch: refs/heads/release-2.0.0
Commit: 717ab8c14cb8c166168c812d4bd16e603831af47
Parents: da2476d
Author: Sourabh Bajaj <so...@google.com>
Authored: Thu May 11 17:23:40 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Thu May 11 18:19:55 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/io/__init__.py | 2 +-
sdks/python/apache_beam/io/avroio.py | 4 +-
sdks/python/apache_beam/io/filebasedsink.py | 299 ++++++++++++++++++
.../python/apache_beam/io/filebasedsink_test.py | 303 ++++++++++++++++++
sdks/python/apache_beam/io/fileio.py | 304 -------------------
sdks/python/apache_beam/io/fileio_test.py | 303 ------------------
sdks/python/apache_beam/io/gcp/gcsio.py | 6 +-
sdks/python/apache_beam/io/iobase.py | 12 +-
sdks/python/apache_beam/io/textio.py | 4 +-
sdks/python/apache_beam/io/tfrecordio.py | 6 +-
.../apache_beam/testing/pipeline_verifiers.py | 4 +-
11 files changed, 623 insertions(+), 624 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/717ab8c1/sdks/python/apache_beam/io/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/__init__.py b/sdks/python/apache_beam/io/__init__.py
index 881ce68..6ea0efd 100644
--- a/sdks/python/apache_beam/io/__init__.py
+++ b/sdks/python/apache_beam/io/__init__.py
@@ -19,7 +19,7 @@
# pylint: disable=wildcard-import
from apache_beam.io.avroio import *
-from apache_beam.io.fileio import *
+from apache_beam.io.filebasedsink import *
from apache_beam.io.iobase import Read
from apache_beam.io.iobase import Sink
from apache_beam.io.iobase import Write
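The user-visible effect of this hunk: the wildcard re-export in
apache_beam.io now comes from the new module, so custom sinks that
previously subclassed fileio.FileSink should subclass
filebasedsink.FileBasedSink instead. A minimal hedged sketch of the
migration (MySink is a hypothetical name, not part of this commit):

    # Before this commit (hypothetical user code):
    #   from apache_beam.io import fileio
    #   class MySink(fileio.FileSink): ...

    # After this commit:
    from apache_beam.io import filebasedsink

    class MySink(filebasedsink.FileBasedSink):
        def write_encoded_record(self, file_handle, encoded_value):
            # Write each already-encoded record as-is.
            file_handle.write(encoded_value)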
http://git-wip-us.apache.org/repos/asf/beam/blob/717ab8c1/sdks/python/apache_beam/io/avroio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/avroio.py b/sdks/python/apache_beam/io/avroio.py
index 1c08c68..e02e1f7 100644
--- a/sdks/python/apache_beam/io/avroio.py
+++ b/sdks/python/apache_beam/io/avroio.py
@@ -27,7 +27,7 @@ from avro import schema
import apache_beam as beam
from apache_beam.io import filebasedsource
-from apache_beam.io import fileio
+from apache_beam.io import filebasedsink
from apache_beam.io import iobase
from apache_beam.io.filesystem import CompressionTypes
from apache_beam.io.iobase import Read
@@ -335,7 +335,7 @@ class WriteToAvro(beam.transforms.PTransform):
return {'sink_dd': self._sink}
-class _AvroSink(fileio.FileSink):
+class _AvroSink(filebasedsink.FileBasedSink):
"""A sink to avro files."""
def __init__(self,
http://git-wip-us.apache.org/repos/asf/beam/blob/717ab8c1/sdks/python/apache_beam/io/filebasedsink.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsink.py b/sdks/python/apache_beam/io/filebasedsink.py
new file mode 100644
index 0000000..76c09fc
--- /dev/null
+++ b/sdks/python/apache_beam/io/filebasedsink.py
@@ -0,0 +1,299 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""File-based sink."""
+
+from __future__ import absolute_import
+
+import logging
+import os
+import re
+import time
+import uuid
+
+from apache_beam.internal import util
+from apache_beam.io import iobase
+from apache_beam.io.filesystem import BeamIOError
+from apache_beam.io.filesystem import CompressionTypes
+from apache_beam.io.filesystems import FileSystems
+from apache_beam.transforms.display import DisplayDataItem
+from apache_beam.options.value_provider import ValueProvider
+from apache_beam.options.value_provider import StaticValueProvider
+from apache_beam.options.value_provider import check_accessible
+
+DEFAULT_SHARD_NAME_TEMPLATE = '-SSSSS-of-NNNNN'
+
+__all__ = ['FileBasedSink']
+
+
+class FileBasedSink(iobase.Sink):
+ """A sink to a GCS or local files.
+
+ To implement a file-based sink, extend this class and override
+ either ``write_record()`` or ``write_encoded_record()``.
+
+ If needed, also overwrite ``open()`` and/or ``close()`` to customize the
+ file handling or write headers and footers.
+
+ The output of this write is a PCollection of all written shards.
+ """
+
+ # Max number of threads to be used for renaming.
+ _MAX_RENAME_THREADS = 64
+
+ def __init__(self,
+ file_path_prefix,
+ coder,
+ file_name_suffix='',
+ num_shards=0,
+ shard_name_template=None,
+ mime_type='application/octet-stream',
+ compression_type=CompressionTypes.AUTO):
+ """
+ Raises:
+ TypeError: if file path parameters are not a string or ValueProvider,
+ or if compression_type is not a member of CompressionTypes.
+ ValueError: if shard_name_template is not of expected format.
+ """
+ if not isinstance(file_path_prefix, (basestring, ValueProvider)):
+ raise TypeError('file_path_prefix must be a string or ValueProvider; '
+ 'got %r instead' % file_path_prefix)
+ if not isinstance(file_name_suffix, (basestring, ValueProvider)):
+ raise TypeError('file_name_suffix must be a string or ValueProvider; '
+ 'got %r instead' % file_name_suffix)
+
+ if not CompressionTypes.is_valid_compression_type(compression_type):
+ raise TypeError('compression_type must be a CompressionType object but '
+ 'was %s' % type(compression_type))
+ if shard_name_template is None:
+ shard_name_template = DEFAULT_SHARD_NAME_TEMPLATE
+ elif shard_name_template == '':
+ num_shards = 1
+ if isinstance(file_path_prefix, basestring):
+ file_path_prefix = StaticValueProvider(str, file_path_prefix)
+ if isinstance(file_name_suffix, basestring):
+ file_name_suffix = StaticValueProvider(str, file_name_suffix)
+ self.file_path_prefix = file_path_prefix
+ self.file_name_suffix = file_name_suffix
+ self.num_shards = num_shards
+ self.coder = coder
+ self.shard_name_format = self._template_to_format(shard_name_template)
+ self.compression_type = compression_type
+ self.mime_type = mime_type
+
+ def display_data(self):
+ return {'shards':
+ DisplayDataItem(self.num_shards,
+ label='Number of Shards').drop_if_default(0),
+ 'compression':
+ DisplayDataItem(str(self.compression_type)),
+ 'file_pattern':
+ DisplayDataItem('{}{}{}'.format(self.file_path_prefix,
+ self.shard_name_format,
+ self.file_name_suffix),
+ label='File Pattern')}
+
+ @check_accessible(['file_path_prefix'])
+ def open(self, temp_path):
+ """Opens ``temp_path``, returning an opaque file handle object.
+
+ The returned file handle is passed to ``write_[encoded_]record`` and
+ ``close``.
+ """
+ return FileSystems.create(temp_path, self.mime_type, self.compression_type)
+
+ def write_record(self, file_handle, value):
+ """Writes a single record go the file handle returned by ``open()``.
+
+ By default, calls ``write_encoded_record`` after encoding the record with
+ this sink's Coder.
+ """
+ self.write_encoded_record(file_handle, self.coder.encode(value))
+
+ def write_encoded_record(self, file_handle, encoded_value):
+ """Writes a single encoded record to the file handle returned by ``open()``.
+ """
+ raise NotImplementedError
+
+ def close(self, file_handle):
+ """Finalize and close the file handle returned from ``open()``.
+
+ Called after all records are written.
+
+ By default, calls ``file_handle.close()`` iff it is not None.
+ """
+ if file_handle is not None:
+ file_handle.close()
+
+ @check_accessible(['file_path_prefix', 'file_name_suffix'])
+ def initialize_write(self):
+ file_path_prefix = self.file_path_prefix.get()
+
+ tmp_dir = self._create_temp_dir(file_path_prefix)
+ FileSystems.mkdirs(tmp_dir)
+ return tmp_dir
+
+ def _create_temp_dir(self, file_path_prefix):
+ base_path, last_component = FileSystems.split(file_path_prefix)
+ if not last_component:
+ # Trying to re-split the base_path to check if it's a root.
+ new_base_path, _ = FileSystems.split(base_path)
+ if base_path == new_base_path:
+ raise ValueError('Cannot create a temporary directory for root path '
+ 'prefix %s. Please specify a file path prefix with '
+ 'at least two components.' % file_path_prefix)
+ path_components = [base_path,
+ 'beam-temp-' + last_component + '-' + uuid.uuid1().hex]
+ return FileSystems.join(*path_components)
+
+ @check_accessible(['file_path_prefix', 'file_name_suffix'])
+ def open_writer(self, init_result, uid):
+ # A proper suffix is needed for AUTO compression detection.
+ # We also ensure there will be no collisions with uid and a
+ # (possibly unsharded) file_path_prefix and a (possibly empty)
+ # file_name_suffix.
+ file_path_prefix = self.file_path_prefix.get()
+ file_name_suffix = self.file_name_suffix.get()
+ suffix = (
+ '.' + os.path.basename(file_path_prefix) + file_name_suffix)
+ return FileBasedSinkWriter(self, os.path.join(init_result, uid) + suffix)
+
+ @check_accessible(['file_path_prefix', 'file_name_suffix'])
+ def finalize_write(self, init_result, writer_results):
+ file_path_prefix = self.file_path_prefix.get()
+ file_name_suffix = self.file_name_suffix.get()
+ writer_results = sorted(writer_results)
+ num_shards = len(writer_results)
+ min_threads = min(num_shards, FileBasedSink._MAX_RENAME_THREADS)
+ num_threads = max(1, min_threads)
+
+ source_files = []
+ destination_files = []
+ chunk_size = FileSystems.get_chunk_size(file_path_prefix)
+ for shard_num, shard in enumerate(writer_results):
+ final_name = ''.join([
+ file_path_prefix, self.shard_name_format % dict(
+ shard_num=shard_num, num_shards=num_shards), file_name_suffix
+ ])
+ source_files.append(shard)
+ destination_files.append(final_name)
+
+ source_file_batch = [source_files[i:i + chunk_size]
+ for i in xrange(0, len(source_files),
+ chunk_size)]
+ destination_file_batch = [destination_files[i:i + chunk_size]
+ for i in xrange(0, len(destination_files),
+ chunk_size)]
+
+ logging.info(
+ 'Starting finalize_write threads with num_shards: %d, '
+ 'batches: %d, num_threads: %d',
+ num_shards, len(source_file_batch), num_threads)
+ start_time = time.time()
+
+ # Use a thread pool for renaming operations.
+ def _rename_batch(batch):
+ """_rename_batch executes batch rename operations."""
+ source_files, destination_files = batch
+ exceptions = []
+ try:
+ FileSystems.rename(source_files, destination_files)
+ return exceptions
+ except BeamIOError as exp:
+ if exp.exception_details is None:
+ raise
+ for (src, dest), exception in exp.exception_details.iteritems():
+ if exception:
+ logging.warning('Rename not successful: %s -> %s, %s', src, dest,
+ exception)
+ should_report = True
+ if isinstance(exception, IOError):
+ # May have already been copied.
+ try:
+ if FileSystems.exists(dest):
+ should_report = False
+ except Exception as exists_e: # pylint: disable=broad-except
+ logging.warning('Exception when checking if file %s exists: '
+ '%s', dest, exists_e)
+ if should_report:
+ logging.warning(('Exception in _rename_batch. src: %s, '
+ 'dest: %s, err: %s'), src, dest, exception)
+ exceptions.append(exception)
+ else:
+ logging.debug('Rename successful: %s -> %s', src, dest)
+ return exceptions
+
+ exception_batches = util.run_using_threadpool(
+ _rename_batch, zip(source_file_batch, destination_file_batch),
+ num_threads)
+
+ all_exceptions = [e for exception_batch in exception_batches
+ for e in exception_batch]
+ if all_exceptions:
+ raise Exception('Encountered exceptions in finalize_write: %s' %
+ all_exceptions)
+
+ for final_name in destination_files:
+ yield final_name
+
+ logging.info('Renamed %d shards in %.2f seconds.', num_shards,
+ time.time() - start_time)
+
+ try:
+ FileSystems.delete([init_result])
+ except IOError:
+ # May have already been removed.
+ pass
+
+ @staticmethod
+ def _template_to_format(shard_name_template):
+ if not shard_name_template:
+ return ''
+ m = re.search('S+', shard_name_template)
+ if m is None:
+ raise ValueError("Shard number pattern S+ not found in template '%s'" %
+ shard_name_template)
+ shard_name_format = shard_name_template.replace(
+ m.group(0), '%%(shard_num)0%dd' % len(m.group(0)))
+ m = re.search('N+', shard_name_format)
+ if m:
+ shard_name_format = shard_name_format.replace(
+ m.group(0), '%%(num_shards)0%dd' % len(m.group(0)))
+ return shard_name_format
+
+ def __eq__(self, other):
+ # TODO: Clean up workitem_test which uses this.
+ # pylint: disable=unidiomatic-typecheck
+ return type(self) == type(other) and self.__dict__ == other.__dict__
+
+
+class FileBasedSinkWriter(iobase.Writer):
+ """The writer for FileBasedSink.
+ """
+
+ def __init__(self, sink, temp_shard_path):
+ self.sink = sink
+ self.temp_shard_path = temp_shard_path
+ self.temp_handle = self.sink.open(temp_shard_path)
+
+ def write(self, value):
+ self.sink.write_record(self.temp_handle, value)
+
+ def close(self):
+ self.sink.close(self.temp_handle)
+ return self.temp_shard_path
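Taken together, FileBasedSink defines a small extension contract:
subclass it, override write_encoded_record() (and optionally open() and
close() for headers and footers), then hand the sink to beam.io.Write.
A hedged usage sketch under those assumptions; JsonLinesSink and the
paths are illustrative names, not part of this commit:

    import apache_beam as beam
    from apache_beam.coders import coders
    from apache_beam.io import filebasedsink

    class JsonLinesSink(filebasedsink.FileBasedSink):
        """Illustrative sink that writes one encoded record per line."""

        def write_encoded_record(self, file_handle, encoded_value):
            file_handle.write(encoded_value + '\n')

    sink = JsonLinesSink('/tmp/out',
                         coder=coders.ToStringCoder(),
                         file_name_suffix='.jsonl')
    p = beam.Pipeline()
    _ = p | beam.Create(['{"a": 1}']) | beam.io.Write(sink)
    p.run()
    # With the default shard template '-SSSSS-of-NNNNN', the single shard
    # is finalized as /tmp/out-00000-of-00001.jsonl.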
http://git-wip-us.apache.org/repos/asf/beam/blob/717ab8c1/sdks/python/apache_beam/io/filebasedsink_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsink_test.py b/sdks/python/apache_beam/io/filebasedsink_test.py
new file mode 100644
index 0000000..1f6aeee
--- /dev/null
+++ b/sdks/python/apache_beam/io/filebasedsink_test.py
@@ -0,0 +1,303 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for file sinks."""
+
+import glob
+import logging
+import os
+import shutil
+import tempfile
+import unittest
+
+import hamcrest as hc
+import mock
+
+import apache_beam as beam
+from apache_beam.coders import coders
+from apache_beam.io import filebasedsink
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.transforms.display import DisplayData
+from apache_beam.transforms.display_test import DisplayDataItemMatcher
+
+from apache_beam.options.value_provider import StaticValueProvider
+
+
+# TODO: Refactor code so all io tests use the same shared
+# TestCaseWithTempDirCleanup class.
+class _TestCaseWithTempDirCleanUp(unittest.TestCase):
+ """Base class for TestCases that deals with TempDir clean-up.
+
+ Inherited test cases will call self._new_tempdir() to start a temporary dir
+ which will be deleted at the end of the tests (when tearDown() is called).
+ """
+
+ def setUp(self):
+ self._tempdirs = []
+
+ def tearDown(self):
+ for path in self._tempdirs:
+ if os.path.exists(path):
+ shutil.rmtree(path)
+ self._tempdirs = []
+
+ def _new_tempdir(self):
+ result = tempfile.mkdtemp()
+ self._tempdirs.append(result)
+ return result
+
+ def _create_temp_file(self, name='', suffix=''):
+ if not name:
+ name = tempfile.template
+ file_name = tempfile.NamedTemporaryFile(
+ delete=False, prefix=name,
+ dir=self._new_tempdir(), suffix=suffix).name
+ return file_name
+
+
+class MyFileBasedSink(filebasedsink.FileBasedSink):
+
+ def open(self, temp_path):
+ # TODO: Fix main session pickling.
+ # file_handle = super(MyFileBasedSink, self).open(temp_path)
+ file_handle = filebasedsink.FileBasedSink.open(self, temp_path)
+ file_handle.write('[start]')
+ return file_handle
+
+ def write_encoded_record(self, file_handle, encoded_value):
+ file_handle.write('[')
+ file_handle.write(encoded_value)
+ file_handle.write(']')
+
+ def close(self, file_handle):
+ file_handle.write('[end]')
+ # TODO: Fix main session pickling.
+ # file_handle = super(MyFileBasedSink, self).close(file_handle)
+ file_handle = filebasedsink.FileBasedSink.close(self, file_handle)
+
+
+class TestFileBasedSink(_TestCaseWithTempDirCleanUp):
+
+ def test_file_sink_writing(self):
+ temp_path = os.path.join(self._new_tempdir(), 'FileBasedSink')
+ sink = MyFileBasedSink(
+ temp_path, file_name_suffix='.output', coder=coders.ToStringCoder())
+
+ # Manually invoke the generic Sink API.
+ init_token = sink.initialize_write()
+
+ writer1 = sink.open_writer(init_token, '1')
+ writer1.write('a')
+ writer1.write('b')
+ res1 = writer1.close()
+
+ writer2 = sink.open_writer(init_token, '2')
+ writer2.write('x')
+ writer2.write('y')
+ writer2.write('z')
+ res2 = writer2.close()
+
+ _ = list(sink.finalize_write(init_token, [res1, res2]))
+ # Retry the finalize operation (as if the first attempt was lost).
+ res = list(sink.finalize_write(init_token, [res1, res2]))
+
+ # Check the results.
+ shard1 = temp_path + '-00000-of-00002.output'
+ shard2 = temp_path + '-00001-of-00002.output'
+ self.assertEqual(res, [shard1, shard2])
+ self.assertEqual(open(shard1).read(), '[start][a][b][end]')
+ self.assertEqual(open(shard2).read(), '[start][x][y][z][end]')
+
+ # Check that any temp files are deleted.
+ self.assertItemsEqual([shard1, shard2], glob.glob(temp_path + '*'))
+
+ def test_file_sink_display_data(self):
+ temp_path = os.path.join(self._new_tempdir(), 'display')
+ sink = MyFileBasedSink(
+ temp_path, file_name_suffix='.output', coder=coders.ToStringCoder())
+ dd = DisplayData.create_from(sink)
+ expected_items = [
+ DisplayDataItemMatcher(
+ 'compression', 'auto'),
+ DisplayDataItemMatcher(
+ 'file_pattern',
+ '{}{}'.format(
+ temp_path,
+ '-%(shard_num)05d-of-%(num_shards)05d.output'))]
+ hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
+
+ def test_empty_write(self):
+ temp_path = tempfile.NamedTemporaryFile().name
+ sink = MyFileBasedSink(
+ temp_path, file_name_suffix='.output', coder=coders.ToStringCoder()
+ )
+ p = TestPipeline()
+ p | beam.Create([]) | beam.io.Write(sink) # pylint: disable=expression-not-assigned
+ p.run()
+ self.assertEqual(
+ open(temp_path + '-00000-of-00001.output').read(), '[start][end]')
+
+ def test_static_value_provider_empty_write(self):
+ temp_path = StaticValueProvider(value_type=str,
+ value=tempfile.NamedTemporaryFile().name)
+ sink = MyFileBasedSink(
+ temp_path,
+ file_name_suffix=StaticValueProvider(value_type=str, value='.output'),
+ coder=coders.ToStringCoder()
+ )
+ p = TestPipeline()
+ p | beam.Create([]) | beam.io.Write(sink) # pylint: disable=expression-not-assigned
+ p.run()
+ self.assertEqual(
+ open(temp_path.get() + '-00000-of-00001.output').read(), '[start][end]')
+
+ def test_fixed_shard_write(self):
+ temp_path = os.path.join(self._new_tempdir(), 'empty')
+ sink = MyFileBasedSink(
+ temp_path,
+ file_name_suffix='.output',
+ num_shards=3,
+ shard_name_template='_NN_SSS_',
+ coder=coders.ToStringCoder())
+ p = TestPipeline()
+ p | beam.Create(['a', 'b']) | beam.io.Write(sink) # pylint: disable=expression-not-assigned
+
+ p.run()
+
+ concat = ''.join(
+ open(temp_path + '_03_%03d_.output' % shard_num).read()
+ for shard_num in range(3))
+ self.assertTrue('][a][' in concat, concat)
+ self.assertTrue('][b][' in concat, concat)
+
+ # Not using 'test' in name so that 'nose' doesn't pick this as a test.
+ def run_temp_dir_check(self, no_dir_path, dir_path, no_dir_root_path,
+ dir_root_path, prefix, separator):
+ def _get_temp_dir(file_path_prefix):
+ sink = MyFileBasedSink(
+ file_path_prefix, file_name_suffix='.output',
+ coder=coders.ToStringCoder())
+ return sink.initialize_write()
+
+ temp_dir = _get_temp_dir(no_dir_path)
+ self.assertTrue(temp_dir.startswith(prefix))
+ last_sep = temp_dir.rfind(separator)
+ self.assertTrue(temp_dir[last_sep + 1:].startswith('beam-temp'))
+
+ temp_dir = _get_temp_dir(dir_path)
+ self.assertTrue(temp_dir.startswith(prefix))
+ last_sep = temp_dir.rfind(separator)
+ self.assertTrue(temp_dir[last_sep + 1:].startswith('beam-temp'))
+
+ with self.assertRaises(ValueError):
+ _get_temp_dir(no_dir_root_path)
+
+ with self.assertRaises(ValueError):
+ _get_temp_dir(dir_root_path)
+
+ def test_temp_dir_gcs(self):
+ try:
+ self.run_temp_dir_check(
+ 'gs://aaa/bbb', 'gs://aaa/bbb/', 'gs://aaa', 'gs://aaa/', 'gs://',
+ '/')
+ except ValueError:
+ logging.debug('Ignoring test since GCP module is not installed')
+
+ @mock.patch('apache_beam.io.localfilesystem.os')
+ def test_temp_dir_local(self, filesystem_os_mock):
+ # Here we test a unix-like mock file-system
+ # (not really testing Unix or Windows since we mock the functions of the
+ # 'os' module).
+
+ def _fake_unix_split(path):
+ sep = path.rfind('/')
+ if sep < 0:
+ raise ValueError('Path must contain a separator')
+ return (path[:sep], path[sep + 1:])
+
+ def _fake_unix_join(base, path):
+ return base + '/' + path
+
+ filesystem_os_mock.path.abspath = lambda a: a
+ filesystem_os_mock.path.split.side_effect = _fake_unix_split
+ filesystem_os_mock.path.join.side_effect = _fake_unix_join
+ self.run_temp_dir_check(
+ '/aaa/bbb', '/aaa/bbb/', '/', '/', '/', '/')
+
+ def test_file_sink_multi_shards(self):
+ temp_path = os.path.join(self._new_tempdir(), 'multishard')
+ sink = MyFileBasedSink(
+ temp_path, file_name_suffix='.output', coder=coders.ToStringCoder())
+
+ # Manually invoke the generic Sink API.
+ init_token = sink.initialize_write()
+
+ num_shards = 1000
+ writer_results = []
+ for i in range(num_shards):
+ uuid = 'uuid-%05d' % i
+ writer = sink.open_writer(init_token, uuid)
+ writer.write('a')
+ writer.write('b')
+ writer.write(uuid)
+ writer_results.append(writer.close())
+
+ res_first = list(sink.finalize_write(init_token, writer_results))
+ # Retry the finalize operation (as if the first attempt was lost).
+ res_second = list(sink.finalize_write(init_token, writer_results))
+
+ self.assertItemsEqual(res_first, res_second)
+
+ res = sorted(res_second)
+ for i in range(num_shards):
+ shard_name = '%s-%05d-of-%05d.output' % (temp_path, i, num_shards)
+ uuid = 'uuid-%05d' % i
+ self.assertEqual(res[i], shard_name)
+ self.assertEqual(
+ open(shard_name).read(), ('[start][a][b][%s][end]' % uuid))
+
+ # Check that any temp files are deleted.
+ self.assertItemsEqual(res, glob.glob(temp_path + '*'))
+
+ def test_file_sink_io_error(self):
+ temp_path = os.path.join(self._new_tempdir(), 'ioerror')
+ sink = MyFileBasedSink(
+ temp_path, file_name_suffix='.output', coder=coders.ToStringCoder())
+
+ # Manually invoke the generic Sink API.
+ init_token = sink.initialize_write()
+
+ writer1 = sink.open_writer(init_token, '1')
+ writer1.write('a')
+ writer1.write('b')
+ res1 = writer1.close()
+
+ writer2 = sink.open_writer(init_token, '2')
+ writer2.write('x')
+ writer2.write('y')
+ writer2.write('z')
+ res2 = writer2.close()
+
+ os.remove(res2)
+ with self.assertRaises(Exception):
+ list(sink.finalize_write(init_token, [res1, res2]))
+
+
+if __name__ == '__main__':
+ logging.getLogger().setLevel(logging.INFO)
+ unittest.main()
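The shard-name expectations in these tests (for example, the template
'_NN_SSS_' producing files matching '_03_%03d_') follow from
_template_to_format() in the sink above: a run of 'S' becomes a
zero-padded shard number and a run of 'N' becomes a zero-padded shard
count. A standalone sketch of that expansion, replicated here for
illustration only (error handling omitted):

    import re

    def template_to_format(shard_name_template):
        # 'S...S' -> zero-padded shard number of the same width.
        m = re.search('S+', shard_name_template)
        fmt = shard_name_template.replace(
            m.group(0), '%%(shard_num)0%dd' % len(m.group(0)))
        # 'N...N' -> zero-padded total shard count, if present.
        m = re.search('N+', fmt)
        if m:
            fmt = fmt.replace(
                m.group(0), '%%(num_shards)0%dd' % len(m.group(0)))
        return fmt

    print(template_to_format('-SSSSS-of-NNNNN') % dict(shard_num=0, num_shards=2))
    # -00000-of-00002
    print(template_to_format('_NN_SSS_') % dict(shard_num=0, num_shards=3))
    # _03_000_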
http://git-wip-us.apache.org/repos/asf/beam/blob/717ab8c1/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
deleted file mode 100644
index aa18093..0000000
--- a/sdks/python/apache_beam/io/fileio.py
+++ /dev/null
@@ -1,304 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""File-based sources and sinks."""
-
-from __future__ import absolute_import
-
-import logging
-import os
-import re
-import time
-import uuid
-
-from apache_beam.internal import util
-from apache_beam.io import iobase
-from apache_beam.io.filesystem import BeamIOError
-from apache_beam.io.filesystem import CompressionTypes
-from apache_beam.io.filesystems import FileSystems
-from apache_beam.transforms.display import DisplayDataItem
-from apache_beam.options.value_provider import ValueProvider
-from apache_beam.options.value_provider import StaticValueProvider
-from apache_beam.options.value_provider import check_accessible
-
-DEFAULT_SHARD_NAME_TEMPLATE = '-SSSSS-of-NNNNN'
-
-__all__ = ['FileBasedSink']
-
-
-class FileSink(iobase.Sink):
- """A sink to a GCS or local files.
-
- To implement a file-based sink, extend this class and override
- either ``write_record()`` or ``write_encoded_record()``.
-
- If needed, also overwrite ``open()`` and/or ``close()`` to customize the
- file handling or write headers and footers.
-
- The output of this write is a PCollection of all written shards.
- """
-
- # Max number of threads to be used for renaming.
- _MAX_RENAME_THREADS = 64
-
- def __init__(self,
- file_path_prefix,
- coder,
- file_name_suffix='',
- num_shards=0,
- shard_name_template=None,
- mime_type='application/octet-stream',
- compression_type=CompressionTypes.AUTO):
- """
- Raises:
- TypeError: if file path parameters are not a string or ValueProvider,
- or if compression_type is not member of CompressionTypes.
- ValueError: if shard_name_template is not of expected format.
- """
- if not isinstance(file_path_prefix, (basestring, ValueProvider)):
- raise TypeError('file_path_prefix must be a string or ValueProvider;'
- 'got %r instead' % file_path_prefix)
- if not isinstance(file_name_suffix, (basestring, ValueProvider)):
- raise TypeError('file_name_suffix must be a string or ValueProvider;'
- 'got %r instead' % file_name_suffix)
-
- if not CompressionTypes.is_valid_compression_type(compression_type):
- raise TypeError('compression_type must be CompressionType object but '
- 'was %s' % type(compression_type))
- if shard_name_template is None:
- shard_name_template = DEFAULT_SHARD_NAME_TEMPLATE
- elif shard_name_template == '':
- num_shards = 1
- if isinstance(file_path_prefix, basestring):
- file_path_prefix = StaticValueProvider(str, file_path_prefix)
- if isinstance(file_name_suffix, basestring):
- file_name_suffix = StaticValueProvider(str, file_name_suffix)
- self.file_path_prefix = file_path_prefix
- self.file_name_suffix = file_name_suffix
- self.num_shards = num_shards
- self.coder = coder
- self.shard_name_format = self._template_to_format(shard_name_template)
- self.compression_type = compression_type
- self.mime_type = mime_type
-
- def display_data(self):
- return {'shards':
- DisplayDataItem(self.num_shards,
- label='Number of Shards').drop_if_default(0),
- 'compression':
- DisplayDataItem(str(self.compression_type)),
- 'file_pattern':
- DisplayDataItem('{}{}{}'.format(self.file_path_prefix,
- self.shard_name_format,
- self.file_name_suffix),
- label='File Pattern')}
-
- @check_accessible(['file_path_prefix'])
- def open(self, temp_path):
- """Opens ``temp_path``, returning an opaque file handle object.
-
- The returned file handle is passed to ``write_[encoded_]record`` and
- ``close``.
- """
- return FileSystems.create(temp_path, self.mime_type, self.compression_type)
-
- def write_record(self, file_handle, value):
- """Writes a single record go the file handle returned by ``open()``.
-
- By default, calls ``write_encoded_record`` after encoding the record with
- this sink's Coder.
- """
- self.write_encoded_record(file_handle, self.coder.encode(value))
-
- def write_encoded_record(self, file_handle, encoded_value):
- """Writes a single encoded record to the file handle returned by ``open()``.
- """
- raise NotImplementedError
-
- def close(self, file_handle):
- """Finalize and close the file handle returned from ``open()``.
-
- Called after all records are written.
-
- By default, calls ``file_handle.close()`` iff it is not None.
- """
- if file_handle is not None:
- file_handle.close()
-
- @check_accessible(['file_path_prefix', 'file_name_suffix'])
- def initialize_write(self):
- file_path_prefix = self.file_path_prefix.get()
-
- tmp_dir = self._create_temp_dir(file_path_prefix)
- FileSystems.mkdirs(tmp_dir)
- return tmp_dir
-
- def _create_temp_dir(self, file_path_prefix):
- base_path, last_component = FileSystems.split(file_path_prefix)
- if not last_component:
- # Trying to re-split the base_path to check if it's a root.
- new_base_path, _ = FileSystems.split(base_path)
- if base_path == new_base_path:
- raise ValueError('Cannot create a temporary directory for root path '
- 'prefix %s. Please specify a file path prefix with '
- 'at least two components.',
- file_path_prefix)
- path_components = [base_path,
- 'beam-temp-' + last_component + '-' + uuid.uuid1().hex]
- return FileSystems.join(*path_components)
-
- @check_accessible(['file_path_prefix', 'file_name_suffix'])
- def open_writer(self, init_result, uid):
- # A proper suffix is needed for AUTO compression detection.
- # We also ensure there will be no collisions with uid and a
- # (possibly unsharded) file_path_prefix and a (possibly empty)
- # file_name_suffix.
- file_path_prefix = self.file_path_prefix.get()
- file_name_suffix = self.file_name_suffix.get()
- suffix = (
- '.' + os.path.basename(file_path_prefix) + file_name_suffix)
- return FileSinkWriter(self, os.path.join(init_result, uid) + suffix)
-
- @check_accessible(['file_path_prefix', 'file_name_suffix'])
- def finalize_write(self, init_result, writer_results):
- file_path_prefix = self.file_path_prefix.get()
- file_name_suffix = self.file_name_suffix.get()
- writer_results = sorted(writer_results)
- num_shards = len(writer_results)
- min_threads = min(num_shards, FileSink._MAX_RENAME_THREADS)
- num_threads = max(1, min_threads)
-
- source_files = []
- destination_files = []
- chunk_size = FileSystems.get_chunk_size(file_path_prefix)
- for shard_num, shard in enumerate(writer_results):
- final_name = ''.join([
- file_path_prefix, self.shard_name_format % dict(
- shard_num=shard_num, num_shards=num_shards), file_name_suffix
- ])
- source_files.append(shard)
- destination_files.append(final_name)
-
- source_file_batch = [source_files[i:i + chunk_size]
- for i in xrange(0, len(source_files),
- chunk_size)]
- destination_file_batch = [destination_files[i:i + chunk_size]
- for i in xrange(0, len(destination_files),
- chunk_size)]
-
- logging.info(
- 'Starting finalize_write threads with num_shards: %d, '
- 'batches: %d, num_threads: %d',
- num_shards, len(source_file_batch), num_threads)
- start_time = time.time()
-
- # Use a thread pool for renaming operations.
- def _rename_batch(batch):
- """_rename_batch executes batch rename operations."""
- source_files, destination_files = batch
- exceptions = []
- try:
- FileSystems.rename(source_files, destination_files)
- return exceptions
- except BeamIOError as exp:
- if exp.exception_details is None:
- raise
- for (src, dest), exception in exp.exception_details.iteritems():
- if exception:
- logging.warning('Rename not successful: %s -> %s, %s', src, dest,
- exception)
- should_report = True
- if isinstance(exception, IOError):
- # May have already been copied.
- try:
- if FileSystems.exists(dest):
- should_report = False
- except Exception as exists_e: # pylint: disable=broad-except
- logging.warning('Exception when checking if file %s exists: '
- '%s', dest, exists_e)
- if should_report:
- logging.warning(('Exception in _rename_batch. src: %s, '
- 'dest: %s, err: %s'), src, dest, exception)
- exceptions.append(exception)
- else:
- logging.debug('Rename successful: %s -> %s', src, dest)
- return exceptions
-
- exception_batches = util.run_using_threadpool(
- _rename_batch, zip(source_file_batch, destination_file_batch),
- num_threads)
-
- all_exceptions = [e for exception_batch in exception_batches
- for e in exception_batch]
- if all_exceptions:
- raise Exception('Encountered exceptions in finalize_write: %s',
- all_exceptions)
-
- for final_name in destination_files:
- yield final_name
-
- logging.info('Renamed %d shards in %.2f seconds.', num_shards,
- time.time() - start_time)
-
- try:
- FileSystems.delete([init_result])
- except IOError:
- # May have already been removed.
- pass
-
- @staticmethod
- def _template_to_format(shard_name_template):
- if not shard_name_template:
- return ''
- m = re.search('S+', shard_name_template)
- if m is None:
- raise ValueError("Shard number pattern S+ not found in template '%s'" %
- shard_name_template)
- shard_name_format = shard_name_template.replace(
- m.group(0), '%%(shard_num)0%dd' % len(m.group(0)))
- m = re.search('N+', shard_name_format)
- if m:
- shard_name_format = shard_name_format.replace(
- m.group(0), '%%(num_shards)0%dd' % len(m.group(0)))
- return shard_name_format
-
- def __eq__(self, other):
- # TODO: Clean up workitem_test which uses this.
- # pylint: disable=unidiomatic-typecheck
- return type(self) == type(other) and self.__dict__ == other.__dict__
-
-
-# Using FileBasedSink for the public API to be symmetric with FileBasedSource.
-# TODO: move code from FileSink to here and delete that class.
-FileBasedSink = FileSink
-
-
-class FileSinkWriter(iobase.Writer):
- """The writer for FileSink.
- """
-
- def __init__(self, sink, temp_shard_path):
- self.sink = sink
- self.temp_shard_path = temp_shard_path
- self.temp_handle = self.sink.open(temp_shard_path)
-
- def write(self, value):
- self.sink.write_record(self.temp_handle, value)
-
- def close(self):
- self.sink.close(self.temp_handle)
- return self.temp_shard_path
http://git-wip-us.apache.org/repos/asf/beam/blob/717ab8c1/sdks/python/apache_beam/io/fileio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py
deleted file mode 100644
index b92b8be..0000000
--- a/sdks/python/apache_beam/io/fileio_test.py
+++ /dev/null
@@ -1,303 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Unit tests for file sinks."""
-
-import glob
-import logging
-import os
-import shutil
-import tempfile
-import unittest
-
-import hamcrest as hc
-import mock
-
-import apache_beam as beam
-from apache_beam.coders import coders
-from apache_beam.io import fileio
-from apache_beam.testing.test_pipeline import TestPipeline
-from apache_beam.transforms.display import DisplayData
-from apache_beam.transforms.display_test import DisplayDataItemMatcher
-
-from apache_beam.options.value_provider import StaticValueProvider
-
-
-# TODO: Refactor code so all io tests are using same library
-# TestCaseWithTempDirCleanup class.
-class _TestCaseWithTempDirCleanUp(unittest.TestCase):
- """Base class for TestCases that deals with TempDir clean-up.
-
- Inherited test cases will call self._new_tempdir() to start a temporary dir
- which will be deleted at the end of the tests (when tearDown() is called).
- """
-
- def setUp(self):
- self._tempdirs = []
-
- def tearDown(self):
- for path in self._tempdirs:
- if os.path.exists(path):
- shutil.rmtree(path)
- self._tempdirs = []
-
- def _new_tempdir(self):
- result = tempfile.mkdtemp()
- self._tempdirs.append(result)
- return result
-
- def _create_temp_file(self, name='', suffix=''):
- if not name:
- name = tempfile.template
- file_name = tempfile.NamedTemporaryFile(
- delete=False, prefix=name,
- dir=self._new_tempdir(), suffix=suffix).name
- return file_name
-
-
-class MyFileSink(fileio.FileSink):
-
- def open(self, temp_path):
- # TODO: Fix main session pickling.
- # file_handle = super(MyFileSink, self).open(temp_path)
- file_handle = fileio.FileSink.open(self, temp_path)
- file_handle.write('[start]')
- return file_handle
-
- def write_encoded_record(self, file_handle, encoded_value):
- file_handle.write('[')
- file_handle.write(encoded_value)
- file_handle.write(']')
-
- def close(self, file_handle):
- file_handle.write('[end]')
- # TODO: Fix main session pickling.
- # file_handle = super(MyFileSink, self).close(file_handle)
- file_handle = fileio.FileSink.close(self, file_handle)
-
-
-class TestFileSink(_TestCaseWithTempDirCleanUp):
-
- def test_file_sink_writing(self):
- temp_path = os.path.join(self._new_tempdir(), 'filesink')
- sink = MyFileSink(
- temp_path, file_name_suffix='.output', coder=coders.ToStringCoder())
-
- # Manually invoke the generic Sink API.
- init_token = sink.initialize_write()
-
- writer1 = sink.open_writer(init_token, '1')
- writer1.write('a')
- writer1.write('b')
- res1 = writer1.close()
-
- writer2 = sink.open_writer(init_token, '2')
- writer2.write('x')
- writer2.write('y')
- writer2.write('z')
- res2 = writer2.close()
-
- _ = list(sink.finalize_write(init_token, [res1, res2]))
- # Retry the finalize operation (as if the first attempt was lost).
- res = list(sink.finalize_write(init_token, [res1, res2]))
-
- # Check the results.
- shard1 = temp_path + '-00000-of-00002.output'
- shard2 = temp_path + '-00001-of-00002.output'
- self.assertEqual(res, [shard1, shard2])
- self.assertEqual(open(shard1).read(), '[start][a][b][end]')
- self.assertEqual(open(shard2).read(), '[start][x][y][z][end]')
-
- # Check that any temp files are deleted.
- self.assertItemsEqual([shard1, shard2], glob.glob(temp_path + '*'))
-
- def test_file_sink_display_data(self):
- temp_path = os.path.join(self._new_tempdir(), 'display')
- sink = MyFileSink(
- temp_path, file_name_suffix='.output', coder=coders.ToStringCoder())
- dd = DisplayData.create_from(sink)
- expected_items = [
- DisplayDataItemMatcher(
- 'compression', 'auto'),
- DisplayDataItemMatcher(
- 'file_pattern',
- '{}{}'.format(
- temp_path,
- '-%(shard_num)05d-of-%(num_shards)05d.output'))]
- hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
-
- def test_empty_write(self):
- temp_path = tempfile.NamedTemporaryFile().name
- sink = MyFileSink(
- temp_path, file_name_suffix='.output', coder=coders.ToStringCoder()
- )
- p = TestPipeline()
- p | beam.Create([]) | beam.io.Write(sink) # pylint: disable=expression-not-assigned
- p.run()
- self.assertEqual(
- open(temp_path + '-00000-of-00001.output').read(), '[start][end]')
-
- def test_static_value_provider_empty_write(self):
- temp_path = StaticValueProvider(value_type=str,
- value=tempfile.NamedTemporaryFile().name)
- sink = MyFileSink(
- temp_path,
- file_name_suffix=StaticValueProvider(value_type=str, value='.output'),
- coder=coders.ToStringCoder()
- )
- p = TestPipeline()
- p | beam.Create([]) | beam.io.Write(sink) # pylint: disable=expression-not-assigned
- p.run()
- self.assertEqual(
- open(temp_path.get() + '-00000-of-00001.output').read(), '[start][end]')
-
- def test_fixed_shard_write(self):
- temp_path = os.path.join(self._new_tempdir(), 'empty')
- sink = MyFileSink(
- temp_path,
- file_name_suffix='.output',
- num_shards=3,
- shard_name_template='_NN_SSS_',
- coder=coders.ToStringCoder())
- p = TestPipeline()
- p | beam.Create(['a', 'b']) | beam.io.Write(sink) # pylint: disable=expression-not-assigned
-
- p.run()
-
- concat = ''.join(
- open(temp_path + '_03_%03d_.output' % shard_num).read()
- for shard_num in range(3))
- self.assertTrue('][a][' in concat, concat)
- self.assertTrue('][b][' in concat, concat)
-
- # Not using 'test' in name so that 'nose' doesn't pick this as a test.
- def run_temp_dir_check(self, no_dir_path, dir_path, no_dir_root_path,
- dir_root_path, prefix, separator):
- def _get_temp_dir(file_path_prefix):
- sink = MyFileSink(
- file_path_prefix, file_name_suffix='.output',
- coder=coders.ToStringCoder())
- return sink.initialize_write()
-
- temp_dir = _get_temp_dir(no_dir_path)
- self.assertTrue(temp_dir.startswith(prefix))
- last_sep = temp_dir.rfind(separator)
- self.assertTrue(temp_dir[last_sep + 1:].startswith('beam-temp'))
-
- temp_dir = _get_temp_dir(dir_path)
- self.assertTrue(temp_dir.startswith(prefix))
- last_sep = temp_dir.rfind(separator)
- self.assertTrue(temp_dir[last_sep + 1:].startswith('beam-temp'))
-
- with self.assertRaises(ValueError):
- _get_temp_dir(no_dir_root_path)
-
- with self.assertRaises(ValueError):
- _get_temp_dir(dir_root_path)
-
- def test_temp_dir_gcs(self):
- try:
- self.run_temp_dir_check(
- 'gs://aaa/bbb', 'gs://aaa/bbb/', 'gs://aaa', 'gs://aaa/', 'gs://',
- '/')
- except ValueError:
- logging.debug('Ignoring test since GCP module is not installed')
-
- @mock.patch('apache_beam.io.localfilesystem.os')
- def test_temp_dir_local(self, filesystem_os_mock):
- # Here we test a unix-like mock file-system
- # (not really testing Unix or Windows since we mock the function of 'os'
- # module).
-
- def _fake_unix_split(path):
- sep = path.rfind('/')
- if sep < 0:
- raise ValueError('Path must contain a separator')
- return (path[:sep], path[sep + 1:])
-
- def _fake_unix_join(base, path):
- return base + '/' + path
-
- filesystem_os_mock.path.abspath = lambda a: a
- filesystem_os_mock.path.split.side_effect = _fake_unix_split
- filesystem_os_mock.path.join.side_effect = _fake_unix_join
- self.run_temp_dir_check(
- '/aaa/bbb', '/aaa/bbb/', '/', '/', '/', '/')
-
- def test_file_sink_multi_shards(self):
- temp_path = os.path.join(self._new_tempdir(), 'multishard')
- sink = MyFileSink(
- temp_path, file_name_suffix='.output', coder=coders.ToStringCoder())
-
- # Manually invoke the generic Sink API.
- init_token = sink.initialize_write()
-
- num_shards = 1000
- writer_results = []
- for i in range(num_shards):
- uuid = 'uuid-%05d' % i
- writer = sink.open_writer(init_token, uuid)
- writer.write('a')
- writer.write('b')
- writer.write(uuid)
- writer_results.append(writer.close())
-
- res_first = list(sink.finalize_write(init_token, writer_results))
- # Retry the finalize operation (as if the first attempt was lost).
- res_second = list(sink.finalize_write(init_token, writer_results))
-
- self.assertItemsEqual(res_first, res_second)
-
- res = sorted(res_second)
- for i in range(num_shards):
- shard_name = '%s-%05d-of-%05d.output' % (temp_path, i, num_shards)
- uuid = 'uuid-%05d' % i
- self.assertEqual(res[i], shard_name)
- self.assertEqual(
- open(shard_name).read(), ('[start][a][b][%s][end]' % uuid))
-
- # Check that any temp files are deleted.
- self.assertItemsEqual(res, glob.glob(temp_path + '*'))
-
- def test_file_sink_io_error(self):
- temp_path = os.path.join(self._new_tempdir(), 'ioerror')
- sink = MyFileSink(
- temp_path, file_name_suffix='.output', coder=coders.ToStringCoder())
-
- # Manually invoke the generic Sink API.
- init_token = sink.initialize_write()
-
- writer1 = sink.open_writer(init_token, '1')
- writer1.write('a')
- writer1.write('b')
- res1 = writer1.close()
-
- writer2 = sink.open_writer(init_token, '2')
- writer2.write('x')
- writer2.write('y')
- writer2.write('z')
- res2 = writer2.close()
-
- os.remove(res2)
- with self.assertRaises(Exception):
- list(sink.finalize_write(init_token, [res1, res2]))
-
-
-if __name__ == '__main__':
- logging.getLogger().setLevel(logging.INFO)
- unittest.main()
http://git-wip-us.apache.org/repos/asf/beam/blob/717ab8c1/sdks/python/apache_beam/io/gcp/gcsio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py
index 774ee54..7e21586 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio.py
@@ -257,9 +257,9 @@ class GcsIO(object):
self.client.objects.Copy(request)
except HttpError as http_error:
if http_error.status_code == 404:
- # This is a permanent error that should not be retried. Note that
- # FileSink.finalize_write expects an IOError when the source file does
- # not exist.
+ # This is a permanent error that should not be retried. Note that
+ # FileBasedSink.finalize_write expects an IOError when the source
+ # file does not exist.
raise GcsIOError(errno.ENOENT, 'Source file not found: %s' % src)
raise
http://git-wip-us.apache.org/repos/asf/beam/blob/717ab8c1/sdks/python/apache_beam/io/iobase.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index a80b12f..7e40d83 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -562,7 +562,9 @@ class RangeTracker(object):
class Sink(HasDisplayData):
- """A resource that can be written to using the ``beam.io.Write`` transform.
+ """This class is deprecated, no backwards-compatibility guarantees.
+
+ A resource that can be written to using the ``beam.io.Write`` transform.
Here ``beam`` stands for Apache Beam Python code imported in the following manner:
``import apache_beam as beam``.
@@ -594,8 +596,8 @@ class Sink(HasDisplayData):
single record from the bundle and ``close()`` which is called once
at the end of writing a bundle.
- See also ``apache_beam.io.fileio.FileSink`` which provides a simpler API
- for writing sinks that produce files.
+ See also ``apache_beam.io.filebasedsink.FileBasedSink`` which provides a
+ simpler API for writing sinks that produce files.
**Execution of the Write transform**
@@ -759,7 +761,9 @@ class Sink(HasDisplayData):
class Writer(object):
- """Writes a bundle of elements from a ``PCollection`` to a sink.
+ """This class is deprecated, no backwards-compatibility guarantees.
+
+ Writes a bundle of elements from a ``PCollection`` to a sink.
A Writer's ``iobase.Writer.write()`` writes elements to the sink while
``iobase.Writer.close()`` is called after all elements in the bundle have been
http://git-wip-us.apache.org/repos/asf/beam/blob/717ab8c1/sdks/python/apache_beam/io/textio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/textio.py b/sdks/python/apache_beam/io/textio.py
index d43f4fc..eeefaf6 100644
--- a/sdks/python/apache_beam/io/textio.py
+++ b/sdks/python/apache_beam/io/textio.py
@@ -23,7 +23,7 @@ import logging
from apache_beam.coders import coders
from apache_beam.io import filebasedsource
-from apache_beam.io import fileio
+from apache_beam.io import filebasedsink
from apache_beam.io import iobase
from apache_beam.io.filesystem import CompressionTypes
from apache_beam.io.iobase import Read
@@ -262,7 +262,7 @@ class _TextSource(filebasedsource.FileBasedSource):
sep_bounds[1] - record_start_position_in_buffer)
-class _TextSink(fileio.FileSink):
+class _TextSink(filebasedsink.FileBasedSink):
"""A sink to a GCS or local text file or files."""
def __init__(self,
http://git-wip-us.apache.org/repos/asf/beam/blob/717ab8c1/sdks/python/apache_beam/io/tfrecordio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/tfrecordio.py b/sdks/python/apache_beam/io/tfrecordio.py
index 2f7f4dc..a8cd1ce 100644
--- a/sdks/python/apache_beam/io/tfrecordio.py
+++ b/sdks/python/apache_beam/io/tfrecordio.py
@@ -23,7 +23,7 @@ import struct
from apache_beam import coders
from apache_beam.io import filebasedsource
-from apache_beam.io import fileio
+from apache_beam.io import filebasedsink
from apache_beam.io.filesystem import CompressionTypes
from apache_beam.io.iobase import Read
from apache_beam.io.iobase import Write
@@ -210,7 +210,7 @@ class ReadFromTFRecord(PTransform):
return pvalue.pipeline | Read(self._source)
-class _TFRecordSink(fileio.FileSink):
+class _TFRecordSink(filebasedsink.FileBasedSink):
"""Sink for writing TFRecords files.
For detailed TFRecord format description see:
@@ -242,7 +242,7 @@ class WriteToTFRecord(PTransform):
coder=coders.BytesCoder(),
file_name_suffix='',
num_shards=0,
- shard_name_template=fileio.DEFAULT_SHARD_NAME_TEMPLATE,
+ shard_name_template=filebasedsink.DEFAULT_SHARD_NAME_TEMPLATE,
compression_type=CompressionTypes.AUTO,
**kwargs):
"""Initialize WriteToTFRecord transform.
http://git-wip-us.apache.org/repos/asf/beam/blob/717ab8c1/sdks/python/apache_beam/testing/pipeline_verifiers.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/testing/pipeline_verifiers.py b/sdks/python/apache_beam/testing/pipeline_verifiers.py
index a08eb54..883343a 100644
--- a/sdks/python/apache_beam/testing/pipeline_verifiers.py
+++ b/sdks/python/apache_beam/testing/pipeline_verifiers.py
@@ -81,8 +81,8 @@ def retry_on_io_error_and_server_error(exception):
class FileChecksumMatcher(BaseMatcher):
"""Matcher that verifies file(s) content by comparing file checksum.
- Use apache_beam.io.fileio to fetch file(s) from given path. File checksum
- is a hash string computed from content of file(s).
+ Uses apache_beam.io.filebasedsink to fetch file(s) from the given path.
+ The file checksum is a hash string computed from the content of the file(s).
"""
def __init__(self, file_path, expected_checksum, sleep_secs=None):
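A hedged sketch of constructing this matcher; the glob and checksum are
placeholders, not values from this commit. In integration tests it is
typically handed to a TestPipeline as an on-success matcher rather than
asserted directly:

    from apache_beam.testing.pipeline_verifiers import FileChecksumMatcher

    matcher = FileChecksumMatcher(file_path='/tmp/results*-of-*',
                                  expected_checksum='<expected-sha1>')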
[2/2] beam git commit: This closes #3112
Posted by al...@apache.org.
This closes #3112
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/905ebccf
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/905ebccf
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/905ebccf
Branch: refs/heads/release-2.0.0
Commit: 905ebccf257f74ce73b4af05611492ef3ee7f3fc
Parents: da2476d 717ab8c
Author: Ahmet Altay <al...@google.com>
Authored: Thu May 11 18:20:06 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Thu May 11 18:20:06 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/io/__init__.py | 2 +-
sdks/python/apache_beam/io/avroio.py | 4 +-
sdks/python/apache_beam/io/filebasedsink.py | 299 ++++++++++++++++++
.../python/apache_beam/io/filebasedsink_test.py | 303 ++++++++++++++++++
sdks/python/apache_beam/io/fileio.py | 304 -------------------
sdks/python/apache_beam/io/fileio_test.py | 303 ------------------
sdks/python/apache_beam/io/gcp/gcsio.py | 6 +-
sdks/python/apache_beam/io/iobase.py | 12 +-
sdks/python/apache_beam/io/textio.py | 4 +-
sdks/python/apache_beam/io/tfrecordio.py | 6 +-
.../apache_beam/testing/pipeline_verifiers.py | 4 +-
11 files changed, 623 insertions(+), 624 deletions(-)
----------------------------------------------------------------------