You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2020/04/16 11:16:00 UTC
[incubator-iotdb] branch disable_mqtt_server updated: modify bug
that close rpc is called before the rpc start
This is an automated email from the ASF dual-hosted git repository.
hxd pushed a commit to branch disable_mqtt_server
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/disable_mqtt_server by this push:
new 8780280 modify bug that close rpc is called before the rpc start
8780280 is described below
commit 8780280b395bcd60669729f6aa5d911b1bbcf9ac
Author: xiangdong huang <sa...@gmail.com>
AuthorDate: Thu Apr 16 19:15:43 2020 +0800
modify bug that close rpc is called before the rpc start
---
.../org/apache/iotdb/db/service/RPCService.java | 35 ++++++++------
.../iotdb/db/service/RPCServiceEventHandler.java | 56 ----------------------
2 files changed, 21 insertions(+), 70 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/service/RPCService.java b/server/src/main/java/org/apache/iotdb/db/service/RPCService.java
index 691c3de..31ae0a6 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/RPCService.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/RPCService.java
@@ -36,7 +36,6 @@ import org.apache.thrift.server.CustomizedTThreadPoolServer;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.CustomizedTThreadPoolServer.Args;
import org.apache.thrift.transport.CustomizedTServerSocket;
-import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,12 +51,12 @@ public class RPCService implements RPCServiceMBean, IService {
private final String mbeanName = String
.format("%s:%s=%s", IoTDBConstant.IOTDB_PACKAGE, IoTDBConstant.JMX_TYPE,
getID().getJmxName());
- private Thread rpcServiceThread;
+ private RPCServiceThread rpcServiceThread;
private TProtocolFactory protocolFactory;
private Processor<TSIService.Iface> processor;
private CustomizedTThreadPoolServer.Args poolArgs;
private TSServiceImpl impl;
- private CountDownLatch startLatch;
+
private CountDownLatch stopLatch;
private RPCService() {
@@ -69,10 +68,10 @@ public class RPCService implements RPCServiceMBean, IService {
@Override
public String getRPCServiceStatus() {
- if(startLatch == null) {
+ if (rpcServiceThread == null) {
logger.debug("Start latch is null when getting status");
} else {
- logger.debug("Start latch is {} when getting status", startLatch.getCount());
+ logger.debug("Start status is {} when getting status", rpcServiceThread.isServing());
}
if(stopLatch == null) {
logger.debug("Stop latch is null when getting status");
@@ -80,7 +79,7 @@ public class RPCService implements RPCServiceMBean, IService {
logger.debug("Stop latch is {} when getting status", stopLatch.getCount());
}
- if(startLatch != null && startLatch.getCount() == 0) {
+ if(rpcServiceThread != null && rpcServiceThread.isServing()) {
return STATUS_UP;
} else {
return STATUS_DOWN;
@@ -111,6 +110,7 @@ public class RPCService implements RPCServiceMBean, IService {
}
@Override
+ @SuppressWarnings("squid:S2276")
public synchronized void startService() throws StartupException {
if (STATUS_UP.equals(getRPCServiceStatus())) {
logger.info("{}: {} has been already running now", IoTDBConstant.GLOBAL_DB_NAME,
@@ -120,10 +120,13 @@ public class RPCService implements RPCServiceMBean, IService {
logger.info("{}: start {}...", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName());
try {
reset();
- rpcServiceThread = new RPCServiceThread(startLatch, stopLatch);
+ rpcServiceThread = new RPCServiceThread(stopLatch);
rpcServiceThread.setName(ThreadName.RPC_SERVICE.getName());
rpcServiceThread.start();
- startLatch.await();
+ while (!rpcServiceThread.isServing()) {
+ //sleep 100ms for waiting the rpc server start.
+ Thread.sleep(100);
+ }
} catch (InterruptedException | ClassNotFoundException |
IllegalAccessException | InstantiationException e) {
Thread.currentThread().interrupt();
@@ -136,7 +139,7 @@ public class RPCService implements RPCServiceMBean, IService {
}
private void reset() {
- startLatch = new CountDownLatch(1);
+ rpcServiceThread = null;
stopLatch = new CountDownLatch(1);
}
@@ -154,7 +157,7 @@ public class RPCService implements RPCServiceMBean, IService {
}
logger.info("{}: closing {}...", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName());
if (rpcServiceThread != null) {
- ((RPCServiceThread) rpcServiceThread).close();
+ rpcServiceThread.close();
}
try {
stopLatch.await();
@@ -178,10 +181,9 @@ public class RPCService implements RPCServiceMBean, IService {
private CustomizedTServerSocket serverTransport;
private TServer poolServer;
- private CountDownLatch threadStartLatch;
private CountDownLatch threadStopLatch;
- public RPCServiceThread(CountDownLatch threadStartLatch, CountDownLatch threadStopLatch)
+ public RPCServiceThread(CountDownLatch threadStopLatch)
throws ClassNotFoundException, IllegalAccessException, InstantiationException {
if(IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable()) {
protocolFactory = new TCompactProtocol.Factory();
@@ -192,7 +194,6 @@ public class RPCService implements RPCServiceMBean, IService {
IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
impl = (TSServiceImpl) Class.forName(config.getRpcImplClassName()).newInstance();
processor = new TSIService.Processor<>(impl);
- this.threadStartLatch = threadStartLatch;
this.threadStopLatch = threadStopLatch;
}
@@ -217,7 +218,6 @@ public class RPCService implements RPCServiceMBean, IService {
poolArgs.processor(processor);
poolArgs.protocolFactory(protocolFactory);
poolServer = new CustomizedTThreadPoolServer(poolArgs);
- poolServer.setServerEventHandler(new RPCServiceEventHandler(impl, threadStartLatch));
poolServer.serve();
} catch (TTransportException e) {
throw new RPCServiceException(String.format("%s: failed to start %s, because ", IoTDBConstant.GLOBAL_DB_NAME,
@@ -251,5 +251,12 @@ public class RPCService implements RPCServiceMBean, IService {
serverTransport = null;
}
}
+
+ boolean isServing() {
+ if (poolServer != null) {
+ return poolServer.isServing();
+ }
+ return false;
+ }
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/RPCServiceEventHandler.java b/server/src/main/java/org/apache/iotdb/db/service/RPCServiceEventHandler.java
deleted file mode 100644
index dd88a2e..0000000
--- a/server/src/main/java/org/apache/iotdb/db/service/RPCServiceEventHandler.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.service;
-
-import java.util.concurrent.CountDownLatch;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.server.ServerContext;
-import org.apache.thrift.server.TServerEventHandler;
-import org.apache.thrift.transport.TTransport;
-
-public class RPCServiceEventHandler implements TServerEventHandler {
-
- private TSServiceImpl serviceImpl;
- private CountDownLatch startLatch;
-
- RPCServiceEventHandler(TSServiceImpl serviceImpl, CountDownLatch startLatch) {
- this.serviceImpl = serviceImpl;
- this.startLatch = startLatch;
- }
-
- @Override
- public ServerContext createContext(TProtocol arg0, TProtocol arg1) {
- return null;
- }
-
- @Override
- public void deleteContext(ServerContext arg0, TProtocol arg1, TProtocol arg2) {
- serviceImpl.handleClientExit();
- }
-
- @Override
- public void preServe() {
- this.startLatch.countDown();
- }
-
- @Override
- public void processContext(ServerContext arg0, TTransport arg1, TTransport arg2) {
- }
-
-}