You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ea...@apache.org on 2019/04/11 02:53:19 UTC

[incubator-iotdb] 01/01: implement DELTA_OBEJECT, COLUMN, ALL_COLUMNS

This is an automated email from the ASF dual-hosted git repository.

east pushed a commit to branch cluster_metadata_query
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 8836850439791cbefc3b41483613d6fa8b552534
Author: mdf369 <95...@qq.com>
AuthorDate: Thu Apr 11 10:52:51 2019 +0800

    implement DELTA_OBEJECT, COLUMN, ALL_COLUMNS
---
 .../org/apache/iotdb/cluster/entity/Server.java    |   6 +
 .../cluster/qp/executor/QueryMetadataExecutor.java | 283 +++++++++++++++++++++
 .../processor/QueryMetadataAsyncProcessor.java     |  86 +++++++
 .../raft/processor/QueryPathsAsyncProcessor.java   |  92 +++++++
 .../processor/QuerySeriesTypeAsyncProcessor.java   |  80 ++++++
 .../rpc/raft/request/QueryMetadataRequest.java     |  28 ++
 .../rpc/raft/request/QueryPathsRequest.java        |  36 +++
 .../rpc/raft/request/QuerySeriesTypeRequest.java   |  35 +++
 .../rpc/raft/response/QueryMetadataResponse.java   |  47 ++++
 .../rpc/raft/response/QueryPathsResponse.java      |  50 ++++
 .../rpc/raft/response/QuerySeriesTypeResponse.java |  50 ++++
 .../cluster/rpc/service/TSServiceClusterImpl.java  |  20 ++
 .../org/apache/iotdb/db/metadata/Metadata.java     |  54 ++++
 .../org/apache/iotdb/db/service/TSServiceImpl.java |  25 +-
 14 files changed, 886 insertions(+), 6 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java b/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
index be4a74a..ad1b4d6 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
@@ -32,7 +32,10 @@ import org.apache.iotdb.cluster.entity.raft.MetadataRaftHolder;
 import org.apache.iotdb.cluster.rpc.raft.impl.RaftNodeAsClientManager;
 import org.apache.iotdb.cluster.rpc.raft.processor.DataGroupNonQueryAsyncProcessor;
 import org.apache.iotdb.cluster.rpc.raft.processor.MetaGroupNonQueryAsyncProcessor;
+import org.apache.iotdb.cluster.rpc.raft.processor.QueryMetadataAsyncProcessor;
 import org.apache.iotdb.cluster.rpc.raft.processor.QueryMetadataInStringAsyncProcessor;
+import org.apache.iotdb.cluster.rpc.raft.processor.QueryPathsAsyncProcessor;
+import org.apache.iotdb.cluster.rpc.raft.processor.QuerySeriesTypeAsyncProcessor;
 import org.apache.iotdb.cluster.rpc.raft.processor.QueryTimeSeriesAsyncProcessor;
 import org.apache.iotdb.cluster.utils.RaftUtils;
 import org.apache.iotdb.cluster.utils.hash.PhysicalNode;
@@ -93,6 +96,9 @@ public class Server {
     rpcServer.registerUserProcessor(new MetaGroupNonQueryAsyncProcessor());
     rpcServer.registerUserProcessor(new QueryTimeSeriesAsyncProcessor());
     rpcServer.registerUserProcessor(new QueryMetadataInStringAsyncProcessor());
+    rpcServer.registerUserProcessor(new QueryMetadataAsyncProcessor());
+    rpcServer.registerUserProcessor(new QuerySeriesTypeAsyncProcessor());
+    rpcServer.registerUserProcessor(new QueryPathsAsyncProcessor());
 
     metadataHolder = new MetadataRaftHolder(peerIds, serverId, rpcServer, true);
     metadataHolder.init();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
index 8dc20b3..3be4e81 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
@@ -35,16 +35,24 @@ import org.apache.iotdb.cluster.entity.raft.RaftService;
 import org.apache.iotdb.cluster.exception.RaftConnectionException;
 import org.apache.iotdb.cluster.qp.ClusterQPExecutor;
 import org.apache.iotdb.cluster.rpc.raft.request.QueryMetadataInStringRequest;
+import org.apache.iotdb.cluster.rpc.raft.request.QueryMetadataRequest;
+import org.apache.iotdb.cluster.rpc.raft.request.QueryPathsRequest;
+import org.apache.iotdb.cluster.rpc.raft.request.QuerySeriesTypeRequest;
 import org.apache.iotdb.cluster.rpc.raft.request.QueryStorageGroupRequest;
 import org.apache.iotdb.cluster.rpc.raft.request.QueryTimeSeriesRequest;
 import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
 import org.apache.iotdb.cluster.rpc.raft.response.QueryMetadataInStringResponse;
+import org.apache.iotdb.cluster.rpc.raft.response.QueryMetadataResponse;
+import org.apache.iotdb.cluster.rpc.raft.response.QueryPathsResponse;
+import org.apache.iotdb.cluster.rpc.raft.response.QuerySeriesTypeResponse;
 import org.apache.iotdb.cluster.rpc.raft.response.QueryStorageGroupResponse;
 import org.apache.iotdb.cluster.rpc.raft.response.QueryTimeSeriesResponse;
 import org.apache.iotdb.cluster.utils.RaftUtils;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.exception.ProcessorException;
 import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.metadata.Metadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -177,6 +185,123 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
     return combineMetadataInStringList(metadataList);
   }
 
