Posted to commits@flink.apache.org by pn...@apache.org on 2021/02/15 19:21:35 UTC
[flink] branch master updated: [FLINK-21365][runtime] Document and
make the contract of FutureUtils.ResultConjunctFuture more explicit
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new fa401b2 [FLINK-21365][runtime] Document and make the contract of FutureUtils.ResultConjunctFuture more explicit
fa401b2 is described below
commit fa401b2b2e859228ef60469266588852c7c89f0f
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Mon Feb 15 13:38:22 2021 +0100
[FLINK-21365][runtime] Document and make the contract of FutureUtils.ResultConjunctFuture more explicit
Add an explanation of why the current solution works and also
clean it up a little bit (dropping the unnecessary volatile and adding the final keyword).
---
.../flink/runtime/concurrent/FutureUtils.java | 22 +++++++++++++++++++++-
1 file changed, 21 insertions(+), 1 deletion(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
index 49a628a..629c144 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
@@ -798,7 +798,7 @@ public class FutureUtils {
private final AtomicInteger numCompleted = new AtomicInteger(0);
/** The set of collected results so far. */
- private volatile T[] results;
+ private final T[] results;
/**
* The function that is attached to all futures in the conjunction. Once a future is
@@ -808,6 +808,26 @@ public class FutureUtils {
if (throwable != null) {
completeExceptionally(throwable);
} else {
+ /**
+ * This {@link #results} update is not synchronized in any way, which is safe
+ * because:
+ *
+ * <ul>
+ * <li>For each thread that completes a future, there is a happens-before
+ * relationship between writing to {@link #results} and incrementing {@link
+ * #numCompleted}.
+ * <li>Each thread updates a distinct element of the {@link #results}
+ * array.
+ * <li>There is a happens-before relationship between all of the writing threads
+ * and the last thread, thanks to the {@code
+ * numCompleted.incrementAndGet() == numTotal} check.
+ * <li>The last thread completes the future, so it transitively has a
+ * happens-before relationship with all of the preceding writes to {@link
+ * #results}.
+ * <li>{@link AtomicInteger#incrementAndGet} has the memory effects of both a
+ * volatile read and a volatile write.
+ * </ul>
+ */
results[index] = value;
if (numCompleted.incrementAndGet() == numTotal) {
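For readers who want to try the pattern outside of Flink, the reasoning in the comment above can be sketched as a small standalone class. This is only an illustration under stated assumptions: ConjunctSketch and combineAll are hypothetical names, not part of FutureUtils, and the real ResultConjunctFuture is structured differently. The sketch keeps the three ingredients the comment relies on: a final, non-volatile results array, one distinct slot per future, and an AtomicInteger whose incrementAndGet() publishes the writes to the thread that completes the combined future.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical illustration; not Flink's FutureUtils.ResultConjunctFuture. */
final class ConjunctSketch {

    /** Returns a future that completes with all results once every input has completed. */
    static CompletableFuture<Object[]> combineAll(List<? extends CompletableFuture<?>> futures) {
        final int numTotal = futures.size();
        // Plain (non-volatile) array: visibility is provided by numCompleted below.
        final Object[] results = new Object[numTotal];
        final AtomicInteger numCompleted = new AtomicInteger(0);
        final CompletableFuture<Object[]> combined = new CompletableFuture<>();

        if (numTotal == 0) {
            combined.complete(results);
            return combined;
        }

        for (int i = 0; i < numTotal; i++) {
            final int index = i; // each callback writes only to its own slot
            futures.get(i).whenComplete((value, throwable) -> {
                if (throwable != null) {
                    combined.completeExceptionally(throwable);
                } else {
                    // Unsynchronized write, ordered before the atomic increment below.
                    results[index] = value;
                    // Only the thread that observes the final count completes the
                    // future, and by then it sees every earlier write to results.
                    if (numCompleted.incrementAndGet() == numTotal) {
                        combined.complete(results);
                    }
                }
            });
        }
        return combined;
    }
}
```

Calling ConjunctSketch.combineAll(Arrays.asList(a, b)) and then completing b before a should leave the combined future incomplete until a finishes, after which the result array holds the values in input order rather than completion order.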