You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2020/12/10 18:01:51 UTC
[iotdb] branch change_rpc_port updated: add internal_ip;
move replacePropers to ClusterMain
This is an automated email from the ASF dual-hosted git repository.
hxd pushed a commit to branch change_rpc_port
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/change_rpc_port by this push:
new 70ca15a add internal_ip; move replacePropers to ClusterMain
70ca15a is described below
commit 70ca15ab53f18db69e9b74cda9a4c0b35d3a5c76
Author: xiangdong huang <sa...@gmail.com>
AuthorDate: Fri Dec 11 02:01:25 2020 +0800
add internal_ip; move replacePropers to ClusterMain
---
.../resources/conf/iotdb-cluster.properties | 2 +
.../java/org/apache/iotdb/cluster/ClusterMain.java | 148 +++++++++++++++------
.../apache/iotdb/cluster/config/ClusterConfig.java | 15 ++-
.../iotdb/cluster/config/ClusterDescriptor.java | 92 ++-----------
.../iotdb/cluster/server/DataClusterServer.java | 6 +-
.../iotdb/cluster/server/MetaClusterServer.java | 10 +-
.../apache/iotdb/cluster/server/RaftServer.java | 6 +-
.../cluster/server/heartbeat/HeartbeatServer.java | 2 +-
.../server/heartbeat/MetaHeartbeatServer.java | 5 +
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 2 +-
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 31 -----
.../java/org/apache/iotdb/db/service/IoTDB.java | 3 -
.../iotdb/db/service/thrift/ThriftService.java | 19 ++-
13 files changed, 175 insertions(+), 166 deletions(-)
diff --git a/cluster/src/assembly/resources/conf/iotdb-cluster.properties b/cluster/src/assembly/resources/conf/iotdb-cluster.properties
index 840aa15..f111a5f 100644
--- a/cluster/src/assembly/resources/conf/iotdb-cluster.properties
+++ b/cluster/src/assembly/resources/conf/iotdb-cluster.properties
@@ -27,6 +27,8 @@ cluster_rpc_ip=127.0.0.1
# heartbeat service. #
#-------------------------------------------IMPORTANT---------------------------------------------#
+internal_ip=127.0.0.1
+
# port for metadata service
internal_meta_port=9003
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java
index 90d491c..1f6dbf6 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java
@@ -23,6 +23,12 @@ import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
import org.apache.iotdb.cluster.client.async.AsyncMetaClient;
import org.apache.iotdb.cluster.client.sync.SyncClientAdaptor;
import org.apache.iotdb.cluster.config.ClusterConfig;
@@ -38,6 +44,7 @@ import org.apache.iotdb.cluster.utils.ClusterUtils;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.StartupException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.service.RPCService;
import org.apache.thrift.TException;
import org.apache.thrift.async.TAsyncClientManager;
import org.apache.thrift.protocol.TBinaryProtocol.Factory;
@@ -58,30 +65,40 @@ public class ClusterMain {
// send a request to remove a node, more arguments: ip-of-removed-node
// metaport-of-removed-node
private static final String MODE_REMOVE = "-r";
- // the separator between the cluster configuration and the single-server configuration
- private static final String SERVER_CONF_SEPARATOR = "-sc";
+
+ private static final String OPTION_INTERVAL_META_PORT = "internal_meta_port";
+ private static final String OPTION_INTERVAL_DATA_PORT = "internal_data_port";
+ private static final String OPTION_CLUSTER_RPC_PORT = "rpc_port";
+ private static final String OPTION_SEED_NODES = "seed_nodes";
+ private static final String OPTION_DEBUG_RPC_PORT = "debug_rpc_port";
+
private static MetaClusterServer metaServer;
public static void main(String[] args) {
if (args.length < 1) {
- logger.error("Usage: <-s|-a|-r> [-internal_meta_port <internal meta port>] "
- + "[-internal_data_port <internal data port>] "
- + "[-cluster_rpc_port <cluster rpc port>] "
- + "[-seed_nodes <node1:meta_port:data_port:cluster_rpc_port,"
+ logger.error("Usage: <-s|-a|-r> [-{} <internal meta port>] "
+ + "[-{} <internal data port>] "
+ + "[-{} <cluster rpc port>] "
+ + "[-{} <node1:meta_port:data_port:cluster_rpc_port,"
+ "node2:meta_port:data_port:cluster_rpc_port,"
+ "...,noden:meta_port:data_port:cluster_rpc_port,>] "
- + "[-sc] "
- + "[-rpc_port <rpc port>]\n"
- + "-s: start the node as a seed"
- + "-a: start the node as a new node"
- + "-r: remove the node out of the cluster"
- + "");
+ + "[-{} <rpc port>]\n"
+ + "-s: start the node as a seed\n"
+ + "-a: start the node as a new node\n"
+ + "-r: remove the node out of the cluster\n"
+ + "",
+ OPTION_INTERVAL_META_PORT,
+ OPTION_INTERVAL_DATA_PORT,
+ OPTION_CLUSTER_RPC_PORT,
+ OPTION_SEED_NODES,
+ OPTION_DEBUG_RPC_PORT
+ );
return;
}
String mode = args[0];
if (args.length > 1) {
String[] params = Arrays.copyOfRange(args, 1, args.length);
- replaceDefaultPrams(params);
+ replaceDefaultProps(params);
}
// params check
@@ -164,34 +181,6 @@ public class ClusterMain {
}
}
- private static void replaceDefaultPrams(String[] args) {
- int index;
- String[] clusterParams;
- String[] serverParams = null;
- for (index = 0; index < args.length; index++) {
- //find where -sc is
- if (SERVER_CONF_SEPARATOR.equals(args[index])) {
- break;
- }
- }
- //parameters from 0 to "-sc" are for clusters
- clusterParams = Arrays.copyOfRange(args, 0, index);
-
- if (index < args.length) {
- serverParams = Arrays.copyOfRange(args, index + 1, args.length);
- }
-
- if (clusterParams.length > 0) {
- // replace the cluster default conf params
- ClusterDescriptor.getInstance().replaceProps(clusterParams);
- }
-
- if (serverParams != null && serverParams.length > 0) {
- // replace the server default conf params
- IoTDBDescriptor.getInstance().replaceProps(serverParams);
- }
- }
-
/**
* check the configuration is legal or not
*/
@@ -222,6 +211,83 @@ public class ClusterMain {
return true;
}
+
+ private static void replaceDefaultProps(String[] params) {
+ Options options = new Options();
+ // Overrides config defaults from command-line options; each option is optional and takes one value.
+ Option metaPort = new Option(OPTION_INTERVAL_META_PORT, OPTION_INTERVAL_META_PORT, true,
+ "port for metadata service");
+ metaPort.setRequired(false);
+ options.addOption(metaPort);
+
+ Option dataPort = new Option(OPTION_INTERVAL_DATA_PORT, OPTION_INTERVAL_DATA_PORT, true,
+ "port for data service");
+ dataPort.setRequired(false);
+ options.addOption(dataPort);
+
+ Option clusterRpcPort = new Option(OPTION_CLUSTER_RPC_PORT, OPTION_CLUSTER_RPC_PORT, true,
+ "port for client service");
+ clusterRpcPort.setRequired(false);
+ options.addOption(clusterRpcPort);
+
+ Option seedNodes = new Option(OPTION_SEED_NODES, OPTION_SEED_NODES, true,
+ "comma-separated {IP/DOMAIN}:meta_port:data_port:client_port pairs");
+ seedNodes.setRequired(false);
+ options.addOption(seedNodes);
+
+ Option debugRpcPort = new Option(OPTION_DEBUG_RPC_PORT, OPTION_DEBUG_RPC_PORT, true,
+ "port for debug client service (using single node mode)");
+ debugRpcPort.setRequired(false);
+ options.addOption(debugRpcPort);
+ // parseCommandLine returns null when parsing fails; in that case no config value is changed.
+ CommandLine commandLine = parseCommandLine(options, params);
+ if (commandLine == null) {
+ logger.error("replaces properties failed, use default conf params");
+ } else {
+ ClusterConfig clusterConfig = ClusterDescriptor.getInstance().getConfig();
+ if (commandLine.hasOption(OPTION_INTERVAL_META_PORT)) {
+ clusterConfig.setInternalMetaPort(Integer.parseInt(commandLine.getOptionValue(
+ OPTION_INTERVAL_META_PORT)));
+ logger.debug("replace local meta port with={}", clusterConfig.getInternalMetaPort());
+ }
+
+ if (commandLine.hasOption(OPTION_INTERVAL_DATA_PORT)) {
+ clusterConfig.setInternalDataPort(Integer.parseInt(commandLine.getOptionValue(
+ OPTION_INTERVAL_DATA_PORT)));
+ logger.debug("replace local data port with={}", clusterConfig.getInternalDataPort());
+ }
+
+ if (commandLine.hasOption(OPTION_CLUSTER_RPC_PORT)) {
+ clusterConfig.setClusterRpcPort(Integer.parseInt(commandLine.getOptionValue(
+ OPTION_CLUSTER_RPC_PORT)));
+ logger.debug("replace local cluster rpc port with={}", clusterConfig.getClusterRpcPort());
+ }
+
+ if (commandLine.hasOption(OPTION_SEED_NODES)) {
+ String seedNodeUrls = commandLine.getOptionValue(OPTION_SEED_NODES);
+ clusterConfig.setSeedNodeUrls(ClusterDescriptor.getSeedUrlList(seedNodeUrls));
+ logger.debug("replace seed nodes with={}", clusterConfig.getSeedNodeUrls());
+ }
+
+ if (commandLine.hasOption(OPTION_DEBUG_RPC_PORT)) {
+ IoTDBDescriptor.getInstance().getConfig().setRpcPort(Integer.parseInt(commandLine.getOptionValue(
+ OPTION_DEBUG_RPC_PORT)));
+ logger.debug("replace local cluster (single node) rpc port with={}", commandLine.getOptionValue(
+ OPTION_DEBUG_RPC_PORT));
+ }
+ }
+ }
+
+ private static CommandLine parseCommandLine(Options options, String[] params) {
+ try {
+ CommandLineParser parser = new DefaultParser();
+ return parser.parse(options, params);
+ } catch (ParseException e) {
+ logger.error("parse conf params failed", e);
+ return null;
+ }
+ }
+
private static void doRemoveNode(String[] args) throws IOException {
if (args.length != 3) {
logger.error("Usage: -r <ip> <metaPort>");
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
index a44703f..0055b9f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
@@ -27,6 +27,7 @@ public class ClusterConfig {
static final String CONFIG_NAME = "iotdb-cluster.properties";
private String clusterRpcIp = "127.0.0.1";
+ private String internalIp = "127.0.0.1";
private int internalMetaPort = 9003;
private int internalDataPort = 40010;
private int clusterRpcPort = 6667;
@@ -197,7 +198,7 @@ public class ClusterConfig {
return internalMetaPort;
}
- void setInternalMetaPort(int internalMetaPort) {
+ public void setInternalMetaPort(int internalMetaPort) {
this.internalMetaPort = internalMetaPort;
}
@@ -245,7 +246,7 @@ public class ClusterConfig {
return internalDataPort;
}
- void setInternalDataPort(int internalDataPort) {
+ public void setInternalDataPort(int internalDataPort) {
this.internalDataPort = internalDataPort;
}
@@ -253,7 +254,7 @@ public class ClusterConfig {
return clusterRpcPort;
}
- void setClusterRpcPort(int clusterRpcPort) {
+ public void setClusterRpcPort(int clusterRpcPort) {
this.clusterRpcPort = clusterRpcPort;
}
@@ -440,4 +441,12 @@ public class ClusterConfig {
public void setWaitForSlowNode(boolean waitForSlowNode) {
this.waitForSlowNode = waitForSlowNode;
}
+
+ public String getInternalIp() {
+ return internalIp;
+ }
+
+ public void setInternalIp(String internalIp) {
+ this.internalIp = internalIp;
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
index 3c9e8ec..f465a94 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
@@ -30,10 +30,8 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
-import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
-import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.iotdb.cluster.exception.BadSeedUrlFormatException;
@@ -47,14 +45,7 @@ public class ClusterDescriptor {
private static final Logger logger = LoggerFactory.getLogger(ClusterDescriptor.class);
private static final ClusterDescriptor INSTANCE = new ClusterDescriptor();
- private static final String OPTION_INTERVAL_META_PORT = "internal_meta_port";
- private static final String OPTION_INTERVAL_DATA_PORT = "internal_data_port";
- private static final String OPTION_CLUSTER_RPC_PORT = "cluster_rpc_port";
- private static final String OPTION_SEED_NODES = "seed_nodes";
-
-
private ClusterConfig config = new ClusterConfig();
- private static CommandLine commandLine;
private ClusterDescriptor() {
loadProps();
@@ -88,65 +79,16 @@ public class ClusterDescriptor {
return url;
}
- public void replaceProps(String[] params) {
- Options options = new Options();
-
- Option metaPort = new Option(OPTION_INTERVAL_META_PORT, OPTION_INTERVAL_META_PORT, true,
- "port for metadata service");
- metaPort.setRequired(false);
- options.addOption(metaPort);
-
- Option dataPort = new Option(OPTION_INTERVAL_DATA_PORT, OPTION_INTERVAL_DATA_PORT, true,
- "port for data service");
- dataPort.setRequired(false);
- options.addOption(dataPort);
-
- Option clusterRpcPort = new Option(OPTION_CLUSTER_RPC_PORT, OPTION_CLUSTER_RPC_PORT, true,
- "port for client service");
- clusterRpcPort.setRequired(false);
- options.addOption(clusterRpcPort);
-
- Option seedNodes = new Option(OPTION_SEED_NODES, OPTION_SEED_NODES, true,
- "comma-separated {IP/DOMAIN}:meta_port:data_port:client_port pairs");
- seedNodes.setRequired(false);
- options.addOption(seedNodes);
-
- boolean ok = parseCommandLine(options, params);
- if (!ok) {
- logger.error("replaces properties failed, use default conf params");
- } else {
- if (commandLine.hasOption(OPTION_INTERVAL_META_PORT)) {
- config.setInternalMetaPort(Integer.parseInt(commandLine.getOptionValue(
- OPTION_INTERVAL_META_PORT)));
- logger.debug("replace local meta port with={}", config.getInternalMetaPort());
- }
-
- if (commandLine.hasOption(OPTION_INTERVAL_DATA_PORT)) {
- config.setInternalDataPort(Integer.parseInt(commandLine.getOptionValue(
- OPTION_INTERVAL_DATA_PORT)));
- logger.debug("replace local data port with={}", config.getInternalDataPort());
- }
-
- if (commandLine.hasOption(OPTION_CLUSTER_RPC_PORT)) {
- config.setClusterRpcPort(Integer.parseInt(commandLine.getOptionValue(
- OPTION_CLUSTER_RPC_PORT)));
- logger.debug("replace local cluster rpc port with={}", config.getClusterRpcPort());
- }
-
- if (commandLine.hasOption(OPTION_SEED_NODES)) {
- String seedNodeUrls = commandLine.getOptionValue(OPTION_SEED_NODES);
- config.setSeedNodeUrls(getSeedUrlList(seedNodeUrls));
- logger.debug("replace seed nodes with={}", config.getSeedNodeUrls());
- }
- }
- }
-
public void replaceHostnameWithIp() throws UnknownHostException, BadSeedUrlFormatException {
boolean isInvalidClusterRpcIp = InetAddresses.isInetAddress(config.getClusterRpcIp());
if (!isInvalidClusterRpcIp) {
String clusterRpcIp = hostnameToIP(config.getClusterRpcIp());
config.setClusterRpcIp(clusterRpcIp);
}
+ boolean internalIpIsLiteral = InetAddresses.isInetAddress(config.getInternalIp());
+ if (!internalIpIsLiteral) { // internal_ip is a hostname: resolve it and store it back as the internal IP
+ config.setInternalIp(hostnameToIP(config.getInternalIp()));
+ }
List<String> newSeedUrls = new ArrayList<>();
for (String seedUrl : config.getSeedNodeUrls()) {
@@ -164,21 +106,12 @@ public class ClusterDescriptor {
}
}
config.setSeedNodeUrls(newSeedUrls);
- logger.debug("after replace, the clusterRpcIP={}, seedUrls={}", config.getClusterRpcIp(),
+ logger.debug("after replace, the clusterRpcIP={}, internalIP={} seedUrls={}",
+ config.getClusterRpcIp(),
+ config.getInternalIp(),
config.getSeedNodeUrls());
}
- private static boolean parseCommandLine(Options options, String[] params) {
- try {
- CommandLineParser parser = new DefaultParser();
- commandLine = parser.parse(options, params);
- } catch (ParseException e) {
- logger.error("parse conf params failed", e);
- return false;
- }
- return true;
- }
-
/**
* load an property file and set TsfileDBConfig variables.
*/
@@ -195,14 +128,15 @@ public class ClusterDescriptor {
}
}
config.setClusterRpcIp(properties.getProperty("cluster_rpc_ip", config.getClusterRpcIp()));
+ config.setInternalIp(properties.getProperty("internal_ip", config.getInternalIp()));
- config.setInternalMetaPort(Integer.parseInt(properties.getProperty(OPTION_INTERVAL_META_PORT,
+ config.setInternalMetaPort(Integer.parseInt(properties.getProperty("internal_meta_port",
String.valueOf(config.getInternalMetaPort()))));
- config.setInternalDataPort(Integer.parseInt(properties.getProperty(OPTION_INTERVAL_DATA_PORT,
+ config.setInternalDataPort(Integer.parseInt(properties.getProperty("internal_data_port",
Integer.toString(config.getInternalDataPort()))));
- config.setClusterRpcPort(Integer.parseInt(properties.getProperty(OPTION_CLUSTER_RPC_PORT,
+ config.setClusterRpcPort(Integer.parseInt(properties.getProperty("rpc_port",
Integer.toString(config.getClusterRpcPort()))));
config.setMaxConcurrentClientNum(Integer.parseInt(properties.getProperty(
@@ -297,14 +231,14 @@ public class ClusterDescriptor {
config.setConsistencyLevel(ConsistencyLevel.getConsistencyLevel(consistencyLevel));
}
- String seedUrls = properties.getProperty(OPTION_SEED_NODES);
+ String seedUrls = properties.getProperty("seed_nodes");
if (seedUrls != null) {
List<String> urlList = getSeedUrlList(seedUrls);
config.setSeedNodeUrls(urlList);
}
}
- private List<String> getSeedUrlList(String seedUrls) {
+ public static List<String> getSeedUrlList(String seedUrls) {
if (seedUrls == null) {
return Collections.emptyList();
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
index 391cf69..44fa503 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
@@ -460,11 +460,13 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
@Override
TServerTransport getServerSocket() throws TTransportException {
+ logger.info("[{}] Cluster node will listen {}:{}", getServerClientName(), config.getInternalIp(),
+ config.getInternalDataPort());
if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
- return new TNonblockingServerSocket(new InetSocketAddress(config.getClusterRpcIp(),
+ return new TNonblockingServerSocket(new InetSocketAddress(config.getInternalIp(),
thisNode.getDataPort()), getConnectionTimeoutInMS());
} else {
- return new TServerSocket(new InetSocketAddress(config.getClusterRpcIp(),
+ return new TServerSocket(new InetSocketAddress(config.getInternalIp(),
thisNode.getDataPort()));
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
index a0fb04d..8f6fe6e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
@@ -48,6 +48,7 @@ import org.apache.iotdb.cluster.utils.nodetool.ClusterMonitor;
import org.apache.iotdb.db.exception.StartupException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.db.service.RPCService;
import org.apache.iotdb.db.service.RegisterManager;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.thrift.TException;
@@ -57,6 +58,8 @@ import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TServerTransport;
import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* MetaCluster manages the whole cluster's metadata, such as what nodes are in the cluster and the
@@ -65,6 +68,7 @@ import org.apache.thrift.transport.TTransportException;
*/
public class MetaClusterServer extends RaftServer implements TSMetaService.AsyncIface,
TSMetaService.Iface {
+ private static Logger logger = LoggerFactory.getLogger(MetaClusterServer.class);
// each node only contains one MetaGroupMember
private MetaGroupMember member;
@@ -144,11 +148,13 @@ public class MetaClusterServer extends RaftServer implements TSMetaService.Async
*/
@Override
TServerTransport getServerSocket() throws TTransportException {
+ logger.info("[{}] Cluster node will listen {}:{}", getServerClientName(), config.getInternalIp(),
+ config.getInternalMetaPort());
if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
- return new TNonblockingServerSocket(new InetSocketAddress(config.getClusterRpcIp(),
+ return new TNonblockingServerSocket(new InetSocketAddress(config.getInternalIp(),
config.getInternalMetaPort()), getConnectionTimeoutInMS());
} else {
- return new TServerSocket(new InetSocketAddress(config.getClusterRpcIp(),
+ return new TServerSocket(new InetSocketAddress(config.getInternalIp(),
config.getInternalMetaPort()));
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java
index fe0cc63..8c90a6c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java
@@ -215,7 +215,8 @@ public abstract class RaftServer implements RaftService.AsyncIface, RaftService.
}
private void establishServer() throws TTransportException {
- logger.info("Cluster node {} begins to set up", thisNode);
+ logger.info("[{}] Cluster node {} begins to set up with {} mode", getServerClientName(), thisNode,
+ ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()? "Async" : "Sync");
if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
poolServer = createAsyncServer();
@@ -224,9 +225,10 @@ public abstract class RaftServer implements RaftService.AsyncIface, RaftService.
}
clientService = Executors.newSingleThreadExecutor(r -> new Thread(r, getServerClientName()));
+
clientService.submit(() -> poolServer.serve());
- logger.info("Cluster node {} is up", thisNode);
+ logger.info("[{}] Cluster node {} is up", getServerClientName(), thisNode);
}
@TestOnly
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatServer.java
index e236e1c..ce59c0b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatServer.java
@@ -207,6 +207,6 @@ public abstract class HeartbeatServer {
.newSingleThreadExecutor(r -> new Thread(r, getServerClientName()));
heartbeatClientService.submit(() -> heartbeatPoolServer.serve());
- logger.info("Cluster node's heartbeat {} is up", thisNode);
+ logger.info("[{}] Cluster node's heartbeat {} is up", getServerClientName(), thisNode);
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatServer.java
index a46e213..43a54ad 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatServer.java
@@ -32,8 +32,11 @@ import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TServerTransport;
import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class MetaHeartbeatServer extends HeartbeatServer {
+ private static Logger logger = LoggerFactory.getLogger(MetaHeartbeatServer.class);
private MetaClusterServer metaClusterServer;
@@ -61,6 +64,8 @@ public class MetaHeartbeatServer extends HeartbeatServer {
@Override
TServerTransport getHeartbeatServerSocket() throws TTransportException {
+ logger.info("[{}] Cluster node will listen {}:{}", getServerClientName(), config.getInternalIp(),
+ config.getInternalMetaPort() + ClusterUtils.META_HEARTBEAT_PORT_OFFSET);
if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
return new TNonblockingServerSocket(new InetSocketAddress(config.getClusterRpcIp(),
config.getInternalMetaPort() + ClusterUtils.META_HEARTBEAT_PORT_OFFSET),
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index b620ab9..1d669e8 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -969,7 +969,7 @@ public class IoTDBConfig {
return rpcPort;
}
- void setRpcPort(int rpcPort) {
+ public void setRpcPort(int rpcPort) {
this.rpcPort = rpcPort;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 2a4827e..63ddbac 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -46,7 +46,6 @@ public class IoTDBDescriptor {
private static final Logger logger = LoggerFactory.getLogger(IoTDBDescriptor.class);
private IoTDBConfig conf = new IoTDBConfig();
- private static CommandLine commandLine;
protected IoTDBDescriptor() {
loadProps();
@@ -60,36 +59,6 @@ public class IoTDBDescriptor {
return conf;
}
- public void replaceProps(String[] params) {
- Options options = new Options();
- final String RPC_PORT = "rpc_port";
- Option rpcPort = new Option(RPC_PORT, RPC_PORT, true,
- "The jdbc service listens on the port");
- rpcPort.setRequired(false);
- options.addOption(rpcPort);
-
- boolean ok = parseCommandLine(options, params);
- if (!ok) {
- logger.error("replaces properties failed, use default conf params");
- } else {
- if (commandLine.hasOption(RPC_PORT)) {
- conf.setRpcPort(Integer.parseInt(commandLine.getOptionValue(RPC_PORT)));
- logger.debug("replace rpc port with={}", conf.getRpcPort());
- }
- }
- }
-
- private boolean parseCommandLine(Options options, String[] params) {
- try {
- CommandLineParser parser = new DefaultParser();
- commandLine = parser.parse(options, params);
- } catch (ParseException e) {
- logger.error("parse conf params failed, {}", e.toString());
- return false;
- }
- return true;
- }
-
public URL getPropsUrl() {
// Check if a config-directory was specified first.
String urlString = System.getProperty(IoTDBConstant.IOTDB_CONF, null);
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index 51be619..c6c2535 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -54,9 +54,6 @@ public class IoTDB implements IoTDBMBean {
}
public static void main(String[] args) {
- if (args.length > 0) {
- IoTDBDescriptor.getInstance().replaceProps(args);
- }
try {
IoTDBConfigCheck.getInstance().checkConfig();
} catch (IOException e) {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftService.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftService.java
index 04384d4..92963c0 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftService.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftService.java
@@ -43,6 +43,11 @@ public abstract class ThriftService implements IService {
private CountDownLatch stopLatch;
+ // Whether this service is enabled; true by default.
+ // To create an IoTDB instance without the RPC service, set this to false before calling
+ // IoTDB.active().
+ private boolean enable = true;
+
public String getRPCServiceStatus() {
if (thriftServiceThread == null) {
logger.debug("Start latch is null when getting status");
@@ -91,6 +96,11 @@ public abstract class ThriftService implements IService {
@SuppressWarnings("squid:S2276")
public void startService() throws StartupException {
+ if (!enable) {
+ logger.info("{}: {} is disabled", IoTDBConstant.GLOBAL_DB_NAME,
+ this.getID().getName());
+ return ;
+ }
if (STATUS_UP.equals(getRPCServiceStatus())) {
logger.info("{}: {} has been already running now", IoTDBConstant.GLOBAL_DB_NAME,
this.getID().getName());
@@ -130,7 +140,7 @@ public abstract class ThriftService implements IService {
}
public void stopService() {
- if (STATUS_DOWN.equals(getRPCServiceStatus())) {
+ if (!enable || STATUS_DOWN.equals(getRPCServiceStatus())) {
logger.info("{}: {} isn't running now", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName());
return;
}
@@ -148,4 +158,11 @@ public abstract class ThriftService implements IService {
}
}
+ public boolean isEnable() {
+ return enable;
+ }
+
+ public void setEnable(boolean enable) {
+ this.enable = enable;
+ }
}