You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/06/28 09:41:09 UTC

[incubator-iotdb] 01/02: fix a bug where endTimeMap is not updated when a UFP is closed normally

This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit faa9e8ec4a99217b4d5550bd15aa3598c4212267
Author: lta <li...@163.com>
AuthorDate: Fri Jun 28 17:23:24 2019 +0800

    fix a bug where endTimeMap is not updated when a UFP is closed normally
---
 .../db/engine/filenodeV2/FileNodeProcessorV2.java  | 14 ++++++-----
 .../db/engine/filenodeV2/TsFileResourceV2.java     | 27 ++++++++++++----------
 .../filenodeV2/UnsealedTsFileProcessorV2.java      | 24 ++++++++++++-------
 .../engine/filenodeV2/FileNodeProcessorV2Test.java |  2 +-
 .../filenodeV2/UnsealedTsFileProcessorV2Test.java  |  9 ++++----
 5 files changed, 45 insertions(+), 31 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
index a98a140..0c64be3 100755
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
@@ -47,7 +47,6 @@ import org.apache.iotdb.db.exception.UnsealedTsFileProcessorException;
 import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
-import org.apache.iotdb.db.utils.datastructure.TVList;
 import org.apache.iotdb.db.utils.datastructure.TVListAllocator;
 import org.apache.iotdb.db.writelog.recover.TsFileRecoverPerformer;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
