Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/06/26 06:14:56 UTC

[incubator-iotdb] branch feature_async_close_tsfile updated: remove lock in PrimitiveDataListPool

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/feature_async_close_tsfile by this push:
     new f4b2aee  remove lock in PrimitiveDataListPool
     new 543a438  Merge remote-tracking branch 'origin/feature_async_close_tsfile' into feature_async_close_tsfile
f4b2aee is described below

commit f4b2aee20f6a6d713f210f87c16d25d5ec388a45
Author: qiaojialin <64...@qq.com>
AuthorDate: Wed Jun 26 11:40:07 2019 +0800

    remove lock in PrimitiveDataListPool
---
 .../filenodeV2/UnsealedTsFileProcessorV2.java      | 13 +++-
 .../db/engine/memtable/MemTableFlushTaskV2.java    | 14 +---
 .../iotdb/db/qp/physical/crud/InsertPlan.java      |  9 +++
 .../iotdb/db/utils/PrimitiveDataListPool.java      | 32 +++++-----
 .../db/writelog/node/ExclusiveWriteLogNode.java    | 74 ++++++++++++++++------
 5 files changed, 92 insertions(+), 50 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
index 087d8a2..9708dc3 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
@@ -258,7 +258,9 @@ public class UnsealedTsFileProcessorV2 {
       if (workMemTable == null) {
         return;
       }
-      logNode.notifyStartFlush();
+      if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
+        logNode.notifyStartFlush();
+      }
       flushingMemTables.addLast(workMemTable);
       workMemTable.setVersion(versionController.nextVersion());
       FlushManager.getInstance().registerUnsealedTsFileProcessor(this);
@@ -303,8 +305,15 @@ public class UnsealedTsFileProcessorV2 {
       MemTableFlushTaskV2 flushTask = new MemTableFlushTaskV2(memTableToFlush, fileSchema, writer, storageGroupName,
           this::releaseFlushedMemTableCallback);
       flushTask.flushMemTable();
+      long start = System.currentTimeMillis();
       MemTablePool.getInstance().putBack(memTableToFlush, storageGroupName);
-      logNode.notifyEndFlush();
+      long elapse = System.currentTimeMillis() - start;
+      if (elapse > 1000) {
+        LOGGER.info("release a memtable cost: {}", elapse);
+      }
+      if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
+        logNode.notifyEndFlush();
+      }
       LOGGER.info("flush a memtable has finished");
     } else {
       LOGGER.info(
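This hunk is the first occurrence of the "time the operation, but only log when it took longer than a second" pattern that the rest of the patch repeats. A minimal standalone sketch of that pattern as a reusable helper; the class name SlowOpLogger, the method timed and the threshold constant are illustrative and are not part of this commit:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Illustrative "log only if slow" helper: run an operation, measure it,
    // and stay silent unless it crossed the threshold, keeping the hot path quiet.
    public final class SlowOpLogger {

      private static final Logger LOGGER = LoggerFactory.getLogger(SlowOpLogger.class);
      private static final long THRESHOLD_MS = 1000;

      private SlowOpLogger() {
      }

      public static void timed(String what, Runnable op) {
        long start = System.currentTimeMillis();
        op.run();
        long elapsed = System.currentTimeMillis() - start;
        if (elapsed > THRESHOLD_MS) {
          LOGGER.info("{} cost {} ms", what, elapsed);
        }
      }
    }

With such a helper, the timed block above would collapse to a single call, e.g. SlowOpLogger.timed("release a memtable", () -> MemTablePool.getInstance().putBack(memTableToFlush, storageGroupName)).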
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
index 3a7a994..5502e30 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
@@ -128,12 +128,8 @@ public class MemTableFlushTaskV2 {
             }
           } else {
             if (task instanceof String) {
-//              LOGGER.info("Storage group {} memtable {}, issues a start flush chunk group task.",
-//                  storageGroup, memTable.getVersion());
               ioTaskQueue.add(task);
             } else if (task instanceof ChunkGroupIoTask) {
-//              LOGGER.info("Storage group {} memtable {}, issues a end flush chunk group task.",
-//                  storageGroup, memTable.getVersion());
               ioTaskQueue.add(task);
             } else {
               long starTime = System.currentTimeMillis();
@@ -150,8 +146,6 @@ public class MemTableFlushTaskV2 {
                     memTable.getVersion(), e);
                 throw new RuntimeException(e);
               }
-//              LOGGER.info("Storage group {} memtable {}, issues a write chunk task.",
-//                  storageGroup, memTable.getVersion());
               memSerializeTime += System.currentTimeMillis() - starTime;
             }
           }
@@ -195,16 +189,10 @@ public class MemTableFlushTaskV2 {
             long starTime = System.currentTimeMillis();
             try {
               if (ioTask instanceof String) {
-//                LOGGER.info("Storage group {} memtable {}, start a chunk group.", storageGroup,
-//                    memTable.getVersion());
                 tsFileIoWriter.startChunkGroup((String) ioTask);
               } else if (ioTask instanceof IChunkWriter) {
-//                LOGGER.info("Storage group {} memtable {}, writing a series to file.", storageGroup,
-//                    memTable.getVersion());
                 ((IChunkWriter) ioTask).writeToFileWriter(tsFileIoWriter);
               } else {
-//                LOGGER.info("Storage group {} memtable {}, end a chunk group.", storageGroup,
-//                    memTable.getVersion());
                 ChunkGroupIoTask endGroupTask = (ChunkGroupIoTask) ioTask;
                 tsFileIoWriter.endChunkGroup(endGroupTask.version);
                 endGroupTask.finished = true;
@@ -217,7 +205,7 @@ public class MemTableFlushTaskV2 {
             ioTime += System.currentTimeMillis() - starTime;
           }
         }
-        LOGGER.info("flushing a memtable {} in storage group {}, cost {}ms", memTable.getVersion(),
+        LOGGER.info("flushing a memtable {} in storage group {}, io cost {}ms", memTable.getVersion(),
             storageGroup, ioTime);
       } catch (RuntimeException e) {
         LOGGER.error("ioflush thread is dead", e);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
index b017e3c..f98541e 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
@@ -31,9 +31,13 @@ import org.apache.iotdb.db.qp.physical.transfer.SystemLogOperator;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.write.record.TSRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class InsertPlan extends PhysicalPlan {
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(InsertPlan.class);
+
   private String deviceId;
   private String[] measurements;
   private TSDataType[] dataTypes;
@@ -163,6 +167,7 @@ public class InsertPlan extends PhysicalPlan {
 
   @Override
   public void serializeTo(ByteBuffer buffer) {
+    long startTime = System.currentTimeMillis();
     int type = SystemLogOperator.INSERT;
     buffer.put((byte) type);
     buffer.put((byte) insertType);
@@ -179,6 +184,10 @@ public class InsertPlan extends PhysicalPlan {
     for (String m : values) {
       putString(buffer, m);
     }
+    long elapsed = System.currentTimeMillis() - startTime;
+    if (elapsed > 1000) {
+      LOGGER.info("Serialize an insertion costs {}ms", elapsed);
+    }
   }
 
   @Override
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/utils/PrimitiveDataListPool.java b/iotdb/src/main/java/org/apache/iotdb/db/utils/PrimitiveDataListPool.java
index 1454857..08a3e4e 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/utils/PrimitiveDataListPool.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/utils/PrimitiveDataListPool.java
@@ -18,9 +18,9 @@
  */
 package org.apache.iotdb.db.utils;
 
-import java.util.HashMap;
-import java.util.LinkedList;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Binary;
@@ -30,21 +30,21 @@ import org.apache.iotdb.tsfile.utils.Binary;
  */
 public class PrimitiveDataListPool {
 
-  private static final Map<Class, LinkedList<PrimitiveArrayListV2>> primitiveArrayListsMap = new HashMap<>();
+  private static final Map<Class, ConcurrentLinkedQueue<PrimitiveArrayListV2>> primitiveArrayListsMap = new ConcurrentHashMap<>();
 
   static {
-    primitiveArrayListsMap.put(boolean.class, new LinkedList<>());
-    primitiveArrayListsMap.put(int.class, new LinkedList<>());
-    primitiveArrayListsMap.put(long.class, new LinkedList<>());
-    primitiveArrayListsMap.put(float.class, new LinkedList<>());
-    primitiveArrayListsMap.put(double.class, new LinkedList<>());
-    primitiveArrayListsMap.put(Binary.class, new LinkedList<>());
+    primitiveArrayListsMap.put(boolean.class, new ConcurrentLinkedQueue<>());
+    primitiveArrayListsMap.put(int.class, new ConcurrentLinkedQueue<>());
+    primitiveArrayListsMap.put(long.class, new ConcurrentLinkedQueue<>());
+    primitiveArrayListsMap.put(float.class, new ConcurrentLinkedQueue<>());
+    primitiveArrayListsMap.put(double.class, new ConcurrentLinkedQueue<>());
+    primitiveArrayListsMap.put(Binary.class, new ConcurrentLinkedQueue<>());
   }
 
   private PrimitiveDataListPool() {
   }
 
-  public synchronized PrimitiveArrayListV2 getPrimitiveDataListByDataType(TSDataType dataType) {
+  public PrimitiveArrayListV2 getPrimitiveDataListByDataType(TSDataType dataType) {
     switch (dataType) {
       case BOOLEAN:
         return getPrimitiveDataList(boolean.class);
@@ -64,14 +64,14 @@ public class PrimitiveDataListPool {
   }
 
   private PrimitiveArrayListV2 getPrimitiveDataList(Class clazz) {
-    LinkedList<PrimitiveArrayListV2> primitiveArrayList = primitiveArrayListsMap.get(clazz);
-    return primitiveArrayList.isEmpty() ? new PrimitiveArrayListV2(clazz)
-        : primitiveArrayList.pollFirst();
+    ConcurrentLinkedQueue<PrimitiveArrayListV2> primitiveArrayList = primitiveArrayListsMap.get(clazz);
+    PrimitiveArrayListV2 dataList = primitiveArrayList.poll();
+    return dataList == null ? new PrimitiveArrayListV2(clazz) : dataList;
   }
 
-  public synchronized void release(PrimitiveArrayListV2 primitiveArrayList) {
+  public void release(PrimitiveArrayListV2 primitiveArrayList) {
     primitiveArrayList.reset();
-    primitiveArrayListsMap.get(primitiveArrayList.getClazz()).addLast(primitiveArrayList);
+    primitiveArrayListsMap.get(primitiveArrayList.getClazz()).offer(primitiveArrayList);
   }
 
   public static PrimitiveDataListPool getInstance() {
@@ -86,7 +86,7 @@ public class PrimitiveDataListPool {
     private static final PrimitiveDataListPool INSTANCE = new PrimitiveDataListPool();
   }
 
-  public synchronized int getPrimitiveDataListSizeByDataType(TSDataType dataType){
+  public int getPrimitiveDataListSizeByDataType(TSDataType dataType){
     switch (dataType) {
       case BOOLEAN:
         return primitiveArrayListsMap.get(boolean.class).size();
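To make the change behind the commit title concrete: getPrimitiveDataListByDataType and release used to be synchronized and backed by a HashMap of LinkedLists; they are now lock-free, backed by a ConcurrentHashMap of ConcurrentLinkedQueues. A minimal sketch of that pooling pattern in isolation, assuming objects that can simply be handed back after a reset; SimpleObjectPool and its method names are illustrative, not the IoTDB API:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.function.Supplier;

    // Illustrative lock-free pool: one non-blocking queue per key, so acquire
    // and release never hold a coarse lock across all data types.
    public class SimpleObjectPool<K, V> {

      private final Map<K, ConcurrentLinkedQueue<V>> pools = new ConcurrentHashMap<>();

      // Reuse a pooled instance when one is available, otherwise create a new one.
      public V acquire(K key, Supplier<V> factory) {
        ConcurrentLinkedQueue<V> queue =
            pools.computeIfAbsent(key, k -> new ConcurrentLinkedQueue<>());
        V pooled = queue.poll();   // non-blocking; returns null when the queue is empty
        return pooled != null ? pooled : factory.get();
      }

      // Hand an instance back; ConcurrentLinkedQueue.offer never blocks.
      public void release(K key, V value) {
        pools.computeIfAbsent(key, k -> new ConcurrentLinkedQueue<>()).offer(value);
      }
    }

One trade-off worth noting: ConcurrentLinkedQueue.size() is O(n) and only weakly consistent, so getPrimitiveDataListSizeByDataType now returns an approximation under concurrent use rather than an exact count guarded by a lock.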
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
index 03158bb..b9b5a82 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
@@ -40,7 +40,7 @@ import org.slf4j.LoggerFactory;
 public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<ExclusiveWriteLogNode> {
 
   public static final String WAL_FILE_NAME = "wal";
-  private static final Logger logger = LoggerFactory.getLogger(ExclusiveWriteLogNode.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(ExclusiveWriteLogNode.class);
   private static int logBufferSize = IoTDBDescriptor.getInstance().getConfig().getWalBufferSize();
 
   private String identifier;
@@ -74,7 +74,12 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
 
   @Override
   public void write(PhysicalPlan plan) throws IOException {
+    long lockStartTime = System.currentTimeMillis();
     lock.writeLock().lock();
+    long lockElapsed = System.currentTimeMillis() - lockStartTime;
+    if (lockElapsed > 1000) {
+      LOGGER.info("WAL waiting for lock costs {} ms", lockElapsed);
+    }
     try {
       long start = System.currentTimeMillis();
 
@@ -85,7 +90,7 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
       }
       long elapse = System.currentTimeMillis() - start;
       if (elapse > 1000) {
-        logger.info("WAL insert cost {} ms", elapse);
+        LOGGER.info("WAL insert cost {} ms", elapse);
       }
     } catch (BufferOverflowException e) {
       throw new IOException("Log cannot fit into buffer", e);
@@ -99,6 +104,7 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
     try {
       plan.serializeTo(logBuffer);
     } catch (BufferOverflowException e) {
+      LOGGER.info("WAL BufferOverflow !");
       logBuffer.reset();
       sync();
       plan.serializeTo(logBuffer);
@@ -110,22 +116,27 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
   public void close() {
     sync();
     forceWal();
+    long lockStartTime = System.currentTimeMillis();
     lock.writeLock().lock();
+    long lockElapsed = System.currentTimeMillis() - lockStartTime;
+    if (lockElapsed > 1000) {
+      LOGGER.info("WAL waiting for lock costs {} ms", lockElapsed);
+    }
     long start = System.currentTimeMillis();
     try {
       if (this.currentFileWriter != null) {
         this.currentFileWriter.close();
         this.currentFileWriter = null;
       }
-      logger.debug("Log node {} closed successfully", identifier);
+      LOGGER.debug("Log node {} closed successfully", identifier);
     } catch (IOException e) {
-      logger.error("Cannot close log node {} because:", identifier, e);
+      LOGGER.error("Cannot close log node {} because:", identifier, e);
     } finally {
       lock.writeLock().unlock();
     }
     long elapse = System.currentTimeMillis() - start;
     if (elapse > 1000) {
-      logger.info("WAL log node {} close cost {} ms", identifier, elapse);
+      LOGGER.info("WAL log node {} close cost {} ms", identifier, elapse);
     }
   }
 
@@ -138,14 +149,19 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
 
   @Override
   public void notifyStartFlush() {
+    long lockStartTime = System.currentTimeMillis();
     lock.writeLock().lock();
+    long lockElapsed = System.currentTimeMillis() - lockStartTime;
+    if (lockElapsed > 1000) {
+      LOGGER.info("WAL waiting for lock costs {} ms", lockElapsed);
+    }
     try {
       long start = System.currentTimeMillis();
       close();
       nextFileWriter();
       long elapse = System.currentTimeMillis() - start;
       if (elapse > 1000) {
-        logger.info("WAL notifyStartFlush cost {} ms", elapse);
+        LOGGER.info("WAL notifyStartFlush cost {} ms", elapse);
       }
     } finally {
       lock.writeLock().unlock();
@@ -154,14 +170,19 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
 
   @Override
   public void notifyEndFlush() {
+    long lockStartTime = System.currentTimeMillis();
     lock.writeLock().lock();
+    long lockElapsed = System.currentTimeMillis() - lockStartTime;
+    if (lockElapsed > 1000) {
+      LOGGER.info("WAL waiting for lock costs {} ms", lockElapsed);
+    }
     try {
       long start = System.currentTimeMillis();
       File logFile = new File(logDirectory, WAL_FILE_NAME + ++lastFlushedId);
       discard(logFile);
       long elapse = System.currentTimeMillis() - start;
       if (elapse > 1000) {
-        logger.info("WAL notifyEndFlush cost {} ms", elapse);
+        LOGGER.info("WAL notifyEndFlush cost {} ms", elapse);
       }
     } finally {
       lock.writeLock().unlock();
@@ -180,7 +201,12 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
 
   @Override
   public void delete() throws IOException {
+    long lockStartTime = System.currentTimeMillis();
     lock.writeLock().lock();
+    long lockElapsed = System.currentTimeMillis() - lockStartTime;
+    if (lockElapsed > 1000) {
+      LOGGER.info("WAL waiting for lock costs {} ms", lockElapsed);
+    }
     try {
       long start = System.currentTimeMillis();
       logBuffer.clear();
@@ -188,7 +214,7 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
       FileUtils.deleteDirectory(new File(logDirectory));
       long elapse = System.currentTimeMillis() - start;
       if (elapse > 1000) {
-        logger.info("WAL delete cost {} ms", elapse);
+        LOGGER.info("WAL delete cost {} ms", elapse);
       }
     } finally {
       lock.writeLock().unlock();
@@ -205,33 +231,38 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
 
   private void discard(File logFile) {
     if (!logFile.exists()) {
-      logger.info("Log file does not exist");
+      LOGGER.info("Log file does not exist");
     } else {
       try {
         FileUtils.forceDelete(logFile);
-        logger.info("Log node {} cleaned old file", identifier);
+        LOGGER.info("Log node {} cleaned old file", identifier);
       } catch (IOException e) {
-        logger.error("Old log file {} of {} cannot be deleted", logFile.getName(), identifier, e);
+        LOGGER.error("Old log file {} of {} cannot be deleted", logFile.getName(), identifier, e);
       }
     }
   }
 
   private void forceWal() {
+    long lockStartTime = System.currentTimeMillis();
     lock.writeLock().lock();
+    long lockElapsed = System.currentTimeMillis() - lockStartTime;
+    if (lockElapsed > 1000) {
+      LOGGER.info("WAL waiting for lock costs {} ms", lockElapsed);
+    }
     try {
       long start = System.currentTimeMillis();
-      logger.debug("Log node {} starts force, {} logs to be forced", identifier, bufferedLogNum);
+      LOGGER.debug("Log node {} starts force, {} logs to be forced", identifier, bufferedLogNum);
       try {
         if (currentFileWriter != null) {
           currentFileWriter.force();
         }
       } catch (IOException e) {
-        logger.error("Log node {} force failed.", identifier, e);
+        LOGGER.error("Log node {} force failed.", identifier, e);
       }
-      logger.debug("Log node {} ends force.", identifier);
+      LOGGER.debug("Log node {} ends force.", identifier);
       long elapse = System.currentTimeMillis() - start;
       if (elapse > 1000) {
-        logger.info("WAL forceWal cost {} ms", elapse);
+        LOGGER.info("WAL forceWal cost {} ms", elapse);
       }
     } finally {
       lock.writeLock().unlock();
@@ -239,25 +270,30 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
   }
 
   private void sync() {
+    long lockStartTime = System.currentTimeMillis();
     lock.writeLock().lock();
+    long lockElapsed = System.currentTimeMillis() - lockStartTime;
+    if (lockElapsed > 1000) {
+      LOGGER.info("WAL waiting for lock costs {} ms", lockElapsed);
+    }
     try {
       long start = System.currentTimeMillis();
-      logger.debug("Log node {} starts sync, {} logs to be synced", identifier, bufferedLogNum);
+      LOGGER.debug("Log node {} starts sync, {} logs to be synced", identifier, bufferedLogNum);
       if (bufferedLogNum == 0) {
         return;
       }
       try {
         getCurrentFileWriter().write(logBuffer);
       } catch (IOException e) {
-        logger.error("Log node {} sync failed", identifier, e);
+        LOGGER.error("Log node {} sync failed", identifier, e);
       }
       logBuffer.clear();
       bufferedLogNum = 0;
 
-      logger.debug("Log node {} ends sync.", identifier);
+      LOGGER.debug("Log node {} ends sync.", identifier);
       long elapse = System.currentTimeMillis() - start;
       if (elapse > 1000) {
-        logger.info("WAL sync cost {} ms", elapse);
+        LOGGER.info("WAL sync cost {} ms", elapse);
       }
     } finally {
       lock.writeLock().unlock();
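
The same four lines that time the write-lock wait and log it when it exceeds one second are now repeated in write, close, notifyStartFlush, notifyEndFlush, delete, forceWal and sync. A minimal sketch of how that could be factored into a helper; TimedLock, lockWithTiming and the threshold constant are illustrative names, not part of this patch:

    import java.util.concurrent.locks.Lock;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Illustrative helper: acquire a lock and log only when the wait was unusually long.
    public final class TimedLock {

      private static final Logger LOGGER = LoggerFactory.getLogger(TimedLock.class);
      private static final long SLOW_LOCK_THRESHOLD_MS = 1000;

      private TimedLock() {
      }

      public static void lockWithTiming(Lock lock, String caller) {
        long start = System.currentTimeMillis();
        lock.lock();                                  // blocks until the lock is acquired
        long elapsed = System.currentTimeMillis() - start;
        if (elapsed > SLOW_LOCK_THRESHOLD_MS) {
          LOGGER.info("{} waited {} ms for lock", caller, elapsed);
        }
      }
    }

Each call site would then shrink to TimedLock.lockWithTiming(lock.writeLock(), "ExclusiveWriteLogNode.write") followed by the existing try { ... } finally { lock.writeLock().unlock(); } block.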