You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2020/07/02 09:11:54 UTC

[incubator-iotdb] branch refactor_thrift_rpc_service created (now df2fde7)

This is an automated email from the ASF dual-hosted git repository.

hxd pushed a change to branch refactor_thrift_rpc_service
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git.


      at df2fde7  refactor the thrift rpc service to reduce duplicate codes of RPC and Sync module

This branch includes the following new commits:

     new df2fde7  refactor the thrift rpc service to reduce duplicate codes of RPC and Sync module

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-iotdb] 01/01: refactor the thrift rpc service to reduce duplicate codes of RPC and Sync module

Posted by hx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch refactor_thrift_rpc_service
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit df2fde7595d195073522602b6607f54493d626f9
Author: xiangdong huang <sa...@gmail.com>
AuthorDate: Thu Jul 2 17:11:19 2020 +0800

    refactor the thrift rpc service to reduce duplicate codes of RPC and Sync module
---
 .../org/apache/iotdb/db/service/RPCService.java    | 218 ++++-----------------
 .../iotdb/db/service/thrift/ThriftService.java     | 160 +++++++++++++++
 .../db/service/thrift/ThriftServiceThread.java     | 149 ++++++++++++++
 .../iotdb/db/sync/receiver/SyncServerManager.java  | 180 +++--------------
 .../db/sync/receiver/SyncServerManagerMBean.java   |  34 ++++
 5 files changed, 410 insertions(+), 331 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/service/RPCService.java b/server/src/main/java/org/apache/iotdb/db/service/RPCService.java
index 2ae9928..ebf2feb 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/RPCService.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/RPCService.java
@@ -18,47 +18,21 @@
  */
 package org.apache.iotdb.db.service;
 
-import java.net.InetSocketAddress;
-import java.util.concurrent.CountDownLatch;
-import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.concurrent.ThreadName;
 import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.StartupException;
-import org.apache.iotdb.db.exception.runtime.RPCServiceException;
-import org.apache.iotdb.service.rpc.thrift.TSIService;
+import org.apache.iotdb.db.service.thrift.ThriftService;
+import org.apache.iotdb.db.service.thrift.ThriftServiceThread;
 import org.apache.iotdb.service.rpc.thrift.TSIService.Processor;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TCompactProtocol;
-import org.apache.thrift.protocol.TProtocolFactory;
-import org.apache.thrift.server.TServer;
-import org.apache.thrift.server.TThreadPoolServer;
-import org.apache.thrift.transport.TFastFramedTransport;
-import org.apache.thrift.transport.TServerSocket;
-import org.apache.thrift.transport.TTransportException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * A service to handle jdbc request from client.
  */
