You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/07/15 01:15:47 UTC
[1/2] incubator-beam git commit: Handle HttpError in GCS upload thread
Repository: incubator-beam
Updated Branches:
refs/heads/python-sdk a1a51c3c1 -> d898d56ae
Handle HttpError in GCS upload thread
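* Close the upload thread's end of the pipe to the main thread when the upload
finishes or fails. The upload thread records any HttpError, and write()
re-raises it in the main thread instead of a bare pipe IOError.
* Retry in auth _refresh() to guard against temporary errors in the
metadata service.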
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a4267d26
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a4267d26
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a4267d26
Branch: refs/heads/python-sdk
Commit: a4267d264395706f12479aa876501a62d5b679b7
Parents: a1a51c3
Author: Ahmet Altay <al...@google.com>
Authored: Fri Jul 8 16:28:07 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Jul 14 18:15:34 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/internal/auth.py | 2 ++
sdks/python/apache_beam/io/gcsio.py | 23 ++++++++++++++++++++---
2 files changed, 22 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a4267d26/sdks/python/apache_beam/internal/auth.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/auth.py b/sdks/python/apache_beam/internal/auth.py
index 0081970..f324a2d 100644
--- a/sdks/python/apache_beam/internal/auth.py
+++ b/sdks/python/apache_beam/internal/auth.py
@@ -82,6 +82,8 @@ class GCEMetadataCredentials(OAuth2Credentials):
None, # token_uri
user_agent)
+ @retry.with_exponential_backoff(
+ retry_filter=retry.retry_on_server_errors_and_timeout_filter)
def _refresh(self, http_request):
refresh_time = datetime.datetime.now()
req = urllib2.Request('http://metadata.google.internal/computeMetadata/v1/'
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a4267d26/sdks/python/apache_beam/io/gcsio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcsio.py b/sdks/python/apache_beam/io/gcsio.py
index a01988b..c61f251 100644
--- a/sdks/python/apache_beam/io/gcsio.py
+++ b/sdks/python/apache_beam/io/gcsio.py
@@ -533,6 +533,7 @@ class GcsBufferedWriter(object):
# Set up communication with uploading thread.
parent_conn, child_conn = multiprocessing.Pipe()
+ self.child_conn = child_conn
self.conn = parent_conn
# Set up uploader.
@@ -547,6 +548,7 @@ class GcsBufferedWriter(object):
# Start uploading thread.
self.upload_thread = threading.Thread(target=self._start_upload)
self.upload_thread.daemon = True
+ self.upload_thread.last_error = None
self.upload_thread.start()
# TODO(silviuc): Refactor so that retry logic can be applied.
@@ -560,7 +562,15 @@ class GcsBufferedWriter(object):
#
# The uploader by default transfers data in chunks of 1024 * 1024 bytes at
# a time, buffering writes until that size is reached.
- self.client.objects.Insert(self.insert_request, upload=self.upload)
+ try:
+ self.client.objects.Insert(self.insert_request, upload=self.upload)
+ except HttpError as http_error:
+ logging.error(
+ 'HTTP error while inserting file %s: %s', self.path, http_error)
+ self.upload_thread.last_error = http_error
+ raise
+ finally:
+ self.child_conn.close()
def write(self, data):
"""Write data to a GCS file.
@@ -574,8 +584,14 @@ class GcsBufferedWriter(object):
self._check_open()
if not data:
return
- self.conn.send_bytes(data)
- self.position += len(data)
+ try:
+ self.conn.send_bytes(data)
+ self.position += len(data)
+ except IOError:
+ if self.upload_thread.last_error:
+ raise self.upload_thread.last_error # pylint: disable=raising-bad-type
+ else:
+ raise
def tell(self):
"""Return the total number of bytes passed to write() so far."""
@@ -583,6 +599,7 @@ class GcsBufferedWriter(object):
def close(self):
"""Close the current GCS file."""
+ self.closed = True
self.conn.close()
self.upload_thread.join()
[2/2] incubator-beam git commit: Closes #617
Posted by dh...@apache.org.
Closes #617
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d898d56a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d898d56a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d898d56a
Branch: refs/heads/python-sdk
Commit: d898d56aeb73cf58ce0f8978603d163601147b73
Parents: a1a51c3 a4267d2
Author: Dan Halperin <dh...@google.com>
Authored: Thu Jul 14 18:15:35 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Jul 14 18:15:35 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/internal/auth.py | 2 ++
sdks/python/apache_beam/io/gcsio.py | 23 ++++++++++++++++++++---
2 files changed, 22 insertions(+), 3 deletions(-)
----------------------------------------------------------------------