You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ni...@apache.org on 2019/08/13 15:51:16 UTC
[ignite] branch master updated: IGNITE-10619: Transfer of files
between nodes via CommumicationSpi.
This is an automated email from the ASF dual-hosted git repository.
nizhikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 7d1b87a IGNITE-10619: Transfer of files between nodes via CommumicationSpi.
7d1b87a is described below
commit 7d1b87a67d6e115bff41eb80e7755a070b3f32ac
Author: Maxim Muzafarov <ma...@gmail.com>
AuthorDate: Tue Aug 13 18:51:00 2019 +0300
IGNITE-10619: Transfer of files between nodes via CommumicationSpi.
---
.../org/apache/ignite/internal/IgniteFeatures.java | 3 +
.../communication/AbstractTransmission.java | 111 +++
.../managers/communication/ChunkReceiver.java | 115 +++
.../managers/communication/FileReceiver.java | 129 +++
.../managers/communication/FileSender.java | 172 ++++
.../managers/communication/GridIoManager.java | 983 ++++++++++++++++++++-
.../communication/GridIoMessageFactory.java | 5 +
.../communication/SessionChannelMessage.java | 136 +++
.../communication/TransmissionHandler.java | 76 ++
.../managers/communication/TransmissionMeta.java | 197 +++++
.../managers/communication/TransmissionPolicy.java | 43 +
.../communication/TransmissionReceiver.java | 71 ++
.../processors/cache/persistence/file/FileIO.java | 27 +
.../cache/persistence/file/FileIODecorator.java | 12 +
.../cache/persistence/file/RandomAccessFileIO.java | 17 +
.../cache/persistence/wal/crc/FastCrc.java | 27 +-
.../ignite/internal/util/nio/GridNioServer.java | 21 +-
.../util/nio/GridSelectorNioSessionImpl.java | 21 +-
.../spi/communication/tcp/TcpCommunicationSpi.java | 203 ++++-
.../tcp/internal/CommunicationListenerEx.java | 37 +
.../communication/tcp/internal/ConnectionKey.java | 8 +
.../GridIoManagerFileTransmissionSelfTest.java | 981 ++++++++++++++++++++
.../ignite/testsuites/IgniteBasicTestSuite.java | 5 +-
23 files changed, 3387 insertions(+), 13 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java
index 9f11a69..f356a9b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java
@@ -66,6 +66,9 @@ public enum IgniteFeatures {
/** Distributed metastorage. */
DISTRIBUTED_METASTORAGE(11),
+ /** The node can communicate with others via socket channel. */
+ CHANNEL_COMMUNICATION(12),
+
/** Replacing TcpDiscoveryNode field with nodeId field in discovery messages. */
TCP_DISCOVERY_MESSAGE_NODE_COMPACT_REPRESENTATION(14);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/AbstractTransmission.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/AbstractTransmission.java
new file mode 100644
index 0000000..bd1da54
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/AbstractTransmission.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.communication;
+
+import java.io.Closeable;
+import java.nio.channels.SocketChannel;
+import java.util.function.BooleanSupplier;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Base class for objects which transmit files (read or write) by chunks of
+ * predefined size over an opened {@link SocketChannel}.
+ */
+abstract class AbstractTransmission implements Closeable {
+ /** Checker which reports {@code true} when the transmission must be interrupted (see {@link #stopped()}). */
+ private final BooleanSupplier stopChecker;
+
+ /** The size of a single chunk used to transfer data. */
+ protected final int chunkSize;
+
+ /** Ignite logger. */
+ protected final IgniteLogger log;
+
+ /** Initial meta with the attributes of the file being transferred. */
+ protected TransmissionMeta meta;
+
+ /** The number of bytes successfully transferred during iteration. */
+ protected long transferred;
+
+ /**
+ * @param meta Initial file meta info (non-null, non-empty name, non-negative offset, positive count).
+ * @param stopChecker Node stop or process interrupt checker.
+ * @param log Ignite logger (must not be null: a logger for this class is obtained from it).
+ * @param chunkSize Size of chunks (must be positive).
+ */
+ protected AbstractTransmission(
+ TransmissionMeta meta,
+ BooleanSupplier stopChecker,
+ IgniteLogger log,
+ int chunkSize
+ ) {
+ A.notNull(meta, "Initial file meta cannot be null");
+ A.notNullOrEmpty(meta.name(), "Trasmisson name cannot be empty or null");
+ A.ensure(meta.offset() >= 0, "File start position cannot be negative");
+ A.ensure(meta.count() > 0, "Total number of bytes to transfer must be greater than zero");
+ A.notNull(stopChecker, "Process stop checker cannot be null");
+ A.ensure(chunkSize > 0, "Size of chunks to transfer data must be positive");
+
+ this.stopChecker = stopChecker;
+ this.log = log.getLogger(AbstractTransmission.class);
+ this.chunkSize = chunkSize;
+ this.meta = meta;
+ }
+
+ /**
+ * @return Current transmission state as a new {@link TransmissionMeta}: offset advanced and count reduced by {@link #transferred}.
+ */
+ public TransmissionMeta state() {
+ assert meta != null;
+
+ return new TransmissionMeta(meta.name(),
+ meta.offset() + transferred,
+ meta.count() - transferred,
+ meta.params(),
+ meta.policy(),
+ null);
+ }
+
+ /**
+ * @return Number of bytes which have been transferred.
+ */
+ public long transferred() {
+ return transferred;
+ }
+
+ /**
+ * @return {@code true} if the transmission process should be interrupted.
+ */
+ protected boolean stopped() {
+ return stopChecker.getAsBoolean();
+ }
+
+ /**
+ * @return {@code true} if fewer than {@code meta.count()} bytes have been transferred, i.e. there is more data to process.
+ */
+ protected boolean hasNextChunk() {
+ return transferred < meta.count();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(AbstractTransmission.class, this);
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/ChunkReceiver.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/ChunkReceiver.java
new file mode 100644
index 0000000..24aeef2
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/ChunkReceiver.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.communication;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.SocketChannel;
+import java.util.function.BooleanSupplier;
+import java.util.function.Consumer;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Chunk receiver used to read a given {@link SocketChannel} by chunks of predefined size into
+ * an allocated {@link ByteBuffer} which is then passed to the provided handler.
+ */
+class ChunkReceiver extends TransmissionReceiver {
+ /** Handler which accepts the buffer with data received from the channel. */
+ private final Consumer<ByteBuffer> hnd;
+
+ /** Buffer of {@code chunkSize} capacity which the data read from the channel is placed into. */
+ private ByteBuffer buf;
+
+ /**
+ * @param meta Initial file meta info.
+ * @param chunkSize Size of the chunk.
+ * @param stopChecker Node stop or process interrupt checker.
+ * @param hnd Transmission handler to process received data.
+ * @param log Ignite logger.
+ */
+ public ChunkReceiver(
+ TransmissionMeta meta,
+ int chunkSize,
+ BooleanSupplier stopChecker,
+ Consumer<ByteBuffer> hnd,
+ IgniteLogger log
+ ) {
+ super(meta, stopChecker, log, chunkSize);
+
+ A.notNull(hnd, "ChunkHandler must be provided by transmission handler");
+
+ this.hnd = hnd;
+
+ buf = ByteBuffer.allocate(chunkSize);
+ buf.order(ByteOrder.nativeOrder());
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void readChunk(ReadableByteChannel ch) throws IOException {
+ assert buf != null : "Buffer cannot be null since it is used to receive the data from channel: " + this;
+
+ buf.rewind();
+
+ int read = 0;
+ int res;
+
+ // Read data from the input channel until the buffer is completely filled or the channel reports
+ // end-of-stream; a partially filled buffer is accepted only if it completes the expected byte count.
+ while (true) {
+ res = ch.read(buf);
+
+ // Read returns -1 if the remote node has closed the connection.
+ if (res < 0) {
+ if (transferred + read != meta.count()) {
+ throw new IOException("Input data channel reached its end, but file has not fully loaded " +
+ "[transferred=" + transferred + ", read=" + read + ", total=" + meta.count() + ']');
+ }
+
+ break;
+ }
+
+ read += res;
+
+ if (read == buf.capacity() || buf.position() == buf.capacity())
+ break;
+ }
+
+ if (read == 0)
+ return;
+
+ transferred += read;
+
+ buf.flip();
+
+ hnd.accept(buf);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(ChunkReceiver.class, this, "super", super.toString());
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/FileReceiver.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/FileReceiver.java
new file mode 100644
index 0000000..6af3ca4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/FileReceiver.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.communication;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.file.Files;
+import java.util.function.BooleanSupplier;
+import java.util.function.Consumer;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Data receiver which pulls data from a channel using
+ * {@link FileChannel#transferFrom(ReadableByteChannel, long, long)} until the
+ * whole file is completely received.
+ */
+class FileReceiver extends TransmissionReceiver {
+ /** Handler to notify when a file has been received. */
+ private final Consumer<File> hnd;
+
+ /** File to receive data into. */
+ private File file;
+
+ /** File IO of the destination file which the received data is written through. */
+ @GridToStringExclude
+ private FileIO fileIo;
+
+ /**
+ * @param meta Initial file meta info.
+ * @param stopChecker Node stop or process interrupt checker.
+ * @param factory Factory to produce IO interface on files.
+ * @param hnd Handler to notify with the destination file once it has been fully received.
+ * @param path Path of the destination file to receive data into.
+ * @param log Ignite logger.
+ */
+ public FileReceiver(
+ TransmissionMeta meta,
+ int chunkSize,
+ BooleanSupplier stopChecker,
+ FileIOFactory factory,
+ Consumer<File> hnd,
+ String path,
+ IgniteLogger log
+ ) {
+ super(meta, stopChecker, log, chunkSize);
+
+ A.notNull(hnd, "FileHandler must be provided by transmission handler");
+ A.notNull(path, "File absolute path cannot be null");
+ A.ensure(!path.trim().isEmpty(), "File absolute path cannot be empty ");
+
+ this.hnd = hnd;
+
+ file = new File(path);
+
+ try {
+ fileIo = factory.create(file);
+
+ fileIo.position(meta.offset());
+ }
+ catch (IOException e) {
+ throw new IgniteException("Unable to open destination file. Receiver will will be stopped", e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void receive(ReadableByteChannel ch) throws IOException, InterruptedException {
+ super.receive(ch);
+
+ if (transferred == meta.count())
+ hnd.accept(file);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void readChunk(ReadableByteChannel ch) throws IOException {
+ assert fileIo != null;
+
+ long batchSize = Math.min(chunkSize, meta.count() - transferred);
+
+ long read = fileIo.transferFrom(ch, meta.offset() + transferred, batchSize);
+
+ if (read == 0)
+ throw new IOException("Channel is reached the end of stream. Probably, channel is closed on the remote node");
+
+ if (read > 0)
+ transferred += read;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() {
+ U.closeQuiet(fileIo);
+
+ try {
+ if (transferred != meta.count())
+ Files.delete(file.toPath());
+ }
+ catch (IOException e) {
+ U.error(log, "Error deleting not fully loaded file: " + file, e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(FileReceiver.class, this, "super", super.toString());
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/FileSender.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/FileSender.java
new file mode 100644
index 0000000..a4c060f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/FileSender.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.communication;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.ObjectOutput;
+import java.io.Serializable;
+import java.nio.channels.FileChannel;
+import java.nio.channels.WritableByteChannel;
+import java.util.Map;
+import java.util.function.BooleanSupplier;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.util.IgniteUtils.assertParameter;
+
+/**
+ * Class represents a data sender by chunks of predefined size. All of the chunks will be written to the
+ * given socket channel. Please note that a new {@link FileSender} instance is created for each file
+ * to be sent. The sender keeps track of how much data has already been transferred, so that
+ * its state can be synchronized with the remote node when a reconnection is required.
+ * <p>
+ * The <em>FileSender</em> uses the zero-copy streaming approach, see <em>FileChannel#transferTo</em> for details.
+ *
+ * @see FileChannel#transferTo(long, long, WritableByteChannel)
+ */
+class FileSender extends AbstractTransmission {
+ /** Corresponding file channel to work with a given file. */
+ @GridToStringExclude
+ private FileIO fileIo;
+
+ /**
+ * @param file File which is going to be sent by chunks (must not be null: its name is read before the assertion runs).
+ * @param off File offset.
+ * @param cnt Number of bytes to transfer.
+ * @param params Additional file params.
+ * @param plc Policy of handling data on remote.
+ * @param stopChecker Node stop or process interrupt checker.
+ * @param log Ignite logger.
+ * @param factory Factory to produce IO interface on given file.
+ * @param chunkSize Size of chunks.
+ * @throws IOException If fails.
+ */
+ public FileSender(
+ File file,
+ long off,
+ long cnt,
+ Map<String, Serializable> params,
+ TransmissionPolicy plc,
+ BooleanSupplier stopChecker,
+ IgniteLogger log,
+ FileIOFactory factory,
+ int chunkSize
+ ) throws IOException {
+ super(new TransmissionMeta(file.getName(), off, cnt, params, plc, null), stopChecker, log, chunkSize);
+
+ assert file != null;
+
+ fileIo = factory.create(file);
+ }
+
+ /**
+ * @param ch Output channel to write file to.
+ * @param oo Channel to write meta info to.
+ * @param rcvMeta Connection meta received from the remote node, or {@code null} if there is none.
+ * @throws IOException If a transport exception occurred.
+ * @throws InterruptedException If thread interrupted.
+ */
+ public void send(WritableByteChannel ch,
+ ObjectOutput oo,
+ @Nullable TransmissionMeta rcvMeta
+ ) throws IOException, InterruptedException {
+ updateSenderState(rcvMeta);
+
+ // Write flag to remote to keep the current transmission opened.
+ oo.writeBoolean(false);
+
+ // Send meta about the remaining part of the current file to remote.
+ oo.writeObject(new TransmissionMeta(meta.name(),
+ meta.offset() + transferred,
+ meta.count() - transferred,
+ meta.params(),
+ meta.policy(),
+ null));
+
+ while (hasNextChunk()) {
+ if (Thread.currentThread().isInterrupted())
+ throw new InterruptedException("Sender thread has been interruped");
+
+ if (stopped())
+ throw new IgniteException("Sender has been cancelled due to the local node is stopping");
+
+ writeChunk(ch);
+ }
+
+ assert transferred == meta.count() : "File is not fully transferred [expect=" + meta.count() +
+ ", actual=" + transferred + ']';
+ }
+
+ /**
+ * @param rcvMeta Connection meta info, or {@code null} if the remote node has no meta for this file.
+ */
+ private void updateSenderState(TransmissionMeta rcvMeta) {
+ // The remote node doesn't have a file meta info.
+ if (rcvMeta == null || rcvMeta.offset() < 0) {
+ transferred = 0;
+
+ return;
+ }
+
+ long uploadedBytes = rcvMeta.offset() - meta.offset();
+
+ assertParameter(meta.name().equals(rcvMeta.name()), "Attempt to transfer different file " +
+ "while previous is not completed [meta=" + meta + ", meta=" + rcvMeta + ']');
+
+ assertParameter(uploadedBytes >= 0, "Incorrect sync meta [offset=" + rcvMeta.offset() +
+ ", meta=" + meta + ']');
+
+ // No need to set a new file position if it has not changed.
+ if (uploadedBytes == 0)
+ return;
+
+ transferred = uploadedBytes;
+
+ U.log(log, "The number of transferred bytes after reconnect has been updated: " + uploadedBytes);
+ }
+
+ /**
+ * @param ch Channel to write the next chunk of at most {@code chunkSize} bytes to.
+ * @throws IOException If fails.
+ */
+ private void writeChunk(WritableByteChannel ch) throws IOException {
+ long batchSize = Math.min(chunkSize, meta.count() - transferred);
+
+ long sent = fileIo.transferTo(meta.offset() + transferred, batchSize, ch);
+
+ if (sent > 0)
+ transferred += sent;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() {
+ U.closeQuiet(fileIo);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(FileSender.class, this, "super", super.toString());
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 91064d3..c952d68 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -17,7 +17,23 @@
package org.apache.ignite.internal.managers.communication;
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInput;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
import java.io.Serializable;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
+import java.nio.channels.Channel;
+import java.nio.channels.FileChannel;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.SocketChannel;
+import java.nio.channels.WritableByteChannel;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
@@ -25,10 +41,13 @@ import java.util.Collection;
import java.util.Date;
import java.util.Deque;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap;
@@ -48,10 +67,15 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.BooleanSupplier;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteMessaging;
+import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.GridKernalContext;
@@ -59,7 +83,10 @@ import org.apache.ignite.internal.GridTopic;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteComponentType;
import org.apache.ignite.internal.IgniteDeploymentCheckedException;
+import org.apache.ignite.internal.IgniteFeatures;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.direct.DirectMessageReader;
import org.apache.ignite.internal.direct.DirectMessageWriter;
@@ -68,6 +95,10 @@ import org.apache.ignite.internal.managers.deployment.GridDeployment;
import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccMessage;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIO;
+import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.processors.platform.message.PlatformMessageFilter;
import org.apache.ignite.internal.processors.pool.PoolProcessor;
@@ -79,9 +110,12 @@ import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridTuple3;
import org.apache.ignite.internal.util.lang.IgnitePair;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -99,6 +133,7 @@ import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.communication.CommunicationListener;
import org.apache.ignite.spi.communication.CommunicationSpi;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.communication.tcp.internal.CommunicationListenerEx;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -108,6 +143,8 @@ import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE_COORDINATOR;
import static org.apache.ignite.internal.GridTopic.TOPIC_COMM_USER;
import static org.apache.ignite.internal.GridTopic.TOPIC_IO_TEST;
+import static org.apache.ignite.internal.IgniteFeatures.CHANNEL_COMMUNICATION;
+import static org.apache.ignite.internal.IgniteFeatures.nodeSupports;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.AFFINITY_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.DATA_STREAMER_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.IDX_POOL;
@@ -126,7 +163,68 @@ import static org.apache.ignite.internal.util.nio.GridNioBackPressureControl.thr
import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q_OPTIMIZED_RMV;
/**
- * Grid communication manager.
+ * This class represents the internal grid communication (<em>input</em> and <em>output</em>) manager
+ * which is placed as a layer of indirection between the {@link IgniteKernal} and {@link CommunicationSpi}.
+ * The IO manager is responsible for controlling CommunicationSPI which in turn is responsible
+ * for exchanging data between Ignite nodes.
+ *
+ * <h2>Data exchanging</h2>
+ * <p>
+ * Communication manager provides a rich API for data exchanging between a pair of cluster nodes. Two types
+ * of communication <em>Message-based communication</em> and <em>File-based communication</em> are available.
+ * Each of them support sending data to an arbitrary topic on the remote node (see {@link GridTopic} for an
+ * additional information about Ignite topics).
+ *
+ * <h3>Message-based communication</h3>
+ * <p>
+ * {@link Message} and {@link GridTopic} are used to provide a topic-based messaging protocol
+ * between cluster nodes. All messages used for data exchange can be divided into two general types:
+ * <em>internal</em> and <em>user</em> messages.
+ * <p>
+ * <em>Internal message</em> communication is used by Ignite Kernal. Please, refer to appropriate methods below:
+ * <ul>
+ * <li>{@link #sendToGridTopic(ClusterNode, GridTopic, Message, byte)}</li>
+ * <li>{@link #sendOrderedMessage(ClusterNode, Object, Message, byte, long, boolean)}</li>
+ * <li>{@link #addMessageListener(Object, GridMessageListener)}</li>
+ * </ul>
+ * <p>
+ * <em>User message</em> communication is directly exposed to the {@link IgniteMessaging} API and provides
+ * users with topic-based message exchange among nodes within the cluster group defined
+ * by {@link ClusterGroup}. Please, refer to appropriate methods below:
+ * <ul>
+ * <li>{@link #sendToCustomTopic(ClusterNode, Object, Message, byte)}</li>
+ * <li>{@link #addUserMessageListener(Object, IgniteBiPredicate, UUID)}</li>
+ * </ul>
+ *
+ * <h3>File-based communication</h3>
+ * <p>
+ * Sending or receiving binary data (represented by a <em>File</em>) over a <em>SocketChannel</em> is only
+ * possible when the built-in {@link TcpCommunicationSpi} implementation of Communication SPI is used and
+ * both local and remote nodes support the {@link IgniteFeatures#CHANNEL_COMMUNICATION CHANNEL_COMMUNICATION}
+ * feature. To ensure that the remote node satisfies all conditions the {@link #fileTransmissionSupported(ClusterNode)}
+ * method must be called prior to data sending.
+ * <p>
+ * It is possible to receive a set of files on a particular topic (any of {@link GridTopic}) on the remote node.
+ * A transmission handler for desired topic must be registered prior to opening transmission sender to it.
+ * Methods below are used to register handlers and open new transmissions:
+ * <ul>
+ * <li>{@link #addTransmissionHandler(Object, TransmissionHandler)}</li>
+ * <li>{@link #removeTransmissionHandler(Object)}</li>
+ * <li>{@link #openTransmissionSender(UUID, Object)}</li>
+ * </ul>
+ * <p>
+ * Each transmission sender opens a new transmission session to the remote node prior to sending files over it
+ * (see description of {@link TransmissionSender} for details). The {@link TransmissionSender}
+ * will send all files within a single session synchronously, one by one.
+ * <p>
+ * <em>NOTE.</em> It is important to call <em>close()</em> method or use <em>try-with-resource</em>
+ * statement to release all resources once you've done with the transmission session. This ensures that all
+ * resources are released on remote node in a proper way (i.e. transmission handlers are closed).
+ * <p>
+ *
+ * @see TcpCommunicationSpi
+ * @see IgniteMessaging
+ * @see TransmissionHandler
*/
public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializable>> {
/** Io communication metrics registry name. */
@@ -162,6 +260,45 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
/** Current IO policy. */
private static final ThreadLocal<Byte> CUR_PLC = new ThreadLocal<>();
+ /**
+ * Default chunk size in bytes used for sending/receiving files over a {@link SocketChannel}.
+ * Setting the transfer chunk size to more than <tt>1 MB</tt> is meaningless because there is
+ * no asymptotic benefit. What you're trying to achieve with larger transfer chunk sizes is
+ * fewer thread context switches, and every time you double the transfer size you halve
+ * the context switch cost.
+ * <p>
+ * Default value is {@code 256Kb}.
+ */
+ private static final int DFLT_CHUNK_SIZE_BYTES = 256 * 1024;
+
+ /** Mutex to achieve consistency of transmission handlers and receiver contexts. */
+ private final Object rcvMux = new Object();
+
+ /** Map of registered handlers per each IO topic. */
+ private final ConcurrentMap<Object, TransmissionHandler> topicTransmissionHnds = new ConcurrentHashMap<>();
+
+ /** The map of already known channel read contexts by its registered topics. */
+ private final ConcurrentMap<Object, ReceiverContext> rcvCtxs = new ConcurrentHashMap<>();
+
+ /** The map of sessions which are currently writing files and their corresponding interruption flags. */
+ private final ConcurrentMap<T2<UUID, IgniteUuid>, AtomicBoolean> senderStopFlags = new ConcurrentHashMap<>();
+
+ /**
+ * Default factory to provide IO operation interface over files for their further transmission between nodes.
+ * Some implementations of file senders\receivers are using the zero-copy approach to transfer bytes
+ * from a file to the given {@link SocketChannel} and vice-versa. So, it is necessary to produce an {@link FileIO}
+ * implementation based on {@link FileChannel} which is reflected in Ignite project as {@link RandomAccessFileIO}.
+ *
+ * @see FileChannel#transferTo(long, long, WritableByteChannel)
+ */
+ private final FileIOFactory fileIoFactory = new RandomAccessFileIOFactory();
+
+ /** The maximum number of retry attempts (read or write attempts). */
+ private final int retryCnt;
+
+ /** Network timeout in milliseconds. */
+ private final int netTimeoutMs;
+
/** Listeners by topic. */
private final ConcurrentMap<Object, GridMessageListener> lsnrMap = new ConcurrentHashMap<>();
@@ -191,7 +328,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
private final ConcurrentMap<UUID, Deque<DelayedMessage>> waitMap = new ConcurrentHashMap<>();
/** Communication message listener. */
- private CommunicationListener<Serializable> commLsnr;
+ private CommunicationListenerEx<Serializable> commLsnr;
/** Grid marshaller. */
private final Marshaller marsh;
@@ -218,7 +355,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
private MessageFormatter formatter;
/** Stopping flag. */
- private boolean stopping;
+ private volatile boolean stopping;
/** */
private final AtomicReference<ConcurrentHashMap<Long, IoTestFuture>> ioTestMap = new AtomicReference<>();
@@ -263,6 +400,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
"Received messages count.");
ioMetric.register(RCVD_BYTES_CNT, spi::getReceivedBytesCount, "Received bytes count.");
+
+ retryCnt = ctx.config().getNetworkSendRetryCount();
+ netTimeoutMs = (int)ctx.config().getNetworkTimeout();
}
/**
@@ -294,7 +434,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
@Override public void start() throws IgniteCheckedException {
startSpi();
- getSpi().setListener(commLsnr = new CommunicationListener<Serializable>() {
+ getSpi().setListener(commLsnr = new CommunicationListenerEx<Serializable>() {
@Override public void onMessage(UUID nodeId, Serializable msg, IgniteRunnable msgC) {
try {
onMessage0(nodeId, (GridIoMessage)msg, msgC);
@@ -310,6 +450,17 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
for (GridDisconnectListener lsnr : disconnectLsnrs)
lsnr.onNodeDisconnected(nodeId);
}
+
+ @Override public void onChannelOpened(UUID rmtNodeId, Serializable initMsg, Channel channel) {
+ try {
+ onChannelOpened0(rmtNodeId, (GridIoMessage)initMsg, channel);
+ }
+ catch (ClassCastException ignored) {
+ U.error(log, "Communication manager received message of unknown type (will ignore): " +
+ initMsg.getClass().getName() + ". Most likely GridCommunicationSpi is being used directly, " +
+ "which is illegal - make sure to send messages only via GridProjection API.");
+ }
+ }
});
ctx.addNodeAttribute(DIRECT_PROTO_VER_ATTR, DIRECT_PROTO_VER);
@@ -773,6 +924,36 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
case EVT_NODE_LEFT:
case EVT_NODE_FAILED:
+ busyLock.readLock().lock();
+
+ try {
+ // Stop all writer sessions.
+ for (Map.Entry<T2<UUID, IgniteUuid>, AtomicBoolean> writeSesEntry: senderStopFlags.entrySet()) {
+ if (writeSesEntry.getKey().get1().equals(nodeId))
+ writeSesEntry.getValue().set(true);
+ }
+
+ synchronized (rcvMux) {
+ // Clear the context on the uploader node left.
+ Iterator<Map.Entry<Object, ReceiverContext>> it = rcvCtxs.entrySet().iterator();
+
+ while (it.hasNext()) {
+ Map.Entry<Object, ReceiverContext> e = it.next();
+
+ if (nodeId.equals(e.getValue().rmtNodeId)) {
+ it.remove();
+
+ interruptRecevier(e.getValue(),
+ new ClusterTopologyCheckedException("Remote node left the grid. " +
+ "Receiver has been stopped : " + nodeId));
+ }
+ }
+ }
+ }
+ finally {
+ busyLock.readLock().unlock();
+ }
+
for (Map.Entry<Object, ConcurrentMap<UUID, GridCommunicationMessageSet>> e :
msgSetMap.entrySet()) {
ConcurrentMap<UUID, GridCommunicationMessageSet> map = e.getValue();
@@ -933,6 +1114,21 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
evtMgr.removeLocalEventListener(discoLsnr);
stopping = true;
+
+ Set<ReceiverContext> rcvs;
+
+ synchronized (rcvMux) {
+ topicTransmissionHnds.clear();
+
+ rcvs = new HashSet<>(rcvCtxs.values());
+
+ rcvCtxs.clear();
+ }
+
+ for (ReceiverContext rctx : rcvs) {
+ interruptRecevier(rctx, new NodeStoppingException("Local node io manager requested to be stopped: "
+ + ctx.localNodeId()));
+ }
}
finally {
busyLock.writeLock().unlock();
@@ -948,6 +1144,51 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
}
/**
+ * Handles a channel opened by a remote node. If the node is stopping the event is only logged at debug level.
+ * Otherwise the topic of the initial message is resolved when it is not set yet (from the topic ordinal, or
+ * by unmarshalling the topic bytes) and {@code processOpenedChannel} is submitted to the pool chosen by the
+ * message policy.
+ * <p>
+ * NOTE(review): on the early return and on an unmarshalling failure the channel is not closed by this
+ * method - confirm that the caller releases it.
+ *
+ * @param rmtNodeId The remote node id.
+ * @param initMsg Message with additional channel params.
+ * @param channel The channel to notify listeners with.
+ */
+ private void onChannelOpened0(UUID rmtNodeId, GridIoMessage initMsg, Channel channel) {
+ Lock busyLock0 = busyLock.readLock();
+
+ busyLock0.lock();
+
+ try {
+ if (stopping) {
+ if (log.isDebugEnabled()) {
+ log.debug("Received communication channel create event while node stopping (will ignore) " +
+ "[rmtNodeId=" + rmtNodeId + ", initMsg=" + initMsg + ']');
+ }
+
+ return;
+ }
+
+ if (initMsg.topic() == null) {
+ int topicOrd = initMsg.topicOrdinal();
+
+ initMsg.topic(topicOrd >= 0 ? GridTopic.fromOrdinal(topicOrd) :
+ U.unmarshal(marsh, initMsg.topicBytes(), U.resolveClassLoader(ctx.config())));
+ }
+
+ byte plc = initMsg.policy();
+
+ pools.poolForPolicy(plc).execute(new Runnable() {
+ @Override public void run() {
+ processOpenedChannel(initMsg.topic(), rmtNodeId, (SessionChannelMessage)initMsg.message(),
+ (SocketChannel)channel);
+ }
+ });
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to process channel creation event due to exception " +
+ "[rmtNodeId=" + rmtNodeId + ", initMsg=" + initMsg + ']' , e);
+ }
+ finally {
+ busyLock0.unlock();
+ }
+ }
+
+ /**
* @param nodeId Node ID.
* @param msg Message bytes.
* @param msgC Closure to call when message processing finished.
@@ -1630,6 +1871,107 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
}
/**
+ * Creates a new {@link TransmissionSender} bound to the given remote node and topic.
+ *
+ * @param remoteId The remote node to connect to.
+ * @param topic The remote topic to connect to.
+ * @return The transmission sender to send files to the remote node with.
+ */
+ public TransmissionSender openTransmissionSender(UUID remoteId, Object topic) {
+ return new TransmissionSender(remoteId, topic);
+ }
+
+ /**
+ * Registers the handler for the given topic unless one is already registered. An existing handler is
+ * kept as is; the conflict is reported by an assertion only.
+ *
+ * @param topic The {@link GridTopic} to register handler to.
+ * @param hnd Handler which will handle file upload requests.
+ */
+ public void addTransmissionHandler(Object topic, TransmissionHandler hnd) {
+ TransmissionHandler hnd0 = topicTransmissionHnds.putIfAbsent(topic, hnd);
+
+ assert hnd0 == null : "The topic already have an appropriate session handler [topic=" + topic + ']';
+ }
+
+ /**
+ * Removes the handler and the receiver context registered for the given topic and interrupts the removed
+ * receiver context, if there was one.
+ *
+ * @param topic The topic to erase handler from.
+ */
+ public void removeTransmissionHandler(Object topic) {
+ ReceiverContext rcvCtx0;
+
+ synchronized (rcvMux) {
+ topicTransmissionHnds.remove(topic);
+
+ rcvCtx0 = rcvCtxs.remove(topic);
+ }
+
+ interruptRecevier(rcvCtx0,
+ new IgniteCheckedException("Receiver has been closed due to removing corresponding transmission handler " +
+ "on local node [nodeId=" + ctx.localNodeId() + ']'));
+ }
+
+ /**
+ * This method must be used prior to opening a {@link TransmissionSender} by calling
+ * {@link #openTransmissionSender(UUID, Object)} to ensure that both the remote and the local node
+ * fully support a direct {@link SocketChannel} connection to transfer data.
+ *
+ * @param node Remote node to check.
+ * @return {@code true} if the configured SPI is a {@link TcpCommunicationSpi} and the given node supports
+ * the {@code CHANNEL_COMMUNICATION} feature, i.e. a file can be sent over socket channel directly.
+ */
+ public boolean fileTransmissionSupported(ClusterNode node) {
+ return ((CommunicationSpi)getSpi() instanceof TcpCommunicationSpi) &&
+ nodeSupports(node, CHANNEL_COMMUNICATION);
+ }
+
+ /**
+ * Asks the {@link TcpCommunicationSpi} to open a channel to the given node. The initialization message is
+ * wrapped into a {@code GridIoMessage} for the public pool; a topic which is not a {@link GridTopic} is
+ * marshalled into the message as bytes.
+ *
+ * @param nodeId Destination node to connect to.
+ * @param topic Topic to send the request to.
+ * @param initMsg Channel initialization message.
+ * @return Future returned by the SPI for the {@link Channel} being established.
+ * @throws IgniteCheckedException If fails; a {@link ClusterTopologyCheckedException} is thrown if the node
+ * is not found in discovery or is no longer alive.
+ */
+ private IgniteInternalFuture<Channel> openChannel(
+ UUID nodeId,
+ Object topic,
+ Message initMsg
+ ) throws IgniteCheckedException {
+ assert nodeId != null;
+ assert topic != null;
+ assert !locNodeId.equals(nodeId) : "Channel cannot be opened to the local node itself: " + nodeId;
+ assert (CommunicationSpi)getSpi() instanceof TcpCommunicationSpi : "Only TcpCommunicationSpi supports direct " +
+ "connections between nodes: " + getSpi().getClass();
+
+ ClusterNode node = ctx.discovery().node(nodeId);
+
+ if (node == null)
+ throw new ClusterTopologyCheckedException("Failed to open a new channel to remote node (node left): " + nodeId);
+
+ int topicOrd = topic instanceof GridTopic ? ((Enum<GridTopic>)topic).ordinal() : -1; // -1 means "not a GridTopic".
+
+ GridIoMessage ioMsg = createGridIoMessage(topic,
+ topicOrd,
+ initMsg,
+ PUBLIC_POOL,
+ false,
+ 0,
+ false);
+
+ try {
+ if (topicOrd < 0)
+ ioMsg.topicBytes(U.marshal(marsh, topic));
+
+ return ((TcpCommunicationSpi)(CommunicationSpi)getSpi()).openChannel(node, ioMsg);
+ }
+ catch (IgniteSpiException e) {
+ if (e.getCause() instanceof ClusterTopologyCheckedException)
+ throw (ClusterTopologyCheckedException)e.getCause();
+
+ if (!ctx.discovery().alive(node))
+ throw new ClusterTopologyCheckedException("Failed to create channel (node left): " + node.id(), e);
+
+ throw new IgniteCheckedException("Failed to create channel (node may have left the grid or " +
+ "TCP connection cannot be established due to unknown issues) " +
+ "[node=" + node + ", topic=" + topic + ']', e);
+ }
+ }
+
+ /**
* @param node Destination node.
* @param topic Topic to send the message to.
* @param topicOrd GridTopic enumeration ordinal.
@@ -2349,6 +2691,288 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
}
/**
+ * Interrupts the given receiver context exactly once: cancels its pending timeout object, closes the
+ * current receiver, records the error in the last known state, notifies the handler and logs the error.
+ * Does nothing if the context is {@code null} or has already been interrupted.
+ *
+ * @param rctx Receiver context to use.
+ * @param ex Exception to close receiver with.
+ */
+ private void interruptRecevier(ReceiverContext rctx, Exception ex) {
+     // No context, or another caller has already flipped the flag.
+     if (rctx == null || !rctx.interrupted.compareAndSet(false, true))
+         return;
+
+     if (rctx.timeoutObj != null)
+         ctx.timeout().removeTimeoutObject(rctx.timeoutObj);
+
+     U.closeQuiet(rctx.rcv);
+
+     // Keep the last known state and attach the error, or create an error-only state.
+     if (rctx.lastState == null)
+         rctx.lastState = new TransmissionMeta(ex);
+     else
+         rctx.lastState = rctx.lastState.error(ex);
+
+     rctx.hnd.onException(rctx.rmtNodeId, ex);
+
+     U.error(log, "Receiver has been interrupted due to an exception occurred [ctx=" + rctx + ']', ex);
+ }
+
+ /**
+ * Processes a channel opened by a remote node for the given topic: configures the channel, finds (or
+ * creates) the receiver context of the topic, sends the last known receiver state to the remote node and
+ * starts receiving data. The streams and the channel are always closed when this method returns.
+ * <p>
+ * A channel which belongs to a session different from the one currently bound to the topic is rejected
+ * with an error meta; a failure while rejecting it does not affect the active session.
+ *
+ * @param topic Topic to which the channel is created.
+ * @param rmtNodeId Remote node id.
+ * @param initMsg Channel initialization message with additional params.
+ * @param ch Channel instance.
+ */
+ private void processOpenedChannel(Object topic, UUID rmtNodeId, SessionChannelMessage initMsg, SocketChannel ch) {
+     ReceiverContext rcvCtx = null;
+     ObjectInputStream in = null;
+     ObjectOutputStream out = null;
+
+     try {
+         if (stopping) {
+             throw new NodeStoppingException("Local node is stopping. Channel will be closed [topic=" + topic +
+                 ", channel=" + ch + ']');
+         }
+
+         if (initMsg == null || initMsg.sesId() == null) {
+             U.warn(log, "There is no initial message provied for given topic. Opened channel will be closed " +
+                 "[rmtNodeId=" + rmtNodeId + ", topic=" + topic + ", initMsg=" + initMsg + ']');
+
+             return;
+         }
+
+         configureChannel(ch, netTimeoutMs);
+
+         in = new ObjectInputStream(ch.socket().getInputStream());
+         out = new ObjectOutputStream(ch.socket().getOutputStream());
+
+         IgniteUuid newSesId = initMsg.sesId();
+
+         // Context currently bound to the topic; it may belong to another session.
+         ReceiverContext topicCtx;
+
+         synchronized (rcvMux) {
+             TransmissionHandler hnd = topicTransmissionHnds.get(topic);
+
+             if (hnd == null) {
+                 U.warn(log, "There is no handler for a given topic. Channel will be closed [rmtNodeId=" + rmtNodeId +
+                     ", topic=" + topic + ']');
+
+                 return;
+             }
+
+             topicCtx = rcvCtxs.computeIfAbsent(topic, t -> new ReceiverContext(rmtNodeId, hnd, newSesId));
+         }
+
+         // Do not allow multiple connections for the same session.
+         if (!newSesId.equals(topicCtx.sesId)) {
+             IgniteCheckedException err = new IgniteCheckedException("Requested topic is busy by another transmission. " +
+                 "It's not allowed to process different sessions over the same topic simultaneously. " +
+                 "Channel will be closed [initMsg=" + initMsg + ", channel=" + ch + ", nodeId=" + rmtNodeId + ']');
+
+             U.error(log, err);
+
+             // rcvCtx is still null here, so a failure of this write cannot interrupt the foreign session.
+             out.writeObject(new TransmissionMeta(err));
+
+             return;
+         }
+
+         // From now on the context is owned by this session and may be interrupted on errors below.
+         rcvCtx = topicCtx;
+
+         if (log.isDebugEnabled()) {
+             log.debug("Trasmission open a new channel [rmtNodeId=" + rmtNodeId + ", topic=" + topic +
+                 ", initMsg=" + initMsg + ']');
+         }
+
+         if (!rcvCtx.lock.tryLock(netTimeoutMs, TimeUnit.MILLISECONDS))
+             throw new IgniteException("Wait for the previous receiver finished its work timeouted: " + rcvCtx);
+
+         try {
+             if (rcvCtx.timeoutObj != null)
+                 ctx.timeout().removeTimeoutObject(rcvCtx.timeoutObj);
+
+             // Send previous context state to sync remote and local node.
+             out.writeObject(rcvCtx.lastState == null ? new TransmissionMeta() : rcvCtx.lastState);
+
+             if (rcvCtx.lastState == null || rcvCtx.lastState.error() == null)
+                 receiveFromChannel(topic, rcvCtx, in, out, ch);
+             else
+                 interruptRecevier(rcvCtxs.remove(topic), rcvCtx.lastState.error());
+         }
+         finally {
+             rcvCtx.lock.unlock();
+         }
+     }
+     catch (Throwable t) {
+         U.error(log, "Download session cannot be finished due to an unexpected error [ctx=" + rcvCtx + ']', t);
+
+         // Do not remove receiver context here, since sender will recconect to get this error.
+         interruptRecevier(rcvCtx, new IgniteCheckedException("Channel processing error [nodeId=" + rmtNodeId + ']', t));
+     }
+     finally {
+         U.closeQuiet(in);
+         U.closeQuiet(out);
+         U.closeQuiet(ch);
+     }
+ }
+
+ /**
+ * Receives files from the channel one by one until the remote node sends the exit flag. For each file the
+ * meta is read, a receiver is created if there is no unfinished one, the meta is validated against the
+ * last known state, the data is received and an acknowledge is written back.
+ * <p>
+ * On an {@link IOException} the method does not fail: it registers a timeout object which interrupts the
+ * receiver context unless it is removed earlier (for example, when the sender reconnects).
+ *
+ * @param topic Topic handler related to.
+ * @param rcvCtx Receiver read context.
+ * @param in Stream to read the exit flag and file meta from.
+ * @param out Stream to write the processing acknowledge to.
+ * @param ch Channel to read file data from.
+ * @throws NodeStoppingException If the node is stopping.
+ * @throws InterruptedException If thread interrupted.
+ */
+ private void receiveFromChannel(
+     Object topic,
+     ReceiverContext rcvCtx,
+     ObjectInputStream in,
+     ObjectOutputStream out,
+     ReadableByteChannel ch
+ ) throws NodeStoppingException, InterruptedException {
+     try {
+         while (true) {
+             if (Thread.currentThread().isInterrupted())
+                 throw new InterruptedException("The thread has been interrupted. Stop downloading file.");
+
+             if (stopping)
+                 throw new NodeStoppingException("Operation has been cancelled (node is stopping)");
+
+             boolean exit = in.readBoolean();
+
+             if (exit) {
+                 rcvCtxs.remove(topic);
+
+                 break;
+             }
+
+             TransmissionMeta meta = (TransmissionMeta)in.readObject();
+
+             // A non-null receiver means the previous file has not been fully received yet.
+             if (rcvCtx.rcv == null) {
+                 rcvCtx.rcv = createReceiver(rcvCtx.rmtNodeId,
+                     rcvCtx.hnd,
+                     meta,
+                     () -> stopping || rcvCtx.interrupted.get());
+
+                 rcvCtx.lastState = meta;
+             }
+
+             validate(rcvCtx.lastState, meta);
+
+             try {
+                 long startTime = U.currentTimeMillis();
+
+                 rcvCtx.rcv.receive(ch);
+
+                 // Write processing ack.
+                 out.writeBoolean(true);
+                 out.flush();
+
+                 rcvCtx.rcv.close();
+
+                 // Divide by a double so that fractions of a second are not truncated.
+                 U.log(log, "File has been received " +
+                     "[name=" + rcvCtx.rcv.state().name() + ", transferred=" + rcvCtx.rcv.transferred() +
+                     ", time=" + ((U.currentTimeMillis() - startTime) / 1000.0) + " sec" +
+                     ", rmtId=" + rcvCtx.rmtNodeId + ']');
+
+                 rcvCtx.rcv = null;
+             }
+             catch (Throwable e) {
+                 // Remember how far the receiver got so that the state can be synced on reconnect.
+                 rcvCtx.lastState = rcvCtx.rcv.state();
+
+                 throw e;
+             }
+         }
+     }
+     catch (ClassNotFoundException e) {
+         throw new IgniteException(e);
+     }
+     catch (IOException e) {
+         // Waiting for re-establishing connection.
+         U.warn(log, "Connection from the remote node lost. Will wait for the new one to continue file " +
+             "receive [nodeId=" + rcvCtx.rmtNodeId + ", sesKey=" + rcvCtx.sesId + ']', e);
+
+         long startTs = U.currentTimeMillis();
+
+         boolean added = ctx.timeout().addTimeoutObject(rcvCtx.timeoutObj = new GridTimeoutObject() {
+             @Override public IgniteUuid timeoutId() {
+                 return rcvCtx.sesId;
+             }
+
+             @Override public long endTime() {
+                 return startTs + netTimeoutMs;
+             }
+
+             @Override public void onTimeout() {
+                 interruptRecevier(rcvCtxs.remove(topic), new IgniteCheckedException("Receiver is closed due to " +
+                     "waiting for the reconnect has been timeouted"));
+             }
+         });
+
+         assert added;
+     }
+ }
+
+ /**
+ * Checks that the next transmission meta continues the previous one: the name, the offset, the count of
+ * bytes and the transmission policy must be the same in both.
+ * NOTE(review): a failed check is reported through {@code A.ensure}; presumably it throws an unchecked
+ * exception - confirm against its implementation.
+ *
+ * @param prev Previous available transmission meta.
+ * @param next Next transmission meta.
+ */
+ private void validate(TransmissionMeta prev, TransmissionMeta next) {
+ A.ensure(prev.name().equals(next.name()), "Attempt to load different file " +
+ "[prev=" + prev + ", next=" + next + ']');
+
+ A.ensure(prev.offset() == next.offset(),
+ "The next chunk offest is incorrect [prev=" + prev + ", meta=" + next + ']');
+
+ A.ensure(prev.count() == next.count(), " The count of bytes to transfer for " +
+ "the next chunk is incorrect [prev=" + prev + ", next=" + next + ']');
+
+ A.ensure(prev.policy() == next.policy(), "Attemt to continue file upload with" +
+ " different transmission policy [prev=" + prev + ", next=" + next + ']');
+ }
+
+ /**
+ * Creates a receiver matching the transmission policy of the given meta: a {@link FileReceiver} for
+ * {@code FILE} (default chunk size, handler-provided file path and file handler) or a
+ * {@link ChunkReceiver} for {@code CHUNK} (chunk size equals the configured page size).
+ *
+ * @param nodeId Remote node id.
+ * @param hnd Current handler instance which produces file handlers.
+ * @param meta Meta information about file pending to receive to create appropriate receiver.
+ * @param stopChecker Process interrupt checker.
+ * @return Chunk data receiver.
+ * @throws IllegalStateException If the transmission policy is unknown.
+ */
+ private TransmissionReceiver createReceiver(
+ UUID nodeId,
+ TransmissionHandler hnd,
+ TransmissionMeta meta,
+ BooleanSupplier stopChecker
+ ) {
+ switch (meta.policy()) {
+ case FILE:
+ return new FileReceiver(
+ meta,
+ DFLT_CHUNK_SIZE_BYTES,
+ stopChecker,
+ fileIoFactory,
+ hnd.fileHandler(nodeId, meta),
+ hnd.filePath(nodeId, meta),
+ log);
+
+ case CHUNK:
+ return new ChunkReceiver(
+ meta,
+ ctx.config()
+ .getDataStorageConfiguration()
+ .getPageSize(),
+ stopChecker,
+ hnd.chunkHandler(nodeId, meta),
+ log);
+
+ default:
+ throw new IllegalStateException("The type of transmission policy is unknown. An implementation " +
+ "required: " + meta.policy());
+ }
+ }
+
+ /**
+ * Sets the socket timeout of the channel and switches the channel to the blocking mode.
+ *
+ * @param channel Socket channel to configure blocking mode.
+ * @param timeout Ignite network configuration timeout.
+ * @throws IOException If fails.
+ */
+ private static void configureChannel(SocketChannel channel, int timeout) throws IOException {
+ // Timeout must be enabled prior to entering the blocking mode to have effect.
+ channel.socket().setSoTimeout(timeout);
+ channel.configureBlocking(true);
+ }
+
+ /**
* Dumps SPI stats to diagnostic logs in case TcpCommunicationSpi is used, no-op otherwise.
*/
public void dumpStats() {
@@ -2369,6 +2993,357 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
}
/**
+ * Read context holds all the information about the current process of reading a transfer from a channel.
+ */
+ private static class ReceiverContext {
+ /** The remote node input channel came from. */
+ private final UUID rmtNodeId;
+
+ /** Current session handler. */
+ @GridToStringExclude
+ private final TransmissionHandler hnd;
+
+ /** Unique session request id. */
+ private final IgniteUuid sesId;
+
+ /** Flag indicates that current file handling process must be interrupted. */
+ private final AtomicBoolean interrupted = new AtomicBoolean();
+
+ /** Only one thread can handle receiver context. */
+ private final Lock lock = new ReentrantLock();
+
+ /** Receiver of the last object which has not been fully downloaded yet. */
+ private TransmissionReceiver rcv;
+
+ /** Last saved state about file data processing. */
+ private TransmissionMeta lastState;
+
+ /** Close receiver timeout object. */
+ private GridTimeoutObject timeoutObj;
+
+ /**
+ * @param rmtNodeId Remote node id.
+ * @param hnd Channel handler of current topic.
+ * @param sesId Unique session request id.
+ */
+ public ReceiverContext(UUID rmtNodeId, TransmissionHandler hnd, IgniteUuid sesId) {
+ this.rmtNodeId = rmtNodeId;
+ this.hnd = hnd;
+ this.sesId = sesId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(ReceiverContext.class, this);
+ }
+ }
+
+ /**
+ * This class represents an implementation of a transmission file writer. Each new instance of transmission sender
+ * will establish a new connection with a unique transmission session identifier to the remote node and given
+ * topic (an arbitrary {@link GridTopic} can be used).
+ *
+ * <h2>Zero-copy approach</h2>
+ * <p>
+ * Current implementation of transmission sender is based on file zero-copy approach (the {@link FileSender}
+ * is used under the hood). It is much more efficient than a simple loop that reads data from
+ * given file and writes it to the target socket channel. But if operating system does not support zero-copy
+ * file transfer, sending a file with {@link TransmissionSender} might fail or yield worse performance.
+ * <p>
+ * Please, refer to <a href="http://en.wikipedia.org/wiki/Zero-copy">http://en.wikipedia.org/wiki/Zero-copy</a>
+ * or {@link FileChannel#transferTo(long, long, WritableByteChannel)} for details of such approach.
+ *
+ * <h2>File and Chunk handlers</h2>
+ * <p>
+ * It is possible to choose a file handler prior to sending the file to the remote node within an opened transmission
+ * session. There are two types of handlers available:
+ * {@link TransmissionHandler#chunkHandler(UUID, TransmissionMeta)} and
+ * {@link TransmissionHandler#fileHandler(UUID, TransmissionMeta)}. You can use an appropriate
+ * {@link TransmissionPolicy} for {@link #send(File, long, long, Map, TransmissionPolicy)} method to switch
+ * between them.
+ *
+ * <h2>Exceptions handling</h2>
+ * <p>
+ * Each transmission can have two different high-level types of exception which are handled differently:
+ * <ul>
+ * <li><em>transport</em> exception(e.g. some network issues)</li>
+ * <li><em>application</em>\<em>handler</em> level exception</li>
+ * </ul>
+ *
+ * <h3><em>Application</em> exceptions</h3>
+ * <p>
+ * The transmission will be stopped immediately and a wrapping <em>IgniteException</em> will be thrown if
+ * any <em>application</em> exception occurs.
+ *
+ * <h3><em>Transport</em> exceptions</h3>
+ * <p>
+ * All transport level exceptions of transmission file sender will require transmission to be reconnected.
+ * For instance, when the local node closes the socket connection in orderly way, but the file is not fully
+ * handled by remote node, the read operation over the same socket endpoint will return <tt>-1</tt>. Such
+ * result will be considered as an {@link IOException} by the handler and it will wait for the connection to be
+ * re-established to continue file loading.
+ * <p>
+ * Another example, the transmission sender gets the <em>Connection reset by peer</em> IOException message.
+ * This means that the remote node you are connected to has to reset the connection. This is usually caused by a
+ * high amount of traffic on the host, but may be caused by a server error or the remote node has exhausted
+ * system resources as well. Such {@link IOException} will be considered as <em>reconnection required</em>.
+ *
+ * <h3>Timeout exceptions</h3>
+ * <p>
+ * For read operations over the {@link InputStream} or write operations through the {@link OutputStream}
+ * the {@link Socket#setSoTimeout(int)} will be used and a {@link SocketTimeoutException} will be
+ * thrown when the timeout occurs. The default value is taken from {@link IgniteConfiguration#getNetworkTimeout()}.
+ * <p>
+ * If the reconnection does not occur within the configured timeout interval, the timeout object will be fired,
+ * which clears the {@link ReceiverContext} corresponding to the used topic.
+ *
+ * <h2>Release resources</h2>
+ * <p>
+ * It is important to call <em>close()</em> method or use <em>try-with-resource</em> statement to release
+ * all resources once you've done with sending files.
+ *
+ * @see FileChannel#transferTo(long, long, WritableByteChannel)
+ */
+ public class TransmissionSender implements Closeable {
+ /** Remote node id to connect to. */
+ private final UUID rmtId;
+
+ /** Remote topic to connect to. */
+ private final Object topic;
+
+ /** Current unique session identifier to transfer files to remote node. */
+ private T2<UUID, IgniteUuid> sesKey;
+
+ /** Instance of opened writable channel to work with. */
+ private WritableByteChannel channel;
+
+ /** Decorated with data operations socket of output channel. */
+ private ObjectOutput out;
+
+ /** Decorated with data operations socket of input channel. */
+ private ObjectInput in;
+
+ /**
+ * @param rmtId The remote node to connect to.
+ * @param topic The remote topic to connect to.
+ */
+ public TransmissionSender(
+ UUID rmtId,
+ Object topic
+ ) {
+ this.rmtId = rmtId;
+ this.topic = topic;
+ sesKey = new T2<>(rmtId, IgniteUuid.randomUuid());
+ }
+
+ /**
+ * Opens a new channel to the remote node for the current session, configures it, creates the object
+ * streams over its socket and reads the state sent by the remote receiver.
+ * <p>
+ * The opened channel is stored in the {@code channel} field before any further step, so that
+ * {@code closeChannelQuiet()} is able to release it if one of the following steps fails.
+ *
+ * @return The synchronization meta in case connection has been reset.
+ * @throws IgniteCheckedException If fails.
+ * @throws IOException If fails.
+ */
+ private TransmissionMeta connect() throws IgniteCheckedException, IOException {
+     SocketChannel sockCh = (SocketChannel)openChannel(rmtId,
+         topic,
+         new SessionChannelMessage(sesKey.get2()))
+         .get();
+
+     // Publish the channel first: otherwise a failure below would leave the socket open and unreachable.
+     channel = sockCh;
+
+     configureChannel(sockCh, netTimeoutMs);
+
+     out = new ObjectOutputStream(sockCh.socket().getOutputStream());
+     in = new ObjectInputStream(sockCh.socket().getInputStream());
+
+     try {
+         // Synchronize state between remote and local nodes.
+         return (TransmissionMeta)in.readObject();
+     }
+     catch (ClassNotFoundException e) {
+         throw new IgniteException(e);
+     }
+ }
+
+ /**
+ * @param file Source file to send to remote.
+ * @param params Additional file params.
+ * @param plc The policy of handling data on remote.
+ * @throws IgniteCheckedException If fails.
+ */
+ public void send(
+ File file,
+ Map<String, Serializable> params,
+ TransmissionPolicy plc
+ ) throws IgniteCheckedException, InterruptedException, IOException {
+ send(file, 0, file.length(), params, plc);
+ }
+
+ /**
+ * @param file Source file to send to remote.
+ * @param plc The policy of handling data on remote.
+ * @throws IgniteCheckedException If fails.
+ */
+ public void send(
+ File file,
+ TransmissionPolicy plc
+ ) throws IgniteCheckedException, InterruptedException, IOException {
+ send(file, 0, file.length(), new HashMap<>(), plc);
+ }
+
+ /**
+ * Sends the given part of the file to the remote node and waits for the acknowledge. The connection is
+ * established on the first call (or after it has been closed) and is re-established on an
+ * {@link IOException} until the number of retries reaches the configured limit.
+ *
+ * @param file Source file to send to remote.
+ * @param offset Position to start transfer at.
+ * @param cnt Number of bytes to transfer.
+ * @param params Additional file params.
+ * @param plc The policy of handling data on remote.
+ * @throws IgniteCheckedException If fails.
+ * @throws InterruptedException If the sending thread has been interrupted.
+ * @throws IOException Declared by the signature; I/O failures inside the send loop are retried.
+ */
+ public void send(
+     File file,
+     long offset,
+     long cnt,
+     Map<String, Serializable> params,
+     TransmissionPolicy plc
+ ) throws IgniteCheckedException, InterruptedException, IOException {
+     long startTime = U.currentTimeMillis();
+     int retries = 0;
+
+     senderStopFlags.putIfAbsent(sesKey, new AtomicBoolean());
+
+     try (FileSender snd = new FileSender(file,
+         offset,
+         cnt,
+         params,
+         plc,
+         () -> stopping || senderStopFlags.get(sesKey).get(),
+         log,
+         fileIoFactory,
+         DFLT_CHUNK_SIZE_BYTES)
+     ) {
+         if (log.isDebugEnabled()) {
+             log.debug("Start writing file to remote node [file=" + file.getName() +
+                 ", rmtNodeId=" + rmtId + ", topic=" + topic + ']');
+         }
+
+         while (true) {
+             if (Thread.currentThread().isInterrupted())
+                 throw new InterruptedException("The thread has been interrupted. Stop uploading file.");
+
+             if (stopping)
+                 throw new NodeStoppingException("Operation has been cancelled (node is stopping)");
+
+             try {
+                 TransmissionMeta rcvMeta = null;
+
+                 // In/out streams are not null if file has been sent successfully.
+                 if (out == null && in == null) {
+                     rcvMeta = connect();
+
+                     assert rcvMeta != null : "Remote receiver has not sent its meta";
+
+                     // Stop in case of any error occurred on remote node during file processing.
+                     if (rcvMeta.error() != null)
+                         throw rcvMeta.error();
+                 }
+
+                 snd.send(channel, out, rcvMeta);
+
+                 // Read file received acknowledge.
+                 boolean written = in.readBoolean();
+
+                 assert written : "File is not fully written: " + file.getAbsolutePath();
+
+                 break;
+             }
+             catch (IOException e) {
+                 closeChannelQuiet();
+
+                 retries++;
+
+                 if (retries >= retryCnt) {
+                     throw new IgniteException("The number of retry attempts to upload file exceeded " +
+                         "the limit: " + retryCnt, e);
+                 }
+
+                 // Re-establish the new connection to continue upload.
+                 U.warn(log, "Connection lost while writing a file to remote node and " +
+                     "will be reestablished [rmtId=" + rmtId + ", file=" + file.getName() +
+                     ", sesKey=" + sesKey + ", retries=" + retries +
+                     ", transferred=" + snd.transferred() +
+                     ", total=" + snd.state().count() + ']', e);
+             }
+         }
+
+         // Divide by a double so that fractions of a second are not truncated.
+         U.log(log, "File has been sent to remote node [name=" + file.getName() +
+             ", uploadTime=" + ((U.currentTimeMillis() - startTime) / 1000.0) + " sec, retries=" + retries +
+             ", transferred=" + snd.transferred() + ", rmtId=" + rmtId +']');
+
+     }
+     catch (InterruptedException e) {
+         closeChannelQuiet();
+
+         throw e;
+     }
+     catch (IgniteCheckedException e) {
+         closeChannelQuiet();
+
+         throw new IgniteCheckedException("Exception while sending file [rmtId=" + rmtId +
+             ", file=" + file.getName() + ", sesKey=" + sesKey + ", retries=" + retries + ']', e);
+     }
+     catch (Throwable e) {
+         closeChannelQuiet();
+
+         if (stopping)
+             throw new NodeStoppingException("Operation has been cancelled (node is stopping)");
+
+         if (senderStopFlags.get(sesKey).get())
+             throw new ClusterTopologyCheckedException("Remote node left the cluster: " + rmtId, e);
+
+         throw new IgniteException("Unexpected exception while sending file to the remote node. The process stopped " +
+             "[rmtId=" + rmtId + ", file=" + file.getName() + ", sesKey=" + sesKey + ']', e);
+     }
+ }
+
+ /**
+ * Removes the stop flag of this session and, if an output stream is open, writes the transmission close
+ * flag to it. A failure to write the flag is only logged; the channel and the streams are released in
+ * any case.
+ * <p>
+ * {@inheritDoc}
+ */
+ @Override public void close() throws IOException {
+ try {
+ senderStopFlags.remove(sesKey);
+
+ ObjectOutput out0 = out;
+
+ if (out0 == null)
+ return;
+
+ U.log(log, "Close file writer session: " + sesKey);
+
+ // Send transmission close flag.
+ out0.writeBoolean(true);
+ out0.flush();
+ }
+ catch (IOException e) {
+ U.warn(log, "An exception while writing close session flag occured. " +
+ " Session close operation has been ignored", e);
+ }
+ finally {
+ closeChannelQuiet();
+ }
+ }
+
+ /** Close channel and relese resources. */
+ private void closeChannelQuiet() {
+ U.closeQuiet(out);
+ U.closeQuiet(in);
+ U.closeQuiet(channel);
+
+ out = null;
+ in = null;
+ channel = null;
+ }
+ }
+
+ /**
* Linked chain of listeners.
*/
private static class ArrayListener implements GridMessageListener {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 5d2604d..d8d62d4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -1161,6 +1161,11 @@ public class GridIoMessageFactory implements MessageFactory {
break;
+ case SessionChannelMessage.TYPE_CODE:
+ msg = new SessionChannelMessage();
+
+ break;
+
// [-3..119] [124..129] [-23..-28] [-36..-55] - this
// [120..123] - DR
// [-4..-22, -30..-35] - SQL
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/SessionChannelMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/SessionChannelMessage.java
new file mode 100644
index 0000000..7de2a90
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/SessionChannelMessage.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.communication;
+
+import java.io.Externalizable;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channel;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * A message with additional {@link Channel} attributes which is sent when a connection is established and
+ * an appropriate channel is opened.
+ */
+class SessionChannelMessage implements Message {
+ /** Initial channel message type (value is {@code 175}). */
+ public static final short TYPE_CODE = 175;
+
+ /** Serialization version. */
+ private static final long serialVersionUID = 0L;
+
+ /** Channel session unique identifier. */
+ private IgniteUuid sesId;
+
+ /**
+ * No-op constructor to support {@link Externalizable} interface.
+ * This constructor is not meant to be used for other purposes.
+ */
+ public SessionChannelMessage() {
+ // No-op.
+ }
+
+ /**
+ * @param sesId Channel session unique identifier.
+ */
+ public SessionChannelMessage(IgniteUuid sesId) {
+ this.sesId = sesId;
+ }
+
+ /**
+ * @return The unique session id for the channel.
+ */
+ public IgniteUuid sesId() {
+ return sesId;
+ }
+
+ /**
+ * @param sesId The unique session id for the channel.
+ * @return {@code This} for chaining.
+ */
+ public SessionChannelMessage sesId(IgniteUuid sesId) {
+ this.sesId = sesId;
+ return this;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onAckReceived() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 0:
+ if (!writer.writeIgniteUuid("sesId", sesId))
+ return false;
+
+ writer.incrementState();
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ switch (reader.state()) {
+ case 0:
+ sesId = reader.readIgniteUuid("sesId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+ }
+
+ return reader.afterMessageRead(SessionChannelMessage.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return TYPE_CODE;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 1;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(SessionChannelMessage.class, this);
+ }
+
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/TransmissionHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/TransmissionHandler.java
new file mode 100644
index 0000000..a55f1e6
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/TransmissionHandler.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.communication;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.UUID;
+import java.util.function.Consumer;
+
+/**
+ * Class represents a handler for the set of files considered to be transferred from the remote node. This handler
+ * must be registered to an appropriate topic in {@link GridIoManager} prior to opening a new transmission connection
+ * to this topic.
+ * <p>
+ * <em>NOTE:</em> Only one such handler per registered topic is allowed for the communication
+ * manager. Only one thread is allowed for data processing within a single topic.
+ *
+ * <h3>TransmissionPolicy</h3>
+ * <p>
+ * Files from the remote node can be handled in two different ways within a single established connection.
+ * It is up to the sender to decide how the particular file must be processed by the remote node. The
+ * {@link TransmissionPolicy} is used for such purpose. If the {@link TransmissionPolicy#FILE} type is received by
+ * the remote node, the <em>#fileHandler()</em> will be picked up to process this file; otherwise, for the
+ * {@link TransmissionPolicy#CHUNK} the <em>#chunkHandler()</em> will be picked up.
+ */
+public interface TransmissionHandler {
+ /**
+ * @param err The error which caused the handling process to fail.
+ */
+ public void onException(UUID nodeId, Throwable err);
+
+ /**
+ * @param nodeId Remote node id from which request has been received.
+ * @param fileMeta File meta info.
+ * @return Absolute pathname denoting a file.
+ */
+ public String filePath(UUID nodeId, TransmissionMeta fileMeta);
+
+ /**
+ * <em>Chunk handler</em> represents by itself the way of input data stream processing.
+ * It accepts within each chunk a {@link ByteBuffer} with data from input for further processing.
+ * Activated when the {@link TransmissionPolicy#CHUNK} policy is sent.
+ *
+ * @param nodeId Remote node id from which request has been received.
+ * @param initMeta Initial handler meta info.
+ * @return Instance of chunk handler to process incoming data by chunks.
+ */
+ public Consumer<ByteBuffer> chunkHandler(UUID nodeId, TransmissionMeta initMeta);
+
+ /**
+ * <em>File handler</em> represents by itself the way of input data stream processing. All the data will
+ * be processed under the hood using zero-copy transferring algorithm and only start file processing and
+ * the end of processing will be provided. Activated when the {@link TransmissionPolicy#FILE} policy is sent.
+ *
+ * @param nodeId Remote node id from which request has been received.
+ * @param initMeta Initial handler meta info.
+ * @return Instance of read handler to process incoming data in the {@link FileChannel} manner.
+ */
+ public Consumer<File> fileHandler(UUID nodeId, TransmissionMeta initMeta);
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/TransmissionMeta.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/TransmissionMeta.java
new file mode 100644
index 0000000..986bf55
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/TransmissionMeta.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.communication;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.Serializable;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Class represents a file meta information to send to the remote node. Used to initiate a new file transfer
+ * process or to continue the previous unfinished one from the last transmitted point.
+ */
+class TransmissionMeta implements Externalizable {
+ /** Serial version uid. */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * The name to associate particular meta with.
+ * Can be the particular file name, or a transfer session identifier.
+ */
+ private String name;
+
+ /** Offset of transferred file. */
+ private long offset;
+
+ /** Number of bytes to transfer started from given {@link #offset}. */
+ private long cnt;
+
+ /** Additional file params to transfer (e.g. partition id, partition name etc.). */
+ private Map<String, Serializable> params;
+
+ /** Read policy which defines how a particular file will be handled. */
+ private TransmissionPolicy plc;
+
+ /** Last seen error if one has occurred, or {@code null} otherwise. */
+ private Exception err;
+
+ /** Default constructor, usually used to create meta to read channel data into. */
+ public TransmissionMeta() {
+ this(null);
+ }
+
+ /**
+ * @param err Last seen error if one has occurred, or {@code null} otherwise.
+ */
+ public TransmissionMeta(Exception err) {
+ this("", -1, -1, null, null, err);
+ }
+
+ /**
+ * @param name File name to associate particular meta with.
+ * @param offset The start position of file.
+ * @param cnt Number of bytes expected to transfer.
+ * @param params Additional meta params.
+ * @param plc Policy of how file will be handled.
+ * @param err Last seen error if one has occurred, or {@code null} otherwise.
+ */
+ public TransmissionMeta(
+ String name,
+ long offset,
+ long cnt,
+ Map<String, Serializable> params,
+ TransmissionPolicy plc,
+ Exception err
+ ) {
+ assert params instanceof Serializable || params == null : params.getClass();
+
+ this.name = name;
+ this.offset = offset;
+ this.cnt = cnt;
+ this.params = params;
+ this.plc = plc;
+ this.err = err;
+ }
+
+ /**
+ * @return File name.
+ */
+ public String name() {
+ assert name != null;
+
+ return name;
+ }
+
+ /**
+ * @return Position to start channel transfer at.
+ */
+ public long offset() {
+ return offset;
+ }
+
+ /**
+ * @return Number of bytes expected to transfer.
+ */
+ public long count() {
+ return cnt;
+ }
+
+ /**
+ * @return Additional params.
+ */
+ public Map<String, Serializable> params() {
+ return params;
+ }
+
+ /**
+ * @return Transmission policy.
+ */
+ public TransmissionPolicy policy() {
+ return plc;
+ }
+
+ /**
+ * @param err An exception instance if one has previously occurred.
+ * @return {@code This} for chaining.
+ */
+ public TransmissionMeta error(Exception err) {
+ this.err = err;
+
+ return this;
+ }
+
+ /**
+ * @return An exception instance if one has previously occurred.
+ */
+ public Exception error() {
+ return err;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeUTF(name);
+ out.writeLong(offset);
+ out.writeLong(cnt);
+ out.writeObject(params);
+ out.writeObject(plc);
+ out.writeObject(err);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ name = in.readUTF();
+ offset = in.readLong();
+ cnt = in.readLong();
+ params = (Map)in.readObject();
+ plc = (TransmissionPolicy)in.readObject();
+ err = (Exception)in.readObject();
+ }
+
+ /** {@inheritDoc} All object fields are compared null-safely, matching {@link #hashCode()}. */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ TransmissionMeta meta = (TransmissionMeta)o;
+
+ return offset == meta.offset &&
+ cnt == meta.cnt &&
+ Objects.equals(name, meta.name) &&
+ Objects.equals(params, meta.params) &&
+ plc == meta.plc &&
+ Objects.equals(err, meta.err);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return Objects.hash(name, offset, cnt, params, plc, err);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(TransmissionMeta.class, this);
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/TransmissionPolicy.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/TransmissionPolicy.java
new file mode 100644
index 0000000..be4396f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/TransmissionPolicy.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.communication;
+
+import java.util.UUID;
+
+/**
+ * Class represents ways of data handling for a file ready to be sent through an opened transmission sender session.
+ * It is necessary to choose which type of handler will be used and how file should be handled prior to sending file
+ * to the remote node.
+ *
+ * @see GridIoManager.TransmissionSender
+ */
+public enum TransmissionPolicy {
+ /**
+ * A file which is sent through a {@link GridIoManager.TransmissionSender} session will use
+ * the {@link TransmissionHandler#fileHandler(UUID, TransmissionMeta)} of {@link TransmissionHandler}
+ * to handle transmitted binary data.
+ */
+ FILE,
+
+ /**
+ * A file which is sent through a {@link GridIoManager.TransmissionSender} session will use
+ * the {@link TransmissionHandler#chunkHandler(UUID, TransmissionMeta)} of {@link TransmissionHandler}
+ * to handle transmitted binary data. This file will be processed in chunks of a handler-defined size.
+ */
+ CHUNK
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/TransmissionReceiver.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/TransmissionReceiver.java
new file mode 100644
index 0000000..fc72b2f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/TransmissionReceiver.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.communication;
+
+import java.io.IOException;
+import java.nio.channels.ReadableByteChannel;
+import java.util.function.BooleanSupplier;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+
+/**
+ * Class represents a receiver of data which can be pulled from a channel by chunks of
+ * predefined size. Closes when a transmission of represented object ends.
+ */
+abstract class TransmissionReceiver extends AbstractTransmission {
+ /**
+ * @param meta Initial file meta info.
+ * @param stopChecker Node stop or process interrupt checker.
+ * @param log Ignite logger.
+ * @param chunkSize Size of chunks.
+ */
+ protected TransmissionReceiver(
+ TransmissionMeta meta,
+ BooleanSupplier stopChecker,
+ IgniteLogger log,
+ int chunkSize
+ ) {
+ super(meta, stopChecker, log, chunkSize);
+ }
+
+ /**
+ * @param ch Input channel to read data from.
+ * @throws IOException If an io exception occurred.
+ */
+ public void receive(ReadableByteChannel ch) throws IOException, InterruptedException {
+ // Read data while chunks remain, checking for thread interruption and stop before each chunk.
+ while (hasNextChunk()) {
+ if (Thread.currentThread().isInterrupted())
+ throw new InterruptedException("Recevier has been interrupted");
+
+ if (stopped())
+ throw new IgniteException("Receiver has been cancelled. Channel processing has been stopped.");
+
+ readChunk(ch);
+ }
+
+ assert transferred == meta.count() : "The number of transferred bytes are not as expected " +
+ "[expect=" + meta.count() + ", actual=" + transferred + ']';
+ }
+
+ /**
+ * @param ch Channel to read data from.
+ * @throws IOException If fails.
+ */
+ protected abstract void readChunk(ReadableByteChannel ch) throws IOException;
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIO.java
index 546d1a7..3ecdd22 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIO.java
@@ -20,6 +20,8 @@ package org.apache.ignite.internal.processors.cache.persistence.file;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
/**
* Interface to perform file I/O operations.
@@ -272,4 +274,29 @@ public interface FileIO extends AutoCloseable {
* @see #punchHole
*/
long getSparseSize();
+
+ /**
+ * Transfers the content of the file to the specified channel. This is a synchronous
+ * operation, so performing it on asynchronous channels makes no sense and is not provided.
+ *
+ * @param position The relative offset of the file where the transfer begins from.
+ * @param count The number of bytes to be transferred.
+ * @param target Destination channel of the transfer.
+ * @return Count of bytes which were successfully transferred.
+ * @throws IOException If fails.
+ */
+ public default long transferTo(long position, long count, WritableByteChannel target) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * @param src The source channel to transfer bytes from.
+ * @param position The position within the file at which the transfer is to begin.
+ * @param count The maximum number of bytes to be transferred.
+ * @return The number of bytes, possibly zero, that were actually transferred.
+ * @throws IOException If fails.
+ */
+ public default long transferFrom(ReadableByteChannel src, long position, long count) throws IOException {
+ throw new UnsupportedOperationException(); // The default implementation does not support this operation.
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIODecorator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIODecorator.java
index c615a34..dfefd29 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIODecorator.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIODecorator.java
@@ -20,6 +20,8 @@ package org.apache.ignite.internal.processors.cache.persistence.file;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
/**
* Decorator class for File I/O
@@ -120,4 +122,14 @@ public class FileIODecorator extends AbstractFileIO {
@Override public void close() throws IOException {
delegate.close();
}
+
+ /** {@inheritDoc} Forwards the call to {@code delegate}. */
+ @Override public long transferTo(long position, long count, WritableByteChannel target) throws IOException {
+ return delegate.transferTo(position, count, target);
+ }
+
+ /** {@inheritDoc} Forwards the call to {@code delegate}. */
+ @Override public long transferFrom(ReadableByteChannel src, long position, long count) throws IOException {
+ return delegate.transferFrom(src, position, count);
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java
index c6922bc..6689609 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java
@@ -23,6 +23,8 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
import java.nio.file.OpenOption;
import org.apache.ignite.internal.processors.compress.FileSystemUtils;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -147,4 +149,19 @@ public class RandomAccessFileIO extends AbstractFileIO {
@Override public void force() throws IOException {
force(false);
}
+
+ /** {@inheritDoc} Forwards the call to the underlying {@code ch}. */
+ @Override public long transferTo(long position, long count, WritableByteChannel target) throws IOException {
+ return ch.transferTo(position, count, target);
+ }
+
+ /** {@inheritDoc} When any bytes were transferred, {@code position(position + written)} is called afterwards. */
+ @Override public long transferFrom(ReadableByteChannel src, long position, long count) throws IOException {
+ long written = ch.transferFrom(src, position, count);
+
+ if (written > 0)
+ position(position + written);
+
+ return written;
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/crc/FastCrc.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/crc/FastCrc.java
index 0dcbafd..7cbaadb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/crc/FastCrc.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/crc/FastCrc.java
@@ -17,8 +17,13 @@
package org.apache.ignite.internal.processors.cache.persistence.wal.crc;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.zip.CRC32;
+import java.util.zip.CheckedInputStream;
/**
* This CRC calculation implementation workf much faster then {@link PureJavaCrc32}
@@ -81,6 +86,26 @@ public final class FastCrc {
}
/**
+ * @param file A file (must not be a directory) to calculate the checksum over.
+ * @return Bitwise-inverted CRC32 checksum of the whole file content.
+ * @throws IOException If the file can't be opened or read.
+ */
+ public static int calcCrc(File file) throws IOException {
+ assert !file.isDirectory() : "CRC32 can't be calculated over directories";
+
+ CRC32 algo = new CRC32();
+
+ try (InputStream in = new CheckedInputStream(new FileInputStream(file), algo)) {
+ byte[] buf = new byte[1024];
+
+ while (in.read(buf) != -1)
+ ; // No-op: reading through the CheckedInputStream is what updates {@code algo}.
+ }
+
+ return ~(int)algo.getValue();
+ }
+
+ /**
* @param crcAlgo CRC algorithm.
* @param buf Input buffer.
* @param len Buffer length.
@@ -96,6 +121,6 @@ public final class FastCrc {
buf.limit(initLimit);
- return (int)crcAlgo.getValue() ^ 0xFFFFFFFF;
+ return ~(int)crcAlgo.getValue();
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 4584c87..4d6dfb8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -2684,13 +2684,27 @@ public class GridNioServer<T> {
}
/**
+ * @param ses Session to be closed; its {@code closeSocketOnSessionClose()} flag is passed on as {@code closeSock}.
+ * @param e Exception to be passed to the listener, if any.
+ * @return {@code True} if this call closed the session.
+ */
+ protected boolean close(final GridSelectorNioSessionImpl ses, @Nullable final IgniteCheckedException e) {
+ return close(ses, e, ses.closeSocketOnSessionClose());
+ }
+
+ /**
* Closes the session and all associated resources, then notifies the listener.
*
* @param ses Session to be closed.
* @param e Exception to be passed to the listener, if any.
+ * @param closeSock If {@code True} the channel will be closed.
* @return {@code True} if this call closed the ses.
*/
- protected boolean close(final GridSelectorNioSessionImpl ses, @Nullable final IgniteCheckedException e) {
+ protected boolean close(
+ final GridSelectorNioSessionImpl ses,
+ @Nullable final IgniteCheckedException e,
+ boolean closeSock
+ ) {
if (e != null) {
// Print stack trace only if has runtime exception in it's cause.
if (e.hasCause(IOException.class))
@@ -2714,7 +2728,10 @@ public class GridNioServer<T> {
GridUnsafe.cleanDirectBuffer(ses.readBuffer());
}
- closeKey(ses.key());
+ if (closeSock)
+ closeKey(ses.key());
+ else
+ ses.key().cancel(); // Unbind socket to the current SelectionKey.
if (e != null)
filterChain.onExceptionCaught(ses, e);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
index 2739299..ac2fc61 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
@@ -39,7 +39,7 @@ import org.jetbrains.annotations.Nullable;
* Note that this implementation requires non-null values for local and remote
* socket addresses.
*/
-class GridSelectorNioSessionImpl extends GridNioSessionImpl implements GridNioKeyAttachment {
+public class GridSelectorNioSessionImpl extends GridNioSessionImpl implements GridNioKeyAttachment {
/** Pending write requests. */
private final FastSizeDeque<SessionWriteRequest> queue = new FastSizeDeque<>(new ConcurrentLinkedDeque<>());
@@ -78,6 +78,9 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl implements GridNioKe
/** */
private Object sysMsg;
+ /** Whether to close the channel when session #close() is called. */
+ private volatile boolean closeSocket = true;
+
/**
* Creates session instance.
*
@@ -176,11 +179,25 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl implements GridNioKe
/**
* @return Registered selection key for this session.
*/
- SelectionKey key() {
+ public SelectionKey key() {
return key;
}
/**
+ * @return {@code True} if the SocketChannel must be closed when the current session is closed.
+ */
+ public boolean closeSocketOnSessionClose() {
+ return closeSocket;
+ }
+
+ /**
+ * @param closeSocket {@code False} to keep the SocketChannel open on session close.
+ */
+ public void closeSocketOnSessionClose(boolean closeSocket) {
+ this.closeSocket = closeSocket;
+ }
+
+ /**
* @param from Current session worker.
* @param fut Move future.
* @return {@code True} if session move was scheduled.
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 0c99051..569798e 100755
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -27,9 +27,9 @@ import java.net.SocketAddress;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
+import java.nio.channels.Channel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SocketChannel;
-import java.nio.channels.spi.AbstractInterruptibleChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
@@ -49,6 +49,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLException;
import org.apache.ignite.Ignite;
@@ -98,6 +99,7 @@ import org.apache.ignite.internal.util.nio.GridNioServerListener;
import org.apache.ignite.internal.util.nio.GridNioServerListenerAdapter;
import org.apache.ignite.internal.util.nio.GridNioSession;
import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey;
+import org.apache.ignite.internal.util.nio.GridSelectorNioSessionImpl;
import org.apache.ignite.internal.util.nio.GridShmemCommunicationClient;
import org.apache.ignite.internal.util.nio.GridTcpNioCommunicationClient;
import org.apache.ignite.internal.util.nio.ssl.BlockingSslHandler;
@@ -143,6 +145,7 @@ import org.apache.ignite.spi.IgniteSpiTimeoutObject;
import org.apache.ignite.spi.TimeoutStrategy;
import org.apache.ignite.spi.communication.CommunicationListener;
import org.apache.ignite.spi.communication.CommunicationSpi;
+import org.apache.ignite.spi.communication.tcp.internal.CommunicationListenerEx;
import org.apache.ignite.spi.communication.tcp.internal.ConnectionKey;
import org.apache.ignite.spi.communication.tcp.internal.HandshakeException;
import org.apache.ignite.spi.communication.tcp.internal.TcpCommunicationConnectionCheckFuture;
@@ -161,6 +164,8 @@ import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR;
import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION;
+import static org.apache.ignite.internal.IgniteFeatures.CHANNEL_COMMUNICATION;
+import static org.apache.ignite.internal.IgniteFeatures.nodeSupports;
import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.SSL_META;
import static org.apache.ignite.plugin.extensions.communication.Message.DIRECT_TYPE_SIZE;
import static org.apache.ignite.spi.communication.tcp.internal.TcpCommunicationConnectionCheckFuture.SES_FUT_META;
@@ -352,6 +357,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
/** Message tracker meta for session. */
private static final int TRACKER_META = GridNioSessionMetaKey.nextUniqueKey();
+ /** Channel meta used for establishing channel connections. */
+ private static final int CHANNEL_FUT_META = GridNioSessionMetaKey.nextUniqueKey();
+
/**
* Default local port range (value is <tt>100</tt>).
* See {@link #setLocalPortRange(int)} for details.
@@ -373,6 +381,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
/** Default connections per node. */
public static final int DFLT_CONN_PER_NODE = 1;
+ /** Maximum {@link GridNioSession} connections per node. */
+ public static final int MAX_CONN_PER_NODE = 1024;
+
/** No-op runnable. */
private static final IgniteRunnable NOOP = new IgniteRunnable() {
@Override public void run() {
@@ -398,6 +409,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
/** */
private ConnectionPolicy connPlc = new FirstConnectionPolicy();
+ /** Channel connection index provider. */
+ private ConnectionPolicy chConnPlc;
+
/** */
private boolean enableForcibleNodeKill = IgniteSystemProperties
.getBoolean(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL);
@@ -739,6 +753,44 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
}
}
+ /**
+ * Serves a channel creation request from a remote node: the request message is sent back
+ * over the session and, after a successful send, the NIO session is closed and the session's
+ * channel is handed over to the communication listener.
+ *
+ * @param ses Session the request was received on.
+ * @param connKey Connection key of the session.
+ * @param msg Channel creation message; sent back to the remote node and passed to the listener.
+ */
+ private void onChannelCreate(
+     GridSelectorNioSessionImpl ses,
+     ConnectionKey connKey,
+     Message msg
+ ) {
+     cleanupLocalNodeRecoveryDescriptor(connKey);
+
+     ses.send(msg).listen(sendFut -> {
+         Throwable sendErr = sendFut.error();
+
+         if (sendErr != null) {
+             U.error(log, "Fail to send channel creation response to the remote node. " +
+                 "Session will be closed [nodeId=" + connKey.nodeId() +
+                 ", idx=" + connKey.connectionIndex() + ']', sendErr);
+
+             ses.close();
+
+             return;
+         }
+
+         ses.closeSocketOnSessionClose(false);
+
+         // The response is sent: close the session and notify the listener afterwards.
+         ses.close().listen(closeFut -> {
+             Throwable closeErr = closeFut.error();
+
+             if (closeErr == null) {
+                 notifyChannelEvtListener(connKey.nodeId(), ses.key().channel(), msg);
+
+                 return;
+             }
+
+             U.error(log, "Nio session has not been properly closed " +
+                 "[nodeId=" + connKey.nodeId() + ", idx=" + connKey.connectionIndex() + ']',
+                 closeErr);
+
+             U.closeQuiet(ses.key().channel());
+         });
+     });
+ }
+
+
@Override public void onMessage(final GridNioSession ses, Message msg) {
ConnectionKey connKey = ses.meta(CONN_IDX_META);
@@ -766,6 +818,29 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
}
}
else {
+ if (isChannelConnIdx(connKey.connectionIndex())) {
+ if (ses.meta(CHANNEL_FUT_META) == null)
+ onChannelCreate((GridSelectorNioSessionImpl)ses, connKey, msg);
+ else {
+ GridFutureAdapter<Channel> fut = ses.meta(CHANNEL_FUT_META);
+ GridSelectorNioSessionImpl ses0 = (GridSelectorNioSessionImpl)ses;
+
+ ses0.closeSocketOnSessionClose(false);
+
+ ses0.close().listen(f -> {
+ if (f.error() != null) {
+ fut.onDone(f.error());
+
+ return;
+ }
+
+ fut.onDone(ses0.key().channel());
+ });
+ }
+
+ return;
+ }
+
if (msg instanceof RecoveryLastReceivedMessage) {
metricsLsnr.onMessageReceived(msg, connKey.nodeId());
@@ -2122,7 +2197,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
assertParameter(shmemPort > 0 || shmemPort == -1, "shmemPort > 0 || shmemPort == -1");
assertParameter(selectorsCnt > 0, "selectorsCnt > 0");
assertParameter(connectionsPerNode > 0, "connectionsPerNode > 0");
- assertParameter(connectionsPerNode <= 1024, "connectionsPerNode <= 1024");
+ assertParameter(connectionsPerNode <= MAX_CONN_PER_NODE, "connectionsPerNode <= 1024");
if (!failureDetectionTimeoutEnabled()) {
assertParameter(reconCnt > 0, "reconnectCnt > 0");
@@ -2147,6 +2222,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
else
connPlc = new FirstConnectionPolicy();
+ chConnPlc = new ConnectionPolicy() {
+ /** Sequential connection index provider. */
+ private final AtomicInteger chIdx = new AtomicInteger(MAX_CONN_PER_NODE + 1);
+
+ @Override public int connectionIndex() {
+ return chIdx.incrementAndGet();
+ }
+ };
+
try {
locHost = U.resolveLocalHost(locAddr);
}
@@ -3956,6 +4040,21 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
}
/**
+ * Passes a newly opened channel to the communication listener when that listener is a
+ * {@link CommunicationListenerEx}; a listener of any other type is not notified.
+ *
+ * @param nodeId The remote node id.
+ * @param channel The configured channel to notify listeners with.
+ * @param initMsg Channel initialization message with additional channel params.
+ */
+ private void notifyChannelEvtListener(UUID nodeId, Channel channel, Message initMsg) {
+     if (log.isDebugEnabled())
+         log.debug("Notify appropriate listeners due to a new channel opened: " + channel);
+
+     // Read the field once so the type check and the call use the same listener instance.
+     CommunicationListener<Message> curLsnr = lsnr;
+
+     if (!(curLsnr instanceof CommunicationListenerEx))
+         return;
+
+     CommunicationListenerEx<Message> exLsnr = (CommunicationListenerEx<Message>)curLsnr;
+
+     exLsnr.onChannelOpened(nodeId, initMsg, channel);
+ }
+
+ /**
* @param target Target buffer to append to.
* @param src Source buffer to get data.
* @return Original or expanded buffer.
@@ -4041,6 +4140,20 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
}
/**
+ * Removes the recovery descriptors registered under the given key. Which maps are cleaned
+ * depends on whether paired connections are used for the local node.
+ *
+ * @param key The connection key to cleanup descriptors on local node.
+ */
+ private void cleanupLocalNodeRecoveryDescriptor(ConnectionKey key) {
+     boolean paired = usePairedConnections(getLocalNode());
+
+     if (!paired) {
+         recoveryDescs.remove(key);
+
+         return;
+     }
+
+     inRecDescs.remove(key);
+     outRecDescs.remove(key);
+ }
+
+ /**
* @param recoveryDescs Descriptors map.
* @param pairedConnections {@code True} if in/out connections pair is used for communication with node.
* @param node Node.
@@ -4233,6 +4346,90 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
}
/**
+ * @param remote Destination cluster node to communicate with.
+ * @param initMsg Configuration channel attributes wrapped into the message.
+ * @return The future, which will be finished on channel ready.
+ * @throws IgniteSpiException If fails.
+ */
+ public IgniteInternalFuture<Channel> openChannel(
+ ClusterNode remote,
+ Message initMsg
+ ) throws IgniteSpiException {
+ assert !remote.isLocal() : remote;
+ assert initMsg != null;
+ assert chConnPlc != null;
+ assert nodeSupports(remote, CHANNEL_COMMUNICATION) : "Node doesn't support direct connection over socket channel " +
+ "[nodeId=" + remote.id() + ']';
+
+ ConnectionKey key = new ConnectionKey(remote.id(), chConnPlc.connectionIndex());
+
+ GridFutureAdapter<Channel> chFut = new GridFutureAdapter<>();
+
+ connectGate.enter();
+
+ try {
+ GridNioSession ses = createNioSession(remote, key.connectionIndex());
+
+ assert ses != null : "Session must be established [remoteId=" + remote.id() + ", key=" + key + ']';
+
+ cleanupLocalNodeRecoveryDescriptor(key);
+ ses.addMeta(CHANNEL_FUT_META, chFut);
+
+ // Send configuration message over the created session.
+ ses.send(initMsg)
+ .listen(f -> {
+ if (f.error() != null) {
+ GridFutureAdapter<Channel> rq = ses.meta(CHANNEL_FUT_META);
+
+ assert rq != null;
+
+ rq.onDone(f.error());
+
+ ses.close();
+
+ return;
+ }
+
+ addTimeoutObject(new IgniteSpiTimeoutObject() {
+ @Override public IgniteUuid id() {
+ return IgniteUuid.randomUuid();
+ }
+
+ @Override public long endTime() {
+ return U.currentTimeMillis() + connTimeout;
+ }
+
+ @Override public void onTimeout() {
+ // Close session if request not complete yet.
+ GridFutureAdapter<Channel> rq = ses.meta(CHANNEL_FUT_META);
+
+ assert rq != null;
+
+ if (rq.onDone(handshakeTimeoutException()))
+ ses.close();
+ }
+ });
+ });
+
+ return chFut;
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteSpiException("Unable to create new channel connection to the remote node: " + remote, e);
+ }
+ finally {
+ connectGate.leave();
+ }
+ }
+
+ /**
+ * Tells whether the given connection index belongs to a channel connection, i.e. whether
+ * it is greater than {@link #MAX_CONN_PER_NODE}.
+ *
+ * @param connIdx Connection index to check.
+ * @return {@code true} if connection index is related to the channel create request/response.
+ */
+ private boolean isChannelConnIdx(int connIdx) {
+ return connIdx > MAX_CONN_PER_NODE;
+ }
+
+ /**
*
*/
private class DiscoveryListener implements GridLocalEventListener, HighPriorityListener {
@@ -4679,7 +4876,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
if (obj instanceof GridCommunicationClient)
((GridCommunicationClient)obj).forceClose();
else
- U.closeQuiet((AbstractInterruptibleChannel)obj);
+ U.closeQuiet((AutoCloseable)obj);
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/CommunicationListenerEx.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/CommunicationListenerEx.java
new file mode 100644
index 0000000..f6022c2
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/CommunicationListenerEx.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp.internal;
+
+import java.io.Serializable;
+import java.nio.channels.Channel;
+import java.util.UUID;
+import org.apache.ignite.spi.communication.CommunicationListener;
+
+/**
+ * Extended communication SPI listener to provide {@link Channel} opened events.
+ *
+ * @param <T> Type of the messages handled by the listener.
+ */
+public interface CommunicationListenerEx<T extends Serializable> extends CommunicationListener<T> {
+ /**
+ * Callback for a newly opened channel. The default implementation does nothing, so
+ * implementors override it only when they are interested in channel events.
+ *
+ * @param rmtNodeId Remote node id.
+ * @param initMsg Init channel message.
+ * @param channel Locally created channel endpoint.
+ */
+ public default void onChannelOpened(UUID rmtNodeId, T initMsg, Channel channel) {
+ // No-op.
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionKey.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionKey.java
index 0559df7..a107c87 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionKey.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionKey.java
@@ -38,6 +38,14 @@ public class ConnectionKey {
private final boolean dummy;
/**
+ * Creates a key for the given node and connection index by delegating to the four-argument
+ * constructor with {@code -1} and {@code false} as the remaining arguments.
+ *
+ * @param nodeId Node ID. Should be not null.
+ * @param idx Connection index.
+ */
+ public ConnectionKey(@NotNull UUID nodeId, int idx) {
+ this(nodeId, idx, -1, false);
+ }
+
+ /**
* Creates ConnectionKey with false value of dummy flag.
*
* @param nodeId Node ID. Should be not null.
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerFileTransmissionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerFileTransmissionSelfTest.java
new file mode 100644
index 0000000..b4254d8
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerFileTransmissionSelfTest.java
@@ -0,0 +1,981 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.communication;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.Channel;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.OpenOption;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.function.Consumer;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.GridTopic;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.NodeStoppingException;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
+import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.wal.crc.FastCrc;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.FILE_SUFFIX;
+import static org.apache.ignite.internal.util.IgniteUtils.fileCount;
+import static org.apache.ignite.testframework.GridTestUtils.setFieldValue;
+
+/**
+ * Test file transmission manager operations.
+ */
+public class GridIoManagerFileTransmissionSelfTest extends GridCommonAbstractTest {
+ /** Number of cache keys to generate. */
+ private static final long CACHE_SIZE = 50_000L;
+
+ /** Network timeout in ms. */
+ private static final long NET_TIMEOUT_MS = 2000L;
+
+ /** Temporary directory to store files. */
+ private static final String TEMP_FILES_DIR = "ctmp";
+
+ /** Factory to produce IO interfaces over files to transmit. */
+ private static final FileIOFactory IO_FACTORY = new RandomAccessFileIOFactory();
+
+ /** The topic to send files to. */
+ private static Object topic;
+
+ /** File filter. */
+ private static FilenameFilter fileBinFilter;
+
+ /** Locally used fileIo to interact with output file. */
+ private final FileIO[] fileIo = new FileIO[1];
+
+ /** The temporary directory to store files. */
+ private File tempStore;
+
+ /** Ignite instance which receives files. */
+ private IgniteEx rcv;
+
+ /** Ignite instance which sends files. */
+ private IgniteEx snd;
+
+ /** Initializes the shared test topic and the partition file name filter before any test runs. */
+ @BeforeClass
+ public static void beforeAll() {
+     topic = GridTopic.TOPIC_CACHE.topic("test", 0);
+
+     // Accepts only the files whose name ends with the page store file suffix.
+     fileBinFilter = (dir, name) -> name.endsWith(FILE_SUFFIX);
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
+ @Before
+ public void before() throws Exception {
+ cleanPersistenceDir();
+
+ tempStore = U.resolveWorkDirectory(U.defaultWorkDirectory(), TEMP_FILES_DIR, true);
+ }
+
+ /**
+ * Called after test run. Runs the {@code ensureResourcesFree} check for the sender and the
+ * receiver; the grids are stopped, the locally tracked file IO is closed and the node
+ * references are reset even if that check throws.
+ */
+ @After
+ public void after() {
+ try {
+ ensureResourcesFree(snd);
+ ensureResourcesFree(rcv);
+ }
+ finally {
+ stopAllGrids();
+
+ U.closeQuiet(fileIo[0]);
+
+ snd = null;
+ rcv = null;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+     IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+     // Persistent default data region with the maximum size of 500 MB.
+     DataRegionConfiguration dfltRegion = new DataRegionConfiguration()
+         .setPersistenceEnabled(true)
+         .setMaxSize(500L * 1024 * 1024);
+
+     DataStorageConfiguration storageCfg = new DataStorageConfiguration()
+         .setDefaultDataRegionConfiguration(dfltRegion);
+
+     return cfg
+         .setDataStorageConfiguration(storageCfg)
+         .setCacheConfiguration(new CacheConfiguration<Integer, Integer>(DEFAULT_CACHE_NAME))
+         .setCommunicationSpi(new BlockingOpenChannelCommunicationSpi())
+         .setNetworkTimeout(NET_TIMEOUT_MS);
+ }
+
+ /**
+ * Transmit all cache partition to particular topic on the remote node and verify the
+ * length, the CRC and the additional parameters of every received file.
+ *
+ * @throws Exception If fails.
+ */
+ @Test
+ public void testFileHandlerBase() throws Exception {
+     snd = startGrid(0);
+     rcv = startGrid(1);
+
+     snd.cluster().active(true);
+
+     addCacheData(snd, DEFAULT_CACHE_NAME);
+
+     awaitPartitionMapExchange();
+
+     Map<String, Long> fileSizes = new HashMap<>();
+     Map<String, Integer> fileCrcs = new HashMap<>();
+     Map<String, Serializable> fileParams = new HashMap<>();
+
+     assertTrue(snd.context().io().fileTransmissionSupported(rcv.localNode()));
+
+     rcv.context().io().addTransmissionHandler(topic, new TransmissionHandlerAdapter() {
+         @Override public String filePath(UUID nodeId, TransmissionMeta fileMeta) {
+             // Received files keep the name of the file which has been sent.
+             return new File(tempStore, fileMeta.name()).getAbsolutePath();
+         }
+
+         @Override public Consumer<File> fileHandler(UUID nodeId, TransmissionMeta initMeta) {
+             return new Consumer<File>() {
+                 @Override public void accept(File file) {
+                     assertTrue(fileSizes.containsKey(file.getName()));
+                     // Save all params.
+                     fileParams.putAll(initMeta.params());
+                 }
+             };
+         }
+     });
+
+     File cacheDirIg0 = cacheWorkDir(snd, DEFAULT_CACHE_NAME);
+
+     File[] cacheParts = cacheDirIg0.listFiles(fileBinFilter);
+
+     // Remember the expected length and checksum of each file before sending it.
+     for (File file : cacheParts) {
+         fileSizes.put(file.getName(), file.length());
+         fileCrcs.put(file.getName(), FastCrc.calcCrc(file));
+     }
+
+     try (GridIoManager.TransmissionSender sender = snd.context()
+         .io()
+         .openTransmissionSender(rcv.localNode().id(), topic)) {
+         // Iterate over cache partition cacheParts.
+         for (File file : cacheParts) {
+             Map<String, Serializable> params = new HashMap<>();
+
+             params.put(file.getName(), file.hashCode());
+
+             sender.send(file, params, TransmissionPolicy.FILE);
+         }
+     }
+
+     stopAllGrids();
+
+     assertEquals(fileSizes.size(), tempStore.listFiles(fileBinFilter).length);
+
+     for (File file : cacheParts) {
+         // The length must be taken from the received copy, not from the source file itself.
+         File rcvFile = new File(tempStore, file.getName());
+
+         // Check received file lengths.
+         assertEquals("Received the file length is incorrect: " + file.getName(),
+             fileSizes.get(file.getName()), Long.valueOf(rcvFile.length()));
+
+         // Check received params.
+         assertEquals("File additional parameters are not fully transmitted",
+             fileParams.get(file.getName()), file.hashCode());
+     }
+
+     // Check received file CRCs.
+     for (File file : tempStore.listFiles(fileBinFilter)) {
+         assertEquals("Received file CRC-32 checksum is incorrect: " + file.getName(),
+             fileCrcs.get(file.getName()), Integer.valueOf(FastCrc.calcCrc(file)));
+     }
+ }
+
+ /**
+ * The receiver's handler throws from {@code filePath}; the test expects the transmission
+ * to end with an {@link IgniteCheckedException} and the handler's {@code onException} to
+ * receive the thrown message.
+ *
+ * @throws Exception If fails.
+ */
+ @Test(expected = IgniteCheckedException.class)
+ public void testFileHandlerFilePathThrowsEx() throws Exception {
+ final String exTestMessage = "Test exception. Handler initialization failed at onBegin.";
+
+ snd = startGrid(0);
+ rcv = startGrid(1);
+
+ snd.cluster().active(true);
+
+ File fileToSend = createFileRandomData("1Mb", 1024 * 1024);
+
+ rcv.context().io().addTransmissionHandler(topic, new DefaultTransmissionHandler(rcv, fileToSend, tempStore) {
+ @Override public String filePath(UUID nodeId, TransmissionMeta fileMeta) {
+ throw new IgniteException(exTestMessage);
+ }
+
+ @Override public Consumer<File> fileHandler(UUID nodeId, TransmissionMeta initMeta) {
+ fail("fileHandler must never be called");
+
+ return super.fileHandler(nodeId, initMeta);
+ }
+
+ @Override public Consumer<ByteBuffer> chunkHandler(UUID nodeId, TransmissionMeta initMeta) {
+ fail("chunkHandler must never be called");
+
+ return super.chunkHandler(nodeId, initMeta);
+ }
+
+ @Override public void onException(UUID nodeId, Throwable err) {
+ assertEquals(exTestMessage, err.getMessage());
+ }
+ });
+
+ try (GridIoManager.TransmissionSender sender = snd.context()
+ .io()
+ .openTransmissionSender(rcv.localNode().id(), topic)) {
+ sender.send(fileToSend, TransmissionPolicy.FILE);
+ }
+ }
+
+ /**
+ * The receiver node is stopped from inside the sender's fifth {@code transferTo} call;
+ * the test expects the transmission to end with an {@link IgniteCheckedException}.
+ *
+ * @throws Exception If fails.
+ */
+ @Test(expected = IgniteCheckedException.class)
+ public void testFileHandlerOnReceiverLeft() throws Exception {
+ final int fileSizeBytes = 5 * 1024 * 1024;
+ final AtomicInteger chunksCnt = new AtomicInteger();
+
+ snd = startGrid(0);
+ rcv = startGrid(1);
+
+ snd.cluster().active(true);
+
+ File fileToSend = createFileRandomData("testFile", fileSizeBytes);
+
+ transmissionFileIoFactory(snd, new FileIOFactory() {
+ @Override public FileIO create(File file, OpenOption... modes) throws IOException {
+ FileIO fileIo = IO_FACTORY.create(file, modes);
+
+ // Sender-side FileIO decorator which stops the receiver node during the transfer.
+ return new FileIODecorator(fileIo) {
+ /** {@inheritDoc} */
+ @Override public long transferTo(long position, long count, WritableByteChannel target)
+ throws IOException {
+ // On the fifth call stop the rcv, then proceed with the transfer.
+ if (chunksCnt.incrementAndGet() == 5)
+ stopGrid(rcv.name(), true);
+
+ return super.transferTo(position, count, target);
+ }
+ };
+ }
+ });
+
+ rcv.context().io().addTransmissionHandler(topic, new DefaultTransmissionHandler(rcv, fileToSend, tempStore));
+
+ try (GridIoManager.TransmissionSender sender = snd.context()
+ .io()
+ .openTransmissionSender(rcv.localNode().id(), topic)) {
+ sender.send(fileToSend, TransmissionPolicy.FILE);
+ }
+ }
+
+ /**
+ * On the tenth {@code transferTo} call the target channel is closed and the {@code block}
+ * flag of the sender's {@code BlockingOpenChannelCommunicationSpi} is raised. The test then
+ * checks that the receiver's handler got an error whose class is exactly
+ * {@link IgniteCheckedException}. Any exception thrown on the sender side is only logged.
+ * <p>
+ * NOTE(review): the method name looks like a typo of {@code testFileHandlerReconnectTimeouted};
+ * the {@code block} flag presumably prevents the sender from reopening the channel - confirm
+ * against {@code BlockingOpenChannelCommunicationSpi}.
+ *
+ * @throws Exception If fails.
+ */
+ @Test
+ public void tesFileHandlerReconnectTimeouted() throws Exception {
+ rcv = startGrid(1);
+ snd = startGrid(0);
+
+ final AtomicInteger chunksCnt = new AtomicInteger();
+ final CountDownLatch sndLatch = ((BlockingOpenChannelCommunicationSpi)snd.context()
+ .config()
+ .getCommunicationSpi()).latch;
+ final AtomicReference<Throwable> refErr = new AtomicReference<>();
+
+ snd.cluster().active(true);
+
+ File fileToSend = createFileRandomData("testFile", 5 * 1024 * 1024);
+
+ transmissionFileIoFactory(snd, new FileIOFactory() {
+ @Override public FileIO create(File file, OpenOption... modes) throws IOException {
+ FileIO fileIo = IO_FACTORY.create(file, modes);
+
+ return new FileIODecorator(fileIo) {
+ /** {@inheritDoc} */
+ @Override public long transferTo(long position, long count, WritableByteChannel target)
+ throws IOException {
+ if (chunksCnt.incrementAndGet() == 10) {
+ target.close();
+
+ ((BlockingOpenChannelCommunicationSpi)snd.context()
+ .config()
+ .getCommunicationSpi()).block = true;
+ }
+
+ return super.transferTo(position, count, target);
+ }
+ };
+ }
+ });
+
+ rcv.context().io().addTransmissionHandler(topic, new DefaultTransmissionHandler(rcv, fileToSend, tempStore) {
+ @Override public void onException(UUID nodeId, Throwable err) {
+ refErr.compareAndSet(null, err);
+
+ sndLatch.countDown();
+ }
+ });
+
+ try (GridIoManager.TransmissionSender sender = snd.context()
+ .io()
+ .openTransmissionSender(rcv.localNode().id(), topic)) {
+ sender.send(fileToSend, TransmissionPolicy.FILE);
+ }
+ catch (IgniteCheckedException | IOException | InterruptedException e) {
+ // Sender-side failure is not the subject of the check, log it only.
+ U.warn(log, e);
+ }
+
+ assertNotNull("Timeout exception not occurred", refErr.get());
+ assertEquals("Type of timeout exception incorrect: " + refErr.get(),
+ IgniteCheckedException.class,
+ refErr.get().getClass());
+ }
+
+ /**
+ * The sender node is stopped right after its first {@code transferTo} call. The send is
+ * expected to fail with {@link NodeStoppingException} and the receiver's download
+ * directory must contain no files afterwards.
+ *
+ * @throws Exception If fails.
+ */
+ @Test
+ public void testFileHandlerCleanedUpIfSenderLeft() throws Exception {
+     snd = startGrid(0);
+     rcv = startGrid(1);
+
+     snd.cluster().active(true);
+
+     File fileToSend = createFileRandomData("tempFile15Mb", 15 * 1024 * 1024);
+     File downloadTo = U.resolveWorkDirectory(tempStore.getAbsolutePath(), "download", true);
+
+     transmissionFileIoFactory(snd, new FileIOFactory() {
+         @Override public FileIO create(File file, OpenOption... modes) throws IOException {
+             FileIO fileIo = IO_FACTORY.create(file, modes);
+
+             return new FileIODecorator(fileIo) {
+                 /** {@inheritDoc} */
+                 @Override public long transferTo(long position, long count, WritableByteChannel target)
+                     throws IOException {
+
+                     long transferred = super.transferTo(position, count, target);
+
+                     // Stop the sender once a chunk has been written to the channel.
+                     stopGrid(snd.name(), true);
+
+                     return transferred;
+                 }
+             };
+         }
+     });
+
+     rcv.context().io().addTransmissionHandler(topic, new DefaultTransmissionHandler(rcv, fileToSend, tempStore){
+         /** {@inheritDoc} */
+         @Override public String filePath(UUID nodeId, TransmissionMeta fileMeta) {
+             return new File(downloadTo, fileMeta.name()).getAbsolutePath();
+         }
+     });
+
+     Exception err = null;
+
+     try (GridIoManager.TransmissionSender sender = snd.context()
+         .io()
+         .openTransmissionSender(rcv.localNode().id(), topic)) {
+         sender.send(fileToSend, TransmissionPolicy.FILE);
+     }
+     catch (Exception e) {
+         // Remember the failure, its type is verified below.
+         err = e;
+     }
+
+     // Fail with a clear message instead of a NullPointerException when nothing was thrown.
+     assertNotNull("Transmission must fail when the sender node is stopped", err);
+     assertEquals(NodeStoppingException.class, err.getClass());
+     assertEquals("Incomplete resources must be cleaned up on sender left",
+         0,
+         fileCount(downloadTo.toPath()));
+ }
+
+ /**
+ * The receiver's file IO throws from its fourth {@code transferFrom} call. The handler's
+ * {@code onException} checks the error message and the test expects the transmission to
+ * end with an {@link IgniteCheckedException}.
+ *
+ * @throws Exception If fails.
+ */
+ @Test(expected = IgniteCheckedException.class)
+ public void testFileHandlerReconnectOnReadFail() throws Exception {
+ final String chunkDownloadExMsg = "Test exception. Chunk processing error.";
+
+ snd = startGrid(0);
+ rcv = startGrid(1);
+
+ snd.cluster().active(true);
+
+ File fileToSend = createFileRandomData("testFile", 5 * 1024 * 1024);
+ final AtomicInteger readChunks = new AtomicInteger();
+
+ transmissionFileIoFactory(rcv, new FileIOFactory() {
+ @Override public FileIO create(File file, OpenOption... modes) throws IOException {
+ fileIo[0] = IO_FACTORY.create(file, modes);
+
+ // Receiver-side FileIO decorator which fails one of the reads from the channel.
+ return new FileIODecorator(fileIo[0]) {
+ @Override public long transferFrom(ReadableByteChannel src, long position, long count)
+ throws IOException {
+ // On the fourth call throw an exception to emulate a chunk processing error.
+ if (readChunks.incrementAndGet() == 4)
+ throw new IgniteException(chunkDownloadExMsg);
+
+ return super.transferFrom(src, position, count);
+ }
+ };
+ }
+ });
+
+ rcv.context().io().addTransmissionHandler(topic, new DefaultTransmissionHandler(rcv, fileToSend, tempStore) {
+ @Override public void onException(UUID nodeId, Throwable err) {
+ assertEquals(chunkDownloadExMsg, err.getMessage());
+ }
+ });
+
+ try (GridIoManager.TransmissionSender sender = snd.context()
+ .io()
+ .openTransmissionSender(rcv.localNode().id(), topic)) {
+ sender.send(fileToSend, TransmissionPolicy.FILE);
+ }
+ }
+
+ /**
+ * The receiver's handler throws from the first {@code filePath} call and returns the
+ * target path on later calls; the test expects the transmission to end with an
+ * {@link IgniteCheckedException}.
+ * <p>
+ * NOTE(review): because of {@code expected = IgniteCheckedException.class} the length and
+ * CRC assertions after the try block can never contribute to a pass - they are skipped when
+ * the send throws, and the test fails anyway when it does not. Confirm whether they are
+ * still wanted.
+ *
+ * @throws Exception If fails.
+ */
+ @Test(expected = IgniteCheckedException.class)
+ public void testFileHandlerSenderStoppedIfReceiverInitFail() throws Exception {
+ final int fileSizeBytes = 5 * 1024 * 1024;
+ final AtomicBoolean throwFirstTime = new AtomicBoolean();
+
+ snd = startGrid(0);
+ rcv = startGrid(1);
+
+ snd.cluster().active(true);
+
+ File fileToSend = createFileRandomData("testFile", fileSizeBytes);
+ File rcvFile = new File(tempStore, "testFile" + "_" + rcv.localNode().id());
+
+ rcv.context().io().addTransmissionHandler(topic, new DefaultTransmissionHandler(rcv, fileToSend, tempStore) {
+ @Override public String filePath(UUID nodeId, TransmissionMeta fileMeta) {
+ if (throwFirstTime.compareAndSet(false, true))
+ throw new IgniteException("Test exception. Initialization fail.");
+
+ return rcvFile.getAbsolutePath();
+ }
+ });
+
+ try (GridIoManager.TransmissionSender sender = snd.context()
+ .io()
+ .openTransmissionSender(rcv.localNode().id(), topic)) {
+ sender.send(fileToSend, TransmissionPolicy.FILE);
+ }
+
+ assertEquals(fileToSend.length(), rcvFile.length());
+ assertCrcEquals(fileToSend, rcvFile);
+ }
+
+ /**
+ * The receiver's handler returns {@code null} from the first {@code filePath} call and the
+ * real target path afterwards. The first sender is expected to fail with an
+ * {@link IgniteCheckedException}; a second sender opened on the same topic must then
+ * transmit the file completely, which is verified by length and CRC.
+ *
+ * @throws Exception If fails.
+ */
+ @Test
+ public void testFileHandlerNextWriterOpened() throws Exception {
+ final int fileSizeBytes = 5 * 1024 * 1024;
+ final AtomicBoolean networkExThrown = new AtomicBoolean();
+
+ snd = startGrid(0);
+ rcv = startGrid(1);
+
+ snd.cluster().active(true);
+
+ File fileToSend = createFileRandomData("File_5MB", fileSizeBytes);
+ File rcvFile = new File(tempStore, "File_5MB" + "_" + rcv.localNode().id());
+
+ rcv.context().io().addTransmissionHandler(topic, new DefaultTransmissionHandler(rcv, fileToSend, tempStore) {
+ @Override public void onException(UUID nodeId, Throwable err) {
+ assertEquals("Previous session is not closed properly", IgniteCheckedException.class, err.getClass());
+ }
+
+ @Override public String filePath(UUID nodeId, TransmissionMeta fileMeta) {
+ if (networkExThrown.compareAndSet(false, true))
+ return null;
+
+ return rcvFile.getAbsolutePath();
+ }
+ });
+
+ Exception expectedErr = null;
+
+ try (GridIoManager.TransmissionSender sender = snd.context()
+ .io()
+ .openTransmissionSender(rcv.localNode().id(), topic)) {
+ sender.send(fileToSend, TransmissionPolicy.FILE);
+ }
+ catch (IgniteCheckedException e) {
+ // Expected exception.
+ expectedErr = e;
+ }
+
+ assertNotNull("Transmission must ends with an exception", expectedErr);
+
+ // Open the next session, which must complete successfully.
+ try (GridIoManager.TransmissionSender sender = snd.context()
+ .io()
+ .openTransmissionSender(rcv.localNode().id(), topic)) {
+ sender.send(fileToSend, TransmissionPolicy.FILE);
+ }
+
+ assertEquals(fileToSend.length(), rcvFile.length());
+ assertCrcEquals(fileToSend, rcvFile);
+ }
+
+ /**
+ * The transmission handler is removed from the receiver's topic before sending; the test
+ * expects the transmission to end with an {@link IgniteException}.
+ *
+ * @throws Exception If fails.
+ */
+ @Test(expected = IgniteException.class)
+ public void testFileHandlerSendToNullTopic() throws Exception {
+ snd = startGrid(0);
+ rcv = startGrid(1);
+
+ snd.cluster().active(true);
+
+ // Ensure topic handler is empty.
+ rcv.context().io().removeTransmissionHandler(topic);
+
+ // Open a sender to the topic which has no handler registered.
+ try (GridIoManager.TransmissionSender sender = snd.context()
+ .io()
+ .openTransmissionSender(rcv.localNode().id(), topic)) {
+ sender.send(createFileRandomData("File_1MB", 1024 * 1024), TransmissionPolicy.FILE);
+ }
+ }
+
+ /**
+  * Opens two senders from the same node to the same receiver topic and runs both sends concurrently.
+  * The test expects at least one of the sends to fail and rethrows the recorded error, which must be an
+  * {@link IgniteCheckedException}.
+  *
+  * @throws Exception If fails.
+  */
+ @Test(expected = IgniteCheckedException.class)
+ public void testFileHandlerChannelCloseIfAnotherOpened() throws Exception {
+     final int fileSizeBytes = 5 * 1024 * 1024;
+     final CountDownLatch waitLatch = new CountDownLatch(2);
+     final CountDownLatch completionWait = new CountDownLatch(2);
+
+     snd = startGrid(0);
+     rcv = startGrid(1);
+
+     snd.cluster().active(true);
+
+     File fileToSend = createFileRandomData("file5MBSize", fileSizeBytes);
+
+     rcv.context().io().addTransmissionHandler(topic, new DefaultTransmissionHandler(rcv, fileToSend, tempStore) {
+         @Override public String filePath(UUID nodeId, TransmissionMeta fileMeta) {
+             // Counts every destination-path request that reaches the receiver.
+             waitLatch.countDown();
+
+             return super.filePath(nodeId, fileMeta);
+         }
+     });
+
+     // Single slot shared by both async senders; the last failure wins.
+     Exception[] errs = new Exception[1];
+
+     try (GridIoManager.TransmissionSender sender = snd.context()
+         .io()
+         .openTransmissionSender(rcv.localNode().id(), topic);
+
+         GridIoManager.TransmissionSender anotherSender = snd.context()
+             .io()
+             .openTransmissionSender(rcv.localNode().id(), topic)) {
+         // Will connect on write attempt.
+         GridTestUtils.runAsync(() -> {
+             try {
+                 sender.send(fileToSend, TransmissionPolicy.FILE);
+             }
+             catch (IgniteCheckedException | IOException | InterruptedException e) {
+                 errs[0] = e;
+             }
+             finally {
+                 completionWait.countDown();
+             }
+         });
+
+         GridTestUtils.runAsync(() -> {
+             try {
+                 anotherSender.send(fileToSend, TransmissionPolicy.FILE);
+             }
+             catch (IgniteCheckedException | IOException | InterruptedException e) {
+                 errs[0] = e;
+             }
+             finally {
+                 completionWait.countDown();
+             }
+         });
+
+         waitLatch.await(5, TimeUnit.SECONDS);
+
+         // Wait for both senders to finish BEFORE inspecting the error slot: the countDown() in each
+         // 'finally' happens after the write to errs[0], so a successful await makes the write visible here
+         // and removes the race between the assertion and a sender that has not failed yet.
+         completionWait.await(5, TimeUnit.SECONDS);
+
+         // Expected that one of the writers will throw exception.
+         assertNotNull("An error must be thrown if connected to the same topic during processing", errs[0]);
+
+         throw errs[0];
+     }
+ }
+
+ /**
+ * Sends a 10 MB random file with {@code TransmissionPolicy.CHUNK} while the sender-side file IO is wrapped
+ * so that the 10th {@code transferTo} call closes the target channel before delegating. The receiver writes
+ * every accepted chunk into a local file through a shared {@code fileIo} holder (declared outside this
+ * method). After the send completes the test checks the number of accepted chunks, the received file length,
+ * its CRC, and that the holder has been cleared.
+ *
+ * @throws Exception If fails.
+ */
+ @Test
+ public void testChunkHandlerWithReconnect() throws Exception {
+ snd = startGrid(0);
+ rcv = startGrid(1);
+
+ final String filePrefix = "testFile";
+ final AtomicInteger cnt = new AtomicInteger();
+ final AtomicInteger acceptedChunks = new AtomicInteger();
+ final File file = new File(tempStore, filePrefix + "_" + rcv.localNode().id());
+
+ snd.cluster().active(true);
+
+ File fileToSend = createFileRandomData(filePrefix, 10 * 1024 * 1024);
+
+ transmissionFileIoFactory(snd, new FileIOFactory() {
+ @Override public FileIO create(File file, OpenOption... modes) throws IOException {
+ FileIO fileIo = IO_FACTORY.create(file, modes);
+
+ return new FileIODecorator(fileIo) {
+ /** {@inheritDoc} */
+ @Override public long transferTo(long position, long count, WritableByteChannel target)
+ throws IOException {
+ // Close the target channel on the 10th transferTo call (presumably to force the reconnect under test).
+ if (cnt.incrementAndGet() == 10)
+ target.close();
+
+ return super.transferTo(position, count, target);
+ }
+ };
+ }
+ });
+
+ rcv.context().io().addTransmissionHandler(topic, new TransmissionHandlerAdapter() {
+ /** {@inheritDoc} Closes and clears the shared file IO, so the next {@code chunkHandler} call reopens the file. */
+ @Override public void onException(UUID nodeId, Throwable err) {
+ U.closeQuiet(fileIo[0]);
+
+ fileIo[0] = null;
+ }
+
+ /** {@inheritDoc} Opens the destination file at the offset taken from {@code initMeta} if it is not open yet. */
+ @Override public Consumer<ByteBuffer> chunkHandler(UUID nodeId, TransmissionMeta initMeta) {
+
+ if (fileIo[0] == null) {
+ try {
+ fileIo[0] = IO_FACTORY.create(file);
+ fileIo[0].position(initMeta.offset());
+ }
+ catch (IOException e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ return new Consumer<ByteBuffer>() {
+ final LongAdder transferred = new LongAdder(); // Bytes written through this particular consumer instance.
+
+ @Override public void accept(ByteBuffer buff) {
+ try {
+ assertTrue(buff.order() == ByteOrder.nativeOrder());
+ assertEquals(0, buff.position());
+ assertEquals(buff.limit(), buff.capacity());
+
+ fileIo[0].writeFully(buff);
+
+ acceptedChunks.getAndIncrement();
+ transferred.add(buff.capacity());
+ }
+ catch (Throwable e) {
+ throw new IgniteException(e);
+ }
+ finally {
+ closeIfTransferred();
+ }
+ }
+
+ private void closeIfTransferred() {
+ if (transferred.longValue() == initMeta.count()) { // All bytes announced by initMeta have been written.
+ U.closeQuiet(fileIo[0]);
+
+ fileIo[0] = null;
+ }
+ }
+ };
+ }
+ });
+
+ try (GridIoManager.TransmissionSender sender = snd.context()
+ .io()
+ .openTransmissionSender(rcv.localNode().id(), topic)) {
+ sender.send(fileToSend, TransmissionPolicy.CHUNK);
+ }
+
+ assertEquals("Remote node must accept all chunks",
+ fileToSend.length() / rcv.configuration().getDataStorageConfiguration().getPageSize(),
+ acceptedChunks.get());
+ assertEquals("Received file and sent files have not the same lengtgh", fileToSend.length(), file.length());
+ assertCrcEquals(fileToSend, file);
+ assertNull(fileIo[0]);
+ }
+
+ /**
+  * Registers a receiver handler whose {@code chunkHandler} always throws and sends a file with
+  * {@code TransmissionPolicy.CHUNK}. The test passes only if an {@link IgniteCheckedException} is thrown.
+  *
+  * @throws Exception If fails.
+  */
+ @Test(expected = IgniteCheckedException.class)
+ public void testChunkHandlerInitSizeFail() throws Exception {
+     snd = startGrid(0);
+     rcv = startGrid(1);
+
+     snd.cluster().active(true);
+
+     File fileToSend = createFileRandomData("testFile", 1024 * 1024);
+
+     // Handler that fails as soon as the receiver asks for a chunk consumer.
+     TransmissionHandler failingHnd = new TransmissionHandlerAdapter() {
+         /** {@inheritDoc} */
+         @Override public Consumer<ByteBuffer> chunkHandler(UUID nodeId, TransmissionMeta initMeta) {
+             throw new IgniteException("Test exception. Initialization failed");
+         }
+     };
+
+     rcv.context().io().addTransmissionHandler(topic, failingHnd);
+
+     UUID rcvId = rcv.localNode().id();
+
+     try (GridIoManager.TransmissionSender sender = snd.context().io().openTransmissionSender(rcvId, topic)) {
+         sender.send(fileToSend, TransmissionPolicy.CHUNK);
+     }
+ }
+
+ /**
+  * Asserts that the IO manager of the given node keeps no entries in its {@code rcvCtxs} and
+  * {@code senderStopFlags} maps (both read reflectively). Does nothing for a {@code null} node.
+  *
+  * @param ig Ignite instance to check.
+  */
+ private static void ensureResourcesFree(IgniteEx ig) {
+     if (ig != null) {
+         final GridIoManager ioMgr = ig.context().io();
+
+         ConcurrentMap<Object, Object> rcvCtxs = GridTestUtils.getFieldValue(ioMgr, "rcvCtxs");
+         ConcurrentMap<T2<UUID, IgniteUuid>, AtomicBoolean> stopFlags =
+             GridTestUtils.getFieldValue(ioMgr, "senderStopFlags");
+
+         assertTrue("Receiver context map must be empty: " + rcvCtxs, rcvCtxs.isEmpty());
+         assertTrue("Sender stop flags must be empty: " + stopFlags, stopFlags.isEmpty());
+     }
+ }
+
+ /**
+  * Streams {@code CACHE_SIZE} integer entries into the cache. Keys are {@code 0..CACHE_SIZE-1}; each value
+  * is the key plus the hash code of the cache name.
+  *
+  * @param ignite Ignite instance.
+  * @param cacheName Cache name to add data to.
+  */
+ private void addCacheData(Ignite ignite, String cacheName) {
+     try (IgniteDataStreamer<Integer, Integer> streamer = ignite.dataStreamer(cacheName)) {
+         streamer.allowOverwrite(true);
+
+         for (int key = 0; key < CACHE_SIZE; key++) {
+             int val = key + cacheName.hashCode();
+
+             streamer.addData(key, val);
+         }
+     }
+ }
+
+ /**
+  * Resolves the working directory of a cache through the node's page store manager.
+  *
+  * @param ignite An ignite instance.
+  * @param cacheName Cache name.
+  * @return The cache working directory.
+  */
+ private File cacheWorkDir(IgniteEx ignite, String cacheName) {
+     IgniteInternalCache<?, ?> internalCache = ignite.cachex(cacheName);
+
+     // The shared page store is expected to be file-based here; the cast fails otherwise.
+     Object pageStore = internalCache.context().shared().pageStore();
+
+     return ((FilePageStoreManager)pageStore).cacheWorkDir(internalCache.configuration());
+ }
+
+ /**
+  * Creates a file with the given name in the temporary store and fills it with random bytes. If a file with
+  * the same name already exists, its previous content is discarded.
+  *
+  * @param name The file name to create.
+  * @param size The file size.
+  * @return The created file, exactly {@code size} bytes long.
+  * @throws IOException If fails.
+  */
+ private File createFileRandomData(String name, final int size) throws IOException {
+     File out = new File(tempStore, name);
+
+     byte[] buf = new byte[size];
+
+     ThreadLocalRandom.current().nextBytes(buf);
+
+     try (RandomAccessFile raf = new RandomAccessFile(out, "rw")) {
+         // "rw" mode does not truncate an existing file: drop stale content first, otherwise a longer
+         // leftover file with the same name would keep its tail and break length/CRC comparisons.
+         raf.setLength(0);
+
+         raf.write(buf);
+     }
+
+     return out;
+ }
+
+ /**
+  * Replaces the {@code fileIoFactory} field of the node's IO manager via reflection.
+  *
+  * @param ignite Ignite instance to set factory.
+  * @param factory New factory to use.
+  */
+ private static void transmissionFileIoFactory(IgniteEx ignite, FileIOFactory factory) {
+     GridIoManager ioMgr = ignite.context().io();
+
+     setFieldValue(ioMgr, "fileIoFactory", factory);
+ }
+
+ /**
+ * @param fileToSend Source file to check CRC.
+ * @param fileReceived Destination file to check CRC.
+ */
+ private static void assertCrcEquals(File fileToSend, File fileReceived) {
+ try {
+ assertEquals(FastCrc.calcCrc(fileToSend), FastCrc.calcCrc(fileReceived));
+ }
+ catch (IOException e) {
+ throw new AssertionError(e);
+ }
+ }
+
+ /** The defailt implementation of transmit session. */
+ private static class DefaultTransmissionHandler extends TransmissionHandlerAdapter {
+ /** Ignite recevier node. */
+ private final IgniteEx rcv;
+
+ /** File to be send. */
+ private final File fileToSend;
+
+ /** Temporary local storage. */
+ private final File tempStorage;
+
+ /**
+ * @param rcv Ignite recevier node.
+ * @param fileToSend File to be send.
+ * @param tempStorage Temporary local storage.
+ */
+ public DefaultTransmissionHandler(IgniteEx rcv, File fileToSend, File tempStorage) {
+ this.rcv = rcv;
+ this.fileToSend = fileToSend;
+ this.tempStorage = tempStorage;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String filePath(UUID nodeId, TransmissionMeta fileMeta) {
+ return new File(tempStorage, fileMeta.name() + "_" + rcv.localNode().id()).getAbsolutePath();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Consumer<File> fileHandler(UUID nodeId, TransmissionMeta initMeta) {
+ return new Consumer<File>() {
+ @Override public void accept(File file) {
+ assertEquals(fileToSend.length(), file.length());
+ assertCrcEquals(fileToSend, file);
+ }
+ };
+ }
+ }
+
+ /**
+  * Communication SPI that can delay opening of a new channel: while {@link #block} is {@code true}, every
+  * {@code openChannel} call first waits on {@link #latch} for up to 5 seconds and then delegates to the
+  * parent implementation.
+  */
+ private static class BlockingOpenChannelCommunicationSpi extends TcpCommunicationSpi {
+     /** Latch to wait at. */
+     private final CountDownLatch latch = new CountDownLatch(1);
+
+     /** {@code true} to start waiting. */
+     private volatile boolean block;
+
+     /** {@inheritDoc} */
+     @Override public IgniteInternalFuture<Channel> openChannel(ClusterNode remote,
+         Message initMsg) throws IgniteSpiException {
+         if (block) {
+             U.log(log, "Start waiting on trying open a new channel");
+
+             try {
+                 latch.await(5, TimeUnit.SECONDS);
+             }
+             catch (InterruptedException e) {
+                 // Restore the interrupt status so code further up the stack can still observe it.
+                 Thread.currentThread().interrupt();
+
+                 throw new IgniteException(e);
+             }
+         }
+
+         return super.openChannel(remote, initMsg);
+     }
+ }
+
+ /**
+  * No-op base implementation of {@link TransmissionHandler}: every factory method returns {@code null} and
+  * errors are ignored. Tests override only the callbacks they need.
+  */
+ private static class TransmissionHandlerAdapter implements TransmissionHandler {
+     /** {@inheritDoc} */
+     @Override public void onException(UUID nodeId, Throwable err) {
+         // No-op.
+     }
+
+     /** {@inheritDoc} */
+     @Override public String filePath(UUID nodeId, TransmissionMeta fileMeta) {
+         return null;
+     }
+
+     /** {@inheritDoc} */
+     @Override public Consumer<File> fileHandler(UUID nodeId, TransmissionMeta initMeta) {
+         return null;
+     }
+
+     /** {@inheritDoc} */
+     @Override public Consumer<ByteBuffer> chunkHandler(UUID nodeId, TransmissionMeta initMeta) {
+         return null;
+     }
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
index d6b9b99..ed9079d 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
@@ -45,6 +45,7 @@ import org.apache.ignite.internal.MarshallerContextLockingSelfTest;
import org.apache.ignite.internal.TransactionsMXBeanImplTest;
import org.apache.ignite.internal.managers.IgniteDiagnosticMessagesMultipleConnectionsTest;
import org.apache.ignite.internal.managers.IgniteDiagnosticMessagesTest;
+import org.apache.ignite.internal.managers.communication.GridIoManagerFileTransmissionSelfTest;
import org.apache.ignite.internal.managers.discovery.IncompleteDeserializationExceptionTest;
import org.apache.ignite.internal.pagemem.wal.record.WALRecordTest;
import org.apache.ignite.internal.processors.DeadLockOnNodeLeftExchangeTest;
@@ -252,7 +253,9 @@ import org.junit.runners.Suite;
ClassPathContentLoggingTest.class,
- IncompleteDeserializationExceptionTest.class
+ IncompleteDeserializationExceptionTest.class,
+
+ GridIoManagerFileTransmissionSelfTest.class
})
public class IgniteBasicTestSuite {
}