You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/04/09 13:52:24 UTC

[incubator-iotdb] branch master updated: [IOTDB-583] Start API services at last (#1013)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new fe62499  [IOTDB-583] Start API services at last (#1013)
fe62499 is described below

commit fe6249974b1718c6329cc09928cafbd48a017599
Author: Jialin Qiao <qj...@mails.tsinghua.edu.cn>
AuthorDate: Thu Apr 9 21:52:14 2020 +0800

    [IOTDB-583] Start API services at last (#1013)
    
    * start API service at last
---
 .../org/apache/iotdb/db/concurrent/ThreadName.java |  4 +-
 .../engine/storagegroup/StorageGroupProcessor.java |  8 +--
 ...viceException.java => RPCServiceException.java} |  6 +-
 .../java/org/apache/iotdb/db/service/IoTDB.java    | 14 ++--
 .../service/{JDBCService.java => RPCService.java}  | 78 ++++++++++------------
 ...entHandler.java => RPCServiceEventHandler.java} |  7 +-
 ...{JDBCServiceMBean.java => RPCServiceMBean.java} |  4 +-
 .../org/apache/iotdb/db/service/ServiceType.java   |  2 +-
 .../iotdb/db/sync/receiver/SyncServerManager.java  |  1 -
 9 files changed, 55 insertions(+), 69 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java b/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
