You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ca...@apache.org on 2020/05/27 22:06:20 UTC
[samza] branch master updated: SAMZA-2526: Azure blob system producer: do not commit blobs if avro DataFileWriter.close fails (#1362)
This is an automated email from the ASF dual-hosted git repository.
cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 3e18e19 SAMZA-2526: Azure blob system producer: do not commit blobs if avro DataFileWriter.close fails (#1362)
3e18e19 is described below
commit 3e18e19c850327afdbbd8994dc4ba8fb58ffa09b
Author: lakshmi-manasa-g <mg...@linkedin.com>
AuthorDate: Wed May 27 15:06:12 2020 -0700
SAMZA-2526: Azure blob system producer: do not commit blobs if avro DataFileWriter.close fails (#1362)
API changes: Behavior change: The blob is not committed if DataFileWriter.close fails; all blocks uploaded so far for that blob are discarded.
Upgrade Instructions: Catch exceptions thrown by AzureBlobSystemProducer.flush and retry the messages sent since the previous flush. Do not advance the checkpoint if flush fails.
Usage Instructions: None
---
.../apache/samza/system/azureblob/avro/AzureBlobAvroWriter.java | 7 +++----
.../samza/system/azureblob/avro/TestAzureBlobAvroWriter.java | 3 ++-
2 files changed, 5 insertions(+), 5 deletions(-)
diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriter.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriter.java
index 6e74461..ba8e3aa 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriter.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriter.java
@@ -284,10 +284,9 @@ public class AzureBlobAvroWriter implements AzureBlobWriter {
// dataFileWriter.close calls close of the azureBlobOutputStream associated with it.
dataFileWriter.close();
} catch (Exception e) {
- // ensure that close is called even if dataFileWriter.close fails.
- // This is to avoid loss of all the blocks uploaded for the blob
- // as commitBlockList happens in close of azureBlobOutputStream.
- azureBlobOutputStream.close();
+ LOG.error("Exception occurred during DataFileWriter.close for blob "
+ + blockBlobAsyncClient.getBlobUrl()
+ + ". All blocks uploaded so far for this blob will be discarded to avoid invalid blobs.");
throw e;
}
}
diff --git a/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobAvroWriter.java b/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobAvroWriter.java
index 62076aa..f52b484 100644
--- a/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobAvroWriter.java
+++ b/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobAvroWriter.java
@@ -63,6 +63,7 @@ import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -217,7 +218,7 @@ public class TestAzureBlobAvroWriter {
azureBlobAvroWriter.flush();
azureBlobAvroWriter.close();
- verify(mockAzureBlobOutputStream).close();
+ verify(mockAzureBlobOutputStream, never()).close();
}
@Test(expected = RuntimeException.class)