You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2022/04/14 10:16:25 UTC

[iotdb] branch metric_dependency_resolve updated: fix

This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch metric_dependency_resolve
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/metric_dependency_resolve by this push:
     new 224694ee71 fix
224694ee71 is described below

commit 224694ee71b1cb4df9ce2ceb792282751e82f1a9
Author: LebronAl <TX...@gmail.com>
AuthorDate: Thu Apr 14 18:16:14 2022 +0800

    fix
---
 .../server/ConfigNodeRPCServerProcessor.java       |  2 +-
 .../confignode/consensus/RatisConsensusDemo.java   |  9 +++------
 consensus/pom.xml                                  |  5 -----
 .../iotdb/consensus/ratis/RatisConsensus.java      | 23 ++++++++++------------
 .../apache/iotdb/db/client/ConfigNodeClient.java   | 15 ++++++++++++--
 5 files changed, 27 insertions(+), 27 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessor.java
index 7da25479df..15ef51437f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessor.java
@@ -84,7 +84,7 @@ public class ConfigNodeRPCServerProcessor implements ConfigIService.Iface {
 
     TDataNodeRegisterResp resp = new TDataNodeRegisterResp();
     dataSet.convertToRpcDataNodeRegisterResp(resp);
-    LOGGER.error("{}", resp);
+    LOGGER.info("Execute RegisterDatanodeRequest {} with result {}", resp, req);
     return resp;
   }
 
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/RatisConsensusDemo.java b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/RatisConsensusDemo.java
index 9348655d81..63695fd4bc 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/RatisConsensusDemo.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/RatisConsensusDemo.java
@@ -34,7 +34,6 @@ import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
 import org.junit.Assert;
