You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sd...@apache.org on 2022/10/12 07:54:14 UTC

[ignite-3] branch main updated: IGNITE-17249 DefaultMessagingService incorrect message sending order (#1169)

This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 70db5f06d0 IGNITE-17249 DefaultMessagingService incorrect message sending order (#1169)
70db5f06d0 is described below

commit 70db5f06d0b6b085da8f3264ad4fa77dd5416882
Author: Roman Puchkovskiy <ro...@gmail.com>
AuthorDate: Wed Oct 12 11:54:09 2022 +0400

    IGNITE-17249 DefaultMessagingService incorrect message sending order (#1169)
---
 .../ignite/internal/future/OrderingFuture.java     | 487 +++++++++++++++++
 .../OrderingFutureCallbackInterferenceTest.java    |  79 +++
 .../future/OrderingFutureCallbackOrderingTest.java | 134 +++++
 .../future/OrderingFutureConcurrencyTest.java      | 204 ++++++++
 .../ignite/internal/future/OrderingFutureTest.java | 573 +++++++++++++++++++++
 .../apache/ignite/network/MessagingService.java    |  10 +-
 .../internal/network/netty/ConnectionManager.java  |  63 ++-
 .../ignite/internal/network/netty/NettyClient.java |  32 +-
 .../ignite/internal/network/netty/NettyUtils.java  |  38 +-
 .../RecoveryClientHandhakeManagerFactory.java      |  41 ++
 .../recovery/RecoveryClientHandshakeManager.java   |  16 +-
 .../ignite/network/DefaultMessagingService.java    |  12 +-
 .../ignite/network/NettyBootstrapFactory.java      |   5 +-
 .../scalecube/ScaleCubeClusterServiceFactory.java  |   7 +-
 .../internal/network/netty/NettyClientTest.java    |  20 +-
 .../network/netty/RecoveryHandshakeTest.java       |   2 +-
 .../network/DefaultMessagingServiceTest.java       | 238 +++++++++
 17 files changed, 1894 insertions(+), 67 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/future/OrderingFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/future/OrderingFuture.java
new file mode 100644
index 0000000000..9bf8deba33
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/future/OrderingFuture.java
@@ -0,0 +1,487 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.future;
+
+import static java.util.concurrent.atomic.AtomicReferenceFieldUpdater.newUpdater;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * A little analogue of {@link CompletableFuture} that has the following property: callbacks (like {@link #whenComplete(BiConsumer)}
+ * and {@link #thenComposeToCompletable(Function)}) are invoked in the same order in which they were registered.
+ *
+ * @param <T> Type of payload.
+ * @see CompletableFuture
+ */
+public class OrderingFuture<T> {
+    @SuppressWarnings("rawtypes")
+    private static final AtomicReferenceFieldUpdater<OrderingFuture, State> STATE = newUpdater(OrderingFuture.class, State.class, "state");
+
+    /**
+     * Stores all the state of this future: whether it is completed, normal completion result (if any), cause
+     * of exceptional completion (if any), dependents. The State class and all of its components are immutable.
+     * We change the state using compare-and-set approach, next state is built from previous one.
+     */
+    private volatile State<T> state = State.empty();
+
+    /**
+     * Used to make sure that at most one thread executes completion code.
+     */
+    private final AtomicBoolean completionStarted = new AtomicBoolean(false);
+
+    /**
+     * Used by {@link #get(long, TimeUnit)} to wait for completion.
+     */
+    private final CountDownLatch completionLatch = new CountDownLatch(1);
+
+    /**
+     * Creates an incomplete future: it starts in the empty state (not completed, no dependents enqueued).
+     */
+    public OrderingFuture() {
+    }
+
+    /**
+     * Builds a future that has already been completed normally with the supplied value.
+     *
+     * @param result Completion value (may be {@code null}).
+     * @param <T> Payload type.
+     * @return Future that is already complete.
+     */
+    public static <T> OrderingFuture<T> completedFuture(@Nullable T result) {
+        OrderingFuture<T> completed = new OrderingFuture<>();
+        completed.complete(result);
+        return completed;
+    }
+
+    /**
+     * Builds a future that has already been completed exceptionally (i.e. failed) with the supplied exception.
+     *
+     * @param ex Cause of the failure.
+     * @param <T> Payload type.
+     * @return Future that is already failed.
+     */
+    public static <T> OrderingFuture<T> failedFuture(Throwable ex) {
+        OrderingFuture<T> failed = new OrderingFuture<>();
+        failed.completeExceptionally(ex);
+        return failed;
+    }
+
+    /**
+     * Wraps a {@link CompletableFuture} into an {@link OrderingFuture}: the returned future is completed as soon as
+     * the wrapped one is, with the same result or the same exception.
+     *
+     * @param adaptee Future to adapt.
+     * @param <T> Payload type.
+     * @return Adapting future.
+     */
+    public static <T> OrderingFuture<T> adapt(CompletableFuture<T> adaptee) {
+        OrderingFuture<T> adapter = new OrderingFuture<>();
+        // Relay the outcome of the adaptee: a null throwable means normal completion, anything else is a failure.
+        adaptee.whenComplete((value, failure) -> {
+            if (failure == null) {
+                adapter.complete(value);
+            } else {
+                adapter.completeExceptionally(failure);
+            }
+        });
+
+        return adapter;
+    }
+
+    /**
+     * Completes this future normally with the given result. Only the first completion attempt (normal or exceptional) has an effect.
+     *
+     * @param result Completion value (may be {@code null}).
+     */
+    public void complete(@Nullable T result) {
+        completeInternal(result, null);
+    }
+
+    /**
+     * Completes this future exceptionally with the given exception if it's not completed yet; otherwise has no effect.
+     *
+     * @param ex Exception; must not be {@code null} (a {@code null} would otherwise be treated as a normal completion).
+     */
+    public void completeExceptionally(Throwable ex) {
+        completeInternal(null, java.util.Objects.requireNonNull(ex, "ex"));
+    }
+
+    private void completeInternal(@Nullable T result, @Nullable Throwable ex) {
+        assert ex == null || result == null; // a completion carries either a result or an exception, never both
+
+        if (!completionStarted.compareAndSet(false, true)) {
+            // Another caller has already won the right to complete this future. Bail out: running the loop below from
+            // more than one thread could notify the same dependents twice.
+            return;
+        }
+
+        State<T> prevState;
+        ListNode<T> lastNotifiedNode = null; // newest node already notified by this method; null until the first pass
+
+        while (true) {
+            prevState = state;
+
+            if (state.completed) {
+                return;
+            }
+
+            State<T> newState = new State<>(true, result, ex, null);
+
+            // Callbacks are invoked inside the CAS retry loop, i.e. before the completed state is published. This is safe
+            // because the only competitors are enqueue operations (other completers are excluded by the AtomicBoolean),
+            // so the queue only grows and what we saw earlier is a prefix of what we see now; lastNotifiedNode
+            // remembers how far we got, so each retry notifies only the dependents added since the previous attempt.
+            notifyDependents(result, ex, prevState.dependentsQueueTail, lastNotifiedNode);
+            lastNotifiedNode = prevState.dependentsQueueTail;
+
+            if (replaceState(prevState, newState)) {
+                break;
+            }
+        }
+
+        completionLatch.countDown(); // wakes up threads blocked in get(long, TimeUnit)
+    }
+
+    /**
+     * Atomically swaps the state from {@code prevState} to {@code newState} via the {@code STATE} field updater.
+     *
+     * @param prevState State that we expect to see.
+     * @param newState  New state we want to set.
+     * @return {@code true} if CAS was successful, {@code false} if the current state was not {@code prevState}.
+     */
+    private boolean replaceState(State<T> prevState, State<T> newState) {
+        return STATE.compareAndSet(this, prevState, newState);
+    }
+
+    /**
+     * Notifies dependents about completion of this future, skipping {@code lastNotifiedNode} and everything enqueued before it.
+     *
+     * @param result           Normal completion result.
+     * @param ex               Exceptional completion cause.
+     * @param dependents       Tail of the dependents queue (most recently enqueued node); {@code null} if the queue is empty.
+     * @param lastNotifiedNode Node that was notified last on preceding iterations of while loop.
+     */
+    private void notifyDependents(
+            @Nullable T result,
+            @Nullable Throwable ex,
+            @Nullable ListNode<T> dependents,
+            ListNode<T> lastNotifiedNode
+    ) {
+        if (dependents != null) {
+            dependents.notifyHeadToTail(result, ex, lastNotifiedNode);
+        }
+    }
+
+    /**
+     * Tells whether this future has failed. The check is made against a single read of the current state.
+     *
+     * @return {@code true} if this future is completed exceptionally, {@code false} if completed normally or not completed
+     */
+    public boolean isCompletedExceptionally() {
+        return state.exception != null;
+    }
+
+    /**
+     * Registers a callback to run once this future completes, whether normally or exceptionally. The callback receives
+     * the result and the exception: on normal completion the exception is {@code null}, on failure the result is {@code null}.
+     * If this future is already complete, the callback runs immediately on the calling thread.
+     * Exceptions thrown by the callback are swallowed.
+     *
+     * @param action Action to execute.
+     */
+    public void whenComplete(BiConsumer<? super T, ? super Throwable> action) {
+        WhenComplete<T> callback = null;
+
+        for (;;) {
+            State<T> snapshot = state;
+
+            if (!snapshot.completed) {
+                // Allocate the dependent lazily and only once, no matter how many CAS retries are needed.
+                if (callback == null) {
+                    callback = new WhenComplete<>(action);
+                }
+                if (replaceState(snapshot, snapshot.enqueueDependent(callback))) {
+                    return;
+                }
+            } else {
+                // Already completed: nothing to enqueue, run the action right away.
+                acceptQuietly(action, snapshot.result, snapshot.exception);
+                return;
+            }
+        }
+    }
+
+    private static <T> void acceptQuietly(BiConsumer<? super T, ? super Throwable> action, T result, Throwable ex) {
+        try {
+            action.accept(result, ex);
+        } catch (Exception ignored) {
+            // Deliberately swallowed: an exception thrown by the action must not reach the caller.
+        }
+    }
+
+    /**
+     * Composes this future with a function that produces a {@link CompletableFuture} from this future's result.
+     *
+     * @param mapper Function invoked with this future's result to obtain the next {@link CompletableFuture}.
+     * @param <U> Result future payload type.
+     * @return Future representing the composition.
+     * @see CompletableFuture#thenCompose(Function)
+     */
+    public <U> CompletableFuture<U> thenComposeToCompletable(Function<? super T, ? extends CompletableFuture<U>> mapper) {
+        ThenComposeToCompletable<T, U> composition = null;
+
+        for (;;) {
+            State<T> snapshot = state;
+
+            if (snapshot.completed) {
+                // Already completed: compose right away on the calling thread, nothing needs to be enqueued.
+                return snapshot.exception == null
+                        ? applyMapper(mapper, snapshot.result)
+                        : CompletableFuture.<U>failedFuture(wrapWithCompletionException(snapshot.exception));
+            }
+
+            // Allocate the dependent (together with its result future) lazily and only once across CAS retries.
+            if (composition == null) {
+                composition = new ThenComposeToCompletable<>(new CompletableFuture<>(), mapper);
+            }
+
+            // If the CAS fails, the state was changed concurrently: re-read it and try again.
+            if (replaceState(snapshot, snapshot.enqueueDependent(composition))) {
+                return composition.resultFuture;
+            }
+        }
+    }
+
+    private static CompletionException wrapWithCompletionException(Throwable ex) {
+        return ex instanceof CompletionException ? (CompletionException) ex : new CompletionException(ex); // no double wrapping
+    }
+
+    private static <T, U> CompletableFuture<U> applyMapper(Function<? super T, ? extends CompletableFuture<U>> mapper, T result) {
+        try {
+            return mapper.apply(result); // the future produced by the mapper is returned as is
+        } catch (Throwable e) {
+            return CompletableFuture.failedFuture(e); // a throwing mapper yields a failed future instead of propagating
+        }
+    }
+
+    /**
+     * Non-blocking read of the outcome: returns the completion value if this future completed normally, throws the
+     * completion cause wrapped in a {@link CompletionException} (unless it already is one) if it failed, and returns
+     * the supplied default if it is not completed yet.
+     *
+     * @param valueIfAbsent Value to return if the future is not completed yet.
+     * @return Completion value or default value.
+     * @see CompletableFuture#getNow(Object)
+     */
+    public T getNow(T valueIfAbsent) {
+        State<T> snapshot = state;
+
+        if (!snapshot.completed) {
+            return valueIfAbsent;
+        }
+
+        // Completed: either rethrow the failure (wrapped) or hand out the result.
+        if (snapshot.exception != null) {
+            throw wrapWithCompletionException(snapshot.exception);
+        }
+        return snapshot.result;
+    }
+
+    /**
+     * Returns completion value or throws completion exception (wrapped in {@link ExecutionException}), waiting for
+     * completion up to the specified amount of time, if not completed yet. If the time runs out while waiting,
+     * throws {@link TimeoutException}.
+     *
+     * @param timeout Maximum amount of time to wait.
+     * @param unit    Unit of time in which the timeout is given.
+     * @return Completion value.
+     * @throws InterruptedException Thrown if the current thread gets interrupted while waiting for completion.
+     * @throws TimeoutException Thrown if the wait for completion times out.
+     * @throws ExecutionException Thrown (with the original exception as a cause) if the future completes exceptionally.
+     * @see CompletableFuture#get(long, TimeUnit)
+     */
+    public T get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException, ExecutionException {
+        boolean completedInTime = completionLatch.await(timeout, unit);
+        if (!completedInTime) {
+            throw new TimeoutException();
+        }
+
+        State<T> currentState = state;
+
+        if (currentState.exception instanceof CancellationException) {
+            throw (CancellationException) currentState.exception;
+        } else if (currentState.exception != null) {
+            throw exceptionForThrowingFromGet(currentState);
+        } else {
+            return currentState.result;
+        }
+    }
+
+    private ExecutionException exceptionForThrowingFromGet(State<T> currentState) {
+        Throwable unwrapped = currentState.exception;
+        // Strip only a CompletionException wrapper (like CompletableFuture#get does). Any other exception is reported
+        // as is, so that the exception the future was failed with is not replaced by its own cause.
+        if (unwrapped instanceof CompletionException && unwrapped.getCause() != null) {
+            unwrapped = unwrapped.getCause();
+        }
+        return new ExecutionException(unwrapped);
+    }
+
+    /**
+     * Exposes this future as a {@link CompletableFuture} that is completed when, and in the same way as, this future.
+     * Callbacks registered on the returned future do not get the ordering guarantees of this class.
+     *
+     * @return An equivalent {@link CompletableFuture}.
+     */
+    public CompletableFuture<T> toCompletableFuture() {
+        CompletableFuture<T> mirror = new CompletableFuture<>();
+
+        // The relay is registered through whenComplete(), like any other callback of this future.
+        whenComplete((value, failure) -> completeCompletableFuture(mirror, value, failure));
+        return mirror;
+    }
+
+    private static <T> void completeCompletableFuture(CompletableFuture<T> future, T result, Throwable ex) {
+        if (ex == null) { // no exception means normal completion
+            future.complete(result);
+        } else {
+            future.completeExceptionally(ex);
+        }
+    }
+
+    /**
+     * Dependent action that gets notified when this future is completed.
+     *
+     * @param <T> Payload type.
+     */
+    private interface DependentAction<T> {
+        /**
+         * Informs the dependent that the host future is completed.
+         *
+         * @param result Normal completion result ({@code null} if completed exceptionally, but might also be {@code null} for normal completion).
+         * @param ex     Exceptional completion cause ({@code null} if completed normally).
+         */
+        void onCompletion(T result, Throwable ex);
+    }
+
+    private static class WhenComplete<T> implements DependentAction<T> {
+        private final BiConsumer<? super T, ? super Throwable> action; // callback to invoke on completion
+
+        private WhenComplete(BiConsumer<? super T, ? super Throwable> action) {
+            this.action = action;
+        }
+
+        @Override
+        public void onCompletion(T result, Throwable ex) {
+            acceptQuietly(action, result, ex); // exceptions thrown by the callback are swallowed
+        }
+    }
+
+    private static class ThenComposeToCompletable<T, U> implements DependentAction<T> {
+        private final CompletableFuture<U> resultFuture; // completed with the outcome of the composition
+        private final Function<? super T, ? extends CompletableFuture<U>> mapper;
+
+        private ThenComposeToCompletable(CompletableFuture<U> resultFuture, Function<? super T, ? extends CompletableFuture<U>> mapper) {
+            this.resultFuture = resultFuture;
+            this.mapper = mapper;
+        }
+
+        @Override
+        public void onCompletion(T result, Throwable ex) {
+            if (ex != null) { // the host future failed: propagate the failure, the mapper is not invoked
+                resultFuture.completeExceptionally(wrapWithCompletionException(ex));
+                return;
+            }
+
+            try {
+                CompletableFuture<U> mapResult = mapper.apply(result); // its outcome is relayed to resultFuture below
+
+                mapResult.whenComplete((mapRes, mapEx) -> completeCompletableFuture(resultFuture, mapRes, mapEx));
+            } catch (Throwable e) {
+                resultFuture.completeExceptionally(e); // the mapper threw (or returned null): fail the result future
+            }
+        }
+    }
+
+    private static class State<T> {
+        private static final State<?> EMPTY_STATE = new State<>(false, null, null, null); // shared initial state
+
+        private final boolean completed;
+        private final T result; // normal completion result (may be null)
+        private final Throwable exception; // non-null only after exceptional completion
+        private final ListNode<T> dependentsQueueTail; // most recently enqueued node (links to the previous one); null if none
+
+        private State(boolean completed, T result, Throwable exception, ListNode<T> dependentsQueueTail) {
+            this.completed = completed;
+            this.result = result;
+            this.exception = exception;
+            this.dependentsQueueTail = dependentsQueueTail;
+        }
+
+        @SuppressWarnings("unchecked")
+        private static <T> State<T> empty() {
+            return (State<T>) EMPTY_STATE; // the empty state holds no T-typed value, so the cast is harmless
+        }
+
+        public State<T> enqueueDependent(DependentAction<T> dependent) {
+            return new State<>(completed, result, exception, new ListNode<>(dependent, dependentsQueueTail));
+        }
+    }
+
+    private static class ListNode<T> {
+        private final DependentAction<T> dependent;
+        private final ListNode<T> next;
+
+        private ListNode(DependentAction<T> dependent, ListNode<T> next) {
+            this.dependent = dependent;
+            this.next = next;
+        }
+
+        public void notifyHeadToTail(T result, Throwable exception, ListNode<T> lastNotifiedNode) {
+            // Nodes are linked newest-to-oldest, so collect the not-yet-notified ones on a stack to reverse them.
+            Deque<ListNode<T>> pending = new ArrayDeque<>();
+            ListNode<T> cursor = this;
+            while (cursor != null && cursor != lastNotifiedNode) {
+                pending.push(cursor);
+                cursor = cursor.next;
+            }
+
+            // Taking from the top of the stack yields the oldest pending dependent first, i.e. registration order.
+            for (ListNode<T> node = pending.poll(); node != null; node = pending.poll()) {
+                try {
+                    node.dependent.onCompletion(result, exception);
+                } catch (Exception ignored) {
+                    // A failing dependent must not prevent the remaining ones from being notified.
+                }
+            }
+        }
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/future/OrderingFutureCallbackInterferenceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/future/OrderingFutureCallbackInterferenceTest.java
new file mode 100644
index 0000000000..4749dba8d8
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/future/OrderingFutureCallbackInterferenceTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.future;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests making sure that, when one callback throws an exception, the remaining callbacks are still invoked.
+ */
+class OrderingFutureCallbackInterferenceTest {
+    private final RuntimeException cause = new RuntimeException("Oops"); // thrown by the misbehaving mappers
+
+    @Test
+    void composeToCompletableDoesNotInterfereWithEachOther() {
+        OrderingFuture<Integer> future = new OrderingFuture<>();
+
+        List<Integer> order = new CopyOnWriteArrayList<>(); // filled by the well-behaved mappers as they run
+
+        future.thenComposeToCompletable(x -> {
+            throw cause;
+        });
+        future.thenComposeToCompletable(x -> {
+            order.add(1);
+            return CompletableFuture.completedFuture(null);
+        });
+        future.thenComposeToCompletable(x -> {
+            throw cause;
+        });
+        future.thenComposeToCompletable(x -> {
+            order.add(2);
+            return CompletableFuture.completedFuture(null);
+        });
+
+        future.complete(1);
+
+        assertThat(order, contains(1, 2)); // both well-behaved mappers ran despite the throwing ones
+    }
+
+    @Test
+    void whenCompleteDoesNotInterfereWithEachOther() {
+        OrderingFuture<Integer> future = new OrderingFuture<>();
+
+        List<Integer> order = new CopyOnWriteArrayList<>(); // filled by the well-behaved actions as they run
+
+        future.whenComplete((res, ex) -> {
+            throw new RuntimeException("one");
+        });
+        future.whenComplete((res, ex) -> order.add(1));
+        future.whenComplete((res, ex) -> {
+            throw new RuntimeException("three");
+        });
+        future.whenComplete((res, ex) -> order.add(2));
+
+        future.complete(1);
+
+        assertThat(order, contains(1, 2)); // both well-behaved actions ran despite the throwing ones
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/future/OrderingFutureCallbackOrderingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/future/OrderingFutureCallbackOrderingTest.java
new file mode 100644
index 0000000000..c1ee16ead5
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/future/OrderingFutureCallbackOrderingTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.future;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests making sure that callbacks are invoked in the order in which they were registered.
+ */
+class OrderingFutureCallbackOrderingTest {
+    private final RuntimeException cause = new RuntimeException("Oops"); // used to fail futures in the tests below
+
+    @Test
+    void composeToCompletableMaintainsCallbacksOrderOnCompletedFuture() {
+        OrderingFuture<Integer> future = OrderingFuture.completedFuture(42); // complete before any callback is added
+        List<Integer> order = new CopyOnWriteArrayList<>();
+
+        future.thenComposeToCompletable(x -> {
+            order.add(1);
+            return CompletableFuture.completedFuture(null);
+        });
+        future.thenComposeToCompletable(x -> {
+            order.add(2);
+            return CompletableFuture.completedFuture(null);
+        });
+
+        assertThat(order, contains(1, 2));
+    }
+
+    @Test
+    void composeToCompletableMaintainsCallbacksOrderOnCompletion() {
+        OrderingFuture<Integer> future = new OrderingFuture<>();
+        List<Integer> order = new CopyOnWriteArrayList<>();
+
+        future.thenComposeToCompletable(x -> {
+            order.add(1);
+            return CompletableFuture.completedFuture(null);
+        });
+        future.thenComposeToCompletable(x -> {
+            order.add(2);
+            return CompletableFuture.completedFuture(null);
+        });
+
+        future.complete(42); // completion happens after both callbacks were registered
+
+        assertThat(order, contains(1, 2));
+    }
+
+    @Test
+    void whenCompleteMaintainsCallbacksOrderOnCompletedFuture() {
+        OrderingFuture<Integer> future = OrderingFuture.completedFuture(42); // complete before any callback is added
+        List<Integer> order = new CopyOnWriteArrayList<>();
+
+        future.whenComplete((res, ex) -> order.add(1));
+        future.whenComplete((res, ex) -> order.add(2));
+
+        assertThat(order, contains(1, 2));
+    }
+
+    @Test
+    void whenCompleteMaintainsCallbacksOrderOnFailedFuture() {
+        OrderingFuture<Integer> future = OrderingFuture.failedFuture(cause); // failed before any callback is added
+        List<Integer> order = new CopyOnWriteArrayList<>();
+
+        future.whenComplete((res, ex) -> order.add(1));
+        future.whenComplete((res, ex) -> order.add(2));
+
+        assertThat(order, contains(1, 2));
+    }
+
+    @Test
+    void whenCompleteMaintainsCallbacksOrderOnCompletion() {
+        OrderingFuture<Integer> future = new OrderingFuture<>();
+        List<Integer> order = new CopyOnWriteArrayList<>();
+
+        future.whenComplete((res, ex) -> order.add(1));
+        future.whenComplete((res, ex) -> order.add(2));
+
+        future.completeExceptionally(cause); // exceptional completion after both callbacks were registered
+
+        assertThat(order, contains(1, 2));
+    }
+
+    @Test
+    void differentCallbacksAreCalledInTheOrderOfAdditionToCompletedFuture() {
+        OrderingFuture<Integer> future = OrderingFuture.completedFuture(42); // complete before any callback is added
+        List<Integer> order = new CopyOnWriteArrayList<>();
+
+        future.thenComposeToCompletable(x -> {
+            order.add(1);
+            return CompletableFuture.completedFuture(null);
+        });
+        future.whenComplete((res, ex) -> order.add(2));
+
+        assertThat(order, contains(1, 2));
+    }
+
+    @Test
+    void differentCallbacksAreCalledInTheOrderOfAdditionOnCompletion() {
+        OrderingFuture<Integer> future = new OrderingFuture<>();
+        List<Integer> order = new CopyOnWriteArrayList<>();
+
+        future.thenComposeToCompletable(x -> {
+            order.add(1);
+            return CompletableFuture.completedFuture(null);
+        });
+        future.whenComplete((res, ex) -> order.add(2));
+
+        future.complete(42); // completion happens after both callbacks were registered
+
+        assertThat(order, contains(1, 2));
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/future/OrderingFutureConcurrencyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/future/OrderingFutureConcurrencyTest.java
new file mode 100644
index 0000000000..5056df05b7
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/future/OrderingFutureConcurrencyTest.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.future;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Stream;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+/**
+ * Tests for concurrency aspects of {@link OrderingFuture}.
+ */
+class OrderingFutureConcurrencyTest {
+    private static final RuntimeException cause = new RuntimeException();
+
+    private final Object lockMonitor = new Object();
+
+    @Test
+    void concurrentAdditionOfWhenCompleteCallbacksIsCorrect() throws Exception {
+        OrderingFuture<Integer> future = new OrderingFuture<>();
+
+        AtomicInteger counter = new AtomicInteger();
+
+        Runnable addIncrementerTask = () -> {
+            for (int i = 0; i < 10_000; i++) {
+                future.whenComplete((res, ex) -> counter.incrementAndGet());
+            }
+        };
+
+        executeInParallel(addIncrementerTask, addIncrementerTask);
+
+        future.complete(1);
+
+        assertThat(counter.get(), is(20_000));
+    }
+
+    private void executeInParallel(Runnable task1, Runnable task2) throws InterruptedException {
+        Thread thread1 = new Thread(task1);
+        Thread thread2 = new Thread(task2);
+
+        thread1.start();
+        thread2.start();
+
+        thread1.join();
+        thread2.join();
+    }
+
+    @Test
+    void concurrentAdditionOfThenComposeToCompletableCallbacksIsCorrect() throws Exception {
+        OrderingFuture<Integer> future = new OrderingFuture<>();
+
+        AtomicInteger counter = new AtomicInteger();
+
+        Runnable addIncrementerTask = () -> {
+            for (int i = 0; i < 10_000; i++) {
+                future.thenComposeToCompletable(x -> {
+                    counter.incrementAndGet();
+                    return CompletableFuture.completedFuture(null);
+                });
+            }
+        };
+
+        executeInParallel(addIncrementerTask, addIncrementerTask);
+
+        future.complete(1);
+
+        assertThat(counter.get(), is(20_000));
+    }
+
+    @Timeout(value = 10, unit = TimeUnit.SECONDS)
+    @ParameterizedTest
+    @MethodSource("completionsAndOperations")
+    void noDeadlockBetweenCompletionAndOtherOperations(Completion completion, Operation operation) throws Exception {
+        for (int i = 0; i < 10; i++) {
+            OrderingFuture<Void> future = futureThatLocksMonitorViaCallbackOnCompletion();
+
+            Runnable operationTask = () -> {
+                synchronized (lockMonitor) { // The completion callback of the future takes this same monitor.
+                    long started = System.nanoTime();
+                    while (System.nanoTime() - started < TimeUnit.MILLISECONDS.toNanos(100)) { // Elapsed-time form is overflow-safe.
+                        operation.execute(future);
+                    }
+                }
+            };
+
+            executeInParallel(completion.completionTask(future), operationTask);
+        }
+    }
+
+    private static Stream<Arguments> completionsAndOperations() {
+        List<Arguments> args = new ArrayList<>();
+
+        for (Completion completion : Completion.values()) {
+            for (Operation operation : Operation.values()) {
+                args.add(Arguments.of(completion, operation));
+            }
+        }
+
+        return args.stream();
+    }
+
+    private OrderingFuture<Void> futureThatLocksMonitorViaCallbackOnCompletion() {
+        OrderingFuture<Void> future = new OrderingFuture<>();
+
+        future.whenComplete((res, ex) -> {
+            synchronized (lockMonitor) {
+                try {
+                    Thread.sleep(10);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt(); // Keep the interrupt status instead of silently swallowing it.
+                }
+            }
+        });
+
+        return future;
+    }
+
+    private enum Completion {
+        NORMAL {
+            @Override
+            Runnable completionTask(OrderingFuture<?> future) {
+                return () -> future.complete(null);
+            }
+        },
+        EXCEPTIONAL {
+            @Override
+            Runnable completionTask(OrderingFuture<?> future) {
+                return () -> future.completeExceptionally(cause);
+            }
+        };
+
+        abstract Runnable completionTask(OrderingFuture<?> future);
+    }
+
+    private enum Operation {
+        COMPLETE_NORMALLY {
+            @Override
+            void execute(OrderingFuture<?> future) {
+                future.complete(null);
+            }
+        },
+        COMPLETE_EXCEPTIONALLY {
+            @Override
+            void execute(OrderingFuture<?> future) {
+                future.completeExceptionally(cause);
+            }
+        },
+        IS_COMPLETED_EXCEPTIONALLY {
+            @Override
+            void execute(OrderingFuture<?> future) {
+                future.isCompletedExceptionally();
+            }
+        },
+        WHEN_COMPLETE {
+            @Override
+            void execute(OrderingFuture<?> future) {
+                future.whenComplete((res, ex) -> {});
+            }
+        },
+        THEN_COMPOSE_TO_COMPLETABLE {
+            @Override
+            void execute(OrderingFuture<?> future) {
+                future.thenComposeToCompletable(x -> CompletableFuture.completedFuture(null));
+            }
+        },
+        GET_NOW {
+            @Override
+            void execute(OrderingFuture<?> future) {
+                try {
+                    future.getNow(null);
+                } catch (Exception e) {
+                    // ignored
+                }
+            }
+        };
+
+        abstract void execute(OrderingFuture<?> future);
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/future/OrderingFutureTest.java b/modules/core/src/test/java/org/apache/ignite/internal/future/OrderingFutureTest.java
new file mode 100644
index 0000000000..2d97daf138
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/future/OrderingFutureTest.java
@@ -0,0 +1,573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.future;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
+import static org.hamcrest.Matchers.sameInstance;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import org.junit.jupiter.api.Test;
+
+/**
+ * General tests for {@link OrderingFuture}.
+ */
+class OrderingFutureTest {
+    private final RuntimeException cause = new RuntimeException("Oops");
+
+    @Test
+    void completedFutureCreatesCompletedFuture() {
+        OrderingFuture<Integer> future = OrderingFuture.completedFuture(1);
+
+        assertThat(future.getNow(999), is(1));
+    }
+
+    @Test
+    void completedFutureIsNotFailed() {
+        OrderingFuture<Integer> future = OrderingFuture.completedFuture(1);
+
+        assertFalse(future.isCompletedExceptionally());
+    }
+
+    @Test
+    void failedFutureCreatesFutureCompletedExceptionally() {
+        OrderingFuture<Integer> future = OrderingFuture.failedFuture(cause);
+
+        assertThatFutureIsCompletedWithOurException(future);
+    }
+
+    private void assertThatFutureIsCompletedWithOurException(OrderingFuture<Integer> future) {
+        CompletionException ex = assertThrows(CompletionException.class, () -> future.getNow(999));
+
+        assertThat(ex.getCause(), is(sameInstance(cause)));
+    }
+
+    private void assertThatFutureIsCompletedWithOurException(CompletableFuture<Integer> future) {
+        CompletionException ex = assertThrows(CompletionException.class, () -> future.getNow(999));
+
+        assertThat(ex.getCause(), is(sameInstance(cause)));
+    }
+
+    @Test
+    void failedFutureIsCompletedExceptionally() {
+        OrderingFuture<Integer> future = OrderingFuture.failedFuture(cause);
+
+        assertTrue(future.isCompletedExceptionally());
+    }
+
+    @Test
+    void adaptingIncompleteFutureProducesIncompleteResult() {
+        CompletableFuture<Integer> adaptee = new CompletableFuture<>();
+        OrderingFuture<Integer> adaptor = OrderingFuture.adapt(adaptee);
+
+        assertThat(adaptor.getNow(999), is(999));
+    }
+
+    @Test
+    void normalCompletionIsPropagatedThrowAdapt() {
+        CompletableFuture<Integer> adaptee = new CompletableFuture<>();
+        OrderingFuture<Integer> adaptor = OrderingFuture.adapt(adaptee);
+
+        adaptee.complete(1);
+
+        assertThat(adaptor.getNow(999), is(1));
+    }
+
+    @Test
+    void exceptionalCompletionIsPropagatedThroughAdapter() {
+        CompletableFuture<Integer> adaptee = new CompletableFuture<>();
+        OrderingFuture<Integer> adaptor = OrderingFuture.adapt(adaptee);
+
+        adaptee.completeExceptionally(cause);
+
+        assertThatFutureIsCompletedWithOurException(adaptor);
+    }
+
+    @Test
+    void completeCompletesIncompleteFuture() {
+        var future = new OrderingFuture<Integer>();
+
+        future.complete(1);
+
+        assertThat(future.getNow(999), is(1));
+    }
+
+    @Test
+    void completeDoesNothingWithCompletedFuture() {
+        OrderingFuture<Integer> future = OrderingFuture.completedFuture(1);
+
+        future.complete(2);
+
+        assertThat(future.getNow(999), is(1));
+    }
+
+    @Test
+    void completeDoesDoesNotInvokeCallbacksSecondTimeOnCompletedFuture() {
+        OrderingFuture<Integer> future = OrderingFuture.completedFuture(1);
+
+        AtomicInteger completionCount = new AtomicInteger();
+
+        future.whenComplete((res, ex) -> completionCount.incrementAndGet());
+
+        future.complete(2);
+
+        assertThat(completionCount.get(), is(1));
+    }
+
+    @Test
+    void completeDoesNothingWithFailedFuture() {
+        OrderingFuture<Integer> future = OrderingFuture.failedFuture(cause);
+
+        future.complete(2);
+
+        assertThatFutureIsCompletedWithOurException(future);
+    }
+
+    @Test
+    void completeExceptionallyDoesDoesNotInvokeCallbacksSecondTimeOnCompletedFuture() {
+        OrderingFuture<Integer> future = OrderingFuture.completedFuture(1);
+
+        AtomicInteger completionCount = new AtomicInteger();
+
+        future.whenComplete((res, ex) -> completionCount.incrementAndGet());
+
+        future.completeExceptionally(cause);
+
+        assertThat(completionCount.get(), is(1));
+    }
+
+    @Test
+    void completeExceptionallyCompletesIncompleteFuture() {
+        var future = new OrderingFuture<Integer>();
+
+        future.completeExceptionally(cause);
+
+        assertThatFutureIsCompletedWithOurException(future);
+    }
+
+    @Test
+    void completeExceptionallyDoesNothingWithCompletedFuture() {
+        OrderingFuture<Integer> future = OrderingFuture.completedFuture(1);
+
+        future.completeExceptionally(cause);
+
+        assertThat(future.getNow(999), is(1));
+    }
+
+    @Test
+    void completeExceptionallyDoesNothingWithFailedFuture() {
+        OrderingFuture<Integer> future = OrderingFuture.failedFuture(cause);
+
+        future.completeExceptionally(new Exception("Another cause"));
+
+        assertThatFutureIsCompletedWithOurException(future);
+    }
+
+    @Test
+    void completionWithCompletionExceptionDoesNotDuplicateCompletionException() {
+        OrderingFuture<Integer> future = new OrderingFuture<>();
+
+        future.completeExceptionally(new CompletionException(cause));
+
+        assertThatFutureIsCompletedWithOurException(future);
+    }
+
+    @Test
+    void whenCompletePropagatesResultFromAlreadyCompletedFuture() {
+        AtomicInteger container = new AtomicInteger();
+        OrderingFuture<Integer> future = OrderingFuture.completedFuture(1);
+
+        future.whenComplete((res, ex) -> container.set(res));
+
+        assertThat(container.get(), is(1));
+    }
+
+    @Test
+    void whenCompletePropagatesExceptionFromAlreadyFailedFuture() {
+        AtomicReference<Throwable> container = new AtomicReference<>();
+        OrderingFuture<Integer> future = OrderingFuture.failedFuture(cause);
+
+        future.whenComplete((res, ex) -> container.set(ex));
+
+        assertThat(container.get(), is(sameInstance(cause)));
+    }
+
+    @Test
+    void whenCompletePropagatesResultFromFutureCompletion() {
+        AtomicInteger container = new AtomicInteger();
+        OrderingFuture<Integer> future = new OrderingFuture<>();
+
+        future.whenComplete((res, ex) -> container.set(res));
+
+        future.complete(1);
+
+        assertThat(container.get(), is(1));
+    }
+
+    @Test
+    void whenCompletePropagatesExceptionFromFutureCompletion() {
+        AtomicReference<Throwable> container = new AtomicReference<>();
+        OrderingFuture<Integer> future = new OrderingFuture<>();
+
+        future.whenComplete((res, ex) -> container.set(ex));
+
+        future.completeExceptionally(cause);
+
+        assertThat(container.get(), is(sameInstance(cause)));
+    }
+
+    @Test
+    void whenCompleteSwallowsExceptionThrownByActionOnAlreadyCompletedFuture() {
+        OrderingFuture<Integer> future = OrderingFuture.completedFuture(1);
+
+        assertDoesNotThrow(() -> future.whenComplete((res, ex) -> {
+            throw cause;
+        }));
+    }
+
+    @Test
+    void whenCompleteSwallowsExceptionThrownByActionOnAlreadyFailedFuture() {
+        OrderingFuture<Integer> future = OrderingFuture.failedFuture(cause);
+
+        assertDoesNotThrow(() -> future.whenComplete((res, ex) -> {
+            throw new RuntimeException("Another exception");
+        }));
+    }
+
+    @Test
+    void composeToCompletablePropagatesResultFromAlreadyCompletedFuture() {
+        OrderingFuture<Integer> orderingFuture = OrderingFuture.completedFuture(3);
+
+        CompletableFuture<Integer> completableFuture = orderingFuture.thenComposeToCompletable(
+                x -> CompletableFuture.completedFuture(x * 5)
+        );
+
+        assertThat(completableFuture.getNow(999), is(15));
+    }
+
+    @Test
+    void composeToCompletablePropagatesExceptionFromAlreadyFailedFuture() {
+        OrderingFuture<Integer> orderingFuture = OrderingFuture.failedFuture(cause);
+
+        CompletableFuture<Integer> completableFuture = orderingFuture.thenComposeToCompletable(CompletableFuture::completedFuture);
+
+        assertThatFutureIsCompletedWithOurException(completableFuture);
+    }
+
+    @Test
+    void composeToCompletableDoesNotInvokeActionOnAlreadyFailedFuture() {
+        AtomicBoolean called = new AtomicBoolean(false);
+        OrderingFuture<Integer> orderingFuture = OrderingFuture.failedFuture(cause);
+
+        CompletableFuture<Integer> completableFuture = orderingFuture.thenComposeToCompletable(value -> {
+            called.set(true);
+            return CompletableFuture.completedFuture(value);
+        });
+
+        assertFalse(called.get());
+        assertThatFutureIsCompletedWithOurException(completableFuture);
+    }
+
+    @Test
+    void composeToCompletablePropagatesResultFromFutureCompletion() {
+        OrderingFuture<Integer> orderingFuture = new OrderingFuture<>();
+
+        CompletableFuture<Integer> completableFuture = orderingFuture.thenComposeToCompletable(
+                x -> CompletableFuture.completedFuture(x * 5)
+        );
+
+        orderingFuture.complete(3);
+
+        assertThat(completableFuture.getNow(999), is(15));
+    }
+
+    @Test
+    void composeToCompletablePropagatesExceptionFromFutureCompletion() {
+        OrderingFuture<Integer> orderingFuture = new OrderingFuture<>();
+
+        CompletableFuture<Integer> completableFuture = orderingFuture.thenComposeToCompletable(CompletableFuture::completedFuture);
+
+        orderingFuture.completeExceptionally(cause);
+
+        assertThatFutureIsCompletedWithOurException(completableFuture);
+    }
+
+    @Test
+    void composeToCompletableDoesNotInvokeActionOnExceptionalCompletion() {
+        AtomicBoolean called = new AtomicBoolean(false);
+        OrderingFuture<Integer> orderingFuture = new OrderingFuture<>();
+
+        CompletableFuture<Integer> completableFuture = orderingFuture.thenComposeToCompletable(value -> {
+            called.set(true);
+            return CompletableFuture.completedFuture(value);
+        });
+        orderingFuture.completeExceptionally(cause);
+
+        assertFalse(called.get());
+        assertThatFutureIsCompletedWithOurException(completableFuture);
+    }
+
+    @Test
+    void composeToCompletablePropagatesExceptionFromActionOnCompletedFuture() {
+        OrderingFuture<Integer> orderingFuture = OrderingFuture.completedFuture(1);
+
+        CompletableFuture<Integer> completableFuture = orderingFuture.thenComposeToCompletable(x -> {
+            throw cause;
+        });
+
+        assertThatFutureIsCompletedWithOurException(completableFuture);
+    }
+
+    @Test
+    void composeToCompletablePropagatesExceptionFromActionOnNormalCompletion() {
+        OrderingFuture<Integer> orderingFuture = new OrderingFuture<>();
+
+        CompletableFuture<Integer> completableFuture = orderingFuture.thenComposeToCompletable(x -> {
+            throw cause;
+        });
+
+        orderingFuture.complete(1);
+
+        assertThatFutureIsCompletedWithOurException(completableFuture);
+    }
+
+    @Test
+    void composeToCompletableWrapsCancellationExceptionInCompletionException() {
+        AtomicReference<Throwable> causeRef = new AtomicReference<>();
+
+        OrderingFuture<Integer> future = new OrderingFuture<>();
+        future.thenComposeToCompletable(x -> CompletableFuture.completedFuture(null)).whenComplete((res, ex) -> causeRef.set(ex));
+
+        CancellationException cancellationException = new CancellationException("Oops");
+        future.completeExceptionally(cancellationException);
+
+        assertThat(causeRef.get(), is(instanceOf(CompletionException.class)));
+        assertThat(causeRef.get().getCause(), is(cancellationException));
+    }
+
+    @Test
+    void getNowReturnsCompletionValueFromCompletedFuture() {
+        OrderingFuture<Integer> future = OrderingFuture.completedFuture(1);
+
+        assertThat(future.getNow(999), is(1));
+    }
+
+    @Test
+    void getNowReturnsCompletionValueFromFutureCompletedManually() {
+        OrderingFuture<Integer> future = new OrderingFuture<>();
+        future.complete(1);
+
+        assertThat(future.getNow(999), is(1));
+    }
+
+    @Test
+    void getNowReturnsDefaultValueFromIncompleteFuture() {
+        OrderingFuture<Integer> incompleteFuture = new OrderingFuture<>();
+
+        assertThat(incompleteFuture.getNow(999), is(999));
+    }
+
+    @Test
+    void getNowThrowsOnFailedFuture() {
+        OrderingFuture<Integer> future = OrderingFuture.failedFuture(cause);
+
+        CompletionException ex = assertThrows(CompletionException.class, () -> future.getNow(999));
+        assertThat(ex.getCause(), is(sameInstance(cause)));
+    }
+
+    @Test
+    void getNowThrowsOnFutureCompletedExceptionally() {
+        OrderingFuture<Integer> future = new OrderingFuture<>();
+        future.completeExceptionally(cause);
+
+        CompletionException ex = assertThrows(CompletionException.class, () -> future.getNow(999));
+        assertThat(ex.getCause(), is(sameInstance(cause)));
+    }
+
+    @Test
+    void getWithTimeoutReturnsCompletionValueFromCompletedFuture() throws Exception {
+        OrderingFuture<Integer> future = OrderingFuture.completedFuture(1);
+
+        assertThat(future.get(1, TimeUnit.NANOSECONDS), is(1));
+    }
+
+    @Test
+    void getWithTimeoutReturnsCompletionValueFromFutureCompletedManually() throws Exception {
+        OrderingFuture<Integer> future = new OrderingFuture<>();
+        future.complete(1);
+
+        assertThat(future.get(1, TimeUnit.NANOSECONDS), is(1));
+    }
+
+    @Test
+    void getWithTimeoutReturnsCompletionValueFromFutureCompletedFromDifferentThread() throws Exception {
+        OrderingFuture<Integer> future = new OrderingFuture<>();
+
+        new Thread(() -> future.complete(1)).start();
+
+        assertThat(future.get(1, TimeUnit.SECONDS), is(1));
+    }
+
+    @Test
+    void getWithTimeoutThrowsOnFailedFuture() {
+        OrderingFuture<Integer> future = OrderingFuture.failedFuture(cause);
+
+        ExecutionException ex = assertThrows(ExecutionException.class, () -> future.get(1, TimeUnit.NANOSECONDS));
+        assertThat(ex.getCause(), is(sameInstance(cause)));
+    }
+
+    @Test
+    void getWithTimeoutThrowsOnFutureCompletedExceptionally() {
+        OrderingFuture<Integer> future = new OrderingFuture<>();
+
+        future.completeExceptionally(cause);
+
+        ExecutionException ex = assertThrows(ExecutionException.class, () -> future.get(1, TimeUnit.NANOSECONDS));
+        assertThat(ex.getCause(), is(sameInstance(cause)));
+    }
+
+    @Test
+    void getWithTimeoutThrowsOnFutureCompletedExceptionallyFromDifferentThread() {
+        OrderingFuture<Integer> future = new OrderingFuture<>();
+
+        new Thread(() -> future.completeExceptionally(cause)).start();
+
+        ExecutionException ex = assertThrows(ExecutionException.class, () -> future.get(1, TimeUnit.SECONDS));
+        assertThat(ex.getCause(), is(sameInstance(cause)));
+    }
+
+    @Test
+    void getWithTimeoutUnwrapsCompletionExceptionWhenThrowsExecutionException() {
+        OrderingFuture<Void> future = OrderingFuture.failedFuture(new CompletionException(cause));
+
+        ExecutionException ex = assertThrows(ExecutionException.class, () -> future.get(1, TimeUnit.SECONDS));
+        assertThat(ex.getCause(), is(sameInstance(cause)));
+    }
+
+    @Test
+    void getWithTimeoutDoesNotWrapCancellationExceptionInExecutionException() {
+        CancellationException cancellationException = new CancellationException();
+        OrderingFuture<Void> future = OrderingFuture.failedFuture(cancellationException);
+
+        CancellationException ex = assertThrows(CancellationException.class, () -> future.get(1, TimeUnit.NANOSECONDS));
+        assertThat(ex, is(cancellationException));
+    }
+
+    @Test
+    void getWithTimeoutThrowsTimeoutExceptionWhenTimesOut() {
+        OrderingFuture<Integer> incompleteFuture = new OrderingFuture<>();
+
+        assertThrows(TimeoutException.class, () -> incompleteFuture.get(1, TimeUnit.NANOSECONDS));
+    }
+
+    @Test
+    void getWithTimeoutThrowsTimeoutExceptionWhenTimeoutIsZero() {
+        OrderingFuture<Integer> incompleteFuture = new OrderingFuture<>();
+
+        assertThrows(TimeoutException.class, () -> incompleteFuture.get(0, TimeUnit.NANOSECONDS));
+    }
+
+    @Test
+    void getWithTimeoutThrowsInterruptedExceptionOnInterruption() throws Exception {
+        OrderingFuture<Integer> incompleteFuture = new OrderingFuture<>();
+
+        CountDownLatch workerStartReached = new CountDownLatch(1);
+        CountDownLatch workerEndReached = new CountDownLatch(1);
+        AtomicBoolean interrupted = new AtomicBoolean(false);
+        AtomicReference<Throwable> exRef = new AtomicReference<>();
+
+        Thread worker = new Thread(() -> {
+            workerStartReached.countDown();
+
+            try {
+                incompleteFuture.get(1, TimeUnit.MINUTES);
+            } catch (InterruptedException e) {
+                interrupted.set(true);
+            } catch (TimeoutException | ExecutionException e) {
+                exRef.set(e);
+            } finally {
+                workerEndReached.countDown();
+            }
+        });
+        worker.start();
+
+        assertTrue(workerStartReached.await(1, TimeUnit.SECONDS));
+
+        worker.interrupt();
+
+        assertTrue(workerEndReached.await(1, TimeUnit.SECONDS));
+
+        assertThat(interrupted.get(), is(true));
+        assertThat(exRef.get(), is(nullValue()));
+    }
+
+    @Test
+    void getWithTimeoutThrowsInterruptedExceptionIfThreadIsAlreadyInterruptedEvenWithZeroTimeout() {
+        OrderingFuture<Integer> incompleteFuture = new OrderingFuture<>();
+
+        Thread.currentThread().interrupt();
+
+        assertThrows(InterruptedException.class, () -> incompleteFuture.get(0, TimeUnit.NANOSECONDS));
+    }
+
+    @Test
+    void conversionOfIncompleteFutureToCompletableFutureProducesIncompleteResult() {
+        OrderingFuture<Integer> orderingFuture = new OrderingFuture<>();
+
+        CompletableFuture<Integer> completableFuture = orderingFuture.toCompletableFuture();
+
+        assertThat(completableFuture.getNow(999), is(999));
+    }
+
+    @Test
+    void normalCompletionIsPropagatedToCompletableFuture() {
+        OrderingFuture<Integer> orderingFuture = new OrderingFuture<>();
+
+        CompletableFuture<Integer> completableFuture = orderingFuture.toCompletableFuture();
+
+        orderingFuture.complete(1);
+
+        assertThat(completableFuture.getNow(999), is(1));
+    }
+
+    @Test
+    void exceptionalCompletionIsPropagatedToCompletableFuture() {
+        OrderingFuture<Integer> orderingFuture = new OrderingFuture<>();
+
+        CompletableFuture<Integer> completableFuture = orderingFuture.toCompletableFuture();
+
+        orderingFuture.completeExceptionally(cause);
+
+        assertThatFutureIsCompletedWithOurException(completableFuture);
+    }
+}
diff --git a/modules/network-api/src/main/java/org/apache/ignite/network/MessagingService.java b/modules/network-api/src/main/java/org/apache/ignite/network/MessagingService.java
index 5fe5d7b2d5..c12569df19 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/network/MessagingService.java
+++ b/modules/network-api/src/main/java/org/apache/ignite/network/MessagingService.java
@@ -39,11 +39,15 @@ public interface MessagingService {
      *
      * <p>Guarantees:
      * <ul>
-     *     <li>Messages will be delivered in the same order as they were sent;</li>
-     *     <li>If a message N has been successfully delivered to a member implies that all messages preceding N
-     *     have also been successfully delivered.</li>
+     *     <li>Messages sent to the same receiver will be delivered in the same order as they were sent;</li>
+     *     <li>If a message N has been successfully delivered to a member, then all messages to the same receiver
+     *     preceding N have also been successfully delivered.</li>
      * </ul>
      *
+     * <p>Please note that the guarantees only hold within the same (sender, receiver) pair. That is, if A sends m1 and m2
+     * to B, then the guarantees are maintained. If, on the other hand, A sends m1 to B and m2 to C, then no guarantees
+     * exist between m1 and m2.
+     *
      * @param recipient Recipient of the message.
      * @param msg       Message which should be delivered.
      * @return Future of the send operation.
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
index 67a725bcaa..e49fbe0787 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
@@ -32,10 +32,12 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
 import java.util.stream.Stream;
 import org.apache.ignite.configuration.schemas.network.NetworkView;
+import org.apache.ignite.internal.future.OrderingFuture;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.network.NetworkMessagesFactory;
 import org.apache.ignite.internal.network.handshake.HandshakeManager;
+import org.apache.ignite.internal.network.recovery.RecoveryClientHandhakeManagerFactory;
 import org.apache.ignite.internal.network.recovery.RecoveryClientHandshakeManager;
 import org.apache.ignite.internal.network.recovery.RecoveryDescriptorProvider;
 import org.apache.ignite.internal.network.recovery.RecoveryServerHandshakeManager;
@@ -83,6 +85,9 @@ public class ConnectionManager {
     /** Node launch id. As opposed to {@link #consistentId}, this identifier changes between restarts. */
     private final UUID launchId;
 
+    /** Factory producing {@link RecoveryClientHandshakeManager} instances. */
+    private final RecoveryClientHandhakeManagerFactory clientHandhakeManagerFactory;
+
     /** Start flag. */
     private final AtomicBoolean started = new AtomicBoolean(false);
 
@@ -107,10 +112,39 @@ public class ConnectionManager {
             UUID launchId,
             String consistentId,
             NettyBootstrapFactory bootstrapFactory
+    ) {
+        this(
+                networkConfiguration,
+                serializationService,
+                launchId,
+                consistentId,
+                bootstrapFactory,
+                new DefaultRecoveryClientHandhakeManagerFactory()
+        );
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param networkConfiguration          Network configuration.
+     * @param serializationService          Serialization service.
+     * @param launchId                      Launch id of this node.
+     * @param consistentId                  Consistent id of this node.
+     * @param bootstrapFactory              Bootstrap factory.
+     * @param clientHandhakeManagerFactory  Factory for {@link RecoveryClientHandshakeManager} instances.
+     */
+    public ConnectionManager(
+            NetworkView networkConfiguration,
+            SerializationService serializationService,
+            UUID launchId,
+            String consistentId,
+            NettyBootstrapFactory bootstrapFactory,
+            RecoveryClientHandhakeManagerFactory clientHandhakeManagerFactory
     ) {
         this.serializationService = serializationService;
         this.launchId = launchId;
         this.consistentId = consistentId;
+        this.clientHandhakeManagerFactory = clientHandhakeManagerFactory;
 
         this.server = new NettyServer(
                 networkConfiguration,
@@ -170,7 +204,7 @@ public class ConnectionManager {
      * @param address      Another node's address.
      * @return Sender.
      */
-    public CompletableFuture<NettySender> channel(@Nullable String consistentId, SocketAddress address) {
+    public OrderingFuture<NettySender> channel(@Nullable String consistentId, SocketAddress address) {
         if (consistentId != null) {
             // If consistent id is known, try looking up a channel by consistent id. There can be an outbound connection
             // or an inbound connection associated with that consistent id.
@@ -180,7 +214,7 @@ public class ConnectionManager {
             );
 
             if (channel != null) {
-                return CompletableFuture.completedFuture(channel);
+                return OrderingFuture.completedFuture(channel);
             }
         }
 
@@ -193,11 +227,7 @@ public class ConnectionManager {
                         ? existingClient : connect(addr, (short) 0)
         );
 
-        CompletableFuture<NettySender> sender = client.sender();
-
-        assert sender != null;
-
-        return sender;
+        return client.sender();
     }
 
     /**
@@ -292,7 +322,12 @@ public class ConnectionManager {
     }
 
     private HandshakeManager createClientHandshakeManager(short connectionId) {
-        return new RecoveryClientHandshakeManager(launchId, consistentId, connectionId, FACTORY, descriptorProvider);
+        return clientHandhakeManagerFactory.create(
+                launchId,
+                consistentId,
+                connectionId,
+                descriptorProvider
+        );
     }
 
     private HandshakeManager createServerHandshakeManager() {
@@ -314,7 +349,6 @@ public class ConnectionManager {
      *
      * @return This node's consistent id.
      */
-    @TestOnly
     public String consistentId() {
         return consistentId;
     }
@@ -338,4 +372,15 @@ public class ConnectionManager {
     public Map<String, NettySender> channels() {
         return Collections.unmodifiableMap(channels);
     }
+
+    /**
+     * Default factory: builds plain {@link RecoveryClientHandshakeManager} instances by forwarding all arguments to the constructor.
+     */
+    private static class DefaultRecoveryClientHandhakeManagerFactory implements RecoveryClientHandhakeManagerFactory {
+        @Override
+        public RecoveryClientHandshakeManager create(UUID launchId, String consistentId, short connectionId,
+                RecoveryDescriptorProvider recoveryDescriptorProvider) {
+            return new RecoveryClientHandshakeManager(launchId, consistentId, connectionId, recoveryDescriptorProvider);
+        }
+    }
 }
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyClient.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyClient.java
index da0ad164ab..5ad57f3a84 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyClient.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyClient.java
@@ -22,10 +22,12 @@ import io.netty.channel.Channel;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.socket.SocketChannel;
 import java.net.SocketAddress;
+import java.util.Objects;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Consumer;
 import java.util.function.Function;
+import org.apache.ignite.internal.future.OrderingFuture;
 import org.apache.ignite.internal.network.handshake.HandshakeManager;
 import org.apache.ignite.internal.network.serialization.PerSessionSerializationService;
 import org.apache.ignite.internal.network.serialization.SerializationService;
@@ -47,10 +49,10 @@ public class NettyClient {
 
     /** Future that resolves when the client finished the handshake. */
     @Nullable
-    private volatile CompletableFuture<NettySender> clientFuture = null;
+    private volatile OrderingFuture<NettySender> senderFuture = null;
 
     /** Future that resolves when the client channel is opened. */
-    private CompletableFuture<Void> channelFuture = new CompletableFuture<>();
+    private final CompletableFuture<Void> channelFuture = new CompletableFuture<>();
 
     /** Client channel. */
     @Nullable
@@ -91,13 +93,13 @@ public class NettyClient {
      * @param bootstrapTemplate Template client bootstrap.
      * @return Future that resolves when client channel is opened.
      */
-    public CompletableFuture<NettySender> start(Bootstrap bootstrapTemplate) {
+    public OrderingFuture<NettySender> start(Bootstrap bootstrapTemplate) {
         synchronized (startStopLock) {
             if (stopped) {
                 throw new IgniteInternalException("Attempted to start an already stopped NettyClient");
             }
 
-            if (clientFuture != null) {
+            if (senderFuture != null) {
                 throw new IgniteInternalException("Attempted to start an already started NettyClient");
             }
 
@@ -113,7 +115,7 @@ public class NettyClient {
                 }
             });
 
-            clientFuture = NettyUtils.toChannelCompletableFuture(bootstrap.connect(address))
+            CompletableFuture<NettySender> senderCompletableFuture = NettyUtils.toChannelCompletableFuture(bootstrap.connect(address))
                     .handle((channel, throwable) -> {
                         synchronized (startStopLock) {
                             this.channel = channel;
@@ -134,8 +136,9 @@ public class NettyClient {
                         }
                     })
                     .thenCompose(Function.identity());
+            senderFuture = OrderingFuture.adapt(senderCompletableFuture);
 
-            return clientFuture;
+            return senderFuture;
         }
     }
 
@@ -144,9 +147,10 @@ public class NettyClient {
      *
      * @return Client start future.
      */
-    @Nullable
-    public CompletableFuture<NettySender> sender() {
-        return clientFuture;
+    public OrderingFuture<NettySender> sender() {
+        // Read the volatile field once so that the reference being returned is exactly the one that was null-checked.
+        OrderingFuture<NettySender> currentFuture = senderFuture;
+        return Objects.requireNonNull(currentFuture, "NettyClient is not connected yet");
     }
 
     /**
@@ -162,7 +166,7 @@ public class NettyClient {
 
             stopped = true;
 
-            if (clientFuture == null) {
+            if (senderFuture == null) {
                 return CompletableFuture.completedFuture(null);
             }
 
@@ -180,7 +184,9 @@ public class NettyClient {
      * @return {@code true} if the client has failed to connect to the remote server, {@code false} otherwise.
      */
     public boolean failedToConnect() {
-        return clientFuture != null && clientFuture.isCompletedExceptionally();
+        OrderingFuture<NettySender> currentFuture = senderFuture;
+
+        return currentFuture != null && currentFuture.isCompletedExceptionally();
     }
 
     /**
@@ -189,6 +195,8 @@ public class NettyClient {
      * @return {@code true} if the client has lost the connection or has been stopped, {@code false} otherwise.
      */
     public boolean isDisconnected() {
-        return (channel != null && !channel.isOpen()) || stopped;
+        Channel currentChannel = channel;
+
+        return (currentChannel != null && !currentChannel.isOpen()) || stopped;
     }
 }
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyUtils.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyUtils.java
index 2ae35d965c..d6d6abb751 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyUtils.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyUtils.java
@@ -22,6 +22,7 @@ import io.netty.channel.ChannelFuture;
 import io.netty.util.concurrent.Future;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
+import java.util.function.Supplier;
 import org.jetbrains.annotations.Async.Execute;
 import org.jetbrains.annotations.Async.Schedule;
 
@@ -43,19 +44,38 @@ public class NettyUtils {
             @Schedule F nettyFuture,
             Function<F, T> mapper
     ) {
-        var fut = new CompletableFuture<T>();
+        return toCompletableFuture(nettyFuture, mapper, CompletableFuture::new);
+    }
+
+    /**
+     * Convert a Netty {@link Future} to a {@link CompletableFuture} instance obtained from the supplied factory.
+     *
+     * @param nettyFuture              Netty future.
+     * @param mapper                   Function that maps successfully resolved Netty future to a value for a CompletableFuture.
+     * @param completableFutureFactory Factory used to produce a fresh instance of a {@link CompletableFuture}.
+     * @param <T>                      Resulting future type.
+     * @param <R>                      Netty future result type.
+     * @param <F>                      Netty future type.
+     * @return Future obtained from the factory, completed, cancelled or failed according to the Netty future outcome.
+     */
+    public static <T, R, F extends Future<R>> CompletableFuture<T> toCompletableFuture(
+            @Schedule F nettyFuture,
+            Function<F, T> mapper,
+            Supplier<? extends CompletableFuture<T>> completableFutureFactory
+    ) {
+        CompletableFuture<T> completableFuture = completableFutureFactory.get();
 
-        nettyFuture.addListener((@Execute F future) -> {
-            if (future.isSuccess()) {
-                fut.complete(mapper.apply(future));
-            } else if (future.isCancelled()) {
-                fut.cancel(true);
+        nettyFuture.addListener((@Execute F doneFuture) -> {
+            if (doneFuture.isSuccess()) {
+                completableFuture.complete(mapper.apply(doneFuture));
+            } else if (doneFuture.isCancelled()) {
+                completableFuture.cancel(true);
             } else {
-                fut.completeExceptionally(future.cause());
+                completableFuture.completeExceptionally(doneFuture.cause());
             }
         });
 
-        return fut;
+        return completableFuture;
     }
 
     /**
@@ -76,6 +96,6 @@ public class NettyUtils {
      * @return CompletableFuture.
      */
     public static CompletableFuture<Channel> toChannelCompletableFuture(ChannelFuture channelFuture) {
-        return toCompletableFuture(channelFuture, ChannelFuture::channel);
+        return toCompletableFuture(channelFuture, ChannelFuture::channel, CompletableFuture::new);
     }
 }
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandhakeManagerFactory.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandhakeManagerFactory.java
new file mode 100644
index 0000000000..2ab0dfec29
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandhakeManagerFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.recovery;
+
+import java.util.UUID;
+
+/**
+ * Factory abstracting the creation of {@link RecoveryClientHandshakeManager} instances.
+ */
+public interface RecoveryClientHandhakeManagerFactory {
+    /**
+     * Produces a {@link RecoveryClientHandshakeManager} instance for the given identifiers and descriptor provider.
+     *
+     * @param launchId                   Launch ID of the node.
+     * @param consistentId               Consistent ID of the node.
+     * @param connectionId               ID of the connection.
+     * @param recoveryDescriptorProvider Provider of recovery descriptors to be used.
+     * @return Created handshake manager.
+     */
+    RecoveryClientHandshakeManager create(
+            UUID launchId,
+            String consistentId,
+            short connectionId,
+            RecoveryDescriptorProvider recoveryDescriptorProvider
+    );
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
index 086919331c..87b2a889e7 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
@@ -43,15 +43,15 @@ import org.jetbrains.annotations.TestOnly;
  * Recovery protocol handshake manager for a client.
  */
 public class RecoveryClientHandshakeManager implements HandshakeManager {
+    /** Message factory. */
+    private static final NetworkMessagesFactory MESSAGE_FACTORY = new NetworkMessagesFactory();
+
     /** Launch id. */
     private final UUID launchId;
 
     /** Consistent id. */
     private final String consistentId;
 
-    /** Message factory. */
-    private final NetworkMessagesFactory messageFactory;
-
     /** Recovery descriptor provider. */
     private final RecoveryDescriptorProvider recoveryDescriptorProvider;
 
@@ -84,16 +84,14 @@ public class RecoveryClientHandshakeManager implements HandshakeManager {
      *
      * @param launchId Launch id.
      * @param consistentId Consistent id.
-     * @param messageFactory Message factory.
      * @param recoveryDescriptorProvider Recovery descriptor provider.
      */
     public RecoveryClientHandshakeManager(
-            UUID launchId, String consistentId, short connectionId, NetworkMessagesFactory messageFactory,
+            UUID launchId, String consistentId, short connectionId,
             RecoveryDescriptorProvider recoveryDescriptorProvider) {
         this.launchId = launchId;
         this.consistentId = consistentId;
         this.connectionId = connectionId;
-        this.messageFactory = messageFactory;
         this.recoveryDescriptorProvider = recoveryDescriptorProvider;
     }
 
@@ -165,9 +163,9 @@ public class RecoveryClientHandshakeManager implements HandshakeManager {
     }
 
     private void handshake(RecoveryDescriptor descriptor) {
-        PipelineUtils.afterHandshake(ctx.pipeline(), descriptor, createMessageHandler(), messageFactory);
+        PipelineUtils.afterHandshake(ctx.pipeline(), descriptor, createMessageHandler(), MESSAGE_FACTORY);
 
-        HandshakeStartResponseMessage response = messageFactory.handshakeStartResponseMessage()
+        HandshakeStartResponseMessage response = MESSAGE_FACTORY.handshakeStartResponseMessage()
                 .launchId(launchId)
                 .consistentId(consistentId)
                 .receivedCount(descriptor.receivedCount())
@@ -197,7 +195,7 @@ public class RecoveryClientHandshakeManager implements HandshakeManager {
     /**
      * Finishes handshaking process by removing handshake handler from the pipeline and creating a {@link NettySender}.
      */
-    private void finishHandshake() {
+    protected void finishHandshake() {
         // Removes handshake handler from the pipeline as the handshake is finished
         this.ctx.pipeline().remove(this.handler);
 
diff --git a/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java b/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
index 2dd64270c2..4659dfe9f8 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
@@ -45,7 +45,6 @@ import org.apache.ignite.internal.network.netty.InNetworkObject;
 import org.apache.ignite.internal.network.netty.NettySender;
 import org.apache.ignite.internal.network.serialization.ClassDescriptorRegistry;
 import org.apache.ignite.internal.network.serialization.DescriptorRegistry;
-import org.apache.ignite.internal.network.serialization.UserObjectSerializationContext;
 import org.apache.ignite.internal.network.serialization.marshal.UserObjectMarshaller;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.lang.IgniteException;
@@ -98,14 +97,15 @@ public class DefaultMessagingService extends AbstractMessagingService {
      *
      * @param factory Network messages factory.
      * @param topologyService Topology service.
-     * @param userObjectSerializationContext Serialization context.
+     * @param classDescriptorRegistry Descriptor registry.
+     * @param marshaller Marshaller.
      */
     public DefaultMessagingService(NetworkMessagesFactory factory, TopologyService topologyService,
-            UserObjectSerializationContext userObjectSerializationContext) {
+            ClassDescriptorRegistry classDescriptorRegistry, UserObjectMarshaller marshaller) {
         this.factory = factory;
         this.topologyService = topologyService;
-        this.marshaller = userObjectSerializationContext.marshaller();
-        this.classDescriptorRegistry = userObjectSerializationContext.descriptorRegistry();
+        this.classDescriptorRegistry = classDescriptorRegistry;
+        this.marshaller = marshaller;
     }
 
     /**
@@ -251,7 +251,7 @@ public class DefaultMessagingService extends AbstractMessagingService {
         }
 
         return connectionManager.channel(recipientConsistentId, addr)
-                .thenCompose(sender -> sender.send(new OutNetworkObject(message, descriptors)));
+                .thenComposeToCompletable(sender -> sender.send(new OutNetworkObject(message, descriptors)));
     }
 
     private List<ClassDescriptorMessage> beforeRead(NetworkMessage msg) throws Exception {
diff --git a/modules/network/src/main/java/org/apache/ignite/network/NettyBootstrapFactory.java b/modules/network/src/main/java/org/apache/ignite/network/NettyBootstrapFactory.java
index 69b279a35c..14ea4ba141 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/NettyBootstrapFactory.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/NettyBootstrapFactory.java
@@ -57,10 +57,7 @@ public class NettyBootstrapFactory implements IgniteComponent {
      * @param networkConfiguration     Network configuration.
      * @param eventLoopGroupNamePrefix Prefix for event loop group names.
      */
-    public NettyBootstrapFactory(
-            NetworkConfiguration networkConfiguration,
-            String eventLoopGroupNamePrefix
-    ) {
+    public NettyBootstrapFactory(NetworkConfiguration networkConfiguration, String eventLoopGroupNamePrefix) {
         assert eventLoopGroupNamePrefix != null;
         assert networkConfiguration != null;
 
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
index 34dd9a82fb..a49064d82f 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
@@ -78,7 +78,12 @@ public class ScaleCubeClusterServiceFactory {
 
         UserObjectSerializationContext userObjectSerialization = createUserObjectSerializationContext();
 
-        var messagingService = new DefaultMessagingService(messageFactory, topologyService, userObjectSerialization);
+        var messagingService = new DefaultMessagingService(
+                messageFactory,
+                topologyService,
+                userObjectSerialization.descriptorRegistry(),
+                userObjectSerialization.marshaller()
+        );
 
         return new AbstractClusterService(context, topologyService, messagingService) {
 
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyClientTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyClientTest.java
index 0d44fdc308..29e69c7add 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyClientTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyClientTest.java
@@ -33,7 +33,7 @@ import java.nio.channels.ClosedChannelException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import org.apache.ignite.internal.future.OrderingFuture;
 import org.apache.ignite.internal.network.handshake.HandshakeManager;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.network.NetworkMessage;
@@ -64,7 +64,7 @@ public class NettyClientTest {
      * @throws Exception If failed.
      */
     @Test
-    public void testSuccessfulConnect() throws InterruptedException, ExecutionException, TimeoutException {
+    public void testSuccessfulConnect() throws Exception {
         var channel = new EmbeddedChannel();
 
         ClientAndSender tuple = createClientAndSenderFromChannelFuture(channel.newSucceededFuture());
@@ -83,11 +83,9 @@ public class NettyClientTest {
 
     /**
      * Tests a scenario where NettyClient fails to connect.
-     *
-     * @throws Exception If failed.
      */
     @Test
-    public void testFailedToConnect() throws InterruptedException, ExecutionException, TimeoutException {
+    public void testFailedToConnect() {
         var channel = new EmbeddedChannel();
 
         ClientAndSender tuple = createClientAndSenderFromChannelFuture(channel.newFailedFuture(new ClosedChannelException()));
@@ -134,11 +132,9 @@ public class NettyClientTest {
 
     /**
      * Tests a scenario where a connection is established successfully after a client has been stopped.
-     *
-     * @throws Exception If failed.
      */
     @Test
-    public void testStoppedBeforeStarted() throws Exception {
+    public void testStoppedBeforeStarted() {
         var channel = new EmbeddedChannel();
 
         var future = channel.newPromise();
@@ -161,11 +157,9 @@ public class NettyClientTest {
 
     /**
      * Tests that a {@link NettyClient#start} method can be called only once.
-     *
-     * @throws Exception If failed.
      */
     @Test
-    public void testStartTwice() throws Exception {
+    public void testStartTwice() {
         var channel = new EmbeddedChannel();
 
         Bootstrap bootstrap = mockBootstrap();
@@ -230,7 +224,7 @@ public class NettyClientTest {
     private static class ClientAndSender {
         private final NettyClient client;
 
-        private final CompletableFuture<NettySender> sender;
+        private final OrderingFuture<NettySender> sender;
 
         /**
          * Constructor.
@@ -238,7 +232,7 @@ public class NettyClientTest {
          * @param client Netty client.
          * @param sender Netty sender.
          */
-        private ClientAndSender(NettyClient client, CompletableFuture<NettySender> sender) {
+        private ClientAndSender(NettyClient client, OrderingFuture<NettySender> sender) {
             this.client = client;
             this.sender = sender;
         }
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
index 7933e5c040..a2b2eeecd0 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
@@ -447,7 +447,7 @@ public class RecoveryHandshakeTest {
 
     private RecoveryClientHandshakeManager createRecoveryClientHandshakeManager(String consistentId, UUID launchId,
             RecoveryDescriptorProvider provider) {
-        return new RecoveryClientHandshakeManager(launchId, consistentId, CONNECTION_ID, MESSAGE_FACTORY, provider);
+        return new RecoveryClientHandshakeManager(launchId, consistentId, CONNECTION_ID, provider);
     }
 
     private RecoveryServerHandshakeManager createRecoveryServerHandshakeManager(RecoveryDescriptorProvider provider) {
diff --git a/modules/network/src/test/java/org/apache/ignite/network/DefaultMessagingServiceTest.java b/modules/network/src/test/java/org/apache/ignite/network/DefaultMessagingServiceTest.java
new file mode 100644
index 0000000000..2609603d5c
--- /dev/null
+++ b/modules/network/src/test/java/org/apache/ignite/network/DefaultMessagingServiceTest.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.configuration.schemas.network.InboundView;
+import org.apache.ignite.configuration.schemas.network.NetworkConfiguration;
+import org.apache.ignite.configuration.schemas.network.NetworkView;
+import org.apache.ignite.configuration.schemas.network.OutboundView;
+import org.apache.ignite.internal.network.NetworkMessagesFactory;
+import org.apache.ignite.internal.network.messages.TestMessage;
+import org.apache.ignite.internal.network.messages.TestMessageSerializationFactory;
+import org.apache.ignite.internal.network.messages.TestMessageTypes;
+import org.apache.ignite.internal.network.messages.TestMessagesFactory;
+import org.apache.ignite.internal.network.netty.ConnectionManager;
+import org.apache.ignite.internal.network.recovery.RecoveryClientHandhakeManagerFactory;
+import org.apache.ignite.internal.network.recovery.RecoveryClientHandshakeManager;
+import org.apache.ignite.internal.network.recovery.RecoveryDescriptorProvider;
+import org.apache.ignite.internal.network.serialization.ClassDescriptorFactory;
+import org.apache.ignite.internal.network.serialization.ClassDescriptorRegistry;
+import org.apache.ignite.internal.network.serialization.SerializationService;
+import org.apache.ignite.internal.network.serialization.UserObjectSerializationContext;
+import org.apache.ignite.internal.network.serialization.marshal.DefaultUserObjectMarshaller;
+import org.apache.ignite.internal.network.serialization.marshal.UserObjectMarshaller;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/**
+ * Tests for {@link DefaultMessagingService} in which a sender and a receiver each get their own
+ * {@link ConnectionManager} and {@link NettyBootstrapFactory}, configured through mocked network configuration.
+ */
+@ExtendWith(MockitoExtension.class)
+class DefaultMessagingServiceTest {
+    /** Port returned by the sender's mocked network configuration. */
+    private static final int SENDER_PORT = 2001;
+
+    /** Port returned by the receiver's mocked network configuration; {@link #receiverNode} points at it. */
+    private static final int RECEIVER_PORT = 2002;
+
+    /** Topology service handed to every {@link DefaultMessagingService} created by this test. */
+    @Mock
+    private TopologyService topologyService;
+
+    // Mocked configuration tree of the sender side.
+    @Mock
+    private NetworkConfiguration senderNetworkConfig;
+    @Mock
+    private NetworkView senderNetworkConfigView;
+    @Mock
+    private OutboundView senderOutboundConfig;
+    @Mock
+    private InboundView senderInboundConfig;
+
+    // Mocked configuration tree of the receiver side.
+    @Mock
+    private NetworkConfiguration receiverNetworkConfig;
+    @Mock
+    private NetworkView receiverNetworkConfigView;
+    @Mock
+    private OutboundView receiverOutboundConfig;
+    @Mock
+    private InboundView receiverInboundConfig;
+
+    private final NetworkMessagesFactory networkMessagesFactory = new NetworkMessagesFactory();
+    private final TestMessagesFactory testMessagesFactory = new TestMessagesFactory();
+
+    /** Serialization registry shared by the sender and the receiver. */
+    private final MessageSerializationRegistryImpl messageSerializationRegistry = new MessageSerializationRegistryImpl();
+
+    /** Node the sender addresses its messages to; its address uses {@link #RECEIVER_PORT} on localhost. */
+    private final ClusterNode receiverNode = new ClusterNode(
+            "receiver",
+            "receiver",
+            new NetworkAddress("localhost", RECEIVER_PORT, "receiver")
+    );
+
+    /**
+     * Registers the {@link TestMessage} serialization factory in the registry used by both sides.
+     */
+    @BeforeEach
+    void initSerializationRegistry() {
+        messageSerializationRegistry.registerFactory(
+                (short) 2,
+                TestMessageTypes.TEST,
+                new TestMessageSerializationFactory(testMessagesFactory)
+        );
+    }
+
+    /**
+     * Sends two messages while the sender's client handshake is held back by a latch, then releases the latch and
+     * verifies that the receiver's handler observes the payloads in the order in which they were sent.
+     *
+     * @throws Exception If waiting is interrupted or closing the services fails.
+     */
+    @Test
+    void messagesSentBeforeChannelStartAreDeliveredInCorrectOrder() throws Exception {
+        configureSender();
+        configureReceiver();
+
+        CountDownLatch allowSendLatch = new CountDownLatch(1);
+
+        try (
+                Services senderServices = createMessagingService(
+                        "sender", "sender-network", senderNetworkConfig, () -> awaitQuietly(allowSendLatch)
+                );
+                Services receiverServices = createMessagingService("receiver", "receiver-network", receiverNetworkConfig, () -> {})
+        ) {
+            List<String> payloads = new CopyOnWriteArrayList<>();
+            CountDownLatch messagesDeliveredLatch = new CountDownLatch(2);
+
+            receiverServices.messagingService.addMessageHandler(
+                    TestMessageTypes.class,
+                    (message, senderAddr, correlationId) -> {
+                        payloads.add(((TestMessage) message).msg());
+                        messagesDeliveredLatch.countDown();
+                    }
+            );
+
+            try {
+                senderServices.messagingService.send(receiverNode, testMessage("one"));
+                senderServices.messagingService.send(receiverNode, testMessage("two"));
+            } finally {
+                // Release the handshake even if a send fails, otherwise the thread parked in awaitQuietly() would
+                // stay blocked while the services are being closed.
+                allowSendLatch.countDown();
+            }
+
+            assertTrue(messagesDeliveredLatch.await(1, TimeUnit.SECONDS));
+
+            assertThat(payloads, contains("one", "two"));
+        }
+    }
+
+    /** Stubs the sender's configuration mocks: {@link #SENDER_PORT} plus the common defaults. */
+    private void configureSender() {
+        when(senderNetworkConfigView.port()).thenReturn(SENDER_PORT);
+        configureNetworkDefaults(senderNetworkConfig, senderNetworkConfigView, senderOutboundConfig, senderInboundConfig);
+    }
+
+    /** Stubs the receiver's configuration mocks: {@link #RECEIVER_PORT} plus the common defaults. */
+    private void configureReceiver() {
+        when(receiverNetworkConfigView.port()).thenReturn(RECEIVER_PORT);
+        configureNetworkDefaults(receiverNetworkConfig, receiverNetworkConfigView, receiverOutboundConfig, receiverInboundConfig);
+    }
+
+    /**
+     * Stubs the parts of a network configuration that are identical for both sides: the configuration value, a port
+     * range of {@code 0} and the outbound/inbound sub-views.
+     *
+     * @param networkConfig Configuration mock whose {@code value()} is stubbed.
+     * @param networkConfigView View returned from {@code networkConfig.value()}.
+     * @param outboundConfig View returned from {@code networkConfigView.outbound()}.
+     * @param inboundConfig View returned from {@code networkConfigView.inbound()}.
+     */
+    private static void configureNetworkDefaults(
+            NetworkConfiguration networkConfig,
+            NetworkView networkConfigView,
+            OutboundView outboundConfig,
+            InboundView inboundConfig
+    ) {
+        when(networkConfig.value()).thenReturn(networkConfigView);
+        when(networkConfigView.portRange()).thenReturn(0);
+        when(networkConfigView.outbound()).thenReturn(outboundConfig);
+        when(networkConfigView.inbound()).thenReturn(inboundConfig);
+    }
+
+    /**
+     * Waits on the latch, converting an interruption into an unchecked exception so the method can be used from a
+     * {@link Runnable}.
+     *
+     * @param latch Latch to wait on.
+     * @throws RuntimeException If the waiting thread is interrupted; the interrupt flag is set again before throwing.
+     */
+    private static void awaitQuietly(CountDownLatch latch) {
+        try {
+            latch.await();
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag: await() cleared it, and callers up the stack may still need to see it.
+            Thread.currentThread().interrupt();
+
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Builds a test message carrying the given payload.
+     *
+     * @param message Payload.
+     * @return Built message.
+     */
+    private TestMessage testMessage(String message) {
+        return testMessagesFactory.testMessage().msg(message).build();
+    }
+
+    /**
+     * Creates and starts the components of one side of the exchange: a {@link NettyBootstrapFactory}, a
+     * {@link ConnectionManager} and a {@link DefaultMessagingService} wired to that connection manager.
+     *
+     * @param consistentId Consistent ID passed to the connection manager.
+     * @param senderEventLoopGroupNamePrefix Event loop group name prefix passed to the bootstrap factory.
+     * @param networkConfig Network configuration (a stubbed mock) for this side.
+     * @param beforeHandshake Action run right before the client handshake of this side is finished.
+     * @return Started components; closing the result stops them.
+     */
+    private Services createMessagingService(
+            String consistentId,
+            String senderEventLoopGroupNamePrefix,
+            NetworkConfiguration networkConfig,
+            Runnable beforeHandshake
+    ) {
+        ClassDescriptorRegistry classDescriptorRegistry = new ClassDescriptorRegistry();
+        ClassDescriptorFactory classDescriptorFactory = new ClassDescriptorFactory(classDescriptorRegistry);
+        UserObjectMarshaller marshaller = new DefaultUserObjectMarshaller(classDescriptorRegistry, classDescriptorFactory);
+        DefaultMessagingService messagingService = new DefaultMessagingService(
+                networkMessagesFactory,
+                topologyService,
+                classDescriptorRegistry,
+                marshaller
+        );
+
+        SerializationService serializationService = new SerializationService(
+                messageSerializationRegistry,
+                new UserObjectSerializationContext(classDescriptorRegistry, classDescriptorFactory, marshaller)
+        );
+        NettyBootstrapFactory bootstrapFactory = new NettyBootstrapFactory(networkConfig, senderEventLoopGroupNamePrefix);
+        bootstrapFactory.start();
+
+        ConnectionManager connectionManager = new ConnectionManager(
+                networkConfig.value(),
+                serializationService,
+                UUID.randomUUID(),
+                consistentId,
+                bootstrapFactory,
+                clientHandshakeManagerFactoryAdding(beforeHandshake)
+        );
+        connectionManager.start();
+
+        messagingService.setConnectionManager(connectionManager);
+
+        // The bootstrap factory is handed over as well so that it gets stopped together with the other components.
+        return new Services(bootstrapFactory, connectionManager, messagingService);
+    }
+
+    /**
+     * Creates a handshake manager factory whose managers run the given action before delegating to the regular
+     * {@code finishHandshake()}.
+     *
+     * @param beforeHandshake Action to run before the handshake is finished.
+     * @return Factory producing the decorated client handshake managers.
+     */
+    private static RecoveryClientHandhakeManagerFactory clientHandshakeManagerFactoryAdding(Runnable beforeHandshake) {
+        return new RecoveryClientHandhakeManagerFactory() {
+            @Override
+            public RecoveryClientHandshakeManager create(UUID launchId, String consistentId, short connectionId,
+                    RecoveryDescriptorProvider recoveryDescriptorProvider) {
+                return new RecoveryClientHandshakeManager(launchId, consistentId, connectionId, recoveryDescriptorProvider) {
+                    @Override
+                    protected void finishHandshake() {
+                        beforeHandshake.run();
+                        super.finishHandshake();
+                    }
+                };
+            }
+        };
+    }
+
+    /**
+     * Holder of the started components of one side; closing it stops all of them.
+     */
+    private static class Services implements AutoCloseable {
+        private final NettyBootstrapFactory bootstrapFactory;
+        private final ConnectionManager connectionManager;
+        private final DefaultMessagingService messagingService;
+
+        /**
+         * Constructor.
+         *
+         * @param bootstrapFactory Started bootstrap factory used by the connection manager.
+         * @param connectionManager Started connection manager.
+         * @param messagingService Messaging service wired to the connection manager.
+         */
+        private Services(
+                NettyBootstrapFactory bootstrapFactory,
+                ConnectionManager connectionManager,
+                DefaultMessagingService messagingService
+        ) {
+            this.bootstrapFactory = bootstrapFactory;
+            this.connectionManager = connectionManager;
+            this.messagingService = messagingService;
+        }
+
+        /**
+         * Stops the connection manager, the messaging service and finally the bootstrap factory they were built on.
+         *
+         * @throws Exception If stopping any of the components fails.
+         */
+        @Override
+        public void close() throws Exception {
+            IgniteUtils.closeAll(connectionManager::stop, messagingService::stop, bootstrapFactory::stop);
+        }
+    }
+}