You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by do...@apache.org on 2019/04/12 11:42:25 UTC

[incubator-iotdb] branch cluster updated: implement DELTA_OBEJECT, COLUMN, ALL_COLUMNS for cluster metadata query (#143)

This is an automated email from the ASF dual-hosted git repository.

dope pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/cluster by this push:
     new ff6f262  implement DELTA_OBEJECT, COLUMN, ALL_COLUMNS for cluster metadata query (#143)
ff6f262 is described below

commit ff6f262fa4921dca63db53a4bc60856b9925409a
Author: XuYi <My...@users.noreply.github.com>
AuthorDate: Fri Apr 12 19:42:21 2019 +0800

    implement DELTA_OBEJECT, COLUMN, ALL_COLUMNS for cluster metadata query (#143)
---
 .../org/apache/iotdb/cluster/entity/Server.java    |   6 +
 .../cluster/qp/executor/QueryMetadataExecutor.java | 249 ++++++++++++---------
 .../rpc/raft/impl/RaftNodeAsClientManager.java     |   8 +
 ...essor.java => QueryMetadataAsyncProcessor.java} |  45 ++--
 .../QueryMetadataInStringAsyncProcessor.java       |   5 +-
 ...rocessor.java => QueryPathsAsyncProcessor.java} |  45 ++--
 ...sor.java => QuerySeriesTypeAsyncProcessor.java} |  47 ++--
 .../processor/QueryTimeSeriesAsyncProcessor.java   |  10 +-
 .../rpc/raft/request/QueryMetadataRequest.java     |  28 +++
 .../rpc/raft/request/QueryPathsRequest.java        |  36 +++
 .../rpc/raft/request/QuerySeriesTypeRequest.java   |  35 +++
 .../rpc/raft/response/QueryMetadataResponse.java   |  47 ++++
 .../rpc/raft/response/QueryPathsResponse.java      |  50 +++++
 .../rpc/raft/response/QuerySeriesTypeResponse.java |  50 +++++
 .../cluster/rpc/service/TSServiceClusterImpl.java  |  19 ++
 .../org/apache/iotdb/db/metadata/Metadata.java     | 121 +++++++++-
 .../org/apache/iotdb/db/service/TSServiceImpl.java |  25 ++-
 .../org/apache/iotdb/db/metadata/MetadataTest.java |  93 ++++++++
 18 files changed, 745 insertions(+), 174 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java b/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
index be4a74a..ad1b4d6 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
@@ -32,7 +32,10 @@ import org.apache.iotdb.cluster.entity.raft.MetadataRaftHolder;
 import org.apache.iotdb.cluster.rpc.raft.impl.RaftNodeAsClientManager;
 import org.apache.iotdb.cluster.rpc.raft.processor.DataGroupNonQueryAsyncProcessor;
 import org.apache.iotdb.cluster.rpc.raft.processor.MetaGroupNonQueryAsyncProcessor;
+import org.apache.iotdb.cluster.rpc.raft.processor.QueryMetadataAsyncProcessor;
 import org.apache.iotdb.cluster.rpc.raft.processor.QueryMetadataInStringAsyncProcessor;
+import org.apache.iotdb.cluster.rpc.raft.processor.QueryPathsAsyncProcessor;
+import org.apache.iotdb.cluster.rpc.raft.processor.QuerySeriesTypeAsyncProcessor;
 import org.apache.iotdb.cluster.rpc.raft.processor.QueryTimeSeriesAsyncProcessor;
 import org.apache.iotdb.cluster.utils.RaftUtils;
 import org.apache.iotdb.cluster.utils.hash.PhysicalNode;
@@ -93,6 +96,9 @@ public class Server {
     rpcServer.registerUserProcessor(new MetaGroupNonQueryAsyncProcessor());
     rpcServer.registerUserProcessor(new QueryTimeSeriesAsyncProcessor());
     rpcServer.registerUserProcessor(new QueryMetadataInStringAsyncProcessor());
+    rpcServer.registerUserProcessor(new QueryMetadataAsyncProcessor());
+    rpcServer.registerUserProcessor(new QuerySeriesTypeAsyncProcessor());
+    rpcServer.registerUserProcessor(new QueryPathsAsyncProcessor());
 
     metadataHolder = new MetadataRaftHolder(peerIds, serverId, rpcServer, true);
     metadataHolder.init();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
index 8dc20b3..1dfbc7e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
@@ -35,16 +35,24 @@ import org.apache.iotdb.cluster.entity.raft.RaftService;
 import org.apache.iotdb.cluster.exception.RaftConnectionException;
 import org.apache.iotdb.cluster.qp.ClusterQPExecutor;
 import org.apache.iotdb.cluster.rpc.raft.request.QueryMetadataInStringRequest;
+import org.apache.iotdb.cluster.rpc.raft.request.QueryMetadataRequest;
+import org.apache.iotdb.cluster.rpc.raft.request.QueryPathsRequest;
+import org.apache.iotdb.cluster.rpc.raft.request.QuerySeriesTypeRequest;
 import org.apache.iotdb.cluster.rpc.raft.request.QueryStorageGroupRequest;
 import org.apache.iotdb.cluster.rpc.raft.request.QueryTimeSeriesRequest;
 import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
 import org.apache.iotdb.cluster.rpc.raft.response.QueryMetadataInStringResponse;
+import org.apache.iotdb.cluster.rpc.raft.response.QueryMetadataResponse;
+import org.apache.iotdb.cluster.rpc.raft.response.QueryPathsResponse;
+import org.apache.iotdb.cluster.rpc.raft.response.QuerySeriesTypeResponse;
 import org.apache.iotdb.cluster.rpc.raft.response.QueryStorageGroupResponse;
 import org.apache.iotdb.cluster.rpc.raft.response.QueryTimeSeriesResponse;
 import org.apache.iotdb.cluster.utils.RaftUtils;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.exception.ProcessorException;
 import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.metadata.Metadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -109,7 +117,7 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
     }
     return paths;
   }
-
+  
   /**
    * Handle query timeseries in one data group
    *
@@ -122,18 +130,18 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
     SingleQPTask task = new SingleQPTask(false, request);
 
     LOGGER.debug("Execute show timeseries {} statement for group {}.", pathList, groupId);
+    PeerId holder;
     /** Check if the plan can be executed locally. **/
     if (canHandleQueryByGroupId(groupId)) {
-      LOGGER.debug("Execute show timeseries {} statement locally for group {}.", pathList, groupId);
-      res.addAll(queryTimeSeriesLocally(pathList, groupId, task));
+      LOGGER.debug("Execute show timeseries {} statement locally for group {} by sending request to local node.", pathList, groupId);
+      holder = this.server.getServerId();
     } else {
-      try {
-        PeerId holder = RaftUtils.getRandomPeerID(groupId);
-        res.addAll(queryTimeSeries(task, holder));
-      } catch (RaftConnectionException e) {
-        LOGGER.error(e.getMessage());
-        throw new ProcessorException("Raft connection occurs error.", e);
-      }
+      holder = RaftUtils.getRandomPeerID(groupId);
+    }
+    try {
+      res.addAll(queryTimeSeries(task, holder));
+    } catch (RaftConnectionException e) {
+      throw new ProcessorException("Raft connection occurs error.", e);
     }
   }
 
