You are viewing a plain-text version of this content. The canonical link for it (a hyperlink in the original page) was not preserved in this plain-text copy.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2020/11/10 10:16:27 UTC

[iotdb] 01/01: split wal buffer into 2 to reduce waiting time

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch split_wal_buffer
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit ec4440c03babf49179535dcbb5883d9dbfc46761
Author: jt <jt...@163.com>
AuthorDate: Tue Nov 10 18:15:22 2020 +0800

    split wal buffer into 2 to reduce waiting time
---
 .../iotdb/db/engine/flush/MemTableFlushTask.java   |   2 +-
 .../db/engine/storagegroup/TsFileProcessor.java    |   1 +
 .../db/exception/TsFileProcessorException.java     |   2 +-
 .../org/apache/iotdb/db/utils/SerializeUtils.java  |   6 +-
 .../org/apache/iotdb/db/writelog/io/LogWriter.java |   9 +-
 .../db/writelog/node/ExclusiveWriteLogNode.java    | 155 ++++++++++++++++-----
 .../apache/iotdb/db/writelog/WriteLogNodeTest.java |  11 +-
 7 files changed, 145 insertions(+), 41 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index e9f4c92..1962cfe 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -222,7 +222,7 @@ public class MemTableFlushTask {
           TimeUnit.MILLISECONDS.sleep(10);
         } catch (@SuppressWarnings("squid:S2142") InterruptedException e) {
           logger.error("Storage group {} memtable {}, io task is interrupted.", storageGroup
-              , memTable.getVersion(), e);
+              , memTable.getVersion());
           // generally it is because the thread pool is shutdown so the task should be aborted
           break;
         }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index b810774..c05bb25 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -631,6 +631,7 @@ public class TsFileProcessor {
           storageGroupName, tsFileResource.getTsFile().getName(), tobeFlushed.getMemTableMap());
       return;
     }
