You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/04/17 03:25:41 UTC

[incubator-iotdb] 01/01: fix sync thread bug

This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch fix_syncservice_bug
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit d6874d52a15202d5a2cfa583a787b4b767592908
Author: lta <li...@163.com>
AuthorDate: Wed Apr 17 11:25:07 2019 +0800

    fix sync thread bug
---
 .../java/org/apache/iotdb/db/service/IoTDB.java    |  5 +-
 .../org/apache/iotdb/db/service/JDBCService.java   |  4 +-
 .../iotdb/db/service/JDBCServiceEventHandler.java  |  2 +-
 .../receiver/SyncServiceEventHandler.java}         | 39 ++++--------
 ...cServerManager.java => SyncServiceManager.java} | 69 ++++++++++++++++++----
 5 files changed, 75 insertions(+), 44 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/iotdb/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index 6ea138b..d9a1938 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -33,8 +33,7 @@ import org.apache.iotdb.db.exception.StartupException;
 import org.apache.iotdb.db.exception.builder.ExceptionBuilder;
 import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.monitor.StatMonitor;
-import org.apache.iotdb.db.query.control.FileReaderManager;
-import org.apache.iotdb.db.sync.receiver.SyncServerManager;
+import org.apache.iotdb.db.sync.receiver.SyncServiceManager;
 import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
 import org.apache.iotdb.db.writelog.manager.WriteLogNodeManager;
 import org.slf4j.Logger;