@@ -150,18 +158,18 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
       taskList.add(task);
 
       LOGGER.debug("Execute show metadata in string statement for group {}.", groupId);
+      PeerId holder;
       /** Check if the plan can be executed locally. **/
       if (canHandleQueryByGroupId(groupId)) {
-        LOGGER.debug("Execute show metadata in string statement locally for group {}.", groupId);
-        asyncQueryMetadataInStringLocally(groupId, task);
+        LOGGER.debug("Execute show metadata in string statement locally for group {} by sending request to local node.", groupId);
+        holder = this.server.getServerId();
       } else {
-        try {
-          PeerId holder = RaftUtils.getRandomPeerID(groupId);
-          asyncSendNonQueryTask(task, holder, 0);
-        } catch (RaftConnectionException e) {
-          LOGGER.error(e.getMessage());
-          throw new ProcessorException("Raft connection occurs error.", e);
-        }
+        holder = RaftUtils.getRandomPeerID(groupId);
+      }
+      try {
+        asyncSendNonQueryTask(task, holder, 0);
+      } catch (RaftConnectionException e) {
+        throw new ProcessorException("Raft connection occurs error.", e);
       }
     }
     for (int i = 0; i < taskList.size(); i++) {
@@ -169,7 +177,6 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
       task.await();
       BasicResponse response = task.getResponse();
       if (response == null || !response.isSuccess()) {
-        LOGGER.error("Execute show timeseries statement false.");
         throw new ProcessorException();
       }
       metadataList.add(((QueryMetadataInStringResponse)response).getMetadata());
@@ -177,61 +184,124 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
     return combineMetadataInStringList(metadataList);
   }
 
