You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ea...@apache.org on 2019/04/11 02:53:19 UTC
[incubator-iotdb] 01/01: implement DELTA_OBEJECT, COLUMN,
ALL_COLUMNS
This is an automated email from the ASF dual-hosted git repository.
east pushed a commit to branch cluster_metadata_query
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 8836850439791cbefc3b41483613d6fa8b552534
Author: mdf369 <95...@qq.com>
AuthorDate: Thu Apr 11 10:52:51 2019 +0800
implement DELTA_OBEJECT, COLUMN, ALL_COLUMNS
---
.../org/apache/iotdb/cluster/entity/Server.java | 6 +
.../cluster/qp/executor/QueryMetadataExecutor.java | 283 +++++++++++++++++++++
.../processor/QueryMetadataAsyncProcessor.java | 86 +++++++
.../raft/processor/QueryPathsAsyncProcessor.java | 92 +++++++
.../processor/QuerySeriesTypeAsyncProcessor.java | 80 ++++++
.../rpc/raft/request/QueryMetadataRequest.java | 28 ++
.../rpc/raft/request/QueryPathsRequest.java | 36 +++
.../rpc/raft/request/QuerySeriesTypeRequest.java | 35 +++
.../rpc/raft/response/QueryMetadataResponse.java | 47 ++++
.../rpc/raft/response/QueryPathsResponse.java | 50 ++++
.../rpc/raft/response/QuerySeriesTypeResponse.java | 50 ++++
.../cluster/rpc/service/TSServiceClusterImpl.java | 20 ++
.../org/apache/iotdb/db/metadata/Metadata.java | 54 ++++
.../org/apache/iotdb/db/service/TSServiceImpl.java | 25 +-
14 files changed, 886 insertions(+), 6 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java b/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
index be4a74a..ad1b4d6 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
@@ -32,7 +32,10 @@ import org.apache.iotdb.cluster.entity.raft.MetadataRaftHolder;
import org.apache.iotdb.cluster.rpc.raft.impl.RaftNodeAsClientManager;
import org.apache.iotdb.cluster.rpc.raft.processor.DataGroupNonQueryAsyncProcessor;
import org.apache.iotdb.cluster.rpc.raft.processor.MetaGroupNonQueryAsyncProcessor;
+import org.apache.iotdb.cluster.rpc.raft.processor.QueryMetadataAsyncProcessor;
import org.apache.iotdb.cluster.rpc.raft.processor.QueryMetadataInStringAsyncProcessor;
+import org.apache.iotdb.cluster.rpc.raft.processor.QueryPathsAsyncProcessor;
+import org.apache.iotdb.cluster.rpc.raft.processor.QuerySeriesTypeAsyncProcessor;
import org.apache.iotdb.cluster.rpc.raft.processor.QueryTimeSeriesAsyncProcessor;
import org.apache.iotdb.cluster.utils.RaftUtils;
import org.apache.iotdb.cluster.utils.hash.PhysicalNode;
@@ -93,6 +96,9 @@ public class Server {
rpcServer.registerUserProcessor(new MetaGroupNonQueryAsyncProcessor());
rpcServer.registerUserProcessor(new QueryTimeSeriesAsyncProcessor());
rpcServer.registerUserProcessor(new QueryMetadataInStringAsyncProcessor());
+ rpcServer.registerUserProcessor(new QueryMetadataAsyncProcessor());
+ rpcServer.registerUserProcessor(new QuerySeriesTypeAsyncProcessor());
+ rpcServer.registerUserProcessor(new QueryPathsAsyncProcessor());
metadataHolder = new MetadataRaftHolder(peerIds, serverId, rpcServer, true);
metadataHolder.init();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
index 8dc20b3..3be4e81 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
@@ -35,16 +35,24 @@ import org.apache.iotdb.cluster.entity.raft.RaftService;
import org.apache.iotdb.cluster.exception.RaftConnectionException;
import org.apache.iotdb.cluster.qp.ClusterQPExecutor;
import org.apache.iotdb.cluster.rpc.raft.request.QueryMetadataInStringRequest;
+import org.apache.iotdb.cluster.rpc.raft.request.QueryMetadataRequest;
+import org.apache.iotdb.cluster.rpc.raft.request.QueryPathsRequest;
+import org.apache.iotdb.cluster.rpc.raft.request.QuerySeriesTypeRequest;
import org.apache.iotdb.cluster.rpc.raft.request.QueryStorageGroupRequest;
import org.apache.iotdb.cluster.rpc.raft.request.QueryTimeSeriesRequest;
import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
import org.apache.iotdb.cluster.rpc.raft.response.QueryMetadataInStringResponse;
+import org.apache.iotdb.cluster.rpc.raft.response.QueryMetadataResponse;
+import org.apache.iotdb.cluster.rpc.raft.response.QueryPathsResponse;
+import org.apache.iotdb.cluster.rpc.raft.response.QuerySeriesTypeResponse;
import org.apache.iotdb.cluster.rpc.raft.response.QueryStorageGroupResponse;
import org.apache.iotdb.cluster.rpc.raft.response.QueryTimeSeriesResponse;
import org.apache.iotdb.cluster.utils.RaftUtils;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.metadata.Metadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -177,6 +185,123 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
return combineMetadataInStringList(metadataList);
}
+  /**
+   * Collects the {@link Metadata} of every data group and merges them via combineMetadatas.
+   * @throws ProcessorException if a remote dispatch fails or any group answers unsuccessfully
+   */
+  public Metadata processMetadataQuery()
+      throws InterruptedException, ProcessorException, PathErrorException {
+    Set<String> groupIdSet = router.getAllGroupId();
+    List<SingleQPTask> taskList = new ArrayList<>();
+    for (String groupId : groupIdSet) {
+      SingleQPTask task = new SingleQPTask(false,
+          new QueryMetadataRequest(groupId, readMetadataConsistencyLevel));
+      taskList.add(task);
+      LOGGER.debug("Execute query metadata statement for group {}.", groupId);
+      /** Check if the plan can be executed locally. **/
+      if (canHandleQueryByGroupId(groupId)) {
+        LOGGER.debug("Execute query metadata statement locally for group {}.", groupId);
+        asyncQueryMetadataLocally(groupId, task);
+        continue;
+      }
+      try {
+        asyncSendNonQueryTask(task, RaftUtils.getRandomPeerID(groupId), 0);
+      } catch (RaftConnectionException e) {
+        LOGGER.error(e.getMessage());
+        throw new ProcessorException("Raft connection occurs error.", e);
+      }
+    }
+    Metadata[] metadatas = new Metadata[taskList.size()];
+    for (int i = 0; i < taskList.size(); i++) {
+      SingleQPTask task = taskList.get(i);
+      task.await();
+      BasicResponse response = task.getResponse();
+      if (response == null || !response.isSuccess()) {
+        LOGGER.error("Execute query metadata statement false.");
+        throw new ProcessorException();
+      }
+      metadatas[i] = ((QueryMetadataResponse) response).getMetadata();
+    }
+    return Metadata.combineMetadatas(metadatas);
+  }
+
+  /**
+   * Resolves the data type of one series by asking the data group that owns its storage group.
+   *
+   * @param path series path; must map to exactly one storage group
+   * @return the type reported by the group, or null when the remote call yields no response
+   * @throws PathErrorException if {@code path} does not map to exactly one storage group
+   */
+  public TSDataType processSeriesTypeQuery(String path)
+      throws InterruptedException, ProcessorException, PathErrorException {
+    List<String> owners = mManager.getAllFileNamesByPath(path);
+    if (owners.size() != 1) {
+      throw new PathErrorException("path " + path + " is not valid.");
+    }
+    String groupId = getGroupIdBySG(owners.get(0));
+    SingleQPTask task = new SingleQPTask(false,
+        new QuerySeriesTypeRequest(groupId, readMetadataConsistencyLevel, path));
+    LOGGER.debug("Execute get series type for {} statement for group {}.", path, groupId);
+    /** Check if the plan can be executed locally. **/
+    if (canHandleQueryByGroupId(groupId)) {
+      LOGGER.debug("Execute get series type for {} statement locally for group {}.", path, groupId);
+      return querySeriesTypeLocally(path, groupId, task);
+    }
+    try {
+      return querySeriesType(task, RaftUtils.getRandomPeerID(groupId));
+    } catch (RaftConnectionException e) {
+      LOGGER.error(e.getMessage());
+      throw new ProcessorException("Raft connection occurs error.", e);
+    }
+  }
+
+  /**
+   * Collects the paths returned for {@code path} by every data group whose storage groups match.
+   *
+   * @return the concatenated results of all groups; empty if no storage group matches
+   */
+  public List<String> processPathsQuery(String path)
+      throws InterruptedException, PathErrorException, ProcessorException {
+    List<String> res = new ArrayList<>();
+    List<String> storageGroupList = mManager.getAllFileNamesByPath(path);
+    if (storageGroupList.isEmpty()) {
+      return res;
+    }
+    Map<String, Set<String>> sgsByGroup = classifySGByGroupId(storageGroupList);
+    for (Entry<String, Set<String>> entry : sgsByGroup.entrySet()) {
+      List<String> subPaths = getSubQueryPaths(entry.getValue(), path);
+      handlePathsQuery(entry.getKey(), subPaths, res);
+    }
+    return res;
+  }
+
+  /**
+   * Runs a paths query against a single data group and appends its answer to {@code res}.
+   *
+   * @param groupId data group id
+   * @param pathList paths to query in that group
+   * @param res accumulator that receives the group's paths
+   */
+  private void handlePathsQuery(String groupId, List<String> pathList, List<String> res)
+      throws ProcessorException, InterruptedException {
+    SingleQPTask task = new SingleQPTask(false,
+        new QueryPathsRequest(groupId, readMetadataConsistencyLevel, pathList));
+    LOGGER.debug("Execute get paths for {} statement for group {}.", pathList, groupId);
+    /** Check if the plan can be executed locally. **/
+    if (canHandleQueryByGroupId(groupId)) {
+      LOGGER.debug("Execute get paths for {} statement locally for group {}.", pathList, groupId);
+      res.addAll(queryPathsLocally(pathList, groupId, task));
+      return;
+    }
+    try {
+      List<String> remotePaths = queryPaths(task, RaftUtils.getRandomPeerID(groupId));
+      res.addAll(remotePaths);
+    } catch (RaftConnectionException e) {
+      LOGGER.error(e.getMessage());
+      throw new ProcessorException("Raft connection occurs error.", e);
+    }
+  }
+
/**
* Handle "show timeseries <path>" statement
*
@@ -241,6 +366,58 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
: ((QueryTimeSeriesResponse) response).getTimeSeries();
}
+  /**
+   * Reads the type of {@code path} locally (directly under weak consistency, otherwise inside a
+   * raft readIndex callback), completes {@code task} with it and returns the awaited result.
+   */
+  private TSDataType querySeriesTypeLocally(String path, String groupId, SingleQPTask task)
+      throws InterruptedException, ProcessorException {
+    final byte[] reqContext = RaftUtils.createRaftRequestContext();
+    DataPartitionRaftHolder dataPartitionHolder = RaftUtils.getDataPartitonRaftHolder(groupId);
+    if (readMetadataConsistencyLevel != ClusterConstant.WEAK_CONSISTENCY_LEVEL) {
+      ReadIndexClosure closure = new ReadIndexClosure() {
+        @Override
+        public void run(Status status, long index, byte[] reqCtx) {
+          QuerySeriesTypeResponse reply;
+          if (!status.isOk()) {
+            reply = QuerySeriesTypeResponse.createErrorResponse(groupId, status.getErrorMsg());
+          } else {
+            try {
+              LOGGER.debug("start to read");
+              reply = QuerySeriesTypeResponse.createSuccessResponse(groupId, mManager.getSeriesType(path));
+            } catch (final PathErrorException e) {
+              reply = QuerySeriesTypeResponse.createErrorResponse(groupId, e.getMessage());
+            }
+          }
+          task.run(reply);
+        }
+      };
+      ((RaftService) dataPartitionHolder.getService()).getNode().readIndex(reqContext, closure);
+    } else {
+      QuerySeriesTypeResponse reply;
+      try {
+        reply = QuerySeriesTypeResponse.createSuccessResponse(groupId, mManager.getSeriesType(path));
+      } catch (final PathErrorException e) {
+        reply = QuerySeriesTypeResponse.createErrorResponse(groupId, e.getMessage());
+      }
+      task.run(reply);
+    }
+    task.await();
+    QuerySeriesTypeResponse response = (QuerySeriesTypeResponse) task.getResponse();
+    if (response == null || !response.isSuccess()) {
+      LOGGER.error("Execute get series type for {} statement false.", path);
+      throw new ProcessorException();
+    }
+    return response.getDataType();
+  }
+
+  /** Runs the task via asyncHandleNonQueryTaskGetRes; returns null when it yields no response. */
+  private TSDataType querySeriesType(SingleQPTask task, PeerId leader)
+      throws InterruptedException, RaftConnectionException {
+    BasicResponse reply = asyncHandleNonQueryTaskGetRes(task, leader, 0);
+    return reply != null ? ((QuerySeriesTypeResponse) reply).getDataType() : null;
+  }
+
/**
* Handle "show storage group" statement locally
*
@@ -322,6 +499,112 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
}
/**
+   * Serves a metadata query locally, completing {@code task} with the metadata read from
+   * the local MManager; does not wait for the task.
+   *
+   * @throws PathErrorException if reading the metadata fails under weak consistency
+   */
+  private void asyncQueryMetadataLocally(String groupId, SingleQPTask task)
+      throws PathErrorException {
+    final byte[] reqContext = RaftUtils.createRaftRequestContext();
+    DataPartitionRaftHolder dataPartitionHolder = (DataPartitionRaftHolder) server
+        .getDataPartitionHolder(groupId);
+    if (readMetadataConsistencyLevel != ClusterConstant.WEAK_CONSISTENCY_LEVEL) {
+      ReadIndexClosure closure = new ReadIndexClosure() {
+        @Override
+        public void run(Status status, long index, byte[] reqCtx) {
+          QueryMetadataResponse reply;
+          boolean succeeded = false;
+          if (status.isOk()) {
+            LOGGER.debug("start to read");
+            try {
+              reply = QueryMetadataResponse.createSuccessResponse(groupId, mManager.getMetadata());
+              succeeded = true;
+            } catch (PathErrorException e) {
+              reply = QueryMetadataResponse.createErrorResponse(groupId, e.getMessage());
+            }
+          } else {
+            reply = QueryMetadataResponse.createErrorResponse(groupId, status.getErrorMsg());
+          }
+          reply.addResult(succeeded);
+          task.run(reply);
+        }
+      };
+      ((RaftService) dataPartitionHolder.getService()).getNode().readIndex(reqContext, closure);
+      return;
+    }
+    // Weak consistency: answer from local metadata right away; a PathErrorException propagates.
+    QueryMetadataResponse reply = QueryMetadataResponse
+        .createSuccessResponse(groupId, mManager.getMetadata());
+    reply.addResult(true);
+    task.run(reply);
+  }
+
+  /**
+   * Resolves every entry of {@code pathList} against the local MManager and returns the
+   * collected paths once {@code task} has completed.
+   *
+   * @param pathList paths handed one by one to {@code MManager.getPaths}
+   * @param groupId data group id used to label the response and to find the raft node
+   * @param task task completed with the response and then awaited
+   * @throws ProcessorException if the task ends without a successful response
+   */
+  private List<String> queryPathsLocally(List<String> pathList, String groupId,
+      SingleQPTask task)
+      throws InterruptedException, ProcessorException {
+    final byte[] reqContext = RaftUtils.createRaftRequestContext();
+    DataPartitionRaftHolder dataPartitionHolder = RaftUtils.getDataPartitonRaftHolder(groupId);
+    /** Check consistency level**/
+    if (readMetadataConsistencyLevel != ClusterConstant.WEAK_CONSISTENCY_LEVEL) {
+      // Not weak consistency: perform the read inside the raft readIndex callback.
+      ReadIndexClosure closure = new ReadIndexClosure() {
+        @Override
+        public void run(Status status, long index, byte[] reqCtx) {
+          QueryPathsResponse reply = QueryPathsResponse.createEmptyResponse(groupId);
+          if (!status.isOk()) {
+            reply = QueryPathsResponse.createErrorResponse(groupId, status.getErrorMsg());
+          } else {
+            try {
+              LOGGER.debug("start to read");
+              for (String path : pathList) {
+                reply.addPaths(mManager.getPaths(path));
+              }
+            } catch (final PathErrorException e) {
+              reply = QueryPathsResponse.createErrorResponse(groupId, e.getMessage());
+            }
+          }
+          task.run(reply);
+        }
+      };
+      ((RaftService) dataPartitionHolder.getService()).getNode().readIndex(reqContext, closure);
+    } else {
+      QueryPathsResponse reply = QueryPathsResponse.createEmptyResponse(groupId);
+      try {
+        for (String path : pathList) {
+          reply.addPaths(mManager.getPaths(path));
+        }
+      } catch (final PathErrorException e) {
+        reply = QueryPathsResponse.createErrorResponse(groupId, e.getMessage());
+      }
+      task.run(reply);
+    }
+    task.await();
+    QueryPathsResponse response = (QueryPathsResponse) task.getResponse();
+    if (response == null || !response.isSuccess()) {
+      LOGGER.error("Execute get paths for {} statement false.", pathList);
+      throw new ProcessorException();
+    }
+    return response.getPaths();
+  }
+
+  /** Runs the task via asyncHandleNonQueryTaskGetRes; empty list when no response comes back. */
+  private List<String> queryPaths(SingleQPTask task, PeerId leader)
+      throws InterruptedException, RaftConnectionException {
+    BasicResponse reply = asyncHandleNonQueryTaskGetRes(task, leader, 0);
+    return reply != null ? ((QueryPathsResponse) reply).getPaths() : new ArrayList<>();
+  }
+
+ /**
* Combine multiple metadata in String format into single String
*
* @return single String of all metadata
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryMetadataAsyncProcessor.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryMetadataAsyncProcessor.java
new file mode 100644
index 0000000..7af4adc
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryMetadataAsyncProcessor.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.rpc.raft.processor;
+
+import com.alipay.remoting.AsyncContext;
+import com.alipay.remoting.BizContext;
+import com.alipay.sofa.jraft.Status;
+import com.alipay.sofa.jraft.closure.ReadIndexClosure;
+import org.apache.iotdb.cluster.config.ClusterConstant;
+import org.apache.iotdb.cluster.entity.raft.DataPartitionRaftHolder;
+import org.apache.iotdb.cluster.entity.raft.RaftService;
+import org.apache.iotdb.cluster.rpc.raft.request.QueryMetadataRequest;
+import org.apache.iotdb.cluster.rpc.raft.response.QueryMetadataResponse;
+import org.apache.iotdb.cluster.utils.RaftUtils;
+import org.apache.iotdb.db.exception.PathErrorException;
+import org.apache.iotdb.db.metadata.MManager;
+
+/** Answers {@link QueryMetadataRequest}s with the metadata held by the local MManager. */
+public class QueryMetadataAsyncProcessor extends
+    BasicAsyncUserProcessor<QueryMetadataRequest> {
+
+  private MManager mManager = MManager.getInstance();
+
+  @Override
+  public void handleRequest(BizContext bizContext, AsyncContext asyncContext,
+      QueryMetadataRequest request) {
+    String groupId = request.getGroupID();
+    final byte[] reqContext = RaftUtils.createRaftRequestContext();
+    DataPartitionRaftHolder dataPartitionHolder = RaftUtils.getDataPartitonRaftHolder(groupId);
+    if (request.getReadConsistencyLevel() == ClusterConstant.WEAK_CONSISTENCY_LEVEL) {
+      asyncContext.sendResponse(readLocalMetadata(groupId));
+    } else {
+      ((RaftService) dataPartitionHolder.getService()).getNode()
+          .readIndex(reqContext, new ReadIndexClosure() {
+            @Override
+            public void run(Status status, long index, byte[] reqCtx) {
+              QueryMetadataResponse response;
+              if (status.isOk()) {
+                response = readLocalMetadata(groupId);
+              } else {
+                response = QueryMetadataResponse.createErrorResponse(groupId, status.getErrorMsg());
+                response.addResult(false);
+              }
+              asyncContext.sendResponse(response);
+            }
+          });
+    }
+  }
+
+  /**
+   * Reads the local metadata into a response and records {@code true} only when the read
+   * succeeds; a PathErrorException becomes an error response recorded as {@code false}.
+   */
+  private QueryMetadataResponse readLocalMetadata(String groupId) {
+    QueryMetadataResponse response;
+    try {
+      response = QueryMetadataResponse.createSuccessResponse(groupId, mManager.getMetadata());
+      response.addResult(true);
+    } catch (PathErrorException e) {
+      response = QueryMetadataResponse.createErrorResponse(groupId, e.getMessage());
+      response.addResult(false);
+    }
+    return response;
+  }
+
+  @Override
+  public String interest() {
+    return QueryMetadataRequest.class.getName();
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryPathsAsyncProcessor.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryPathsAsyncProcessor.java
new file mode 100644
index 0000000..b234a83
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryPathsAsyncProcessor.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.rpc.raft.processor;
+
+import com.alipay.remoting.AsyncContext;
+import com.alipay.remoting.BizContext;
+import com.alipay.sofa.jraft.Status;
+import com.alipay.sofa.jraft.closure.ReadIndexClosure;
+import org.apache.iotdb.cluster.config.ClusterConstant;
+import org.apache.iotdb.cluster.entity.raft.DataPartitionRaftHolder;
+import org.apache.iotdb.cluster.entity.raft.RaftService;
+import org.apache.iotdb.cluster.rpc.raft.request.QueryPathsRequest;
+import org.apache.iotdb.cluster.rpc.raft.response.QueryPathsResponse;
+import org.apache.iotdb.cluster.utils.RaftUtils;
+import org.apache.iotdb.db.exception.PathErrorException;
+import org.apache.iotdb.db.metadata.MManager;
+
+/** Serves {@link QueryPathsRequest}s by looking the requested paths up in the local MManager. */
+public class QueryPathsAsyncProcessor extends BasicAsyncUserProcessor<QueryPathsRequest> {
+
+  private MManager mManager = MManager.getInstance();
+
+  @Override
+  public void handleRequest(BizContext bizContext, AsyncContext asyncContext,
+      QueryPathsRequest request) {
+    String groupId = request.getGroupID();
+    final byte[] reqContext = RaftUtils.createRaftRequestContext();
+    DataPartitionRaftHolder dataPartitionHolder = RaftUtils.getDataPartitonRaftHolder(groupId);
+
+    if (request.getReadConsistencyLevel() == ClusterConstant.WEAK_CONSISTENCY_LEVEL) {
+      // Weak consistency: answer straight from local metadata.
+      asyncContext.sendResponse(lookUpPaths(request, groupId));
+      return;
+    }
+    // Otherwise the lookup happens inside the readIndex callback of the group's raft node.
+    ReadIndexClosure closure = new ReadIndexClosure() {
+
+      @Override
+      public void run(Status status, long index, byte[] reqCtx) {
+        QueryPathsResponse reply;
+        if (status.isOk()) {
+          reply = lookUpPaths(request, groupId);
+        } else {
+          reply = QueryPathsResponse.createErrorResponse(groupId, status.getErrorMsg());
+        }
+        asyncContext.sendResponse(reply);
+      }
+    };
+    ((RaftService) dataPartitionHolder.getService()).getNode().readIndex(reqContext, closure);
+  }
+
+  /**
+   * Looks up every path of the request in the local MManager.
+   *
+   * @param request request whose paths are passed one by one to {@code MManager.getPaths}
+   * @param groupId data group id placed in the response
+   * @return a response holding all matches, or an error response carrying the message of the
+   *     PathErrorException raised by a lookup
+   */
+  private QueryPathsResponse lookUpPaths(QueryPathsRequest request, String groupId) {
+    QueryPathsResponse reply = QueryPathsResponse.createEmptyResponse(groupId);
+    try {
+      for (String path : request.getPath()) {
+        reply.addPaths(mManager.getPaths(path));
+      }
+    } catch (final PathErrorException e) {
+      reply = QueryPathsResponse.createErrorResponse(groupId, e.getMessage());
+    }
+    return reply;
+  }
+
+  @Override
+  public String interest() {
+    return QueryPathsRequest.class.getName();
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QuerySeriesTypeAsyncProcessor.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QuerySeriesTypeAsyncProcessor.java
new file mode 100644
index 0000000..b72f0a2
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QuerySeriesTypeAsyncProcessor.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.rpc.raft.processor;
+
+import com.alipay.remoting.AsyncContext;
+import com.alipay.remoting.BizContext;
+import com.alipay.sofa.jraft.Status;
+import com.alipay.sofa.jraft.closure.ReadIndexClosure;
+import org.apache.iotdb.cluster.config.ClusterConstant;
+import org.apache.iotdb.cluster.entity.raft.DataPartitionRaftHolder;
+import org.apache.iotdb.cluster.entity.raft.RaftService;
+import org.apache.iotdb.cluster.rpc.raft.request.QuerySeriesTypeRequest;
+import org.apache.iotdb.cluster.rpc.raft.response.QuerySeriesTypeResponse;
+import org.apache.iotdb.cluster.utils.RaftUtils;
+import org.apache.iotdb.db.exception.PathErrorException;
+import org.apache.iotdb.db.metadata.MManager;
+
+/** Serves {@link QuerySeriesTypeRequest}s with the series type known to the local MManager. */
+public class QuerySeriesTypeAsyncProcessor extends BasicAsyncUserProcessor<QuerySeriesTypeRequest> {
+
+  private MManager mManager = MManager.getInstance();
+
+  @Override
+  public void handleRequest(BizContext bizContext, AsyncContext asyncContext,
+      QuerySeriesTypeRequest request) {
+    String groupId = request.getGroupID();
+    final byte[] reqContext = RaftUtils.createRaftRequestContext();
+    DataPartitionRaftHolder dataPartitionHolder = RaftUtils.getDataPartitonRaftHolder(groupId);
+
+    if (request.getReadConsistencyLevel() == ClusterConstant.WEAK_CONSISTENCY_LEVEL) {
+      asyncContext.sendResponse(lookUpSeriesType(groupId, request.getPath()));
+      return;
+    }
+    // Otherwise read inside the readIndex callback of the group's raft node.
+    ReadIndexClosure closure = new ReadIndexClosure() {
+
+      @Override
+      public void run(Status status, long index, byte[] reqCtx) {
+        QuerySeriesTypeResponse reply;
+        if (status.isOk()) {
+          reply = lookUpSeriesType(groupId, request.getPath());
+        } else {
+          reply = QuerySeriesTypeResponse.createErrorResponse(groupId, status.getErrorMsg());
+        }
+        asyncContext.sendResponse(reply);
+      }
+    };
+    ((RaftService) dataPartitionHolder.getService()).getNode().readIndex(reqContext, closure);
+  }
+
+  /** Reads the type of {@code path} locally; a PathErrorException becomes an error response. */
+  private QuerySeriesTypeResponse lookUpSeriesType(String groupId, String path) {
+    try {
+      return QuerySeriesTypeResponse.createSuccessResponse(groupId, mManager.getSeriesType(path));
+    } catch (final PathErrorException e) {
+      return QuerySeriesTypeResponse.createErrorResponse(groupId, e.getMessage());
+    }
+  }
+
+  @Override
+  public String interest() {
+    return QuerySeriesTypeRequest.class.getName();
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QueryMetadataRequest.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QueryMetadataRequest.java
new file mode 100644
index 0000000..2628fb6
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QueryMetadataRequest.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.rpc.raft.request;
+
+import java.io.Serializable;
+
+public class QueryMetadataRequest extends BasicQueryRequest implements Serializable {
+ /** Creates a metadata query for {@code groupID} to be read at {@code readConsistencyLevel}. */
+ public QueryMetadataRequest(String groupID, int readConsistencyLevel) {
+ super(groupID, readConsistencyLevel);
+ }
+}
\ No newline at end of file
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QueryPathsRequest.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QueryPathsRequest.java
new file mode 100644
index 0000000..2c600f4
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QueryPathsRequest.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.rpc.raft.request;
+
+import java.io.Serializable;
+import java.util.List;
+
+public class QueryPathsRequest extends BasicQueryRequest implements Serializable {
+ /** Paths to look up; QueryPathsAsyncProcessor passes each one to MManager.getPaths. */
+ private List<String> path;
+
+ public QueryPathsRequest(String groupID, int readConsistencyLevel, List<String> path) {
+ super(groupID, readConsistencyLevel);
+ this.path = path;
+ }
+ /** Returns the list given to the constructor (the same instance, not a copy). */
+ public List<String> getPath() {
+ return path;
+ }
+}
\ No newline at end of file
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QuerySeriesTypeRequest.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QuerySeriesTypeRequest.java
new file mode 100644
index 0000000..c486576
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QuerySeriesTypeRequest.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.rpc.raft.request;
+
+import java.io.Serializable;
+
+public class QuerySeriesTypeRequest extends BasicQueryRequest implements Serializable {
+ /** Series path whose type is requested; the processor passes it to MManager.getSeriesType. */
+ private String path;
+
+ public QuerySeriesTypeRequest(String groupID, int readConsistencyLevel, String path) {
+ super(groupID, readConsistencyLevel);
+ this.path = path;
+ }
+ /** Returns the series path given to the constructor. */
+ public String getPath() {
+ return path;
+ }
+}
\ No newline at end of file
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QueryMetadataResponse.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QueryMetadataResponse.java
new file mode 100644
index 0000000..6c21798
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QueryMetadataResponse.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.rpc.raft.response;
+
+import org.apache.iotdb.db.metadata.Metadata;
+
+/**
+ * Response of a metadata query. Instances are only created through the static factories below.
+ */
+public class QueryMetadataResponse extends BasicResponse {
+
+  /** Query result; remains {@code null} for responses built by {@link #createErrorResponse}. */
+  private Metadata metadata;
+
+  private QueryMetadataResponse(String groupId, boolean redirected, String leaderStr,
+      String errorMsg) {
+    super(groupId, redirected, leaderStr, errorMsg);
+  }
+
+  /**
+   * Creates a non-redirected response without leader string or error message that carries
+   * {@code metadata}.
+   */
+  public static QueryMetadataResponse createSuccessResponse(String groupId,
+      Metadata metadata) {
+    QueryMetadataResponse success = new QueryMetadataResponse(groupId, false, null, null);
+    success.metadata = metadata;
+    return success;
+  }
+
+  /**
+   * Creates a non-redirected response without leader string that carries {@code errorMsg} and no
+   * metadata.
+   */
+  public static QueryMetadataResponse createErrorResponse(String groupId, String errorMsg) {
+    QueryMetadataResponse failure = new QueryMetadataResponse(groupId, false, null, errorMsg);
+    return failure;
+  }
+
+  /** Returns the metadata set by {@link #createSuccessResponse}, or {@code null}. */
+  public Metadata getMetadata() {
+    return this.metadata;
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QueryPathsResponse.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QueryPathsResponse.java
new file mode 100644
index 0000000..29d659a
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QueryPathsResponse.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.rpc.raft.response;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Response of a paths query. It starts with an empty path list that callers fill through
+ * {@link #addPaths(List)}.
+ */
+public class QueryPathsResponse extends BasicResponse {
+
+  /** Paths collected so far; created empty by the constructor. */
+  private List<String> paths;
+
+  private QueryPathsResponse(String groupId, boolean redirected, boolean success,
+      String leaderStr, String errorMsg) {
+    super(groupId, redirected, leaderStr, errorMsg);
+    this.addResult(success);
+    this.paths = new ArrayList<>();
+  }
+
+  /**
+   * Creates a non-redirected response with result {@code true}, no leader string, no error
+   * message and no paths yet.
+   */
+  public static QueryPathsResponse createEmptyResponse(String groupId) {
+    QueryPathsResponse empty = new QueryPathsResponse(groupId, false, true, null, null);
+    return empty;
+  }
+
+  /**
+   * Creates a non-redirected response with result {@code false}, no leader string and the given
+   * error message.
+   */
+  public static QueryPathsResponse createErrorResponse(String groupId, String errorMsg) {
+    QueryPathsResponse failure = new QueryPathsResponse(groupId, false, false, null, errorMsg);
+    return failure;
+  }
+
+  /** Returns the internal path list itself, not a copy. */
+  public List<String> getPaths() {
+    return this.paths;
+  }
+
+  /** Appends every element of {@code paths} to the paths already held by this response. */
+  public void addPaths(List<String> paths) {
+    this.paths.addAll(paths);
+  }
+
+}
\ No newline at end of file
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QuerySeriesTypeResponse.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QuerySeriesTypeResponse.java
new file mode 100644
index 0000000..e86e108
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QuerySeriesTypeResponse.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.rpc.raft.response;
+
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+/**
+ * Response of a series-type query. Instances are only created through the static factories
+ * below.
+ */
+public class QuerySeriesTypeResponse extends BasicResponse {
+
+  /** Data type result; remains {@code null} unless set by a factory or the setter. */
+  private TSDataType dataType;
+
+  private QuerySeriesTypeResponse(String groupId, boolean redirected, String leaderStr,
+      String errorMsg) {
+    super(groupId, redirected, leaderStr, errorMsg);
+  }
+
+  /**
+   * Creates a non-redirected response without leader string or error message that carries
+   * {@code dataType}.
+   */
+  public static QuerySeriesTypeResponse createSuccessResponse(String groupId, TSDataType dataType) {
+    QuerySeriesTypeResponse success = new QuerySeriesTypeResponse(groupId, false, null, null);
+    success.setDataType(dataType);
+    return success;
+  }
+
+  /**
+   * Creates a non-redirected response without leader string that carries {@code errorMsg} and no
+   * data type.
+   */
+  public static QuerySeriesTypeResponse createErrorResponse(String groupId, String errorMsg) {
+    QuerySeriesTypeResponse failure = new QuerySeriesTypeResponse(groupId, false, null, errorMsg);
+    return failure;
+  }
+
+  /** Returns the data type held by this response, or {@code null} if none was set. */
+  public TSDataType getDataType() {
+    return this.dataType;
+  }
+
+  /** Replaces the data type held by this response. */
+  public void setDataType(TSDataType dataType) {
+    this.dataType = dataType;
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/service/TSServiceClusterImpl.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/service/TSServiceClusterImpl.java
index ba4e59d..320398b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/service/TSServiceClusterImpl.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/service/TSServiceClusterImpl.java
@@ -34,11 +34,14 @@ import org.apache.iotdb.db.auth.AuthException;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.metadata.Metadata;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.service.TSServiceImpl;
import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementReq;
import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementResp;
import org.apache.iotdb.service.rpc.thrift.TS_StatusCode;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.thrift.TException;
import org.slf4j.Logger;
@@ -244,6 +247,23 @@ public class TSServiceClusterImpl extends TSServiceImpl {
return queryMetadataExecutor.get().processMetadataInStringQuery();
}
+ /**
+ * Cluster override: returns the result of {@code processMetadataQuery()} on the executor
+ * obtained from {@code queryMetadataExecutor}.
+ */
+ @Override
+ protected Metadata getMetadata()
+ throws InterruptedException, ProcessorException, PathErrorException {
+ return queryMetadataExecutor.get().processMetadataQuery();
+ }
+
+ /**
+ * Cluster override: returns the result of {@code processSeriesTypeQuery(path)} on the executor
+ * obtained from {@code queryMetadataExecutor}.
+ *
+ * @param path series path, passed through unchanged
+ */
+ @Override
+ protected TSDataType getSeriesType(String path) throws PathErrorException, InterruptedException, ProcessorException {
+ return queryMetadataExecutor.get().processSeriesTypeQuery(path);
+ }
+
+ /**
+ * Cluster override: returns the result of {@code processPathsQuery(path)} on the executor
+ * obtained from {@code queryMetadataExecutor}.
+ *
+ * @param path path expression, passed through unchanged
+ */
+ @Override
+ protected List<String> getPaths(String path)
+ throws PathErrorException, InterruptedException, ProcessorException {
+ return queryMetadataExecutor.get().processPathsQuery(path);
+ }
+
@OnlyForTest
public NonQueryExecutor getNonQueryExecutor() {
return nonQueryExecutor.get();
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/metadata/Metadata.java b/iotdb/src/main/java/org/apache/iotdb/db/metadata/Metadata.java
index 40fd0e4..15b6011 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/metadata/Metadata.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/metadata/Metadata.java
@@ -18,8 +18,11 @@
*/
package org.apache.iotdb.db.metadata;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
@@ -67,6 +70,57 @@ public class Metadata {
return deviceIdMap;
}
+ /**
+  * Combines several {@link Metadata} instances into a single new one.
+  *
+  * <p>Measurement schemas are merged per key of the series map and de-duplicated by measurement
+  * id; when two inputs report the same id under the same key, the later one wins. Device ids are
+  * merged per key of the device map and de-duplicated as well, keeping first-seen order, so an id
+  * reported by more than one input appears only once in the result.
+  *
+  * @param metadatas instances to combine; may be {@code null} or empty. {@code null} elements
+  *     and elements whose maps are {@code null} are skipped.
+  * @return a new {@link Metadata} built from the merged maps; never {@code null}
+  */
+ public static Metadata combineMetadatas(Metadata[] metadatas) {
+   Map<String, List<MeasurementSchema>> seriesMap = new HashMap<>();
+   Map<String, List<String>> deviceIdMap = new HashMap<>();
+   if (metadatas == null || metadatas.length == 0) {
+     return new Metadata(seriesMap, deviceIdMap);
+   }
+
+   // key -> (measurement id -> schema); the inner map drops repeated measurements
+   Map<String, Map<String, MeasurementSchema>> typeSchemaMap = new HashMap<>();
+   // key -> distinct device ids in first-seen order
+   Map<String, java.util.Set<String>> typeDeviceMap = new HashMap<>();
+
+   for (Metadata metadata : metadatas) {
+     if (metadata == null) {
+       continue;
+     }
+     if (metadata.seriesMap != null) {
+       for (Entry<String, List<MeasurementSchema>> entry : metadata.seriesMap.entrySet()) {
+         Map<String, MeasurementSchema> schemas =
+             typeSchemaMap.computeIfAbsent(entry.getKey(), k -> new HashMap<>());
+         for (MeasurementSchema schema : entry.getValue()) {
+           schemas.put(schema.getMeasurementId(), schema);
+         }
+       }
+     }
+     if (metadata.deviceIdMap != null) {
+       for (Entry<String, List<String>> entry : metadata.deviceIdMap.entrySet()) {
+         typeDeviceMap
+             .computeIfAbsent(entry.getKey(), k -> new java.util.LinkedHashSet<>())
+             .addAll(entry.getValue());
+       }
+     }
+   }
+
+   // Convert the de-duplicating holders back into the list-valued maps Metadata expects.
+   for (Entry<String, Map<String, MeasurementSchema>> entry : typeSchemaMap.entrySet()) {
+     seriesMap.put(entry.getKey(), new ArrayList<>(entry.getValue().values()));
+   }
+   for (Entry<String, java.util.Set<String>> entry : typeDeviceMap.entrySet()) {
+     deviceIdMap.put(entry.getKey(), new ArrayList<>(entry.getValue()));
+   }
+
+   return new Metadata(seriesMap, deviceIdMap);
+ }
+
@Override
public String toString() {
return seriesMap.toString() + "\n" + deviceIdMap.toString();
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 79dbd53..299133c 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -81,6 +81,7 @@ import org.apache.iotdb.service.rpc.thrift.TS_SessionHandle;
import org.apache.iotdb.service.rpc.thrift.TS_Status;
import org.apache.iotdb.service.rpc.thrift.TS_StatusCode;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.thrift.TException;
@@ -308,14 +309,14 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
Metadata metadata;
try {
String column = req.getColumnPath();
- metadata = MManager.getInstance().getMetadata();
+ metadata = getMetadata();
Map<String, List<String>> deviceMap = metadata.getDeviceMap();
if (deviceMap == null || !deviceMap.containsKey(column)) {
resp.setColumnsList(new ArrayList<>());
} else {
resp.setColumnsList(deviceMap.get(column));
}
- } catch (PathErrorException e) {
+ } catch (PathErrorException | InterruptedException | ProcessorException e) {
LOGGER.error("cannot get delta object map", e);
status = getErrorStatus(String.format("Failed to fetch delta object map because: %s", e));
resp.setStatus(status);
@@ -330,8 +331,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
break;
case "COLUMN":
try {
- resp.setDataType(MManager.getInstance().getSeriesType(req.getColumnPath()).toString());
- } catch (PathErrorException e) {
+ resp.setDataType(getSeriesType(req.getColumnPath()).toString());
+ } catch (PathErrorException | InterruptedException | ProcessorException e) {
// TODO aggregate seriesPath e.g. last(root.ln.wf01.wt01.status)
// status = new TS_Status(TS_StatusCode.ERROR_STATUS);
// status.setErrorMessage(String.format("Failed to fetch %s's data type because: %s",
@@ -343,8 +344,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
break;
case "ALL_COLUMNS":
try {
- resp.setColumnsList(MManager.getInstance().getPaths(req.getColumnPath()));
- } catch (PathErrorException e) {
+ resp.setColumnsList(getPaths(req.getColumnPath()));
+ } catch (PathErrorException | InterruptedException | ProcessorException e) {
status = getErrorStatus(String
.format("Failed to fetch %s's all columns because: %s", req.getColumnPath(), e));
resp.setStatus(status);
@@ -382,6 +383,18 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
return MManager.getInstance().getMetadataInString();
}
+ /**
+  * Returns the metadata of the {@link MManager} singleton. The throws clause also lists
+  * {@code InterruptedException} and {@code ProcessorException} so that overriding
+  * implementations may throw them.
+  */
+ protected Metadata getMetadata()
+     throws PathErrorException, InterruptedException, ProcessorException {
+   MManager manager = MManager.getInstance();
+   return manager.getMetadata();
+ }
+
+ /**
+  * Returns the data type that the {@link MManager} singleton reports for {@code path}. The
+  * throws clause also lists {@code InterruptedException} and {@code ProcessorException} so that
+  * overriding implementations may throw them.
+  *
+  * @param path series path to look up
+  */
+ protected TSDataType getSeriesType(String path)
+     throws PathErrorException, InterruptedException, ProcessorException {
+   MManager manager = MManager.getInstance();
+   return manager.getSeriesType(path);
+ }
+
+ /**
+  * Returns the paths that the {@link MManager} singleton reports for {@code path}. The throws
+  * clause also lists {@code InterruptedException} and {@code ProcessorException} so that
+  * overriding implementations may throw them.
+  *
+  * @param path path expression to resolve
+  */
+ protected List<String> getPaths(String path)
+     throws PathErrorException, InterruptedException, ProcessorException {
+   MManager manager = MManager.getInstance();
+   return manager.getPaths(path);
+ }
+
/**
* Judge whether the statement is ADMIN COMMAND and if true, executeWithGlobalTimeFilter it.
*