You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ca...@apache.org on 2020/05/27 22:06:20 UTC

[samza] branch master updated: SAMZA-2526: Azure blob system producer: do not commit blobs if avro DataFileWriter.close fails (#1362)

This is an automated email from the ASF dual-hosted git repository.

cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 3e18e19  SAMZA-2526: Azure blob system producer: do not commit blobs if avro DataFileWriter.close fails (#1362)
3e18e19 is described below

commit 3e18e19c850327afdbbd8994dc4ba8fb58ffa09b
Author: lakshmi-manasa-g <mg...@linkedin.com>
AuthorDate: Wed May 27 15:06:12 2020 -0700

    SAMZA-2526: Azure blob system producer: do not commit blobs if avro DataFileWriter.close fails (#1362)
    
    API changes: Behavior change: the blob is no longer committed if DataFileWriter.close fails; all blocks uploaded for it are discarded.
    Upgrade Instructions: Catch exceptions thrown by AzureBlobSystemProducer.flush and retry all messages sent since the previous flush. Do not advance the checkpoint if flush fails.
    Usage Instructions: None
---
 .../apache/samza/system/azureblob/avro/AzureBlobAvroWriter.java    | 7 +++----
 .../samza/system/azureblob/avro/TestAzureBlobAvroWriter.java       | 3 ++-
 2 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriter.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriter.java
index 6e74461..ba8e3aa 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriter.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriter.java
@@ -284,10 +284,9 @@ public class AzureBlobAvroWriter implements AzureBlobWriter {
       // dataFileWriter.close calls close of the azureBlobOutputStream associated with it.
       dataFileWriter.close();
     } catch (Exception e) {
-      // ensure that close is called even if dataFileWriter.close fails.
-      // This is to avoid loss of all the blocks uploaded for the blob
-      // as commitBlockList happens in close of azureBlobOutputStream.
-      azureBlobOutputStream.close();
+      LOG.error("Exception occurred during DataFileWriter.close for blob  "
+          + blockBlobAsyncClient.getBlobUrl()
+          + ". All blocks uploaded so far for this blob will be discarded to avoid invalid blobs.");
       throw e;
     }
   }
diff --git a/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobAvroWriter.java b/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobAvroWriter.java
index 62076aa..f52b484 100644
--- a/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobAvroWriter.java
+++ b/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobAvroWriter.java
@@ -63,6 +63,7 @@ import static org.mockito.Mockito.anyString;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -217,7 +218,7 @@ public class TestAzureBlobAvroWriter {
 
     azureBlobAvroWriter.flush();
     azureBlobAvroWriter.close();
-    verify(mockAzureBlobOutputStream).close();
+    verify(mockAzureBlobOutputStream, never()).close();
   }
 
   @Test(expected = RuntimeException.class)