You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ra...@apache.org on 2021/07/29 22:40:55 UTC
[samza] branch master updated: Samza-2665: AzureBlob
SystemProducer: stop block upload retrying when InterruptedException is
thrown (#1511)
This is an automated email from the ASF dual-hosted git repository.
rayman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 8fa07fe Samza-2665: AzureBlob SystemProducer: stop block upload retrying when InterruptedException is thrown (#1511)
8fa07fe is described below
commit 8fa07fe130f175513088d06abe1f5ed5cdd0624e
Author: lakshmi-manasa-g <mg...@linkedin.com>
AuthorDate: Thu Jul 29 15:40:50 2021 -0700
Samza-2665: AzureBlob SystemProducer: stop block upload retrying when InterruptedException is thrown (#1511)
* Samza-2665: AzureBlob SystemProducer: stop block upload retrying when InterruptedException is thrown
* address comment: use String.format
---
.../azureblob/avro/AzureBlobOutputStream.java | 7 +++-
.../azureblob/avro/TestAzureBlobOutputStream.java | 40 +++++++++++++++++++---
2 files changed, 41 insertions(+), 6 deletions(-)
diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java
index e615808..222b99c 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java
@@ -273,7 +273,7 @@ public class AzureBlobOutputStream extends OutputStream {
// SAMZA-2476 stubbing BlockBlobAsyncClient.stageBlock was causing flaky tests.
@VisibleForTesting
- void stageBlock(String blockIdEncoded, ByteBuffer outputStream, int blockSize) {
+ void stageBlock(String blockIdEncoded, ByteBuffer outputStream, int blockSize) throws InterruptedException {
blobAsyncClient.stageBlock(blockIdEncoded, Flux.just(outputStream), blockSize).block();
}
@@ -335,6 +335,11 @@ public class AzureBlobOutputStream extends OutputStream {
// StageBlock generates exception on Failure.
stageBlock(blockIdEncoded, outputStream, blockSize);
break;
+ } catch (InterruptedException e) {
+ String msg = String.format("Upload block for blob: %s failed for blockid: %s due to InterruptedException.",
+ blobAsyncClient.getBlobUrl().toString(), blockId);
+ LOG.error(msg, e);
+ throw new AzureException("InterruptedException encountered during block upload. Will not retry.", e);
} catch (Exception e) {
attemptCount += 1;
String msg = "Upload block for blob: " + blobAsyncClient.getBlobUrl().toString()
diff --git a/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobOutputStream.java b/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobOutputStream.java
index b713ec7..4412edf 100644
--- a/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobOutputStream.java
+++ b/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobOutputStream.java
@@ -127,7 +127,7 @@ public class TestAzureBlobOutputStream {
}
@Test
- public void testWrite() {
+ public void testWrite() throws InterruptedException {
byte[] b = new byte[THRESHOLD - 10];
azureBlobOutputStream.write(b, 0, THRESHOLD - 10);
verify(azureBlobOutputStream, never()).stageBlock(anyString(), any(ByteBuffer.class), anyInt());
@@ -136,7 +136,7 @@ public class TestAzureBlobOutputStream {
}
@Test
- public void testWriteLargerThanThreshold() {
+ public void testWriteLargerThanThreshold() throws InterruptedException {
byte[] largeRecord = RANDOM_STRING.substring(0, 2 * THRESHOLD).getBytes();
byte[] largeRecordFirstHalf = RANDOM_STRING.substring(0, THRESHOLD).getBytes();
byte[] largeRecordSecondHalf = RANDOM_STRING.substring(THRESHOLD, 2 * THRESHOLD).getBytes();
@@ -165,7 +165,7 @@ public class TestAzureBlobOutputStream {
}
@Test
- public void testWriteLargeRecordWithSmallRecordInBuffer() {
+ public void testWriteLargeRecordWithSmallRecordInBuffer() throws InterruptedException {
byte[] halfBlock = new byte[THRESHOLD / 2];
byte[] fullBlock = new byte[THRESHOLD];
byte[] largeRecord = new byte[2 * THRESHOLD];
@@ -229,6 +229,36 @@ public class TestAzureBlobOutputStream {
azureBlobOutputStream.close();
}
+ @Test(expected = AzureException.class)
+ public void testWriteFailedInterruptedException() throws InterruptedException {
+
+ doThrow(new InterruptedException("Lets interrupt the thread"))
+ .when(azureBlobOutputStream).stageBlock(anyString(), any(ByteBuffer.class), anyInt());
+ byte[] b = new byte[100];
+ doReturn(COMPRESSED_BYTES).when(mockCompression).compress(b);
+
+ try {
+ azureBlobOutputStream.write(b, 0, THRESHOLD); // threshold crossed so stageBlock is scheduled.
+ // azureBlobOutputStream.close waits on the CompletableFuture which does the actual stageBlock in uploadBlockAsync
+ azureBlobOutputStream.close();
+ } catch (AzureException exception) {
+ // get root cause of the exception - to confirm its an InterruptedException
+ Throwable dupException = exception;
+ while (dupException.getCause() != null && dupException.getCause() != dupException) {
+ dupException = dupException.getCause();
+ }
+
+ Assert.assertTrue(dupException.getClass().getName().equals(InterruptedException.class.getCanonicalName()));
+ Assert.assertEquals("Lets interrupt the thread", dupException.getMessage());
+
+ // verify stageBlock was called exactly once - aka no retries happen when interrupted exception is thrown
+ verify(azureBlobOutputStream).stageBlock(anyString(), any(ByteBuffer.class), anyInt());
+
+ // rethrow the exception so that the test will fail if no exception was thrown in the try block
+ throw exception;
+ }
+ }
+
@Test
public void testClose() {
azureBlobOutputStream.write(BYTES, 0, THRESHOLD);
@@ -278,7 +308,7 @@ public class TestAzureBlobOutputStream {
}
@Test(expected = AzureException.class)
- public void testCloseFailed() {
+ public void testCloseFailed() throws InterruptedException {
azureBlobOutputStream = spy(new AzureBlobOutputStream(mockBlobAsyncClient, threadPool, mockMetrics,
blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, FAKE_STREAM,
@@ -320,7 +350,7 @@ public class TestAzureBlobOutputStream {
}
@Test (expected = AzureException.class)
- public void testFlushFailed() throws IOException {
+ public void testFlushFailed() throws IOException, InterruptedException {
azureBlobOutputStream = spy(new AzureBlobOutputStream(mockBlobAsyncClient, threadPool, mockMetrics,
blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, FAKE_STREAM,
60000, THRESHOLD, mockByteArrayOutputStream, mockCompression));