You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2018/02/16 22:34:36 UTC

asterixdb git commit: [NO ISSUE][STO] Perform IO reads in uninterruptible threads

Repository: asterixdb
Updated Branches:
  refs/heads/master c3c235743 -> 98574960c


[NO ISSUE][STO] Perform IO reads in uninterruptible threads

- user model changes: no
- storage format changes: no
- interface changes: no

details:
- Previously, IO reads were performed on the task threads.
- Task threads can be interrupted which can cause deadlocks
  due to a JDK synchronization bug.
- After this change, there are 2 IO read threads per IO device.
- Threads will receive IO read requests and process them.
- Such threads are never interrupted and are killed through
  the use of a poison pill.

Change-Id: Id28d57a222f42962284b24296cb9b91658e5dc77
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2387
Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mb...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/98574960
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/98574960
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/98574960

Branch: refs/heads/master
Commit: 98574960cf9163ca3b15faaf8dcea1581cd22e43
Parents: c3c2357
Author: Abdullah Alamoudi <ba...@gmail.com>
Authored: Wed Feb 14 10:27:40 2018 -0800
Committer: abdullah alamoudi <ba...@gmail.com>
Committed: Fri Feb 16 14:34:16 2018 -0800

----------------------------------------------------------------------
 .../hyracks/hyracks-storage-common/pom.xml      |  14 +-
 .../hyracks/storage/common/IIndexCursor.java    |  21 ++-
 .../storage/common/buffercache/BufferCache.java | 157 +++++++++++++++----
 .../storage/common/buffercache/CachedPage.java  |  42 ++++-
 4 files changed, 188 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/98574960/hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml b/hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml
index 9936e1a..40422f4 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml
@@ -16,18 +16,16 @@
  ! specific language governing permissions and limitations
  ! under the License.
  !-->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <artifactId>hyracks-storage-common</artifactId>
   <name>hyracks-storage-common</name>
-
   <parent>
     <groupId>org.apache.hyracks</groupId>
     <artifactId>hyracks</artifactId>
     <version>0.3.4-SNAPSHOT</version>
   </parent>
-
   <licenses>
     <license>
       <name>Apache License, Version 2.0</name>
@@ -36,7 +34,6 @@
       <comments>A business-friendly OSS license</comments>
     </license>
   </licenses>
-
   <properties>
     <root.dir>${basedir}/../..</root.dir>
   </properties>
