You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2018/01/16 00:33:48 UTC

asterixdb git commit: [NO ISSUE][STO] Unpin pages when interrupted during reads

Repository: asterixdb
Updated Branches:
  refs/heads/master d15f88881 -> f3e4397b3


[NO ISSUE][STO] Unpin pages when interrupted during reads

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- This change fixes a bug that occurs when a thread is interrupted
  while pinning a page that is not already in the buffer cache.
- The fix: if a failure happens during the read call, the page is
  unpinned.
- A test case is added.

Change-Id: I8d1c52fcf89ed90e8ef6019cd77842dd7468df49
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2274
Sonar-Qube: Jenkins <je...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mb...@apache.org>
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/f3e4397b
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/f3e4397b
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/f3e4397b

Branch: refs/heads/master
Commit: f3e4397b35804394bcf421d216a53832c0855d55
Parents: d15f888
Author: Abdullah Alamoudi <ba...@gmail.com>
Authored: Mon Jan 15 10:08:57 2018 -0800
Committer: abdullah alamoudi <ba...@gmail.com>
Committed: Mon Jan 15 16:33:28 2018 -0800

----------------------------------------------------------------------
 .../storage/common/buffercache/BufferCache.java |  13 ++-
 .../hyracks-storage-common-test/pom.xml         |   8 ++
 .../hyracks/storage/common/BufferCacheTest.java | 107 +++++++++++++++++++
 3 files changed, 126 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f3e4397b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
index 6212896..302c7b2 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
@@ -186,8 +186,17 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
             // disk.
             synchronized (cPage) {
                 if (!cPage.valid) {
-                    tryRead(cPage);
-                    cPage.valid = true;
+                    try {
+                        tryRead(cPage);
+                        cPage.valid = true;
+                    } catch (Exception e) {
+                        LOGGER.log(Level.WARN, "Failure while trying to read a page from disk", e);
+                        throw e;
+                    } finally {
+                        if (!cPage.valid) {
+                            unpin(cPage);
+                        }
+                    }
                 }
             }
         } else {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f3e4397b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/pom.xml
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/pom.xml b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/pom.xml
index f93df0a..bb31885 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/pom.xml
@@ -68,5 +68,13 @@
       <groupId>commons-io</groupId>
       <artifactId>commons-io</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-api</artifactId>
+    </dependency>
   </dependencies>
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f3e4397b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/BufferCacheTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/BufferCacheTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/BufferCacheTest.java
index 26ad457..f94914c 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/BufferCacheTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/BufferCacheTest.java
@@ -26,8 +26,15 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.HyracksException;
@@ -39,11 +46,15 @@ import org.apache.hyracks.storage.common.buffercache.ICachedPage;
 import org.apache.hyracks.storage.common.file.BufferedFileHandle;
 import org.apache.hyracks.test.support.TestStorageManagerComponentHolder;
 import org.apache.hyracks.test.support.TestUtils;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Test;
 
 public class BufferCacheTest {
+    private static final Logger LOGGER = LogManager.getLogger();
     protected static final List<String> openedFiles = new ArrayList<>();
     protected static final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("ddMMyy-hhmmssSS");
 
@@ -62,6 +73,102 @@ public class BufferCacheTest {
     }
 
     @Test
+    public void interruptPinTest() throws Exception {
+        /*
+         * This test creates a small buffer cache (4 pages) and a file of
+         * 16 pages, then has 4 threads pin and unpin the pages one by one
+         * while another thread repeatedly interrupts them for some time.
+         * It then closes the file and ensures that all the pages are
+         * unpinned and that no problems are found.
+         */
+        final int bufferCacheNumPages = 4;
+        TestStorageManagerComponentHolder.init(PAGE_SIZE, bufferCacheNumPages, MAX_OPEN_FILES);
+        IIOManager ioManager = TestStorageManagerComponentHolder.getIOManager();
+        IBufferCache bufferCache =
+                TestStorageManagerComponentHolder.getBufferCache(ctx.getJobletContext().getServiceContext());
+        final long duration = TimeUnit.SECONDS.toMillis(20);
+        final String fileName = getFileName();
+        final FileReference file = ioManager.resolve(fileName);
+        final int fileId = bufferCache.createFile(file);
+        final int numPages = 16;
+        bufferCache.openFile(fileId);
+        for (int i = 0; i < numPages; i++) {
+            long dpid = BufferedFileHandle.getDiskPageId(fileId, i);
+            ICachedPage page = bufferCache.confiscatePage(dpid);
+            page.getBuffer().putInt(0, i);
+            bufferCache.createFIFOQueue().put(page);
+        }
+        bufferCache.finishQueue();
+        bufferCache.closeFile(fileId);
+        ExecutorService executor = Executors.newFixedThreadPool(bufferCacheNumPages);
+        MutableObject<Thread>[] readers = new MutableObject[bufferCacheNumPages];
+        Future<Void>[] futures = new Future[bufferCacheNumPages];
+        for (int i = 0; i < bufferCacheNumPages; i++) {
+            readers[i] = new MutableObject<>();
+            final int threadNumber = i;
+            futures[i] = executor.submit(new Callable<Void>() {
+                @Override
+                public Void call() throws Exception {
+                    synchronized (readers[threadNumber]) {
+                        readers[threadNumber].setValue(Thread.currentThread());
+                        readers[threadNumber].notifyAll();
+                    }
+                    // for duration, just read the pages one by one.
+                    // At the end, close the file
+                    bufferCache.openFile(fileId);
+                    final long start = System.currentTimeMillis();
+                    int pageNumber = 0;
+                    int totalReads = 0;
+                    int successfulReads = 0;
+                    int interruptedReads = 0;
+                    while (System.currentTimeMillis() - start < duration) {
+                        totalReads++;
+                        pageNumber = (pageNumber + 1) % numPages;
+                        try {
+                            long dpid = BufferedFileHandle.getDiskPageId(fileId, pageNumber);
+                            ICachedPage page = bufferCache.pin(dpid, false);
+                            successfulReads++;
+                            bufferCache.unpin(page);
+                        } catch (HyracksDataException hde) {
+                            interruptedReads++;
+                            // clear
+                            Thread.interrupted();
+                        }
+                    }
+                    bufferCache.closeFile(fileId);
+                    LOGGER.log(Level.INFO, "Total reads = " + totalReads + " Successful Reads = " + successfulReads
+                            + " Interrupted Reads = " + interruptedReads);
+                    return null;
+                }
+            });
+        }
+
+        for (int i = 0; i < bufferCacheNumPages; i++) {
+            synchronized (readers[i]) {
+                while (readers[i].getValue() == null) {
+                    readers[i].wait();
+                }
+            }
+        }
+        final long start = System.currentTimeMillis();
+
+        while (System.currentTimeMillis() - start < duration) {
+            for (int i = 0; i < bufferCacheNumPages; i++) {
+                readers[i].getValue().interrupt();
+            }
+            Thread.sleep(25); // NOSONAR Sleep so some reads are successful
+        }
+        try {
+            for (int i = 0; i < bufferCacheNumPages; i++) {
+                futures[i].get();
+            }
+        } finally {
+            bufferCache.deleteFile(fileId);
+            bufferCache.close();
+        }
+    }
+
+    @Test
     public void simpleOpenPinCloseTest() throws HyracksException {
         TestStorageManagerComponentHolder.init(PAGE_SIZE, NUM_PAGES, MAX_OPEN_FILES);
         IBufferCache bufferCache =