Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/13 02:03:13 UTC

[GitHub] [flink] gaoyunhaii commented on a diff in pull request #19983: [FLINK-27878][datastream] Add Retry Support For Async I/O In DataStream API

gaoyunhaii commented on code in PR #19983:
URL: https://github.com/apache/flink/pull/19983#discussion_r919556536


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -128,6 +156,15 @@ public AsyncWaitOperator(
 
         this.timeout = timeout;
 
+        this.asyncRetryStrategy = asyncRetryStrategy;
+
+        this.retryEnabled =
+                // construct from utility class
+                asyncRetryStrategy != NO_RETRY_STRATEGY
+                        // construct from api
+                        || asyncRetryStrategy.getRetryPredicate().resultPredicate().isPresent()

Review Comment:
   Should it be 
   
   `asyncRetryStrategy != NO_RETRY_STRATEGY && (asyncRetryStrategy.getRetryPredicate().resultPredicate().isPresent() || asyncRetryStrategy.getRetryPredicate().exceptionPredicate().isPresent())`?
   
   Otherwise, if a user constructs a new strategy that returns empty for both methods, retry should indeed not be enabled?
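
   The condition proposed above can be sketched in isolation as follows. `RetryStrategySketch`, its fields, and `isRetryEnabled` are hypothetical stand-ins rather than Flink types, and the sentinel is assumed to be compared by identity as in the diff; only the boolean logic mirrors the suggestion.

   ```java
   import java.util.Collection;
   import java.util.Optional;
   import java.util.function.Predicate;

   // Hypothetical stand-in for a retry strategy exposing two optional predicates.
   final class RetryStrategySketch {
       final Optional<Predicate<Collection<String>>> resultPredicate;
       final Optional<Predicate<Throwable>> exceptionPredicate;

       RetryStrategySketch(
               Optional<Predicate<Collection<String>>> resultPredicate,
               Optional<Predicate<Throwable>> exceptionPredicate) {
           this.resultPredicate = resultPredicate;
           this.exceptionPredicate = exceptionPredicate;
       }

       // Sentinel playing the role of NO_RETRY_STRATEGY (assumption: identity comparison).
       static final RetryStrategySketch NO_RETRY =
               new RetryStrategySketch(Optional.empty(), Optional.empty());

       // Enabled only if the strategy is not the sentinel AND at least one predicate is present.
       static boolean isRetryEnabled(RetryStrategySketch strategy) {
           return strategy != NO_RETRY
                   && (strategy.resultPredicate.isPresent()
                           || strategy.exceptionPredicate.isPresent());
       }
   }
   ```

   With this form, a user-built strategy that returns empty for both predicates is treated the same as the sentinel.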
   
   



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -154,6 +190,14 @@ public void setup(
             default:
                 throw new IllegalStateException("Unknown async mode: " + outputMode + '.');
         }
+        if (asyncRetryStrategy.getRetryPredicate().resultPredicate().isPresent()) {

Review Comment:
   I tend to think we should not leave `retryResultPredicate` / `retryExceptionPredicate` as null, to avoid possible errors. It could be 
   
   ```
           retryResultPredicate = asyncRetryStrategy.getRetryPredicate().resultPredicate()
               .orElse(ignored -> false);
           retryExceptionPredicate = asyncRetryStrategy.getRetryPredicate().exceptionPredicate()
               .orElse(ignored -> false);
   ```
   
   Then we could also simplify the test to 
   ```
   boolean satisfy =
           (null != results && retryResultPredicate.test(results))
                   || (null != error && retryExceptionPredicate.test(error));
   ```
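
   A self-contained sketch of the two snippets above, for illustration only. `DefaultPredicateSketch` and `shouldRetry` are hypothetical names, and `String` stands in for the operator's `OUT` type; the point is that `orElse(ignored -> false)` removes the need for null checks on the predicates themselves.

   ```java
   import java.util.Collection;
   import java.util.Optional;
   import java.util.function.Predicate;

   // Hypothetical holder showing non-null default predicates.
   final class DefaultPredicateSketch {
       private final Predicate<Collection<String>> retryResultPredicate;
       private final Predicate<Throwable> retryExceptionPredicate;

       DefaultPredicateSketch(
               Optional<Predicate<Collection<String>>> resultPredicate,
               Optional<Predicate<Throwable>> exceptionPredicate) {
           // A missing predicate becomes "never retry" instead of null.
           this.retryResultPredicate = resultPredicate.orElse(ignored -> false);
           this.retryExceptionPredicate = exceptionPredicate.orElse(ignored -> false);
       }

       // Only the inputs can be null here; the predicates never are.
       boolean shouldRetry(Collection<String> results, Throwable error) {
           return (null != results && retryResultPredicate.test(results))
                   || (null != error && retryExceptionPredicate.test(error));
       }
   }
   ```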
   
   
   



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -318,6 +389,143 @@ private void outputCompletedElement() {
         }
     }
 
+    /**
+     * Besides doRetry, the cleanup work will be done after retry fired, includes reset retry
+     * in-flight flag and remove retry handler from the incomplete retry handlers.
+     */
+    private void doRetryWithCleanup(RetryableResultHandlerDelegator resultHandlerDelegator)
+            throws Exception {
+        doRetry(resultHandlerDelegator);
+
+        // reset retryInFlight for next possible retry
+        resultHandlerDelegator.retryInFlight.set(false);
+        // remove from incomplete retry handlers
+        inFlightDelayRetryHandlers.remove(resultHandlerDelegator);
+    }
+
+    /** Increments number of attempts and fire the attempt. */
+    private void doRetry(RetryableResultHandlerDelegator resultHandlerDelegator) throws Exception {
+        // increment current attempt number
+        resultHandlerDelegator.currentAttempts++;
+
+        // fire a new attempt
+        userFunction.asyncInvoke(
+                resultHandlerDelegator.resultHandler.inputRecord.getValue(),
+                resultHandlerDelegator);
+    }
+
+    /** A delegator holds the real {@link ResultHandler} to handle retries. */
+    private class RetryableResultHandlerDelegator implements ResultFuture<OUT> {
+
+        private final ResultHandler resultHandler;
+        private final ProcessingTimeService processingTimeService;
+
+        private ScheduledFuture<?> delayedRetryTimer;
+
+        /** start from 1, when this entry created, the first attempt will happen. */
+        private int currentAttempts = 1;
+
+        /**
+         * A guard similar to ResultHandler.complete to prevent repeated complete calls from
+         * ill-written AsyncFunction. This flag indicates a retry is in-flight, will reject new
+         * retry request if true. And wil be reset to false after the retry fired.
+         */
+        private final AtomicBoolean retryInFlight = new AtomicBoolean(false);
+
+        public RetryableResultHandlerDelegator(
+                StreamRecord<IN> inputRecord,
+                ResultFuture<OUT> resultFuture,
+                ProcessingTimeService processingTimeService) {
+            this.resultHandler = new ResultHandler(inputRecord, resultFuture);
+            this.processingTimeService = processingTimeService;
+        }
+
+        public void registerTimeout(long timeout) {
+            resultHandler.registerTimeout(processingTimeService, timeout);
+        }
+
+        @Override
+        public void complete(Collection<OUT> results) {
+            Preconditions.checkNotNull(
+                    results, "Results must not be null, use empty collection to emit nothing");
+            if (retryEnabled
+                    && delayedRetryAvailable.get()
+                    && resultHandler.inputRecord.isRecord()) {
+                // ignore repeated call(s)
+                if (!retryInFlight.compareAndSet(false, true)) {
+                    return;
+                }
+
+                processRetryInMailBox(results, null);
+            } else {
+                resultHandler.complete(results);
+            }
+        }
+
+        private void processRetryInMailBox(Collection<OUT> results, Throwable error) {
+            mailboxExecutor.submit(
+                    () -> processRetry(this, results, error), "delayed retry or complete");
+        }
+
+        private void processRetry(
+                RetryableResultHandlerDelegator resultHandlerDelegator,
+                Collection<OUT> results,
+                Throwable error) {
+            boolean satisfy = false;
+            if (null != results && null != retryResultPredicate) {
+                satisfy = (satisfy || retryResultPredicate.test(results));
+            }
+            if (null != error && null != retryExceptionPredicate) {
+                satisfy = (satisfy || retryExceptionPredicate.test(error));
+            }
+            if (satisfy) {
+                if (asyncRetryStrategy.canRetry(currentAttempts)) {
+                    long nextBackoffTimeMillis =
+                            asyncRetryStrategy.getBackoffTimeMillis(currentAttempts);
+                    if (delayedRetryAvailable.get()) {
+                        final long delayedRetry =
+                                nextBackoffTimeMillis
+                                        + getProcessingTimeService().getCurrentProcessingTime();
+
+                        // timer thread will finally dispatch the task to mailbox executor,
+                        // and it can only be submitted once for one attempt.
+                        delayedRetryTimer =
+                                processingTimeService.registerTimer(
+                                        delayedRetry,
+                                        timestamp -> doRetryWithCleanup(resultHandlerDelegator));
+
+                        // add to incomplete retry handlers, will remove it after retry fired
+                        inFlightDelayRetryHandlers.add(resultHandlerDelegator);
+
+                        return;

Review Comment:
   nit: the `return` here seems a bit non-intuitive. Could we remove the nested conditions with 
   
   ```
   if (satisfy
                       && asyncRetryStrategy.canRetry(currentAttempts)
                       && delayedRetryAvailable.get()) {
   ...
   }
   
   ```
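
   As a rough check that the flattening preserves behavior, the decision can be reduced to three booleans. `FlattenedRetrySketch` and its method names are hypothetical; in the real operator, the `getBackoffTimeMillis` call that currently sits between the second and third check would presumably move inside the combined branch.

   ```java
   // Hypothetical reduction of the retry decision to its three conditions.
   final class FlattenedRetrySketch {

       // Nested form, shaped like the diff: true means "schedule a delayed retry".
       static boolean nested(boolean satisfy, boolean canRetry, boolean delayedRetryAvailable) {
           if (satisfy) {
               if (canRetry) {
                   if (delayedRetryAvailable) {
                       return true;
                   }
               }
           }
           return false;
       }

       // Flattened form: a single short-circuiting condition.
       static boolean flattened(boolean satisfy, boolean canRetry, boolean delayedRetryAvailable) {
           return satisfy && canRetry && delayedRetryAvailable;
       }
   }
   ```

   Because `&&` short-circuits, the later conditions are still evaluated only when the earlier ones hold, as in the nested version.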



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -318,6 +389,143 @@ private void outputCompletedElement() {
         }
     }
 
+    /**
+     * Besides doRetry, the cleanup work will be done after retry fired, includes reset retry
+     * in-flight flag and remove retry handler from the incomplete retry handlers.
+     */
+    private void doRetryWithCleanup(RetryableResultHandlerDelegator resultHandlerDelegator)
+            throws Exception {
+        doRetry(resultHandlerDelegator);
+
+        // reset retryInFlight for next possible retry
+        resultHandlerDelegator.retryInFlight.set(false);
+        // remove from incomplete retry handlers
+        inFlightDelayRetryHandlers.remove(resultHandlerDelegator);
+    }
+
+    /** Increments number of attempts and fire the attempt. */
+    private void doRetry(RetryableResultHandlerDelegator resultHandlerDelegator) throws Exception {
+        // increment current attempt number
+        resultHandlerDelegator.currentAttempts++;
+
+        // fire a new attempt
+        userFunction.asyncInvoke(
+                resultHandlerDelegator.resultHandler.inputRecord.getValue(),
+                resultHandlerDelegator);
+    }
+
+    /** A delegator holds the real {@link ResultHandler} to handle retries. */
+    private class RetryableResultHandlerDelegator implements ResultFuture<OUT> {
+
+        private final ResultHandler resultHandler;
+        private final ProcessingTimeService processingTimeService;
+
+        private ScheduledFuture<?> delayedRetryTimer;
+
+        /** start from 1, when this entry created, the first attempt will happen. */
+        private int currentAttempts = 1;
+
+        /**
+         * A guard similar to ResultHandler.complete to prevent repeated complete calls from
+         * ill-written AsyncFunction. This flag indicates a retry is in-flight, will reject new
+         * retry request if true. And wil be reset to false after the retry fired.
+         */
+        private final AtomicBoolean retryInFlight = new AtomicBoolean(false);
+
+        public RetryableResultHandlerDelegator(
+                StreamRecord<IN> inputRecord,
+                ResultFuture<OUT> resultFuture,
+                ProcessingTimeService processingTimeService) {
+            this.resultHandler = new ResultHandler(inputRecord, resultFuture);
+            this.processingTimeService = processingTimeService;
+        }
+
+        public void registerTimeout(long timeout) {
+            resultHandler.registerTimeout(processingTimeService, timeout);
+        }
+
+        @Override
+        public void complete(Collection<OUT> results) {
+            Preconditions.checkNotNull(
+                    results, "Results must not be null, use empty collection to emit nothing");
+            if (retryEnabled
+                    && delayedRetryAvailable.get()
+                    && resultHandler.inputRecord.isRecord()) {
+                // ignore repeated call(s)
+                if (!retryInFlight.compareAndSet(false, true)) {
+                    return;
+                }
+
+                processRetryInMailBox(results, null);
+            } else {
+                resultHandler.complete(results);
+            }
+        }
+
+        private void processRetryInMailBox(Collection<OUT> results, Throwable error) {
+            mailboxExecutor.submit(
+                    () -> processRetry(this, results, error), "delayed retry or complete");
+        }
+
+        private void processRetry(
+                RetryableResultHandlerDelegator resultHandlerDelegator,
+                Collection<OUT> results,
+                Throwable error) {
+            boolean satisfy = false;
+            if (null != results && null != retryResultPredicate) {
+                satisfy = (satisfy || retryResultPredicate.test(results));
+            }
+            if (null != error && null != retryExceptionPredicate) {
+                satisfy = (satisfy || retryExceptionPredicate.test(error));
+            }
+            if (satisfy) {
+                if (asyncRetryStrategy.canRetry(currentAttempts)) {
+                    long nextBackoffTimeMillis =
+                            asyncRetryStrategy.getBackoffTimeMillis(currentAttempts);
+                    if (delayedRetryAvailable.get()) {
+                        final long delayedRetry =
+                                nextBackoffTimeMillis
+                                        + getProcessingTimeService().getCurrentProcessingTime();
+
+                        // timer thread will finally dispatch the task to mailbox executor,
+                        // and it can only be submitted once for one attempt.
+                        delayedRetryTimer =
+                                processingTimeService.registerTimer(
+                                        delayedRetry,
+                                        timestamp -> doRetryWithCleanup(resultHandlerDelegator));
+
+                        // add to incomplete retry handlers, will remove it after retry fired

Review Comment:
   Is it necessary to add and remove the handler on every retry? Perhaps we could add it only once, on the first retry, and remove it when no further retry happens.
   
   Also, it seems we currently do not remove the handler when the retry stops?
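
   One possible reading of this suggestion, sketched with hypothetical names (`InFlightTrackingSketch`, `onRetryScheduled`, `onRetryStopped`). The collection type behind `inFlightDelayRetryHandlers` is not visible in this hunk, so a `HashSet` is assumed here.

   ```java
   import java.util.HashSet;
   import java.util.Set;

   // Hypothetical tracker: a handler is registered once and removed only when retrying ends.
   final class InFlightTrackingSketch {
       private final Set<Object> inFlightHandlers = new HashSet<>();

       // Called whenever a retry is scheduled; Set.add is a no-op if already tracked.
       void onRetryScheduled(Object handler) {
           inFlightHandlers.add(handler);
       }

       // Called once, when the handler completes and no further retry will happen.
       void onRetryStopped(Object handler) {
           inFlightHandlers.remove(handler);
       }

       int trackedCount() {
           return inFlightHandlers.size();
       }
   }
   ```

   Under this shape the handler stays tracked across attempts, so the removal has to happen on the completion path rather than right after each retry fires.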



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -318,6 +389,143 @@ private void outputCompletedElement() {
         }
     }
 
+    /**
+     * Besides doRetry, the cleanup work will be done after retry fired, includes reset retry
+     * in-flight flag and remove retry handler from the incomplete retry handlers.
+     */
+    private void doRetryWithCleanup(RetryableResultHandlerDelegator resultHandlerDelegator)
+            throws Exception {
+        doRetry(resultHandlerDelegator);
+
+        // reset retryInFlight for next possible retry
+        resultHandlerDelegator.retryInFlight.set(false);
+        // remove from incomplete retry handlers
+        inFlightDelayRetryHandlers.remove(resultHandlerDelegator);
+    }
+
+    /** Increments number of attempts and fire the attempt. */
+    private void doRetry(RetryableResultHandlerDelegator resultHandlerDelegator) throws Exception {
+        // increment current attempt number
+        resultHandlerDelegator.currentAttempts++;
+
+        // fire a new attempt
+        userFunction.asyncInvoke(
+                resultHandlerDelegator.resultHandler.inputRecord.getValue(),
+                resultHandlerDelegator);
+    }
+
+    /** A delegator holds the real {@link ResultHandler} to handle retries. */
+    private class RetryableResultHandlerDelegator implements ResultFuture<OUT> {
+
+        private final ResultHandler resultHandler;
+        private final ProcessingTimeService processingTimeService;
+
+        private ScheduledFuture<?> delayedRetryTimer;
+
+        /** start from 1, when this entry created, the first attempt will happen. */
+        private int currentAttempts = 1;
+
+        /**
+         * A guard similar to ResultHandler.complete to prevent repeated complete calls from
+         * ill-written AsyncFunction. This flag indicates a retry is in-flight, will reject new
+         * retry request if true. And wil be reset to false after the retry fired.
+         */
+        private final AtomicBoolean retryInFlight = new AtomicBoolean(false);
+
+        public RetryableResultHandlerDelegator(
+                StreamRecord<IN> inputRecord,
+                ResultFuture<OUT> resultFuture,
+                ProcessingTimeService processingTimeService) {
+            this.resultHandler = new ResultHandler(inputRecord, resultFuture);
+            this.processingTimeService = processingTimeService;
+        }
+
+        public void registerTimeout(long timeout) {
+            resultHandler.registerTimeout(processingTimeService, timeout);
+        }
+
+        @Override
+        public void complete(Collection<OUT> results) {
+            Preconditions.checkNotNull(
+                    results, "Results must not be null, use empty collection to emit nothing");
+            if (retryEnabled
+                    && delayedRetryAvailable.get()
+                    && resultHandler.inputRecord.isRecord()) {
+                // ignore repeated call(s)
+                if (!retryInFlight.compareAndSet(false, true)) {

Review Comment:
   This seems a bit non-intuitive, since once an attempt has completed one would expect `retryInFlight` to be `false`. Could we rename the variable to something like `lastRetryCompleted`?



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -197,11 +248,12 @@ public void processElement(StreamRecord<IN> record) throws Exception {
         // add element first to the queue
         final ResultFuture<OUT> entry = addToWorkQueue(element);
 
-        final ResultHandler resultHandler = new ResultHandler(element, entry);
+        final RetryableResultHandlerDelegator resultHandler =

Review Comment:
   I still think we should distinguish the two cases, since this is on the critical path (record-level operations). For each record we would pay the cost of extra method calls and atomic variable reads, so I think we should still use different handlers.
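
   A minimal sketch of what "different handlers" could look like, assuming the choice is made per record from a flag such as `retryEnabled`. All names here (`HandlerSelectionSketch`, `Handler`, `PlainHandler`, `RetryAwareHandler`, `create`) are hypothetical and much simpler than the operator's real classes.

   ```java
   import java.util.List;

   // Hypothetical illustration: only pay for retry bookkeeping when retry is enabled.
   final class HandlerSelectionSketch {

       interface Handler {
           void complete(String result);
       }

       // Direct path: emits the result with no retry-related work.
       static final class PlainHandler implements Handler {
           private final List<String> out;

           PlainHandler(List<String> out) {
               this.out = out;
           }

           @Override
           public void complete(String result) {
               out.add(result);
           }
       }

       // Retry path: wraps the plain handler and does extra work per call.
       static final class RetryAwareHandler implements Handler {
           private final PlainHandler delegate;
           int retryChecks = 0; // placeholder for predicate evaluation and flag reads

           RetryAwareHandler(PlainHandler delegate) {
               this.delegate = delegate;
           }

           @Override
           public void complete(String result) {
               retryChecks++;
               delegate.complete(result);
           }
       }

       // The wrapper is created only when retry is enabled.
       static Handler create(boolean retryEnabled, List<String> out) {
           PlainHandler plain = new PlainHandler(out);
           return retryEnabled ? new RetryAwareHandler(plain) : plain;
       }
   }
   ```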



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -109,11 +130,18 @@
     /** Whether object reuse has been enabled or disabled. */
     private transient boolean isObjectReuseEnabled;
 
+    private transient Predicate<Collection<OUT>> retryResultPredicate;
+
+    private transient Predicate<Throwable> retryExceptionPredicate;
+
+    private transient AtomicBoolean delayedRetryAvailable;

Review Comment:
   Could we add a comment for this flag?
   
   Also, if possible, I would lean towards renaming the variable to `retryDisabledOnFinish` and inverting the check to make the intent more visible. 



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -318,6 +389,143 @@ private void outputCompletedElement() {
         }
     }
 
+    /**
+     * Besides doRetry, the cleanup work will be done after retry fired, includes reset retry
+     * in-flight flag and remove retry handler from the incomplete retry handlers.
+     */
+    private void doRetryWithCleanup(RetryableResultHandlerDelegator resultHandlerDelegator)

Review Comment:
   This could be moved into the `RetryableResultHandlerDelegator`. 



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -318,6 +389,143 @@ private void outputCompletedElement() {
         }
     }
 
+    /**
+     * Besides doRetry, the cleanup work will be done after retry fired, includes reset retry
+     * in-flight flag and remove retry handler from the incomplete retry handlers.
+     */
+    private void doRetryWithCleanup(RetryableResultHandlerDelegator resultHandlerDelegator)
+            throws Exception {
+        doRetry(resultHandlerDelegator);
+
+        // reset retryInFlight for next possible retry
+        resultHandlerDelegator.retryInFlight.set(false);
+        // remove from incomplete retry handlers
+        inFlightDelayRetryHandlers.remove(resultHandlerDelegator);
+    }
+
+    /** Increments number of attempts and fire the attempt. */
+    private void doRetry(RetryableResultHandlerDelegator resultHandlerDelegator) throws Exception {
+        // increment current attempt number
+        resultHandlerDelegator.currentAttempts++;
+
+        // fire a new attempt
+        userFunction.asyncInvoke(
+                resultHandlerDelegator.resultHandler.inputRecord.getValue(),
+                resultHandlerDelegator);
+    }
+
+    /** A delegator holds the real {@link ResultHandler} to handle retries. */
+    private class RetryableResultHandlerDelegator implements ResultFuture<OUT> {
+
+        private final ResultHandler resultHandler;
+        private final ProcessingTimeService processingTimeService;
+
+        private ScheduledFuture<?> delayedRetryTimer;
+
+        /** start from 1, when this entry created, the first attempt will happen. */
+        private int currentAttempts = 1;
+
+        /**
+         * A guard similar to ResultHandler.complete to prevent repeated complete calls from
+         * ill-written AsyncFunction. This flag indicates a retry is in-flight, will reject new
+         * retry request if true. And wil be reset to false after the retry fired.
+         */
+        private final AtomicBoolean retryInFlight = new AtomicBoolean(false);
+
+        public RetryableResultHandlerDelegator(
+                StreamRecord<IN> inputRecord,
+                ResultFuture<OUT> resultFuture,
+                ProcessingTimeService processingTimeService) {
+            this.resultHandler = new ResultHandler(inputRecord, resultFuture);
+            this.processingTimeService = processingTimeService;
+        }
+
+        public void registerTimeout(long timeout) {
+            resultHandler.registerTimeout(processingTimeService, timeout);
+        }
+
+        @Override
+        public void complete(Collection<OUT> results) {
+            Preconditions.checkNotNull(
+                    results, "Results must not be null, use empty collection to emit nothing");
+            if (retryEnabled
+                    && delayedRetryAvailable.get()
+                    && resultHandler.inputRecord.isRecord()) {
+                // ignore repeated call(s)
+                if (!retryInFlight.compareAndSet(false, true)) {
+                    return;
+                }
+
+                processRetryInMailBox(results, null);
+            } else {
+                resultHandler.complete(results);
+            }
+        }
+
+        private void processRetryInMailBox(Collection<OUT> results, Throwable error) {
+            mailboxExecutor.submit(
+                    () -> processRetry(this, results, error), "delayed retry or complete");

Review Comment:
   It is not necessary to pass `this` as a separate parameter. Mixing property access via `this` and via `resultHandlerDelegator` in `processRetry` complicates the implementation. 



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -318,6 +389,143 @@ private void outputCompletedElement() {
         }
     }
 
+    /**
+     * Besides doRetry, the cleanup work will be done after retry fired, includes reset retry
+     * in-flight flag and remove retry handler from the incomplete retry handlers.
+     */
+    private void doRetryWithCleanup(RetryableResultHandlerDelegator resultHandlerDelegator)
+            throws Exception {
+        doRetry(resultHandlerDelegator);
+
+        // reset retryInFlight for next possible retry
+        resultHandlerDelegator.retryInFlight.set(false);
+        // remove from incomplete retry handlers
+        inFlightDelayRetryHandlers.remove(resultHandlerDelegator);
+    }
+
+    /** Increments number of attempts and fire the attempt. */
+    private void doRetry(RetryableResultHandlerDelegator resultHandlerDelegator) throws Exception {
+        // increment current attempt number
+        resultHandlerDelegator.currentAttempts++;
+
+        // fire a new attempt
+        userFunction.asyncInvoke(
+                resultHandlerDelegator.resultHandler.inputRecord.getValue(),
+                resultHandlerDelegator);
+    }
+
+    /** A delegator holds the real {@link ResultHandler} to handle retries. */
+    private class RetryableResultHandlerDelegator implements ResultFuture<OUT> {
+
+        private final ResultHandler resultHandler;
+        private final ProcessingTimeService processingTimeService;
+
+        private ScheduledFuture<?> delayedRetryTimer;
+
+        /** start from 1, when this entry created, the first attempt will happen. */
+        private int currentAttempts = 1;
+
+        /**
+         * A guard similar to ResultHandler.complete to prevent repeated complete calls from
+         * ill-written AsyncFunction. This flag indicates a retry is in-flight, will reject new
+         * retry request if true. And wil be reset to false after the retry fired.
+         */
+        private final AtomicBoolean retryInFlight = new AtomicBoolean(false);
+
+        public RetryableResultHandlerDelegator(
+                StreamRecord<IN> inputRecord,
+                ResultFuture<OUT> resultFuture,
+                ProcessingTimeService processingTimeService) {
+            this.resultHandler = new ResultHandler(inputRecord, resultFuture);
+            this.processingTimeService = processingTimeService;
+        }
+
+        public void registerTimeout(long timeout) {
+            resultHandler.registerTimeout(processingTimeService, timeout);
+        }
+
+        @Override
+        public void complete(Collection<OUT> results) {
+            Preconditions.checkNotNull(
+                    results, "Results must not be null, use empty collection to emit nothing");
+            if (retryEnabled
+                    && delayedRetryAvailable.get()
+                    && resultHandler.inputRecord.isRecord()) {
+                // ignore repeated call(s)
+                if (!retryInFlight.compareAndSet(false, true)) {
+                    return;
+                }
+
+                processRetryInMailBox(results, null);
+            } else {
+                resultHandler.complete(results);
+            }
+        }
+
+        private void processRetryInMailBox(Collection<OUT> results, Throwable error) {
+            mailboxExecutor.submit(
+                    () -> processRetry(this, results, error), "delayed retry or complete");
+        }
+
+        private void processRetry(
+                RetryableResultHandlerDelegator resultHandlerDelegator,
+                Collection<OUT> results,
+                Throwable error) {
+            boolean satisfy = false;
+            if (null != results && null != retryResultPredicate) {
+                satisfy = (satisfy || retryResultPredicate.test(results));
+            }
+            if (null != error && null != retryExceptionPredicate) {
+                satisfy = (satisfy || retryExceptionPredicate.test(error));
+            }
+            if (satisfy) {
+                if (asyncRetryStrategy.canRetry(currentAttempts)) {
+                    long nextBackoffTimeMillis =
+                            asyncRetryStrategy.getBackoffTimeMillis(currentAttempts);
+                    if (delayedRetryAvailable.get()) {
+                        final long delayedRetry =
+                                nextBackoffTimeMillis
+                                        + getProcessingTimeService().getCurrentProcessingTime();
+
+                        // timer thread will finally dispatch the task to mailbox executor,
+                        // and it can only be submitted once for one attempt.
+                        delayedRetryTimer =
+                                processingTimeService.registerTimer(
+                                        delayedRetry,
+                                        timestamp -> doRetryWithCleanup(resultHandlerDelegator));
+
+                        // add to incomplete retry handlers, will remove it after retry fired
+                        inFlightDelayRetryHandlers.add(resultHandlerDelegator);
+
+                        return;
+                    }
+                }
+            }
+            // retry unsatisfied, complete it
+            if (null != results) {
+                resultHandlerDelegator.resultHandler.complete(results);
+            } else {
+                resultHandlerDelegator.resultHandler.completeExceptionally(error);
+            }
+        }
+
+        @Override
+        public void completeExceptionally(Throwable error) {

Review Comment:
   nit: move this to right after the `complete(...)` method?


