You are viewing a plain-text version of this content; the canonical link to the original was not preserved in this plain-text version.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/08/19 14:48:29 UTC
[2/2] incubator-beam git commit: Move native TextFileWriter to use GcsIO for writing
Move native TextFileWriter to use GcsIO for writing
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7e50b3f1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7e50b3f1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7e50b3f1
Branch: refs/heads/python-sdk
Commit: 7e50b3f1a5da0223576b51570713b7c72750f9f2
Parents: a3de9fb
Author: Charles Chen <cc...@google.com>
Authored: Thu Aug 18 01:01:06 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Aug 19 07:48:23 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/io/fileio.py | 31 +++----------------------------
1 file changed, 3 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7e50b3f1/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index a6ce26a..bfa246f 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -25,7 +25,6 @@ from multiprocessing.pool import ThreadPool
import os
import re
import shutil
-import tempfile
import threading
import time
import zlib
@@ -34,8 +33,6 @@ import weakref
from apache_beam import coders
from apache_beam.io import iobase
from apache_beam.io import range_trackers
-from apache_beam.utils import processes
-from apache_beam.utils import retry
__all__ = ['TextFileSource', 'TextFileSink']
@@ -43,26 +40,6 @@ __all__ = ['TextFileSource', 'TextFileSink']
DEFAULT_SHARD_NAME_TEMPLATE = '-SSSSS-of-NNNNN'
-# Retrying is needed because there are transient errors that can happen.
-@retry.with_exponential_backoff(num_retries=4, retry_filter=lambda _: True)
-def _gcs_file_copy(from_path, to_path, encoding=''):
- """Copy a local file to a GCS location with retries for transient errors."""
- if not encoding:
- command_args = ['gsutil', '-m', '-q', 'cp', from_path, to_path]
- else:
- encoding = 'Content-Type:' + encoding
- command_args = ['gsutil', '-m', '-q', '-h', encoding, 'cp', from_path,
- to_path]
- logging.info('Executing command: %s', command_args)
- popen = processes.Popen(command_args, stdout=processes.PIPE,
- stderr=processes.PIPE)
- stdoutdata, stderrdata = popen.communicate()
- if popen.returncode != 0:
- raise ValueError(
- 'Failed to copy GCS file from %s to %s (stdout=%s, stderr=%s).' % (
- from_path, to_path, stdoutdata, stderrdata))
-
-
# -----------------------------------------------------------------------------
# TextFileSource, TextFileSink.
@@ -901,17 +878,15 @@ class TextFileWriter(iobase.NativeSinkWriter):
def __enter__(self):
if self.sink.is_gcs_sink:
- # TODO(silviuc): Use the storage library instead of gsutil for writes.
- self.temp_path = os.path.join(tempfile.mkdtemp(), 'gcsfile')
- self._file = open(self.temp_path, 'wb')
+ # pylint: disable=wrong-import-order, wrong-import-position
+ from apache_beam.io import gcsio
+ self._file = gcsio.GcsIO().open(self.sink.file_path, 'wb')
else:
self._file = open(self.sink.file_path, 'wb')
return self
def __exit__(self, exception_type, exception_value, traceback):
self._file.close()
- if hasattr(self, 'temp_path'):
- _gcs_file_copy(self.temp_path, self.sink.file_path, 'text/plain')
def Write(self, line):
self._file.write(self.sink.coder.encode(line))