You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2021/11/28 03:17:29 UTC
[iotdb] branch xianyi updated: fix read/write stable
This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch xianyi
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/xianyi by this push:
new b448679 fix read/write stable
b448679 is described below
commit b448679a1d877589f3a8f345056687086abc90ba
Author: LebronAl <TX...@gmail.com>
AuthorDate: Sun Nov 28 11:16:49 2021 +0800
fix read/write stable
---
cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java | 2 +-
.../java/org/apache/iotdb/session/ClusterSession.java | 17 ++++++++---------
2 files changed, 9 insertions(+), 10 deletions(-)
diff --git a/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java b/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java
index 240bad3..76c8cf6 100644
--- a/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java
+++ b/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java
@@ -381,7 +381,7 @@ public abstract class AbstractCli {
if (specialCmd.startsWith(SELECT_COUNT) && nodeList.size() != 0) {
String[] paths = specialCmd.split(" ");
String deviceId = paths[paths.length - 1];
- EndPoint endpoint = nodeList.get(Math.abs(deviceId.hashCode()) % nodeList.size());
+ EndPoint endpoint = nodeList.get(Math.abs(deviceId.hashCode()) % 3);
if (endpoint.port == Integer.parseInt(port)) {
executeQuery(connection, cmd);
} else {
diff --git a/session/src/main/java/org/apache/iotdb/session/ClusterSession.java b/session/src/main/java/org/apache/iotdb/session/ClusterSession.java
index 496d166..d1389bd 100644
--- a/session/src/main/java/org/apache/iotdb/session/ClusterSession.java
+++ b/session/src/main/java/org/apache/iotdb/session/ClusterSession.java
@@ -32,15 +32,16 @@ public class ClusterSession {
Session[] sessions;
ArrayBlockingQueue[] queues;
List<EndPoint> nodeList;
+ private final int size = 3;
public ClusterSession(String host, int rpcPort) throws IoTDBConnectionException {
Session session = new Session(host, rpcPort);
session.open();
nodeList = new ArrayList<>();
nodeList.addAll(session.getNodeList());
- sessions = new Session[nodeList.size()];
- queues = new ArrayBlockingQueue[nodeList.size()];
- for (int i = 0; i < nodeList.size(); i++) {
+ sessions = new Session[size];
+ queues = new ArrayBlockingQueue[size];
+ for (int i = 0; i < size; i++) {
sessions[i] = new Session(nodeList.get(i).ip, nodeList.get(i).port);
sessions[i].open();
queues[i] = new ArrayBlockingQueue<>(1000);
@@ -50,10 +51,9 @@ public class ClusterSession {
public void insertTablet(Tablet tablet)
throws StatementExecutionException, IoTDBConnectionException {
- int hashVal = Math.abs(tablet.prefixPath.hashCode());
- int index = hashVal % nodeList.size();
+ int index = Math.abs(tablet.prefixPath.hashCode()) % size;
for (int i = 0; i < 2; i++) {
- int j = (index + i) % nodeList.size();
+ int j = (index + i) % 3;
synchronized (queues[j]) {
if (!queues[j].isEmpty()) {
queues[j].add(tablet);
@@ -73,14 +73,13 @@ public class ClusterSession {
}
public SessionDataSet queryTablet(String sql, String deviceId) {
- int hashVal = Math.abs(deviceId.hashCode());
- int index = hashVal % nodeList.size();
+ int index = Math.abs(deviceId.hashCode()) % size;
SessionDataSet sessionDataSet = null;
try {
sessionDataSet = sessions[index].executeQueryStatement(sql);
} catch (Exception e) {
try {
- sessionDataSet = sessions[(index + 1) % nodeList.size()].executeQueryStatement(sql);
+ sessionDataSet = sessions[(index + 1) % size].executeQueryStatement(sql);
} catch (Exception ex) {
// Should never happen: once the node has restarted, it won't be killed again.
e.printStackTrace();