You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ea...@apache.org on 2019/04/11 15:54:56 UTC

[incubator-iotdb] branch cluster_metadata_query updated: update

This is an automated email from the ASF dual-hosted git repository.

east pushed a commit to branch cluster_metadata_query
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/cluster_metadata_query by this push:
     new 2cde64d  update
2cde64d is described below

commit 2cde64d52bdaa5a1968394e2e09693002e2b6ebe
Author: mdf369 <95...@qq.com>
AuthorDate: Thu Apr 11 23:54:42 2019 +0800

    update
---
 .../cluster/qp/executor/QueryMetadataExecutor.java | 19 ++---
 .../processor/QueryMetadataAsyncProcessor.java     | 11 ++-
 .../raft/processor/QueryPathsAsyncProcessor.java   | 10 ++-
 .../processor/QuerySeriesTypeAsyncProcessor.java   | 10 ++-
 .../processor/QueryTimeSeriesAsyncProcessor.java   | 10 ++-
 .../cluster/rpc/service/TSServiceClusterImpl.java  |  1 -
 .../org/apache/iotdb/db/metadata/Metadata.java     | 64 +++++++++++++++
 .../org/apache/iotdb/db/metadata/MetadataTest.java | 93 ++++++++++++++++++++++
 8 files changed, 195 insertions(+), 23 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
index 3be4e81..2187bea 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
@@ -139,7 +139,6 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
         PeerId holder = RaftUtils.getRandomPeerID(groupId);
         res.addAll(queryTimeSeries(task, holder));
       } catch (RaftConnectionException e) {
-        LOGGER.error(e.getMessage());
         throw new ProcessorException("Raft connection occurs error.", e);
       }
     }
@@ -167,7 +166,6 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
           PeerId holder = RaftUtils.getRandomPeerID(groupId);
           asyncSendNonQueryTask(task, holder, 0);
         } catch (RaftConnectionException e) {
-          LOGGER.error(e.getMessage());
           throw new ProcessorException("Raft connection occurs error.", e);
         }
       }
@@ -177,7 +175,6 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
       task.await();
       BasicResponse response = task.getResponse();
       if (response == null || !response.isSuccess()) {
-        LOGGER.error("Execute show timeseries statement false.");
         throw new ProcessorException();
       }
       metadataList.add(((QueryMetadataInStringResponse)response).getMetadata());
@@ -207,7 +204,6 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
           PeerId holder = RaftUtils.getRandomPeerID(groupId);
           asyncSendNonQueryTask(task, holder, 0);
         } catch (RaftConnectionException e) {
-          LOGGER.error(e.getMessage());
           throw new ProcessorException("Raft connection occurs error.", e);
         }
       }
@@ -217,8 +213,11 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
       task.await();
       BasicResponse response = task.getResponse();
       if (response == null || !response.isSuccess()) {
-        LOGGER.error("Execute show timeseries statement false.");
-        throw new ProcessorException();
+        String errorMessage = "response is null";
+        if (response != null && response.getErrorMsg() != null) {
+          errorMessage = response.getErrorMsg();
+        }
+        throw new ProcessorException("Execute query metadata statement false because " + errorMessage);
       }
       metadatas[i] = ((QueryMetadataResponse)response).getMetadata();
     }
@@ -247,7 +246,6 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
           PeerId holder = RaftUtils.getRandomPeerID(groupId);
           dataType = querySeriesType(task, holder);
         } catch (RaftConnectionException e) {
-          LOGGER.error(e.getMessage());
           throw new ProcessorException("Raft connection occurs error.", e);
         }
       }
@@ -296,7 +294,6 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
         PeerId holder = RaftUtils.getRandomPeerID(groupId);
         res.addAll(queryPaths(task, holder));
       } catch (RaftConnectionException e) {
-        LOGGER.error(e.getMessage());
         throw new ProcessorException("Raft connection occurs error.", e);
       }
     }
@@ -353,8 +350,7 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
     task.await();
     QueryTimeSeriesResponse response = (QueryTimeSeriesResponse) task.getResponse();
     if (response == null || !response.isSuccess()) {
-      LOGGER.error("Execute show timeseries {} statement false.", pathList);
-      throw new ProcessorException();
+      throw new ProcessorException("Execute show timeseries " + pathList + " statement false.");
     }
     return response.getTimeSeries();
   }
