You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ni...@apache.org on 2019/08/13 15:51:16 UTC

[ignite] branch master updated: IGNITE-10619: Transfer of files between nodes via CommumicationSpi.

This is an automated email from the ASF dual-hosted git repository.

nizhikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 7d1b87a  IGNITE-10619: Transfer of files between nodes via CommumicationSpi.
7d1b87a is described below

commit 7d1b87a67d6e115bff41eb80e7755a070b3f32ac
Author: Maxim Muzafarov <ma...@gmail.com>
AuthorDate: Tue Aug 13 18:51:00 2019 +0300

    IGNITE-10619: Transfer of files between nodes via CommumicationSpi.
---
 .../org/apache/ignite/internal/IgniteFeatures.java |   3 +
 .../communication/AbstractTransmission.java        | 111 +++
 .../managers/communication/ChunkReceiver.java      | 115 +++
 .../managers/communication/FileReceiver.java       | 129 +++
 .../managers/communication/FileSender.java         | 172 ++++
 .../managers/communication/GridIoManager.java      | 983 ++++++++++++++++++++-
 .../communication/GridIoMessageFactory.java        |   5 +
 .../communication/SessionChannelMessage.java       | 136 +++
 .../communication/TransmissionHandler.java         |  76 ++
 .../managers/communication/TransmissionMeta.java   | 197 +++++
 .../managers/communication/TransmissionPolicy.java |  43 +
 .../communication/TransmissionReceiver.java        |  71 ++
 .../processors/cache/persistence/file/FileIO.java  |  27 +
 .../cache/persistence/file/FileIODecorator.java    |  12 +
 .../cache/persistence/file/RandomAccessFileIO.java |  17 +
 .../cache/persistence/wal/crc/FastCrc.java         |  27 +-
 .../ignite/internal/util/nio/GridNioServer.java    |  21 +-
 .../util/nio/GridSelectorNioSessionImpl.java       |  21 +-
 .../spi/communication/tcp/TcpCommunicationSpi.java | 203 ++++-
 .../tcp/internal/CommunicationListenerEx.java      |  37 +
 .../communication/tcp/internal/ConnectionKey.java  |   8 +
 .../GridIoManagerFileTransmissionSelfTest.java     | 981 ++++++++++++++++++++
 .../ignite/testsuites/IgniteBasicTestSuite.java    |   5 +-
 23 files changed, 3387 insertions(+), 13 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java
