You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bh...@apache.org on 2021/10/14 17:56:04 UTC
[samza] branch master updated: SAMZA-2699: AzureBlob
SystemProducer: remove retries when uploading block as azure sdk does
internal retries (#1542)
This is an automated email from the ASF dual-hosted git repository.
bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 45c0aa9 SAMZA-2699: AzureBlob SystemProducer: remove retries when uploading block as azure sdk does internal retries (#1542)
45c0aa9 is described below
commit 45c0aa998626bb101eb03ea9e4e2f589dd67753d
Author: lakshmi-manasa-g <mg...@linkedin.com>
AuthorDate: Thu Oct 14 10:53:34 2021 -0700
SAMZA-2699: AzureBlob SystemProducer: remove retries when uploading block as azure sdk does internal retries (#1542)
Symptom: an interrupted thread stays stuck for a long time when the InterruptedException is wrapped in another exception.
Cause: block upload was retried several times for all exceptions. Although an InterruptedException has been bubbled up immediately since #1511, an InterruptedException wrapped in another exception was not, so it was retried like any other failure.
Changes: remove retries at the Samza level, since the Azure SDK already performs retries internally.
Samza uses BlobServiceClientBuilder, which uses RequestRetryOptions as its default retry policy; RequestRetryOptions sets max retries to 4 with exponential backoff.
---
.../azureblob/avro/AzureBlobOutputStream.java | 36 +++++++---------------
1 file changed, 11 insertions(+), 25 deletions(-)
diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java
index 222b99c..8ca97e0 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java
@@ -69,7 +69,6 @@ import reactor.core.publisher.Flux;
public class AzureBlobOutputStream extends OutputStream {
private static final Logger LOG = LoggerFactory.getLogger(AzureBlobOutputStream.class);
- private static final int MAX_ATTEMPT = 3;
private static final int MAX_BLOCKS_IN_AZURE_BLOB = 50000;
private final long flushTimeoutMs;
private final BlockBlobAsyncClient blobAsyncClient;
@@ -322,33 +321,20 @@ public class AzureBlobOutputStream extends OutputStream {
// call async stageblock and add to future
@Override
public void run() {
- int attemptCount = 0;
byte[] compressedLocalByte = compression.compress(localByte);
int blockSize = compressedLocalByte.length;
- while (attemptCount < MAX_ATTEMPT) {
- try {
- ByteBuffer outputStream = ByteBuffer.wrap(compressedLocalByte, 0, blockSize);
- metrics.updateCompressByteMetrics(blockSize);
- LOG.info("{} Upload block start for blob: {} for block size:{}.", blobAsyncClient.getBlobUrl().toString(), blockId, blockSize);
- metrics.updateAzureUploadMetrics();
- // StageBlock generates exception on Failure.
- stageBlock(blockIdEncoded, outputStream, blockSize);
- break;
- } catch (InterruptedException e) {
- String msg = String.format("Upload block for blob: %s failed for blockid: %s due to InterruptedException.",
- blobAsyncClient.getBlobUrl().toString(), blockId);
- LOG.error(msg, e);
- throw new AzureException("InterruptedException encountered during block upload. Will not retry.", e);
- } catch (Exception e) {
- attemptCount += 1;
- String msg = "Upload block for blob: " + blobAsyncClient.getBlobUrl().toString()
- + " failed for blockid: " + blockId + " due to exception. AttemptCount: " + attemptCount;
- LOG.error(msg, e);
- if (attemptCount == MAX_ATTEMPT) {
- throw new AzureException("Exceeded number of attempts. Max attempts is: " + MAX_ATTEMPT, e);
- }
- }
+ try {
+ ByteBuffer outputStream = ByteBuffer.wrap(compressedLocalByte, 0, blockSize);
+ metrics.updateCompressByteMetrics(blockSize);
+ LOG.info("{} Upload block start for blob: {} for block size:{}.", blobAsyncClient.getBlobUrl().toString(), blockId, blockSize);
+ metrics.updateAzureUploadMetrics();
+ // StageBlock generates exception on Failure.
+ stageBlock(blockIdEncoded, outputStream, blockSize);
+ } catch (Exception e) {
+ String msg = String.format("Upload block for blob: %s failed for blockid: %s.", blobAsyncClient.getBlobUrl().toString(), blockId);
+ LOG.error(msg, e);
+ throw new AzureException(msg, e);
}
}
}, blobThreadPool);