You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2019/03/15 14:11:32 UTC
[ignite] branch master updated: IGNITE-11546 Fixed FileDownloader early close - Fixes #6271.
This is an automated email from the ASF dual-hosted git repository.
agoncharuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new c70a4cd IGNITE-11546 Fixed FileDownloader early close - Fixes #6271.
c70a4cd is described below
commit c70a4cd0c7c984cdb2602575d36eb21583835bf3
Author: Alexey Goncharuk <al...@gmail.com>
AuthorDate: Fri Mar 15 17:04:03 2019 +0300
IGNITE-11546 Fixed FileDownloader early close - Fixes #6271.
Signed-off-by: Alexey Goncharuk <al...@gmail.com>
---
.../cache/persistence/GridCacheOffheapManager.java | 9 ++
.../cache/persistence/file/FileDownloader.java | 166 +++++++++++++--------
.../cache/persistence/file/FilePageStore.java | 2 +-
.../cache/persistence/file/FileUploader.java | 40 +++--
.../cache/persistence/file/FileDownloaderTest.java | 15 +-
5 files changed, 132 insertions(+), 100 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index 2978f69..26a535c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -1474,6 +1474,15 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
try {
Metas metas = getOrAllocatePartitionMetas();
+ if (PageIdUtils.partId(metas.reuseListRoot.pageId().pageId()) != partId ||
+ PageIdUtils.partId(metas.treeRoot.pageId().pageId()) != partId ||
+ PageIdUtils.partId(metas.pendingTreeRoot.pageId().pageId()) != partId) {
+ throw new IgniteCheckedException("Invalid meta root allocated [" +
+ "cacheOrGroupName=" + grp.cacheOrGroupName() +
+ ", partId=" + partId +
+ ", metas=" + metas + ']');
+ }
+
RootPage reuseRoot = metas.reuseListRoot;
freeList = new CacheFreeListImpl(
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileDownloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileDownloader.java
index 3c54a49..2678315 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileDownloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileDownloader.java
@@ -25,13 +25,11 @@ import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.lang.IgniteInClosureX;
import org.apache.ignite.internal.util.typedef.internal.U;
/**
@@ -42,22 +40,28 @@ public class FileDownloader {
private final IgniteLogger log;
/** */
- private static final int CHUNK_SIZE = 1024 * 1024;
+ private static final int CHUNK_SIZE = 16 * 1024 * 1024;
/** */
private final Path path;
/** */
- private final AtomicLong size = new AtomicLong(-1);
+ private long bytesReceived;
/** */
- private ServerSocketChannel serverChannel;
+ private boolean doneTransfer;
/** */
- private volatile GridFutureAdapter<?> finishFut;
+ private long bytesSent = -1;
/** */
- private final GridFutureAdapter<?> initFut = new GridFutureAdapter<>();
+ private ServerSocketChannel srvChan;
+
+ /** */
+ private SocketChannel readChan;
+
+ /** */
+ private final GridFutureAdapter<Void> finishFut = new GridFutureAdapter<>();
/**
*
@@ -68,6 +72,13 @@ public class FileDownloader {
}
/**
+ * @return Download finish future.
+ */
+ public IgniteInternalFuture<Void> finishFuture() {
+ return finishFut;
+ }
+
+ /**
*
*/
public InetSocketAddress start() throws IgniteCheckedException {
@@ -76,7 +87,7 @@ public class FileDownloader {
ch.bind(null);
- serverChannel = ch;
+ srvChan = ch;
return (InetSocketAddress)ch.getLocalAddress();
}
@@ -88,30 +99,9 @@ public class FileDownloader {
/**
*
*/
- public void download(GridFutureAdapter<?> fut){
- this.finishFut = fut;
-
- final ServerSocketChannel ch = serverChannel;
-
- fut.listen(new IgniteInClosureX<IgniteInternalFuture<?>>() {
- @Override public void applyx(IgniteInternalFuture<?> future) throws IgniteCheckedException {
- try {
-
- if (log != null && log.isInfoEnabled())
- log.info("Server socket closed " + ch.getLocalAddress());
-
- ch.close();
- }
- catch (Exception ex) {
- U.error(log, "Fail close socket.", ex);
-
- throw new IgniteCheckedException(ex);
- }
- }
- });
-
- FileChannel writeChannel = null;
- SocketChannel readChannel = null;
+ public void download() {
+ FileChannel writeChan = null;
+ SocketChannel readChan = null;
try {
File f = new File(path.toUri().getPath());
@@ -124,43 +114,53 @@ public class FileDownloader {
if (!cacheWorkDir.exists())
cacheWorkDir.mkdir();
- writeChannel = FileChannel.open(path, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
+ readChan = srvChan.accept();
+
+ if (log != null && log.isInfoEnabled())
+ log.info("Accepted incoming connection, closing server socket: " + srvChan.getLocalAddress());
- initFut.onDone();
+ U.closeQuiet(srvChan);
- readChannel = serverChannel.accept();
+ synchronized (this) {
+ if (finishFut.isDone()) {
+ // Already received a response with error.
+ U.closeQuiet(readChan);
+
+ return;
+ }
+ else
+ this.readChan = readChan;
+ }
+
+ writeChan = FileChannel.open(path, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
+
+ if (log != null && log.isInfoEnabled())
+ log.info("Started writing file [path=" + path + ", rmtAddr=" + readChan.getRemoteAddress() + ']');
long pos = 0;
- long size = this.size.get();
+ boolean finish = false;
- while (size == -1 || pos < size) {
- pos += writeChannel.transferFrom(readChannel, pos, CHUNK_SIZE);
+ while (!finish && !finishFut.isDone()) {
+ long transferred = writeChan.transferFrom(readChan, pos, CHUNK_SIZE);
- if (size == -1)
- size = this.size.get();
+ pos += transferred;
+
+ finish = onBytesReceived(transferred);
}
}
catch (IOException ex) {
- initFut.onDone(ex);
-
- fut.onDone(ex);
+ finishFut.onDone(ex);
}
finally {
try {
- if (writeChannel != null)
- writeChannel.close();
+ onDoneTransfer();
}
- catch (IOException ex) {
- throw new IgniteException("Could not close file: " + path);
- }
-
- try {
- if (readChannel != null)
- readChannel.close();
- }
- catch (IOException ex) {
- throw new IgniteException("Could not close socket");
+ finally {
+ // Safety.
+ U.closeQuiet(srvChan);
+ U.close(writeChan, log);
+ U.close(readChan, log);
}
}
}
@@ -168,22 +168,58 @@ public class FileDownloader {
/**
*
*/
- public void download(long size, Throwable th) {
- try {
- initFut.get();
+ public void onResult(long size, Throwable th) {
+ synchronized (this) {
+ if (th != null) {
+ bytesSent = 0;
- if (th != null)
finishFut.onDone(th);
+
+ U.closeQuiet(readChan);
+ }
else {
- if (!this.size.compareAndSet(-1, size))
- finishFut.onDone(new IgniteException("Size mismatch: " + this.size.get() + " != " + size));
- else
- finishFut.onDone();
+ bytesSent = size;
+
+ checkCompleted();
}
+ }
+ }
+
+ /**
+ * @param transferred Number of bytes transferred.
+ * @return {@code True} if the sender-reported size is known and all of those bytes have been received, i.e. reading should stop.
+ */
+ private boolean onBytesReceived(long transferred) {
+ synchronized (this) {
+ bytesReceived += transferred;
+ return bytesSent != -1 && bytesSent == bytesReceived;
}
- catch (IgniteCheckedException e) {
- finishFut.onDone(e);
+ }
+
+ /**
+ * Called when reading thread stopped transferring bytes for any reason.
+ */
+ private void onDoneTransfer() {
+ synchronized (this) {
+ doneTransfer = true;
+
+ checkCompleted();
+ }
+ }
+
+ /**
+ *
+ */
+ private void checkCompleted() {
+ // Compare sizes if done reading from the socket and received response from remote node.
+ if (doneTransfer && bytesSent != -1) {
+ if (bytesReceived == bytesSent)
+ finishFut.onDone();
+ else {
+ finishFut.onDone(new IgniteException("Failed to transfer file (sent and received sizes mismatch) [" +
+ "bytesReceived=" + bytesReceived + ", bytesSent=" + bytesSent + ", file=" + path + ']'));
+ }
}
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
index 0968c49..4e1c243 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
@@ -396,7 +396,7 @@ public class FilePageStore implements PageStore {
assert pageBuf.position() == 0;
assert pageBuf.order() == ByteOrder.nativeOrder();
assert off <= allocated.get() : "calculatedOffset=" + off +
- ", allocated=" + allocated.get() + ", headerSize=" + headerSize();
+ ", allocated=" + allocated.get() + ", headerSize=" + headerSize() + ", cfgFile=" + cfgFile;
int n = readWithFailover(pageBuf, off);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileUploader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileUploader.java
index ba21ae9..4ea4c06 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileUploader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileUploader.java
@@ -25,8 +25,9 @@ import java.nio.channels.SocketChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.internal.U;
/**
* Part of direct node to node file downloading
@@ -38,18 +39,22 @@ public class FileUploader {
/** */
private final Path path;
+ /** */
+ private final IgniteLogger log;
+
/**
*
*/
- public FileUploader(Path path) {
+ public FileUploader(Path path, IgniteLogger log) {
this.path = path;
+ this.log = log;
}
/**
*
*/
- public void upload(SocketChannel writeChannel, GridFutureAdapter<Long> finishFut) {
- FileChannel readChannel = null;
+ public void upload(SocketChannel writeChan, GridFutureAdapter<Long> finishFut) {
+ FileChannel readChan = null;
try {
File file = new File(path.toUri().getPath());
@@ -64,14 +69,17 @@ public class FileUploader {
return;
}
- readChannel = FileChannel.open(path, StandardOpenOption.READ);
+ readChan = FileChannel.open(path, StandardOpenOption.READ);
long written = 0;
- long size = readChannel.size();
+ long size = readChan.size();
while (written < size)
- written += readChannel.transferTo(written, CHUNK_SIZE, writeChannel);
+ written += readChan.transferTo(written, CHUNK_SIZE, writeChan);
+
+ writeChan.shutdownOutput();
+ writeChan.shutdownInput();
finishFut.onDone(written);
}
@@ -79,22 +87,8 @@ public class FileUploader {
finishFut.onDone(ex);
}
finally {
- //FIXME: when an error occurs on writeChannel.close() no attempt to close readChannel will happen. Need to be fixed.
- try {
- if (writeChannel != null)
- writeChannel.close();
- }
- catch (IOException ex) {
- throw new IgniteException("Could not close socket.");
- }
-
- try {
- if (readChannel != null)
- readChannel.close();
- }
- catch (IOException ex) {
- throw new IgniteException("Could not close file: " + path);
- }
+ U.close(writeChan, log);
+ U.close(readChan, log);
}
}
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/file/FileDownloaderTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/file/FileDownloaderTest.java
index 90af04e..ccaa1ac 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/file/FileDownloaderTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/file/FileDownloaderTest.java
@@ -24,7 +24,6 @@ import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.nio.file.Files;
import java.nio.file.Path;
-import java.util.concurrent.CountDownLatch;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
@@ -87,7 +86,7 @@ public class FileDownloaderTest extends GridCommonAbstractTest {
GridFutureAdapter<Long> finishFut = new GridFutureAdapter<>();
- FileUploader uploader = new FileUploader(UPLOADER_PATH);
+ FileUploader uploader = new FileUploader(UPLOADER_PATH, log);
SocketChannel sc = null;
@@ -98,13 +97,7 @@ public class FileDownloaderTest extends GridCommonAbstractTest {
U.warn(log, "Fail connect to " + address, e);
}
- CountDownLatch downLatch = new CountDownLatch(1);
-
- runAsync(() -> {
- downloader.download(finishFut);
-
- downLatch.countDown();
- });
+ runAsync(downloader::download);
SocketChannel finalSc = sc;
@@ -112,9 +105,9 @@ public class FileDownloaderTest extends GridCommonAbstractTest {
finishFut.get();
- downloader.download(finishFut.get(), null);
+ downloader.onResult(finishFut.get(), null);
- downLatch.await();
+ downloader.finishFuture().get();
assertTrue(DOWNLOADER_PATH.toFile().exists());