You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2020/11/10 10:16:26 UTC

[iotdb] branch split_wal_buffer created (now ec4440c)

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a change to branch split_wal_buffer
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at ec4440c  split wal buffer into 2 to reduce waiting time

This branch includes the following new commits:

     new ec4440c  split wal buffer into 2 to reduce waiting time

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: split wal buffer into 2 to reduce waiting time

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch split_wal_buffer
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit ec4440c03babf49179535dcbb5883d9dbfc46761
Author: jt <jt...@163.com>
AuthorDate: Tue Nov 10 18:15:22 2020 +0800

    split wal buffer into 2 to reduce waiting time
---
 .../iotdb/db/engine/flush/MemTableFlushTask.java   |   2 +-
 .../db/engine/storagegroup/TsFileProcessor.java    |   1 +
 .../db/exception/TsFileProcessorException.java     |   2 +-
 .../org/apache/iotdb/db/utils/SerializeUtils.java  |   6 +-
 .../org/apache/iotdb/db/writelog/io/LogWriter.java |   9 +-
 .../db/writelog/node/ExclusiveWriteLogNode.java    | 155 ++++++++++++++++-----
 .../apache/iotdb/db/writelog/WriteLogNodeTest.java |  11 +-
 7 files changed, 145 insertions(+), 41 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index e9f4c92..1962cfe 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -222,7 +222,7 @@ public class MemTableFlushTask {
           TimeUnit.MILLISECONDS.sleep(10);
         } catch (@SuppressWarnings("squid:S2142") InterruptedException e) {
           logger.error("Storage group {} memtable {}, io task is interrupted.", storageGroup
-              , memTable.getVersion(), e);
+              , memTable.getVersion());
           // generally it is because the thread pool is shutdown so the task should be aborted
           break;
         }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index b810774..c05bb25 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -631,6 +631,7 @@ public class TsFileProcessor {
           storageGroupName, tsFileResource.getTsFile().getName(), tobeFlushed.getMemTableMap());
       return;
     }
