You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/04/17 03:25:40 UTC

[incubator-iotdb] branch fix_syncservice_bug created (now d6874d5)

This is an automated email from the ASF dual-hosted git repository.

lta pushed a change to branch fix_syncservice_bug
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git.


      at d6874d5  fix sync thread bug

This branch includes the following new commits:

     new d6874d5  fix sync thread bug

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-iotdb] 01/01: fix sync thread bug

Posted by lt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch fix_syncservice_bug
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit d6874d52a15202d5a2cfa583a787b4b767592908
Author: lta <li...@163.com>
AuthorDate: Wed Apr 17 11:25:07 2019 +0800

    fix sync thread bug
---
 .../java/org/apache/iotdb/db/service/IoTDB.java    |  5 +-
 .../org/apache/iotdb/db/service/JDBCService.java   |  4 +-
 .../iotdb/db/service/JDBCServiceEventHandler.java  |  2 +-
 .../receiver/SyncServiceEventHandler.java}         | 39 ++++--------
 ...cServerManager.java => SyncServiceManager.java} | 69 ++++++++++++++++++----
 5 files changed, 75 insertions(+), 44 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/iotdb/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index 6ea138b..d9a1938 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -33,8 +33,7 @@ import org.apache.iotdb.db.exception.StartupException;
 import org.apache.iotdb.db.exception.builder.ExceptionBuilder;
 import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.monitor.StatMonitor;
-import org.apache.iotdb.db.query.control.FileReaderManager;
-import org.apache.iotdb.db.sync.receiver.SyncServerManager;
+import org.apache.iotdb.db.sync.receiver.SyncServiceManager;
 import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
 import org.apache.iotdb.db.writelog.manager.WriteLogNodeManager;
 import org.slf4j.Logger;