+    logger.debug("Before flush listeners are called");
 
     for (FlushListener flushListener : flushListeners) {
       flushListener.onFlushStart(tobeFlushed);
diff --git a/server/src/main/java/org/apache/iotdb/db/exception/TsFileProcessorException.java b/server/src/main/java/org/apache/iotdb/db/exception/TsFileProcessorException.java
index 5ff1e3b..4c7e72c 100644
--- a/server/src/main/java/org/apache/iotdb/db/exception/TsFileProcessorException.java
+++ b/server/src/main/java/org/apache/iotdb/db/exception/TsFileProcessorException.java
@@ -29,6 +29,6 @@ public class TsFileProcessorException extends IoTDBException {
   }
 
   public TsFileProcessorException(Exception exception) {
-    super(exception.getMessage(), TSStatusCode.TSFILE_PROCESSOR_ERROR.getStatusCode());
+    super(exception, TSStatusCode.TSFILE_PROCESSOR_ERROR.getStatusCode());
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/SerializeUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/SerializeUtils.java
index ea6234d..0150f44 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/SerializeUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/SerializeUtils.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.filter.operator.In;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
@@ -509,11 +510,14 @@ public class SerializeUtils {
     int idLastPos = str.indexOf(',', idFirstPos);
     int dataPortFirstPos = str.indexOf("dataPort:", idLastPos) + "dataPort:".length();
     int dataPortLastPos = str.indexOf(')', dataPortFirstPos);
+    int clientPortFirstPos = str.indexOf("clientPort:", dataPortLastPos) + "clientPort:".length();
+    int clientPortLastPos = str.indexOf(')', clientPortFirstPos);
 
     String ip = str.substring(ipFirstPos, ipLastPos);
     int metaPort = Integer.parseInt(str.substring(metaPortFirstPos, metaPortLastPos));
     int id = Integer.parseInt(str.substring(idFirstPos, idLastPos));
     int dataPort = Integer.parseInt(str.substring(dataPortFirstPos, dataPortLastPos));
-    return new Node(ip, metaPort, id, dataPort);
+    int clientPort = Integer.parseInt(str.substring(clientPortFirstPos, clientPortLastPos));
+    return new Node(ip, metaPort, id, dataPort, clientPort);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/io/LogWriter.java b/server/src/main/java/org/apache/iotdb/db/writelog/io/LogWriter.java
index c270a16..0ba9f74 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/io/LogWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/io/LogWriter.java
@@ -84,7 +84,7 @@ public class LogWriter implements ILogWriter {
 
   @Override
   public void force() throws IOException {
-    if (channel != null) {
+    if (channel != null && channel.isOpen()) {
       channel.force(true);
     }
   }
@@ -98,4 +98,11 @@ public class LogWriter implements ILogWriter {
       channel = null;
     }
   }
+
+  @Override
+  public String toString() {
+    // Describes this writer by the logFile it writes to.
+    String fileField = "logFile=" + logFile;
+    return "LogWriter{" + fileField + '}';
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java b/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
index 680ac65..f37da85 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
@@ -18,14 +18,18 @@
  */
 package org.apache.iotdb.db.writelog.node;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.io.File;
 import java.io.IOException;
 import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
 import java.util.Arrays;
 import java.util.Comparator;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.locks.ReentrantLock;
 import org.apache.commons.io.FileUtils;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -55,16 +59,25 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
 
   private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
 
-  private ByteBuffer logBuffer = ByteBuffer
-      .allocate(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize());
+  private ByteBuffer logBufferWorking = ByteBuffer
+      .allocate(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+  private ByteBuffer logBufferIdle = ByteBuffer
+      .allocate(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+  private ByteBuffer logBufferFlushing;
 
-  private ReadWriteLock lock = new ReentrantReadWriteLock();
+  private final Object switchBufferCondition = new Object();
+  private ReentrantLock lock = new ReentrantLock();
+  private static final ExecutorService FLUSH_BUFFER_THREAD_POOL =
+      Executors.newCachedThreadPool(
+          new ThreadFactoryBuilder().setNameFormat("Flush-WAL-Thread-%d").setDaemon(true).build());
 
   private long fileId = 0;
   private long lastFlushedId = 0;
 
   private int bufferedLogNum = 0;
 
+  private volatile boolean deleted; // read without holding lock in write() and forceSync()
+
   /**
    * constructor of ExclusiveWriteLogNode.
    *
@@ -75,13 +88,16 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
     this.logDirectory =
         DirectoryManager.getInstance().getWALFolder() + File.separator + this.identifier;
     if (SystemFileFactory.INSTANCE.getFile(logDirectory).mkdirs()) {
-      logger.info("create the WAL folder {}." + logDirectory);
+      logger.info("create the WAL folder {}", logDirectory);
     }
   }
 
   @Override
   public void write(PhysicalPlan plan) throws IOException {
-    lock.writeLock().lock();
+    if (deleted) {
+      throw new IOException("WAL node deleted");
+    }
+    lock.lock();
     try {
       putLog(plan);
       if (bufferedLogNum >= config.getFlushWalThreshold()) {
@@ -91,43 +107,57 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
       throw new IOException(
           "Log cannot fit into the buffer, please increase wal_buffer_size", e);
     } finally {
-      lock.writeLock().unlock();
+      lock.unlock();
     }
   }
 
   private void putLog(PhysicalPlan plan) {
-    logBuffer.mark();
+    logBufferWorking.mark();
     try {
-      plan.serialize(logBuffer);
+      plan.serialize(logBufferWorking);
     } catch (BufferOverflowException e) {
       logger.info("WAL BufferOverflow !");
-      logBuffer.reset();
+      logBufferWorking.reset();
       sync();
-      plan.serialize(logBuffer);
+      plan.serialize(logBufferWorking);
     }
-    bufferedLogNum ++;
+    bufferedLogNum++;
   }
 
   @Override
   public void close() {
     sync();
     forceWal();
-    lock.writeLock().lock();
+    lock.lock();
     try {
+      synchronized (switchBufferCondition) {
+        while (logBufferFlushing != null && !deleted) {
+          switchBufferCondition.wait();
+        }
+        switchBufferCondition.notifyAll();
+      }
+
       if (this.currentFileWriter != null) {
         this.currentFileWriter.close();
+        logger.debug("WAL file {} is closed", currentFileWriter);
         this.currentFileWriter = null;
       }
       logger.debug("Log node {} closed successfully", identifier);
     } catch (IOException e) {
       logger.error("Cannot close log node {} because:", identifier, e);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      logger.warn("Waiting for current buffer being flushed interrupted");
     } finally {
-      lock.writeLock().unlock();
+      lock.unlock();
     }
   }
 
   @Override
   public void forceSync() {
+    if (deleted) {
+      return;
+    }
     sync();
     forceWal();
   }
@@ -135,23 +165,24 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
 
   @Override
   public void notifyStartFlush() {
-    lock.writeLock().lock();
+    lock.lock();
     try {
       close();
       nextFileWriter();
     } finally {
-      lock.writeLock().unlock();
+      lock.unlock();
     }
   }
 
   @Override
   public void notifyEndFlush() {
-    lock.writeLock().lock();
+    lock.lock();
     try {
-      File logFile = SystemFileFactory.INSTANCE.getFile(logDirectory, WAL_FILE_NAME + ++lastFlushedId);
+      File logFile = SystemFileFactory.INSTANCE
+          .getFile(logDirectory, WAL_FILE_NAME + ++lastFlushedId);
       discard(logFile);
     } finally {
-      lock.writeLock().unlock();
+      lock.unlock();
     }
   }
 
@@ -167,13 +198,13 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
 
   @Override
   public void delete() throws IOException {
-    lock.writeLock().lock();
+    lock.lock();
     try {
-      logBuffer.clear();
       close();
       FileUtils.deleteDirectory(SystemFileFactory.INSTANCE.getFile(logDirectory));
+      deleted = true;
     } finally {
-      lock.writeLock().unlock();
+      lock.unlock();
     }
   }
 
@@ -199,7 +230,7 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
   }
 
   private void forceWal() {
-    lock.writeLock().lock();
+    lock.lock();
     try {
       try {
         if (currentFileWriter != null) {
@@ -209,28 +240,81 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
         logger.error("Log node {} force failed.", identifier, e);
       }
     } finally {
-      lock.writeLock().unlock();
+      lock.unlock();
     }
   }
 
   private void sync() {
-    lock.writeLock().lock();
+    lock.lock();
     try {
       if (bufferedLogNum == 0) {
         return;
       }
-      try {
-        getCurrentFileWriter().write(logBuffer);
-      } catch (IOException e) {
-        logger.error("Log node {} sync failed, change system mode to read-only", identifier, e);
-        IoTDBDescriptor.getInstance().getConfig().setReadOnly(true);
-        return;
-      }
-      logBuffer.clear();
+      switchBufferWorkingToFlushing();
+      ILogWriter currWriter = getCurrentFileWriter();
+      FLUSH_BUFFER_THREAD_POOL.submit(() -> flushBuffer(currWriter));
+      switchBufferIdleToWorking();
+
       bufferedLogNum = 0;
       logger.debug("Log node {} ends sync.", identifier);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      logger.warn("Waiting for available buffer interrupted");
     } finally {
-      lock.writeLock().unlock();
+      lock.unlock();
+    }
+  }
+
+  private void flushBuffer(ILogWriter writer) {
+    try {
+      writer.write(logBufferFlushing);
+    } catch (ClosedChannelException e) {
+      // channel already closed: nothing can be written, but the buffer is still recycled below
+    } catch (IOException e) {
+      logger.error("Log node {} sync failed, change system mode to read-only", identifier, e);
+      IoTDBDescriptor.getInstance().getConfig().setReadOnly(true);
+    } finally {
+      // always recycle the buffer; otherwise the next sync()/close() blocks waiting for it
+      logBufferFlushing.clear();
+      try {
+        switchBufferFlushingToIdle();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  private void switchBufferWorkingToFlushing() throws InterruptedException { // working -> flushing
+    synchronized (switchBufferCondition) {
+      while (logBufferFlushing != null && !deleted) { // until the previous flush returns its buffer
+        switchBufferCondition.wait();
+      }
+      logBufferFlushing = logBufferWorking; // the filled working buffer is handed to flushBuffer()
+      logBufferWorking = null;
+      switchBufferCondition.notifyAll(); // wake threads waiting for a buffer slot to change
+    }
+  }
+
+  private void switchBufferIdleToWorking() throws InterruptedException { // idle -> working
+    synchronized (switchBufferCondition) {
+      while (logBufferIdle == null && !deleted) { // until a spare buffer is available
+        switchBufferCondition.wait();
+      }
+      logBufferWorking = logBufferIdle; // the spare buffer receives subsequent putLog() writes
+      logBufferIdle = null;
+      switchBufferCondition.notifyAll(); // wake threads waiting for a buffer slot to change
+    }
+  }
+
+  private void switchBufferFlushingToIdle() throws InterruptedException { // flushing -> idle
+    synchronized (switchBufferCondition) {
+      while (logBufferIdle != null && !deleted) { // until the spare slot is empty
+        switchBufferCondition.wait();
+      }
+      logBufferIdle = logBufferFlushing; // the flushed buffer becomes the spare one again
+      logBufferIdle.clear(); // reset position and limit so the buffer can be refilled
+      logBufferFlushing = null; // lets switchBufferWorkingToFlushing() and close() proceed
+      switchBufferCondition.notifyAll(); // wake threads waiting for a buffer slot to change
     }
   }
 
@@ -247,6 +331,7 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
     if (newFile.getParentFile().mkdirs()) {
       logger.info("create WAL parent folder {}.", newFile.getParent());
     }
+    logger.debug("WAL file {} is opened", newFile);
     currentFileWriter = new LogWriter(newFile);
   }
 
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java
index e779e2d..fe5610c 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java
@@ -180,9 +180,13 @@ public class WriteLogNodeTest {
 
     File walFile = new File(
         config.getWalDir() + File.separator + "root.logTestDevice" + File.separator + "wal1");
-    assertTrue(!walFile.exists());
+    assertFalse(walFile.exists());
 
     logNode.write(deletePlan);
+    long deadline = System.currentTimeMillis() + 10_000; // WAL buffer is written on a background thread
+    while (!walFile.exists()) {
+      assertTrue("WAL file was not created in time", System.currentTimeMillis() < deadline);
+    }
     assertTrue(walFile.exists());
 
     logNode.delete();
@@ -209,7 +213,10 @@ public class WriteLogNodeTest {
 
     File walFile = new File(
         config.getWalDir() + File.separator + "root.logTestDevice" + File.separator + "wal1");
-    assertTrue(walFile.exists());
+    long deadline = System.currentTimeMillis() + 10_000; // WAL buffer is written on a background thread
+    while (!walFile.exists()) {
+      assertTrue("WAL file was not created in time", System.currentTimeMillis() < deadline);
+    }
 
     assertTrue(new File(logNode.getLogDirectory()).exists());
     logNode.delete();