You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2021/11/28 03:17:29 UTC

[iotdb] branch xianyi updated: fix read/write stable

This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch xianyi
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/xianyi by this push:
     new b448679  fix read/write stable
b448679 is described below

commit b448679a1d877589f3a8f345056687086abc90ba
Author: LebronAl <TX...@gmail.com>
AuthorDate: Sun Nov 28 11:16:49 2021 +0800

    fix read/write stable
---
 cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java |  2 +-
 .../java/org/apache/iotdb/session/ClusterSession.java   | 17 ++++++++---------
 2 files changed, 9 insertions(+), 10 deletions(-)

diff --git a/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java b/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java
index 240bad3..76c8cf6 100644
--- a/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java
+++ b/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java
@@ -381,7 +381,7 @@ public abstract class AbstractCli {
     if (specialCmd.startsWith(SELECT_COUNT) && nodeList.size() != 0) {
       String[] paths = specialCmd.split(" ");
       String deviceId = paths[paths.length - 1];
-      EndPoint endpoint = nodeList.get(Math.abs(deviceId.hashCode()) % nodeList.size());
+      EndPoint endpoint = nodeList.get(Math.abs(deviceId.hashCode()) % 3);
       if (endpoint.port == Integer.parseInt(port)) {
         executeQuery(connection, cmd);
       } else {
diff --git a/session/src/main/java/org/apache/iotdb/session/ClusterSession.java b/session/src/main/java/org/apache/iotdb/session/ClusterSession.java
index 496d166..d1389bd 100644
--- a/session/src/main/java/org/apache/iotdb/session/ClusterSession.java
+++ b/session/src/main/java/org/apache/iotdb/session/ClusterSession.java
@@ -32,15 +32,16 @@ public class ClusterSession {
   Session[] sessions;
   ArrayBlockingQueue[] queues;
   List<EndPoint> nodeList;
+  private final int size = 3;
 
   public ClusterSession(String host, int rpcPort) throws IoTDBConnectionException {
     Session session = new Session(host, rpcPort);
     session.open();
     nodeList = new ArrayList<>();
     nodeList.addAll(session.getNodeList());
-    sessions = new Session[nodeList.size()];
-    queues = new ArrayBlockingQueue[nodeList.size()];
-    for (int i = 0; i < nodeList.size(); i++) {
+    sessions = new Session[size];
+    queues = new ArrayBlockingQueue[size];
+    for (int i = 0; i < size; i++) {
       sessions[i] = new Session(nodeList.get(i).ip, nodeList.get(i).port);
       sessions[i].open();
       queues[i] = new ArrayBlockingQueue<>(1000);
@@ -50,10 +51,9 @@ public class ClusterSession {
 
   public void insertTablet(Tablet tablet)
       throws StatementExecutionException, IoTDBConnectionException {
-    int hashVal = Math.abs(tablet.prefixPath.hashCode());
-    int index = hashVal % nodeList.size();
+    int index = Math.abs(tablet.prefixPath.hashCode()) % size;
     for (int i = 0; i < 2; i++) {
-      int j = (index + i) % nodeList.size();
+      int j = (index + i) % 3;
       synchronized (queues[j]) {
         if (!queues[j].isEmpty()) {
           queues[j].add(tablet);
@@ -73,14 +73,13 @@ public class ClusterSession {
   }
 
   public SessionDataSet queryTablet(String sql, String deviceId) {
-    int hashVal = Math.abs(deviceId.hashCode());
-    int index = hashVal % nodeList.size();
+    int index = Math.abs(deviceId.hashCode()) % size;
     SessionDataSet sessionDataSet = null;
     try {
       sessionDataSet = sessions[index].executeQueryStatement(sql);
     } catch (Exception e) {
       try {
-        sessionDataSet = sessions[(index + 1) % nodeList.size()].executeQueryStatement(sql);
+        sessionDataSet = sessions[(index + 1) % size].executeQueryStatement(sql);
       } catch (Exception ex) {
         // never happen, once the node restart, it won't be killed anymore.
         e.printStackTrace();