You are viewing a plain-text version of this content. The canonical link for it was a hyperlink ("here") that did not survive the plain-text conversion.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/08/19 14:48:29 UTC

[2/2] incubator-beam git commit: Move native TextFileWriter to use GcsIO for writing

Move native TextFileWriter to use GcsIO for writing


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7e50b3f1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7e50b3f1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7e50b3f1

Branch: refs/heads/python-sdk
Commit: 7e50b3f1a5da0223576b51570713b7c72750f9f2
Parents: a3de9fb
Author: Charles Chen <cc...@google.com>
Authored: Thu Aug 18 01:01:06 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Aug 19 07:48:23 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/fileio.py | 31 +++----------------------------
 1 file changed, 3 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7e50b3f1/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index a6ce26a..bfa246f 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -25,7 +25,6 @@ from multiprocessing.pool import ThreadPool
 import os
 import re
 import shutil
-import tempfile
 import threading
 import time
 import zlib
@@ -34,8 +33,6 @@ import weakref
 from apache_beam import coders
 from apache_beam.io import iobase
 from apache_beam.io import range_trackers
-from apache_beam.utils import processes
-from apache_beam.utils import retry
 
 
 __all__ = ['TextFileSource', 'TextFileSink']
@@ -43,26 +40,6 @@ __all__ = ['TextFileSource', 'TextFileSink']
 DEFAULT_SHARD_NAME_TEMPLATE = '-SSSSS-of-NNNNN'
 
 
-# Retrying is needed because there are transient errors that can happen.
-@retry.with_exponential_backoff(num_retries=4, retry_filter=lambda _: True)
-def _gcs_file_copy(from_path, to_path, encoding=''):
-  """Copy a local file to a GCS location with retries for transient errors."""
-  if not encoding:
-    command_args = ['gsutil', '-m', '-q', 'cp', from_path, to_path]
-  else:
-    encoding = 'Content-Type:' + encoding
-    command_args = ['gsutil', '-m', '-q', '-h', encoding, 'cp', from_path,
-                    to_path]
-  logging.info('Executing command: %s', command_args)
-  popen = processes.Popen(command_args, stdout=processes.PIPE,
-                          stderr=processes.PIPE)
-  stdoutdata, stderrdata = popen.communicate()
-  if popen.returncode != 0:
-    raise ValueError(
-        'Failed to copy GCS file from %s to %s (stdout=%s, stderr=%s).' % (
-            from_path, to_path, stdoutdata, stderrdata))
-
-
 # -----------------------------------------------------------------------------
 # TextFileSource, TextFileSink.
 
@@ -901,17 +878,15 @@ class TextFileWriter(iobase.NativeSinkWriter):
 
   def __enter__(self):
     if self.sink.is_gcs_sink:
-      # TODO(silviuc): Use the storage library instead of gsutil for writes.
-      self.temp_path = os.path.join(tempfile.mkdtemp(), 'gcsfile')
-      self._file = open(self.temp_path, 'wb')
+      # pylint: disable=wrong-import-order, wrong-import-position
+      from apache_beam.io import gcsio
+      self._file = gcsio.GcsIO().open(self.sink.file_path, 'wb')
     else:
       self._file = open(self.sink.file_path, 'wb')
     return self
 
   def __exit__(self, exception_type, exception_value, traceback):
     self._file.close()
-    if hasattr(self, 'temp_path'):
-      _gcs_file_copy(self.temp_path, self.sink.file_path, 'text/plain')
 
   def Write(self, line):
     self._file.write(self.sink.coder.encode(line))