You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by yu...@apache.org on 2020/07/20 08:58:34 UTC
[incubator-iotdb] branch kyy updated: use read and write config
This is an automated email from the ASF dual-hosted git repository.
yuyuankang pushed a commit to branch kyy
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/kyy by this push:
new 62d4b79 use read and write config
62d4b79 is described below
commit 62d4b79c498dd571e1c0f64a12ab563c46968caa
Author: Ring-k <yu...@hotmail.com>
AuthorDate: Mon Jul 20 16:58:14 2020 +0800
use read and write config
---
.../cluster/client/sync/SyncClientAdaptor.java | 63 ++++++++++++----------
.../apache/iotdb/cluster/server/RaftServer.java | 6 +++
2 files changed, 41 insertions(+), 28 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
index 42e2a2b..7456fc3 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
@@ -103,13 +103,14 @@ public class SyncClientAdaptor {
return resultRef.get();
}
- public static Long querySingleSeriesByTimestamp(AsyncDataClient client, SingleSeriesQueryRequest request)
+ public static Long querySingleSeriesByTimestamp(AsyncDataClient client,
+ SingleSeriesQueryRequest request)
throws TException, InterruptedException {
AtomicReference<Long> result = new AtomicReference<>();
GenericHandler<Long> handler = new GenericHandler<>(client.getNode(), result);
synchronized (result) {
client.querySingleSeriesByTimestamp(request, handler);
- result.wait(RaftServer.getConnectionTimeoutInMS());
+ result.wait(RaftServer.getReadOperationTimeoutMS());
}
return result.get();
}
@@ -130,7 +131,7 @@ public class SyncClientAdaptor {
synchronized (result) {
client.querySingleSeries(request, handler);
- result.wait(RaftServer.getConnectionTimeoutInMS());
+ result.wait(RaftServer.getReadOperationTimeoutMS());
}
return result.get();
}
@@ -143,7 +144,7 @@ public class SyncClientAdaptor {
handler.setContact(client.getNode());
synchronized (response) {
client.getNodeList(header, schemaPattern, level, handler);
- response.wait(RaftServer.getConnectionTimeoutInMS());
+ response.wait(RaftServer.getReadOperationTimeoutMS());
}
return response.get();
}
@@ -156,7 +157,7 @@ public class SyncClientAdaptor {
handler.setContact(client.getNode());
synchronized (response) {
client.getChildNodePathInNextLevel(header, path, handler);
- response.wait(RaftServer.getConnectionTimeoutInMS());
+ response.wait(RaftServer.getReadOperationTimeoutMS());
}
return response.get();
}
@@ -175,7 +176,7 @@ public class SyncClientAdaptor {
plan.serialize(dataOutputStream);
client.getAllMeasurementSchema(header, ByteBuffer.wrap(byteArrayOutputStream.toByteArray()),
handler);
- response.wait(RaftServer.getConnectionTimeoutInMS());
+ response.wait(RaftServer.getReadOperationTimeoutMS());
}
return response.get();
}
@@ -186,7 +187,7 @@ public class SyncClientAdaptor {
GenericHandler<TNodeStatus> handler = new GenericHandler<>(client.getNode(), resultRef);
synchronized (resultRef) {
client.queryNodeStatus(handler);
- resultRef.wait(RaftServer.getConnectionTimeoutInMS());
+ resultRef.wait(RaftServer.getReadOperationTimeoutMS());
}
return resultRef.get();
}
@@ -197,12 +198,13 @@ public class SyncClientAdaptor {
GenericHandler<CheckStatusResponse> handler = new GenericHandler<>(client.getNode(), resultRef);
synchronized (resultRef) {
client.checkStatus(startUpStatus, handler);
- resultRef.wait(RaftServer.getConnectionTimeoutInMS());
+ resultRef.wait(RaftServer.getReadOperationTimeoutMS());
}
return resultRef.get();
}
- public static AddNodeResponse addNode(AsyncMetaClient client, Node thisNode, StartUpStatus startUpStatus)
+ public static AddNodeResponse addNode(AsyncMetaClient client, Node thisNode,
+ StartUpStatus startUpStatus)
throws TException, InterruptedException {
JoinClusterHandler handler = new JoinClusterHandler();
AtomicReference<AddNodeResponse> response = new AtomicReference(null);
@@ -220,9 +222,10 @@ public class SyncClientAdaptor {
PullSchemaRequest pullSchemaRequest) throws TException, InterruptedException {
AtomicReference<List<MeasurementSchema>> timeseriesSchemas = new AtomicReference<>();
synchronized (timeseriesSchemas) {
- client.pullTimeSeriesSchema(pullSchemaRequest, new PullTimeseriesSchemaHandler(client.getNode(),
- pullSchemaRequest.getPrefixPaths(), timeseriesSchemas));
- timeseriesSchemas.wait(RaftServer.getConnectionTimeoutInMS());
+ client
+ .pullTimeSeriesSchema(pullSchemaRequest, new PullTimeseriesSchemaHandler(client.getNode(),
+ pullSchemaRequest.getPrefixPaths(), timeseriesSchemas));
+ timeseriesSchemas.wait(RaftServer.getReadOperationTimeoutMS());
}
return timeseriesSchemas.get();
}
@@ -230,32 +233,35 @@ public class SyncClientAdaptor {
public static List<ByteBuffer> getAggrResult(AsyncDataClient client, GetAggrResultRequest request)
throws TException, InterruptedException {
AtomicReference<List<ByteBuffer>> resultReference = new AtomicReference<>();
- GenericHandler<List<ByteBuffer>> handler = new GenericHandler<>(client.getNode(), resultReference);
+ GenericHandler<List<ByteBuffer>> handler = new GenericHandler<>(client.getNode(),
+ resultReference);
synchronized (resultReference) {
resultReference.set(null);
client.getAggrResult(request, handler);
- resultReference.wait(RaftServer.getConnectionTimeoutInMS());
+ resultReference.wait(RaftServer.getReadOperationTimeoutMS());
}
return resultReference.get();
}
- public static List<String> getUnregisteredMeasurements(AsyncDataClient client, Node header, List<String> seriesPaths) throws TException, InterruptedException {
+ public static List<String> getUnregisteredMeasurements(AsyncDataClient client, Node header,
+ List<String> seriesPaths) throws TException, InterruptedException {
AtomicReference<List<String>> remoteResult = new AtomicReference<>();
GenericHandler<List<String>> handler = new GenericHandler<>(client.getNode(), remoteResult);
synchronized (remoteResult) {
client.getUnregisteredTimeseries(header, seriesPaths, handler);
- remoteResult.wait(RaftServer.getConnectionTimeoutInMS());
+ remoteResult.wait(RaftServer.getReadOperationTimeoutMS());
}
return remoteResult.get();
}
- public static List<String> getAllPaths(AsyncDataClient client, Node header, List<String> pathsToQuery)
+ public static List<String> getAllPaths(AsyncDataClient client, Node header,
+ List<String> pathsToQuery)
throws InterruptedException, TException {
AtomicReference<List<String>> remoteResult = new AtomicReference<>();
GenericHandler<List<String>> handler = new GenericHandler<>(client.getNode(), remoteResult);
synchronized (remoteResult) {
client.getAllPaths(header, pathsToQuery, handler);
- remoteResult.wait(RaftServer.getConnectionTimeoutInMS());
+ remoteResult.wait(RaftServer.getReadOperationTimeoutMS());
}
return remoteResult.get();
}
@@ -267,7 +273,7 @@ public class SyncClientAdaptor {
GenericHandler<Integer> handler = new GenericHandler<>(client.getNode(), remoteResult);
synchronized (remoteResult) {
client.getPathCount(header, pathsToQuery, level, handler);
- remoteResult.wait(RaftServer.getConnectionTimeoutInMS());
+ remoteResult.wait(RaftServer.getReadOperationTimeoutMS());
}
return remoteResult.get();
}
@@ -279,7 +285,7 @@ public class SyncClientAdaptor {
GenericHandler<Set<String>> handler = new GenericHandler<>(client.getNode(), remoteResult);
synchronized (remoteResult) {
client.getAllDevices(header, pathsToQuery, handler);
- remoteResult.wait(RaftServer.getConnectionTimeoutInMS());
+ remoteResult.wait(RaftServer.getReadOperationTimeoutMS());
}
return remoteResult.get();
}
@@ -291,7 +297,7 @@ public class SyncClientAdaptor {
synchronized (result) {
result.set(null);
client.getGroupByExecutor(request, handler);
- result.wait(RaftServer.getConnectionTimeoutInMS());
+ result.wait(RaftServer.getReadOperationTimeoutMS());
}
return result.get();
}
@@ -322,7 +328,7 @@ public class SyncClientAdaptor {
synchronized (status) {
client.executeNonQueryPlan(req, new ForwardPlanHandler(status, plan, receiver));
- status.wait(RaftServer.getConnectionTimeoutInMS());
+ status.wait(RaftServer.getWriteOperationTimeoutMS());
}
return status.get();
@@ -335,19 +341,20 @@ public class SyncClientAdaptor {
GenericHandler<ByteBuffer> handler = new GenericHandler<>(client.getNode(), result);
synchronized (result) {
client.readFile(remotePath, offset, fetchSize, handler);
- result.wait(RaftServer.getConnectionTimeoutInMS());
+ result.wait(RaftServer.getWriteOperationTimeoutMS());
}
return result.get();
}
- public static List<ByteBuffer> getGroupByResult(AsyncDataClient client, Node header, long executorId
+ public static List<ByteBuffer> getGroupByResult(AsyncDataClient client, Node header,
+ long executorId
, long curStartTime, long curEndTime) throws InterruptedException, TException {
AtomicReference<List<ByteBuffer>> fetchResult = new AtomicReference<>();
GenericHandler<List<ByteBuffer>> handler = new GenericHandler<>(client.getNode(), fetchResult);
synchronized (fetchResult) {
fetchResult.set(null);
client.getGroupByResult(header, executorId, curStartTime, curEndTime, handler);
- fetchResult.wait(RaftServer.getConnectionTimeoutInMS());
+ fetchResult.wait(RaftServer.getReadOperationTimeoutMS());
}
return fetchResult.get();
}
@@ -359,7 +366,7 @@ public class SyncClientAdaptor {
synchronized (snapshotRef) {
client.pullSnapshot(request, new PullSnapshotHandler<>(snapshotRef,
client.getNode(), slots, factory));
- snapshotRef.wait(RaftServer.getConnectionTimeoutInMS());
+ snapshotRef.wait(RaftServer.getReadOperationTimeoutMS());
}
return snapshotRef.get();
}
@@ -373,7 +380,7 @@ public class SyncClientAdaptor {
context.getQueryId(), deviceMeasurements, header, client.getNode());
synchronized (result) {
client.last(request, handler);
- result.wait(RaftServer.getConnectionTimeoutInMS());
+ result.wait(RaftServer.getReadOperationTimeoutMS());
}
return result.get();
}
@@ -384,7 +391,7 @@ public class SyncClientAdaptor {
GenericHandler<Boolean> handler = new GenericHandler<>(client.getNode(), result);
synchronized (result) {
client.onSnapshotApplied(header, slots, handler);
- result.wait(RaftServer.getConnectionTimeoutInMS());
+ result.wait(RaftServer.getWriteOperationTimeoutMS());
}
return result.get();
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java
index 42986b8..79f37ca 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java
@@ -58,6 +58,8 @@ public abstract class RaftServer implements RaftService.AsyncIface, RaftService.
ClusterDescriptor.getInstance().getConfig().getConnectionTimeoutInMS();
private static int readOperationTimeoutMS =
ClusterDescriptor.getInstance().getConfig().getReadOperationTimeoutMS();
+ private static int writeOperationTimeoutMS =
+ ClusterDescriptor.getInstance().getConfig().getWriteOperationTimeoutMS();
private static int syncLeaderMaxWaitMs = 20 * 1000;
private static long heartBeatIntervalMs = 1000L;
@@ -97,6 +99,10 @@ public abstract class RaftServer implements RaftService.AsyncIface, RaftService.
return readOperationTimeoutMS;
}
+ public static int getWriteOperationTimeoutMS() {
+ return writeOperationTimeoutMS;
+ }
+
public static int getSyncLeaderMaxWaitMs() {
return syncLeaderMaxWaitMs;
}