You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by mb...@apache.org on 2017/10/27 22:22:58 UTC

asterixdb git commit: [NO ISSUE][*DB] LogFlusher fixes

Repository: asterixdb
Updated Branches:
  refs/heads/master 63b92981e -> a0b29c564


[NO ISSUE][*DB] LogFlusher fixes

Change-Id: I19e150f2560573738938967f389a397ad7150a4d
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2106
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <je...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mh...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/a0b29c56
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/a0b29c56
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/a0b29c56

Branch: refs/heads/master
Commit: a0b29c5641cca500428b95a824eae852dfd78c13
Parents: 63b9298
Author: Michael Blow <mi...@couchbase.com>
Authored: Fri Oct 27 12:05:01 2017 -0400
Committer: Michael Blow <mb...@apache.org>
Committed: Fri Oct 27 15:22:40 2017 -0700

----------------------------------------------------------------------
 .../asterix/common/transactions/ILogBuffer.java |   3 +-
 .../asterix/common/utils/InterruptUtil.java     | 118 +++++++++++++++++++
 .../management/service/logging/LogBuffer.java   |  38 +++---
 .../management/service/logging/LogManager.java  |  89 ++++++--------
 4 files changed, 176 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a0b29c56/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogBuffer.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogBuffer.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogBuffer.java
