You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/01/05 17:46:00 UTC
[jira] [Commented] (BEAM-2549) gcsio should set timeouts for http
requests
[ https://issues.apache.org/jira/browse/BEAM-2549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16313526#comment-16313526 ]
ASF GitHub Bot commented on BEAM-2549:
--------------------------------------
aaltay closed pull request #4324: [BEAM-2549] Remove Queue based 60 seconds timeout for GCS io.
URL: https://github.com/apache/beam/pull/4324
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py
index 68ca0265601..d5657e7ca3e 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio.py
@@ -26,7 +26,6 @@
import logging
import multiprocessing
import os
-import Queue
import re
import threading
import time
@@ -427,14 +426,12 @@ def __init__(self,
client,
path,
mode='r',
- buffer_size=DEFAULT_READ_BUFFER_SIZE,
- segment_timeout=DEFAULT_READ_SEGMENT_TIMEOUT_SECONDS):
+ buffer_size=DEFAULT_READ_BUFFER_SIZE):
self.client = client
self.path = path
self.bucket, self.name = parse_gcs_path(path)
self.mode = mode
self.buffer_size = buffer_size
- self.segment_timeout = segment_timeout
# Get object state.
self.get_request = (storage.StorageObjectsGetRequest(
@@ -575,47 +572,18 @@ def _fetch_next_if_buffer_exhausted(self):
self.buffer_start_position + len(self.buffer) <= self.position):
bytes_to_request = min(self._remaining(), self.buffer_size)
self.buffer_start_position = self.position
- retry_count = 0
- while retry_count <= 10:
- queue = Queue.Queue()
- t = threading.Thread(target=self._fetch_to_queue,
- args=(queue, self._get_segment,
- (self.position, bytes_to_request)))
- t.daemon = True
- t.start()
- try:
- result, exn, tb = queue.get(timeout=self.segment_timeout)
- except Queue.Empty:
- logging.warning(
- ('Timed out fetching %d bytes from position %d of %s after %f '
- 'seconds; retrying...'), bytes_to_request, self.position,
- self.path, self.segment_timeout)
- retry_count += 1
- # Reinitialize download objects.
- self.download_stream = cStringIO.StringIO()
- self.downloader = transfer.Download(
- self.download_stream, auto_transfer=False,
- chunksize=self.buffer_size)
- self.client.objects.Get(self.get_request, download=self.downloader)
- continue
- if exn:
- logging.error(
- ('Exception while fetching %d bytes from position %d of %s: '
- '%s\n%s'),
- bytes_to_request, self.position, self.path, exn, tb)
- raise exn
- self.buffer = result
- return
- raise GcsIOError(
- 'Reached retry limit for _fetch_next_if_buffer_exhausted.')
+ try:
+ result = self._get_segment(self.position, bytes_to_request)
+ except Exception as e: # pylint: disable=broad-except
+ tb = traceback.format_exc()
+ logging.error(
+ ('Exception while fetching %d bytes from position %d of %s: '
+ '%s\n%s'),
+ bytes_to_request, self.position, self.path, e, tb)
+ raise
- def _fetch_to_queue(self, queue, func, args):
- try:
- value = func(*args)
- queue.put((value, None, None))
- except Exception as e: # pylint: disable=broad-except
- tb = traceback.format_exc()
- queue.put((None, e, tb))
+ self.buffer = result
+ return
def _remaining(self):
return self.size - self.position
diff --git a/sdks/python/apache_beam/io/gcp/gcsio_test.py b/sdks/python/apache_beam/io/gcp/gcsio_test.py
index 06a82272900..6994c523032 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio_test.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio_test.py
@@ -22,7 +22,6 @@
import os
import random
import threading
-import time
import unittest
import httplib2
@@ -436,43 +435,6 @@ def test_full_file_read(self):
f.seek(0)
self.assertEqual(f.read(), random_file.contents)
- def test_flaky_file_read(self):
- file_name = 'gs://gcsio-test/flaky_file'
- file_size = 5 * 1024 * 1024 + 100
- random_file = self._insert_random_file(self.client, file_name, file_size)
- f = self.gcs.open(file_name)
- random.seed(0)
- f.buffer_size = 1024 * 1024
- f.segment_timeout = 0.01
- self.assertEqual(f.mode, 'r')
- f._real_get_segment = f._get_segment
-
- def flaky_get_segment(start, size):
- if random.randint(0, 3) == 1:
- time.sleep(600)
- return f._real_get_segment(start, size)
-
- f._get_segment = flaky_get_segment
- self.assertEqual(f.read(), random_file.contents)
-
- # Test exception handling in file read.
- def failing_get_segment(unused_start, unused_size):
- raise IOError('Could not read.')
-
- f._get_segment = failing_get_segment
- f.seek(0)
- with self.assertRaises(IOError):
- f.read()
-
- # Test retry limit in hanging file read.
- def hanging_get_segment(unused_start, unused_size):
- time.sleep(600)
-
- f._get_segment = hanging_get_segment
- f.seek(0)
- with self.assertRaises(gcsio.GcsIOError):
- f.read()
-
def test_file_random_seek(self):
file_name = 'gs://gcsio-test/seek_file'
file_size = 5 * 1024 * 1024 - 100
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
> gcsio should set timeouts for http requests
> -------------------------------------------
>
> Key: BEAM-2549
> URL: https://issues.apache.org/jira/browse/BEAM-2549
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-core
> Reporter: Ahmet Altay
> Assignee: Ahmet Altay
>
> Use an http client with a timeout value for apitools requests. Once this is done, it is also possible to simplify GCS read operations (i.e. remove the custom solution for handling timeouts.)
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)