You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by yu...@apache.org on 2020/07/20 08:48:35 UTC
[incubator-iotdb] 01/01: separate read and write config
This is an automated email from the ASF dual-hosted git repository.
yuyuankang pushed a commit to branch kyy
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 4637756254e3c546566b4415fb069cafe3553e18
Author: Ring-k <yu...@hotmail.com>
AuthorDate: Mon Jul 20 16:45:05 2020 +0800
separate read and write config
---
.../resources/conf/iotdb-cluster.properties | 8 +++++-
.../cluster/client/sync/SyncClientAdaptor.java | 2 +-
.../apache/iotdb/cluster/config/ClusterConfig.java | 20 ++++++++++----
.../iotdb/cluster/config/ClusterDescriptor.java | 32 +++++++++++++---------
.../apache/iotdb/cluster/server/RaftServer.java | 8 +++---
.../cluster/server/member/MetaGroupMember.java | 4 +--
6 files changed, 48 insertions(+), 26 deletions(-)
diff --git a/cluster/src/assembly/resources/conf/iotdb-cluster.properties b/cluster/src/assembly/resources/conf/iotdb-cluster.properties
index b656542..216af2e 100644
--- a/cluster/src/assembly/resources/conf/iotdb-cluster.properties
+++ b/cluster/src/assembly/resources/conf/iotdb-cluster.properties
@@ -53,7 +53,13 @@ max_concurrent_client_num=10000
default_replica_num=2
# connection time out (ms) among raft nodes
-connection_time_out_ms=20000
+connection_timeout_ms=20000
+
+# write operation timeout threshold (ms)
+write_operation_timeout_ms=30000
+
+# read operation timeout threshold (ms)
+read_operation_timeout_ms=30000
# when the logs size larger than this, we actually delete snapshoted logs, the unit is bytes
max_unsnapshoted_log_size=134217728
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
index 6a4631e..42e2a2b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
@@ -302,7 +302,7 @@ public class SyncClientAdaptor {
GenericHandler<ByteBuffer> nodeHandler = new GenericHandler<>(client.getNode(), resultRef);
synchronized (resultRef) {
client.previousFill(request, nodeHandler);
- resultRef.wait(RaftServer.getQueryTimeoutInSec() * 1000L);
+ resultRef.wait(RaftServer.getReadOperationTimeoutMS());
}
return resultRef.get();
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
index ca77d5a..ab47959 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
@@ -56,7 +56,9 @@ public class ClusterConfig {
*/
private long maxUnsnapshotedLogSize = 1024 * 1024 * 128L;
- private int queryTimeoutInSec = 30;
+ private int readOperationTimeoutMS = 30 * 1000;
+
+ private int writeOperationTimeoutMS = 30 * 1000;
private boolean useBatchInLogCatchUp = true;
@@ -203,12 +205,20 @@ public class ClusterConfig {
this.connectionTimeoutInMS = connectionTimeoutInMS;
}
- public int getQueryTimeoutInSec() {
- return queryTimeoutInSec;
+ public int getReadOperationTimeoutMS() {
+ return readOperationTimeoutMS;
+ }
+
+ public void setReadOperationTimeoutMS(int readOperationTimeoutMS) {
+ this.readOperationTimeoutMS = readOperationTimeoutMS;
+ }
+
+ public int getWriteOperationTimeoutMS() {
+ return writeOperationTimeoutMS;
}
- public void setQueryTimeoutInSec(int queryTimeoutInSec) {
- this.queryTimeoutInSec = queryTimeoutInSec;
+ public void setWriteOperationTimeoutMS(int writeOperationTimeoutMS) {
+ this.writeOperationTimeoutMS = writeOperationTimeoutMS;
}
public int getMaxNumberOfLogs() {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
index d0fb38e..4ed4bf5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
@@ -91,7 +91,8 @@ public class ClusterDescriptor {
public void replaceProps(String[] params) {
Options options = new Options();
- Option metaPort = new Option(OPTION_META_PORT, OPTION_META_PORT, true, "port for metadata service");
+ Option metaPort = new Option(OPTION_META_PORT, OPTION_META_PORT, true,
+ "port for metadata service");
metaPort.setRequired(false);
options.addOption(metaPort);
@@ -99,7 +100,8 @@ public class ClusterDescriptor {
metaPort.setRequired(false);
options.addOption(dataPort);
- Option clientPort = new Option(OPTION_CLIENT_PORT, OPTION_CLIENT_PORT, true, "port for client service");
+ Option clientPort = new Option(OPTION_CLIENT_PORT, OPTION_CLIENT_PORT, true,
+ "port for client service");
metaPort.setRequired(false);
options.addOption(clientPort);
@@ -208,16 +210,19 @@ public class ClusterDescriptor {
config.setRpcThriftCompressionEnabled(Boolean.parseBoolean(properties.getProperty(
"rpc_thrift_compression_enable", String.valueOf(config.isRpcThriftCompressionEnabled()))));
- config
- .setConnectionTimeoutInMS(Integer.parseInt(properties.getProperty("connection_time_out_ms",
- String.valueOf(config.getConnectionTimeoutInMS()))));
+ config.setConnectionTimeoutInMS(Integer.parseInt(properties
+ .getProperty("connection_timeout_ms", String.valueOf(config.getConnectionTimeoutInMS()))));
+
+ config.setReadOperationTimeoutMS(Integer.parseInt(properties
+ .getProperty("read_operation_timeout_ms",
+ String.valueOf(config.getReadOperationTimeoutMS()))));
- config
- .setQueryTimeoutInSec(Integer.parseInt(properties.getProperty("QUERY_TIME_OUT_SEC",
- String.valueOf(config.getQueryTimeoutInSec()))));
+ config.setWriteOperationTimeoutMS(Integer.parseInt(properties
+ .getProperty("write_operation_timeout_ms",
+ String.valueOf(config.getWriteOperationTimeoutMS()))));
- config
- .setMaxUnsnapshotedLogSize(Long.parseLong(properties.getProperty("max_unsnapshoted_log_size",
+ config.setMaxUnsnapshotedLogSize(Long.parseLong(properties
+ .getProperty("max_unsnapshoted_log_size",
String.valueOf(config.getMaxUnsnapshotedLogSize()))));
config.setUseBatchInLogCatchUp(Boolean.parseBoolean(properties.getProperty(
@@ -288,7 +293,7 @@ public class ClusterDescriptor {
/**
* This method is for setting hot modified properties of the cluster. Currently, we support
- * max_concurrent_client_num, connection_time_out_ms, max_resolved_log_size
+ * max_concurrent_client_num, connection_timeout_ms, max_unsnapshoted_log_size
*
* @param properties
* @throws QueryProcessException
@@ -300,10 +305,11 @@ public class ClusterDescriptor {
String.valueOf(config.getMaxConcurrentClientNum()))));
config.setConnectionTimeoutInMS(Integer.parseInt(properties
- .getProperty("connection_time_out_ms", String.valueOf(config.getConnectionTimeoutInMS()))));
+ .getProperty("connection_timeout_ms", String.valueOf(config.getConnectionTimeoutInMS()))));
config.setMaxUnsnapshotedLogSize(Long.parseLong(properties
- .getProperty("max_unsnapshoted_log_size", String.valueOf(config.getMaxUnsnapshotedLogSize()))));
+ .getProperty("max_unsnapshoted_log_size",
+ String.valueOf(config.getMaxUnsnapshotedLogSize()))));
logger.info("Set cluster configuration {}", properties);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java
index 35cb914..42986b8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java
@@ -56,8 +56,8 @@ public abstract class RaftServer implements RaftService.AsyncIface, RaftService.
private static final Logger logger = LoggerFactory.getLogger(RaftServer.class);
private static int connectionTimeoutInMS =
ClusterDescriptor.getInstance().getConfig().getConnectionTimeoutInMS();
- private static int queryTimeoutInSec =
- ClusterDescriptor.getInstance().getConfig().getQueryTimeoutInSec();
+ private static int readOperationTimeoutMS =
+ ClusterDescriptor.getInstance().getConfig().getReadOperationTimeoutMS();
private static int syncLeaderMaxWaitMs = 20 * 1000;
private static long heartBeatIntervalMs = 1000L;
@@ -93,8 +93,8 @@ public abstract class RaftServer implements RaftService.AsyncIface, RaftService.
RaftServer.connectionTimeoutInMS = connectionTimeoutInMS;
}
- public static int getQueryTimeoutInSec() {
- return queryTimeoutInSec;
+ public static int getReadOperationTimeoutMS() {
+ return readOperationTimeoutMS;
}
public static int getSyncLeaderMaxWaitMs() {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index fa0abff..4a6cf34 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -2688,7 +2688,7 @@ public class MetaGroupMember extends RaftMember {
}
getAllPathsService.shutdown();
try {
- getAllPathsService.awaitTermination(RaftServer.getQueryTimeoutInSec(), TimeUnit.SECONDS);
+ getAllPathsService.awaitTermination(RaftServer.getReadOperationTimeoutMS(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("Unexpected interruption when waiting for get all paths services to stop", e);
@@ -3391,7 +3391,7 @@ public class MetaGroupMember extends RaftMember {
}
fillService.shutdown();
try {
- fillService.awaitTermination(RaftServer.getQueryTimeoutInSec(), TimeUnit.SECONDS);
+ fillService.awaitTermination(RaftServer.getReadOperationTimeoutMS(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("Unexpected interruption when waiting for fill pool to stop", e);