You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ej...@apache.org on 2020/06/11 10:53:38 UTC

[incubator-iotdb] branch dev_new_merge_rebase created (now 64063bd)

This is an automated email from the ASF dual-hosted git repository.

ejttianyu pushed a change to branch dev_new_merge_rebase
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git.


      at 64063bd  fix merge bug

This branch includes the following new commits:

     new 64063bd  fix merge bug

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-iotdb] 01/01: fix merge bug

Posted by ej...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ejttianyu pushed a commit to branch dev_new_merge_rebase
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 64063bde262b36a1f145548a0a088052ea1bb1ad
Author: EJTTianyu <16...@qq.com>
AuthorDate: Wed Jun 10 16:38:19 2020 +0800

    fix merge bug
---
 .../seqMerge/inplace/task/MergeMultiChunkTask.java | 90 +++++++++++-----------
 .../seqMerge/squeeze/task/SqueezeMergeTask.java    |  4 +-
 .../task/RegularizationMergeSeriesTask.java        |  8 ++
 .../task/RegularizationMergeTask.java              |  4 +-
 .../engine/storagegroup/StorageGroupProcessor.java | 53 +++++--------
 .../java/org/apache/iotdb/db/utils/MergeUtils.java |  4 +-
 .../iotdb/db/integration/IoTDBSensorUpdateIT.java  |  3 +
 7 files changed, 84 insertions(+), 82 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/seqMerge/inplace/task/MergeMultiChunkTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/seqMerge/inplace/task/MergeMultiChunkTask.java
