You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/11/05 08:36:43 UTC
[iotdb] branch rel/0.11 updated: [To rel/0.11] [IOTDB-982] Solve tag bug (#1942)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch rel/0.11
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.11 by this push:
new 66882c5 [To rel/0.11] [IOTDB-982] Solve tag bug (#1942)
66882c5 is described below
commit 66882c5aec5c07d34f0e1f17665c5440fb0942e6
Author: Jackie Tien <Ja...@foxmail.com>
AuthorDate: Thu Nov 5 16:32:04 2020 +0800
[To rel/0.11] [IOTDB-982] Solve tag bug (#1942)
* skip tag index entries whose tag value (or node set) is null
* make the inverted index map thread-safe
---
.../org/apache/iotdb/db/metadata/MManager.java | 48 +++++++++++++---------
1 file changed, 28 insertions(+), 20 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index 8d244dc..3118993 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -40,6 +40,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -121,7 +122,7 @@ public class MManager {
// device -> DeviceMNode
private RandomDeleteCache<PartialPath, MNode> mNodeCache;
// tag key -> tag value -> LeafMNode
- private Map<String, Map<String, Set<MeasurementMNode>>> tagIndex = new HashMap<>();
+ private Map<String, Map<String, Set<MeasurementMNode>>> tagIndex = new ConcurrentHashMap<>();
// data type -> number
private Map<TSDataType, Integer> schemaDataTypeNumMap = new ConcurrentHashMap<>();
@@ -374,7 +375,7 @@ public class MManager {
}
// two time series may set one storage group concurrently,
// that's normal in our concurrency control protocol
- catch (MetadataException e){
+ catch (MetadataException e) {
logger.info("concurrently operate set storage group cmd {} twice", cmd);
}
break;
@@ -430,8 +431,8 @@ public class MManager {
if (plan.getTags() != null) {
// tag key, tag value
for (Entry<String, String> entry : plan.getTags().entrySet()) {
- tagIndex.computeIfAbsent(entry.getKey(), k -> new HashMap<>())
- .computeIfAbsent(entry.getValue(), v -> new HashSet<>()).add(leafMNode);
+ tagIndex.computeIfAbsent(entry.getKey(), k -> new ConcurrentHashMap<>())
+ .computeIfAbsent(entry.getValue(), v -> new CopyOnWriteArraySet<>()).add(leafMNode);
}
}
@@ -566,7 +567,7 @@ public class MManager {
// TODO: delete the path node and all its ancestors
mNodeCache.clear();
totalSeriesNumber.addAndGet(-1);
- if (!allowToCreateNewSeries &&
+ if (!allowToCreateNewSeries &&
totalSeriesNumber.get() * ESTIMATED_SERIES_SIZE < MTREE_SIZE_THRESHOLD) {
logger.info("Current series number {} come back to normal level", totalSeriesNumber);
allowToCreateNewSeries = true;
@@ -600,7 +601,7 @@ public class MManager {
for (PartialPath storageGroup : storageGroups) {
totalSeriesNumber.addAndGet(mtree.getAllTimeseriesCount(storageGroup));
// clear cached MNode
- if (!allowToCreateNewSeries &&
+ if (!allowToCreateNewSeries &&
totalSeriesNumber.get() * ESTIMATED_SERIES_SIZE < MTREE_SIZE_THRESHOLD) {
logger.info("Current series number {} come back to normal level", totalSeriesNumber);
allowToCreateNewSeries = true;
@@ -821,6 +822,9 @@ public class MManager {
List<MeasurementMNode> allMatchedNodes = new ArrayList<>();
if (plan.isContains()) {
for (Entry<String, Set<MeasurementMNode>> entry : value2Node.entrySet()) {
+ if (entry.getKey() == null || entry.getValue() == null) {
+ continue;
+ }
String tagValue = entry.getKey();
if (tagValue.contains(plan.getValue())) {
allMatchedNodes.addAll(entry.getValue());
@@ -828,6 +832,9 @@ public class MManager {
}
} else {
for (Entry<String, Set<MeasurementMNode>> entry : value2Node.entrySet()) {
+ if (entry.getKey() == null || entry.getValue() == null) {
+ continue;
+ }
String tagValue = entry.getKey();
if (plan.getValue().equals(tagValue)) {
allMatchedNodes.addAll(entry.getValue());
@@ -1182,8 +1189,8 @@ public class MManager {
// update inverted Index map
if (tagsMap != null) {
for (Entry<String, String> entry : tagsMap.entrySet()) {
- tagIndex.computeIfAbsent(entry.getKey(), k -> new HashMap<>())
- .computeIfAbsent(entry.getValue(), v -> new HashSet<>()).add(leafMNode);
+ tagIndex.computeIfAbsent(entry.getKey(), k -> new ConcurrentHashMap<>())
+ .computeIfAbsent(entry.getValue(), v -> new CopyOnWriteArraySet<>()).add(leafMNode);
}
}
return;
@@ -1228,8 +1235,8 @@ public class MManager {
// if the key doesn't exist or the value is not equal to the new one
// we should add a new key-value to inverted index map
if (beforeValue == null || !beforeValue.equals(value)) {
- tagIndex.computeIfAbsent(key, k -> new HashMap<>())
- .computeIfAbsent(value, v -> new HashSet<>()).add(leafMNode);
+ tagIndex.computeIfAbsent(key, k -> new ConcurrentHashMap<>())
+ .computeIfAbsent(value, v -> new CopyOnWriteArraySet<>()).add(leafMNode);
}
}
}
@@ -1298,8 +1305,8 @@ public class MManager {
leafMNode.setOffset(offset);
// update inverted Index map
for (Entry<String, String> entry : tagsMap.entrySet()) {
- tagIndex.computeIfAbsent(entry.getKey(), k -> new HashMap<>())
- .computeIfAbsent(entry.getValue(), v -> new HashSet<>()).add(leafMNode);
+ tagIndex.computeIfAbsent(entry.getKey(), k -> new ConcurrentHashMap<>())
+ .computeIfAbsent(entry.getValue(), v -> new CopyOnWriteArraySet<>()).add(leafMNode);
}
return;
}
@@ -1321,8 +1328,8 @@ public class MManager {
tagLogFile.write(pair.left, pair.right, leafMNode.getOffset());
// update tag inverted map
- tagsMap.forEach((key, value) -> tagIndex.computeIfAbsent(key, k -> new HashMap<>())
- .computeIfAbsent(value, v -> new HashSet<>()).add(leafMNode));
+ tagsMap.forEach((key, value) -> tagIndex.computeIfAbsent(key, k -> new ConcurrentHashMap<>())
+ .computeIfAbsent(value, v -> new CopyOnWriteArraySet<>()).add(leafMNode));
}
/**
@@ -1461,8 +1468,8 @@ public class MManager {
tagIndex.containsKey(key)));
}
}
- tagIndex.computeIfAbsent(key, k -> new HashMap<>())
- .computeIfAbsent(currentValue, k -> new HashSet<>()).add(leafMNode);
+ tagIndex.computeIfAbsent(key, k -> new ConcurrentHashMap<>())
+ .computeIfAbsent(currentValue, k -> new CopyOnWriteArraySet<>()).add(leafMNode);
}
}
@@ -1523,8 +1530,8 @@ public class MManager {
tagIndex.containsKey(oldKey)));
}
}
- tagIndex.computeIfAbsent(newKey, k -> new HashMap<>())
- .computeIfAbsent(value, k -> new HashSet<>()).add(leafMNode);
+ tagIndex.computeIfAbsent(newKey, k -> new ConcurrentHashMap<>())
+ .computeIfAbsent(value, k -> new CopyOnWriteArraySet<>()).add(leafMNode);
} else if (pair.right.containsKey(oldKey)) {
// check attribute map
pair.right.put(newKey, pair.right.remove(oldKey));
@@ -1774,8 +1781,9 @@ public class MManager {
if (measurementMNode.getSchema().getType() != insertDataType) {
logger.warn("DataType mismatch, Insert measurement {} type {}, metadata tree type {}",
measurementList[i], insertDataType, measurementMNode.getSchema().getType());
- DataTypeMismatchException mismatchException = new DataTypeMismatchException(measurementList[i],
- insertDataType, measurementMNode.getSchema().getType());
+ DataTypeMismatchException mismatchException = new DataTypeMismatchException(
+ measurementList[i],
+ insertDataType, measurementMNode.getSchema().getType());
if (!config.isEnablePartialInsert()) {
throw mismatchException;
} else {