Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2021/10/02 01:04:16 UTC

[GitHub] [samza] lakshmi-manasa-g opened a new pull request #1542: SAMZA-2699: AzureBlob SystemProducer: remove retries when uploading block as azure sdk does internal retries

lakshmi-manasa-g opened a new pull request #1542:
URL: https://github.com/apache/samza/pull/1542


   Symptom: an interrupted thread stays stuck for a long time when the InterruptedException is wrapped in another exception.
   
   Cause: block upload was retried several times on every exception. Although #1511 made a bare InterruptedException bubble up immediately, an InterruptedException wrapped in another exception is caught by the generic handler and retried instead.
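A minimal Java sketch of this cause, reproducing only the catch order of the removed loop. `UploadCall`, the value of `MAX_ATTEMPT`, and the wrapping `RuntimeException` are hypothetical stand-ins (the PR does not say which exception type wraps the interrupt), and `RuntimeException` replaces `AzureException` so the sketch compiles on its own.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class WrappedInterruptDemo {
  /** Hypothetical stand-in for the stageBlock call. */
  interface UploadCall {
    void stageBlock() throws Exception;
  }

  // Hypothetical value: the real MAX_ATTEMPT constant is not shown in the PR.
  static final int MAX_ATTEMPT = 3;

  /** Mirrors the removed loop's catch order; returns the number of attempts on success. */
  static int uploadWithRetries(UploadCall call) {
    int attemptCount = 0;
    while (true) {
      try {
        call.stageBlock();
        return attemptCount + 1;
      } catch (InterruptedException e) {
        // Only a bare InterruptedException reaches this branch.
        throw new RuntimeException("InterruptedException encountered during block upload. Will not retry.", e);
      } catch (Exception e) {
        // A wrapped InterruptedException lands here and is retried like any other failure.
        attemptCount += 1;
        if (attemptCount == MAX_ATTEMPT) {
          throw new RuntimeException("Exceeded number of attempts. Max attempts is: " + MAX_ATTEMPT, e);
        }
      }
    }
  }

  public static void main(String[] args) {
    AtomicInteger calls = new AtomicInteger();
    try {
      uploadWithRetries(() -> {
        calls.incrementAndGet();
        // Simulates a library that wraps the interrupt in an unchecked exception.
        throw new RuntimeException(new InterruptedException("simulated interrupt"));
      });
    } catch (RuntimeException e) {
      System.out.println("attempts spent on an interrupt: " + calls.get());
    }
  }
}
```

With `MAX_ATTEMPT = 3`, all three attempts are spent on the interrupt; in the real code each of those attempts may additionally include the SDK's own retries and backoff.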
   
   Changes: remove the retries at the Samza level, since the Azure SDK already retries internally.
   Samza uses [BlobServiceClientBuilder](https://github.com/apache/samza/blob/master/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobClientBuilder.java#L89), which uses [RequestRetryOptions](https://github.com/Azure/azure-sdk-for-java/blob/5d602a2d97d36c1b20fc56863313222897c032ed/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/BlobServiceClientBuilder.java#L72) as the default retry policy, and [RequestRetryOptions sets the maximum number of tries to 4, with exponential backoff](https://github.com/Azure/azure-sdk-for-java/blob/3f31d68eed6fbe11516ca3afe3955c8840a6e974/sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/policy/RequestRetryOptions.java#L97).
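A back-of-the-envelope Java sketch of why a Samza-level retry loop on top of the SDK is costly. It assumes each SDK call makes up to 4 tries (the default cited above) and that the delay before retry k is baseDelay * (2^k - 1), capped at maxDelay. The 4 s base and 120 s cap used in `main` are our reading of the SDK defaults, not something stated in the PR, so check `RequestRetryOptions` before relying on them. Per-try timeouts are ignored.

```java
public class NestedRetryMath {
  // Default number of tries per SDK call, as cited in the PR description.
  static final int SDK_MAX_TRIES = 4;

  /** Delay before the k-th retry (k >= 1), under the assumed exponential formula. */
  static long delayBeforeRetryMs(int k, long baseDelayMs, long maxDelayMs) {
    long delay = ((1L << k) - 1) * baseDelayMs;
    return Math.min(delay, maxDelayMs);
  }

  /** Total backoff sleep inside one SDK call that exhausts all of its tries. */
  static long sdkBackoffPerCallMs(long baseDelayMs, long maxDelayMs) {
    long total = 0;
    for (int k = 1; k < SDK_MAX_TRIES; k++) {
      total += delayBeforeRetryMs(k, baseDelayMs, maxDelayMs);
    }
    return total;
  }

  /** Requests sent when an outer loop makes outerAttempts SDK calls that each exhaust their tries. */
  static int totalRequests(int outerAttempts) {
    return outerAttempts * SDK_MAX_TRIES;
  }

  public static void main(String[] args) {
    // Assumed defaults: 4 s base delay, 120 s cap.
    long perCall = sdkBackoffPerCallMs(4_000, 120_000);
    System.out.println("backoff per exhausted SDK call: " + perCall + " ms");
    System.out.println("requests with 3 outer attempts: " + totalRequests(3));
  }
}
```

Under these assumptions a single exhausted SDK call already sleeps 44 s (4 + 12 + 28), and an outer loop of N attempts multiplies both the request count and that backoff by N.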
   
   Tests: removing retry.
   API changes: None
   Usage/upgrade instructions: N/A
   Backwards compatible: yes


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@samza.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [samza] rmatharu commented on a change in pull request #1542: SAMZA-2699: AzureBlob SystemProducer: remove retries when uploading block as azure sdk does internal retries

Posted by GitBox <gi...@apache.org>.
rmatharu commented on a change in pull request #1542:
URL: https://github.com/apache/samza/pull/1542#discussion_r721571685



##########
File path: samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java
##########
@@ -322,33 +321,20 @@ private synchronized void uploadBlockAsync() {
       // call async stageblock and add to future
       @Override
       public void run() {
-        int attemptCount = 0;
         byte[] compressedLocalByte = compression.compress(localByte);
         int blockSize = compressedLocalByte.length;
 
-        while (attemptCount < MAX_ATTEMPT) {
-          try {
-            ByteBuffer outputStream = ByteBuffer.wrap(compressedLocalByte, 0, blockSize);
-            metrics.updateCompressByteMetrics(blockSize);
-            LOG.info("{} Upload block start for blob: {} for block size:{}.", blobAsyncClient.getBlobUrl().toString(), blockId, blockSize);
-            metrics.updateAzureUploadMetrics();
-            // StageBlock generates exception on Failure.
-            stageBlock(blockIdEncoded, outputStream, blockSize);
-            break;
-          } catch (InterruptedException e) {
-            String msg = String.format("Upload block for blob: %s failed for blockid: %s due to InterruptedException.",
-                blobAsyncClient.getBlobUrl().toString(), blockId);
-            LOG.error(msg, e);
-            throw new AzureException("InterruptedException encountered during block upload. Will not retry.", e);
-          } catch (Exception e) {
-            attemptCount += 1;
-            String msg = "Upload block for blob: " + blobAsyncClient.getBlobUrl().toString()
-                + " failed for blockid: " + blockId + " due to exception. AttemptCount: " + attemptCount;
-            LOG.error(msg, e);
-            if (attemptCount == MAX_ATTEMPT) {
-              throw new AzureException("Exceeded number of attempts. Max attempts is: " + MAX_ATTEMPT, e);
-            }
-          }
+        try {
+          ByteBuffer outputStream = ByteBuffer.wrap(compressedLocalByte, 0, blockSize);
+          metrics.updateCompressByteMetrics(blockSize);
+          LOG.info("{} Upload block start for blob: {} for block size:{}.", blobAsyncClient.getBlobUrl().toString(), blockId, blockSize);
+          metrics.updateAzureUploadMetrics();

Review comment:
       Should the metric be updated after the upload completes, i.e., after stageBlock?







[GitHub] [samza] lakshmi-manasa-g commented on a change in pull request #1542: SAMZA-2699: AzureBlob SystemProducer: remove retries when uploading block as azure sdk does internal retries

Posted by GitBox <gi...@apache.org>.
lakshmi-manasa-g commented on a change in pull request #1542:
URL: https://github.com/apache/samza/pull/1542#discussion_r728555630



##########
File path: samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java
##########
@@ -322,33 +321,20 @@ private synchronized void uploadBlockAsync() {
       // call async stageblock and add to future
       @Override
       public void run() {
-        int attemptCount = 0;
         byte[] compressedLocalByte = compression.compress(localByte);
         int blockSize = compressedLocalByte.length;
 
-        while (attemptCount < MAX_ATTEMPT) {
-          try {
-            ByteBuffer outputStream = ByteBuffer.wrap(compressedLocalByte, 0, blockSize);
-            metrics.updateCompressByteMetrics(blockSize);
-            LOG.info("{} Upload block start for blob: {} for block size:{}.", blobAsyncClient.getBlobUrl().toString(), blockId, blockSize);
-            metrics.updateAzureUploadMetrics();
-            // StageBlock generates exception on Failure.
-            stageBlock(blockIdEncoded, outputStream, blockSize);
-            break;
-          } catch (InterruptedException e) {

Review comment:
       Yes, an InterruptedException can be thrown while sending the request. In that case it is simply logged and bubbled up; that part of the InterruptedException handling has not changed in this PR.

##########
File path: samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java
##########
@@ -322,33 +321,20 @@ private synchronized void uploadBlockAsync() {
       // call async stageblock and add to future
       @Override
       public void run() {
-        int attemptCount = 0;
         byte[] compressedLocalByte = compression.compress(localByte);
         int blockSize = compressedLocalByte.length;
 
-        while (attemptCount < MAX_ATTEMPT) {
-          try {
-            ByteBuffer outputStream = ByteBuffer.wrap(compressedLocalByte, 0, blockSize);
-            metrics.updateCompressByteMetrics(blockSize);
-            LOG.info("{} Upload block start for blob: {} for block size:{}.", blobAsyncClient.getBlobUrl().toString(), blockId, blockSize);
-            metrics.updateAzureUploadMetrics();
-            // StageBlock generates exception on Failure.
-            stageBlock(blockIdEncoded, outputStream, blockSize);
-            break;
-          } catch (InterruptedException e) {
-            String msg = String.format("Upload block for blob: %s failed for blockid: %s due to InterruptedException.",
-                blobAsyncClient.getBlobUrl().toString(), blockId);
-            LOG.error(msg, e);
-            throw new AzureException("InterruptedException encountered during block upload. Will not retry.", e);
-          } catch (Exception e) {
-            attemptCount += 1;
-            String msg = "Upload block for blob: " + blobAsyncClient.getBlobUrl().toString()
-                + " failed for blockid: " + blockId + " due to exception. AttemptCount: " + attemptCount;
-            LOG.error(msg, e);
-            if (attemptCount == MAX_ATTEMPT) {
-              throw new AzureException("Exceeded number of attempts. Max attempts is: " + MAX_ATTEMPT, e);
-            }
-          }
+        try {
+          ByteBuffer outputStream = ByteBuffer.wrap(compressedLocalByte, 0, blockSize);
+          metrics.updateCompressByteMetrics(blockSize);
+          LOG.info("{} Upload block start for blob: {} for block size:{}.", blobAsyncClient.getBlobUrl().toString(), blockId, blockSize);
+          metrics.updateAzureUploadMetrics();

Review comment:
       Doing so now would actually make this change backwards incompatible.
   Upload errors are tracked separately [here](https://github.com/apache/samza/blob/master/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobBasicMetrics.java#L46).
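A hypothetical Java sketch of the counter semantics in this exchange: the upload counter is incremented when an upload starts (as in the diff, before `stageBlock`), and failures go to a separate counter, so completed uploads can still be derived. The class, counter, and method names are illustrative and are not the fields of `AzureBlobBasicMetrics`.

```java
import java.util.concurrent.atomic.AtomicLong;

public class UploadMetricsSketch {
  private final AtomicLong uploadsStarted = new AtomicLong();
  private final AtomicLong uploadErrors = new AtomicLong();

  /** Hypothetical stand-in for the stageBlock call. */
  interface Upload {
    void run() throws Exception;
  }

  void upload(Upload stageBlock) throws Exception {
    uploadsStarted.incrementAndGet(); // counted before the call, matching the existing semantics
    try {
      stageBlock.run();
    } catch (Exception e) {
      uploadErrors.incrementAndGet(); // failures tracked in their own counter
      throw e;
    }
  }

  long started() { return uploadsStarted.get(); }

  long errors() { return uploadErrors.get(); }

  /** Derived count; also includes any uploads still in flight. */
  long succeeded() { return uploadsStarted.get() - uploadErrors.get(); }

  public static void main(String[] args) {
    UploadMetricsSketch m = new UploadMetricsSketch();
    try {
      m.upload(() -> { });
      m.upload(() -> { throw new java.io.IOException("simulated failure"); });
    } catch (Exception ignored) {
      // the simulated failure is expected here
    }
    System.out.println(m.started() + " started, " + m.errors() + " failed, " + m.succeeded() + " succeeded");
  }
}
```

Our reading of the backwards-compatibility concern is that moving the increment after `stageBlock` would silently change what an existing, already-monitored metric counts, whereas the separate error counter already lets dashboards derive successful uploads.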







[GitHub] [samza] ZitingShen commented on a change in pull request #1542: SAMZA-2699: AzureBlob SystemProducer: remove retries when uploading block as azure sdk does internal retries

Posted by GitBox <gi...@apache.org>.
ZitingShen commented on a change in pull request #1542:
URL: https://github.com/apache/samza/pull/1542#discussion_r721582950



##########
File path: samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java
##########
@@ -322,33 +321,20 @@ private synchronized void uploadBlockAsync() {
       // call async stageblock and add to future
       @Override
       public void run() {
-        int attemptCount = 0;
         byte[] compressedLocalByte = compression.compress(localByte);
         int blockSize = compressedLocalByte.length;
 
-        while (attemptCount < MAX_ATTEMPT) {
-          try {
-            ByteBuffer outputStream = ByteBuffer.wrap(compressedLocalByte, 0, blockSize);
-            metrics.updateCompressByteMetrics(blockSize);
-            LOG.info("{} Upload block start for blob: {} for block size:{}.", blobAsyncClient.getBlobUrl().toString(), blockId, blockSize);
-            metrics.updateAzureUploadMetrics();
-            // StageBlock generates exception on Failure.
-            stageBlock(blockIdEncoded, outputStream, blockSize);
-            break;
-          } catch (InterruptedException e) {

Review comment:
       Does this mean InterruptedException is not thrown while sending the request? Otherwise, you probably need to change the retry policy so that retries do not get stuck on an InterruptedException.
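One way to act on this suggestion, sketched in Java: a retry helper that stops immediately when the failure's cause chain contains an `InterruptedException` or the thread's interrupt flag is set, and restores the flag before rethrowing an interrupt-caused failure. `RetryingCall`, `runWithRetries`, and `causedByInterrupt` are hypothetical; this is neither the Azure SDK's retry policy API nor what the PR implements (the PR removes the Samza-level loop instead).

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

public class InterruptAwareRetry {
  /** Hypothetical unit of work to retry. */
  interface RetryingCall {
    void run() throws Exception;
  }

  /** True if t or any of its causes is an InterruptedException; guards against cause cycles. */
  static boolean causedByInterrupt(Throwable t) {
    Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
    for (Throwable c = t; c != null && seen.add(c); c = c.getCause()) {
      if (c instanceof InterruptedException) {
        return true;
      }
    }
    return false;
  }

  /** Retries up to maxAttempts times, but never retries an interrupt, wrapped or not. */
  static int runWithRetries(RetryingCall call, int maxAttempts) throws Exception {
    for (int attempt = 1; ; attempt++) {
      try {
        call.run();
        return attempt;
      } catch (Exception e) {
        boolean interrupt = causedByInterrupt(e);
        if (interrupt || Thread.currentThread().isInterrupted() || attempt >= maxAttempts) {
          if (interrupt) {
            Thread.currentThread().interrupt(); // restore the flag for callers up the stack
          }
          throw e;
        }
      }
    }
  }

  public static void main(String[] args) {
    AtomicInteger calls = new AtomicInteger();
    try {
      runWithRetries(() -> {
        calls.incrementAndGet();
        throw new RuntimeException(new InterruptedException("simulated interrupt"));
      }, 4);
    } catch (Exception e) {
      // Thread.interrupted() reports and clears the restored flag.
      System.out.println("attempts: " + calls.get() + ", interrupt flag restored: " + Thread.interrupted());
    }
  }
}
```

Transient failures are still retried up to `maxAttempts`; only interrupt-caused failures short-circuit.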