@@ -405,8 +401,7 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
     task.await();
     QuerySeriesTypeResponse response = (QuerySeriesTypeResponse) task.getResponse();
     if (response == null || !response.isSuccess()) {
-      LOGGER.error("Execute get series type for {} statement false.", path);
-      throw new ProcessorException();
+      throw new ProcessorException("Execute get series type for " + path + " statement false.");
     }
     return response.getDataType();
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryMetadataAsyncProcessor.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryMetadataAsyncProcessor.java
index 7af4adc..176fa33 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryMetadataAsyncProcessor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryMetadataAsyncProcessor.java
@@ -40,20 +40,22 @@ public class QueryMetadataAsyncProcessor extends
   public void handleRequest(BizContext bizContext, AsyncContext asyncContext,
       QueryMetadataRequest request) {
     String groupId = request.getGroupID();
-    final byte[] reqContext = RaftUtils.createRaftRequestContext();
-    DataPartitionRaftHolder dataPartitionHolder = RaftUtils.getDataPartitonRaftHolder(groupId);
 
     if (request.getReadConsistencyLevel() == ClusterConstant.WEAK_CONSISTENCY_LEVEL) {
       QueryMetadataResponse response = null;
       try {
         response = QueryMetadataResponse
             .createSuccessResponse(groupId, mManager.getMetadata());
+        response.addResult(true);
       } catch (PathErrorException e) {
         response = QueryMetadataResponse.createErrorResponse(groupId, e.getMessage());
+        response.addResult(false);
       }
-      response.addResult(true);
       asyncContext.sendResponse(response);
     } else {
+      final byte[] reqContext = RaftUtils.createRaftRequestContext();
+      DataPartitionRaftHolder dataPartitionHolder = RaftUtils.getDataPartitonRaftHolder(groupId);
+
       ((RaftService) dataPartitionHolder.getService()).getNode()
           .readIndex(reqContext, new ReadIndexClosure() {
 
@@ -64,10 +66,11 @@ public class QueryMetadataAsyncProcessor extends
                 try {
                   response = QueryMetadataResponse
                       .createSuccessResponse(groupId, mManager.getMetadata());
+                  response.addResult(true);
                 } catch (PathErrorException e) {
                   response = QueryMetadataResponse.createErrorResponse(groupId, e.getMessage());
+                  response.addResult(false);
                 }
-                response.addResult(true);
               } else {
                 response = QueryMetadataResponse
                     .createErrorResponse(groupId, status.getErrorMsg());
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryPathsAsyncProcessor.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryPathsAsyncProcessor.java
index b234a83..f54aba0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryPathsAsyncProcessor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryPathsAsyncProcessor.java
@@ -39,19 +39,22 @@ public class QueryPathsAsyncProcessor extends BasicAsyncUserProcessor<QueryPaths
   public void handleRequest(BizContext bizContext, AsyncContext asyncContext,
       QueryPathsRequest request) {
     String groupId = request.getGroupID();
-    final byte[] reqContext = RaftUtils.createRaftRequestContext();
-    DataPartitionRaftHolder dataPartitionHolder = RaftUtils.getDataPartitonRaftHolder(groupId);
 
     if (request.getReadConsistencyLevel() == ClusterConstant.WEAK_CONSISTENCY_LEVEL) {
       QueryPathsResponse response = QueryPathsResponse
           .createEmptyResponse(groupId);
       try {
         queryPaths(request, response);
+        response.addResult(true);
       } catch (final PathErrorException e) {
         response = QueryPathsResponse.createErrorResponse(groupId, e.getMessage());
+        response.addResult(false);
       }
       asyncContext.sendResponse(response);
     } else {
+      final byte[] reqContext = RaftUtils.createRaftRequestContext();
+      DataPartitionRaftHolder dataPartitionHolder = RaftUtils.getDataPartitonRaftHolder(groupId);
+
       ((RaftService) dataPartitionHolder.getService()).getNode()
           .readIndex(reqContext, new ReadIndexClosure() {
 
@@ -62,12 +65,15 @@ public class QueryPathsAsyncProcessor extends BasicAsyncUserProcessor<QueryPaths
               if (status.isOk()) {
                 try {
                   queryPaths(request, response);
+                  response.addResult(true);
                 } catch (final PathErrorException e) {
                   response = QueryPathsResponse.createErrorResponse(groupId, e.getMessage());
+                  response.addResult(false);
                 }
               } else {
                 response = QueryPathsResponse
                     .createErrorResponse(groupId, status.getErrorMsg());
+                response.addResult(false);
               }
               asyncContext.sendResponse(response);
             }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QuerySeriesTypeAsyncProcessor.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QuerySeriesTypeAsyncProcessor.java
index b72f0a2..f0a4fc6 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QuerySeriesTypeAsyncProcessor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QuerySeriesTypeAsyncProcessor.java
@@ -39,18 +39,21 @@ public class QuerySeriesTypeAsyncProcessor extends BasicAsyncUserProcessor<Query
   public void handleRequest(BizContext bizContext, AsyncContext asyncContext,
       QuerySeriesTypeRequest request) {
     String groupId = request.getGroupID();
-    final byte[] reqContext = RaftUtils.createRaftRequestContext();
-    DataPartitionRaftHolder dataPartitionHolder = RaftUtils.getDataPartitonRaftHolder(groupId);
 
     if (request.getReadConsistencyLevel() == ClusterConstant.WEAK_CONSISTENCY_LEVEL) {
       QuerySeriesTypeResponse response;
       try {
         response = QuerySeriesTypeResponse.createSuccessResponse(groupId, mManager.getSeriesType(request.getPath()));
+        response.addResult(true);
       } catch (final PathErrorException e) {
         response = QuerySeriesTypeResponse.createErrorResponse(groupId, e.getMessage());
+        response.addResult(false);
       }
       asyncContext.sendResponse(response);
     } else {
+      final byte[] reqContext = RaftUtils.createRaftRequestContext();
+      DataPartitionRaftHolder dataPartitionHolder = RaftUtils.getDataPartitonRaftHolder(groupId);
+
       ((RaftService) dataPartitionHolder.getService()).getNode()
           .readIndex(reqContext, new ReadIndexClosure() {
 
@@ -60,12 +63,15 @@ public class QuerySeriesTypeAsyncProcessor extends BasicAsyncUserProcessor<Query
               if (status.isOk()) {
                 try {
                   response = QuerySeriesTypeResponse.createSuccessResponse(groupId, mManager.getSeriesType(request.getPath()));
+                  response.addResult(true);
                 } catch (final PathErrorException e) {
                   response = QuerySeriesTypeResponse.createErrorResponse(groupId, e.getMessage());
+                  response.addResult(false);
                 }
               } else {
                 response = QuerySeriesTypeResponse
                     .createErrorResponse(groupId, status.getErrorMsg());
+                response.addResult(false);
               }
               asyncContext.sendResponse(response);
             }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryTimeSeriesAsyncProcessor.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryTimeSeriesAsyncProcessor.java
index a800302..c41fdcf 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryTimeSeriesAsyncProcessor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryTimeSeriesAsyncProcessor.java
@@ -40,19 +40,22 @@ public class QueryTimeSeriesAsyncProcessor extends BasicAsyncUserProcessor<Query
   public void handleRequest(BizContext bizContext, AsyncContext asyncContext,
       QueryTimeSeriesRequest request) {
     String groupId = request.getGroupID();
-    final byte[] reqContext = RaftUtils.createRaftRequestContext();
-    DataPartitionRaftHolder dataPartitionHolder = RaftUtils.getDataPartitonRaftHolder(groupId);
 
     if (request.getReadConsistencyLevel() == ClusterConstant.WEAK_CONSISTENCY_LEVEL) {
       QueryTimeSeriesResponse response = QueryTimeSeriesResponse
           .createEmptyResponse(groupId);
       try {
         queryTimeSeries(request, response);
+        response.addResult(true);
       } catch (final PathErrorException e) {
         response = QueryTimeSeriesResponse.createErrorResponse(groupId, e.getMessage());
+        response.addResult(false);
       }
       asyncContext.sendResponse(response);
     } else {
+      final byte[] reqContext = RaftUtils.createRaftRequestContext();
+      DataPartitionRaftHolder dataPartitionHolder = RaftUtils.getDataPartitonRaftHolder(groupId);
+
       ((RaftService) dataPartitionHolder.getService()).getNode()
           .readIndex(reqContext, new ReadIndexClosure() {
 
@@ -63,12 +66,15 @@ public class QueryTimeSeriesAsyncProcessor extends BasicAsyncUserProcessor<Query
               if (status.isOk()) {
                 try {
                   queryTimeSeries(request, response);
+                  response.addResult(true);
                 } catch (final PathErrorException e) {
                   response = QueryTimeSeriesResponse.createErrorResponse(groupId, e.getMessage());
+                  response.addResult(false);
                 }
               } else {
                 response = QueryTimeSeriesResponse
                     .createErrorResponse(groupId, status.getErrorMsg());
+                response.addResult(false);
               }
               asyncContext.sendResponse(response);
             }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/service/TSServiceClusterImpl.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/service/TSServiceClusterImpl.java
index 320398b..33e3e81 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/service/TSServiceClusterImpl.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/service/TSServiceClusterImpl.java
@@ -34,7 +34,6 @@ import org.apache.iotdb.db.auth.AuthException;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.exception.ProcessorException;
-import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.metadata.Metadata;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.service.TSServiceImpl;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/metadata/Metadata.java b/iotdb/src/main/java/org/apache/iotdb/db/metadata/Metadata.java
index 15b6011..c83a3dc 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/metadata/Metadata.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/metadata/Metadata.java
@@ -20,9 +20,11 @@ package org.apache.iotdb.db.metadata;
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
@@ -126,4 +128,66 @@ public class Metadata {
     return seriesMap.toString() + "\n" + deviceIdMap.toString();
   }
 
+  /**
+   * Compares this metadata with another object.
+   *
+   * <p>Two {@code Metadata} instances are equal when both their {@code seriesMap} and
+   * their {@code deviceIdMap} have the same keys and, for every key, value lists that
+   * contain the same elements (list order and duplicates are ignored).
+   *
+   * @param obj the object to compare with, may be {@code null}
+   * @return {@code true} if {@code obj} is a {@code Metadata} of exactly this class with
+   *     equal maps
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    // Require the exact same class so that the comparison stays symmetric.
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+
+    Metadata metadata = (Metadata) obj;
+    return seriesMapEquals(seriesMap, metadata.seriesMap)
+        && deviceIdMapEquals(deviceIdMap, metadata.deviceIdMap);
+  }
+
+  /**
+   * Hash code consistent with {@link #equals(Object)}: it depends only on the keys of
+   * both maps and on the <em>set</em> of elements stored under each key, so instances
+   * that differ merely in list order or duplicates hash identically.
+   */
+  @Override
+  public int hashCode() {
+    return 31 * unorderedHash(seriesMap) + unorderedHash(deviceIdMap);
+  }
+
+  /**
+   * Sums a per-entry hash so the result does not depend on map iteration order.
+   */
+  private static int unorderedHash(Map<String, ? extends List<?>> map) {
+    int hash = 0;
+    for (Entry<String, ? extends List<?>> entry : map.entrySet()) {
+      String key = entry.getKey();
+      List<?> values = entry.getValue();
+      int keyHash = key == null ? 0 : key.hashCode();
+      // Hash the values as a set, mirroring the set-based comparison used by equals.
+      int valueHash = values == null ? 0 : new HashSet<>(values).hashCode();
+      hash += keyHash ^ valueHash;
+    }
+    return hash;
+  }
+
+  /**
+   * Checks whether two series maps are equal. Only used by {@code equals}.
+   *
+   * <p>The maps are equal when their key sets are equal and, for every key, the two
+   * schema lists match according to {@code listEquals}.
+   *
+   * @param map1 first series map
+   * @param map2 second series map
+   * @return {@code true} if both maps are considered equal
+   */
+  private boolean seriesMapEquals(Map<String, List<MeasurementSchema>> map1,
+      Map<String, List<MeasurementSchema>> map2) {
+    if (!map1.keySet().equals(map2.keySet())) {
+      return false;
+    }
+
+    for (Entry<String, List<MeasurementSchema>> entry : map1.entrySet()) {
+      // The key sets match, so map2 has a mapping for every key visited here.
+      if (!listEquals(entry.getValue(), map2.get(entry.getKey()))) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Checks whether two deviceId maps are equal. Only used by {@code equals}.
+   *
+   * <p>The maps are equal when their key sets are equal and, for every key, the two
+   * string lists match according to {@code listEquals}.
+   *
+   * @param map1 first deviceId map
+   * @param map2 second deviceId map
+   * @return {@code true} if both maps are considered equal
+   */
+  private boolean deviceIdMapEquals(Map<String, List<String>> map1,
+      Map<String, List<String>> map2) {
+    if (!map1.keySet().equals(map2.keySet())) {
+      return false;
+    }
+
+    for (Entry<String, List<String>> entry : map1.entrySet()) {
+      // The key sets match, so map2 has a mapping for every key visited here.
+      if (!listEquals(entry.getValue(), map2.get(entry.getKey()))) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Compares two lists as sets: element order and duplicates are ignored.
+   *
+   * @param list1 first list, may be {@code null}
+   * @param list2 second list, may be {@code null}
+   * @return {@code true} if both lists contain the same distinct elements, or both are
+   *     {@code null}
+   */
+  private boolean listEquals(List<?> list1, List<?> list2) {
+    // A null list only equals another null list; avoids an NPE in the HashSet copy.
+    if (list1 == null || list2 == null) {
+      return list1 == list2;
+    }
+    return new HashSet<>(list1).equals(new HashSet<>(list2));
+  }
 }
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/metadata/MetadataTest.java b/iotdb/src/test/java/org/apache/iotdb/db/metadata/MetadataTest.java
new file mode 100644
index 0000000..9e1adc7
--- /dev/null
+++ b/iotdb/src/test/java/org/apache/iotdb/db/metadata/MetadataTest.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.metadata;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.iotdb.db.exception.PathErrorException;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests {@link Metadata#combineMetadatas}: combining two partial metadata snapshots must
+ * equal a snapshot taken after registering all of the same paths in one go.
+ */
+public class MetadataTest {
+
+  @Before
+  public void setUp() throws Exception {
+    // nothing to prepare
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    EnvironmentUtils.cleanEnv();
+  }
+
+  /**
+   * Builds three snapshots through the shared {@link MManager}, clearing it in between,
+   * and checks that combining the first two equals the third.
+   *
+   * <p>Checked exceptions propagate to JUnit so a failure is reported with its full
+   * stack trace instead of only the exception message.
+   */
+  @Test
+  public void testCombineMetadatas() throws PathErrorException, IOException {
+    MManager manager = MManager.getInstance();
+
+    // first partial snapshot: root.t.d1 and root.t.d2
+    manager.setStorageLevelToMTree("root.t.d1");
+    manager.addPathToMTree("root.t.d1.s0", "INT32", "RLE");
+    manager.addPathToMTree("root.t.d1.s1", "DOUBLE", "RLE");
+    manager.setStorageLevelToMTree("root.t.d2");
+    manager.addPathToMTree("root.t.d2.s1", "DOUBLE", "RLE");
+    Metadata metadata1 = manager.getMetadata();
+
+    manager.clear();
+
+    // second partial snapshot: root.t.d3 and root.t1.d1
+    manager.setStorageLevelToMTree("root.t.d3");
+    manager.addPathToMTree("root.t.d3.s1", "DOUBLE", "RLE");
+    manager.addPathToMTree("root.t.d3.s2", "TEXT", "RLE");
+    manager.setStorageLevelToMTree("root.t1.d1");
+    manager.addPathToMTree("root.t1.d1.s1", "DOUBLE", "RLE");
+    manager.addPathToMTree("root.t1.d1.s2", "TEXT", "RLE");
+    Metadata metadata2 = manager.getMetadata();
+
+    manager.clear();
+
+    // expected snapshot: every path from both partial snapshots
+    manager.setStorageLevelToMTree("root.t.d1");
+    manager.addPathToMTree("root.t.d1.s0", "INT32", "RLE");
+    manager.addPathToMTree("root.t.d1.s1", "DOUBLE", "RLE");
+    manager.setStorageLevelToMTree("root.t.d2");
+    manager.addPathToMTree("root.t.d2.s1", "DOUBLE", "RLE");
+    manager.setStorageLevelToMTree("root.t.d3");
+    manager.addPathToMTree("root.t.d3.s1", "DOUBLE", "RLE");
+    manager.addPathToMTree("root.t.d3.s2", "TEXT", "RLE");
+    manager.setStorageLevelToMTree("root.t1.d1");
+    manager.addPathToMTree("root.t1.d1.s1", "DOUBLE", "RLE");
+    manager.addPathToMTree("root.t1.d1.s2", "TEXT", "RLE");
+    Metadata metadata = manager.getMetadata();
+
+    Metadata combineMetadata = Metadata.combineMetadatas(new Metadata[]{metadata1, metadata2});
+    // assertEquals prints both values on failure, unlike assertTrue(a.equals(b)).
+    assertEquals(metadata, combineMetadata);
+  }
+}
\ No newline at end of file