You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bh...@apache.org on 2021/10/14 17:56:04 UTC

[samza] branch master updated: SAMZA-2699: AzureBlob SystemProducer: remove retries when uploading block as azure sdk does internal retries (#1542)

This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 45c0aa9  SAMZA-2699: AzureBlob SystemProducer: remove retries when uploading block as azure sdk does internal retries (#1542)
45c0aa9 is described below

commit 45c0aa998626bb101eb03ea9e4e2f589dd67753d
Author: lakshmi-manasa-g <mg...@linkedin.com>
AuthorDate: Thu Oct 14 10:53:34 2021 -0700

    SAMZA-2699: AzureBlob SystemProducer: remove retries when uploading block as azure sdk does internal retries (#1542)
    
    Symptom: an interrupted thread stays stuck for a long time when the InterruptedException is wrapped in another exception.
    
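    Cause: block upload is retried several times for all exceptions. Although a bare InterruptedException has been bubbled up immediately since #1511, an InterruptedException wrapped in another exception is not bubbled up; it is caught by the generic handler and retried like any other failure.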
    
    Changes: remove the retries at the Samza level, since the Azure SDK already performs retries internally.
    Samza uses BlobServiceClientBuilder, which uses RequestRetryOptions as the default retry policy; RequestRetryOptions sets max retries to 4 with exponential backoff.
---
 .../azureblob/avro/AzureBlobOutputStream.java      | 36 +++++++---------------
 1 file changed, 11 insertions(+), 25 deletions(-)

diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java
index 222b99c..8ca97e0 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java
@@ -69,7 +69,6 @@ import reactor.core.publisher.Flux;
 public class AzureBlobOutputStream extends OutputStream {
 
   private static final Logger LOG = LoggerFactory.getLogger(AzureBlobOutputStream.class);
-  private static final int MAX_ATTEMPT = 3;
   private static final int MAX_BLOCKS_IN_AZURE_BLOB = 50000;
   private final long flushTimeoutMs;
   private final BlockBlobAsyncClient blobAsyncClient;
@@ -322,33 +321,20 @@ public class AzureBlobOutputStream extends OutputStream {
       // call async stageblock and add to future
       @Override
       public void run() {
-        int attemptCount = 0;
         byte[] compressedLocalByte = compression.compress(localByte);
         int blockSize = compressedLocalByte.length;
 
-        while (attemptCount < MAX_ATTEMPT) {
-          try {
-            ByteBuffer outputStream = ByteBuffer.wrap(compressedLocalByte, 0, blockSize);
-            metrics.updateCompressByteMetrics(blockSize);
-            LOG.info("{} Upload block start for blob: {} for block size:{}.", blobAsyncClient.getBlobUrl().toString(), blockId, blockSize);
-            metrics.updateAzureUploadMetrics();
-            // StageBlock generates exception on Failure.
-            stageBlock(blockIdEncoded, outputStream, blockSize);
-            break;
-          } catch (InterruptedException e) {
-            String msg = String.format("Upload block for blob: %s failed for blockid: %s due to InterruptedException.",
-                blobAsyncClient.getBlobUrl().toString(), blockId);
-            LOG.error(msg, e);
-            throw new AzureException("InterruptedException encountered during block upload. Will not retry.", e);
-          } catch (Exception e) {
-            attemptCount += 1;
-            String msg = "Upload block for blob: " + blobAsyncClient.getBlobUrl().toString()
-                + " failed for blockid: " + blockId + " due to exception. AttemptCount: " + attemptCount;
-            LOG.error(msg, e);
-            if (attemptCount == MAX_ATTEMPT) {
-              throw new AzureException("Exceeded number of attempts. Max attempts is: " + MAX_ATTEMPT, e);
-            }
-          }
+        try {
+          ByteBuffer outputStream = ByteBuffer.wrap(compressedLocalByte, 0, blockSize);
+          metrics.updateCompressByteMetrics(blockSize);
+          LOG.info("{} Upload block start for blob: {} for block size:{}.", blobAsyncClient.getBlobUrl().toString(), blockId, blockSize);
+          metrics.updateAzureUploadMetrics();
+          // StageBlock generates exception on Failure.
+          stageBlock(blockIdEncoded, outputStream, blockSize);
+        } catch (Exception e) {
+          String msg = String.format("Upload block for blob: %s failed for blockid: %s.", blobAsyncClient.getBlobUrl().toString(), blockId);
+          LOG.error(msg, e);
+          throw new AzureException(msg, e);
         }
       }
     }, blobThreadPool);