You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by mh...@apache.org on 2017/11/21 01:18:12 UTC

asterixdb git commit: [NO ISSUE][TX] Make TxnLogFile Close Idempotent

Repository: asterixdb
Updated Branches:
  refs/heads/master f9fe6dda8 -> 76ecc4b19


[NO ISSUE][TX] Make TxnLogFile Close Idempotent

- user model changes: no
- storage format changes: no
- interface changes: yes
  Renamed ILogReader.initializeScan to setPosition and added
  javadocs.

Details:
Currently there is an explicit check that the file channel
of a TxnLogFile is open before closing it. However, the
channel could already be closed due to interrupts; therefore,
we should remove the explicit check and always try to close
it. In addition, we should always decrement the TxnLogFile
reference counter, even if the channel is not open, since
that TxnLogFile is no longer accessed.

Change-Id: I255e4b9af0bc78298c0a25daf0b5629d413eba6a
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2165
Reviewed-by: Till Westmann <ti...@apache.org>
Sonar-Qube: Jenkins <je...@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/76ecc4b1
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/76ecc4b1
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/76ecc4b1

Branch: refs/heads/master
Commit: 76ecc4b191a5877f1543c711cbc8869657ce55e9
Parents: f9fe6dd
Author: Murtadha Hubail <mh...@apache.org>
Authored: Tue Nov 21 01:20:30 2017 +0300
Committer: Murtadha Hubail <mh...@apache.org>
Committed: Mon Nov 20 17:17:40 2017 -0800

----------------------------------------------------------------------
 .../apache/asterix/app/nc/RecoveryManager.java  |  6 +--
 .../asterix/common/transactions/ILogReader.java | 34 +++++++++----
 .../management/service/logging/LogManager.java  |  2 +-
 .../management/service/logging/LogReader.java   | 52 ++++++++------------
 4 files changed, 49 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/76ecc4b1/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index 7bc5697..19966fe 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -201,7 +201,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
         jobId2WinnerEntitiesMap = new HashMap<>();
         //set log reader to the lowWaterMarkLsn
         ILogRecord logRecord;
