You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ej...@apache.org on 2021/01/22 11:22:28 UTC

[iotdb] branch debug_merge_bug created (now 3a447ec)

This is an automated email from the ASF dual-hosted git repository.

ejttianyu pushed a change to branch debug_merge_bug
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at 3a447ec  add some print

This branch includes the following new commits:

     new 3a447ec  add some print

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: add some print

Posted by ej...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ejttianyu pushed a commit to branch debug_merge_bug
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 3a447ec9af80df0101cca8a17590ded85a8f41e6
Author: EJTTianyu <16...@qq.com>
AuthorDate: Fri Jan 22 19:21:50 2021 +0800

    add some print
---
 .../level/LevelCompactionTsFileManagement.java     | 34 +++++++++++++++++++++-
 .../engine/storagegroup/StorageGroupProcessor.java |  1 +
 2 files changed, 34 insertions(+), 1 deletion(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
index ec1ebdd..b76852f 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
@@ -216,6 +216,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
           sequenceTsFileResources
               .computeIfAbsent(timePartitionId, this::newSequenceTsFileResources).get(level)
               .add(tsFileResource);
+          System.out.println("add a resource: " + tsFileResource.getTsFile().getName());
         } else {
           // current file has too high level
           sequenceTsFileResources
@@ -465,7 +466,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
   @Override
   public void forkCurrentFileList(long timePartition) {
     synchronized (sequenceTsFileResources) {
-      forkTsFileList(
+      forkSeqTsFileList(
           forkedSequenceTsFileResources,
           sequenceTsFileResources.computeIfAbsent(timePartition, this::newSequenceTsFileResources),
           seqLevelNum, seqFileNumInEachLevel);
@@ -500,6 +501,31 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
     }
   }
 
+  private void forkSeqTsFileList(
+      List<List<TsFileResource>> forkedTsFileResources,
+      List rawTsFileResources, int currMaxLevel, int currFileNumInEachLevel) {
+    forkedTsFileResources.clear();
+    for (int i = 0; i < currMaxLevel - 1; i++) {
+      List<TsFileResource> forkedLevelTsFileResources = new ArrayList<>();
+      Collection<TsFileResource> levelRawTsFileResources = (Collection<TsFileResource>) rawTsFileResources
+          .get(i);
+      for (TsFileResource tsFileResource : levelRawTsFileResources) {
+        if (tsFileResource.isClosed()) {
+          forkedLevelTsFileResources.add(tsFileResource);
+          if (forkedLevelTsFileResources.size() > currFileNumInEachLevel) {
+            break;
+          }
+        }
+      }
+      forkedTsFileResources.add(forkedLevelTsFileResources);
+    }
+    System.out.println("---------");
+    for (TsFileResource tsFileResource : forkedTsFileResources.get(0)){
+      System.out.print("file fork" + tsFileResource.getTsFile().getName());
+    }
+    System.out.println("---------");
+  }
+
   @Override
   protected void merge(long timePartition) {
     merge(forkedSequenceTsFileResources, true, timePartition, seqLevelNum,
@@ -546,15 +572,21 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
             }
             File newLevelFile = createNewTsFileName(mergeResources.get(i).get(0).getTsFile(),
                 i + 1);
+            System.out.println("new file name" + newLevelFile.getName());
+            System.out.println("merge resource 0" + mergeResources.get(i).get(0));
+
             compactionLogger.logSequence(sequence);
             compactionLogger.logFile(TARGET_NAME, newLevelFile);
             List<TsFileResource> toMergeTsFiles = mergeResources.get(i);
             logger.info("{} [Compaction] merge level-{}'s {} TsFiles to next level",
                 storageGroupName, i, toMergeTsFiles.size());
+            System.out.println("merge files used");
             for (TsFileResource toMergeTsFile : toMergeTsFiles) {
               logger.info("{} [Compaction] start to merge TsFile {}", storageGroupName,
                   toMergeTsFile);
+              System.out.print("merge file:" + toMergeTsFile.getTsFile().getName());
             }
+            System.out.println("--------");
 
             TsFileResource newResource = new TsFileResource(newLevelFile);
             // merge, read from source files and write to target file
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 2e88108..931d185 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -1087,6 +1087,7 @@ public class StorageGroupProcessor {
         // build new processor
         TsFileProcessor newProcessor = createTsFileProcessor(sequence, timeRangeId);
         tsFileProcessorTreeMap.put(timeRangeId, newProcessor);
+        System.out.println("put a processor to the tree map:" + newProcessor.getTsFileResource().getTsFile().getName());
         tsFileManagement.add(newProcessor.getTsFileResource(), sequence);
         res = newProcessor;
       }