-public class RPCService implements RPCServiceMBean, IService {
+public class RPCService extends ThriftService implements RPCServiceMBean {
 
-  private static final Logger logger = LoggerFactory.getLogger(RPCService.class);
-  private static final String STATUS_UP = "UP";
-  private static final String STATUS_DOWN = "DOWN";
-  private final String mbeanName = String
-      .format("%s:%s=%s", IoTDBConstant.IOTDB_PACKAGE, IoTDBConstant.JMX_TYPE,
-          getID().getJmxName());
-  private RPCServiceThread rpcServiceThread;
-  private TProtocolFactory protocolFactory;
-  private Processor<TSIService.Iface> processor;
-  private TThreadPoolServer.Args poolArgs;
   private TSServiceImpl impl;
 
-  private CountDownLatch stopLatch;
-
   private RPCService() {
   }
 
@@ -68,22 +42,7 @@ public class RPCService implements RPCServiceMBean, IService {
 
   @Override
   public String getRPCServiceStatus() {
-    if (rpcServiceThread == null) {
-      logger.debug("Start latch is null when getting status");
-    } else {
-      logger.debug("Start status is {} when getting status", rpcServiceThread.isServing());
-    }
-    if(stopLatch == null) {
-      logger.debug("Stop latch is null when getting status");
-    } else {
-      logger.debug("Stop latch is {} when getting status", stopLatch.getCount());
-    }	
-
-    if(rpcServiceThread != null && rpcServiceThread.isServing()) {
-      return STATUS_UP;
-    } else {
-      return STATUS_DOWN;
-    }
+    return super.getRPCServiceStatus();
   }
 
   @Override
@@ -93,15 +52,37 @@ public class RPCService implements RPCServiceMBean, IService {
   }
 
   @Override
-  public void start() throws StartupException {
-    JMXService.registerMBean(getInstance(), mbeanName);
-    startService();
+  public ThriftService getImplementation() {
+    return getInstance();
+  }
+
+  @Override
+  public void initTProcessor() throws ClassNotFoundException,IllegalAccessException,InstantiationException{
+      impl = (TSServiceImpl) Class.forName(IoTDBDescriptor.getInstance().getConfig()
+          .getRpcImplClassName()).newInstance();
+      processor = new Processor<>(impl);
+  }
+
+  @Override
+  public void initThriftServiceThread()
+      throws IllegalAccessException, InstantiationException, ClassNotFoundException {
+    IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+    thriftServiceThread = new ThriftServiceThread(processor,
+        getID().getName(), ThreadName.RPC_CLIENT.getName(),
+        config.getRpcAddress(), config.getRpcPort(), config.getRpcMaxConcurrentClientNum(),
+        config.getThriftServerAwaitTimeForStopService(),
+        new RPCServiceThriftHandler(impl));
+    thriftServiceThread.setName(ThreadName.RPC_SERVICE.getName());
+  }
+
+  @Override
+  public String getBindIP() {
+    return IoTDBDescriptor.getInstance().getConfig().getRpcAddress();
   }
 
   @Override
-  public void stop() {
-    stopService();
-    JMXService.deregisterMBean(mbeanName);
+  public int getBindPort() {
+    return IoTDBDescriptor.getInstance().getConfig().getRpcPort();
   }
 
   @Override
@@ -110,63 +91,18 @@ public class RPCService implements RPCServiceMBean, IService {
   }
 
   @Override
-  @SuppressWarnings("squid:S2276")
   public synchronized void startService() throws StartupException {
-    if (STATUS_UP.equals(getRPCServiceStatus())) {
-      logger.info("{}: {} has been already running now", IoTDBConstant.GLOBAL_DB_NAME,
-          this.getID().getName());
-      return;
-    }
-    logger.info("{}: start {}...", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName());
-    try {
-      reset();
-      rpcServiceThread = new RPCServiceThread(stopLatch);
-      rpcServiceThread.setName(ThreadName.RPC_SERVICE.getName());
-      rpcServiceThread.start();
-      while (!rpcServiceThread.isServing()) {
-        //sleep 100ms for waiting the rpc server start.
-        Thread.sleep(100);
-      }
-    } catch (InterruptedException | ClassNotFoundException |
-        IllegalAccessException | InstantiationException e) {
-      Thread.currentThread().interrupt();
-      throw new StartupException(this.getID().getName(), e.getMessage());
-    }
-
-    logger.info("{}: start {} successfully, listening on ip {} port {}", IoTDBConstant.GLOBAL_DB_NAME,
-        this.getID().getName(), IoTDBDescriptor.getInstance().getConfig().getRpcAddress(),
-        IoTDBDescriptor.getInstance().getConfig().getRpcPort());
-  }
-  
-  private void reset() {
-    rpcServiceThread = null;
-    stopLatch = new CountDownLatch(1);	  
+    super.startService();
   }
 
   @Override
   public synchronized void restartService() throws StartupException {
-    stopService();
-    startService();
+    super.restartService();
   }
 
   @Override
   public synchronized void stopService() {
-    if (STATUS_DOWN.equals(getRPCServiceStatus())) {
-      logger.info("{}: {} isn't running now", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName());
-      return;
-    }
-    logger.info("{}: closing {}...", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName());
-    if (rpcServiceThread != null) {
-      rpcServiceThread.close();
-    }
-    try {
-      stopLatch.await();
-      reset();
-      logger.info("{}: close {} successfully", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName());
-    } catch (InterruptedException e) {
-      logger.error("{}: close {} failed because: ", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName(), e);
-      Thread.currentThread().interrupt();
-    }
+    super.stopService();
   }
 
   private static class RPCServiceHolder {
@@ -177,88 +113,4 @@ public class RPCService implements RPCServiceMBean, IService {
     }
   }
 
-  private class RPCServiceThread extends Thread {
-
-    private TServerSocket serverTransport;
-    private TServer poolServer;
-    private CountDownLatch threadStopLatch;
-
-    public RPCServiceThread(CountDownLatch threadStopLatch)
-        throws ClassNotFoundException, IllegalAccessException, InstantiationException {
-      if(IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable()) {
-        protocolFactory = new TCompactProtocol.Factory();
-      }
-      else {
-        protocolFactory = new TBinaryProtocol.Factory();
-      }
-      IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
-      impl = (TSServiceImpl) Class.forName(config.getRpcImplClassName()).newInstance();
-      processor = new TSIService.Processor<>(impl);
-      this.threadStopLatch = threadStopLatch;
-    }
-
-    @SuppressWarnings("squid:S2093") // socket will be used later
-    @Override
-    public void run() {
-      logger.info("The RPC service thread begin to run...");
-      try {
-        IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
-        serverTransport = new TServerSocket(new InetSocketAddress(config.getRpcAddress(),
-            config.getRpcPort()));
-        //this is for testing.
-        if (!serverTransport.getServerSocket().isBound()) {
-          logger.error("The RPC service port is not bound.");
-        }
-        poolArgs = new TThreadPoolServer.Args(serverTransport).maxWorkerThreads(IoTDBDescriptor.
-            getInstance().getConfig().getRpcMaxConcurrentClientNum()).minWorkerThreads(1)
-            .stopTimeoutVal(
-                IoTDBDescriptor.getInstance().getConfig().getThriftServerAwaitTimeForStopService());
-        poolArgs.executorService = IoTDBThreadPoolFactory.createThriftRpcClientThreadPool(poolArgs,
-            ThreadName.RPC_CLIENT.getName());
-        poolArgs.processor(processor);
-        poolArgs.protocolFactory(protocolFactory);
-        poolArgs.transportFactory(new TFastFramedTransport.Factory());
-        poolServer = new TThreadPoolServer(poolArgs);
-        poolServer.setServerEventHandler(new RPCServiceThriftHandler(impl));
-        poolServer.serve();
-      } catch (TTransportException e) {
-        throw new RPCServiceException(String.format("%s: failed to start %s, because ", IoTDBConstant.GLOBAL_DB_NAME,
-            getID().getName()), e);
-      } catch (Exception e) {
-        throw new RPCServiceException(String.format("%s: %s exit, because ", IoTDBConstant.GLOBAL_DB_NAME, getID().getName()), e);
-      } finally {
-        close();
-        if (threadStopLatch == null) {
-          logger.debug("Stop Count Down latch is null");
-        } else {
-          logger.debug("Stop Count Down latch is {}", threadStopLatch.getCount());
-        }
-
-        if (threadStopLatch != null && threadStopLatch.getCount() == 1) {
-          threadStopLatch.countDown();
-        }
-        logger.debug("{}: close TThreadPoolServer and TServerSocket for {}",
-            IoTDBConstant.GLOBAL_DB_NAME, getID().getName());
-      }
-    }
-
-    private synchronized void close() {
-      if (poolServer != null) {
-        poolServer.setShouldStop(true);
-        poolServer.stop();
-        poolServer = null;
-      }
-      if (serverTransport != null) {
-        serverTransport.close();
-        serverTransport = null;
-      }
-    }
-
-    boolean isServing() {
-      if (poolServer != null) {
-        return poolServer.isServing();
-      }
-      return false;
-    }
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftService.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftService.java
new file mode 100644
index 0000000..4460072
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftService.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.service.thrift;
+
+import java.util.concurrent.CountDownLatch;
+import org.apache.iotdb.db.concurrent.ThreadName;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.ShutdownException;
+import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.service.IService;
+import org.apache.iotdb.db.service.JMXService;
+import org.apache.iotdb.db.service.RPCService;
+import org.apache.iotdb.db.service.RPCServiceThriftHandler;
+import org.apache.iotdb.db.service.ServiceType;
+import org.apache.iotdb.db.service.TSServiceImpl;
+import org.apache.iotdb.service.rpc.thrift.TSIService;
+import org.apache.iotdb.service.rpc.thrift.TSIService.Processor;
+import org.apache.thrift.TProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class ThriftService implements IService {
+
+  private static final Logger logger = LoggerFactory.getLogger(ThriftService.class);
+  private static final String STATUS_UP = "UP";
+  private static final String STATUS_DOWN = "DOWN";
+  protected final String mbeanName = String
+      .format("%s:%s=%s", IoTDBConstant.IOTDB_PACKAGE, IoTDBConstant.JMX_TYPE,
+          getID().getJmxName());
+  protected ThriftServiceThread thriftServiceThread;
+  protected TProcessor processor;
+  //private TSServiceImpl impl;
+
+  private CountDownLatch stopLatch;
+
+  public String getRPCServiceStatus() {
+    if (thriftServiceThread == null) {
+      logger.debug("Start latch is null when getting status");
+    } else {
+      logger.debug("Start status is {} when getting status", thriftServiceThread.isServing());
+    }
+    if(stopLatch == null) {
+      logger.debug("Stop latch is null when getting status");
+    } else {
+      logger.debug("Stop latch is {} when getting status", stopLatch.getCount());
+    }
+
+    if(thriftServiceThread != null && thriftServiceThread.isServing()) {
+      return STATUS_UP;
+    } else {
+      return STATUS_DOWN;
+    }
+  }
+
+  public int getRPCPort() {
+    IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+    return config.getRpcPort();
+  }
+
+  public abstract ServiceType getID();
+  public abstract ThriftService getImplementation();
+
+  @Override
+  public  void start()  throws StartupException {
+    JMXService.registerMBean(getImplementation(), mbeanName);
+    startService();
+  }
+
+  @Override
+  public void stop() {
+    stopService();
+    JMXService.deregisterMBean(mbeanName);
+  }
+
+  public abstract void initTProcessor() throws ClassNotFoundException,IllegalAccessException,InstantiationException;
+  public abstract void initThriftServiceThread()
+      throws IllegalAccessException, InstantiationException, ClassNotFoundException;
+  public abstract String getBindIP();
+  public abstract int getBindPort();
+
+
+  @SuppressWarnings("squid:S2276")
+  public synchronized void startService() throws StartupException {
+    if (STATUS_UP.equals(getRPCServiceStatus())) {
+      logger.info("{}: {} has been already running now", IoTDBConstant.GLOBAL_DB_NAME,
+          this.getID().getName());
+      return;
+    }
+    logger.info("{}: start {}...", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName());
+    try {
+      reset();
+      initTProcessor();
+      initThriftServiceThread();
+      thriftServiceThread.setThreadStopLatch(stopLatch);
+      thriftServiceThread.start();
+
+      while (!thriftServiceThread.isServing()) {
+        //sleep 100ms for waiting the rpc server start.
+        Thread.sleep(100);
+      }
+    } catch (InterruptedException | ClassNotFoundException |
+        IllegalAccessException | InstantiationException e) {
+      Thread.currentThread().interrupt();
+      throw new StartupException(this.getID().getName(), e.getMessage());
+    }
+
+    logger.info("{}: start {} successfully, listening on ip {} port {}", IoTDBConstant.GLOBAL_DB_NAME,
+        this.getID().getName(), getBindIP(), getBindPort());
+  }
+
+  private void reset() {
+    thriftServiceThread = null;
+    stopLatch = new CountDownLatch(1);
+  }
+
+
+  public synchronized void restartService() throws StartupException {
+    stopService();
+    startService();
+  }
+
+  public synchronized void stopService() {
+    if (STATUS_DOWN.equals(getRPCServiceStatus())) {
+      logger.info("{}: {} isn't running now", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName());
+      return;
+    }
+    logger.info("{}: closing {}...", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName());
+    if (thriftServiceThread != null) {
+      thriftServiceThread.close();
+    }
+    try {
+      stopLatch.await();
+      reset();
+      logger.info("{}: close {} successfully", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName());
+    } catch (InterruptedException e) {
+      logger.error("{}: close {} failed because: ", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName(), e);
+      Thread.currentThread().interrupt();
+    }
+  }
+
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftServiceThread.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftServiceThread.java
new file mode 100644
index 0000000..589667a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftServiceThread.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.service.thrift;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.CountDownLatch;
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.runtime.RPCServiceException;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TServerEventHandler;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.transport.TFastFramedTransport;
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ThriftServiceThread extends Thread{
+
+  private static final Logger logger = LoggerFactory.getLogger(ThriftServiceThread.class);
+  private TServerSocket serverTransport;
+  private TServer poolServer;
+  private CountDownLatch threadStopLatch;
+
+  private String serviceName;
+
+  private TProtocolFactory protocolFactory;
+  private TProcessor processor;
+  private TThreadPoolServer.Args poolArgs;
+
+
+  public ThriftServiceThread(TProcessor processor, String serviceName,
+      String threadsName,
+      String bindAddress, int port, int maxWorkerThreads, int timeoutMs,
+      TServerEventHandler serverEventHandler)
+      throws ClassNotFoundException, IllegalAccessException, InstantiationException {
+    if(IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable()) {
+      protocolFactory = new TCompactProtocol.Factory();
+    }
+    else {
+      protocolFactory = new TBinaryProtocol.Factory();
+    }
+    IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+    this.processor = processor;
+    this.serviceName = serviceName;
+
+    try {
+      serverTransport = new TServerSocket(new InetSocketAddress(bindAddress, port));
+      poolArgs = new TThreadPoolServer.Args(serverTransport)
+          .maxWorkerThreads(maxWorkerThreads)
+          .minWorkerThreads(1)
+          .stopTimeoutVal(timeoutMs);
+      poolArgs.executorService = IoTDBThreadPoolFactory.createThriftRpcClientThreadPool(poolArgs,
+          threadsName);
+      poolArgs.processor(processor);
+      poolArgs.protocolFactory(protocolFactory);
+      poolArgs.transportFactory(new TFastFramedTransport.Factory());
+      poolServer = new TThreadPoolServer(poolArgs);
+      poolServer.setServerEventHandler(serverEventHandler);
+    } catch (TTransportException e) {
+      close();
+      if (threadStopLatch == null) {
+        logger.debug("Stop Count Down latch is null");
+      } else {
+        logger.debug("Stop Count Down latch is {}", threadStopLatch.getCount());
+      }
+      if (threadStopLatch != null && threadStopLatch.getCount() == 1) {
+        threadStopLatch.countDown();
+      }
+      logger.debug("{}: close TThreadPoolServer and TServerSocket for {}",
+          IoTDBConstant.GLOBAL_DB_NAME, serviceName);
+      throw new RPCServiceException(String.format("%s: failed to start %s, because ",
+          IoTDBConstant.GLOBAL_DB_NAME, serviceName), e);
+    }
+  }
+
+  public void setThreadStopLatch(CountDownLatch threadStopLatch) {
+    this.threadStopLatch = threadStopLatch;
+  }
+
+  @SuppressWarnings("squid:S2093") // socket will be used later
+  @Override
+  public void run() {
+    logger.info("The {} service thread begin to run...", serviceName);
+    try {
+      poolServer.serve();
+    } catch (Exception e) {
+      throw new RPCServiceException(String.format("%s: %s exit, because ",
+          IoTDBConstant.GLOBAL_DB_NAME, serviceName), e);
+    } finally {
+      close();
+      if (threadStopLatch == null) {
+        logger.debug("Stop Count Down latch is null");
+      } else {
+        logger.debug("Stop Count Down latch is {}", threadStopLatch.getCount());
+      }
+
+      if (threadStopLatch != null && threadStopLatch.getCount() == 1) {
+        threadStopLatch.countDown();
+      }
+      logger.debug("{}: close TThreadPoolServer and TServerSocket for {}",
+          IoTDBConstant.GLOBAL_DB_NAME, serviceName);
+    }
+  }
+
+  public synchronized void close() {
+    if (poolServer != null) {
+      poolServer.setShouldStop(true);
+      poolServer.stop();
+
+      poolServer = null;
+    }
+    if (serverTransport != null) {
+      serverTransport.close();
+      serverTransport = null;
+    }
+  }
+
+  public boolean isServing() {
+    if (poolServer != null) {
+      return poolServer.isServing();
+    }
+    return false;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java
index fa432fa..3da8d71 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java
@@ -18,183 +18,67 @@
  */
 package org.apache.iotdb.db.sync.receiver;
 
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.concurrent.CountDownLatch;
-import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.concurrent.ThreadName;
 import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.exception.StartupException;
-import org.apache.iotdb.db.service.IService;
 import org.apache.iotdb.db.service.ServiceType;
-import org.apache.iotdb.db.sync.receiver.load.FileLoaderManager;
-import org.apache.iotdb.db.sync.receiver.recover.SyncReceiverLogAnalyzer;
+import org.apache.iotdb.db.service.thrift.ThriftService;
+import org.apache.iotdb.db.service.thrift.ThriftServiceThread;
 import org.apache.iotdb.db.sync.receiver.transfer.SyncServiceImpl;
 import org.apache.iotdb.service.sync.thrift.SyncService;
-import org.apache.iotdb.service.sync.thrift.SyncService.Processor;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TCompactProtocol;
-import org.apache.thrift.protocol.TProtocolFactory;
-import org.apache.thrift.server.TServer;
-import org.apache.thrift.server.TThreadPoolServer;
-import org.apache.thrift.transport.TServerSocket;
-import org.apache.thrift.transport.TTransportException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * sync receiver server.
  */
-public class SyncServerManager implements IService {
+public class SyncServerManager  extends ThriftService implements SyncServerManagerMBean {
 
-  private static final Logger logger = LoggerFactory.getLogger(SyncServerManager.class);
+  private SyncServiceImpl serviceImpl;
 
-  private IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
-
-  private SyncServiceThread syncServerThread;
-
-  //stopLatch is also for letting the IT know whether the socket is closed.
-  private CountDownLatch stopLatch;
+  private static class ServerManagerHolder {
 
-  private SyncServerManager() {
+    private static final SyncServerManager INSTANCE = new SyncServerManager();
   }
 
   public static SyncServerManager getInstance() {
-    return ServerManagerHolder.INSTANCE;
+    return SyncServerManager.ServerManagerHolder.INSTANCE;
   }
 
-  /**
-   * Start sync receiver's server.
-   */
   @Override
-  public void start() throws StartupException {
-    if (!conf.isSyncEnable()) {
-      return;
-    }
-    FileLoaderManager.getInstance().start();
-    try {
-      SyncReceiverLogAnalyzer.getInstance().recoverAll();
-    } catch (IOException e) {
-      logger.error("Can not recover receiver sync state", e);
-    }
-    if (conf.getIpWhiteList() == null) {
-      logger.error(
-          "Sync server failed to start because IP white list is null, please set IP white list.");
-      return;
-    }
-    stopLatch = new CountDownLatch(1);
-    conf.setIpWhiteList(conf.getIpWhiteList().replaceAll(" ", ""));
-    syncServerThread = new SyncServiceThread(stopLatch);
-    syncServerThread.setName(ThreadName.SYNC_SERVER.getName());
-    syncServerThread.start();
-    try {
-      while (!syncServerThread.isServing()) {
-        //sleep 100ms for waiting the sync server start.
-        Thread.sleep(100);
-      }
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new StartupException(this.getID().getName(), e.getMessage());
-    }
-    logger.info("Sync server has started.");
+  public ServiceType getID() {
+    return ServiceType.SYNC_SERVICE;
   }
 
-  /**
-   * Close sync receiver's server.
-   */
   @Override
-  public void stop() {
-    if (conf.isSyncEnable()) {
-      FileLoaderManager.getInstance().stop();
-      syncServerThread.close();
-      try {
-        stopLatch.await();
-      } catch (InterruptedException e) {
-        logger.error(e.getMessage(), e);
-        Thread.currentThread().interrupt();
-      }
-    }
+  public ThriftService getImplementation() {
+    return getInstance();
   }
 
   @Override
-  public ServiceType getID() {
-    return ServiceType.SYNC_SERVICE;
+  public void initTProcessor() {
+    serviceImpl = new SyncServiceImpl();
+    processor = new SyncService.Processor<>(serviceImpl);
   }
 
-  private static class ServerManagerHolder {
-
-    private static final SyncServerManager INSTANCE = new SyncServerManager();
+  @Override
+  public void initThriftServiceThread()
+      throws IllegalAccessException, InstantiationException, ClassNotFoundException {
+    IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+    thriftServiceThread = new ThriftServiceThread(processor,
+        getID().getName(), ThreadName.SYNC_CLIENT.getName(),
+        config.getRpcAddress(), config.getSyncServerPort(),
+        Integer.MAX_VALUE, config.getThriftServerAwaitTimeForStopService(),
+        new SyncServerThriftHandler(serviceImpl)
+        );
+    thriftServiceThread.setName(ThreadName.SYNC_SERVER.getName());
   }
 
-  private class SyncServiceThread extends Thread {
-
-    private TServerSocket serverTransport;
-    private TServer poolServer;
-    private TProtocolFactory protocolFactory;
-    private Processor<SyncService.Iface> processor;
-    private TThreadPoolServer.Args poolArgs;
-    private CountDownLatch threadStopLatch;
-    private SyncServiceImpl serviceImpl;
-
-    public SyncServiceThread(CountDownLatch stopLatch) {
-      serviceImpl = new SyncServiceImpl();
-      processor = new SyncService.Processor<>(serviceImpl);
-      this.threadStopLatch = stopLatch;
-    }
-
-    @Override
-    public void run() {
-      try {
-        serverTransport = new TServerSocket(
-            new InetSocketAddress(conf.getRpcAddress(), conf.getSyncServerPort()));
-        if (conf.isRpcThriftCompressionEnable()) {
-          protocolFactory = new TCompactProtocol.Factory();
-        } else {
-          protocolFactory = new TBinaryProtocol.Factory();
-        }
-        poolArgs = new TThreadPoolServer.Args(serverTransport).stopTimeoutVal(
-            IoTDBDescriptor.getInstance().getConfig().getThriftServerAwaitTimeForStopService());
-        poolArgs.executorService = IoTDBThreadPoolFactory.createThriftRpcClientThreadPool(poolArgs,
-            ThreadName.SYNC_CLIENT.getName());
-        poolArgs.protocolFactory(protocolFactory);
-        poolArgs.processor(processor);
-        poolServer = new TThreadPoolServer(poolArgs);
-        poolServer.setServerEventHandler(new SyncServerThriftHandler(serviceImpl));
-        poolServer.serve();
-      } catch (TTransportException e) {
-        logger.error("{}: failed to start {}, because ", IoTDBConstant.GLOBAL_DB_NAME,
-            getID().getName(), e);
-      } catch (Exception e) {
-        logger.error("{}: {} exit, because ", IoTDBConstant.GLOBAL_DB_NAME, getID().getName(), e);
-      } finally {
-        close();
-        if (threadStopLatch != null && threadStopLatch.getCount() == 1) {
-          threadStopLatch.countDown();
-        }
-        logger.info("{}: close TThreadPoolServer and TServerSocket for {}",
-            IoTDBConstant.GLOBAL_DB_NAME, getID().getName());
-
-      }
-    }
-
-    private synchronized void close() {
-      if (poolServer != null) {
-        poolServer.stop();
-        poolServer = null;
-      }
-      if (serverTransport != null) {
-        serverTransport.close();
-        serverTransport = null;
-      }
-    }
+  @Override
+  public String getBindIP() {
+    return IoTDBDescriptor.getInstance().getConfig().getRpcAddress();
+  }
 
-    boolean isServing() {
-      if (poolServer != null) {
-        return  poolServer.isServing();
-      }
-      return false;
-    }
+  @Override
+  public int getBindPort() {
+    return IoTDBDescriptor.getInstance().getConfig().getSyncServerPort();
   }
 }
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManagerMBean.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManagerMBean.java
new file mode 100644
index 0000000..3992693
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManagerMBean.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.sync.receiver;
+
+import org.apache.iotdb.db.exception.StartupException;
+
+public interface SyncServerManagerMBean {
+
+  String getRPCServiceStatus();
+
+  int getRPCPort();
+
+  void startService() throws StartupException;
+
+  void restartService() throws StartupException;
+
+  void stopService();
+}