You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/06/28 09:41:08 UTC

[incubator-iotdb] branch feature_async_close_tsfile updated (cccfd82 -> 1d099e7)

This is an automated email from the ASF dual-hosted git repository.

lta pushed a change to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git.


    from cccfd82  merge and resolve conflict
     new faa9e8e  fix a bug of don't update endTimeMap when normal close UFP
     new 1d099e7  fix MemTableFlushTaskV2Test bug

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../db/engine/filenodeV2/FileNodeProcessorV2.java  | 14 ++++++-----
 .../db/engine/filenodeV2/TsFileResourceV2.java     | 27 ++++++++++++----------
 .../filenodeV2/UnsealedTsFileProcessorV2.java      | 24 ++++++++++++-------
 .../engine/filenodeV2/FileNodeProcessorV2Test.java |  3 ++-
 .../filenodeV2/UnsealedTsFileProcessorV2Test.java  |  9 ++++----
 .../db/engine/memtable/ChunkBufferPoolTest.java    |  9 +++++---
 .../engine/memtable/MemTableFlushTaskV2Test.java   |  4 ++--
 .../iotdb/db/engine/memtable/MemTablePoolTest.java |  7 +++++-
 8 files changed, 60 insertions(+), 37 deletions(-)


[incubator-iotdb] 01/02: fix a bug of don't update endTimeMap when normal close UFP

Posted by lt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit faa9e8ec4a99217b4d5550bd15aa3598c4212267
Author: lta <li...@163.com>
AuthorDate: Fri Jun 28 17:23:24 2019 +0800

    fix a bug of don't update endTimeMap when normal close UFP
---
 .../db/engine/filenodeV2/FileNodeProcessorV2.java  | 14 ++++++-----
 .../db/engine/filenodeV2/TsFileResourceV2.java     | 27 ++++++++++++----------
 .../filenodeV2/UnsealedTsFileProcessorV2.java      | 24 ++++++++++++-------
 .../engine/filenodeV2/FileNodeProcessorV2Test.java |  2 +-
 .../filenodeV2/UnsealedTsFileProcessorV2Test.java  |  9 ++++----
 5 files changed, 45 insertions(+), 31 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
index a98a140..0c64be3 100755
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
@@ -47,7 +47,6 @@ import org.apache.iotdb.db.exception.UnsealedTsFileProcessorException;
 import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
-import org.apache.iotdb.db.utils.datastructure.TVList;
 import org.apache.iotdb.db.utils.datastructure.TVListAllocator;
 import org.apache.iotdb.db.writelog.recover.TsFileRecoverPerformer;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
