You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by yu...@apache.org on 2020/07/20 08:58:34 UTC

[incubator-iotdb] branch kyy updated: use read and write config

This is an automated email from the ASF dual-hosted git repository.

yuyuankang pushed a commit to branch kyy
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/kyy by this push:
     new 62d4b79  use read and write config
62d4b79 is described below

commit 62d4b79c498dd571e1c0f64a12ab563c46968caa
Author: Ring-k <yu...@hotmail.com>
AuthorDate: Mon Jul 20 16:58:14 2020 +0800

    use read and write config
---
 .../cluster/client/sync/SyncClientAdaptor.java     | 63 ++++++++++++----------
 .../apache/iotdb/cluster/server/RaftServer.java    |  6 +++
 2 files changed, 41 insertions(+), 28 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
index 42e2a2b..7456fc3 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
@@ -103,13 +103,14 @@ public class SyncClientAdaptor {
     return resultRef.get();
   }
 
-  public static Long querySingleSeriesByTimestamp(AsyncDataClient client, SingleSeriesQueryRequest request)
+  public static Long querySingleSeriesByTimestamp(AsyncDataClient client,
+      SingleSeriesQueryRequest request)
       throws TException, InterruptedException {
     AtomicReference<Long> result = new AtomicReference<>();
     GenericHandler<Long> handler = new GenericHandler<>(client.getNode(), result);
     synchronized (result) {
       client.querySingleSeriesByTimestamp(request, handler);
-      result.wait(RaftServer.getConnectionTimeoutInMS());
+      result.wait(RaftServer.getReadOperationTimeoutMS());
     }
     return result.get();
   }
@@ -130,7 +131,7 @@ public class SyncClientAdaptor {
 
     synchronized (result) {
       client.querySingleSeries(request, handler);
-      result.wait(RaftServer.getConnectionTimeoutInMS());
+      result.wait(RaftServer.getReadOperationTimeoutMS());
     }
     return result.get();
   }
@@ -143,7 +144,7 @@ public class SyncClientAdaptor {
     handler.setContact(client.getNode());
     synchronized (response) {
       client.getNodeList(header, schemaPattern, level, handler);
-      response.wait(RaftServer.getConnectionTimeoutInMS());
+      response.wait(RaftServer.getReadOperationTimeoutMS());
     }
     return response.get();
   }
@@ -156,7 +157,7 @@ public class SyncClientAdaptor {
     handler.setContact(client.getNode());
     synchronized (response) {
       client.getChildNodePathInNextLevel(header, path, handler);
-      response.wait(RaftServer.getConnectionTimeoutInMS());
+      response.wait(RaftServer.getReadOperationTimeoutMS());
     }
     return response.get();
   }
@@ -175,7 +176,7 @@ public class SyncClientAdaptor {
       plan.serialize(dataOutputStream);
       client.getAllMeasurementSchema(header, ByteBuffer.wrap(byteArrayOutputStream.toByteArray()),
           handler);
-      response.wait(RaftServer.getConnectionTimeoutInMS());
+      response.wait(RaftServer.getReadOperationTimeoutMS());
     }
     return response.get();
   }
@@ -186,7 +187,7 @@ public class SyncClientAdaptor {
     GenericHandler<TNodeStatus> handler = new GenericHandler<>(client.getNode(), resultRef);
     synchronized (resultRef) {
       client.queryNodeStatus(handler);
-      resultRef.wait(RaftServer.getConnectionTimeoutInMS());
+      resultRef.wait(RaftServer.getReadOperationTimeoutMS());
     }
     return resultRef.get();
   }
@@ -197,12 +198,13 @@ public class SyncClientAdaptor {
     GenericHandler<CheckStatusResponse> handler = new GenericHandler<>(client.getNode(), resultRef);
     synchronized (resultRef) {
       client.checkStatus(startUpStatus, handler);
-      resultRef.wait(RaftServer.getConnectionTimeoutInMS());
+      resultRef.wait(RaftServer.getReadOperationTimeoutMS());
     }
     return resultRef.get();
   }
 
