You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by mh...@apache.org on 2017/09/14 15:52:28 UTC

asterixdb git commit: [ASTERIXDB-2046][STO] Retry Reading a Page on ClosedChannelException

Repository: asterixdb
Updated Branches:
  refs/heads/master 41b4519ec -> 2e3c52015


[ASTERIXDB-2046][STO] Retry Reading a Page on ClosedChannelException

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Currently, when concurrent threads are reading from the same file
  and one of them is interrupted, the file channel will be closed and
  other threads will fail with AsynchronousCloseException if they were
  reading or ClosedChannelException if they were about to read the file.
  This change makes the threads that were not interrupted retry the read
  operation.

Change-Id: I4f9de9f51596314d17e4c6a4a58333e8fd6c03d1
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2005
Sonar-Qube: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <ba...@gmail.com>
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <je...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/2e3c5201
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/2e3c5201
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/2e3c5201

Branch: refs/heads/master
Commit: 2e3c52015bfed6d6bed6c26124a3cf10b10864c5
Parents: 41b4519
Author: Murtadha Hubail <mh...@apache.org>
Authored: Wed Sep 13 19:04:53 2017 -0700
Committer: Murtadha Hubail <mh...@apache.org>
Committed: Thu Sep 14 08:52:10 2017 -0700

----------------------------------------------------------------------
 .../hyracks/api/exceptions/ErrorCode.java       |  1 +
 .../src/main/resources/errormsg/en.properties   |  1 +
 .../apache/hyracks/control/nc/io/IOManager.java |  3 ++
 .../storage/common/buffercache/BufferCache.java | 34 ++++++++++++-
 .../hyracks/storage/common/BufferCacheTest.java | 52 ++++++++++++++++++++
 5 files changed, 89 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2e3c5201/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index cf83bca..68e7cd1 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -127,6 +127,7 @@ public class ErrorCode {
     public static final int TIMEOUT = 91;
     public static final int JOB_HAS_BEEN_CLEARED_FROM_HISTORY = 92;
     public static final int JOB_HAS_NOT_BEEN_CREATED_YET = 93;
+    public static final int CANNOT_READ_CLOSED_FILE = 94;
 
     // Compilation error codes.
     public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10000;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2e3c5201/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index 56abee5..ede2f86 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -110,5 +110,6 @@
 91 = Operation timed out
 92 = Job %1$s has been cleared from job history
 93 = Job %1$s has not been created yet
+94 = Cannot read closed file (%1$s)
 
 10000 = The given rule collection %1$s is not an instance of the List class.

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2e3c5201/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
index 3da30ab..54a171d 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
@@ -23,6 +23,7 @@ import java.io.FilenameFilter;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.ClosedByInterruptException;
+import java.nio.channels.ClosedChannelException;
 import java.nio.channels.FileChannel;
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -209,6 +210,8 @@ public class IOManager implements IIOManager {
             // re-open the closed channel. The channel will be closed during the typical file lifecycle
             ((FileHandle) fHandle).ensureOpen();
             throw HyracksDataException.create(e);
+        } catch (ClosedChannelException e) {
+            throw HyracksDataException.create(ErrorCode.CANNOT_READ_CLOSED_FILE, e, fHandle.getFileReference());
         } catch (IOException e) {
             throw HyracksDataException.create(e);
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2e3c5201/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
index f89f638..6304a9a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
@@ -39,6 +39,7 @@ import java.util.concurrent.locks.ReentrantLock;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.IFileHandle;
@@ -58,6 +59,8 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
     private static final int PIN_MAX_WAIT_TIME = 50;
     private static final int PIN_ATTEMPT_CYCLES_WARNING_THRESHOLD = 3;
     private static final int MAX_PIN_ATTEMPT_CYCLES = 1000;
+    private static final int MAX_PAGE_READ_ATTEMPTS = 5;
+    private static final long PERIOD_BETWEEN_READ_ATTEMPTS = 100;
     public static final boolean DEBUG = false;
 
     private final int pageSize;
@@ -209,7 +212,7 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
             // disk.
             synchronized (cPage) {
                 if (!cPage.valid) {
-                    read(cPage);
+                    tryRead(cPage);
                     cPage.valid = true;
                 }
             }
@@ -527,6 +530,33 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
         return false;
     }
 
+    private void tryRead(CachedPage cPage) throws HyracksDataException {
+        for (int i = 1; i <= MAX_PAGE_READ_ATTEMPTS; i++) {
+            try {
+                read(cPage);
+                return;
+            } catch (HyracksDataException readException) {
+                if (readException.getErrorCode() == ErrorCode.CANNOT_READ_CLOSED_FILE && i < MAX_PAGE_READ_ATTEMPTS) {
+                    /*
+                     * Another thread closed the channel because it was interrupted; that thread re-opens
+                     * the file, so back off and retry. Once the attempts are exhausted the else branch
+                     * rethrows, so the caller never marks a page it failed to read as valid.
+                     */
+                    try {
+                        LOGGER.log(Level.WARNING, String.format("Failed to read page. Retrying attempt (%d/%d)", i + 1,
+                                MAX_PAGE_READ_ATTEMPTS), readException);
+                        Thread.sleep(PERIOD_BETWEEN_READ_ATTEMPTS);
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                        throw HyracksDataException.create(e);
+                    }
+                } else {
+                    throw readException;
+                }
+            }
+        }
+    }
+
     private void read(CachedPage cPage) throws HyracksDataException {
         BufferedFileHandle fInfo = getFileInfo(cPage);
         cPage.buffer.clear();
@@ -1122,7 +1152,7 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
             ICachedPageInternal old = cachedPages.set(victim.cpid, null);
             if (DEBUG) {
                 assert old == victim;
-            } ;
+            }
         }
         return true;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2e3c5201/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/BufferCacheTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/BufferCacheTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/BufferCacheTest.java
