You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2020/04/16 09:12:29 UTC

[incubator-iotdb] branch disable_mqtt_server updated: add log for TServerSocket

This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch disable_mqtt_server
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/disable_mqtt_server by this push:
     new 37fc5d5  add log for TServerSocket
37fc5d5 is described below

commit 37fc5d5976e6b3c8c222dc346953f2e690a1d573
Author: xiangdong huang <sa...@gmail.com>
AuthorDate: Thu Apr 16 17:12:11 2020 +0800

    add log for TServerSocket
---
 .../org/apache/iotdb/db/service/RPCService.java    |   5 +-
 .../thrift/transport/CustomizedTServerSocket.java  | 138 +++++++++++++++++++++
 server/src/test/resources/logback.xml              |   2 +-
 3 files changed, 142 insertions(+), 3 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/service/RPCService.java b/server/src/main/java/org/apache/iotdb/db/service/RPCService.java
index 8983bda..691c3de 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/RPCService.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/RPCService.java
@@ -35,6 +35,7 @@ import org.apache.thrift.protocol.TProtocolFactory;
 import org.apache.thrift.server.CustomizedTThreadPoolServer;
 import org.apache.thrift.server.TServer;
 import org.apache.thrift.server.CustomizedTThreadPoolServer.Args;
+import org.apache.thrift.transport.CustomizedTServerSocket;
 import org.apache.thrift.transport.TServerSocket;
 import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
@@ -175,7 +176,7 @@ public class RPCService implements RPCServiceMBean, IService {
 
   private class RPCServiceThread extends Thread {
 
-    private TServerSocket serverTransport;
+    private CustomizedTServerSocket serverTransport;
     private TServer poolServer;
     private CountDownLatch threadStartLatch;
     private CountDownLatch threadStopLatch;
@@ -201,7 +202,7 @@ public class RPCService implements RPCServiceMBean, IService {
       logger.info("The RPC service thread begin to run...");
       try {
         IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
-        serverTransport = new TServerSocket(new InetSocketAddress(config.getRpcAddress(),
+        serverTransport = new CustomizedTServerSocket(new InetSocketAddress(config.getRpcAddress(),
             config.getRpcPort()));
         //this is for testing.
         if (!serverTransport.getServerSocket().isBound()) {
diff --git a/server/src/main/java/org/apache/thrift/transport/CustomizedTServerSocket.java b/server/src/main/java/org/apache/thrift/transport/CustomizedTServerSocket.java
new file mode 100644
index 0000000..05714fd
--- /dev/null
+++ b/server/src/main/java/org/apache/thrift/transport/CustomizedTServerSocket.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.thrift.transport;
+
+import java.net.SocketOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketException;
+
+/**
+ * Wrapper around ServerSocket for Thrift.
+ *
+ */
+public class CustomizedTServerSocket extends TServerTransport {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(CustomizedTServerSocket.class);
+
+  /**
+   * Underlying ServerSocket object
+   */
+  private ServerSocket serverSocket_ = null;
+
+  /**
+   * Timeout for client sockets from accept
+   */
+  private int clientTimeout_ = 0;
+
+  public static class ServerSocketTransportArgs extends AbstractServerTransportArgs<ServerSocketTransportArgs> {
+    ServerSocket serverSocket;
+
+    public ServerSocketTransportArgs serverSocket(ServerSocket serverSocket) {
+      this.serverSocket = serverSocket;
+      return this;
+    }
+
+    public ServerSocketTransportArgs bindAddr(InetSocketAddress bindAddr) {
+      super.bindAddr(bindAddr);
+      return this;
+    }
+  }
+
+  public CustomizedTServerSocket(InetSocketAddress bindAddr) throws TTransportException {
+    this(bindAddr, 0);
+  }
+
+  public CustomizedTServerSocket(InetSocketAddress bindAddr, int clientTimeout) throws TTransportException {
+    this(new ServerSocketTransportArgs().bindAddr(bindAddr).clientTimeout(clientTimeout));
+  }
+
+  public CustomizedTServerSocket(ServerSocketTransportArgs args) throws TTransportException {
+    clientTimeout_ = args.clientTimeout;
+    if (args.serverSocket != null) {
+      LOGGER.info("server socket is not null, reuse it.");
+      this.serverSocket_ = args.serverSocket;
+      return;
+    }
+    try {
+      // Make server socket
+      serverSocket_ = new ServerSocket();
+      // Prevent 2MSL delay problem on server restarts
+      serverSocket_.setReuseAddress(true);
+      // Bind to listening port
+      serverSocket_.bind(args.bindAddr, args.backlog);
+      LOGGER.info("server socket is null, generate one {}.", args.bindAddr);
+    } catch (IOException ioe) {
+      close();
+      throw new TTransportException("Could not create ServerSocket on address " + args.bindAddr.toString() + ".", ioe);
+    }
+  }
+
+  public void listen() throws TTransportException {
+    // Make sure to block on accept
+    if (serverSocket_ != null) {
+      try {
+        serverSocket_.setSoTimeout(0);
+      } catch (SocketException sx) {
+        LOGGER.error("Could not set socket timeout.", sx);
+      }
+    }
+  }
+
+  protected TSocket acceptImpl() throws TTransportException {
+    if (serverSocket_ == null) {
+      throw new TTransportException(TTransportException.NOT_OPEN, "No underlying server socket.");
+    }
+    try {
+      Socket result = serverSocket_.accept();
+      TSocket result2 = new TSocket(result);
+      result2.setTimeout(clientTimeout_);
+      return result2;
+    } catch (IOException iox) {
+      throw new TTransportException(iox);
+    }
+  }
+
+  public void close() {
+    if (serverSocket_ != null) {
+      try {
+        serverSocket_.close();
+      } catch (IOException iox) {
+        LOGGER.warn("Could not close server socket.", iox);
+      }
+      serverSocket_ = null;
+    }
+  }
+
+  public void interrupt() {
+    // The thread-safeness of this is dubious, but Java documentation suggests
+    // that it is safe to do this from a different thread context
+    close();
+  }
+
+  public ServerSocket getServerSocket() {
+    return serverSocket_;
+  }
+}
diff --git a/server/src/test/resources/logback.xml b/server/src/test/resources/logback.xml
index 7ebde9d..d1c9bd3 100644
--- a/server/src/test/resources/logback.xml
+++ b/server/src/test/resources/logback.xml
@@ -51,7 +51,7 @@
     <logger name="org.apache.iotdb.db.service.MQTTService" level="INFO"/>
     <logger name="org.apache.iotdb.db.sync.receiver.SyncServerManager" level="INFO"/>
     <logger name="org.apache.thrift.server.CustomizedTThreadPoolServer" level="INFO" />
-
+    <logger name="org.apache.thrift.transport.CustomizedTServerSocket" level="INFO" />
     <root level="WARN">
         <appender-ref ref="stdout"/>
     </root>