You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/04/09 13:52:24 UTC
[incubator-iotdb] branch master updated: [IOTDB-583] Start API services at last (#1013)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new fe62499 [IOTDB-583] Start API services at last (#1013)
fe62499 is described below
commit fe6249974b1718c6329cc09928cafbd48a017599
Author: Jialin Qiao <qj...@mails.tsinghua.edu.cn>
AuthorDate: Thu Apr 9 21:52:14 2020 +0800
[IOTDB-583] Start API services at last (#1013)
* start API service at last
---
.../org/apache/iotdb/db/concurrent/ThreadName.java | 4 +-
.../engine/storagegroup/StorageGroupProcessor.java | 8 +--
...viceException.java => RPCServiceException.java} | 6 +-
.../java/org/apache/iotdb/db/service/IoTDB.java | 14 ++--
.../service/{JDBCService.java => RPCService.java} | 78 ++++++++++------------
...entHandler.java => RPCServiceEventHandler.java} | 7 +-
...{JDBCServiceMBean.java => RPCServiceMBean.java} | 4 +-
.../org/apache/iotdb/db/service/ServiceType.java | 2 +-
.../iotdb/db/sync/receiver/SyncServerManager.java | 1 -
9 files changed, 55 insertions(+), 69 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java b/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
index 926b554..1b56e02 100644
--- a/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
+++ b/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
@@ -20,8 +20,8 @@ package org.apache.iotdb.db.concurrent;
public enum ThreadName {
METRICS_SERVICE("Metrics-ServerServiceImpl"),
- JDBC_SERVICE("JDBC-ServerServiceImpl"),
- JDBC_CLIENT("JDBC-Client"),
+ RPC_SERVICE("RPC-ServerServiceImpl"),
+ RPC_CLIENT("RPC-Client"),
MERGE_SERVICE("Merge-ServerServiceImpl"),
CLOSE_MERGE_SERVICE("Close-Merge-ServerServiceImpl"),
CLOSE_MERGE_DAEMON("Close-Merge-Daemon-Thread"),
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 1074e6c..d3afe72 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -1296,12 +1296,12 @@ public class StorageGroupProcessor {
*/
public int countUpgradeFiles() {
int cntUpgradeFileNum = 0;
- for (TsFileResource seqTsFileResource : new ArrayList<>(sequenceFileTreeSet)) {
+ for (TsFileResource seqTsFileResource : sequenceFileTreeSet) {
if (UpgradeUtils.isNeedUpgrade(seqTsFileResource)) {
cntUpgradeFileNum += 1;
}
}
- for (TsFileResource unseqTsFileResource : new ArrayList<>(unSequenceFileList)) {
+ for (TsFileResource unseqTsFileResource : unSequenceFileList) {
if (UpgradeUtils.isNeedUpgrade(unseqTsFileResource)) {
cntUpgradeFileNum += 1;
}
@@ -1310,10 +1310,10 @@ public class StorageGroupProcessor {
}
public void upgrade() {
- for (TsFileResource seqTsFileResource : new ArrayList<>(sequenceFileTreeSet)) {
+ for (TsFileResource seqTsFileResource : sequenceFileTreeSet) {
seqTsFileResource.doUpgrade();
}
- for (TsFileResource unseqTsFileResource : new ArrayList<>(unSequenceFileList)) {
+ for (TsFileResource unseqTsFileResource : unSequenceFileList) {
unseqTsFileResource.doUpgrade();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/exception/runtime/JDBCServiceException.java b/server/src/main/java/org/apache/iotdb/db/exception/runtime/RPCServiceException.java
similarity index 85%
rename from server/src/main/java/org/apache/iotdb/db/exception/runtime/JDBCServiceException.java
rename to server/src/main/java/org/apache/iotdb/db/exception/runtime/RPCServiceException.java
index 1664964..c3acee3 100644
--- a/server/src/main/java/org/apache/iotdb/db/exception/runtime/JDBCServiceException.java
+++ b/server/src/main/java/org/apache/iotdb/db/exception/runtime/RPCServiceException.java
@@ -18,15 +18,15 @@
*/
package org.apache.iotdb.db.exception.runtime;
-public class JDBCServiceException extends RuntimeException{
+public class RPCServiceException extends RuntimeException{
private static final long serialVersionUID = 520836932066897810L;
- public JDBCServiceException(String message) {
+ public RPCServiceException(String message) {
super(message);
}
- public JDBCServiceException(String message, Throwable e) {
+ public RPCServiceException(String message, Throwable e) {
super(message + e.getMessage());
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index a0cbd48..022bfbc 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -86,7 +86,6 @@ public class IoTDB implements IoTDBMBean {
registerManager.register(JMXService.getInstance());
registerManager.register(FlushManager.getInstance());
registerManager.register(MultiFileLogNodeManager.getInstance());
- registerManager.register(JDBCService.getInstance());
registerManager.register(Monitor.getInstance());
registerManager.register(StatMonitor.getInstance());
registerManager.register(Measurement.INSTANCE);
@@ -96,12 +95,6 @@ public class IoTDB implements IoTDBMBean {
registerManager.register(UpgradeSevice.getINSTANCE());
registerManager.register(MergeManager.getINSTANCE());
registerManager.register(CacheHitRatioMonitor.getInstance());
- if (IoTDBDescriptor.getInstance().getConfig().isEnableMetricService()) {
- registerManager.register(MetricsService.getInstance());
- }
- if (IoTDBDescriptor.getInstance().getConfig().isEnableMQTTService()) {
- registerManager.register(MQTTService.getInstance());
- }
JMXService.registerMBean(getInstance(), mbeanName);
registerManager.register(StorageEngine.getInstance());
@@ -112,6 +105,13 @@ public class IoTDB implements IoTDBMBean {
StatMonitor.getInstance().recovery();
}
+ registerManager.register(RPCService.getInstance());
+ if (IoTDBDescriptor.getInstance().getConfig().isEnableMetricService()) {
+ registerManager.register(MetricsService.getInstance());
+ }
+ if (IoTDBDescriptor.getInstance().getConfig().isEnableMQTTService()) {
+ registerManager.register(MQTTService.getInstance());
+ }
logger.info("IoTDB is set up.");
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/JDBCService.java b/server/src/main/java/org/apache/iotdb/db/service/RPCService.java
similarity index 73%
rename from server/src/main/java/org/apache/iotdb/db/service/JDBCService.java
rename to server/src/main/java/org/apache/iotdb/db/service/RPCService.java
index e4ae5ef..a1aeaa5 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/JDBCService.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/RPCService.java
@@ -26,7 +26,7 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.StartupException;
-import org.apache.iotdb.db.exception.runtime.JDBCServiceException;
+import org.apache.iotdb.db.exception.runtime.RPCServiceException;
import org.apache.iotdb.service.rpc.thrift.TSIService;
import org.apache.iotdb.service.rpc.thrift.TSIService.Processor;
import org.apache.thrift.protocol.TBinaryProtocol;
@@ -43,15 +43,15 @@ import org.slf4j.LoggerFactory;
/**
* A service to handle jdbc request from client.
*/
-public class JDBCService implements JDBCServiceMBean, IService {
+public class RPCService implements RPCServiceMBean, IService {
- private static final Logger logger = LoggerFactory.getLogger(JDBCService.class);
+ private static final Logger logger = LoggerFactory.getLogger(RPCService.class);
private static final String STATUS_UP = "UP";
private static final String STATUS_DOWN = "DOWN";
private final String mbeanName = String
.format("%s:%s=%s", IoTDBConstant.IOTDB_PACKAGE, IoTDBConstant.JMX_TYPE,
getID().getJmxName());
- private Thread jdbcServiceThread;
+ private Thread rpcServiceThread;
private TProtocolFactory protocolFactory;
private Processor<TSIService.Iface> processor;
private TThreadPoolServer.Args poolArgs;
@@ -59,27 +59,25 @@ public class JDBCService implements JDBCServiceMBean, IService {
private CountDownLatch startLatch;
private CountDownLatch stopLatch;
- private JDBCService() {
+ private RPCService() {
}
- public static final JDBCService getInstance() {
- return JDBCServiceHolder.INSTANCE;
+ public static final RPCService getInstance() {
+ return RPCServiceHolder.INSTANCE;
}
@Override
- public String getJDBCServiceStatus() {
- // TODO debug log, will be deleted in production env
+ public String getRPCServiceStatus() {
if(startLatch == null) {
- logger.info("Start latch is null when getting status");
+ logger.debug("Start latch is null when getting status");
} else {
- logger.info("Start latch is {} when getting status", startLatch.getCount());
+ logger.debug("Start latch is {} when getting status", startLatch.getCount());
}
if(stopLatch == null) {
- logger.info("Stop latch is null when getting status");
+ logger.debug("Stop latch is null when getting status");
} else {
- logger.info("Stop latch is {} when getting status", stopLatch.getCount());
+ logger.debug("Stop latch is {} when getting status", stopLatch.getCount());
}
- // debug log, will be deleted in production env
if(startLatch != null && startLatch.getCount() == 0) {
return STATUS_UP;
@@ -96,13 +94,8 @@ public class JDBCService implements JDBCServiceMBean, IService {
@Override
public void start() throws StartupException {
- try {
JMXService.registerMBean(getInstance(), mbeanName);
startService();
- } catch (Exception e) {
- logger.error("Failed to start {} because: ", this.getID().getName(), e);
- throw new StartupException(e);
- }
}
@Override
@@ -113,12 +106,12 @@ public class JDBCService implements JDBCServiceMBean, IService {
@Override
public ServiceType getID() {
- return ServiceType.JDBC_SERVICE;
+ return ServiceType.RPC_SERVICE;
}
@Override
public synchronized void startService() throws StartupException {
- if (STATUS_UP.equals(getJDBCServiceStatus())) {
+ if (STATUS_UP.equals(getRPCServiceStatus())) {
logger.info("{}: {} has been already running now", IoTDBConstant.GLOBAL_DB_NAME,
this.getID().getName());
return;
@@ -126,9 +119,9 @@ public class JDBCService implements JDBCServiceMBean, IService {
logger.info("{}: start {}...", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName());
try {
reset();
- jdbcServiceThread = new JDBCServiceThread(startLatch, stopLatch);
- jdbcServiceThread.setName(ThreadName.JDBC_SERVICE.getName());
- jdbcServiceThread.start();
+ rpcServiceThread = new RPCServiceThread(startLatch, stopLatch);
+ rpcServiceThread.setName(ThreadName.RPC_SERVICE.getName());
+ rpcServiceThread.start();
startLatch.await();
} catch (InterruptedException | ClassNotFoundException |
IllegalAccessException | InstantiationException e) {
@@ -154,40 +147,40 @@ public class JDBCService implements JDBCServiceMBean, IService {
@Override
public synchronized void stopService() {
- if (STATUS_DOWN.equals(getJDBCServiceStatus())) {
+ if (STATUS_DOWN.equals(getRPCServiceStatus())) {
logger.info("{}: {} isn't running now", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName());
return;
}
logger.info("{}: closing {}...", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName());
- if (jdbcServiceThread != null) {
- ((JDBCServiceThread) jdbcServiceThread).close();
+ if (rpcServiceThread != null) {
+ ((RPCServiceThread) rpcServiceThread).close();
}
try {
stopLatch.await();
reset();
logger.info("{}: close {} successfully", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName());
} catch (InterruptedException e) {
- logger.error("{}: close {} failed because {}", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName(), e);
+ logger.error("{}: close {} failed because: ", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName(), e);
Thread.currentThread().interrupt();
}
}
- private static class JDBCServiceHolder {
+ private static class RPCServiceHolder {
- private static final JDBCService INSTANCE = new JDBCService();
+ private static final RPCService INSTANCE = new RPCService();
- private JDBCServiceHolder() {
+ private RPCServiceHolder() {
}
}
- private class JDBCServiceThread extends Thread {
+ private class RPCServiceThread extends Thread {
private TServerSocket serverTransport;
private TServer poolServer;
private CountDownLatch threadStartLatch;
private CountDownLatch threadStopLatch;
- public JDBCServiceThread(CountDownLatch threadStartLatch, CountDownLatch threadStopLatch)
+ public RPCServiceThread(CountDownLatch threadStartLatch, CountDownLatch threadStopLatch)
throws ClassNotFoundException, IllegalAccessException, InstantiationException {
if(IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable()) {
protocolFactory = new TCompactProtocol.Factory();
@@ -214,33 +207,30 @@ public class JDBCService implements JDBCServiceMBean, IService {
.stopTimeoutVal(
IoTDBDescriptor.getInstance().getConfig().getThriftServerAwaitTimeForStopService());
poolArgs.executorService = IoTDBThreadPoolFactory.createThriftRpcClientThreadPool(poolArgs,
- ThreadName.JDBC_CLIENT.getName());
+ ThreadName.RPC_CLIENT.getName());
poolArgs.processor(processor);
poolArgs.protocolFactory(protocolFactory);
poolServer = new TThreadPoolServer(poolArgs);
- poolServer.setServerEventHandler(new JDBCServiceEventHandler(impl, threadStartLatch));
+ poolServer.setServerEventHandler(new RPCServiceEventHandler(impl, threadStartLatch));
poolServer.serve();
} catch (TTransportException e) {
- throw new JDBCServiceException(String.format("%s: failed to start %s, because ", IoTDBConstant.GLOBAL_DB_NAME,
+ throw new RPCServiceException(String.format("%s: failed to start %s, because ", IoTDBConstant.GLOBAL_DB_NAME,
getID().getName()), e);
} catch (Exception e) {
- throw new JDBCServiceException(String.format("%s: %s exit, because ", IoTDBConstant.GLOBAL_DB_NAME, getID().getName()), e);
+ throw new RPCServiceException(String.format("%s: %s exit, because ", IoTDBConstant.GLOBAL_DB_NAME, getID().getName()), e);
} finally {
close();
- // TODO debug log, will be deleted in production env
if (threadStopLatch == null) {
- logger.info("Stop Count Down latch is null");
+ logger.debug("Stop Count Down latch is null");
} else {
- logger.info("Stop Count Down latch is {}", threadStopLatch.getCount());
+ logger.debug("Stop Count Down latch is {}", threadStopLatch.getCount());
}
- // debug log, will be deleted in production env
if (threadStopLatch != null && threadStopLatch.getCount() == 1) {
threadStopLatch.countDown();
}
- logger.info("{}: close TThreadPoolServer and TServerSocket for {}",
- IoTDBConstant.GLOBAL_DB_NAME,
- getID().getName());
+ logger.debug("{}: close TThreadPoolServer and TServerSocket for {}",
+ IoTDBConstant.GLOBAL_DB_NAME, getID().getName());
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/JDBCServiceEventHandler.java b/server/src/main/java/org/apache/iotdb/db/service/RPCServiceEventHandler.java
similarity index 87%
rename from server/src/main/java/org/apache/iotdb/db/service/JDBCServiceEventHandler.java
rename to server/src/main/java/org/apache/iotdb/db/service/RPCServiceEventHandler.java
index 2e9ea5f..dd88a2e 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/JDBCServiceEventHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/RPCServiceEventHandler.java
@@ -24,19 +24,18 @@ import org.apache.thrift.server.ServerContext;
import org.apache.thrift.server.TServerEventHandler;
import org.apache.thrift.transport.TTransport;
-public class JDBCServiceEventHandler implements TServerEventHandler {
+public class RPCServiceEventHandler implements TServerEventHandler {
private TSServiceImpl serviceImpl;
private CountDownLatch startLatch;
- JDBCServiceEventHandler(TSServiceImpl serviceImpl, CountDownLatch startLatch) {
+ RPCServiceEventHandler(TSServiceImpl serviceImpl, CountDownLatch startLatch) {
this.serviceImpl = serviceImpl;
this.startLatch = startLatch;
}
@Override
public ServerContext createContext(TProtocol arg0, TProtocol arg1) {
- // TODO Auto-generated method stub
return null;
}
@@ -52,8 +51,6 @@ public class JDBCServiceEventHandler implements TServerEventHandler {
@Override
public void processContext(ServerContext arg0, TTransport arg1, TTransport arg2) {
- // TODO Auto-generated method stub
-
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/JDBCServiceMBean.java b/server/src/main/java/org/apache/iotdb/db/service/RPCServiceMBean.java
similarity index 93%
rename from server/src/main/java/org/apache/iotdb/db/service/JDBCServiceMBean.java
rename to server/src/main/java/org/apache/iotdb/db/service/RPCServiceMBean.java
index 4a5fee0..ef8d023 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/JDBCServiceMBean.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/RPCServiceMBean.java
@@ -20,9 +20,9 @@ package org.apache.iotdb.db.service;
import org.apache.iotdb.db.exception.StartupException;
-public interface JDBCServiceMBean {
+public interface RPCServiceMBean {
- String getJDBCServiceStatus();
+ String getRPCServiceStatus();
int getRPCPort();
diff --git a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
index 256cccb..e1498bc 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
@@ -25,7 +25,7 @@ public enum ServiceType {
STORAGE_ENGINE_SERVICE("Storage Engine ServerService", ""),
JMX_SERVICE("JMX ServerService", "JMX ServerService"),
METRICS_SERVICE("Metrics ServerService","MetricsService"),
- JDBC_SERVICE("JDBC ServerService", "JDBCService"),
+ RPC_SERVICE("RPC ServerService", "RPCService"),
MQTT_SERVICE("MQTTService", ""),
MONITOR_SERVICE("Monitor ServerService", "Monitor"),
STAT_MONITOR_SERVICE("Statistics ServerService", ""),
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java
index 056a9a0..19c4d51 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java
@@ -28,7 +28,6 @@ import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.StartupException;
import org.apache.iotdb.db.service.IService;
-import org.apache.iotdb.db.service.JDBCServiceEventHandler;
import org.apache.iotdb.db.service.ServiceType;
import org.apache.iotdb.db.sync.receiver.load.FileLoaderManager;
import org.apache.iotdb.db.sync.receiver.recover.SyncReceiverLogAnalyzer;