index 926b554..1b56e02 100644
--- a/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
+++ b/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
@@ -20,8 +20,8 @@ package org.apache.iotdb.db.concurrent;
 
 public enum ThreadName {
   METRICS_SERVICE("Metrics-ServerServiceImpl"),
-  JDBC_SERVICE("JDBC-ServerServiceImpl"),
-  JDBC_CLIENT("JDBC-Client"),
+  RPC_SERVICE("RPC-ServerServiceImpl"),
+  RPC_CLIENT("RPC-Client"),
   MERGE_SERVICE("Merge-ServerServiceImpl"),
   CLOSE_MERGE_SERVICE("Close-Merge-ServerServiceImpl"),
   CLOSE_MERGE_DAEMON("Close-Merge-Daemon-Thread"),
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 1074e6c..d3afe72 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -1296,12 +1296,12 @@ public class StorageGroupProcessor {
    */
   public int countUpgradeFiles() {
     int cntUpgradeFileNum = 0;
-    for (TsFileResource seqTsFileResource : new ArrayList<>(sequenceFileTreeSet)) {
+    for (TsFileResource seqTsFileResource : sequenceFileTreeSet) {
       if (UpgradeUtils.isNeedUpgrade(seqTsFileResource)) {
         cntUpgradeFileNum += 1;
       }
     }
-    for (TsFileResource unseqTsFileResource : new ArrayList<>(unSequenceFileList)) {
+    for (TsFileResource unseqTsFileResource : unSequenceFileList) {
       if (UpgradeUtils.isNeedUpgrade(unseqTsFileResource)) {
         cntUpgradeFileNum += 1;
       }
@@ -1310,10 +1310,10 @@ public class StorageGroupProcessor {
   }
 
   public void upgrade() {
-    for (TsFileResource seqTsFileResource : new ArrayList<>(sequenceFileTreeSet)) {
+    for (TsFileResource seqTsFileResource : sequenceFileTreeSet) {
       seqTsFileResource.doUpgrade();
     }
-    for (TsFileResource unseqTsFileResource : new ArrayList<>(unSequenceFileList)) {
+    for (TsFileResource unseqTsFileResource : unSequenceFileList) {
       unseqTsFileResource.doUpgrade();
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/exception/runtime/JDBCServiceException.java b/server/src/main/java/org/apache/iotdb/db/exception/runtime/RPCServiceException.java
similarity index 85%
rename from server/src/main/java/org/apache/iotdb/db/exception/runtime/JDBCServiceException.java
rename to server/src/main/java/org/apache/iotdb/db/exception/runtime/RPCServiceException.java
index 1664964..c3acee3 100644
--- a/server/src/main/java/org/apache/iotdb/db/exception/runtime/JDBCServiceException.java
+++ b/server/src/main/java/org/apache/iotdb/db/exception/runtime/RPCServiceException.java
@@ -18,15 +18,15 @@
  */
 package org.apache.iotdb.db.exception.runtime;
 
-public class JDBCServiceException extends RuntimeException{
+public class RPCServiceException extends RuntimeException{
 
   private static final long serialVersionUID = 520836932066897810L;
 
-  public JDBCServiceException(String message) {
+  public RPCServiceException(String message) {
     super(message);
   }
 
-  public JDBCServiceException(String message, Throwable e) {
+  public RPCServiceException(String message, Throwable e) {
     super(message + e.getMessage());
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index a0cbd48..022bfbc 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -86,7 +86,6 @@ public class IoTDB implements IoTDBMBean {
     registerManager.register(JMXService.getInstance());
     registerManager.register(FlushManager.getInstance());
     registerManager.register(MultiFileLogNodeManager.getInstance());
-    registerManager.register(JDBCService.getInstance());
     registerManager.register(Monitor.getInstance());
     registerManager.register(StatMonitor.getInstance());
     registerManager.register(Measurement.INSTANCE);
@@ -96,12 +95,6 @@ public class IoTDB implements IoTDBMBean {
     registerManager.register(UpgradeSevice.getINSTANCE());
     registerManager.register(MergeManager.getINSTANCE());
     registerManager.register(CacheHitRatioMonitor.getInstance());
-    if (IoTDBDescriptor.getInstance().getConfig().isEnableMetricService()) {
-      registerManager.register(MetricsService.getInstance());
-    }
-    if (IoTDBDescriptor.getInstance().getConfig().isEnableMQTTService()) {
-      registerManager.register(MQTTService.getInstance());
-    }
     JMXService.registerMBean(getInstance(), mbeanName);
     registerManager.register(StorageEngine.getInstance());
 
@@ -112,6 +105,13 @@ public class IoTDB implements IoTDBMBean {
       StatMonitor.getInstance().recovery();
     }
 
+    registerManager.register(RPCService.getInstance());
+    if (IoTDBDescriptor.getInstance().getConfig().isEnableMetricService()) {
+      registerManager.register(MetricsService.getInstance());
+    }
+    if (IoTDBDescriptor.getInstance().getConfig().isEnableMQTTService()) {
+      registerManager.register(MQTTService.getInstance());
+    }
     logger.info("IoTDB is set up.");
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/service/JDBCService.java b/server/src/main/java/org/apache/iotdb/db/service/RPCService.java
similarity index 73%
rename from server/src/main/java/org/apache/iotdb/db/service/JDBCService.java
rename to server/src/main/java/org/apache/iotdb/db/service/RPCService.java
index e4ae5ef..a1aeaa5 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/JDBCService.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/RPCService.java
@@ -26,7 +26,7 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.StartupException;
-import org.apache.iotdb.db.exception.runtime.JDBCServiceException;
+import org.apache.iotdb.db.exception.runtime.RPCServiceException;
 import org.apache.iotdb.service.rpc.thrift.TSIService;
 import org.apache.iotdb.service.rpc.thrift.TSIService.Processor;
 import org.apache.thrift.protocol.TBinaryProtocol;
@@ -43,15 +43,15 @@ import org.slf4j.LoggerFactory;
 /**
  * A service to handle jdbc request from client.
  */
-public class JDBCService implements JDBCServiceMBean, IService {
+public class RPCService implements RPCServiceMBean, IService {
 
-  private static final Logger logger = LoggerFactory.getLogger(JDBCService.class);
+  private static final Logger logger = LoggerFactory.getLogger(RPCService.class);
   private static final String STATUS_UP = "UP";
   private static final String STATUS_DOWN = "DOWN";
   private final String mbeanName = String
       .format("%s:%s=%s", IoTDBConstant.IOTDB_PACKAGE, IoTDBConstant.JMX_TYPE,
           getID().getJmxName());
-  private Thread jdbcServiceThread;
+  private Thread rpcServiceThread;
   private TProtocolFactory protocolFactory;
   private Processor<TSIService.Iface> processor;
   private TThreadPoolServer.Args poolArgs;
@@ -59,27 +59,25 @@ public class JDBCService implements JDBCServiceMBean, IService {
   private CountDownLatch startLatch;
   private CountDownLatch stopLatch;
 
-  private JDBCService() {
+  private RPCService() {
   }
 
-  public static final JDBCService getInstance() {
-    return JDBCServiceHolder.INSTANCE;
+  public static final RPCService getInstance() {
+    return RPCServiceHolder.INSTANCE;
   }
 
   @Override
-  public String getJDBCServiceStatus() {
-    // TODO debug log, will be deleted in production env
+  public String getRPCServiceStatus() {
     if(startLatch == null) {
-      logger.info("Start latch is null when getting status");
+      logger.debug("Start latch is null when getting status");
     } else {
-      logger.info("Start latch is {} when getting status", startLatch.getCount());
+      logger.debug("Start latch is {} when getting status", startLatch.getCount());
     }
     if(stopLatch == null) {
-      logger.info("Stop latch is null when getting status");
+      logger.debug("Stop latch is null when getting status");
     } else {
-      logger.info("Stop latch is {} when getting status", stopLatch.getCount());
+      logger.debug("Stop latch is {} when getting status", stopLatch.getCount());
     }	
-    // debug log, will be deleted in production env
 
     if(startLatch != null && startLatch.getCount() == 0) {
       return STATUS_UP;
@@ -96,13 +94,8 @@ public class JDBCService implements JDBCServiceMBean, IService {
 
   @Override
   public void start() throws StartupException {
-    try {
       JMXService.registerMBean(getInstance(), mbeanName);
       startService();
-    } catch (Exception e) {
-      logger.error("Failed to start {} because: ", this.getID().getName(), e);
-      throw new StartupException(e);
-    }
   }
 
   @Override
@@ -113,12 +106,12 @@ public class JDBCService implements JDBCServiceMBean, IService {
 
   @Override
   public ServiceType getID() {
-    return ServiceType.JDBC_SERVICE;
+    return ServiceType.RPC_SERVICE;
   }
 
   @Override
   public synchronized void startService() throws StartupException {
-    if (STATUS_UP.equals(getJDBCServiceStatus())) {
+    if (STATUS_UP.equals(getRPCServiceStatus())) {
       logger.info("{}: {} has been already running now", IoTDBConstant.GLOBAL_DB_NAME,
           this.getID().getName());
       return;
@@ -126,9 +119,9 @@ public class JDBCService implements JDBCServiceMBean, IService {
     logger.info("{}: start {}...", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName());
     try {
       reset();
-      jdbcServiceThread = new JDBCServiceThread(startLatch, stopLatch);
-      jdbcServiceThread.setName(ThreadName.JDBC_SERVICE.getName());
-      jdbcServiceThread.start();
+      rpcServiceThread = new RPCServiceThread(startLatch, stopLatch);
+      rpcServiceThread.setName(ThreadName.RPC_SERVICE.getName());
+      rpcServiceThread.start();
       startLatch.await();
     } catch (InterruptedException | ClassNotFoundException |
         IllegalAccessException | InstantiationException e) {
@@ -154,40 +147,40 @@ public class JDBCService implements JDBCServiceMBean, IService {
 
   @Override
   public synchronized void stopService() {
-    if (STATUS_DOWN.equals(getJDBCServiceStatus())) {
+    if (STATUS_DOWN.equals(getRPCServiceStatus())) {
       logger.info("{}: {} isn't running now", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName());
       return;
     }
     logger.info("{}: closing {}...", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName());
-    if (jdbcServiceThread != null) {
-      ((JDBCServiceThread) jdbcServiceThread).close();
+    if (rpcServiceThread != null) {
+      ((RPCServiceThread) rpcServiceThread).close();
     }
     try {
       stopLatch.await();
       reset();
       logger.info("{}: close {} successfully", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName());
     } catch (InterruptedException e) {
-      logger.error("{}: close {} failed because {}", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName(), e);
+      logger.error("{}: close {} failed because: ", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName(), e);
       Thread.currentThread().interrupt();
     }
   }
 
-  private static class JDBCServiceHolder {
+  private static class RPCServiceHolder {
 
-    private static final JDBCService INSTANCE = new JDBCService();
+    private static final RPCService INSTANCE = new RPCService();
 
-    private JDBCServiceHolder() {
+    private RPCServiceHolder() {
     }
   }
 
-  private class JDBCServiceThread extends Thread {
+  private class RPCServiceThread extends Thread {
 
     private TServerSocket serverTransport;
     private TServer poolServer;
     private CountDownLatch threadStartLatch;
     private CountDownLatch threadStopLatch;
 
-    public JDBCServiceThread(CountDownLatch threadStartLatch, CountDownLatch threadStopLatch)
+    public RPCServiceThread(CountDownLatch threadStartLatch, CountDownLatch threadStopLatch)
         throws ClassNotFoundException, IllegalAccessException, InstantiationException {
       if(IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable()) {
         protocolFactory = new TCompactProtocol.Factory();
@@ -214,33 +207,30 @@ public class JDBCService implements JDBCServiceMBean, IService {
             .stopTimeoutVal(
                 IoTDBDescriptor.getInstance().getConfig().getThriftServerAwaitTimeForStopService());
         poolArgs.executorService = IoTDBThreadPoolFactory.createThriftRpcClientThreadPool(poolArgs,
-            ThreadName.JDBC_CLIENT.getName());
+            ThreadName.RPC_CLIENT.getName());
         poolArgs.processor(processor);
         poolArgs.protocolFactory(protocolFactory);
         poolServer = new TThreadPoolServer(poolArgs);
-        poolServer.setServerEventHandler(new JDBCServiceEventHandler(impl, threadStartLatch));
+        poolServer.setServerEventHandler(new RPCServiceEventHandler(impl, threadStartLatch));
         poolServer.serve();
       } catch (TTransportException e) {
-        throw new JDBCServiceException(String.format("%s: failed to start %s, because ", IoTDBConstant.GLOBAL_DB_NAME,
+        throw new RPCServiceException(String.format("%s: failed to start %s, because ", IoTDBConstant.GLOBAL_DB_NAME,
             getID().getName()), e);
       } catch (Exception e) {
-        throw new JDBCServiceException(String.format("%s: %s exit, because ", IoTDBConstant.GLOBAL_DB_NAME, getID().getName()), e);
+        throw new RPCServiceException(String.format("%s: %s exit, because ", IoTDBConstant.GLOBAL_DB_NAME, getID().getName()), e);
       } finally {
         close();
-        // TODO debug log, will be deleted in production env
         if (threadStopLatch == null) {
-          logger.info("Stop Count Down latch is null");
+          logger.debug("Stop Count Down latch is null");
         } else {
-          logger.info("Stop Count Down latch is {}", threadStopLatch.getCount());
+          logger.debug("Stop Count Down latch is {}", threadStopLatch.getCount());
         }
-        // debug log, will be deleted in production env
 
         if (threadStopLatch != null && threadStopLatch.getCount() == 1) {
           threadStopLatch.countDown();
         }
-        logger.info("{}: close TThreadPoolServer and TServerSocket for {}",
-            IoTDBConstant.GLOBAL_DB_NAME,
-            getID().getName());
+        logger.debug("{}: close TThreadPoolServer and TServerSocket for {}",
+            IoTDBConstant.GLOBAL_DB_NAME, getID().getName());
       }
     }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/service/JDBCServiceEventHandler.java b/server/src/main/java/org/apache/iotdb/db/service/RPCServiceEventHandler.java
similarity index 87%
rename from server/src/main/java/org/apache/iotdb/db/service/JDBCServiceEventHandler.java
rename to server/src/main/java/org/apache/iotdb/db/service/RPCServiceEventHandler.java
index 2e9ea5f..dd88a2e 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/JDBCServiceEventHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/RPCServiceEventHandler.java
@@ -24,19 +24,18 @@ import org.apache.thrift.server.ServerContext;
 import org.apache.thrift.server.TServerEventHandler;
 import org.apache.thrift.transport.TTransport;
 
-public class JDBCServiceEventHandler implements TServerEventHandler {
+public class RPCServiceEventHandler implements TServerEventHandler {
 
   private TSServiceImpl serviceImpl;
   private CountDownLatch startLatch;
 
-  JDBCServiceEventHandler(TSServiceImpl serviceImpl, CountDownLatch startLatch) {
+  RPCServiceEventHandler(TSServiceImpl serviceImpl, CountDownLatch startLatch) {
     this.serviceImpl = serviceImpl;
     this.startLatch = startLatch;
   }
 
   @Override
   public ServerContext createContext(TProtocol arg0, TProtocol arg1) {
-    // TODO Auto-generated method stub
     return null;
   }
 
@@ -52,8 +51,6 @@ public class JDBCServiceEventHandler implements TServerEventHandler {
 
   @Override
   public void processContext(ServerContext arg0, TTransport arg1, TTransport arg2) {
-    // TODO Auto-generated method stub
-
   }
 
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/JDBCServiceMBean.java b/server/src/main/java/org/apache/iotdb/db/service/RPCServiceMBean.java
similarity index 93%
rename from server/src/main/java/org/apache/iotdb/db/service/JDBCServiceMBean.java
rename to server/src/main/java/org/apache/iotdb/db/service/RPCServiceMBean.java
index 4a5fee0..ef8d023 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/JDBCServiceMBean.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/RPCServiceMBean.java
@@ -20,9 +20,9 @@ package org.apache.iotdb.db.service;
 
 import org.apache.iotdb.db.exception.StartupException;
 
-public interface JDBCServiceMBean {
+public interface RPCServiceMBean {
 
-  String getJDBCServiceStatus();
+  String getRPCServiceStatus();
 
   int getRPCPort();
 
diff --git a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
index 256cccb..e1498bc 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
@@ -25,7 +25,7 @@ public enum ServiceType {
   STORAGE_ENGINE_SERVICE("Storage Engine ServerService", ""),
   JMX_SERVICE("JMX ServerService", "JMX ServerService"),
   METRICS_SERVICE("Metrics ServerService","MetricsService"),
-  JDBC_SERVICE("JDBC ServerService", "JDBCService"),
+  RPC_SERVICE("RPC ServerService", "RPCService"),
   MQTT_SERVICE("MQTTService", ""),
   MONITOR_SERVICE("Monitor ServerService", "Monitor"),
   STAT_MONITOR_SERVICE("Statistics ServerService", ""),
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java
index 056a9a0..19c4d51 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java
@@ -28,7 +28,6 @@ import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.StartupException;
 import org.apache.iotdb.db.service.IService;
-import org.apache.iotdb.db.service.JDBCServiceEventHandler;
 import org.apache.iotdb.db.service.ServiceType;
 import org.apache.iotdb.db.sync.receiver.load.FileLoaderManager;
 import org.apache.iotdb.db.sync.receiver.recover.SyncReceiverLogAnalyzer;