@@ -288,10 +287,11 @@ public class FileNodeProcessorV2 {
 
     // insert BufferWrite
     long start2 = System.currentTimeMillis();
-    result = unsealedTsFileProcessor.insert(insertPlan);
+    result = unsealedTsFileProcessor.insert(insertPlan, sequence);
     start2 = System.currentTimeMillis() - start2;
     if (start2 > 1000) {
-      LOGGER.info("FNP {} insert a record into unsealed file processor cost: {}", storageGroupName, start2);
+      LOGGER.info("FNP {} insert a record into unsealed file processor cost: {}", storageGroupName,
+          start2);
     }
 
     // try to update the latest time of the device of this tsRecord
@@ -355,6 +355,7 @@ public class FileNodeProcessorV2 {
     if (sequence) {
       closingSequenceTsFileProcessor.add(unsealedTsFileProcessor);
       workSequenceTsFileProcessor = null;
+      updateEndTimeMap(unsealedTsFileProcessor);
     } else {
       closingUnSequenceTsFileProcessor.add(unsealedTsFileProcessor);
       workUnSequenceTsFileProcessor = null;
@@ -389,7 +390,8 @@ public class FileNodeProcessorV2 {
     lock.writeLock().lock();
     time = System.currentTimeMillis() - time;
     if (time > 1000) {
-      LOGGER.info("storage group {} wait for write lock cost: {}", storageGroupName, time, new RuntimeException());
+      LOGGER.info("storage group {} wait for write lock cost: {}", storageGroupName, time,
+          new RuntimeException());
     }
     timerr.set(System.currentTimeMillis());
   }
@@ -398,7 +400,8 @@ public class FileNodeProcessorV2 {
     lock.writeLock().unlock();
     long time = System.currentTimeMillis() - timerr.get();
     if (time > 1000) {
-      LOGGER.info("storage group {} take lock for {}ms", storageGroupName, time, new RuntimeException());
+      LOGGER.info("storage group {} take lock for {}ms", storageGroupName, time,
+          new RuntimeException());
     }
   }
 
@@ -535,7 +538,6 @@ public class FileNodeProcessorV2 {
       if (workUnSequenceTsFileProcessor != null) {
         closingUnSequenceTsFileProcessor.add(workUnSequenceTsFileProcessor);
         workUnSequenceTsFileProcessor.asyncClose();
-        updateEndTimeMap(workUnSequenceTsFileProcessor);
         workUnSequenceTsFileProcessor = null;
       }
     } finally {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java
index a60ccf5..d226517 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java
@@ -22,18 +22,14 @@ import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
-import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Set;
-import org.apache.iotdb.db.engine.filenode.TsFileResource;
 import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
@@ -51,8 +47,7 @@ public class TsFileResourceV2 {
   private Map<String, Long> startTimeMap;
 
   /**
-   * device -> end time
-   * null if it's an unsealed tsfile
+   * device -> end time null if it's an unsealed tsfile
    */
   private Map<String, Long> endTimeMap;
 
@@ -96,7 +91,8 @@ public class TsFileResourceV2 {
   }
 
   public void serialize() throws IOException {
-    try (OutputStream outputStream = new BufferedOutputStream(new FileOutputStream(file + RESOURCE_SUFFIX))){
+    try (OutputStream outputStream = new BufferedOutputStream(
+        new FileOutputStream(file + RESOURCE_SUFFIX))) {
       ReadWriteIOUtils.write(this.startTimeMap.size(), outputStream);
       for (Entry<String, Long> entry : this.startTimeMap.entrySet()) {
         ReadWriteIOUtils.write(entry.getKey(), outputStream);
@@ -111,7 +107,8 @@ public class TsFileResourceV2 {
   }
 
   public void deSerialize() throws IOException {
-    try (InputStream inputStream = new BufferedInputStream(new FileInputStream(file + RESOURCE_SUFFIX))) {
+    try (InputStream inputStream = new BufferedInputStream(
+        new FileInputStream(file + RESOURCE_SUFFIX))) {
       int size = ReadWriteIOUtils.readInt(inputStream);
       Map<String, Long> startTimes = new HashMap<>();
       for (int i = 0; i < size; i++) {
@@ -132,10 +129,16 @@ public class TsFileResourceV2 {
   }
 
   public void updateStartTime(String device, long time) {
-    startTimeMap.putIfAbsent(device, time);
-    long startTime = startTimeMap.get(device);
-    if (time < startTimeMap.get(device)) {
-      startTimeMap.put(device, startTime);
+    long startTime = startTimeMap.getOrDefault(device, Long.MAX_VALUE);
+    if (time < startTime) {
+      startTimeMap.put(device, time);
+    }
+  }
+
+  public void updateEndTime(String device, long time) {
+    long endTime = endTimeMap.getOrDefault(device, Long.MIN_VALUE);
+    if (time > endTime) {
+      endTimeMap.put(device, time);
     }
   }
 
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
index 81c3cf0..c01c33c 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
@@ -116,7 +116,7 @@ public class UnsealedTsFileProcessorV2 {
    * @param insertPlan physical plan of insertion
    * @return succeed or fail
    */
-  public boolean insert(InsertPlan insertPlan) {
+  public boolean insert(InsertPlan insertPlan, boolean sequence) {
 
     long start1 = System.currentTimeMillis();
     if (workMemTable == null) {
@@ -144,13 +144,17 @@ public class UnsealedTsFileProcessorV2 {
     }
     // update start time of this memtable
     tsFileResource.updateStartTime(insertPlan.getDeviceId(), insertPlan.getTime());
+    if (!sequence) {
+      tsFileResource.updateEndTime(insertPlan.getDeviceId(), insertPlan.getTime());
+    }
 
     long start2 = System.currentTimeMillis();
     // insert tsRecord to work memtable
     workMemTable.insert(insertPlan);
     start2 = System.currentTimeMillis() - start2;
     if (start2 > 1000) {
-      LOGGER.info("UFP {} insert into memtable cost: {}, insertPlan: {}, current data points in memtable: {}",
+      LOGGER.info(
+          "UFP {} insert into memtable cost: {}, insertPlan: {}, current data points in memtable: {}",
           storageGroupName, start2, insertPlan, workMemTable.size());
     }
 
@@ -212,8 +216,8 @@ public class UnsealedTsFileProcessorV2 {
   }
 
   /**
-   * Ensure there must be a flush thread submitted after setCloseMark() is called,
-   * therefore the close task will be executed by a flush thread.
+   * Ensure there must be a flush thread submitted after setCloseMark() is called, therefore the
+   * close task will be executed by a flush thread.
    */
   public void asyncClose() {
     flushQueryLock.writeLock().lock();
@@ -221,7 +225,9 @@ public class UnsealedTsFileProcessorV2 {
     try {
       IMemTable tmpMemTable = workMemTable == null ? new EmptyMemTable() : workMemTable;
       if (!tmpMemTable.isManagedByMemPool()) {
-        LOGGER.info("storage group {} add an empty memtable into flushing memtable list when async close", storageGroupName);
+        LOGGER.info(
+            "storage group {} add an empty memtable into flushing memtable list when async close",
+            storageGroupName);
       } else {
         LOGGER.info("storage group {} async flush a memtable when async close", storageGroupName);
       }
@@ -299,7 +305,7 @@ public class UnsealedTsFileProcessorV2 {
       writer.makeMetadataVisible();
       flushingMemTables.remove(memTable);
       LOGGER.info("flush finished, remove a memtable from flushing list, "
-              + "flushing memtable list size: {}", flushingMemTables.size());
+          + "flushing memtable list size: {}", flushingMemTables.size());
     } finally {
       flushQueryLock.writeLock().unlock();
     }
@@ -317,7 +323,8 @@ public class UnsealedTsFileProcessorV2 {
 
     // null memtable only appears when calling asyncClose()
     if (memTableToFlush.isManagedByMemPool()) {
-      MemTableFlushTaskV2 flushTask = new MemTableFlushTaskV2(memTableToFlush, fileSchema, writer, storageGroupName,
+      MemTableFlushTaskV2 flushTask = new MemTableFlushTaskV2(memTableToFlush, fileSchema, writer,
+          storageGroupName,
           this::releaseFlushedMemTableCallback);
       flushTask.flushMemTable();
       long start = System.currentTimeMillis();
@@ -397,7 +404,8 @@ public class UnsealedTsFileProcessorV2 {
 
   public void close() throws IOException {
     tsFileResource.close();
-    MultiFileLogNodeManager.getInstance().deleteNode(storageGroupName + "-" + tsFileResource.getFile().getName());
+    MultiFileLogNodeManager.getInstance()
+        .deleteNode(storageGroupName + "-" + tsFileResource.getFile().getName());
   }
 
   public void setManagedByFlushManager(boolean managedByFlushManager) {
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2Test.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2Test.java
index 6c6120f..e53c351 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2Test.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2Test.java
@@ -60,7 +60,7 @@ public class FileNodeProcessorV2Test {
       processor.insert(new InsertPlan(record));
       processor.asyncForceClose();
     }
-    
+
     processor.syncCloseFileNode();
     QueryDataSourceV2 queryDataSource = null;
     try {
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2Test.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2Test.java
index 94f5f1b..2331391 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2Test.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2Test.java
@@ -19,7 +19,8 @@
 package org.apache.iotdb.db.engine.filenodeV2;
 
 import static junit.framework.TestCase.assertTrue;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 
 import java.io.File;
 import java.io.IOException;
@@ -86,7 +87,7 @@ public class UnsealedTsFileProcessorV2Test {
     for (int i = 1; i <= 100; i++) {
       TSRecord record = new TSRecord(i, deviceId);
       record.addTuple(DataPoint.getDataPoint(dataType, measurementId, String.valueOf(i)));
-      processor.insert(new InsertPlan(record));
+      processor.insert(new InsertPlan(record), true);
     }
 
     // query data in memory
@@ -134,7 +135,7 @@ public class UnsealedTsFileProcessorV2Test {
       for (int i = 1; i <= 10; i++) {
         TSRecord record = new TSRecord(i, deviceId);
         record.addTuple(DataPoint.getDataPoint(dataType, measurementId, String.valueOf(i)));
-        processor.insert(new InsertPlan(record));
+        processor.insert(new InsertPlan(record), true);
       }
       processor.asyncFlush();
     }
@@ -177,7 +178,7 @@ public class UnsealedTsFileProcessorV2Test {
     for (int i = 1; i <= 100; i++) {
       TSRecord record = new TSRecord(i, deviceId);
       record.addTuple(DataPoint.getDataPoint(dataType, measurementId, String.valueOf(i)));
-      processor.insert(new InsertPlan(record));
+      processor.insert(new InsertPlan(record), true);
     }
 
     // query data in memory


[incubator-iotdb] 02/02: fix MemTableFlushTaskV2Test bug

Posted by lt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 1d099e7963b0f63c2432108b0b0510ae7b02f028
Author: lta <li...@163.com>
AuthorDate: Fri Jun 28 17:40:38 2019 +0800

    fix MemTableFlushTaskV2Test bug
---
 .../iotdb/db/engine/filenodeV2/FileNodeProcessorV2Test.java      | 1 +
 .../org/apache/iotdb/db/engine/memtable/ChunkBufferPoolTest.java | 9 ++++++---
 .../apache/iotdb/db/engine/memtable/MemTableFlushTaskV2Test.java | 4 ++--
 .../org/apache/iotdb/db/engine/memtable/MemTablePoolTest.java    | 7 ++++++-
 4 files changed, 15 insertions(+), 6 deletions(-)

diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2Test.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2Test.java
index e53c351..3f2ec2b 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2Test.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2Test.java
@@ -49,6 +49,7 @@ public class FileNodeProcessorV2Test {
   @After
   public void tearDown() throws Exception {
     EnvironmentUtils.cleanEnv();
+    EnvironmentUtils.cleanDir("data");
   }
 
 
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/memtable/ChunkBufferPoolTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/memtable/ChunkBufferPoolTest.java
index c71bb85..1efc9bb 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/memtable/ChunkBufferPoolTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/memtable/ChunkBufferPoolTest.java
@@ -31,15 +31,17 @@ import org.junit.Test;
 public class ChunkBufferPoolTest {
 
   private ConcurrentLinkedQueue<ChunkBuffer> chunkBuffers;
+  private Thread thread = new ReturnThread();
 
   @Before
   public void setUp() throws Exception {
     chunkBuffers = new ConcurrentLinkedQueue();
-    new ReturnThread().start();
+    thread.start();
   }
 
   @After
   public void tearDown() throws Exception {
+    thread.interrupt();
   }
 
   @Test
@@ -57,19 +59,20 @@ public class ChunkBufferPoolTest {
     @Override
     public void run() {
       while (true) {
+        if(isInterrupted()){
+          break;
+        }
         ChunkBuffer chunkBuffer = chunkBuffers.poll();
         if (chunkBuffer == null) {
           try {
             Thread.sleep(10);
           } catch (InterruptedException e) {
-            e.printStackTrace();
           }
           continue;
         }
         try {
           Thread.sleep(100);
         } catch (InterruptedException e) {
-          e.printStackTrace();
         }
         chunkBuffers.remove(chunkBuffer);
         ChunkBufferPool.getInstance().putBack(chunkBuffer, "test case");
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2Test.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2Test.java
index e552a42..8d9ef4a 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2Test.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2Test.java
@@ -35,7 +35,7 @@ public class MemTableFlushTaskV2Test {
 
   private NativeRestorableIOWriter writer;
   private String storageGroup = "storage_group1";
-  private String filePath = "testUnsealedTsFileProcessor.tsfile";
+  private String filePath = "data/testUnsealedTsFileProcessor.tsfile";
   private IMemTable memTable;
   private long startTime = 1;
   private long endTime = 100;
@@ -53,7 +53,7 @@ public class MemTableFlushTaskV2Test {
   @After
   public void tearDown() throws Exception {
     EnvironmentUtils.cleanEnv();
-    EnvironmentUtils.cleanDir(filePath);
+    EnvironmentUtils.cleanDir("data");
   }
 
   @Test
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/memtable/MemTablePoolTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/memtable/MemTablePoolTest.java
index 3dbf681..d43f68e 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/memtable/MemTablePoolTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/memtable/MemTablePoolTest.java
@@ -27,15 +27,17 @@ import org.junit.Test;
 public class MemTablePoolTest {
 
   private ConcurrentLinkedQueue<IMemTable> memTables;
+  private Thread thread = new ReturnThread();
 
   @Before
   public void setUp() throws Exception {
     memTables = new ConcurrentLinkedQueue();
-    new ReturnThread().start();
+    thread.start();
   }
 
   @After
   public void tearDown() throws Exception {
+    thread.interrupt();
   }
 
   @Test
@@ -66,6 +68,9 @@ public class MemTablePoolTest {
     @Override
     public void run() {
       while (true) {
+        if(isInterrupted()){
+          break;
+        }
         IMemTable memTable = memTables.poll();
         if (memTable == null) {
           try {