-  /**
-   * Handle "show timeseries <path>" statement
-   *
-   * @param pathList column path
-   */
-  private List<List<String>> queryTimeSeriesLocally(List<String> pathList, String groupId,
-      SingleQPTask task)
-      throws InterruptedException, ProcessorException {
-    final byte[] reqContext = RaftUtils.createRaftRequestContext();
-    DataPartitionRaftHolder dataPartitionHolder = RaftUtils.getDataPartitonRaftHolder(groupId);
+  public Metadata processMetadataQuery()
+      throws InterruptedException, ProcessorException, PathErrorException {
+    Set<String> groupIdSet = router.getAllGroupId();
 
-    /** Check consistency level**/
-    if (readMetadataConsistencyLevel == ClusterConstant.WEAK_CONSISTENCY_LEVEL) {
-      QueryTimeSeriesResponse response = QueryTimeSeriesResponse
-          .createEmptyResponse(groupId);
+    Metadata[] metadatas = new Metadata[groupIdSet.size()];
+    List<SingleQPTask> taskList = new ArrayList<>();
+    for (String groupId : groupIdSet) {
+      QueryMetadataRequest request = new QueryMetadataRequest(groupId,
+          readMetadataConsistencyLevel);
+      SingleQPTask task = new SingleQPTask(false, request);
+      taskList.add(task);
+
+      LOGGER.debug("Execute query metadata statement for group {}.", groupId);
+      PeerId holder;
+      /** Check if the plan can be executed locally. **/
+      if (canHandleQueryByGroupId(groupId)) {
+        LOGGER.debug("Execute query metadata statement locally for group {} by sending request to local node.", groupId);
+        holder = this.server.getServerId();
+      } else {
+        holder = RaftUtils.getRandomPeerID(groupId);
+      }
       try {
-        for (String path : pathList) {
-          response.addTimeSeries(mManager.getShowTimeseriesPath(path));
+        asyncSendNonQueryTask(task, holder, 0);
+      } catch (RaftConnectionException e) {
+        throw new ProcessorException("Raft connection occurs error.", e);
+      }
+    }
+    for (int i = 0; i < taskList.size(); i++) {
+      SingleQPTask task = taskList.get(i);
+      task.await();
+      BasicResponse response = task.getResponse();
+      if (response == null || !response.isSuccess()) {
+        String errorMessage = "response is null";
+        if (response != null && response.getErrorMsg() != null) {
+          errorMessage = response.getErrorMsg();
         }
-      } catch (final PathErrorException e) {
-        response = QueryTimeSeriesResponse.createErrorResponse(groupId, e.getMessage());
+        throw new ProcessorException("Execute query metadata statement false because " + errorMessage);
       }
-      task.run(response);
+      metadatas[i] = ((QueryMetadataResponse)response).getMetadata();
+    }
+    return Metadata.combineMetadatas(metadatas);
+  }
+
+  public TSDataType processSeriesTypeQuery(String path)
+      throws InterruptedException, ProcessorException, PathErrorException {
+    TSDataType dataType = null;
+    List<String> storageGroupList = mManager.getAllFileNamesByPath(path);
+    if (storageGroupList.size() != 1) {
+      throw new PathErrorException("path " + path + " is not valid.");
     } else {
-      ((RaftService) dataPartitionHolder.getService()).getNode()
-          .readIndex(reqContext, new ReadIndexClosure() {
+      String groupId = getGroupIdBySG(storageGroupList.get(0));
+      QuerySeriesTypeRequest request = new QuerySeriesTypeRequest(groupId,
+          readMetadataConsistencyLevel, path);
+      SingleQPTask task = new SingleQPTask(false, request);
 
-            @Override
-            public void run(Status status, long index, byte[] reqCtx) {
-              QueryTimeSeriesResponse response = QueryTimeSeriesResponse
-                  .createEmptyResponse(groupId);
-              if (status.isOk()) {
-                try {
-                  LOGGER.debug("start to read");
-                  for (String path : pathList) {
-                    response.addTimeSeries(mManager.getShowTimeseriesPath(path));
-                  }
-                } catch (final PathErrorException e) {
-                  response = QueryTimeSeriesResponse.createErrorResponse(groupId, e.getMessage());
-                }
-              } else {
-                response = QueryTimeSeriesResponse
-                    .createErrorResponse(groupId, status.getErrorMsg());
-              }
-              task.run(response);
-            }
-          });
+      LOGGER.debug("Execute get series type for {} statement for group {}.", path, groupId);
+      PeerId holder;
+      /** Check if the plan can be executed locally. **/
+      if (canHandleQueryByGroupId(groupId)) {
+        LOGGER.debug("Execute get series type for {} statement locally for group {} by sending request to local node.", path, groupId);
+        holder = this.server.getServerId();
+      } else {
+        holder = RaftUtils.getRandomPeerID(groupId);
+      }
+      try {
+        dataType = querySeriesType(task, holder);
+      } catch (RaftConnectionException e) {
+        throw new ProcessorException("Raft connection occurs error.", e);
+      }
     }
-    task.await();
-    QueryTimeSeriesResponse response = (QueryTimeSeriesResponse) task.getResponse();
-    if (response == null || !response.isSuccess()) {
-      LOGGER.error("Execute show timeseries {} statement false.", pathList);
-      throw new ProcessorException();
+    return dataType;
+  }
+
+  /**
+   * Handle show timeseries <path> statement
+   */
+  public List<String> processPathsQuery(String path)
+      throws InterruptedException, PathErrorException, ProcessorException {
+    List<String> res = new ArrayList<>();
+    List<String> storageGroupList = mManager.getAllFileNamesByPath(path);
+    if (storageGroupList.isEmpty()) {
+      return new ArrayList<>();
+    } else {
+      Map<String, Set<String>> groupIdSGMap = classifySGByGroupId(storageGroupList);
+      for (Entry<String, Set<String>> entry : groupIdSGMap.entrySet()) {
+        List<String> paths = getSubQueryPaths(entry.getValue(), path);
+        String groupId = entry.getKey();
+        handlePathsQuery(groupId, paths, res);
+      }
+    }
+    return res;
+  }
+
+  /**
+   * Handle query timeseries in one data group
+   *
+   * @param groupId data group id
+   */
+  private void handlePathsQuery(String groupId, List<String> pathList, List<String> res)
+      throws ProcessorException, InterruptedException {
+    QueryPathsRequest request = new QueryPathsRequest(groupId,
+        readMetadataConsistencyLevel, pathList);
+    SingleQPTask task = new SingleQPTask(false, request);
+
+    LOGGER.debug("Execute get paths for {} statement for group {}.", pathList, groupId);
+    PeerId holder;
+    /** Check if the plan can be executed locally. **/
+    if (canHandleQueryByGroupId(groupId)) {
+      LOGGER.debug("Execute get paths for {} statement locally for group {} by sending request to local node.", pathList, groupId);
+      holder = this.server.getServerId();
+    } else {
+      holder = RaftUtils.getRandomPeerID(groupId);
+    }
+    try {
+      res.addAll(queryPaths(task, holder));
+    } catch (RaftConnectionException e) {
+      throw new ProcessorException("Raft connection occurs error.", e);
     }
-    return response.getTimeSeries();
   }
 
   private List<List<String>> queryTimeSeries(SingleQPTask task, PeerId leader)
@@ -241,6 +311,13 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
         : ((QueryTimeSeriesResponse) response).getTimeSeries();
   }
 
+  private TSDataType querySeriesType(SingleQPTask task, PeerId leader)
+      throws InterruptedException, RaftConnectionException {
+    BasicResponse response = asyncHandleNonQueryTaskGetRes(task, leader, 0);
+    return response == null ? null
+        : ((QuerySeriesTypeResponse) response).getDataType();
+  }
+
   /**
    * Handle "show storage group" statement locally
    *
@@ -286,39 +363,11 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
     return ((QueryStorageGroupResponse) task.getResponse()).getStorageGroups();
   }
 
-  /**
-   * Handle "show timeseries" statement
-   */
-  private void asyncQueryMetadataInStringLocally(String groupId, SingleQPTask task) {
-    final byte[] reqContext = RaftUtils.createRaftRequestContext();
-    DataPartitionRaftHolder dataPartitionHolder = (DataPartitionRaftHolder) server
-        .getDataPartitionHolder(groupId);
-    if (readMetadataConsistencyLevel == ClusterConstant.WEAK_CONSISTENCY_LEVEL) {
-      QueryMetadataInStringResponse response = QueryMetadataInStringResponse
-          .createSuccessResponse(groupId, mManager.getMetadataInString());
-      response.addResult(true);
-      task.run(response);
-    } else {
-      ((RaftService) dataPartitionHolder.getService()).getNode()
-          .readIndex(reqContext, new ReadIndexClosure() {
-
-            @Override
-            public void run(Status status, long index, byte[] reqCtx) {
-              QueryMetadataInStringResponse response;
-              if (status.isOk()) {
-                LOGGER.debug("start to read");
-                response = QueryMetadataInStringResponse
-                    .createSuccessResponse(groupId, mManager.getMetadataInString());
-                response.addResult(true);
-              } else {
-                response = QueryMetadataInStringResponse
-                    .createErrorResponse(groupId, status.getErrorMsg());
-                response.addResult(false);
-              }
-              task.run(response);
-            }
-          });
-    }
+  private List<String> queryPaths(SingleQPTask task, PeerId leader)
+      throws InterruptedException, RaftConnectionException {
+    BasicResponse response = asyncHandleNonQueryTaskGetRes(task, leader, 0);
+    return response == null ? new ArrayList<>()
+        : ((QueryPathsResponse) response).getPaths();
   }
 
   /**
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
index 4d947df..6ea12e4 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
@@ -66,6 +66,8 @@ public class RaftNodeAsClientManager {
    */
   private final RaftNodeAsClient client = new RaftNodeAsClient();
 
+  private boolean clientInited = false;
+
   /**
    * Number of clients in use
    */
@@ -90,6 +92,10 @@ public class RaftNodeAsClientManager {
    */
   public RaftNodeAsClient getRaftNodeAsClient() {
     try {
+      if (!clientInited) {
+        client.init();
+      }
+
       numLock.lock();
       if (validClientNum.get() < MAX_VALID_CLIENT_NUM) {
         validClientNum.incrementAndGet();
@@ -165,6 +171,7 @@ public class RaftNodeAsClientManager {
     private void init(){
       boltClientService = new BoltCliClientService();
       boltClientService.init(new CliOptions());
+      clientInited = true;
     }
 
     @Override
@@ -229,6 +236,7 @@ public class RaftNodeAsClientManager {
     @Override
     public void shutdown() {
       boltClientService.shutdown();
+      clientInited = false;
     }
 
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryMetadataInStringAsyncProcessor.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryMetadataAsyncProcessor.java
similarity index 60%
copy from cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryMetadataInStringAsyncProcessor.java
copy to cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryMetadataAsyncProcessor.java
index 5c81756..176fa33 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryMetadataInStringAsyncProcessor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryMetadataAsyncProcessor.java
@@ -25,41 +25,54 @@ import com.alipay.sofa.jraft.closure.ReadIndexClosure;
 import org.apache.iotdb.cluster.config.ClusterConstant;
 import org.apache.iotdb.cluster.entity.raft.DataPartitionRaftHolder;
 import org.apache.iotdb.cluster.entity.raft.RaftService;
-import org.apache.iotdb.cluster.rpc.raft.request.QueryMetadataInStringRequest;
-import org.apache.iotdb.cluster.rpc.raft.response.QueryMetadataInStringResponse;
+import org.apache.iotdb.cluster.rpc.raft.request.QueryMetadataRequest;
+import org.apache.iotdb.cluster.rpc.raft.response.QueryMetadataResponse;
 import org.apache.iotdb.cluster.utils.RaftUtils;
+import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.metadata.MManager;
 
-public class QueryMetadataInStringAsyncProcessor extends
-    BasicAsyncUserProcessor<QueryMetadataInStringRequest> {
+public class QueryMetadataAsyncProcessor extends
+    BasicAsyncUserProcessor<QueryMetadataRequest> {
 
   private MManager mManager = MManager.getInstance();
 
   @Override
   public void handleRequest(BizContext bizContext, AsyncContext asyncContext,
-      QueryMetadataInStringRequest request) {
+      QueryMetadataRequest request) {
     String groupId = request.getGroupID();
-    final byte[] reqContext = RaftUtils.createRaftRequestContext();
-    DataPartitionRaftHolder dataPartitionHolder = RaftUtils.getDataPartitonRaftHolder(groupId);
 
     if (request.getReadConsistencyLevel() == ClusterConstant.WEAK_CONSISTENCY_LEVEL) {
-      QueryMetadataInStringResponse response = QueryMetadataInStringResponse
-          .createSuccessResponse(groupId, mManager.getMetadataInString());
-      response.addResult(true);
+      QueryMetadataResponse response = null;
+      try {
+        response = QueryMetadataResponse
+            .createSuccessResponse(groupId, mManager.getMetadata());
+        response.addResult(true);
+      } catch (PathErrorException e) {
+        response = QueryMetadataResponse.createErrorResponse(groupId, e.getMessage());
+        response.addResult(false);
+      }
       asyncContext.sendResponse(response);
     } else {
+      final byte[] reqContext = RaftUtils.createRaftRequestContext();
+      DataPartitionRaftHolder dataPartitionHolder = RaftUtils.getDataPartitonRaftHolder(groupId);
+
       ((RaftService) dataPartitionHolder.getService()).getNode()
           .readIndex(reqContext, new ReadIndexClosure() {
 
             @Override
             public void run(Status status, long index, byte[] reqCtx) {
-              QueryMetadataInStringResponse response;
+              QueryMetadataResponse response;
               if (status.isOk()) {
-                response = QueryMetadataInStringResponse
-                    .createSuccessResponse(groupId, mManager.getMetadataInString());
-                response.addResult(true);
+                try {
+                  response = QueryMetadataResponse
+                      .createSuccessResponse(groupId, mManager.getMetadata());
+                  response.addResult(true);
+                } catch (PathErrorException e) {
+                  response = QueryMetadataResponse.createErrorResponse(groupId, e.getMessage());
+                  response.addResult(false);
+                }
               } else {
-                response = QueryMetadataInStringResponse
+                response = QueryMetadataResponse
                     .createErrorResponse(groupId, status.getErrorMsg());
                 response.addResult(false);
               }
@@ -71,6 +84,6 @@ public class QueryMetadataInStringAsyncProcessor extends
 
   @Override
   public String interest() {
-    return QueryMetadataInStringRequest.class.getName();
+    return QueryMetadataRequest.class.getName();
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryMetadataInStringAsyncProcessor.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryMetadataInStringAsyncProcessor.java
index 5c81756..b80f4ae 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryMetadataInStringAsyncProcessor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryMetadataInStringAsyncProcessor.java
@@ -39,8 +39,6 @@ public class QueryMetadataInStringAsyncProcessor extends
   public void handleRequest(BizContext bizContext, AsyncContext asyncContext,
       QueryMetadataInStringRequest request) {
     String groupId = request.getGroupID();
-    final byte[] reqContext = RaftUtils.createRaftRequestContext();
-    DataPartitionRaftHolder dataPartitionHolder = RaftUtils.getDataPartitonRaftHolder(groupId);
 
     if (request.getReadConsistencyLevel() == ClusterConstant.WEAK_CONSISTENCY_LEVEL) {
       QueryMetadataInStringResponse response = QueryMetadataInStringResponse
@@ -48,6 +46,9 @@ public class QueryMetadataInStringAsyncProcessor extends
       response.addResult(true);
       asyncContext.sendResponse(response);
     } else {
+      final byte[] reqContext = RaftUtils.createRaftRequestContext();
+      DataPartitionRaftHolder dataPartitionHolder = RaftUtils.getDataPartitonRaftHolder(groupId);
+
       ((RaftService) dataPartitionHolder.getService()).getNode()
           .readIndex(reqContext, new ReadIndexClosure() {
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryTimeSeriesAsyncProcessor.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryPathsAsyncProcessor.java
similarity index 64%
copy from cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryTimeSeriesAsyncProcessor.java
copy to cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryPathsAsyncProcessor.java
index a800302..f54aba0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryTimeSeriesAsyncProcessor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryPathsAsyncProcessor.java
@@ -25,50 +25,55 @@ import com.alipay.sofa.jraft.closure.ReadIndexClosure;
 import org.apache.iotdb.cluster.config.ClusterConstant;
 import org.apache.iotdb.cluster.entity.raft.DataPartitionRaftHolder;
 import org.apache.iotdb.cluster.entity.raft.RaftService;
-import org.apache.iotdb.cluster.rpc.raft.request.QueryTimeSeriesRequest;
-import org.apache.iotdb.cluster.rpc.raft.response.QueryTimeSeriesResponse;
+import org.apache.iotdb.cluster.rpc.raft.request.QueryPathsRequest;
+import org.apache.iotdb.cluster.rpc.raft.response.QueryPathsResponse;
 import org.apache.iotdb.cluster.utils.RaftUtils;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.metadata.MManager;
 
-
-public class QueryTimeSeriesAsyncProcessor extends BasicAsyncUserProcessor<QueryTimeSeriesRequest> {
+public class QueryPathsAsyncProcessor extends BasicAsyncUserProcessor<QueryPathsRequest> {
 
   private MManager mManager = MManager.getInstance();
 
   @Override
   public void handleRequest(BizContext bizContext, AsyncContext asyncContext,
-      QueryTimeSeriesRequest request) {
+      QueryPathsRequest request) {
     String groupId = request.getGroupID();
-    final byte[] reqContext = RaftUtils.createRaftRequestContext();
-    DataPartitionRaftHolder dataPartitionHolder = RaftUtils.getDataPartitonRaftHolder(groupId);
 
     if (request.getReadConsistencyLevel() == ClusterConstant.WEAK_CONSISTENCY_LEVEL) {
-      QueryTimeSeriesResponse response = QueryTimeSeriesResponse
+      QueryPathsResponse response = QueryPathsResponse
           .createEmptyResponse(groupId);
       try {
-        queryTimeSeries(request, response);
+        queryPaths(request, response);
+        response.addResult(true);
       } catch (final PathErrorException e) {
-        response = QueryTimeSeriesResponse.createErrorResponse(groupId, e.getMessage());
+        response = QueryPathsResponse.createErrorResponse(groupId, e.getMessage());
+        response.addResult(false);
       }
       asyncContext.sendResponse(response);
     } else {
+      final byte[] reqContext = RaftUtils.createRaftRequestContext();
+      DataPartitionRaftHolder dataPartitionHolder = RaftUtils.getDataPartitonRaftHolder(groupId);
+
       ((RaftService) dataPartitionHolder.getService()).getNode()
           .readIndex(reqContext, new ReadIndexClosure() {
 
             @Override
             public void run(Status status, long index, byte[] reqCtx) {
-              QueryTimeSeriesResponse response = QueryTimeSeriesResponse
+              QueryPathsResponse response = QueryPathsResponse
                   .createEmptyResponse(groupId);
               if (status.isOk()) {
                 try {
-                  queryTimeSeries(request, response);
+                  queryPaths(request, response);
+                  response.addResult(true);
                 } catch (final PathErrorException e) {
-                  response = QueryTimeSeriesResponse.createErrorResponse(groupId, e.getMessage());
+                  response = QueryPathsResponse.createErrorResponse(groupId, e.getMessage());
+                  response.addResult(false);
                 }
               } else {
-                response = QueryTimeSeriesResponse
+                response = QueryPathsResponse
                     .createErrorResponse(groupId, status.getErrorMsg());
+                response.addResult(false);
               }
               asyncContext.sendResponse(response);
             }
@@ -77,17 +82,17 @@ public class QueryTimeSeriesAsyncProcessor extends BasicAsyncUserProcessor<Query
   }
 
   /**
-   * Query timeseries
+   * Query paths
    */
-  private void queryTimeSeries(QueryTimeSeriesRequest queryMetadataRequest,
-      QueryTimeSeriesResponse response) throws PathErrorException {
-    for (String path : queryMetadataRequest.getPath()) {
-      response.addTimeSeries(mManager.getShowTimeseriesPath(path));
+  private void queryPaths(QueryPathsRequest request,
+      QueryPathsResponse response) throws PathErrorException {
+    for (String path : request.getPath()) {
+      response.addPaths(mManager.getPaths(path));
     }
   }
 
   @Override
   public String interest() {
-    return QueryTimeSeriesRequest.class.getName();
+    return QueryPathsRequest.class.getName();
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryTimeSeriesAsyncProcessor.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QuerySeriesTypeAsyncProcessor.java
similarity index 64%
copy from cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryTimeSeriesAsyncProcessor.java
copy to cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QuerySeriesTypeAsyncProcessor.java
index a800302..f0a4fc6 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryTimeSeriesAsyncProcessor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QuerySeriesTypeAsyncProcessor.java
@@ -25,50 +25,53 @@ import com.alipay.sofa.jraft.closure.ReadIndexClosure;
 import org.apache.iotdb.cluster.config.ClusterConstant;
 import org.apache.iotdb.cluster.entity.raft.DataPartitionRaftHolder;
 import org.apache.iotdb.cluster.entity.raft.RaftService;
-import org.apache.iotdb.cluster.rpc.raft.request.QueryTimeSeriesRequest;
-import org.apache.iotdb.cluster.rpc.raft.response.QueryTimeSeriesResponse;
+import org.apache.iotdb.cluster.rpc.raft.request.QuerySeriesTypeRequest;
+import org.apache.iotdb.cluster.rpc.raft.response.QuerySeriesTypeResponse;
 import org.apache.iotdb.cluster.utils.RaftUtils;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.metadata.MManager;
 
-
-public class QueryTimeSeriesAsyncProcessor extends BasicAsyncUserProcessor<QueryTimeSeriesRequest> {
+public class QuerySeriesTypeAsyncProcessor extends BasicAsyncUserProcessor<QuerySeriesTypeRequest> {
 
   private MManager mManager = MManager.getInstance();
 
   @Override
   public void handleRequest(BizContext bizContext, AsyncContext asyncContext,
-      QueryTimeSeriesRequest request) {
+      QuerySeriesTypeRequest request) {
     String groupId = request.getGroupID();
-    final byte[] reqContext = RaftUtils.createRaftRequestContext();
-    DataPartitionRaftHolder dataPartitionHolder = RaftUtils.getDataPartitonRaftHolder(groupId);
 
     if (request.getReadConsistencyLevel() == ClusterConstant.WEAK_CONSISTENCY_LEVEL) {
-      QueryTimeSeriesResponse response = QueryTimeSeriesResponse
-          .createEmptyResponse(groupId);
+      QuerySeriesTypeResponse response;
       try {
-        queryTimeSeries(request, response);
+        response = QuerySeriesTypeResponse.createSuccessResponse(groupId, mManager.getSeriesType(request.getPath()));
+        response.addResult(true);
       } catch (final PathErrorException e) {
-        response = QueryTimeSeriesResponse.createErrorResponse(groupId, e.getMessage());
+        response = QuerySeriesTypeResponse.createErrorResponse(groupId, e.getMessage());
+        response.addResult(false);
       }
       asyncContext.sendResponse(response);
     } else {
+      final byte[] reqContext = RaftUtils.createRaftRequestContext();
+      DataPartitionRaftHolder dataPartitionHolder = RaftUtils.getDataPartitonRaftHolder(groupId);
+
       ((RaftService) dataPartitionHolder.getService()).getNode()
           .readIndex(reqContext, new ReadIndexClosure() {
 
             @Override
             public void run(Status status, long index, byte[] reqCtx) {
-              QueryTimeSeriesResponse response = QueryTimeSeriesResponse
-                  .createEmptyResponse(groupId);
+              QuerySeriesTypeResponse response;
               if (status.isOk()) {
                 try {
-                  queryTimeSeries(request, response);
+                  response = QuerySeriesTypeResponse.createSuccessResponse(groupId, mManager.getSeriesType(request.getPath()));
+                  response.addResult(true);
                 } catch (final PathErrorException e) {
-                  response = QueryTimeSeriesResponse.createErrorResponse(groupId, e.getMessage());
+                  response = QuerySeriesTypeResponse.createErrorResponse(groupId, e.getMessage());
+                  response.addResult(false);
                 }
               } else {
-                response = QueryTimeSeriesResponse
+                response = QuerySeriesTypeResponse
                     .createErrorResponse(groupId, status.getErrorMsg());
+                response.addResult(false);
               }
               asyncContext.sendResponse(response);
             }
@@ -76,18 +79,8 @@ public class QueryTimeSeriesAsyncProcessor extends BasicAsyncUserProcessor<Query
     }
   }
 
-  /**
-   * Query timeseries
-   */
-  private void queryTimeSeries(QueryTimeSeriesRequest queryMetadataRequest,
-      QueryTimeSeriesResponse response) throws PathErrorException {
-    for (String path : queryMetadataRequest.getPath()) {
-      response.addTimeSeries(mManager.getShowTimeseriesPath(path));
-    }
-  }
-
   @Override
   public String interest() {
-    return QueryTimeSeriesRequest.class.getName();
+    return QuerySeriesTypeRequest.class.getName();
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryTimeSeriesAsyncProcessor.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryTimeSeriesAsyncProcessor.java
index a800302..c41fdcf 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryTimeSeriesAsyncProcessor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryTimeSeriesAsyncProcessor.java
@@ -40,19 +40,22 @@ public class QueryTimeSeriesAsyncProcessor extends BasicAsyncUserProcessor<Query
   public void handleRequest(BizContext bizContext, AsyncContext asyncContext,
       QueryTimeSeriesRequest request) {
     String groupId = request.getGroupID();
-    final byte[] reqContext = RaftUtils.createRaftRequestContext();
-    DataPartitionRaftHolder dataPartitionHolder = RaftUtils.getDataPartitonRaftHolder(groupId);
 
     if (request.getReadConsistencyLevel() == ClusterConstant.WEAK_CONSISTENCY_LEVEL) {
       QueryTimeSeriesResponse response = QueryTimeSeriesResponse
           .createEmptyResponse(groupId);
       try {
         queryTimeSeries(request, response);
+        response.addResult(true);
       } catch (final PathErrorException e) {
         response = QueryTimeSeriesResponse.createErrorResponse(groupId, e.getMessage());
+        response.addResult(false);
       }
       asyncContext.sendResponse(response);
     } else {
+      final byte[] reqContext = RaftUtils.createRaftRequestContext();
+      DataPartitionRaftHolder dataPartitionHolder = RaftUtils.getDataPartitonRaftHolder(groupId);
+
       ((RaftService) dataPartitionHolder.getService()).getNode()
           .readIndex(reqContext, new ReadIndexClosure() {
 
@@ -63,12 +66,15 @@ public class QueryTimeSeriesAsyncProcessor extends BasicAsyncUserProcessor<Query
               if (status.isOk()) {
                 try {
                   queryTimeSeries(request, response);
+                  response.addResult(true);
                 } catch (final PathErrorException e) {
                   response = QueryTimeSeriesResponse.createErrorResponse(groupId, e.getMessage());
+                  response.addResult(false);
                 }
               } else {
                 response = QueryTimeSeriesResponse
                     .createErrorResponse(groupId, status.getErrorMsg());
+                response.addResult(false);
               }
               asyncContext.sendResponse(response);
             }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QueryMetadataRequest.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QueryMetadataRequest.java
new file mode 100644
index 0000000..2628fb6
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QueryMetadataRequest.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.rpc.raft.request;
+
+import java.io.Serializable;
+
+public class QueryMetadataRequest extends BasicQueryRequest implements Serializable {
+
+  public QueryMetadataRequest(String groupID, int readConsistencyLevel) {
+    super(groupID, readConsistencyLevel);
+  }
+}
\ No newline at end of file
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QueryPathsRequest.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QueryPathsRequest.java
new file mode 100644
index 0000000..2c600f4
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QueryPathsRequest.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.rpc.raft.request;
+
+import java.io.Serializable;
+import java.util.List;
+
+public class QueryPathsRequest extends BasicQueryRequest implements Serializable {
+
+  private List<String> path;
+
+  public QueryPathsRequest(String groupID, int readConsistencyLevel, List<String> path) {
+    super(groupID, readConsistencyLevel);
+    this.path = path;
+  }
+
+  public List<String> getPath() {
+    return path;
+  }
+}
\ No newline at end of file
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QuerySeriesTypeRequest.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QuerySeriesTypeRequest.java
new file mode 100644
index 0000000..c486576
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QuerySeriesTypeRequest.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.rpc.raft.request;
+
+import java.io.Serializable;
+
+public class QuerySeriesTypeRequest extends BasicQueryRequest implements Serializable {
+
+  private String path;
+
+  public QuerySeriesTypeRequest(String groupID, int readConsistencyLevel, String path) {
+    super(groupID, readConsistencyLevel);
+    this.path = path;
+  }
+
+  public String getPath() {
+    return path;
+  }
+}
\ No newline at end of file
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QueryMetadataResponse.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QueryMetadataResponse.java
new file mode 100644
index 0000000..6c21798
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QueryMetadataResponse.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.rpc.raft.response;
+
+import org.apache.iotdb.db.metadata.Metadata;
+
+public class QueryMetadataResponse extends BasicResponse {
+
+  private Metadata metadata;
+
+  private QueryMetadataResponse(String groupId, boolean redirected, String leaderStr,
+      String errorMsg) {
+    super(groupId, redirected, leaderStr, errorMsg);
+  }
+
+  public static QueryMetadataResponse createSuccessResponse(String groupId,
+      Metadata metadata) {
+    QueryMetadataResponse response = new QueryMetadataResponse(groupId, false, null,
+        null);
+    response.metadata = metadata;
+    return response;
+  }
+
+  public static QueryMetadataResponse createErrorResponse(String groupId, String errorMsg) {
+    return new QueryMetadataResponse(groupId, false, null, errorMsg);
+  }
+
+  public Metadata getMetadata() {
+    return metadata;
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QueryPathsResponse.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QueryPathsResponse.java
new file mode 100644
index 0000000..29d659a
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QueryPathsResponse.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.rpc.raft.response;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class QueryPathsResponse extends BasicResponse {
+
+  private List<String> paths;
+
+  private QueryPathsResponse(String groupId, boolean redirected, boolean success, String leaderStr, String errorMsg) {
+    super(groupId, redirected, leaderStr, errorMsg);
+    this.addResult(success);
+    paths = new ArrayList<>();
+  }
+
+  public static QueryPathsResponse createEmptyResponse(String groupId){
+    return new QueryPathsResponse(groupId, false, true, null, null);
+  }
+
+  public static QueryPathsResponse createErrorResponse(String groupId, String errorMsg) {
+    return new QueryPathsResponse(groupId, false, false, null, errorMsg);
+  }
+
+  public List<String> getPaths() {
+    return paths;
+  }
+
+  public void addPaths(List<String> paths){
+    this.paths.addAll(paths);
+  }
+
+}
\ No newline at end of file
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QuerySeriesTypeResponse.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QuerySeriesTypeResponse.java
new file mode 100644
index 0000000..e86e108
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QuerySeriesTypeResponse.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.rpc.raft.response;
+
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+public class QuerySeriesTypeResponse extends BasicResponse {
+
+  private TSDataType dataType;
+
+  private QuerySeriesTypeResponse(String groupId, boolean redirected, String leaderStr,
+      String errorMsg) {
+    super(groupId, redirected, leaderStr, errorMsg);
+  }
+
+  public static QuerySeriesTypeResponse createSuccessResponse(String groupId, TSDataType dataType) {
+    QuerySeriesTypeResponse response = new QuerySeriesTypeResponse(groupId, false, null,
+        null);
+    response.dataType = dataType;
+    return response;
+  }
+
+  public static QuerySeriesTypeResponse createErrorResponse(String groupId, String errorMsg) {
+    return new QuerySeriesTypeResponse(groupId, false, null, errorMsg);
+  }
+
+  public TSDataType getDataType() {
+    return dataType;
+  }
+
+  public void setDataType(TSDataType dataType) {
+    this.dataType = dataType;
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/service/TSServiceClusterImpl.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/service/TSServiceClusterImpl.java
index ba4e59d..33e3e81 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/service/TSServiceClusterImpl.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/service/TSServiceClusterImpl.java
@@ -34,11 +34,13 @@ import org.apache.iotdb.db.auth.AuthException;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.metadata.Metadata;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.service.TSServiceImpl;
 import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementReq;
 import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementResp;
 import org.apache.iotdb.service.rpc.thrift.TS_StatusCode;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
@@ -244,6 +246,23 @@ public class TSServiceClusterImpl extends TSServiceImpl {
     return queryMetadataExecutor.get().processMetadataInStringQuery();
   }
 
+  @Override
+  protected Metadata getMetadata()
+      throws InterruptedException, ProcessorException, PathErrorException {
+    return queryMetadataExecutor.get().processMetadataQuery();
+  }
+
+  @Override
+  protected TSDataType getSeriesType(String path) throws PathErrorException, InterruptedException, ProcessorException {
+    return queryMetadataExecutor.get().processSeriesTypeQuery(path);
+  }
+
+  @Override
+  protected List<String> getPaths(String path)
+      throws PathErrorException, InterruptedException, ProcessorException {
+    return queryMetadataExecutor.get().processPathsQuery(path);
+  }
+
   @OnlyForTest
   public NonQueryExecutor getNonQueryExecutor() {
     return nonQueryExecutor.get();
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/metadata/Metadata.java b/iotdb/src/main/java/org/apache/iotdb/db/metadata/Metadata.java
index 40fd0e4..5c56fe1 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/metadata/Metadata.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/metadata/Metadata.java
@@ -18,15 +18,21 @@
  */
 package org.apache.iotdb.db.metadata;
 
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 /**
  * This class stores all the metadata info for every deviceId and every timeseries.
  */
-public class Metadata {
+public class Metadata implements Serializable {
 
   private Map<String, List<MeasurementSchema>> seriesMap;
   private Map<String, List<String>> deviceIdMap;
@@ -67,9 +73,122 @@ public class Metadata {
     return deviceIdMap;
   }
 
+  /**
+   * combine multiple metadatas
+   */
+  public static Metadata combineMetadatas(Metadata[] metadatas) {
+    Map<String, List<MeasurementSchema>> seriesMap = new HashMap<>();
+    Map<String, List<String>> deviceIdMap = new HashMap<>();
+    Map<String, Map<String, MeasurementSchema>> typeSchemaMap = new HashMap<>();
+
+    if (metadatas == null || metadatas.length == 0) {
+      return new Metadata(seriesMap, deviceIdMap);
+    }
+
+    for (int i = 0; i < metadatas.length; i++) {
+      Map<String, List<MeasurementSchema>> subSeriesMap = metadatas[i].seriesMap;
+      for (Entry<String, List<MeasurementSchema>> entry : subSeriesMap.entrySet()) {
+        Map<String, MeasurementSchema> map;
+        if (typeSchemaMap.containsKey(entry.getKey())) {
+          map = typeSchemaMap.get(entry.getKey());
+        } else {
+          map = new HashMap<>();
+        }
+        entry.getValue().forEach(schema -> map.put(schema.getMeasurementId(), schema));
+        if (!typeSchemaMap.containsKey(entry.getKey())) {
+          typeSchemaMap.put(entry.getKey(), map);
+        }
+      }
+
+      Map<String, List<String>> subDeviceIdMap = metadatas[i].deviceIdMap;
+      for (Entry<String, List<String>> entry : subDeviceIdMap.entrySet()) {
+        List<String> list;
+        if (deviceIdMap.containsKey(entry.getKey())) {
+          list = deviceIdMap.get(entry.getKey());
+        } else {
+          list = new ArrayList<>();
+        }
+        list.addAll(entry.getValue());
+        if (!deviceIdMap.containsKey(entry.getKey())) {
+          deviceIdMap.put(entry.getKey(), list);
+        }
+      }
+    }
+
+    for (Entry<String, Map<String, MeasurementSchema>> entry : typeSchemaMap.entrySet()) {
+      List<MeasurementSchema> list = new ArrayList<>();
+      list.addAll(entry.getValue().values());
+      seriesMap.put(entry.getKey(), list);
+    }
+
+    return new Metadata(seriesMap, deviceIdMap);
+  }
+
   @Override
   public String toString() {
     return seriesMap.toString() + "\n" + deviceIdMap.toString();
   }
 
+  @Override
+  public boolean equals(Object obj) {
+    if(this == obj){
+      return true;
+    }
+    if(obj == null){
+      return false;
+    }
+    if(this.getClass() != obj.getClass()){
+      return false;
+    }
+
+    Metadata metadata = (Metadata) obj;
+    return seriesMapEquals(seriesMap, metadata.seriesMap) && deviceIdMapEquals(deviceIdMap, metadata.deviceIdMap);
+  }
+
+  /**
+   * only used to check if seriesMap is equal to another seriesMap
+   */
+  private boolean seriesMapEquals(Map<String, List<MeasurementSchema>> map1, Map<String, List<MeasurementSchema>> map2) {
+    if (!map1.keySet().equals(map2.keySet())) {
+      return false;
+    }
+
+    for (Entry<String, List<MeasurementSchema>> entry : map1.entrySet()) {
+      List list1 = entry.getValue();
+      List list2 = map2.get(entry.getKey());
+
+      if (!listEquals(list1, list2)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * only used to check if deviceIdMap is equal to another deviceIdMap
+   */
+  private boolean deviceIdMapEquals(Map<String, List<String>> map1, Map<String, List<String>> map2) {
+    if (!map1.keySet().equals(map2.keySet())) {
+      return false;
+    }
+
+    for (Entry<String, List<String>> entry : map1.entrySet()) {
+      List list1 = entry.getValue();
+      List list2 = map2.get(entry.getKey());
+
+      if (!listEquals(list1, list2)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private boolean listEquals(List list1, List list2) {
+    Set set1 = new HashSet();
+    set1.addAll(list1);
+    Set set2 = new HashSet();
+    set2.addAll(list2);
+
+    return set1.equals(set2);
+  }
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 79dbd53..299133c 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -81,6 +81,7 @@ import org.apache.iotdb.service.rpc.thrift.TS_SessionHandle;
 import org.apache.iotdb.service.rpc.thrift.TS_Status;
 import org.apache.iotdb.service.rpc.thrift.TS_StatusCode;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 import org.apache.thrift.TException;
@@ -308,14 +309,14 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
         Metadata metadata;
         try {
           String column = req.getColumnPath();
-          metadata = MManager.getInstance().getMetadata();
+          metadata = getMetadata();
           Map<String, List<String>> deviceMap = metadata.getDeviceMap();
           if (deviceMap == null || !deviceMap.containsKey(column)) {
             resp.setColumnsList(new ArrayList<>());
           } else {
             resp.setColumnsList(deviceMap.get(column));
           }
-        } catch (PathErrorException e) {
+        } catch (PathErrorException | InterruptedException | ProcessorException e) {
           LOGGER.error("cannot get delta object map", e);
           status = getErrorStatus(String.format("Failed to fetch delta object map because: %s", e));
           resp.setStatus(status);
@@ -330,8 +331,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
         break;
       case "COLUMN":
         try {
-          resp.setDataType(MManager.getInstance().getSeriesType(req.getColumnPath()).toString());
-        } catch (PathErrorException e) {
+          resp.setDataType(getSeriesType(req.getColumnPath()).toString());
+        } catch (PathErrorException | InterruptedException | ProcessorException e) {
           // TODO aggregate seriesPath e.g. last(root.ln.wf01.wt01.status)
           // status = new TS_Status(TS_StatusCode.ERROR_STATUS);
           // status.setErrorMessage(String.format("Failed to fetch %s's data type because: %s",
@@ -343,8 +344,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
         break;
       case "ALL_COLUMNS":
         try {
-          resp.setColumnsList(MManager.getInstance().getPaths(req.getColumnPath()));
-        } catch (PathErrorException e) {
+          resp.setColumnsList(getPaths(req.getColumnPath()));
+        } catch (PathErrorException | InterruptedException | ProcessorException e) {
           status = getErrorStatus(String
               .format("Failed to fetch %s's all columns because: %s", req.getColumnPath(), e));
           resp.setStatus(status);
@@ -382,6 +383,18 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     return MManager.getInstance().getMetadataInString();
   }
 
+  protected Metadata getMetadata() throws PathErrorException, InterruptedException, ProcessorException {
+    return MManager.getInstance().getMetadata();
+  }
+
+  protected TSDataType getSeriesType(String path) throws PathErrorException, InterruptedException, ProcessorException {
+    return MManager.getInstance().getSeriesType(path);
+  }
+
+  protected List<String> getPaths(String path) throws PathErrorException, InterruptedException, ProcessorException {
+    return MManager.getInstance().getPaths(path);
+  }
+
   /**
    * Judge whether the statement is ADMIN COMMAND and if true, executeWithGlobalTimeFilter it.
    *
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/metadata/MetadataTest.java b/iotdb/src/test/java/org/apache/iotdb/db/metadata/MetadataTest.java
new file mode 100644
index 0000000..9e1adc7
--- /dev/null
+++ b/iotdb/src/test/java/org/apache/iotdb/db/metadata/MetadataTest.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.metadata;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.iotdb.db.exception.PathErrorException;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class MetadataTest {
+
+  @Before
+  public void setUp() throws Exception {
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    EnvironmentUtils.cleanEnv();
+  }
+
+  @Test
+  public void testCombineMetadatas() {
+    MManager manager = MManager.getInstance();
+
+    try {
+      manager.setStorageLevelToMTree("root.t.d1");
+      manager.addPathToMTree("root.t.d1.s0", "INT32", "RLE");
+      manager.addPathToMTree("root.t.d1.s1", "DOUBLE", "RLE");
+      manager.setStorageLevelToMTree("root.t.d2");
+      manager.addPathToMTree("root.t.d2.s1", "DOUBLE", "RLE");
+      Metadata metadata1 = manager.getMetadata();
+
+      manager.clear();
+
+      manager.setStorageLevelToMTree("root.t.d3");
+      manager.addPathToMTree("root.t.d3.s1", "DOUBLE", "RLE");
+      manager.addPathToMTree("root.t.d3.s2", "TEXT", "RLE");
+      manager.setStorageLevelToMTree("root.t1.d1");
+      manager.addPathToMTree("root.t1.d1.s1", "DOUBLE", "RLE");
+      manager.addPathToMTree("root.t1.d1.s2", "TEXT", "RLE");
+      Metadata metadata2 = manager.getMetadata();
+
+      manager.clear();
+
+      manager.setStorageLevelToMTree("root.t.d1");
+      manager.addPathToMTree("root.t.d1.s0", "INT32", "RLE");
+      manager.addPathToMTree("root.t.d1.s1", "DOUBLE", "RLE");
+      manager.setStorageLevelToMTree("root.t.d2");
+      manager.addPathToMTree("root.t.d2.s1", "DOUBLE", "RLE");
+      manager.setStorageLevelToMTree("root.t.d3");
+      manager.addPathToMTree("root.t.d3.s1", "DOUBLE", "RLE");
+      manager.addPathToMTree("root.t.d3.s2", "TEXT", "RLE");
+      manager.setStorageLevelToMTree("root.t1.d1");
+      manager.addPathToMTree("root.t1.d1.s1", "DOUBLE", "RLE");
+      manager.addPathToMTree("root.t1.d1.s2", "TEXT", "RLE");
+      Metadata metadata = manager.getMetadata();
+
+      Metadata combineMetadata = Metadata.combineMetadatas(new Metadata[]{metadata1, metadata2});
+      assertTrue(metadata.equals(combineMetadata));
+    } catch (PathErrorException | IOException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+}
\ No newline at end of file