index c2bc33f..67381f4 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/seqMerge/inplace/task/MergeMultiChunkTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/seqMerge/inplace/task/MergeMultiChunkTask.java
@@ -213,19 +213,21 @@ class MergeMultiChunkTask {
     int[] ptWrittens = new int[seqChunkMeta.length];
     int mergeChunkSubTaskNum = IoTDBDescriptor.getInstance().getConfig()
         .getMergeChunkSubThreadNum();
-    PriorityQueue<MetaListEntry>[] chunkMetaHeaps = new PriorityQueue[mergeChunkSubTaskNum];
+    MetaListEntry[] metaListEntries = new MetaListEntry[currMergingPaths.size()];
+    PriorityQueue<Integer>[] chunkIdxHeaps = new PriorityQueue[mergeChunkSubTaskNum];
     for (int i = 0; i < mergeChunkSubTaskNum; i++) {
-      chunkMetaHeaps[i] = new PriorityQueue<>();
+      chunkIdxHeaps[i] = new PriorityQueue<>();
     }
     int idx = 0;
     for (int i = 0; i < currMergingPaths.size(); i++) {
+      chunkIdxHeaps[idx % mergeChunkSubTaskNum].add(i);
       if (seqChunkMeta[i].isEmpty()) {
         continue;
       }
       MetaListEntry entry = new MetaListEntry(i, seqChunkMeta[i]);
       entry.next();
 
-      chunkMetaHeaps[idx % mergeChunkSubTaskNum].add(entry);
+      metaListEntries[i] = entry;
       idx++;
       ptWrittens[i] = 0;
     }
@@ -237,9 +239,8 @@ class MergeMultiChunkTask {
     for (int i = 0; i < mergeChunkSubTaskNum; i++) {
       int finalI = i;
       futures.add(MergeManager.getINSTANCE().submitChunkSubTask(() -> {
-        mergeChunkHeap(chunkMetaHeaps[finalI], ptWrittens, reader, mergeFileWriter, unseqReaders,
-            currFile,
-            isLastFile);
+        mergeChunkHeap(chunkIdxHeaps[finalI], metaListEntries, ptWrittens, reader, mergeFileWriter,
+            unseqReaders, currFile, isLastFile);
         return null;
       }));
     }
@@ -260,48 +261,49 @@ class MergeMultiChunkTask {
     return mergedChunkNum.get() > 0;
   }
 
-  private void mergeChunkHeap(PriorityQueue<MetaListEntry> chunkMetaHeap, int[] ptWrittens,
-      TsFileSequenceReader reader,
-      RestorableTsFileIOWriter mergeFileWriter, IPointReader[] unseqReaders,
-      TsFileResource currFile,
-      boolean isLastFile) throws IOException {
-    while (!chunkMetaHeap.isEmpty()) {
-      MetaListEntry metaListEntry = chunkMetaHeap.poll();
-      ChunkMetadata currMeta = metaListEntry.current();
-      int pathIdx = metaListEntry.getPathId();
-      boolean isLastChunk = !metaListEntry.hasNext();
+  private void mergeChunkHeap(PriorityQueue<Integer> chunkIdxHeap, MetaListEntry[] metaListEntries,
+      int[] ptWrittens, TsFileSequenceReader reader, RestorableTsFileIOWriter mergeFileWriter,
+      IPointReader[] unseqReaders, TsFileResource currFile, boolean isLastFile) throws IOException {
+    while (!chunkIdxHeap.isEmpty()) {
+      int pathIdx = chunkIdxHeap.poll();
       Path path = currMergingPaths.get(pathIdx);
       IChunkWriter chunkWriter = resource.getChunkWriter(path);
 
-      boolean chunkOverflowed = MergeUtils.isChunkOverflowed(currTimeValuePairs[pathIdx], currMeta);
-      boolean chunkTooSmall = MergeUtils
-          .isChunkTooSmall(ptWrittens[pathIdx], currMeta, isLastChunk, minChunkPointNum);
-
-      Chunk chunk;
-      synchronized (reader) {
-        chunk = reader.readMemChunk(currMeta);
-      }
-      ptWrittens[pathIdx] = mergeChunkV2(currMeta, chunkOverflowed, chunkTooSmall, chunk,
-          ptWrittens[pathIdx], pathIdx, mergeFileWriter, unseqReaders[pathIdx], chunkWriter,
-          currFile);
-
-      if (!isLastChunk) {
-        metaListEntry.next();
-        chunkMetaHeap.add(metaListEntry);
-      } else {
-        // this only happens when the seqFiles do not contain this series, otherwise the remaining
-        // data will be merged with the last chunk in the seqFiles
-        if (isLastFile && currTimeValuePairs[pathIdx] != null) {
-          ptWrittens[pathIdx] += writeRemainingUnseq(chunkWriter, unseqReaders[pathIdx],
-              Long.MAX_VALUE,
-              pathIdx);
-          mergedChunkNum.incrementAndGet();
+      if (metaListEntries[pathIdx] != null) {
+        MetaListEntry metaListEntry = metaListEntries[pathIdx];
+        ChunkMetadata currMeta = metaListEntry.current();
+        boolean isLastChunk = !metaListEntry.hasNext();
+        boolean chunkOverflowed = MergeUtils
+            .isChunkOverflowed(currTimeValuePairs[pathIdx], currMeta);
+        boolean chunkTooSmall = MergeUtils
+            .isChunkTooSmall(ptWrittens[pathIdx], currMeta, isLastChunk, minChunkPointNum);
+
+        Chunk chunk;
+        synchronized (reader) {
+          chunk = reader.readMemChunk(currMeta);
+        }
+        ptWrittens[pathIdx] = mergeChunkV2(currMeta, chunkOverflowed, chunkTooSmall, chunk,
+            ptWrittens[pathIdx], pathIdx, mergeFileWriter, unseqReaders[pathIdx], chunkWriter,
+            currFile);
+
+        if (!isLastChunk) {
+          metaListEntry.next();
+          chunkIdxHeap.add(pathIdx);
+          continue;
         }
-        // the last merged chunk may still be smaller than the threshold, flush it anyway
-        if (ptWrittens[pathIdx] > 0) {
-          synchronized (mergeFileWriter) {
-            chunkWriter.writeToFileWriter(mergeFileWriter);
-          }
+      }
+      // this only happens when the seqFiles do not contain this series, otherwise the remaining
+      // data will be merged with the last chunk in the seqFiles
+      if (isLastFile && currTimeValuePairs[pathIdx] != null) {
+        ptWrittens[pathIdx] += writeRemainingUnseq(chunkWriter, unseqReaders[pathIdx],
+            Long.MAX_VALUE,
+            pathIdx);
+        mergedChunkNum.incrementAndGet();
+      }
+      // the last merged chunk may still be smaller than the threshold, flush it anyway
+      if (ptWrittens[pathIdx] > 0) {
+        synchronized (mergeFileWriter) {
+          chunkWriter.writeToFileWriter(mergeFileWriter);
         }
       }
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/seqMerge/squeeze/task/SqueezeMergeTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/seqMerge/squeeze/task/SqueezeMergeTask.java
index 9303d12..13c1b11 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/seqMerge/squeeze/task/SqueezeMergeTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/seqMerge/squeeze/task/SqueezeMergeTask.java
@@ -152,7 +152,7 @@ public class SqueezeMergeTask implements Callable<Void> {
   }
 
   private void deleteFile(TsFileResource seqFile) {
-    seqFile.getWriteQueryLock().writeLock().lock();
+    seqFile.writeLock();
     try {
       resource.removeFileReader(seqFile);
       ChunkMetadataCache.getInstance().remove(seqFile);
@@ -163,7 +163,7 @@ public class SqueezeMergeTask implements Callable<Void> {
     } catch (Exception e) {
       logger.error(e.getMessage(), e);
     } finally {
-      seqFile.getWriteQueryLock().writeLock().unlock();
+      seqFile.writeUnlock();
     }
   }
 }
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/sizeMerge/regularization/task/RegularizationMergeSeriesTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/sizeMerge/regularization/task/RegularizationMergeSeriesTask.java
index 0ee3d00..51783bc 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/sizeMerge/regularization/task/RegularizationMergeSeriesTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/sizeMerge/regularization/task/RegularizationMergeSeriesTask.java
@@ -53,6 +53,14 @@ public class RegularizationMergeSeriesTask extends BaseMergeSeriesTask {
         .split(IoTDBConstant.TSFILE_NAME_SEPARATOR);
     String[] items2 = o2.getName().replace(TSFILE_SUFFIX, "")
         .split(IoTDBConstant.TSFILE_NAME_SEPARATOR);
+
+    //TODO: for test only
+    try {
+      Long.parseLong(items1[0]);
+    } catch (NumberFormatException e) {
+      return items1[0].compareTo(items2[0]);
+    }
+
     return Long.compare(Long.parseLong(items1[0]), Long.parseLong(items2[0]));
   }));
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/sizeMerge/regularization/task/RegularizationMergeTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/sizeMerge/regularization/task/RegularizationMergeTask.java
index 5ff1a38..62f3ca9 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/sizeMerge/regularization/task/RegularizationMergeTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/sizeMerge/regularization/task/RegularizationMergeTask.java
@@ -161,7 +161,7 @@ public class RegularizationMergeTask implements Callable<Void> {
   }
 
   private void deleteFile(TsFileResource seqFile) {
-    seqFile.getWriteQueryLock().writeLock().lock();
+    seqFile.writeLock();
     try {
       resource.removeFileReader(seqFile);
       ChunkMetadataCache.getInstance().remove(seqFile);
@@ -170,7 +170,7 @@ public class RegularizationMergeTask implements Callable<Void> {
     } catch (Exception e) {
       logger.error(e.getMessage(), e);
     } finally {
-      seqFile.getWriteQueryLock().writeLock().unlock();
+      seqFile.writeUnlock();
     }
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 90351c7..b2b1e54 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -1708,13 +1708,6 @@ public class StorageGroupProcessor {
   }
 
   private void removeSeqFiles(List<TsFileResource> seqFiles) {
-    mergeLock.writeLock().lock();
-    try {
-      sequenceFileTreeSet.removeAll(seqFiles);
-    } finally {
-      mergeLock.writeLock().unlock();
-    }
-
     for (TsFileResource seqFile : seqFiles) {
       seqFile.writeLock();
       try {
@@ -1726,13 +1719,6 @@ public class StorageGroupProcessor {
   }
 
   private void removeUnseqFiles(List<TsFileResource> unseqFiles) {
-    mergeLock.writeLock().lock();
-    try {
-      unSequenceFileList.removeAll(unseqFiles);
-    } finally {
-      mergeLock.writeLock().unlock();
-    }
-
     for (TsFileResource unseqFile : unseqFiles) {
       unseqFile.writeLock();
       try {
@@ -1808,29 +1794,34 @@ public class StorageGroupProcessor {
 
   private void handleInplaceMerge(List<TsFileResource> seqFiles,
       List<TsFileResource> unseqFiles, File mergeLog) {
-
+    mergeLock.writeLock().lock();
+    try {
+      unSequenceFileList.removeAll(unseqFiles);
+    } finally {
+      mergeLock.writeLock().unlock();
+    }
     removeUnseqFiles(unseqFiles);
     endMerge(mergeLog, seqFiles);
   }
 
   private void handleOtherMerge(List<TsFileResource> seqFiles,
       List<TsFileResource> unseqFiles, File mergeLog, List<TsFileResource> newFile) {
-    // make sure no queries are holding the seqFiles
-    for (TsFileResource seqFile : seqFiles) {
-      seqFile.writeLock();
-    }
-    for (TsFileResource unseqFile : unseqFiles) {
-      unseqFile.writeLock();
-    }
     // block new queries and insertions to prevent the seqFiles from changing
+    writeLock();
     mergeLock.writeLock().lock();
     try {
-      removeUnseqFiles(unseqFiles);
-      removeSeqFiles(seqFiles);
+
+      unSequenceFileList.removeAll(unseqFiles);
+      sequenceFileTreeSet.removeAll(seqFiles);
+
       // move modifications generated during merge into the new file
       for (TsFileResource tsFileResource : newFile) {
-        tsFileResource
-            .setProcessor(getOrCreateTsFileProcessor(tsFileResource.getTimePartition(), true));
+        TsFileProcessor tsFileProcessor = new TsFileProcessor(storageGroupName,
+            tsFileResource.getFile().getAbsoluteFile(),
+            getVersionControllerByTimePartitionId(tsFileResource.getTimePartition()),
+            this::closeUnsealedTsFileProcessorCallBack,
+            this::updateLatestFlushTimeCallback, true);
+        tsFileResource.setProcessor(tsFileProcessor);
         if (mergingModification != null) {
           logger.info("{} is updating the merged file's modification file", storageGroupName);
           for (Modification modification : mergingModification.getModifications()) {
@@ -1847,15 +1838,13 @@ public class StorageGroupProcessor {
       logger.error("{} fails to do the after merge action,", storageGroupName, e);
     } finally {
       isMerging = false;
+      writeUnlock();
       mergeLock.writeLock().unlock();
       logger.info("{} a merge task ends", storageGroupName);
-      for (TsFileResource seqFile : seqFiles) {
-        seqFile.writeUnlock();
-      }
-      for (TsFileResource unseqFile : unseqFiles) {
-        unseqFile.writeUnlock();
-      }
     }
+
+    removeUnseqFiles(unseqFiles);
+    removeSeqFiles(seqFiles);
   }
 
   /**
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java
index 1e5e1e4..3350e24 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java
@@ -35,8 +35,8 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.metadata.mnode.InternalMNode;
-import org.apache.iotdb.db.metadata.mnode.LeafMNode;
 import org.apache.iotdb.db.metadata.mnode.MNode;
+import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
@@ -271,7 +271,7 @@ public class MergeUtils {
     for (String device : devices) {
       InternalMNode deviceNode = (InternalMNode) MManager.getInstance().getNodeByPath(device);
       for (Entry<String, MNode> entry : deviceNode.getChildren().entrySet()) {
-        MeasurementSchema measurementSchema = ((LeafMNode) entry.getValue()).getSchema();
+        MeasurementSchema measurementSchema = ((MeasurementMNode) entry.getValue()).getSchema();
         chunkWriterCacheMap
             .put(new Path(device, entry.getKey()), new ChunkWriterImpl(measurementSchema));
       }
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSensorUpdateIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSensorUpdateIT.java
index c404297..9999f9b 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSensorUpdateIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSensorUpdateIT.java
@@ -25,6 +25,8 @@ import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.merge.seqMerge.SeqMergeFileStrategy;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.jdbc.Config;
 import org.junit.After;
@@ -37,6 +39,7 @@ public class IoTDBSensorUpdateIT {
   public void setUp() throws Exception {
     EnvironmentUtils.closeStatMonitor();
     EnvironmentUtils.envSetUp();
+    IoTDBDescriptor.getInstance().getConfig().setSeqMergeFileStrategy(SeqMergeFileStrategy.INPLACE);
     Class.forName(Config.JDBC_DRIVER_NAME);
   }