You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ra...@apache.org on 2020/03/02 19:25:53 UTC

[samza] branch master updated: SAMZA-2476:Fix flaky tests in TestAzureBlobOutputStream (#1298)

This is an automated email from the ASF dual-hosted git repository.

rayman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 851fc48  SAMZA-2476:Fix flaky tests in TestAzureBlobOutputStream (#1298)
851fc48 is described below

commit 851fc481d0c12f6bb3330677cc61a7626323423c
Author: lakshmi-manasa-g <mg...@linkedin.com>
AuthorDate: Mon Mar 2 11:23:35 2020 -0800

    SAMZA-2476:Fix flaky tests in TestAzureBlobOutputStream (#1298)
    
    * SAMZA-2476:Fix flaky tests in TestAzureBlobOutputStream
    
    * simple java doc for new method
---
 .../azureblob/avro/AzureBlobOutputStream.java      |  8 +++++-
 .../azureblob/avro/TestAzureBlobOutputStream.java  | 33 ++++++++++++----------
 2 files changed, 25 insertions(+), 16 deletions(-)

diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java
index 2b25f96..21dc2a9 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java
@@ -173,7 +173,7 @@ public class AzureBlobOutputStream extends OutputStream {
       LOG.info("For blob: {} committing blockList size:{}", blobAsyncClient.getBlobUrl().toString(), blockList.size());
       metrics.updateAzureCommitMetrics();
       Map<String, String> blobMetadata = Collections.singletonMap(BLOB_RAW_SIZE_BYTES_METADATA, Long.toString(totalUploadedBlockSize));
-      blobAsyncClient.commitBlockListWithResponse(blockList, null, blobMetadata, null, null).block();
+      commitBlob(blockList, blobMetadata);
     } catch (Exception e) {
       String msg = String.format("Close blob %s failed with exception. Total pending sends %d",
           blobAsyncClient.getBlobUrl().toString(), pendingUpload.size());
@@ -227,6 +227,12 @@ public class AzureBlobOutputStream extends OutputStream {
     this.compression = compression;
   }
 
+  // SAMZA-2476: commits the block list with the given blob metadata and blocks on the result; kept as a separate method so tests can stub it, because stubbing BlockBlobAsyncClient.commitBlockListWithResponse directly was causing flaky tests.
+  @VisibleForTesting
+  void commitBlob(ArrayList<String> blockList, Map<String, String> blobMetadata) {
+    blobAsyncClient.commitBlockListWithResponse(blockList, null, blobMetadata, null, null).block();
+  }
+
   /**
    * This api will async upload the outputstream into block using stageBlocks,
    * reint outputstream
diff --git a/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobOutputStream.java b/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobOutputStream.java
index fa21934..34c5a4b 100644
--- a/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobOutputStream.java
+++ b/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobOutputStream.java
@@ -21,7 +21,7 @@ package org.apache.samza.system.azureblob.avro;
 
 import com.azure.core.http.rest.SimpleResponse;
 import com.azure.core.implementation.util.FluxUtil;
-import java.util.Arrays;
+import org.apache.samza.AzureException;
 import org.apache.samza.system.azureblob.compression.Compression;
 import org.apache.samza.system.azureblob.producer.AzureBlobWriterMetrics;
 import com.azure.storage.blob.specialized.BlockBlobAsyncClient;
@@ -48,7 +48,9 @@ import reactor.core.publisher.Mono;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.anyList;
 import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.anyMap;
 import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;
@@ -189,9 +191,10 @@ public class TestAzureBlobOutputStream {
     verify(mockMetrics, times(1)).updateAzureUploadMetrics();
   }
 
-  @Test(expected = RuntimeException.class)
+  @Test(expected = AzureException.class)
   public void testWriteFailed() {
-    when(mockBlobAsyncClient.stageBlock(anyString(), any(), anyLong())).thenThrow(new Exception("Test Failed"));
+    when(mockBlobAsyncClient.stageBlock(anyString(), any(), anyLong()))
+        .thenReturn(Mono.error(new Exception("Test Failed")));
 
     byte[] b = new byte[100];
     azureBlobOutputStream.write(b, 0, THRESHOLD); // threshold crossed so stageBlock is scheduled.
@@ -206,14 +209,14 @@ public class TestAzureBlobOutputStream {
     String blockId = String.format("%05d", blockNum);
     String blockIdEncoded = Base64.getEncoder().encodeToString(blockId.getBytes());
 
-    PowerMockito.doAnswer(invocation -> {
+    doAnswer(invocation -> {
         ArrayList<String> blockListArg = (ArrayList<String>) invocation.getArguments()[0];
         String blockIdArg = (String) blockListArg.toArray()[0];
         Assert.assertEquals(blockIdEncoded, blockIdArg);
-        Map<String, String> blobMetadata = (Map<String, String>) invocation.getArguments()[2];
+        Map<String, String> blobMetadata = (Map<String, String>) invocation.getArguments()[1];
         Assert.assertEquals(blobMetadata.get(AzureBlobOutputStream.BLOB_RAW_SIZE_BYTES_METADATA), Long.toString(THRESHOLD));
-        return Mono.just(new SimpleResponse(null, 200, null, null));
-      }).when(mockBlobAsyncClient).commitBlockListWithResponse(anyList(), any(), any(), any(), any());
+        return null;
+      }).when(azureBlobOutputStream).commitBlob(any(ArrayList.class), anyMap());
 
     azureBlobOutputStream.close();
     verify(mockMetrics).updateAzureCommitMetrics();
@@ -232,19 +235,19 @@ public class TestAzureBlobOutputStream {
     String blockId1 = String.format("%05d", blockNum1);
     String blockIdEncoded1 = Base64.getEncoder().encodeToString(blockId1.getBytes());
 
-    PowerMockito.doAnswer(invocation -> {
+    doAnswer(invocation -> {
         ArrayList<String> blockListArg = (ArrayList<String>) invocation.getArguments()[0];
-        Assert.assertEquals(Arrays.asList(blockIdEncoded, blockIdEncoded1), blockListArg);
-        Map<String, String> blobMetadata = (Map<String, String>) invocation.getArguments()[2];
+        String blockIdArg = (String) blockListArg.toArray()[0];
+        Assert.assertEquals(blockIdEncoded, blockIdArg);
+        Map<String, String> blobMetadata = (Map<String, String>) invocation.getArguments()[1];
         Assert.assertEquals(blobMetadata.get(AzureBlobOutputStream.BLOB_RAW_SIZE_BYTES_METADATA), Long.toString(2 * THRESHOLD));
-        return Mono.just(new SimpleResponse(null, 200, null, null));
-      }).when(mockBlobAsyncClient).commitBlockListWithResponse(anyList(), any(), any(), any(), any());
-
+        return null;
+      }).when(azureBlobOutputStream).commitBlob(any(ArrayList.class), anyMap());
     azureBlobOutputStream.close();
     verify(mockMetrics).updateAzureCommitMetrics();
   }
 
-  @Test(expected = RuntimeException.class)
+  @Test(expected = AzureException.class)
   public void testCloseFailed() {
     when(mockBlobAsyncClient.commitBlockListWithResponse(anyList(), any(), any(), any(), any()))
         .thenReturn(Mono.error(new Exception("Test Failed")));
@@ -279,7 +282,7 @@ public class TestAzureBlobOutputStream {
     verify(mockMetrics).updateAzureUploadMetrics();
   }
 
-  @Test (expected = RuntimeException.class)
+  @Test (expected = AzureException.class)
   public void testFlushFailed() throws IOException {
     azureBlobOutputStream.write(BYTES);
     when(mockBlobAsyncClient.stageBlock(anyString(), any(), anyLong()))