You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2020/12/16 15:26:39 UTC
[iotdb] branch change_rpc_port updated: add name predix for some
threads in cluster module; add rpc ip for Nodes
This is an automated email from the ASF dual-hosted git repository.
hxd pushed a commit to branch change_rpc_port
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/change_rpc_port by this push:
new a21f05e add name predix for some threads in cluster module; add rpc ip for Nodes
a21f05e is described below
commit a21f05e920cd048a307dba7fc19c02574d1447d5
Author: xiangdong huang <sa...@gmail.com>
AuthorDate: Wed Dec 16 23:26:05 2020 +0800
add name predix for some threads in cluster module; add rpc ip for Nodes
---
.../java/org/apache/iotdb/cluster/ClusterMain.java | 47 ++++++++++++++--------
.../iotdb/cluster/client/DataClientProvider.java | 2 +-
.../cluster/client/async/AsyncClientPool.java | 8 ++--
.../iotdb/cluster/client/sync/SyncClientPool.java | 2 +-
.../cluster/server/member/DataGroupMember.java | 6 +--
.../cluster/server/member/MetaGroupMember.java | 4 +-
.../apache/iotdb/cluster/utils/ClusterNode.java | 5 ++-
.../apache/iotdb/cluster/utils/ClusterUtils.java | 6 +--
.../cluster/client/async/AsyncClientPoolTest.java | 12 +++---
.../cluster/client/async/AsyncDataClientTest.java | 2 +-
.../cluster/client/async/AsyncMetaClientTest.java | 2 +-
.../cluster/common/TestPartitionedLogManager.java | 10 +++--
.../apache/iotdb/cluster/log/HardStateTest.java | 3 +-
.../cluster/log/applier/MetaLogApplierTest.java | 6 ++-
.../cluster/log/logtypes/SerializeLogTest.java | 4 +-
.../cluster/partition/SlotPartitionTableTest.java | 15 +++----
.../handlers/caller/HeartbeatHandlerTest.java | 3 +-
.../cluster/server/member/DataGroupMemberTest.java | 6 ++-
.../cluster/server/member/MetaGroupMemberTest.java | 3 +-
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 2 +-
20 files changed, 86 insertions(+), 62 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java
index 3c54165..15b3bd7 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java
@@ -44,7 +44,6 @@ import org.apache.iotdb.cluster.utils.ClusterUtils;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.StartupException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.service.RPCService;
import org.apache.thrift.TException;
import org.apache.thrift.async.TAsyncClientManager;
import org.apache.thrift.protocol.TBinaryProtocol.Factory;
@@ -71,28 +70,31 @@ public class ClusterMain {
private static final String OPTION_CLUSTER_RPC_PORT = "rpc_port";
private static final String OPTION_SEED_NODES = "seed_nodes";
private static final String OPTION_DEBUG_RPC_PORT = "debug_rpc_port";
+ private static final String OPTION_CLUSTER_RPC_IP = "rpc_address";
private static MetaClusterServer metaServer;
public static void main(String[] args) {
if (args.length < 1) {
logger.error("Usage: <-s|-a|-r> [-{} <internal meta port>] "
- + "[-{} <internal data port>] "
- + "[-{} <cluster rpc port>] "
- + "[-{} <node1:meta_port:data_port:cluster_rpc_port,"
- + "node2:meta_port:data_port:cluster_rpc_port,"
- + "...,noden:meta_port:data_port:cluster_rpc_port,>] "
- + "[-{} <rpc port>]\n"
- + "-s: start the node as a seed\n"
- + "-a: start the node as a new node\n"
- + "-r: remove the node out of the cluster\n"
- + "",
- OPTION_INTERVAL_META_PORT,
- OPTION_INTERVAL_DATA_PORT,
- OPTION_CLUSTER_RPC_PORT,
- OPTION_SEED_NODES,
- OPTION_DEBUG_RPC_PORT
- );
+ + "[-{} <internal data port>] "
+ + "[-{} <cluster rpc port>] "
+ + "[-{} <cluster RPC address>]\n"
+ + "[-{} <node1:meta_port:data_port:cluster_rpc_port,"
+ + "node2:meta_port:data_port:cluster_rpc_port,"
+ + "...,noden:meta_port:data_port:cluster_rpc_port,>] "
+ + "[-{} <debug rpc port>]"
+ + "-s: start the node as a seed\n"
+ + "-a: start the node as a new node\n"
+ + "-r: remove the node out of the cluster\n"
+ + "",
+ OPTION_INTERVAL_META_PORT,
+ OPTION_INTERVAL_DATA_PORT,
+ OPTION_CLUSTER_RPC_PORT,
+ OPTION_CLUSTER_RPC_IP,
+ OPTION_SEED_NODES,
+ OPTION_DEBUG_RPC_PORT
+ );
return;
}
String mode = args[0];
@@ -231,6 +233,11 @@ public class ClusterMain {
clusterRpcPort.setRequired(false);
options.addOption(clusterRpcPort);
+ Option clusterRpcIP = new Option(OPTION_CLUSTER_RPC_IP, OPTION_CLUSTER_RPC_IP, true,
+ "IP for client service");
+ clusterRpcIP.setRequired(false);
+ options.addOption(clusterRpcIP);
+
Option seedNodes = new Option(OPTION_SEED_NODES, OPTION_SEED_NODES, true,
"comma-separated {IP/DOMAIN}:meta_port:data_port:client_port pairs");
seedNodes.setRequired(false);
@@ -264,6 +271,12 @@ public class ClusterMain {
logger.debug("replace local cluster rpc port with={}", clusterConfig.getClusterRpcPort());
}
+ if (commandLine.hasOption(OPTION_CLUSTER_RPC_IP)) {
+ IoTDBDescriptor.getInstance().getConfig()
+ .setRpcAddress(commandLine.getOptionValue(OPTION_CLUSTER_RPC_IP));
+ logger.debug("replace local cluster rpc port with={}", clusterConfig.getClusterRpcPort());
+ }
+
if (commandLine.hasOption(OPTION_SEED_NODES)) {
String seedNodeUrls = commandLine.getOptionValue(OPTION_SEED_NODES);
clusterConfig.setSeedNodeUrls(ClusterDescriptor.getSeedUrlList(seedNodeUrls));
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/DataClientProvider.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/DataClientProvider.java
index cb8a05c..28a8540 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/DataClientProvider.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/DataClientProvider.java
@@ -41,7 +41,7 @@ public class DataClientProvider {
if (!ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
dataSyncClientPool = new SyncClientPool(new SyncDataClient.FactorySync(factory));
} else {
- dataAsyncClientPool = new AsyncClientPool(new FactoryAsync(factory));
+ dataAsyncClientPool = new AsyncClientPool("dataClientProvider", new FactoryAsync(factory));
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java
index 9901840..fd91909 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java
@@ -54,18 +54,18 @@ public class AsyncClientPool {
private static final int MAX_ERROR_COUNT = 3;
private static final int PROBE_NODE_STATUS_PERIOD_SECOND = 60;
- public AsyncClientPool(AsyncClientFactory asyncClientFactory) {
- this(asyncClientFactory, true);
+ public AsyncClientPool(String namePredix, AsyncClientFactory asyncClientFactory) {
+ this(namePredix, asyncClientFactory, true);
}
- public AsyncClientPool(AsyncClientFactory asyncClientFactory, boolean blockOnError) {
+ public AsyncClientPool(String namePredix, AsyncClientFactory asyncClientFactory, boolean blockOnError) {
this.asyncClientFactory = asyncClientFactory;
this.maxConnectionForEachNode =
ClusterDescriptor.getInstance().getConfig().getMaxClientPerNodePerMember();
this.blockOnError = blockOnError;
if (blockOnError) {
this.cleanErrorClientExecutorService = new ScheduledThreadPoolExecutor(1,
- new BasicThreadFactory.Builder().namingPattern("clean-error-client-%d").daemon(true)
+ new BasicThreadFactory.Builder().namingPattern(namePredix + "-clean-error-client-%d").daemon(true)
.build());
this.cleanErrorClientExecutorService
.scheduleAtFixedRate(this::cleanErrorClients, PROBE_NODE_STATUS_PERIOD_SECOND,
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java
index 535a815..1e538c8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java
@@ -44,7 +44,7 @@ public class SyncClientPool {
private Map<ClusterNode, Integer> nodeClientNumMap = new ConcurrentHashMap<>();
private SyncClientFactory syncClientFactory;
- public SyncClientPool(SyncClientFactory syncClientFactory) {
+ public SyncClientPool (SyncClientFactory syncClientFactory) {
this.syncClientFactory = syncClientFactory;
this.maxConnectionForEachNode =
ClusterDescriptor.getInstance().getConfig().getMaxClientPerNodePerMember();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index de200cf..8790d81 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -158,11 +158,11 @@ public class DataGroupMember extends RaftMember {
DataGroupMember(TProtocolFactory factory, PartitionGroup nodes, Node thisNode,
MetaGroupMember metaGroupMember) {
super("Data(" + nodes.getHeader().getIp() + ":" + nodes.getHeader().getMetaPort() + ")",
- new AsyncClientPool(new AsyncDataClient.FactoryAsync(factory)),
+ new AsyncClientPool(nodes.getHeader().getIp(), new AsyncDataClient.FactoryAsync(factory)),
new SyncClientPool(new SyncDataClient.FactorySync(factory)),
- new AsyncClientPool(new AsyncDataHeartbeatClient.FactoryAsync(factory), false),
+ new AsyncClientPool(nodes.getHeader().getIp(), new AsyncDataHeartbeatClient.FactoryAsync(factory), false),
new SyncClientPool(new SyncDataHeartbeatClient.FactorySync(factory)),
- new AsyncClientPool(new SingleManagerFactory(factory)));
+ new AsyncClientPool(nodes.getHeader().getIp(), new SingleManagerFactory(factory)));
this.thisNode = thisNode;
this.metaGroupMember = metaGroupMember;
allNodes = nodes;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 863314f..85fed0d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -269,9 +269,9 @@ public class MetaGroupMember extends RaftMember {
}
public MetaGroupMember(TProtocolFactory factory, Node thisNode) throws QueryProcessException {
- super("Meta", new AsyncClientPool(new AsyncMetaClient.FactoryAsync(factory)),
+ super("Meta", new AsyncClientPool("Meta", new AsyncMetaClient.FactoryAsync(factory)),
new SyncClientPool(new SyncMetaClient.FactorySync(factory)),
- new AsyncClientPool(new AsyncMetaHeartbeatClient.FactoryAsync(factory), false),
+ new AsyncClientPool("Meta", new AsyncMetaHeartbeatClient.FactoryAsync(factory), false),
new SyncClientPool(new SyncMetaHeartbeatClient.FactorySync(factory)));
allNodes = new ArrayList<>();
initPeerMap();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterNode.java
index 0251641..1e5b15b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterNode.java
@@ -28,11 +28,12 @@ import org.apache.iotdb.cluster.rpc.thrift.Node;
*/
public class ClusterNode extends Node {
+ //TODO hxd: maintain the client IP and Port
public ClusterNode() {
}
- public ClusterNode(String ip, int metaPort, int nodeIdentifier, int dataPort, int clientPort) {
- super(ip, metaPort, nodeIdentifier, dataPort, clientPort);
+ public ClusterNode(String ip, int metaPort, int nodeIdentifier, int dataPort) {
+ super(ip, metaPort, nodeIdentifier, dataPort);
}
public ClusterNode(Node other) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java
index 395ce03..b5b22de 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java
@@ -55,7 +55,7 @@ public class ClusterUtils {
public static final int WAIT_START_UP_CHECK_TIME_SEC = 5;
- public static final long START_UP_TIME_THRESHOLD_MS = 60 * 1000L;
+ public static final long START_UP_TIME_THRESHOLD_MS = 600 * 1000L;
public static final long START_UP_CHECK_TIME_INTERVAL_MS = 3 * 1000L;
@@ -281,8 +281,8 @@ public class ClusterUtils {
int metaPort = Integer.parseInt(str.substring(metaPortFirstPos, metaPortLastPos));
int id = Integer.parseInt(str.substring(idFirstPos, idLastPos));
int dataPort = Integer.parseInt(str.substring(dataPortFirstPos, dataPortLastPos));
- int clientPort = Integer.parseInt(str.substring(clientPortFirstPos, clientPortLastPos));
- return new Node(ip, metaPort, id, dataPort, clientPort);
+ //TODO hxd: we do not set values to all fields of a Node.
+ return new Node(ip, metaPort, id, dataPort);
}
public static Node parseNode(String nodeUrl) {
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncClientPoolTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncClientPoolTest.java
index 12eb645..9581903 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncClientPoolTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncClientPoolTest.java
@@ -53,7 +53,7 @@ public class AsyncClientPoolTest {
}
private void getClient() throws IOException {
- AsyncClientPool asyncClientPool = new AsyncClientPool(testAsyncClientFactory);
+ AsyncClientPool asyncClientPool = new AsyncClientPool("Test", testAsyncClientFactory);
for (int i = 0; i < 10; i++) {
AsyncClient client = asyncClientPool.getClient(TestUtils.getNode(i));
if (client instanceof TestAsyncClient) {
@@ -64,7 +64,7 @@ public class AsyncClientPoolTest {
}
private void putClient() throws IOException {
- AsyncClientPool asyncClientPool = new AsyncClientPool(testAsyncClientFactory);
+ AsyncClientPool asyncClientPool = new AsyncClientPool("Test", testAsyncClientFactory);
List<AsyncClient> testClients = new ArrayList<>();
for (int i = 0; i < 10; i++) {
AsyncClient client = asyncClientPool.getClient(TestUtils.getNode(i));
@@ -95,7 +95,7 @@ public class AsyncClientPoolTest {
int maxClientNum = ClusterDescriptor.getInstance().getConfig().getMaxClientPerNodePerMember();
ClusterDescriptor.getInstance().getConfig().setMaxClientPerNodePerMember(5);
testAsyncClientFactory = new TestAsyncClientFactory();
- AsyncClientPool asyncClientPool = new AsyncClientPool(testAsyncClientFactory);
+ AsyncClientPool asyncClientPool = new AsyncClientPool("Test", testAsyncClientFactory);
for (int i = 0; i < 5; i++) {
asyncClientPool.getClient(TestUtils.getNode(0));
@@ -121,7 +121,7 @@ public class AsyncClientPoolTest {
try {
ClusterDescriptor.getInstance().getConfig().setMaxClientPerNodePerMember(10);
testAsyncClientFactory = new TestAsyncClientFactory();
- AsyncClientPool asyncClientPool = new AsyncClientPool(testAsyncClientFactory);
+ AsyncClientPool asyncClientPool = new AsyncClientPool("Test", testAsyncClientFactory);
Node node = TestUtils.getNode(0);
List<AsyncClient> clients = new ArrayList<>();
@@ -160,7 +160,7 @@ public class AsyncClientPoolTest {
try {
ClusterDescriptor.getInstance().getConfig().setMaxClientPerNodePerMember(1);
testAsyncClientFactory = new TestAsyncClientFactory();
- AsyncClientPool asyncClientPool = new AsyncClientPool(testAsyncClientFactory);
+ AsyncClientPool asyncClientPool = new AsyncClientPool("Test", testAsyncClientFactory);
Node node = TestUtils.getNode(0);
List<AsyncClient> clients = new ArrayList<>();
@@ -177,7 +177,7 @@ public class AsyncClientPoolTest {
@Test
public void testRecreateClient() throws IOException {
testAsyncClientFactory = new TestAsyncClientFactory();
- AsyncClientPool asyncClientPool = new AsyncClientPool(new AsyncMetaClient.FactoryAsync(new Factory()));
+ AsyncClientPool asyncClientPool = new AsyncClientPool("Test", new AsyncMetaClient.FactoryAsync(new Factory()));
AsyncMetaClient client = (AsyncMetaClient) asyncClientPool.getClient(TestUtils.getNode(0));
client.onError(new Exception());
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncDataClientTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncDataClientTest.java
index 7030fc1..5f9afab 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncDataClientTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncDataClientTest.java
@@ -25,7 +25,7 @@ public class AsyncDataClientTest {
@Test
public void test() throws IOException, TException {
- AsyncClientPool asyncClientPool = new AsyncClientPool(new SingleManagerFactory(new Factory()));
+ AsyncClientPool asyncClientPool = new AsyncClientPool("Test", new SingleManagerFactory(new Factory()));
AsyncDataClient client;
Node node = TestUtils.getNode(0);
client = new AsyncDataClient(new Factory(), new TAsyncClientManager(),
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncMetaClientTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncMetaClientTest.java
index ffca8c5..5e795f0 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncMetaClientTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncMetaClientTest.java
@@ -25,7 +25,7 @@ public class AsyncMetaClientTest {
@Test
public void test() throws IOException, TException {
- AsyncClientPool asyncClientPool = new AsyncClientPool(new FactoryAsync(new Factory()));
+ AsyncClientPool asyncClientPool = new AsyncClientPool("Test", new FactoryAsync(new Factory()));
AsyncMetaClient client;
Node node = TestUtils.getNode(0);
client = new AsyncMetaClient(new Factory(), new TAsyncClientManager(),
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestPartitionedLogManager.java b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestPartitionedLogManager.java
index 790d048..7eb331b 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestPartitionedLogManager.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestPartitionedLogManager.java
@@ -29,7 +29,9 @@ import org.apache.iotdb.cluster.utils.Constants;
public class TestPartitionedLogManager extends PartitionedSnapshotLogManager {
public TestPartitionedLogManager() {
- super(new TestLogApplier(), null, new Node("localhost", 30001, 1, 40001, Constants.RPC_PORT), null, null,
+ super(new TestLogApplier(), null,
+ new Node("localhost", 30001, 1, 40001).setClientIp("localhost")
+ .setClientPort(Constants.RPC_PORT), null, null,
null);
}
@@ -37,9 +39,9 @@ public class TestPartitionedLogManager extends PartitionedSnapshotLogManager {
PartitionTable partitionTable,
Node header,
SnapshotFactory factory) {
- super(logApplier, partitionTable, header, new Node("localhost", 30001, 1, 40001, Constants.RPC_PORT),
- factory,
- null);
+ super(logApplier, partitionTable, header,
+ new Node("localhost", 30001, 1, 40001).setClientIp("localhost")
+ .setClientPort(Constants.RPC_PORT), factory, null);
}
@Override
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/HardStateTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/HardStateTest.java
index 7295c9a..46e8ce7 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/HardStateTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/HardStateTest.java
@@ -33,7 +33,8 @@ public class HardStateTest {
// Not NULL
HardState state = new HardState();
state.setCurrentTerm(2);
- state.setVoteFor(new Node("127.0.0.1", 30000, 0, 40000, Constants.RPC_PORT));
+ state.setVoteFor(new Node("127.0.0.1", 30000, 0, 40000)
+ .setClientIp("127.0.0.1").setClientPort(Constants.RPC_PORT));
ByteBuffer buffer = state.serialize();
HardState newState = HardState.deserialize(buffer);
assertEquals(state, newState);
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/MetaLogApplierTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/MetaLogApplierTest.java
index a0235c5..1437d6f 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/MetaLogApplierTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/MetaLogApplierTest.java
@@ -79,8 +79,10 @@ public class MetaLogApplierTest extends IoTDBTest {
public void testApplyAddNode()
throws QueryProcessException, StorageGroupNotSetException, StorageEngineException {
nodes.clear();
-
- Node node = new Node("localhost", 1111, 0, 2222, Constants.RPC_PORT);
+ //TODO hxd:
+ Node node = new Node("localhost", 1111, 0, 2222);
+ node.setClientIp("localhost");
+ node.setClientPort(Constants.RPC_PORT);
AddNodeLog log = new AddNodeLog();
log.setNewNode(node);
applier.apply(log);
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/logtypes/SerializeLogTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/logtypes/SerializeLogTest.java
index 3d66524..a3d7cde 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/logtypes/SerializeLogTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/logtypes/SerializeLogTest.java
@@ -32,7 +32,6 @@ import org.apache.iotdb.cluster.log.LogParser;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.utils.Constants;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
@@ -90,7 +89,8 @@ public class SerializeLogTest {
AddNodeLog log = new AddNodeLog();
log.setCurrLogIndex(2);
log.setCurrLogTerm(2);
- log.setNewNode(new Node("apache.iotdb.com", 1234, 1, 4321, Constants.RPC_PORT));
+ log.setNewNode(new Node("apache.iotdb.com", 1234, 1, 4321).setClientIp("apache.iotdb.com")
+ .setClientPort(Constants.RPC_PORT));
ByteBuffer byteBuffer = log.serialize();
Log logPrime = LogParser.getINSTANCE().parse(byteBuffer);
assertEquals(log, logPrime);
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/partition/SlotPartitionTableTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/partition/SlotPartitionTableTest.java
index b4a8a51..7846cdd 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/partition/SlotPartitionTableTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/partition/SlotPartitionTableTest.java
@@ -200,14 +200,14 @@ public class SlotPartitionTableTest {
private void assertGetHeaderGroup(int start, int last) {
PartitionGroup group = localTable
- .getHeaderGroup(new Node("localhost", 30000 + start, start, 40000 + start,
- Constants.RPC_PORT + start));
+ .getHeaderGroup(new Node("localhost", 30000 + start, start, 40000 + start)
+ .setClientIp("localhost").setClientPort(Constants.RPC_PORT + start));
assertEquals(replica_size, group.size());
- assertEquals(new Node("localhost", 30000 + start, start, 40000 + start,
- Constants.RPC_PORT + start), group.getHeader());
+ assertEquals(new Node("localhost", 30000 + start, start, 40000 + start)
+ .setClientIp("localhost").setClientPort(Constants.RPC_PORT + start), group.getHeader());
assertEquals(
- new Node("localhost", 30000 + last, last, 40000 + last,
- Constants.RPC_PORT + start), group.get(replica_size - 1));
+ new Node("localhost", 30000 + last, last, 40000 + last)
+ .setClientIp("localhost").setClientPort(Constants.RPC_PORT + start), group.get(replica_size - 1));
}
private void assertPartitionGroup(PartitionGroup group, int... nodeIds) {
@@ -507,7 +507,8 @@ public class SlotPartitionTableTest {
}
private Node getNode(int i) {
- return new Node("localhost", 30000 + i, i, 40000 + i, Constants.RPC_PORT + i);
+ return new Node("localhost", 30000 + i, i, 40000 + i)
+ .setClientIp("localhost").setClientPort(Constants.RPC_PORT + i);
}
@Test
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandlerTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandlerTest.java
index 7efc9b2..b75f8b2 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandlerTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandlerTest.java
@@ -73,7 +73,8 @@ public class HeartbeatHandlerTest {
HeartBeatResponse response = new HeartBeatResponse();
response.setTerm(Response.RESPONSE_AGREE);
response.setLastLogTerm(-2);
- response.setFollower(new Node("192.168.0.6", 9003, 6, 40010, Constants.RPC_PORT));
+ response.setFollower(new Node("192.168.0.6", 9003, 6, 40010).setClientIp("192.168.0.6")
+ .setClientPort(Constants.RPC_PORT));
catchUpFlag = false;
for (int i = 0; i < looseInconsistentNum; i++) {
handler.onComplete(response);
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
index aaf95ba..98f24f0 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
@@ -312,8 +312,10 @@ public class DataGroupMemberTest extends MemberTest {
testMetaMember.getTerm().set(10);
List<Log> metaLogs = TestUtils.prepareTestLogs(6);
metaLogManager.append(metaLogs);
- Node voteFor = new Node("127.0.0.1", 30000, 0, 40000, Constants.RPC_PORT);
- Node elector = new Node("127.0.0.1", 30001, 1, 40001, Constants.RPC_PORT + 1);
+ Node voteFor = new Node("127.0.0.1", 30000, 0, 40000).setClientIp("127.0.0.1")
+ .setClientPort(Constants.RPC_PORT);
+ Node elector = new Node("127.0.0.1", 30001, 1, 40001).setClientIp("127.0.0.1")
+ .setClientPort(Constants.RPC_PORT + 1);
// a request with smaller term
ElectionRequest electionRequest = new ElectionRequest();
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
index fdc517b..3d583b9 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
@@ -922,7 +922,8 @@ public class MetaGroupMemberTest extends MemberTest {
request.setLeaderCommit(0);
request.setPrevLogIndex(-1);
request.setPrevLogTerm(-1);
- request.setLeader(new Node("127.0.0.1", 30000, 0, 40000, Constants.RPC_PORT));
+ request.setLeader(new Node("127.0.0.1", 30000, 0, 40000).setClientIp("127.0.0.1")
+ .setClientPort(Constants.RPC_PORT));
AtomicReference<Long> result = new AtomicReference<>();
GenericHandler<Long> handler = new GenericHandler<>(TestUtils.getNode(0), result);
new MetaAsyncService(testMetaMember).appendEntry(request, handler);
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 1d669e8..8fd69dc 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -961,7 +961,7 @@ public class IoTDBConfig {
return rpcAddress;
}
- void setRpcAddress(String rpcAddress) {
+ public void setRpcAddress(String rpcAddress) {
this.rpcAddress = rpcAddress;
}