You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2020/04/17 07:47:34 UTC
[incubator-iotdb] branch master updated: disable mqtt server in default config and fix rpc and sync server startup problem (#1033)
This is an automated email from the ASF dual-hosted git repository.
hxd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 3191fc4 disable mqtt server in default config and fix rpc and sync server startup problem (#1033)
3191fc4 is described below
commit 3191fc4b4474f95aa18325b4f42e31b5f579729b
Author: Dawei Liu <li...@apache.org>
AuthorDate: Fri Apr 17 15:47:25 2020 +0800
disable mqtt server in default config and fix rpc and sync server startup problem (#1033)
* disable mqtt server in default config
* fix rpc and sync server startup problem in UTs
---
docs/UserGuide/4-Client/6-Programming - MQTT.md | 6 +--
.../resources/conf/iotdb-engine.properties | 2 +-
.../db/concurrent/IoTDBThreadPoolFactory.java | 6 +--
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 2 +-
.../org/apache/iotdb/db/service/RPCService.java | 43 +++++++++++------
.../iotdb/db/service/RPCServiceEventHandler.java | 56 ----------------------
.../iotdb/db/sync/receiver/SyncServerManager.java | 28 ++++++-----
.../db/sync/thrift/SyncServiceEventHandler.java | 56 ----------------------
.../iotdb/db/metadata/MManagerImproveTest.java | 4 --
.../org/apache/iotdb/db/metadata/MTreeTest.java | 12 +++++
server/src/test/resources/logback.xml | 5 ++
11 files changed, 68 insertions(+), 152 deletions(-)
diff --git a/docs/UserGuide/4-Client/6-Programming - MQTT.md b/docs/UserGuide/4-Client/6-Programming - MQTT.md
index 0fe588e..bd06024 100644
--- a/docs/UserGuide/4-Client/6-Programming - MQTT.md
+++ b/docs/UserGuide/4-Client/6-Programming - MQTT.md
@@ -64,7 +64,7 @@ Configurations are as following:
| NAME | DESCRIPTION | DEFAULT |
| ------------- |:-------------:|:------:|
-| enable_mqtt_service | whether to enable the mqtt service | true |
+| enable_mqtt_service | whether to enable the mqtt service | false |
| mqtt_host | the mqtt service binding host | 0.0.0.0 |
| mqtt_port | the mqtt service binding port | 1883 |
| mqtt_handler_pool_size | the handler pool size for handing the mqtt messages | 1 |
@@ -74,7 +74,7 @@ Configurations are as following:
## Examples
The following is an example which a mqtt client send messages to IoTDB server.
- ```java
+```java
MQTT mqtt = new MQTT();
mqtt.setHost("127.0.0.1", 1883);
mqtt.setUserName("root");
@@ -97,4 +97,4 @@ The following is an example which a mqtt client send messages to IoTDB server.
connection.disconnect();
}
- ```
+```
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index b388823..14b36d5 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -459,7 +459,7 @@ memtable_num_in_each_storage_group=10
####################
# whether to enable the mqtt service.
-enable_mqtt_service=true
+enable_mqtt_service=false
# the mqtt service binding host.
mqtt_host=0.0.0.0
diff --git a/server/src/main/java/org/apache/iotdb/db/concurrent/IoTDBThreadPoolFactory.java b/server/src/main/java/org/apache/iotdb/db/concurrent/IoTDBThreadPoolFactory.java
index a5bd99e..41f35e6 100644
--- a/server/src/main/java/org/apache/iotdb/db/concurrent/IoTDBThreadPoolFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/concurrent/IoTDBThreadPoolFactory.java
@@ -23,7 +23,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
-import org.apache.thrift.server.TThreadPoolServer.Args;
+import org.apache.thrift.server.TThreadPoolServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -121,7 +121,7 @@ public class IoTDBThreadPoolFactory {
/**
* function for creating thrift rpc client thread pool.
*/
- public static ExecutorService createThriftRpcClientThreadPool(Args args, String poolName) {
+ public static ExecutorService createThriftRpcClientThreadPool(TThreadPoolServer.Args args, String poolName) {
SynchronousQueue<Runnable> executorQueue = new SynchronousQueue<>();
return new ThreadPoolExecutor(args.minWorkerThreads, args.maxWorkerThreads, args.stopTimeoutVal,
args.stopTimeoutUnit, executorQueue, new IoTThreadFactory(poolName));
@@ -130,7 +130,7 @@ public class IoTDBThreadPoolFactory {
/**
* function for creating thrift rpc client thread pool.
*/
- public static ExecutorService createThriftRpcClientThreadPool(Args args, String poolName,
+ public static ExecutorService createThriftRpcClientThreadPool(TThreadPoolServer.Args args, String poolName,
Thread.UncaughtExceptionHandler handler) {
SynchronousQueue<Runnable> executorQueue = new SynchronousQueue<>();
return new ThreadPoolExecutor(args.minWorkerThreads, args.maxWorkerThreads, args.stopTimeoutVal,
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index a1dee4d..7420be2 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -53,7 +53,7 @@ public class IoTDBConfig {
/**
* whether to enable the mqtt service.
*/
- private boolean enableMQTTService = true;
+ private boolean enableMQTTService = false;
/**
* the mqtt service binding host.
diff --git a/server/src/main/java/org/apache/iotdb/db/service/RPCService.java b/server/src/main/java/org/apache/iotdb/db/service/RPCService.java
index a1aeaa5..52b4b79 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/RPCService.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/RPCService.java
@@ -34,7 +34,6 @@ import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadPoolServer;
-import org.apache.thrift.server.TThreadPoolServer.Args;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
@@ -51,12 +50,12 @@ public class RPCService implements RPCServiceMBean, IService {
private final String mbeanName = String
.format("%s:%s=%s", IoTDBConstant.IOTDB_PACKAGE, IoTDBConstant.JMX_TYPE,
getID().getJmxName());
- private Thread rpcServiceThread;
+ private RPCServiceThread rpcServiceThread;
private TProtocolFactory protocolFactory;
private Processor<TSIService.Iface> processor;
private TThreadPoolServer.Args poolArgs;
private TSServiceImpl impl;
- private CountDownLatch startLatch;
+
private CountDownLatch stopLatch;
private RPCService() {
@@ -68,10 +67,10 @@ public class RPCService implements RPCServiceMBean, IService {
@Override
public String getRPCServiceStatus() {
- if(startLatch == null) {
+ if (rpcServiceThread == null) {
logger.debug("Start latch is null when getting status");
} else {
- logger.debug("Start latch is {} when getting status", startLatch.getCount());
+ logger.debug("Start status is {} when getting status", rpcServiceThread.isServing());
}
if(stopLatch == null) {
logger.debug("Stop latch is null when getting status");
@@ -79,7 +78,7 @@ public class RPCService implements RPCServiceMBean, IService {
logger.debug("Stop latch is {} when getting status", stopLatch.getCount());
}
- if(startLatch != null && startLatch.getCount() == 0) {
+ if(rpcServiceThread != null && rpcServiceThread.isServing()) {
return STATUS_UP;
} else {
return STATUS_DOWN;
@@ -110,6 +109,7 @@ public class RPCService implements RPCServiceMBean, IService {
}
@Override
+ @SuppressWarnings("squid:S2276")
public synchronized void startService() throws StartupException {
if (STATUS_UP.equals(getRPCServiceStatus())) {
logger.info("{}: {} has been already running now", IoTDBConstant.GLOBAL_DB_NAME,
@@ -119,10 +119,13 @@ public class RPCService implements RPCServiceMBean, IService {
logger.info("{}: start {}...", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName());
try {
reset();
- rpcServiceThread = new RPCServiceThread(startLatch, stopLatch);
+ rpcServiceThread = new RPCServiceThread(stopLatch);
rpcServiceThread.setName(ThreadName.RPC_SERVICE.getName());
rpcServiceThread.start();
- startLatch.await();
+ while (!rpcServiceThread.isServing()) {
+ //sleep 100ms for waiting the rpc server start.
+ Thread.sleep(100);
+ }
} catch (InterruptedException | ClassNotFoundException |
IllegalAccessException | InstantiationException e) {
Thread.currentThread().interrupt();
@@ -135,7 +138,7 @@ public class RPCService implements RPCServiceMBean, IService {
}
private void reset() {
- startLatch = new CountDownLatch(1);
+ rpcServiceThread = null;
stopLatch = new CountDownLatch(1);
}
@@ -153,7 +156,7 @@ public class RPCService implements RPCServiceMBean, IService {
}
logger.info("{}: closing {}...", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName());
if (rpcServiceThread != null) {
- ((RPCServiceThread) rpcServiceThread).close();
+ rpcServiceThread.close();
}
try {
stopLatch.await();
@@ -177,10 +180,9 @@ public class RPCService implements RPCServiceMBean, IService {
private TServerSocket serverTransport;
private TServer poolServer;
- private CountDownLatch threadStartLatch;
private CountDownLatch threadStopLatch;
- public RPCServiceThread(CountDownLatch threadStartLatch, CountDownLatch threadStopLatch)
+ public RPCServiceThread(CountDownLatch threadStopLatch)
throws ClassNotFoundException, IllegalAccessException, InstantiationException {
if(IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable()) {
protocolFactory = new TCompactProtocol.Factory();
@@ -191,18 +193,22 @@ public class RPCService implements RPCServiceMBean, IService {
IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
impl = (TSServiceImpl) Class.forName(config.getRpcImplClassName()).newInstance();
processor = new TSIService.Processor<>(impl);
- this.threadStartLatch = threadStartLatch;
this.threadStopLatch = threadStopLatch;
}
@SuppressWarnings("squid:S2093") // socket will be used later
@Override
public void run() {
+ logger.info("The RPC service thread begin to run...");
try {
IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
serverTransport = new TServerSocket(new InetSocketAddress(config.getRpcAddress(),
config.getRpcPort()));
- poolArgs = new Args(serverTransport).maxWorkerThreads(IoTDBDescriptor.
+ //this is for testing.
+ if (!serverTransport.getServerSocket().isBound()) {
+ logger.error("The RPC service port is not bound.");
+ }
+ poolArgs = new TThreadPoolServer.Args(serverTransport).maxWorkerThreads(IoTDBDescriptor.
getInstance().getConfig().getRpcMaxConcurrentClientNum()).minWorkerThreads(1)
.stopTimeoutVal(
IoTDBDescriptor.getInstance().getConfig().getThriftServerAwaitTimeForStopService());
@@ -211,7 +217,6 @@ public class RPCService implements RPCServiceMBean, IService {
poolArgs.processor(processor);
poolArgs.protocolFactory(protocolFactory);
poolServer = new TThreadPoolServer(poolArgs);
- poolServer.setServerEventHandler(new RPCServiceEventHandler(impl, threadStartLatch));
poolServer.serve();
} catch (TTransportException e) {
throw new RPCServiceException(String.format("%s: failed to start %s, because ", IoTDBConstant.GLOBAL_DB_NAME,
@@ -236,6 +241,7 @@ public class RPCService implements RPCServiceMBean, IService {
private synchronized void close() {
if (poolServer != null) {
+ poolServer.setShouldStop(true);
poolServer.stop();
poolServer = null;
}
@@ -244,5 +250,12 @@ public class RPCService implements RPCServiceMBean, IService {
serverTransport = null;
}
}
+
+ boolean isServing() {
+ if (poolServer != null) {
+ return poolServer.isServing();
+ }
+ return false;
+ }
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/RPCServiceEventHandler.java b/server/src/main/java/org/apache/iotdb/db/service/RPCServiceEventHandler.java
deleted file mode 100644
index dd88a2e..0000000
--- a/server/src/main/java/org/apache/iotdb/db/service/RPCServiceEventHandler.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.service;
-
-import java.util.concurrent.CountDownLatch;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.server.ServerContext;
-import org.apache.thrift.server.TServerEventHandler;
-import org.apache.thrift.transport.TTransport;
-
-public class RPCServiceEventHandler implements TServerEventHandler {
-
- private TSServiceImpl serviceImpl;
- private CountDownLatch startLatch;
-
- RPCServiceEventHandler(TSServiceImpl serviceImpl, CountDownLatch startLatch) {
- this.serviceImpl = serviceImpl;
- this.startLatch = startLatch;
- }
-
- @Override
- public ServerContext createContext(TProtocol arg0, TProtocol arg1) {
- return null;
- }
-
- @Override
- public void deleteContext(ServerContext arg0, TProtocol arg1, TProtocol arg2) {
- serviceImpl.handleClientExit();
- }
-
- @Override
- public void preServe() {
- this.startLatch.countDown();
- }
-
- @Override
- public void processContext(ServerContext arg0, TTransport arg1, TTransport arg2) {
- }
-
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java
index 19c4d51..f1fd2f1 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java
@@ -32,7 +32,6 @@ import org.apache.iotdb.db.service.ServiceType;
import org.apache.iotdb.db.sync.receiver.load.FileLoaderManager;
import org.apache.iotdb.db.sync.receiver.recover.SyncReceiverLogAnalyzer;
import org.apache.iotdb.db.sync.receiver.transfer.SyncServiceImpl;
-import org.apache.iotdb.db.sync.thrift.SyncServiceEventHandler;
import org.apache.iotdb.service.sync.thrift.SyncService;
import org.apache.iotdb.service.sync.thrift.SyncService.Processor;
import org.apache.thrift.protocol.TBinaryProtocol;
@@ -54,10 +53,8 @@ public class SyncServerManager implements IService {
private IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
- private Thread syncServerThread;
+ private SyncServiceThread syncServerThread;
- //we add this latch for avoiding in some ITs, the syncService is not startup but the IT has finished.
- private CountDownLatch startLatch;
//stopLatch is also for letting the IT know whether the socket is closed.
private CountDownLatch stopLatch;
@@ -87,14 +84,16 @@ public class SyncServerManager implements IService {
"Sync server failed to start because IP white list is null, please set IP white list.");
return;
}
- startLatch = new CountDownLatch(1);
stopLatch = new CountDownLatch(1);
conf.setIpWhiteList(conf.getIpWhiteList().replaceAll(" ", ""));
- syncServerThread = new SyncServiceThread(startLatch, stopLatch);
+ syncServerThread = new SyncServiceThread(stopLatch);
syncServerThread.setName(ThreadName.SYNC_SERVER.getName());
syncServerThread.start();
try {
- startLatch.await();
+ while (!syncServerThread.isServing()) {
+ //sleep 100ms for waiting the sync server start.
+ Thread.sleep(100);
+ }
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new StartupException(this.getID().getName(), e.getMessage());
@@ -109,7 +108,7 @@ public class SyncServerManager implements IService {
public void stop() {
if (conf.isSyncEnable()) {
FileLoaderManager.getInstance().stop();
- ((SyncServiceThread) syncServerThread).close();
+ syncServerThread.close();
try {
stopLatch.await();
} catch (InterruptedException e) {
@@ -136,13 +135,10 @@ public class SyncServerManager implements IService {
private TProtocolFactory protocolFactory;
private Processor<SyncService.Iface> processor;
private TThreadPoolServer.Args poolArgs;
- //we add this latch for avoiding in some ITs, the syncService is not startup but the IT has finished.
- private CountDownLatch threadStartLatch;
private CountDownLatch threadStopLatch;
- public SyncServiceThread(CountDownLatch startLatch, CountDownLatch stopLatch) {
+ public SyncServiceThread(CountDownLatch stopLatch) {
processor = new SyncService.Processor<>(new SyncServiceImpl());
- this.threadStartLatch = startLatch;
this.threadStopLatch = stopLatch;
}
@@ -164,7 +160,6 @@ public class SyncServerManager implements IService {
poolArgs.protocolFactory(protocolFactory);
poolArgs.processor(processor);
poolServer = new TThreadPoolServer(poolArgs);
- poolServer.setServerEventHandler(new SyncServiceEventHandler(threadStartLatch));
poolServer.serve();
} catch (TTransportException e) {
logger.error("{}: failed to start {}, because ", IoTDBConstant.GLOBAL_DB_NAME,
@@ -192,5 +187,12 @@ public class SyncServerManager implements IService {
serverTransport = null;
}
}
+
+ boolean isServing() {
+ if (poolServer != null) {
+ return poolServer.isServing();
+ }
+ return false;
+ }
}
}
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/thrift/SyncServiceEventHandler.java b/server/src/main/java/org/apache/iotdb/db/sync/thrift/SyncServiceEventHandler.java
deleted file mode 100644
index 25469ab..0000000
--- a/server/src/main/java/org/apache/iotdb/db/sync/thrift/SyncServiceEventHandler.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.sync.thrift;
-
-import java.util.concurrent.CountDownLatch;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.server.ServerContext;
-import org.apache.thrift.server.TServerEventHandler;
-import org.apache.thrift.transport.TTransport;
-
-public class SyncServiceEventHandler implements TServerEventHandler {
-
- private CountDownLatch startLatch;
-
- public SyncServiceEventHandler(CountDownLatch startLatch) {
- this.startLatch = startLatch;
- }
-
- @Override
- public void preServe() {
- startLatch.countDown();
- }
-
- @Override
- public ServerContext createContext(TProtocol input, TProtocol output) {
- return null;
- }
-
- @Override
- public void deleteContext(ServerContext serverContext, TProtocol input, TProtocol output) {
-
- }
-
- @Override
- public void processContext(ServerContext serverContext, TTransport inputTransport,
- TTransport outputTransport) {
-
- }
-}
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/MManagerImproveTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/MManagerImproveTest.java
index bfccd56..33e0e61 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/MManagerImproveTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/MManagerImproveTest.java
@@ -62,10 +62,6 @@ public class MManagerImproveTest {
}
- @After
- public void after() throws IOException, StorageEngineException {
- EnvironmentUtils.cleanEnv();
- }
@Test
public void checkSetUp() {
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/MTreeTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/MTreeTest.java
index 771c00b..71afbea 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/MTreeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/MTreeTest.java
@@ -36,9 +36,21 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class MTreeTest {
+ @Before
+ public void setUp() throws Exception {
+ EnvironmentUtils.envSetUp();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ EnvironmentUtils.cleanEnv();
+ }
+
@Test
public void testAddLeftNodePath() throws MetadataException {
MTree root = new MTree();
diff --git a/server/src/test/resources/logback.xml b/server/src/test/resources/logback.xml
index 8128729..dcfa69e 100644
--- a/server/src/test/resources/logback.xml
+++ b/server/src/test/resources/logback.xml
@@ -45,6 +45,11 @@
<logger name="org.apache.iotdb.db.service.MetricsService" level="INFO"/>
<logger name="org.apache.iotdb.db.engine.flush.FlushManager" level="INFO"/>
<logger name="org.apache.iotdb.db.integration.IoTDBMergeTest" level="INFO"/>
+ <logger name="org.apache.iotdb.db.service.IoTDB" level="INFO"/>
+ <logger name="org.apache.iotdb.db.service.RPCService" level="INFO"/>
+ <logger name="org.apache.iotdb.db.service.MQTTService" level="INFO"/>
+ <logger name="org.apache.iotdb.db.sync.receiver.SyncServerManager" level="INFO"/>
+
<root level="WARN">
<appender-ref ref="stdout"/>
</root>