You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/01/05 17:46:00 UTC

[jira] [Commented] (BEAM-2549) gcsio should set timeouts for http requests

    [ https://issues.apache.org/jira/browse/BEAM-2549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16313526#comment-16313526 ] 

ASF GitHub Bot commented on BEAM-2549:
--------------------------------------

aaltay closed pull request #4324: [BEAM-2549] Remove Queue based 60 seconds timeout for GCS io.
URL: https://github.com/apache/beam/pull/4324
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py
index 68ca0265601..d5657e7ca3e 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio.py
@@ -26,7 +26,6 @@
 import logging
 import multiprocessing
 import os
-import Queue
 import re
 import threading
 import time
@@ -427,14 +426,12 @@ def __init__(self,
                client,
                path,
                mode='r',
-               buffer_size=DEFAULT_READ_BUFFER_SIZE,
-               segment_timeout=DEFAULT_READ_SEGMENT_TIMEOUT_SECONDS):
+               buffer_size=DEFAULT_READ_BUFFER_SIZE):
     self.client = client
     self.path = path
     self.bucket, self.name = parse_gcs_path(path)
     self.mode = mode
     self.buffer_size = buffer_size
-    self.segment_timeout = segment_timeout
 
     # Get object state.
     self.get_request = (storage.StorageObjectsGetRequest(
@@ -575,47 +572,18 @@ def _fetch_next_if_buffer_exhausted(self):
         self.buffer_start_position + len(self.buffer) <= self.position):
       bytes_to_request = min(self._remaining(), self.buffer_size)
       self.buffer_start_position = self.position
-      retry_count = 0
-      while retry_count <= 10:
-        queue = Queue.Queue()
-        t = threading.Thread(target=self._fetch_to_queue,
-                             args=(queue, self._get_segment,
-                                   (self.position, bytes_to_request)))
-        t.daemon = True
-        t.start()
-        try:
-          result, exn, tb = queue.get(timeout=self.segment_timeout)
-        except Queue.Empty:
-          logging.warning(
-              ('Timed out fetching %d bytes from position %d of %s after %f '
-               'seconds; retrying...'), bytes_to_request, self.position,
-              self.path, self.segment_timeout)
-          retry_count += 1
-          # Reinitialize download objects.
-          self.download_stream = cStringIO.StringIO()
-          self.downloader = transfer.Download(
-              self.download_stream, auto_transfer=False,
-              chunksize=self.buffer_size)
-          self.client.objects.Get(self.get_request, download=self.downloader)
-          continue
-        if exn:
-          logging.error(
-              ('Exception while fetching %d bytes from position %d of %s: '
-               '%s\n%s'),
-              bytes_to_request, self.position, self.path, exn, tb)
-          raise exn
-        self.buffer = result
-        return
-      raise GcsIOError(
-          'Reached retry limit for _fetch_next_if_buffer_exhausted.')
+      try:
+        result = self._get_segment(self.position, bytes_to_request)
+      except Exception as e:  # pylint: disable=broad-except
+        tb = traceback.format_exc()
+        logging.error(
+            ('Exception while fetching %d bytes from position %d of %s: '
+             '%s\n%s'),
+            bytes_to_request, self.position, self.path, e, tb)
+        raise
 
-  def _fetch_to_queue(self, queue, func, args):
-    try:
-      value = func(*args)
-      queue.put((value, None, None))
-    except Exception as e:  # pylint: disable=broad-except
-      tb = traceback.format_exc()
-      queue.put((None, e, tb))
+      self.buffer = result
+      return
 
   def _remaining(self):
     return self.size - self.position
diff --git a/sdks/python/apache_beam/io/gcp/gcsio_test.py b/sdks/python/apache_beam/io/gcp/gcsio_test.py
index 06a82272900..6994c523032 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio_test.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio_test.py
@@ -22,7 +22,6 @@
 import os
 import random
 import threading
-import time
 import unittest
 
 import httplib2
@@ -436,43 +435,6 @@ def test_full_file_read(self):
     f.seek(0)
     self.assertEqual(f.read(), random_file.contents)
 
-  def test_flaky_file_read(self):
-    file_name = 'gs://gcsio-test/flaky_file'
-    file_size = 5 * 1024 * 1024 + 100
-    random_file = self._insert_random_file(self.client, file_name, file_size)
-    f = self.gcs.open(file_name)
-    random.seed(0)
-    f.buffer_size = 1024 * 1024
-    f.segment_timeout = 0.01
-    self.assertEqual(f.mode, 'r')
-    f._real_get_segment = f._get_segment
-
-    def flaky_get_segment(start, size):
-      if random.randint(0, 3) == 1:
-        time.sleep(600)
-      return f._real_get_segment(start, size)
-
-    f._get_segment = flaky_get_segment
-    self.assertEqual(f.read(), random_file.contents)
-
-    # Test exception handling in file read.
-    def failing_get_segment(unused_start, unused_size):
-      raise IOError('Could not read.')
-
-    f._get_segment = failing_get_segment
-    f.seek(0)
-    with self.assertRaises(IOError):
-      f.read()
-
-    # Test retry limit in hanging file read.
-    def hanging_get_segment(unused_start, unused_size):
-      time.sleep(600)
-
-    f._get_segment = hanging_get_segment
-    f.seek(0)
-    with self.assertRaises(gcsio.GcsIOError):
-      f.read()
-
   def test_file_random_seek(self):
     file_name = 'gs://gcsio-test/seek_file'
     file_size = 5 * 1024 * 1024 - 100


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> gcsio should set timeouts for http requests
> -------------------------------------------
>
>                 Key: BEAM-2549
>                 URL: https://issues.apache.org/jira/browse/BEAM-2549
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>            Reporter: Ahmet Altay
>            Assignee: Ahmet Altay
>
> Use an HTTP client with a timeout value for apitools requests. Once this is done, it is also possible to simplify GCS read operations (i.e., remove the custom solution for handling timeouts).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)