@@ -107,7 +106,7 @@ public class IoTDB implements IoTDBMBean {
     registerManager.register(CloseMergeService.getInstance());
     registerManager.register(StatMonitor.getInstance());
     registerManager.register(BasicMemController.getInstance());
-    registerManager.register(SyncServerManager.getInstance());
+    registerManager.register(SyncServiceManager.getInstance());
 
     JMXService.registerMBean(getInstance(), mbeanName);
 
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/service/JDBCService.java b/iotdb/src/main/java/org/apache/iotdb/db/service/JDBCService.java
index ea78d5d..d9ccecf 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/service/JDBCService.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/service/JDBCService.java
@@ -218,9 +218,9 @@ public class JDBCService implements JDBCServiceMBean, IService {
         close();
         // TODO debug log, will be deleted in production env
         if(threadStopLatch == null) {
-          LOGGER.info("Stop Count Down latch is null");
+          LOGGER.info("JDBC Service Stop Count Down latch is null");
         } else {
-          LOGGER.info("Stop Count Down latch is {}", threadStopLatch.getCount());
+          LOGGER.info("JDBC Service Stop Count Down latch is {}", threadStopLatch.getCount());
         }
         // debug log, will be deleted in production env
 
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/service/JDBCServiceEventHandler.java b/iotdb/src/main/java/org/apache/iotdb/db/service/JDBCServiceEventHandler.java
index b39632f..b47a166 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/service/JDBCServiceEventHandler.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/service/JDBCServiceEventHandler.java
@@ -32,7 +32,7 @@ public class JDBCServiceEventHandler implements TServerEventHandler {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(JDBCServiceEventHandler.class);
   private TSServiceImpl serviceImpl;
-  CountDownLatch startLatch;
+  private CountDownLatch startLatch;
 
   public JDBCServiceEventHandler(TSServiceImpl serviceImpl, CountDownLatch startLatch) {
     this.serviceImpl = serviceImpl;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/service/JDBCServiceEventHandler.java b/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServiceEventHandler.java
similarity index 55%
copy from iotdb/src/main/java/org/apache/iotdb/db/service/JDBCServiceEventHandler.java
copy to iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServiceEventHandler.java
index b39632f..cb77a20 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/service/JDBCServiceEventHandler.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServiceEventHandler.java
@@ -16,53 +16,40 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.service;
+package org.apache.iotdb.db.sync.receiver;
 
 import java.util.concurrent.CountDownLatch;
-
-import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.server.ServerContext;
 import org.apache.thrift.server.TServerEventHandler;
 import org.apache.thrift.transport.TTransport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-public class JDBCServiceEventHandler implements TServerEventHandler {
+public class SyncServiceEventHandler implements TServerEventHandler {
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(JDBCServiceEventHandler.class);
-  private TSServiceImpl serviceImpl;
-  CountDownLatch startLatch;
+  private CountDownLatch startLatch;
 
-  public JDBCServiceEventHandler(TSServiceImpl serviceImpl, CountDownLatch startLatch) {
-    this.serviceImpl = serviceImpl;
+  public SyncServiceEventHandler(CountDownLatch startLatch) {
     this.startLatch = startLatch;
   }
 
   @Override
-  public ServerContext createContext(TProtocol arg0, TProtocol arg1) {
-    // TODO Auto-generated method stub
-    return null;
+  public void preServe() {
+    startLatch.countDown();
   }
 
   @Override
-  public void deleteContext(ServerContext arg0, TProtocol arg1, TProtocol arg2) {
-    try {
-      serviceImpl.handleClientExit();
-    } catch (TException e) {
-      LOGGER.error("failed to clear client status", e);
-    }
+  public ServerContext createContext(TProtocol input, TProtocol output) {
+    return null;
   }
 
   @Override
-  public void preServe() {
-    this.startLatch.countDown();
+  public void deleteContext(ServerContext serverContext, TProtocol input, TProtocol output) {
+    // Do nothing.
   }
 
   @Override
-  public void processContext(ServerContext arg0, TTransport arg1, TTransport arg2) {
-    // TODO Auto-generated method stub
-
+  public void processContext(ServerContext serverContext, TTransport inputTransport,
+      TTransport outputTransport) {
+    // Do nothing.
   }
-
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java b/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServiceManager.java
similarity index 62%
rename from iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java
rename to iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServiceManager.java
index 5348668..c0c1c00 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServiceManager.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.sync.receiver;
 
 import java.net.InetSocketAddress;
+import java.util.concurrent.CountDownLatch;
 import org.apache.iotdb.db.concurrent.ThreadName;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBConstant;
@@ -40,16 +41,18 @@ import org.slf4j.LoggerFactory;
 /**
  * sync receiver server.
  */
-public class SyncServerManager implements IService {
+public class SyncServiceManager implements IService {
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(SyncServerManager.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(SyncServiceManager.class);
   private Thread syncServerThread;
   private IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
+  private CountDownLatch startLatch;
+  private CountDownLatch stopLatch;
 
-  private SyncServerManager() {
+  private SyncServiceManager() {
   }
 
-  public static final SyncServerManager getInstance() {
+  public static final SyncServiceManager getInstance() {
     return ServerManagerHolder.INSTANCE;
   }
 
@@ -67,10 +70,27 @@ public class SyncServerManager implements IService {
       return;
     }
     conf.setIpWhiteList(conf.getIpWhiteList().replaceAll(" ", ""));
-    syncServerThread = new SyncServiceThread();
-    syncServerThread.setName(ThreadName.SYNC_SERVER.getName());
-    syncServerThread.start();
-    LOGGER.info("Sync server has started.");
+    resetLatch();
+    try {
+      syncServerThread = new SyncServiceThread(startLatch, stopLatch);
+      syncServerThread.setName(ThreadName.SYNC_SERVER.getName());
+      syncServerThread.start();
+      startLatch.await();
+    } catch (InterruptedException e) {
+      String errorMessage = String
+          .format("Failed to start %s because of %s", this.getID().getName(),
+              e.getMessage());
+      LOGGER.error(errorMessage);
+      throw new StartupException(errorMessage);
+    }
+    LOGGER
+        .info("{}: start {} successfully, listening on ip {} port {}", IoTDBConstant.GLOBAL_DB_NAME,
+            this.getID().getName(), conf.getRpcAddress(), conf.getSyncServerPort());
+  }
+
+  private void resetLatch(){
+    startLatch = new CountDownLatch(1);
+    stopLatch = new CountDownLatch(1);
   }
 
   /**
@@ -78,9 +98,21 @@ public class SyncServerManager implements IService {
    */
   @Override
   public void stop() {
-    if (conf.isSyncEnable()) {
+    if (!conf.isSyncEnable()) {
+      return;
+    }
+    ((SyncServiceThread) syncServerThread).close();
+    LOGGER.info("{}: closing {}...", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName());
+    if (syncServerThread != null) {
       ((SyncServiceThread) syncServerThread).close();
     }
+    try {
+      stopLatch.await();
+      resetLatch();
+      LOGGER.info("{}: close {} successfully", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName());
+    } catch (InterruptedException e) {
+      LOGGER.error("{}: close {} failed because {}", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName(), e);
+    }
   }
 
   @Override
@@ -90,7 +122,7 @@ public class SyncServerManager implements IService {
 
   private static class ServerManagerHolder {
 
-    private static final SyncServerManager INSTANCE = new SyncServerManager();
+    private static final SyncServiceManager INSTANCE = new SyncServiceManager();
   }
 
   private class SyncServiceThread extends Thread {
@@ -100,9 +132,13 @@ public class SyncServerManager implements IService {
     private Factory protocolFactory;
     private Processor<SyncService.Iface> processor;
     private TThreadPoolServer.Args poolArgs;
+    private CountDownLatch threadStartLatch;
+    private CountDownLatch threadStopLatch;
 
-    public SyncServiceThread() {
-      processor = new SyncService.Processor<>(new SyncServiceImpl());
+    public SyncServiceThread(CountDownLatch threadStartLatch, CountDownLatch threadStopLatch) {
+      this.processor = new SyncService.Processor<>(new SyncServiceImpl());
+      this.threadStartLatch = threadStartLatch;
+      this.threadStopLatch = threadStopLatch;
     }
 
     @Override
@@ -116,6 +152,7 @@ public class SyncServerManager implements IService {
         poolArgs.processor(processor);
         poolArgs.protocolFactory(protocolFactory);
         poolServer = new TThreadPoolServer(poolArgs);
+        poolServer.setServerEventHandler(new SyncServiceEventHandler(threadStartLatch));
         poolServer.serve();
       } catch (TTransportException e) {
         LOGGER.error("{}: failed to start {}, because ", IoTDBConstant.GLOBAL_DB_NAME,
@@ -124,6 +161,14 @@ public class SyncServerManager implements IService {
         LOGGER.error("{}: {} exit, because ", IoTDBConstant.GLOBAL_DB_NAME, getID().getName(), e);
       } finally {
         close();
+        if(threadStopLatch == null) {
+          LOGGER.info("Sync Service Stop Count Down latch is null");
+        } else {
+          LOGGER.info("Sync Service Stop Count Down latch is {}", threadStopLatch.getCount());
+        }
+        if (threadStopLatch != null && threadStopLatch.getCount() == 1) {
+          threadStopLatch.countDown();
+        }
         LOGGER.info("{}: close TThreadPoolServer and TServerSocket for {}",
             IoTDBConstant.GLOBAL_DB_NAME, getID().getName());
       }