You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by do...@apache.org on 2019/04/12 11:42:25 UTC
[incubator-iotdb] branch cluster updated: implement DELTA_OBEJECT,
COLUMN, ALL_COLUMNS for cluster metadata query (#143)
This is an automated email from the ASF dual-hosted git repository.
dope pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/cluster by this push:
new ff6f262 implement DELTA_OBEJECT, COLUMN, ALL_COLUMNS for cluster metadata query (#143)
ff6f262 is described below
commit ff6f262fa4921dca63db53a4bc60856b9925409a
Author: XuYi <My...@users.noreply.github.com>
AuthorDate: Fri Apr 12 19:42:21 2019 +0800
implement DELTA_OBEJECT, COLUMN, ALL_COLUMNS for cluster metadata query (#143)
---
.../org/apache/iotdb/cluster/entity/Server.java | 6 +
.../cluster/qp/executor/QueryMetadataExecutor.java | 249 ++++++++++++---------
.../rpc/raft/impl/RaftNodeAsClientManager.java | 8 +
...essor.java => QueryMetadataAsyncProcessor.java} | 45 ++--
.../QueryMetadataInStringAsyncProcessor.java | 5 +-
...rocessor.java => QueryPathsAsyncProcessor.java} | 45 ++--
...sor.java => QuerySeriesTypeAsyncProcessor.java} | 47 ++--
.../processor/QueryTimeSeriesAsyncProcessor.java | 10 +-
.../rpc/raft/request/QueryMetadataRequest.java | 28 +++
.../rpc/raft/request/QueryPathsRequest.java | 36 +++
.../rpc/raft/request/QuerySeriesTypeRequest.java | 35 +++
.../rpc/raft/response/QueryMetadataResponse.java | 47 ++++
.../rpc/raft/response/QueryPathsResponse.java | 50 +++++
.../rpc/raft/response/QuerySeriesTypeResponse.java | 50 +++++
.../cluster/rpc/service/TSServiceClusterImpl.java | 19 ++
.../org/apache/iotdb/db/metadata/Metadata.java | 121 +++++++++-
.../org/apache/iotdb/db/service/TSServiceImpl.java | 25 ++-
.../org/apache/iotdb/db/metadata/MetadataTest.java | 93 ++++++++
18 files changed, 745 insertions(+), 174 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java b/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
index be4a74a..ad1b4d6 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
@@ -32,7 +32,10 @@ import org.apache.iotdb.cluster.entity.raft.MetadataRaftHolder;
import org.apache.iotdb.cluster.rpc.raft.impl.RaftNodeAsClientManager;
import org.apache.iotdb.cluster.rpc.raft.processor.DataGroupNonQueryAsyncProcessor;
import org.apache.iotdb.cluster.rpc.raft.processor.MetaGroupNonQueryAsyncProcessor;
+import org.apache.iotdb.cluster.rpc.raft.processor.QueryMetadataAsyncProcessor;
import org.apache.iotdb.cluster.rpc.raft.processor.QueryMetadataInStringAsyncProcessor;
+import org.apache.iotdb.cluster.rpc.raft.processor.QueryPathsAsyncProcessor;
+import org.apache.iotdb.cluster.rpc.raft.processor.QuerySeriesTypeAsyncProcessor;
import org.apache.iotdb.cluster.rpc.raft.processor.QueryTimeSeriesAsyncProcessor;
import org.apache.iotdb.cluster.utils.RaftUtils;
import org.apache.iotdb.cluster.utils.hash.PhysicalNode;
@@ -93,6 +96,9 @@ public class Server {
rpcServer.registerUserProcessor(new MetaGroupNonQueryAsyncProcessor());
rpcServer.registerUserProcessor(new QueryTimeSeriesAsyncProcessor());
rpcServer.registerUserProcessor(new QueryMetadataInStringAsyncProcessor());
+ rpcServer.registerUserProcessor(new QueryMetadataAsyncProcessor());
+ rpcServer.registerUserProcessor(new QuerySeriesTypeAsyncProcessor());
+ rpcServer.registerUserProcessor(new QueryPathsAsyncProcessor());
metadataHolder = new MetadataRaftHolder(peerIds, serverId, rpcServer, true);
metadataHolder.init();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
index 8dc20b3..1dfbc7e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
@@ -35,16 +35,24 @@ import org.apache.iotdb.cluster.entity.raft.RaftService;
import org.apache.iotdb.cluster.exception.RaftConnectionException;
import org.apache.iotdb.cluster.qp.ClusterQPExecutor;
import org.apache.iotdb.cluster.rpc.raft.request.QueryMetadataInStringRequest;
+import org.apache.iotdb.cluster.rpc.raft.request.QueryMetadataRequest;
+import org.apache.iotdb.cluster.rpc.raft.request.QueryPathsRequest;
+import org.apache.iotdb.cluster.rpc.raft.request.QuerySeriesTypeRequest;
import org.apache.iotdb.cluster.rpc.raft.request.QueryStorageGroupRequest;
import org.apache.iotdb.cluster.rpc.raft.request.QueryTimeSeriesRequest;
import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
import org.apache.iotdb.cluster.rpc.raft.response.QueryMetadataInStringResponse;
+import org.apache.iotdb.cluster.rpc.raft.response.QueryMetadataResponse;
+import org.apache.iotdb.cluster.rpc.raft.response.QueryPathsResponse;
+import org.apache.iotdb.cluster.rpc.raft.response.QuerySeriesTypeResponse;
import org.apache.iotdb.cluster.rpc.raft.response.QueryStorageGroupResponse;
import org.apache.iotdb.cluster.rpc.raft.response.QueryTimeSeriesResponse;
import org.apache.iotdb.cluster.utils.RaftUtils;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.metadata.Metadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -109,7 +117,7 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
}
return paths;
}
-
+
/**
* Handle query timeseries in one data group
*
@@ -122,18 +130,18 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
SingleQPTask task = new SingleQPTask(false, request);
LOGGER.debug("Execute show timeseries {} statement for group {}.", pathList, groupId);
+ PeerId holder;
/** Check if the plan can be executed locally. **/
if (canHandleQueryByGroupId(groupId)) {
- LOGGER.debug("Execute show timeseries {} statement locally for group {}.", pathList, groupId);
- res.addAll(queryTimeSeriesLocally(pathList, groupId, task));
+ LOGGER.debug("Execute show timeseries {} statement locally for group {} by sending request to local node.", pathList, groupId);
+ holder = this.server.getServerId();
} else {
- try {
- PeerId holder = RaftUtils.getRandomPeerID(groupId);
- res.addAll(queryTimeSeries(task, holder));
- } catch (RaftConnectionException e) {
- LOGGER.error(e.getMessage());
- throw new ProcessorException("Raft connection occurs error.", e);
- }
+ holder = RaftUtils.getRandomPeerID(groupId);
+ }
+ try {
+ res.addAll(queryTimeSeries(task, holder));
+ } catch (RaftConnectionException e) {
+ throw new ProcessorException("Raft connection occurs error.", e);
}
}
@@ -150,18 +158,18 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
taskList.add(task);
LOGGER.debug("Execute show metadata in string statement for group {}.", groupId);
+ PeerId holder;
/** Check if the plan can be executed locally. **/
if (canHandleQueryByGroupId(groupId)) {
- LOGGER.debug("Execute show metadata in string statement locally for group {}.", groupId);
- asyncQueryMetadataInStringLocally(groupId, task);
+ LOGGER.debug("Execute show metadata in string statement locally for group {} by sending request to local node.", groupId);
+ holder = this.server.getServerId();
} else {
- try {
- PeerId holder = RaftUtils.getRandomPeerID(groupId);
- asyncSendNonQueryTask(task, holder, 0);
- } catch (RaftConnectionException e) {
- LOGGER.error(e.getMessage());
- throw new ProcessorException("Raft connection occurs error.", e);
- }
+ holder = RaftUtils.getRandomPeerID(groupId);
+ }
+ try {
+ asyncSendNonQueryTask(task, holder, 0);
+ } catch (RaftConnectionException e) {
+ throw new ProcessorException("Raft connection occurs error.", e);
}
}
for (int i = 0; i < taskList.size(); i++) {
@@ -169,7 +177,6 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
task.await();
BasicResponse response = task.getResponse();
if (response == null || !response.isSuccess()) {
- LOGGER.error("Execute show timeseries statement false.");
throw new ProcessorException();
}
metadataList.add(((QueryMetadataInStringResponse)response).getMetadata());
@@ -177,61 +184,124 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
return combineMetadataInStringList(metadataList);
}
- /**
- * Handle "show timeseries <path>" statement
- *
- * @param pathList column path
- */
- private List<List<String>> queryTimeSeriesLocally(List<String> pathList, String groupId,
- SingleQPTask task)
- throws InterruptedException, ProcessorException {
- final byte[] reqContext = RaftUtils.createRaftRequestContext();
- DataPartitionRaftHolder dataPartitionHolder = RaftUtils.getDataPartitonRaftHolder(groupId);
+ public Metadata processMetadataQuery()
+ throws InterruptedException, ProcessorException, PathErrorException {
+ Set<String> groupIdSet = router.getAllGroupId();
- /** Check consistency level**/
- if (readMetadataConsistencyLevel == ClusterConstant.WEAK_CONSISTENCY_LEVEL) {
- QueryTimeSeriesResponse response = QueryTimeSeriesResponse
- .createEmptyResponse(groupId);
+ Metadata[] metadatas = new Metadata[groupIdSet.size()];
+ List<SingleQPTask> taskList = new ArrayList<>();
+ for (String groupId : groupIdSet) {
+ QueryMetadataRequest request = new QueryMetadataRequest(groupId,
+ readMetadataConsistencyLevel);
+ SingleQPTask task = new SingleQPTask(false, request);
+ taskList.add(task);
+
+ LOGGER.debug("Execute query metadata statement for group {}.", groupId);
+ PeerId holder;
+ /** Check if the plan can be executed locally. **/
+ if (canHandleQueryByGroupId(groupId)) {
+ LOGGER.debug("Execute query metadata statement locally for group {} by sending request to local node.", groupId);
+ holder = this.server.getServerId();
+ } else {
+ holder = RaftUtils.getRandomPeerID(groupId);
+ }
try {
- for (String path : pathList) {
- response.addTimeSeries(mManager.getShowTimeseriesPath(path));
+ asyncSendNonQueryTask(task, holder, 0);
+ } catch (RaftConnectionException e) {
+ throw new ProcessorException("Raft connection occurs error.", e);
+ }
+ }
+ for (int i = 0; i < taskList.size(); i++) {
+ SingleQPTask task = taskList.get(i);
+ task.await();
+ BasicResponse response = task.getResponse();
+ if (response == null || !response.isSuccess()) {
+ String errorMessage = "response is null";
+ if (response != null && response.getErrorMsg() != null) {
+ errorMessage = response.getErrorMsg();
}
- } catch (final PathErrorException e) {
- response = QueryTimeSeriesResponse.createErrorResponse(groupId, e.getMessage());
+ throw new ProcessorException("Execute query metadata statement false because " + errorMessage);
}
- task.run(response);
+ metadatas[i] = ((QueryMetadataResponse)response).getMetadata();
+ }
+ return Metadata.combineMetadatas(metadatas);
+ }
+
+ public TSDataType processSeriesTypeQuery(String path)
+ throws InterruptedException, ProcessorException, PathErrorException {
+ TSDataType dataType = null;
+ List<String> storageGroupList = mManager.getAllFileNamesByPath(path);
+ if (storageGroupList.size() != 1) {
+ throw new PathErrorException("path " + path + " is not valid.");
} else {
- ((RaftService) dataPartitionHolder.getService()).getNode()
- .readIndex(reqContext, new ReadIndexClosure() {
+ String groupId = getGroupIdBySG(storageGroupList.get(0));
+ QuerySeriesTypeRequest request = new QuerySeriesTypeRequest(groupId,
+ readMetadataConsistencyLevel, path);
+ SingleQPTask task = new SingleQPTask(false, request);
- @Override
- public void run(Status status, long index, byte[] reqCtx) {
- QueryTimeSeriesResponse response = QueryTimeSeriesResponse
- .createEmptyResponse(groupId);
- if (status.isOk()) {
- try {
- LOGGER.debug("start to read");
- for (String path : pathList) {
- response.addTimeSeries(mManager.getShowTimeseriesPath(path));
- }
- } catch (final PathErrorException e) {
- response = QueryTimeSeriesResponse.createErrorResponse(groupId, e.getMessage());
- }
- } else {
- response = QueryTimeSeriesResponse
- .createErrorResponse(groupId, status.getErrorMsg());
- }
- task.run(response);
- }
- });
+ LOGGER.debug("Execute get series type for {} statement for group {}.", path, groupId);
+ PeerId holder;
+ /** Check if the plan can be executed locally. **/
+ if (canHandleQueryByGroupId(groupId)) {
+ LOGGER.debug("Execute get series type for {} statement locally for group {} by sending request to local node.", path, groupId);
+ holder = this.server.getServerId();
+ } else {
+ holder = RaftUtils.getRandomPeerID(groupId);
+ }
+ try {
+ dataType = querySeriesType(task, holder);
+ } catch (RaftConnectionException e) {
+ throw new ProcessorException("Raft connection occurs error.", e);
+ }
}
- task.await();
- QueryTimeSeriesResponse response = (QueryTimeSeriesResponse) task.getResponse();
- if (response == null || !response.isSuccess()) {
- LOGGER.error("Execute show timeseries {} statement false.", pathList);
- throw new ProcessorException();
+ return dataType;
+ }
+
+ /**
+ * Handle a paths query for <path>: collect the matching paths from every data group that owns a related storage group
+ */
+ public List<String> processPathsQuery(String path)
+ throws InterruptedException, PathErrorException, ProcessorException {
+ List<String> res = new ArrayList<>();
+ List<String> storageGroupList = mManager.getAllFileNamesByPath(path);
+ if (storageGroupList.isEmpty()) {
+ return new ArrayList<>();
+ } else {
+ Map<String, Set<String>> groupIdSGMap = classifySGByGroupId(storageGroupList);
+ for (Entry<String, Set<String>> entry : groupIdSGMap.entrySet()) {
+ List<String> paths = getSubQueryPaths(entry.getValue(), path);
+ String groupId = entry.getKey();
+ handlePathsQuery(groupId, paths, res);
+ }
+ }
+ return res;
+ }
+
+ /**
+ * Handle a paths query in one data group
+ *
+ * @param groupId data group id
+ */
+ private void handlePathsQuery(String groupId, List<String> pathList, List<String> res)
+ throws ProcessorException, InterruptedException {
+ QueryPathsRequest request = new QueryPathsRequest(groupId,
+ readMetadataConsistencyLevel, pathList);
+ SingleQPTask task = new SingleQPTask(false, request);
+
+ LOGGER.debug("Execute get paths for {} statement for group {}.", pathList, groupId);
+ PeerId holder;
+ /** Check if the plan can be executed locally. **/
+ if (canHandleQueryByGroupId(groupId)) {
+ LOGGER.debug("Execute get paths for {} statement locally for group {} by sending request to local node.", pathList, groupId);
+ holder = this.server.getServerId();
+ } else {
+ holder = RaftUtils.getRandomPeerID(groupId);
+ }
+ try {
+ res.addAll(queryPaths(task, holder));
+ } catch (RaftConnectionException e) {
+ throw new ProcessorException("Raft connection occurs error.", e);
}
- return response.getTimeSeries();
}
private List<List<String>> queryTimeSeries(SingleQPTask task, PeerId leader)
@@ -241,6 +311,13 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
: ((QueryTimeSeriesResponse) response).getTimeSeries();
}
+ private TSDataType querySeriesType(SingleQPTask task, PeerId leader)
+ throws InterruptedException, RaftConnectionException {
+ BasicResponse response = asyncHandleNonQueryTaskGetRes(task, leader, 0);
+ return response == null ? null
+ : ((QuerySeriesTypeResponse) response).getDataType();
+ }
+
/**
* Handle "show storage group" statement locally
*
@@ -286,39 +363,11 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
return ((QueryStorageGroupResponse) task.getResponse()).getStorageGroups();
}
- /**
- * Handle "show timeseries" statement
- */
- private void asyncQueryMetadataInStringLocally(String groupId, SingleQPTask task) {
- final byte[] reqContext = RaftUtils.createRaftRequestContext();
- DataPartitionRaftHolder dataPartitionHolder = (DataPartitionRaftHolder) server
- .getDataPartitionHolder(groupId);
- if (readMetadataConsistencyLevel == ClusterConstant.WEAK_CONSISTENCY_LEVEL) {
- QueryMetadataInStringResponse response = QueryMetadataInStringResponse
- .createSuccessResponse(groupId, mManager.getMetadataInString());
- response.addResult(true);
- task.run(response);
- } else {
- ((RaftService) dataPartitionHolder.getService()).getNode()
- .readIndex(reqContext, new ReadIndexClosure() {
-
- @Override
- public void run(Status status, long index, byte[] reqCtx) {
- QueryMetadataInStringResponse response;
- if (status.isOk()) {
- LOGGER.debug("start to read");
- response = QueryMetadataInStringResponse
- .createSuccessResponse(groupId, mManager.getMetadataInString());
- response.addResult(true);
- } else {
- response = QueryMetadataInStringResponse
- .createErrorResponse(groupId, status.getErrorMsg());
- response.addResult(false);
- }
- task.run(response);
- }
- });
- }
+ private List<String> queryPaths(SingleQPTask task, PeerId leader)
+ throws InterruptedException, RaftConnectionException {
+ BasicResponse response = asyncHandleNonQueryTaskGetRes(task, leader, 0);
+ return response == null ? new ArrayList<>()
+ : ((QueryPathsResponse) response).getPaths();
}
/**
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
index 4d947df..6ea12e4 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
@@ -66,6 +66,8 @@ public class RaftNodeAsClientManager {
*/
private final RaftNodeAsClient client = new RaftNodeAsClient();
+ private boolean clientInited = false;
+
/**
* Number of clients in use
*/
@@ -90,6 +92,10 @@ public class RaftNodeAsClientManager {
*/
public RaftNodeAsClient getRaftNodeAsClient() {
try {
+ if (!clientInited) {
+ client.init();
+ }
+
numLock.lock();
if (validClientNum.get() < MAX_VALID_CLIENT_NUM) {
validClientNum.incrementAndGet();
@@ -165,6 +171,7 @@ public class RaftNodeAsClientManager {
private void init(){
boltClientService = new BoltCliClientService();
boltClientService.init(new CliOptions());
+ clientInited = true;
}
@Override
@@ -229,6 +236,7 @@ public class RaftNodeAsClientManager {
@Override
public void shutdown() {
boltClientService.shutdown();
+ clientInited = false;
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryMetadataInStringAsyncProcessor.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryMetadataAsyncProcessor.java
similarity index 60%
copy from cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryMetadataInStringAsyncProcessor.java
copy to cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryMetadataAsyncProcessor.java
index 5c81756..176fa33 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryMetadataInStringAsyncProcessor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryMetadataAsyncProcessor.java
@@ -25,41 +25,54 @@ import com.alipay.sofa.jraft.closure.ReadIndexClosure;
import org.apache.iotdb.cluster.config.ClusterConstant;
import org.apache.iotdb.cluster.entity.raft.DataPartitionRaftHolder;
import org.apache.iotdb.cluster.entity.raft.RaftService;
-import org.apache.iotdb.cluster.rpc.raft.request.QueryMetadataInStringRequest;
-import org.apache.iotdb.cluster.rpc.raft.response.QueryMetadataInStringResponse;
+import org.apache.iotdb.cluster.rpc.raft.request.QueryMetadataRequest;
+import org.apache.iotdb.cluster.rpc.raft.response.QueryMetadataResponse;
import org.apache.iotdb.cluster.utils.RaftUtils;
+import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.metadata.MManager;
-public class QueryMetadataInStringAsyncProcessor extends
- BasicAsyncUserProcessor<QueryMetadataInStringRequest> {
+public class QueryMetadataAsyncProcessor extends
+ BasicAsyncUserProcessor<QueryMetadataRequest> {
private MManager mManager = MManager.getInstance();
@Override
public void handleRequest(BizContext bizContext, AsyncContext asyncContext,
- QueryMetadataInStringRequest request) {
+ QueryMetadataRequest request) {
String groupId = request.getGroupID();
- final byte[] reqContext = RaftUtils.createRaftRequestContext();
- DataPartitionRaftHolder dataPartitionHolder = RaftUtils.getDataPartitonRaftHolder(groupId);
if (request.getReadConsistencyLevel() == ClusterConstant.WEAK_CONSISTENCY_LEVEL) {
- QueryMetadataInStringResponse response = QueryMetadataInStringResponse
- .createSuccessResponse(groupId, mManager.getMetadataInString());
- response.addResult(true);
+ QueryMetadataResponse response = null;
+ try {
+ response = QueryMetadataResponse
+ .createSuccessResponse(groupId, mManager.getMetadata());
+ response.addResult(true);
+ } catch (PathErrorException e) {
+ response = QueryMetadataResponse.createErrorResponse(groupId, e.getMessage());
+ response.addResult(false);
+ }
asyncContext.sendResponse(response);
} else {
+ final byte[] reqContext = RaftUtils.createRaftRequestContext();
+ DataPartitionRaftHolder dataPartitionHolder = RaftUtils.getDataPartitonRaftHolder(groupId);
+
((RaftService) dataPartitionHolder.getService()).getNode()
.readIndex(reqContext, new ReadIndexClosure() {
@Override
public void run(Status status, long index, byte[] reqCtx) {
- QueryMetadataInStringResponse response;
+ QueryMetadataResponse response;
if (status.isOk()) {
- response = QueryMetadataInStringResponse
- .createSuccessResponse(groupId, mManager.getMetadataInString());
- response.addResult(true);
+ try {
+ response = QueryMetadataResponse
+ .createSuccessResponse(groupId, mManager.getMetadata());
+ response.addResult(true);
+ } catch (PathErrorException e) {
+ response = QueryMetadataResponse.createErrorResponse(groupId, e.getMessage());
+ response.addResult(false);
+ }
} else {
- response = QueryMetadataInStringResponse
+ response = QueryMetadataResponse
.createErrorResponse(groupId, status.getErrorMsg());
response.addResult(false);
}
@@ -71,6 +84,6 @@ public class QueryMetadataInStringAsyncProcessor extends
@Override
public String interest() {
- return QueryMetadataInStringRequest.class.getName();
+ return QueryMetadataRequest.class.getName();
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryMetadataInStringAsyncProcessor.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryMetadataInStringAsyncProcessor.java
index 5c81756..b80f4ae 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryMetadataInStringAsyncProcessor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryMetadataInStringAsyncProcessor.java
@@ -39,8 +39,6 @@ public class QueryMetadataInStringAsyncProcessor extends
public void handleRequest(BizContext bizContext, AsyncContext asyncContext,
QueryMetadataInStringRequest request) {
String groupId = request.getGroupID();
- final byte[] reqContext = RaftUtils.createRaftRequestContext();
- DataPartitionRaftHolder dataPartitionHolder = RaftUtils.getDataPartitonRaftHolder(groupId);
if (request.getReadConsistencyLevel() == ClusterConstant.WEAK_CONSISTENCY_LEVEL) {
QueryMetadataInStringResponse response = QueryMetadataInStringResponse
@@ -48,6 +46,9 @@ public class QueryMetadataInStringAsyncProcessor extends
response.addResult(true);
asyncContext.sendResponse(response);
} else {
+ final byte[] reqContext = RaftUtils.createRaftRequestContext();
+ DataPartitionRaftHolder dataPartitionHolder = RaftUtils.getDataPartitonRaftHolder(groupId);
+
((RaftService) dataPartitionHolder.getService()).getNode()
.readIndex(reqContext, new ReadIndexClosure() {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryTimeSeriesAsyncProcessor.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryPathsAsyncProcessor.java
similarity index 64%
copy from cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryTimeSeriesAsyncProcessor.java
copy to cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryPathsAsyncProcessor.java
index a800302..f54aba0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryTimeSeriesAsyncProcessor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryPathsAsyncProcessor.java
@@ -25,50 +25,55 @@ import com.alipay.sofa.jraft.closure.ReadIndexClosure;
import org.apache.iotdb.cluster.config.ClusterConstant;
import org.apache.iotdb.cluster.entity.raft.DataPartitionRaftHolder;
import org.apache.iotdb.cluster.entity.raft.RaftService;
-import org.apache.iotdb.cluster.rpc.raft.request.QueryTimeSeriesRequest;
-import org.apache.iotdb.cluster.rpc.raft.response.QueryTimeSeriesResponse;
+import org.apache.iotdb.cluster.rpc.raft.request.QueryPathsRequest;
+import org.apache.iotdb.cluster.rpc.raft.response.QueryPathsResponse;
import org.apache.iotdb.cluster.utils.RaftUtils;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.metadata.MManager;
-
-public class QueryTimeSeriesAsyncProcessor extends BasicAsyncUserProcessor<QueryTimeSeriesRequest> {
+public class QueryPathsAsyncProcessor extends BasicAsyncUserProcessor<QueryPathsRequest> {
private MManager mManager = MManager.getInstance();
@Override
public void handleRequest(BizContext bizContext, AsyncContext asyncContext,
- QueryTimeSeriesRequest request) {
+ QueryPathsRequest request) {
String groupId = request.getGroupID();
- final byte[] reqContext = RaftUtils.createRaftRequestContext();
- DataPartitionRaftHolder dataPartitionHolder = RaftUtils.getDataPartitonRaftHolder(groupId);
if (request.getReadConsistencyLevel() == ClusterConstant.WEAK_CONSISTENCY_LEVEL) {
- QueryTimeSeriesResponse response = QueryTimeSeriesResponse
+ QueryPathsResponse response = QueryPathsResponse
.createEmptyResponse(groupId);
try {
- queryTimeSeries(request, response);
+ queryPaths(request, response);
+ response.addResult(true);
} catch (final PathErrorException e) {
- response = QueryTimeSeriesResponse.createErrorResponse(groupId, e.getMessage());
+ response = QueryPathsResponse.createErrorResponse(groupId, e.getMessage());
+ response.addResult(false);
}
asyncContext.sendResponse(response);
} else {
+ final byte[] reqContext = RaftUtils.createRaftRequestContext();
+ DataPartitionRaftHolder dataPartitionHolder = RaftUtils.getDataPartitonRaftHolder(groupId);
+
((RaftService) dataPartitionHolder.getService()).getNode()
.readIndex(reqContext, new ReadIndexClosure() {
@Override
public void run(Status status, long index, byte[] reqCtx) {
- QueryTimeSeriesResponse response = QueryTimeSeriesResponse
+ QueryPathsResponse response = QueryPathsResponse
.createEmptyResponse(groupId);
if (status.isOk()) {
try {
- queryTimeSeries(request, response);
+ queryPaths(request, response);
+ response.addResult(true);
} catch (final PathErrorException e) {
- response = QueryTimeSeriesResponse.createErrorResponse(groupId, e.getMessage());
+ response = QueryPathsResponse.createErrorResponse(groupId, e.getMessage());
+ response.addResult(false);
}
} else {
- response = QueryTimeSeriesResponse
+ response = QueryPathsResponse
.createErrorResponse(groupId, status.getErrorMsg());
+ response.addResult(false);
}
asyncContext.sendResponse(response);
}
@@ -77,17 +82,17 @@ public class QueryTimeSeriesAsyncProcessor extends BasicAsyncUserProcessor<Query
}
/**
- * Query timeseries
+ * Query paths
*/
- private void queryTimeSeries(QueryTimeSeriesRequest queryMetadataRequest,
- QueryTimeSeriesResponse response) throws PathErrorException {
- for (String path : queryMetadataRequest.getPath()) {
- response.addTimeSeries(mManager.getShowTimeseriesPath(path));
+ private void queryPaths(QueryPathsRequest request,
+ QueryPathsResponse response) throws PathErrorException {
+ for (String path : request.getPath()) {
+ response.addPaths(mManager.getPaths(path));
}
}
@Override
public String interest() {
- return QueryTimeSeriesRequest.class.getName();
+ return QueryPathsRequest.class.getName();
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryTimeSeriesAsyncProcessor.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QuerySeriesTypeAsyncProcessor.java
similarity index 64%
copy from cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryTimeSeriesAsyncProcessor.java
copy to cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QuerySeriesTypeAsyncProcessor.java
index a800302..f0a4fc6 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryTimeSeriesAsyncProcessor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QuerySeriesTypeAsyncProcessor.java
@@ -25,50 +25,53 @@ import com.alipay.sofa.jraft.closure.ReadIndexClosure;
import org.apache.iotdb.cluster.config.ClusterConstant;
import org.apache.iotdb.cluster.entity.raft.DataPartitionRaftHolder;
import org.apache.iotdb.cluster.entity.raft.RaftService;
-import org.apache.iotdb.cluster.rpc.raft.request.QueryTimeSeriesRequest;
-import org.apache.iotdb.cluster.rpc.raft.response.QueryTimeSeriesResponse;
+import org.apache.iotdb.cluster.rpc.raft.request.QuerySeriesTypeRequest;
+import org.apache.iotdb.cluster.rpc.raft.response.QuerySeriesTypeResponse;
import org.apache.iotdb.cluster.utils.RaftUtils;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.metadata.MManager;
-
-public class QueryTimeSeriesAsyncProcessor extends BasicAsyncUserProcessor<QueryTimeSeriesRequest> {
+public class QuerySeriesTypeAsyncProcessor extends BasicAsyncUserProcessor<QuerySeriesTypeRequest> {
private MManager mManager = MManager.getInstance();
@Override
public void handleRequest(BizContext bizContext, AsyncContext asyncContext,
- QueryTimeSeriesRequest request) {
+ QuerySeriesTypeRequest request) {
String groupId = request.getGroupID();
- final byte[] reqContext = RaftUtils.createRaftRequestContext();
- DataPartitionRaftHolder dataPartitionHolder = RaftUtils.getDataPartitonRaftHolder(groupId);
if (request.getReadConsistencyLevel() == ClusterConstant.WEAK_CONSISTENCY_LEVEL) {
- QueryTimeSeriesResponse response = QueryTimeSeriesResponse
- .createEmptyResponse(groupId);
+ QuerySeriesTypeResponse response;
try {
- queryTimeSeries(request, response);
+ response = QuerySeriesTypeResponse.createSuccessResponse(groupId, mManager.getSeriesType(request.getPath()));
+ response.addResult(true);
} catch (final PathErrorException e) {
- response = QueryTimeSeriesResponse.createErrorResponse(groupId, e.getMessage());
+ response = QuerySeriesTypeResponse.createErrorResponse(groupId, e.getMessage());
+ response.addResult(false);
}
asyncContext.sendResponse(response);
} else {
+ final byte[] reqContext = RaftUtils.createRaftRequestContext();
+ DataPartitionRaftHolder dataPartitionHolder = RaftUtils.getDataPartitonRaftHolder(groupId);
+
((RaftService) dataPartitionHolder.getService()).getNode()
.readIndex(reqContext, new ReadIndexClosure() {
@Override
public void run(Status status, long index, byte[] reqCtx) {
- QueryTimeSeriesResponse response = QueryTimeSeriesResponse
- .createEmptyResponse(groupId);
+ QuerySeriesTypeResponse response;
if (status.isOk()) {
try {
- queryTimeSeries(request, response);
+ response = QuerySeriesTypeResponse.createSuccessResponse(groupId, mManager.getSeriesType(request.getPath()));
+ response.addResult(true);
} catch (final PathErrorException e) {
- response = QueryTimeSeriesResponse.createErrorResponse(groupId, e.getMessage());
+ response = QuerySeriesTypeResponse.createErrorResponse(groupId, e.getMessage());
+ response.addResult(false);
}
} else {
- response = QueryTimeSeriesResponse
+ response = QuerySeriesTypeResponse
.createErrorResponse(groupId, status.getErrorMsg());
+ response.addResult(false);
}
asyncContext.sendResponse(response);
}
@@ -76,18 +79,8 @@ public class QueryTimeSeriesAsyncProcessor extends BasicAsyncUserProcessor<Query
}
}
- /**
- * Query timeseries
- */
- private void queryTimeSeries(QueryTimeSeriesRequest queryMetadataRequest,
- QueryTimeSeriesResponse response) throws PathErrorException {
- for (String path : queryMetadataRequest.getPath()) {
- response.addTimeSeries(mManager.getShowTimeseriesPath(path));
- }
- }
-
@Override
public String interest() {
- return QueryTimeSeriesRequest.class.getName();
+ return QuerySeriesTypeRequest.class.getName();
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryTimeSeriesAsyncProcessor.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryTimeSeriesAsyncProcessor.java
index a800302..c41fdcf 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryTimeSeriesAsyncProcessor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryTimeSeriesAsyncProcessor.java
@@ -40,19 +40,22 @@ public class QueryTimeSeriesAsyncProcessor extends BasicAsyncUserProcessor<Query
public void handleRequest(BizContext bizContext, AsyncContext asyncContext,
QueryTimeSeriesRequest request) {
String groupId = request.getGroupID();
- final byte[] reqContext = RaftUtils.createRaftRequestContext();
- DataPartitionRaftHolder dataPartitionHolder = RaftUtils.getDataPartitonRaftHolder(groupId);
if (request.getReadConsistencyLevel() == ClusterConstant.WEAK_CONSISTENCY_LEVEL) {
QueryTimeSeriesResponse response = QueryTimeSeriesResponse
.createEmptyResponse(groupId);
try {
queryTimeSeries(request, response);
+ response.addResult(true);
} catch (final PathErrorException e) {
response = QueryTimeSeriesResponse.createErrorResponse(groupId, e.getMessage());
+ response.addResult(false);
}
asyncContext.sendResponse(response);
} else {
+ final byte[] reqContext = RaftUtils.createRaftRequestContext();
+ DataPartitionRaftHolder dataPartitionHolder = RaftUtils.getDataPartitonRaftHolder(groupId);
+
((RaftService) dataPartitionHolder.getService()).getNode()
.readIndex(reqContext, new ReadIndexClosure() {
@@ -63,12 +66,15 @@ public class QueryTimeSeriesAsyncProcessor extends BasicAsyncUserProcessor<Query
if (status.isOk()) {
try {
queryTimeSeries(request, response);
+ response.addResult(true);
} catch (final PathErrorException e) {
response = QueryTimeSeriesResponse.createErrorResponse(groupId, e.getMessage());
+ response.addResult(false);
}
} else {
response = QueryTimeSeriesResponse
.createErrorResponse(groupId, status.getErrorMsg());
+ response.addResult(false);
}
asyncContext.sendResponse(response);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QueryMetadataRequest.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QueryMetadataRequest.java
new file mode 100644
index 0000000..2628fb6
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QueryMetadataRequest.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.rpc.raft.request;
+
+import java.io.Serializable;
+
+/**
+ * Query request that carries nothing beyond what {@link BasicQueryRequest} holds: the target
+ * group id and the read consistency level.
+ */
+public class QueryMetadataRequest extends BasicQueryRequest implements Serializable {
+
+  // Explicit version id so the serialized form does not depend on a compiler-computed default.
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * @param groupID id of the group the request is addressed to
+   * @param readConsistencyLevel consistency level the receiving processor checks before reading
+   */
+  public QueryMetadataRequest(String groupID, int readConsistencyLevel) {
+    super(groupID, readConsistencyLevel);
+  }
+}
\ No newline at end of file
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QueryPathsRequest.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QueryPathsRequest.java
new file mode 100644
index 0000000..2c600f4
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QueryPathsRequest.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.rpc.raft.request;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Query request that carries a list of path strings in addition to the group id and read
+ * consistency level held by {@link BasicQueryRequest}.
+ */
+public class QueryPathsRequest extends BasicQueryRequest implements Serializable {
+
+  // Explicit version id so the serialized form does not depend on a compiler-computed default.
+  private static final long serialVersionUID = 1L;
+
+  // Stored by reference; the caller's list is not copied.
+  private List<String> path;
+
+  /**
+   * @param groupID id of the group the request is addressed to
+   * @param readConsistencyLevel consistency level the receiving processor checks before reading
+   * @param path path strings to query
+   */
+  public QueryPathsRequest(String groupID, int readConsistencyLevel, List<String> path) {
+    super(groupID, readConsistencyLevel);
+    this.path = path;
+  }
+
+  /**
+   * @return the path list given at construction (same instance, not a copy)
+   */
+  public List<String> getPath() {
+    return path;
+  }
+}
\ No newline at end of file
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QuerySeriesTypeRequest.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QuerySeriesTypeRequest.java
new file mode 100644
index 0000000..c486576
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QuerySeriesTypeRequest.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.rpc.raft.request;
+
+import java.io.Serializable;
+
+/**
+ * Query request asking for the data type of a single series path. The receiving processor passes
+ * {@link #getPath()} to the metadata manager's {@code getSeriesType}.
+ */
+public class QuerySeriesTypeRequest extends BasicQueryRequest implements Serializable {
+
+  // Explicit version id so the serialized form does not depend on a compiler-computed default.
+  private static final long serialVersionUID = 1L;
+
+  private String path;
+
+  /**
+   * @param groupID id of the group the request is addressed to
+   * @param readConsistencyLevel consistency level the receiving processor checks before reading
+   * @param path series path whose data type is requested
+   */
+  public QuerySeriesTypeRequest(String groupID, int readConsistencyLevel, String path) {
+    super(groupID, readConsistencyLevel);
+    this.path = path;
+  }
+
+  /**
+   * @return the series path given at construction
+   */
+  public String getPath() {
+    return path;
+  }
+}
\ No newline at end of file
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QueryMetadataResponse.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QueryMetadataResponse.java
new file mode 100644
index 0000000..6c21798
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QueryMetadataResponse.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.rpc.raft.response;
+
+import org.apache.iotdb.db.metadata.Metadata;
+
+/**
+ * Response to a metadata query. Holds a {@link Metadata} on success; an error response carries
+ * only the error message and leaves the metadata {@code null}.
+ *
+ * <p>Neither factory records a result flag; callers add it themselves via {@code addResult}.
+ */
+public class QueryMetadataResponse extends BasicResponse {
+
+  private Metadata metadata;
+
+  private QueryMetadataResponse(String groupId, boolean redirected, String leaderStr,
+      String errorMsg) {
+    super(groupId, redirected, leaderStr, errorMsg);
+  }
+
+  /**
+   * Builds a non-redirected response without error message that carries {@code metadata}.
+   */
+  public static QueryMetadataResponse createSuccessResponse(String groupId,
+      Metadata metadata) {
+    final QueryMetadataResponse success = new QueryMetadataResponse(groupId, false, null, null);
+    success.metadata = metadata;
+    return success;
+  }
+
+  /**
+   * Builds a non-redirected response that carries only {@code errorMsg}.
+   */
+  public static QueryMetadataResponse createErrorResponse(String groupId, String errorMsg) {
+    final QueryMetadataResponse failure =
+        new QueryMetadataResponse(groupId, false, null, errorMsg);
+    return failure;
+  }
+
+  /**
+   * @return the metadata set by {@link #createSuccessResponse}, or {@code null} if none was set
+   */
+  public Metadata getMetadata() {
+    return this.metadata;
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QueryPathsResponse.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QueryPathsResponse.java
new file mode 100644
index 0000000..29d659a
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QueryPathsResponse.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.rpc.raft.response;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Response to a paths query. Starts with an empty path list that callers fill through
+ * {@link #addPaths(List)}. The constructor itself records the success flag via
+ * {@code addResult}.
+ */
+public class QueryPathsResponse extends BasicResponse {
+
+  private List<String> paths;
+
+  private QueryPathsResponse(String groupId, boolean redirected, boolean success,
+      String leaderStr, String errorMsg) {
+    super(groupId, redirected, leaderStr, errorMsg);
+    this.paths = new ArrayList<>();
+    addResult(success);
+  }
+
+  /**
+   * Builds a successful, non-redirected response with no paths yet.
+   */
+  public static QueryPathsResponse createEmptyResponse(String groupId) {
+    final QueryPathsResponse empty = new QueryPathsResponse(groupId, false, true, null, null);
+    return empty;
+  }
+
+  /**
+   * Builds a failed, non-redirected response that carries {@code errorMsg}.
+   */
+  public static QueryPathsResponse createErrorResponse(String groupId, String errorMsg) {
+    final QueryPathsResponse failure =
+        new QueryPathsResponse(groupId, false, false, null, errorMsg);
+    return failure;
+  }
+
+  /**
+   * @return the internal path list (not a copy)
+   */
+  public List<String> getPaths() {
+    return this.paths;
+  }
+
+  /**
+   * Appends every element of {@code paths} to the paths already collected.
+   */
+  public void addPaths(List<String> paths) {
+    this.paths.addAll(paths);
+  }
+
+}
\ No newline at end of file
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QuerySeriesTypeResponse.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QuerySeriesTypeResponse.java
new file mode 100644
index 0000000..e86e108
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QuerySeriesTypeResponse.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.rpc.raft.response;
+
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+/**
+ * Response to a series-type query. Holds the {@link TSDataType} of the queried series on success;
+ * an error response carries only the error message and leaves the data type {@code null}.
+ *
+ * <p>Neither factory records a result flag; callers add it themselves via {@code addResult}.
+ */
+public class QuerySeriesTypeResponse extends BasicResponse {
+
+  private TSDataType dataType;
+
+  private QuerySeriesTypeResponse(String groupId, boolean redirected, String leaderStr,
+      String errorMsg) {
+    super(groupId, redirected, leaderStr, errorMsg);
+  }
+
+  /**
+   * Builds a non-redirected response without error message that carries {@code dataType}.
+   */
+  public static QuerySeriesTypeResponse createSuccessResponse(String groupId, TSDataType dataType) {
+    final QuerySeriesTypeResponse success =
+        new QuerySeriesTypeResponse(groupId, false, null, null);
+    success.dataType = dataType;
+    return success;
+  }
+
+  /**
+   * Builds a non-redirected response that carries only {@code errorMsg}.
+   */
+  public static QuerySeriesTypeResponse createErrorResponse(String groupId, String errorMsg) {
+    final QuerySeriesTypeResponse failure =
+        new QuerySeriesTypeResponse(groupId, false, null, errorMsg);
+    return failure;
+  }
+
+  /**
+   * @return the data type currently stored, or {@code null} if none was set
+   */
+  public TSDataType getDataType() {
+    return this.dataType;
+  }
+
+  /**
+   * Replaces the stored data type.
+   */
+  public void setDataType(TSDataType dataType) {
+    this.dataType = dataType;
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/service/TSServiceClusterImpl.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/service/TSServiceClusterImpl.java
index ba4e59d..33e3e81 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/service/TSServiceClusterImpl.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/service/TSServiceClusterImpl.java
@@ -34,11 +34,13 @@ import org.apache.iotdb.db.auth.AuthException;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.metadata.Metadata;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.service.TSServiceImpl;
import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementReq;
import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementResp;
import org.apache.iotdb.service.rpc.thrift.TS_StatusCode;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.thrift.TException;
import org.slf4j.Logger;
@@ -244,6 +246,23 @@ public class TSServiceClusterImpl extends TSServiceImpl {
return queryMetadataExecutor.get().processMetadataInStringQuery();
}
+ /**
+ * Cluster override of the metadata hook: instead of reading the local metadata manager, forwards
+ * to {@code processMetadataQuery()} on the executor obtained from {@code queryMetadataExecutor}.
+ */
+ @Override
+ protected Metadata getMetadata()
+ throws InterruptedException, ProcessorException, PathErrorException {
+ return queryMetadataExecutor.get().processMetadataQuery();
+ }
+
+ /**
+ * Cluster override of the series-type hook: forwards {@code path} to
+ * {@code processSeriesTypeQuery(path)} on the executor obtained from {@code queryMetadataExecutor}.
+ */
+ @Override
+ protected TSDataType getSeriesType(String path) throws PathErrorException, InterruptedException, ProcessorException {
+ return queryMetadataExecutor.get().processSeriesTypeQuery(path);
+ }
+
+ /**
+ * Cluster override of the paths hook: forwards {@code path} to {@code processPathsQuery(path)}
+ * on the executor obtained from {@code queryMetadataExecutor}.
+ */
+ @Override
+ protected List<String> getPaths(String path)
+ throws PathErrorException, InterruptedException, ProcessorException {
+ return queryMetadataExecutor.get().processPathsQuery(path);
+ }
+
@OnlyForTest
public NonQueryExecutor getNonQueryExecutor() {
return nonQueryExecutor.get();
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/metadata/Metadata.java b/iotdb/src/main/java/org/apache/iotdb/db/metadata/Metadata.java
index 40fd0e4..5c56fe1 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/metadata/Metadata.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/metadata/Metadata.java
@@ -18,15 +18,21 @@
*/
package org.apache.iotdb.db.metadata;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
/**
* This class stores all the metadata info for every deviceId and every timeseries.
*/
-public class Metadata {
+public class Metadata implements Serializable {
private Map<String, List<MeasurementSchema>> seriesMap;
private Map<String, List<String>> deviceIdMap;
@@ -67,9 +73,122 @@ public class Metadata {
return deviceIdMap;
}
+  /**
+   * Merges several {@code Metadata} objects into one new instance.
+   *
+   * <p>Series lists stored under the same key are merged by measurement id: when two schemas share
+   * an id, the one encountered later replaces the earlier one. Device id lists stored under the
+   * same key are concatenated as they are, without removing duplicates.
+   *
+   * @param metadatas objects to merge; {@code null} or an empty array yields an empty result
+   * @return a new Metadata built from the merged series map and device id map
+   */
+  public static Metadata combineMetadatas(Metadata[] metadatas) {
+    Map<String, List<MeasurementSchema>> mergedSeries = new HashMap<>();
+    Map<String, List<String>> mergedDevices = new HashMap<>();
+    if (metadatas == null || metadatas.length == 0) {
+      return new Metadata(mergedSeries, mergedDevices);
+    }
+
+    // series key -> (measurement id -> schema); collapses schemas that share a measurement id
+    Map<String, Map<String, MeasurementSchema>> schemasById = new HashMap<>();
+    for (Metadata part : metadatas) {
+      for (Entry<String, List<MeasurementSchema>> series : part.seriesMap.entrySet()) {
+        Map<String, MeasurementSchema> byId =
+            schemasById.computeIfAbsent(series.getKey(), key -> new HashMap<>());
+        for (MeasurementSchema schema : series.getValue()) {
+          byId.put(schema.getMeasurementId(), schema);
+        }
+      }
+
+      for (Entry<String, List<String>> devices : part.deviceIdMap.entrySet()) {
+        mergedDevices.computeIfAbsent(devices.getKey(), key -> new ArrayList<>())
+            .addAll(devices.getValue());
+      }
+    }
+
+    // flatten the per-id maps back into the list form that Metadata stores
+    for (Entry<String, Map<String, MeasurementSchema>> merged : schemasById.entrySet()) {
+      mergedSeries.put(merged.getKey(), new ArrayList<>(merged.getValue().values()));
+    }
+    return new Metadata(mergedSeries, mergedDevices);
+  }
+
@Override
public String toString() {
return seriesMap.toString() + "\n" + deviceIdMap.toString();
}
+  /**
+   * Two Metadata objects are equal when their series maps and device id maps have the same keys
+   * and, for every key, the associated lists contain the same elements regardless of order or
+   * repetition.
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || this.getClass() != obj.getClass()) {
+      return false;
+    }
+
+    Metadata metadata = (Metadata) obj;
+    return seriesMapEquals(seriesMap, metadata.seriesMap)
+        && deviceIdMapEquals(deviceIdMap, metadata.deviceIdMap);
+  }
+
+  /**
+   * Hash code consistent with {@link #equals(Object)}. It is derived from the key sets only,
+   * because equal instances always share them while their value lists may differ in order.
+   */
+  @Override
+  public int hashCode() {
+    return 31 * seriesMap.keySet().hashCode() + deviceIdMap.keySet().hashCode();
+  }
+
+  /**
+   * Checks whether two series maps have the same keys and, per key, lists that
+   * {@code listEquals} considers equal.
+   */
+  private boolean seriesMapEquals(Map<String, List<MeasurementSchema>> map1,
+      Map<String, List<MeasurementSchema>> map2) {
+    if (!map1.keySet().equals(map2.keySet())) {
+      return false;
+    }
+    // key sets match, so every key of map1 can be looked up in map2
+    return map1.entrySet().stream()
+        .allMatch(series -> listEquals(series.getValue(), map2.get(series.getKey())));
+  }
+
+  /**
+   * Checks whether two device id maps have the same keys and, per key, lists that
+   * {@code listEquals} considers equal.
+   */
+  private boolean deviceIdMapEquals(Map<String, List<String>> map1,
+      Map<String, List<String>> map2) {
+    if (!map1.keySet().equals(map2.keySet())) {
+      return false;
+    }
+    // key sets match, so every key of map1 can be looked up in map2
+    return map1.entrySet().stream()
+        .allMatch(devices -> listEquals(devices.getValue(), map2.get(devices.getKey())));
+  }
+
+  /**
+   * Compares two lists as sets: element order and repeated elements are ignored.
+   *
+   * @return true when both lists contain the same distinct elements
+   */
+  private boolean listEquals(List<?> list1, List<?> list2) {
+    Set<Object> first = new HashSet<>(list1);
+    Set<Object> second = new HashSet<>(list2);
+    return first.equals(second);
+  }
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 79dbd53..299133c 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -81,6 +81,7 @@ import org.apache.iotdb.service.rpc.thrift.TS_SessionHandle;
import org.apache.iotdb.service.rpc.thrift.TS_Status;
import org.apache.iotdb.service.rpc.thrift.TS_StatusCode;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.thrift.TException;
@@ -308,14 +309,14 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
Metadata metadata;
try {
String column = req.getColumnPath();
- metadata = MManager.getInstance().getMetadata();
+ metadata = getMetadata();
Map<String, List<String>> deviceMap = metadata.getDeviceMap();
if (deviceMap == null || !deviceMap.containsKey(column)) {
resp.setColumnsList(new ArrayList<>());
} else {
resp.setColumnsList(deviceMap.get(column));
}
- } catch (PathErrorException e) {
+ } catch (PathErrorException | InterruptedException | ProcessorException e) {
LOGGER.error("cannot get delta object map", e);
status = getErrorStatus(String.format("Failed to fetch delta object map because: %s", e));
resp.setStatus(status);
@@ -330,8 +331,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
break;
case "COLUMN":
try {
- resp.setDataType(MManager.getInstance().getSeriesType(req.getColumnPath()).toString());
- } catch (PathErrorException e) {
+ resp.setDataType(getSeriesType(req.getColumnPath()).toString());
+ } catch (PathErrorException | InterruptedException | ProcessorException e) {
// TODO aggregate seriesPath e.g. last(root.ln.wf01.wt01.status)
// status = new TS_Status(TS_StatusCode.ERROR_STATUS);
// status.setErrorMessage(String.format("Failed to fetch %s's data type because: %s",
@@ -343,8 +344,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
break;
case "ALL_COLUMNS":
try {
- resp.setColumnsList(MManager.getInstance().getPaths(req.getColumnPath()));
- } catch (PathErrorException e) {
+ resp.setColumnsList(getPaths(req.getColumnPath()));
+ } catch (PathErrorException | InterruptedException | ProcessorException e) {
status = getErrorStatus(String
.format("Failed to fetch %s's all columns because: %s", req.getColumnPath(), e));
resp.setStatus(status);
@@ -382,6 +383,18 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
return MManager.getInstance().getMetadataInString();
}
+ /**
+ * Hook returning the metadata from the local {@code MManager} singleton. It is protected so a
+ * subclass (TSServiceClusterImpl overrides it) can supply the metadata differently.
+ * NOTE(review): InterruptedException and ProcessorException are presumably declared only for
+ * the benefit of such overrides - confirm.
+ */
+ protected Metadata getMetadata() throws PathErrorException, InterruptedException, ProcessorException {
+ return MManager.getInstance().getMetadata();
+ }
+
+ /**
+ * Hook returning the data type of {@code path} from the local {@code MManager} singleton. It is
+ * protected so a subclass (TSServiceClusterImpl overrides it) can resolve the type differently.
+ */
+ protected TSDataType getSeriesType(String path) throws PathErrorException, InterruptedException, ProcessorException {
+ return MManager.getInstance().getSeriesType(path);
+ }
+
+ /**
+ * Hook returning the paths that the local {@code MManager} singleton reports for {@code path}.
+ * It is protected so a subclass (TSServiceClusterImpl overrides it) can resolve them differently.
+ */
+ protected List<String> getPaths(String path) throws PathErrorException, InterruptedException, ProcessorException {
+ return MManager.getInstance().getPaths(path);
+ }
+
/**
* Judge whether the statement is ADMIN COMMAND and if true, executeWithGlobalTimeFilter it.
*
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/metadata/MetadataTest.java b/iotdb/src/test/java/org/apache/iotdb/db/metadata/MetadataTest.java
new file mode 100644
index 0000000..9e1adc7
--- /dev/null
+++ b/iotdb/src/test/java/org/apache/iotdb/db/metadata/MetadataTest.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.metadata;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.iotdb.db.exception.PathErrorException;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests {@link Metadata#combineMetadatas(Metadata[])} against metadata produced by the
+ * {@link MManager} singleton.
+ */
+public class MetadataTest {
+
+  @Before
+  public void setUp() throws Exception {
+    // nothing to prepare; each test builds its own tree through MManager
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    EnvironmentUtils.cleanEnv();
+  }
+
+  /**
+   * Builds two disjoint metadata snapshots, then one snapshot holding the union of both, and
+   * expects the combination of the first two to equal the third.
+   *
+   * <p>Exceptions propagate to JUnit so that a failure is reported with its full stack trace.
+   */
+  @Test
+  public void testCombineMetadatas() throws PathErrorException, IOException {
+    MManager manager = MManager.getInstance();
+
+    // first part: storage groups root.t.d1 and root.t.d2
+    manager.setStorageLevelToMTree("root.t.d1");
+    manager.addPathToMTree("root.t.d1.s0", "INT32", "RLE");
+    manager.addPathToMTree("root.t.d1.s1", "DOUBLE", "RLE");
+    manager.setStorageLevelToMTree("root.t.d2");
+    manager.addPathToMTree("root.t.d2.s1", "DOUBLE", "RLE");
+    Metadata metadata1 = manager.getMetadata();
+
+    manager.clear();
+
+    // second part: storage groups root.t.d3 and root.t1.d1
+    manager.setStorageLevelToMTree("root.t.d3");
+    manager.addPathToMTree("root.t.d3.s1", "DOUBLE", "RLE");
+    manager.addPathToMTree("root.t.d3.s2", "TEXT", "RLE");
+    manager.setStorageLevelToMTree("root.t1.d1");
+    manager.addPathToMTree("root.t1.d1.s1", "DOUBLE", "RLE");
+    manager.addPathToMTree("root.t1.d1.s2", "TEXT", "RLE");
+    Metadata metadata2 = manager.getMetadata();
+
+    manager.clear();
+
+    // expected result: everything from both parts registered in a single tree
+    manager.setStorageLevelToMTree("root.t.d1");
+    manager.addPathToMTree("root.t.d1.s0", "INT32", "RLE");
+    manager.addPathToMTree("root.t.d1.s1", "DOUBLE", "RLE");
+    manager.setStorageLevelToMTree("root.t.d2");
+    manager.addPathToMTree("root.t.d2.s1", "DOUBLE", "RLE");
+    manager.setStorageLevelToMTree("root.t.d3");
+    manager.addPathToMTree("root.t.d3.s1", "DOUBLE", "RLE");
+    manager.addPathToMTree("root.t.d3.s2", "TEXT", "RLE");
+    manager.setStorageLevelToMTree("root.t1.d1");
+    manager.addPathToMTree("root.t1.d1.s1", "DOUBLE", "RLE");
+    manager.addPathToMTree("root.t1.d1.s2", "TEXT", "RLE");
+    Metadata metadata = manager.getMetadata();
+
+    Metadata combineMetadata = Metadata.combineMetadatas(new Metadata[]{metadata1, metadata2});
+    // assertEquals delegates to metadata.equals(...) and prints both sides when they differ
+    assertEquals(metadata, combineMetadata);
+  }
+}
\ No newline at end of file