You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2022/04/14 10:16:25 UTC
[iotdb] branch metric_dependency_resolve updated: fix
This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch metric_dependency_resolve
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/metric_dependency_resolve by this push:
new 224694ee71 fix
224694ee71 is described below
commit 224694ee71b1cb4df9ce2ceb792282751e82f1a9
Author: LebronAl <TX...@gmail.com>
AuthorDate: Thu Apr 14 18:16:14 2022 +0800
fix
---
.../server/ConfigNodeRPCServerProcessor.java | 2 +-
.../confignode/consensus/RatisConsensusDemo.java | 9 +++------
consensus/pom.xml | 5 -----
.../iotdb/consensus/ratis/RatisConsensus.java | 23 ++++++++++------------
.../apache/iotdb/db/client/ConfigNodeClient.java | 15 ++++++++++++--
5 files changed, 27 insertions(+), 27 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessor.java
index 7da25479df..15ef51437f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessor.java
@@ -84,7 +84,7 @@ public class ConfigNodeRPCServerProcessor implements ConfigIService.Iface {
TDataNodeRegisterResp resp = new TDataNodeRegisterResp();
dataSet.convertToRpcDataNodeRegisterResp(resp);
- LOGGER.error("{}", resp);
+ LOGGER.info("Execute RegisterDatanodeRequest {} with result {}", resp, req);
return resp;
}
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/RatisConsensusDemo.java b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/RatisConsensusDemo.java
index 9348655d81..63695fd4bc 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/RatisConsensusDemo.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/RatisConsensusDemo.java
@@ -34,7 +34,6 @@ import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.junit.Assert;
-import org.junit.Test;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -56,7 +55,6 @@ public class RatisConsensusDemo {
* ConfigNode by yourself 5. Add @Test 6. run ratisConsensusRegisterDemo 7. run
* ratisConsensusQueryDemo
*/
- @Test
public void ratisConsensusSetStorageGroupsDemo() throws TException, InterruptedException {
createClients();
setStorageGroups();
@@ -67,7 +65,6 @@ public class RatisConsensusDemo {
queryDataNodes();
}
- @Test
public void ratisConsensusQueryStorageGroupsDemo() throws TException, InterruptedException {
createClients();
queryStorageGroups();
@@ -81,7 +78,7 @@ public class RatisConsensusDemo {
private void createClients() throws TTransportException {
// Create clients for these three ConfigNodes
// to simulate DataNodes to send RPC requests
- clients = new ConfigIService.Client[1];
+ clients = new ConfigIService.Client[3];
for (int i = 0; i < 1; i++) {
TTransport transport =
RpcTransportFactory.INSTANCE.getTransport(localhost, 22277 + i * 2, timeOutInMS);
@@ -110,7 +107,7 @@ public class RatisConsensusDemo {
TimeUnit.SECONDS.sleep(1);
// DataNodes can connect to any ConfigNode and send read requests
- for (int i = 0; i < 1; i++) {
+ for (int i = 0; i < 3; i++) {
TDataNodeMessageResp msgMap = clients[i].getDataNodesMessage(-1);
System.out.printf(
"\nQuery DataNode message from ConfigNode 0.0.0.0:%d. Result: %s\n",
@@ -131,7 +128,7 @@ public class RatisConsensusDemo {
// sleep 1s to make sure all ConfigNode in ConfigNodeGroup hold the same PartitionTable
TimeUnit.SECONDS.sleep(1);
- for (int i = 0; i < 1; i++) {
+ for (int i = 0; i < 3; i++) {
TStorageGroupSchemaResp msgMap = clients[i].getStorageGroupsSchema();
System.out.printf(
"\nQuery StorageGroup message from ConfigNode 0.0.0.0:%d. Result: {\n", 22277 + i * 2);
diff --git a/consensus/pom.xml b/consensus/pom.xml
index bdb355d8b6..857276360e 100644
--- a/consensus/pom.xml
+++ b/consensus/pom.xml
@@ -41,11 +41,6 @@
<artifactId>ratis-common</artifactId>
<version>2.2.0</version>
</dependency>
- <dependency>
- <groupId>org.apache.ratis</groupId>
- <artifactId>ratis-netty</artifactId>
- <version>2.2.0</version>
- </dependency>
<dependency>
<groupId>org.apache.ratis</groupId>
<artifactId>ratis-grpc</artifactId>
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index 91acd7b106..a87cafd2fe 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -37,12 +37,11 @@ import org.apache.iotdb.consensus.exception.PeerNotInConsensusGroupException;
import org.apache.iotdb.consensus.exception.RatisRequestFailedException;
import org.apache.iotdb.consensus.statemachine.IStateMachine;
-import org.apache.ratis.RaftConfigKeys;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.netty.NettyConfigKeys;
-import org.apache.ratis.netty.NettyFactory;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcFactory;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.GroupInfoReply;
import org.apache.ratis.protocol.Message;
@@ -52,7 +51,6 @@ import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
-import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.util.NetUtils;
@@ -107,11 +105,10 @@ class RatisConsensus implements IConsensus {
RaftProperties properties = new RaftProperties();
RaftServerConfigKeys.setStorageDir(properties, Collections.singletonList(ratisStorageDir));
- RaftConfigKeys.Rpc.setType(properties, SupportedRpcType.NETTY);
// set the port which server listen to in RaftProperty object
final int port = NetUtils.createSocketAddr(address).getPort();
- NettyConfigKeys.Server.setPort(properties, port);
+ GrpcConfigKeys.Server.setPort(properties, port);
server =
RaftServer.newBuilder()
@@ -154,7 +151,7 @@ class RatisConsensus implements IConsensus {
// 1. first try the local server
RaftClientRequest clientRequest =
buildRawRequest(groupId, message, RaftClientRequest.writeRequestType());
- RaftClientReply localServerReply = null;
+ RaftClientReply localServerReply;
RaftPeer suggestedLeader = null;
try {
localServerReply = server.submitClientRequest(clientRequest);
@@ -174,7 +171,7 @@ class RatisConsensus implements IConsensus {
// 2. try raft client
RaftClient client = clientMap.get(Utils.toRatisGroupId(groupId));
- TSStatus writeResult = null;
+ TSStatus writeResult;
try {
RaftClientReply reply = client.io().send(message);
writeResult = Utils.deserializeFrom(reply.getMessage().getContent().asReadOnlyByteBuffer());
@@ -199,7 +196,7 @@ class RatisConsensus implements IConsensus {
return failedRead(new ConsensusGroupNotExistException(groupId));
}
- RaftClientReply reply = null;
+ RaftClientReply reply;
try {
RequestMessage message = new RequestMessage(IConsensusRequest);
@@ -238,7 +235,7 @@ class RatisConsensus implements IConsensus {
RaftClient client = buildClientAndCache(group);
// add RaftPeer myself to this RaftGroup
- RaftClientReply reply = null;
+ RaftClientReply reply;
try {
reply = client.getGroupManagementApi(myself.getId()).add(group);
} catch (IOException e) {
@@ -267,7 +264,7 @@ class RatisConsensus implements IConsensus {
RaftClient client = clientMap.get(raftGroupId);
// send remove group to myself
- RaftClientReply reply = null;
+ RaftClientReply reply;
try {
reply = client.getGroupManagementApi(myself.getId()).remove(raftGroupId, false, false);
} catch (IOException e) {
@@ -384,7 +381,7 @@ class RatisConsensus implements IConsensus {
buildClientAndCache(raftGroup);
// add RaftPeer myself to this RaftGroup
- RaftClientReply reply = null;
+ RaftClientReply reply;
try {
reply = sendReconfiguration(raftGroup.getGroupId(), new ArrayList<>(raftGroup.getPeers()));
// sync again
@@ -511,7 +508,7 @@ class RatisConsensus implements IConsensus {
.setProperties(raftProperties)
.setRaftGroup(group)
.setClientRpc(
- new NettyFactory(new Parameters())
+ new GrpcFactory(new Parameters())
.newRaftClientRpc(ClientId.randomId(), raftProperties));
RaftClient client = builder.build();
closeRaftClient(group.getGroupId());
diff --git a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
index c3c5b610f7..0816618fb3 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -23,7 +23,18 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.cluster.Endpoint;
import org.apache.iotdb.commons.exception.BadNodeUrlException;
import org.apache.iotdb.commons.utils.CommonUtils;
-import org.apache.iotdb.confignode.rpc.thrift.*;
+import org.apache.iotdb.confignode.rpc.thrift.ConfigIService;
+import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeMessageResp;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
+import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionResp;
+import org.apache.iotdb.confignode.rpc.thrift.TDeleteStorageGroupReq;
+import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionResp;
+import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
+import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchemaResp;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.RpcTransportFactory;
@@ -157,10 +168,10 @@ public class ConfigNodeClient {
for (int i = 0; i < RETRY_NUM; i++) {
try {
TDataNodeRegisterResp resp = client.registerDataNode(req);
- logger.error("{}", resp);
if (!updateConfigNodeLeader(resp.status)) {
return resp;
}
+ logger.info("Register current node using request {} with response {}", req, resp);
} catch (TException e) {
configLeader = null;
}