You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ea...@apache.org on 2019/04/11 15:54:56 UTC
[incubator-iotdb] branch cluster_metadata_query updated: update
This is an automated email from the ASF dual-hosted git repository.
east pushed a commit to branch cluster_metadata_query
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/cluster_metadata_query by this push:
new 2cde64d update
2cde64d is described below
commit 2cde64d52bdaa5a1968394e2e09693002e2b6ebe
Author: mdf369 <95...@qq.com>
AuthorDate: Thu Apr 11 23:54:42 2019 +0800
update
---
.../cluster/qp/executor/QueryMetadataExecutor.java | 19 ++---
.../processor/QueryMetadataAsyncProcessor.java | 11 ++-
.../raft/processor/QueryPathsAsyncProcessor.java | 10 ++-
.../processor/QuerySeriesTypeAsyncProcessor.java | 10 ++-
.../processor/QueryTimeSeriesAsyncProcessor.java | 10 ++-
.../cluster/rpc/service/TSServiceClusterImpl.java | 1 -
.../org/apache/iotdb/db/metadata/Metadata.java | 64 +++++++++++++++
.../org/apache/iotdb/db/metadata/MetadataTest.java | 93 ++++++++++++++++++++++
8 files changed, 195 insertions(+), 23 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
index 3be4e81..2187bea 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
@@ -139,7 +139,6 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
PeerId holder = RaftUtils.getRandomPeerID(groupId);
res.addAll(queryTimeSeries(task, holder));
} catch (RaftConnectionException e) {
- LOGGER.error(e.getMessage());
throw new ProcessorException("Raft connection occurs error.", e);
}
}
@@ -167,7 +166,6 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
PeerId holder = RaftUtils.getRandomPeerID(groupId);
asyncSendNonQueryTask(task, holder, 0);
} catch (RaftConnectionException e) {
- LOGGER.error(e.getMessage());
throw new ProcessorException("Raft connection occurs error.", e);
}
}
@@ -177,7 +175,6 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
task.await();
BasicResponse response = task.getResponse();
if (response == null || !response.isSuccess()) {
- LOGGER.error("Execute show timeseries statement false.");
throw new ProcessorException();
}
metadataList.add(((QueryMetadataInStringResponse)response).getMetadata());
@@ -207,7 +204,6 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
PeerId holder = RaftUtils.getRandomPeerID(groupId);
asyncSendNonQueryTask(task, holder, 0);
} catch (RaftConnectionException e) {
- LOGGER.error(e.getMessage());
throw new ProcessorException("Raft connection occurs error.", e);
}
}
@@ -217,8 +213,11 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
task.await();
BasicResponse response = task.getResponse();
if (response == null || !response.isSuccess()) {
- LOGGER.error("Execute show timeseries statement false.");
- throw new ProcessorException();
+ String errorMessage = "response is null";
+ if (response != null && response.getErrorMsg() != null) {
+ errorMessage = response.getErrorMsg();
+ }
+ throw new ProcessorException("Execute query metadata statement false because " + errorMessage);
}
metadatas[i] = ((QueryMetadataResponse)response).getMetadata();
}
@@ -247,7 +246,6 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
PeerId holder = RaftUtils.getRandomPeerID(groupId);
dataType = querySeriesType(task, holder);
} catch (RaftConnectionException e) {
- LOGGER.error(e.getMessage());
throw new ProcessorException("Raft connection occurs error.", e);
}
}
@@ -296,7 +294,6 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
PeerId holder = RaftUtils.getRandomPeerID(groupId);
res.addAll(queryPaths(task, holder));
} catch (RaftConnectionException e) {
- LOGGER.error(e.getMessage());
throw new ProcessorException("Raft connection occurs error.", e);
}
}
@@ -353,8 +350,7 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
task.await();
QueryTimeSeriesResponse response = (QueryTimeSeriesResponse) task.getResponse();
if (response == null || !response.isSuccess()) {
- LOGGER.error("Execute show timeseries {} statement false.", pathList);
- throw new ProcessorException();
+ throw new ProcessorException("Execute show timeseries " + pathList + " statement false.");
}
return response.getTimeSeries();
}
@@ -405,8 +401,7 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
task.await();
QuerySeriesTypeResponse response = (QuerySeriesTypeResponse) task.getResponse();
if (response == null || !response.isSuccess()) {
- LOGGER.error("Execute get series type for {} statement false.", path);
- throw new ProcessorException();
+ throw new ProcessorException("Execute get series type for " + path + " statement false.");
}
return response.getDataType();
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryMetadataAsyncProcessor.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryMetadataAsyncProcessor.java
index 7af4adc..176fa33 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryMetadataAsyncProcessor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryMetadataAsyncProcessor.java
@@ -40,20 +40,22 @@ public class QueryMetadataAsyncProcessor extends
public void handleRequest(BizContext bizContext, AsyncContext asyncContext,
QueryMetadataRequest request) {
String groupId = request.getGroupID();
- final byte[] reqContext = RaftUtils.createRaftRequestContext();
- DataPartitionRaftHolder dataPartitionHolder = RaftUtils.getDataPartitonRaftHolder(groupId);
if (request.getReadConsistencyLevel() == ClusterConstant.WEAK_CONSISTENCY_LEVEL) {
QueryMetadataResponse response = null;
try {
response = QueryMetadataResponse
.createSuccessResponse(groupId, mManager.getMetadata());
+ response.addResult(true);
} catch (PathErrorException e) {
response = QueryMetadataResponse.createErrorResponse(groupId, e.getMessage());
+ response.addResult(false);
}
- response.addResult(true);
asyncContext.sendResponse(response);
} else {
+ final byte[] reqContext = RaftUtils.createRaftRequestContext();
+ DataPartitionRaftHolder dataPartitionHolder = RaftUtils.getDataPartitonRaftHolder(groupId);
+
((RaftService) dataPartitionHolder.getService()).getNode()
.readIndex(reqContext, new ReadIndexClosure() {
@@ -64,10 +66,11 @@ public class QueryMetadataAsyncProcessor extends
try {
response = QueryMetadataResponse
.createSuccessResponse(groupId, mManager.getMetadata());
+ response.addResult(true);
} catch (PathErrorException e) {
response = QueryMetadataResponse.createErrorResponse(groupId, e.getMessage());
+ response.addResult(false);
}
- response.addResult(true);
} else {
response = QueryMetadataResponse
.createErrorResponse(groupId, status.getErrorMsg());
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryPathsAsyncProcessor.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryPathsAsyncProcessor.java
index b234a83..f54aba0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryPathsAsyncProcessor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryPathsAsyncProcessor.java
@@ -39,19 +39,22 @@ public class QueryPathsAsyncProcessor extends BasicAsyncUserProcessor<QueryPaths
public void handleRequest(BizContext bizContext, AsyncContext asyncContext,
QueryPathsRequest request) {
String groupId = request.getGroupID();
- final byte[] reqContext = RaftUtils.createRaftRequestContext();
- DataPartitionRaftHolder dataPartitionHolder = RaftUtils.getDataPartitonRaftHolder(groupId);
if (request.getReadConsistencyLevel() == ClusterConstant.WEAK_CONSISTENCY_LEVEL) {
QueryPathsResponse response = QueryPathsResponse
.createEmptyResponse(groupId);
try {
queryPaths(request, response);
+ response.addResult(true);
} catch (final PathErrorException e) {
response = QueryPathsResponse.createErrorResponse(groupId, e.getMessage());
+ response.addResult(false);
}
asyncContext.sendResponse(response);
} else {
+ final byte[] reqContext = RaftUtils.createRaftRequestContext();
+ DataPartitionRaftHolder dataPartitionHolder = RaftUtils.getDataPartitonRaftHolder(groupId);
+
((RaftService) dataPartitionHolder.getService()).getNode()
.readIndex(reqContext, new ReadIndexClosure() {
@@ -62,12 +65,15 @@ public class QueryPathsAsyncProcessor extends BasicAsyncUserProcessor<QueryPaths
if (status.isOk()) {
try {
queryPaths(request, response);
+ response.addResult(true);
} catch (final PathErrorException e) {
response = QueryPathsResponse.createErrorResponse(groupId, e.getMessage());
+ response.addResult(false);
}
} else {
response = QueryPathsResponse
.createErrorResponse(groupId, status.getErrorMsg());
+ response.addResult(false);
}
asyncContext.sendResponse(response);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QuerySeriesTypeAsyncProcessor.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QuerySeriesTypeAsyncProcessor.java
index b72f0a2..f0a4fc6 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QuerySeriesTypeAsyncProcessor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QuerySeriesTypeAsyncProcessor.java
@@ -39,18 +39,21 @@ public class QuerySeriesTypeAsyncProcessor extends BasicAsyncUserProcessor<Query
public void handleRequest(BizContext bizContext, AsyncContext asyncContext,
QuerySeriesTypeRequest request) {
String groupId = request.getGroupID();
- final byte[] reqContext = RaftUtils.createRaftRequestContext();
- DataPartitionRaftHolder dataPartitionHolder = RaftUtils.getDataPartitonRaftHolder(groupId);
if (request.getReadConsistencyLevel() == ClusterConstant.WEAK_CONSISTENCY_LEVEL) {
QuerySeriesTypeResponse response;
try {
response = QuerySeriesTypeResponse.createSuccessResponse(groupId, mManager.getSeriesType(request.getPath()));
+ response.addResult(true);
} catch (final PathErrorException e) {
response = QuerySeriesTypeResponse.createErrorResponse(groupId, e.getMessage());
+ response.addResult(false);
}
asyncContext.sendResponse(response);
} else {
+ final byte[] reqContext = RaftUtils.createRaftRequestContext();
+ DataPartitionRaftHolder dataPartitionHolder = RaftUtils.getDataPartitonRaftHolder(groupId);
+
((RaftService) dataPartitionHolder.getService()).getNode()
.readIndex(reqContext, new ReadIndexClosure() {
@@ -60,12 +63,15 @@ public class QuerySeriesTypeAsyncProcessor extends BasicAsyncUserProcessor<Query
if (status.isOk()) {
try {
response = QuerySeriesTypeResponse.createSuccessResponse(groupId, mManager.getSeriesType(request.getPath()));
+ response.addResult(true);
} catch (final PathErrorException e) {
response = QuerySeriesTypeResponse.createErrorResponse(groupId, e.getMessage());
+ response.addResult(false);
}
} else {
response = QuerySeriesTypeResponse
.createErrorResponse(groupId, status.getErrorMsg());
+ response.addResult(false);
}
asyncContext.sendResponse(response);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryTimeSeriesAsyncProcessor.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryTimeSeriesAsyncProcessor.java
index a800302..c41fdcf 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryTimeSeriesAsyncProcessor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryTimeSeriesAsyncProcessor.java
@@ -40,19 +40,22 @@ public class QueryTimeSeriesAsyncProcessor extends BasicAsyncUserProcessor<Query
public void handleRequest(BizContext bizContext, AsyncContext asyncContext,
QueryTimeSeriesRequest request) {
String groupId = request.getGroupID();
- final byte[] reqContext = RaftUtils.createRaftRequestContext();
- DataPartitionRaftHolder dataPartitionHolder = RaftUtils.getDataPartitonRaftHolder(groupId);
if (request.getReadConsistencyLevel() == ClusterConstant.WEAK_CONSISTENCY_LEVEL) {
QueryTimeSeriesResponse response = QueryTimeSeriesResponse
.createEmptyResponse(groupId);
try {
queryTimeSeries(request, response);
+ response.addResult(true);
} catch (final PathErrorException e) {
response = QueryTimeSeriesResponse.createErrorResponse(groupId, e.getMessage());
+ response.addResult(false);
}
asyncContext.sendResponse(response);
} else {
+ final byte[] reqContext = RaftUtils.createRaftRequestContext();
+ DataPartitionRaftHolder dataPartitionHolder = RaftUtils.getDataPartitonRaftHolder(groupId);
+
((RaftService) dataPartitionHolder.getService()).getNode()
.readIndex(reqContext, new ReadIndexClosure() {
@@ -63,12 +66,15 @@ public class QueryTimeSeriesAsyncProcessor extends BasicAsyncUserProcessor<Query
if (status.isOk()) {
try {
queryTimeSeries(request, response);
+ response.addResult(true);
} catch (final PathErrorException e) {
response = QueryTimeSeriesResponse.createErrorResponse(groupId, e.getMessage());
+ response.addResult(false);
}
} else {
response = QueryTimeSeriesResponse
.createErrorResponse(groupId, status.getErrorMsg());
+ response.addResult(false);
}
asyncContext.sendResponse(response);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/service/TSServiceClusterImpl.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/service/TSServiceClusterImpl.java
index 320398b..33e3e81 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/service/TSServiceClusterImpl.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/service/TSServiceClusterImpl.java
@@ -34,7 +34,6 @@ import org.apache.iotdb.db.auth.AuthException;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.exception.ProcessorException;
-import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.metadata.Metadata;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.service.TSServiceImpl;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/metadata/Metadata.java b/iotdb/src/main/java/org/apache/iotdb/db/metadata/Metadata.java
index 15b6011..c83a3dc 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/metadata/Metadata.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/metadata/Metadata.java
@@ -20,9 +20,11 @@ package org.apache.iotdb.db.metadata;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
@@ -126,4 +128,66 @@ public class Metadata {
return seriesMap.toString() + "\n" + deviceIdMap.toString();
}
+ /**
+  * Compares this metadata with another object for logical equality.
+  *
+  * <p>Two instances are equal when they are of exactly the same class and both their
+  * seriesMap and deviceIdMap are judged equal by seriesMapEquals / deviceIdMapEquals.
+  *
+  * @param obj the object to compare with; may be null
+  * @return true if obj is an equivalent Metadata, false otherwise
+  */
+ @Override
+ public boolean equals(Object obj) {
+   if (this == obj) {
+     return true;
+   }
+   // Exact class comparison (not instanceof) keeps the relation symmetric.
+   if (obj == null || getClass() != obj.getClass()) {
+     return false;
+   }
+   Metadata other = (Metadata) obj;
+   return seriesMapEquals(seriesMap, other.seriesMap)
+       && deviceIdMapEquals(deviceIdMap, other.deviceIdMap);
+ }
+
+ /**
+  * Returns a hash code consistent with {@link #equals(Object)}.
+  *
+  * <p>Only the key sets contribute: equal instances always have equal key sets, whereas the
+  * value lists are not compared with List.equals and so must not be hashed as lists.
+  *
+  * @return hash code derived from the key sets of seriesMap and deviceIdMap
+  */
+ @Override
+ public int hashCode() {
+   return 31 * seriesMap.keySet().hashCode() + deviceIdMap.keySet().hashCode();
+ }
+
+ /**
+  * Checks whether two seriesMaps are equivalent: their key sets must be equal and, for every
+  * key, the two value lists must be accepted by listEquals.
+  *
+  * @param map1 first map; dereferenced directly, so must not be null
+  * @param map2 second map; dereferenced directly, so must not be null
+  * @return true if both maps are equivalent, false otherwise
+  */
+ private boolean seriesMapEquals(Map<String, List<MeasurementSchema>> map1,
+     Map<String, List<MeasurementSchema>> map2) {
+   if (!map1.keySet().equals(map2.keySet())) {
+     return false;
+   }
+
+   // Key sets match at this point, so every key of map1 is also a key of map2.
+   for (Entry<String, List<MeasurementSchema>> entry : map1.entrySet()) {
+     List<MeasurementSchema> schemas1 = entry.getValue();
+     List<MeasurementSchema> schemas2 = map2.get(entry.getKey());
+
+     if (!listEquals(schemas1, schemas2)) {
+       return false;
+     }
+   }
+   return true;
+ }
+
+ /**
+  * Checks whether two deviceIdMaps are equivalent: their key sets must be equal and, for every
+  * key, the two value lists must be accepted by listEquals.
+  *
+  * @param map1 first map; dereferenced directly, so must not be null
+  * @param map2 second map; dereferenced directly, so must not be null
+  * @return true if both maps are equivalent, false otherwise
+  */
+ private boolean deviceIdMapEquals(Map<String, List<String>> map1,
+     Map<String, List<String>> map2) {
+   if (!map1.keySet().equals(map2.keySet())) {
+     return false;
+   }
+
+   // Key sets match at this point, so every key of map1 is also a key of map2.
+   for (Entry<String, List<String>> entry : map1.entrySet()) {
+     List<String> ids1 = entry.getValue();
+     List<String> ids2 = map2.get(entry.getKey());
+
+     if (!listEquals(ids1, ids2)) {
+       return false;
+     }
+   }
+   return true;
+ }
+
+ /**
+  * Compares two lists as sets: they are equal when they contain the same distinct elements,
+  * so element order and duplicate entries are ignored.
+  *
+  * @param list1 first list; may be null
+  * @param list2 second list; may be null
+  * @return true if both lists are null or both hold the same distinct elements
+  */
+ private boolean listEquals(List<?> list1, List<?> list2) {
+   // A null list only matches another null list instead of raising a NullPointerException.
+   if (list1 == null || list2 == null) {
+     return list1 == list2;
+   }
+   Set<Object> set1 = new HashSet<>(list1);
+   Set<Object> set2 = new HashSet<>(list2);
+
+   return set1.equals(set2);
+ }
}
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/metadata/MetadataTest.java b/iotdb/src/test/java/org/apache/iotdb/db/metadata/MetadataTest.java
new file mode 100644
index 0000000..9e1adc7
--- /dev/null
+++ b/iotdb/src/test/java/org/apache/iotdb/db/metadata/MetadataTest.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.metadata;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.iotdb.db.exception.PathErrorException;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit test for {@link Metadata}: checks that combining two partial metadata snapshots yields
+ * a result equal to the metadata built from the complete schema.
+ */
+public class MetadataTest {
+
+  @Before
+  public void setUp() throws Exception {
+    // no per-test preparation required
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    EnvironmentUtils.cleanEnv();
+  }
+
+  /**
+   * Builds two disjoint schemas, snapshots each one, then builds the union schema and verifies
+   * that Metadata.combineMetadatas of the two snapshots equals the union snapshot.
+   *
+   * <p>Exceptions propagate so the test runner reports them with their full stack trace.
+   */
+  @Test
+  public void testCombineMetadatas() throws PathErrorException, IOException {
+    MManager manager = MManager.getInstance();
+
+    registerFirstSchema(manager);
+    Metadata metadata1 = manager.getMetadata();
+
+    manager.clear();
+
+    registerSecondSchema(manager);
+    Metadata metadata2 = manager.getMetadata();
+
+    manager.clear();
+
+    registerFirstSchema(manager);
+    registerSecondSchema(manager);
+    Metadata metadata = manager.getMetadata();
+
+    Metadata combineMetadata = Metadata.combineMetadatas(new Metadata[]{metadata1, metadata2});
+    // assertEquals prints both values on failure, unlike assertTrue(a.equals(b)).
+    assertEquals(metadata, combineMetadata);
+  }
+
+  /** Registers storage levels root.t.d1 and root.t.d2 together with their series. */
+  private void registerFirstSchema(MManager manager) throws PathErrorException, IOException {
+    manager.setStorageLevelToMTree("root.t.d1");
+    manager.addPathToMTree("root.t.d1.s0", "INT32", "RLE");
+    manager.addPathToMTree("root.t.d1.s1", "DOUBLE", "RLE");
+    manager.setStorageLevelToMTree("root.t.d2");
+    manager.addPathToMTree("root.t.d2.s1", "DOUBLE", "RLE");
+  }
+
+  /** Registers storage levels root.t.d3 and root.t1.d1 together with their series. */
+  private void registerSecondSchema(MManager manager) throws PathErrorException, IOException {
+    manager.setStorageLevelToMTree("root.t.d3");
+    manager.addPathToMTree("root.t.d3.s1", "DOUBLE", "RLE");
+    manager.addPathToMTree("root.t.d3.s2", "TEXT", "RLE");
+    manager.setStorageLevelToMTree("root.t1.d1");
+    manager.addPathToMTree("root.t1.d1.s1", "DOUBLE", "RLE");
+    manager.addPathToMTree("root.t1.d1.s2", "TEXT", "RLE");
+  }
+}
\ No newline at end of file