You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/04/09 02:11:52 UTC
[incubator-iotdb] 01/03: start API service at last
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch fix_upgrade_master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 2ba311deace6f8b43cfd2f1b6c102e3d0bcc3d52
Author: qiaojialin <64...@qq.com>
AuthorDate: Thu Apr 9 10:10:34 2020 +0800
start API service at last
---
.../main/java/org/apache/iotdb/SessionExample.java | 2 +-
.../org/apache/iotdb/db/concurrent/ThreadName.java | 4 +--
.../java/org/apache/iotdb/db/service/IoTDB.java | 29 +++++++++------
.../service/{JDBCService.java => RPCService.java} | 42 +++++++++++-----------
...{JDBCServiceMBean.java => RPCServiceMBean.java} | 4 +--
.../org/apache/iotdb/db/service/ServiceType.java | 2 +-
6 files changed, 45 insertions(+), 38 deletions(-)
diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
index 70b5411..9da6413 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
@@ -44,7 +44,7 @@ public class SessionExample {
session = new Session("127.0.0.1", 6667, "root", "root");
session.open(false);
- session.setStorageGroup("root.sg1");
+// session.setStorageGroup("root.sg1");
if (session.checkTimeseriesExists("root.sg1.d1.s1")) {
session.createTimeseries("root.sg1.d1.s1", TSDataType.INT64, TSEncoding.RLE,
CompressionType.SNAPPY);
diff --git a/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java b/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
index 926b554..1b56e02 100644
--- a/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
+++ b/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
@@ -20,8 +20,8 @@ package org.apache.iotdb.db.concurrent;
public enum ThreadName {
METRICS_SERVICE("Metrics-ServerServiceImpl"),
- JDBC_SERVICE("JDBC-ServerServiceImpl"),
- JDBC_CLIENT("JDBC-Client"),
+ RPC_SERVICE("RPC-ServerServiceImpl"),
+ RPC_CLIENT("RPC-Client"),
MERGE_SERVICE("Merge-ServerServiceImpl"),
CLOSE_MERGE_SERVICE("Close-Merge-ServerServiceImpl"),
CLOSE_MERGE_DAEMON("Close-Merge-Daemon-Thread"),
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index a0cbd48..7275d21 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -66,7 +66,9 @@ public class IoTDB implements IoTDBMBean {
return;
}
try {
- setUp();
+ setUpServerService();
+ setUpAPIService();
+ logger.info("IoTDB is set up.");
} catch (StartupException e) {
logger.error("meet error while starting up.", e);
deactivate();
@@ -76,17 +78,18 @@ public class IoTDB implements IoTDBMBean {
logger.info("{} has started.", IoTDBConstant.GLOBAL_DB_NAME);
}
- private void setUp() throws StartupException {
+
+ private void setUpServerService() throws StartupException {
logger.info("Setting up IoTDB...");
Runtime.getRuntime().addShutdownHook(new IoTDBShutdownHook());
setUncaughtExceptionHandler();
initMManager();
+ registerManager.register(StorageEngine.getInstance());
registerManager.register(JMXService.getInstance());
registerManager.register(FlushManager.getInstance());
registerManager.register(MultiFileLogNodeManager.getInstance());
- registerManager.register(JDBCService.getInstance());
registerManager.register(Monitor.getInstance());
registerManager.register(StatMonitor.getInstance());
registerManager.register(Measurement.INSTANCE);
@@ -96,14 +99,8 @@ public class IoTDB implements IoTDBMBean {
registerManager.register(UpgradeSevice.getINSTANCE());
registerManager.register(MergeManager.getINSTANCE());
registerManager.register(CacheHitRatioMonitor.getInstance());
- if (IoTDBDescriptor.getInstance().getConfig().isEnableMetricService()) {
- registerManager.register(MetricsService.getInstance());
- }
- if (IoTDBDescriptor.getInstance().getConfig().isEnableMQTTService()) {
- registerManager.register(MQTTService.getInstance());
- }
JMXService.registerMBean(getInstance(), mbeanName);
- registerManager.register(StorageEngine.getInstance());
+
// When registering statMonitor, we should start recovering some statistics
// with latest values stored
@@ -111,10 +108,20 @@ public class IoTDB implements IoTDBMBean {
if (IoTDBDescriptor.getInstance().getConfig().isEnableStatMonitor()) {
StatMonitor.getInstance().recovery();
}
+ }
- logger.info("IoTDB is set up.");
+ private void setUpAPIService() throws StartupException {
+ // start api services at last
+ if (IoTDBDescriptor.getInstance().getConfig().isEnableMetricService()) {
+ registerManager.register(MetricsService.getInstance());
+ }
+ if (IoTDBDescriptor.getInstance().getConfig().isEnableMQTTService()) {
+ registerManager.register(MQTTService.getInstance());
+ }
+ registerManager.register(RPCService.getInstance());
}
+
private void deactivate() {
logger.info("Deactivating IoTDB...");
registerManager.deregisterAll();
diff --git a/server/src/main/java/org/apache/iotdb/db/service/JDBCService.java b/server/src/main/java/org/apache/iotdb/db/service/RPCService.java
similarity index 88%
rename from server/src/main/java/org/apache/iotdb/db/service/JDBCService.java
rename to server/src/main/java/org/apache/iotdb/db/service/RPCService.java
index e4ae5ef..36a6737 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/JDBCService.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/RPCService.java
@@ -43,15 +43,15 @@ import org.slf4j.LoggerFactory;
/**
* A service to handle jdbc request from client.
*/
-public class JDBCService implements JDBCServiceMBean, IService {
+public class RPCService implements RPCServiceMBean, IService {
- private static final Logger logger = LoggerFactory.getLogger(JDBCService.class);
+ private static final Logger logger = LoggerFactory.getLogger(RPCService.class);
private static final String STATUS_UP = "UP";
private static final String STATUS_DOWN = "DOWN";
private final String mbeanName = String
.format("%s:%s=%s", IoTDBConstant.IOTDB_PACKAGE, IoTDBConstant.JMX_TYPE,
getID().getJmxName());
- private Thread jdbcServiceThread;
+ private Thread rpcServiceThread;
private TProtocolFactory protocolFactory;
private Processor<TSIService.Iface> processor;
private TThreadPoolServer.Args poolArgs;
@@ -59,15 +59,15 @@ public class JDBCService implements JDBCServiceMBean, IService {
private CountDownLatch startLatch;
private CountDownLatch stopLatch;
- private JDBCService() {
+ private RPCService() {
}
- public static final JDBCService getInstance() {
- return JDBCServiceHolder.INSTANCE;
+ public static final RPCService getInstance() {
+ return RPCServiceHolder.INSTANCE;
}
@Override
- public String getJDBCServiceStatus() {
+ public String getRPCServiceStatus() {
// TODO debug log, will be deleted in production env
if(startLatch == null) {
logger.info("Start latch is null when getting status");
@@ -113,12 +113,12 @@ public class JDBCService implements JDBCServiceMBean, IService {
@Override
public ServiceType getID() {
- return ServiceType.JDBC_SERVICE;
+ return ServiceType.RPC_SERVICE;
}
@Override
public synchronized void startService() throws StartupException {
- if (STATUS_UP.equals(getJDBCServiceStatus())) {
+ if (STATUS_UP.equals(getRPCServiceStatus())) {
logger.info("{}: {} has been already running now", IoTDBConstant.GLOBAL_DB_NAME,
this.getID().getName());
return;
@@ -126,9 +126,9 @@ public class JDBCService implements JDBCServiceMBean, IService {
logger.info("{}: start {}...", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName());
try {
reset();
- jdbcServiceThread = new JDBCServiceThread(startLatch, stopLatch);
- jdbcServiceThread.setName(ThreadName.JDBC_SERVICE.getName());
- jdbcServiceThread.start();
+ rpcServiceThread = new RPCServiceThread(startLatch, stopLatch);
+ rpcServiceThread.setName(ThreadName.RPC_SERVICE.getName());
+ rpcServiceThread.start();
startLatch.await();
} catch (InterruptedException | ClassNotFoundException |
IllegalAccessException | InstantiationException e) {
@@ -154,13 +154,13 @@ public class JDBCService implements JDBCServiceMBean, IService {
@Override
public synchronized void stopService() {
- if (STATUS_DOWN.equals(getJDBCServiceStatus())) {
+ if (STATUS_DOWN.equals(getRPCServiceStatus())) {
logger.info("{}: {} isn't running now", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName());
return;
}
logger.info("{}: closing {}...", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName());
- if (jdbcServiceThread != null) {
- ((JDBCServiceThread) jdbcServiceThread).close();
+ if (rpcServiceThread != null) {
+ ((RPCServiceThread) rpcServiceThread).close();
}
try {
stopLatch.await();
@@ -172,22 +172,22 @@ public class JDBCService implements JDBCServiceMBean, IService {
}
}
- private static class JDBCServiceHolder {
+ private static class RPCServiceHolder {
- private static final JDBCService INSTANCE = new JDBCService();
+ private static final RPCService INSTANCE = new RPCService();
- private JDBCServiceHolder() {
+ private RPCServiceHolder() {
}
}
- private class JDBCServiceThread extends Thread {
+ private class RPCServiceThread extends Thread {
private TServerSocket serverTransport;
private TServer poolServer;
private CountDownLatch threadStartLatch;
private CountDownLatch threadStopLatch;
- public JDBCServiceThread(CountDownLatch threadStartLatch, CountDownLatch threadStopLatch)
+ public RPCServiceThread(CountDownLatch threadStartLatch, CountDownLatch threadStopLatch)
throws ClassNotFoundException, IllegalAccessException, InstantiationException {
if(IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable()) {
protocolFactory = new TCompactProtocol.Factory();
@@ -214,7 +214,7 @@ public class JDBCService implements JDBCServiceMBean, IService {
.stopTimeoutVal(
IoTDBDescriptor.getInstance().getConfig().getThriftServerAwaitTimeForStopService());
poolArgs.executorService = IoTDBThreadPoolFactory.createThriftRpcClientThreadPool(poolArgs,
- ThreadName.JDBC_CLIENT.getName());
+ ThreadName.RPC_CLIENT.getName());
poolArgs.processor(processor);
poolArgs.protocolFactory(protocolFactory);
poolServer = new TThreadPoolServer(poolArgs);
diff --git a/server/src/main/java/org/apache/iotdb/db/service/JDBCServiceMBean.java b/server/src/main/java/org/apache/iotdb/db/service/RPCServiceMBean.java
similarity index 93%
rename from server/src/main/java/org/apache/iotdb/db/service/JDBCServiceMBean.java
rename to server/src/main/java/org/apache/iotdb/db/service/RPCServiceMBean.java
index 4a5fee0..ef8d023 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/JDBCServiceMBean.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/RPCServiceMBean.java
@@ -20,9 +20,9 @@ package org.apache.iotdb.db.service;
import org.apache.iotdb.db.exception.StartupException;
-public interface JDBCServiceMBean {
+public interface RPCServiceMBean {
- String getJDBCServiceStatus();
+ String getRPCServiceStatus();
int getRPCPort();
diff --git a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
index 256cccb..e1498bc 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
@@ -25,7 +25,7 @@ public enum ServiceType {
STORAGE_ENGINE_SERVICE("Storage Engine ServerService", ""),
JMX_SERVICE("JMX ServerService", "JMX ServerService"),
METRICS_SERVICE("Metrics ServerService","MetricsService"),
- JDBC_SERVICE("JDBC ServerService", "JDBCService"),
+ RPC_SERVICE("RPC ServerService", "RPCService"),
MQTT_SERVICE("MQTTService", ""),
MONITOR_SERVICE("Monitor ServerService", "Monitor"),
STAT_MONITOR_SERVICE("Statistics ServerService", ""),