You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/06/08 16:11:14 UTC

[incubator-iotdb] branch cluster updated (0089e73 -> 6b94837)

This is an automated email from the ASF dual-hosted git repository.

lta pushed a change to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git.


    from 0089e73  Merge t pushbranch 'cluster' of github.com:apache/incubator-iotdb into cluster
     add 68844b2  fix missing wal log node after recovery
     add 6f40c05  Merge pull request #186 from apache/fix_missing_wal_node
     add a7c0221  simplify Path construction
     add 8a642a4  Merge pull request #193 from apache/f_construct_path
     new b2c61a0  Merge remote-tracking branch 'origin/master' into cluster
     new b4c70a6  fix a serve bug of set consistency level
     new db7d992  add error statement
     new 6b94837  fix sonar issues

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/iotdb/cluster/config/ClusterConfig.java | 10 ++++----
 .../iotdb/cluster/config/ClusterDescriptor.java    | 18 ++++++++++-----
 .../org/apache/iotdb/cluster/entity/Server.java    |  4 ++--
 .../cluster/qp/executor/AbstractQPExecutor.java    | 18 ++++++---------
 .../qp/executor/ClusterQueryProcessExecutor.java   |  2 +-
 .../cluster/qp/executor/NonQueryExecutor.java      |  4 ++--
 .../cluster/qp/executor/QueryMetadataExecutor.java | 18 +++++++--------
 .../apache/iotdb/cluster/qp/task/BatchQPTask.java  |  4 ++--
 .../org/apache/iotdb/cluster/qp/task/QPTask.java   |  1 +
 .../apache/iotdb/cluster/qp/task/SingleQPTask.java |  6 +----
 .../ClusterGroupByDataSetWithOnlyTimeFilter.java   |  1 +
 .../executor/ClusterExecutorWithTimeGenerator.java |  3 ---
 .../cluster/query/executor/ClusterQueryRouter.java |  2 +-
 .../querynode/ClusterLocalQueryManager.java        |  1 -
 .../querynode/ClusterLocalSingleQueryManager.java  |  8 +++----
 .../ClusterFillSelectSeriesBatchReader.java        |  1 -
 .../querynode/ClusterSelectSeriesBatchReader.java  |  4 ++--
 .../ClusterSelectSeriesBatchReaderByTimestamp.java |  4 ++--
 .../ClusterSelectSeriesBatchReaderEntity.java      |  8 +++----
 ...r.java => IClusterSelectSeriesBatchReader.java} |  4 ++--
 .../cluster/query/utils/ClusterRpcReaderUtils.java |  2 +-
 .../rpc/raft/impl/RaftNodeAsClientManager.java     |  4 ----
 .../querymetadata/QueryMetadataAsyncProcessor.java |  1 -
 .../QueryMetadataInStringAsyncProcessor.java       |  1 -
 .../querymetadata/QueryPathsAsyncProcessor.java    |  1 -
 .../QuerySeriesTypeAsyncProcessor.java             |  1 -
 .../QueryTimeSeriesAsyncProcessor.java             |  1 -
 .../cluster/service/TSServiceClusterImpl.java      |  2 +-
 .../org/apache/iotdb/cluster/utils/RaftUtils.java  |  8 ++-----
 .../cluster/config/ClusterDescriptorTest.java      |  4 ++--
 .../query/manager/ClusterLocalManagerTest.java     | 27 +++++++++++-----------
 iotdb/iotdb/conf/iotdb-cluster.properties          | 10 ++++----
 .../engine/bufferwrite/BufferWriteProcessor.java   | 24 ++++++++++---------
 .../iotdb/db/engine/filenode/FileNodeManager.java  | 12 ++++++++--
 .../db/engine/overflow/io/OverflowProcessor.java   | 18 ++++++++-------
 .../iotdb/db/qp/physical/crud/InsertPlan.java      |  2 +-
 .../qp/strategy/optimizer/ConcatPathOptimizer.java |  4 ++--
 .../org/apache/iotdb/tsfile/read/common/Path.java  | 11 ++++++---
 38 files changed, 128 insertions(+), 126 deletions(-)
 rename cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/{AbstractClusterSelectSeriesBatchReader.java => IClusterSelectSeriesBatchReader.java} (88%)


[incubator-iotdb] 02/04: fix a serve bug of set consistency level

Posted by lt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit b4c70a61d2cdbe1d26bf50aac968731c65df54b5
Author: lta <li...@163.com>
AuthorDate: Sat Jun 8 22:12:48 2019 +0800

    fix a serve bug of set consistency level
---
 .../org/apache/iotdb/cluster/config/ClusterConfig.java | 10 ++++++----
 .../apache/iotdb/cluster/config/ClusterDescriptor.java | 18 ++++++++++++------
 .../iotdb/cluster/qp/executor/AbstractQPExecutor.java  |  1 +
 .../qp/executor/ClusterQueryProcessExecutor.java       |  2 +-
 .../cluster/qp/executor/QueryMetadataExecutor.java     |  2 +-
 .../iotdb/cluster/config/ClusterDescriptorTest.java    |  4 ++--
 6 files changed, 23 insertions(+), 14 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
