You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2021/06/01 11:08:55 UTC

[iotdb] 02/02: implement server interfaces

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch expr
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 438cd8cdebdbf3390795c089bf53656d178a7cef
Author: jt <jt...@163.com>
AuthorDate: Thu May 27 17:17:41 2021 +0800

    implement server interfaces
---
 .../iotdb/cluster/server/DataClusterServer.java    | 27 ++++++++++++++++++++++
 .../iotdb/cluster/server/MetaClusterServer.java    | 25 ++++++++++++++++++++
 2 files changed, 52 insertions(+)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
index d88d1a3..69469b0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.cluster.partition.PartitionGroup;
 import org.apache.iotdb.cluster.partition.PartitionTable;
 import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryAcknowledgement;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
 import org.apache.iotdb.cluster.rpc.thrift.ElectionRequest;
 import org.apache.iotdb.cluster.rpc.thrift.ExecutNonQueryReq;
@@ -963,4 +964,30 @@ public class DataClusterServer extends RaftServer
     getDataAsyncService(thisNode, resultHandler, hardLinkPath)
         .removeHardLink(hardLinkPath, resultHandler);
   }
+
+  @Override
+  public long appendEntryIndirect(AppendEntryRequest request, List<Node> subReceivers)
+      throws TException {
+    return getDataSyncService(thisNode)
+        .appendEntryIndirect(request, subReceivers);
+  }
+
+  @Override
+  public void acknowledgeAppendEntry(AppendEntryAcknowledgement ack) {
+    getDataSyncService(thisNode).acknowledgeAppendEntry(ack);
+  }
+
+  @Override
+  public void appendEntryIndirect(AppendEntryRequest request, List<Node> subReceivers,
+      AsyncMethodCallback<Long> resultHandler) {
+    getDataAsyncService(thisNode, resultHandler, request)
+        .appendEntryIndirect(request, subReceivers, resultHandler);
+  }
+
+  @Override
+  public void acknowledgeAppendEntry(AppendEntryAcknowledgement ack,
+      AsyncMethodCallback<Void> resultHandler) {
+    getDataAsyncService(thisNode, resultHandler, ack)
+        .acknowledgeAppendEntry(ack, resultHandler);
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
index ddc64cc..f18d920 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.cluster.server;
 
+import java.util.List;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.coordinator.Coordinator;
 import org.apache.iotdb.cluster.exception.ConfigInconsistentException;
@@ -26,6 +27,7 @@ import org.apache.iotdb.cluster.metadata.CMManager;
 import org.apache.iotdb.cluster.metadata.MetaPuller;
 import org.apache.iotdb.cluster.rpc.thrift.AddNodeResponse;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryAcknowledgement;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
 import org.apache.iotdb.cluster.rpc.thrift.CheckStatusResponse;
 import org.apache.iotdb.cluster.rpc.thrift.ElectionRequest;
@@ -370,4 +372,27 @@ public class MetaClusterServer extends RaftServer
   public void handshake(Node sender, AsyncMethodCallback<Void> resultHandler) {
     asyncService.handshake(sender, resultHandler);
   }
+
+  @Override
+  public long appendEntryIndirect(AppendEntryRequest request, List<Node> subReceivers)
+      throws TException {
+    return syncService.appendEntryIndirect(request, subReceivers);
+  }
+
+  @Override
+  public void acknowledgeAppendEntry(AppendEntryAcknowledgement ack) {
+    syncService.acknowledgeAppendEntry(ack);
+  }
+
+  @Override
+  public void appendEntryIndirect(AppendEntryRequest request, List<Node> subReceivers,
+      AsyncMethodCallback<Long> resultHandler) {
+    asyncService.appendEntryIndirect(request, subReceivers, resultHandler);
+  }
+
+  @Override
+  public void acknowledgeAppendEntry(AppendEntryAcknowledgement ack,
+      AsyncMethodCallback<Void> resultHandler) {
+    asyncService.acknowledgeAppendEntry(ack, resultHandler);
+  }
 }