You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/04/17 03:25:41 UTC
[incubator-iotdb] 01/01: fix sync thread bug
This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch fix_syncservice_bug
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit d6874d52a15202d5a2cfa583a787b4b767592908
Author: lta <li...@163.com>
AuthorDate: Wed Apr 17 11:25:07 2019 +0800
fix sync thread bug
---
.../java/org/apache/iotdb/db/service/IoTDB.java | 5 +-
.../org/apache/iotdb/db/service/JDBCService.java | 4 +-
.../iotdb/db/service/JDBCServiceEventHandler.java | 2 +-
.../receiver/SyncServiceEventHandler.java} | 39 ++++--------
...cServerManager.java => SyncServiceManager.java} | 69 ++++++++++++++++++----
5 files changed, 75 insertions(+), 44 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/iotdb/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index 6ea138b..d9a1938 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -33,8 +33,7 @@ import org.apache.iotdb.db.exception.StartupException;
import org.apache.iotdb.db.exception.builder.ExceptionBuilder;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.monitor.StatMonitor;
-import org.apache.iotdb.db.query.control.FileReaderManager;
-import org.apache.iotdb.db.sync.receiver.SyncServerManager;
+import org.apache.iotdb.db.sync.receiver.SyncServiceManager;
import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
import org.apache.iotdb.db.writelog.manager.WriteLogNodeManager;
import org.slf4j.Logger;
@@ -107,7 +106,7 @@ public class IoTDB implements IoTDBMBean {
registerManager.register(CloseMergeService.getInstance());
registerManager.register(StatMonitor.getInstance());
registerManager.register(BasicMemController.getInstance());
- registerManager.register(SyncServerManager.getInstance());
+ registerManager.register(SyncServiceManager.getInstance());
JMXService.registerMBean(getInstance(), mbeanName);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/service/JDBCService.java b/iotdb/src/main/java/org/apache/iotdb/db/service/JDBCService.java
index ea78d5d..d9ccecf 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/service/JDBCService.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/service/JDBCService.java
@@ -218,9 +218,9 @@ public class JDBCService implements JDBCServiceMBean, IService {
close();
// TODO debug log, will be deleted in production env
if(threadStopLatch == null) {
- LOGGER.info("Stop Count Down latch is null");
+ LOGGER.info("JDBC Service Stop Count Down latch is null");
} else {
- LOGGER.info("Stop Count Down latch is {}", threadStopLatch.getCount());
+ LOGGER.info("JDBC Service Stop Count Down latch is {}", threadStopLatch.getCount());
}
// debug log, will be deleted in production env
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/service/JDBCServiceEventHandler.java b/iotdb/src/main/java/org/apache/iotdb/db/service/JDBCServiceEventHandler.java
index b39632f..b47a166 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/service/JDBCServiceEventHandler.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/service/JDBCServiceEventHandler.java
@@ -32,7 +32,7 @@ public class JDBCServiceEventHandler implements TServerEventHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(JDBCServiceEventHandler.class);
private TSServiceImpl serviceImpl;
- CountDownLatch startLatch;
+ private CountDownLatch startLatch;
public JDBCServiceEventHandler(TSServiceImpl serviceImpl, CountDownLatch startLatch) {
this.serviceImpl = serviceImpl;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/service/JDBCServiceEventHandler.java b/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServiceEventHandler.java
similarity index 55%
copy from iotdb/src/main/java/org/apache/iotdb/db/service/JDBCServiceEventHandler.java
copy to iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServiceEventHandler.java
index b39632f..cb77a20 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/service/JDBCServiceEventHandler.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServiceEventHandler.java
@@ -16,53 +16,40 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.service;
+package org.apache.iotdb.db.sync.receiver;
import java.util.concurrent.CountDownLatch;
-
-import org.apache.thrift.TException;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.server.ServerContext;
import org.apache.thrift.server.TServerEventHandler;
import org.apache.thrift.transport.TTransport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-public class JDBCServiceEventHandler implements TServerEventHandler {
+public class SyncServiceEventHandler implements TServerEventHandler {
- private static final Logger LOGGER = LoggerFactory.getLogger(JDBCServiceEventHandler.class);
- private TSServiceImpl serviceImpl;
- CountDownLatch startLatch;
+ private CountDownLatch startLatch;
- public JDBCServiceEventHandler(TSServiceImpl serviceImpl, CountDownLatch startLatch) {
- this.serviceImpl = serviceImpl;
+ public SyncServiceEventHandler(CountDownLatch startLatch) {
this.startLatch = startLatch;
}
@Override
- public ServerContext createContext(TProtocol arg0, TProtocol arg1) {
- // TODO Auto-generated method stub
- return null;
+ public void preServe() {
+ startLatch.countDown();
}
@Override
- public void deleteContext(ServerContext arg0, TProtocol arg1, TProtocol arg2) {
- try {
- serviceImpl.handleClientExit();
- } catch (TException e) {
- LOGGER.error("failed to clear client status", e);
- }
+ public ServerContext createContext(TProtocol input, TProtocol output) {
+ return null;
}
@Override
- public void preServe() {
- this.startLatch.countDown();
+ public void deleteContext(ServerContext serverContext, TProtocol input, TProtocol output) {
+ // Do nothing.
}
@Override
- public void processContext(ServerContext arg0, TTransport arg1, TTransport arg2) {
- // TODO Auto-generated method stub
-
+ public void processContext(ServerContext serverContext, TTransport inputTransport,
+ TTransport outputTransport) {
+ // Do nothing.
}
-
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java b/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServiceManager.java
similarity index 62%
rename from iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java
rename to iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServiceManager.java
index 5348668..c0c1c00 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServiceManager.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.sync.receiver;
import java.net.InetSocketAddress;
+import java.util.concurrent.CountDownLatch;
import org.apache.iotdb.db.concurrent.ThreadName;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBConstant;
@@ -40,16 +41,18 @@ import org.slf4j.LoggerFactory;
/**
* sync receiver server.
*/
-public class SyncServerManager implements IService {
+public class SyncServiceManager implements IService {
- private static final Logger LOGGER = LoggerFactory.getLogger(SyncServerManager.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(SyncServiceManager.class);
private Thread syncServerThread;
private IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
+ private CountDownLatch startLatch;
+ private CountDownLatch stopLatch;
- private SyncServerManager() {
+ private SyncServiceManager() {
}
- public static final SyncServerManager getInstance() {
+ public static final SyncServiceManager getInstance() {
return ServerManagerHolder.INSTANCE;
}
@@ -67,10 +70,27 @@ public class SyncServerManager implements IService {
return;
}
conf.setIpWhiteList(conf.getIpWhiteList().replaceAll(" ", ""));
- syncServerThread = new SyncServiceThread();
- syncServerThread.setName(ThreadName.SYNC_SERVER.getName());
- syncServerThread.start();
- LOGGER.info("Sync server has started.");
+ resetLatch();
+ try {
+ syncServerThread = new SyncServiceThread(startLatch, stopLatch);
+ syncServerThread.setName(ThreadName.SYNC_SERVER.getName());
+ syncServerThread.start();
+ startLatch.await();
+ } catch (InterruptedException e) {
+ String errorMessage = String
+ .format("Failed to start %s because of %s", this.getID().getName(),
+ e.getMessage());
+ LOGGER.error(errorMessage);
+ throw new StartupException(errorMessage);
+ }
+ LOGGER
+ .info("{}: start {} successfully, listening on ip {} port {}", IoTDBConstant.GLOBAL_DB_NAME,
+ this.getID().getName(), conf.getRpcAddress(), conf.getSyncServerPort());
+ }
+
+ private void resetLatch(){
+ startLatch = new CountDownLatch(1);
+ stopLatch = new CountDownLatch(1);
}
/**
@@ -78,9 +98,21 @@ public class SyncServerManager implements IService {
*/
@Override
public void stop() {
- if (conf.isSyncEnable()) {
+ if (!conf.isSyncEnable()) {
+ return;
+ }
+ ((SyncServiceThread) syncServerThread).close();
+ LOGGER.info("{}: closing {}...", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName());
+ if (syncServerThread != null) {
((SyncServiceThread) syncServerThread).close();
}
+ try {
+ stopLatch.await();
+ resetLatch();
+ LOGGER.info("{}: close {} successfully", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName());
+ } catch (InterruptedException e) {
+ LOGGER.error("{}: close {} failed because {}", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName(), e);
+ }
}
@Override
@@ -90,7 +122,7 @@ public class SyncServerManager implements IService {
private static class ServerManagerHolder {
- private static final SyncServerManager INSTANCE = new SyncServerManager();
+ private static final SyncServiceManager INSTANCE = new SyncServiceManager();
}
private class SyncServiceThread extends Thread {
@@ -100,9 +132,13 @@ public class SyncServerManager implements IService {
private Factory protocolFactory;
private Processor<SyncService.Iface> processor;
private TThreadPoolServer.Args poolArgs;
+ private CountDownLatch threadStartLatch;
+ private CountDownLatch threadStopLatch;
- public SyncServiceThread() {
- processor = new SyncService.Processor<>(new SyncServiceImpl());
+ public SyncServiceThread(CountDownLatch threadStartLatch, CountDownLatch threadStopLatch) {
+ this.processor = new SyncService.Processor<>(new SyncServiceImpl());
+ this.threadStartLatch = threadStartLatch;
+ this.threadStopLatch = threadStopLatch;
}
@Override
@@ -116,6 +152,7 @@ public class SyncServerManager implements IService {
poolArgs.processor(processor);
poolArgs.protocolFactory(protocolFactory);
poolServer = new TThreadPoolServer(poolArgs);
+ poolServer.setServerEventHandler(new SyncServiceEventHandler(threadStartLatch));
poolServer.serve();
} catch (TTransportException e) {
LOGGER.error("{}: failed to start {}, because ", IoTDBConstant.GLOBAL_DB_NAME,
@@ -124,6 +161,14 @@ public class SyncServerManager implements IService {
LOGGER.error("{}: {} exit, because ", IoTDBConstant.GLOBAL_DB_NAME, getID().getName(), e);
} finally {
close();
+ if(threadStopLatch == null) {
+ LOGGER.info("Sync Service Stop Count Down latch is null");
+ } else {
+ LOGGER.info("Sync Service Stop Count Down latch is {}", threadStopLatch.getCount());
+ }
+ if (threadStopLatch != null && threadStopLatch.getCount() == 1) {
+ threadStopLatch.countDown();
+ }
LOGGER.info("{}: close TThreadPoolServer and TServerSocket for {}",
IoTDBConstant.GLOBAL_DB_NAME, getID().getName());
}