Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/12/14 09:57:18 UTC

[GitHub] [beam] phoerious commented on a change in pull request #15931: Make S3 streaming more efficient

phoerious commented on a change in pull request #15931:
URL: https://github.com/apache/beam/pull/15931#discussion_r768491540



##########
File path: sdks/python/apache_beam/io/aws/clients/s3/boto3_client.py
##########
@@ -92,27 +99,65 @@ def get_object_metadata(self, request):
         boto_response['ContentLength'],
         boto_response['ContentType'])
 
+    item.size = max(item.size - self._download_offset, 0)
     return item
 
+  def get_stream(self, request, start):
+    """Opens a stream object starting at the given position.
+
+    Args:
+      request: (GetRequest) request
+      start: (int) start offset
+    Returns:
+      (Stream) Boto3 stream object.
+    """
+
+    if self._download_request and (
+        start != self._download_pos or
+        request.bucket != self._download_request.bucket or
+        request.object != self._download_request.object):
+      self._download_stream.close()
+      self._download_stream = None
+
+    # noinspection PyProtectedMember
+    if not self._download_stream or self._download_stream._raw_stream.closed:
+      try:
+        self._download_stream = self.client.get_object(
+            Bucket=request.bucket,
+            Key=request.object,
+            Range='bytes={}-'.format(start + self._download_offset))['Body']
+        self._download_request = request
+        self._download_pos = start
+      except Exception as e:
+        message = e.response['Error'].get(
+            'Message', e.response['Error'].get('Code', ''))
+        code = e.response['ResponseMetadata']['HTTPStatusCode']
+        raise messages.S3ClientError(message, code)
+
+    return self._download_stream
+
   def get_range(self, request, start, end):
     r"""Retrieves an object's contents.
 
       Args:
         request: (GetRequest) request
+        start: (int) start offset
+        end: (int) end offset
       Returns:
         (bytes) The response message.
       """
-    try:
-      boto_response = self.client.get_object(
-          Bucket=request.bucket,
-          Key=request.object,
-          Range='bytes={}-{}'.format(start, end - 1))
-    except Exception as e:
-      message = e.response['Error']['Message']
-      code = e.response['ResponseMetadata']['HTTPStatusCode']
-      raise messages.S3ClientError(message, code)
-
-    return boto_response['Body'].read()  # A bytes object
+    for i in range(self._retries):

Review comment:
       Good point. Done.
   
   The main reason for the retry is that long-lived connections tend to close intermittently when reads do not occur at a fast, constant rate. I regularly run into this when reading and processing records from a WARC file, so I want to retry at least once. The first retry should happen almost immediately, which is why I gave `with_exponential_backoff` a very small initial delay.
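
The retry behaviour described above can be sketched roughly as follows. This is a minimal, self-contained illustration and not the code in the pull request or Beam's `with_exponential_backoff`: the helper `retry_with_backoff`, its parameters, and the `FlakyReader` class are hypothetical names introduced only for this example, and the choice to retry on `ConnectionError` is an assumption.

```python
import time


def retry_with_backoff(fn, num_retries=2, initial_delay_secs=0.01, factor=2.0,
                       sleep=time.sleep):
  """Calls fn(), retrying on ConnectionError with exponentially growing delays.

  Hypothetical stand-in for a backoff helper. The very small initial delay
  means the first retry happens almost immediately.
  """
  delay = initial_delay_secs
  for attempt in range(num_retries + 1):
    try:
      return fn()
    except ConnectionError:
      if attempt == num_retries:
        raise  # Retries exhausted: surface the last error to the caller.
      sleep(delay)
      delay *= factor


class FlakyReader:
  """Hypothetical reader whose connection drops a few times before working."""

  def __init__(self, failures=1):
    self.failures = failures
    self.calls = 0

  def read(self):
    self.calls += 1
    if self.calls <= self.failures:
      # Simulates a long-lived connection that was closed between reads.
      raise ConnectionError('connection closed by peer')
    return b'record'


if __name__ == '__main__':
  reader = FlakyReader(failures=1)
  data = retry_with_backoff(reader.read)
  print(data, 'after', reader.calls, 'calls')
```

With a single dropped connection, the read succeeds on the second call after waiting only the initial delay, so a slow consumer pays almost nothing for the reconnect.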



