You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sd...@apache.org on 2022/10/12 07:54:14 UTC
[ignite-3] branch main updated: IGNITE-17249 DefaultMessagingService incorrect message sending order (#1169)
This is an automated email from the ASF dual-hosted git repository.
sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 70db5f06d0 IGNITE-17249 DefaultMessagingService incorrect message sending order (#1169)
70db5f06d0 is described below
commit 70db5f06d0b6b085da8f3264ad4fa77dd5416882
Author: Roman Puchkovskiy <ro...@gmail.com>
AuthorDate: Wed Oct 12 11:54:09 2022 +0400
IGNITE-17249 DefaultMessagingService incorrect message sending order (#1169)
---
.../ignite/internal/future/OrderingFuture.java | 487 +++++++++++++++++
.../OrderingFutureCallbackInterferenceTest.java | 79 +++
.../future/OrderingFutureCallbackOrderingTest.java | 134 +++++
.../future/OrderingFutureConcurrencyTest.java | 204 ++++++++
.../ignite/internal/future/OrderingFutureTest.java | 573 +++++++++++++++++++++
.../apache/ignite/network/MessagingService.java | 10 +-
.../internal/network/netty/ConnectionManager.java | 63 ++-
.../ignite/internal/network/netty/NettyClient.java | 32 +-
.../ignite/internal/network/netty/NettyUtils.java | 38 +-
.../RecoveryClientHandhakeManagerFactory.java | 41 ++
.../recovery/RecoveryClientHandshakeManager.java | 16 +-
.../ignite/network/DefaultMessagingService.java | 12 +-
.../ignite/network/NettyBootstrapFactory.java | 5 +-
.../scalecube/ScaleCubeClusterServiceFactory.java | 7 +-
.../internal/network/netty/NettyClientTest.java | 20 +-
.../network/netty/RecoveryHandshakeTest.java | 2 +-
.../network/DefaultMessagingServiceTest.java | 238 +++++++++
17 files changed, 1894 insertions(+), 67 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/future/OrderingFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/future/OrderingFuture.java
new file mode 100644
index 0000000000..9bf8deba33
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/future/OrderingFuture.java
@@ -0,0 +1,487 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.future;
+
+import static java.util.concurrent.atomic.AtomicReferenceFieldUpdater.newUpdater;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * A little analogue of {@link CompletableFuture} that has the following property: callbacks (like {@link #whenComplete(BiConsumer)}
+ * and {@link #thenComposeToCompletable(Function)}) are invoked in the same order in which they were registered.
+ *
+ * @param <T> Type of payload.
+ * @see CompletableFuture
+ */
+public class OrderingFuture<T> {
+ @SuppressWarnings("rawtypes")
+ private static final AtomicReferenceFieldUpdater<OrderingFuture, State> STATE = newUpdater(OrderingFuture.class, State.class, "state");
+
+ /**
+ * Stores all the state of this future: whether it is completed, normal completion result (if any), cause
+ * of exceptional completion (if any), dependents. The State class and all of its components are immutable.
+ * We change the state using compare-and-set approach, next state is built from previous one.
+ */
+ private volatile State<T> state = State.empty();
+
+ /**
+ * Used to make sure that at most one thread executes completion code.
+ */
+ private final AtomicBoolean completionStarted = new AtomicBoolean(false);
+
+ /**
+ * Used by {@link #get(long, TimeUnit)} to wait for completion.
+ */
+ private final CountDownLatch completionLatch = new CountDownLatch(1);
+
+ /**
+ * Creates an incomplete future.
+ */
+ public OrderingFuture() {
+ }
+
+ /**
+ * Creates a future that is alredy completed with the given value.
+ *
+ * @param result Value with which the future is completed.
+ * @param <T> Payload type.
+ * @return Completed future.
+ */
+ public static <T> OrderingFuture<T> completedFuture(@Nullable T result) {
+ var future = new OrderingFuture<T>();
+ future.complete(result);
+ return future;
+ }
+
+ /**
+ * Creates a future that is alredy completed exceptionally (i.e. failed) with the given exception.
+ *
+ * @param ex Exception with which the future is failed.
+ * @param <T> Payload type.
+ * @return Failed future.
+ */
+ public static <T> OrderingFuture<T> failedFuture(Throwable ex) {
+ var future = new OrderingFuture<T>();
+ future.completeExceptionally(ex);
+ return future;
+ }
+
+ /**
+ * Adapts a {@link CompletableFuture}. That is, creates an {@link OrderingFuture} that gets completed when the
+ * original future is completed (and in the same way in which it gets completed).
+ *
+ * @param adaptee Future to adapt.
+ * @param <T> Payload type.
+ * @return Adapting future.
+ */
+ public static <T> OrderingFuture<T> adapt(CompletableFuture<T> adaptee) {
+ var future = new OrderingFuture<T>();
+
+ adaptee.whenComplete((res, ex) -> {
+ if (ex != null) {
+ future.completeExceptionally(ex);
+ } else {
+ future.complete(res);
+ }
+ });
+
+ return future;
+ }
+
+ /**
+ * Completes this future with the given result if it's not completed yet; otherwise has no effect.
+ *
+ * @param result Completion value (may be {@code null}).
+ */
+ public void complete(@Nullable T result) {
+ completeInternal(result, null);
+ }
+
+ /**
+ * Completes this future exceptionally with the given exception if it's not completed yet; otherwise has no effect.
+ *
+ * @param ex Exception.
+ */
+ public void completeExceptionally(Throwable ex) {
+ completeInternal(null, ex);
+ }
+
+ private void completeInternal(@Nullable T result, @Nullable Throwable ex) {
+ assert ex == null || result == null;
+
+ if (!completionStarted.compareAndSet(false, true)) {
+ // Someone has already started the completion. We must leave as the following code can produce duplicate
+ // notifications of dependents if executed by more than one thread.
+ return;
+ }
+
+ State<T> prevState;
+ ListNode<T> lastNotifiedNode = null;
+
+ while (true) {
+ prevState = state;
+
+ if (state.completed) {
+ return;
+ }
+
+ State<T> newState = new State<>(true, result, ex, null);
+
+ // We produce side-effects inside the retry loop, but it's ok as the queue can only grow, the queue
+ // state we see is always a prefix of a queue changed by a competitor (we only compete with operations
+ // that enqueue elements to the queue as competition with other completers is ruled out with AtomicBoolean)
+ // and we track what dependents have already been notified by us.
+ notifyDependents(result, ex, prevState.dependentsQueueTail, lastNotifiedNode);
+ lastNotifiedNode = prevState.dependentsQueueTail;
+
+ if (replaceState(prevState, newState)) {
+ break;
+ }
+ }
+
+ completionLatch.countDown();
+ }
+
+ /**
+ * Replaces state with compare-and-set semantics.
+ *
+ * @param prevState State that we expect to see.
+ * @param newState New state we want to set.
+ * @return {@code true} if CAS was successful.
+ */
+ private boolean replaceState(State<T> prevState, State<T> newState) {
+ return STATE.compareAndSet(this, prevState, newState);
+ }
+
+ /**
+ * Notifies dependents about completion of this future. Does NOT notify notifiedDependents closest to the head of the queue.
+ *
+ * @param result Normal completion result.
+ * @param ex Exceptional completion cause.
+ * @param dependents Dependents queue.
+ * @param lastNotifiedNode Node that was notified last on preceding iterations of while loop.
+ */
+ private void notifyDependents(
+ @Nullable T result,
+ @Nullable Throwable ex,
+ @Nullable ListNode<T> dependents,
+ ListNode<T> lastNotifiedNode
+ ) {
+ if (dependents != null) {
+ dependents.notifyHeadToTail(result, ex, lastNotifiedNode);
+ }
+ }
+
+ /**
+ * Returns {@code true} if this future is completed exceptionally, {@code false} if completed normally or not completed.
+ *
+ * @return {@code true} if this future is completed exceptionally, {@code false} if completed normally or not completed
+ */
+ public boolean isCompletedExceptionally() {
+ return state.exception != null;
+ }
+
+ /**
+ * Adds a callback that gets executed as soon as this future gets completed for any reason. The action will get both result
+ * and exception; if the completion is normal, exception will be {@code null}, otherwise result will be {@code null}.
+ * If it's already complete, the action is executed immediately.
+ * Any exception produced by the action is swallowed.
+ *
+ * @param action Action to execute.
+ */
+ public void whenComplete(BiConsumer<? super T, ? super Throwable> action) {
+ WhenComplete<T> dependent = null;
+
+ while (true) {
+ State<T> prevState = state;
+
+ if (prevState.completed) {
+ acceptQuietly(action, prevState.result, prevState.exception);
+ return;
+ }
+
+ if (dependent == null) {
+ dependent = new WhenComplete<>(action);
+ }
+ State<T> newState = prevState.enqueueDependent(dependent);
+
+ if (replaceState(prevState, newState)) {
+ return;
+ }
+ }
+ }
+
+ private static <T> void acceptQuietly(BiConsumer<? super T, ? super Throwable> action, T result, Throwable ex) {
+ try {
+ action.accept(result, ex);
+ } catch (Exception e) {
+ // ignore
+ }
+ }
+
+ /**
+ * Creates a composition of this future with a function producing a {@link CompletableFuture}.
+ *
+ * @param mapper Mapper used to produce a {@link CompletableFuture} from this future result.
+ * @param <U> Result future payload type.
+ * @return Composition.
+ * @see CompletableFuture#thenCompose(Function)
+ */
+ public <U> CompletableFuture<U> thenComposeToCompletable(Function<? super T, ? extends CompletableFuture<U>> mapper) {
+ ThenComposeToCompletable<T, U> dependent = null;
+
+ while (true) {
+ State<T> prevState = state;
+
+ if (prevState.completed) {
+ if (prevState.exception != null) {
+ return CompletableFuture.failedFuture(wrapWithCompletionException(prevState.exception));
+ } else {
+ return applyMapper(mapper, prevState.result);
+ }
+ }
+
+ if (dependent == null) {
+ dependent = new ThenComposeToCompletable<>(new CompletableFuture<>(), mapper);
+ }
+ State<T> newState = prevState.enqueueDependent(dependent);
+
+ if (replaceState(prevState, newState)) {
+ return dependent.resultFuture;
+ }
+ }
+ }
+
+ private static CompletionException wrapWithCompletionException(Throwable ex) {
+ return ex instanceof CompletionException ? (CompletionException) ex : new CompletionException(ex);
+ }
+
+ private static <T, U> CompletableFuture<U> applyMapper(Function<? super T, ? extends CompletableFuture<U>> mapper, T result) {
+ try {
+ return mapper.apply(result);
+ } catch (Throwable e) {
+ return CompletableFuture.failedFuture(e);
+ }
+ }
+
+ /**
+ * Returns the completion value, (if the future is completed normally), throws completion cause wrapped in
+ * {@link CompletionException} (if the future is completed exceptionally), or returns the provided default value
+ * if the future is not completed yet.
+ *
+ * @param valueIfAbsent Value to return if the future is not completed yet.
+ * @return Completion value or default value.
+ * @see CompletableFuture#getNow(Object)
+ */
+ public T getNow(T valueIfAbsent) {
+ State<T> currentState = state;
+
+ if (currentState.completed) {
+ if (currentState.exception != null) {
+ throw wrapWithCompletionException(currentState.exception);
+ } else {
+ return currentState.result;
+ }
+ } else {
+ return valueIfAbsent;
+ }
+ }
+
+ /**
+ * Returns completion value or throws completion exception (wrapped in {@link ExecutionException}), waiting for
+ * completion up to the specified amount of time, if not completed yet. If the time runs out while waiting,
+ * throws {@link TimeoutException}.
+ *
+ * @param timeout Maximum amount of time to wait.
+ * @param unit Unit of time in which the timeout is given.
+ * @return Completion value.
+ * @throws InterruptedException Thrown if the current thread gets interrupted while waiting for completion.
+ * @throws TimeoutException Thrown if the wait for completion times out.
+ * @throws ExecutionException Thrown (with the original exception as a cause) if the future completes exceptionally.
+ * @see CompletableFuture#get(long, TimeUnit)
+ */
+ public T get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException, ExecutionException {
+ boolean completedInTime = completionLatch.await(timeout, unit);
+ if (!completedInTime) {
+ throw new TimeoutException();
+ }
+
+ State<T> currentState = state;
+
+ if (currentState.exception instanceof CancellationException) {
+ throw (CancellationException) currentState.exception;
+ } else if (currentState.exception != null) {
+ throw exceptionForThrowingFromGet(currentState);
+ } else {
+ return currentState.result;
+ }
+ }
+
+ private ExecutionException exceptionForThrowingFromGet(State<T> currentState) {
+ Throwable unwrapped = currentState.exception;
+ Throwable cause = unwrapped.getCause();
+ if (cause != null) {
+ unwrapped = cause;
+ }
+
+ return new ExecutionException(unwrapped);
+ }
+
+ /**
+ * Returns a {@link CompletableFuture} that gets completed when this future gets completed (and in the same way).
+ * The returned future does not provide any ordering guarantees that this future provides.
+ *
+ * @return An equivalent {@link CompletableFuture}.
+ */
+ public CompletableFuture<T> toCompletableFuture() {
+ CompletableFuture<T> completableFuture = new CompletableFuture<>();
+
+ this.whenComplete((res, ex) -> completeCompletableFuture(completableFuture, res, ex));
+
+ return completableFuture;
+ }
+
+ private static <T> void completeCompletableFuture(CompletableFuture<T> future, T result, Throwable ex) {
+ if (ex != null) {
+ future.completeExceptionally(ex);
+ } else {
+ future.complete(result);
+ }
+ }
+
+ /**
+ * Dependent action that gets notified when this future is completed.
+ *
+ * @param <T> Payload type.
+ */
+ private interface DependentAction<T> {
+ /**
+ * Informs that dependent that the host future is completed.
+ *
+ * @param result Normal completion result ({@code null} if completed exceptionally, but might be {@code null} for normal completion.
+ * @param ex Exceptional completion cause ({@code null} if completed normally).
+ */
+ void onCompletion(T result, Throwable ex);
+ }
+
+ private static class WhenComplete<T> implements DependentAction<T> {
+ private final BiConsumer<? super T, ? super Throwable> action;
+
+ private WhenComplete(BiConsumer<? super T, ? super Throwable> action) {
+ this.action = action;
+ }
+
+ @Override
+ public void onCompletion(T result, Throwable ex) {
+ acceptQuietly(action, result, ex);
+ }
+ }
+
+ private static class ThenComposeToCompletable<T, U> implements DependentAction<T> {
+ private final CompletableFuture<U> resultFuture;
+ private final Function<? super T, ? extends CompletableFuture<U>> mapper;
+
+ private ThenComposeToCompletable(CompletableFuture<U> resultFuture, Function<? super T, ? extends CompletableFuture<U>> mapper) {
+ this.resultFuture = resultFuture;
+ this.mapper = mapper;
+ }
+
+ @Override
+ public void onCompletion(T result, Throwable ex) {
+ if (ex != null) {
+ resultFuture.completeExceptionally(wrapWithCompletionException(ex));
+ return;
+ }
+
+ try {
+ CompletableFuture<U> mapResult = mapper.apply(result);
+
+ mapResult.whenComplete((mapRes, mapEx) -> completeCompletableFuture(resultFuture, mapRes, mapEx));
+ } catch (Throwable e) {
+ resultFuture.completeExceptionally(e);
+ }
+ }
+ }
+
+ private static class State<T> {
+ private static final State<?> EMPTY_STATE = new State<>(false, null, null, null);
+
+ private final boolean completed;
+ private final T result;
+ private final Throwable exception;
+ private final ListNode<T> dependentsQueueTail;
+
+ private State(boolean completed, T result, Throwable exception, ListNode<T> dependentsQueueTail) {
+ this.completed = completed;
+ this.result = result;
+ this.exception = exception;
+ this.dependentsQueueTail = dependentsQueueTail;
+ }
+
+ @SuppressWarnings("unchecked")
+ private static <T> State<T> empty() {
+ return (State<T>) EMPTY_STATE;
+ }
+
+ public State<T> enqueueDependent(DependentAction<T> dependent) {
+ return new State<>(completed, result, exception, new ListNode<>(dependent, dependentsQueueTail));
+ }
+ }
+
+ private static class ListNode<T> {
+ private final DependentAction<T> dependent;
+ private final ListNode<T> next;
+
+ private ListNode(DependentAction<T> dependent, ListNode<T> next) {
+ this.dependent = dependent;
+ this.next = next;
+ }
+
+ public void notifyHeadToTail(T result, Throwable exception, ListNode<T> lastNotifiedNode) {
+ Deque<ListNode<T>> stack = new ArrayDeque<>();
+
+ for (ListNode<T> node = this; node != null && node != lastNotifiedNode; node = node.next) {
+ stack.addFirst(node);
+ }
+
+ // Notify those dependents that are not notified yet.
+ while (!stack.isEmpty()) {
+ ListNode<T> node = stack.removeFirst();
+
+ try {
+ node.dependent.onCompletion(result, exception);
+ } catch (Exception e) {
+ // ignore
+ }
+ }
+ }
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/future/OrderingFutureCallbackInterferenceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/future/OrderingFutureCallbackInterferenceTest.java
new file mode 100644
index 0000000000..4749dba8d8
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/future/OrderingFutureCallbackInterferenceTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.future;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests making sure that, when one callback throws an exception, its fellows still get a chance to be completed.
+ */
+class OrderingFutureCallbackInterferenceTest {
+ private final RuntimeException cause = new RuntimeException("Oops");
+
+ @Test
+ void composeToCompletableDoesNotInterfereWithEachOther() {
+ OrderingFuture<Integer> future = new OrderingFuture<>();
+
+ List<Integer> order = new CopyOnWriteArrayList<>();
+
+ future.thenComposeToCompletable(x -> {
+ throw cause;
+ });
+ future.thenComposeToCompletable(x -> {
+ order.add(1);
+ return CompletableFuture.completedFuture(null);
+ });
+ future.thenComposeToCompletable(x -> {
+ throw cause;
+ });
+ future.thenComposeToCompletable(x -> {
+ order.add(2);
+ return CompletableFuture.completedFuture(null);
+ });
+
+ future.complete(1);
+
+ assertThat(order, contains(1, 2));
+ }
+
+ @Test
+ void whenCompleteDoesNotInterfereWithEachOther() {
+ OrderingFuture<Integer> future = new OrderingFuture<>();
+
+ List<Integer> order = new CopyOnWriteArrayList<>();
+
+ future.whenComplete((res, ex) -> {
+ throw new RuntimeException("one");
+ });
+ future.whenComplete((res, ex) -> order.add(1));
+ future.whenComplete((res, ex) -> {
+ throw new RuntimeException("three");
+ });
+ future.whenComplete((res, ex) -> order.add(2));
+
+ future.complete(1);
+
+ assertThat(order, contains(1, 2));
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/future/OrderingFutureCallbackOrderingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/future/OrderingFutureCallbackOrderingTest.java
new file mode 100644
index 0000000000..c1ee16ead5
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/future/OrderingFutureCallbackOrderingTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.future;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests making sure that callbacks are ordered correctly.
+ */
+class OrderingFutureCallbackOrderingTest {
+ private final RuntimeException cause = new RuntimeException("Oops");
+
+ @Test
+ void composeToCompletableMaintainsCallbacksOrderOnCompletedFuture() {
+ OrderingFuture<Integer> future = OrderingFuture.completedFuture(42);
+ List<Integer> order = new CopyOnWriteArrayList<>();
+
+ future.thenComposeToCompletable(x -> {
+ order.add(1);
+ return CompletableFuture.completedFuture(null);
+ });
+ future.thenComposeToCompletable(x -> {
+ order.add(2);
+ return CompletableFuture.completedFuture(null);
+ });
+
+ assertThat(order, contains(1, 2));
+ }
+
+ @Test
+ void composeToCompletableMaintainsCallbacksOrderOnCompletion() {
+ OrderingFuture<Integer> future = new OrderingFuture<>();
+ List<Integer> order = new CopyOnWriteArrayList<>();
+
+ future.thenComposeToCompletable(x -> {
+ order.add(1);
+ return CompletableFuture.completedFuture(null);
+ });
+ future.thenComposeToCompletable(x -> {
+ order.add(2);
+ return CompletableFuture.completedFuture(null);
+ });
+
+ future.complete(42);
+
+ assertThat(order, contains(1, 2));
+ }
+
+ @Test
+ void whenCompleteMaintainsCallbacksOrderOnCompletedFuture() {
+ OrderingFuture<Integer> future = OrderingFuture.completedFuture(42);
+ List<Integer> order = new CopyOnWriteArrayList<>();
+
+ future.whenComplete((res, ex) -> order.add(1));
+ future.whenComplete((res, ex) -> order.add(2));
+
+ assertThat(order, contains(1, 2));
+ }
+
+ @Test
+ void whenCompleteMaintainsCallbacksOrderOnFailedFuture() {
+ OrderingFuture<Integer> future = OrderingFuture.failedFuture(cause);
+ List<Integer> order = new CopyOnWriteArrayList<>();
+
+ future.whenComplete((res, ex) -> order.add(1));
+ future.whenComplete((res, ex) -> order.add(2));
+
+ assertThat(order, contains(1, 2));
+ }
+
+ @Test
+ void whenCompleteMaintainsCallbacksOrderOnCompletion() {
+ OrderingFuture<Integer> future = new OrderingFuture<>();
+ List<Integer> order = new CopyOnWriteArrayList<>();
+
+ future.whenComplete((res, ex) -> order.add(1));
+ future.whenComplete((res, ex) -> order.add(2));
+
+ future.completeExceptionally(cause);
+
+ assertThat(order, contains(1, 2));
+ }
+
+ @Test
+ void differentCallbacksAreCalledInTheOrderOfAdditionToCompletedFuture() {
+ OrderingFuture<Integer> future = OrderingFuture.completedFuture(42);
+ List<Integer> order = new CopyOnWriteArrayList<>();
+
+ future.thenComposeToCompletable(x -> {
+ order.add(1);
+ return CompletableFuture.completedFuture(null);
+ });
+ future.whenComplete((res, ex) -> order.add(2));
+
+ assertThat(order, contains(1, 2));
+ }
+
+ @Test
+ void differentCallbacksAreCalledInTheOrderOfAdditionOnCompletion() {
+ OrderingFuture<Integer> future = new OrderingFuture<>();
+ List<Integer> order = new CopyOnWriteArrayList<>();
+
+ future.thenComposeToCompletable(x -> {
+ order.add(1);
+ return CompletableFuture.completedFuture(null);
+ });
+ future.whenComplete((res, ex) -> order.add(2));
+
+ future.complete(42);
+
+ assertThat(order, contains(1, 2));
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/future/OrderingFutureConcurrencyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/future/OrderingFutureConcurrencyTest.java
new file mode 100644
index 0000000000..5056df05b7
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/future/OrderingFutureConcurrencyTest.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.future;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Stream;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+/**
+ * Tests for concurrency aspects of {@link OrderingFuture}.
+ */
+class OrderingFutureConcurrencyTest {
+ private static final RuntimeException cause = new RuntimeException();
+
+ private final Object lockMonitor = new Object();
+
+ @Test
+ void concurrentAdditionOfWhenCompleteCallbacksIsCorrect() throws Exception {
+ OrderingFuture<Integer> future = new OrderingFuture<>();
+
+ AtomicInteger counter = new AtomicInteger();
+
+ Runnable addIncrementerTask = () -> {
+ for (int i = 0; i < 10_000; i++) {
+ future.whenComplete((res, ex) -> counter.incrementAndGet());
+ }
+ };
+
+ executeInParallel(addIncrementerTask, addIncrementerTask);
+
+ future.complete(1);
+
+ assertThat(counter.get(), is(20_000));
+ }
+
+ private void executeInParallel(Runnable task1, Runnable task2) throws InterruptedException {
+ Thread thread1 = new Thread(task1);
+ Thread thread2 = new Thread(task2);
+
+ thread1.start();
+ thread2.start();
+
+ thread1.join();
+ thread2.join();
+ }
+
+ @Test
+ void concurrentAdditionOfThenComposeToCompletableCallbacksIsCorrect() throws Exception {
+ OrderingFuture<Integer> future = new OrderingFuture<>();
+
+ AtomicInteger counter = new AtomicInteger();
+
+ Runnable addIncrementerTask = () -> {
+ for (int i = 0; i < 10_000; i++) {
+ future.thenComposeToCompletable(x -> {
+ counter.incrementAndGet();
+ return CompletableFuture.completedFuture(null);
+ });
+ }
+ };
+
+ executeInParallel(addIncrementerTask, addIncrementerTask);
+
+ future.complete(1);
+
+ assertThat(counter.get(), is(20_000));
+ }
+
+ @Timeout(value = 10, unit = TimeUnit.SECONDS)
+ @ParameterizedTest
+ @MethodSource("completionsAndOperations")
+ void noDeadlockBetweenCompletionAndOtherOperations(Completion completion, Operation operation) throws Exception {
+ for (int i = 0; i < 10; i++) {
+ OrderingFuture<Void> future = futureThatLocksMonitorViaCallbackOnCompletion();
+
+ Runnable isCompletedExceptionallyTask = () -> {
+ synchronized (lockMonitor) {
+ long started = System.nanoTime();
+ while (System.nanoTime() < started + TimeUnit.MILLISECONDS.toNanos(100)) {
+ operation.execute(future);
+ }
+ }
+ };
+
+ executeInParallel(completion.completionTask(future), isCompletedExceptionallyTask);
+ }
+ }
+
+ private static Stream<Arguments> completionsAndOperations() {
+ List<Arguments> args = new ArrayList<>();
+
+ for (Completion completion : Completion.values()) {
+ for (Operation operation : Operation.values()) {
+ args.add(Arguments.of(completion, operation));
+ }
+ }
+
+ return args.stream();
+ }
+
+ private OrderingFuture<Void> futureThatLocksMonitorViaCallbackOnCompletion() {
+ OrderingFuture<Void> future = new OrderingFuture<>();
+
+ future.whenComplete((res, ex) -> {
+ synchronized (lockMonitor) {
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ });
+
+ return future;
+ }
+
+ private enum Completion {
+ NORMAL {
+ @Override
+ Runnable completionTask(OrderingFuture<?> future) {
+ return () -> future.complete(null);
+ }
+ },
+ EXCEPTIONAL {
+ @Override
+ Runnable completionTask(OrderingFuture<?> future) {
+ return () -> future.completeExceptionally(cause);
+ }
+ };
+
+ abstract Runnable completionTask(OrderingFuture<?> future);
+ }
+
+ private enum Operation {
+ COMPLETE_NORMALLY {
+ @Override
+ void execute(OrderingFuture<?> future) {
+ future.complete(null);
+ }
+ },
+ COMPLETE_EXCEPTIONALLY {
+ @Override
+ void execute(OrderingFuture<?> future) {
+ future.completeExceptionally(cause);
+ }
+ },
+ IS_COMPLETED_EXCEPTIONALLY {
+ @Override
+ void execute(OrderingFuture<?> future) {
+ future.isCompletedExceptionally();
+ }
+ },
+ WHEN_COMPLETE {
+ @Override
+ void execute(OrderingFuture<?> future) {
+ future.whenComplete((res, ex) -> {});
+ }
+ },
+ THEN_COMPOSE_TO_COMPLETABLE {
+ @Override
+ void execute(OrderingFuture<?> future) {
+ future.thenComposeToCompletable(x -> CompletableFuture.completedFuture(null));
+ }
+ },
+ GET_NOW {
+ @Override
+ void execute(OrderingFuture<?> future) {
+ try {
+ future.getNow(null);
+ } catch (Exception e) {
+ // ignored
+ }
+ }
+ };
+
+ abstract void execute(OrderingFuture<?> future);
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/future/OrderingFutureTest.java b/modules/core/src/test/java/org/apache/ignite/internal/future/OrderingFutureTest.java
new file mode 100644
index 0000000000..2d97daf138
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/future/OrderingFutureTest.java
@@ -0,0 +1,573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.future;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
+import static org.hamcrest.Matchers.sameInstance;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import org.junit.jupiter.api.Test;
+
+/**
+ * General tests for {@link OrderingFuture}.
+ */
+class OrderingFutureTest {
+ private final RuntimeException cause = new RuntimeException("Oops");
+
+ @Test
+ void completedFutureCreatesCompletedFuture() {
+ OrderingFuture<Integer> future = OrderingFuture.completedFuture(1);
+
+ assertThat(future.getNow(999), is(1));
+ }
+
+ @Test
+ void completedFutureIsNotFailed() {
+ OrderingFuture<Integer> future = OrderingFuture.completedFuture(1);
+
+ assertFalse(future.isCompletedExceptionally());
+ }
+
+ @Test
+ void failedFutureCreatesFutureCompletedExceptionally() {
+ OrderingFuture<Integer> future = OrderingFuture.failedFuture(cause);
+
+ assertThatFutureIsCompletedWithOurException(future);
+ }
+
+ private void assertThatFutureIsCompletedWithOurException(OrderingFuture<Integer> future) {
+ CompletionException ex = assertThrows(CompletionException.class, () -> future.getNow(999));
+
+ assertThat(ex.getCause(), is(sameInstance(cause)));
+ }
+
+ private void assertThatFutureIsCompletedWithOurException(CompletableFuture<Integer> future) {
+ CompletionException ex = assertThrows(CompletionException.class, () -> future.getNow(999));
+
+ assertThat(ex.getCause(), is(sameInstance(cause)));
+ }
+
+ @Test
+ void failedFutureIsCompletedExceptionally() {
+ OrderingFuture<Integer> future = OrderingFuture.failedFuture(cause);
+
+ assertTrue(future.isCompletedExceptionally());
+ }
+
+ @Test
+ void adaptingIncompleteFutureProducesIncompleteResult() {
+ CompletableFuture<Integer> adaptee = new CompletableFuture<>();
+ OrderingFuture<Integer> adaptor = OrderingFuture.adapt(adaptee);
+
+ assertThat(adaptor.getNow(999), is(999));
+ }
+
+ @Test
+ void normalCompletionIsPropagatedThrowAdapt() {
+ CompletableFuture<Integer> adaptee = new CompletableFuture<>();
+ OrderingFuture<Integer> adaptor = OrderingFuture.adapt(adaptee);
+
+ adaptee.complete(1);
+
+ assertThat(adaptor.getNow(999), is(1));
+ }
+
+ @Test
+ void exceptionalCompletionIsPropagatedThroughAdapter() {
+ CompletableFuture<Integer> adaptee = new CompletableFuture<>();
+ OrderingFuture<Integer> adaptor = OrderingFuture.adapt(adaptee);
+
+ adaptee.completeExceptionally(cause);
+
+ assertThatFutureIsCompletedWithOurException(adaptor);
+ }
+
+ @Test
+ void completeCompletesIncompleteFuture() {
+ var future = new OrderingFuture<Integer>();
+
+ future.complete(1);
+
+ assertThat(future.getNow(999), is(1));
+ }
+
+ @Test
+ void completeDoesNothingWithCompletedFuture() {
+ OrderingFuture<Integer> future = OrderingFuture.completedFuture(1);
+
+ future.complete(2);
+
+ assertThat(future.getNow(999), is(1));
+ }
+
+ @Test
+ void completeDoesDoesNotInvokeCallbacksSecondTimeOnCompletedFuture() {
+ OrderingFuture<Integer> future = OrderingFuture.completedFuture(1);
+
+ AtomicInteger completionCount = new AtomicInteger();
+
+ future.whenComplete((res, ex) -> completionCount.incrementAndGet());
+
+ future.complete(2);
+
+ assertThat(completionCount.get(), is(1));
+ }
+
+ @Test
+ void completeDoesNothingWithFailedFuture() {
+ OrderingFuture<Integer> future = OrderingFuture.failedFuture(cause);
+
+ future.complete(2);
+
+ assertThatFutureIsCompletedWithOurException(future);
+ }
+
+ @Test
+ void completeExceptionallyDoesDoesNotInvokeCallbacksSecondTimeOnCompletedFuture() {
+ OrderingFuture<Integer> future = OrderingFuture.completedFuture(1);
+
+ AtomicInteger completionCount = new AtomicInteger();
+
+ future.whenComplete((res, ex) -> completionCount.incrementAndGet());
+
+ future.completeExceptionally(cause);
+
+ assertThat(completionCount.get(), is(1));
+ }
+
+ @Test
+ void completeExceptionallyCompletesIncompleteFuture() {
+ var future = new OrderingFuture<Integer>();
+
+ future.completeExceptionally(cause);
+
+ assertThatFutureIsCompletedWithOurException(future);
+ }
+
+ @Test
+ void completeExceptionallyDoesNothingWithCompletedFuture() {
+ OrderingFuture<Integer> future = OrderingFuture.completedFuture(1);
+
+ future.completeExceptionally(cause);
+
+ assertThat(future.getNow(999), is(1));
+ }
+
+ @Test
+ void completeExceptionallyDoesNothingWithFailedFuture() {
+ OrderingFuture<Integer> future = OrderingFuture.failedFuture(cause);
+
+ future.completeExceptionally(new Exception("Another cause"));
+
+ assertThatFutureIsCompletedWithOurException(future);
+ }
+
+ @Test
+ void completionWithCompletionExceptionDoesNotDuplicateCompletionException() {
+ OrderingFuture<Integer> future = new OrderingFuture<>();
+
+ future.completeExceptionally(new CompletionException(cause));
+
+ assertThatFutureIsCompletedWithOurException(future);
+ }
+
+ @Test
+ void whenCompletePropagatesResultFromAlreadyCompletedFuture() {
+ AtomicInteger container = new AtomicInteger();
+ OrderingFuture<Integer> future = OrderingFuture.completedFuture(1);
+
+ future.whenComplete((res, ex) -> container.set(res));
+
+ assertThat(container.get(), is(1));
+ }
+
+ @Test
+ void whenCompletePropagatesExceptionFromAlreadyFailedFuture() {
+ AtomicReference<Throwable> container = new AtomicReference<>();
+ OrderingFuture<Integer> future = OrderingFuture.failedFuture(cause);
+
+ future.whenComplete((res, ex) -> container.set(ex));
+
+ assertThat(container.get(), is(sameInstance(cause)));
+ }
+
+ @Test
+ void whenCompletePropagatesResultFromFutureCompletion() {
+ AtomicInteger container = new AtomicInteger();
+ OrderingFuture<Integer> future = new OrderingFuture<>();
+
+ future.whenComplete((res, ex) -> container.set(res));
+
+ future.complete(1);
+
+ assertThat(container.get(), is(1));
+ }
+
+ @Test
+ void whenCompletePropagatesExceptionFromFutureCompletion() {
+ AtomicReference<Throwable> container = new AtomicReference<>();
+ OrderingFuture<Integer> future = new OrderingFuture<>();
+
+ future.whenComplete((res, ex) -> container.set(ex));
+
+ future.completeExceptionally(cause);
+
+ assertThat(container.get(), is(sameInstance(cause)));
+ }
+
+ @Test
+ void whenCompleteSwallowsExceptionThrownByActionOnAlreadyCompletedFuture() {
+ OrderingFuture<Integer> future = OrderingFuture.completedFuture(1);
+
+ assertDoesNotThrow(() -> future.whenComplete((res, ex) -> {
+ throw cause;
+ }));
+ }
+
+ @Test
+ void whenCompleteSwallowsExceptionThrownByActionOnAlreadyFailedFuture() {
+ OrderingFuture<Integer> future = OrderingFuture.failedFuture(cause);
+
+ assertDoesNotThrow(() -> future.whenComplete((res, ex) -> {
+ throw new RuntimeException("Another exception");
+ }));
+ }
+
+ @Test
+ void composeToCompletablePropagatesResultFromAlreadyCompletedFuture() {
+ OrderingFuture<Integer> orderingFuture = OrderingFuture.completedFuture(3);
+
+ CompletableFuture<Integer> completableFuture = orderingFuture.thenComposeToCompletable(
+ x -> CompletableFuture.completedFuture(x * 5)
+ );
+
+ assertThat(completableFuture.getNow(999), is(15));
+ }
+
+ @Test
+ void composeToCompletablePropagatesExceptionFromAlreadyFailedFuture() {
+ OrderingFuture<Integer> orderingFuture = OrderingFuture.failedFuture(cause);
+
+ CompletableFuture<Integer> completableFuture = orderingFuture.thenComposeToCompletable(CompletableFuture::completedFuture);
+
+ assertThatFutureIsCompletedWithOurException(completableFuture);
+ }
+
+ @Test
+ void composeToCompletableDoesNotInvokeActionOnAlreadyFailedFuture() {
+ AtomicBoolean called = new AtomicBoolean(false);
+ OrderingFuture<Integer> orderingFuture = OrderingFuture.failedFuture(cause);
+
+ CompletableFuture<Integer> completableFuture = orderingFuture.thenComposeToCompletable(value -> {
+ called.set(true);
+ return CompletableFuture.completedFuture(value);
+ });
+
+ assertFalse(called.get());
+ assertThatFutureIsCompletedWithOurException(completableFuture);
+ }
+
+ @Test
+ void composeToCompletablePropagatesResultFromFutureCompletion() {
+ OrderingFuture<Integer> orderingFuture = new OrderingFuture<>();
+
+ CompletableFuture<Integer> completableFuture = orderingFuture.thenComposeToCompletable(
+ x -> CompletableFuture.completedFuture(x * 5)
+ );
+
+ orderingFuture.complete(3);
+
+ assertThat(completableFuture.getNow(999), is(15));
+ }
+
+ @Test
+ void composeToCompletablePropagatesExceptionFromFutureCompletion() {
+ OrderingFuture<Integer> orderingFuture = new OrderingFuture<>();
+
+ CompletableFuture<Integer> completableFuture = orderingFuture.thenComposeToCompletable(CompletableFuture::completedFuture);
+
+ orderingFuture.completeExceptionally(cause);
+
+ assertThatFutureIsCompletedWithOurException(completableFuture);
+ }
+
+ @Test
+ void composeToCompletableDoesNotInvokeActionOnExceptionalCompletion() {
+ AtomicBoolean called = new AtomicBoolean(false);
+ OrderingFuture<Integer> orderingFuture = new OrderingFuture<>();
+
+ CompletableFuture<Integer> completableFuture = orderingFuture.thenComposeToCompletable(value -> {
+ called.set(true);
+ return CompletableFuture.completedFuture(value);
+ });
+ orderingFuture.completeExceptionally(cause);
+
+ assertFalse(called.get());
+ assertThatFutureIsCompletedWithOurException(completableFuture);
+ }
+
+ @Test
+ void composeToCompletablePropagatesExceptionFromActionOnCompletedFuture() {
+ OrderingFuture<Integer> orderingFuture = OrderingFuture.completedFuture(1);
+
+ CompletableFuture<Integer> completableFuture = orderingFuture.thenComposeToCompletable(x -> {
+ throw cause;
+ });
+
+ assertThatFutureIsCompletedWithOurException(completableFuture);
+ }
+
+ @Test
+ void composeToCompletablePropagatesExceptionFromActionOnNormalCompletion() {
+ OrderingFuture<Integer> orderingFuture = new OrderingFuture<>();
+
+ CompletableFuture<Integer> completableFuture = orderingFuture.thenComposeToCompletable(x -> {
+ throw cause;
+ });
+
+ orderingFuture.complete(1);
+
+ assertThatFutureIsCompletedWithOurException(completableFuture);
+ }
+
+ @Test
+ void composeToCompletableWrapsCancellationExceptionInCompletionException() {
+ AtomicReference<Throwable> causeRef = new AtomicReference<>();
+
+ OrderingFuture<Integer> future = new OrderingFuture<>();
+ future.thenComposeToCompletable(x -> CompletableFuture.completedFuture(null)).whenComplete((res, ex) -> causeRef.set(ex));
+
+ CancellationException cancellationException = new CancellationException("Oops");
+ future.completeExceptionally(cancellationException);
+
+ assertThat(causeRef.get(), is(instanceOf(CompletionException.class)));
+ assertThat(causeRef.get().getCause(), is(cancellationException));
+ }
+
+ @Test
+ void getNowReturnsCompletionValueFromCompletedFuture() {
+ OrderingFuture<Integer> future = OrderingFuture.completedFuture(1);
+
+ assertThat(future.getNow(999), is(1));
+ }
+
+ @Test
+ void getNowReturnsCompletionValueFromFutureCompletedManually() {
+ OrderingFuture<Integer> future = new OrderingFuture<>();
+ future.complete(1);
+
+ assertThat(future.getNow(999), is(1));
+ }
+
+ @Test
+ void getNowReturnsDefaultValueFromIncompleteFuture() {
+ OrderingFuture<Integer> incompleteFuture = new OrderingFuture<>();
+
+ assertThat(incompleteFuture.getNow(999), is(999));
+ }
+
+ @Test
+ void getNowThrowsOnFailedFuture() {
+ OrderingFuture<Integer> future = OrderingFuture.failedFuture(cause);
+
+ CompletionException ex = assertThrows(CompletionException.class, () -> future.getNow(999));
+ assertThat(ex.getCause(), is(sameInstance(cause)));
+ }
+
+ @Test
+ void getNowThrowsOnFutureCompletedExceptionally() {
+ OrderingFuture<Integer> future = new OrderingFuture<>();
+ future.completeExceptionally(cause);
+
+ CompletionException ex = assertThrows(CompletionException.class, () -> future.getNow(999));
+ assertThat(ex.getCause(), is(sameInstance(cause)));
+ }
+
+ @Test
+ void getWithTimeoutReturnsCompletionValueFromCompletedFuture() throws Exception {
+ OrderingFuture<Integer> future = OrderingFuture.completedFuture(1);
+
+ assertThat(future.get(1, TimeUnit.NANOSECONDS), is(1));
+ }
+
+ @Test
+ void getWithTimeoutReturnsCompletionValueFromFutureCompletedManually() throws Exception {
+ OrderingFuture<Integer> future = new OrderingFuture<>();
+ future.complete(1);
+
+ assertThat(future.get(1, TimeUnit.NANOSECONDS), is(1));
+ }
+
+ @Test
+ void getWithTimeoutReturnsCompletionValueFromFutureCompletedFromDifferentThread() throws Exception {
+ OrderingFuture<Integer> future = new OrderingFuture<>();
+
+ new Thread(() -> future.complete(1)).start();
+
+ assertThat(future.get(1, TimeUnit.SECONDS), is(1));
+ }
+
+ @Test
+ void getWithTimeoutThrowsOnFailedFuture() {
+ OrderingFuture<Integer> future = OrderingFuture.failedFuture(cause);
+
+ ExecutionException ex = assertThrows(ExecutionException.class, () -> future.get(1, TimeUnit.NANOSECONDS));
+ assertThat(ex.getCause(), is(sameInstance(cause)));
+ }
+
+ @Test
+ void getWithTimeoutThrowsOnFutureCompletedExceptionally() {
+ OrderingFuture<Integer> future = new OrderingFuture<>();
+
+ future.completeExceptionally(cause);
+
+ ExecutionException ex = assertThrows(ExecutionException.class, () -> future.get(1, TimeUnit.NANOSECONDS));
+ assertThat(ex.getCause(), is(sameInstance(cause)));
+ }
+
+ @Test
+ void getWithTimeoutThrowsOnFutureCompletedExceptionallyFromDifferentThread() {
+ OrderingFuture<Integer> future = new OrderingFuture<>();
+
+ new Thread(() -> future.completeExceptionally(cause)).start();
+
+ ExecutionException ex = assertThrows(ExecutionException.class, () -> future.get(1, TimeUnit.SECONDS));
+ assertThat(ex.getCause(), is(sameInstance(cause)));
+ }
+
+ @Test
+ void getWithTimeoutUnwrapsCompletionExceptionWhenThrowsExecutionException() {
+ OrderingFuture<Void> future = OrderingFuture.failedFuture(new CompletionException(cause));
+
+ ExecutionException ex = assertThrows(ExecutionException.class, () -> future.get(1, TimeUnit.SECONDS));
+ assertThat(ex.getCause(), is(sameInstance(cause)));
+ }
+
+ @Test
+ void getWithTimeoutDoesNotWrapCancellationExceptionInExecutionException() {
+ CancellationException cancellationException = new CancellationException();
+ OrderingFuture<Void> future = OrderingFuture.failedFuture(cancellationException);
+
+ CancellationException ex = assertThrows(CancellationException.class, () -> future.get(1, TimeUnit.NANOSECONDS));
+ assertThat(ex, is(cancellationException));
+ }
+
+ @Test
+ void getWithTimeoutThrowsTimeoutExceptionWhenTimesOut() {
+ OrderingFuture<Integer> incompleteFuture = new OrderingFuture<>();
+
+ assertThrows(TimeoutException.class, () -> incompleteFuture.get(1, TimeUnit.NANOSECONDS));
+ }
+
+ @Test
+ void getWithTimeoutThrowsTimeoutExceptionWhenTimeoutIsZero() {
+ OrderingFuture<Integer> incompleteFuture = new OrderingFuture<>();
+
+ assertThrows(TimeoutException.class, () -> incompleteFuture.get(0, TimeUnit.NANOSECONDS));
+ }
+
+ @Test
+ void getWithTimeoutThrowsInterruptedExceptionOnInterruption() throws Exception {
+ OrderingFuture<Integer> incompleteFuture = new OrderingFuture<>();
+
+ CountDownLatch workerStartReached = new CountDownLatch(1);
+ CountDownLatch workerEndReached = new CountDownLatch(1);
+ AtomicBoolean interrupted = new AtomicBoolean(false);
+ AtomicReference<Throwable> exRef = new AtomicReference<>();
+
+ Thread worker = new Thread(() -> {
+ workerStartReached.countDown();
+
+ try {
+ incompleteFuture.get(1, TimeUnit.MINUTES);
+ } catch (InterruptedException e) {
+ interrupted.set(true);
+ } catch (TimeoutException | ExecutionException e) {
+ exRef.set(e);
+ } finally {
+ workerEndReached.countDown();
+ }
+ });
+ worker.start();
+
+ assertTrue(workerStartReached.await(1, TimeUnit.SECONDS));
+
+ worker.interrupt();
+
+ assertTrue(workerEndReached.await(1, TimeUnit.SECONDS));
+
+ assertThat(interrupted.get(), is(true));
+ assertThat(exRef.get(), is(nullValue()));
+ }
+
+ @Test
+ void getWithTimeoutThrowsInterruptedExceptionIfThreadIsAlreadyInterruptedEvenWithZeroTimeout() {
+ OrderingFuture<Integer> incompleteFuture = new OrderingFuture<>();
+
+ Thread.currentThread().interrupt();
+
+ assertThrows(InterruptedException.class, () -> incompleteFuture.get(0, TimeUnit.NANOSECONDS));
+ }
+
+ @Test
+ void conversionOfIncompleteFutureToCompletableFutureProducesIncompleteResult() {
+ OrderingFuture<Integer> orderingFuture = new OrderingFuture<>();
+
+ CompletableFuture<Integer> completableFuture = orderingFuture.toCompletableFuture();
+
+ assertThat(completableFuture.getNow(999), is(999));
+ }
+
+ @Test
+ void normalCompletionIsPropagatedToCompletableFuture() {
+ OrderingFuture<Integer> orderingFuture = new OrderingFuture<>();
+
+ CompletableFuture<Integer> completableFuture = orderingFuture.toCompletableFuture();
+
+ orderingFuture.complete(1);
+
+ assertThat(completableFuture.getNow(999), is(1));
+ }
+
+ @Test
+ void exceptionalCompletionIsPropagatedToCompletableFuture() {
+ OrderingFuture<Integer> orderingFuture = new OrderingFuture<>();
+
+ CompletableFuture<Integer> completableFuture = orderingFuture.toCompletableFuture();
+
+ orderingFuture.completeExceptionally(cause);
+
+ assertThatFutureIsCompletedWithOurException(completableFuture);
+ }
+}
diff --git a/modules/network-api/src/main/java/org/apache/ignite/network/MessagingService.java b/modules/network-api/src/main/java/org/apache/ignite/network/MessagingService.java
index 5fe5d7b2d5..c12569df19 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/network/MessagingService.java
+++ b/modules/network-api/src/main/java/org/apache/ignite/network/MessagingService.java
@@ -39,11 +39,15 @@ public interface MessagingService {
*
* <p>Guarantees:
* <ul>
- * <li>Messages will be delivered in the same order as they were sent;</li>
- * <li>If a message N has been successfully delivered to a member implies that all messages preceding N
- * have also been successfully delivered.</li>
+ * <li>Messages send to same receiver will be delivered in the same order as they were sent;</li>
+ * <li>If a message N has been successfully delivered to a member implies that all messages to same receiver
+ * preceding N have also been successfully delivered.</li>
* </ul>
*
+ * <p>Please note that the guarantees only work for same (sender, receiver) pairs. That is, if A sends m1 and m2
+ * to B, then the guarantees are maintained. If, on the other hand, A sends m1 to B and m2 to C, then no guarantees
+ * exist.
+ *
* @param recipient Recipient of the message.
* @param msg Message which should be delivered.
* @return Future of the send operation.
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
index 67a725bcaa..e49fbe0787 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
@@ -32,10 +32,12 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Stream;
import org.apache.ignite.configuration.schemas.network.NetworkView;
+import org.apache.ignite.internal.future.OrderingFuture;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.network.NetworkMessagesFactory;
import org.apache.ignite.internal.network.handshake.HandshakeManager;
+import org.apache.ignite.internal.network.recovery.RecoveryClientHandhakeManagerFactory;
import org.apache.ignite.internal.network.recovery.RecoveryClientHandshakeManager;
import org.apache.ignite.internal.network.recovery.RecoveryDescriptorProvider;
import org.apache.ignite.internal.network.recovery.RecoveryServerHandshakeManager;
@@ -83,6 +85,9 @@ public class ConnectionManager {
/** Node launch id. As opposed to {@link #consistentId}, this identifier changes between restarts. */
private final UUID launchId;
+ /** Factory producing {@link RecoveryClientHandshakeManager} instances. */
+ private final RecoveryClientHandhakeManagerFactory clientHandhakeManagerFactory;
+
/** Start flag. */
private final AtomicBoolean started = new AtomicBoolean(false);
@@ -107,10 +112,39 @@ public class ConnectionManager {
UUID launchId,
String consistentId,
NettyBootstrapFactory bootstrapFactory
+ ) {
+ this(
+ networkConfiguration,
+ serializationService,
+ launchId,
+ consistentId,
+ bootstrapFactory,
+ new DefaultRecoveryClientHandhakeManagerFactory()
+ );
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param networkConfiguration Network configuration.
+ * @param serializationService Serialization service.
+ * @param launchId Launch id of this node.
+ * @param consistentId Consistent id of this node.
+ * @param bootstrapFactory Bootstrap factory.
+ * @param clientHandhakeManagerFactory Factory for {@link RecoveryClientHandshakeManager} instances.
+ */
+ public ConnectionManager(
+ NetworkView networkConfiguration,
+ SerializationService serializationService,
+ UUID launchId,
+ String consistentId,
+ NettyBootstrapFactory bootstrapFactory,
+ RecoveryClientHandhakeManagerFactory clientHandhakeManagerFactory
) {
this.serializationService = serializationService;
this.launchId = launchId;
this.consistentId = consistentId;
+ this.clientHandhakeManagerFactory = clientHandhakeManagerFactory;
this.server = new NettyServer(
networkConfiguration,
@@ -170,7 +204,7 @@ public class ConnectionManager {
* @param address Another node's address.
* @return Sender.
*/
- public CompletableFuture<NettySender> channel(@Nullable String consistentId, SocketAddress address) {
+ public OrderingFuture<NettySender> channel(@Nullable String consistentId, SocketAddress address) {
if (consistentId != null) {
// If consistent id is known, try looking up a channel by consistent id. There can be an outbound connection
// or an inbound connection associated with that consistent id.
@@ -180,7 +214,7 @@ public class ConnectionManager {
);
if (channel != null) {
- return CompletableFuture.completedFuture(channel);
+ return OrderingFuture.completedFuture(channel);
}
}
@@ -193,11 +227,7 @@ public class ConnectionManager {
? existingClient : connect(addr, (short) 0)
);
- CompletableFuture<NettySender> sender = client.sender();
-
- assert sender != null;
-
- return sender;
+ return client.sender();
}
/**
@@ -292,7 +322,12 @@ public class ConnectionManager {
}
private HandshakeManager createClientHandshakeManager(short connectionId) {
- return new RecoveryClientHandshakeManager(launchId, consistentId, connectionId, FACTORY, descriptorProvider);
+ return clientHandhakeManagerFactory.create(
+ launchId,
+ consistentId,
+ connectionId,
+ descriptorProvider
+ );
}
private HandshakeManager createServerHandshakeManager() {
@@ -314,7 +349,6 @@ public class ConnectionManager {
*
* @return This node's consistent id.
*/
- @TestOnly
public String consistentId() {
return consistentId;
}
@@ -338,4 +372,15 @@ public class ConnectionManager {
public Map<String, NettySender> channels() {
return Collections.unmodifiableMap(channels);
}
+
+ /**
+ * Factory producing vanilla {@link RecoveryClientHandshakeManager} instances.
+ */
+ private static class DefaultRecoveryClientHandhakeManagerFactory implements RecoveryClientHandhakeManagerFactory {
+ @Override
+ public RecoveryClientHandshakeManager create(UUID launchId, String consistentId, short connectionId,
+ RecoveryDescriptorProvider recoveryDescriptorProvider) {
+ return new RecoveryClientHandshakeManager(launchId, consistentId, connectionId, recoveryDescriptorProvider);
+ }
+ }
}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyClient.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyClient.java
index da0ad164ab..5ad57f3a84 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyClient.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyClient.java
@@ -22,10 +22,12 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import java.net.SocketAddress;
+import java.util.Objects;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;
+import org.apache.ignite.internal.future.OrderingFuture;
import org.apache.ignite.internal.network.handshake.HandshakeManager;
import org.apache.ignite.internal.network.serialization.PerSessionSerializationService;
import org.apache.ignite.internal.network.serialization.SerializationService;
@@ -47,10 +49,10 @@ public class NettyClient {
/** Future that resolves when the client finished the handshake. */
@Nullable
- private volatile CompletableFuture<NettySender> clientFuture = null;
+ private volatile OrderingFuture<NettySender> senderFuture = null;
/** Future that resolves when the client channel is opened. */
- private CompletableFuture<Void> channelFuture = new CompletableFuture<>();
+ private final CompletableFuture<Void> channelFuture = new CompletableFuture<>();
/** Client channel. */
@Nullable
@@ -91,13 +93,13 @@ public class NettyClient {
* @param bootstrapTemplate Template client bootstrap.
* @return Future that resolves when client channel is opened.
*/
- public CompletableFuture<NettySender> start(Bootstrap bootstrapTemplate) {
+ public OrderingFuture<NettySender> start(Bootstrap bootstrapTemplate) {
synchronized (startStopLock) {
if (stopped) {
throw new IgniteInternalException("Attempted to start an already stopped NettyClient");
}
- if (clientFuture != null) {
+ if (senderFuture != null) {
throw new IgniteInternalException("Attempted to start an already started NettyClient");
}
@@ -113,7 +115,7 @@ public class NettyClient {
}
});
- clientFuture = NettyUtils.toChannelCompletableFuture(bootstrap.connect(address))
+ CompletableFuture<NettySender> senderCompletableFuture = NettyUtils.toChannelCompletableFuture(bootstrap.connect(address))
.handle((channel, throwable) -> {
synchronized (startStopLock) {
this.channel = channel;
@@ -134,8 +136,9 @@ public class NettyClient {
}
})
.thenCompose(Function.identity());
+ senderFuture = OrderingFuture.adapt(senderCompletableFuture);
- return clientFuture;
+ return senderFuture;
}
}
@@ -144,9 +147,10 @@ public class NettyClient {
*
* @return Client start future.
*/
- @Nullable
- public CompletableFuture<NettySender> sender() {
- return clientFuture;
+ public OrderingFuture<NettySender> sender() {
+ Objects.requireNonNull(senderFuture, "NettyClient is not connected yet");
+
+ return senderFuture;
}
/**
@@ -162,7 +166,7 @@ public class NettyClient {
stopped = true;
- if (clientFuture == null) {
+ if (senderFuture == null) {
return CompletableFuture.completedFuture(null);
}
@@ -180,7 +184,9 @@ public class NettyClient {
* @return {@code true} if the client has failed to connect to the remote server, {@code false} otherwise.
*/
public boolean failedToConnect() {
- return clientFuture != null && clientFuture.isCompletedExceptionally();
+ OrderingFuture<NettySender> currentFuture = senderFuture;
+
+ return currentFuture != null && currentFuture.isCompletedExceptionally();
}
/**
@@ -189,6 +195,8 @@ public class NettyClient {
* @return {@code true} if the client has lost the connection or has been stopped, {@code false} otherwise.
*/
public boolean isDisconnected() {
- return (channel != null && !channel.isOpen()) || stopped;
+ Channel currentChannel = channel;
+
+ return (currentChannel != null && !currentChannel.isOpen()) || stopped;
}
}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyUtils.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyUtils.java
index 2ae35d965c..d6d6abb751 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyUtils.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyUtils.java
@@ -22,6 +22,7 @@ import io.netty.channel.ChannelFuture;
import io.netty.util.concurrent.Future;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
+import java.util.function.Supplier;
import org.jetbrains.annotations.Async.Execute;
import org.jetbrains.annotations.Async.Schedule;
@@ -43,19 +44,38 @@ public class NettyUtils {
@Schedule F nettyFuture,
Function<F, T> mapper
) {
- var fut = new CompletableFuture<T>();
+ return toCompletableFuture(nettyFuture, mapper, CompletableFuture::new);
+ }
+
+ /**
+ * Convert a Netty {@link Future} to a {@link CompletableFuture}.
+ *
+ * @param nettyFuture Netty future.
+ * @param mapper Function that maps successfully resolved Netty future to a value for a CompletableFuture.
+ * @param completableFutureFactory Factory used to produce a fresh instance of a {@link CompletableFuture}.
+ * @param <T> Resulting future type.
+ * @param <R> Netty future result type.
+ * @param <F> Netty future type.
+ * @return CompletableFuture.
+ */
+ public static <T, R, F extends Future<R>> CompletableFuture<T> toCompletableFuture(
+ @Schedule F nettyFuture,
+ Function<F, T> mapper,
+ Supplier<? extends CompletableFuture<T>> completableFutureFactory
+ ) {
+ CompletableFuture<T> completableFuture = completableFutureFactory.get();
- nettyFuture.addListener((@Execute F future) -> {
- if (future.isSuccess()) {
- fut.complete(mapper.apply(future));
- } else if (future.isCancelled()) {
- fut.cancel(true);
+ nettyFuture.addListener((@Execute F doneFuture) -> {
+ if (doneFuture.isSuccess()) {
+ completableFuture.complete(mapper.apply(doneFuture));
+ } else if (doneFuture.isCancelled()) {
+ completableFuture.cancel(true);
} else {
- fut.completeExceptionally(future.cause());
+ completableFuture.completeExceptionally(doneFuture.cause());
}
});
- return fut;
+ return completableFuture;
}
/**
@@ -76,6 +96,6 @@ public class NettyUtils {
* @return CompletableFuture.
*/
public static CompletableFuture<Channel> toChannelCompletableFuture(ChannelFuture channelFuture) {
- return toCompletableFuture(channelFuture, ChannelFuture::channel);
+ return toCompletableFuture(channelFuture, ChannelFuture::channel, CompletableFuture::new);
}
}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandhakeManagerFactory.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandhakeManagerFactory.java
new file mode 100644
index 0000000000..2ab0dfec29
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandhakeManagerFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.recovery;
+
+import java.util.UUID;
+
+/**
+ * Factory producing {@link RecoveryClientHandshakeManager} instances.
+ */
+public interface RecoveryClientHandhakeManagerFactory {
+ /**
+ * Produces a {@link RecoveryClientHandshakeManager} instance.
+ *
+ * @param launchId ID of the launch.
+ * @param consistentId Consistent ID of the node.
+ * @param connectionId ID of the connection.
+ * @param recoveryDescriptorProvider Provider of recovery descriptors to be used.
+ * @return Created manager.
+ */
+ RecoveryClientHandshakeManager create(
+ UUID launchId,
+ String consistentId,
+ short connectionId,
+ RecoveryDescriptorProvider recoveryDescriptorProvider
+ );
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
index 086919331c..87b2a889e7 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
@@ -43,15 +43,15 @@ import org.jetbrains.annotations.TestOnly;
* Recovery protocol handshake manager for a client.
*/
public class RecoveryClientHandshakeManager implements HandshakeManager {
+ /** Message factory. */
+ private static final NetworkMessagesFactory MESSAGE_FACTORY = new NetworkMessagesFactory();
+
/** Launch id. */
private final UUID launchId;
/** Consistent id. */
private final String consistentId;
- /** Message factory. */
- private final NetworkMessagesFactory messageFactory;
-
/** Recovery descriptor provider. */
private final RecoveryDescriptorProvider recoveryDescriptorProvider;
@@ -84,16 +84,14 @@ public class RecoveryClientHandshakeManager implements HandshakeManager {
*
* @param launchId Launch id.
* @param consistentId Consistent id.
- * @param messageFactory Message factory.
* @param recoveryDescriptorProvider Recovery descriptor provider.
*/
public RecoveryClientHandshakeManager(
- UUID launchId, String consistentId, short connectionId, NetworkMessagesFactory messageFactory,
+ UUID launchId, String consistentId, short connectionId,
RecoveryDescriptorProvider recoveryDescriptorProvider) {
this.launchId = launchId;
this.consistentId = consistentId;
this.connectionId = connectionId;
- this.messageFactory = messageFactory;
this.recoveryDescriptorProvider = recoveryDescriptorProvider;
}
@@ -165,9 +163,9 @@ public class RecoveryClientHandshakeManager implements HandshakeManager {
}
private void handshake(RecoveryDescriptor descriptor) {
- PipelineUtils.afterHandshake(ctx.pipeline(), descriptor, createMessageHandler(), messageFactory);
+ PipelineUtils.afterHandshake(ctx.pipeline(), descriptor, createMessageHandler(), MESSAGE_FACTORY);
- HandshakeStartResponseMessage response = messageFactory.handshakeStartResponseMessage()
+ HandshakeStartResponseMessage response = MESSAGE_FACTORY.handshakeStartResponseMessage()
.launchId(launchId)
.consistentId(consistentId)
.receivedCount(descriptor.receivedCount())
@@ -197,7 +195,7 @@ public class RecoveryClientHandshakeManager implements HandshakeManager {
/**
* Finishes handshaking process by removing handshake handler from the pipeline and creating a {@link NettySender}.
*/
- private void finishHandshake() {
+ protected void finishHandshake() {
// Removes handshake handler from the pipeline as the handshake is finished
this.ctx.pipeline().remove(this.handler);
diff --git a/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java b/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
index 2dd64270c2..4659dfe9f8 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
@@ -45,7 +45,6 @@ import org.apache.ignite.internal.network.netty.InNetworkObject;
import org.apache.ignite.internal.network.netty.NettySender;
import org.apache.ignite.internal.network.serialization.ClassDescriptorRegistry;
import org.apache.ignite.internal.network.serialization.DescriptorRegistry;
-import org.apache.ignite.internal.network.serialization.UserObjectSerializationContext;
import org.apache.ignite.internal.network.serialization.marshal.UserObjectMarshaller;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.lang.IgniteException;
@@ -98,14 +97,15 @@ public class DefaultMessagingService extends AbstractMessagingService {
*
* @param factory Network messages factory.
* @param topologyService Topology service.
- * @param userObjectSerializationContext Serialization context.
+ * @param classDescriptorRegistry Descriptor registry.
+ * @param marshaller Marshaller.
*/
public DefaultMessagingService(NetworkMessagesFactory factory, TopologyService topologyService,
- UserObjectSerializationContext userObjectSerializationContext) {
+ ClassDescriptorRegistry classDescriptorRegistry, UserObjectMarshaller marshaller) {
this.factory = factory;
this.topologyService = topologyService;
- this.marshaller = userObjectSerializationContext.marshaller();
- this.classDescriptorRegistry = userObjectSerializationContext.descriptorRegistry();
+ this.classDescriptorRegistry = classDescriptorRegistry;
+ this.marshaller = marshaller;
}
/**
@@ -251,7 +251,7 @@ public class DefaultMessagingService extends AbstractMessagingService {
}
return connectionManager.channel(recipientConsistentId, addr)
- .thenCompose(sender -> sender.send(new OutNetworkObject(message, descriptors)));
+ .thenComposeToCompletable(sender -> sender.send(new OutNetworkObject(message, descriptors)));
}
private List<ClassDescriptorMessage> beforeRead(NetworkMessage msg) throws Exception {
diff --git a/modules/network/src/main/java/org/apache/ignite/network/NettyBootstrapFactory.java b/modules/network/src/main/java/org/apache/ignite/network/NettyBootstrapFactory.java
index 69b279a35c..14ea4ba141 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/NettyBootstrapFactory.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/NettyBootstrapFactory.java
@@ -57,10 +57,7 @@ public class NettyBootstrapFactory implements IgniteComponent {
* @param networkConfiguration Network configuration.
* @param eventLoopGroupNamePrefix Prefix for event loop group names.
*/
- public NettyBootstrapFactory(
- NetworkConfiguration networkConfiguration,
- String eventLoopGroupNamePrefix
- ) {
+ public NettyBootstrapFactory(NetworkConfiguration networkConfiguration, String eventLoopGroupNamePrefix) {
assert eventLoopGroupNamePrefix != null;
assert networkConfiguration != null;
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
index 34dd9a82fb..a49064d82f 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
@@ -78,7 +78,12 @@ public class ScaleCubeClusterServiceFactory {
UserObjectSerializationContext userObjectSerialization = createUserObjectSerializationContext();
- var messagingService = new DefaultMessagingService(messageFactory, topologyService, userObjectSerialization);
+ var messagingService = new DefaultMessagingService(
+ messageFactory,
+ topologyService,
+ userObjectSerialization.descriptorRegistry(),
+ userObjectSerialization.marshaller()
+ );
return new AbstractClusterService(context, topologyService, messagingService) {
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyClientTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyClientTest.java
index 0d44fdc308..29e69c7add 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyClientTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyClientTest.java
@@ -33,7 +33,7 @@ import java.nio.channels.ClosedChannelException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import org.apache.ignite.internal.future.OrderingFuture;
import org.apache.ignite.internal.network.handshake.HandshakeManager;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.network.NetworkMessage;
@@ -64,7 +64,7 @@ public class NettyClientTest {
* @throws Exception If failed.
*/
@Test
- public void testSuccessfulConnect() throws InterruptedException, ExecutionException, TimeoutException {
+ public void testSuccessfulConnect() throws Exception {
var channel = new EmbeddedChannel();
ClientAndSender tuple = createClientAndSenderFromChannelFuture(channel.newSucceededFuture());
@@ -83,11 +83,9 @@ public class NettyClientTest {
/**
* Tests a scenario where NettyClient fails to connect.
- *
- * @throws Exception If failed.
*/
@Test
- public void testFailedToConnect() throws InterruptedException, ExecutionException, TimeoutException {
+ public void testFailedToConnect() {
var channel = new EmbeddedChannel();
ClientAndSender tuple = createClientAndSenderFromChannelFuture(channel.newFailedFuture(new ClosedChannelException()));
@@ -134,11 +132,9 @@ public class NettyClientTest {
/**
* Tests a scenario where a connection is established successfully after a client has been stopped.
- *
- * @throws Exception If failed.
*/
@Test
- public void testStoppedBeforeStarted() throws Exception {
+ public void testStoppedBeforeStarted() {
var channel = new EmbeddedChannel();
var future = channel.newPromise();
@@ -161,11 +157,9 @@ public class NettyClientTest {
/**
* Tests that a {@link NettyClient#start} method can be called only once.
- *
- * @throws Exception If failed.
*/
@Test
- public void testStartTwice() throws Exception {
+ public void testStartTwice() {
var channel = new EmbeddedChannel();
Bootstrap bootstrap = mockBootstrap();
@@ -230,7 +224,7 @@ public class NettyClientTest {
private static class ClientAndSender {
private final NettyClient client;
- private final CompletableFuture<NettySender> sender;
+ private final OrderingFuture<NettySender> sender;
/**
* Constructor.
@@ -238,7 +232,7 @@ public class NettyClientTest {
* @param client Netty client.
* @param sender Netty sender.
*/
- private ClientAndSender(NettyClient client, CompletableFuture<NettySender> sender) {
+ private ClientAndSender(NettyClient client, OrderingFuture<NettySender> sender) {
this.client = client;
this.sender = sender;
}
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
index 7933e5c040..a2b2eeecd0 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
@@ -447,7 +447,7 @@ public class RecoveryHandshakeTest {
private RecoveryClientHandshakeManager createRecoveryClientHandshakeManager(String consistentId, UUID launchId,
RecoveryDescriptorProvider provider) {
- return new RecoveryClientHandshakeManager(launchId, consistentId, CONNECTION_ID, MESSAGE_FACTORY, provider);
+ return new RecoveryClientHandshakeManager(launchId, consistentId, CONNECTION_ID, provider);
}
private RecoveryServerHandshakeManager createRecoveryServerHandshakeManager(RecoveryDescriptorProvider provider) {
diff --git a/modules/network/src/test/java/org/apache/ignite/network/DefaultMessagingServiceTest.java b/modules/network/src/test/java/org/apache/ignite/network/DefaultMessagingServiceTest.java
new file mode 100644
index 0000000000..2609603d5c
--- /dev/null
+++ b/modules/network/src/test/java/org/apache/ignite/network/DefaultMessagingServiceTest.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.configuration.schemas.network.InboundView;
+import org.apache.ignite.configuration.schemas.network.NetworkConfiguration;
+import org.apache.ignite.configuration.schemas.network.NetworkView;
+import org.apache.ignite.configuration.schemas.network.OutboundView;
+import org.apache.ignite.internal.network.NetworkMessagesFactory;
+import org.apache.ignite.internal.network.messages.TestMessage;
+import org.apache.ignite.internal.network.messages.TestMessageSerializationFactory;
+import org.apache.ignite.internal.network.messages.TestMessageTypes;
+import org.apache.ignite.internal.network.messages.TestMessagesFactory;
+import org.apache.ignite.internal.network.netty.ConnectionManager;
+import org.apache.ignite.internal.network.recovery.RecoveryClientHandhakeManagerFactory;
+import org.apache.ignite.internal.network.recovery.RecoveryClientHandshakeManager;
+import org.apache.ignite.internal.network.recovery.RecoveryDescriptorProvider;
+import org.apache.ignite.internal.network.serialization.ClassDescriptorFactory;
+import org.apache.ignite.internal.network.serialization.ClassDescriptorRegistry;
+import org.apache.ignite.internal.network.serialization.SerializationService;
+import org.apache.ignite.internal.network.serialization.UserObjectSerializationContext;
+import org.apache.ignite.internal.network.serialization.marshal.DefaultUserObjectMarshaller;
+import org.apache.ignite.internal.network.serialization.marshal.UserObjectMarshaller;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class DefaultMessagingServiceTest {
+ private static final int SENDER_PORT = 2001;
+ private static final int RECEIVER_PORT = 2002;
+
+ @Mock
+ private TopologyService topologyService;
+
+ @Mock
+ private NetworkConfiguration senderNetworkConfig;
+ @Mock
+ private NetworkView senderNetworkConfigView;
+ @Mock
+ private OutboundView senderOutboundConfig;
+ @Mock
+ private InboundView senderInboundConfig;
+
+ @Mock
+ private NetworkConfiguration receiverNetworkConfig;
+ @Mock
+ private NetworkView receiverNetworkConfigView;
+ @Mock
+ private OutboundView receiverOutboundConfig;
+ @Mock
+ private InboundView receiverInboundConfig;
+
+ private final NetworkMessagesFactory networkMessagesFactory = new NetworkMessagesFactory();
+ private final TestMessagesFactory testMessagesFactory = new TestMessagesFactory();
+ private final MessageSerializationRegistryImpl messageSerializationRegistry = new MessageSerializationRegistryImpl();
+
+ private final ClusterNode receiverNode = new ClusterNode(
+ "receiver",
+ "receiver",
+ new NetworkAddress("localhost", RECEIVER_PORT, "receiver")
+ );
+
+ @BeforeEach
+ void initSerializationRegistry() {
+ messageSerializationRegistry.registerFactory(
+ (short) 2,
+ TestMessageTypes.TEST,
+ new TestMessageSerializationFactory(testMessagesFactory)
+ );
+ }
+
+ @Test
+ void messagesSentBeforeChannelStartAreDeliveredInCorrectOrder() throws Exception {
+ configureSender();
+ configureReceiver();
+
+ CountDownLatch allowSendLatch = new CountDownLatch(1);
+
+ try (
+ Services senderServices = createMessagingService(
+ "sender", "sender-network", senderNetworkConfig, () -> awaitQuietly(allowSendLatch)
+ );
+ Services receiverServices = createMessagingService("receiver", "receiver-network", receiverNetworkConfig, () -> {})
+ ) {
+ List<String> payloads = new CopyOnWriteArrayList<>();
+ CountDownLatch messagesDeliveredLatch = new CountDownLatch(2);
+
+ receiverServices.messagingService.addMessageHandler(
+ TestMessageTypes.class,
+ (message, senderAddr, correlationId) -> {
+ payloads.add(((TestMessage) message).msg());
+ messagesDeliveredLatch.countDown();
+ }
+ );
+
+ senderServices.messagingService.send(receiverNode, testMessage("one"));
+ senderServices.messagingService.send(receiverNode, testMessage("two"));
+
+ allowSendLatch.countDown();
+
+ assertTrue(messagesDeliveredLatch.await(1, TimeUnit.SECONDS));
+
+ assertThat(payloads, contains("one", "two"));
+ }
+ }
+
+ private void configureSender() {
+ when(senderNetworkConfigView.port()).thenReturn(SENDER_PORT);
+ configureNetworkDefaults(senderNetworkConfig, senderNetworkConfigView, senderOutboundConfig, senderInboundConfig);
+ }
+
+ private void configureReceiver() {
+ when(receiverNetworkConfigView.port()).thenReturn(RECEIVER_PORT);
+ configureNetworkDefaults(receiverNetworkConfig, receiverNetworkConfigView, receiverOutboundConfig, receiverInboundConfig);
+ }
+
+ private static void configureNetworkDefaults(
+ NetworkConfiguration networkConfig,
+ NetworkView networkConfigView,
+ OutboundView outboundConfig,
+ InboundView inboundConfig
+ ) {
+ when(networkConfig.value()).thenReturn(networkConfigView);
+ when(networkConfigView.portRange()).thenReturn(0);
+ when(networkConfigView.outbound()).thenReturn(outboundConfig);
+ when(networkConfigView.inbound()).thenReturn(inboundConfig);
+ }
+
+ private static void awaitQuietly(CountDownLatch latch) {
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private TestMessage testMessage(String message) {
+ return testMessagesFactory.testMessage().msg(message).build();
+ }
+
+ private Services createMessagingService(
+ String consistentId,
+ String senderEventLoopGroupNamePrefix,
+ NetworkConfiguration networkConfig,
+ Runnable beforeHandshake
+ ) {
+ ClassDescriptorRegistry classDescriptorRegistry = new ClassDescriptorRegistry();
+ ClassDescriptorFactory classDescriptorFactory = new ClassDescriptorFactory(classDescriptorRegistry);
+ UserObjectMarshaller marshaller = new DefaultUserObjectMarshaller(classDescriptorRegistry, classDescriptorFactory);
+ DefaultMessagingService messagingService = new DefaultMessagingService(
+ networkMessagesFactory,
+ topologyService,
+ classDescriptorRegistry,
+ marshaller
+ );
+
+ SerializationService serializationService = new SerializationService(
+ messageSerializationRegistry,
+ new UserObjectSerializationContext(classDescriptorRegistry, classDescriptorFactory, marshaller)
+ );
+ NettyBootstrapFactory bootstrapFactory = new NettyBootstrapFactory(networkConfig, senderEventLoopGroupNamePrefix);
+ bootstrapFactory.start();
+
+ ConnectionManager connectionManager = new ConnectionManager(
+ networkConfig.value(),
+ serializationService,
+ UUID.randomUUID(),
+ consistentId,
+ bootstrapFactory,
+ clientHandshakeManagerFactoryAdding(beforeHandshake)
+ );
+ connectionManager.start();
+
+ messagingService.setConnectionManager(connectionManager);
+
+ return new Services(connectionManager, messagingService);
+ }
+
+ private static RecoveryClientHandhakeManagerFactory clientHandshakeManagerFactoryAdding(Runnable beforeHandshake) {
+ return new RecoveryClientHandhakeManagerFactory() {
+ @Override
+ public RecoveryClientHandshakeManager create(UUID launchId, String consistentId, short connectionId,
+ RecoveryDescriptorProvider recoveryDescriptorProvider) {
+ return new RecoveryClientHandshakeManager(launchId, consistentId, connectionId, recoveryDescriptorProvider) {
+ @Override
+ protected void finishHandshake() {
+ beforeHandshake.run();
+ super.finishHandshake();
+ }
+ };
+ }
+ };
+ }
+
+ private static class Services implements AutoCloseable {
+ private final ConnectionManager connectionManager;
+ private final DefaultMessagingService messagingService;
+
+ private Services(ConnectionManager connectionManager, DefaultMessagingService messagingService) {
+ this.connectionManager = connectionManager;
+ this.messagingService = messagingService;
+ }
+
+ @Override
+ public void close() throws Exception {
+ IgniteUtils.closeAll(connectionManager::stop, messagingService::stop);
+ }
+ }
+}