You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/04/08 17:08:39 UTC

[incubator-iotdb] branch cluster updated: fix bug of manager metadata

This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/cluster by this push:
     new 8b4fd87  fix bug of manager metadata
8b4fd87 is described below

commit 8b4fd874082ba8507269de7f708f0385e6f0a754
Author: lta <li...@163.com>
AuthorDate: Tue Apr 9 01:07:27 2019 +0800

    fix bug of manager metadata
---
 .../apache/iotdb/cluster/callback/BatchQPTask.java | 20 ----------
 .../cluster/qp/executor/NonQueryExecutor.java      | 33 ++++++++---------
 .../cluster/qp/executor/QueryMetadataExecutor.java | 43 ++++++++++++++--------
 .../cluster/integration/IoTDBMetadataFetchIT.java  | 38 +++++++++----------
 4 files changed, 62 insertions(+), 72 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/callback/BatchQPTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/callback/BatchQPTask.java
index 4922c71..bd8a073 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/callback/BatchQPTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/callback/BatchQPTask.java
@@ -115,9 +115,6 @@ public class BatchQPTask extends MultiQPTask {
 
   public void execute(NonQueryExecutor executor) {
     this.executor = executor;
-    // Check if it has metadata group task
-    LOGGER.debug("Check if it has metadata group task");
-    checkMetadataGroupTask();
 
     for (Entry<String, QPTask> entry : taskMap.entrySet()) {
       String groupId = entry.getKey();
@@ -136,23 +133,6 @@ public class BatchQPTask extends MultiQPTask {
   }
 
   /**
-   * Check whether has metadata group task, it has the highest priority than others.
-   */
-  private void checkMetadataGroupTask(){
-    String groupId = ClusterConfig.METADATA_GROUP_ID;
-    if(taskMap.containsKey(groupId)){
-      PeerId leader = RaftUtils.getLeaderPeerID(groupId);
-      QPTask subTask = taskMap.get(groupId);
-      if(executor.canHandleNonQueryByGroupId(groupId)) {
-        executeLocalSubTask(subTask, groupId);
-      }else{
-        executeRpcSubTask(subTask, leader, groupId);
-      }
-      taskMap.remove(groupId);
-    }
-  }
-
-  /**
    * Execute local sub task
    */
   private void executeLocalSubTask(QPTask subTask, String groupId) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
index 9ad4fbe..1098ba2 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
@@ -115,14 +115,19 @@ public class NonQueryExecutor extends ClusterQPExecutor {
         PhysicalPlan plan = physicalPlans[i];
         try {
           String groupId = getGroupIdFromPhysicalPlan(plan);
-
-          if (!physicalPlansMap.containsKey(groupId)) {
-            physicalPlansMap.put(groupId, new ArrayList<>());
-            planIndexMap.put(groupId, new ArrayList<>());
+          if (groupId.equals(ClusterConfig.METADATA_GROUP_ID)) {
+            LOGGER.debug("Execute metadata group task");
+            result[i] = handleNonQueryRequest(groupId, plan) ? Statement.SUCCESS_NO_INFO
+                : Statement.EXECUTE_FAILED;
+          }else {
+            if (!physicalPlansMap.containsKey(groupId)) {
+              physicalPlansMap.put(groupId, new ArrayList<>());
+              planIndexMap.put(groupId, new ArrayList<>());
+            }
+            physicalPlansMap.get(groupId).add(plan);
+            planIndexMap.get(groupId).add(i);
           }
-          physicalPlansMap.get(groupId).add(plan);
-          planIndexMap.get(groupId).add(i);
-        } catch (PathErrorException|ProcessorException e) {
+        } catch (PathErrorException | ProcessorException | IOException | RaftConnectionException e) {
           result[i] = Statement.EXECUTE_FAILED;
           batchResult.setAllSuccessful(false);
           batchResult.setBatchErrorMessage(e.getMessage());
@@ -131,22 +136,16 @@ public class NonQueryExecutor extends ClusterQPExecutor {
       }
     }
 
-    /** 2. Construct Multiple Requests **/
+    /** 2. Construct Multiple Data Group Requests **/
     Map<String, QPTask> subTaskMap = new HashMap<>();
     for (Entry<String, List<PhysicalPlan>> entry : physicalPlansMap.entrySet()) {
       String groupId = entry.getKey();
       SingleQPTask singleQPTask;
       BasicRequest request;
       try {
-        if(groupId.equals(ClusterConfig.METADATA_GROUP_ID)){
-          LOGGER.debug(
-              String.format("METADATA_GROUP_ID Send batch size() : %d", entry.getValue().size()));
-          request = new MetaGroupNonQueryRequest(groupId, entry.getValue());
-        }else {
-          LOGGER.debug(
-              String.format("DATA_GROUP_ID Send batch size() : %d", entry.getValue().size()));
-          request = new DataGroupNonQueryRequest(groupId, entry.getValue());
-        }
+        LOGGER.debug(
+            String.format("DATA_GROUP_ID Send batch size() : %d", entry.getValue().size()));
+        request = new DataGroupNonQueryRequest(groupId, entry.getValue());
         singleQPTask = new SingleQPTask(false, request);
         subTaskMap.put(groupId, singleQPTask);
       } catch (IOException e) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
index 0439bd5..a96bd60 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.cluster.qp.executor;
 import com.alipay.sofa.jraft.Status;
 import com.alipay.sofa.jraft.closure.ReadIndexClosure;
 import com.alipay.sofa.jraft.entity.PeerId;
-import com.alipay.sofa.jraft.util.Bits;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -55,6 +54,7 @@ import org.slf4j.LoggerFactory;
 public class QueryMetadataExecutor extends ClusterQPExecutor {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(QueryMetadataExecutor.class);
+  private static final String DOUB_SEPARATOR = "\\.";
 
   public QueryMetadataExecutor() {
     super();
@@ -73,14 +73,10 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
     List<String> storageGroupList = mManager.getAllFileNamesByPath(path);
     if (storageGroupList.isEmpty()) {
       return new ArrayList<>();
-    } if(storageGroupList.size() == 1){
-      List<String> paths = new ArrayList<>();
-      paths.add(path);
-      handleTimseriesQuery(getGroupIdBySG(path), paths, res);
     } else {
       Map<String, Set<String>> groupIdSGMap = classifySGByGroupId(storageGroupList);
       for (Entry<String, Set<String>> entry : groupIdSGMap.entrySet()) {
-        List<String> paths = new ArrayList<>(entry.getValue());
+        List<String> paths = getSubQueryPaths(entry.getValue(), path);
         String groupId = entry.getKey();
         handleTimseriesQuery(groupId, paths, res);
       }
@@ -89,6 +85,21 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
   }
 
   /**
+   * Build one sub-query path per storage group, relative to the original query path
+   */
+  private List<String> getSubQueryPaths(Set<String> stoageGroupList, String queryPath) {
+    List<String> paths = new ArrayList<>();
+    for (String storageGroup : stoageGroupList) {
+      if (storageGroup.length() >= queryPath.length()) {
+        paths.add(storageGroup);
+      } else {
+        paths.add(storageGroup + queryPath.substring(storageGroup.length()));
+      }
+    }
+    return paths;
+  }
+
+  /**
    * Handle query timeseries in one data group
    *
    * @param groupId data group id
@@ -167,7 +178,7 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
     DataPartitionRaftHolder dataPartitionHolder = RaftUtils.getDataPartitonRaftHolder(groupId);
 
     /** Check consistency level**/
-    if(readMetadataConsistencyLevel == ClusterConstant.WEAK_CONSISTENCY_LEVEL){
+    if (readMetadataConsistencyLevel == ClusterConstant.WEAK_CONSISTENCY_LEVEL) {
       QueryTimeSeriesResponse response = QueryTimeSeriesResponse
           .createEmptyInstance(groupId);
       try {
@@ -231,12 +242,12 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
     MetadataRaftHolder metadataHolder = (MetadataRaftHolder) server.getMetadataHolder();
     if (readMetadataConsistencyLevel == ClusterConstant.WEAK_CONSISTENCY_LEVEL) {
       QueryStorageGroupResponse response;
-        try {
-          response = QueryStorageGroupResponse
-              .createSuccessInstance(metadataHolder.getFsm().getAllStorageGroups());
-        } catch (final PathErrorException e) {
-          response = QueryStorageGroupResponse.createErrorInstance(e.getMessage());
-        }
+      try {
+        response = QueryStorageGroupResponse
+            .createSuccessInstance(metadataHolder.getFsm().getAllStorageGroups());
+      } catch (final PathErrorException e) {
+        response = QueryStorageGroupResponse.createErrorInstance(e.getMessage());
+      }
       task.run(response);
     } else {
       ((RaftService) metadataHolder.getService()).getNode()
@@ -270,10 +281,10 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
     final byte[] reqContext = RaftUtils.createRaftRequestContext();
     DataPartitionRaftHolder dataPartitionHolder = (DataPartitionRaftHolder) server
         .getDataPartitionHolder(groupId);
-    if(readMetadataConsistencyLevel == ClusterConstant.WEAK_CONSISTENCY_LEVEL){
+    if (readMetadataConsistencyLevel == ClusterConstant.WEAK_CONSISTENCY_LEVEL) {
       QueryMetadataInStringResponse response = QueryMetadataInStringResponse
-            .createSuccessInstance(groupId, mManager.getMetadataInString());
-        response.addResult(true);
+          .createSuccessInstance(groupId, mManager.getMetadataInString());
+      response.addResult(true);
       task.run(response);
     } else {
       ((RaftService) dataPartitionHolder.getService()).getNode()
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBMetadataFetchIT.java b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBMetadataFetchIT.java
index 19db49c..fb0be10 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBMetadataFetchIT.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBMetadataFetchIT.java
@@ -60,34 +60,34 @@ public class IoTDBMetadataFetchIT {
     EnvironmentUtils.cleanEnv();
   }
 
+//  @Test
+//  public void test() throws SQLException {
+//    Connection connection = null;
+//    try {
+//      connection = DriverManager.getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+//      insertSQL(connection, false);
+////      testShowStorageGroup(connection);
+//      testDatabaseMetadata(connection);
+////      testShowTimeseries(connection);
+////      testShowTimeseriesPath(connection);
+//    } finally {
+//      connection.close();
+//    }
+//  }
+
   @Test
-  public void test() throws SQLException {
+  public void testBatch() throws SQLException {
     Connection connection = null;
     try {
       connection = DriverManager.getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
-      insertSQL(connection, false);
-//      testShowStorageGroup(connection);
-      testDatabaseMetadata(connection);
-//      testShowTimeseries(connection);
-//      testShowTimeseriesPath(connection);
+      insertSQL(connection, true);
+      testShowTimeseries(connection);
+      testShowTimeseriesPath(connection);
     } finally {
       connection.close();
     }
   }
 
-  //Test
-//  public void testBatch() throws SQLException {
-//    Connection connection = null;
-//    try {
-//      connection = DriverManager.getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
-//      insertSQL(connection, true);
-//      testShowTimeseries(connection);
-//      testShowTimeseriesPath(connection);
-//    } finally {
-//      connection.close();
-//    }
-//  }
-
   private void insertSQL(Connection connection, boolean isBatch) throws SQLException {
     Statement statement = connection.createStatement();
     String[] insertSqls = new String[]{