-        logReader.initializeScan(lowWaterMarkLSN);
+        logReader.setPosition(lowWaterMarkLSN);
         logRecord = logReader.next();
         while (logRecord != null) {
             if (IS_DEBUG_MODE) {
@@ -300,7 +300,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
 
         ILogRecord logRecord = null;
         try {
-            logReader.initializeScan(lowWaterMarkLSN);
+            logReader.setPosition(lowWaterMarkLSN);
             logRecord = logReader.next();
             while (logRecord != null) {
                 if (IS_DEBUG_MODE) {
@@ -540,7 +540,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
         Set<Integer> activePartitions = localResourceRepository.getActivePartitions();
         ILogReader logReader = logMgr.getLogReader(false);
         try {
-            logReader.initializeScan(firstLSN);
+            logReader.setPosition(firstLSN);
             ILogRecord logRecord = null;
             while (currentLSN < lastLSN) {
                 logRecord = logReader.next();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/76ecc4b1/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogReader.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogReader.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogReader.java
index da188e3..8539e2b 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogReader.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogReader.java
@@ -18,18 +18,34 @@
  */
 package org.apache.asterix.common.transactions;
 
-import org.apache.asterix.common.exceptions.ACIDException;
-
 public interface ILogReader {
 
-    public void initializeScan(long beginLSN) throws ACIDException;
+    /**
+     * Sets the log reader position at log sequence number with value {@code lsn}.
+     *
+     * @param lsn
+     */
+    void setPosition(long lsn);
 
-    //for scanning
-    public ILogRecord next() throws ACIDException;
+    /**
+     * Reads and returns the log record located at the log reader current position. After reading the log record,
+     * the log reader position is incremented by the size of the read log.
+     *
+     * @return the log record
+     */
+    ILogRecord next();
 
-    //for random reading
-    public ILogRecord read(long readLSN) throws ACIDException;
+    /**
+     * Reads and returns the log record with log sequence number {@code lsn}.
+     *
+     * @param lsn
+     * @return The log record
+     */
+    ILogRecord read(long lsn);
 
-    public void close() throws ACIDException;
+    /**
+     * Closes the log reader and any resources used.
+     */
+    void close();
 
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/76ecc4b1/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
index dd0a5c7..cdd957a 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
@@ -580,7 +580,7 @@ public class LogManager implements ILogManager, ILifeCycleComponent {
     @Override
     public void closeLogFile(TxnLogFile logFileRef, FileChannel fileChannel) throws IOException {
         if (!fileChannel.isOpen()) {
-            throw new IllegalStateException("File channel is not open");
+            LOGGER.warning(() -> "Closing log file with id(" + logFileRef.getLogFileId() + ") with a closed channel.");
         }
         fileChannel.close();
         untouchLogFile(logFileRef.getLogFileId());

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/76ecc4b1/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java
index 148aa7e..f2c5eef 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java
@@ -30,14 +30,11 @@ import org.apache.asterix.common.transactions.ILogRecord.RecordReadStatus;
 import org.apache.asterix.common.transactions.LogRecord;
 import org.apache.asterix.common.transactions.MutableLong;
 import org.apache.asterix.common.transactions.TxnLogFile;
+import org.apache.hyracks.util.annotations.NotThreadSafe;
 
-/**
- * NOTE: Many method calls of this class are not thread safe.
- * Be very cautious using it in a multithreaded context.
- */
+@NotThreadSafe
 public class LogReader implements ILogReader {
 
-    public static final boolean IS_DEBUG_MODE = false;//true
     private static final Logger LOGGER = Logger.getLogger(LogReader.class.getName());
     private final ILogManager logMgr;
     private final long logFileSize;
@@ -54,7 +51,7 @@ public class LogReader implements ILogReader {
     private enum ReturnState {
         FLUSH,
         EOF
-    };
+    }
 
     public LogReader(ILogManager logMgr, long logFileSize, int logPageSize, MutableLong flushLSN,
             boolean isRecoveryMode) {
@@ -68,8 +65,8 @@ public class LogReader implements ILogReader {
     }
 
     @Override
-    public void initializeScan(long beginLSN) throws ACIDException {
-        readLSN = beginLSN;
+    public void setPosition(long lsn) {
+        readLSN = lsn;
         if (waitForFlushOrReturnIfEOF() == ReturnState.EOF) {
             return;
         }
@@ -84,7 +81,7 @@ public class LogReader implements ILogReader {
      * @throws ACIDException
      */
     @Override
-    public ILogRecord next() throws ACIDException {
+    public ILogRecord next() {
         if (waitForFlushOrReturnIfEOF() == ReturnState.EOF) {
             return null;
         }
@@ -147,13 +144,10 @@ public class LogReader implements ILogReader {
                     return ReturnState.EOF;
                 }
                 try {
-                    if (IS_DEBUG_MODE) {
-                        LOGGER.info(
-                                "waitForFlushOrReturnIfEOF()| flushLSN: " + flushLSN.get() + ", readLSN: " + readLSN);
-                    }
                     flushLSN.wait();
                 } catch (InterruptedException e) {
-                    //ignore
+                    Thread.currentThread().interrupt();
+                    throw new ACIDException(e);
                 }
             }
             return ReturnState.FLUSH;
@@ -166,10 +160,9 @@ public class LogReader implements ILogReader {
      * @return true if log continues, false if EOF
      * @throws ACIDException
      */
-    private boolean refillLogReadBuffer() throws ACIDException {
+    private boolean refillLogReadBuffer() {
         try {
             if (readLSN % logFileSize == logFile.size()) {
-                logFile.close();
                 readLSN += logFileSize - (readLSN % logFileSize);
                 getLogFile();
             }
@@ -183,14 +176,12 @@ public class LogReader implements ILogReader {
      * Fills the log buffer with data from the log file at the current position
      *
      * @return false if EOF, true otherwise
-     * @throws ACIDException
      */
-
-    private boolean fillLogReadBuffer() throws ACIDException {
+    private boolean fillLogReadBuffer() {
         return fillLogReadBuffer(logPageSize, readBuffer);
     }
 
-    private boolean fillLogReadBuffer(int readSize, ByteBuffer readBuffer) throws ACIDException {
+    private boolean fillLogReadBuffer(int readSize, ByteBuffer readBuffer) {
         int size = 0;
         int read = 0;
         readBuffer.position(0);
@@ -217,10 +208,9 @@ public class LogReader implements ILogReader {
         return true;
     }
 
-    //for random reading
     @Override
-    public ILogRecord read(long LSN) throws ACIDException {
-        readLSN = LSN;
+    public ILogRecord read(long lsn) {
+        readLSN = lsn;
         //wait for the log to be flushed if needed before trying to read it.
         synchronized (flushLSN) {
             while (readLSN >= flushLSN.get()) {
@@ -232,15 +222,10 @@ public class LogReader implements ILogReader {
             }
         }
         try {
-            if (logFile == null) {
+            if (logFile == null || readLSN < fileBeginLSN || readLSN >= fileBeginLSN + logFile.size()) {
                 //get the log file which contains readLSN
                 getLogFile();
                 fillLogReadBuffer();
-            } else if (readLSN < fileBeginLSN || readLSN >= fileBeginLSN + logFile.size()) {
-                //log is not in the current log file
-                logFile.close();
-                getLogFile();
-                fillLogReadBuffer();
             } else if (readLSN < bufferBeginLSN || readLSN >= bufferBeginLSN + readBuffer.limit()) {
                 //log is not in the current read buffer
                 fillLogReadBuffer();
@@ -265,7 +250,7 @@ public class LogReader implements ILogReader {
                 case TRUNCATED: {
                     if (!fillLogReadBuffer()) {
                         throw new IllegalStateException(
-                                "Could not read LSN(" + LSN + ") from log file id " + logFile.getLogFileId());
+                                "Could not read LSN(" + lsn + ") from log file id " + logFile.getLogFileId());
                     }
                     //now read the complete log record
                     continue;
@@ -285,8 +270,10 @@ public class LogReader implements ILogReader {
         return logRecord;
     }
 
-    private void getLogFile() throws ACIDException {
+    private void getLogFile() {
         try {
+            // close existing file (if any) before opening another one
+            close();
             logFile = logMgr.getLogFile(readLSN);
             fileBeginLSN = logFile.getFileBeginLSN();
         } catch (IOException e) {
@@ -295,10 +282,11 @@ public class LogReader implements ILogReader {
     }
 
     @Override
-    public void close() throws ACIDException {
+    public void close() {
         try {
             if (logFile != null) {
                 logFile.close();
+                logFile = null;
             }
         } catch (IOException e) {
             throw new ACIDException(e);