Posted to commits@flink.apache.org by pn...@apache.org on 2021/02/15 19:21:35 UTC

[flink] branch master updated: [FLINK-21365][runtime] Document and make the contract of FutureUtils.ResultConjunctFuture more explicit

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new fa401b2  [FLINK-21365][runtime] Document and make the contract of FutureUtils.ResultConjunctFuture more explicit
fa401b2 is described below

commit fa401b2b2e859228ef60469266588852c7c89f0f
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Mon Feb 15 13:38:22 2021 +0100

    [FLINK-21365][runtime] Document and make the contract of FutureUtils.ResultConjunctFuture more explicit
    
    Add an explanation of why the current solution works and also
    clean it up a little bit (dropping the unnecessary volatile and adding the final keyword).
---
 .../flink/runtime/concurrent/FutureUtils.java      | 22 +++++++++++++++++++++-
 1 file changed, 21 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
index 49a628a..629c144 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
@@ -798,7 +798,7 @@ public class FutureUtils {
         private final AtomicInteger numCompleted = new AtomicInteger(0);
 
         /** The set of collected results so far. */
-        private volatile T[] results;
+        private final T[] results;
 
         /**
          * The function that is attached to all futures in the conjunction. Once a future is
@@ -808,6 +808,26 @@ public class FutureUtils {
             if (throwable != null) {
                 completeExceptionally(throwable);
             } else {
+                /**
+                 * This {@link #results} update itself is not synchronised in any way and it's fine
+                 * because:
+                 *
+                 * <ul>
+                 *   <li>There is a happens-before relationship for each thread (that is completing
+                 *       the future) between setting {@link #results} and incrementing {@link
+                 *       #numCompleted}.
+                 *   <li>Each thread updates a distinct element of the {@link #results} array.
+                 *       No two threads write the same slot.
+                 *   <li>There is a happens-before relationship between all of the writing threads
+                 *       and the last thread (thanks to the {@code
+                 *       numCompleted.incrementAndGet() == numTotal} check).
+                 *   <li>The last thread will be completing the future, so it transitively has a
+                 *       happens-before relationship with all of the preceding updates/writes to
+                 *       {@link #results}.
+                 *   <li>{@link AtomicInteger#incrementAndGet} is equivalent to both a volatile
+                 *       read and a volatile write.
+                 * </ul>
+                 */
                 results[index] = value;
 
                 if (numCompleted.incrementAndGet() == numTotal) {
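
The reasoning in the added comment can be illustrated with a minimal, self-contained sketch. This is not Flink's actual ResultConjunctFuture; the names ConjunctSketch, AllOf, onInputDone and demo are hypothetical and exist only for this illustration. It assumes the same pattern as the diff: a final array whose slots are written with plain stores, followed by an AtomicInteger increment whose final value decides which thread completes the combined future.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical, simplified stand-in for the conjunct future touched by this commit. */
public class ConjunctSketch {

    /** Completes with all results, in input order, once every input future has completed. */
    static final class AllOf<T> extends CompletableFuture<List<T>> {
        private final int numTotal;
        private final AtomicInteger numCompleted = new AtomicInteger(0);

        // The reference is final; the individual slots are written without synchronization.
        private final Object[] results;

        AllOf(List<CompletableFuture<T>> futures) {
            this.numTotal = futures.size();
            this.results = new Object[numTotal];
            if (numTotal == 0) {
                complete(new ArrayList<>());
            }
            for (int i = 0; i < numTotal; i++) {
                final int index = i;
                futures.get(i).whenComplete((value, error) -> onInputDone(index, value, error));
            }
        }

        private void onInputDone(int index, T value, Throwable error) {
            if (error != null) {
                completeExceptionally(error);
                return;
            }
            // Plain write: only the callback for this index ever touches this slot.
            results[index] = value;

            // The atomic increment orders the write above before every later increment.
            // The thread that observes numTotal has therefore (transitively) a
            // happens-before edge from all earlier slot writes.
            if (numCompleted.incrementAndGet() == numTotal) {
                List<T> all = new ArrayList<>(numTotal);
                for (Object r : results) {
                    @SuppressWarnings("unchecked")
                    T typed = (T) r;
                    all.add(typed);
                }
                complete(all);
            }
        }
    }

    /** Completes n input futures from n separate threads and returns the combined result. */
    static List<Integer> demo(int n) throws Exception {
        List<CompletableFuture<Integer>> inputs = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            inputs.add(new CompletableFuture<>());
        }
        AllOf<Integer> all = new AllOf<>(inputs);
        for (int i = 0; i < n; i++) {
            final int idx = i;
            new Thread(() -> inputs.get(idx).complete(idx * 10)).start();
        }
        return all.get(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo(4)); // prints [0, 10, 20, 30]
    }
}
```

In this sketch, marking the array reference volatile would add nothing: volatile applies to the reference, not to the elements, and the reference is assigned exactly once in the constructor. Visibility of the element writes comes from the AtomicInteger increment, which is why final is sufficient for the field.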