You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ra...@apache.org on 2021/07/29 22:40:55 UTC

[samza] branch master updated: Samza-2665: AzureBlob SystemProducer: stop block upload retrying when InterruptedException is thrown (#1511)

This is an automated email from the ASF dual-hosted git repository.

rayman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 8fa07fe  Samza-2665: AzureBlob SystemProducer: stop block upload retrying when InterruptedException is thrown (#1511)
8fa07fe is described below

commit 8fa07fe130f175513088d06abe1f5ed5cdd0624e
Author: lakshmi-manasa-g <mg...@linkedin.com>
AuthorDate: Thu Jul 29 15:40:50 2021 -0700

    Samza-2665: AzureBlob SystemProducer: stop block upload retrying when InterruptedException is thrown (#1511)
    
    * Samza-2665: AzureBlob SystemProducer: stop block upload retrying when InterruptedException is thrown
    
    * address comment: use String.format
---
 .../azureblob/avro/AzureBlobOutputStream.java      |  7 +++-
 .../azureblob/avro/TestAzureBlobOutputStream.java  | 40 +++++++++++++++++++---
 2 files changed, 41 insertions(+), 6 deletions(-)

diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java
index e615808..222b99c 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java
@@ -273,7 +273,7 @@ public class AzureBlobOutputStream extends OutputStream {
 
   // SAMZA-2476 stubbing BlockBlobAsyncClient.stageBlock was causing flaky tests.
   @VisibleForTesting
-  void stageBlock(String blockIdEncoded, ByteBuffer outputStream, int blockSize) {
+  void stageBlock(String blockIdEncoded, ByteBuffer outputStream, int blockSize) throws InterruptedException {
     blobAsyncClient.stageBlock(blockIdEncoded, Flux.just(outputStream), blockSize).block();
   }
 
@@ -335,6 +335,11 @@ public class AzureBlobOutputStream extends OutputStream {
             // StageBlock generates exception on Failure.
             stageBlock(blockIdEncoded, outputStream, blockSize);
             break;
+          } catch (InterruptedException e) {
+            String msg = String.format("Upload block for blob: %s failed for blockid: %s due to InterruptedException.",
+                blobAsyncClient.getBlobUrl().toString(), blockId);
+            LOG.error(msg, e);
+            throw new AzureException("InterruptedException encountered during block upload. Will not retry.", e);
           } catch (Exception e) {
             attemptCount += 1;
             String msg = "Upload block for blob: " + blobAsyncClient.getBlobUrl().toString()
diff --git a/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobOutputStream.java b/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobOutputStream.java
index b713ec7..4412edf 100644
--- a/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobOutputStream.java
+++ b/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobOutputStream.java
@@ -127,7 +127,7 @@ public class TestAzureBlobOutputStream {
   }
 
   @Test
-  public void testWrite() {
+  public void testWrite() throws  InterruptedException {
     byte[] b = new byte[THRESHOLD - 10];
     azureBlobOutputStream.write(b, 0, THRESHOLD - 10);
     verify(azureBlobOutputStream, never()).stageBlock(anyString(), any(ByteBuffer.class), anyInt());
@@ -136,7 +136,7 @@ public class TestAzureBlobOutputStream {
   }
 
   @Test
-  public void testWriteLargerThanThreshold() {
+  public void testWriteLargerThanThreshold() throws  InterruptedException {
     byte[] largeRecord = RANDOM_STRING.substring(0, 2 * THRESHOLD).getBytes();
     byte[] largeRecordFirstHalf = RANDOM_STRING.substring(0, THRESHOLD).getBytes();
     byte[] largeRecordSecondHalf = RANDOM_STRING.substring(THRESHOLD, 2 * THRESHOLD).getBytes();
@@ -165,7 +165,7 @@ public class TestAzureBlobOutputStream {
   }
 
   @Test
-  public void testWriteLargeRecordWithSmallRecordInBuffer() {
+  public void testWriteLargeRecordWithSmallRecordInBuffer() throws InterruptedException {
     byte[] halfBlock = new byte[THRESHOLD / 2];
     byte[] fullBlock = new byte[THRESHOLD];
     byte[] largeRecord = new byte[2 * THRESHOLD];
@@ -229,6 +229,36 @@ public class TestAzureBlobOutputStream {
     azureBlobOutputStream.close();
   }
 
+  @Test(expected = AzureException.class)
+  public void testWriteFailedInterruptedException() throws InterruptedException {
+
+    doThrow(new InterruptedException("Lets interrupt the thread"))
+        .when(azureBlobOutputStream).stageBlock(anyString(), any(ByteBuffer.class), anyInt());
+    byte[] b = new byte[100];
+    doReturn(COMPRESSED_BYTES).when(mockCompression).compress(b);
+
+    try {
+      azureBlobOutputStream.write(b, 0, THRESHOLD); // threshold crossed so stageBlock is scheduled.
+      // azureBlobOutputStream.close waits on the CompletableFuture which does the actual stageBlock in uploadBlockAsync
+      azureBlobOutputStream.close();
+    } catch (AzureException exception) {
+      // get the root cause of the exception to confirm it's an InterruptedException
+      Throwable dupException = exception;
+      while (dupException.getCause() != null && dupException.getCause() != dupException) {
+        dupException = dupException.getCause();
+      }
+
+      Assert.assertTrue(dupException.getClass().getName().equals(InterruptedException.class.getCanonicalName()));
+      Assert.assertEquals("Lets interrupt the thread", dupException.getMessage());
+
+      // verify stageBlock was called exactly once, i.e. no retries happen when an InterruptedException is thrown
+      verify(azureBlobOutputStream).stageBlock(anyString(), any(ByteBuffer.class), anyInt());
+
+      // rethrow the exception so that the test will fail if no exception was thrown in the try block
+      throw exception;
+    }
+  }
+
   @Test
   public void testClose() {
     azureBlobOutputStream.write(BYTES, 0, THRESHOLD);
@@ -278,7 +308,7 @@ public class TestAzureBlobOutputStream {
   }
 
   @Test(expected = AzureException.class)
-  public void testCloseFailed() {
+  public void testCloseFailed() throws InterruptedException {
 
     azureBlobOutputStream = spy(new AzureBlobOutputStream(mockBlobAsyncClient, threadPool, mockMetrics,
         blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, FAKE_STREAM,
@@ -320,7 +350,7 @@ public class TestAzureBlobOutputStream {
   }
 
   @Test (expected = AzureException.class)
-  public void testFlushFailed() throws IOException {
+  public void testFlushFailed() throws IOException, InterruptedException {
     azureBlobOutputStream = spy(new AzureBlobOutputStream(mockBlobAsyncClient, threadPool, mockMetrics,
         blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, FAKE_STREAM,
         60000, THRESHOLD, mockByteArrayOutputStream, mockCompression));