You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2019/03/15 14:11:32 UTC

[ignite] branch master updated: IGNITE-11546 Fixed FileDownloader early close - Fixes #6271.

This is an automated email from the ASF dual-hosted git repository.

agoncharuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new c70a4cd  IGNITE-11546 Fixed FileDownloader early close - Fixes #6271.
c70a4cd is described below

commit c70a4cd0c7c984cdb2602575d36eb21583835bf3
Author: Alexey Goncharuk <al...@gmail.com>
AuthorDate: Fri Mar 15 17:04:03 2019 +0300

    IGNITE-11546 Fixed FileDownloader early close - Fixes #6271.
    
    Signed-off-by: Alexey Goncharuk <al...@gmail.com>
---
 .../cache/persistence/GridCacheOffheapManager.java |   9 ++
 .../cache/persistence/file/FileDownloader.java     | 166 +++++++++++++--------
 .../cache/persistence/file/FilePageStore.java      |   2 +-
 .../cache/persistence/file/FileUploader.java       |  40 +++--
 .../cache/persistence/file/FileDownloaderTest.java |  15 +-
 5 files changed, 132 insertions(+), 100 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index 2978f69..26a535c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -1474,6 +1474,15 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
                 try {
                     Metas metas = getOrAllocatePartitionMetas();
 
+                    if (PageIdUtils.partId(metas.reuseListRoot.pageId().pageId()) != partId ||
+                        PageIdUtils.partId(metas.treeRoot.pageId().pageId()) != partId ||
+                        PageIdUtils.partId(metas.pendingTreeRoot.pageId().pageId()) != partId) {
+                        throw new IgniteCheckedException("Invalid meta root allocated [" +
+                            "cacheOrGroupName=" + grp.cacheOrGroupName() +
+                            ", partId=" + partId +
+                            ", metas=" + metas + ']');
+                    }
+
                     RootPage reuseRoot = metas.reuseListRoot;
 
                     freeList = new CacheFreeListImpl(
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileDownloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileDownloader.java
index 3c54a49..2678315 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileDownloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileDownloader.java
@@ -25,13 +25,11 @@ import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
 import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
-import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.lang.IgniteInClosureX;
 import org.apache.ignite.internal.util.typedef.internal.U;
 
 /**
@@ -42,22 +40,28 @@ public class FileDownloader {
     private final IgniteLogger log;
 
     /** */
-    private static final int CHUNK_SIZE = 1024 * 1024;
+    private static final int CHUNK_SIZE = 16 * 1024 * 1024;
 
     /** */
     private final Path path;
 
     /** */
-    private final AtomicLong size = new AtomicLong(-1);
+    private long bytesReceived;
 
     /** */
-    private ServerSocketChannel serverChannel;
+    private boolean doneTransfer;
 
     /** */
-    private volatile GridFutureAdapter<?> finishFut;
+    private long bytesSent = -1;
 
     /** */
-    private final GridFutureAdapter<?> initFut = new GridFutureAdapter<>();
+    private ServerSocketChannel srvChan;
+
+    /** */
+    private SocketChannel readChan;
+
+    /** */
+    private final GridFutureAdapter<Void> finishFut = new GridFutureAdapter<>();
 
     /**
      *
@@ -68,6 +72,13 @@ public class FileDownloader {
     }
 
     /**
+     * @return Download finish future.
+     */
+    public IgniteInternalFuture<Void> finishFuture() {
+        return finishFut;
+    }
+
+    /**
      *
      */
     public InetSocketAddress start() throws IgniteCheckedException {
@@ -76,7 +87,7 @@ public class FileDownloader {
 
             ch.bind(null);
 
-            serverChannel = ch;
+            srvChan = ch;
 
             return (InetSocketAddress)ch.getLocalAddress();
         }
@@ -88,30 +99,9 @@ public class FileDownloader {
     /**
      *
      */
-    public void download(GridFutureAdapter<?> fut){
-        this.finishFut = fut;
-
-        final ServerSocketChannel ch = serverChannel;
-
-        fut.listen(new IgniteInClosureX<IgniteInternalFuture<?>>() {
-            @Override public void applyx(IgniteInternalFuture<?> future) throws IgniteCheckedException {
-                try {
-
-                    if (log != null && log.isInfoEnabled())
-                        log.info("Server socket closed " + ch.getLocalAddress());
-
-                    ch.close();
-                }
-                catch (Exception ex) {
-                    U.error(log, "Fail close socket.", ex);
-
-                    throw new IgniteCheckedException(ex);
-                }
-            }
-        });
-
-        FileChannel writeChannel = null;
-        SocketChannel readChannel = null;
+    public void download() {
+        FileChannel writeChan = null;
+        SocketChannel readChan = null;
 
         try {
             File f = new File(path.toUri().getPath());
@@ -124,43 +114,53 @@ public class FileDownloader {
             if (!cacheWorkDir.exists())
                 cacheWorkDir.mkdir();
 
-            writeChannel = FileChannel.open(path, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
+            readChan = srvChan.accept();
+
+            if (log != null && log.isInfoEnabled())
+                log.info("Accepted incoming connection, closing server socket: " + srvChan.getLocalAddress());
 
-            initFut.onDone();
+            U.closeQuiet(srvChan);
 
-            readChannel = serverChannel.accept();
+            synchronized (this) {
+                if (finishFut.isDone()) {
+                    // Already received a response with error.
+                    U.closeQuiet(readChan);
+
+                    return;
+                }
+                else
+                    this.readChan = readChan;
+            }
+
+            writeChan = FileChannel.open(path, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
+
+            if (log != null && log.isInfoEnabled())
+                log.info("Started writing file [path=" + path + ", rmtAddr=" + readChan.getRemoteAddress() + ']');
 
             long pos = 0;
 
-            long size = this.size.get();
+            boolean finish = false;
 
-            while (size == -1 || pos < size) {
-                pos += writeChannel.transferFrom(readChannel, pos, CHUNK_SIZE);
+            while (!finish && !finishFut.isDone()) {
+                long transferred = writeChan.transferFrom(readChan, pos, CHUNK_SIZE);
 
-                if (size == -1)
-                    size = this.size.get();
+                pos += transferred;
+
+                finish = onBytesReceived(transferred);
             }
         }
         catch (IOException ex) {
-            initFut.onDone(ex);
-
-            fut.onDone(ex);
+            finishFut.onDone(ex);
         }
         finally {
             try {
-                if (writeChannel != null)
-                    writeChannel.close();
+                onDoneTransfer();
             }
-            catch (IOException ex) {
-                throw new IgniteException("Could not close file: " + path);
-            }
-
-            try {
-                if (readChannel != null)
-                    readChannel.close();
-            }
-            catch (IOException ex) {
-                throw new IgniteException("Could not close socket");
+            finally {
+                // Safety.
+                U.closeQuiet(srvChan);
+                U.close(writeChan, log);
+                U.close(readChan, log);
             }
         }
     }
@@ -168,22 +168,58 @@ public class FileDownloader {
     /**
      *
      */
-    public void download(long size, Throwable th) {
-        try {
-            initFut.get();
+    public void onResult(long size, Throwable th) {
+        synchronized (this) {
+            if (th != null) {
+                bytesSent = 0;
 
-            if (th != null)
                 finishFut.onDone(th);
+
+                U.closeQuiet(readChan);
+            }
             else {
-                if (!this.size.compareAndSet(-1, size))
-                    finishFut.onDone(new IgniteException("Size mismatch: " + this.size.get() + " != " + size));
-                else
-                    finishFut.onDone();
+                bytesSent = size;
+
+                checkCompleted();
             }
+        }
+    }
+
+    /**
+     * @param transferred Number of bytes transferred.
+     * @return {@code True} if should keep reading.
+     */
+    private boolean onBytesReceived(long transferred) {
+        synchronized (this) {
+            bytesReceived += transferred;
 
+            return bytesSent != -1 && bytesSent == bytesReceived;
         }
-        catch (IgniteCheckedException e) {
-            finishFut.onDone(e);
+    }
+
+    /**
+     * Called when reading thread stopped transferring bytes for any reason.
+     */
+    private void onDoneTransfer() {
+        synchronized (this) {
+            doneTransfer = true;
+
+            checkCompleted();
+        }
+    }
+
+    /**
+     *
+     */
+    private void checkCompleted() {
+        // Compare sizes if done reading from the socket and received response from remote node.
+        if (doneTransfer && bytesSent != -1) {
+            if (bytesReceived == bytesSent)
+                finishFut.onDone();
+            else {
+                finishFut.onDone(new IgniteException("Failed to transfer file (sent and received sizes mismatch) [" +
+                    "bytesReceived=" + bytesReceived + ", bytesSent=" + bytesSent + ", file=" + path + ']'));
+            }
         }
     }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
index 0968c49..4e1c243 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
@@ -396,7 +396,7 @@ public class FilePageStore implements PageStore {
             assert pageBuf.position() == 0;
             assert pageBuf.order() == ByteOrder.nativeOrder();
             assert off <= allocated.get() : "calculatedOffset=" + off +
-                ", allocated=" + allocated.get() + ", headerSize=" + headerSize();
+                ", allocated=" + allocated.get() + ", headerSize=" + headerSize() + ", cfgFile=" + cfgFile;
 
             int n = readWithFailover(pageBuf, off);
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileUploader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileUploader.java
index ba21ae9..4ea4c06 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileUploader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileUploader.java
@@ -25,8 +25,9 @@ import java.nio.channels.SocketChannel;
 import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.internal.U;
 
 /**
  * Part of direct node to node file downloading
@@ -38,18 +39,22 @@ public class FileUploader {
     /** */
     private final Path path;
 
+    /** */
+    private final IgniteLogger log;
+
     /**
      *
      */
-    public FileUploader(Path path) {
+    public FileUploader(Path path, IgniteLogger log) {
         this.path = path;
+        this.log = log;
     }
 
     /**
      *
      */
-    public void upload(SocketChannel writeChannel, GridFutureAdapter<Long> finishFut) {
-        FileChannel readChannel = null;
+    public void upload(SocketChannel writeChan, GridFutureAdapter<Long> finishFut) {
+        FileChannel readChan = null;
 
         try {
             File file = new File(path.toUri().getPath());
@@ -64,14 +69,17 @@ public class FileUploader {
                 return;
             }
 
-            readChannel = FileChannel.open(path, StandardOpenOption.READ);
+            readChan = FileChannel.open(path, StandardOpenOption.READ);
 
             long written = 0;
 
-            long size = readChannel.size();
+            long size = readChan.size();
 
             while (written < size)
-                written += readChannel.transferTo(written, CHUNK_SIZE, writeChannel);
+                written += readChan.transferTo(written, CHUNK_SIZE, writeChan);
+
+            writeChan.shutdownOutput();
+            writeChan.shutdownInput();
 
             finishFut.onDone(written);
         }
@@ -79,22 +87,8 @@ public class FileUploader {
             finishFut.onDone(ex);
         }
         finally {
-            //FIXME: when an error occurs on writeChannel.close() no attempt to close readChannel will happen. Need to be fixed.
-            try {
-                if (writeChannel != null)
-                    writeChannel.close();
-            }
-            catch (IOException ex) {
-                throw new IgniteException("Could not close socket.");
-            }
-
-            try {
-                if (readChannel != null)
-                    readChannel.close();
-            }
-            catch (IOException ex) {
-                throw new IgniteException("Could not close file: " + path);
-            }
+            U.close(writeChan, log);
+            U.close(readChan, log);
         }
     }
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/file/FileDownloaderTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/file/FileDownloaderTest.java
index 90af04e..ccaa1ac 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/file/FileDownloaderTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/file/FileDownloaderTest.java
@@ -24,7 +24,6 @@ import java.net.InetSocketAddress;
 import java.nio.channels.SocketChannel;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.util.concurrent.CountDownLatch;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
@@ -87,7 +86,7 @@ public class FileDownloaderTest extends GridCommonAbstractTest {
 
         GridFutureAdapter<Long> finishFut = new GridFutureAdapter<>();
 
-        FileUploader uploader = new FileUploader(UPLOADER_PATH);
+        FileUploader uploader = new FileUploader(UPLOADER_PATH, log);
 
         SocketChannel sc = null;
 
@@ -98,13 +97,7 @@ public class FileDownloaderTest extends GridCommonAbstractTest {
             U.warn(log, "Fail connect to " + address, e);
         }
 
-        CountDownLatch downLatch = new CountDownLatch(1);
-
-        runAsync(() -> {
-            downloader.download(finishFut);
-
-            downLatch.countDown();
-        });
+        runAsync(downloader::download);
 
         SocketChannel finalSc = sc;
 
@@ -112,9 +105,9 @@ public class FileDownloaderTest extends GridCommonAbstractTest {
 
         finishFut.get();
 
-        downloader.download(finishFut.get(), null);
+        downloader.onResult(finishFut.get(), null);
 
-        downLatch.await();
+        downloader.finishFuture().get();
 
         assertTrue(DOWNLOADER_PATH.toFile().exists());