@@ -107,7 +106,7 @@ public class IoTDB implements IoTDBMBean {
     registerManager.register(CloseMergeService.getInstance());
     registerManager.register(StatMonitor.getInstance());
     registerManager.register(BasicMemController.getInstance());
-    registerManager.register(SyncServerManager.getInstance());
+    registerManager.register(SyncServiceManager.getInstance());
 
     JMXService.registerMBean(getInstance(), mbeanName);
 
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/service/JDBCService.java b/iotdb/src/main/java/org/apache/iotdb/db/service/JDBCService.java
index ea78d5d..d9ccecf 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/service/JDBCService.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/service/JDBCService.java
@@ -218,9 +218,9 @@ public class JDBCService implements JDBCServiceMBean, IService {
         close();
         // TODO debug log, will be deleted in production env
         if(threadStopLatch == null) {
-          LOGGER.info("Stop Count Down latch is null");
+          LOGGER.info("JDBC Service Stop Count Down latch is null");
         } else {
-          LOGGER.info("Stop Count Down latch is {}", threadStopLatch.getCount());
+          LOGGER.info("JDBC Service Stop Count Down latch is {}", threadStopLatch.getCount());
         }
         // debug log, will be deleted in production env
 
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/service/JDBCServiceEventHandler.java b/iotdb/src/main/java/org/apache/iotdb/db/service/JDBCServiceEventHandler.java
index b39632f..b47a166 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/service/JDBCServiceEventHandler.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/service/JDBCServiceEventHandler.java
@@ -32,7 +32,7 @@ public class JDBCServiceEventHandler implements TServerEventHandler {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(JDBCServiceEventHandler.class);
   private TSServiceImpl serviceImpl;
-  CountDownLatch startLatch;
+  private CountDownLatch startLatch;
 
   public JDBCServiceEventHandler(TSServiceImpl serviceImpl, CountDownLatch startLatch) {
     this.serviceImpl = serviceImpl;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/service/JDBCServiceEventHandler.java b/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServiceEventHandler.java
similarity index 55%
copy from iotdb/src/main/java/org/apache/iotdb/db/service/JDBCServiceEventHandler.java
copy to iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServiceEventHandler.java
index b39632f..cb77a20 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/service/JDBCServiceEventHandler.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServiceEventHandler.java
@@ -16,53 +16,40 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.service;
+package org.apache.iotdb.db.sync.receiver;
 
 import java.util.concurrent.CountDownLatch;
-
-import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.server.ServerContext;
 import org.apache.thrift.server.TServerEventHandler;
 import org.apache.thrift.transport.TTransport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-public class JDBCServiceEventHandler implements TServerEventHandler {
+public class SyncServiceEventHandler implements TServerEventHandler {
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(JDBCServiceEventHandler.class);
-  private TSServiceImpl serviceImpl;
-  CountDownLatch startLatch;
+  private CountDownLatch startLatch;
 
-  public JDBCServiceEventHandler(TSServiceImpl serviceImpl, CountDownLatch startLatch) {
-    this.serviceImpl = serviceImpl;
+  public SyncServiceEventHandler(CountDownLatch startLatch) {
     this.startLatch = startLatch;
   }
 
   @Override
-  public ServerContext createContext(TProtocol arg0, TProtocol arg1) {
-    // TODO Auto-generated method stub
-    return null;
+  public void preServe() {
+    startLatch.countDown();
   }
 
   @Override
-  public void deleteContext(ServerContext arg0, TProtocol arg1, TProtocol arg2) {
-    try {
-      serviceImpl.handleClientExit();
-    } catch (TException e) {
-      LOGGER.error("failed to clear client status", e);
-    }
+  public ServerContext createContext(TProtocol input, TProtocol output) {
+    return null;
   }
 
   @Override
-  public void preServe() {
-    this.startLatch.countDown();
+  public void deleteContext(ServerContext serverContext, TProtocol input, TProtocol output) {
+    // Do nothing.
   }
 
   @Override
-  public void processContext(ServerContext arg0, TTransport arg1, TTransport arg2) {
-    // TODO Auto-generated method stub
-
+  public void processContext(ServerContext serverContext, TTransport inputTransport,
+      TTransport outputTransport) {
+    // Do nothing.
   }
-
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java b/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServiceManager.java
similarity index 62%
rename from iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java
rename to iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServiceManager.java
index 5348668..c0c1c00 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServiceManager.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.sync.receiver;
 
 import java.net.InetSocketAddress;
+import java.util.concurrent.CountDownLatch;
 import org.apache.iotdb.db.concurrent.ThreadName;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBConstant;
@@ -40,16 +41,18 @@ import org.slf4j.LoggerFactory;
 /**
  * sync receiver server.
  */
-public class SyncServerManager implements IService {
+public class SyncServiceManager implements IService {
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(SyncServerManager.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(SyncServiceManager.class);
   private Thread syncServerThread;
   private IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
+  private CountDownLatch startLatch;
+  private CountDownLatch stopLatch;
 
-  private SyncServerManager() {
+  private SyncServiceManager() {
   }
 
-  public static final SyncServerManager getInstance() {
+  public static final SyncServiceManager getInstance() {
     return ServerManagerHolder.INSTANCE;
   }
 
@@ -67,10 +70,27 @@ public class SyncServerManager implements IService {
       return;
     }
     conf.setIpWhiteList(conf.getIpWhiteList().replaceAll(" ", ""));
-    syncServerThread = new SyncServiceThread();
-    syncServerThread.setName(ThreadName.SYNC_SERVER.getName());
-    syncServerThread.start();
-    LOGGER.info("Sync server has started.");
+    resetLatch();
+    try {
+      syncServerThread = new SyncServiceThread(startLatch, stopLatch);
+      syncServerThread.setName(ThreadName.SYNC_SERVER.getName());
+      syncServerThread.start();
+      startLatch.await();
+    } catch (InterruptedException e) {
+      String errorMessage = String
+          .format("Failed to start %s because of %s", this.getID().getName(),
+              e.getMessage());
+      LOGGER.error(errorMessage);
+      throw new StartupException(errorMessage);
+    }
+    LOGGER
+        .info("{}: start {} successfully, listening on ip {} port {}", IoTDBConstant.GLOBAL_DB_NAME,
+            this.getID().getName(), conf.getRpcAddress(), conf.getSyncServerPort());
+  }
+
+  private void resetLatch(){
+    startLatch = new CountDownLatch(1);
+    stopLatch = new CountDownLatch(1);
   }
 
   /**
@@ -78,9 +98,21 @@ public class SyncServerManager implements IService {
    */
   @Override
   public void stop() {
-    if (conf.isSyncEnable()) {
+    if (!conf.isSyncEnable()) {
+      return;
+    }
+    ((SyncServiceThread) syncServerThread).close();
+    LOGGER.info("{}: closing {}...", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName());
+    if (syncServerThread != null) {
       ((SyncServiceThread) syncServerThread).close();
     }
+    try {
+      stopLatch.await();
+      resetLatch();
+      LOGGER.info("{}: close {} successfully", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName());
+    } catch (InterruptedException e) {
+      LOGGER.error("{}: close {} failed because {}", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName(), e);
+    }
   }
 
   @Override
@@ -90,7 +122,7 @@ public class SyncServerManager implements IService {
 
   private static class ServerManagerHolder {
 
-    private static final SyncServerManager INSTANCE = new SyncServerManager();
+    private static final SyncServiceManager INSTANCE = new SyncServiceManager();
   }
 
   private class SyncServiceThread extends Thread {
@@ -100,9 +132,13 @@ public class SyncServerManager implements IService {
     private Factory protocolFactory;
     private Processor<SyncService.Iface> processor;
     private TThreadPoolServer.Args poolArgs;
+    private CountDownLatch threadStartLatch;
+    private CountDownLatch threadStopLatch;
 
-    public SyncServiceThread() {
-      processor = new SyncService.Processor<>(new SyncServiceImpl());
+    public SyncServiceThread(CountDownLatch threadStartLatch, CountDownLatch threadStopLatch) {
+      this.processor = new SyncService.Processor<>(new SyncServiceImpl());
+      this.threadStartLatch = threadStartLatch;
+      this.threadStopLatch = threadStopLatch;
     }
 
     @Override
@@ -116,6 +152,7 @@ public class SyncServerManager implements IService {
         poolArgs.processor(processor);
         poolArgs.protocolFactory(protocolFactory);
         poolServer = new TThreadPoolServer(poolArgs);
+        poolServer.setServerEventHandler(new SyncServiceEventHandler(threadStartLatch));
         poolServer.serve();
       } catch (TTransportException e) {
         LOGGER.error("{}: failed to start {}, because ", IoTDBConstant.GLOBAL_DB_NAME,
@@ -124,6 +161,14 @@ public class SyncServerManager implements IService {
         LOGGER.error("{}: {} exit, because ", IoTDBConstant.GLOBAL_DB_NAME, getID().getName(), e);
       } finally {
         close();
+        if(threadStopLatch == null) {
+          LOGGER.info("Sync Service Stop Count Down latch is null");
+        } else {
+          LOGGER.info("Sync Service Stop Count Down latch is {}", threadStopLatch.getCount());
+        }
+        if (threadStopLatch != null && threadStopLatch.getCount() == 1) {
+          threadStopLatch.countDown();
+        }
         LOGGER.info("{}: close TThreadPoolServer and TServerSocket for {}",
             IoTDBConstant.GLOBAL_DB_NAME, getID().getName());
       }