index 9f11a69..f356a9b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java
@@ -66,6 +66,9 @@ public enum IgniteFeatures {
     /** Distributed metastorage. */
     DISTRIBUTED_METASTORAGE(11),
 
+    /** The node can communicate with others via socket channel. */
+    CHANNEL_COMMUNICATION(12),
+
     /** Replacing TcpDiscoveryNode field with nodeId field in discovery messages. */
     TCP_DISCOVERY_MESSAGE_NODE_COMPACT_REPRESENTATION(14);
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/AbstractTransmission.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/AbstractTransmission.java
new file mode 100644
index 0000000..bd1da54
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/AbstractTransmission.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.communication;
+
+import java.io.Closeable;
+import java.nio.channels.SocketChannel;
+import java.util.function.BooleanSupplier;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Class represents base object which can transmit files (read or write) by chunks of
+ * predefined size over an opened {@link SocketChannel}.
+ */
+abstract class AbstractTransmission implements Closeable {
+    /** Node stopping checker. */
+    private final BooleanSupplier stopChecker;
+
+    /** The size of segment for the read. */
+    protected final int chunkSize;
+
+    /** Ignite logger. */
+    protected final IgniteLogger log;
+
+    /** Initial meta with file transferred attributes. */
+    protected TransmissionMeta meta;
+
+    /** The number of bytes successfully transferred during iteration. */
+    protected long transferred;
+
+    /**
+     * @param meta Initial file meta info.
+     * @param stopChecker Node stop or process interrupt checker.
+     * @param log Ignite logger.
+     * @param chunkSize Size of chunks.
+     */
+    protected AbstractTransmission(
+        TransmissionMeta meta,
+        BooleanSupplier stopChecker,
+        IgniteLogger log,
+        int chunkSize
+    ) {
+        A.notNull(meta, "Initial file meta cannot be null");
+        A.notNullOrEmpty(meta.name(), "Trasmisson name cannot be empty or null");
+        A.ensure(meta.offset() >= 0, "File start position cannot be negative");
+        A.ensure(meta.count() > 0, "Total number of bytes to transfer must be greater than zero");
+        A.notNull(stopChecker, "Process stop checker cannot be null");
+        A.ensure(chunkSize > 0, "Size of chunks to transfer data must be positive");
+
+        this.stopChecker = stopChecker;
+        this.log = log.getLogger(AbstractTransmission.class);
+        this.chunkSize = chunkSize;
+        this.meta = meta;
+    }
+
+    /**
+     * @return Current receiver state written to a {@link TransmissionMeta} instance.
+     */
+    public TransmissionMeta state() {
+        assert meta != null;
+
+        return new TransmissionMeta(meta.name(),
+            meta.offset() + transferred,
+            meta.count() - transferred,
+            meta.params(),
+            meta.policy(),
+            null);
+    }
+
+    /**
+     * @return Number of bytes which has been transferred.
+     */
+    public long transferred() {
+        return transferred;
+    }
+
+    /**
+     * @return {@code true} if the transmission process should be interrupted.
+     */
+    protected boolean stopped() {
+        return stopChecker.getAsBoolean();
+    }
+
+    /**
+     * @return {@code true} if the number of transferred bytes is still less than the expected total.
+     */
+    protected boolean hasNextChunk() {
+        return transferred < meta.count();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(AbstractTransmission.class, this);
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/ChunkReceiver.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/ChunkReceiver.java
new file mode 100644
index 0000000..24aeef2
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/ChunkReceiver.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.communication;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.SocketChannel;
+import java.util.function.BooleanSupplier;
+import java.util.function.Consumer;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Chunk receiver used to read a given {@link SocketChannel} by chunks of predefined size into
+ * an allocated {@link ByteBuffer}.
+ */
+class ChunkReceiver extends TransmissionReceiver {
+    /** Handler which accepts received data from the given socket. */
+    private final Consumer<ByteBuffer> hnd;
+
+    /** Destination buffer to transfer data to\from. */
+    private ByteBuffer buf;
+
+    /**
+     * @param meta Initial file meta info.
+     * @param chunkSize Size of the chunk.
+     * @param stopChecker Node stop or process interrupt checker.
+     * @param hnd Transmission handler to process received data.
+     * @param log Ignite logger.
+     */
+    public ChunkReceiver(
+        TransmissionMeta meta,
+        int chunkSize,
+        BooleanSupplier stopChecker,
+        Consumer<ByteBuffer> hnd,
+        IgniteLogger log
+    ) {
+        super(meta, stopChecker, log, chunkSize);
+
+        A.notNull(hnd, "ChunkHandler must be provided by transmission handler");
+
+        this.hnd = hnd;
+
+        buf = ByteBuffer.allocate(chunkSize);
+        buf.order(ByteOrder.nativeOrder());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void readChunk(ReadableByteChannel ch) throws IOException {
+        assert buf != null : "Buffer cannot be null since it is used to receive the data from channel: " + this;
+
+        buf.rewind();
+
+        int read = 0;
+        int res;
+
+        // Read data from input channel until the buffer is completely filled
+        // (buf.remaining() returns 0), or only partially filled if it was the last chunk.
+        while (true) {
+            res = ch.read(buf);
+
+            // Read will return -1 if the remote node closes the connection.
+            if (res < 0) {
+                if (transferred + read != meta.count()) {
+                    throw new IOException("Input data channel reached its end, but file has not fully loaded " +
+                        "[transferred=" + transferred + ", read=" + read + ", total=" + meta.count() + ']');
+                }
+
+                break;
+            }
+
+            read += res;
+
+            if (read == buf.capacity() || buf.position() == buf.capacity())
+                break;
+        }
+
+        if (read == 0)
+            return;
+
+        transferred += read;
+
+        buf.flip();
+
+        hnd.accept(buf);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(ChunkReceiver.class, this, "super", super.toString());
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/FileReceiver.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/FileReceiver.java
new file mode 100644
index 0000000..6af3ca4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/FileReceiver.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.communication;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.file.Files;
+import java.util.function.BooleanSupplier;
+import java.util.function.Consumer;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Class represents the data receiver which is pulling data from channel using
+ * {@link FileChannel#transferFrom(ReadableByteChannel, long, long)} until the
+ * whole file is completely received.
+ */
+class FileReceiver extends TransmissionReceiver {
+    /** Handler to notify when a file has been received. */
+    private final Consumer<File> hnd;
+
+    /** File to receive data into. */
+    private File file;
+
+    /** The corresponding file channel to work with. */
+    @GridToStringExclude
+    private FileIO fileIo;
+
+    /**
+     * @param meta Initial file meta info.
+     * @param stopChecker Node stop or process interrupt checker.
+     * @param factory Factory to produce IO interface on files.
+     * @param hnd Transmission handler provider to process download result.
+     * @param path Path to the destination file to receive data into.
+     * @param log Ignite logger.
+     */
+    public FileReceiver(
+        TransmissionMeta meta,
+        int chunkSize,
+        BooleanSupplier stopChecker,
+        FileIOFactory factory,
+        Consumer<File> hnd,
+        String path,
+        IgniteLogger log
+    ) {
+        super(meta, stopChecker, log, chunkSize);
+
+        A.notNull(hnd, "FileHandler must be provided by transmission handler");
+        A.notNull(path, "File absolute path cannot be null");
+        A.ensure(!path.trim().isEmpty(), "File absolute path cannot be empty ");
+
+        this.hnd = hnd;
+
+        file = new File(path);
+
+        try {
+            fileIo = factory.create(file);
+
+            fileIo.position(meta.offset());
+        }
+        catch (IOException e) {
+            throw new IgniteException("Unable to open destination file. Receiver will will be stopped", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void receive(ReadableByteChannel ch) throws IOException, InterruptedException {
+        super.receive(ch);
+
+        if (transferred == meta.count())
+            hnd.accept(file);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void readChunk(ReadableByteChannel ch) throws IOException {
+        assert fileIo != null;
+
+        long batchSize = Math.min(chunkSize, meta.count() - transferred);
+
+        long read = fileIo.transferFrom(ch, meta.offset() + transferred, batchSize);
+
+        if (read == 0)
+            throw new IOException("Channel is reached the end of stream. Probably, channel is closed on the remote node");
+
+        if (read > 0)
+            transferred += read;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        U.closeQuiet(fileIo);
+
+        try {
+            if (transferred != meta.count())
+                Files.delete(file.toPath());
+        }
+        catch (IOException e) {
+            U.error(log, "Error deleting not fully loaded file: " + file, e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(FileReceiver.class, this, "super", super.toString());
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/FileSender.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/FileSender.java
new file mode 100644
index 0000000..a4c060f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/FileSender.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.communication;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.ObjectOutput;
+import java.io.Serializable;
+import java.nio.channels.FileChannel;
+import java.nio.channels.WritableByteChannel;
+import java.util.Map;
+import java.util.function.BooleanSupplier;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.util.IgniteUtils.assertParameter;
+
+/**
+ * Class represents a data sender by chunks of predefined size. All of the chunks will be written to the
+ * given socket channel. Please note, that for each file you are going to send a new {@link FileSender}
+ * instance will be created. The sender must keep track of how much data has already been
+ * transferred so that it can send its state to the remote node when a reconnection is required.
+ * <p>
+ * The <em>FileSender</em> uses the zero-copy streaming approach, see <em>FileChannel#transferTo</em> for details.
+ *
+ * @see FileChannel#transferTo(long, long, WritableByteChannel)
+ */
+class FileSender extends AbstractTransmission {
+    /** Corresponding file channel to work with a given file. */
+    @GridToStringExclude
+    private FileIO fileIo;
+
+    /**
+     * @param file File which is going to be sent by chunks.
+     * @param off File offset.
+     * @param cnt Number of bytes to transfer.
+     * @param params Additional file params.
+     * @param plc Policy of handling data on remote.
+     * @param stopChecker Node stop or process interrupt checker.
+     * @param log Ignite logger.
+     * @param factory Factory to produce IO interface on given file.
+     * @param chunkSize Size of chunks.
+     * @throws IOException If fails.
+     */
+    public FileSender(
+        File file,
+        long off,
+        long cnt,
+        Map<String, Serializable> params,
+        TransmissionPolicy plc,
+        BooleanSupplier stopChecker,
+        IgniteLogger log,
+        FileIOFactory factory,
+        int chunkSize
+    ) throws IOException {
+        super(new TransmissionMeta(file.getName(), off, cnt, params, plc, null), stopChecker, log, chunkSize);
+
+        assert file != null;
+
+        fileIo = factory.create(file);
+    }
+
+    /**
+     * @param ch Output channel to write file to.
+     * @param oo Channel to write meta info to.
+     * @param rcvMeta Connection meta received.
+     * @throws IOException If a transport exception occurred.
+     * @throws InterruptedException If thread interrupted.
+     */
+    public void send(WritableByteChannel ch,
+        ObjectOutput oo,
+        @Nullable TransmissionMeta rcvMeta
+    ) throws IOException, InterruptedException {
+        updateSenderState(rcvMeta);
+
+        // Write flag to remote to keep current transmission opened.
+        oo.writeBoolean(false);
+
+        // Send meta about current file to remote.
+        oo.writeObject(new TransmissionMeta(meta.name(),
+            meta.offset() + transferred,
+            meta.count() - transferred,
+            meta.params(),
+            meta.policy(),
+            null));
+
+        while (hasNextChunk()) {
+            if (Thread.currentThread().isInterrupted())
+                throw new InterruptedException("Sender thread has been interruped");
+
+            if (stopped())
+                throw new IgniteException("Sender has been cancelled due to the local node is stopping");
+
+            writeChunk(ch);
+        }
+
+        assert transferred == meta.count() : "File is not fully transferred [expect=" + meta.count() +
+            ", actual=" + transferred + ']';
+    }
+
+    /**
+     * @param rcvMeta Connection meta info.
+     */
+    private void updateSenderState(TransmissionMeta rcvMeta) {
+        // The remote node doesn't have a file meta info.
+        if (rcvMeta == null || rcvMeta.offset() < 0) {
+            transferred = 0;
+
+            return;
+        }
+
+        long uploadedBytes = rcvMeta.offset() - meta.offset();
+
+        assertParameter(meta.name().equals(rcvMeta.name()), "Attempt to transfer different file " +
+            "while previous is not completed [meta=" + meta + ", meta=" + rcvMeta + ']');
+
+        assertParameter(uploadedBytes >= 0, "Incorrect sync meta [offset=" + rcvMeta.offset() +
+            ", meta=" + meta + ']');
+
+        // No need to set new file position, if it is not changed.
+        if (uploadedBytes == 0)
+            return;
+
+        transferred = uploadedBytes;
+
+        U.log(log, "The number of transferred bytes after reconnect has been updated: " + uploadedBytes);
+    }
+
+    /**
+     * @param ch Channel to write data to.
+     * @throws IOException If fails.
+     */
+    private void writeChunk(WritableByteChannel ch) throws IOException {
+        long batchSize = Math.min(chunkSize, meta.count() - transferred);
+
+        long sent = fileIo.transferTo(meta.offset() + transferred, batchSize, ch);
+
+        if (sent > 0)
+            transferred += sent;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        U.closeQuiet(fileIo);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(FileSender.class, this, "super", super.toString());
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 91064d3..c952d68 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -17,7 +17,23 @@
 
 package org.apache.ignite.internal.managers.communication;
 
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInput;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
 import java.io.Serializable;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
+import java.nio.channels.Channel;
+import java.nio.channels.FileChannel;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.SocketChannel;
+import java.nio.channels.WritableByteChannel;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -25,10 +41,13 @@ import java.util.Collection;
 import java.util.Date;
 import java.util.Deque;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Queue;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.ConcurrentHashMap;
@@ -48,10 +67,15 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.BooleanSupplier;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteMessaging;
+import org.apache.ignite.cluster.ClusterGroup;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.GridKernalContext;
@@ -59,7 +83,10 @@ import org.apache.ignite.internal.GridTopic;
 import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
 import org.apache.ignite.internal.IgniteComponentType;
 import org.apache.ignite.internal.IgniteDeploymentCheckedException;
+import org.apache.ignite.internal.IgniteFeatures;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.direct.DirectMessageReader;
 import org.apache.ignite.internal.direct.DirectMessageWriter;
@@ -68,6 +95,10 @@ import org.apache.ignite.internal.managers.deployment.GridDeployment;
 import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccMessage;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIO;
+import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
 import org.apache.ignite.internal.processors.metric.MetricRegistry;
 import org.apache.ignite.internal.processors.platform.message.PlatformMessageFilter;
 import org.apache.ignite.internal.processors.pool.PoolProcessor;
@@ -79,9 +110,12 @@ import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.lang.GridTuple3;
 import org.apache.ignite.internal.util.lang.IgnitePair;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -99,6 +133,7 @@ import org.apache.ignite.spi.IgniteSpiException;
 import org.apache.ignite.spi.communication.CommunicationListener;
 import org.apache.ignite.spi.communication.CommunicationSpi;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.communication.tcp.internal.CommunicationListenerEx;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
@@ -108,6 +143,8 @@ import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE_COORDINATOR;
 import static org.apache.ignite.internal.GridTopic.TOPIC_COMM_USER;
 import static org.apache.ignite.internal.GridTopic.TOPIC_IO_TEST;
+import static org.apache.ignite.internal.IgniteFeatures.CHANNEL_COMMUNICATION;
+import static org.apache.ignite.internal.IgniteFeatures.nodeSupports;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.AFFINITY_POOL;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.DATA_STREAMER_POOL;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.IDX_POOL;
@@ -126,7 +163,68 @@ import static org.apache.ignite.internal.util.nio.GridNioBackPressureControl.thr
 import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q_OPTIMIZED_RMV;
 
 /**
- * Grid communication manager.
+ * This class represents the internal grid communication (<em>input</em> and <em>output</em>) manager
+ * which is placed as a layer of indirection between the {@link IgniteKernal} and {@link CommunicationSpi}.
+ * The IO manager is responsible for controlling CommunicationSPI which in turn is responsible
+ * for exchanging data between Ignite nodes.
+ *
+ * <h2>Data exchanging</h2>
+ * <p>
+ * Communication manager provides a rich API for data exchanging between a pair of cluster nodes. Two types
+ * of communication <em>Message-based communication</em> and <em>File-based communication</em> are available.
+ * Each of them support sending data to an arbitrary topic on the remote node (see {@link GridTopic} for an
+ * additional information about Ignite topics).
+ *
+ * <h3>Message-based communication</h3>
+ * <p>
+ * {@link Message} and {@link GridTopic} are used to provide a topic-based messaging protocol
+ * between cluster nodes. All messages used for data exchange can be divided into two general types:
+ * <em>internal</em> and <em>user</em> messages.
+ * <p>
+ * <em>Internal message</em> communication is used by Ignite Kernal. Please, refer to appropriate methods below:
+ * <ul>
+ * <li>{@link #sendToGridTopic(ClusterNode, GridTopic, Message, byte)}</li>
+ * <li>{@link #sendOrderedMessage(ClusterNode, Object, Message, byte, long, boolean)}</li>
+ * <li>{@link #addMessageListener(Object, GridMessageListener)}</li>
+ * </ul>
+ * <p>
+ * <em>User message</em> communication is directly exposed to the {@link IgniteMessaging} API and provides
+ * user functionality for topic-based message exchange among nodes within the cluster defined
+ * by {@link ClusterGroup}. Please, refer to appropriate methods below:
+ * <ul>
+ * <li>{@link #sendToCustomTopic(ClusterNode, Object, Message, byte)}</li>
+ * <li>{@link #addUserMessageListener(Object, IgniteBiPredicate, UUID)}</li>
+ * </ul>
+ *
+ * <h3>File-based communication</h3>
+ * <p>
+ * Sending or receiving binary data (represented by a <em>File</em>) over a <em>SocketChannel</em> is only
+ * possible when the built-in {@link TcpCommunicationSpi} implementation of Communication SPI is used and
+ * both local and remote nodes support the {@link IgniteFeatures#CHANNEL_COMMUNICATION CHANNEL_COMMUNICATION}
+ * feature. To ensure that the remote node satisfies all conditions the {@link #fileTransmissionSupported(ClusterNode)}
+ * method must be called prior to data sending.
+ * <p>
+ * It is possible to receive a set of files on a particular topic (any of {@link GridTopic}) on the remote node.
+ * A transmission handler for desired topic must be registered prior to opening transmission sender to it.
+ * Methods below are used to register handlers and open new transmissions:
+ * <ul>
+ * <li>{@link #addTransmissionHandler(Object, TransmissionHandler)}</li>
+ * <li>{@link #removeTransmissionHandler(Object)}</li>
+ * <li>{@link #openTransmissionSender(UUID, Object)}</li>
+ * </ul>
+ * <p>
+ * Each transmission sender opens a new transmission session to remote node prior to sending files over it.
+ * (see description of {@link TransmissionSender} for details). The {@link TransmissionSender}
+ * will send all files within a single session synchronously one by one.
+ * <p>
+ * <em>NOTE.</em> It is important to call the <em>close()</em> method or use a <em>try-with-resources</em>
+ * statement to release all resources once you are done with the transmission session. This ensures that all
+ * resources are released on remote node in a proper way (i.e. transmission handlers are closed).
+ * <p>
+ *
+ * @see TcpCommunicationSpi
+ * @see IgniteMessaging
+ * @see TransmissionHandler
  */
 public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializable>> {
     /** Io communication metrics registry name. */
@@ -162,6 +260,45 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     /** Current IO policy. */
     private static final ThreadLocal<Byte> CUR_PLC = new ThreadLocal<>();
 
+    /**
+     * Default chunk size in bytes used for sending\receiving files over a {@link SocketChannel}.
+     * Setting the transfer chunk size more than <tt>1 MB</tt> is meaningless because there is
+     * no asymptotic benefit. What you're trying to achieve with larger transfer chunk sizes is
+     * fewer thread context switches, and every time you double the transfer size you halve
+     * the context switch cost.
+     * <p>
+     * Default value is {@code 256Kb}.
+     */
+    private static final int DFLT_CHUNK_SIZE_BYTES = 256 * 1024;
+
+    /** Mutex to achieve consistency of transmission handlers and receiver contexts. */
+    private final Object rcvMux = new Object();
+
+    /** Map of registered handlers per each IO topic. */
+    private final ConcurrentMap<Object, TransmissionHandler> topicTransmissionHnds = new ConcurrentHashMap<>();
+
+    /** The map of already known channel read contexts by their registered topics. */
+    private final ConcurrentMap<Object, ReceiverContext> rcvCtxs = new ConcurrentHashMap<>();
+
+    /** The map of sessions which are currently writing files and their corresponding interruption flags. */
+    private final ConcurrentMap<T2<UUID, IgniteUuid>, AtomicBoolean> senderStopFlags = new ConcurrentHashMap<>();
+
+    /**
+     * Default factory to provide IO operation interface over files for their further transmission between nodes.
+     * Some implementations of file senders\receivers are using the zero-copy approach to transfer bytes
+     * from a file to the given {@link SocketChannel} and vice-versa. So, it is necessary to produce an {@link FileIO}
+     * implementation based on {@link FileChannel} which is reflected in Ignite project as {@link RandomAccessFileIO}.
+     *
+     * @see FileChannel#transferTo(long, long, WritableByteChannel)
+     */
+    private final FileIOFactory fileIoFactory = new RandomAccessFileIOFactory();
+
+    /** The maximum number of retry attempts (read or write attempts). */
+    private final int retryCnt;
+
+    /** Network timeout in milliseconds. */
+    private final int netTimeoutMs;
+
     /** Listeners by topic. */
     private final ConcurrentMap<Object, GridMessageListener> lsnrMap = new ConcurrentHashMap<>();
 
@@ -191,7 +328,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     private final ConcurrentMap<UUID, Deque<DelayedMessage>> waitMap = new ConcurrentHashMap<>();
 
     /** Communication message listener. */
-    private CommunicationListener<Serializable> commLsnr;
+    private CommunicationListenerEx<Serializable> commLsnr;
 
     /** Grid marshaller. */
     private final Marshaller marsh;
@@ -218,7 +355,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     private MessageFormatter formatter;
 
     /** Stopping flag. */
-    private boolean stopping;
+    private volatile boolean stopping;
 
     /** */
     private final AtomicReference<ConcurrentHashMap<Long, IoTestFuture>> ioTestMap = new AtomicReference<>();
@@ -263,6 +400,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
             "Received messages count.");
 
         ioMetric.register(RCVD_BYTES_CNT, spi::getReceivedBytesCount, "Received bytes count.");
+
+        retryCnt = ctx.config().getNetworkSendRetryCount();
+        netTimeoutMs = (int)ctx.config().getNetworkTimeout();
     }
 
     /**
@@ -294,7 +434,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     @Override public void start() throws IgniteCheckedException {
         startSpi();
 
-        getSpi().setListener(commLsnr = new CommunicationListener<Serializable>() {
+        getSpi().setListener(commLsnr = new CommunicationListenerEx<Serializable>() {
             @Override public void onMessage(UUID nodeId, Serializable msg, IgniteRunnable msgC) {
                 try {
                     onMessage0(nodeId, (GridIoMessage)msg, msgC);
@@ -310,6 +450,17 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
                 for (GridDisconnectListener lsnr : disconnectLsnrs)
                     lsnr.onNodeDisconnected(nodeId);
             }
+
+            @Override public void onChannelOpened(UUID rmtNodeId, Serializable initMsg, Channel channel) {
+                try {
+                    onChannelOpened0(rmtNodeId, (GridIoMessage)initMsg, channel);
+                }
+                catch (ClassCastException ignored) {
+                    U.error(log, "Communication manager received message of unknown type (will ignore): " +
+                        initMsg.getClass().getName() + ". Most likely GridCommunicationSpi is being used directly, " +
+                        "which is illegal - make sure to send messages only via GridProjection API.");
+                }
+            }
         });
 
         ctx.addNodeAttribute(DIRECT_PROTO_VER_ATTR, DIRECT_PROTO_VER);
@@ -773,6 +924,36 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
 
                     case EVT_NODE_LEFT:
                     case EVT_NODE_FAILED:
+                        busyLock.readLock().lock();
+
+                        try {
+                            // Stop all writer sessions.
+                            for (Map.Entry<T2<UUID, IgniteUuid>, AtomicBoolean> writeSesEntry: senderStopFlags.entrySet()) {
+                                if (writeSesEntry.getKey().get1().equals(nodeId))
+                                    writeSesEntry.getValue().set(true);
+                            }
+
+                            synchronized (rcvMux) {
+                                // Clear the context on the uploader node left.
+                                Iterator<Map.Entry<Object, ReceiverContext>> it = rcvCtxs.entrySet().iterator();
+
+                                while (it.hasNext()) {
+                                    Map.Entry<Object, ReceiverContext> e = it.next();
+
+                                    if (nodeId.equals(e.getValue().rmtNodeId)) {
+                                        it.remove();
+
+                                        interruptRecevier(e.getValue(),
+                                            new ClusterTopologyCheckedException("Remote node left the grid. " +
+                                                "Receiver has been stopped : " + nodeId));
+                                    }
+                                }
+                            }
+                        }
+                        finally {
+                            busyLock.readLock().unlock();
+                        }
+
                         for (Map.Entry<Object, ConcurrentMap<UUID, GridCommunicationMessageSet>> e :
                             msgSetMap.entrySet()) {
                             ConcurrentMap<UUID, GridCommunicationMessageSet> map = e.getValue();
@@ -933,6 +1114,21 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
                 evtMgr.removeLocalEventListener(discoLsnr);
 
             stopping = true;
+
+            Set<ReceiverContext> rcvs;
+
+            synchronized (rcvMux) {
+                topicTransmissionHnds.clear();
+
+                rcvs = new HashSet<>(rcvCtxs.values());
+
+                rcvCtxs.clear();
+            }
+
+            for (ReceiverContext rctx : rcvs) {
+                interruptRecevier(rctx, new NodeStoppingException("Local node io manager requested to be stopped: "
+                    + ctx.localNodeId()));
+            }
         }
         finally {
             busyLock.writeLock().unlock();
@@ -948,6 +1144,51 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     }
 
     /**
+     * @param rmtNodeId The remote node id.
+     * @param initMsg Message with additional channel params.
+     * @param channel The opened channel; it is handed over to a policy pool thread for processing.
+     */
+    private void onChannelOpened0(UUID rmtNodeId, GridIoMessage initMsg, Channel channel) {
+        Lock busyLock0 = busyLock.readLock();
+
+        busyLock0.lock();
+
+        try {
+            if (stopping) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Received communication channel create event while node stopping (will ignore) " +
+                        "[rmtNodeId=" + rmtNodeId + ", initMsg=" + initMsg + ']');
+                }
+
+                return;
+            }
+
+            if (initMsg.topic() == null) {
+                int topicOrd = initMsg.topicOrdinal();
+
+                initMsg.topic(topicOrd >= 0 ? GridTopic.fromOrdinal(topicOrd) :
+                    U.unmarshal(marsh, initMsg.topicBytes(), U.resolveClassLoader(ctx.config())));
+            }
+
+            byte plc = initMsg.policy();
+
+            pools.poolForPolicy(plc).execute(new Runnable() {
+                @Override public void run() {
+                    processOpenedChannel(initMsg.topic(), rmtNodeId, (SessionChannelMessage)initMsg.message(),
+                        (SocketChannel)channel);
+                }
+            });
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to process channel creation event due to exception " +
+                "[rmtNodeId=" + rmtNodeId + ", initMsg=" + initMsg + ']' , e);
+        }
+        finally {
+            busyLock0.unlock();
+        }
+    }
+
+    /**
      * @param nodeId Node ID.
      * @param msg Message bytes.
      * @param msgC Closure to call when message processing finished.
@@ -1630,6 +1871,107 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     }
 
     /**
+     * @param remoteId The remote node to connect to.
+     * @param topic The remote topic to connect to.
+     * @return The transmission sender instance to communicate with remote.
+     */
+    public TransmissionSender openTransmissionSender(UUID remoteId, Object topic) {
+        return new TransmissionSender(remoteId, topic);
+    }
+
+    /**
+     * @param topic The {@link GridTopic} to register handler to (an already registered handler is not replaced).
+     * @param hnd Handler which will handle file upload requests.
+     */
+    public void addTransmissionHandler(Object topic, TransmissionHandler hnd) {
+        TransmissionHandler hnd0 = topicTransmissionHnds.putIfAbsent(topic, hnd);
+
+        assert hnd0 == null : "The topic already have an appropriate session handler [topic=" + topic + ']';
+    }
+
+    /**
+     * @param topic The topic to erase handler from (an active receiver on this topic is interrupted).
+     */
+    public void removeTransmissionHandler(Object topic) {
+        ReceiverContext rcvCtx0;
+
+        synchronized (rcvMux) {
+            topicTransmissionHnds.remove(topic);
+
+            rcvCtx0 = rcvCtxs.remove(topic);
+        }
+
+        interruptRecevier(rcvCtx0,
+            new IgniteCheckedException("Receiver has been closed due to removing corresponding transmission handler " +
+                "on local node [nodeId=" + ctx.localNodeId() + ']'));
+    }
+
+    /**
+     * This method must be used prior to opening a {@link TransmissionSender} by calling
+     * {@link #openTransmissionSender(UUID, Object)} to ensure that remote and local nodes
+     * fully support direct {@link SocketChannel} connection to transfer data.
+     *
+     * @param node Remote node to check.
+     * @return {@code true} if a file can be sent over socket channel directly.
+     */
+    public boolean fileTransmissionSupported(ClusterNode node) {
+        return ((CommunicationSpi)getSpi() instanceof TcpCommunicationSpi) &&
+            nodeSupports(node, CHANNEL_COMMUNICATION);
+    }
+
+    /**
+     * @param nodeId Destination node to connect to.
+     * @param topic Topic to send the request to.
+     * @param initMsg Channel initialization message.
+     * @return Future of the established {@link Channel} to use.
+     * @throws IgniteCheckedException If fails.
+     */
+    private IgniteInternalFuture<Channel> openChannel(
+        UUID nodeId,
+        Object topic,
+        Message initMsg
+    ) throws IgniteCheckedException {
+        assert nodeId != null;
+        assert topic != null;
+        assert !locNodeId.equals(nodeId) : "Channel cannot be opened to the local node itself: " + nodeId;
+        assert (CommunicationSpi)getSpi() instanceof TcpCommunicationSpi : "Only TcpCommunicationSpi supports direct " +
+            "connections between nodes: " + getSpi().getClass();
+
+        ClusterNode node = ctx.discovery().node(nodeId);
+
+        if (node == null)
+            throw new ClusterTopologyCheckedException("Failed to open a new channel to remote node (node left): " + nodeId);
+
+        int topicOrd = topic instanceof GridTopic ? ((Enum<GridTopic>)topic).ordinal() : -1;
+
+        GridIoMessage ioMsg = createGridIoMessage(topic,
+            topicOrd,
+            initMsg,
+            PUBLIC_POOL,
+            false,
+            0,
+            false);
+
+        try {
+            if (topicOrd < 0)
+                ioMsg.topicBytes(U.marshal(marsh, topic));
+
+            return ((TcpCommunicationSpi)(CommunicationSpi)getSpi()).openChannel(node, ioMsg);
+        }
+        catch (IgniteSpiException e) {
+            if (e.getCause() instanceof ClusterTopologyCheckedException)
+                throw (ClusterTopologyCheckedException)e.getCause();
+
+            if (!ctx.discovery().alive(node))
+                throw new ClusterTopologyCheckedException("Failed to create channel (node left): " + node.id(), e);
+
+            throw new IgniteCheckedException("Failed to create channel (node may have left the grid or " +
+                "TCP connection cannot be established due to unknown issues) " +
+                "[node=" + node + ", topic=" + topic + ']', e);
+        }
+    }
+
+    /**
      * @param node Destination node.
      * @param topic Topic to send the message to.
      * @param topicOrd GridTopic enumeration ordinal.
@@ -2349,6 +2691,288 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     }
 
     /**
+     * @param rctx Receiver context to interrupt ({@code null} is ignored).
+     * @param ex Exception to close receiver with.
+     */
+    private void interruptRecevier(ReceiverContext rctx, Exception ex) {
+        if (rctx == null)
+            return;
+
+        if (rctx.interrupted.compareAndSet(false, true)) {
+            if (rctx.timeoutObj != null)
+                ctx.timeout().removeTimeoutObject(rctx.timeoutObj);
+
+            U.closeQuiet(rctx.rcv);
+
+            rctx.lastState = rctx.lastState == null ?
+                new TransmissionMeta(ex) : rctx.lastState.error(ex);
+
+            rctx.hnd.onException(rctx.rmtNodeId, ex);
+
+            U.error(log, "Receiver has been interrupted due to an exception occurred [ctx=" + rctx + ']', ex);
+        }
+    }
+
+    /**
+     * @param topic Topic to which the channel is created.
+     * @param rmtNodeId Remote node id.
+     * @param initMsg Channel initialization message with additional params.
+     * @param ch Channel instance; it is always closed when this method returns.
+     */
+    private void processOpenedChannel(Object topic, UUID rmtNodeId, SessionChannelMessage initMsg, SocketChannel ch) {
+        ReceiverContext rcvCtx = null;
+        ObjectInputStream in = null;
+        ObjectOutputStream out = null;
+
+        try {
+            if (stopping) {
+                throw new NodeStoppingException("Local node is stopping. Channel will be closed [topic=" + topic +
+                    ", channel=" + ch + ']');
+            }
+
+            if (initMsg == null || initMsg.sesId() == null) {
+                U.warn(log, "There is no initial message provied for given topic. Opened channel will be closed " +
+                    "[rmtNodeId=" + rmtNodeId + ", topic=" + topic + ", initMsg=" + initMsg + ']');
+
+                return;
+            }
+
+            configureChannel(ch, netTimeoutMs);
+
+            in = new ObjectInputStream(ch.socket().getInputStream());
+            out = new ObjectOutputStream(ch.socket().getOutputStream());
+
+            IgniteUuid newSesId = initMsg.sesId();
+
+            synchronized (rcvMux) {
+                TransmissionHandler hnd = topicTransmissionHnds.get(topic);
+
+                if (hnd == null) {
+                    U.warn(log, "There is no handler for a given topic. Channel will be closed [rmtNodeId=" + rmtNodeId +
+                        ", topic=" + topic + ']');
+
+                    return;
+                }
+
+                rcvCtx = rcvCtxs.computeIfAbsent(topic, t -> new ReceiverContext(rmtNodeId, hnd, newSesId));
+            }
+
+            // Do not allow different sessions to use the same topic simultaneously.
+            if (!newSesId.equals(rcvCtx.sesId)) {
+                IgniteCheckedException err = new IgniteCheckedException("Requested topic is busy by another transmission. " +
+                    "It's not allowed to process different sessions over the same topic simultaneously. " +
+                    "Channel will be closed [initMsg=" + initMsg + ", channel=" + ch + ", nodeId=" + rmtNodeId + ']');
+
+                U.error(log, err);
+
+                out.writeObject(new TransmissionMeta(err));
+
+                return;
+            }
+
+            if (log.isDebugEnabled()) {
+                log.debug("Trasmission open a new channel [rmtNodeId=" + rmtNodeId + ", topic=" + topic +
+                    ", initMsg=" + initMsg + ']');
+            }
+
+            if (!rcvCtx.lock.tryLock(netTimeoutMs, TimeUnit.MILLISECONDS))
+                throw new IgniteException("Wait for the previous receiver finished its work timeouted: " + rcvCtx);
+
+            try {
+                if (rcvCtx.timeoutObj != null)
+                    ctx.timeout().removeTimeoutObject(rcvCtx.timeoutObj);
+
+                // Send previous context state to sync remote and local node.
+                out.writeObject(rcvCtx.lastState == null ? new TransmissionMeta() : rcvCtx.lastState);
+
+                if (rcvCtx.lastState == null || rcvCtx.lastState.error() == null)
+                    receiveFromChannel(topic, rcvCtx, in, out, ch);
+                else
+                    interruptRecevier(rcvCtxs.remove(topic), rcvCtx.lastState.error());
+            }
+            finally {
+                rcvCtx.lock.unlock();
+            }
+        }
+        catch (Throwable t) {
+            U.error(log, "Download session cannot be finished due to an unexpected error [ctx=" + rcvCtx + ']', t);
+
+            // Do not remove receiver context here, since sender will reconnect to get this error.
+            interruptRecevier(rcvCtx, new IgniteCheckedException("Channel processing error [nodeId=" + rmtNodeId + ']', t));
+        }
+        finally {
+            U.closeQuiet(in);
+            U.closeQuiet(out);
+            U.closeQuiet(ch);
+        }
+    }
+
+    /**
+     * @param topic Topic handler related to.
+     * @param rcvCtx Receiver read context.
+     * @throws NodeStoppingException If the local node is stopping.
+     * @throws InterruptedException If thread interrupted.
+     */
+    private void receiveFromChannel(
+        Object topic,
+        ReceiverContext rcvCtx,
+        ObjectInputStream in,
+        ObjectOutputStream out,
+        ReadableByteChannel ch
+    ) throws NodeStoppingException, InterruptedException {
+        try {
+            while (true) {
+                if (Thread.currentThread().isInterrupted())
+                    throw new InterruptedException("The thread has been interrupted. Stop downloading file.");
+
+                if (stopping)
+                    throw new NodeStoppingException("Operation has been cancelled (node is stopping)");
+
+                boolean exit = in.readBoolean();
+
+                if (exit) {
+                    rcvCtxs.remove(topic);
+
+                    break;
+                }
+
+                TransmissionMeta meta = (TransmissionMeta)in.readObject();
+
+                if (rcvCtx.rcv == null) {
+                    rcvCtx.rcv = createReceiver(rcvCtx.rmtNodeId,
+                        rcvCtx.hnd,
+                        meta,
+                        () -> stopping || rcvCtx.interrupted.get());
+
+                    rcvCtx.lastState = meta;
+                }
+
+                validate(rcvCtx.lastState, meta);
+
+                try {
+                    long startTime = U.currentTimeMillis();
+
+                    rcvCtx.rcv.receive(ch);
+
+                    // Write processing ack.
+                    out.writeBoolean(true);
+                    out.flush();
+
+                    rcvCtx.rcv.close();
+
+                    U.log(log, "File has been received " +
+                        "[name=" + rcvCtx.rcv.state().name() + ", transferred=" + rcvCtx.rcv.transferred() +
+                        ", time=" + ((U.currentTimeMillis() - startTime) / 1000.0) + " sec" +
+                        ", rmtId=" + rcvCtx.rmtNodeId + ']');
+
+                    rcvCtx.rcv = null;
+                }
+                catch (Throwable e) {
+                    rcvCtx.lastState = rcvCtx.rcv.state();
+
+                    throw e;
+                }
+            }
+        }
+        catch (ClassNotFoundException e) {
+            throw new IgniteException(e);
+        }
+        catch (IOException e) {
+            // Waiting for re-establishing connection.
+            U.warn(log, "Connection from the remote node lost. Will wait for the new one to continue file " +
+                "receive [nodeId=" + rcvCtx.rmtNodeId + ", sesKey=" + rcvCtx.sesId + ']', e);
+
+            long startTs = U.currentTimeMillis();
+
+            boolean added = ctx.timeout().addTimeoutObject(rcvCtx.timeoutObj = new GridTimeoutObject() {
+                @Override public IgniteUuid timeoutId() {
+                    return rcvCtx.sesId;
+                }
+
+                @Override public long endTime() {
+                    return startTs + netTimeoutMs;
+                }
+
+                @Override public void onTimeout() {
+                    interruptRecevier(rcvCtxs.remove(topic), new IgniteCheckedException("Receiver is closed due to " +
+                        "waiting for the reconnect has been timeouted"));
+                }
+            });
+
+            assert added;
+        }
+    }
+
+    /**
+     * @param prev Previous available transmission meta.
+     * @param next Next transmission meta; its name, offset, count and policy must match {@code prev}.
+     */
+    private void validate(TransmissionMeta prev, TransmissionMeta next) {
+        A.ensure(prev.name().equals(next.name()), "Attempt to load different file " +
+            "[prev=" + prev + ", next=" + next + ']');
+
+        A.ensure(prev.offset() == next.offset(),
+            "The next chunk offest is incorrect [prev=" + prev + ", meta=" + next + ']');
+
+        A.ensure(prev.count() == next.count(), " The count of bytes to transfer for " +
+            "the next chunk is incorrect [prev=" + prev + ", next=" + next + ']');
+
+        A.ensure(prev.policy() == next.policy(), "Attemt to continue file upload with" +
+            " different transmission policy [prev=" + prev + ", next=" + next + ']');
+    }
+
+    /**
+     * @param nodeId Remote node id.
+     * @param hnd Current handler instance which produces file handlers.
+     * @param meta Meta information about file pending to receive to create appropriate receiver.
+     * @param stopChecker Process interrupt checker.
+     * @return Chunk data receiver.
+     */
+    private TransmissionReceiver createReceiver(
+        UUID nodeId,
+        TransmissionHandler hnd,
+        TransmissionMeta meta,
+        BooleanSupplier stopChecker
+    ) {
+        switch (meta.policy()) {
+            case FILE:
+                return new FileReceiver(
+                    meta,
+                    DFLT_CHUNK_SIZE_BYTES,
+                    stopChecker,
+                    fileIoFactory,
+                    hnd.fileHandler(nodeId, meta),
+                    hnd.filePath(nodeId, meta),
+                    log);
+
+            case CHUNK:
+                return new ChunkReceiver(
+                    meta,
+                    ctx.config()
+                        .getDataStorageConfiguration()
+                        .getPageSize(),
+                    stopChecker,
+                    hnd.chunkHandler(nodeId, meta),
+                    log);
+
+            default:
+                throw new IllegalStateException("The type of transmission policy is unknown. An implementation " +
+                    "required: " + meta.policy());
+        }
+    }
+
+    /**
+     * @param channel Socket channel to configure blocking mode.
+     * @param timeout Ignite network configuration timeout in milliseconds.
+     * @throws IOException If fails.
+     */
+    private static void configureChannel(SocketChannel channel, int timeout) throws IOException {
+        // Timeout must be enabled prior to entering the blocking mode to have effect.
+        channel.socket().setSoTimeout(timeout);
+        channel.configureBlocking(true);
+    }
+
+    /**
      * Dumps SPI stats to diagnostic logs in case TcpCommunicationSpi is used, no-op otherwise.
      */
     public void dumpStats() {
@@ -2369,6 +2993,357 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     }
 
     /**
+     * Read context holds all the information about the current process of reading a transfer from the channel.
+     */
+    private static class ReceiverContext {
+        /** Id of the remote node the input channel came from. */
+        private final UUID rmtNodeId;
+
+        /** Handler of the current session. */
+        @GridToStringExclude
+        private final TransmissionHandler hnd;
+
+        /** Unique id of the session request. */
+        private final IgniteUuid sesId;
+
+        /** Raised when the file handling process in progress must be interrupted. */
+        private final AtomicBoolean interrupted = new AtomicBoolean();
+
+        /** Lock of the context: only one thread can handle the receiver context. */
+        private final Lock lock = new ReentrantLock();
+
+        /** Receiver of the last object which downloading has not been finished. */
+        private TransmissionReceiver rcv;
+
+        /** Last saved state of the file data processing. */
+        private TransmissionMeta lastState;
+
+        /** Timeout object which closes the receiver. */
+        private GridTimeoutObject timeoutObj;
+
+        /**
+         * Creates a context with no receiver, no saved state and no timeout object assigned.
+         *
+         * @param rmtNodeId Remote node id.
+         * @param hnd Channel handler of current topic.
+         * @param sesId Unique session request id.
+         */
+        public ReceiverContext(UUID rmtNodeId, TransmissionHandler hnd, IgniteUuid sesId) {
+            this.sesId = sesId;
+            this.hnd = hnd;
+            this.rmtNodeId = rmtNodeId;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(ReceiverContext.class, this);
+        }
+    }
+
+    /**
+     * Class represents an implementation of transmission file writer. Each new instance of transmission sender
+     * will establish a new connection with unique transmission session identifier to the remote node and given
+     * topic (an arbitrary {@link GridTopic} can be used).
+     *
+     * <h2>Zero-copy approach</h2>
+     * <p>
+     * Current implementation of transmission sender is based on file zero-copy approach (the {@link FileSender}
+     * is used under the hood). It is much more efficient than a simple loop that reads data from
+     * given file and writes it to the target socket channel. But if operating system does not support zero-copy
+     * file transfer, sending a file with {@link TransmissionSender} might fail or yield worse performance.
+     * <p>
+     * Please, refer to <a href="http://en.wikipedia.org/wiki/Zero-copy">http://en.wikipedia.org/wiki/Zero-copy</a>
+     * or {@link FileChannel#transferTo(long, long, WritableByteChannel)} for details of such approach.
+     *
+     * <h2>File and Chunk handlers</h2>
+     * <p>
+     * It is possible to choose a file handler prior to sending the file to remote node within opened transmission
+     * session. There are two types of handlers available:
+     * {@link TransmissionHandler#chunkHandler(UUID, TransmissionMeta)} and
+     * {@link TransmissionHandler#fileHandler(UUID, TransmissionMeta)}. You can use an appropriate
+     * {@link TransmissionPolicy} for {@link #send(File, long, long, Map, TransmissionPolicy)} method to switch
+     * between them.
+     *
+     * <h2>Exceptions handling</h2>
+     * <p>
+     * Each transmission can have two different high-level types of exception which are handled differently:
+     * <ul>
+     * <li><em>transport</em> exception(e.g. some network issues)</li>
+     * <li><em>application</em>/<em>handler</em> level exception</li>
+     * </ul>
+     *
+     * <h3><em>Application</em> exceptions</h3>
+     * <p>
+     * The transmission will be stopped immediately and a wrapping <em>IgniteException</em> will be thrown in case
+     * any <em>application</em> exception occurs.
+     *
+     * <h3><em>Transport</em> exceptions</h3>
+     * <p>
+     * All transport level exceptions of transmission file sender will require transmission to be reconnected.
+     * For instance, when the local node closes the socket connection in orderly way, but the file is not fully
+     * handled by remote node, the read operation over the same socket endpoint will return <tt>-1</tt>. Such
+     * result will be considered as an {@link IOException} by handler and it will wait for reestablishing connection
+     * to continue file loading.
+     * <p>
+     * Another example, the transmission sender gets the <em>Connection reset by peer</em> IOException message.
+     * This means that the remote node you are connected to has reset the connection. This is usually caused by a
+     * high amount of traffic on the host, but may be caused by a server error or the remote node has exhausted
+     * system resources as well. Such {@link IOException} will be considered as <em>reconnection required</em>.
+     *
+     * <h3>Timeout exceptions</h3>
+     * <p>
+     * For read operations over the {@link InputStream} or write operation through the {@link OutputStream}
+     * the {@link Socket#setSoTimeout(int)} will be used and a {@link SocketTimeoutException} will be
+     * thrown when the timeout occurs. The default value is taken from {@link IgniteConfiguration#getNetworkTimeout()}.
+     * <p>
+     * If the reconnection does not occur within the configured timeout interval, the timeout object will be fired
+     * which clears the {@link ReceiverContext} corresponding to the used topic.
+     *
+     * <h2>Release resources</h2>
+     * <p>
+     * It is important to call <em>close()</em> method or use <em>try-with-resource</em> statement to release
+     * all resources once you've done with sending files.
+     *
+     * @see FileChannel#transferTo(long, long, WritableByteChannel)
+     */
+    public class TransmissionSender implements Closeable {
+        /** Remote node id to connect to. */
+        private final UUID rmtId;
+
+        /** Remote topic to connect to. */
+        private final Object topic;
+
+        /** Current unique session identifier to transfer files to remote node. */
+        private T2<UUID, IgniteUuid> sesKey;
+
+        /** Instance of opened writable channel to work with, {@code null} while there is no connection. */
+        private WritableByteChannel channel;
+
+        /** Object output stream over the channel socket, {@code null} while there is no connection. */
+        private ObjectOutput out;
+
+        /** Object input stream over the channel socket, {@code null} while there is no connection. */
+        private ObjectInput in;
+
+        /**
+         * @param rmtId The remote node to connect to.
+         * @param topic The remote topic to connect to.
+         */
+        public TransmissionSender(
+            UUID rmtId,
+            Object topic
+        ) {
+            this.rmtId = rmtId;
+            this.topic = topic;
+            sesKey = new T2<>(rmtId, IgniteUuid.randomUuid());
+        }
+
+        /**
+         * Opens a new channel to the remote node, switches it to the blocking mode, wraps the socket streams and
+         * reads the meta the remote node answers with.
+         *
+         * @return The synchronization meta in case connection has been reset.
+         * @throws IgniteCheckedException If fails.
+         * @throws IOException If fails.
+         */
+        private TransmissionMeta connect() throws IgniteCheckedException, IOException {
+            SocketChannel sockCh = (SocketChannel)openChannel(rmtId,
+                topic,
+                new SessionChannelMessage(sesKey.get2()))
+                .get();
+
+            // Remember the channel before any further step: if the configuration or streams creation fails,
+            // closeChannelQuiet() must be able to close the already opened socket channel.
+            channel = sockCh;
+
+            configureChannel(sockCh, netTimeoutMs);
+
+            out = new ObjectOutputStream(sockCh.socket().getOutputStream());
+            in = new ObjectInputStream(sockCh.socket().getInputStream());
+
+            try {
+                // Synchronize state between remote and local nodes.
+                return (TransmissionMeta)in.readObject();
+            }
+            catch (ClassNotFoundException e) {
+                throw new IgniteException(e);
+            }
+        }
+
+        /**
+         * Sends the whole file: from the zero offset and {@link File#length()} bytes long.
+         *
+         * @param file Source file to send to remote.
+         * @param params Additional file params.
+         * @param plc The policy of handling data on remote.
+         * @throws IgniteCheckedException If fails.
+         * @throws InterruptedException If the sending thread has been interrupted.
+         * @throws IOException If fails.
+         */
+        public void send(
+            File file,
+            Map<String, Serializable> params,
+            TransmissionPolicy plc
+        ) throws IgniteCheckedException, InterruptedException, IOException {
+            send(file, 0, file.length(), params, plc);
+        }
+
+        /**
+         * Sends the whole file with no additional params.
+         *
+         * @param file Source file to send to remote.
+         * @param plc The policy of handling data on remote.
+         * @throws IgniteCheckedException If fails.
+         * @throws InterruptedException If the sending thread has been interrupted.
+         * @throws IOException If fails.
+         */
+        public void send(
+            File file,
+            TransmissionPolicy plc
+        ) throws IgniteCheckedException, InterruptedException, IOException {
+            send(file, 0, file.length(), new HashMap<>(), plc);
+        }
+
+        /**
+         * Sends the given region of the file. The connection is established lazily on the first call and is
+         * reestablished after each {@link IOException} until the number of attempts reaches the retry limit.
+         *
+         * @param file Source file to send to remote.
+         * @param offset Position to start transfer at.
+         * @param cnt Number of bytes to transfer.
+         * @param params Additional file params.
+         * @param plc The policy of handling data on remote.
+         * @throws IgniteCheckedException If fails.
+         * @throws InterruptedException If the sending thread has been interrupted.
+         * @throws IOException If fails.
+         */
+        public void send(
+            File file,
+            long offset,
+            long cnt,
+            Map<String, Serializable> params,
+            TransmissionPolicy plc
+        ) throws IgniteCheckedException, InterruptedException, IOException {
+            long startTime = U.currentTimeMillis();
+            int retries = 0;
+
+            // Resolve the flag once: later lookups by the key would fail with NPE (and hide the original error)
+            // if the flag has been removed from the map by close() in the meantime.
+            AtomicBoolean stopFlag = senderStopFlags.computeIfAbsent(sesKey, key -> new AtomicBoolean());
+
+            try (FileSender snd = new FileSender(file,
+                offset,
+                cnt,
+                params,
+                plc,
+                () -> stopping || stopFlag.get(),
+                log,
+                fileIoFactory,
+                DFLT_CHUNK_SIZE_BYTES)
+            ) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Start writing file to remote node [file=" + file.getName() +
+                        ", rmtNodeId=" + rmtId + ", topic=" + topic + ']');
+                }
+
+                while (true) {
+                    if (Thread.currentThread().isInterrupted())
+                        throw new InterruptedException("The thread has been interrupted. Stop uploading file.");
+
+                    if (stopping)
+                        throw new NodeStoppingException("Operation has been cancelled (node is stopping)");
+
+                    try {
+                        TransmissionMeta rcvMeta = null;
+
+                        // In/out streams are not null if file has been sent successfully.
+                        if (out == null && in == null) {
+                            rcvMeta = connect();
+
+                            assert rcvMeta != null : "Remote receiver has not sent its meta";
+
+                            // Stop in case of any error occurred on remote node during file processing.
+                            if (rcvMeta.error() != null)
+                                throw rcvMeta.error();
+                        }
+
+                        snd.send(channel, out, rcvMeta);
+
+                        // Read file received acknowledge.
+                        boolean written = in.readBoolean();
+
+                        assert written : "File is not fully written: " + file.getAbsolutePath();
+
+                        break;
+                    }
+                    catch (IOException e) {
+                        closeChannelQuiet();
+
+                        retries++;
+
+                        if (retries >= retryCnt) {
+                            throw new IgniteException("The number of retry attempts to upload file exceeded " +
+                                "the limit: " + retryCnt, e);
+                        }
+
+                        // Re-establish the new connection to continue upload.
+                        U.warn(log, "Connection lost while writing a file to remote node and " +
+                            "will be reestablished [rmtId=" + rmtId + ", file=" + file.getName() +
+                            ", sesKey=" + sesKey + ", retries=" + retries +
+                            ", transferred=" + snd.transferred() +
+                            ", total=" + snd.state().count() + ']', e);
+                    }
+                }
+
+                // Floating point division keeps the fractional part of seconds.
+                double uploadTimeSec = (U.currentTimeMillis() - startTime) / 1000.0;
+
+                U.log(log, "File has been sent to remote node [name=" + file.getName() +
+                    ", uploadTime=" + uploadTimeSec + " sec, retries=" + retries +
+                    ", transferred=" + snd.transferred() + ", rmtId=" + rmtId +']');
+
+            }
+            catch (InterruptedException e) {
+                closeChannelQuiet();
+
+                throw e;
+            }
+            catch (IgniteCheckedException e) {
+                closeChannelQuiet();
+
+                throw new IgniteCheckedException("Exception while sending file [rmtId=" + rmtId +
+                    ", file=" + file.getName() + ", sesKey=" + sesKey + ", retries=" + retries + ']', e);
+            }
+            catch (Throwable e) {
+                closeChannelQuiet();
+
+                if (stopping)
+                    throw new NodeStoppingException("Operation has been cancelled (node is stopping)");
+
+                if (stopFlag.get())
+                    throw new ClusterTopologyCheckedException("Remote node left the cluster: " + rmtId, e);
+
+                throw new IgniteException("Unexpected exception while sending file to the remote node. The process stopped " +
+                    "[rmtId=" + rmtId + ", file=" + file.getName() + ", sesKey=" + sesKey + ']', e);
+            }
+        }
+
+        /**
+         * Removes the stop flag of the session, writes the close flag to the remote node if the connection is
+         * established and releases the channel. An {@link IOException} raised while writing the close flag is
+         * logged and not rethrown.
+         */
+        @Override public void close() throws IOException {
+            try {
+                senderStopFlags.remove(sesKey);
+
+                ObjectOutput out0 = out;
+
+                if (out0 == null)
+                    return;
+
+                U.log(log, "Close file writer session: " + sesKey);
+
+                // Send transmission close flag.
+                out0.writeBoolean(true);
+                out0.flush();
+            }
+            catch (IOException e) {
+                U.warn(log, "An exception while writing close session flag occured. " +
+                    " Session close operation has been ignored", e);
+            }
+            finally {
+                closeChannelQuiet();
+            }
+        }
+
+        /** Close channel and release resources. References are cleared, so the next send will reconnect. */
+        private void closeChannelQuiet() {
+            U.closeQuiet(out);
+            U.closeQuiet(in);
+            U.closeQuiet(channel);
+
+            out = null;
+            in = null;
+            channel = null;
+        }
+    }
+
+    /**
      * Linked chain of listeners.
      */
     private static class ArrayListener implements GridMessageListener {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 5d2604d..d8d62d4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -1161,6 +1161,11 @@ public class GridIoMessageFactory implements MessageFactory {
 
                 break;
 
+            case SessionChannelMessage.TYPE_CODE:
+                msg = new SessionChannelMessage();
+
+                break;
+
             // [-3..119] [124..129] [-23..-28] [-36..-55] - this
             // [120..123] - DR
             // [-4..-22, -30..-35] - SQL
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/SessionChannelMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/SessionChannelMessage.java
new file mode 100644
index 0000000..7de2a90
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/SessionChannelMessage.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.communication;
+
+import java.io.Externalizable;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channel;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * A message with additional {@link Channel} attributes which is sent when a connection is established and
+ * an appropriate channel is opened. The only attribute carried is the unique session id.
+ */
+class SessionChannelMessage implements Message {
+    /** Initial channel message type (value is {@code 175}). */
+    public static final short TYPE_CODE = 175;
+
+    /** Serialization version. */
+    private static final long serialVersionUID = 0L;
+
+    /** Channel session unique identifier. */
+    private IgniteUuid sesId;
+
+    /**
+     * Empty constructor used to create a blank message (see {@code GridIoMessageFactory}) which state is
+     * then filled by {@link #readFrom(ByteBuffer, MessageReader)}. It plays the same role as the public
+     * no-arg constructor of an {@link Externalizable} class, although this class is not externalizable itself.
+     * This constructor is not meant to be used for other purposes.
+     */
+    public SessionChannelMessage() {
+        // No-op.
+    }
+
+    /**
+     * @param sesId Channel session unique identifier.
+     */
+    public SessionChannelMessage(IgniteUuid sesId) {
+        this.sesId = sesId;
+    }
+
+    /**
+     * @return The unique session id for the channel.
+     */
+    public IgniteUuid sesId() {
+        return sesId;
+    }
+
+    /**
+     * @param sesId The unique session id for the channel.
+     * @return {@code This} for chaining.
+     */
+    public SessionChannelMessage sesId(IgniteUuid sesId) {
+        this.sesId = sesId;
+        return this;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Writes the header (if it has not been written yet) and then the single {@code sesId} field.
+     * Returns {@code false} if the writer reports that the header or the field has not been written.
+     */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeIgniteUuid("sesId", sesId))
+                    return false;
+
+                writer.incrementState();
+        }
+
+        return true;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Reads the single {@code sesId} field. Returns {@code false} if the reader is not ready to read the
+     * message or reports that the field has not been read.
+     */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                sesId = reader.readIgniteUuid("sesId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+        }
+
+        return reader.afterMessageRead(SessionChannelMessage.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return TYPE_CODE;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * The message has the only field: {@code sesId}.
+     */
+    @Override public byte fieldsCount() {
+        return 1;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(SessionChannelMessage.class, this);
+    }
+
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/TransmissionHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/TransmissionHandler.java
new file mode 100644
index 0000000..a55f1e6
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/TransmissionHandler.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.communication;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.UUID;
+import java.util.function.Consumer;
+
+/**
+ * Class represents a handler for the set of files considered to be transferred from the remote node. This handler
+ * must be registered to an appropriate topic in {@link GridIoManager} prior to opening a new transmission connection
+ * to this topic.
+ * <p>
+ * <em>NOTE:</em> Only one such handler per registered topic is allowed for the communication
+ * manager. Only one thread is allowed for data processing within a single topic.
+ *
+ * <h3>TransmissionPolicy</h3>
+ * <p>
+ * Files from the remote node can be handled in two different ways within a single established connection.
+ * It is up to the sender to decide how the particular file must be processed by the remote node. The
+ * {@link TransmissionPolicy} is used for such purpose. If {@link TransmissionPolicy#FILE} type is received by
+ * remote node the <em>#fileHandler()</em> will be picked up to process this file, otherwise for the
+ * {@link TransmissionPolicy#CHUNK} the <em>#chunkHandler()</em> will be picked up.
+ */
+public interface TransmissionHandler {
+    /**
+     * @param nodeId Remote node id; presumably the node which transmission has failed (to be confirmed
+     *      against the callers in {@link GridIoManager}).
+     * @param err The error which has failed the handling process.
+     */
+    public void onException(UUID nodeId, Throwable err);
+
+    /**
+     * Provides the path for a file to be received with the {@link TransmissionPolicy#FILE} policy.
+     *
+     * @param nodeId Remote node id from which request has been received.
+     * @param fileMeta File meta info.
+     * @return Absolute pathname denoting a file.
+     */
+    public String filePath(UUID nodeId, TransmissionMeta fileMeta);
+
+    /**
+     * <em>Chunk handler</em> represents by itself the way of input data stream processing.
+     * It accepts within each chunk a {@link ByteBuffer} with data from input for further processing.
+     * Activated when the {@link TransmissionPolicy#CHUNK} policy is sent.
+     *
+     * @param nodeId Remote node id from which request has been received.
+     * @param initMeta Initial handler meta info.
+     * @return Instance of chunk handler to process incoming data by chunks.
+     */
+    public Consumer<ByteBuffer> chunkHandler(UUID nodeId, TransmissionMeta initMeta);
+
+    /**
+     * <em>File handler</em> represents by itself the way of input data stream processing. All the data will
+     * be processed under the hood using zero-copy transferring algorithm and only start file processing and
+     * the end of processing will be provided. Activated when the {@link TransmissionPolicy#FILE} policy is sent.
+     *
+     * @param nodeId Remote node id from which request has been received.
+     * @param initMeta Initial handler meta info.
+     * @return Instance of read handler to process incoming data in the {@link FileChannel} manner.
+     */
+    public Consumer<File> fileHandler(UUID nodeId, TransmissionMeta initMeta);
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/TransmissionMeta.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/TransmissionMeta.java
new file mode 100644
index 0000000..986bf55
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/TransmissionMeta.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.communication;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.Serializable;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Class represents a file meta information to send to the remote node. Used to initiate a new file transfer
+ * process or to continue the previous unfinished from the last transmitted point.
+ */
+class TransmissionMeta implements Externalizable {
+    /** Serial version uid. */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * The name to associate particular meta with.
+     * Can be the particular file name, or an a transfer session identifier.
+     */
+    private String name;
+
+    /** Offset of transferred file. */
+    private long offset;
+
+    /** Number of bytes to transfer started from given {@link #offset}. */
+    private long cnt;
+
+    /** Additional file params to transfer (e.g. partition id, partition name etc.). */
+    private Map<String, Serializable> params;
+
+    /** Read policy the way of how particular file will be handled. */
+    private TransmissionPolicy plc;
+
+    /** Last seen error if it has been occurred, or {@code null} the otherwise. */
+    private Exception err;
+
+    /** Default constructor, usually used to create meta to read channel data into. */
+    public TransmissionMeta() {
+        this(null);
+    }
+
+    /**
+     * @param err Last seen error if it has been occurred, or {@code null} the otherwise.
+     */
+    public TransmissionMeta(Exception err) {
+        this("", -1, -1, null, null, err);
+    }
+
+    /**
+     * @param name File name to assoticate particular meta with.
+     * @param offset The start position of file.
+     * @param cnt Number of bytes expected to transfer.
+     * @param params Additional meta params.
+     * @param plc Policy of how file will be handled.
+     * @param err Last seen error if it has been occurred, or {@code null} the otherwise.
+     */
+    public TransmissionMeta(
+        String name,
+        long offset,
+        long cnt,
+        Map<String, Serializable> params,
+        TransmissionPolicy plc,
+        Exception err
+    ) {
+        assert params instanceof Serializable || params == null : params.getClass();
+
+        this.name = name;
+        this.offset = offset;
+        this.cnt = cnt;
+        this.params = params;
+        this.plc = plc;
+        this.err = err;
+    }
+
+    /**
+     * @return File name.
+     */
+    public String name() {
+        assert name != null;
+
+        return name;
+    }
+
+    /**
+     * @return Position to start channel transfer at.
+     */
+    public long offset() {
+        return offset;
+    }
+
+    /**
+     * @return Number of bytes expected to transfer.
+     */
+    public long count() {
+        return cnt;
+    }
+
+    /**
+     * @return Additional params.
+     */
+    public Map<String, Serializable> params() {
+        return params;
+    }
+
+    /**
+     * @return Transmission policy.
+     */
+    public TransmissionPolicy policy() {
+        return plc;
+    }
+
+    /**
+     * @param err An exception instance if it has been previously occurred.
+     * @return {@code This} for chaining.
+     */
+    public TransmissionMeta error(Exception err) {
+        this.err = err;
+
+        return this;
+    }
+
+    /**
+     * @return An exception instance if it has been previously occurred.
+     */
+    public Exception error() {
+        return err;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeUTF(name);
+        out.writeLong(offset);
+        out.writeLong(cnt);
+        out.writeObject(params);
+        out.writeObject(plc);
+        out.writeObject(err);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        name = in.readUTF();
+        offset = in.readLong();
+        cnt = in.readLong();
+        params = (Map)in.readObject();
+        plc = (TransmissionPolicy)in.readObject();
+        err = (Exception)in.readObject();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        TransmissionMeta meta = (TransmissionMeta)o;
+
+        return offset == meta.offset &&
+            cnt == meta.cnt &&
+            name.equals(meta.name) &&
+            Objects.equals(params, meta.params) &&
+            plc == meta.plc &&
+            Objects.equals(err, meta.err);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        return Objects.hash(name, offset, cnt, params, plc, err);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(TransmissionMeta.class, this);
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/TransmissionPolicy.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/TransmissionPolicy.java
new file mode 100644
index 0000000..be4396f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/TransmissionPolicy.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.communication;
+
+import java.util.UUID;
+
+/**
+ * Class represents ways of data handling for a file ready to be sent through an opened transmission sender session.
+ * It is necessary to choose which type of handler will be used and how file should be handled prior to sending file
+ * to the remote node.
+ *
+ * @see GridIoManager.TransmissionSender
+ */
+public enum TransmissionPolicy {
+    /**
+     * A file which is considered to be sent through {@link GridIoManager.TransmissionSender}s session will use
+     * the {@link TransmissionHandler#fileHandler(UUID, TransmissionMeta)} of {@link TransmissionHandler}
+     * to handle transmitted binary data.
+     */
+    FILE,
+
+    /**
+     * A file which is considered to be sent through {@link GridIoManager.TransmissionSender}s session will use
+     * the {@link TransmissionHandler#chunkHandler(UUID, TransmissionMeta)} of {@link TransmissionHandler}
+     * to handle transmitted binary data. This file will be processed in chunks of a handler-defined size.
+     */
+    CHUNK
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/TransmissionReceiver.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/TransmissionReceiver.java
new file mode 100644
index 0000000..fc72b2f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/TransmissionReceiver.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.communication;
+
+import java.io.IOException;
+import java.nio.channels.ReadableByteChannel;
+import java.util.function.BooleanSupplier;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+
+/**
+ * Class represents a receiver of data which can be pulled from a channel by chunks of
+ * predefined size. Closes when a transmission of represented object ends.
+ */
+abstract class TransmissionReceiver extends AbstractTransmission {
+    /**
+     * @param meta Initial file meta info.
+     * @param stopChecker Node stop or process interrupt checker.
+     * @param log Ignite logger.
+     * @param chunkSize Size of chunks.
+     */
+    protected TransmissionReceiver(
+        TransmissionMeta meta,
+        BooleanSupplier stopChecker,
+        IgniteLogger log,
+        int chunkSize
+    ) {
+        super(meta, stopChecker, log, chunkSize);
+    }
+
+    /**
+     * @param ch Input channel to read data from.
+     * @throws IOException If an io exception occurred.
+     */
+    public void receive(ReadableByteChannel ch) throws IOException, InterruptedException {
+        // Read data from the input.
+        while (hasNextChunk()) {
+            if (Thread.currentThread().isInterrupted())
+                throw new InterruptedException("Recevier has been interrupted");
+
+            if (stopped())
+                throw new IgniteException("Receiver has been cancelled. Channel processing has been stopped.");
+
+            readChunk(ch);
+        }
+
+        assert transferred == meta.count() : "The number of transferred bytes are not as expected " +
+            "[expect=" + meta.count() + ", actual=" + transferred + ']';
+    }
+
+    /**
+     * @param ch Channel to read data from.
+     * @throws IOException If fails.
+     */
+    protected abstract void readChunk(ReadableByteChannel ch) throws IOException;
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIO.java
index 546d1a7..3ecdd22 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIO.java
@@ -20,6 +20,8 @@ package org.apache.ignite.internal.processors.cache.persistence.file;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.MappedByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
 
 /**
  * Interface to perform file I/O operations.
@@ -272,4 +274,29 @@ public interface FileIO extends AutoCloseable {
      * @see #punchHole
      */
     long getSparseSize();
+
+    /**
+     * This method transfers the content of the file to the specified channel. This is a synchronous
+     * operation, so performing it on asynchronous channels makes no sense and is not provided.
+     *
+     * @param position The relative offset of the file where the transfer begins from.
+     * @param count The number of bytes to be transferred.
+     * @param target Destination channel of the transfer.
+     * @return Count of bytes which were successfully transferred.
+     * @throws IOException If fails.
+     */
+    public default long transferTo(long position, long count, WritableByteChannel target) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * @param src The source channel.
+     * @param position The position within the file at which the transfer is to begin.
+     * @param count The maximum number of bytes to be transferred.
+     * @return The number of bytes, possibly zero, that were actually transferred.
+     * @throws IOException If fails.
+     */
+    public default long transferFrom(ReadableByteChannel src, long position, long count) throws IOException {
+        throw new UnsupportedOperationException();
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIODecorator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIODecorator.java
index c615a34..dfefd29 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIODecorator.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIODecorator.java
@@ -20,6 +20,8 @@ package org.apache.ignite.internal.processors.cache.persistence.file;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.MappedByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
 
 /**
  * Decorator class for File I/O
@@ -120,4 +122,14 @@ public class FileIODecorator extends AbstractFileIO {
     @Override public void close() throws IOException {
         delegate.close();
     }
+
+    /** {@inheritDoc} */
+    @Override public long transferTo(long position, long count, WritableByteChannel target) throws IOException {
+        return delegate.transferTo(position, count, target);
+    }
+
+    /** {@inheritDoc} */
+    @Override public long transferFrom(ReadableByteChannel src, long position, long count) throws IOException {
+        return delegate.transferFrom(src, position, count);
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java
index c6922bc..6689609 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java
@@ -23,6 +23,8 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
 import java.nio.file.OpenOption;
 import org.apache.ignite.internal.processors.compress.FileSystemUtils;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -147,4 +149,19 @@ public class RandomAccessFileIO extends AbstractFileIO {
     @Override public void force() throws IOException {
         force(false);
     }
+
+    /** {@inheritDoc} */
+    @Override public long transferTo(long position, long count, WritableByteChannel target) throws IOException {
+        return ch.transferTo(position, count, target);
+    }
+
+    /** {@inheritDoc} */
+    @Override public long transferFrom(ReadableByteChannel src, long position, long count) throws IOException {
+        long written = ch.transferFrom(src, position, count);
+
+        if (written > 0)
+            position(position + written);
+
+        return written;
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/crc/FastCrc.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/crc/FastCrc.java
index 0dcbafd..7cbaadb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/crc/FastCrc.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/crc/FastCrc.java
@@ -17,8 +17,13 @@
 
 package org.apache.ignite.internal.processors.cache.persistence.wal.crc;
 
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.util.zip.CRC32;
+import java.util.zip.CheckedInputStream;
 
 /**
  * This CRC calculation implementation workf much faster then {@link PureJavaCrc32}
@@ -81,6 +86,26 @@ public final class FastCrc {
     }
 
     /**
+     * @param file A file to calculate the checksum of.
+     * @return CRC32 checksum.
+     * @throws IOException If fails.
+     */
+    public static int calcCrc(File file) throws IOException {
+        assert !file.isDirectory() : "CRC32 can't be calculated over directories";
+
+        CRC32 algo = new CRC32();
+
+        try (InputStream in = new CheckedInputStream(new FileInputStream(file), algo)) {
+            byte[] buf = new byte[1024];
+
+            while (in.read(buf) != -1)
+                ;
+        }
+
+        return ~(int)algo.getValue();
+    }
+
+    /**
      * @param crcAlgo CRC algorithm.
      * @param buf Input buffer.
      * @param len Buffer length.
@@ -96,6 +121,6 @@ public final class FastCrc {
 
         buf.limit(initLimit);
 
-        return (int)crcAlgo.getValue() ^ 0xFFFFFFFF;
+        return ~(int)crcAlgo.getValue();
     }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 4584c87..4d6dfb8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -2684,13 +2684,27 @@ public class GridNioServer<T> {
         }
 
         /**
+         * @param ses Session to be closed.
+         * @param e Exception to be passed to the listener, if any.
+         * @return {@code True} if this call closed the ses.
+         */
+        protected boolean close(final GridSelectorNioSessionImpl ses, @Nullable final IgniteCheckedException e) {
+            return close(ses, e, ses.closeSocketOnSessionClose());
+        }
+
+        /**
          * Closes the session and all associated resources, then notifies the listener.
          *
          * @param ses Session to be closed.
          * @param e Exception to be passed to the listener, if any.
+         * @param closeSock If {@code True} the channel will be closed.
          * @return {@code True} if this call closed the ses.
          */
-        protected boolean close(final GridSelectorNioSessionImpl ses, @Nullable final IgniteCheckedException e) {
+        protected boolean close(
+            final GridSelectorNioSessionImpl ses,
+            @Nullable final IgniteCheckedException e,
+            boolean closeSock
+        ) {
             if (e != null) {
                 // Print stack trace only if has runtime exception in it's cause.
                 if (e.hasCause(IOException.class))
@@ -2714,7 +2728,10 @@ public class GridNioServer<T> {
                         GridUnsafe.cleanDirectBuffer(ses.readBuffer());
                 }
 
-                closeKey(ses.key());
+                if (closeSock)
+                    closeKey(ses.key());
+                else
+                    ses.key().cancel(); // Unbind the socket from the current SelectionKey.
 
                 if (e != null)
                     filterChain.onExceptionCaught(ses, e);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
index 2739299..ac2fc61 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
@@ -39,7 +39,7 @@ import org.jetbrains.annotations.Nullable;
  * Note that this implementation requires non-null values for local and remote
  * socket addresses.
  */
-class GridSelectorNioSessionImpl extends GridNioSessionImpl implements GridNioKeyAttachment {
+public class GridSelectorNioSessionImpl extends GridNioSessionImpl implements GridNioKeyAttachment {
     /** Pending write requests. */
     private final FastSizeDeque<SessionWriteRequest> queue = new FastSizeDeque<>(new ConcurrentLinkedDeque<>());
 
@@ -78,6 +78,9 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl implements GridNioKe
     /** */
     private Object sysMsg;
 
+    /** Whether to close the channel when session #close() is called. */
+    private volatile boolean closeSocket = true;
+
     /**
      * Creates session instance.
      *
@@ -176,11 +179,25 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl implements GridNioKe
     /**
      * @return Registered selection key for this session.
      */
-    SelectionKey key() {
+    public SelectionKey key() {
         return key;
     }
 
     /**
+     * @return {@code True} if the SocketChannel should be closed when the current session is closed.
+     */
+    public boolean closeSocketOnSessionClose() {
+        return closeSocket;
+    }
+
+    /**
+     * @param closeSocket {@code False} to keep the SocketChannel open on session close.
+     */
+    public void closeSocketOnSessionClose(boolean closeSocket) {
+        this.closeSocket = closeSocket;
+    }
+
+    /**
      * @param from Current session worker.
      * @param fut Move future.
      * @return {@code True} if session move was scheduled.
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 0c99051..569798e 100755
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -27,9 +27,9 @@ import java.net.SocketAddress;
 import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
+import java.nio.channels.Channel;
 import java.nio.channels.SelectableChannel;
 import java.nio.channels.SocketChannel;
-import java.nio.channels.spi.AbstractInterruptibleChannel;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.BitSet;
@@ -49,6 +49,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import javax.net.ssl.SSLEngine;
 import javax.net.ssl.SSLException;
 import org.apache.ignite.Ignite;
@@ -98,6 +99,7 @@ import org.apache.ignite.internal.util.nio.GridNioServerListener;
 import org.apache.ignite.internal.util.nio.GridNioServerListenerAdapter;
 import org.apache.ignite.internal.util.nio.GridNioSession;
 import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey;
+import org.apache.ignite.internal.util.nio.GridSelectorNioSessionImpl;
 import org.apache.ignite.internal.util.nio.GridShmemCommunicationClient;
 import org.apache.ignite.internal.util.nio.GridTcpNioCommunicationClient;
 import org.apache.ignite.internal.util.nio.ssl.BlockingSslHandler;
@@ -143,6 +145,7 @@ import org.apache.ignite.spi.IgniteSpiTimeoutObject;
 import org.apache.ignite.spi.TimeoutStrategy;
 import org.apache.ignite.spi.communication.CommunicationListener;
 import org.apache.ignite.spi.communication.CommunicationSpi;
+import org.apache.ignite.spi.communication.tcp.internal.CommunicationListenerEx;
 import org.apache.ignite.spi.communication.tcp.internal.ConnectionKey;
 import org.apache.ignite.spi.communication.tcp.internal.HandshakeException;
 import org.apache.ignite.spi.communication.tcp.internal.TcpCommunicationConnectionCheckFuture;
@@ -161,6 +164,8 @@ import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR;
 import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION;
+import static org.apache.ignite.internal.IgniteFeatures.CHANNEL_COMMUNICATION;
+import static org.apache.ignite.internal.IgniteFeatures.nodeSupports;
 import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.SSL_META;
 import static org.apache.ignite.plugin.extensions.communication.Message.DIRECT_TYPE_SIZE;
 import static org.apache.ignite.spi.communication.tcp.internal.TcpCommunicationConnectionCheckFuture.SES_FUT_META;
@@ -352,6 +357,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
     /** Message tracker meta for session. */
     private static final int TRACKER_META = GridNioSessionMetaKey.nextUniqueKey();
 
+    /** Channel meta used for establishing channel connections. */
+    private static final int CHANNEL_FUT_META = GridNioSessionMetaKey.nextUniqueKey();
+
     /**
      * Default local port range (value is <tt>100</tt>).
      * See {@link #setLocalPortRange(int)} for details.
@@ -373,6 +381,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
     /** Default connections per node. */
     public static final int DFLT_CONN_PER_NODE = 1;
 
+    /** Maximum {@link GridNioSession} connections per node. */
+    public static final int MAX_CONN_PER_NODE = 1024;
+
     /** No-op runnable. */
     private static final IgniteRunnable NOOP = new IgniteRunnable() {
         @Override public void run() {
@@ -398,6 +409,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
     /** */
     private ConnectionPolicy connPlc = new FirstConnectionPolicy();
 
+    /** Channel connection index provider. */
+    private ConnectionPolicy chConnPlc;
+
     /** */
     private boolean enableForcibleNodeKill = IgniteSystemProperties
         .getBoolean(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL);
@@ -739,6 +753,44 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                 }
             }
 
+            private void onChannelCreate(
+                GridSelectorNioSessionImpl ses,
+                ConnectionKey connKey,
+                Message msg
+            ) {
+                cleanupLocalNodeRecoveryDescriptor(connKey);
+
+                ses.send(msg)
+                    .listen(sendFut -> {
+                        if (sendFut.error() != null) {
+                            U.error(log, "Fail to send channel creation response to the remote node. " +
+                                "Session will be closed [nodeId=" + connKey.nodeId() +
+                                ", idx=" + connKey.connectionIndex() + ']', sendFut.error());
+
+                            ses.close();
+
+                            return;
+                        }
+
+                        ses.closeSocketOnSessionClose(false);
+
+                        // Close the session (the socket stays open) and pass the channel to the listener.
+                        ses.close().listen(closeFut -> {
+                            if (closeFut.error() != null) {
+                                U.error(log, "Nio session has not been properly closed " +
+                                        "[nodeId=" + connKey.nodeId() + ", idx=" + connKey.connectionIndex() + ']',
+                                    closeFut.error());
+
+                                U.closeQuiet(ses.key().channel());
+
+                                return;
+                            }
+
+                            notifyChannelEvtListener(connKey.nodeId(), ses.key().channel(), msg);
+                        });
+                    });
+            }
+
             @Override public void onMessage(final GridNioSession ses, Message msg) {
                 ConnectionKey connKey = ses.meta(CONN_IDX_META);
 
@@ -766,6 +818,29 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                     }
                 }
                 else {
+                    if (isChannelConnIdx(connKey.connectionIndex())) {
+                        if (ses.meta(CHANNEL_FUT_META) == null)
+                            onChannelCreate((GridSelectorNioSessionImpl)ses, connKey, msg);
+                        else {
+                            GridFutureAdapter<Channel> fut = ses.meta(CHANNEL_FUT_META);
+                            GridSelectorNioSessionImpl ses0 = (GridSelectorNioSessionImpl)ses;
+
+                            ses0.closeSocketOnSessionClose(false);
+
+                            ses0.close().listen(f -> {
+                                if (f.error() != null) {
+                                    fut.onDone(f.error());
+
+                                    return;
+                                }
+
+                                fut.onDone(ses0.key().channel());
+                            });
+                        }
+
+                        return;
+                    }
+
                     if (msg instanceof RecoveryLastReceivedMessage) {
                         metricsLsnr.onMessageReceived(msg, connKey.nodeId());
 
@@ -2122,7 +2197,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
         assertParameter(shmemPort > 0 || shmemPort == -1, "shmemPort > 0 || shmemPort == -1");
         assertParameter(selectorsCnt > 0, "selectorsCnt > 0");
         assertParameter(connectionsPerNode > 0, "connectionsPerNode > 0");
-        assertParameter(connectionsPerNode <= 1024, "connectionsPerNode <= 1024");
+        assertParameter(connectionsPerNode <= MAX_CONN_PER_NODE, "connectionsPerNode <= 1024");
 
         if (!failureDetectionTimeoutEnabled()) {
             assertParameter(reconCnt > 0, "reconnectCnt > 0");
@@ -2147,6 +2222,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
         else
             connPlc = new FirstConnectionPolicy();
 
+        chConnPlc = new ConnectionPolicy() {
+            /** Sequential connection index provider. */
+            private final AtomicInteger chIdx = new AtomicInteger(MAX_CONN_PER_NODE + 1);
+
+            @Override public int connectionIndex() {
+                return chIdx.incrementAndGet();
+            }
+        };
+
         try {
             locHost = U.resolveLocalHost(locAddr);
         }
@@ -3956,6 +4040,21 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
     }
 
     /**
+     * @param nodeId The remote node id.
+     * @param channel The configured channel to notify listeners with.
+     * @param initMsg Channel initialization message with additional channel params.
+     */
+    private void notifyChannelEvtListener(UUID nodeId, Channel channel, Message initMsg) {
+        if (log.isDebugEnabled())
+            log.debug("Notify appropriate listeners due to a new channel opened: " + channel);
+
+        CommunicationListener<Message> lsnr0 = lsnr;
+
+        if (lsnr0 instanceof CommunicationListenerEx)
+            ((CommunicationListenerEx<Message>)lsnr0).onChannelOpened(nodeId, initMsg, channel);
+    }
+
+    /**
      * @param target Target buffer to append to.
      * @param src Source buffer to get data.
      * @return Original or expanded buffer.
@@ -4041,6 +4140,20 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
     }
 
     /**
+     * @param key The connection key to cleanup descriptors on local node.
+     */
+    private void cleanupLocalNodeRecoveryDescriptor(ConnectionKey key) {
+        ClusterNode node = getLocalNode();
+
+        if (usePairedConnections(node)) {
+            inRecDescs.remove(key);
+            outRecDescs.remove(key);
+        }
+        else
+            recoveryDescs.remove(key);
+    }
+
+    /**
      * @param recoveryDescs Descriptors map.
      * @param pairedConnections {@code True} if in/out connections pair is used for communication with node.
      * @param node Node.
@@ -4233,6 +4346,90 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
     }
 
     /**
+     * @param remote Destination cluster node to communicate with.
+     * @param initMsg Configuration channel attributes wrapped into the message.
+     * @return The future, which will be finished on channel ready.
+     * @throws IgniteSpiException If fails.
+     */
+    public IgniteInternalFuture<Channel> openChannel(
+        ClusterNode remote,
+        Message initMsg
+    ) throws IgniteSpiException {
+        assert !remote.isLocal() : remote;
+        assert initMsg != null;
+        assert chConnPlc != null;
+        assert nodeSupports(remote, CHANNEL_COMMUNICATION) : "Node doesn't support direct connection over socket channel " +
+                "[nodeId=" + remote.id() + ']';
+
+        ConnectionKey key = new ConnectionKey(remote.id(), chConnPlc.connectionIndex());
+
+        GridFutureAdapter<Channel> chFut = new GridFutureAdapter<>();
+
+        connectGate.enter();
+
+        try {
+            GridNioSession ses = createNioSession(remote, key.connectionIndex());
+
+            assert ses != null : "Session must be established [remoteId=" + remote.id() + ", key=" + key + ']';
+
+            cleanupLocalNodeRecoveryDescriptor(key);
+            ses.addMeta(CHANNEL_FUT_META, chFut);
+
+            // Send configuration message over the created session.
+            ses.send(initMsg)
+                .listen(f -> {
+                    if (f.error() != null) {
+                        GridFutureAdapter<Channel> rq = ses.meta(CHANNEL_FUT_META);
+
+                        assert rq != null;
+
+                        rq.onDone(f.error());
+
+                        ses.close();
+
+                        return;
+                    }
+
+                    addTimeoutObject(new IgniteSpiTimeoutObject() {
+                        @Override public IgniteUuid id() {
+                            return IgniteUuid.randomUuid();
+                        }
+
+                        @Override public long endTime() {
+                            return U.currentTimeMillis() + connTimeout;
+                        }
+
+                        @Override public void onTimeout() {
+                            // Close session if request not complete yet.
+                            GridFutureAdapter<Channel> rq = ses.meta(CHANNEL_FUT_META);
+
+                            assert rq != null;
+
+                            if (rq.onDone(handshakeTimeoutException()))
+                                ses.close();
+                        }
+                    });
+                });
+
+            return chFut;
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteSpiException("Unable to create new channel connection to the remote node: " + remote, e);
+        }
+        finally {
+            connectGate.leave();
+        }
+    }
+
+    /**
+     * @param connIdx Connection index to check.
+     * @return {@code true} if connection index is related to the channel create request/response.
+     */
+    private boolean isChannelConnIdx(int connIdx) {
+        return connIdx > MAX_CONN_PER_NODE;
+    }
+
+    /**
      *
      */
     private class DiscoveryListener implements GridLocalEventListener, HighPriorityListener {
@@ -4679,7 +4876,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                 if (obj instanceof GridCommunicationClient)
                     ((GridCommunicationClient)obj).forceClose();
                 else
-                    U.closeQuiet((AbstractInterruptibleChannel)obj);
+                    U.closeQuiet((AutoCloseable)obj);
             }
         }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/CommunicationListenerEx.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/CommunicationListenerEx.java
new file mode 100644
index 0000000..f6022c2
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/CommunicationListenerEx.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp.internal;
+
+import java.io.Serializable;
+import java.nio.channels.Channel;
+import java.util.UUID;
+import org.apache.ignite.spi.communication.CommunicationListener;
+
+/**
+ * Extended communication SPI listener to provide {@link Channel} opened events.
+ */
+public interface CommunicationListenerEx<T extends Serializable> extends CommunicationListener<T> {
+    /**
+     * @param rmtNodeId Remote node id.
+     * @param initMsg Init channel message.
+     * @param channel Locally created channel endpoint.
+     */
+    public default void onChannelOpened(UUID rmtNodeId, T initMsg, Channel channel) {
+        // No-op.
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionKey.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionKey.java
index 0559df7..a107c87 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionKey.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionKey.java
@@ -38,6 +38,14 @@ public class ConnectionKey {
     private final boolean dummy;
 
     /**
+     * Creates a key by delegating to the four-argument constructor with {@code -1} and {@code false} as the
+     * trailing arguments (NOTE(review): presumably the connection counter and the dummy flag - confirm).
+     *
+     * @param nodeId Node ID. Should be not null.
+     * @param idx Connection index.
+     */
+    public ConnectionKey(@NotNull UUID nodeId, int idx) {
+        this(nodeId, idx, -1, false);
+    }
+
+    /**
      * Creates ConnectionKey with false value of dummy flag.
      *
      * @param nodeId Node ID. Should be not null.
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerFileTransmissionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerFileTransmissionSelfTest.java
new file mode 100644
index 0000000..b4254d8
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerFileTransmissionSelfTest.java
@@ -0,0 +1,981 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.communication;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.Channel;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.OpenOption;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.function.Consumer;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.GridTopic;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.NodeStoppingException;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
+import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.wal.crc.FastCrc;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.FILE_SUFFIX;
+import static org.apache.ignite.internal.util.IgniteUtils.fileCount;
+import static org.apache.ignite.testframework.GridTestUtils.setFieldValue;
+
+/**
+ * Test file transmission manager operations.
+ */
+public class GridIoManagerFileTransmissionSelfTest extends GridCommonAbstractTest {
+    /** Number of cache keys loaded by {@code addCacheData}. */
+    private static final long CACHE_SIZE = 50_000L;
+
+    /** Network timeout in ms, applied to every node configuration. */
+    private static final long NET_TIMEOUT_MS = 2000L;
+
+    /** Name of the temporary directory to store files. */
+    private static final String TEMP_FILES_DIR = "ctmp";
+
+    /** Factory to produce IO interfaces over files to transmit. */
+    private static final FileIOFactory IO_FACTORY = new RandomAccessFileIOFactory();
+
+    /** The topic to send files to. Initialized in {@code beforeAll()}. */
+    private static Object topic;
+
+    /** File filter accepting names which end with {@code FILE_SUFFIX}. Initialized in {@code beforeAll()}. */
+    private static FilenameFilter fileBinFilter;
+
+    /**
+     * Locally used fileIo to interact with output file. Kept in a one-element array so that anonymous classes
+     * can reassign it; closed quietly after each test.
+     */
+    private final FileIO[] fileIo = new FileIO[1];
+
+    /** The temporary directory to store files. Resolved before each test. */
+    private File tempStore;
+
+    /** Ignite instance which receives files. */
+    private IgniteEx rcv;
+
+    /** Ignite instance which sends files. */
+    private IgniteEx snd;
+
+    /** Initializes the shared topic and the file name filter once, before any test is run. */
+    @BeforeClass
+    public static void beforeAll() {
+        topic = GridTopic.TOPIC_CACHE.topic("test", 0);
+
+        // Accept only the files whose names end with the suffix.
+        fileBinFilter = (dir, name) -> name.endsWith(FILE_SUFFIX);
+    }
+
+    /**
+     * Cleans the persistence directory and resolves the temporary directory used to store test files.
+     *
+     * @throws Exception if failed.
+     */
+    @Before
+    public void before() throws Exception {
+        cleanPersistenceDir();
+
+        tempStore = U.resolveWorkDirectory(U.defaultWorkDirectory(), TEMP_FILES_DIR, true);
+    }
+
+    /**
+     * Called after test run. Checks that both nodes keep no leftover transmission state and then, even if
+     * that check fails, stops all grids, quietly closes the locally used file IO and drops the node references.
+     */
+    @After
+    public void after() {
+        try {
+            ensureResourcesFree(snd);
+            ensureResourcesFree(rcv);
+        }
+        finally {
+            stopAllGrids();
+
+            U.closeQuiet(fileIo[0]);
+
+            snd = null;
+            rcv = null;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        // Persistent default data region limited to 500 MB.
+        DataRegionConfiguration regionCfg = new DataRegionConfiguration();
+
+        regionCfg.setPersistenceEnabled(true);
+        regionCfg.setMaxSize(500L * 1024 * 1024);
+
+        DataStorageConfiguration storageCfg = new DataStorageConfiguration();
+
+        storageCfg.setDefaultDataRegionConfiguration(regionCfg);
+
+        cfg.setDataStorageConfiguration(storageCfg);
+        cfg.setCacheConfiguration(new CacheConfiguration<Integer, Integer>(DEFAULT_CACHE_NAME));
+        cfg.setCommunicationSpi(new BlockingOpenChannelCommunicationSpi());
+        cfg.setNetworkTimeout(NET_TIMEOUT_MS);
+
+        return cfg;
+    }
+
+    /**
+     * Transmit all cache partition to particular topic on the remote node.
+     * <p>
+     * The sizes and CRCs of the sender's partition files are remembered before the transmission. After all
+     * files are sent with the {@link TransmissionPolicy#FILE} policy the test checks the number of received
+     * files, their lengths, the additional parameters delivered to the handler and the CRC of each received file.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testFileHandlerBase() throws Exception {
+        snd = startGrid(0);
+        rcv = startGrid(1);
+
+        snd.cluster().active(true);
+
+        addCacheData(snd, DEFAULT_CACHE_NAME);
+
+        awaitPartitionMapExchange();
+
+        Map<String, Long> fileSizes = new HashMap<>();
+        Map<String, Integer> fileCrcs = new HashMap<>();
+        Map<String, Serializable> fileParams = new HashMap<>();
+
+        assertTrue(snd.context().io().fileTransmissionSupported(rcv.localNode()));
+
+        rcv.context().io().addTransmissionHandler(topic, new TransmissionHandlerAdapter() {
+            @Override public String filePath(UUID nodeId, TransmissionMeta fileMeta) {
+                return new File(tempStore, fileMeta.name()).getAbsolutePath();
+            }
+
+            @Override public Consumer<File> fileHandler(UUID nodeId, TransmissionMeta initMeta) {
+                return new Consumer<File>() {
+                    @Override public void accept(File file) {
+                        assertTrue(fileSizes.containsKey(file.getName()));
+                        // Save all params.
+                        fileParams.putAll(initMeta.params());
+                    }
+                };
+            }
+        });
+
+        File cacheDirIg0 = cacheWorkDir(snd, DEFAULT_CACHE_NAME);
+
+        File[] cacheParts = cacheDirIg0.listFiles(fileBinFilter);
+
+        for (File file : cacheParts) {
+            fileSizes.put(file.getName(), file.length());
+            fileCrcs.put(file.getName(), FastCrc.calcCrc(file));
+        }
+
+        try (GridIoManager.TransmissionSender sender = snd.context()
+            .io()
+            .openTransmissionSender(rcv.localNode().id(), topic)) {
+            // Iterate over cache partition cacheParts.
+            for (File file : cacheParts) {
+                Map<String, Serializable> params = new HashMap<>();
+
+                params.put(file.getName(), file.hashCode());
+
+                sender.send(file, params, TransmissionPolicy.FILE);
+            }
+        }
+
+        stopAllGrids();
+
+        assertEquals(fileSizes.size(), tempStore.listFiles(fileBinFilter).length);
+
+        for (File file : cacheParts) {
+            // Check received file lengths: the received copy is stored in tempStore under the same name,
+            // comparing against the source file itself would always pass.
+            assertEquals("Received the file length is incorrect: " + file.getName(),
+                fileSizes.get(file.getName()), Long.valueOf(new File(tempStore, file.getName()).length()));
+
+            // Check received params.
+            assertEquals("File additional parameters are not fully transmitted",
+                fileParams.get(file.getName()), file.hashCode());
+        }
+
+        // Check received file CRCs.
+        for (File file : tempStore.listFiles(fileBinFilter)) {
+            assertEquals("Received file CRC-32 checksum is incorrect: " + file.getName(),
+                fileCrcs.get(file.getName()), Integer.valueOf(FastCrc.calcCrc(file)));
+        }
+    }
+
+    /**
+     * Checks that sending a file fails with {@link IgniteCheckedException} when the receiver's {@code filePath}
+     * callback throws: neither the file handler nor the chunk handler is expected to be requested, and the
+     * handler's {@code onException} is expected to get the message of the thrown exception.
+     *
+     * @throws Exception If fails.
+     */
+    @Test(expected = IgniteCheckedException.class)
+    public void testFileHandlerFilePathThrowsEx() throws Exception {
+        final String exTestMessage = "Test exception. Handler initialization failed at onBegin.";
+
+        snd = startGrid(0);
+        rcv = startGrid(1);
+
+        snd.cluster().active(true);
+
+        File fileToSend = createFileRandomData("1Mb", 1024 * 1024);
+
+        rcv.context().io().addTransmissionHandler(topic, new DefaultTransmissionHandler(rcv, fileToSend, tempStore) {
+            @Override public String filePath(UUID nodeId, TransmissionMeta fileMeta) {
+                throw new IgniteException(exTestMessage);
+            }
+
+            @Override public Consumer<File> fileHandler(UUID nodeId, TransmissionMeta initMeta) {
+                fail("fileHandler must never be called");
+
+                return super.fileHandler(nodeId, initMeta);
+            }
+
+            @Override public Consumer<ByteBuffer> chunkHandler(UUID nodeId, TransmissionMeta initMeta) {
+                fail("chunkHandler must never be called");
+
+                return super.chunkHandler(nodeId, initMeta);
+            }
+
+            @Override public void onException(UUID nodeId, Throwable err) {
+                assertEquals(exTestMessage, err.getMessage());
+            }
+        });
+
+        try (GridIoManager.TransmissionSender sender = snd.context()
+            .io()
+            .openTransmissionSender(rcv.localNode().id(), topic)) {
+            sender.send(fileToSend, TransmissionPolicy.FILE);
+        }
+    }
+
+    /**
+     * Checks that sending a file fails with {@link IgniteCheckedException} when the receiver node is stopped
+     * in the middle of the transmission.
+     *
+     * @throws Exception If fails.
+     */
+    @Test(expected = IgniteCheckedException.class)
+    public void testFileHandlerOnReceiverLeft() throws Exception {
+        final int fileSizeBytes = 5 * 1024 * 1024;
+        final AtomicInteger chunksCnt = new AtomicInteger();
+
+        snd = startGrid(0);
+        rcv = startGrid(1);
+
+        snd.cluster().active(true);
+
+        File fileToSend = createFileRandomData("testFile", fileSizeBytes);
+
+        transmissionFileIoFactory(snd, new FileIOFactory() {
+            @Override public FileIO create(File file, OpenOption... modes) throws IOException {
+                FileIO fileIo = IO_FACTORY.create(file, modes);
+
+                // Sender-side FileIO decorator which stops the receiver node during the transfer.
+                return new FileIODecorator(fileIo) {
+                    /** {@inheritDoc} */
+                    @Override public long transferTo(long position, long count, WritableByteChannel target)
+                        throws IOException {
+                        // On the 5th transferTo call stop the rcv, then proceed with the transfer.
+                        if (chunksCnt.incrementAndGet() == 5)
+                            stopGrid(rcv.name(), true);
+
+                        return super.transferTo(position, count, target);
+                    }
+                };
+            }
+        });
+
+        rcv.context().io().addTransmissionHandler(topic, new DefaultTransmissionHandler(rcv, fileToSend, tempStore));
+
+        try (GridIoManager.TransmissionSender sender = snd.context()
+            .io()
+            .openTransmissionSender(rcv.localNode().id(), topic)) {
+            sender.send(fileToSend, TransmissionPolicy.FILE);
+        }
+    }
+
+    /**
+     * Checks that the receiver's handler gets an {@link IgniteCheckedException} after the sender closes the
+     * channel on the 10th {@code transferTo} call and raises the {@code block} flag of its communication SPI.
+     * Any exception thrown on the sender side is only logged.
+     * <p>
+     * NOTE(review): presumably the {@code block} flag prevents the channel from being re-opened, so the receiver
+     * fails by timeout - confirm against {@code BlockingOpenChannelCommunicationSpi}. The method name looks like
+     * a typo for {@code testFileHandlerReconnectTimeouted}.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void tesFileHandlerReconnectTimeouted() throws Exception {
+        rcv = startGrid(1);
+        snd = startGrid(0);
+
+        final AtomicInteger chunksCnt = new AtomicInteger();
+        final CountDownLatch sndLatch = ((BlockingOpenChannelCommunicationSpi)snd.context()
+            .config()
+            .getCommunicationSpi()).latch;
+        final AtomicReference<Throwable> refErr = new AtomicReference<>();
+
+        snd.cluster().active(true);
+
+        File fileToSend = createFileRandomData("testFile", 5 * 1024 * 1024);
+
+        transmissionFileIoFactory(snd, new FileIOFactory() {
+            @Override public FileIO create(File file, OpenOption... modes) throws IOException {
+                FileIO fileIo = IO_FACTORY.create(file, modes);
+
+                return new FileIODecorator(fileIo) {
+                    /** {@inheritDoc} */
+                    @Override public long transferTo(long position, long count, WritableByteChannel target)
+                        throws IOException {
+                        if (chunksCnt.incrementAndGet() == 10) {
+                            target.close();
+
+                            ((BlockingOpenChannelCommunicationSpi)snd.context()
+                                .config()
+                                .getCommunicationSpi()).block = true;
+                        }
+
+                        return super.transferTo(position, count, target);
+                    }
+                };
+            }
+        });
+
+        rcv.context().io().addTransmissionHandler(topic, new DefaultTransmissionHandler(rcv, fileToSend, tempStore) {
+            @Override public void onException(UUID nodeId, Throwable err) {
+                refErr.compareAndSet(null, err);
+
+                sndLatch.countDown();
+            }
+        });
+
+        try (GridIoManager.TransmissionSender sender = snd.context()
+            .io()
+            .openTransmissionSender(rcv.localNode().id(), topic)) {
+            sender.send(fileToSend, TransmissionPolicy.FILE);
+        }
+        catch (IgniteCheckedException | IOException | InterruptedException e) {
+            // Ignore err.
+            U.warn(log, e);
+        }
+
+        assertNotNull("Timeout exception not occurred", refErr.get());
+        assertEquals("Type of timeout exception incorrect: " + refErr.get(),
+            IgniteCheckedException.class,
+            refErr.get().getClass());
+    }
+
+    /**
+     * Checks that the sender fails with {@link NodeStoppingException} when its node is stopped right after a
+     * {@code transferTo} call, and that no files are left in the receiver's download directory afterwards.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testFileHandlerCleanedUpIfSenderLeft() throws Exception {
+        snd = startGrid(0);
+        rcv = startGrid(1);
+
+        snd.cluster().active(true);
+
+        File fileToSend = createFileRandomData("tempFile15Mb", 15 * 1024 * 1024);
+        File downloadTo = U.resolveWorkDirectory(tempStore.getAbsolutePath(), "download", true);
+
+        transmissionFileIoFactory(snd, new FileIOFactory() {
+            @Override public FileIO create(File file, OpenOption... modes) throws IOException {
+                FileIO fileIo = IO_FACTORY.create(file, modes);
+
+                return new FileIODecorator(fileIo) {
+                    /** {@inheritDoc} */
+                    @Override public long transferTo(long position, long count, WritableByteChannel target)
+                        throws IOException {
+
+                        long transferred = super.transferTo(position, count, target);
+
+                        stopGrid(snd.name(), true);
+
+                        return transferred;
+                    }
+                };
+            }
+        });
+
+        rcv.context().io().addTransmissionHandler(topic, new DefaultTransmissionHandler(rcv, fileToSend, tempStore){
+            /** {@inheritDoc} */
+            @Override public String filePath(UUID nodeId, TransmissionMeta fileMeta) {
+                return new File(downloadTo, fileMeta.name()).getAbsolutePath();
+            }
+        });
+
+        Exception err = null;
+
+        try (GridIoManager.TransmissionSender sender = snd.context()
+            .io()
+            .openTransmissionSender(rcv.localNode().id(), topic)) {
+            sender.send(fileToSend, TransmissionPolicy.FILE);
+        }
+        catch (Exception e) {
+            // Remember the failure, its type is verified below.
+            err = e;
+        }
+
+        // Fail with a clear message instead of a NullPointerException if nothing has been thrown.
+        assertNotNull("Transmission must fail when the sender node is stopped", err);
+        assertEquals(NodeStoppingException.class, err.getClass());
+        assertEquals("Incomplete resources must be cleaned up on sender left",
+            0,
+            fileCount(downloadTo.toPath()));
+    }
+
+    /**
+     * Checks that sending a file fails with {@link IgniteCheckedException} when the receiver's file IO throws
+     * on the 4th {@code transferFrom} call; the receiver's handler is expected to get the same error message.
+     *
+     * @throws Exception If fails.
+     */
+    @Test(expected = IgniteCheckedException.class)
+    public void testFileHandlerReconnectOnReadFail() throws Exception {
+        final String chunkDownloadExMsg = "Test exception. Chunk processing error.";
+
+        snd = startGrid(0);
+        rcv = startGrid(1);
+
+        snd.cluster().active(true);
+
+        File fileToSend = createFileRandomData("testFile", 5 * 1024 * 1024);
+        final AtomicInteger readChunks = new AtomicInteger();
+
+        transmissionFileIoFactory(rcv, new FileIOFactory() {
+            @Override public FileIO create(File file, OpenOption... modes) throws IOException {
+                fileIo[0] = IO_FACTORY.create(file, modes);
+
+                // Receiver-side FileIO decorator which fails while reading a chunk.
+                return new FileIODecorator(fileIo[0]) {
+                    @Override public long transferFrom(ReadableByteChannel src, long position, long count)
+                        throws IOException {
+                        // On the 4th transferFrom call throw an exception to emulate error processing.
+                        if (readChunks.incrementAndGet() == 4)
+                            throw new IgniteException(chunkDownloadExMsg);
+
+                        return super.transferFrom(src, position, count);
+                    }
+                };
+            }
+        });
+
+        rcv.context().io().addTransmissionHandler(topic, new DefaultTransmissionHandler(rcv, fileToSend, tempStore) {
+            @Override public void onException(UUID nodeId, Throwable err) {
+                assertEquals(chunkDownloadExMsg, err.getMessage());
+            }
+        });
+
+        try (GridIoManager.TransmissionSender sender = snd.context()
+            .io()
+            .openTransmissionSender(rcv.localNode().id(), topic)) {
+            sender.send(fileToSend, TransmissionPolicy.FILE);
+        }
+    }
+
+    /**
+     * Checks that sending a file fails with {@link IgniteCheckedException} when the receiver's {@code filePath}
+     * callback throws on its first invocation (later invocations return the path of the file to receive to).
+     * <p>
+     * NOTE(review): if the expected exception is thrown from the try block, the length and CRC assertions
+     * placed after it are never executed - confirm whether they are intended to run.
+     *
+     * @throws Exception If fails.
+     */
+    @Test(expected = IgniteCheckedException.class)
+    public void testFileHandlerSenderStoppedIfReceiverInitFail() throws Exception {
+        final int fileSizeBytes = 5 * 1024 * 1024;
+        final AtomicBoolean throwFirstTime = new AtomicBoolean();
+
+        snd = startGrid(0);
+        rcv = startGrid(1);
+
+        snd.cluster().active(true);
+
+        File fileToSend = createFileRandomData("testFile", fileSizeBytes);
+        File rcvFile = new File(tempStore, "testFile" + "_" + rcv.localNode().id());
+
+        rcv.context().io().addTransmissionHandler(topic, new DefaultTransmissionHandler(rcv, fileToSend, tempStore) {
+            @Override public String filePath(UUID nodeId, TransmissionMeta fileMeta) {
+                if (throwFirstTime.compareAndSet(false, true))
+                    throw new IgniteException("Test exception. Initialization fail.");
+
+                return rcvFile.getAbsolutePath();
+            }
+        });
+
+        try (GridIoManager.TransmissionSender sender = snd.context()
+            .io()
+            .openTransmissionSender(rcv.localNode().id(), topic)) {
+            sender.send(fileToSend, TransmissionPolicy.FILE);
+        }
+
+        assertEquals(fileToSend.length(), rcvFile.length());
+        assertCrcEquals(fileToSend, rcvFile);
+    }
+
+    /**
+     * Checks that a new transmission session can be opened and completed after the previous one has failed.
+     * The receiver's {@code filePath} callback returns {@code null} on its first invocation, so the first send
+     * must end with {@link IgniteCheckedException}. The second send must complete, and the received file is
+     * compared with the sent one by length and by {@code assertCrcEquals}.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testFileHandlerNextWriterOpened() throws Exception {
+        final int fileSizeBytes = 5 * 1024 * 1024;
+        final AtomicBoolean networkExThrown = new AtomicBoolean();
+
+        snd = startGrid(0);
+        rcv = startGrid(1);
+
+        snd.cluster().active(true);
+
+        File fileToSend = createFileRandomData("File_5MB", fileSizeBytes);
+        File rcvFile = new File(tempStore, "File_5MB" + "_" + rcv.localNode().id());
+
+        rcv.context().io().addTransmissionHandler(topic, new DefaultTransmissionHandler(rcv, fileToSend, tempStore) {
+            @Override public void onException(UUID nodeId, Throwable err) {
+                assertEquals("Previous session is not closed properly", IgniteCheckedException.class, err.getClass());
+            }
+
+            @Override public String filePath(UUID nodeId, TransmissionMeta fileMeta) {
+                if (networkExThrown.compareAndSet(false, true))
+                    return null;
+
+                return rcvFile.getAbsolutePath();
+            }
+        });
+
+        Exception expectedErr = null;
+
+        try (GridIoManager.TransmissionSender sender = snd.context()
+            .io()
+            .openTransmissionSender(rcv.localNode().id(), topic)) {
+            sender.send(fileToSend, TransmissionPolicy.FILE);
+        }
+        catch (IgniteCheckedException e) {
+            // Expected exception of the first session.
+            expectedErr = e;
+        }
+
+        assertNotNull("Transmission must ends with an exception", expectedErr);
+
+        // Open the next session and complete it successfully.
+        try (GridIoManager.TransmissionSender sender = snd.context()
+            .io()
+            .openTransmissionSender(rcv.localNode().id(), topic)) {
+            sender.send(fileToSend, TransmissionPolicy.FILE);
+        }
+
+        assertEquals(fileToSend.length(), rcvFile.length());
+        assertCrcEquals(fileToSend, rcvFile);
+    }
+
+    /**
+     * Checks that sending a file to a topic whose transmission handler has been removed on the receiver
+     * fails with {@link IgniteException}.
+     *
+     * @throws Exception If fails.
+     */
+    @Test(expected = IgniteException.class)
+    public void testFileHandlerSendToNullTopic() throws Exception {
+        snd = startGrid(0);
+        rcv = startGrid(1);
+
+        snd.cluster().active(true);
+
+        // Ensure topic handler is empty.
+        rcv.context().io().removeTransmissionHandler(topic);
+
+        // Open next writer on removed topic.
+        try (GridIoManager.TransmissionSender sender = snd.context()
+            .io()
+            .openTransmissionSender(rcv.localNode().id(), topic)) {
+            sender.send(createFileRandomData("File_1MB", 1024 * 1024), TransmissionPolicy.FILE);
+        }
+    }
+
+    /**
+     * Checks that when two senders transmit a file to the same topic of the same node concurrently, at least
+     * one of them fails. The recorded failure is rethrown and must be an {@link IgniteCheckedException}.
+     *
+     * @throws Exception If fails.
+     */
+    @Test(expected = IgniteCheckedException.class)
+    public void testFileHandlerChannelCloseIfAnotherOpened() throws Exception {
+        final int fileSizeBytes = 5 * 1024 * 1024;
+        final CountDownLatch waitLatch = new CountDownLatch(2);
+        final CountDownLatch completionWait = new CountDownLatch(2);
+
+        snd = startGrid(0);
+        rcv = startGrid(1);
+
+        snd.cluster().active(true);
+
+        File fileToSend = createFileRandomData("file5MBSize", fileSizeBytes);
+
+        rcv.context().io().addTransmissionHandler(topic, new DefaultTransmissionHandler(rcv, fileToSend, tempStore) {
+            @Override public String filePath(UUID nodeId, TransmissionMeta fileMeta) {
+                waitLatch.countDown();
+
+                return super.filePath(nodeId, fileMeta);
+            }
+        });
+
+        Exception[] errs = new Exception[1];
+
+        try (GridIoManager.TransmissionSender sender = snd.context()
+            .io()
+            .openTransmissionSender(rcv.localNode().id(), topic);
+
+             GridIoManager.TransmissionSender anotherSender = snd.context()
+                 .io()
+                 .openTransmissionSender(rcv.localNode().id(), topic)) {
+            // Will connect on write attempt.
+            GridTestUtils.runAsync(() -> {
+                try {
+                    sender.send(fileToSend, TransmissionPolicy.FILE);
+                }
+                catch (IgniteCheckedException | IOException | InterruptedException e) {
+                    errs[0] = e;
+                }
+                finally {
+                    completionWait.countDown();
+                }
+            });
+
+            GridTestUtils.runAsync(() -> {
+                try {
+                    anotherSender.send(fileToSend, TransmissionPolicy.FILE);
+                }
+                catch (IgniteCheckedException | IOException | InterruptedException e) {
+                    errs[0] = e;
+                }
+                finally {
+                    completionWait.countDown();
+                }
+            });
+
+            waitLatch.await(5, TimeUnit.SECONDS);
+
+            // Wait for the senders to finish before reading the error: the count down performed after the
+            // error is stored is what makes it visible to this thread.
+            completionWait.await(5, TimeUnit.SECONDS);
+
+            // Expected that one of the writers will throw exception.
+            assertFalse("An error must be thrown if connected to the same topic during processing",
+                errs[0] == null);
+
+            throw errs[0];
+        }
+    }
+
+    /**
+     * Checks a transmission with the {@link TransmissionPolicy#CHUNK} policy when the sender's channel is closed
+     * on the 10th {@code transferTo} call. The receiver writes each accepted chunk to a local file. At the end
+     * the number of accepted chunks must be equal to the file length divided by the configured page size, the
+     * received file must match the sent one by length and by {@code assertCrcEquals}, and the local file IO
+     * must be released.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testChunkHandlerWithReconnect() throws Exception {
+        snd = startGrid(0);
+        rcv = startGrid(1);
+
+        final String filePrefix = "testFile";
+        final AtomicInteger cnt = new AtomicInteger();
+        final AtomicInteger acceptedChunks = new AtomicInteger();
+        final File file = new File(tempStore, filePrefix + "_" + rcv.localNode().id());
+
+        snd.cluster().active(true);
+
+        File fileToSend = createFileRandomData(filePrefix, 10 * 1024 * 1024);
+
+        transmissionFileIoFactory(snd, new FileIOFactory() {
+            @Override public FileIO create(File file, OpenOption... modes) throws IOException {
+                FileIO fileIo = IO_FACTORY.create(file, modes);
+
+                return new FileIODecorator(fileIo) {
+                    /** {@inheritDoc} */
+                    @Override public long transferTo(long position, long count, WritableByteChannel target)
+                        throws IOException {
+                        // Close the channel on the 10th transferTo call, right before the transfer is attempted.
+                        if (cnt.incrementAndGet() == 10)
+                            target.close();
+
+                        return super.transferTo(position, count, target);
+                    }
+                };
+            }
+        });
+
+        rcv.context().io().addTransmissionHandler(topic, new TransmissionHandlerAdapter() {
+            /** {@inheritDoc} */
+            @Override public void onException(UUID nodeId, Throwable err) {
+                U.closeQuiet(fileIo[0]);
+
+                fileIo[0] = null;
+            }
+
+            /** {@inheritDoc} */
+            @Override public Consumer<ByteBuffer> chunkHandler(UUID nodeId, TransmissionMeta initMeta) {
+
+                if (fileIo[0] == null) {
+                    try {
+                        fileIo[0] = IO_FACTORY.create(file);
+                        fileIo[0].position(initMeta.offset());
+                    }
+                    catch (IOException e) {
+                        throw new IgniteException(e);
+                    }
+                }
+
+                return new Consumer<ByteBuffer>() {
+                    final LongAdder transferred = new LongAdder();
+
+                    @Override public void accept(ByteBuffer buff) {
+                        try {
+                            assertTrue(buff.order() == ByteOrder.nativeOrder());
+                            assertEquals(0, buff.position());
+                            assertEquals(buff.limit(), buff.capacity());
+
+                            fileIo[0].writeFully(buff);
+
+                            acceptedChunks.getAndIncrement();
+                            transferred.add(buff.capacity());
+                        }
+                        catch (Throwable e) {
+                            throw new IgniteException(e);
+                        }
+                        finally {
+                            closeIfTransferred();
+                        }
+                    }
+
+                    private void closeIfTransferred() {
+                        if (transferred.longValue() == initMeta.count()) {
+                            U.closeQuiet(fileIo[0]);
+
+                            fileIo[0] = null;
+                        }
+                    }
+                };
+            }
+        });
+
+        try (GridIoManager.TransmissionSender sender = snd.context()
+            .io()
+            .openTransmissionSender(rcv.localNode().id(), topic)) {
+            sender.send(fileToSend, TransmissionPolicy.CHUNK);
+        }
+
+        assertEquals("Remote node must accept all chunks",
+            fileToSend.length() / rcv.configuration().getDataStorageConfiguration().getPageSize(),
+            acceptedChunks.get());
+        assertEquals("Received file and sent files have not the same lengtgh", fileToSend.length(), file.length());
+        assertCrcEquals(fileToSend, file);
+        assertNull(fileIo[0]);
+    }
+
+    /**
+     * Checks that a transmission with the {@link TransmissionPolicy#CHUNK} policy fails with
+     * {@link IgniteCheckedException} when the receiver's {@code chunkHandler} callback throws.
+     *
+     * @throws Exception If fails.
+     */
+    @Test(expected = IgniteCheckedException.class)
+    public void testChunkHandlerInitSizeFail() throws Exception {
+        snd = startGrid(0);
+        rcv = startGrid(1);
+
+        snd.cluster().active(true);
+
+        File fileToSend = createFileRandomData("testFile", 1024 * 1024);
+
+        rcv.context().io().addTransmissionHandler(topic, new TransmissionHandlerAdapter() {
+            /** {@inheritDoc} */
+            @Override public Consumer<ByteBuffer> chunkHandler(UUID nodeId, TransmissionMeta initMeta) {
+                throw new IgniteException("Test exception. Initialization failed");
+            }
+        });
+
+        try (GridIoManager.TransmissionSender sender = snd.context()
+            .io()
+            .openTransmissionSender(rcv.localNode().id(), topic)) {
+            sender.send(fileToSend, TransmissionPolicy.CHUNK);
+        }
+    }
+
+    /**
+     * Asserts that the IO manager of the given node keeps no entries in its {@code rcvCtxs} and
+     * {@code senderStopFlags} maps. Both maps are read from private fields by name.
+     *
+     * @param ig Ignite instance to check, {@code null} is skipped silently.
+     */
+    private static void ensureResourcesFree(IgniteEx ig) {
+        if (ig != null) {
+            GridIoManager ioMgr = ig.context().io();
+
+            ConcurrentMap<Object, Object> rcvCtxs = GridTestUtils.getFieldValue(ioMgr, "rcvCtxs");
+            ConcurrentMap<T2<UUID, IgniteUuid>, AtomicBoolean> stopFlags =
+                GridTestUtils.getFieldValue(ioMgr, "senderStopFlags");
+
+            assertTrue("Receiver context map must be empty: " + rcvCtxs, rcvCtxs.isEmpty());
+            assertTrue("Sender stop flags must be empty: " + stopFlags, stopFlags.isEmpty());
+        }
+    }
+
+    /**
+     * Streams {@code CACHE_SIZE} entries into the cache: keys are {@code 0..CACHE_SIZE-1} and each value is
+     * the key shifted by the hash code of the cache name. Existing entries are allowed to be overwritten.
+     *
+     * @param ignite Ignite instance.
+     * @param cacheName Cache name to add data to.
+     */
+    private void addCacheData(Ignite ignite, String cacheName) {
+        try (IgniteDataStreamer<Integer, Integer> streamer = ignite.dataStreamer(cacheName)) {
+            streamer.allowOverwrite(true);
+
+            for (int key = 0; key < CACHE_SIZE; key++)
+                streamer.addData(key, key + cacheName.hashCode());
+        }
+    }
+
+    /**
+     * Resolves the working directory of the given cache through the page store manager of the node.
+     *
+     * @param ignite An ignite instance.
+     * @param cacheName Cache name.
+     * @return The cache working directory.
+     */
+    private File cacheWorkDir(IgniteEx ignite, String cacheName) {
+        IgniteInternalCache<?, ?> internalCache = ignite.cachex(cacheName);
+
+        // The page store is cast to the file-based manager since only it is used to resolve the directory here.
+        return ((FilePageStoreManager)internalCache.context().shared().pageStore())
+            .cacheWorkDir(internalCache.configuration());
+    }
+
+    /**
+     * Creates (or overwrites) a file under {@code tempStore} and fills it with random bytes.
+     *
+     * @param name The file name to create.
+     * @param size The file size in bytes.
+     * @return The created file which is exactly {@code size} bytes long.
+     * @throws IOException If fails.
+     */
+    private File createFileRandomData(String name, final int size) throws IOException {
+        File out = new File(tempStore, name);
+
+        try (RandomAccessFile raf = new RandomAccessFile(out, "rw")) {
+            byte[] buf = new byte[size];
+
+            ThreadLocalRandom.current().nextBytes(buf);
+
+            // The "rw" mode doesn't truncate an existing file, so cut off a stale tail of a previous bigger file.
+            raf.setLength(size);
+            raf.write(buf);
+        }
+
+        return out;
+    }
+
+    /**
+     * Overrides the {@code fileIoFactory} field of the node's IO manager using {@code setFieldValue}.
+     *
+     * @param ignite Ignite instance to set factory.
+     * @param factory New factory to use.
+     */
+    private static void transmissionFileIoFactory(IgniteEx ignite, FileIOFactory factory) {
+        GridIoManager ioMgr = ignite.context().io();
+
+        setFieldValue(ioMgr, "fileIoFactory", factory);
+    }
+
+    /**
+     * Asserts that both files have the same CRC as calculated by {@code FastCrc.calcCrc}.
+     *
+     * @param fileToSend Source file to check CRC.
+     * @param fileReceived Destination file to check CRC.
+     */
+    private static void assertCrcEquals(File fileToSend, File fileReceived) {
+        try {
+            assertEquals(
+                FastCrc.calcCrc(fileToSend),
+                FastCrc.calcCrc(fileReceived)
+            );
+        }
+        catch (IOException ioErr) {
+            // A file which cannot be read is reported as an assertion failure with the I/O error attached.
+            throw new AssertionError(ioErr);
+        }
+    }
+
+    /** The defailt implementation of transmit session. */
+    private static class DefaultTransmissionHandler extends TransmissionHandlerAdapter {
+        /** Ignite recevier node. */
+        private final IgniteEx rcv;
+
+        /** File to be send. */
+        private final File fileToSend;
+
+        /** Temporary local storage. */
+        private final File tempStorage;
+
+        /**
+         * @param rcv Ignite recevier node.
+         * @param fileToSend File to be send.
+         * @param tempStorage Temporary local storage.
+         */
+        public DefaultTransmissionHandler(IgniteEx rcv, File fileToSend, File tempStorage) {
+            this.rcv = rcv;
+            this.fileToSend = fileToSend;
+            this.tempStorage = tempStorage;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String filePath(UUID nodeId, TransmissionMeta fileMeta) {
+            return new File(tempStorage, fileMeta.name() + "_" + rcv.localNode().id()).getAbsolutePath();
+        }
+
+        /** {@inheritDoc} */
+        @Override public Consumer<File> fileHandler(UUID nodeId, TransmissionMeta initMeta) {
+            return new Consumer<File>() {
+                @Override public void accept(File file) {
+                    assertEquals(fileToSend.length(), file.length());
+                    assertCrcEquals(fileToSend, file);
+                }
+            };
+        }
+    }
+
+    /**
+     * Communication SPI which is able to delay opening of a new channel: while the {@code block} flag is set,
+     * each {@code openChannel()} call waits on the latch for up to 5 seconds before delegating to
+     * {@link TcpCommunicationSpi}.
+     */
+    private static class BlockingOpenChannelCommunicationSpi extends TcpCommunicationSpi {
+        /** Latch to wait at. */
+        private final CountDownLatch latch = new CountDownLatch(1);
+
+        /** {@code true} to start waiting. */
+        private volatile boolean block;
+
+        /**
+         * {@inheritDoc}
+         * <p>
+         * If blocking is enabled, waits on the latch first. The wait is bounded, so the channel is opened
+         * anyway once the timeout elapses.
+         *
+         * @throws IgniteException If the calling thread is interrupted while waiting. The interrupt status
+         *      of the thread is restored before the exception is thrown.
+         */
+        @Override public IgniteInternalFuture<Channel> openChannel(ClusterNode remote,
+            Message initMsg) throws IgniteSpiException {
+            if (block) {
+                U.log(log, "Start waiting on trying open a new channel");
+
+                try {
+                    latch.await(5, TimeUnit.SECONDS);
+                }
+                catch (InterruptedException e) {
+                    // Keep the interrupt visible to the callers up the stack.
+                    Thread.currentThread().interrupt();
+
+                    throw new IgniteException(e);
+                }
+            }
+
+            return super.openChannel(remote, initMsg);
+        }
+    }
+
+    /**
+     * No-op {@link TransmissionHandler} adapter: {@code filePath()}, {@code chunkHandler()} and
+     * {@code fileHandler()} return {@code null} and {@code onException()} ignores the error, so a test
+     * overrides only the callbacks it needs.
+     */
+    private static class TransmissionHandlerAdapter implements TransmissionHandler {
+        /** {@inheritDoc} */
+        @Override public String filePath(UUID nodeId, TransmissionMeta fileMeta) {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Consumer<ByteBuffer> chunkHandler(UUID nodeId, TransmissionMeta initMeta) {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Consumer<File> fileHandler(UUID nodeId, TransmissionMeta initMeta) {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onException(UUID nodeId, Throwable err) {
+            // No-op.
+        }
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
index d6b9b99..ed9079d 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
@@ -45,6 +45,7 @@ import org.apache.ignite.internal.MarshallerContextLockingSelfTest;
 import org.apache.ignite.internal.TransactionsMXBeanImplTest;
 import org.apache.ignite.internal.managers.IgniteDiagnosticMessagesMultipleConnectionsTest;
 import org.apache.ignite.internal.managers.IgniteDiagnosticMessagesTest;
+import org.apache.ignite.internal.managers.communication.GridIoManagerFileTransmissionSelfTest;
 import org.apache.ignite.internal.managers.discovery.IncompleteDeserializationExceptionTest;
 import org.apache.ignite.internal.pagemem.wal.record.WALRecordTest;
 import org.apache.ignite.internal.processors.DeadLockOnNodeLeftExchangeTest;
@@ -252,7 +253,9 @@ import org.junit.runners.Suite;
 
     ClassPathContentLoggingTest.class,
 
-    IncompleteDeserializationExceptionTest.class
+    IncompleteDeserializationExceptionTest.class,
+
+    GridIoManagerFileTransmissionSelfTest.class
 })
 public class IgniteBasicTestSuite {
 }