+  public Metadata processMetadataQuery()
+      throws InterruptedException, ProcessorException, PathErrorException {
+    Set<String> groupIdSet = router.getAllGroupId();
+
+    Metadata[] metadatas = new Metadata[groupIdSet.size()];
+    List<SingleQPTask> taskList = new ArrayList<>();
+    for (String groupId : groupIdSet) {
+      QueryMetadataRequest request = new QueryMetadataRequest(groupId,
+          readMetadataConsistencyLevel);
+      SingleQPTask task = new SingleQPTask(false, request);
+      taskList.add(task);
+
+      LOGGER.debug("Execute query metadata statement for group {}.", groupId);
+      /** Check if the plan can be executed locally. **/
+      if (canHandleQueryByGroupId(groupId)) {
+        LOGGER.debug("Execute query metadata statement locally for group {}.", groupId);
+        asyncQueryMetadataLocally(groupId, task);
+      } else {
+        try {
+          PeerId holder = RaftUtils.getRandomPeerID(groupId);
+          asyncSendNonQueryTask(task, holder, 0);
+        } catch (RaftConnectionException e) {
+          LOGGER.error(e.getMessage());
+          throw new ProcessorException("Raft connection occurs error.", e);
+        }
+      }
+    }
+    for (int i = 0; i < taskList.size(); i++) {
+      SingleQPTask task = taskList.get(i);
+      task.await();
+      BasicResponse response = task.getResponse();
+      if (response == null || !response.isSuccess()) {
+        LOGGER.error("Execute show timeseries statement false.");
+        throw new ProcessorException();
+      }
+      metadatas[i] = ((QueryMetadataResponse)response).getMetadata();
+    }
+    return Metadata.combineMetadatas(metadatas);
+  }
+
+  public TSDataType processSeriesTypeQuery(String path)
+      throws InterruptedException, ProcessorException, PathErrorException {
+    TSDataType dataType = null;
+    List<String> storageGroupList = mManager.getAllFileNamesByPath(path);
+    if (storageGroupList.size() != 1) {
+      throw new PathErrorException("path " + path + " is not valid.");
+    } else {
+      String groupId = getGroupIdBySG(storageGroupList.get(0));
+      QuerySeriesTypeRequest request = new QuerySeriesTypeRequest(groupId,
+          readMetadataConsistencyLevel, path);
+      SingleQPTask task = new SingleQPTask(false, request);
+
+      LOGGER.debug("Execute get series type for {} statement for group {}.", path, groupId);
+      /** Check if the plan can be executed locally. **/
+      if (canHandleQueryByGroupId(groupId)) {
+        LOGGER.debug("Execute get series type for {} statement locally for group {}.", path, groupId);
+        dataType = querySeriesTypeLocally(path, groupId, task);
+      } else {
+        try {
+          PeerId holder = RaftUtils.getRandomPeerID(groupId);
+          dataType = querySeriesType(task, holder);
+        } catch (RaftConnectionException e) {
+          LOGGER.error(e.getMessage());
+          throw new ProcessorException("Raft connection occurs error.", e);
+        }
+      }
+    }
+    return dataType;
+  }
+
+  /**
+   * Handle show timeseries <path> statement
+   */
+  public List<String> processPathsQuery(String path)
+      throws InterruptedException, PathErrorException, ProcessorException {
+    List<String> res = new ArrayList<>();
+    List<String> storageGroupList = mManager.getAllFileNamesByPath(path);
+    if (storageGroupList.isEmpty()) {
+      return new ArrayList<>();
+    } else {
+      Map<String, Set<String>> groupIdSGMap = classifySGByGroupId(storageGroupList);
+      for (Entry<String, Set<String>> entry : groupIdSGMap.entrySet()) {
+        List<String> paths = getSubQueryPaths(entry.getValue(), path);
+        String groupId = entry.getKey();
+        handlePathsQuery(groupId, paths, res);
+      }
+    }
+    return res;
+  }
+
+  /**
+   * Handle query timeseries in one data group
+   *
+   * @param groupId data group id
+   */
+  private void handlePathsQuery(String groupId, List<String> pathList, List<String> res)
+      throws ProcessorException, InterruptedException {
+    QueryPathsRequest request = new QueryPathsRequest(groupId,
+        readMetadataConsistencyLevel, pathList);
+    SingleQPTask task = new SingleQPTask(false, request);
+
+    LOGGER.debug("Execute get paths for {} statement for group {}.", pathList, groupId);
+    /** Check if the plan can be executed locally. **/
+    if (canHandleQueryByGroupId(groupId)) {
+      LOGGER.debug("Execute get paths for {} statement locally for group {}.", pathList, groupId);
+      res.addAll(queryPathsLocally(pathList, groupId, task));
+    } else {
+      try {
+        PeerId holder = RaftUtils.getRandomPeerID(groupId);
+        res.addAll(queryPaths(task, holder));
+      } catch (RaftConnectionException e) {
+        LOGGER.error(e.getMessage());
+        throw new ProcessorException("Raft connection occurs error.", e);
+      }
+    }
+  }
+
   /**
    * Handle "show timeseries <path>" statement
    *
@@ -241,6 +366,58 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
         : ((QueryTimeSeriesResponse) response).getTimeSeries();
   }
 
+  private TSDataType querySeriesTypeLocally(String path, String groupId,
+      SingleQPTask task)
+      throws InterruptedException, ProcessorException {
+    final byte[] reqContext = RaftUtils.createRaftRequestContext();
+    DataPartitionRaftHolder dataPartitionHolder = RaftUtils.getDataPartitonRaftHolder(groupId);
+
+    /** Check consistency level**/
+    if (readMetadataConsistencyLevel == ClusterConstant.WEAK_CONSISTENCY_LEVEL) {
+      QuerySeriesTypeResponse response;
+      try {
+        response = QuerySeriesTypeResponse.createSuccessResponse(groupId, mManager.getSeriesType(path));
+      } catch (final PathErrorException e) {
+        response = QuerySeriesTypeResponse.createErrorResponse(groupId, e.getMessage());
+      }
+      task.run(response);
+    } else {
+      ((RaftService) dataPartitionHolder.getService()).getNode()
+          .readIndex(reqContext, new ReadIndexClosure() {
+
+            @Override
+            public void run(Status status, long index, byte[] reqCtx) {
+              QuerySeriesTypeResponse response;
+              if (status.isOk()) {
+                try {
+                  LOGGER.debug("start to read");
+                  response = QuerySeriesTypeResponse.createSuccessResponse(groupId, mManager.getSeriesType(path));
+                } catch (final PathErrorException e) {
+                  response = QuerySeriesTypeResponse.createErrorResponse(groupId, e.getMessage());
+                }
+              } else {
+                response = QuerySeriesTypeResponse.createErrorResponse(groupId, status.getErrorMsg());
+              }
+              task.run(response);
+            }
+          });
+    }
+    task.await();
+    QuerySeriesTypeResponse response = (QuerySeriesTypeResponse) task.getResponse();
+    if (response == null || !response.isSuccess()) {
+      LOGGER.error("Execute get series type for {} statement false.", path);
+      throw new ProcessorException();
+    }
+    return response.getDataType();
+  }
+
+  private TSDataType querySeriesType(SingleQPTask task, PeerId leader)
+      throws InterruptedException, RaftConnectionException {
+    BasicResponse response = asyncHandleNonQueryTaskGetRes(task, leader, 0);
+    return response == null ? null
+        : ((QuerySeriesTypeResponse) response).getDataType();
+  }
+
   /**
    * Handle "show storage group" statement locally
    *
@@ -322,6 +499,112 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
   }
 
   /**
+   * Handle "show timeseries" statement
+   */
+  private void asyncQueryMetadataLocally(String groupId, SingleQPTask task)
+      throws PathErrorException {
+    final byte[] reqContext = RaftUtils.createRaftRequestContext();
+    DataPartitionRaftHolder dataPartitionHolder = (DataPartitionRaftHolder) server
+        .getDataPartitionHolder(groupId);
+    if (readMetadataConsistencyLevel == ClusterConstant.WEAK_CONSISTENCY_LEVEL) {
+      QueryMetadataResponse response = QueryMetadataResponse
+          .createSuccessResponse(groupId, mManager.getMetadata());
+      response.addResult(true);
+      task.run(response);
+    } else {
+      ((RaftService) dataPartitionHolder.getService()).getNode()
+          .readIndex(reqContext, new ReadIndexClosure() {
+
+            @Override
+            public void run(Status status, long index, byte[] reqCtx) {
+              QueryMetadataResponse response;
+              if (status.isOk()) {
+                LOGGER.debug("start to read");
+                try {
+                  response = QueryMetadataResponse
+                      .createSuccessResponse(groupId, mManager.getMetadata());
+                  response.addResult(true);
+                } catch (PathErrorException e) {
+                  response = QueryMetadataResponse
+                      .createErrorResponse(groupId, e.getMessage());
+                  response.addResult(false);
+                }
+              } else {
+                response = QueryMetadataResponse
+                    .createErrorResponse(groupId, status.getErrorMsg());
+                response.addResult(false);
+              }
+              task.run(response);
+            }
+          });
+    }
+  }
+
+  /**
+   * Handle "show timeseries <path>" statement
+   *
+   * @param pathList column path
+   */
+  private List<String> queryPathsLocally(List<String> pathList, String groupId,
+      SingleQPTask task)
+      throws InterruptedException, ProcessorException {
+    final byte[] reqContext = RaftUtils.createRaftRequestContext();
+    DataPartitionRaftHolder dataPartitionHolder = RaftUtils.getDataPartitonRaftHolder(groupId);
+
+    /** Check consistency level**/
+    if (readMetadataConsistencyLevel == ClusterConstant.WEAK_CONSISTENCY_LEVEL) {
+      QueryPathsResponse response = QueryPathsResponse
+          .createEmptyResponse(groupId);
+      try {
+        for (String path : pathList) {
+          response.addPaths(mManager.getPaths(path));
+        }
+      } catch (final PathErrorException e) {
+        response = QueryPathsResponse.createErrorResponse(groupId, e.getMessage());
+      }
+      task.run(response);
+    } else {
+      ((RaftService) dataPartitionHolder.getService()).getNode()
+          .readIndex(reqContext, new ReadIndexClosure() {
+
+            @Override
+            public void run(Status status, long index, byte[] reqCtx) {
+              QueryPathsResponse response = QueryPathsResponse
+                  .createEmptyResponse(groupId);
+              if (status.isOk()) {
+                try {
+                  LOGGER.debug("start to read");
+                  for (String path : pathList) {
+                    response.addPaths(mManager.getPaths(path));
+                  }
+                } catch (final PathErrorException e) {
+                  response = QueryPathsResponse.createErrorResponse(groupId, e.getMessage());
+                }
+              } else {
+                response = QueryPathsResponse
+                    .createErrorResponse(groupId, status.getErrorMsg());
+              }
+              task.run(response);
+            }
+          });
+    }
+    task.await();
+    QueryPathsResponse response = (QueryPathsResponse) task.getResponse();
+    if (response == null || !response.isSuccess()) {
+      LOGGER.error("Execute get paths for {} statement false.", pathList);
+      throw new ProcessorException();
+    }
+    return response.getPaths();
+  }
+
+  private List<String> queryPaths(SingleQPTask task, PeerId leader)
+      throws InterruptedException, RaftConnectionException {
+    BasicResponse response = asyncHandleNonQueryTaskGetRes(task, leader, 0);
+    return response == null ? new ArrayList<>()
+        : ((QueryPathsResponse) response).getPaths();
+  }
+
+  /**
    * Combine multiple metadata in String format into single String
    *
    * @return single String of all metadata
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryMetadataAsyncProcessor.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryMetadataAsyncProcessor.java
new file mode 100644
index 0000000..7af4adc
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryMetadataAsyncProcessor.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.rpc.raft.processor;
+
+import com.alipay.remoting.AsyncContext;
+import com.alipay.remoting.BizContext;
+import com.alipay.sofa.jraft.Status;
+import com.alipay.sofa.jraft.closure.ReadIndexClosure;
+import org.apache.iotdb.cluster.config.ClusterConstant;
+import org.apache.iotdb.cluster.entity.raft.DataPartitionRaftHolder;
+import org.apache.iotdb.cluster.entity.raft.RaftService;
+import org.apache.iotdb.cluster.rpc.raft.request.QueryMetadataRequest;
+import org.apache.iotdb.cluster.rpc.raft.response.QueryMetadataResponse;
+import org.apache.iotdb.cluster.utils.RaftUtils;
+import org.apache.iotdb.db.exception.PathErrorException;
+import org.apache.iotdb.db.metadata.MManager;
+
+public class QueryMetadataAsyncProcessor extends
+    BasicAsyncUserProcessor<QueryMetadataRequest> {
+
+  private MManager mManager = MManager.getInstance();
+
+  @Override
+  public void handleRequest(BizContext bizContext, AsyncContext asyncContext,
+      QueryMetadataRequest request) {
+    String groupId = request.getGroupID();
+    final byte[] reqContext = RaftUtils.createRaftRequestContext();
+    DataPartitionRaftHolder dataPartitionHolder = RaftUtils.getDataPartitonRaftHolder(groupId);
+
+    if (request.getReadConsistencyLevel() == ClusterConstant.WEAK_CONSISTENCY_LEVEL) {
+      QueryMetadataResponse response = null;
+      try {
+        response = QueryMetadataResponse
+            .createSuccessResponse(groupId, mManager.getMetadata());
+      } catch (PathErrorException e) {
+        response = QueryMetadataResponse.createErrorResponse(groupId, e.getMessage());
+      }
+      response.addResult(true);
+      asyncContext.sendResponse(response);
+    } else {
+      ((RaftService) dataPartitionHolder.getService()).getNode()
+          .readIndex(reqContext, new ReadIndexClosure() {
+
+            @Override
+            public void run(Status status, long index, byte[] reqCtx) {
+              QueryMetadataResponse response;
+              if (status.isOk()) {
+                try {
+                  response = QueryMetadataResponse
+                      .createSuccessResponse(groupId, mManager.getMetadata());
+                } catch (PathErrorException e) {
+                  response = QueryMetadataResponse.createErrorResponse(groupId, e.getMessage());
+                }
+                response.addResult(true);
+              } else {
+                response = QueryMetadataResponse
+                    .createErrorResponse(groupId, status.getErrorMsg());
+                response.addResult(false);
+              }
+              asyncContext.sendResponse(response);
+            }
+          });
+    }
+  }
+
+  @Override
+  public String interest() {
+    return QueryMetadataRequest.class.getName();
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryPathsAsyncProcessor.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryPathsAsyncProcessor.java
new file mode 100644
index 0000000..b234a83
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryPathsAsyncProcessor.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.rpc.raft.processor;
+
+import com.alipay.remoting.AsyncContext;
+import com.alipay.remoting.BizContext;
+import com.alipay.sofa.jraft.Status;
+import com.alipay.sofa.jraft.closure.ReadIndexClosure;
+import org.apache.iotdb.cluster.config.ClusterConstant;
+import org.apache.iotdb.cluster.entity.raft.DataPartitionRaftHolder;
+import org.apache.iotdb.cluster.entity.raft.RaftService;
+import org.apache.iotdb.cluster.rpc.raft.request.QueryPathsRequest;
+import org.apache.iotdb.cluster.rpc.raft.response.QueryPathsResponse;
+import org.apache.iotdb.cluster.utils.RaftUtils;
+import org.apache.iotdb.db.exception.PathErrorException;
+import org.apache.iotdb.db.metadata.MManager;
+
+public class QueryPathsAsyncProcessor extends BasicAsyncUserProcessor<QueryPathsRequest> {
+
+  private MManager mManager = MManager.getInstance();
+
+  @Override
+  public void handleRequest(BizContext bizContext, AsyncContext asyncContext,
+      QueryPathsRequest request) {
+    String groupId = request.getGroupID();
+    final byte[] reqContext = RaftUtils.createRaftRequestContext();
+    DataPartitionRaftHolder dataPartitionHolder = RaftUtils.getDataPartitonRaftHolder(groupId);
+
+    if (request.getReadConsistencyLevel() == ClusterConstant.WEAK_CONSISTENCY_LEVEL) {
+      QueryPathsResponse response = QueryPathsResponse
+          .createEmptyResponse(groupId);
+      try {
+        queryPaths(request, response);
+      } catch (final PathErrorException e) {
+        response = QueryPathsResponse.createErrorResponse(groupId, e.getMessage());
+      }
+      asyncContext.sendResponse(response);
+    } else {
+      ((RaftService) dataPartitionHolder.getService()).getNode()
+          .readIndex(reqContext, new ReadIndexClosure() {
+
+            @Override
+            public void run(Status status, long index, byte[] reqCtx) {
+              QueryPathsResponse response = QueryPathsResponse
+                  .createEmptyResponse(groupId);
+              if (status.isOk()) {
+                try {
+                  queryPaths(request, response);
+                } catch (final PathErrorException e) {
+                  response = QueryPathsResponse.createErrorResponse(groupId, e.getMessage());
+                }
+              } else {
+                response = QueryPathsResponse
+                    .createErrorResponse(groupId, status.getErrorMsg());
+              }
+              asyncContext.sendResponse(response);
+            }
+          });
+    }
+  }
+
+  /**
+   * Query paths
+   */
+  private void queryPaths(QueryPathsRequest request,
+      QueryPathsResponse response) throws PathErrorException {
+    for (String path : request.getPath()) {
+      response.addPaths(mManager.getPaths(path));
+    }
+  }
+
+  @Override
+  public String interest() {
+    return QueryPathsRequest.class.getName();
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QuerySeriesTypeAsyncProcessor.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QuerySeriesTypeAsyncProcessor.java
new file mode 100644
index 0000000..b72f0a2
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QuerySeriesTypeAsyncProcessor.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.rpc.raft.processor;
+
+import com.alipay.remoting.AsyncContext;
+import com.alipay.remoting.BizContext;
+import com.alipay.sofa.jraft.Status;
+import com.alipay.sofa.jraft.closure.ReadIndexClosure;
+import org.apache.iotdb.cluster.config.ClusterConstant;
+import org.apache.iotdb.cluster.entity.raft.DataPartitionRaftHolder;
+import org.apache.iotdb.cluster.entity.raft.RaftService;
+import org.apache.iotdb.cluster.rpc.raft.request.QuerySeriesTypeRequest;
+import org.apache.iotdb.cluster.rpc.raft.response.QuerySeriesTypeResponse;
+import org.apache.iotdb.cluster.utils.RaftUtils;
+import org.apache.iotdb.db.exception.PathErrorException;
+import org.apache.iotdb.db.metadata.MManager;
+
+public class QuerySeriesTypeAsyncProcessor extends BasicAsyncUserProcessor<QuerySeriesTypeRequest> {
+
+  private MManager mManager = MManager.getInstance();
+
+  @Override
+  public void handleRequest(BizContext bizContext, AsyncContext asyncContext,
+      QuerySeriesTypeRequest request) {
+    String groupId = request.getGroupID();
+    final byte[] reqContext = RaftUtils.createRaftRequestContext();
+    DataPartitionRaftHolder dataPartitionHolder = RaftUtils.getDataPartitonRaftHolder(groupId);
+
+    if (request.getReadConsistencyLevel() == ClusterConstant.WEAK_CONSISTENCY_LEVEL) {
+      QuerySeriesTypeResponse response;
+      try {
+        response = QuerySeriesTypeResponse.createSuccessResponse(groupId, mManager.getSeriesType(request.getPath()));
+      } catch (final PathErrorException e) {
+        response = QuerySeriesTypeResponse.createErrorResponse(groupId, e.getMessage());
+      }
+      asyncContext.sendResponse(response);
+    } else {
+      ((RaftService) dataPartitionHolder.getService()).getNode()
+          .readIndex(reqContext, new ReadIndexClosure() {
+
+            @Override
+            public void run(Status status, long index, byte[] reqCtx) {
+              QuerySeriesTypeResponse response;
+              if (status.isOk()) {
+                try {
+                  response = QuerySeriesTypeResponse.createSuccessResponse(groupId, mManager.getSeriesType(request.getPath()));
+                } catch (final PathErrorException e) {
+                  response = QuerySeriesTypeResponse.createErrorResponse(groupId, e.getMessage());
+                }
+              } else {
+                response = QuerySeriesTypeResponse
+                    .createErrorResponse(groupId, status.getErrorMsg());
+              }
+              asyncContext.sendResponse(response);
+            }
+          });
+    }
+  }
+
+  @Override
+  public String interest() {
+    return QuerySeriesTypeRequest.class.getName();
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QueryMetadataRequest.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QueryMetadataRequest.java
new file mode 100644
index 0000000..2628fb6
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QueryMetadataRequest.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.rpc.raft.request;
+
+import java.io.Serializable;
+
+public class QueryMetadataRequest extends BasicQueryRequest implements Serializable {
+
+  public QueryMetadataRequest(String groupID, int readConsistencyLevel) {
+    super(groupID, readConsistencyLevel);
+  }
+}
\ No newline at end of file
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QueryPathsRequest.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QueryPathsRequest.java
new file mode 100644
index 0000000..2c600f4
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QueryPathsRequest.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.rpc.raft.request;
+
+import java.io.Serializable;
+import java.util.List;
+
+public class QueryPathsRequest extends BasicQueryRequest implements Serializable {
+
+  private List<String> path;
+
+  public QueryPathsRequest(String groupID, int readConsistencyLevel, List<String> path) {
+    super(groupID, readConsistencyLevel);
+    this.path = path;
+  }
+
+  public List<String> getPath() {
+    return path;
+  }
+}
\ No newline at end of file
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QuerySeriesTypeRequest.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QuerySeriesTypeRequest.java
new file mode 100644
index 0000000..c486576
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QuerySeriesTypeRequest.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.rpc.raft.request;
+
+import java.io.Serializable;
+
+public class QuerySeriesTypeRequest extends BasicQueryRequest implements Serializable {
+
+  private String path;
+
+  public QuerySeriesTypeRequest(String groupID, int readConsistencyLevel, String path) {
+    super(groupID, readConsistencyLevel);
+    this.path = path;
+  }
+
+  public String getPath() {
+    return path;
+  }
+}
\ No newline at end of file
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QueryMetadataResponse.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QueryMetadataResponse.java
new file mode 100644
index 0000000..6c21798
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QueryMetadataResponse.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.rpc.raft.response;
+
+import org.apache.iotdb.db.metadata.Metadata;
+
+public class QueryMetadataResponse extends BasicResponse {
+
+  private Metadata metadata;
+
+  private QueryMetadataResponse(String groupId, boolean redirected, String leaderStr,
+      String errorMsg) {
+    super(groupId, redirected, leaderStr, errorMsg);
+  }
+
+  public static QueryMetadataResponse createSuccessResponse(String groupId,
+      Metadata metadata) {
+    QueryMetadataResponse response = new QueryMetadataResponse(groupId, false, null,
+        null);
+    response.metadata = metadata;
+    return response;
+  }
+
+  public static QueryMetadataResponse createErrorResponse(String groupId, String errorMsg) {
+    return new QueryMetadataResponse(groupId, false, null, errorMsg);
+  }
+
+  public Metadata getMetadata() {
+    return metadata;
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QueryPathsResponse.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QueryPathsResponse.java
new file mode 100644
index 0000000..29d659a
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QueryPathsResponse.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.rpc.raft.response;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class QueryPathsResponse extends BasicResponse {
+
+  private List<String> paths;
+
+  private QueryPathsResponse(String groupId, boolean redirected, boolean success, String leaderStr, String errorMsg) {
+    super(groupId, redirected, leaderStr, errorMsg);
+    this.addResult(success);
+    paths = new ArrayList<>();
+  }
+
+  public static QueryPathsResponse createEmptyResponse(String groupId){
+    return new QueryPathsResponse(groupId, false, true, null, null);
+  }
+
+  public static QueryPathsResponse createErrorResponse(String groupId, String errorMsg) {
+    return new QueryPathsResponse(groupId, false, false, null, errorMsg);
+  }
+
+  public List<String> getPaths() {
+    return paths;
+  }
+
+  public void addPaths(List<String> paths){
+    this.paths.addAll(paths);
+  }
+
+}
\ No newline at end of file
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QuerySeriesTypeResponse.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QuerySeriesTypeResponse.java
new file mode 100644
index 0000000..e86e108
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QuerySeriesTypeResponse.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.rpc.raft.response;
+
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+public class QuerySeriesTypeResponse extends BasicResponse {
+
+  private TSDataType dataType;
+
+  private QuerySeriesTypeResponse(String groupId, boolean redirected, String leaderStr,
+      String errorMsg) {
+    super(groupId, redirected, leaderStr, errorMsg);
+  }
+
+  public static QuerySeriesTypeResponse createSuccessResponse(String groupId, TSDataType dataType) {
+    QuerySeriesTypeResponse response = new QuerySeriesTypeResponse(groupId, false, null,
+        null);
+    response.dataType = dataType;
+    return response;
+  }
+
+  public static QuerySeriesTypeResponse createErrorResponse(String groupId, String errorMsg) {
+    return new QuerySeriesTypeResponse(groupId, false, null, errorMsg);
+  }
+
+  public TSDataType getDataType() {
+    return dataType;
+  }
+
+  public void setDataType(TSDataType dataType) {
+    this.dataType = dataType;
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/service/TSServiceClusterImpl.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/service/TSServiceClusterImpl.java
index ba4e59d..320398b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/service/TSServiceClusterImpl.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/service/TSServiceClusterImpl.java
@@ -34,11 +34,14 @@ import org.apache.iotdb.db.auth.AuthException;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.metadata.Metadata;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.service.TSServiceImpl;
 import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementReq;
 import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementResp;
 import org.apache.iotdb.service.rpc.thrift.TS_StatusCode;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
@@ -244,6 +247,23 @@ public class TSServiceClusterImpl extends TSServiceImpl {
     return queryMetadataExecutor.get().processMetadataInStringQuery();
   }
 
+  @Override
+  protected Metadata getMetadata()
+      throws InterruptedException, ProcessorException, PathErrorException {
+    return queryMetadataExecutor.get().processMetadataQuery();
+  }
+
+  @Override
+  protected TSDataType getSeriesType(String path) throws PathErrorException, InterruptedException, ProcessorException {
+    return queryMetadataExecutor.get().processSeriesTypeQuery(path);
+  }
+
+  @Override
+  protected List<String> getPaths(String path)
+      throws PathErrorException, InterruptedException, ProcessorException {
+    return queryMetadataExecutor.get().processPathsQuery(path);
+  }
+
   @OnlyForTest
   public NonQueryExecutor getNonQueryExecutor() {
     return nonQueryExecutor.get();
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/metadata/Metadata.java b/iotdb/src/main/java/org/apache/iotdb/db/metadata/Metadata.java
index 40fd0e4..15b6011 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/metadata/Metadata.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/metadata/Metadata.java
@@ -18,8 +18,11 @@
  */
 package org.apache.iotdb.db.metadata;
 
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
@@ -67,6 +70,57 @@ public class Metadata {
     return deviceIdMap;
   }
 
+  /**
+   * combine multiple metadatas
+   */
+  public static Metadata combineMetadatas(Metadata[] metadatas) {
+    Map<String, List<MeasurementSchema>> seriesMap = new HashMap<>();
+    Map<String, List<String>> deviceIdMap = new HashMap<>();
+    Map<String, Map<String, MeasurementSchema>> typeSchemaMap = new HashMap<>();
+
+    if (metadatas == null || metadatas.length == 0) {
+      return new Metadata(seriesMap, deviceIdMap);
+    }
+
+    for (int i = 0; i < metadatas.length; i++) {
+      Map<String, List<MeasurementSchema>> subSeriesMap = metadatas[i].seriesMap;
+      for (Entry<String, List<MeasurementSchema>> entry : subSeriesMap.entrySet()) {
+        Map<String, MeasurementSchema> map;
+        if (typeSchemaMap.containsKey(entry.getKey())) {
+          map = typeSchemaMap.get(entry.getKey());
+        } else {
+          map = new HashMap<>();
+        }
+        entry.getValue().forEach(schema -> map.put(schema.getMeasurementId(), schema));
+        if (!typeSchemaMap.containsKey(entry.getKey())) {
+          typeSchemaMap.put(entry.getKey(), map);
+        }
+      }
+
+      Map<String, List<String>> subDeviceIdMap = metadatas[i].deviceIdMap;
+      for (Entry<String, List<String>> entry : subDeviceIdMap.entrySet()) {
+        List<String> list;
+        if (deviceIdMap.containsKey(entry.getKey())) {
+          list = deviceIdMap.get(entry.getKey());
+        } else {
+          list = new ArrayList<>();
+        }
+        list.addAll(entry.getValue());
+        if (!deviceIdMap.containsKey(entry.getKey())) {
+          deviceIdMap.put(entry.getKey(), list);
+        }
+      }
+    }
+
+    for (Entry<String, Map<String, MeasurementSchema>> entry : typeSchemaMap.entrySet()) {
+      List<MeasurementSchema> list = new ArrayList<>();
+      list.addAll(entry.getValue().values());
+      seriesMap.put(entry.getKey(), list);
+    }
+
+    return new Metadata(seriesMap, deviceIdMap);
+  }
+
   @Override
   public String toString() {
     return seriesMap.toString() + "\n" + deviceIdMap.toString();
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 79dbd53..299133c 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -81,6 +81,7 @@ import org.apache.iotdb.service.rpc.thrift.TS_SessionHandle;
 import org.apache.iotdb.service.rpc.thrift.TS_Status;
 import org.apache.iotdb.service.rpc.thrift.TS_StatusCode;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 import org.apache.thrift.TException;
@@ -308,14 +309,14 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
         Metadata metadata;
         try {
           String column = req.getColumnPath();
-          metadata = MManager.getInstance().getMetadata();
+          metadata = getMetadata();
           Map<String, List<String>> deviceMap = metadata.getDeviceMap();
           if (deviceMap == null || !deviceMap.containsKey(column)) {
             resp.setColumnsList(new ArrayList<>());
           } else {
             resp.setColumnsList(deviceMap.get(column));
           }
-        } catch (PathErrorException e) {
+        } catch (PathErrorException | InterruptedException | ProcessorException e) {
           LOGGER.error("cannot get delta object map", e);
           status = getErrorStatus(String.format("Failed to fetch delta object map because: %s", e));
           resp.setStatus(status);
@@ -330,8 +331,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
         break;
       case "COLUMN":
         try {
-          resp.setDataType(MManager.getInstance().getSeriesType(req.getColumnPath()).toString());
-        } catch (PathErrorException e) {
+          resp.setDataType(getSeriesType(req.getColumnPath()).toString());
+        } catch (PathErrorException | InterruptedException | ProcessorException e) {
           // TODO aggregate seriesPath e.g. last(root.ln.wf01.wt01.status)
           // status = new TS_Status(TS_StatusCode.ERROR_STATUS);
           // status.setErrorMessage(String.format("Failed to fetch %s's data type because: %s",
@@ -343,8 +344,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
         break;
       case "ALL_COLUMNS":
         try {
-          resp.setColumnsList(MManager.getInstance().getPaths(req.getColumnPath()));
-        } catch (PathErrorException e) {
+          resp.setColumnsList(getPaths(req.getColumnPath()));
+        } catch (PathErrorException | InterruptedException | ProcessorException e) {
           status = getErrorStatus(String
               .format("Failed to fetch %s's all columns because: %s", req.getColumnPath(), e));
           resp.setStatus(status);
@@ -382,6 +383,18 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     return MManager.getInstance().getMetadataInString();
   }
 
+  protected Metadata getMetadata() throws PathErrorException, InterruptedException, ProcessorException {
+    return MManager.getInstance().getMetadata();
+  }
+
+  protected TSDataType getSeriesType(String path) throws PathErrorException, InterruptedException, ProcessorException {
+    return MManager.getInstance().getSeriesType(path);
+  }
+
+  protected List<String> getPaths(String path) throws PathErrorException, InterruptedException, ProcessorException {
+    return MManager.getInstance().getPaths(path);
+  }
+
   /**
    * Judge whether the statement is ADMIN COMMAND and if true, executeWithGlobalTimeFilter it.
    *