You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/06/28 13:16:07 UTC

[incubator-iotdb] 01/03: fix query clone TVList bug

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 8d69b5ea79c3b0c5e402f0d7d1b62049620d045e
Author: qiaojialin <64...@qq.com>
AuthorDate: Fri Jun 28 19:45:04 2019 +0800

    fix query clone TVList bug
---
 .../db/engine/filenodeV2/FileNodeProcessorV2.java  | 25 ++++---
 .../db/engine/filenodeV2/TsFileResourceV2.java     |  4 ++
 .../filenodeV2/UnsealedTsFileProcessorV2.java      | 21 +++---
 .../iotdb/db/engine/memtable/AbstractMemTable.java | 44 +++++++++----
 .../apache/iotdb/db/engine/memtable/IMemTable.java |  2 +
 .../db/engine/memtable/MemTableFlushTaskV2.java    |  2 -
 .../db/engine/memtable/TimeValuePairSorter.java    |  2 +-
 .../db/engine/memtable/WritableMemChunkV2.java     | 76 +++++++++++-----------
 .../iotdb/db/utils/datastructure/TVList.java       |  4 +-
 .../engine/filenodeV2/FileNodeProcessorV2Test.java |  1 +
 .../UnseqSeriesReaderByTimestampTest.java          |  2 +-
 11 files changed, 109 insertions(+), 74 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
