You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/04/28 14:46:05 UTC
[iotdb] 01/01: fix some bugs found while debugging
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch xingtanzjr/fix_some_bugs
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 0e86f534151197796d596b6f7253880e011b1c54
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Thu Apr 28 22:43:20 2022 +0800
fix some bugs found while debugging
---
.../main/java/org/apache/iotdb/confignode/manager/ConfigManager.java | 5 ++---
.../main/java/org/apache/iotdb/commons/partition/DataPartition.java | 2 +-
.../java/org/apache/iotdb/commons/partition/SchemaPartition.java | 2 +-
.../java/org/apache/iotdb/db/mpp/common/schematree/SchemaTree.java | 2 +-
.../db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java | 3 +++
server/src/main/java/org/apache/iotdb/db/mpp/memory/MemoryPool.java | 2 +-
.../org/apache/iotdb/db/mpp/sql/analyze/ClusterPartitionFetcher.java | 4 ++--
.../org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java | 1 +
.../iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java | 5 -----
.../org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java | 2 +-
10 files changed, 13 insertions(+), 15 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 94ac7c68c3..c8b55e1bdc 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -198,7 +198,6 @@ public class ConfigManager implements Manager {
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
List<String> devicePaths = patternTree.findAllDevicePaths();
List<String> storageGroups = getClusterSchemaManager().getStorageGroupNames();
-
GetSchemaPartitionReq getSchemaPartitionReq = new GetSchemaPartitionReq();
Map<String, List<TSeriesPartitionSlot>> partitionSlotsMap = new HashMap<>();
@@ -207,7 +206,7 @@ public class ConfigManager implements Manager {
for (String devicePath : devicePaths) {
boolean matchStorageGroup = false;
for (String storageGroup : storageGroups) {
- if (devicePath.contains(storageGroup)) {
+ if (devicePath.startsWith(storageGroup + ".")) {
matchStorageGroup = true;
if (devicePath.contains("*")) {
// Get all SchemaPartitions of this StorageGroup if the devicePath contains "*"
@@ -263,7 +262,7 @@ public class ConfigManager implements Manager {
if (!devicePath.contains("*")) {
// Only check devicePaths that without "*"
for (String storageGroup : storageGroups) {
- if (devicePath.contains(storageGroup)) {
+ if (devicePath.startsWith(storageGroup + ".")) {
partitionSlotsMap
.computeIfAbsent(storageGroup, key -> new ArrayList<>())
.add(getPartitionManager().getSeriesPartitionSlot(devicePath));
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
index 5156ba47af..8c150d0ba4 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
@@ -104,7 +104,7 @@ public class DataPartition extends Partition {
private String getStorageGroupByDevice(String deviceName) {
for (String storageGroup : dataPartitionMap.keySet()) {
- if (deviceName.startsWith(storageGroup)) {
+ if (deviceName.startsWith(storageGroup + ".")) {
return storageGroup;
}
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java
index a93048ff83..2c8e415199 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java
@@ -63,7 +63,7 @@ public class SchemaPartition extends Partition {
private String getStorageGroupByDevice(String deviceName) {
for (String storageGroup : schemaPartitionMap.keySet()) {
- if (deviceName.startsWith(storageGroup)) {
+ if (deviceName.startsWith(storageGroup + ".")) {
return storageGroup;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/SchemaTree.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/SchemaTree.java
index 2bf2d8e72a..bc5b1c8994 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/SchemaTree.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/SchemaTree.java
@@ -228,7 +228,7 @@ public class SchemaTree {
*/
public String getBelongedStorageGroup(PartialPath path) {
for (String storageGroup : storageGroups) {
- if (path.getFullPath().startsWith(storageGroup)) {
+ if (path.getFullPath().startsWith(storageGroup + ".")) {
return storageGroup;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
index 245dbcc435..6bf7fad34a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
@@ -84,6 +84,9 @@ public class SimpleFragInstanceDispatcher implements IFragInstanceDispatcher {
client.close();
}
throw e;
+ } catch (Exception e) {
+ LOGGER.error("unexpected exception", e);
+ throw e;
} finally {
if (client != null) {
client.returnSelf();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/memory/MemoryPool.java b/server/src/main/java/org/apache/iotdb/db/mpp/memory/MemoryPool.java
index 14950ed94c..e7fb2e2b5c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/memory/MemoryPool.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/memory/MemoryPool.java
@@ -141,7 +141,7 @@ public class MemoryPool {
* return 0.
*/
public synchronized long tryCancel(ListenableFuture<Void> future) {
- if (future.isDone()) {
+ if (future == null || future.isDone()) {
return 0L;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterPartitionFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterPartitionFetcher.java
index b6426dca9e..1d1e0ca64a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterPartitionFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterPartitionFetcher.java
@@ -432,7 +432,7 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
for (String devicePath : devicePaths) {
boolean hit = false;
for (String storageGroup : storageGroupCache) {
- if (devicePath.startsWith(storageGroup)) {
+ if (devicePath.startsWith(storageGroup + ".")) {
deviceToStorageGroupMap.put(devicePath, storageGroup);
hit = true;
break;
@@ -558,7 +558,7 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
if (!device.contains("*")) {
String storageGroup = null;
for (String storageGroupName : storageGroupNames) {
- if (device.startsWith(storageGroupName)) {
+ if (device.startsWith(storageGroupName + ".")) {
storageGroup = storageGroupName;
break;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
index 3c9329c469..866d7ce0d4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
@@ -536,6 +536,7 @@ public class DistributionPlanner {
FragmentSinkNode sinkNode = new FragmentSinkNode(context.getQueryId().genPlanNodeId());
sinkNode.setChild(exchangeNode.getChild());
sinkNode.setDownStreamPlanNodeId(exchangeNode.getPlanNodeId());
+
// Record the source node info in the ExchangeNode so that we can keep the connection of
// these nodes/fragments
exchangeNode.setRemoteSourceNode(sinkNode);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
index 09ddd063ed..ae97ffff0e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
@@ -22,7 +22,6 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
-import org.apache.iotdb.db.mpp.sql.planner.plan.PlanFragment;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
@@ -110,8 +109,6 @@ public class ExchangeNode extends PlanNode {
}
public static ExchangeNode deserialize(ByteBuffer byteBuffer) {
- FragmentSinkNode fragmentSinkNode =
- (FragmentSinkNode) PlanFragment.deserializeHelper(byteBuffer);
TEndPoint endPoint =
new TEndPoint(
ReadWriteIOUtils.readString(byteBuffer), ReadWriteIOUtils.readInt(byteBuffer));
@@ -120,14 +117,12 @@ public class ExchangeNode extends PlanNode {
PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
ExchangeNode exchangeNode = new ExchangeNode(planNodeId);
exchangeNode.setUpstream(endPoint, fragmentInstanceId, upstreamPlanNodeId);
- exchangeNode.setRemoteSourceNode(fragmentSinkNode);
return exchangeNode;
}
@Override
protected void serializeAttributes(ByteBuffer byteBuffer) {
PlanNodeType.EXCHANGE.serialize(byteBuffer);
- remoteSourceNode.serialize(byteBuffer);
ReadWriteIOUtils.write(upstreamEndpoint.getIp(), byteBuffer);
ReadWriteIOUtils.write(upstreamEndpoint.getPort(), byteBuffer);
upstreamInstanceId.serialize(byteBuffer);
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
index 493b8466a2..86810c8372 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
@@ -137,7 +137,7 @@ public class InternalServiceImpl implements InternalService.Iface {
// TODO need to be implemented and currently in order not to print NotImplementedException log,
// we simply return null
- return null;
+ return new TCancelResp(true);
// throw new NotImplementedException();
}