You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2020/07/02 09:11:55 UTC

[incubator-iotdb] 01/01: refactor the thrift rpc service to reduce duplicate codes of RPC and Sync module

This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch refactor_thrift_rpc_service
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit df2fde7595d195073522602b6607f54493d626f9
Author: xiangdong huang <sa...@gmail.com>
AuthorDate: Thu Jul 2 17:11:19 2020 +0800

    refactor the thrift rpc service to reduce duplicate codes of RPC and Sync module
---
 .../org/apache/iotdb/db/service/RPCService.java    | 218 ++++-----------------
 .../iotdb/db/service/thrift/ThriftService.java     | 160 +++++++++++++++
 .../db/service/thrift/ThriftServiceThread.java     | 149 ++++++++++++++
 .../iotdb/db/sync/receiver/SyncServerManager.java  | 180 +++--------------
 .../db/sync/receiver/SyncServerManagerMBean.java   |  34 ++++
 5 files changed, 410 insertions(+), 331 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/service/RPCService.java b/server/src/main/java/org/apache/iotdb/db/service/RPCService.java
index 2ae9928..ebf2feb 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/RPCService.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/RPCService.java
@@ -18,47 +18,21 @@
  */
 package org.apache.iotdb.db.service;
 
-import java.net.InetSocketAddress;
-import java.util.concurrent.CountDownLatch;
-import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.concurrent.ThreadName;
 import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.StartupException;
-import org.apache.iotdb.db.exception.runtime.RPCServiceException;
-import org.apache.iotdb.service.rpc.thrift.TSIService;
+import org.apache.iotdb.db.service.thrift.ThriftService;
+import org.apache.iotdb.db.service.thrift.ThriftServiceThread;
 import org.apache.iotdb.service.rpc.thrift.TSIService.Processor;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TCompactProtocol;
-import org.apache.thrift.protocol.TProtocolFactory;
-import org.apache.thrift.server.TServer;
-import org.apache.thrift.server.TThreadPoolServer;
-import org.apache.thrift.transport.TFastFramedTransport;
-import org.apache.thrift.transport.TServerSocket;
-import org.apache.thrift.transport.TTransportException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * A service to handle jdbc request from client.
  */
