You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink itself was not preserved in this plain-text version).
Posted to commits@beam.apache.org by ke...@apache.org on 2017/07/10 04:49:40 UTC

[10/43] beam git commit: For GCS operations use an http client with a default timeout value.

For GCS operations use an http client with a default timeout value.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/68f1fb64
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/68f1fb64
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/68f1fb64

Branch: refs/heads/gearpump-runner
Commit: 68f1fb64fd2565e287e322d715ca778d01e7137b
Parents: 0bd47c0
Author: Ahmet Altay <al...@google.com>
Authored: Fri Jun 30 17:37:33 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Tue Jul 4 11:07:25 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/gcp/gcsio.py | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/68f1fb64/sdks/python/apache_beam/io/gcp/gcsio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py
index d43c8ba..643fbc7 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio.py
@@ -31,6 +31,7 @@ import re
 import threading
 import time
 import traceback
+import httplib2
 
 from apache_beam.utils import retry
 
@@ -68,6 +69,10 @@ except ImportError:
 # +---------------+------------+-------------+-------------+-------------+
 DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024
 
+# This is the number of seconds the library will wait for GCS operations to
+# complete.
+DEFAULT_HTTP_TIMEOUT_SECONDS = 60
+
 # This is the number of seconds the library will wait for a partial-file read
 # operation from GCS to complete before retrying.
 DEFAULT_READ_SEGMENT_TIMEOUT_SECONDS = 60
@@ -99,6 +104,7 @@ class GcsIO(object):
 
   def __new__(cls, storage_client=None):
     if storage_client:
+      # This path is only used for testing.
       return super(GcsIO, cls).__new__(cls, storage_client)
     else:
       # Create a single storage client for each thread.  We would like to avoid
@@ -108,7 +114,9 @@ class GcsIO(object):
       local_state = threading.local()
       if getattr(local_state, 'gcsio_instance', None) is None:
         credentials = auth.get_service_credentials()
-        storage_client = storage.StorageV1(credentials=credentials)
+        storage_client = storage.StorageV1(
+            credentials=credentials,
+            http=httplib2.Http(timeout=DEFAULT_HTTP_TIMEOUT_SECONDS))
         local_state.gcsio_instance = (
             super(GcsIO, cls).__new__(cls, storage_client))
         local_state.gcsio_instance.client = storage_client