You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2021/11/28 04:50:40 UTC

[iotdb] branch xianyi updated (b448679 -> 94366c1)

This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a change to branch xianyi
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


 discard b448679  fix read/write stable
     new 94366c1  fix read/write stable

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user force-pushes a change (git push --force) and generates a
repository containing something like this:

 * -- * -- B -- O -- O -- O   (b448679)
            \
             N -- N -- N   refs/heads/xianyi (94366c1)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 server/src/assembly/resources/conf/iotdb-engine.properties         | 3 +++
 session/src/main/java/org/apache/iotdb/session/ClusterSession.java | 2 +-
 2 files changed, 4 insertions(+), 1 deletion(-)

[iotdb] 01/01: fix read/write stable

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch xianyi
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 94366c1c05dba8f5b0acc02aff1bb8d8e7106496
Author: LebronAl <TX...@gmail.com>
AuthorDate: Sun Nov 28 11:16:49 2021 +0800

    fix read/write stable
---
 cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java |  2 +-
 .../src/assembly/resources/conf/iotdb-engine.properties |  3 +++
 .../java/org/apache/iotdb/session/ClusterSession.java   | 17 ++++++++---------
 3 files changed, 12 insertions(+), 10 deletions(-)

diff --git a/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java b/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java
index 240bad3..76c8cf6 100644
--- a/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java
+++ b/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java
@@ -381,7 +381,7 @@ public abstract class AbstractCli {
     if (specialCmd.startsWith(SELECT_COUNT) && nodeList.size() != 0) {
       String[] paths = specialCmd.split(" ");
       String deviceId = paths[paths.length - 1];
-      EndPoint endpoint = nodeList.get(Math.abs(deviceId.hashCode()) % nodeList.size());
+      EndPoint endpoint = nodeList.get(Math.abs(deviceId.hashCode()) % 3);
       if (endpoint.port == Integer.parseInt(port)) {
         executeQuery(connection, cmd);
       } else {
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index ceb68f8..ea41f5f 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -40,6 +40,9 @@ rpc_address=0.0.0.0
 # Datatype: int
 rpc_port=6667
 
+# Node list: string
+node_list=127.0.0.1:6667,127.0.0.1:6668,127.0.0.1:6669
+
 # Datatype: boolean
 # rpc_thrift_compression_enable=false
 
diff --git a/session/src/main/java/org/apache/iotdb/session/ClusterSession.java b/session/src/main/java/org/apache/iotdb/session/ClusterSession.java
index 496d166..ee630e0 100644
--- a/session/src/main/java/org/apache/iotdb/session/ClusterSession.java
+++ b/session/src/main/java/org/apache/iotdb/session/ClusterSession.java
@@ -32,15 +32,16 @@ public class ClusterSession {
   Session[] sessions;
   ArrayBlockingQueue[] queues;
   List<EndPoint> nodeList;
+  private final int size = 3;
 
   public ClusterSession(String host, int rpcPort) throws IoTDBConnectionException {
     Session session = new Session(host, rpcPort);
     session.open();
     nodeList = new ArrayList<>();
     nodeList.addAll(session.getNodeList());
-    sessions = new Session[nodeList.size()];
-    queues = new ArrayBlockingQueue[nodeList.size()];
-    for (int i = 0; i < nodeList.size(); i++) {
+    sessions = new Session[size];
+    queues = new ArrayBlockingQueue[size];
+    for (int i = 0; i < size; i++) {
       sessions[i] = new Session(nodeList.get(i).ip, nodeList.get(i).port);
       sessions[i].open();
       queues[i] = new ArrayBlockingQueue<>(1000);
@@ -50,10 +51,9 @@ public class ClusterSession {
 
   public void insertTablet(Tablet tablet)
       throws StatementExecutionException, IoTDBConnectionException {
-    int hashVal = Math.abs(tablet.prefixPath.hashCode());
-    int index = hashVal % nodeList.size();
+    int index = Math.abs(tablet.prefixPath.hashCode()) % size;
     for (int i = 0; i < 2; i++) {
-      int j = (index + i) % nodeList.size();
+      int j = (index + i) % size;
       synchronized (queues[j]) {
         if (!queues[j].isEmpty()) {
           queues[j].add(tablet);
@@ -73,14 +73,13 @@ public class ClusterSession {
   }
 
   public SessionDataSet queryTablet(String sql, String deviceId) {
-    int hashVal = Math.abs(deviceId.hashCode());
-    int index = hashVal % nodeList.size();
+    int index = Math.abs(deviceId.hashCode()) % size;
     SessionDataSet sessionDataSet = null;
     try {
       sessionDataSet = sessions[index].executeQueryStatement(sql);
     } catch (Exception e) {
       try {
-        sessionDataSet = sessions[(index + 1) % nodeList.size()].executeQueryStatement(sql);
+        sessionDataSet = sessions[(index + 1) % size].executeQueryStatement(sql);
       } catch (Exception ex) {
         // never happen, once the node restart, it won't be killed anymore.
         e.printStackTrace();