You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2020/04/16 11:16:00 UTC

[incubator-iotdb] branch disable_mqtt_server updated: modify bug that close rpc is called before the rpc start

This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch disable_mqtt_server
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/disable_mqtt_server by this push:
     new 8780280  modify bug that close rpc is called before the rpc start
8780280 is described below

commit 8780280b395bcd60669729f6aa5d911b1bbcf9ac
Author: xiangdong huang <sa...@gmail.com>
AuthorDate: Thu Apr 16 19:15:43 2020 +0800

    modify bug that close rpc is called before the rpc start
---
 .../org/apache/iotdb/db/service/RPCService.java    | 35 ++++++++------
 .../iotdb/db/service/RPCServiceEventHandler.java   | 56 ----------------------
 2 files changed, 21 insertions(+), 70 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/service/RPCService.java b/server/src/main/java/org/apache/iotdb/db/service/RPCService.java
index 691c3de..31ae0a6 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/RPCService.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/RPCService.java
@@ -36,7 +36,6 @@ import org.apache.thrift.server.CustomizedTThreadPoolServer;
 import org.apache.thrift.server.TServer;
 import org.apache.thrift.server.CustomizedTThreadPoolServer.Args;
 import org.apache.thrift.transport.CustomizedTServerSocket;
-import org.apache.thrift.transport.TServerSocket;
 import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,12 +51,12 @@ public class RPCService implements RPCServiceMBean, IService {
   private final String mbeanName = String
       .format("%s:%s=%s", IoTDBConstant.IOTDB_PACKAGE, IoTDBConstant.JMX_TYPE,
           getID().getJmxName());
-  private Thread rpcServiceThread;
+  private RPCServiceThread rpcServiceThread;
   private TProtocolFactory protocolFactory;
   private Processor<TSIService.Iface> processor;
   private CustomizedTThreadPoolServer.Args poolArgs;
   private TSServiceImpl impl;
-  private CountDownLatch startLatch;
+
   private CountDownLatch stopLatch;
 
   private RPCService() {
@@ -69,10 +68,10 @@ public class RPCService implements RPCServiceMBean, IService {
 
   @Override
   public String getRPCServiceStatus() {
-    if(startLatch == null) {
+    if (rpcServiceThread == null) {
       logger.debug("Start latch is null when getting status");
     } else {
-      logger.debug("Start latch is {} when getting status", startLatch.getCount());
+      logger.debug("Start status is {} when getting status", rpcServiceThread.isServing());
     }
     if(stopLatch == null) {
       logger.debug("Stop latch is null when getting status");
@@ -80,7 +79,7 @@ public class RPCService implements RPCServiceMBean, IService {
       logger.debug("Stop latch is {} when getting status", stopLatch.getCount());
     }	
 
-    if(startLatch != null && startLatch.getCount() == 0) {
+    if(rpcServiceThread != null && rpcServiceThread.isServing()) {
       return STATUS_UP;
     } else {
       return STATUS_DOWN;
@@ -111,6 +110,7 @@ public class RPCService implements RPCServiceMBean, IService {
   }
 
   @Override
+  @SuppressWarnings("squid:S2276")
   public synchronized void startService() throws StartupException {
     if (STATUS_UP.equals(getRPCServiceStatus())) {
       logger.info("{}: {} has been already running now", IoTDBConstant.GLOBAL_DB_NAME,
@@ -120,10 +120,13 @@ public class RPCService implements RPCServiceMBean, IService {
     logger.info("{}: start {}...", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName());
     try {
       reset();
-      rpcServiceThread = new RPCServiceThread(startLatch, stopLatch);
+      rpcServiceThread = new RPCServiceThread(stopLatch);
       rpcServiceThread.setName(ThreadName.RPC_SERVICE.getName());
       rpcServiceThread.start();
-      startLatch.await();
+      while (!rpcServiceThread.isServing()) {
+        //sleep 100ms for waiting the rpc server start.
+        Thread.sleep(100);
+      }
     } catch (InterruptedException | ClassNotFoundException |
         IllegalAccessException | InstantiationException e) {
       Thread.currentThread().interrupt();
@@ -136,7 +139,7 @@ public class RPCService implements RPCServiceMBean, IService {
   }
   
   private void reset() {
-    startLatch = new CountDownLatch(1);
+    rpcServiceThread = null;
     stopLatch = new CountDownLatch(1);	  
   }
 
@@ -154,7 +157,7 @@ public class RPCService implements RPCServiceMBean, IService {
     }
     logger.info("{}: closing {}...", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName());
     if (rpcServiceThread != null) {
-      ((RPCServiceThread) rpcServiceThread).close();
+      rpcServiceThread.close();
     }
     try {
       stopLatch.await();
@@ -178,10 +181,9 @@ public class RPCService implements RPCServiceMBean, IService {
 
     private CustomizedTServerSocket serverTransport;
     private TServer poolServer;
-    private CountDownLatch threadStartLatch;
     private CountDownLatch threadStopLatch;
 
-    public RPCServiceThread(CountDownLatch threadStartLatch, CountDownLatch threadStopLatch)
+    public RPCServiceThread(CountDownLatch threadStopLatch)
         throws ClassNotFoundException, IllegalAccessException, InstantiationException {
       if(IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable()) {
         protocolFactory = new TCompactProtocol.Factory();
@@ -192,7 +194,6 @@ public class RPCService implements RPCServiceMBean, IService {
       IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
       impl = (TSServiceImpl) Class.forName(config.getRpcImplClassName()).newInstance();
       processor = new TSIService.Processor<>(impl);
-      this.threadStartLatch = threadStartLatch;
       this.threadStopLatch = threadStopLatch;
     }
 
@@ -217,7 +218,6 @@ public class RPCService implements RPCServiceMBean, IService {
         poolArgs.processor(processor);
         poolArgs.protocolFactory(protocolFactory);
         poolServer = new CustomizedTThreadPoolServer(poolArgs);
-        poolServer.setServerEventHandler(new RPCServiceEventHandler(impl, threadStartLatch));
         poolServer.serve();
       } catch (TTransportException e) {
         throw new RPCServiceException(String.format("%s: failed to start %s, because ", IoTDBConstant.GLOBAL_DB_NAME,
@@ -251,5 +251,12 @@ public class RPCService implements RPCServiceMBean, IService {
         serverTransport = null;
       }
     }
+
+    boolean isServing() {
+      if (poolServer != null) {
+        return poolServer.isServing();
+      }
+      return false;
+    }
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/RPCServiceEventHandler.java b/server/src/main/java/org/apache/iotdb/db/service/RPCServiceEventHandler.java
deleted file mode 100644
index dd88a2e..0000000
--- a/server/src/main/java/org/apache/iotdb/db/service/RPCServiceEventHandler.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.service;
-
-import java.util.concurrent.CountDownLatch;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.server.ServerContext;
-import org.apache.thrift.server.TServerEventHandler;
-import org.apache.thrift.transport.TTransport;
-
-public class RPCServiceEventHandler implements TServerEventHandler {
-
-  private TSServiceImpl serviceImpl;
-  private CountDownLatch startLatch;
-
-  RPCServiceEventHandler(TSServiceImpl serviceImpl, CountDownLatch startLatch) {
-    this.serviceImpl = serviceImpl;
-    this.startLatch = startLatch;
-  }
-
-  @Override
-  public ServerContext createContext(TProtocol arg0, TProtocol arg1) {
-    return null;
-  }
-
-  @Override
-  public void deleteContext(ServerContext arg0, TProtocol arg1, TProtocol arg2) {
-    serviceImpl.handleClientExit();
-  }
-
-  @Override
-  public void preServe() {
-    this.startLatch.countDown();
-  }
-
-  @Override
-  public void processContext(ServerContext arg0, TTransport arg1, TTransport arg2) {
-  }
-
-}