index 230afcb..95905dd 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
@@ -121,14 +121,16 @@ public class ClusterConfig {
   private int maxQueueNumOfQPTask = 500;
 
   /**
-   * ReadMetadataConsistencyLevel: 1 Strong consistency, 2 Weak consistency
+   * ReadMetadataConsistencyLevel: strong or weak. Default consistency level is strong.
+   * This parameter is case-insensitive.
    */
-  private int readMetadataConsistencyLevel = 1;
+  private int readMetadataConsistencyLevel = ClusterConsistencyLevel.STRONG.ordinal();
 
   /**
-   * ReadDataConsistencyLevel: 1 Strong consistency, 2 Weak consistency
+   * ReadDataConsistencyLevel: strong or weak. Default consistency level is strong.
+   * This parameter is case-insensitive.
    */
-  private int readDataConsistencyLevel = 1;
+  private int readDataConsistencyLevel = ClusterConsistencyLevel.STRONG.ordinal();
 
   /**
    * Maximum number of threads which execute tasks generated by client requests concurrently. Each
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
index 3251dbd..2e67d6a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
@@ -148,13 +148,19 @@ public class ClusterDescriptor {
           .parseInt(properties.getProperty("max_queue_num_of_inner_rpc_client",
               Integer.toString(conf.getMaxQueueNumOfQPTask()))));
 
-      conf.setReadMetadataConsistencyLevel(Integer
-          .parseInt(properties.getProperty("read_metadata_consistency_level",
-              Integer.toString(conf.getReadMetadataConsistencyLevel()))));
+      String readMetadataLevelName = properties.getProperty("read_metadata_consistency_level", "");
+      int readMetadataLevel = ClusterConsistencyLevel.getLevel(readMetadataLevelName);
+      if(readMetadataLevel == ClusterConsistencyLevel.UNSUPPORT_LEVEL){
+        readMetadataLevel = ClusterConsistencyLevel.STRONG.ordinal();
+      }
+      conf.setReadMetadataConsistencyLevel(readMetadataLevel);
 
-      conf.setReadDataConsistencyLevel(Integer
-          .parseInt(properties.getProperty("read_data_consistency_level",
-              Integer.toString(conf.getReadDataConsistencyLevel()))));
+      String readDataLevelName = properties.getProperty("read_data_consistency_level", "");
+      int readDataLevel = ClusterConsistencyLevel.getLevel(readDataLevelName);
+      if(readDataLevel == ClusterConsistencyLevel.UNSUPPORT_LEVEL){
+        readDataLevel = ClusterConsistencyLevel.STRONG.ordinal();
+      }
+      conf.setReadDataConsistencyLevel(readDataLevel);
 
       conf.setConcurrentQPSubTaskThread(Integer
           .parseInt(properties.getProperty("concurrent_qp_sub_task_thread",
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractQPExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractQPExecutor.java
index 44049cf..2a721f3 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractQPExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractQPExecutor.java
@@ -22,6 +22,7 @@ import com.alipay.sofa.jraft.entity.PeerId;
 import java.util.HashSet;
 import java.util.Set;
 import org.apache.iotdb.cluster.config.ClusterConfig;
+import org.apache.iotdb.cluster.config.ClusterConsistencyLevel;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.entity.Server;
 import org.apache.iotdb.cluster.exception.ConsistencyLevelException;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/ClusterQueryProcessExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/ClusterQueryProcessExecutor.java
index e8e0aec..39324d8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/ClusterQueryProcessExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/ClusterQueryProcessExecutor.java
@@ -126,7 +126,7 @@ public class ClusterQueryProcessExecutor extends AbstractQPExecutor implements I
   public List<String> getAllPaths(String originPath)
       throws PathErrorException {
     try {
-      LOGGER.debug("read metadata level :" + getReadMetadataConsistencyLevel());
+      LOGGER.debug(String.format("read metadata level :%d", getReadMetadataConsistencyLevel()));
       return queryMetadataExecutor.processPathsQuery(originPath);
     } catch (InterruptedException | ProcessorException e) {
       throw new PathErrorException(e.getMessage());
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
index 92f4c97..4270c2b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
@@ -289,7 +289,7 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
 
   public TSDataType processSeriesTypeQuery(String path)
       throws InterruptedException, ProcessorException, PathErrorException {
-    TSDataType dataType = null;
+    TSDataType dataType;
     List<String> storageGroupList = mManager.getAllFileNamesByPath(path);
     if (storageGroupList.size() != 1) {
       throw new PathErrorException("path " + path + " is not valid.");
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/config/ClusterDescriptorTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/config/ClusterDescriptorTest.java
index 4bb6449..4c3286e 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/config/ClusterDescriptorTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/config/ClusterDescriptorTest.java
@@ -54,8 +54,8 @@ public class ClusterDescriptorTest {
   private String testVNodesNew = "4";
   private String testClientNumNew = "400000";
   private String testQueueLenNew = "300000";
-  private String testMetadataConsistencyNew = "2";
-  private String testDataConsistencyNew = "4";
+  private String testMetadataConsistencyNew = String.valueOf(ClusterConsistencyLevel.STRONG.ordinal());
+  private String testDataConsistencyNew = String.valueOf(ClusterConsistencyLevel.STRONG.ordinal());
   private String testConcurrentQPTaskThreadNew = "6";
   private String testConcurrentRaftTaskThreadNew = "11";
 


[incubator-iotdb] 03/04: add error statement

Posted by lt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit db7d9921f4ea5c2127b2e369f23375d4c84c0738
Author: lta <li...@163.com>
AuthorDate: Sun Jun 9 00:02:13 2019 +0800

    add error statement
---
 .../iotdb/cluster/qp/executor/AbstractQPExecutor.java | 19 +++++++------------
 .../iotdb/cluster/qp/executor/NonQueryExecutor.java   |  4 ++--
 .../cluster/qp/executor/QueryMetadataExecutor.java    | 15 ++++++++-------
 .../org/apache/iotdb/cluster/qp/task/BatchQPTask.java |  2 +-
 .../java/org/apache/iotdb/cluster/qp/task/QPTask.java |  1 +
 .../apache/iotdb/cluster/qp/task/SingleQPTask.java    |  4 +---
 .../cluster/query/executor/ClusterQueryRouter.java    |  2 +-
 .../cluster/query/utils/ClusterRpcReaderUtils.java    |  2 +-
 .../rpc/raft/impl/RaftNodeAsClientManager.java        |  1 -
 .../iotdb/cluster/service/TSServiceClusterImpl.java   |  2 +-
 iotdb/iotdb/conf/iotdb-cluster.properties             | 10 ++++++----
 .../db/qp/strategy/optimizer/ConcatPathOptimizer.java |  4 ++--
 12 files changed, 31 insertions(+), 35 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractQPExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractQPExecutor.java
index 2a721f3..5059e06 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractQPExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractQPExecutor.java
@@ -22,10 +22,8 @@ import com.alipay.sofa.jraft.entity.PeerId;
 import java.util.HashSet;
 import java.util.Set;
 import org.apache.iotdb.cluster.config.ClusterConfig;
-import org.apache.iotdb.cluster.config.ClusterConsistencyLevel;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.entity.Server;
-import org.apache.iotdb.cluster.exception.ConsistencyLevelException;
 import org.apache.iotdb.cluster.exception.RaftConnectionException;
 import org.apache.iotdb.cluster.qp.task.QPTask;
 import org.apache.iotdb.cluster.qp.task.QPTask.TaskState;
@@ -96,14 +94,14 @@ public abstract class AbstractQPExecutor {
       throws InterruptedException, RaftConnectionException {
     PeerId firstNode = task.getTargetNode();
     RaftUtils.updatePeerIDOrder(firstNode, groupId);
-    BasicResponse response = null;
+    BasicResponse response;
     try {
       asyncSendSingleTask(task, taskRetryNum);
       response = syncGetSingleTaskRes(task, taskRetryNum, taskInfo, groupId, downNodeSet);
+      return response;
     } catch (RaftConnectionException ex) {
-      boolean success = false;
       downNodeSet.add(firstNode);
-      while (!success) {
+      while (true) {
         PeerId nextNode = null;
         try {
           nextNode = RaftUtils.getPeerIDInOrder(groupId);
@@ -115,22 +113,19 @@ public abstract class AbstractQPExecutor {
               nextNode);
           task.resetTask();
           task.setTargetNode(nextNode);
-          task.setTaskState(TaskState.INITIAL);
           asyncSendSingleTask(task, taskRetryNum);
           response = syncGetSingleTaskRes(task, taskRetryNum, taskInfo, groupId, downNodeSet);
           LOGGER.debug("{} task for group {} to node {} succeed.", taskInfo, groupId, nextNode);
-          success = true;
+          return response;
         } catch (RaftConnectionException e1) {
           LOGGER.debug("{} task for group {} to node {} fail.", taskInfo, groupId, nextNode);
           downNodeSet.add(nextNode);
         }
       }
-      LOGGER.debug("The final result for {} task is {}", taskInfo, success);
-      if (!success) {
-        throw ex;
-      }
+      throw new RaftConnectionException(String
+          .format("Can not %s in all nodes of group<%s>, please check cluster status.",
+              taskInfo, groupId));
     }
-    return response;
   }
 
   protected BasicResponse syncHandleSingleTaskGetRes(SingleQPTask task, int taskRetryNum, String taskInfo, String groupId)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
index b7dd242..1e6abea 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
@@ -88,7 +88,7 @@ public class NonQueryExecutor extends AbstractQPExecutor {
       return handleNonQueryRequest(groupId, plan);
     } catch (RaftConnectionException e) {
       LOGGER.error(e.getMessage());
-      throw new ProcessorException("Raft connection occurs error.", e);
+      throw new ProcessorException(e.getMessage());
     } catch (InterruptedException | PathErrorException | IOException e) {
       throw new ProcessorException(e);
     }
@@ -327,7 +327,7 @@ public class NonQueryExecutor extends AbstractQPExecutor {
     } else {
       PeerId leader = RaftUtils.getLocalLeaderPeerID(groupId);
       qpTask.setTargetNode(leader);
-      return syncHandleSingleTask(qpTask, "non-query", groupId);
+      return syncHandleSingleTask(qpTask, "execute non-query", groupId);
     }
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
index 4270c2b..2d39863 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
@@ -29,7 +29,6 @@ import java.util.Map.Entry;
 import java.util.Set;
 import org.apache.iotdb.cluster.config.ClusterConfig;
 import org.apache.iotdb.cluster.config.ClusterConsistencyLevel;
-import org.apache.iotdb.cluster.config.ClusterConstant;
 import org.apache.iotdb.cluster.entity.raft.MetadataRaftHolder;
 import org.apache.iotdb.cluster.entity.raft.RaftService;
 import org.apache.iotdb.cluster.exception.RaftConnectionException;
@@ -68,7 +67,6 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
   private static final Logger LOGGER = LoggerFactory.getLogger(QueryMetadataExecutor.class);
   private static final String DOUB_SEPARATOR = "\\.";
   private static final char SINGLE_SEPARATOR = '.';
-  private static final String RAFT_CONNECTION_ERROR = "Raft connection occurs error.";
 
   public QueryMetadataExecutor() {
     super();
@@ -151,13 +149,13 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
           holder);
       res.addAll(queryTimeSeries(task, pathList, groupId));
     } catch (RaftConnectionException e) {
-      throw new ProcessorException(RAFT_CONNECTION_ERROR, e);
+      throw new ProcessorException(e.getMessage());
     }
   }
 
   private List<List<String>> queryTimeSeries(SingleQPTask task, List<String> pathList, String groupId)
       throws InterruptedException, RaftConnectionException {
-    BasicResponse response = syncHandleSingleTaskGetRes(task, 0, "show timeseries " + pathList, groupId);
+    BasicResponse response = syncHandleSingleTaskGetRes(task, 0, "query timeseries " + pathList, groupId);
     return response == null ? new ArrayList<>()
         : ((QueryTimeSeriesResponse) response).getTimeSeries();
   }
@@ -266,7 +264,10 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
         }
         LOGGER.debug("The final result for query metadata task is {}", success);
         if (!success) {
-          throw new ProcessorException(RAFT_CONNECTION_ERROR, e);
+          throw new ProcessorException(String
+              .format(
+                  "Can not query metadata in all nodes of group<%s>, please check cluster status.",
+                  groupId));
         }
       }
     }
@@ -316,7 +317,7 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
             holder);
         dataType = querySeriesType(task, path, groupId);
       } catch (RaftConnectionException e) {
-        throw new ProcessorException(RAFT_CONNECTION_ERROR, e);
+        throw new ProcessorException(e.getMessage());
       }
     }
     return dataType;
@@ -377,7 +378,7 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
           .debug("Send get paths for {} task for group {} to node {}.", pathList, groupId, holder);
       res.addAll(queryPaths(task, pathList, groupId));
     } catch (RaftConnectionException e) {
-      throw new ProcessorException(RAFT_CONNECTION_ERROR, e);
+      throw new ProcessorException(e.getMessage());
     }
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/BatchQPTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/BatchQPTask.java
index 8548879..c2b290f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/BatchQPTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/BatchQPTask.java
@@ -180,7 +180,7 @@ public class BatchQPTask extends MultiQPTask {
    */
   private void executeRpcSubTask(SingleQPTask subTask, String groupId) {
     try {
-      executor.syncHandleSingleTask(subTask, "sub non-query", groupId);
+      executor.syncHandleSingleTask(subTask, "execute sub non-query", groupId);
       this.receive(subTask.getResponse());
     } catch (RaftConnectionException | InterruptedException e) {
       LOGGER.error("Async handle sub task failed.");
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/QPTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/QPTask.java
index b86e92a..f3182c9 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/QPTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/QPTask.java
@@ -100,6 +100,7 @@ public abstract class QPTask {
 
   public void resetTask() {
     this.taskCountDownLatch = new CountDownLatch(taskNum);
+    this.taskState = TaskState.INITIAL;
   }
 
   public TaskState getTaskState() {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/SingleQPTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/SingleQPTask.java
index c684cf1..1faef65 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/SingleQPTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/SingleQPTask.java
@@ -28,8 +28,6 @@ import org.slf4j.LoggerFactory;
  */
 public class SingleQPTask extends QPTask {
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(SingleQPTask.class);
-
   private static final int TASK_NUM = 1;
 
   public SingleQPTask(boolean isSyncTask, BasicRequest request) {
@@ -45,7 +43,7 @@ public class SingleQPTask extends QPTask {
     if(taskState != TaskState.EXCEPTION) {
       this.response = response;
       if(response == null){
-        LOGGER.error("Response is null");
+        this.taskState = TaskState.RAFT_CONNECTION_EXCEPTION;
       } else if (response.isRedirected()) {
         this.taskState = TaskState.REDIRECT;
       } else {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java
index d56540e..3db7c6a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java
@@ -92,7 +92,7 @@ public class ClusterQueryRouter extends AbstractQueryRouter {
         return engineExecutor.execute(context);
       }
     } catch (QueryFilterOptimizationException | IOException | RaftConnectionException e) {
-      throw new FileNodeManagerException(e);
+      throw new FileNodeManagerException(e.getMessage());
     }
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterRpcReaderUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterRpcReaderUtils.java
index bd61375..bab0a67 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterRpcReaderUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterRpcReaderUtils.java
@@ -73,7 +73,7 @@ public class ClusterRpcReaderUtils {
       }
     }
     throw new RaftConnectionException(
-        String.format("Can not init series reader in all nodes of group<%s>.", groupId));
+        String.format("Can not init series reader in all nodes of group<%s>, please check cluster status.", groupId));
   }
 
   /**
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
index 088177f..8913086 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
@@ -224,7 +224,6 @@ public class RaftNodeAsClientManager {
         qpTask.receive(response);
       } catch (RemotingException | InterruptedException e) {
         LOGGER.error(e.getMessage());
-        qpTask.setTaskState(TaskState.RAFT_CONNECTION_EXCEPTION);
         qpTask.receive(null);
       }
     }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/service/TSServiceClusterImpl.java b/cluster/src/main/java/org/apache/iotdb/cluster/service/TSServiceClusterImpl.java
index b2b4fab..505afd9 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/service/TSServiceClusterImpl.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/service/TSServiceClusterImpl.java
@@ -321,7 +321,7 @@ public class TSServiceClusterImpl extends TSServiceImpl {
     contextMapLocal.get().put(req.queryId, context);
 
     queryManager.addSingleQuery(jobId, (QueryPlan) physicalPlan);
-    QueryDataSet queryDataSet = processor.getExecutor().processQuery((QueryPlan) physicalPlan,
+    QueryDataSet queryDataSet = processor.getExecutor().processQuery(physicalPlan,
         context);
     try {
       queryRet.get().put(statement, queryDataSet);
diff --git a/iotdb/iotdb/conf/iotdb-cluster.properties b/iotdb/iotdb/conf/iotdb-cluster.properties
index 2761b22..c85ecc6 100644
--- a/iotdb/iotdb/conf/iotdb-cluster.properties
+++ b/iotdb/iotdb/conf/iotdb-cluster.properties
@@ -77,11 +77,13 @@ concurrent_inner_rpc_client_thread = 0
 # waiting qp tasks exceed to this number, new qp task will be rejected.
 max_queue_num_of_inner_rpc_client = 500
 
-# ReadMetadataConsistencyLevel: 1  Strong consistency, 2  Weak consistency
-read_metadata_consistency_level = 1
+# ReadMetadataConsistencyLevel: strong or weak. Default consistency level is strong.
+# This parameter is case-insensitive.
+read_metadata_consistency_level = strong
 
-# ReadDataConsistencyLevel: 1  Strong consistency, 2  Weak consistency
-read_data_consistency_level = 1
+# ReadDataConsistencyLevel: strong or weak. Default consistency level is strong.
+# This parameter is case-insensitive.
+read_data_consistency_level = strong
 
 # Maximum number of threads which execute tasks generated by client requests concurrently.
 # Each client request corresponds to a QP Task. A QP task may be divided into several sub-tasks.
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ConcatPathOptimizer.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ConcatPathOptimizer.java
index 9d1d618..689a05d 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ConcatPathOptimizer.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ConcatPathOptimizer.java
@@ -27,7 +27,6 @@ import org.apache.iotdb.db.exception.qp.LogicalOperatorException;
 import org.apache.iotdb.db.exception.qp.LogicalOptimizeException;
 import org.apache.iotdb.db.qp.constant.SQLConstant;
 import org.apache.iotdb.db.qp.executor.IQueryProcessExecutor;
-import org.apache.iotdb.db.qp.executor.QueryProcessExecutor;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.logical.crud.BasicFunctionOperator;
 import org.apache.iotdb.db.qp.logical.crud.FilterOperator;
@@ -355,7 +354,8 @@ public class ConcatPathOptimizer implements ILogicalOptimizer {
           }
         }
       } catch (PathErrorException e) {
-        throw new LogicalOptimizeException("error when remove star: ", e);
+        throw new LogicalOptimizeException(
+            String.format("error when remove star: %s", e.getMessage()), e);
       }
     }
     if (retPaths.isEmpty()) {


[incubator-iotdb] 01/04: Merge remote-tracking branch 'origin/master' into cluster

Posted by lt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit b2c61a0084da23db5b3595d1afb585661eacb7bb
Merge: 0089e73 8a642a4
Author: lta <li...@163.com>
AuthorDate: Sat Jun 8 19:32:15 2019 +0800

    Merge remote-tracking branch 'origin/master' into cluster

 .../engine/bufferwrite/BufferWriteProcessor.java   | 24 ++++++++++++----------
 .../iotdb/db/engine/filenode/FileNodeManager.java  | 12 +++++++++--
 .../db/engine/overflow/io/OverflowProcessor.java   | 18 ++++++++--------
 .../iotdb/db/qp/physical/crud/InsertPlan.java      |  2 +-
 .../org/apache/iotdb/tsfile/read/common/Path.java  | 11 +++++++---
 5 files changed, 42 insertions(+), 25 deletions(-)


[incubator-iotdb] 04/04: fix sonar issues

Posted by lt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 6b948375b6876ef2df00f7944a836c2c45b64b91
Author: lta <li...@163.com>
AuthorDate: Sun Jun 9 00:10:51 2019 +0800

    fix sonar issues
---
 .../org/apache/iotdb/cluster/entity/Server.java    |  4 ++--
 .../cluster/qp/executor/QueryMetadataExecutor.java |  1 -
 .../apache/iotdb/cluster/qp/task/BatchQPTask.java  |  2 +-
 .../apache/iotdb/cluster/qp/task/SingleQPTask.java |  2 --
 .../ClusterGroupByDataSetWithOnlyTimeFilter.java   |  1 +
 .../executor/ClusterExecutorWithTimeGenerator.java |  3 ---
 .../querynode/ClusterLocalQueryManager.java        |  1 -
 .../querynode/ClusterLocalSingleQueryManager.java  |  8 +++----
 .../ClusterFillSelectSeriesBatchReader.java        |  1 -
 .../querynode/ClusterSelectSeriesBatchReader.java  |  4 ++--
 .../ClusterSelectSeriesBatchReaderByTimestamp.java |  4 ++--
 .../ClusterSelectSeriesBatchReaderEntity.java      |  8 +++----
 ...r.java => IClusterSelectSeriesBatchReader.java} |  4 ++--
 .../rpc/raft/impl/RaftNodeAsClientManager.java     |  3 ---
 .../querymetadata/QueryMetadataAsyncProcessor.java |  1 -
 .../QueryMetadataInStringAsyncProcessor.java       |  1 -
 .../querymetadata/QueryPathsAsyncProcessor.java    |  1 -
 .../QuerySeriesTypeAsyncProcessor.java             |  1 -
 .../QueryTimeSeriesAsyncProcessor.java             |  1 -
 .../org/apache/iotdb/cluster/utils/RaftUtils.java  |  8 ++-----
 .../query/manager/ClusterLocalManagerTest.java     | 27 +++++++++++-----------
 21 files changed, 33 insertions(+), 53 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java b/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
index 418b625..41c4cb1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
@@ -97,13 +97,13 @@ public class Server {
   private RegisterManager registerManager = new RegisterManager();
 
   public static void main(String[] args)
-      throws ProcessorException, InterruptedException, RaftConnectionException, FileNodeManagerException {
+      throws ProcessorException, RaftConnectionException, FileNodeManagerException {
     Server server = Server.getInstance();
     server.start();
   }
 
   public void start()
-      throws ProcessorException, InterruptedException, RaftConnectionException, FileNodeManagerException {
+      throws ProcessorException, RaftConnectionException, FileNodeManagerException {
 
     /** Stand-alone version of IoTDB, be careful to replace the internal JDBC Server with a cluster version **/
     iotdb = new IoTDB();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
index 2d39863..ce4b920 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
@@ -259,7 +259,6 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
             success = true;
           } catch (RaftConnectionException e1) {
             LOGGER.debug("Query metadata task for group {} to node {} fail.", groupId, nextNode);
-            continue;
           }
         }
         LOGGER.debug("The final result for query metadata task is {}", success);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/BatchQPTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/BatchQPTask.java
index c2b290f..8ede57d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/BatchQPTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/BatchQPTask.java
@@ -109,7 +109,7 @@ public class BatchQPTask extends MultiQPTask {
         batchResult.setAllSuccessful(false);
       }
     } catch (Exception ex) {
-      ex.printStackTrace();
+      LOGGER.error("Execute batch statement occurs error.", ex);
     } finally {
       lock.unlock();
     }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/SingleQPTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/SingleQPTask.java
index 1faef65..16ddf60 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/SingleQPTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/SingleQPTask.java
@@ -20,8 +20,6 @@ package org.apache.iotdb.cluster.qp.task;
 
 import org.apache.iotdb.cluster.rpc.raft.request.BasicRequest;
 import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Process task(s) for only one raft group, which is used for operations except for querying data.
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/dataset/ClusterGroupByDataSetWithOnlyTimeFilter.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/dataset/ClusterGroupByDataSetWithOnlyTimeFilter.java
index ef5386d..599439a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/dataset/ClusterGroupByDataSetWithOnlyTimeFilter.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/dataset/ClusterGroupByDataSetWithOnlyTimeFilter.java
@@ -72,6 +72,7 @@ public class ClusterGroupByDataSetWithOnlyTimeFilter extends GroupByWithOnlyTime
   /**
    * init reader and aggregate function.
    */
+  @Override
   public void initGroupBy(QueryContext context, List<String> aggres, IExpression expression)
       throws FileNodeManagerException, PathErrorException, ProcessorException, IOException {
     initAggreFuction(aggres);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterExecutorWithTimeGenerator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterExecutorWithTimeGenerator.java
index fe2511a..0cdd457 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterExecutorWithTimeGenerator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterExecutorWithTimeGenerator.java
@@ -22,17 +22,14 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import org.apache.iotdb.cluster.query.dataset.ClusterDataSetWithTimeGenerator;
 import org.apache.iotdb.cluster.query.factory.ClusterSeriesReaderFactory;
 import org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcSingleQueryManager;
 import org.apache.iotdb.cluster.query.manager.coordinatornode.FilterSeriesGroupEntity;
-import org.apache.iotdb.cluster.query.reader.coordinatornode.ClusterSelectSeriesReader;
 import org.apache.iotdb.cluster.query.timegenerator.ClusterTimeGenerator;
 import org.apache.iotdb.db.exception.FileNodeManagerException;
 import org.apache.iotdb.db.exception.PathErrorException;
-import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java
index a602c84..e6149f2 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java
@@ -25,7 +25,6 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
-import org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcSingleQueryManager;
 import org.apache.iotdb.cluster.rpc.raft.request.querydata.InitSeriesReaderRequest;
 import org.apache.iotdb.cluster.rpc.raft.request.querydata.QuerySeriesDataByTimestampRequest;
 import org.apache.iotdb.cluster.rpc.raft.request.querydata.QuerySeriesDataRequest;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
index f2a9ca3..097f24d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
@@ -28,7 +28,7 @@ import org.apache.iotdb.cluster.concurrent.pool.QueryTimerThreadManager;
 import org.apache.iotdb.cluster.config.ClusterConstant;
 import org.apache.iotdb.cluster.query.PathType;
 import org.apache.iotdb.cluster.query.factory.ClusterSeriesReaderFactory;
-import org.apache.iotdb.cluster.query.reader.querynode.AbstractClusterSelectSeriesBatchReader;
+import org.apache.iotdb.cluster.query.reader.querynode.IClusterSelectSeriesBatchReader;
 import org.apache.iotdb.cluster.query.reader.querynode.ClusterFillSelectSeriesBatchReader;
 import org.apache.iotdb.cluster.query.reader.querynode.ClusterFilterSeriesBatchReaderEntity;
 import org.apache.iotdb.cluster.query.reader.querynode.ClusterGroupBySelectSeriesBatchReaderEntity;
@@ -417,9 +417,9 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
     long targetQueryRounds = request.getQueryRounds();
     if (targetQueryRounds != this.queryRound) {
       this.queryRound = targetQueryRounds;
-      List<AbstractClusterSelectSeriesBatchReader> readers = selectReaderEntity.getAllReaders();
+      List<IClusterSelectSeriesBatchReader> readers = selectReaderEntity.getAllReaders();
       List<BatchData> batchDataList = new ArrayList<>();
-      for (AbstractClusterSelectSeriesBatchReader reader : readers) {
+      for (IClusterSelectSeriesBatchReader reader : readers) {
         batchDataList.add(reader.nextBatch(request.getBatchTimestamp()));
       }
       cachedBatchDataResult = batchDataList;
@@ -443,7 +443,7 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
   private List<BatchData> readSelectSeriesBatchData(List<Integer> seriesIndexs) throws IOException {
     List<BatchData> batchDataList = new ArrayList<>();
     for (int index : seriesIndexs) {
-      AbstractClusterSelectSeriesBatchReader reader = selectReaderEntity.getReaderByIndex(index);
+      IClusterSelectSeriesBatchReader reader = selectReaderEntity.getReaderByIndex(index);
       batchDataList.add(reader.nextBatch());
     }
     return batchDataList;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFillSelectSeriesBatchReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFillSelectSeriesBatchReader.java
index a16a220..a581d07 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFillSelectSeriesBatchReader.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFillSelectSeriesBatchReader.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.cluster.query.reader.querynode;
 import java.io.IOException;
 import org.apache.iotdb.cluster.query.common.ClusterNullableBatchData;
 import org.apache.iotdb.db.query.reader.IPointReader;
-import org.apache.iotdb.db.utils.TimeValuePair;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.BatchData;
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterSelectSeriesBatchReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterSelectSeriesBatchReader.java
index cbbad2e..b3c05d8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterSelectSeriesBatchReader.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterSelectSeriesBatchReader.java
@@ -30,8 +30,8 @@ import org.apache.iotdb.tsfile.read.common.BatchData;
 /**
  * BatchReader without time generator for cluster which is used in query node.
  */
-public class ClusterSelectSeriesBatchReader extends
-    AbstractClusterSelectSeriesBatchReader {
+public class ClusterSelectSeriesBatchReader implements
+    IClusterSelectSeriesBatchReader {
 
   /**
    * Data type
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterSelectSeriesBatchReaderByTimestamp.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterSelectSeriesBatchReaderByTimestamp.java
index 72dce05..fc6fe31 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterSelectSeriesBatchReaderByTimestamp.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterSelectSeriesBatchReaderByTimestamp.java
@@ -27,8 +27,8 @@ import org.apache.iotdb.tsfile.read.common.BatchData;
 /**
  * BatchReader by timestamp for cluster which is used in query node.
  */
-public class ClusterSelectSeriesBatchReaderByTimestamp extends
-    AbstractClusterSelectSeriesBatchReader {
+public class ClusterSelectSeriesBatchReaderByTimestamp implements
+    IClusterSelectSeriesBatchReader {
 
   /**
    * Reader
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterSelectSeriesBatchReaderEntity.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterSelectSeriesBatchReaderEntity.java
index 484c423..7150ffa 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterSelectSeriesBatchReaderEntity.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterSelectSeriesBatchReaderEntity.java
@@ -34,7 +34,7 @@ public class ClusterSelectSeriesBatchReaderEntity {
   /**
    * All select readers
    */
-  private List<AbstractClusterSelectSeriesBatchReader> readers;
+  private List<IClusterSelectSeriesBatchReader> readers;
 
   public ClusterSelectSeriesBatchReaderEntity() {
     paths = new ArrayList<>();
@@ -45,15 +45,15 @@ public class ClusterSelectSeriesBatchReaderEntity {
     this.paths.add(path);
   }
 
-  public void addReaders(AbstractClusterSelectSeriesBatchReader reader) {
+  public void addReaders(IClusterSelectSeriesBatchReader reader) {
     this.readers.add(reader);
   }
 
-  public List<AbstractClusterSelectSeriesBatchReader> getAllReaders() {
+  public List<IClusterSelectSeriesBatchReader> getAllReaders() {
     return readers;
   }
 
-  public AbstractClusterSelectSeriesBatchReader getReaderByIndex(int index) {
+  public IClusterSelectSeriesBatchReader getReaderByIndex(int index) {
     return readers.get(index);
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/AbstractClusterSelectSeriesBatchReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/IClusterSelectSeriesBatchReader.java
similarity index 88%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/AbstractClusterSelectSeriesBatchReader.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/IClusterSelectSeriesBatchReader.java
index 6fe28e2..87a8329 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/AbstractClusterSelectSeriesBatchReader.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/IClusterSelectSeriesBatchReader.java
@@ -26,7 +26,7 @@ import org.apache.iotdb.tsfile.read.common.BatchData;
 /**
  * Cluster batch reader, which provides another method to get batch data by batch timestamp.
  */
-public abstract class AbstractClusterSelectSeriesBatchReader implements IBatchReader {
+public interface IClusterSelectSeriesBatchReader extends IBatchReader {
 
   /**
    * Get batch data by batch time
@@ -34,6 +34,6 @@ public abstract class AbstractClusterSelectSeriesBatchReader implements IBatchRe
    * @param batchTime valid batch timestamp
    * @return corresponding batch data
    */
-  public abstract BatchData nextBatch(List<Long> batchTime) throws IOException;
+  BatchData nextBatch(List<Long> batchTime) throws IOException;
 
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
index 8913086..cf41ae6 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
@@ -18,12 +18,10 @@
  */
 package org.apache.iotdb.cluster.rpc.raft.impl;
 
-import com.alipay.remoting.InvokeCallback;
 import com.alipay.remoting.exception.RemotingException;
 import com.alipay.sofa.jraft.option.CliOptions;
 import com.alipay.sofa.jraft.rpc.impl.cli.BoltCliClientService;
 import java.util.LinkedList;
-import java.util.concurrent.Executor;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -32,7 +30,6 @@ import org.apache.iotdb.cluster.config.ClusterConfig;
 import org.apache.iotdb.cluster.config.ClusterConstant;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.exception.RaftConnectionException;
-import org.apache.iotdb.cluster.qp.task.QPTask.TaskState;
 import org.apache.iotdb.cluster.qp.task.SingleQPTask;
 import org.apache.iotdb.cluster.rpc.raft.NodeAsClient;
 import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querymetadata/QueryMetadataAsyncProcessor.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querymetadata/QueryMetadataAsyncProcessor.java
index 446fe88..a8fa1fa 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querymetadata/QueryMetadataAsyncProcessor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querymetadata/QueryMetadataAsyncProcessor.java
@@ -23,7 +23,6 @@ import com.alipay.remoting.BizContext;
 import com.alipay.sofa.jraft.Status;
 import com.alipay.sofa.jraft.closure.ReadIndexClosure;
 import org.apache.iotdb.cluster.config.ClusterConsistencyLevel;
-import org.apache.iotdb.cluster.config.ClusterConstant;
 import org.apache.iotdb.cluster.entity.raft.DataPartitionRaftHolder;
 import org.apache.iotdb.cluster.entity.raft.RaftService;
 import org.apache.iotdb.cluster.rpc.raft.processor.BasicAsyncUserProcessor;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querymetadata/QueryMetadataInStringAsyncProcessor.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querymetadata/QueryMetadataInStringAsyncProcessor.java
index 857f360..7a46a14 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querymetadata/QueryMetadataInStringAsyncProcessor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querymetadata/QueryMetadataInStringAsyncProcessor.java
@@ -23,7 +23,6 @@ import com.alipay.remoting.BizContext;
 import com.alipay.sofa.jraft.Status;
 import com.alipay.sofa.jraft.closure.ReadIndexClosure;
 import org.apache.iotdb.cluster.config.ClusterConsistencyLevel;
-import org.apache.iotdb.cluster.config.ClusterConstant;
 import org.apache.iotdb.cluster.entity.raft.DataPartitionRaftHolder;
 import org.apache.iotdb.cluster.entity.raft.RaftService;
 import org.apache.iotdb.cluster.rpc.raft.processor.BasicAsyncUserProcessor;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querymetadata/QueryPathsAsyncProcessor.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querymetadata/QueryPathsAsyncProcessor.java
index 9f65929..3736105 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querymetadata/QueryPathsAsyncProcessor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querymetadata/QueryPathsAsyncProcessor.java
@@ -23,7 +23,6 @@ import com.alipay.remoting.BizContext;
 import com.alipay.sofa.jraft.Status;
 import com.alipay.sofa.jraft.closure.ReadIndexClosure;
 import org.apache.iotdb.cluster.config.ClusterConsistencyLevel;
-import org.apache.iotdb.cluster.config.ClusterConstant;
 import org.apache.iotdb.cluster.entity.raft.DataPartitionRaftHolder;
 import org.apache.iotdb.cluster.entity.raft.RaftService;
 import org.apache.iotdb.cluster.rpc.raft.processor.BasicAsyncUserProcessor;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querymetadata/QuerySeriesTypeAsyncProcessor.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querymetadata/QuerySeriesTypeAsyncProcessor.java
index dcb774f..c8df5a2 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querymetadata/QuerySeriesTypeAsyncProcessor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querymetadata/QuerySeriesTypeAsyncProcessor.java
@@ -23,7 +23,6 @@ import com.alipay.remoting.BizContext;
 import com.alipay.sofa.jraft.Status;
 import com.alipay.sofa.jraft.closure.ReadIndexClosure;
 import org.apache.iotdb.cluster.config.ClusterConsistencyLevel;
-import org.apache.iotdb.cluster.config.ClusterConstant;
 import org.apache.iotdb.cluster.entity.raft.DataPartitionRaftHolder;
 import org.apache.iotdb.cluster.entity.raft.RaftService;
 import org.apache.iotdb.cluster.rpc.raft.processor.BasicAsyncUserProcessor;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querymetadata/QueryTimeSeriesAsyncProcessor.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querymetadata/QueryTimeSeriesAsyncProcessor.java
index c36120f..d08cd1a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querymetadata/QueryTimeSeriesAsyncProcessor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querymetadata/QueryTimeSeriesAsyncProcessor.java
@@ -23,7 +23,6 @@ import com.alipay.remoting.BizContext;
 import com.alipay.sofa.jraft.Status;
 import com.alipay.sofa.jraft.closure.ReadIndexClosure;
 import org.apache.iotdb.cluster.config.ClusterConsistencyLevel;
-import org.apache.iotdb.cluster.config.ClusterConstant;
 import org.apache.iotdb.cluster.entity.raft.DataPartitionRaftHolder;
 import org.apache.iotdb.cluster.entity.raft.RaftService;
 import org.apache.iotdb.cluster.rpc.raft.processor.BasicAsyncUserProcessor;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
index f8546b7..96e0363 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
@@ -79,8 +79,6 @@ import org.slf4j.LoggerFactory;
 
 public class RaftUtils {
 
-  private static final ClusterConfig CLUSTER_CONFIG = ClusterDescriptor.getInstance().getConfig();
-
   private static final Logger LOGGER = LoggerFactory.getLogger(RaftUtils.class);
   private static final Server server = Server.getInstance();
   private static final Router router = Router.getInstance();
@@ -765,10 +763,8 @@ public class RaftUtils {
         int[] nums1 = convertIPToNums(o1);
         int[] nums2 = convertIPToNums(o2);
         for (int i = 0; i < Math.min(nums1.length, nums2.length); i++) {
-          if (nums1[i] == nums2[i]) {
-            continue;
-          } else {
-            return ((Integer) nums1[i]).compareTo(nums2[i]);
+          if (nums1[i] != nums2[i]) {
+            return Integer.compare(nums1[i], nums2[i]);
           }
         }
         return 0;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/manager/ClusterLocalManagerTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/manager/ClusterLocalManagerTest.java
index e71e489..7ba4547 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/manager/ClusterLocalManagerTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/manager/ClusterLocalManagerTest.java
@@ -32,14 +32,13 @@ import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.iotdb.cluster.config.ClusterConfig;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.entity.Server;
 import org.apache.iotdb.cluster.query.manager.querynode.ClusterLocalQueryManager;
 import org.apache.iotdb.cluster.query.manager.querynode.ClusterLocalSingleQueryManager;
-import org.apache.iotdb.cluster.query.reader.querynode.AbstractClusterSelectSeriesBatchReader;
+import org.apache.iotdb.cluster.query.reader.querynode.IClusterSelectSeriesBatchReader;
 import org.apache.iotdb.cluster.query.reader.querynode.ClusterSelectSeriesBatchReader;
 import org.apache.iotdb.cluster.query.reader.querynode.ClusterSelectSeriesBatchReaderByTimestamp;
 import org.apache.iotdb.cluster.query.reader.querynode.ClusterFilterSeriesBatchReaderEntity;
@@ -224,11 +223,11 @@ public class ClusterLocalManagerTest {
         ClusterSelectSeriesBatchReaderEntity selectSeriesBatchReaderEntity = singleQueryManager.getSelectReaderEntity();
         assertEquals(3, selectSeriesBatchReaderEntity.getAllReaders().size());
         Map<String, TSDataType> typeMap = singleQueryManager.getDataTypeMap();
-        List<AbstractClusterSelectSeriesBatchReader> readers = selectSeriesBatchReaderEntity.getAllReaders();
+        List<IClusterSelectSeriesBatchReader> readers = selectSeriesBatchReaderEntity.getAllReaders();
         List<String> paths = selectSeriesBatchReaderEntity.getAllPaths();
         for (int i =0 ; i < readers.size(); i++) {
           TSDataType dataType = typeMap.get(paths.get(i));
-          AbstractClusterSelectSeriesBatchReader clusterBatchReader = readers.get(i);
+          IClusterSelectSeriesBatchReader clusterBatchReader = readers.get(i);
           assertNotNull(((ClusterSelectSeriesBatchReader) clusterBatchReader).getReader());
           assertEquals(dataType,
               ((ClusterSelectSeriesBatchReader) clusterBatchReader).getDataType());
@@ -251,11 +250,11 @@ public class ClusterLocalManagerTest {
         ClusterSelectSeriesBatchReaderEntity selectSeriesBatchReaderEntity = singleQueryManager.getSelectReaderEntity();
         assertEquals(3, selectSeriesBatchReaderEntity.getAllReaders().size());
         Map<String, TSDataType> typeMap = singleQueryManager.getDataTypeMap();
-        List<AbstractClusterSelectSeriesBatchReader> readers = selectSeriesBatchReaderEntity.getAllReaders();
+        List<IClusterSelectSeriesBatchReader> readers = selectSeriesBatchReaderEntity.getAllReaders();
         List<String> paths = selectSeriesBatchReaderEntity.getAllPaths();
         for (int i =0 ; i < readers.size(); i++) {
           TSDataType dataType = typeMap.get(paths.get(i));
-          AbstractClusterSelectSeriesBatchReader clusterBatchReader = readers.get(i);
+          IClusterSelectSeriesBatchReader clusterBatchReader = readers.get(i);
           assertNotNull(((ClusterSelectSeriesBatchReader) clusterBatchReader).getReader());
           assertEquals(dataType,
               ((ClusterSelectSeriesBatchReader) clusterBatchReader).getDataType());
@@ -278,11 +277,11 @@ public class ClusterLocalManagerTest {
         ClusterSelectSeriesBatchReaderEntity selectSeriesBatchReaderEntity = singleQueryManager.getSelectReaderEntity();
         assertEquals(3, selectSeriesBatchReaderEntity.getAllReaders().size());
         Map<String, TSDataType> typeMap = singleQueryManager.getDataTypeMap();
-        List<AbstractClusterSelectSeriesBatchReader> readers = selectSeriesBatchReaderEntity.getAllReaders();
+        List<IClusterSelectSeriesBatchReader> readers = selectSeriesBatchReaderEntity.getAllReaders();
         List<String> paths = selectSeriesBatchReaderEntity.getAllPaths();
         for (int i =0 ; i < readers.size(); i++) {
           TSDataType dataType = typeMap.get(paths.get(i));
-          AbstractClusterSelectSeriesBatchReader clusterBatchReader = readers.get(i);
+          IClusterSelectSeriesBatchReader clusterBatchReader = readers.get(i);
           assertNotNull(((ClusterSelectSeriesBatchReader) clusterBatchReader).getReader());
           assertEquals(dataType,
               ((ClusterSelectSeriesBatchReader) clusterBatchReader).getDataType());
@@ -322,11 +321,11 @@ public class ClusterLocalManagerTest {
         assertNotNull(selectSeriesBatchReaderEntity.getAllReaders());
         assertEquals(3, selectSeriesBatchReaderEntity.getAllReaders().size());
         Map<String, TSDataType> typeMap = singleQueryManager.getDataTypeMap();
-        List<AbstractClusterSelectSeriesBatchReader> readers = selectSeriesBatchReaderEntity.getAllReaders();
+        List<IClusterSelectSeriesBatchReader> readers = selectSeriesBatchReaderEntity.getAllReaders();
         List<String> paths = selectSeriesBatchReaderEntity.getAllPaths();
         for (int i =0 ; i < readers.size(); i++) {
           TSDataType dataType = typeMap.get(paths.get(i));
-          AbstractClusterSelectSeriesBatchReader clusterBatchReader = readers.get(i);
+          IClusterSelectSeriesBatchReader clusterBatchReader = readers.get(i);
           assertNotNull(((ClusterSelectSeriesBatchReaderByTimestamp) clusterBatchReader).getReaderByTimeStamp());
           assertEquals(dataType,
               ((ClusterSelectSeriesBatchReaderByTimestamp) clusterBatchReader).getDataType());
@@ -353,14 +352,14 @@ public class ClusterLocalManagerTest {
         assertNotNull(filterReader.getQueryDataSet());
 
         ClusterSelectSeriesBatchReaderEntity selectSeriesBatchReaderEntity = singleQueryManager.getSelectReaderEntity();
-        List<AbstractClusterSelectSeriesBatchReader> readers = selectSeriesBatchReaderEntity.getAllReaders();
+        List<IClusterSelectSeriesBatchReader> readers = selectSeriesBatchReaderEntity.getAllReaders();
         assertNotNull(readers);
         assertEquals(3, readers.size());
         Map<String, TSDataType> typeMap = singleQueryManager.getDataTypeMap();
         List<String> paths = selectSeriesBatchReaderEntity.getAllPaths();
         for (int i =0 ; i < readers.size(); i++) {
           TSDataType dataType = typeMap.get(paths.get(i));
-          AbstractClusterSelectSeriesBatchReader clusterBatchReader = readers.get(i);
+          IClusterSelectSeriesBatchReader clusterBatchReader = readers.get(i);
           assertNotNull(((ClusterSelectSeriesBatchReaderByTimestamp) clusterBatchReader).getReaderByTimeStamp());
           assertEquals(dataType,
               ((ClusterSelectSeriesBatchReaderByTimestamp) clusterBatchReader).getDataType());
@@ -387,14 +386,14 @@ public class ClusterLocalManagerTest {
         assertNotNull(filterReader.getQueryDataSet());
 
         ClusterSelectSeriesBatchReaderEntity selectSeriesBatchReaderEntity = singleQueryManager.getSelectReaderEntity();
-        List<AbstractClusterSelectSeriesBatchReader> readers = selectSeriesBatchReaderEntity.getAllReaders();
+        List<IClusterSelectSeriesBatchReader> readers = selectSeriesBatchReaderEntity.getAllReaders();
         assertNotNull(readers);
         assertEquals(3, readers.size());
         Map<String, TSDataType> typeMap = singleQueryManager.getDataTypeMap();
         List<String> paths = selectSeriesBatchReaderEntity.getAllPaths();
         for (int i =0 ; i < readers.size(); i++) {
           TSDataType dataType = typeMap.get(paths.get(i));
-          AbstractClusterSelectSeriesBatchReader clusterBatchReader = readers.get(i);
+          IClusterSelectSeriesBatchReader clusterBatchReader = readers.get(i);
           assertNotNull(((ClusterSelectSeriesBatchReaderByTimestamp) clusterBatchReader).getReaderByTimeStamp());
           assertEquals(dataType,
               ((ClusterSelectSeriesBatchReaderByTimestamp) clusterBatchReader).getDataType());