index e688ee5..e34ee0e 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/BufferCacheTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/BufferCacheTest.java
@@ -26,12 +26,14 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.storage.common.buffercache.CachedPage;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
 import org.apache.hyracks.storage.common.buffercache.ICachedPage;
 import org.apache.hyracks.storage.common.file.BufferedFileHandle;
@@ -314,6 +316,56 @@ public class BufferCacheTest {
         bufferCache.close();
     }
 
+    @Test
+    public void interruptedConcurrentReadTest() throws Exception {
+        TestStorageManagerComponentHolder.init(PAGE_SIZE, 200, MAX_OPEN_FILES);
+        IBufferCache bufferCache =
+                TestStorageManagerComponentHolder.getBufferCache(ctx.getJobletContext().getServiceContext());
+        IIOManager ioManager = TestStorageManagerComponentHolder.getIOManager();
+        String fileName = getFileName();
+        FileReference file = ioManager.resolve(fileName);
+        int fileId = bufferCache.createFile(file);
+        int testPageId = 0;
+        bufferCache.openFile(fileId);
+
+        final int expectedPinCount = 100;
+        final AtomicInteger actualPinCount = new AtomicInteger(0);
+        Thread innocentReader = new Thread(() -> {
+            Thread interruptedReader = null;
+            try {
+                for (int i = 0; i < expectedPinCount; i++) {
+                    ICachedPage aPage = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, testPageId), false);
+                    bufferCache.unpin(aPage);
+                    ((CachedPage) aPage).invalidate();
+                    actualPinCount.incrementAndGet();
+                    if (i % 10 == 0) {
+                        // start an interruptedReader that will cause the channel to be closed
+                        interruptedReader = new Thread(() -> {
+                            try {
+                                Thread.currentThread().interrupt();
+                                bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, testPageId + 1), false);
+                            } catch (Exception e) {
+                                e.printStackTrace();
+                            }
+                        });
+                        interruptedReader.start();
+                    }
+                }
+                if (interruptedReader != null) {
+                    interruptedReader.join();
+                }
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        });
+        innocentReader.start();
+        innocentReader.join();
+        // make sure that all reads by the innocentReader succeeded (JUnit order: expected, actual)
+        Assert.assertEquals(expectedPinCount, actualPinCount.get());
+        // close file
+        bufferCache.closeFile(fileId);
+    }
+
     @AfterClass
     public static void cleanup() throws Exception {
         for (String s : openedFiles) {