You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2020/04/16 09:12:29 UTC
[incubator-iotdb] branch disable_mqtt_server updated: add log for
TServerSocket
This is an automated email from the ASF dual-hosted git repository.
hxd pushed a commit to branch disable_mqtt_server
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/disable_mqtt_server by this push:
new 37fc5d5 add log for TServerSocket
37fc5d5 is described below
commit 37fc5d5976e6b3c8c222dc346953f2e690a1d573
Author: xiangdong huang <sa...@gmail.com>
AuthorDate: Thu Apr 16 17:12:11 2020 +0800
add log for TServerSocket
---
.../org/apache/iotdb/db/service/RPCService.java | 5 +-
.../thrift/transport/CustomizedTServerSocket.java | 138 +++++++++++++++++++++
server/src/test/resources/logback.xml | 2 +-
3 files changed, 142 insertions(+), 3 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/service/RPCService.java b/server/src/main/java/org/apache/iotdb/db/service/RPCService.java
index 8983bda..691c3de 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/RPCService.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/RPCService.java
@@ -35,6 +35,7 @@ import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.server.CustomizedTThreadPoolServer;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.CustomizedTThreadPoolServer.Args;
+import org.apache.thrift.transport.CustomizedTServerSocket;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
@@ -175,7 +176,7 @@ public class RPCService implements RPCServiceMBean, IService {
private class RPCServiceThread extends Thread {
- private TServerSocket serverTransport;
+ private CustomizedTServerSocket serverTransport;
private TServer poolServer;
private CountDownLatch threadStartLatch;
private CountDownLatch threadStopLatch;
@@ -201,7 +202,7 @@ public class RPCService implements RPCServiceMBean, IService {
logger.info("The RPC service thread begin to run...");
try {
IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
- serverTransport = new TServerSocket(new InetSocketAddress(config.getRpcAddress(),
+ serverTransport = new CustomizedTServerSocket(new InetSocketAddress(config.getRpcAddress(),
config.getRpcPort()));
//this is for testing.
if (!serverTransport.getServerSocket().isBound()) {
diff --git a/server/src/main/java/org/apache/thrift/transport/CustomizedTServerSocket.java b/server/src/main/java/org/apache/thrift/transport/CustomizedTServerSocket.java
new file mode 100644
index 0000000..05714fd
--- /dev/null
+++ b/server/src/main/java/org/apache/thrift/transport/CustomizedTServerSocket.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.thrift.transport;
+
+import java.net.SocketOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketException;
+
+/**
+ * Wrapper around ServerSocket for Thrift that additionally logs, at construction time,
+ * whether an existing server socket was reused or a new one was created and bound.
+ *
+ * <p>{@link #interrupt()} and {@link #close()} may be invoked from a thread other than the
+ * one blocked in accept; every method therefore reads the socket field once into a local
+ * before using it.
+ */
+public class CustomizedTServerSocket extends TServerTransport {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(CustomizedTServerSocket.class);
+
+  /**
+   * Underlying ServerSocket object. Volatile because close() sets it to null, possibly from
+   * a different thread than the one accepting connections.
+   */
+  private volatile ServerSocket serverSocket_ = null;
+
+  /**
+   * Timeout for client sockets from accept
+   */
+  private int clientTimeout_ = 0;
+
+  /**
+   * Construction arguments. When a server socket is supplied it is reused as-is; otherwise
+   * the constructor creates one and binds it to the configured address.
+   */
+  public static class ServerSocketTransportArgs extends AbstractServerTransportArgs<ServerSocketTransportArgs> {
+    ServerSocket serverSocket;
+
+    /** Sets the already-created server socket to reuase and returns this for chaining. */
+    public ServerSocketTransportArgs serverSocket(ServerSocket serverSocket) {
+      this.serverSocket = serverSocket;
+      return this;
+    }
+
+    /** Sets the bind address via the superclass and returns this for chaining. */
+    public ServerSocketTransportArgs bindAddr(InetSocketAddress bindAddr) {
+      super.bindAddr(bindAddr);
+      return this;
+    }
+  }
+
+  /**
+   * Creates a server socket bound to bindAddr, with a client timeout of 0.
+   *
+   * @throws TTransportException if the socket cannot be created or bound
+   */
+  public CustomizedTServerSocket(InetSocketAddress bindAddr) throws TTransportException {
+    this(bindAddr, 0);
+  }
+
+  /**
+   * Creates a server socket bound to bindAddr.
+   *
+   * @param clientTimeout timeout set on every socket returned from accept
+   * @throws TTransportException if the socket cannot be created or bound
+   */
+  public CustomizedTServerSocket(InetSocketAddress bindAddr, int clientTimeout) throws TTransportException {
+    this(new ServerSocketTransportArgs().bindAddr(bindAddr).clientTimeout(clientTimeout));
+  }
+
+  /**
+   * Reuses args.serverSocket when it is non-null; otherwise creates a new server socket with
+   * address reuse enabled and binds it to args.bindAddr with args.backlog.
+   *
+   * @throws TTransportException if the new socket cannot be created or bound
+   */
+  public CustomizedTServerSocket(ServerSocketTransportArgs args) throws TTransportException {
+    clientTimeout_ = args.clientTimeout;
+    if (args.serverSocket != null) {
+      LOGGER.info("server socket is not null, reuse it.");
+      this.serverSocket_ = args.serverSocket;
+      return;
+    }
+    try {
+      // Make server socket
+      ServerSocket socket = new ServerSocket();
+      // Publish it before configuring so that close() in the catch block can release it
+      serverSocket_ = socket;
+      // Prevent 2MSL delay problem on server restarts
+      socket.setReuseAddress(true);
+      // Bind to listening port
+      socket.bind(args.bindAddr, args.backlog);
+      LOGGER.info("server socket is null, generate one {}.", args.bindAddr);
+    } catch (IOException ioe) {
+      close();
+      // Plain concatenation instead of bindAddr.toString(): a null address must not replace
+      // the real failure (ioe) with a NullPointerException.
+      throw new TTransportException("Could not create ServerSocket on address " + args.bindAddr + ".", ioe);
+    }
+  }
+
+  /**
+   * Sets the server socket's SO_TIMEOUT to 0 so that accept blocks. Does nothing when the
+   * socket has already been closed; a failure to set the timeout is logged, not thrown.
+   */
+  public void listen() throws TTransportException {
+    ServerSocket socket = serverSocket_;
+    // Make sure to block on accept
+    if (socket != null) {
+      try {
+        socket.setSoTimeout(0);
+      } catch (SocketException sx) {
+        LOGGER.error("Could not set socket timeout.", sx);
+      }
+    }
+  }
+
+  /**
+   * Accepts one client connection and wraps it in a TSocket with the client timeout applied.
+   *
+   * @throws TTransportException with type NOT_OPEN when there is no server socket, or
+   *     wrapping the IOException when accept fails (including when the socket is closed
+   *     concurrently)
+   */
+  protected TSocket acceptImpl() throws TTransportException {
+    // Snapshot the field: close() on another thread may null it between the check and the
+    // accept() call, which previously surfaced as a NullPointerException.
+    ServerSocket socket = serverSocket_;
+    if (socket == null) {
+      throw new TTransportException(TTransportException.NOT_OPEN, "No underlying server socket.");
+    }
+    try {
+      TSocket client = new TSocket(socket.accept());
+      client.setTimeout(clientTimeout_);
+      return client;
+    } catch (IOException iox) {
+      throw new TTransportException(iox);
+    }
+  }
+
+  /**
+   * Closes the server socket if there is one and clears the field. An IOException raised
+   * while closing is logged, not thrown.
+   */
+  public void close() {
+    ServerSocket socket = serverSocket_;
+    if (socket != null) {
+      try {
+        socket.close();
+      } catch (IOException iox) {
+        LOGGER.warn("Could not close server socket.", iox);
+      }
+      serverSocket_ = null;
+    }
+  }
+
+  /** Interrupts a pending accept by closing the server socket. */
+  public void interrupt() {
+    // The thread-safeness of this is dubious, but Java documentation suggests
+    // that it is safe to do this from a different thread context
+    close();
+  }
+
+  /** Returns the underlying server socket, or null once close() has run. */
+  public ServerSocket getServerSocket() {
+    return serverSocket_;
+  }
+}
diff --git a/server/src/test/resources/logback.xml b/server/src/test/resources/logback.xml
index 7ebde9d..d1c9bd3 100644
--- a/server/src/test/resources/logback.xml
+++ b/server/src/test/resources/logback.xml
@@ -51,7 +51,7 @@
<logger name="org.apache.iotdb.db.service.MQTTService" level="INFO"/>
<logger name="org.apache.iotdb.db.sync.receiver.SyncServerManager" level="INFO"/>
<logger name="org.apache.thrift.server.CustomizedTThreadPoolServer" level="INFO" />
-
+ <logger name="org.apache.thrift.transport.CustomizedTServerSocket" level="INFO" />
<root level="WARN">
<appender-ref ref="stdout"/>
</root>