@@ -288,10 +287,11 @@ public class FileNodeProcessorV2 {
 
     // insert BufferWrite
     long start2 = System.currentTimeMillis();
-    result = unsealedTsFileProcessor.insert(insertPlan);
+    result = unsealedTsFileProcessor.insert(insertPlan, sequence);
     start2 = System.currentTimeMillis() - start2;
     if (start2 > 1000) {
-      LOGGER.info("FNP {} insert a record into unsealed file processor cost: {}", storageGroupName, start2);
+      LOGGER.info("FNP {} insert a record into unsealed file processor cost: {}", storageGroupName,
+          start2);
     }
 
     // try to update the latest time of the device of this tsRecord
@@ -355,6 +355,7 @@ public class FileNodeProcessorV2 {
     if (sequence) {
       closingSequenceTsFileProcessor.add(unsealedTsFileProcessor);
       workSequenceTsFileProcessor = null;
+      updateEndTimeMap(unsealedTsFileProcessor);
     } else {
       closingUnSequenceTsFileProcessor.add(unsealedTsFileProcessor);
       workUnSequenceTsFileProcessor = null;
@@ -389,7 +390,8 @@ public class FileNodeProcessorV2 {
     lock.writeLock().lock();
     time = System.currentTimeMillis() - time;
     if (time > 1000) {
-      LOGGER.info("storage group {} wait for write lock cost: {}", storageGroupName, time, new RuntimeException());
+      LOGGER.info("storage group {} wait for write lock cost: {}", storageGroupName, time,
+          new RuntimeException());
     }
     timerr.set(System.currentTimeMillis());
   }
@@ -398,7 +400,8 @@ public class FileNodeProcessorV2 {
     lock.writeLock().unlock();
     long time = System.currentTimeMillis() - timerr.get();
     if (time > 1000) {
-      LOGGER.info("storage group {} take lock for {}ms", storageGroupName, time, new RuntimeException());
+      LOGGER.info("storage group {} take lock for {}ms", storageGroupName, time,
+          new RuntimeException());
     }
   }
 
@@ -535,7 +538,6 @@ public class FileNodeProcessorV2 {
       if (workUnSequenceTsFileProcessor != null) {
         closingUnSequenceTsFileProcessor.add(workUnSequenceTsFileProcessor);
         workUnSequenceTsFileProcessor.asyncClose();
-        updateEndTimeMap(workUnSequenceTsFileProcessor);
         workUnSequenceTsFileProcessor = null;
       }
     } finally {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java
index a60ccf5..d226517 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java
@@ -22,18 +22,14 @@ import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
-import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Set;
-import org.apache.iotdb.db.engine.filenode.TsFileResource;
 import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
@@ -51,8 +47,7 @@ public class TsFileResourceV2 {
   private Map<String, Long> startTimeMap;
 
   /**
-   * device -> end time
-   * null if it's an unsealed tsfile
+   * device -> end time; null if it's an unsealed tsfile
    */
   private Map<String, Long> endTimeMap;
 
@@ -96,7 +91,8 @@ public class TsFileResourceV2 {
   }
 
   public void serialize() throws IOException {
-    try (OutputStream outputStream = new BufferedOutputStream(new FileOutputStream(file + RESOURCE_SUFFIX))){
+    try (OutputStream outputStream = new BufferedOutputStream(
+        new FileOutputStream(file + RESOURCE_SUFFIX))) {
       ReadWriteIOUtils.write(this.startTimeMap.size(), outputStream);
       for (Entry<String, Long> entry : this.startTimeMap.entrySet()) {
         ReadWriteIOUtils.write(entry.getKey(), outputStream);
@@ -111,7 +107,8 @@ public class TsFileResourceV2 {
   }
 
   public void deSerialize() throws IOException {
-    try (InputStream inputStream = new BufferedInputStream(new FileInputStream(file + RESOURCE_SUFFIX))) {
+    try (InputStream inputStream = new BufferedInputStream(
+        new FileInputStream(file + RESOURCE_SUFFIX))) {
       int size = ReadWriteIOUtils.readInt(inputStream);
       Map<String, Long> startTimes = new HashMap<>();
       for (int i = 0; i < size; i++) {
@@ -132,10 +129,16 @@ public class TsFileResourceV2 {
   }
 
   public void updateStartTime(String device, long time) {
-    startTimeMap.putIfAbsent(device, time);
-    long startTime = startTimeMap.get(device);
-    if (time < startTimeMap.get(device)) {
-      startTimeMap.put(device, startTime);
+    long startTime = startTimeMap.getOrDefault(device, Long.MAX_VALUE);
+    if (time < startTime) {
+      startTimeMap.put(device, time);
+    }
+  }
+
+  public void updateEndTime(String device, long time) {
+    long endTime = endTimeMap.getOrDefault(device, Long.MIN_VALUE);
+    if (time > endTime) {
+      endTimeMap.put(device, time);
     }
   }
 
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
index 81c3cf0..c01c33c 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
@@ -116,7 +116,7 @@ public class UnsealedTsFileProcessorV2 {
    * @param insertPlan physical plan of insertion
    * @return succeed or fail
    */
-  public boolean insert(InsertPlan insertPlan) {
+  public boolean insert(InsertPlan insertPlan, boolean sequence) {
 
     long start1 = System.currentTimeMillis();
     if (workMemTable == null) {
@@ -144,13 +144,17 @@ public class UnsealedTsFileProcessorV2 {
     }
     // update start time of this memtable
     tsFileResource.updateStartTime(insertPlan.getDeviceId(), insertPlan.getTime());
+    if (!sequence) {
+      tsFileResource.updateEndTime(insertPlan.getDeviceId(), insertPlan.getTime());
+    }
 
     long start2 = System.currentTimeMillis();
     // insert tsRecord to work memtable
     workMemTable.insert(insertPlan);
     start2 = System.currentTimeMillis() - start2;
     if (start2 > 1000) {
-      LOGGER.info("UFP {} insert into memtable cost: {}, insertPlan: {}, current data points in memtable: {}",
+      LOGGER.info(
+          "UFP {} insert into memtable cost: {}, insertPlan: {}, current data points in memtable: {}",
           storageGroupName, start2, insertPlan, workMemTable.size());
     }
 
@@ -212,8 +216,8 @@ public class UnsealedTsFileProcessorV2 {
   }
 
   /**
-   * Ensure there must be a flush thread submitted after setCloseMark() is called,
-   * therefore the close task will be executed by a flush thread.
+   * Ensure there must be a flush thread submitted after setCloseMark() is called, therefore the
+   * close task will be executed by a flush thread.
    */
   public void asyncClose() {
     flushQueryLock.writeLock().lock();
@@ -221,7 +225,9 @@ public class UnsealedTsFileProcessorV2 {
     try {
       IMemTable tmpMemTable = workMemTable == null ? new EmptyMemTable() : workMemTable;
       if (!tmpMemTable.isManagedByMemPool()) {
-        LOGGER.info("storage group {} add an empty memtable into flushing memtable list when async close", storageGroupName);
+        LOGGER.info(
+            "storage group {} add an empty memtable into flushing memtable list when async close",
+            storageGroupName);
       } else {
         LOGGER.info("storage group {} async flush a memtable when async close", storageGroupName);
       }
@@ -299,7 +305,7 @@ public class UnsealedTsFileProcessorV2 {
       writer.makeMetadataVisible();
       flushingMemTables.remove(memTable);
       LOGGER.info("flush finished, remove a memtable from flushing list, "
-              + "flushing memtable list size: {}", flushingMemTables.size());
+          + "flushing memtable list size: {}", flushingMemTables.size());
     } finally {
       flushQueryLock.writeLock().unlock();
     }
@@ -317,7 +323,8 @@ public class UnsealedTsFileProcessorV2 {
 
     // null memtable only appears when calling asyncClose()
     if (memTableToFlush.isManagedByMemPool()) {
-      MemTableFlushTaskV2 flushTask = new MemTableFlushTaskV2(memTableToFlush, fileSchema, writer, storageGroupName,
+      MemTableFlushTaskV2 flushTask = new MemTableFlushTaskV2(memTableToFlush, fileSchema, writer,
+          storageGroupName,
           this::releaseFlushedMemTableCallback);
       flushTask.flushMemTable();
       long start = System.currentTimeMillis();
@@ -397,7 +404,8 @@ public class UnsealedTsFileProcessorV2 {
 
   public void close() throws IOException {
     tsFileResource.close();
-    MultiFileLogNodeManager.getInstance().deleteNode(storageGroupName + "-" + tsFileResource.getFile().getName());
+    MultiFileLogNodeManager.getInstance()
+        .deleteNode(storageGroupName + "-" + tsFileResource.getFile().getName());
   }
 
   public void setManagedByFlushManager(boolean managedByFlushManager) {
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2Test.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2Test.java
index 6c6120f..e53c351 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2Test.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2Test.java
@@ -60,7 +60,7 @@ public class FileNodeProcessorV2Test {
       processor.insert(new InsertPlan(record));
       processor.asyncForceClose();
     }
-    
+
     processor.syncCloseFileNode();
     QueryDataSourceV2 queryDataSource = null;
     try {
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2Test.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2Test.java
index 94f5f1b..2331391 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2Test.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2Test.java
@@ -19,7 +19,8 @@
 package org.apache.iotdb.db.engine.filenodeV2;
 
 import static junit.framework.TestCase.assertTrue;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 
 import java.io.File;
 import java.io.IOException;
@@ -86,7 +87,7 @@ public class UnsealedTsFileProcessorV2Test {
     for (int i = 1; i <= 100; i++) {
       TSRecord record = new TSRecord(i, deviceId);
       record.addTuple(DataPoint.getDataPoint(dataType, measurementId, String.valueOf(i)));
-      processor.insert(new InsertPlan(record));
+      processor.insert(new InsertPlan(record), true);
     }
 
     // query data in memory
@@ -134,7 +135,7 @@ public class UnsealedTsFileProcessorV2Test {
       for (int i = 1; i <= 10; i++) {
         TSRecord record = new TSRecord(i, deviceId);
         record.addTuple(DataPoint.getDataPoint(dataType, measurementId, String.valueOf(i)));
-        processor.insert(new InsertPlan(record));
+        processor.insert(new InsertPlan(record), true);
       }
       processor.asyncFlush();
     }
@@ -177,7 +178,7 @@ public class UnsealedTsFileProcessorV2Test {
     for (int i = 1; i <= 100; i++) {
       TSRecord record = new TSRecord(i, deviceId);
       record.addTuple(DataPoint.getDataPoint(dataType, measurementId, String.valueOf(i)));
-      processor.insert(new InsertPlan(record));
+      processor.insert(new InsertPlan(record), true);
     }
 
     // query data in memory