-  public static AddNodeResponse addNode(AsyncMetaClient client, Node thisNode, StartUpStatus startUpStatus)
+  public static AddNodeResponse addNode(AsyncMetaClient client, Node thisNode,
+      StartUpStatus startUpStatus)
       throws TException, InterruptedException {
     JoinClusterHandler handler = new JoinClusterHandler();
     AtomicReference<AddNodeResponse> response = new AtomicReference(null);
@@ -220,9 +222,10 @@ public class SyncClientAdaptor {
       PullSchemaRequest pullSchemaRequest) throws TException, InterruptedException {
     AtomicReference<List<MeasurementSchema>> timeseriesSchemas = new AtomicReference<>();
     synchronized (timeseriesSchemas) {
-      client.pullTimeSeriesSchema(pullSchemaRequest, new PullTimeseriesSchemaHandler(client.getNode(),
-          pullSchemaRequest.getPrefixPaths(), timeseriesSchemas));
-      timeseriesSchemas.wait(RaftServer.getConnectionTimeoutInMS());
+      client
+          .pullTimeSeriesSchema(pullSchemaRequest, new PullTimeseriesSchemaHandler(client.getNode(),
+              pullSchemaRequest.getPrefixPaths(), timeseriesSchemas));
+      timeseriesSchemas.wait(RaftServer.getReadOperationTimeoutMS());
     }
     return timeseriesSchemas.get();
   }
@@ -230,32 +233,35 @@ public class SyncClientAdaptor {
   public static List<ByteBuffer> getAggrResult(AsyncDataClient client, GetAggrResultRequest request)
       throws TException, InterruptedException {
     AtomicReference<List<ByteBuffer>> resultReference = new AtomicReference<>();
-    GenericHandler<List<ByteBuffer>> handler = new GenericHandler<>(client.getNode(), resultReference);
+    GenericHandler<List<ByteBuffer>> handler = new GenericHandler<>(client.getNode(),
+        resultReference);
     synchronized (resultReference) {
       resultReference.set(null);
       client.getAggrResult(request, handler);
-      resultReference.wait(RaftServer.getConnectionTimeoutInMS());
+      resultReference.wait(RaftServer.getReadOperationTimeoutMS());
     }
     return resultReference.get();
   }
 
-  public static List<String> getUnregisteredMeasurements(AsyncDataClient client, Node header, List<String> seriesPaths) throws TException, InterruptedException {
+  public static List<String> getUnregisteredMeasurements(AsyncDataClient client, Node header,
+      List<String> seriesPaths) throws TException, InterruptedException {
     AtomicReference<List<String>> remoteResult = new AtomicReference<>();
     GenericHandler<List<String>> handler = new GenericHandler<>(client.getNode(), remoteResult);
     synchronized (remoteResult) {
       client.getUnregisteredTimeseries(header, seriesPaths, handler);
-      remoteResult.wait(RaftServer.getConnectionTimeoutInMS());
+      remoteResult.wait(RaftServer.getReadOperationTimeoutMS());
     }
     return remoteResult.get();
   }
 
-  public static List<String> getAllPaths(AsyncDataClient client, Node header, List<String> pathsToQuery)
+  public static List<String> getAllPaths(AsyncDataClient client, Node header,
+      List<String> pathsToQuery)
       throws InterruptedException, TException {
     AtomicReference<List<String>> remoteResult = new AtomicReference<>();
     GenericHandler<List<String>> handler = new GenericHandler<>(client.getNode(), remoteResult);
     synchronized (remoteResult) {
       client.getAllPaths(header, pathsToQuery, handler);
-      remoteResult.wait(RaftServer.getConnectionTimeoutInMS());
+      remoteResult.wait(RaftServer.getReadOperationTimeoutMS());
     }
     return remoteResult.get();
   }
@@ -267,7 +273,7 @@ public class SyncClientAdaptor {
     GenericHandler<Integer> handler = new GenericHandler<>(client.getNode(), remoteResult);
     synchronized (remoteResult) {
       client.getPathCount(header, pathsToQuery, level, handler);
-      remoteResult.wait(RaftServer.getConnectionTimeoutInMS());
+      remoteResult.wait(RaftServer.getReadOperationTimeoutMS());
     }
     return remoteResult.get();
   }
@@ -279,7 +285,7 @@ public class SyncClientAdaptor {
     GenericHandler<Set<String>> handler = new GenericHandler<>(client.getNode(), remoteResult);
     synchronized (remoteResult) {
       client.getAllDevices(header, pathsToQuery, handler);
-      remoteResult.wait(RaftServer.getConnectionTimeoutInMS());
+      remoteResult.wait(RaftServer.getReadOperationTimeoutMS());
     }
     return remoteResult.get();
   }
@@ -291,7 +297,7 @@ public class SyncClientAdaptor {
     synchronized (result) {
       result.set(null);
       client.getGroupByExecutor(request, handler);
-      result.wait(RaftServer.getConnectionTimeoutInMS());
+      result.wait(RaftServer.getReadOperationTimeoutMS());
     }
     return result.get();
   }
@@ -322,7 +328,7 @@ public class SyncClientAdaptor {
 
     synchronized (status) {
       client.executeNonQueryPlan(req, new ForwardPlanHandler(status, plan, receiver));
-      status.wait(RaftServer.getConnectionTimeoutInMS());
+      status.wait(RaftServer.getWriteOperationTimeoutMS());
     }
 
     return status.get();
@@ -335,19 +341,20 @@ public class SyncClientAdaptor {
     GenericHandler<ByteBuffer> handler = new GenericHandler<>(client.getNode(), result);
     synchronized (result) {
       client.readFile(remotePath, offset, fetchSize, handler);
-      result.wait(RaftServer.getConnectionTimeoutInMS());
+      result.wait(RaftServer.getWriteOperationTimeoutMS());
     }
     return result.get();
   }
 
-  public static List<ByteBuffer> getGroupByResult(AsyncDataClient client, Node header, long executorId
+  public static List<ByteBuffer> getGroupByResult(AsyncDataClient client, Node header,
+      long executorId
       , long curStartTime, long curEndTime) throws InterruptedException, TException {
     AtomicReference<List<ByteBuffer>> fetchResult = new AtomicReference<>();
     GenericHandler<List<ByteBuffer>> handler = new GenericHandler<>(client.getNode(), fetchResult);
     synchronized (fetchResult) {
       fetchResult.set(null);
       client.getGroupByResult(header, executorId, curStartTime, curEndTime, handler);
-      fetchResult.wait(RaftServer.getConnectionTimeoutInMS());
+      fetchResult.wait(RaftServer.getReadOperationTimeoutMS());
     }
     return fetchResult.get();
   }
@@ -359,7 +366,7 @@ public class SyncClientAdaptor {
     synchronized (snapshotRef) {
       client.pullSnapshot(request, new PullSnapshotHandler<>(snapshotRef,
           client.getNode(), slots, factory));
-      snapshotRef.wait(RaftServer.getConnectionTimeoutInMS());
+      snapshotRef.wait(RaftServer.getReadOperationTimeoutMS());
     }
     return snapshotRef.get();
   }
@@ -373,7 +380,7 @@ public class SyncClientAdaptor {
         context.getQueryId(), deviceMeasurements, header, client.getNode());
     synchronized (result) {
       client.last(request, handler);
-      result.wait(RaftServer.getConnectionTimeoutInMS());
+      result.wait(RaftServer.getReadOperationTimeoutMS());
     }
     return result.get();
   }
@@ -384,7 +391,7 @@ public class SyncClientAdaptor {
     GenericHandler<Boolean> handler = new GenericHandler<>(client.getNode(), result);
     synchronized (result) {
       client.onSnapshotApplied(header, slots, handler);
-      result.wait(RaftServer.getConnectionTimeoutInMS());
+      result.wait(RaftServer.getWriteOperationTimeoutMS());
     }
     return result.get();
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java
index 42986b8..79f37ca 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java
@@ -58,6 +58,8 @@ public abstract class RaftServer implements RaftService.AsyncIface, RaftService.
       ClusterDescriptor.getInstance().getConfig().getConnectionTimeoutInMS();
   private static int readOperationTimeoutMS =
       ClusterDescriptor.getInstance().getConfig().getReadOperationTimeoutMS();
+  private static int writeOperationTimeoutMS =
+      ClusterDescriptor.getInstance().getConfig().getWriteOperationTimeoutMS();
   private static int syncLeaderMaxWaitMs = 20 * 1000;
   private static long heartBeatIntervalMs = 1000L;
 
@@ -97,6 +99,10 @@ public abstract class RaftServer implements RaftService.AsyncIface, RaftService.
     return readOperationTimeoutMS;
   }
 
+  public static int getWriteOperationTimeoutMS() {
+    return writeOperationTimeoutMS;
+  }
+
   public static int getSyncLeaderMaxWaitMs() {
     return syncLeaderMaxWaitMs;
   }