You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/04/08 17:08:39 UTC
[incubator-iotdb] branch cluster updated: fix bug of manager
metadata
This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/cluster by this push:
new 8b4fd87 fix bug of manager metadata
8b4fd87 is described below
commit 8b4fd874082ba8507269de7f708f0385e6f0a754
Author: lta <li...@163.com>
AuthorDate: Tue Apr 9 01:07:27 2019 +0800
fix bug of manager metadata
---
.../apache/iotdb/cluster/callback/BatchQPTask.java | 20 ----------
.../cluster/qp/executor/NonQueryExecutor.java | 33 ++++++++---------
.../cluster/qp/executor/QueryMetadataExecutor.java | 43 ++++++++++++++--------
.../cluster/integration/IoTDBMetadataFetchIT.java | 38 +++++++++----------
4 files changed, 62 insertions(+), 72 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/callback/BatchQPTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/callback/BatchQPTask.java
index 4922c71..bd8a073 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/callback/BatchQPTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/callback/BatchQPTask.java
@@ -115,9 +115,6 @@ public class BatchQPTask extends MultiQPTask {
public void execute(NonQueryExecutor executor) {
this.executor = executor;
- // Check if it has metadata group task
- LOGGER.debug("Check if it has metadata group task");
- checkMetadataGroupTask();
for (Entry<String, QPTask> entry : taskMap.entrySet()) {
String groupId = entry.getKey();
@@ -136,23 +133,6 @@ public class BatchQPTask extends MultiQPTask {
}
/**
- * Check whether has metadata group task, it has the highest priority than others.
- */
- private void checkMetadataGroupTask(){
- String groupId = ClusterConfig.METADATA_GROUP_ID;
- if(taskMap.containsKey(groupId)){
- PeerId leader = RaftUtils.getLeaderPeerID(groupId);
- QPTask subTask = taskMap.get(groupId);
- if(executor.canHandleNonQueryByGroupId(groupId)) {
- executeLocalSubTask(subTask, groupId);
- }else{
- executeRpcSubTask(subTask, leader, groupId);
- }
- taskMap.remove(groupId);
- }
- }
-
- /**
* Execute local sub task
*/
private void executeLocalSubTask(QPTask subTask, String groupId) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
index 9ad4fbe..1098ba2 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
@@ -115,14 +115,19 @@ public class NonQueryExecutor extends ClusterQPExecutor {
PhysicalPlan plan = physicalPlans[i];
try {
String groupId = getGroupIdFromPhysicalPlan(plan);
-
- if (!physicalPlansMap.containsKey(groupId)) {
- physicalPlansMap.put(groupId, new ArrayList<>());
- planIndexMap.put(groupId, new ArrayList<>());
+ if (groupId.equals(ClusterConfig.METADATA_GROUP_ID)) {
+ LOGGER.debug("Execute metadata group task");
+ result[i] = handleNonQueryRequest(groupId, plan) ? Statement.SUCCESS_NO_INFO
+ : Statement.EXECUTE_FAILED;
+ }else {
+ if (!physicalPlansMap.containsKey(groupId)) {
+ physicalPlansMap.put(groupId, new ArrayList<>());
+ planIndexMap.put(groupId, new ArrayList<>());
+ }
+ physicalPlansMap.get(groupId).add(plan);
+ planIndexMap.get(groupId).add(i);
}
- physicalPlansMap.get(groupId).add(plan);
- planIndexMap.get(groupId).add(i);
- } catch (PathErrorException|ProcessorException e) {
+ } catch (PathErrorException | ProcessorException | IOException | RaftConnectionException e) {
result[i] = Statement.EXECUTE_FAILED;
batchResult.setAllSuccessful(false);
batchResult.setBatchErrorMessage(e.getMessage());
@@ -131,22 +136,16 @@ public class NonQueryExecutor extends ClusterQPExecutor {
}
}
- /** 2. Construct Multiple Requests **/
+ /** 2. Construct Multiple Data Group Requests **/
Map<String, QPTask> subTaskMap = new HashMap<>();
for (Entry<String, List<PhysicalPlan>> entry : physicalPlansMap.entrySet()) {
String groupId = entry.getKey();
SingleQPTask singleQPTask;
BasicRequest request;
try {
- if(groupId.equals(ClusterConfig.METADATA_GROUP_ID)){
- LOGGER.debug(
- String.format("METADATA_GROUP_ID Send batch size() : %d", entry.getValue().size()));
- request = new MetaGroupNonQueryRequest(groupId, entry.getValue());
- }else {
- LOGGER.debug(
- String.format("DATA_GROUP_ID Send batch size() : %d", entry.getValue().size()));
- request = new DataGroupNonQueryRequest(groupId, entry.getValue());
- }
+ LOGGER.debug(
+ String.format("DATA_GROUP_ID Send batch size() : %d", entry.getValue().size()));
+ request = new DataGroupNonQueryRequest(groupId, entry.getValue());
singleQPTask = new SingleQPTask(false, request);
subTaskMap.put(groupId, singleQPTask);
} catch (IOException e) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
index 0439bd5..a96bd60 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.cluster.qp.executor;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.closure.ReadIndexClosure;
import com.alipay.sofa.jraft.entity.PeerId;
-import com.alipay.sofa.jraft.util.Bits;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -55,6 +54,7 @@ import org.slf4j.LoggerFactory;
public class QueryMetadataExecutor extends ClusterQPExecutor {
private static final Logger LOGGER = LoggerFactory.getLogger(QueryMetadataExecutor.class);
+ private static final String DOUB_SEPARATOR = "\\.";
public QueryMetadataExecutor() {
super();
@@ -73,14 +73,10 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
List<String> storageGroupList = mManager.getAllFileNamesByPath(path);
if (storageGroupList.isEmpty()) {
return new ArrayList<>();
- } if(storageGroupList.size() == 1){
- List<String> paths = new ArrayList<>();
- paths.add(path);
- handleTimseriesQuery(getGroupIdBySG(path), paths, res);
} else {
Map<String, Set<String>> groupIdSGMap = classifySGByGroupId(storageGroupList);
for (Entry<String, Set<String>> entry : groupIdSGMap.entrySet()) {
- List<String> paths = new ArrayList<>(entry.getValue());
+ List<String> paths = getSubQueryPaths(entry.getValue(), path);
String groupId = entry.getKey();
handleTimseriesQuery(groupId, paths, res);
}
@@ -89,6 +85,21 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
}
/**
+ * Get all query path in storage group relatively to query path
+ */
+ private List<String> getSubQueryPaths(Set<String> stoageGroupList, String queryPath) {
+ List<String> paths = new ArrayList<>();
+ for (String storageGroup : stoageGroupList) {
+ if (storageGroup.length() >= queryPath.length()) {
+ paths.add(storageGroup);
+ } else {
+ paths.add(storageGroup + queryPath.substring(storageGroup.length()));
+ }
+ }
+ return paths;
+ }
+
+ /**
* Handle query timeseries in one data group
*
* @param groupId data group id
@@ -167,7 +178,7 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
DataPartitionRaftHolder dataPartitionHolder = RaftUtils.getDataPartitonRaftHolder(groupId);
/** Check consistency level**/
- if(readMetadataConsistencyLevel == ClusterConstant.WEAK_CONSISTENCY_LEVEL){
+ if (readMetadataConsistencyLevel == ClusterConstant.WEAK_CONSISTENCY_LEVEL) {
QueryTimeSeriesResponse response = QueryTimeSeriesResponse
.createEmptyInstance(groupId);
try {
@@ -231,12 +242,12 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
MetadataRaftHolder metadataHolder = (MetadataRaftHolder) server.getMetadataHolder();
if (readMetadataConsistencyLevel == ClusterConstant.WEAK_CONSISTENCY_LEVEL) {
QueryStorageGroupResponse response;
- try {
- response = QueryStorageGroupResponse
- .createSuccessInstance(metadataHolder.getFsm().getAllStorageGroups());
- } catch (final PathErrorException e) {
- response = QueryStorageGroupResponse.createErrorInstance(e.getMessage());
- }
+ try {
+ response = QueryStorageGroupResponse
+ .createSuccessInstance(metadataHolder.getFsm().getAllStorageGroups());
+ } catch (final PathErrorException e) {
+ response = QueryStorageGroupResponse.createErrorInstance(e.getMessage());
+ }
task.run(response);
} else {
((RaftService) metadataHolder.getService()).getNode()
@@ -270,10 +281,10 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
final byte[] reqContext = RaftUtils.createRaftRequestContext();
DataPartitionRaftHolder dataPartitionHolder = (DataPartitionRaftHolder) server
.getDataPartitionHolder(groupId);
- if(readMetadataConsistencyLevel == ClusterConstant.WEAK_CONSISTENCY_LEVEL){
+ if (readMetadataConsistencyLevel == ClusterConstant.WEAK_CONSISTENCY_LEVEL) {
QueryMetadataInStringResponse response = QueryMetadataInStringResponse
- .createSuccessInstance(groupId, mManager.getMetadataInString());
- response.addResult(true);
+ .createSuccessInstance(groupId, mManager.getMetadataInString());
+ response.addResult(true);
task.run(response);
} else {
((RaftService) dataPartitionHolder.getService()).getNode()
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBMetadataFetchIT.java b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBMetadataFetchIT.java
index 19db49c..fb0be10 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBMetadataFetchIT.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBMetadataFetchIT.java
@@ -60,34 +60,34 @@ public class IoTDBMetadataFetchIT {
EnvironmentUtils.cleanEnv();
}
+// @Test
+// public void test() throws SQLException {
+// Connection connection = null;
+// try {
+// connection = DriverManager.getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+// insertSQL(connection, false);
+//// testShowStorageGroup(connection);
+// testDatabaseMetadata(connection);
+//// testShowTimeseries(connection);
+//// testShowTimeseriesPath(connection);
+// } finally {
+// connection.close();
+// }
+// }
+
@Test
- public void test() throws SQLException {
+ public void testBatch() throws SQLException {
Connection connection = null;
try {
connection = DriverManager.getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
- insertSQL(connection, false);
-// testShowStorageGroup(connection);
- testDatabaseMetadata(connection);
-// testShowTimeseries(connection);
-// testShowTimeseriesPath(connection);
+ insertSQL(connection, true);
+ testShowTimeseries(connection);
+ testShowTimeseriesPath(connection);
} finally {
connection.close();
}
}
- //Test
-// public void testBatch() throws SQLException {
-// Connection connection = null;
-// try {
-// connection = DriverManager.getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
-// insertSQL(connection, true);
-// testShowTimeseries(connection);
-// testShowTimeseriesPath(connection);
-// } finally {
-// connection.close();
-// }
-// }
-
private void insertSQL(Connection connection, boolean isBatch) throws SQLException {
Statement statement = connection.createStatement();
String[] insertSqls = new String[]{