You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/08/18 02:09:58 UTC

[iotdb] 01/01: new ExclusiveWriteLogNode

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch new_wal
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 52763281484e21ddab9f71455def7907b28add2d
Author: qiaojialin <64...@qq.com>
AuthorDate: Wed Aug 18 10:09:04 2021 +0800

    new ExclusiveWriteLogNode
---
 .../db/writelog/node/ExclusiveWriteLogNode.java    | 86 ++++++++--------------
 1 file changed, 32 insertions(+), 54 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java b/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
index c947963..56fafe2 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.writelog.node;
 
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.directories.DirectoryManager;
@@ -28,7 +29,6 @@ import org.apache.iotdb.db.writelog.io.ILogWriter;
 import org.apache.iotdb.db.writelog.io.LogWriter;
 import org.apache.iotdb.db.writelog.io.MultiFileLogReader;
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,7 +42,7 @@ import java.nio.channels.ClosedChannelException;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantLock;
 
 /** This WriteLogNode is used to manage insert ahead logs of a TsFile. */
@@ -51,33 +51,31 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
   public static final String WAL_FILE_NAME = "wal";
   private static final Logger logger = LoggerFactory.getLogger(ExclusiveWriteLogNode.class);
 
-  private String identifier;
+  private final String identifier;
 
-  private String logDirectory;
+  private final String logDirectory;
 
   private ILogWriter currentFileWriter;
 
-  private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+  private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
 
-  private ByteBuffer logBufferWorking;
-  private ByteBuffer logBufferIdle;
-  private ByteBuffer logBufferFlushing;
+  private volatile ByteBuffer logBufferWorking;
+  private volatile ByteBuffer logBufferIdle;
+  private volatile ByteBuffer logBufferFlushing;
 
   // used for the convenience of deletion
-  private ByteBuffer[] bufferArray;
+  private volatile ByteBuffer[] bufferArray;
 
   private final Object switchBufferCondition = new Object();
-  private ReentrantLock lock = new ReentrantLock();
-  private static final ExecutorService FLUSH_BUFFER_THREAD_POOL =
-      Executors.newCachedThreadPool(
-          new ThreadFactoryBuilder().setNameFormat("Flush-WAL-Thread-%d").setDaemon(true).build());
+  private final ReentrantLock lock = new ReentrantLock();
+  private final ExecutorService FLUSH_BUFFER_THREAD_POOL;
 
   private long fileId = 0;
   private long lastFlushedId = 0;
 
   private int bufferedLogNum = 0;
 
-  private boolean deleted;
+  private final AtomicBoolean deleted = new AtomicBoolean(false);
 
   /**
    * constructor of ExclusiveWriteLogNode.
@@ -91,6 +89,9 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
     if (SystemFileFactory.INSTANCE.getFile(logDirectory).mkdirs()) {
       logger.info("create the WAL folder {}.", logDirectory);
     }
+    // this.identifier contains the storage group name + tsfile name.
+    FLUSH_BUFFER_THREAD_POOL =
+        IoTDBThreadPoolFactory.newSingleThreadExecutor("Flush-WAL-Thread-" + this.identifier);
   }
 
   @Override
@@ -102,7 +103,7 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
 
   @Override
   public void write(PhysicalPlan plan) throws IOException {
-    if (deleted) {
+    if (deleted.get()) {
       throw new IOException("WAL node deleted");
     }
     lock.lock();
@@ -138,7 +139,7 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
     lock.lock();
     try {
       synchronized (switchBufferCondition) {
-        while (logBufferFlushing != null && !deleted) {
+        while (logBufferFlushing != null && !deleted.get()) {
           switchBufferCondition.wait();
         }
         switchBufferCondition.notifyAll();
@@ -151,7 +152,7 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
       }
       logger.debug("Log node {} closed successfully", identifier);
     } catch (IOException e) {
-      logger.error("Cannot close log node {} because:", identifier, e);
+      logger.warn("Cannot close log node {} because:", identifier, e);
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       logger.warn("Waiting for current buffer being flushed interrupted");
@@ -162,7 +163,7 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
 
   @Override
   public void forceSync() {
-    if (deleted) {
+    if (deleted.get()) {
       return;
     }
     sync();
@@ -208,9 +209,10 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
     try {
       close();
       FileUtils.deleteDirectory(SystemFileFactory.INSTANCE.getFile(logDirectory));
-      deleted = true;
+      deleted.set(true);
       return this.bufferArray;
     } finally {
+      FLUSH_BUFFER_THREAD_POOL.shutdown();
       lock.unlock();
     }
   }
@@ -232,7 +234,7 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
         FileUtils.forceDelete(logFile);
         logger.info("Log node {} cleaned old file", identifier);
       } catch (IOException e) {
-        logger.error("Old log file {} of {} cannot be deleted", logFile.getName(), identifier, e);
+        logger.warn("Old log file {} of {} cannot be deleted", logFile.getName(), identifier, e);
       }
     }
   }
@@ -245,7 +247,7 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
           currentFileWriter.force();
         }
       } catch (IOException e) {
-        logger.error("Log node {} force failed.", identifier, e);
+        logger.warn("Log node {} force failed.", identifier, e);
       }
     } finally {
       lock.unlock();
@@ -261,8 +263,6 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
       switchBufferWorkingToFlushing();
       ILogWriter currWriter = getCurrentFileWriter();
       FLUSH_BUFFER_THREAD_POOL.submit(() -> flushBuffer(currWriter));
-      switchBufferIdleToWorking();
-
       bufferedLogNum = 0;
       logger.debug("Log node {} ends sync.", identifier);
     } catch (InterruptedException e) {
@@ -281,50 +281,28 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
     } catch (ClosedChannelException e) {
       // ignore
     } catch (IOException e) {
-      logger.error("Log node {} sync failed, change system mode to read-only", identifier, e);
+      logger.warn("Log node {} sync failed, change system mode to read-only", identifier, e);
       IoTDBDescriptor.getInstance().getConfig().setReadOnly(true);
       return;
     }
-    logBufferFlushing.clear();
-
-    try {
-      switchBufferFlushingToIdle();
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-    }
-  }
 
-  private void switchBufferWorkingToFlushing() throws InterruptedException {
+    // move the flushing buffer to the idle slot and notify threads waiting on switchBufferCondition
     synchronized (switchBufferCondition) {
-      while (logBufferFlushing != null && !deleted) {
-        switchBufferCondition.wait();
-      }
-      logBufferFlushing = logBufferWorking;
-      logBufferWorking = null;
+      logBufferIdle = logBufferFlushing;
+      logBufferFlushing = null;
       switchBufferCondition.notifyAll();
     }
   }
 
-  private void switchBufferIdleToWorking() throws InterruptedException {
+  private void switchBufferWorkingToFlushing() throws InterruptedException {
     synchronized (switchBufferCondition) {
-      while (logBufferIdle == null && !deleted) {
-        switchBufferCondition.wait();
+      while (logBufferFlushing != null && !deleted.get()) {
+        switchBufferCondition.wait(100);
       }
+      logBufferFlushing = logBufferWorking;
       logBufferWorking = logBufferIdle;
+      logBufferWorking.clear();
       logBufferIdle = null;
-      switchBufferCondition.notifyAll();
-    }
-  }
-
-  private void switchBufferFlushingToIdle() throws InterruptedException {
-    synchronized (switchBufferCondition) {
-      while (logBufferIdle != null && !deleted) {
-        switchBufferCondition.wait();
-      }
-      logBufferIdle = logBufferFlushing;
-      logBufferIdle.clear();
-      logBufferFlushing = null;
-      switchBufferCondition.notifyAll();
     }
   }