-public class RPCService implements RPCServiceMBean, IService {
+public class RPCService extends ThriftService implements RPCServiceMBean {
 
-  private static final Logger logger = LoggerFactory.getLogger(RPCService.class);
-  private static final String STATUS_UP = "UP";
-  private static final String STATUS_DOWN = "DOWN";
-  private final String mbeanName = String
-      .format("%s:%s=%s", IoTDBConstant.IOTDB_PACKAGE, IoTDBConstant.JMX_TYPE,
-          getID().getJmxName());
-  private RPCServiceThread rpcServiceThread;
-  private TProtocolFactory protocolFactory;
-  private Processor<TSIService.Iface> processor;
-  private TThreadPoolServer.Args poolArgs;
   private TSServiceImpl impl;
 
-  private CountDownLatch stopLatch;
-
   private RPCService() {
   }
 
@@ -68,22 +42,7 @@ public class RPCService implements RPCServiceMBean, IService {
 
   @Override
   public String getRPCServiceStatus() {
-    if (rpcServiceThread == null) {
-      logger.debug("Start latch is null when getting status");
-    } else {
-      logger.debug("Start status is {} when getting status", rpcServiceThread.isServing());
-    }
-    if(stopLatch == null) {
-      logger.debug("Stop latch is null when getting status");
-    } else {
-      logger.debug("Stop latch is {} when getting status", stopLatch.getCount());
-    }	
-
-    if(rpcServiceThread != null && rpcServiceThread.isServing()) {
-      return STATUS_UP;
-    } else {
-      return STATUS_DOWN;
-    }
+    return super.getRPCServiceStatus();
   }
 
   @Override
@@ -93,15 +52,37 @@ public class RPCService implements RPCServiceMBean, IService {
   }
 
   @Override
-  public void start() throws StartupException {
-    JMXService.registerMBean(getInstance(), mbeanName);
-    startService();
+  public ThriftService getImplementation() {
+    return getInstance();
+  }
+
+  @Override
+  public void initTProcessor() throws ClassNotFoundException,IllegalAccessException,InstantiationException{
+      impl = (TSServiceImpl) Class.forName(IoTDBDescriptor.getInstance().getConfig()
+          .getRpcImplClassName()).newInstance();
+      processor = new Processor<>(impl);
+  }
+
+  @Override
+  public void initThriftServiceThread()
+      throws IllegalAccessException, InstantiationException, ClassNotFoundException {
+    IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+    thriftServiceThread = new ThriftServiceThread(processor,
+        getID().getName(), ThreadName.RPC_CLIENT.getName(),
+        config.getRpcAddress(), config.getRpcPort(), config.getRpcMaxConcurrentClientNum(),
+        config.getThriftServerAwaitTimeForStopService(),
+        new RPCServiceThriftHandler(impl));
+    thriftServiceThread.setName(ThreadName.RPC_SERVICE.getName());
+  }
+
+  @Override
+  public String getBindIP() {
+    return IoTDBDescriptor.getInstance().getConfig().getRpcAddress();
   }
 
   @Override
-  public void stop() {
-    stopService();
-    JMXService.deregisterMBean(mbeanName);
+  public int getBindPort() {
+    return IoTDBDescriptor.getInstance().getConfig().getRpcPort();
   }
 
   @Override
@@ -110,63 +91,18 @@ public class RPCService implements RPCServiceMBean, IService {
   }
 
   @Override
-  @SuppressWarnings("squid:S2276")
   public synchronized void startService() throws StartupException {
-    if (STATUS_UP.equals(getRPCServiceStatus())) {
-      logger.info("{}: {} has been already running now", IoTDBConstant.GLOBAL_DB_NAME,
-          this.getID().getName());
-      return;
-    }
-    logger.info("{}: start {}...", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName());
-    try {
-      reset();
-      rpcServiceThread = new RPCServiceThread(stopLatch);
-      rpcServiceThread.setName(ThreadName.RPC_SERVICE.getName());
-      rpcServiceThread.start();
-      while (!rpcServiceThread.isServing()) {
-        //sleep 100ms for waiting the rpc server start.
-        Thread.sleep(100);
-      }
-    } catch (InterruptedException | ClassNotFoundException |
-        IllegalAccessException | InstantiationException e) {
-      Thread.currentThread().interrupt();
-      throw new StartupException(this.getID().getName(), e.getMessage());
-    }
-
-    logger.info("{}: start {} successfully, listening on ip {} port {}", IoTDBConstant.GLOBAL_DB_NAME,
-        this.getID().getName(), IoTDBDescriptor.getInstance().getConfig().getRpcAddress(),
-        IoTDBDescriptor.getInstance().getConfig().getRpcPort());
-  }
-  
-  private void reset() {
-    rpcServiceThread = null;
-    stopLatch = new CountDownLatch(1);	  
+    super.startService();
   }
 
   @Override
   public synchronized void restartService() throws StartupException {
-    stopService();
-    startService();
+    super.restartService();
   }
 
   @Override
   public synchronized void stopService() {
-    if (STATUS_DOWN.equals(getRPCServiceStatus())) {
-      logger.info("{}: {} isn't running now", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName());
-      return;
-    }
-    logger.info("{}: closing {}...", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName());
-    if (rpcServiceThread != null) {
-      rpcServiceThread.close();
-    }
-    try {
-      stopLatch.await();
-      reset();
-      logger.info("{}: close {} successfully", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName());
-    } catch (InterruptedException e) {
-      logger.error("{}: close {} failed because: ", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName(), e);
-      Thread.currentThread().interrupt();
-    }
+    super.stopService();
   }
 
   private static class RPCServiceHolder {
@@ -177,88 +113,4 @@ public class RPCService implements RPCServiceMBean, IService {
     }
   }
 
-  private class RPCServiceThread extends Thread {
-
-    private TServerSocket serverTransport;
-    private TServer poolServer;
-    private CountDownLatch threadStopLatch;
-
-    public RPCServiceThread(CountDownLatch threadStopLatch)
-        throws ClassNotFoundException, IllegalAccessException, InstantiationException {
-      if(IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable()) {
-        protocolFactory = new TCompactProtocol.Factory();
-      }
-      else {
-        protocolFactory = new TBinaryProtocol.Factory();
-      }
-      IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
-      impl = (TSServiceImpl) Class.forName(config.getRpcImplClassName()).newInstance();
-      processor = new TSIService.Processor<>(impl);
-      this.threadStopLatch = threadStopLatch;
-    }
-
-    @SuppressWarnings("squid:S2093") // socket will be used later
-    @Override
-    public void run() {
-      logger.info("The RPC service thread begin to run...");
-      try {
-        IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
-        serverTransport = new TServerSocket(new InetSocketAddress(config.getRpcAddress(),
-            config.getRpcPort()));
-        //this is for testing.
-        if (!serverTransport.getServerSocket().isBound()) {
-          logger.error("The RPC service port is not bound.");
-        }
-        poolArgs = new TThreadPoolServer.Args(serverTransport).maxWorkerThreads(IoTDBDescriptor.
-            getInstance().getConfig().getRpcMaxConcurrentClientNum()).minWorkerThreads(1)
-            .stopTimeoutVal(
-                IoTDBDescriptor.getInstance().getConfig().getThriftServerAwaitTimeForStopService());
-        poolArgs.executorService = IoTDBThreadPoolFactory.createThriftRpcClientThreadPool(poolArgs,
-            ThreadName.RPC_CLIENT.getName());
-        poolArgs.processor(processor);
-        poolArgs.protocolFactory(protocolFactory);
-        poolArgs.transportFactory(new TFastFramedTransport.Factory());
-        poolServer = new TThreadPoolServer(poolArgs);
-        poolServer.setServerEventHandler(new RPCServiceThriftHandler(impl));
-        poolServer.serve();
-      } catch (TTransportException e) {
-        throw new RPCServiceException(String.format("%s: failed to start %s, because ", IoTDBConstant.GLOBAL_DB_NAME,
-            getID().getName()), e);
-      } catch (Exception e) {
-        throw new RPCServiceException(String.format("%s: %s exit, because ", IoTDBConstant.GLOBAL_DB_NAME, getID().getName()), e);
-      } finally {
-        close();
-        if (threadStopLatch == null) {
-          logger.debug("Stop Count Down latch is null");
-        } else {
-          logger.debug("Stop Count Down latch is {}", threadStopLatch.getCount());
-        }
-
-        if (threadStopLatch != null && threadStopLatch.getCount() == 1) {
-          threadStopLatch.countDown();
-        }
-        logger.debug("{}: close TThreadPoolServer and TServerSocket for {}",
-            IoTDBConstant.GLOBAL_DB_NAME, getID().getName());
-      }
-    }
-
-    private synchronized void close() {
-      if (poolServer != null) {
-        poolServer.setShouldStop(true);
-        poolServer.stop();
-        poolServer = null;
-      }
-      if (serverTransport != null) {
-        serverTransport.close();
-        serverTransport = null;
-      }
-    }
-
-    boolean isServing() {
-      if (poolServer != null) {
-        return poolServer.isServing();
-      }
-      return false;
-    }
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftService.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftService.java
new file mode 100644
index 0000000..4460072
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftService.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.service.thrift;
+
+import java.util.concurrent.CountDownLatch;
+import org.apache.iotdb.db.concurrent.ThreadName;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.ShutdownException;
+import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.service.IService;
+import org.apache.iotdb.db.service.JMXService;
+import org.apache.iotdb.db.service.RPCService;
+import org.apache.iotdb.db.service.RPCServiceThriftHandler;
+import org.apache.iotdb.db.service.ServiceType;
+import org.apache.iotdb.db.service.TSServiceImpl;
+import org.apache.iotdb.service.rpc.thrift.TSIService;
+import org.apache.iotdb.service.rpc.thrift.TSIService.Processor;
+import org.apache.thrift.TProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class ThriftService implements IService {
+
+  private static final Logger logger = LoggerFactory.getLogger(ThriftService.class);
+  private static final String STATUS_UP = "UP";
+  private static final String STATUS_DOWN = "DOWN";
+  protected final String mbeanName = String
+      .format("%s:%s=%s", IoTDBConstant.IOTDB_PACKAGE, IoTDBConstant.JMX_TYPE,
+          getID().getJmxName());
+  protected ThriftServiceThread thriftServiceThread;
+  protected TProcessor processor;
+  //private TSServiceImpl impl;
+
+  private CountDownLatch stopLatch;
+
+  public String getRPCServiceStatus() {
+    if (thriftServiceThread == null) {
+      logger.debug("Start latch is null when getting status");
+    } else {
+      logger.debug("Start status is {} when getting status", thriftServiceThread.isServing());
+    }
+    if(stopLatch == null) {
+      logger.debug("Stop latch is null when getting status");
+    } else {
+      logger.debug("Stop latch is {} when getting status", stopLatch.getCount());
+    }
+
+    if(thriftServiceThread != null && thriftServiceThread.isServing()) {
+      return STATUS_UP;
+    } else {
+      return STATUS_DOWN;
+    }
+  }
+
+  public int getRPCPort() {
+    IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+    return config.getRpcPort();
+  }
+
+  public abstract ServiceType getID();
+  public abstract ThriftService getImplementation();
+
+  @Override
+  public  void start()  throws StartupException {
+    JMXService.registerMBean(getImplementation(), mbeanName);
+    startService();
+  }
+
+  @Override
+  public void stop() {
+    stopService();
+    JMXService.deregisterMBean(mbeanName);
+  }
+
+  public abstract void initTProcessor() throws ClassNotFoundException,IllegalAccessException,InstantiationException;
+  public abstract void initThriftServiceThread()
+      throws IllegalAccessException, InstantiationException, ClassNotFoundException;
+  public abstract String getBindIP();
+  public abstract int getBindPort();
+
+
+  @SuppressWarnings("squid:S2276")
+  public synchronized void startService() throws StartupException {
+    if (STATUS_UP.equals(getRPCServiceStatus())) {
+      logger.info("{}: {} has been already running now", IoTDBConstant.GLOBAL_DB_NAME,
+          this.getID().getName());
+      return;
+    }
+    logger.info("{}: start {}...", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName());
+    try {
+      reset();
+      initTProcessor();
+      initThriftServiceThread();
+      thriftServiceThread.setThreadStopLatch(stopLatch);
+      thriftServiceThread.start();
+
+      while (!thriftServiceThread.isServing()) {
+        //sleep 100ms for waiting the rpc server start.
+        Thread.sleep(100);
+      }
+    } catch (InterruptedException | ClassNotFoundException |
+        IllegalAccessException | InstantiationException e) {
+      Thread.currentThread().interrupt();
+      throw new StartupException(this.getID().getName(), e.getMessage());
+    }
+
+    logger.info("{}: start {} successfully, listening on ip {} port {}", IoTDBConstant.GLOBAL_DB_NAME,
+        this.getID().getName(), getBindIP(), getBindPort());
+  }
+
+  private void reset() {
+    thriftServiceThread = null;
+    stopLatch = new CountDownLatch(1);
+  }
+
+
+  public synchronized void restartService() throws StartupException {
+    stopService();
+    startService();
+  }
+
+  public synchronized void stopService() {
+    if (STATUS_DOWN.equals(getRPCServiceStatus())) {
+      logger.info("{}: {} isn't running now", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName());
+      return;
+    }
+    logger.info("{}: closing {}...", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName());
+    if (thriftServiceThread != null) {
+      thriftServiceThread.close();
+    }
+    try {
+      stopLatch.await();
+      reset();
+      logger.info("{}: close {} successfully", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName());
+    } catch (InterruptedException e) {
+      logger.error("{}: close {} failed because: ", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName(), e);
+      Thread.currentThread().interrupt();
+    }
+  }
+
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftServiceThread.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftServiceThread.java
new file mode 100644
index 0000000..589667a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftServiceThread.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.service.thrift;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.CountDownLatch;
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.runtime.RPCServiceException;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TServerEventHandler;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.transport.TFastFramedTransport;
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ThriftServiceThread extends Thread{
+
+  private static final Logger logger = LoggerFactory.getLogger(ThriftServiceThread.class);
+  private TServerSocket serverTransport;
+  private TServer poolServer;
+  private CountDownLatch threadStopLatch;
+
+  private String serviceName;
+
+  private TProtocolFactory protocolFactory;
+  private TProcessor processor;
+  private TThreadPoolServer.Args poolArgs;
+
+
+  public ThriftServiceThread(TProcessor processor, String serviceName,
+      String threadsName,
+      String bindAddress, int port, int maxWorkerThreads, int timeoutMs,
+      TServerEventHandler serverEventHandler)
+      throws ClassNotFoundException, IllegalAccessException, InstantiationException {
+    if(IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable()) {
+      protocolFactory = new TCompactProtocol.Factory();
+    }
+    else {
+      protocolFactory = new TBinaryProtocol.Factory();
+    }
+    IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+    this.processor = processor;
+    this.serviceName = serviceName;
+
+    try {
+      serverTransport = new TServerSocket(new InetSocketAddress(bindAddress, port));
+      poolArgs = new TThreadPoolServer.Args(serverTransport)
+          .maxWorkerThreads(maxWorkerThreads)
+          .minWorkerThreads(1)
+          .stopTimeoutVal(timeoutMs);
+      poolArgs.executorService = IoTDBThreadPoolFactory.createThriftRpcClientThreadPool(poolArgs,
+          threadsName);
+      poolArgs.processor(processor);
+      poolArgs.protocolFactory(protocolFactory);
+      poolArgs.transportFactory(new TFastFramedTransport.Factory());
+      poolServer = new TThreadPoolServer(poolArgs);
+      poolServer.setServerEventHandler(serverEventHandler);
+    } catch (TTransportException e) {
+      close();
+      if (threadStopLatch == null) {
+        logger.debug("Stop Count Down latch is null");
+      } else {
+        logger.debug("Stop Count Down latch is {}", threadStopLatch.getCount());
+      }
+      if (threadStopLatch != null && threadStopLatch.getCount() == 1) {
+        threadStopLatch.countDown();
+      }
+      logger.debug("{}: close TThreadPoolServer and TServerSocket for {}",
+          IoTDBConstant.GLOBAL_DB_NAME, serviceName);
+      throw new RPCServiceException(String.format("%s: failed to start %s, because ",
+          IoTDBConstant.GLOBAL_DB_NAME, serviceName), e);
+    }
+  }
+
+  public void setThreadStopLatch(CountDownLatch threadStopLatch) {
+    this.threadStopLatch = threadStopLatch;
+  }
+
+  @SuppressWarnings("squid:S2093") // socket will be used later
+  @Override
+  public void run() {
+    logger.info("The {} service thread begin to run...", serviceName);
+    try {
+      poolServer.serve();
+    } catch (Exception e) {
+      throw new RPCServiceException(String.format("%s: %s exit, because ",
+          IoTDBConstant.GLOBAL_DB_NAME, serviceName), e);
+    } finally {
+      close();
+      if (threadStopLatch == null) {
+        logger.debug("Stop Count Down latch is null");
+      } else {
+        logger.debug("Stop Count Down latch is {}", threadStopLatch.getCount());
+      }
+
+      if (threadStopLatch != null && threadStopLatch.getCount() == 1) {
+        threadStopLatch.countDown();
+      }
+      logger.debug("{}: close TThreadPoolServer and TServerSocket for {}",
+          IoTDBConstant.GLOBAL_DB_NAME, serviceName);
+    }
+  }
+
+  public synchronized void close() {
+    if (poolServer != null) {
+      poolServer.setShouldStop(true);
+      poolServer.stop();
+
+      poolServer = null;
+    }
+    if (serverTransport != null) {
+      serverTransport.close();
+      serverTransport = null;
+    }
+  }
+
+  public boolean isServing() {
+    if (poolServer != null) {
+      return poolServer.isServing();
+    }
+    return false;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java
index fa432fa..3da8d71 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java
@@ -18,183 +18,67 @@
  */
 package org.apache.iotdb.db.sync.receiver;
 
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.concurrent.CountDownLatch;
-import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.concurrent.ThreadName;
 import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.exception.StartupException;
-import org.apache.iotdb.db.service.IService;
 import org.apache.iotdb.db.service.ServiceType;
-import org.apache.iotdb.db.sync.receiver.load.FileLoaderManager;
-import org.apache.iotdb.db.sync.receiver.recover.SyncReceiverLogAnalyzer;
+import org.apache.iotdb.db.service.thrift.ThriftService;
+import org.apache.iotdb.db.service.thrift.ThriftServiceThread;
 import org.apache.iotdb.db.sync.receiver.transfer.SyncServiceImpl;
 import org.apache.iotdb.service.sync.thrift.SyncService;
-import org.apache.iotdb.service.sync.thrift.SyncService.Processor;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TCompactProtocol;
-import org.apache.thrift.protocol.TProtocolFactory;
-import org.apache.thrift.server.TServer;
-import org.apache.thrift.server.TThreadPoolServer;
-import org.apache.thrift.transport.TServerSocket;
-import org.apache.thrift.transport.TTransportException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * sync receiver server.
  */
-public class SyncServerManager implements IService {
+public class SyncServerManager  extends ThriftService implements SyncServerManagerMBean {
 
-  private static final Logger logger = LoggerFactory.getLogger(SyncServerManager.class);
+  private SyncServiceImpl serviceImpl;
 
-  private IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
-
-  private SyncServiceThread syncServerThread;
-
-  //stopLatch is also for letting the IT know whether the socket is closed.
-  private CountDownLatch stopLatch;
+  private static class ServerManagerHolder {
 
-  private SyncServerManager() {
+    private static final SyncServerManager INSTANCE = new SyncServerManager();
   }
 
   public static SyncServerManager getInstance() {
-    return ServerManagerHolder.INSTANCE;
+    return SyncServerManager.ServerManagerHolder.INSTANCE;
   }
 
-  /**
-   * Start sync receiver's server.
-   */
   @Override
-  public void start() throws StartupException {
-    if (!conf.isSyncEnable()) {
-      return;
-    }
-    FileLoaderManager.getInstance().start();
-    try {
-      SyncReceiverLogAnalyzer.getInstance().recoverAll();
-    } catch (IOException e) {
-      logger.error("Can not recover receiver sync state", e);
-    }
-    if (conf.getIpWhiteList() == null) {
-      logger.error(
-          "Sync server failed to start because IP white list is null, please set IP white list.");
-      return;
-    }
-    stopLatch = new CountDownLatch(1);
-    conf.setIpWhiteList(conf.getIpWhiteList().replaceAll(" ", ""));
-    syncServerThread = new SyncServiceThread(stopLatch);
-    syncServerThread.setName(ThreadName.SYNC_SERVER.getName());
-    syncServerThread.start();
-    try {
-      while (!syncServerThread.isServing()) {
-        //sleep 100ms for waiting the sync server start.
-        Thread.sleep(100);
-      }
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new StartupException(this.getID().getName(), e.getMessage());
-    }
-    logger.info("Sync server has started.");
+  public ServiceType getID() {
+    return ServiceType.SYNC_SERVICE;
   }
 
-  /**
-   * Close sync receiver's server.
-   */
   @Override
-  public void stop() {
-    if (conf.isSyncEnable()) {
-      FileLoaderManager.getInstance().stop();
-      syncServerThread.close();
-      try {
-        stopLatch.await();
-      } catch (InterruptedException e) {
-        logger.error(e.getMessage(), e);
-        Thread.currentThread().interrupt();
-      }
-    }
+  public ThriftService getImplementation() {
+    return getInstance();
   }
 
   @Override
-  public ServiceType getID() {
-    return ServiceType.SYNC_SERVICE;
+  public void initTProcessor() {
+    serviceImpl = new SyncServiceImpl();
+    processor = new SyncService.Processor<>(serviceImpl);
   }
 
-  private static class ServerManagerHolder {
-
-    private static final SyncServerManager INSTANCE = new SyncServerManager();
+  @Override
+  public void initThriftServiceThread()
+      throws IllegalAccessException, InstantiationException, ClassNotFoundException {
+    IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+    thriftServiceThread = new ThriftServiceThread(processor,
+        getID().getName(), ThreadName.SYNC_CLIENT.getName(),
+        config.getRpcAddress(), config.getSyncServerPort(),
+        Integer.MAX_VALUE, config.getThriftServerAwaitTimeForStopService(),
+        new SyncServerThriftHandler(serviceImpl)
+        );
+    thriftServiceThread.setName(ThreadName.SYNC_SERVER.getName());
   }
 
-  private class SyncServiceThread extends Thread {
-
-    private TServerSocket serverTransport;
-    private TServer poolServer;
-    private TProtocolFactory protocolFactory;
-    private Processor<SyncService.Iface> processor;
-    private TThreadPoolServer.Args poolArgs;
-    private CountDownLatch threadStopLatch;
-    private SyncServiceImpl serviceImpl;
-
-    public SyncServiceThread(CountDownLatch stopLatch) {
-      serviceImpl = new SyncServiceImpl();
-      processor = new SyncService.Processor<>(serviceImpl);
-      this.threadStopLatch = stopLatch;
-    }
-
-    @Override
-    public void run() {
-      try {
-        serverTransport = new TServerSocket(
-            new InetSocketAddress(conf.getRpcAddress(), conf.getSyncServerPort()));
-        if (conf.isRpcThriftCompressionEnable()) {
-          protocolFactory = new TCompactProtocol.Factory();
-        } else {
-          protocolFactory = new TBinaryProtocol.Factory();
-        }
-        poolArgs = new TThreadPoolServer.Args(serverTransport).stopTimeoutVal(
-            IoTDBDescriptor.getInstance().getConfig().getThriftServerAwaitTimeForStopService());
-        poolArgs.executorService = IoTDBThreadPoolFactory.createThriftRpcClientThreadPool(poolArgs,
-            ThreadName.SYNC_CLIENT.getName());
-        poolArgs.protocolFactory(protocolFactory);
-        poolArgs.processor(processor);
-        poolServer = new TThreadPoolServer(poolArgs);
-        poolServer.setServerEventHandler(new SyncServerThriftHandler(serviceImpl));
-        poolServer.serve();
-      } catch (TTransportException e) {
-        logger.error("{}: failed to start {}, because ", IoTDBConstant.GLOBAL_DB_NAME,
-            getID().getName(), e);
-      } catch (Exception e) {
-        logger.error("{}: {} exit, because ", IoTDBConstant.GLOBAL_DB_NAME, getID().getName(), e);
-      } finally {
-        close();
-        if (threadStopLatch != null && threadStopLatch.getCount() == 1) {
-          threadStopLatch.countDown();
-        }
-        logger.info("{}: close TThreadPoolServer and TServerSocket for {}",
-            IoTDBConstant.GLOBAL_DB_NAME, getID().getName());
-
-      }
-    }
-
-    private synchronized void close() {
-      if (poolServer != null) {
-        poolServer.stop();
-        poolServer = null;
-      }
-      if (serverTransport != null) {
-        serverTransport.close();
-        serverTransport = null;
-      }
-    }
+  @Override
+  public String getBindIP() {
+    return IoTDBDescriptor.getInstance().getConfig().getRpcAddress();
+  }
 
-    boolean isServing() {
-      if (poolServer != null) {
-        return  poolServer.isServing();
-      }
-      return false;
-    }
+  @Override
+  public int getBindPort() {
+    return IoTDBDescriptor.getInstance().getConfig().getSyncServerPort();
   }
 }
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManagerMBean.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManagerMBean.java
new file mode 100644
index 0000000..3992693
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManagerMBean.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.sync.receiver;
+
+import org.apache.iotdb.db.exception.StartupException;
+
+public interface SyncServerManagerMBean {
+
+  String getRPCServiceStatus();
+
+  int getRPCPort();
+
+  void startService() throws StartupException;
+
+  void restartService() throws StartupException;
+
+  void stopService();
+}