-import org.junit.Test;
 
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -56,7 +55,6 @@ public class RatisConsensusDemo {
    * ConfigNode by yourself 5. Add @Test 6. run ratisConsensusRegisterDemo 7. run
    * ratisConsensusQueryDemo
    */
-  @Test
   public void ratisConsensusSetStorageGroupsDemo() throws TException, InterruptedException {
     createClients();
     setStorageGroups();
@@ -67,7 +65,6 @@ public class RatisConsensusDemo {
     queryDataNodes();
   }
 
-  @Test
   public void ratisConsensusQueryStorageGroupsDemo() throws TException, InterruptedException {
     createClients();
     queryStorageGroups();
@@ -81,7 +78,7 @@ public class RatisConsensusDemo {
   private void createClients() throws TTransportException {
     // Create clients for these three ConfigNodes
     // to simulate DataNodes to send RPC requests
-    clients = new ConfigIService.Client[1];
+    clients = new ConfigIService.Client[3];
     for (int i = 0; i < 1; i++) {
       TTransport transport =
           RpcTransportFactory.INSTANCE.getTransport(localhost, 22277 + i * 2, timeOutInMS);
@@ -110,7 +107,7 @@ public class RatisConsensusDemo {
     TimeUnit.SECONDS.sleep(1);
 
     // DataNodes can connect to any ConfigNode and send read requests
-    for (int i = 0; i < 1; i++) {
+    for (int i = 0; i < 3; i++) {
       TDataNodeMessageResp msgMap = clients[i].getDataNodesMessage(-1);
       System.out.printf(
           "\nQuery DataNode message from ConfigNode 0.0.0.0:%d. Result: %s\n",
@@ -131,7 +128,7 @@ public class RatisConsensusDemo {
     // sleep 1s to make sure all ConfigNode in ConfigNodeGroup hold the same PartitionTable
     TimeUnit.SECONDS.sleep(1);
 
-    for (int i = 0; i < 1; i++) {
+    for (int i = 0; i < 3; i++) {
       TStorageGroupSchemaResp msgMap = clients[i].getStorageGroupsSchema();
       System.out.printf(
           "\nQuery StorageGroup message from ConfigNode 0.0.0.0:%d. Result: {\n", 22277 + i * 2);
diff --git a/consensus/pom.xml b/consensus/pom.xml
index bdb355d8b6..857276360e 100644
--- a/consensus/pom.xml
+++ b/consensus/pom.xml
@@ -41,11 +41,6 @@
             <artifactId>ratis-common</artifactId>
             <version>2.2.0</version>
         </dependency>
-        <dependency>
-            <groupId>org.apache.ratis</groupId>
-            <artifactId>ratis-netty</artifactId>
-            <version>2.2.0</version>
-        </dependency>
         <dependency>
             <groupId>org.apache.ratis</groupId>
             <artifactId>ratis-grpc</artifactId>
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index 91acd7b106..a87cafd2fe 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -37,12 +37,11 @@ import org.apache.iotdb.consensus.exception.PeerNotInConsensusGroupException;
 import org.apache.iotdb.consensus.exception.RatisRequestFailedException;
 import org.apache.iotdb.consensus.statemachine.IStateMachine;
 
-import org.apache.ratis.RaftConfigKeys;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.netty.NettyConfigKeys;
-import org.apache.ratis.netty.NettyFactory;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcFactory;
 import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.GroupInfoReply;
 import org.apache.ratis.protocol.Message;
@@ -52,7 +51,6 @@ import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.exceptions.NotLeaderException;
-import org.apache.ratis.rpc.SupportedRpcType;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.util.NetUtils;
@@ -107,11 +105,10 @@ class RatisConsensus implements IConsensus {
     RaftProperties properties = new RaftProperties();
 
     RaftServerConfigKeys.setStorageDir(properties, Collections.singletonList(ratisStorageDir));
-    RaftConfigKeys.Rpc.setType(properties, SupportedRpcType.NETTY);
 
     // set the port which server listen to in RaftProperty object
     final int port = NetUtils.createSocketAddr(address).getPort();
-    NettyConfigKeys.Server.setPort(properties, port);
+    GrpcConfigKeys.Server.setPort(properties, port);
 
     server =
         RaftServer.newBuilder()
@@ -154,7 +151,7 @@ class RatisConsensus implements IConsensus {
     // 1. first try the local server
     RaftClientRequest clientRequest =
         buildRawRequest(groupId, message, RaftClientRequest.writeRequestType());
-    RaftClientReply localServerReply = null;
+    RaftClientReply localServerReply;
     RaftPeer suggestedLeader = null;
     try {
       localServerReply = server.submitClientRequest(clientRequest);
@@ -174,7 +171,7 @@ class RatisConsensus implements IConsensus {
 
     // 2. try raft client
     RaftClient client = clientMap.get(Utils.toRatisGroupId(groupId));
-    TSStatus writeResult = null;
+    TSStatus writeResult;
     try {
       RaftClientReply reply = client.io().send(message);
       writeResult = Utils.deserializeFrom(reply.getMessage().getContent().asReadOnlyByteBuffer());
@@ -199,7 +196,7 @@ class RatisConsensus implements IConsensus {
       return failedRead(new ConsensusGroupNotExistException(groupId));
     }
 
-    RaftClientReply reply = null;
+    RaftClientReply reply;
     try {
       RequestMessage message = new RequestMessage(IConsensusRequest);
 
@@ -238,7 +235,7 @@ class RatisConsensus implements IConsensus {
     RaftClient client = buildClientAndCache(group);
 
     // add RaftPeer myself to this RaftGroup
-    RaftClientReply reply = null;
+    RaftClientReply reply;
     try {
       reply = client.getGroupManagementApi(myself.getId()).add(group);
     } catch (IOException e) {
@@ -267,7 +264,7 @@ class RatisConsensus implements IConsensus {
 
     RaftClient client = clientMap.get(raftGroupId);
     // send remove group to myself
-    RaftClientReply reply = null;
+    RaftClientReply reply;
     try {
       reply = client.getGroupManagementApi(myself.getId()).remove(raftGroupId, false, false);
     } catch (IOException e) {
@@ -384,7 +381,7 @@ class RatisConsensus implements IConsensus {
     buildClientAndCache(raftGroup);
 
     // add RaftPeer myself to this RaftGroup
-    RaftClientReply reply = null;
+    RaftClientReply reply;
     try {
       reply = sendReconfiguration(raftGroup.getGroupId(), new ArrayList<>(raftGroup.getPeers()));
       // sync again
@@ -511,7 +508,7 @@ class RatisConsensus implements IConsensus {
             .setProperties(raftProperties)
             .setRaftGroup(group)
             .setClientRpc(
-                new NettyFactory(new Parameters())
+                new GrpcFactory(new Parameters())
                     .newRaftClientRpc(ClientId.randomId(), raftProperties));
     RaftClient client = builder.build();
     closeRaftClient(group.getGroupId());
diff --git a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
index c3c5b610f7..0816618fb3 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -23,7 +23,18 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.cluster.Endpoint;
 import org.apache.iotdb.commons.exception.BadNodeUrlException;
 import org.apache.iotdb.commons.utils.CommonUtils;
-import org.apache.iotdb.confignode.rpc.thrift.*;
+import org.apache.iotdb.confignode.rpc.thrift.ConfigIService;
+import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeMessageResp;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
+import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionResp;
+import org.apache.iotdb.confignode.rpc.thrift.TDeleteStorageGroupReq;
+import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionResp;
+import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
+import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchemaResp;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.RpcTransportFactory;
@@ -157,10 +168,10 @@ public class ConfigNodeClient {
     for (int i = 0; i < RETRY_NUM; i++) {
       try {
         TDataNodeRegisterResp resp = client.registerDataNode(req);
-        logger.error("{}", resp);
         if (!updateConfigNodeLeader(resp.status)) {
           return resp;
         }
+        logger.info("Register current node using request {} with response {}", req, resp);
       } catch (TException e) {
         configLeader = null;
       }