You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/11/05 08:36:43 UTC

[iotdb] branch rel/0.11 updated: [To rel/0.11] [IOTDB-982] Solve tag bug (#1942)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch rel/0.11
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.11 by this push:
     new 66882c5  [To rel/0.11] [IOTDB-982] Solve tag bug (#1942)
66882c5 is described below

commit 66882c5aec5c07d34f0e1f17665c5440fb0942e6
Author: Jackie Tien <Ja...@foxmail.com>
AuthorDate: Thu Nov 5 16:32:04 2020 +0800

    [To rel/0.11] [IOTDB-982] Solve tag bug (#1942)
    
    * skip while the tag value is null
    * change the inverted index map to thread safe
---
 .../org/apache/iotdb/db/metadata/MManager.java     | 48 +++++++++++++---------
 1 file changed, 28 insertions(+), 20 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index 8d244dc..3118993 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -40,6 +40,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -121,7 +122,7 @@ public class MManager {
   // device -> DeviceMNode
   private RandomDeleteCache<PartialPath, MNode> mNodeCache;
   // tag key -> tag value -> LeafMNode
-  private Map<String, Map<String, Set<MeasurementMNode>>> tagIndex = new HashMap<>();
+  private Map<String, Map<String, Set<MeasurementMNode>>> tagIndex = new ConcurrentHashMap<>();
 
   // data type -> number
   private Map<TSDataType, Integer> schemaDataTypeNumMap = new ConcurrentHashMap<>();
@@ -374,7 +375,7 @@ public class MManager {
         }
         // two time series may set one storage group concurrently,
         // that's normal in our concurrency control protocol
-        catch (MetadataException e){
+        catch (MetadataException e) {
           logger.info("concurrently operate set storage group cmd {} twice", cmd);
         }
         break;
@@ -430,8 +431,8 @@ public class MManager {
       if (plan.getTags() != null) {
         // tag key, tag value
         for (Entry<String, String> entry : plan.getTags().entrySet()) {
-          tagIndex.computeIfAbsent(entry.getKey(), k -> new HashMap<>())
-              .computeIfAbsent(entry.getValue(), v -> new HashSet<>()).add(leafMNode);
+          tagIndex.computeIfAbsent(entry.getKey(), k -> new ConcurrentHashMap<>())
+              .computeIfAbsent(entry.getValue(), v -> new CopyOnWriteArraySet<>()).add(leafMNode);
         }
       }
 
@@ -566,7 +567,7 @@ public class MManager {
     // TODO: delete the path node and all its ancestors
     mNodeCache.clear();
     totalSeriesNumber.addAndGet(-1);
-    if (!allowToCreateNewSeries && 
+    if (!allowToCreateNewSeries &&
         totalSeriesNumber.get() * ESTIMATED_SERIES_SIZE < MTREE_SIZE_THRESHOLD) {
       logger.info("Current series number {} come back to normal level", totalSeriesNumber);
       allowToCreateNewSeries = true;
@@ -600,7 +601,7 @@ public class MManager {
       for (PartialPath storageGroup : storageGroups) {
         totalSeriesNumber.addAndGet(mtree.getAllTimeseriesCount(storageGroup));
         // clear cached MNode
-        if (!allowToCreateNewSeries && 
+        if (!allowToCreateNewSeries &&
             totalSeriesNumber.get() * ESTIMATED_SERIES_SIZE < MTREE_SIZE_THRESHOLD) {
           logger.info("Current series number {} come back to normal level", totalSeriesNumber);
           allowToCreateNewSeries = true;
@@ -821,6 +822,9 @@ public class MManager {
     List<MeasurementMNode> allMatchedNodes = new ArrayList<>();
     if (plan.isContains()) {
       for (Entry<String, Set<MeasurementMNode>> entry : value2Node.entrySet()) {
+        if (entry.getKey() == null || entry.getValue() == null) {
+          continue;
+        }
         String tagValue = entry.getKey();
         if (tagValue.contains(plan.getValue())) {
           allMatchedNodes.addAll(entry.getValue());
@@ -828,6 +832,9 @@ public class MManager {
       }
     } else {
       for (Entry<String, Set<MeasurementMNode>> entry : value2Node.entrySet()) {
+        if (entry.getKey() == null || entry.getValue() == null) {
+          continue;
+        }
         String tagValue = entry.getKey();
         if (plan.getValue().equals(tagValue)) {
           allMatchedNodes.addAll(entry.getValue());
@@ -1182,8 +1189,8 @@ public class MManager {
       // update inverted Index map
       if (tagsMap != null) {
         for (Entry<String, String> entry : tagsMap.entrySet()) {
-          tagIndex.computeIfAbsent(entry.getKey(), k -> new HashMap<>())
-              .computeIfAbsent(entry.getValue(), v -> new HashSet<>()).add(leafMNode);
+          tagIndex.computeIfAbsent(entry.getKey(), k -> new ConcurrentHashMap<>())
+              .computeIfAbsent(entry.getValue(), v -> new CopyOnWriteArraySet<>()).add(leafMNode);
         }
       }
       return;
@@ -1228,8 +1235,8 @@ public class MManager {
         // if the key doesn't exist or the value is not equal to the new one
         // we should add a new key-value to inverted index map
         if (beforeValue == null || !beforeValue.equals(value)) {
-          tagIndex.computeIfAbsent(key, k -> new HashMap<>())
-              .computeIfAbsent(value, v -> new HashSet<>()).add(leafMNode);
+          tagIndex.computeIfAbsent(key, k -> new ConcurrentHashMap<>())
+              .computeIfAbsent(value, v -> new CopyOnWriteArraySet<>()).add(leafMNode);
         }
       }
     }
@@ -1298,8 +1305,8 @@ public class MManager {
       leafMNode.setOffset(offset);
       // update inverted Index map
       for (Entry<String, String> entry : tagsMap.entrySet()) {
-        tagIndex.computeIfAbsent(entry.getKey(), k -> new HashMap<>())
-            .computeIfAbsent(entry.getValue(), v -> new HashSet<>()).add(leafMNode);
+        tagIndex.computeIfAbsent(entry.getKey(), k -> new ConcurrentHashMap<>())
+            .computeIfAbsent(entry.getValue(), v -> new CopyOnWriteArraySet<>()).add(leafMNode);
       }
       return;
     }
@@ -1321,8 +1328,8 @@ public class MManager {
     tagLogFile.write(pair.left, pair.right, leafMNode.getOffset());
 
     // update tag inverted map
-    tagsMap.forEach((key, value) -> tagIndex.computeIfAbsent(key, k -> new HashMap<>())
-        .computeIfAbsent(value, v -> new HashSet<>()).add(leafMNode));
+    tagsMap.forEach((key, value) -> tagIndex.computeIfAbsent(key, k -> new ConcurrentHashMap<>())
+        .computeIfAbsent(value, v -> new CopyOnWriteArraySet<>()).add(leafMNode));
   }
 
   /**
@@ -1461,8 +1468,8 @@ public class MManager {
               tagIndex.containsKey(key)));
         }
       }
-      tagIndex.computeIfAbsent(key, k -> new HashMap<>())
-          .computeIfAbsent(currentValue, k -> new HashSet<>()).add(leafMNode);
+      tagIndex.computeIfAbsent(key, k -> new ConcurrentHashMap<>())
+          .computeIfAbsent(currentValue, k -> new CopyOnWriteArraySet<>()).add(leafMNode);
     }
   }
 
@@ -1523,8 +1530,8 @@ public class MManager {
               tagIndex.containsKey(oldKey)));
         }
       }
-      tagIndex.computeIfAbsent(newKey, k -> new HashMap<>())
-          .computeIfAbsent(value, k -> new HashSet<>()).add(leafMNode);
+      tagIndex.computeIfAbsent(newKey, k -> new ConcurrentHashMap<>())
+          .computeIfAbsent(value, k -> new CopyOnWriteArraySet<>()).add(leafMNode);
     } else if (pair.right.containsKey(oldKey)) {
       // check attribute map
       pair.right.put(newKey, pair.right.remove(oldKey));
@@ -1774,8 +1781,9 @@ public class MManager {
         if (measurementMNode.getSchema().getType() != insertDataType) {
           logger.warn("DataType mismatch, Insert measurement {} type {}, metadata tree type {}",
               measurementList[i], insertDataType, measurementMNode.getSchema().getType());
-          DataTypeMismatchException mismatchException = new DataTypeMismatchException(measurementList[i],
-                  insertDataType, measurementMNode.getSchema().getType());
+          DataTypeMismatchException mismatchException = new DataTypeMismatchException(
+              measurementList[i],
+              insertDataType, measurementMNode.getSchema().getType());
           if (!config.isEnablePartialInsert()) {
             throw mismatchException;
           } else {