Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/02/06 01:49:40 UTC

[iotdb] branch rel/0.11 updated: Add log for better tracing (#2637)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch rel/0.11
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.11 by this push:
     new 8c0d246  Add log for better tracing (#2637)
8c0d246 is described below

commit 8c0d246d1f992c0f1814257b152a8eb90a709c18
Author: SilverNarcissus <15...@smail.nju.edu.cn>
AuthorDate: Fri Feb 5 19:49:25 2021 -0600

    Add log for better tracing (#2637)
    
    * add wal log
    
    * fix issue
    
    * fix bug
    
    * add log in memtable flush and format code
---
 .../iotdb/db/engine/flush/MemTableFlushTask.java   | 151 ++++++++++-----------
 .../iotdb/db/writelog/io/SingleFileLogReader.java  |  12 +-
 2 files changed, 82 insertions(+), 81 deletions(-)
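
For context, the log lines added below use SLF4J's parameterized-logging pattern: each {} placeholder is filled, left to right, by the trailing arguments, and a trailing Throwable is printed as a stack trace. The following minimal sketch shows that pattern in isolation; the class name and the WAL file used here are illustrative only and are not part of the IoTDB code base.

    import java.io.File;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Hypothetical, self-contained example of the parameterized-logging style
    // used by the new log statements in this commit.
    public class WalLogExample {

      private static final Logger logger = LoggerFactory.getLogger(WalLogExample.class);

      public static void main(String[] args) {
        // Illustrative WAL file; any readable file path would do for this sketch.
        File logFile = new File("example.wal");
        // Placeholders are filled left to right, so the argument order must
        // match the order of the {} markers in the message.
        logger.info("WAL file: {} size is {}", logFile.getName(), logFile.length());
      }
    }
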

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index ce08298..6491db4 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -47,82 +47,14 @@ public class MemTableFlushTask {
   private static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
   private final Future<?> encodingTaskFuture;
   private final Future<?> ioTaskFuture;
-  private RestorableTsFileIOWriter writer;
-
   private final LinkedBlockingQueue<Object> ioTaskQueue =
       new LinkedBlockingQueue<>(config.getIoTaskQueueSizeForFlushing());
-  private final LinkedBlockingQueue<Object> encodingTaskQueue = 
+  private final LinkedBlockingQueue<Object> encodingTaskQueue =
       new LinkedBlockingQueue<>(config.getEncodingTaskQueueSizeForFlushing());
+  private RestorableTsFileIOWriter writer;
   private String storageGroup;
 
   private IMemTable memTable;
-
-
-  /**
-   * @param memTable the memTable to flush
-   * @param writer the writer where memTable will be flushed to (current tsfile writer or vm writer)
-   * @param storageGroup current storage group
-   */
-
-  public MemTableFlushTask(IMemTable memTable, RestorableTsFileIOWriter writer, String storageGroup) {
-    this.memTable = memTable;
-    this.writer = writer;
-    this.storageGroup = storageGroup;
-    this.encodingTaskFuture = subTaskPoolManager.submit(encodingTask);
-    this.ioTaskFuture = subTaskPoolManager.submit(ioTask);
-    logger.debug("flush task of Storage group {} memtable {} is created ",
-        storageGroup, memTable.getVersion());
-  }
-
-  /**
-   * the function for flushing memtable.
-   */
-  public void syncFlushMemTable()
-      throws ExecutionException, InterruptedException, IOException {
-    logger.info("The memTable size of SG {} is {}, the avg series points num in chunk is {} ",
-        storageGroup,
-        memTable.memSize(),
-        memTable.getTotalPointsNum() / memTable.getSeriesNumber());
-    long start = System.currentTimeMillis();
-    long sortTime = 0;
-
-    for (String deviceId : memTable.getMemTableMap().keySet()) {
-      encodingTaskQueue.put(new StartFlushGroupIOTask(deviceId));
-      for (String measurementId : memTable.getMemTableMap().get(deviceId).keySet()) {
-        long startTime = System.currentTimeMillis();
-        IWritableMemChunk series = memTable.getMemTableMap().get(deviceId).get(measurementId);
-        MeasurementSchema desc = series.getSchema();
-        TVList tvList = series.getSortedTVList();
-        sortTime += System.currentTimeMillis() - startTime;
-        encodingTaskQueue.put(new Pair<>(tvList, desc));
-      }
-      encodingTaskQueue.put(new EndChunkGroupIoTask());
-    }
-    encodingTaskQueue.put(new TaskEnd());
-    logger.debug(
-        "Storage group {} memtable {}, flushing into disk: data sort time cost {} ms.",
-        storageGroup, memTable.getVersion(), sortTime);
-
-    try {
-      encodingTaskFuture.get();
-    } catch (InterruptedException | ExecutionException e) {
-      ioTaskFuture.cancel(true);
-      throw e;
-    }
-
-    ioTaskFuture.get();
-
-    try {
-      writer.writeVersion(memTable.getVersion());
-    } catch (IOException e) {
-      throw new ExecutionException(e);
-    }
-
-    logger.info(
-        "Storage group {} memtable {} flushing a memtable has finished! Time consumption: {}ms",
-        storageGroup, memTable, System.currentTimeMillis() - start);
-  }
-
   private Runnable encodingTask = new Runnable() {
     private void writeOneSeries(TVList tvPairs, IChunkWriter seriesWriterImpl,
         TSDataType dataType) {
@@ -214,7 +146,6 @@ public class MemTableFlushTask {
           storageGroup, memTable.getVersion(), memSerializeTime);
     }
   };
-
   @SuppressWarnings("squid:S135")
   private Runnable ioTask = () -> {
     long ioTime = 0;
@@ -234,8 +165,7 @@ public class MemTableFlushTask {
           this.writer.startChunkGroup(((StartFlushGroupIOTask) ioMessage).deviceId);
         } else if (ioMessage instanceof TaskEnd) {
           break;
-        }
-        else if (ioMessage instanceof IChunkWriter) {
+        } else if (ioMessage instanceof IChunkWriter) {
           ChunkWriterImpl chunkWriter = (ChunkWriterImpl) ioMessage;
           chunkWriter.writeToFileWriter(this.writer);
         } else {
@@ -251,11 +181,80 @@ public class MemTableFlushTask {
     logger.debug("flushing a memtable {} in storage group {}, io cost {}ms", memTable.getVersion(),
         storageGroup, ioTime);
   };
-  
+
+  /**
+   * @param memTable     the memTable to flush
+   * @param writer       the writer where memTable will be flushed to (current tsfile writer or vm
+   *                     writer)
+   * @param storageGroup current storage group
+   */
+
+  public MemTableFlushTask(IMemTable memTable, RestorableTsFileIOWriter writer,
+      String storageGroup) {
+    this.memTable = memTable;
+    this.writer = writer;
+    this.storageGroup = storageGroup;
+    this.encodingTaskFuture = subTaskPoolManager.submit(encodingTask);
+    this.ioTaskFuture = subTaskPoolManager.submit(ioTask);
+    logger.debug("flush task of Storage group {} memtable {} is created ",
+        storageGroup, memTable.getVersion());
+  }
+
+  /**
+   * the function for flushing memtable.
+   */
+  public void syncFlushMemTable()
+      throws ExecutionException, InterruptedException, IOException {
+    logger.info(
+        "The memTable size of SG {} is {}, the avg series points num in chunk is {}, total timeseries number is {}",
+        storageGroup,
+        memTable.memSize(),
+        memTable.getTotalPointsNum() / memTable.getSeriesNumber(),
+        memTable.getSeriesNumber());
+    long start = System.currentTimeMillis();
+    long sortTime = 0;
+
+    for (String deviceId : memTable.getMemTableMap().keySet()) {
+      encodingTaskQueue.put(new StartFlushGroupIOTask(deviceId));
+      for (String measurementId : memTable.getMemTableMap().get(deviceId).keySet()) {
+        long startTime = System.currentTimeMillis();
+        IWritableMemChunk series = memTable.getMemTableMap().get(deviceId).get(measurementId);
+        MeasurementSchema desc = series.getSchema();
+        TVList tvList = series.getSortedTVList();
+        sortTime += System.currentTimeMillis() - startTime;
+        encodingTaskQueue.put(new Pair<>(tvList, desc));
+      }
+      encodingTaskQueue.put(new EndChunkGroupIoTask());
+    }
+    encodingTaskQueue.put(new TaskEnd());
+    logger.debug(
+        "Storage group {} memtable {}, flushing into disk: data sort time cost {} ms.",
+        storageGroup, memTable.getVersion(), sortTime);
+
+    try {
+      encodingTaskFuture.get();
+    } catch (InterruptedException | ExecutionException e) {
+      ioTaskFuture.cancel(true);
+      throw e;
+    }
+
+    ioTaskFuture.get();
+
+    try {
+      writer.writeVersion(memTable.getVersion());
+    } catch (IOException e) {
+      throw new ExecutionException(e);
+    }
+
+    logger.info(
+        "Storage group {} memtable {} flushing a memtable has finished! Time consumption: {}ms",
+        storageGroup, memTable, System.currentTimeMillis() - start);
+  }
+
   static class TaskEnd {
-    
+
     TaskEnd() {
-      
+
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/io/SingleFileLogReader.java b/server/src/main/java/org/apache/iotdb/db/writelog/io/SingleFileLogReader.java
index d504cbc..0e1a611 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/io/SingleFileLogReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/io/SingleFileLogReader.java
@@ -37,9 +37,8 @@ import org.slf4j.LoggerFactory;
  */
 public class SingleFileLogReader implements ILogReader {
 
-  private static final Logger logger = LoggerFactory.getLogger(SingleFileLogReader.class);
   public static final int LEAST_LOG_SIZE = 12; // size + checksum
-
+  private static final Logger logger = LoggerFactory.getLogger(SingleFileLogReader.class);
   private DataInputStream logStream;
   private String filepath;
 
@@ -91,7 +90,9 @@ public class SingleFileLogReader implements ILogReader {
       batchLogReader = new BatchLogReader(ByteBuffer.wrap(buffer));
       fileCorrupted = fileCorrupted || batchLogReader.isFileCorrupted();
     } catch (Exception e) {
-      logger.error("Cannot read more PhysicalPlans from {} because", filepath, e);
+      logger.error(
+          "Cannot read more PhysicalPlans from {}, successfully read index is {}. The reason is",
+          filepath, idx, e);
       fileCorrupted = true;
       return false;
     }
@@ -100,11 +101,11 @@ public class SingleFileLogReader implements ILogReader {
 
   @Override
   public PhysicalPlan next() {
-    if (!hasNext()){
+    if (!hasNext()) {
       throw new NoSuchElementException();
     }
 
-    idx ++;
+    idx++;
     return batchLogReader.next();
   }
 
@@ -122,6 +123,7 @@ public class SingleFileLogReader implements ILogReader {
   public void open(File logFile) throws FileNotFoundException {
     close();
     logStream = new DataInputStream(new BufferedInputStream(new FileInputStream(logFile)));
+    logger.info("WAL file: {} size is {}", logFile.getName(), logFile.length());
     this.filepath = logFile.getPath();
     idx = 0;
   }