+    logger.warn("Before flush listeners are called");
 
     for (FlushListener flushListener : flushListeners) {
       flushListener.onFlushStart(tobeFlushed);
diff --git a/server/src/main/java/org/apache/iotdb/db/exception/TsFileProcessorException.java b/server/src/main/java/org/apache/iotdb/db/exception/TsFileProcessorException.java
index 5ff1e3b..4c7e72c 100644
--- a/server/src/main/java/org/apache/iotdb/db/exception/TsFileProcessorException.java
+++ b/server/src/main/java/org/apache/iotdb/db/exception/TsFileProcessorException.java
@@ -29,6 +29,6 @@ public class TsFileProcessorException extends IoTDBException {
   }
 
   public TsFileProcessorException(Exception exception) {
-    super(exception.getMessage(), TSStatusCode.TSFILE_PROCESSOR_ERROR.getStatusCode());
+    super(exception, TSStatusCode.TSFILE_PROCESSOR_ERROR.getStatusCode());
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/SerializeUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/SerializeUtils.java
index ea6234d..0150f44 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/SerializeUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/SerializeUtils.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.filter.operator.In;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
@@ -509,11 +510,14 @@ public class SerializeUtils {
     int idLastPos = str.indexOf(',', idFirstPos);
     int dataPortFirstPos = str.indexOf("dataPort:", idLastPos) + "dataPort:".length();
     int dataPortLastPos = str.indexOf(')', dataPortFirstPos);
+    int clientPortFirstPos = str.indexOf("clientPort:", dataPortLastPos) + "clientPort:".length();
+    int clientPortLastPos = str.indexOf(')', clientPortFirstPos);
 
     String ip = str.substring(ipFirstPos, ipLastPos);
     int metaPort = Integer.parseInt(str.substring(metaPortFirstPos, metaPortLastPos));
     int id = Integer.parseInt(str.substring(idFirstPos, idLastPos));
     int dataPort = Integer.parseInt(str.substring(dataPortFirstPos, dataPortLastPos));
-    return new Node(ip, metaPort, id, dataPort);
+    int clientPort = Integer.parseInt(str.substring(clientPortFirstPos, clientPortLastPos));
+    return new Node(ip, metaPort, id, dataPort, clientPort);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/io/LogWriter.java b/server/src/main/java/org/apache/iotdb/db/writelog/io/LogWriter.java
index c270a16..0ba9f74 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/io/LogWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/io/LogWriter.java
@@ -84,7 +84,7 @@ public class LogWriter implements ILogWriter {
 
   @Override
   public void force() throws IOException {
-    if (channel != null) {
+    if (channel != null && channel.isOpen()) {
       channel.force(true);
     }
   }
@@ -98,4 +98,11 @@ public class LogWriter implements ILogWriter {
       channel = null;
     }
   }
+
+  @Override
+  public String toString() {
+    return "LogWriter{" +
+        "logFile=" + logFile +
+        '}';
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java b/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
index 680ac65..f37da85 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
@@ -18,14 +18,18 @@
  */
 package org.apache.iotdb.db.writelog.node;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.io.File;
 import java.io.IOException;
 import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
 import java.util.Arrays;
 import java.util.Comparator;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.locks.ReentrantLock;
 import org.apache.commons.io.FileUtils;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -55,16 +59,25 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
 
   private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
 
-  private ByteBuffer logBuffer = ByteBuffer
-      .allocate(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize());
+  private ByteBuffer logBufferWorking = ByteBuffer
+      .allocate(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+  private ByteBuffer logBufferIdle = ByteBuffer
+      .allocate(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+  private ByteBuffer logBufferFlushing;
 
-  private ReadWriteLock lock = new ReentrantReadWriteLock();
+  private final Object switchBufferCondition = new Object();
+  private ReentrantLock lock = new ReentrantLock();
+  private static final ExecutorService FLUSH_BUFFER_THREAD_POOL =
+      Executors.newCachedThreadPool(
+          new ThreadFactoryBuilder().setNameFormat("Flush-WAL-Thread-%d").setDaemon(true).build());
 
   private long fileId = 0;
   private long lastFlushedId = 0;
 
   private int bufferedLogNum = 0;
 
+  private boolean deleted;
+
   /**
    * constructor of ExclusiveWriteLogNode.
    *
@@ -75,13 +88,16 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
     this.logDirectory =
         DirectoryManager.getInstance().getWALFolder() + File.separator + this.identifier;
     if (SystemFileFactory.INSTANCE.getFile(logDirectory).mkdirs()) {
-      logger.info("create the WAL folder {}." + logDirectory);
+      logger.info("create the WAL folder {}", logDirectory);
     }
   }
 
   @Override
   public void write(PhysicalPlan plan) throws IOException {
-    lock.writeLock().lock();
+    if (deleted) {
+      throw new IOException("WAL node deleted");
+    }
+    lock.lock();
     try {
       putLog(plan);
       if (bufferedLogNum >= config.getFlushWalThreshold()) {
@@ -91,43 +107,57 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
       throw new IOException(
           "Log cannot fit into the buffer, please increase wal_buffer_size", e);
     } finally {
-      lock.writeLock().unlock();
+      lock.unlock();
     }
   }
 
   private void putLog(PhysicalPlan plan) {
-    logBuffer.mark();
+    logBufferWorking.mark();
     try {
-      plan.serialize(logBuffer);
+      plan.serialize(logBufferWorking);
     } catch (BufferOverflowException e) {
       logger.info("WAL BufferOverflow !");
-      logBuffer.reset();
+      logBufferWorking.reset();
       sync();
-      plan.serialize(logBuffer);
+      plan.serialize(logBufferWorking);
     }
-    bufferedLogNum ++;
+    bufferedLogNum++;
   }
 
   @Override
   public void close() {
     sync();
     forceWal();
-    lock.writeLock().lock();
+    lock.lock();
     try {
+      synchronized (switchBufferCondition) {
+        while (logBufferFlushing != null && !deleted) {
+          switchBufferCondition.wait();
+        }
+        switchBufferCondition.notifyAll();
+      }
+
       if (this.currentFileWriter != null) {
         this.currentFileWriter.close();
+        logger.warn("WAL file {} is closed", currentFileWriter);
         this.currentFileWriter = null;
       }
       logger.debug("Log node {} closed successfully", identifier);
     } catch (IOException e) {
       logger.error("Cannot close log node {} because:", identifier, e);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      logger.warn("Waiting for current buffer being flushed interrupted");
     } finally {
-      lock.writeLock().unlock();
+      lock.unlock();
     }
   }
 
   @Override
   public void forceSync() {
+    if (deleted) {
+      return;
+    }
     sync();
     forceWal();
   }
@@ -135,23 +165,24 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
 
   @Override
   public void notifyStartFlush() {
-    lock.writeLock().lock();
+    lock.lock();
     try {
       close();
       nextFileWriter();
     } finally {
-      lock.writeLock().unlock();
+      lock.unlock();
     }
   }
 
   @Override
   public void notifyEndFlush() {
-    lock.writeLock().lock();
+    lock.lock();
     try {
-      File logFile = SystemFileFactory.INSTANCE.getFile(logDirectory, WAL_FILE_NAME + ++lastFlushedId);
+      File logFile = SystemFileFactory.INSTANCE
+          .getFile(logDirectory, WAL_FILE_NAME + ++lastFlushedId);
       discard(logFile);
     } finally {
-      lock.writeLock().unlock();
+      lock.unlock();
     }
   }
 
@@ -167,13 +198,13 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
 
   @Override
   public void delete() throws IOException {
-    lock.writeLock().lock();
+    lock.lock();
     try {
-      logBuffer.clear();
       close();
       FileUtils.deleteDirectory(SystemFileFactory.INSTANCE.getFile(logDirectory));
+      deleted = true;
     } finally {
-      lock.writeLock().unlock();
+      lock.unlock();
     }
   }
 
@@ -199,7 +230,7 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
   }
 
   private void forceWal() {
-    lock.writeLock().lock();
+    lock.lock();
     try {
       try {
         if (currentFileWriter != null) {
@@ -209,28 +240,81 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
         logger.error("Log node {} force failed.", identifier, e);
       }
     } finally {
-      lock.writeLock().unlock();
+      lock.unlock();
     }
   }
 
   private void sync() {
-    lock.writeLock().lock();
+    lock.lock();
     try {
       if (bufferedLogNum == 0) {
         return;
       }
-      try {
-        getCurrentFileWriter().write(logBuffer);
-      } catch (IOException e) {
-        logger.error("Log node {} sync failed, change system mode to read-only", identifier, e);
-        IoTDBDescriptor.getInstance().getConfig().setReadOnly(true);
-        return;
-      }
-      logBuffer.clear();
+      switchBufferWorkingToFlushing();
+      ILogWriter currWriter = getCurrentFileWriter();
+      FLUSH_BUFFER_THREAD_POOL.submit(() -> flushBuffer(currWriter));
+      switchBufferIdleToWorking();
+
       bufferedLogNum = 0;
       logger.debug("Log node {} ends sync.", identifier);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      logger.warn("Waiting for available buffer interrupted");
     } finally {
-      lock.writeLock().unlock();
+      lock.unlock();
+    }
+  }
+
+  private void flushBuffer(ILogWriter writer) {
+    try {
+      writer.write(logBufferFlushing);
+    } catch (ClosedChannelException e) {
+      // ignore
+    } catch (IOException e) {
+      logger.error("Log node {} sync failed, change system mode to read-only", identifier, e);
+      IoTDBDescriptor.getInstance().getConfig().setReadOnly(true);
+      return;
+    }
+    logBufferFlushing.clear();
+
+    try {
+      switchBufferFlushingToIdle();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  private void switchBufferWorkingToFlushing() throws InterruptedException {
+    synchronized (switchBufferCondition) {
+      while (logBufferFlushing != null && !deleted) {
+        switchBufferCondition.wait();
+      }
+      logBufferFlushing = logBufferWorking;
+      logBufferWorking = null;
+      switchBufferCondition.notifyAll();
+    }
+  }
+
+  private void switchBufferIdleToWorking() throws InterruptedException {
+    synchronized (switchBufferCondition) {
+      while (logBufferIdle == null && !deleted) {
+        switchBufferCondition.wait();
+      }
+      logBufferWorking = logBufferIdle;
+      logBufferIdle = null;
+      switchBufferCondition.notifyAll();
+    }
+  }
+
+  private void switchBufferFlushingToIdle() throws InterruptedException {
+    synchronized (switchBufferCondition) {
+      while (logBufferIdle != null && !deleted) {
+        switchBufferCondition.wait();
+      }
+      logBufferIdle = logBufferFlushing;
+      logBufferIdle.clear();
+      logBufferFlushing = null;
+      switchBufferCondition.notifyAll();
     }
   }
 
@@ -247,6 +331,7 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
     if (newFile.getParentFile().mkdirs()) {
       logger.info("create WAL parent folder {}.", newFile.getParent());
     }
+    logger.warn("WAL file {} is opened", newFile);
     currentFileWriter = new LogWriter(newFile);
   }
 
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java
index e779e2d..fe5610c 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java
@@ -180,9 +180,13 @@ public class WriteLogNodeTest {
 
     File walFile = new File(
         config.getWalDir() + File.separator + "root.logTestDevice" + File.separator + "wal1");
-    assertTrue(!walFile.exists());
+    assertFalse(walFile.exists());
 
     logNode.write(deletePlan);
+    System.out.println("Waiting for wal file to be created");
+    while (!walFile.exists()) {
+
+    }
     assertTrue(walFile.exists());
 
     logNode.delete();
@@ -209,7 +213,10 @@ public class WriteLogNodeTest {
 
     File walFile = new File(
         config.getWalDir() + File.separator + "root.logTestDevice" + File.separator + "wal1");
-    assertTrue(walFile.exists());
+    System.out.println("Waiting for wal to be created");
+    while (!walFile.exists()) {
+
+    }
 
     assertTrue(new File(logNode.getLogDirectory()).exists());
     logNode.delete();