You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ra...@apache.org on 2020/03/02 19:25:53 UTC
[samza] branch master updated: SAMZA-2476:Fix flaky tests in
TestAzureBlobOutputStream (#1298)
This is an automated email from the ASF dual-hosted git repository.
rayman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 851fc48 SAMZA-2476:Fix flaky tests in TestAzureBlobOutputStream (#1298)
851fc48 is described below
commit 851fc481d0c12f6bb3330677cc61a7626323423c
Author: lakshmi-manasa-g <mg...@linkedin.com>
AuthorDate: Mon Mar 2 11:23:35 2020 -0800
SAMZA-2476:Fix flaky tests in TestAzureBlobOutputStream (#1298)
* SAMZA-2476:Fix flaky tests in TestAzureBlobOutputStream
* simple java doc for new method
---
.../azureblob/avro/AzureBlobOutputStream.java | 8 +++++-
.../azureblob/avro/TestAzureBlobOutputStream.java | 33 ++++++++++++----------
2 files changed, 25 insertions(+), 16 deletions(-)
diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java
index 2b25f96..21dc2a9 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java
@@ -173,7 +173,7 @@ public class AzureBlobOutputStream extends OutputStream {
LOG.info("For blob: {} committing blockList size:{}", blobAsyncClient.getBlobUrl().toString(), blockList.size());
metrics.updateAzureCommitMetrics();
Map<String, String> blobMetadata = Collections.singletonMap(BLOB_RAW_SIZE_BYTES_METADATA, Long.toString(totalUploadedBlockSize));
- blobAsyncClient.commitBlockListWithResponse(blockList, null, blobMetadata, null, null).block();
+ commitBlob(blockList, blobMetadata);
} catch (Exception e) {
String msg = String.format("Close blob %s failed with exception. Total pending sends %d",
blobAsyncClient.getBlobUrl().toString(), pendingUpload.size());
@@ -227,6 +227,12 @@ public class AzureBlobOutputStream extends OutputStream {
this.compression = compression;
}
+ // SAMZA-2476: stubbing BlockBlobAsyncClient.commitBlockListWithResponse directly caused flaky tests, so the commit call is wrapped here for tests to stub this method instead.
+ @VisibleForTesting
+ void commitBlob(ArrayList<String> blockList, Map<String, String> blobMetadata) {
+ blobAsyncClient.commitBlockListWithResponse(blockList, null, blobMetadata, null, null).block();
+ }
+
/**
* This api will async upload the outputstream into block using stageBlocks,
* reint outputstream
diff --git a/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobOutputStream.java b/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobOutputStream.java
index fa21934..34c5a4b 100644
--- a/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobOutputStream.java
+++ b/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobOutputStream.java
@@ -21,7 +21,7 @@ package org.apache.samza.system.azureblob.avro;
import com.azure.core.http.rest.SimpleResponse;
import com.azure.core.implementation.util.FluxUtil;
-import java.util.Arrays;
+import org.apache.samza.AzureException;
import org.apache.samza.system.azureblob.compression.Compression;
import org.apache.samza.system.azureblob.producer.AzureBlobWriterMetrics;
import com.azure.storage.blob.specialized.BlockBlobAsyncClient;
@@ -48,7 +48,9 @@ import reactor.core.publisher.Mono;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyList;
import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.anyMap;
import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
@@ -189,9 +191,10 @@ public class TestAzureBlobOutputStream {
verify(mockMetrics, times(1)).updateAzureUploadMetrics();
}
- @Test(expected = RuntimeException.class)
+ @Test(expected = AzureException.class)
public void testWriteFailed() {
- when(mockBlobAsyncClient.stageBlock(anyString(), any(), anyLong())).thenThrow(new Exception("Test Failed"));
+ when(mockBlobAsyncClient.stageBlock(anyString(), any(), anyLong()))
+ .thenReturn(Mono.error(new Exception("Test Failed")));
byte[] b = new byte[100];
azureBlobOutputStream.write(b, 0, THRESHOLD); // threshold crossed so stageBlock is scheduled.
@@ -206,14 +209,14 @@ public class TestAzureBlobOutputStream {
String blockId = String.format("%05d", blockNum);
String blockIdEncoded = Base64.getEncoder().encodeToString(blockId.getBytes());
- PowerMockito.doAnswer(invocation -> {
+ doAnswer(invocation -> {
ArrayList<String> blockListArg = (ArrayList<String>) invocation.getArguments()[0];
String blockIdArg = (String) blockListArg.toArray()[0];
Assert.assertEquals(blockIdEncoded, blockIdArg);
- Map<String, String> blobMetadata = (Map<String, String>) invocation.getArguments()[2];
+ Map<String, String> blobMetadata = (Map<String, String>) invocation.getArguments()[1];
Assert.assertEquals(blobMetadata.get(AzureBlobOutputStream.BLOB_RAW_SIZE_BYTES_METADATA), Long.toString(THRESHOLD));
- return Mono.just(new SimpleResponse(null, 200, null, null));
- }).when(mockBlobAsyncClient).commitBlockListWithResponse(anyList(), any(), any(), any(), any());
+ return null;
+ }).when(azureBlobOutputStream).commitBlob(any(ArrayList.class), anyMap());
azureBlobOutputStream.close();
verify(mockMetrics).updateAzureCommitMetrics();
@@ -232,19 +235,19 @@ public class TestAzureBlobOutputStream {
String blockId1 = String.format("%05d", blockNum1);
String blockIdEncoded1 = Base64.getEncoder().encodeToString(blockId1.getBytes());
- PowerMockito.doAnswer(invocation -> {
+ doAnswer(invocation -> {
ArrayList<String> blockListArg = (ArrayList<String>) invocation.getArguments()[0];
- Assert.assertEquals(Arrays.asList(blockIdEncoded, blockIdEncoded1), blockListArg);
- Map<String, String> blobMetadata = (Map<String, String>) invocation.getArguments()[2];
+ String blockIdArg = (String) blockListArg.toArray()[0];
+ Assert.assertEquals(blockIdEncoded, blockIdArg);
+ Map<String, String> blobMetadata = (Map<String, String>) invocation.getArguments()[1];
Assert.assertEquals(blobMetadata.get(AzureBlobOutputStream.BLOB_RAW_SIZE_BYTES_METADATA), Long.toString(2 * THRESHOLD));
- return Mono.just(new SimpleResponse(null, 200, null, null));
- }).when(mockBlobAsyncClient).commitBlockListWithResponse(anyList(), any(), any(), any(), any());
-
+ return null;
+ }).when(azureBlobOutputStream).commitBlob(any(ArrayList.class), anyMap());
azureBlobOutputStream.close();
verify(mockMetrics).updateAzureCommitMetrics();
}
- @Test(expected = RuntimeException.class)
+ @Test(expected = AzureException.class)
public void testCloseFailed() {
when(mockBlobAsyncClient.commitBlockListWithResponse(anyList(), any(), any(), any(), any()))
.thenReturn(Mono.error(new Exception("Test Failed")));
@@ -279,7 +282,7 @@ public class TestAzureBlobOutputStream {
verify(mockMetrics).updateAzureUploadMetrics();
}
- @Test (expected = RuntimeException.class)
+ @Test (expected = AzureException.class)
public void testFlushFailed() throws IOException {
azureBlobOutputStream.write(BYTES);
when(mockBlobAsyncClient.stageBlock(anyString(), any(), anyLong()))