Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/01/31 10:30:14 UTC

[GitHub] [flink] vahmed-hamdy commented on a change in pull request #18553: [FLINK-25846][FLINK-25848] Async Sink does not gracefully shutdown on Cancel, KDS Sink does not fast fail when invalid configuration supplied

vahmed-hamdy commented on a change in pull request #18553:
URL: https://github.com/apache/flink/pull/18553#discussion_r795534589



##########
File path: flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkWriter.java
##########
@@ -176,11 +179,35 @@ private void handlePartiallyFailedRequest(
 
     private boolean isRetryable(Throwable err) {
         if (err instanceof CompletionException
-                && err.getCause() instanceof ResourceNotFoundException) {
+                && isInterruptingSignalException(ExceptionUtils.stripCompletionException(err))) {
+            getFatalExceptionCons().accept(new FlinkException("Running job was cancelled"));
+            return false;
+        }
+        if (err instanceof CompletionException
+                && ExceptionUtils.stripCompletionException(err)
+                        instanceof ResourceNotFoundException) {
             getFatalExceptionCons()
                     .accept(
                             new KinesisDataStreamsException(
-                                    "Encountered non-recoverable exception", err));
+                                    "Encountered non-recoverable exception relating to not being able to find the specified resources",
+                                    err));
+            return false;
+        }
+        if (err instanceof CompletionException
+                && ExceptionUtils.stripCompletionException(err) instanceof StsException) {
+            getFatalExceptionCons()
+                    .accept(
+                            new KinesisDataStreamsException(
+                                    "Encountered non-recoverable exception relating to the provided credentials.",
+                                    err));
+            return false;
+        }
+        if (err instanceof Error) {
+            getFatalExceptionCons()

Review comment:
       The parent (`AsyncSinkWriter`) doesn't offer a retry strategy; it leaves the request submission implementation to the concrete classes. Would it be better to enforce a retry strategy on all implementations, or to just move the common logic into a validator/classifier class that every implementation can reuse, without changing the base class?
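
To make the second option concrete, here is a rough sketch of what such a classifier could look like. Everything in it is an assumption for the sake of discussion: `FatalErrorClassifier`, `rule`, and the `Consumer<Exception>` parameter are hypothetical names, not existing Flink APIs, and it uses only JDK types so it can be tried in isolation. Each rule pairs a predicate on the unwrapped cause with a function that builds the exception to report.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionException;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

/** Hypothetical helper (not part of Flink): shared fatal-vs-retryable classification. */
final class FatalErrorClassifier {

    /** One rule: when the unwrapped cause matches, report the mapped exception. */
    private static final class Rule {
        final Predicate<Throwable> isFatal;
        final Function<Throwable, Exception> toReported;

        Rule(Predicate<Throwable> isFatal, Function<Throwable, Exception> toReported) {
            this.isFatal = isFatal;
            this.toReported = toReported;
        }
    }

    private final List<Rule> rules = new ArrayList<>();

    /** Registers a rule; rules are evaluated in registration order. */
    FatalErrorClassifier rule(
            Predicate<Throwable> isFatal, Function<Throwable, Exception> toReported) {
        rules.add(new Rule(isFatal, toReported));
        return this;
    }

    /**
     * Returns true if no rule matches (the request may be retried). Otherwise hands the
     * mapped exception to the consumer and returns false.
     */
    boolean isRetryable(Throwable err, Consumer<Exception> fatalExceptionConsumer) {
        Throwable cause = unwrap(err);
        for (Rule r : rules) {
            if (r.isFatal.test(cause)) {
                // Pass the original throwable so the full chain is kept as the cause.
                fatalExceptionConsumer.accept(r.toReported.apply(err));
                return false;
            }
        }
        return true;
    }

    /** Strips nested CompletionException wrappers to reach the underlying cause. */
    private static Throwable unwrap(Throwable err) {
        while (err instanceof CompletionException && err.getCause() != null) {
            err = err.getCause();
        }
        return err;
    }
}
```

With something like this, a writer would build its rule list once (for example, one rule per branch in the hunk above) and its `isRetryable` would delegate to the classifier, passing `getFatalExceptionCons()` as the consumer. The base class stays untouched and other sinks can reuse the same rules.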



