You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ra...@apache.org on 2021/02/17 02:59:17 UTC

[samza] branch master updated: AzureBlobSystemProducer: Catch all Exception in completableFuture during flush of producer (#1363)

This is an automated email from the ASF dual-hosted git repository.

rayman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 83204fc  AzureBlobSystemProducer: Catch all Exception in completableFuture during flush of producer (#1363)
83204fc is described below

commit 83204fc6e72e1a18e3b70420e777bce079c92757
Author: lakshmi-manasa-g <mg...@linkedin.com>
AuthorDate: Tue Feb 16 18:59:07 2021 -0800

    AzureBlobSystemProducer: Catch all Exception in completableFuture during flush of producer (#1363)
    
    * AzureBlobSystemProducer: Catch all throwable in completableFuture during flush of the producer
    
    * catch Exception instead of throwable
    
    * Empty commit to Trigger Travis Build
    
    * Empty commit to Trigger Travis Build again
    
    * fix checkstyle build failure
---
 .../azureblob/producer/AzureBlobSystemProducer.java      | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.java
index 5ecd528..d89f38f 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.java
@@ -486,15 +486,15 @@ public class AzureBlobSystemProducer implements SystemProducer {
       sourceWriterMap.forEach((stream, writer) -> {
         LOG.info("Closing topic:{}", stream);
         CompletableFuture<Void> future = CompletableFuture.runAsync(new Runnable() {
-          @Override
-          public void run() {
-            try {
-              writer.close();
-            } catch (IOException e) {
-              throw new SystemProducerException("Close failed for topic " + stream, e);
+            @Override
+            public void run() {
+              try {
+                writer.close();
+              } catch (Exception e) {
+                throw new SystemProducerException("Close failed for topic " + stream, e);
+              }
             }
-          }
-        }, asyncBlobThreadPool);
+          }, asyncBlobThreadPool);
         pendingClose.add(future);
         future.handle((aVoid, throwable) -> {
           sourceWriterMap.remove(writer);