You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ra...@apache.org on 2021/02/17 02:59:17 UTC
[samza] branch master updated: AzureBlobSystemProducer: Catch all
Exception in completableFuture during flush of producer (#1363)
This is an automated email from the ASF dual-hosted git repository.
rayman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 83204fc AzureBlobSystemProducer: Catch all Exception in completableFuture during flush of producer (#1363)
83204fc is described below
commit 83204fc6e72e1a18e3b70420e777bce079c92757
Author: lakshmi-manasa-g <mg...@linkedin.com>
AuthorDate: Tue Feb 16 18:59:07 2021 -0800
AzureBlobSystemProducer: Catch all Exception in completableFuture during flush of producer (#1363)
* AzureBlobSystemProducer: Catch all throwable in completableFuture during flush of the producer
* catch Exception instead of throwable
* Empty commit to Trigger Travis Build
* Empty commit to Trigger Travis Build again
* fix checkstyle build failure
---
.../azureblob/producer/AzureBlobSystemProducer.java | 16 ++++++++--------
1 file changed, 8 insertions(+), 8 deletions(-)
diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.java
index 5ecd528..d89f38f 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.java
@@ -486,15 +486,15 @@ public class AzureBlobSystemProducer implements SystemProducer {
sourceWriterMap.forEach((stream, writer) -> {
LOG.info("Closing topic:{}", stream);
CompletableFuture<Void> future = CompletableFuture.runAsync(new Runnable() {
- @Override
- public void run() {
- try {
- writer.close();
- } catch (IOException e) {
- throw new SystemProducerException("Close failed for topic " + stream, e);
+ @Override
+ public void run() {
+ try {
+ writer.close();
+ } catch (Exception e) {
+ throw new SystemProducerException("Close failed for topic " + stream, e);
+ }
}
- }
- }, asyncBlobThreadPool);
+ }, asyncBlobThreadPool);
pendingClose.add(future);
future.handle((aVoid, throwable) -> {
sourceWriterMap.remove(writer);