@@ -52,8 +49,13 @@
       <version>${project.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-util</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.logging.log4j</groupId>
       <artifactId>log4j-api</artifactId>
     </dependency>
   </dependencies>
-</project>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/98574960/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IIndexCursor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IIndexCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IIndexCursor.java
index f704921..00c5dce 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IIndexCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IIndexCursor.java
@@ -26,14 +26,20 @@ import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 /**
  * Represents an index cursor. The expected use
  * cursor = new cursor();
- * while (more predicates){
- * -cursor.open(predicate);
- * -while (cursor.hasNext()){
- * --cursor.next()
+ * try{
+ * -while (more predicates){
+ * --cursor.open(predicate);
+ * --try{
+ * ---while (cursor.hasNext()){
+ * ----cursor.next()
+ * ---}
+ * --} finally{
+ * ---cursor.close();
+ * --}
  * -}
- * -cursor.close();
+ * } finally{
+ * -cursor.destroy();
  * }
- * cursor.destroy();
  * Each created cursor must have destroy called
  * Each successfully opened cursor must have close called
  *
@@ -47,7 +53,8 @@ import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
  * When a cursor object is created, it is in the CLOSED state.
  * CLOSED: Legal calls are open() --> OPENED, or destroy() --> DESTROYED, close() --> no effect
  * OPENED: The only legal calls are hasNext(), next(), or close() --> CLOSED.
- * DESTROYED: All calls are illegal.
+ * DESTROYED: The only legal call is destroy() which has no effect.
+ *
  * Cursors must enforce the cursor state machine
  */
 public interface IIndexCursor extends IDestroyable {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/98574960/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
index 1443bbc..73969db 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
@@ -28,11 +28,14 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -45,8 +48,10 @@ import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
 import org.apache.hyracks.api.replication.IIOReplicationManager;
 import org.apache.hyracks.api.util.IoUtil;
+import org.apache.hyracks.storage.common.buffercache.CachedPage.State;
 import org.apache.hyracks.storage.common.file.BufferedFileHandle;
 import org.apache.hyracks.storage.common.file.IFileMapManager;
+import org.apache.hyracks.util.InvokeUtil;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -55,6 +60,7 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
 
     private static final Logger LOGGER = LogManager.getLogger();
     private static final int MAP_FACTOR = 3;
+    private static final CachedPage POISON_PILL = new CachedPage();
 
     private static final int MIN_CLEANED_COUNT_DIFF = 3;
     private static final int PIN_MAX_WAIT_TIME = 50;
@@ -66,7 +72,8 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
 
     private final int pageSize;
     private final int maxOpenFiles;
-    final IIOManager ioManager;
+    private final ExecutorService executor;
+    private final IIOManager ioManager;
     private final CacheBucket[] pageMap;
     private final IPageReplacementStrategy pageReplacementStrategy;
     private final IPageCleanerPolicy pageCleanerPolicy;
@@ -75,6 +82,7 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
     private final Map<Integer, BufferedFileHandle> fileInfoMap;
     private final AsyncFIFOPageQueueManager fifoWriter;
     private final Queue<BufferCacheHeaderHelper> headerPageCache = new ConcurrentLinkedQueue<>();
+    private final BlockingQueue<CachedPage> readRequests;
 
     //DEBUG
     private Level fileOpsLevel = Level.DEBUG;
@@ -103,19 +111,43 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
         this.pageReplacementStrategy = pageReplacementStrategy;
         this.pageCleanerPolicy = pageCleanerPolicy;
         this.fileMapManager = fileMapManager;
-
-        Executor executor = Executors.newCachedThreadPool(threadFactory);
-        fileInfoMap = new HashMap<>();
-        cleanerThread = new CleanerThread();
-        executor.execute(cleanerThread);
-        closed = false;
-
-        fifoWriter = new AsyncFIFOPageQueueManager(this);
-        if (DEBUG) {
-            confiscatedPages = new ArrayList<>();
-            confiscatedPagesOwner = new HashMap<>();
-            confiscateLock = new ReentrantLock();
-            pinnedPageOwner = new ConcurrentHashMap<>();
+        int numReaders = ioManager.getIODevices().size() * 2;
+        readRequests = new ArrayBlockingQueue<>(pageReplacementStrategy.getMaxAllowedNumPages());
+        executor = Executors.newFixedThreadPool(numReaders + 1, threadFactory);
+        try {
+            fileInfoMap = new HashMap<>();
+            cleanerThread = new CleanerThread();
+            executor.execute(cleanerThread);
+            for (int i = 0; i < numReaders; i++) {
+                executor.execute(new ReaderThread(i));
+            }
+            closed = false;
+            fifoWriter = new AsyncFIFOPageQueueManager(this);
+            if (DEBUG) {
+                confiscatedPages = new ArrayList<>();
+                confiscatedPagesOwner = new HashMap<>();
+                confiscateLock = new ReentrantLock();
+                pinnedPageOwner = new ConcurrentHashMap<>();
+            }
+        } catch (Throwable th) {
+            try {
+                throw th;
+            } finally {
+                readRequests.offer(POISON_PILL); // NOSONAR will always succeed since the queue is empty
+                executor.shutdown();
+                try {
+                    if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
+                        LOGGER.log(Level.WARN, "Failure shutting down buffer cache executor service");
+                    }
+                } catch (InterruptedException e) {
+                    LOGGER.log(Level.WARN, "Interrupted while shutting down buffer cache executor service");
+                    Thread.currentThread().interrupt();
+                    th.addSuppressed(e);
+                } catch (Throwable e) {
+                    LOGGER.log(Level.WARN, "Failure shutting down buffer cache executor service", e);
+                    th.addSuppressed(e);
+                }
+            }
         }
     }
 
@@ -185,22 +217,27 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
             // Resolve race of multiple threads trying to read the page from
             // disk.
             synchronized (cPage) {
-                if (!cPage.valid) {
+                if (cPage.state != State.VALID) {
                     try {
-                        tryRead(cPage);
-                        cPage.valid = true;
-                    } catch (Exception e) {
-                        LOGGER.log(Level.WARN, "Failure while trying to read a page from disk", e);
-                        throw e;
-                    } finally {
-                        if (!cPage.valid) {
-                            unpin(cPage);
+                        // Will attempt to re-read even if previous read failed
+                        if (cPage.state == State.INVALID || cPage.state == State.READ_FAILED) {
+                            // submit request to read
+                            cPage.state = State.READ_REQUESTED;
+                            readRequests.put(cPage);
                         }
+                        cPage.awaitRead();
+                    } catch (InterruptedException e) {
+                        cPage.state = State.INVALID;
+                        unpin(cPage);
+                        throw HyracksDataException.create(e);
+                    } catch (Throwable th) {
+                        unpin(cPage);
+                        throw HyracksDataException.create(th);
                     }
                 }
             }
         } else {
-            cPage.valid = true;
+            cPage.state = State.VALID;
         }
         pageReplacementStrategy.notifyCachePageAccess(cPage);
         if (DEBUG) {
@@ -449,7 +486,7 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
                     buffer.append("      ").append(cp.cpid).append(" -> [")
                             .append(BufferedFileHandle.getFileId(cp.dpid)).append(':')
                             .append(BufferedFileHandle.getPageId(cp.dpid)).append(", ").append(cp.pinCount.get())
-                            .append(", ").append(cp.valid ? "valid" : "invalid").append(", ")
+                            .append(", ").append(cp.state).append(", ")
                             .append(cp.confiscated.get() ? "confiscated" : "physical").append(", ")
                             .append(cp.dirty.get() ? "dirty" : "clean").append("]\n");
                     cp = cp.next;
@@ -480,7 +517,7 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
                 if (c.confiscated() || c.latch.getReadLockCount() != 0 || c.latch.getWriteHoldCount() != 0) {
                     return false;
                 }
-                if (c.valid) {
+                if (c.state == State.VALID) {
                     reachableDpids.add(c.dpid);
                 }
             }
@@ -519,6 +556,9 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
                 read(cPage);
                 return;
             } catch (HyracksDataException readException) {
+                if (Thread.interrupted()) {
+                    LOGGER.log(Level.WARN, "Ignoring interrupt. Reader threads should never be interrupted.");
+                }
                 if (readException.getErrorCode() == ErrorCode.CANNOT_READ_CLOSED_FILE && i <= MAX_PAGE_READ_ATTEMPTS) {
                     /**
                      * if the read failure was due to another thread closing the file channel because
@@ -530,8 +570,7 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
                         LOGGER.log(Level.WARN, String.format("Failed to read page. Retrying attempt (%d/%d)", i + 1,
                                 MAX_PAGE_READ_ATTEMPTS), readException);
                     } catch (InterruptedException e) {
-                        Thread.currentThread().interrupt();
-                        throw HyracksDataException.create(e);
+                        LOGGER.log(Level.WARN, "Ignoring interrupt. Reader threads should never be interrupted.");
                     }
                 } else {
                     throw readException;
@@ -670,6 +709,54 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
         }
     }
 
+    private class ReaderThread implements Runnable {
+        private final int num;
+
+        private ReaderThread(int num) {
+            this.num = num;
+        }
+
+        @Override
+        public void run() {
+            Thread.currentThread().setName("Buffer-Cache-Reader-" + num);
+            while (true) {
+                CachedPage next;
+                try {
+                    next = readRequests.take();
+                } catch (InterruptedException e) {
+                    LOGGER.log(Level.WARN, "Ignoring interrupt. Reader threads should never be interrupted.");
+                    break;
+                }
+                if (next == POISON_PILL) {
+                    LOGGER.log(Level.INFO, "Exiting");
+                    InvokeUtil.doUninterruptibly(() -> readRequests.put(POISON_PILL));
+                    if (Thread.interrupted()) {
+                        LOGGER.log(Level.ERROR, "Ignoring interrupt. Reader threads should never be interrupted.");
+                    }
+                    break;
+                }
+                if (next.state != State.READ_REQUESTED) {
+                    LOGGER.log(Level.ERROR,
+                            "Exiting BufferCache reader thread. Took a page with state = {} out of the queue",
+                            next.state);
+                    break;
+                }
+                try {
+                    tryRead(next);
+                    next.state = State.VALID;
+                } catch (HyracksDataException e) {
+                    next.readFailure = e;
+                    next.state = State.READ_FAILED;
+                    LOGGER.log(Level.WARN, "Failed to read a page", e);
+                }
+                synchronized (next) {
+                    next.notifyAll();
+                }
+            }
+        }
+
+    }
+
     private class CleanerThread implements Runnable {
         private volatile boolean shutdownStart = false;
         private volatile boolean shutdownComplete = false;
@@ -799,6 +886,16 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
             });
             fileInfoMap.clear();
         }
+        InvokeUtil.doUninterruptibly(() -> readRequests.put(POISON_PILL));
+        executor.shutdown();
+        try {
+            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
+                LOGGER.log(Level.WARN, "Failure shutting down buffer cache executor service");
+            }
+        } catch (InterruptedException e) {
+            LOGGER.log(Level.WARN, "Interrupted while shutting down buffer cache executor service");
+            Thread.currentThread().interrupt();
+        }
     }
 
     @Override
@@ -1343,7 +1440,7 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
             }
             try {
                 cPage.reset(cPage.dpid);
-                cPage.valid = true;
+                cPage.state = State.VALID;
                 cPage.next = bucket.cachedPage;
                 bucket.cachedPage = cPage;
                 cPage.pinCount.decrementAndGet();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/98574960/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java
index bc0a04e..d7a55af 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java
@@ -23,10 +23,19 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
 /**
  * @author yingyib
  */
 public class CachedPage implements ICachedPageInternal {
+    public enum State {
+        INVALID,
+        READ_REQUESTED,
+        READ_FAILED,
+        VALID
+    }
+
     final int cpid;
     ByteBuffer buffer;
     public final AtomicInteger pinCount;
@@ -36,7 +45,7 @@ public class CachedPage implements ICachedPageInternal {
     private final IPageReplacementStrategy pageReplacementStrategy;
     volatile long dpid; // disk page id (composed of file id and page id)
     CachedPage next;
-    volatile boolean valid;
+    volatile State state;
     final AtomicBoolean confiscated;
     private IQueueInfo queueInfo;
     private int multiplier;
@@ -44,6 +53,7 @@ public class CachedPage implements ICachedPageInternal {
     // DEBUG
     private static final boolean DEBUG = false;
     private final StackTraceElement[] ctorStack;
+    Throwable readFailure;
 
     //Constructor for making dummy entry for FIFO queue
     public CachedPage() {
@@ -72,7 +82,7 @@ public class CachedPage implements ICachedPageInternal {
         latch = new ReentrantReadWriteLock(true);
         replacementStrategyObject = pageReplacementStrategy.createPerPageStrategyObject(cpid);
         dpid = -1;
-        valid = false;
+        state = State.INVALID;
         confiscated = new AtomicBoolean(false);
         queueInfo = null;
         ctorStack = DEBUG ? new Throwable().getStackTrace() : null;
@@ -81,7 +91,7 @@ public class CachedPage implements ICachedPageInternal {
     public void reset(long dpid) {
         this.dpid = dpid;
         dirty.set(false);
-        valid = false;
+        state = State.INVALID;
         confiscated.set(false);
         pageReplacementStrategy.notifyCachePageReset(this);
         queueInfo = null;
@@ -205,4 +215,30 @@ public class CachedPage implements ICachedPageInternal {
     public boolean isLargePage() {
         return multiplier > 1;
     }
+
+    /**
+     * Wait for the page requested to be read to complete the read operation
+     * This method is uninterruptible
+     *
+     * @throws HyracksDataException
+     */
+    public synchronized void awaitRead() throws HyracksDataException {
+        boolean interrupted = false;
+        try {
+            while (state != State.VALID && state != State.READ_FAILED) {
+                try {
+                    wait();
+                } catch (InterruptedException e) {
+                    interrupted = true;
+                }
+            }
+            if (state == State.READ_FAILED) {
+                throw HyracksDataException.create(readFailure);
+            }
+        } finally {
+            if (interrupted) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
 }