You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2020/07/02 09:11:55 UTC
[incubator-iotdb] 01/01: refactor the thrift rpc service to reduce
duplicate codes of RPC and Sync module
This is an automated email from the ASF dual-hosted git repository.
hxd pushed a commit to branch refactor_thrift_rpc_service
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit df2fde7595d195073522602b6607f54493d626f9
Author: xiangdong huang <sa...@gmail.com>
AuthorDate: Thu Jul 2 17:11:19 2020 +0800
refactor the thrift rpc service to reduce duplicate codes of RPC and Sync module
---
.../org/apache/iotdb/db/service/RPCService.java | 218 ++++-----------------
.../iotdb/db/service/thrift/ThriftService.java | 160 +++++++++++++++
.../db/service/thrift/ThriftServiceThread.java | 149 ++++++++++++++
.../iotdb/db/sync/receiver/SyncServerManager.java | 180 +++--------------
.../db/sync/receiver/SyncServerManagerMBean.java | 34 ++++
5 files changed, 410 insertions(+), 331 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/service/RPCService.java b/server/src/main/java/org/apache/iotdb/db/service/RPCService.java
index 2ae9928..ebf2feb 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/RPCService.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/RPCService.java
@@ -18,47 +18,21 @@
*/
package org.apache.iotdb.db.service;
-import java.net.InetSocketAddress;
-import java.util.concurrent.CountDownLatch;
-import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.concurrent.ThreadName;
import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.StartupException;
-import org.apache.iotdb.db.exception.runtime.RPCServiceException;
-import org.apache.iotdb.service.rpc.thrift.TSIService;
+import org.apache.iotdb.db.service.thrift.ThriftService;
+import org.apache.iotdb.db.service.thrift.ThriftServiceThread;
import org.apache.iotdb.service.rpc.thrift.TSIService.Processor;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TCompactProtocol;
-import org.apache.thrift.protocol.TProtocolFactory;
-import org.apache.thrift.server.TServer;
-import org.apache.thrift.server.TThreadPoolServer;
-import org.apache.thrift.transport.TFastFramedTransport;
-import org.apache.thrift.transport.TServerSocket;
-import org.apache.thrift.transport.TTransportException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* A service to handle jdbc request from client.
*/
-public class RPCService implements RPCServiceMBean, IService {
+public class RPCService extends ThriftService implements RPCServiceMBean {
- private static final Logger logger = LoggerFactory.getLogger(RPCService.class);
- private static final String STATUS_UP = "UP";
- private static final String STATUS_DOWN = "DOWN";
- private final String mbeanName = String
- .format("%s:%s=%s", IoTDBConstant.IOTDB_PACKAGE, IoTDBConstant.JMX_TYPE,
- getID().getJmxName());
- private RPCServiceThread rpcServiceThread;
- private TProtocolFactory protocolFactory;
- private Processor<TSIService.Iface> processor;
- private TThreadPoolServer.Args poolArgs;
private TSServiceImpl impl;
- private CountDownLatch stopLatch;
-
private RPCService() {
}
@@ -68,22 +42,7 @@ public class RPCService implements RPCServiceMBean, IService {
@Override
public String getRPCServiceStatus() {
- if (rpcServiceThread == null) {
- logger.debug("Start latch is null when getting status");
- } else {
- logger.debug("Start status is {} when getting status", rpcServiceThread.isServing());
- }
- if(stopLatch == null) {
- logger.debug("Stop latch is null when getting status");
- } else {
- logger.debug("Stop latch is {} when getting status", stopLatch.getCount());
- }
-
- if(rpcServiceThread != null && rpcServiceThread.isServing()) {
- return STATUS_UP;
- } else {
- return STATUS_DOWN;
- }
+ return super.getRPCServiceStatus();
}
@Override
@@ -93,15 +52,37 @@ public class RPCService implements RPCServiceMBean, IService {
}
@Override
- public void start() throws StartupException {
- JMXService.registerMBean(getInstance(), mbeanName);
- startService();
+ public ThriftService getImplementation() {
+ return getInstance();
+ }
+
+ @Override
+ public void initTProcessor() throws ClassNotFoundException,IllegalAccessException,InstantiationException{
+ impl = (TSServiceImpl) Class.forName(IoTDBDescriptor.getInstance().getConfig()
+ .getRpcImplClassName()).newInstance();
+ processor = new Processor<>(impl);
+ }
+
+ @Override
+ public void initThriftServiceThread()
+ throws IllegalAccessException, InstantiationException, ClassNotFoundException {
+ IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+ thriftServiceThread = new ThriftServiceThread(processor,
+ getID().getName(), ThreadName.RPC_CLIENT.getName(),
+ config.getRpcAddress(), config.getRpcPort(), config.getRpcMaxConcurrentClientNum(),
+ config.getThriftServerAwaitTimeForStopService(),
+ new RPCServiceThriftHandler(impl));
+ thriftServiceThread.setName(ThreadName.RPC_SERVICE.getName());
+ }
+
+ @Override
+ public String getBindIP() {
+ return IoTDBDescriptor.getInstance().getConfig().getRpcAddress();
}
@Override
- public void stop() {
- stopService();
- JMXService.deregisterMBean(mbeanName);
+ public int getBindPort() {
+ return IoTDBDescriptor.getInstance().getConfig().getRpcPort();
}
@Override
@@ -110,63 +91,18 @@ public class RPCService implements RPCServiceMBean, IService {
}
@Override
- @SuppressWarnings("squid:S2276")
public synchronized void startService() throws StartupException {
- if (STATUS_UP.equals(getRPCServiceStatus())) {
- logger.info("{}: {} has been already running now", IoTDBConstant.GLOBAL_DB_NAME,
- this.getID().getName());
- return;
- }
- logger.info("{}: start {}...", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName());
- try {
- reset();
- rpcServiceThread = new RPCServiceThread(stopLatch);
- rpcServiceThread.setName(ThreadName.RPC_SERVICE.getName());
- rpcServiceThread.start();
- while (!rpcServiceThread.isServing()) {
- //sleep 100ms for waiting the rpc server start.
- Thread.sleep(100);
- }
- } catch (InterruptedException | ClassNotFoundException |
- IllegalAccessException | InstantiationException e) {
- Thread.currentThread().interrupt();
- throw new StartupException(this.getID().getName(), e.getMessage());
- }
-
- logger.info("{}: start {} successfully, listening on ip {} port {}", IoTDBConstant.GLOBAL_DB_NAME,
- this.getID().getName(), IoTDBDescriptor.getInstance().getConfig().getRpcAddress(),
- IoTDBDescriptor.getInstance().getConfig().getRpcPort());
- }
-
- private void reset() {
- rpcServiceThread = null;
- stopLatch = new CountDownLatch(1);
+ super.startService();
}
@Override
public synchronized void restartService() throws StartupException {
- stopService();
- startService();
+ super.restartService();
}
@Override
public synchronized void stopService() {
- if (STATUS_DOWN.equals(getRPCServiceStatus())) {
- logger.info("{}: {} isn't running now", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName());
- return;
- }
- logger.info("{}: closing {}...", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName());
- if (rpcServiceThread != null) {
- rpcServiceThread.close();
- }
- try {
- stopLatch.await();
- reset();
- logger.info("{}: close {} successfully", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName());
- } catch (InterruptedException e) {
- logger.error("{}: close {} failed because: ", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName(), e);
- Thread.currentThread().interrupt();
- }
+ super.stopService();
}
private static class RPCServiceHolder {
@@ -177,88 +113,4 @@ public class RPCService implements RPCServiceMBean, IService {
}
}
- private class RPCServiceThread extends Thread {
-
- private TServerSocket serverTransport;
- private TServer poolServer;
- private CountDownLatch threadStopLatch;
-
- public RPCServiceThread(CountDownLatch threadStopLatch)
- throws ClassNotFoundException, IllegalAccessException, InstantiationException {
- if(IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable()) {
- protocolFactory = new TCompactProtocol.Factory();
- }
- else {
- protocolFactory = new TBinaryProtocol.Factory();
- }
- IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
- impl = (TSServiceImpl) Class.forName(config.getRpcImplClassName()).newInstance();
- processor = new TSIService.Processor<>(impl);
- this.threadStopLatch = threadStopLatch;
- }
-
- @SuppressWarnings("squid:S2093") // socket will be used later
- @Override
- public void run() {
- logger.info("The RPC service thread begin to run...");
- try {
- IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
- serverTransport = new TServerSocket(new InetSocketAddress(config.getRpcAddress(),
- config.getRpcPort()));
- //this is for testing.
- if (!serverTransport.getServerSocket().isBound()) {
- logger.error("The RPC service port is not bound.");
- }
- poolArgs = new TThreadPoolServer.Args(serverTransport).maxWorkerThreads(IoTDBDescriptor.
- getInstance().getConfig().getRpcMaxConcurrentClientNum()).minWorkerThreads(1)
- .stopTimeoutVal(
- IoTDBDescriptor.getInstance().getConfig().getThriftServerAwaitTimeForStopService());
- poolArgs.executorService = IoTDBThreadPoolFactory.createThriftRpcClientThreadPool(poolArgs,
- ThreadName.RPC_CLIENT.getName());
- poolArgs.processor(processor);
- poolArgs.protocolFactory(protocolFactory);
- poolArgs.transportFactory(new TFastFramedTransport.Factory());
- poolServer = new TThreadPoolServer(poolArgs);
- poolServer.setServerEventHandler(new RPCServiceThriftHandler(impl));
- poolServer.serve();
- } catch (TTransportException e) {
- throw new RPCServiceException(String.format("%s: failed to start %s, because ", IoTDBConstant.GLOBAL_DB_NAME,
- getID().getName()), e);
- } catch (Exception e) {
- throw new RPCServiceException(String.format("%s: %s exit, because ", IoTDBConstant.GLOBAL_DB_NAME, getID().getName()), e);
- } finally {
- close();
- if (threadStopLatch == null) {
- logger.debug("Stop Count Down latch is null");
- } else {
- logger.debug("Stop Count Down latch is {}", threadStopLatch.getCount());
- }
-
- if (threadStopLatch != null && threadStopLatch.getCount() == 1) {
- threadStopLatch.countDown();
- }
- logger.debug("{}: close TThreadPoolServer and TServerSocket for {}",
- IoTDBConstant.GLOBAL_DB_NAME, getID().getName());
- }
- }
-
- private synchronized void close() {
- if (poolServer != null) {
- poolServer.setShouldStop(true);
- poolServer.stop();
- poolServer = null;
- }
- if (serverTransport != null) {
- serverTransport.close();
- serverTransport = null;
- }
- }
-
- boolean isServing() {
- if (poolServer != null) {
- return poolServer.isServing();
- }
- return false;
- }
- }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftService.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftService.java
new file mode 100644
index 0000000..4460072
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftService.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.service.thrift;
+
+import java.util.concurrent.CountDownLatch;
+import org.apache.iotdb.db.concurrent.ThreadName;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.ShutdownException;
+import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.service.IService;
+import org.apache.iotdb.db.service.JMXService;
+import org.apache.iotdb.db.service.RPCService;
+import org.apache.iotdb.db.service.RPCServiceThriftHandler;
+import org.apache.iotdb.db.service.ServiceType;
+import org.apache.iotdb.db.service.TSServiceImpl;
+import org.apache.iotdb.service.rpc.thrift.TSIService;
+import org.apache.iotdb.service.rpc.thrift.TSIService.Processor;
+import org.apache.thrift.TProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class ThriftService implements IService {
+
+ private static final Logger logger = LoggerFactory.getLogger(ThriftService.class);
+ private static final String STATUS_UP = "UP";
+ private static final String STATUS_DOWN = "DOWN";
+ protected final String mbeanName = String
+ .format("%s:%s=%s", IoTDBConstant.IOTDB_PACKAGE, IoTDBConstant.JMX_TYPE,
+ getID().getJmxName());
+ protected ThriftServiceThread thriftServiceThread;
+ protected TProcessor processor;
+ //private TSServiceImpl impl;
+
+ private CountDownLatch stopLatch;
+
+ public String getRPCServiceStatus() {
+ if (thriftServiceThread == null) {
+ logger.debug("Start latch is null when getting status");
+ } else {
+ logger.debug("Start status is {} when getting status", thriftServiceThread.isServing());
+ }
+ if(stopLatch == null) {
+ logger.debug("Stop latch is null when getting status");
+ } else {
+ logger.debug("Stop latch is {} when getting status", stopLatch.getCount());
+ }
+
+ if(thriftServiceThread != null && thriftServiceThread.isServing()) {
+ return STATUS_UP;
+ } else {
+ return STATUS_DOWN;
+ }
+ }
+
+ public int getRPCPort() {
+ IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+ return config.getRpcPort();
+ }
+
+ public abstract ServiceType getID();
+ public abstract ThriftService getImplementation();
+
+ @Override
+ public void start() throws StartupException {
+ JMXService.registerMBean(getImplementation(), mbeanName);
+ startService();
+ }
+
+ @Override
+ public void stop() {
+ stopService();
+ JMXService.deregisterMBean(mbeanName);
+ }
+
+ public abstract void initTProcessor() throws ClassNotFoundException,IllegalAccessException,InstantiationException;
+ public abstract void initThriftServiceThread()
+ throws IllegalAccessException, InstantiationException, ClassNotFoundException;
+ public abstract String getBindIP();
+ public abstract int getBindPort();
+
+
+ @SuppressWarnings("squid:S2276")
+ public synchronized void startService() throws StartupException {
+ if (STATUS_UP.equals(getRPCServiceStatus())) {
+ logger.info("{}: {} has been already running now", IoTDBConstant.GLOBAL_DB_NAME,
+ this.getID().getName());
+ return;
+ }
+ logger.info("{}: start {}...", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName());
+ try {
+ reset();
+ initTProcessor();
+ initThriftServiceThread();
+ thriftServiceThread.setThreadStopLatch(stopLatch);
+ thriftServiceThread.start();
+
+ while (!thriftServiceThread.isServing()) {
+ //sleep 100ms for waiting the rpc server start.
+ Thread.sleep(100);
+ }
+ } catch (InterruptedException | ClassNotFoundException |
+ IllegalAccessException | InstantiationException e) {
+ Thread.currentThread().interrupt();
+ throw new StartupException(this.getID().getName(), e.getMessage());
+ }
+
+ logger.info("{}: start {} successfully, listening on ip {} port {}", IoTDBConstant.GLOBAL_DB_NAME,
+ this.getID().getName(), getBindIP(), getBindPort());
+ }
+
+ private void reset() {
+ thriftServiceThread = null;
+ stopLatch = new CountDownLatch(1);
+ }
+
+
+ public synchronized void restartService() throws StartupException {
+ stopService();
+ startService();
+ }
+
+ public synchronized void stopService() {
+ if (STATUS_DOWN.equals(getRPCServiceStatus())) {
+ logger.info("{}: {} isn't running now", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName());
+ return;
+ }
+ logger.info("{}: closing {}...", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName());
+ if (thriftServiceThread != null) {
+ thriftServiceThread.close();
+ }
+ try {
+ stopLatch.await();
+ reset();
+ logger.info("{}: close {} successfully", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName());
+ } catch (InterruptedException e) {
+ logger.error("{}: close {} failed because: ", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName(), e);
+ Thread.currentThread().interrupt();
+ }
+ }
+
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftServiceThread.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftServiceThread.java
new file mode 100644
index 0000000..589667a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftServiceThread.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.service.thrift;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.CountDownLatch;
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.runtime.RPCServiceException;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TServerEventHandler;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.transport.TFastFramedTransport;
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ThriftServiceThread extends Thread{
+
+ private static final Logger logger = LoggerFactory.getLogger(ThriftServiceThread.class);
+ private TServerSocket serverTransport;
+ private TServer poolServer;
+ private CountDownLatch threadStopLatch;
+
+ private String serviceName;
+
+ private TProtocolFactory protocolFactory;
+ private TProcessor processor;
+ private TThreadPoolServer.Args poolArgs;
+
+
+ public ThriftServiceThread(TProcessor processor, String serviceName,
+ String threadsName,
+ String bindAddress, int port, int maxWorkerThreads, int timeoutMs,
+ TServerEventHandler serverEventHandler)
+ throws ClassNotFoundException, IllegalAccessException, InstantiationException {
+ if(IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable()) {
+ protocolFactory = new TCompactProtocol.Factory();
+ }
+ else {
+ protocolFactory = new TBinaryProtocol.Factory();
+ }
+ IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+ this.processor = processor;
+ this.serviceName = serviceName;
+
+ try {
+ serverTransport = new TServerSocket(new InetSocketAddress(bindAddress, port));
+ poolArgs = new TThreadPoolServer.Args(serverTransport)
+ .maxWorkerThreads(maxWorkerThreads)
+ .minWorkerThreads(1)
+ .stopTimeoutVal(timeoutMs);
+ poolArgs.executorService = IoTDBThreadPoolFactory.createThriftRpcClientThreadPool(poolArgs,
+ threadsName);
+ poolArgs.processor(processor);
+ poolArgs.protocolFactory(protocolFactory);
+ poolArgs.transportFactory(new TFastFramedTransport.Factory());
+ poolServer = new TThreadPoolServer(poolArgs);
+ poolServer.setServerEventHandler(serverEventHandler);
+ } catch (TTransportException e) {
+ close();
+ if (threadStopLatch == null) {
+ logger.debug("Stop Count Down latch is null");
+ } else {
+ logger.debug("Stop Count Down latch is {}", threadStopLatch.getCount());
+ }
+ if (threadStopLatch != null && threadStopLatch.getCount() == 1) {
+ threadStopLatch.countDown();
+ }
+ logger.debug("{}: close TThreadPoolServer and TServerSocket for {}",
+ IoTDBConstant.GLOBAL_DB_NAME, serviceName);
+ throw new RPCServiceException(String.format("%s: failed to start %s, because ",
+ IoTDBConstant.GLOBAL_DB_NAME, serviceName), e);
+ }
+ }
+
+ public void setThreadStopLatch(CountDownLatch threadStopLatch) {
+ this.threadStopLatch = threadStopLatch;
+ }
+
+ @SuppressWarnings("squid:S2093") // socket will be used later
+ @Override
+ public void run() {
+ logger.info("The {} service thread begin to run...", serviceName);
+ try {
+ poolServer.serve();
+ } catch (Exception e) {
+ throw new RPCServiceException(String.format("%s: %s exit, because ",
+ IoTDBConstant.GLOBAL_DB_NAME, serviceName), e);
+ } finally {
+ close();
+ if (threadStopLatch == null) {
+ logger.debug("Stop Count Down latch is null");
+ } else {
+ logger.debug("Stop Count Down latch is {}", threadStopLatch.getCount());
+ }
+
+ if (threadStopLatch != null && threadStopLatch.getCount() == 1) {
+ threadStopLatch.countDown();
+ }
+ logger.debug("{}: close TThreadPoolServer and TServerSocket for {}",
+ IoTDBConstant.GLOBAL_DB_NAME, serviceName);
+ }
+ }
+
+ public synchronized void close() {
+ if (poolServer != null) {
+ poolServer.setShouldStop(true);
+ poolServer.stop();
+
+ poolServer = null;
+ }
+ if (serverTransport != null) {
+ serverTransport.close();
+ serverTransport = null;
+ }
+ }
+
+ public boolean isServing() {
+ if (poolServer != null) {
+ return poolServer.isServing();
+ }
+ return false;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java
index fa432fa..3da8d71 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java
@@ -18,183 +18,67 @@
*/
package org.apache.iotdb.db.sync.receiver;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.concurrent.CountDownLatch;
-import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.concurrent.ThreadName;
import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.exception.StartupException;
-import org.apache.iotdb.db.service.IService;
import org.apache.iotdb.db.service.ServiceType;
-import org.apache.iotdb.db.sync.receiver.load.FileLoaderManager;
-import org.apache.iotdb.db.sync.receiver.recover.SyncReceiverLogAnalyzer;
+import org.apache.iotdb.db.service.thrift.ThriftService;
+import org.apache.iotdb.db.service.thrift.ThriftServiceThread;
import org.apache.iotdb.db.sync.receiver.transfer.SyncServiceImpl;
import org.apache.iotdb.service.sync.thrift.SyncService;
-import org.apache.iotdb.service.sync.thrift.SyncService.Processor;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TCompactProtocol;
-import org.apache.thrift.protocol.TProtocolFactory;
-import org.apache.thrift.server.TServer;
-import org.apache.thrift.server.TThreadPoolServer;
-import org.apache.thrift.transport.TServerSocket;
-import org.apache.thrift.transport.TTransportException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* sync receiver server.
*/
-public class SyncServerManager implements IService {
+public class SyncServerManager extends ThriftService implements SyncServerManagerMBean {
- private static final Logger logger = LoggerFactory.getLogger(SyncServerManager.class);
+ private SyncServiceImpl serviceImpl;
- private IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
-
- private SyncServiceThread syncServerThread;
-
- //stopLatch is also for letting the IT know whether the socket is closed.
- private CountDownLatch stopLatch;
+ private static class ServerManagerHolder {
- private SyncServerManager() {
+ private static final SyncServerManager INSTANCE = new SyncServerManager();
}
public static SyncServerManager getInstance() {
- return ServerManagerHolder.INSTANCE;
+ return SyncServerManager.ServerManagerHolder.INSTANCE;
}
- /**
- * Start sync receiver's server.
- */
@Override
- public void start() throws StartupException {
- if (!conf.isSyncEnable()) {
- return;
- }
- FileLoaderManager.getInstance().start();
- try {
- SyncReceiverLogAnalyzer.getInstance().recoverAll();
- } catch (IOException e) {
- logger.error("Can not recover receiver sync state", e);
- }
- if (conf.getIpWhiteList() == null) {
- logger.error(
- "Sync server failed to start because IP white list is null, please set IP white list.");
- return;
- }
- stopLatch = new CountDownLatch(1);
- conf.setIpWhiteList(conf.getIpWhiteList().replaceAll(" ", ""));
- syncServerThread = new SyncServiceThread(stopLatch);
- syncServerThread.setName(ThreadName.SYNC_SERVER.getName());
- syncServerThread.start();
- try {
- while (!syncServerThread.isServing()) {
- //sleep 100ms for waiting the sync server start.
- Thread.sleep(100);
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new StartupException(this.getID().getName(), e.getMessage());
- }
- logger.info("Sync server has started.");
+ public ServiceType getID() {
+ return ServiceType.SYNC_SERVICE;
}
- /**
- * Close sync receiver's server.
- */
@Override
- public void stop() {
- if (conf.isSyncEnable()) {
- FileLoaderManager.getInstance().stop();
- syncServerThread.close();
- try {
- stopLatch.await();
- } catch (InterruptedException e) {
- logger.error(e.getMessage(), e);
- Thread.currentThread().interrupt();
- }
- }
+ public ThriftService getImplementation() {
+ return getInstance();
}
@Override
- public ServiceType getID() {
- return ServiceType.SYNC_SERVICE;
+ public void initTProcessor() {
+ serviceImpl = new SyncServiceImpl();
+ processor = new SyncService.Processor<>(serviceImpl);
}
- private static class ServerManagerHolder {
-
- private static final SyncServerManager INSTANCE = new SyncServerManager();
+ @Override
+ public void initThriftServiceThread()
+ throws IllegalAccessException, InstantiationException, ClassNotFoundException {
+ IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+ thriftServiceThread = new ThriftServiceThread(processor,
+ getID().getName(), ThreadName.SYNC_CLIENT.getName(),
+ config.getRpcAddress(), config.getSyncServerPort(),
+ Integer.MAX_VALUE, config.getThriftServerAwaitTimeForStopService(),
+ new SyncServerThriftHandler(serviceImpl)
+ );
+ thriftServiceThread.setName(ThreadName.SYNC_SERVER.getName());
}
- private class SyncServiceThread extends Thread {
-
- private TServerSocket serverTransport;
- private TServer poolServer;
- private TProtocolFactory protocolFactory;
- private Processor<SyncService.Iface> processor;
- private TThreadPoolServer.Args poolArgs;
- private CountDownLatch threadStopLatch;
- private SyncServiceImpl serviceImpl;
-
- public SyncServiceThread(CountDownLatch stopLatch) {
- serviceImpl = new SyncServiceImpl();
- processor = new SyncService.Processor<>(serviceImpl);
- this.threadStopLatch = stopLatch;
- }
-
- @Override
- public void run() {
- try {
- serverTransport = new TServerSocket(
- new InetSocketAddress(conf.getRpcAddress(), conf.getSyncServerPort()));
- if (conf.isRpcThriftCompressionEnable()) {
- protocolFactory = new TCompactProtocol.Factory();
- } else {
- protocolFactory = new TBinaryProtocol.Factory();
- }
- poolArgs = new TThreadPoolServer.Args(serverTransport).stopTimeoutVal(
- IoTDBDescriptor.getInstance().getConfig().getThriftServerAwaitTimeForStopService());
- poolArgs.executorService = IoTDBThreadPoolFactory.createThriftRpcClientThreadPool(poolArgs,
- ThreadName.SYNC_CLIENT.getName());
- poolArgs.protocolFactory(protocolFactory);
- poolArgs.processor(processor);
- poolServer = new TThreadPoolServer(poolArgs);
- poolServer.setServerEventHandler(new SyncServerThriftHandler(serviceImpl));
- poolServer.serve();
- } catch (TTransportException e) {
- logger.error("{}: failed to start {}, because ", IoTDBConstant.GLOBAL_DB_NAME,
- getID().getName(), e);
- } catch (Exception e) {
- logger.error("{}: {} exit, because ", IoTDBConstant.GLOBAL_DB_NAME, getID().getName(), e);
- } finally {
- close();
- if (threadStopLatch != null && threadStopLatch.getCount() == 1) {
- threadStopLatch.countDown();
- }
- logger.info("{}: close TThreadPoolServer and TServerSocket for {}",
- IoTDBConstant.GLOBAL_DB_NAME, getID().getName());
-
- }
- }
-
- private synchronized void close() {
- if (poolServer != null) {
- poolServer.stop();
- poolServer = null;
- }
- if (serverTransport != null) {
- serverTransport.close();
- serverTransport = null;
- }
- }
+ @Override
+ public String getBindIP() {
+ return IoTDBDescriptor.getInstance().getConfig().getRpcAddress();
+ }
- boolean isServing() {
- if (poolServer != null) {
- return poolServer.isServing();
- }
- return false;
- }
+ @Override
+ public int getBindPort() {
+ return IoTDBDescriptor.getInstance().getConfig().getSyncServerPort();
}
}
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManagerMBean.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManagerMBean.java
new file mode 100644
index 0000000..3992693
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManagerMBean.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.sync.receiver;
+
+import org.apache.iotdb.db.exception.StartupException;
+
+public interface SyncServerManagerMBean {
+
+ String getRPCServiceStatus();
+
+ int getRPCPort();
+
+ void startService() throws StartupException;
+
+ void restartService() throws StartupException;
+
+ void stopService();
+}