You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2021/03/31 06:21:38 UTC

[iotdb] branch rel/0.11 updated: Fix unseq compaction file selector conflicts with time partition bug (#2951)

This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch rel/0.11
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.11 by this push:
     new c03b7f5  Fix unseq compaction file selector conflicts with time partition bug (#2951)
c03b7f5 is described below

commit c03b7f57f44fb251e9172abfdfc1f5b2a313948c
Author: zhanglingzhe0820 <44...@qq.com>
AuthorDate: Wed Mar 31 14:21:17 2021 +0800

    Fix unseq compaction file selector conflicts with time partition bug (#2951)
---
 .../db/engine/compaction/TsFileManagement.java     |  13 +-
 .../level/LevelCompactionTsFileManagement.java     |  36 +++--
 .../no/NoCompactionTsFileManagement.java           | 175 +++++++++++++++++----
 3 files changed, 179 insertions(+), 45 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
index afda967..226f686 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
@@ -75,13 +75,20 @@ public abstract class TsFileManagement {
     this.storageGroupDir = storageGroupDir;
   }
 
+  /**
+   * get the TsFile list in sequence, not recommend to use this method, use
+   * getTsFileListByTimePartition instead
+   */
+  public abstract List<TsFileResource> getTsFileList(boolean sequence);
+
+  /** get the TsFile list in sequence by time partition */
+  public abstract List<TsFileResource> getTsFileListByTimePartition(
+      boolean sequence, long timePartition);
+
   public void setForceFullMerge(boolean forceFullMerge) {
     isForceFullMerge = forceFullMerge;
   }
 
-  /** get the TsFile list in sequence */
-  public abstract List<TsFileResource> getTsFileList(boolean sequence);
-
   /** get the TsFile list iterator in sequence */
   public abstract Iterator<TsFileResource> getIterator(boolean sequence);
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
index 0cc543e..61ba9b4 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
@@ -167,24 +167,42 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
     }
   }
 
+  @Deprecated
   @Override
   public List<TsFileResource> getTsFileList(boolean sequence) {
     List<TsFileResource> result = new ArrayList<>();
     if (sequence) {
       synchronized (sequenceTsFileResources) {
-        for (List<SortedSet<TsFileResource>> sequenceTsFileList :
-            sequenceTsFileResources.values()) {
-          for (int i = sequenceTsFileList.size() - 1; i >= 0; i--) {
-            result.addAll(sequenceTsFileList.get(i));
-          }
+        for (long timePartition : sequenceTsFileResources.keySet()) {
+          result.addAll(getTsFileListByTimePartition(true, timePartition));
         }
       }
     } else {
       synchronized (unSequenceTsFileResources) {
-        for (List<List<TsFileResource>> unSequenceTsFileList : unSequenceTsFileResources.values()) {
-          for (int i = unSequenceTsFileList.size() - 1; i >= 0; i--) {
-            result.addAll(unSequenceTsFileList.get(i));
-          }
+        for (long timePartition : unSequenceTsFileResources.keySet()) {
+          result.addAll(getTsFileListByTimePartition(false, timePartition));
+        }
+      }
+    }
+    return result;
+  }
+
+  public List<TsFileResource> getTsFileListByTimePartition(boolean sequence, long timePartition) {
+    List<TsFileResource> result = new ArrayList<>();
+    if (sequence) {
+      synchronized (sequenceTsFileResources) {
+        List<SortedSet<TsFileResource>> sequenceTsFileList =
+            sequenceTsFileResources.get(timePartition);
+        for (int i = sequenceTsFileList.size() - 1; i >= 0; i--) {
+          result.addAll(sequenceTsFileList.get(i));
+        }
+      }
+    } else {
+      synchronized (unSequenceTsFileResources) {
+        List<List<TsFileResource>> unSequenceTsFileList =
+            unSequenceTsFileResources.get(timePartition);
+        for (int i = unSequenceTsFileList.size() - 1; i >= 0; i--) {
+          result.addAll(unSequenceTsFileList.get(i));
         }
       }
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/no/NoCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/no/NoCompactionTsFileManagement.java
index 7d3638d..acb336f 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/no/NoCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/no/NoCompactionTsFileManagement.java
@@ -20,8 +20,12 @@
 package org.apache.iotdb.db.engine.compaction.no;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
 import java.util.TreeSet;
 import org.apache.iotdb.db.engine.compaction.TsFileManagement;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@ -32,30 +36,46 @@ public class NoCompactionTsFileManagement extends TsFileManagement {
 
   private static final Logger logger = LoggerFactory.getLogger(NoCompactionTsFileManagement.class);
   // includes sealed and unsealed sequence TsFiles
-  private TreeSet<TsFileResource> sequenceFileTreeSet = new TreeSet<>(
-      (o1, o2) -> {
-        try {
-          int rangeCompare = Long.compare(Long.parseLong(o1.getTsFile().getParentFile().getName()),
-              Long.parseLong(o2.getTsFile().getParentFile().getName()));
-          return rangeCompare == 0 ? compareFileName(o1.getTsFile(), o2.getTsFile()) : rangeCompare;
-        } catch (NumberFormatException e) {
-          return compareFileName(o1.getTsFile(), o2.getTsFile());
-        }
-      });
+  private final Map<Long, TreeSet<TsFileResource>> sequenceFileTreeSetMap = new TreeMap<>();
 
   // includes sealed and unsealed unSequence TsFiles
-  private List<TsFileResource> unSequenceFileList = new ArrayList<>();
+  private final Map<Long, List<TsFileResource>> unSequenceFileListMap = new TreeMap<>();
 
   public NoCompactionTsFileManagement(String storageGroupName, String storageGroupDir) {
     super(storageGroupName, storageGroupDir);
   }
 
+  @Deprecated
   @Override
   public List<TsFileResource> getTsFileList(boolean sequence) {
+    List<TsFileResource> result = new ArrayList<>();
+    if (sequence) {
+      synchronized (sequenceFileTreeSetMap) {
+        for (TreeSet<TsFileResource> tsFileResourceTreeSet : sequenceFileTreeSetMap.values()) {
+          result.addAll(tsFileResourceTreeSet);
+        }
+      }
+    } else {
+      synchronized (unSequenceFileListMap) {
+        for (List<TsFileResource> tsFileResourceList : unSequenceFileListMap.values()) {
+          result.addAll(tsFileResourceList);
+        }
+      }
+    }
+    return result;
+  }
+
+  @Override
+  public List<TsFileResource> getTsFileListByTimePartition(boolean sequence, long timePartition) {
     if (sequence) {
-      return new ArrayList<>(sequenceFileTreeSet);
+      synchronized (sequenceFileTreeSetMap) {
+        return new ArrayList<>(sequenceFileTreeSetMap.getOrDefault(timePartition, new TreeSet<>()));
+      }
     } else {
-      return unSequenceFileList;
+      synchronized (unSequenceFileListMap) {
+        return new ArrayList<>(
+            unSequenceFileListMap.getOrDefault(timePartition, Collections.emptyList()));
+      }
     }
   }
 
@@ -67,27 +87,79 @@ public class NoCompactionTsFileManagement extends TsFileManagement {
   @Override
   public void remove(TsFileResource tsFileResource, boolean sequence) {
     if (sequence) {
-      sequenceFileTreeSet.remove(tsFileResource);
+      synchronized (sequenceFileTreeSetMap) {
+        TreeSet<TsFileResource> sequenceFileTreeSet =
+            sequenceFileTreeSetMap.get(tsFileResource.getTimePartition());
+        sequenceFileTreeSet.remove(tsFileResource);
+      }
     } else {
-      unSequenceFileList.remove(tsFileResource);
+      synchronized (unSequenceFileListMap) {
+        List<TsFileResource> unSequenceFileList =
+            unSequenceFileListMap.get(tsFileResource.getTimePartition());
+        unSequenceFileList.remove(tsFileResource);
+      }
     }
   }
 
   @Override
   public void removeAll(List<TsFileResource> tsFileResourceList, boolean sequence) {
-    if (sequence) {
-      sequenceFileTreeSet.removeAll(tsFileResourceList);
-    } else {
-      unSequenceFileList.removeAll(tsFileResourceList);
+    if (tsFileResourceList.size() > 0) {
+      tsFileResourceList.sort((o1, o2) -> (int) (o1.getTimePartition() - o2.getTimePartition()));
+      if (sequence) {
+        synchronized (sequenceFileTreeSetMap) {
+          long currTimePartition = tsFileResourceList.get(0).getTimePartition();
+          int startIndex = 0;
+          for (int i = 1; i < tsFileResourceList.size(); i++) {
+            TsFileResource tsFileResource = tsFileResourceList.get(i);
+            if (tsFileResource.getTimePartition() != currTimePartition) {
+              sequenceFileTreeSetMap
+                  .get(currTimePartition)
+                  .removeAll(tsFileResourceList.subList(startIndex, i));
+              currTimePartition = tsFileResource.getTimePartition();
+              startIndex = i;
+            }
+          }
+          sequenceFileTreeSetMap
+              .get(currTimePartition)
+              .removeAll(tsFileResourceList.subList(startIndex, tsFileResourceList.size()));
+        }
+      } else {
+        synchronized (unSequenceFileListMap) {
+          long currTimePartition = tsFileResourceList.get(0).getTimePartition();
+          int startIndex = 0;
+          for (int i = 1; i < tsFileResourceList.size(); i++) {
+            TsFileResource tsFileResource = tsFileResourceList.get(i);
+            if (tsFileResource.getTimePartition() != currTimePartition) {
+              unSequenceFileListMap
+                  .get(currTimePartition)
+                  .removeAll(tsFileResourceList.subList(startIndex, i));
+              currTimePartition = tsFileResource.getTimePartition();
+              startIndex = i;
+            }
+          }
+          unSequenceFileListMap
+              .get(currTimePartition)
+              .removeAll(tsFileResourceList.subList(startIndex, tsFileResourceList.size()));
+        }
+      }
     }
   }
 
   @Override
   public void add(TsFileResource tsFileResource, boolean sequence) {
+    long timePartitionId = tsFileResource.getTimePartition();
     if (sequence) {
-      sequenceFileTreeSet.add(tsFileResource);
+      synchronized (sequenceFileTreeSetMap) {
+        sequenceFileTreeSetMap
+            .computeIfAbsent(timePartitionId, this::newSequenceTsFileResources)
+            .add(tsFileResource);
+      }
     } else {
-      unSequenceFileList.add(tsFileResource);
+      synchronized (unSequenceFileListMap) {
+        unSequenceFileListMap
+            .computeIfAbsent(timePartitionId, this::newUnSequenceTsFileResources)
+            .add(tsFileResource);
+      }
     }
   }
 
@@ -98,44 +170,73 @@ public class NoCompactionTsFileManagement extends TsFileManagement {
 
   @Override
   public void addAll(List<TsFileResource> tsFileResourceList, boolean sequence) {
-    if (sequence) {
-      sequenceFileTreeSet.addAll(tsFileResourceList);
-    } else {
-      unSequenceFileList.addAll(tsFileResourceList);
+    for (TsFileResource tsFileResource : tsFileResourceList) {
+      add(tsFileResource, sequence);
     }
   }
 
   @Override
   public boolean contains(TsFileResource tsFileResource, boolean sequence) {
     if (sequence) {
-      return sequenceFileTreeSet.contains(tsFileResource);
+      synchronized (sequenceFileTreeSetMap) {
+        return sequenceFileTreeSetMap
+            .getOrDefault(tsFileResource.getTimePartition(), newSequenceTsFileResources(0L))
+            .contains(tsFileResource);
+      }
     } else {
-      return unSequenceFileList.contains(tsFileResource);
+      synchronized (unSequenceFileListMap) {
+        return unSequenceFileListMap
+            .getOrDefault(tsFileResource.getTimePartition(), new ArrayList<>())
+            .contains(tsFileResource);
+      }
     }
   }
 
   @Override
   public void clear() {
-    sequenceFileTreeSet.clear();
-    unSequenceFileList.clear();
+    sequenceFileTreeSetMap.clear();
+    unSequenceFileListMap.clear();
   }
 
   @Override
   public boolean isEmpty(boolean sequence) {
     if (sequence) {
-      return sequenceFileTreeSet.isEmpty();
+      synchronized (sequenceFileTreeSetMap) {
+        for (Set<TsFileResource> sequenceFileTreeSet : sequenceFileTreeSetMap.values()) {
+          if (!sequenceFileTreeSet.isEmpty()) {
+            return false;
+          }
+        }
+      }
     } else {
-      return unSequenceFileList.isEmpty();
+      synchronized (unSequenceFileListMap) {
+        for (List<TsFileResource> unSequenceFileList : unSequenceFileListMap.values()) {
+          if (!unSequenceFileList.isEmpty()) {
+            return false;
+          }
+        }
+      }
     }
+    return true;
   }
 
   @Override
   public int size(boolean sequence) {
+    int result = 0;
     if (sequence) {
-      return sequenceFileTreeSet.size();
+      synchronized (sequenceFileTreeSetMap) {
+        for (Set<TsFileResource> sequenceFileTreeSet : sequenceFileTreeSetMap.values()) {
+          result += sequenceFileTreeSet.size();
+        }
+      }
     } else {
-      return unSequenceFileList.size();
+      synchronized (unSequenceFileListMap) {
+        for (List<TsFileResource> unSequenceFileList : unSequenceFileListMap.values()) {
+          result += unSequenceFileList.size();
+        }
+      }
     }
+    return result;
   }
 
   @Override
@@ -152,4 +253,12 @@ public class NoCompactionTsFileManagement extends TsFileManagement {
   protected void merge(long timePartition) {
     logger.info("{} no merge logic", storageGroupName);
   }
+
+  private TreeSet<TsFileResource> newSequenceTsFileResources(Long k) {
+    return new TreeSet<>((o1, o2) -> compareFileName(o1.getTsFile(), o2.getTsFile()));
+  }
+
+  private List<TsFileResource> newUnSequenceTsFileResources(Long k) {
+    return new ArrayList<>();
+  }
 }