index 8e67603..6bdce73 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogBuffer.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogBuffer.java
@@ -34,8 +34,9 @@ public interface ILogBuffer {
 
     /**
      * flush content of buffer to disk
+     * @param stopping
      */
-    void flush();
+    void flush(boolean stopping);
 
     /**
      * @param logSize

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a0b29c56/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/InterruptUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/InterruptUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/InterruptUtil.java
new file mode 100644
index 0000000..4c65c66
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/InterruptUtil.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.utils;
+
+/**
+ * Helpers for running a blocking operation to completion even if the calling thread is interrupted.
+ * Every helper retries the supplied operation each time it throws {@link InterruptedException}. The variants
+ * differ in how the interrupt is surfaced afterwards: the {@code void} variants restore the interrupt on the
+ * current thread, while the {@code ...Get} variants report it to the caller through the return value.
+ */
+public class InterruptUtil {
+    /**
+     * Executes the passed interruptible, retrying if the operation is interrupted. Once the interruptible
+     * completes (normally or by throwing an unchecked exception), the current thread will be re-interrupted,
+     * if the operation was interrupted at least once.
+     *
+     * @param interruptible the operation to run until it completes without being interrupted
+     */
+    public static void doUninterruptibly(Interruptible interruptible) {
+        boolean interrupted = false;
+        try {
+            while (true) {
+                try {
+                    interruptible.run();
+                    break;
+                } catch (InterruptedException e) { // NOSONAR- we will re-interrupt the thread during unwind
+                    interrupted = true;
+                }
+            }
+        } finally {
+            if (interrupted) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * Executes the passed interruptible, retrying if the operation is interrupted. Once the interruptible
+     * completes (normally or by throwing), the current thread will be re-interrupted, if the operation was
+     * interrupted at least once.
+     *
+     * @param interruptible the operation to run until it completes without being interrupted
+     * @throws Exception any exception other than {@link InterruptedException} thrown by the operation
+     */
+    public static void doExUninterruptibly(ThrowingInterruptible interruptible) throws Exception {
+        boolean interrupted = false;
+        try {
+            while (true) {
+                try {
+                    interruptible.run();
+                    break;
+                } catch (InterruptedException e) { // NOSONAR- we will re-interrupt the thread during unwind
+                    interrupted = true;
+                }
+            }
+        } finally {
+            if (interrupted) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * Executes the passed interruptible, retrying if the operation is interrupted. On successful completion
+     * the interrupt is <b>not</b> restored on the current thread; it is only reported through the return value
+     * and the caller is responsible for handling it. If the operation fails with an unchecked exception after
+     * being previously interrupted, the current thread will be re-interrupted so the interrupt is not lost.
+     *
+     * @param interruptible the operation to run until it completes without being interrupted
+     * @return true if the operation was interrupted at least once, otherwise false
+     */
+    public static boolean doUninterruptiblyGet(Interruptible interruptible) {
+        boolean interrupted = false;
+        boolean success = false;
+        try {
+            while (true) {
+                try {
+                    interruptible.run();
+                    success = true;
+                    return interrupted;
+                } catch (InterruptedException e) { // NOSONAR- contract states caller must handle
+                    interrupted = true;
+                }
+            }
+        } finally {
+            // only reached with !success when an unchecked exception is propagating: the caller will never
+            // see the return value, so the interrupt has to be restored here
+            if (!success && interrupted) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * Executes the passed interruptible, retrying if the operation is interrupted. On successful completion
+     * the interrupt is <b>not</b> restored on the current thread; it is only reported through the return value
+     * and the caller is responsible for handling it. If the operation throws an exception after being
+     * previously interrupted, the current thread will be re-interrupted.
+     *
+     * @param interruptible the operation to run until it completes without being interrupted
+     * @return true if the operation was interrupted at least once, otherwise false
+     * @throws Exception any exception other than {@link InterruptedException} thrown by the operation
+     */
+    public static boolean doExUninterruptiblyGet(ThrowingInterruptible interruptible) throws Exception {
+        boolean interrupted = false;
+        boolean success = false;
+        try {
+            while (true) {
+                try {
+                    interruptible.run();
+                    success = true;
+                    return interrupted;
+                } catch (InterruptedException e) { // NOSONAR- contract states caller must handle
+                    interrupted = true;
+                }
+            }
+        } finally {
+            // The restore must happen outside the retry loop: re-interrupting between attempts would make the
+            // next blocking call throw InterruptedException immediately, spinning forever, and would leave the
+            // flag set on success even though the interrupt is reported through the return value.
+            if (!success && interrupted) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * An operation which may block and be interrupted, but throws no other checked exception.
+     */
+    @FunctionalInterface
+    public interface Interruptible {
+        void run() throws InterruptedException;
+    }
+
+    /**
+     * An operation which may block and be interrupted, and may also throw arbitrary checked exceptions.
+     */
+    @FunctionalInterface
+    public interface ThrowingInterruptible {
+        void run() throws Exception; // NOSONAR
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a0b29c56/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
index 081cf02..668eab1 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
@@ -105,15 +105,15 @@ public class LogBuffer implements ILogBuffer {
                 if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT
                         || logRecord.getLogType() == LogType.WAIT) {
                     logRecord.isFlushed(false);
-                    syncCommitQ.offer(logRecord);
+                    syncCommitQ.add(logRecord);
                 }
                 if (logRecord.getLogType() == LogType.FLUSH) {
                     logRecord.isFlushed(false);
-                    flushQ.offer(logRecord);
+                    flushQ.add(logRecord);
                 }
             } else if (logRecord.getLogSource() == LogSource.REMOTE
                     && (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT)) {
-                remoteJobsQ.offer(logRecord);
+                remoteJobsQ.add(logRecord);
             }
             this.notify();
         }
@@ -168,29 +168,30 @@ public class LogBuffer implements ILogBuffer {
     ////////////////////////////////////
 
     @Override
-    public void flush() {
+    public void flush(boolean stopping) {
+        boolean interrupted = false;
         try {
             int endOffset;
             while (!full.get()) {
-                synchronized (this) {
-                    if (appendOffset - flushOffset == 0 && !full.get()) {
-                        try {
+                try {
+                    synchronized (this) {
+                        if (appendOffset - flushOffset == 0 && !full.get()) {
                             if (IS_DEBUG_MODE) {
                                 LOGGER.info("flush()| appendOffset: " + appendOffset + ", flushOffset: " + flushOffset
                                         + ", full: " + full.get());
                             }
-                            if (stop) {
+                            if (stopping || stop) {
                                 fileChannel.close();
                                 return;
                             }
-                            this.wait();
-                        } catch (InterruptedException e) {
-                            continue;
+                            wait();
                         }
+                        endOffset = appendOffset;
                     }
-                    endOffset = appendOffset;
-                }
                 internalFlush(flushOffset, endOffset);
+                } catch (InterruptedException e) {
+                    interrupted = true;
+                }
             }
             internalFlush(flushOffset, appendOffset);
             if (isLastPage) {
@@ -198,6 +199,10 @@ public class LogBuffer implements ILogBuffer {
             }
         } catch (IOException e) {
             throw new IllegalStateException(e);
+        } finally {
+            if (interrupted) {
+                Thread.currentThread().interrupt();
+            }
         }
     }
 
@@ -230,7 +235,7 @@ public class LogBuffer implements ILogBuffer {
         if (endOffset > beginOffset) {
             logBufferTailReader.initializeScan(beginOffset, endOffset);
 
-            ITransactionContext txnCtx = null;
+            ITransactionContext txnCtx;
 
             LogRecord logRecord = logBufferTailReader.next();
             while (logRecord != null) {
@@ -327,8 +332,9 @@ public class LogBuffer implements ILogBuffer {
     }
 
     @Override
-    public void stop() {
-        this.stop = true;
+    public synchronized void stop() {
+        stop = true;
+        notifyAll();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a0b29c56/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
index e5e91e8..5f9369d 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
@@ -33,7 +33,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -51,6 +51,7 @@ import org.apache.asterix.common.transactions.LogManagerProperties;
 import org.apache.asterix.common.transactions.LogType;
 import org.apache.asterix.common.transactions.MutableLong;
 import org.apache.asterix.common.transactions.TxnLogFile;
+import org.apache.asterix.common.utils.InterruptUtil;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
 
 public class LogManager implements ILogManager, ILifeCycleComponent {
@@ -162,7 +163,7 @@ public class LogManager implements ILogManager, ILifeCycleComponent {
             }
         }
 
-        /**
+        /*
          * To eliminate the case where the modulo of the next appendLSN = 0 (the next
          * appendLSN = the first LSN of the next log file), we do not allow a log to be
          * written at the last offset of the current file.
@@ -616,13 +617,11 @@ public class LogManager implements ILogManager, ILifeCycleComponent {
      * The deadlock happens when PrimaryIndexOpeartionTracker.completeOperation results in generating a FLUSH log and there are no empty log buffers available to log it.
      */
     private class FlushLogsLogger extends Thread {
-        private ILogRecord logRecord;
-
         @Override
         public void run() {
             while (true) {
                 try {
-                    logRecord = flushLogsQ.take();
+                    ILogRecord logRecord = flushLogsQ.take();
                     appendToLogTail(logRecord);
                 } catch (ACIDException e) {
                     e.printStackTrace();
@@ -641,77 +640,57 @@ class LogFlusher implements Callable<Boolean> {
     private final LinkedBlockingQueue<ILogBuffer> emptyQ;
     private final LinkedBlockingQueue<ILogBuffer> flushQ;
     private final LinkedBlockingQueue<ILogBuffer> stashQ;
-    private ILogBuffer flushPage;
-    private final AtomicBoolean isStarted;
-    private final AtomicBoolean terminateFlag;
+    private volatile ILogBuffer flushPage;
+    private volatile boolean stopping;
+    private final Semaphore started;
 
-    public LogFlusher(LogManager logMgr, LinkedBlockingQueue<ILogBuffer> emptyQ, LinkedBlockingQueue<ILogBuffer> flushQ,
+    LogFlusher(LogManager logMgr, LinkedBlockingQueue<ILogBuffer> emptyQ, LinkedBlockingQueue<ILogBuffer> flushQ,
             LinkedBlockingQueue<ILogBuffer> stashQ) {
         this.logMgr = logMgr;
         this.emptyQ = emptyQ;
         this.flushQ = flushQ;
         this.stashQ = stashQ;
-        flushPage = null;
-        isStarted = new AtomicBoolean(false);
-        terminateFlag = new AtomicBoolean(false);
-
+        this.started = new Semaphore(0);
     }
 
     public void terminate() {
-        //make sure the LogFlusher thread started before terminating it.
-        synchronized (isStarted) {
-            while (!isStarted.get()) {
-                try {
-                    isStarted.wait();
-                } catch (InterruptedException e) {
-                    //ignore
-                }
-            }
-        }
+        // make sure the LogFlusher thread started before terminating it.
+        InterruptUtil.doUninterruptibly(started::acquire);
 
-        terminateFlag.set(true);
-        if (flushPage != null) {
-            synchronized (flushPage) {
-                flushPage.stop();
-                flushPage.notify();
-            }
+        stopping = true;
+
+        // we must tell any active flush, if any, to stop
+        final ILogBuffer currentFlushPage = this.flushPage;
+        if (currentFlushPage != null) {
+            currentFlushPage.stop();
         }
-        //[Notice]
-        //The return value doesn't need to be checked
-        //since terminateFlag will trigger termination if the flushQ is full.
-        flushQ.offer(POISON_PILL);
+        // finally we put a POISON_PILL onto the flushQ to indicate to the flusher it is time to exit
+        InterruptUtil.doUninterruptibly(() -> flushQ.put(POISON_PILL));
     }
 
     @Override
-    public Boolean call() {
-        synchronized (isStarted) {
-            isStarted.set(true);
-            isStarted.notify();
-        }
+    public Boolean call() throws InterruptedException {
+        started.release();
+        boolean interrupted = false;
         try {
             while (true) {
                 flushPage = null;
-                try {
-                    flushPage = flushQ.take();
-                    if (flushPage == POISON_PILL || terminateFlag.get()) {
-                        return true;
-                    }
-                } catch (InterruptedException e) {
-                    if (flushPage == null) {
-                        continue;
-                    }
+                interrupted = InterruptUtil.doUninterruptiblyGet(() -> flushPage = flushQ.take()) || interrupted;
+                if (flushPage == POISON_PILL) {
+                    return true;
                 }
-                flushPage.flush();
-                emptyQ.offer(flushPage.getLogPageSize() == logMgr.getLogPageSize() ? flushPage : stashQ.remove());
+                flushPage.flush(stopping);
+
+                // TODO(mblow): recycle large pages
+                emptyQ.add(flushPage.getLogPageSize() == logMgr.getLogPageSize() ? flushPage : stashQ.remove());
             }
         } catch (Exception e) {
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("-------------------------------------------------------------------------");
-                LOGGER.info("LogFlusher is terminating abnormally. System is in unusalbe state.");
-                LOGGER.info("-------------------------------------------------------------------------");
-            }
-            e.printStackTrace();
+            LOGGER.log(Level.SEVERE, "LogFlusher is terminating abnormally. System is in unusable state.", e);
             throw e;
+        } finally {
+            if (interrupted) {
+                Thread.currentThread().interrupt();
+            }
         }
     }
 }