You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/04/09 02:11:52 UTC

[incubator-iotdb] 01/03: start API service at last

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch fix_upgrade_master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 2ba311deace6f8b43cfd2f1b6c102e3d0bcc3d52
Author: qiaojialin <64...@qq.com>
AuthorDate: Thu Apr 9 10:10:34 2020 +0800

    start API service at last
---
 .../main/java/org/apache/iotdb/SessionExample.java |  2 +-
 .../org/apache/iotdb/db/concurrent/ThreadName.java |  4 +--
 .../java/org/apache/iotdb/db/service/IoTDB.java    | 29 +++++++++------
 .../service/{JDBCService.java => RPCService.java}  | 42 +++++++++++-----------
 ...{JDBCServiceMBean.java => RPCServiceMBean.java} |  4 +--
 .../org/apache/iotdb/db/service/ServiceType.java   |  2 +-
 6 files changed, 45 insertions(+), 38 deletions(-)

diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
index 70b5411..9da6413 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
@@ -44,7 +44,7 @@ public class SessionExample {
     session = new Session("127.0.0.1", 6667, "root", "root");
     session.open(false);
 
-    session.setStorageGroup("root.sg1");
+//    session.setStorageGroup("root.sg1");
     if (session.checkTimeseriesExists("root.sg1.d1.s1")) {
       session.createTimeseries("root.sg1.d1.s1", TSDataType.INT64, TSEncoding.RLE,
           CompressionType.SNAPPY);
diff --git a/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java b/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
index 926b554..1b56e02 100644
--- a/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
+++ b/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
@@ -20,8 +20,8 @@ package org.apache.iotdb.db.concurrent;
 
 public enum ThreadName {
   METRICS_SERVICE("Metrics-ServerServiceImpl"),
-  JDBC_SERVICE("JDBC-ServerServiceImpl"),
-  JDBC_CLIENT("JDBC-Client"),
+  RPC_SERVICE("RPC-ServerServiceImpl"),
+  RPC_CLIENT("RPC-Client"),
   MERGE_SERVICE("Merge-ServerServiceImpl"),
   CLOSE_MERGE_SERVICE("Close-Merge-ServerServiceImpl"),
   CLOSE_MERGE_DAEMON("Close-Merge-Daemon-Thread"),
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index a0cbd48..7275d21 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -66,7 +66,9 @@ public class IoTDB implements IoTDBMBean {
       return;
     }
     try {
-      setUp();
+      setUpServerService();
+      setUpAPIService();
+      logger.info("IoTDB is set up.");
     } catch (StartupException e) {
       logger.error("meet error while starting up.", e);
       deactivate();
@@ -76,17 +78,18 @@ public class IoTDB implements IoTDBMBean {
     logger.info("{} has started.", IoTDBConstant.GLOBAL_DB_NAME);
   }
 
-  private void setUp() throws StartupException {
+
+  private void setUpServerService() throws StartupException {
     logger.info("Setting up IoTDB...");
 
     Runtime.getRuntime().addShutdownHook(new IoTDBShutdownHook());
     setUncaughtExceptionHandler();
 
     initMManager();
+    registerManager.register(StorageEngine.getInstance());
     registerManager.register(JMXService.getInstance());
     registerManager.register(FlushManager.getInstance());
     registerManager.register(MultiFileLogNodeManager.getInstance());
-    registerManager.register(JDBCService.getInstance());
     registerManager.register(Monitor.getInstance());
     registerManager.register(StatMonitor.getInstance());
     registerManager.register(Measurement.INSTANCE);
@@ -96,14 +99,8 @@ public class IoTDB implements IoTDBMBean {
     registerManager.register(UpgradeSevice.getINSTANCE());
     registerManager.register(MergeManager.getINSTANCE());
     registerManager.register(CacheHitRatioMonitor.getInstance());
-    if (IoTDBDescriptor.getInstance().getConfig().isEnableMetricService()) {
-      registerManager.register(MetricsService.getInstance());
-    }
-    if (IoTDBDescriptor.getInstance().getConfig().isEnableMQTTService()) {
-      registerManager.register(MQTTService.getInstance());
-    }
     JMXService.registerMBean(getInstance(), mbeanName);
-    registerManager.register(StorageEngine.getInstance());
+
 
     // When registering statMonitor, we should start recovering some statistics
     // with latest values stored
@@ -111,10 +108,20 @@ public class IoTDB implements IoTDBMBean {
     if (IoTDBDescriptor.getInstance().getConfig().isEnableStatMonitor()) {
       StatMonitor.getInstance().recovery();
     }
+  }
 
-    logger.info("IoTDB is set up.");
+  private void setUpAPIService() throws StartupException {
+    // start the API services last, after all server-side services have been registered
+    if (IoTDBDescriptor.getInstance().getConfig().isEnableMetricService()) {
+      registerManager.register(MetricsService.getInstance());
+    }
+    if (IoTDBDescriptor.getInstance().getConfig().isEnableMQTTService()) {
+      registerManager.register(MQTTService.getInstance());
+    }
+    registerManager.register(RPCService.getInstance());
   }
 
+
   private void deactivate() {
     logger.info("Deactivating IoTDB...");
     registerManager.deregisterAll();
diff --git a/server/src/main/java/org/apache/iotdb/db/service/JDBCService.java b/server/src/main/java/org/apache/iotdb/db/service/RPCService.java
similarity index 88%
rename from server/src/main/java/org/apache/iotdb/db/service/JDBCService.java
rename to server/src/main/java/org/apache/iotdb/db/service/RPCService.java
index e4ae5ef..36a6737 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/JDBCService.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/RPCService.java
@@ -43,15 +43,15 @@ import org.slf4j.LoggerFactory;
 /**
  * A service to handle jdbc request from client.
  */
-public class JDBCService implements JDBCServiceMBean, IService {
+public class RPCService implements RPCServiceMBean, IService {
 
-  private static final Logger logger = LoggerFactory.getLogger(JDBCService.class);
+  private static final Logger logger = LoggerFactory.getLogger(RPCService.class);
   private static final String STATUS_UP = "UP";
   private static final String STATUS_DOWN = "DOWN";
   private final String mbeanName = String
       .format("%s:%s=%s", IoTDBConstant.IOTDB_PACKAGE, IoTDBConstant.JMX_TYPE,
           getID().getJmxName());
-  private Thread jdbcServiceThread;
+  private Thread rpcServiceThread;
   private TProtocolFactory protocolFactory;
   private Processor<TSIService.Iface> processor;
   private TThreadPoolServer.Args poolArgs;
@@ -59,15 +59,15 @@ public class JDBCService implements JDBCServiceMBean, IService {
   private CountDownLatch startLatch;
   private CountDownLatch stopLatch;
 
-  private JDBCService() {
+  private RPCService() {
   }
 
-  public static final JDBCService getInstance() {
-    return JDBCServiceHolder.INSTANCE;
+  public static final RPCService getInstance() {
+    return RPCServiceHolder.INSTANCE;
   }
 
   @Override
-  public String getJDBCServiceStatus() {
+  public String getRPCServiceStatus() {
     // TODO debug log, will be deleted in production env
     if(startLatch == null) {
       logger.info("Start latch is null when getting status");
@@ -113,12 +113,12 @@ public class JDBCService implements JDBCServiceMBean, IService {
 
   @Override
   public ServiceType getID() {
-    return ServiceType.JDBC_SERVICE;
+    return ServiceType.RPC_SERVICE;
   }
 
   @Override
   public synchronized void startService() throws StartupException {
-    if (STATUS_UP.equals(getJDBCServiceStatus())) {
+    if (STATUS_UP.equals(getRPCServiceStatus())) {
       logger.info("{}: {} has been already running now", IoTDBConstant.GLOBAL_DB_NAME,
           this.getID().getName());
       return;
@@ -126,9 +126,9 @@ public class JDBCService implements JDBCServiceMBean, IService {
     logger.info("{}: start {}...", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName());
     try {
       reset();
-      jdbcServiceThread = new JDBCServiceThread(startLatch, stopLatch);
-      jdbcServiceThread.setName(ThreadName.JDBC_SERVICE.getName());
-      jdbcServiceThread.start();
+      rpcServiceThread = new RPCServiceThread(startLatch, stopLatch);
+      rpcServiceThread.setName(ThreadName.RPC_SERVICE.getName());
+      rpcServiceThread.start();
       startLatch.await();
     } catch (InterruptedException | ClassNotFoundException |
         IllegalAccessException | InstantiationException e) {
@@ -154,13 +154,13 @@ public class JDBCService implements JDBCServiceMBean, IService {
 
   @Override
   public synchronized void stopService() {
-    if (STATUS_DOWN.equals(getJDBCServiceStatus())) {
+    if (STATUS_DOWN.equals(getRPCServiceStatus())) {
       logger.info("{}: {} isn't running now", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName());
       return;
     }
     logger.info("{}: closing {}...", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName());
-    if (jdbcServiceThread != null) {
-      ((JDBCServiceThread) jdbcServiceThread).close();
+    if (rpcServiceThread != null) {
+      ((RPCServiceThread) rpcServiceThread).close();
     }
     try {
       stopLatch.await();
@@ -172,22 +172,22 @@ public class JDBCService implements JDBCServiceMBean, IService {
     }
   }
 
-  private static class JDBCServiceHolder {
+  private static class RPCServiceHolder {
 
-    private static final JDBCService INSTANCE = new JDBCService();
+    private static final RPCService INSTANCE = new RPCService();
 
-    private JDBCServiceHolder() {
+    private RPCServiceHolder() {
     }
   }
 
-  private class JDBCServiceThread extends Thread {
+  private class RPCServiceThread extends Thread {
 
     private TServerSocket serverTransport;
     private TServer poolServer;
     private CountDownLatch threadStartLatch;
     private CountDownLatch threadStopLatch;
 
-    public JDBCServiceThread(CountDownLatch threadStartLatch, CountDownLatch threadStopLatch)
+    public RPCServiceThread(CountDownLatch threadStartLatch, CountDownLatch threadStopLatch)
         throws ClassNotFoundException, IllegalAccessException, InstantiationException {
       if(IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable()) {
         protocolFactory = new TCompactProtocol.Factory();
@@ -214,7 +214,7 @@ public class JDBCService implements JDBCServiceMBean, IService {
             .stopTimeoutVal(
                 IoTDBDescriptor.getInstance().getConfig().getThriftServerAwaitTimeForStopService());
         poolArgs.executorService = IoTDBThreadPoolFactory.createThriftRpcClientThreadPool(poolArgs,
-            ThreadName.JDBC_CLIENT.getName());
+            ThreadName.RPC_CLIENT.getName());
         poolArgs.processor(processor);
         poolArgs.protocolFactory(protocolFactory);
         poolServer = new TThreadPoolServer(poolArgs);
diff --git a/server/src/main/java/org/apache/iotdb/db/service/JDBCServiceMBean.java b/server/src/main/java/org/apache/iotdb/db/service/RPCServiceMBean.java
similarity index 93%
rename from server/src/main/java/org/apache/iotdb/db/service/JDBCServiceMBean.java
rename to server/src/main/java/org/apache/iotdb/db/service/RPCServiceMBean.java
index 4a5fee0..ef8d023 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/JDBCServiceMBean.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/RPCServiceMBean.java
@@ -20,9 +20,9 @@ package org.apache.iotdb.db.service;
 
 import org.apache.iotdb.db.exception.StartupException;
 
-public interface JDBCServiceMBean {
+public interface RPCServiceMBean {
 
-  String getJDBCServiceStatus();
+  String getRPCServiceStatus();
 
   int getRPCPort();
 
diff --git a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
index 256cccb..e1498bc 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
@@ -25,7 +25,7 @@ public enum ServiceType {
   STORAGE_ENGINE_SERVICE("Storage Engine ServerService", ""),
   JMX_SERVICE("JMX ServerService", "JMX ServerService"),
   METRICS_SERVICE("Metrics ServerService","MetricsService"),
-  JDBC_SERVICE("JDBC ServerService", "JDBCService"),
+  RPC_SERVICE("RPC ServerService", "RPCService"),
   MQTT_SERVICE("MQTTService", ""),
   MONITOR_SERVICE("Monitor ServerService", "Monitor"),
   STAT_MONITOR_SERVICE("Statistics ServerService", ""),