index a98a140..7615bc0 100755
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
@@ -89,12 +89,13 @@ public class FileNodeProcessorV2 {
 
   private String storageGroupName;
 
-  private final ReadWriteLock lock = new ReentrantReadWriteLock();
+  private final ReadWriteLock insertLock = new ReentrantReadWriteLock();
 
   private final Object closeFileNodeCondition = new Object();
 
   private final ThreadLocal<Long> timerr = new ThreadLocal<>();
 
+  private final ReadWriteLock closeQueryLock = new ReentrantReadWriteLock();
   /**
    * Mark whether to close file node
    */
@@ -372,7 +373,7 @@ public class FileNodeProcessorV2 {
   // TODO need a read lock, please consider the concurrency with flush manager threads.
   public QueryDataSourceV2 query(String deviceId, String measurementId)
       throws FileNodeProcessorException {
-    lock.readLock().lock();
+    insertLock.readLock().lock();
     try {
       List<TsFileResourceV2> seqResources = getFileReSourceListForQuery(sequenceFileList,
           deviceId, measurementId);
@@ -380,13 +381,13 @@ public class FileNodeProcessorV2 {
           deviceId, measurementId);
       return new QueryDataSourceV2(new Path(deviceId, measurementId), seqResources, unseqResources);
     } finally {
-      lock.readLock().unlock();
+      insertLock.readLock().unlock();
     }
   }
 
   private void writeLock() {
     long time = System.currentTimeMillis();
-    lock.writeLock().lock();
+    insertLock.writeLock().lock();
     time = System.currentTimeMillis() - time;
     if (time > 1000) {
       LOGGER.info("storage group {} wait for write lock cost: {}", storageGroupName, time, new RuntimeException());
@@ -395,7 +396,7 @@ public class FileNodeProcessorV2 {
   }
 
   private void writeUnlock() {
-    lock.writeLock().unlock();
+    insertLock.writeLock().unlock();
     long time = System.currentTimeMillis() - timerr.get();
     if (time > 1000) {
       LOGGER.info("storage group {} take lock for {}ms", storageGroupName, time, new RuntimeException());
@@ -418,8 +419,9 @@ public class FileNodeProcessorV2 {
       if (!tsFileResource.containsDevice(deviceId)) {
         continue;
       }
-      synchronized (tsFileResource) {
-        if (!tsFileResource.getStartTimeMap().isEmpty()) {
+      if (!tsFileResource.getStartTimeMap().isEmpty()) {
+        closeQueryLock.readLock().lock();
+        try {
           if (tsFileResource.isClosed()) {
             tsfileResourcesForQuery.add(tsFileResource);
           } else {
@@ -432,8 +434,12 @@ public class FileNodeProcessorV2 {
               throw new FileNodeProcessorException(e);
             }
             tsfileResourcesForQuery
-                .add(new TsFileResourceV2(tsFileResource.getFile(), pair.left, pair.right));
+                .add(new TsFileResourceV2(tsFileResource.getFile(),
+                    tsFileResource.getStartTimeMap(),
+                    tsFileResource.getEndTimeMap(), pair.left, pair.right));
           }
+        } finally {
+          closeQueryLock.readLock().unlock();
         }
       }
     }
@@ -633,10 +639,13 @@ public class FileNodeProcessorV2 {
   // TODO please consider concurrency with query and insert method.
   public void closeUnsealedTsFileProcessorCallback(
       UnsealedTsFileProcessorV2 unsealedTsFileProcessor) {
+    closeQueryLock.writeLock().lock();
     try {
       unsealedTsFileProcessor.close();
     } catch (IOException e) {
       LOGGER.error("storage group: {} close unsealedTsFileProcessor failed", storageGroupName, e);
+    } finally {
+      closeQueryLock.writeLock().unlock();
     }
     if (closingSequenceTsFileProcessor.contains(unsealedTsFileProcessor)) {
       closingSequenceTsFileProcessor.remove(unsealedTsFileProcessor);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java
index a60ccf5..e9366d0 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java
@@ -88,9 +88,13 @@ public class TsFileResourceV2 {
   }
 
   public TsFileResourceV2(File file,
+      Map<String, Long> startTimeMap,
+      Map<String, Long> endTimeMap,
       ReadOnlyMemChunk readOnlyMemChunk,
       List<ChunkMetaData> chunkMetaDatas) {
     this.file = file;
+    this.startTimeMap = startTimeMap;
+    this.endTimeMap = endTimeMap;
     this.chunkMetaDatas = chunkMetaDatas;
     this.readOnlyMemChunk = readOnlyMemChunk;
   }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
index 81c3cf0..b432f3e 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
@@ -23,11 +23,13 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
+import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.memtable.EmptyMemTable;
 import org.apache.iotdb.db.engine.memtable.IMemTable;
@@ -298,6 +300,7 @@ public class UnsealedTsFileProcessorV2 {
     try {
       writer.makeMetadataVisible();
       flushingMemTables.remove(memTable);
+      memTable.release();
       LOGGER.info("flush finished, remove a memtable from flushing list, "
               + "flushing memtable list size: {}", flushingMemTables.size());
     } finally {
@@ -359,14 +362,10 @@ public class UnsealedTsFileProcessorV2 {
 
     writer.endFile(fileSchema);
 
-    flushQueryLock.writeLock().lock();
-    try {
       // remove this processor from Closing list in FileNodeProcessor, mark the TsFileResource closed, no need writer anymore
-      closeUnsealedFileCallback.accept(this);
-      writer = null;
-    } finally {
-      flushQueryLock.writeLock().unlock();
-    }
+    closeUnsealedFileCallback.accept(this);
+
+    writer = null;
 
     // delete the restore for this bufferwrite processor
     if (LOGGER.isInfoEnabled()) {
@@ -440,12 +439,12 @@ public class UnsealedTsFileProcessorV2 {
         if (!flushingMemTable.isManagedByMemPool()) {
           continue;
         }
-        memSeriesLazyMerger
-            .addMemSeries(flushingMemTable.query(deviceId, measurementId, dataType, props));
+        ReadOnlyMemChunk memChunk = flushingMemTable.query(deviceId, measurementId, dataType, props);
+        memSeriesLazyMerger.addMemSeries(memChunk);
       }
       if (workMemTable != null) {
-        memSeriesLazyMerger
-            .addMemSeries(workMemTable.query(deviceId, measurementId, dataType, props));
+        ReadOnlyMemChunk memChunk = workMemTable.query(deviceId, measurementId, dataType, props);
+        memSeriesLazyMerger.addMemSeries(memChunk);
       }
       // memSeriesLazyMerger has handled the props,
       // so we do not need to handle it again in the following readOnlyMemChunk
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
index fe3a5f5..8073704 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
@@ -24,13 +24,23 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.engine.modification.Deletion;
 import org.apache.iotdb.db.engine.modification.Modification;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.utils.MemUtils;
 import org.apache.iotdb.db.utils.TimeValuePair;
+import org.apache.iotdb.db.utils.TsPrimitiveType.TsBinary;
+import org.apache.iotdb.db.utils.TsPrimitiveType.TsBoolean;
+import org.apache.iotdb.db.utils.TsPrimitiveType.TsDouble;
+import org.apache.iotdb.db.utils.TsPrimitiveType.TsFloat;
+import org.apache.iotdb.db.utils.TsPrimitiveType.TsInt;
+import org.apache.iotdb.db.utils.TsPrimitiveType.TsLong;
+import org.apache.iotdb.db.utils.datastructure.TVList;
 import org.apache.iotdb.db.utils.datastructure.TVListAllocator;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 public abstract class AbstractMemTable implements IMemTable {
@@ -143,11 +153,20 @@ public abstract class AbstractMemTable implements IMemTable {
   @Override
   public ReadOnlyMemChunk query(String deviceId, String measurement, TSDataType dataType,
       Map<String, String> props) {
-
-    return new ReadOnlyMemChunk(dataType, getSeriesData(deviceId,
-        measurement, dataType), props);
+    TimeValuePairSorter sorter;
+    if (!checkPath(deviceId, measurement)) {
+      sorter = new WritableMemChunk(dataType);
+    } else {
+      long undeletedTime = findUndeletedTime(deviceId, measurement);
+      IWritableMemChunk memChunk = memTableMap.get(deviceId).get(measurement);
+      IWritableMemChunk chunkCopy = new WritableMemChunkV2(dataType, memChunk.getTVList().clone());
+      chunkCopy.setTimeOffset(undeletedTime);
+      sorter = chunkCopy;
+    }
+    return new ReadOnlyMemChunk(dataType, sorter, props);
   }
 
+
   private long findUndeletedTime(String deviceId, String measurement) {
     String path = deviceId + PATH_SEPARATOR + measurement;
     long undeletedTime = 0;
@@ -162,16 +181,6 @@ public abstract class AbstractMemTable implements IMemTable {
     return undeletedTime + 1;
   }
 
-  private TimeValuePairSorter getSeriesData(String deviceId, String measurement, TSDataType dataType) {
-    if (!checkPath(deviceId, measurement)) {
-      return new WritableMemChunk(dataType);
-    }
-    long undeletedTime = findUndeletedTime(deviceId, measurement);
-    IWritableMemChunk memChunk = memTableMap.get(deviceId).get(measurement);
-    memChunk.setTimeOffset(undeletedTime);
-    return memChunk;
-  }
-
   @Override
   public boolean delete(String deviceId, String measurementId, long timestamp) {
     Map<String, IWritableMemChunk> deviceMap = memTableMap.get(deviceId);
@@ -263,4 +272,13 @@ public abstract class AbstractMemTable implements IMemTable {
   public TVListAllocator getTVListAllocator() {
     return allocator;
   }
+
+  @Override
+  public void release() {
+    for (Entry<String, Map<String, IWritableMemChunk>> entry: memTableMap.entrySet()) {
+      for (Entry<String, IWritableMemChunk> subEntry: entry.getValue().entrySet()) {
+        allocator.release(entry.getKey() + IoTDBConstant.PATH_SEPARATOR + subEntry.getKey(), subEntry.getValue().getTVList());
+      }
+    }
+  }
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
index 9f0a453..8408b54 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
@@ -102,4 +102,6 @@ public interface IMemTable {
   void setTVListAllocator(TVListAllocator allocator);
 
   TVListAllocator getTVListAllocator();
+
+  void release();
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
index a6f182d..6ef25a7 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
@@ -143,8 +143,6 @@ public class MemTableFlushTaskV2 {
               try {
                 writeOneSeries(encodingMessage.left, seriesWriter,
                     encodingMessage.right.getType());
-                memTable.getTVListAllocator().release(currDevice + IoTDBConstant.PATH_SEPARATOR
-                    + encodingMessage.right.getMeasurementId(), encodingMessage.left);
                 ioTaskQueue.add(seriesWriter);
               } catch (IOException e) {
                 LOGGER.error("Storage group {} memtable {}, encoding task error.", storageGroup,
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/TimeValuePairSorter.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/TimeValuePairSorter.java
index ffbb7ec..12e36f7 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/TimeValuePairSorter.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/TimeValuePairSorter.java
@@ -26,7 +26,7 @@ import org.apache.iotdb.db.utils.TimeValuePair;
 public interface TimeValuePairSorter {
 
   /**
-   * get the distinct sorted startTime.
+   * get the distinct sorted startTime. Only for query.
    *
    * @return a List which contains all distinct {@link TimeValuePair}s in ascending order by
    * timestamp.
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunkV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunkV2.java
index 14a8335..8d34e67 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunkV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunkV2.java
@@ -39,6 +39,7 @@ public class WritableMemChunkV2 implements IWritableMemChunk {
   private static final Logger LOGGER = LoggerFactory.getLogger(WritableMemChunkV2.class);
   private TSDataType dataType;
   private TVList list;
+  private List<TimeValuePair> sortedList;
 
   public WritableMemChunkV2(TSDataType dataType, TVList list) {
     this.dataType = dataType;
@@ -129,7 +130,7 @@ public class WritableMemChunkV2 implements IWritableMemChunk {
   }
 
   @Override
-  public TVList getSortedTVList() {
+  public synchronized TVList getSortedTVList() {
     list.sort();
     return list;
   }
@@ -155,42 +156,43 @@ public class WritableMemChunkV2 implements IWritableMemChunk {
   }
 
   @Override
-  public List<TimeValuePair> getSortedTimeValuePairList() {
-   List<TimeValuePair> result = new ArrayList<>();
-   TVList cloneList = list.clone();
-   cloneList.sort();
-   for (int i = 0; i < cloneList.size(); i++) {
-     long time = cloneList.getTime(i);
-     if (time < cloneList.getTimeOffset() ||
-         (i+1 < cloneList.size() && (time == cloneList.getTime(i+1)))) {
-       continue;
-     }
-
-     switch (dataType) {
-       case BOOLEAN:
-         result.add(new TimeValuePair(time, new TsBoolean(cloneList.getBoolean(i))));
-         break;
-       case INT32:
-         result.add(new TimeValuePair(time, new TsInt(cloneList.getInt(i))));
-         break;
-       case INT64:
-         result.add(new TimeValuePair(time, new TsLong(cloneList.getLong(i))));
-         break;
-       case FLOAT:
-         result.add(new TimeValuePair(time, new TsFloat(cloneList.getFloat(i))));
-         break;
-       case DOUBLE:
-         result.add(new TimeValuePair(time, new TsDouble(cloneList.getDouble(i))));
-         break;
-       case TEXT:
-         result.add(new TimeValuePair(time, new TsBinary(cloneList.getBinary(i))));
-         break;
-       default:
-         LOGGER.error("don't support data type: {}", dataType);
-         break;
-     }
-   }
-   return result;
+  public synchronized List<TimeValuePair> getSortedTimeValuePairList() {
+    if (sortedList != null) {
+      return sortedList;
+    }
+    sortedList = new ArrayList<>();
+    list.sort();
+    for (int i = 0; i < list.size(); i++) {
+      long time = list.getTime(i);
+      if (time < list.getTimeOffset() ||
+          (i + 1 < list.size() && (time == list.getTime(i + 1)))) {
+        continue;
+      }
+      switch (dataType) {
+        case BOOLEAN:
+          sortedList.add(new TimeValuePair(time, new TsBoolean(list.getBoolean(i))));
+          break;
+        case INT32:
+          sortedList.add(new TimeValuePair(time, new TsInt(list.getInt(i))));
+          break;
+        case INT64:
+          sortedList.add(new TimeValuePair(time, new TsLong(list.getLong(i))));
+          break;
+        case FLOAT:
+          sortedList.add(new TimeValuePair(time, new TsFloat(list.getFloat(i))));
+          break;
+        case DOUBLE:
+          sortedList.add(new TimeValuePair(time, new TsDouble(list.getDouble(i))));
+          break;
+        case TEXT:
+          sortedList.add(new TimeValuePair(time, new TsBinary(list.getBinary(i))));
+          break;
+        default:
+          LOGGER.error("don't support data type: {}", dataType);
+          break;
+      }
+    }
+    return this.sortedList;
   }
 
   @Override
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java b/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
index 90779ff..35e9fd6 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
@@ -144,7 +144,6 @@ public abstract class TVList {
 
   public void reset() {
     size = 0;
-    limit = 0;
     timeOffset = -1;
     sorted = false;
   }
@@ -164,6 +163,9 @@ public abstract class TVList {
   }
 
   protected void sort(int lo, int hi) {
+    if (sorted) {
+      return;
+    }
     if (lo == hi) {
       return;
     }
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2Test.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2Test.java
index 6c6120f..508a411 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2Test.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2Test.java
@@ -55,6 +55,7 @@ public class FileNodeProcessorV2Test {
   @Test
   public void testSequenceSyncClose() {
     for (int j = 1; j <= 100; j++) {
+      System.out.println(j);
       TSRecord record = new TSRecord(j, deviceId);
       record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
       processor.insert(new InsertPlan(record));
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/query/reader/unsequence/UnseqSeriesReaderByTimestampTest.java b/iotdb/src/test/java/org/apache/iotdb/db/query/reader/unsequence/UnseqSeriesReaderByTimestampTest.java
index 78dfdbf..822a9a5 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/query/reader/unsequence/UnseqSeriesReaderByTimestampTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/query/reader/unsequence/UnseqSeriesReaderByTimestampTest.java
@@ -74,7 +74,7 @@ public class UnseqSeriesReaderByTimestampTest {
         TSRecord record = new TSRecord(2, deviceId);
         record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(100)));
         FileNodeManagerV2.getInstance().insert(new InsertPlan(record));
-//        FileNodeManagerV2.getInstance().asyncFlushAndSealAllFiles();
+ //       FileNodeManagerV2.getInstance().asyncFlushAndSealAllFiles();
 
         // query
         List<Path> paths = new ArrayList<>();