You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/10/26 01:38:39 UTC

[doris] branch master updated: [improvement][refactor](mysql) remove old mysql server and add keep alive option (#13663)

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new c709998faa [improvement][refactor](mysql) remove old mysql server and add keep alive option (#13663)
c709998faa is described below

commit c709998faa303bf7ad42d7dacb0cb05a3bf1cfd3
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Wed Oct 26 09:38:33 2022 +0800

    [improvement][refactor](mysql) remove old mysql server and add keep alive option (#13663)
    
    * [improvement][refactor](mysql) remove old mysql server and add keep alive option
---
 docs/en/docs/admin-manual/config/fe-config.md      |   8 +-
 docs/zh-CN/docs/admin-manual/config/fe-config.md   |   6 -
 .../src/main/java/org/apache/doris/PaloFe.java     |   3 +-
 .../main/java/org/apache/doris/common/Config.java  |   5 -
 .../apache/doris/journal/bdbje/BDBDebugger.java    |   3 +-
 .../java/org/apache/doris/mysql/MysqlChannel.java  |   1 -
 .../java/org/apache/doris/mysql/MysqlServer.java   | 149 ---------------------
 .../org/apache/doris/mysql/nio/AcceptListener.java |   2 +
 .../org/apache/doris/mysql/nio/NMysqlServer.java   |  12 +-
 .../main/java/org/apache/doris/qe/QeService.java   |  21 +--
 .../org/apache/doris/mysql/MysqlServerTest.java    |  21 +--
 11 files changed, 17 insertions(+), 214 deletions(-)

diff --git a/docs/en/docs/admin-manual/config/fe-config.md b/docs/en/docs/admin-manual/config/fe-config.md
index a4de6ececb..f3a591bcd8 100644
--- a/docs/en/docs/admin-manual/config/fe-config.md
+++ b/docs/en/docs/admin-manual/config/fe-config.md
@@ -1667,13 +1667,7 @@ Cluster name will be shown as the title of web page
 
 Default:4
 
-When FeEstarts the MySQL server based on NIO model, the number of threads responsible for IO events. Only `mysql_service_nio_enabled` is true takes effect.
-
-### mysql_service_nio_enabled
-
-Default:true
-
-Whether FE starts the MySQL server based on NiO model. It is recommended to turn off this option when the query connection is less than 1000 or the concurrency scenario is not high
+When FE starts the MySQL server based on NIO model, the number of threads responsible for IO events.
 
 ### query_port
 
diff --git a/docs/zh-CN/docs/admin-manual/config/fe-config.md b/docs/zh-CN/docs/admin-manual/config/fe-config.md
index 6608b58845..69dfbe325b 100644
--- a/docs/zh-CN/docs/admin-manual/config/fe-config.md
+++ b/docs/zh-CN/docs/admin-manual/config/fe-config.md
@@ -1680,12 +1680,6 @@ mysql 中处理任务的最大线程数。
 
 mysql 中处理 io 事件的线程数。
 
-### `mysql_service_nio_enabled`
-
-默认值:true
-
-mysql 服务 nio 选项是否启用,默认启用
-
 ### `query_port`
 
 默认值:9030
diff --git a/fe/fe-core/src/main/java/org/apache/doris/PaloFe.java b/fe/fe-core/src/main/java/org/apache/doris/PaloFe.java
index 300768f95f..4eb52b9837 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/PaloFe.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/PaloFe.java
@@ -155,8 +155,7 @@ public class PaloFe {
             }
 
             if (options.enableQeService) {
-                QeService qeService = new QeService(Config.query_port, Config.mysql_service_nio_enabled,
-                        ExecuteEnv.getInstance().getScheduler());
+                QeService qeService = new QeService(Config.query_port, ExecuteEnv.getInstance().getScheduler());
                 qeService.start();
             }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index ca8bb0ac1b..6859b1a3ac 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -409,11 +409,6 @@ public class Config extends ConfigBase {
      */
     @ConfField public static int query_port = 9030;
 
-    /**
-     * mysql service nio option.
-     */
-    @ConfField public static boolean mysql_service_nio_enabled = true;
-
     /**
      * num of thread to handle io events in mysql.
      */
diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBDebugger.java b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBDebugger.java
index bcde003e99..e6b0f872f4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBDebugger.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBDebugger.java
@@ -87,8 +87,7 @@ public class BDBDebugger {
         httpServer.start();
 
         // MySQl server
-        QeService qeService = new QeService(Config.query_port, Config.mysql_service_nio_enabled,
-                ExecuteEnv.getInstance().getScheduler());
+        QeService qeService = new QeService(Config.query_port, ExecuteEnv.getInstance().getScheduler());
         qeService.start();
 
         ThreadPoolManager.registerAllThreadPoolMetric();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java
index 9c8bd622da..416a62cf80 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java
@@ -52,7 +52,6 @@ public class MysqlChannel {
     protected String remoteIp;
     protected boolean isSend;
 
-
     protected MysqlChannel() {
         this.sequenceId = 0;
         this.sendBuffer = ByteBuffer.allocate(2 * 1024 * 1024);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlServer.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlServer.java
deleted file mode 100644
index 6655013b17..0000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlServer.java
+++ /dev/null
@@ -1,149 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.mysql;
-
-import org.apache.doris.catalog.Env;
-import org.apache.doris.common.ThreadPoolManager;
-import org.apache.doris.qe.ConnectContext;
-import org.apache.doris.qe.ConnectScheduler;
-
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
-import java.util.concurrent.Future;
-import java.util.concurrent.ThreadPoolExecutor;
-
-// MySQL protocol network service
-public class MysqlServer {
-    private static final Logger LOG = LogManager.getLogger(MysqlServer.class);
-
-    protected int port;
-    protected volatile boolean running;
-    private ServerSocketChannel serverChannel = null;
-    private ConnectScheduler scheduler = null;
-    // used to accept connect request from client
-    private ThreadPoolExecutor listener;
-    private Future listenerFuture;
-
-    public MysqlServer(int port, ConnectScheduler scheduler) {
-        this.port = port;
-        this.scheduler = scheduler;
-    }
-
-    protected MysqlServer() {
-    }
-
-    // start MySQL protocol service
-    // return true if success, otherwise false
-    public boolean start() {
-        if (scheduler == null) {
-            LOG.warn("scheduler is NULL.");
-            return false;
-        }
-
-        // open server socket
-        try {
-            serverChannel = ServerSocketChannel.open();
-            serverChannel.socket().bind(new InetSocketAddress("0.0.0.0", port), 2048);
-            serverChannel.configureBlocking(true);
-        } catch (IOException e) {
-            LOG.warn("Open MySQL network service failed.", e);
-            return false;
-        }
-
-        // start accept thread
-        listener = ThreadPoolManager.newDaemonCacheThreadPool(1, "MySQL-Protocol-Listener", true);
-        running = true;
-        listenerFuture = listener.submit(new Listener());
-
-        return true;
-    }
-
-    public void stop() {
-        if (running) {
-            running = false;
-            // close server channel, make accept throw exception
-            try {
-                serverChannel.close();
-            } catch (IOException e) {
-                LOG.warn("close server channel failed.", e);
-            }
-        }
-    }
-
-    public void join() {
-        try {
-            listenerFuture.get();
-        } catch (Exception e) {
-            // just return
-            LOG.warn("Join MySQL server exception.", e);
-        }
-    }
-
-    private class Listener implements Runnable {
-        @Override
-        public void run() {
-            while (running && serverChannel.isOpen()) {
-                SocketChannel clientChannel;
-                try {
-                    clientChannel = serverChannel.accept();
-                    if (clientChannel == null) {
-                        continue;
-                    }
-                    // submit this context to scheduler
-                    ConnectContext context = new ConnectContext(clientChannel);
-                    // Set catalog here.
-                    context.setEnv(Env.getCurrentEnv());
-                    if (!scheduler.submit(context)) {
-                        LOG.warn("Submit one connect request failed. Client=" + clientChannel.toString());
-                        // clear up context
-                        context.cleanup();
-                    }
-                } catch (IOException e) {
-                    // ClosedChannelException
-                    // AsynchronousCloseException
-                    // ClosedByInterruptException
-                    // Other IOException, for example "to many open files" ...
-                    LOG.warn("Query server encounter exception.", e);
-                    try {
-                        Thread.sleep(100);
-                    } catch (InterruptedException e1) {
-                       // Do nothing
-                    }
-                } catch (Throwable e) {
-                    // NotYetBoundException
-                    // SecurityException
-                    LOG.warn("Query server failed when calling accept.", e);
-                }
-            }
-        }
-    }
-
-    public ConnectScheduler getScheduler() {
-        return scheduler;
-    }
-
-    public void setScheduler(ConnectScheduler scheduler) {
-        this.scheduler = scheduler;
-    }
-
-}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/nio/AcceptListener.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/nio/AcceptListener.java
index 3814973fed..b4f28be2c5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mysql/nio/AcceptListener.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/nio/AcceptListener.java
@@ -27,6 +27,7 @@ import org.apache.doris.qe.ConnectScheduler;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.xnio.ChannelListener;
+import org.xnio.Options;
 import org.xnio.StreamConnection;
 import org.xnio.channels.AcceptingChannel;
 
@@ -50,6 +51,7 @@ public class AcceptListener implements ChannelListener<AcceptingChannel<StreamCo
             if (connection == null) {
                 return;
             }
+            connection.setOption(Options.KEEP_ALIVE, true);
             LOG.debug("Connection established. remote={}", connection.getPeerAddress());
             // connection has been established, so need to call context.cleanup()
             // if exception happens.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/nio/NMysqlServer.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/nio/NMysqlServer.java
index e239d950cd..0c3e960046 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mysql/nio/NMysqlServer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/nio/NMysqlServer.java
@@ -19,7 +19,6 @@ package org.apache.doris.mysql.nio;
 
 import org.apache.doris.common.Config;
 import org.apache.doris.common.ThreadPoolManager;
-import org.apache.doris.mysql.MysqlServer;
 import org.apache.doris.qe.ConnectScheduler;
 
 import org.apache.logging.log4j.LogManager;
@@ -38,9 +37,12 @@ import java.util.concurrent.ExecutorService;
 /**
  * mysql protocol implementation based on nio.
  */
-public class NMysqlServer extends MysqlServer {
+public class NMysqlServer {
     private static final Logger LOG = LogManager.getLogger(NMysqlServer.class);
 
+    private int port;
+    private volatile boolean running;
+
     private XnioWorker xnioWorker;
 
     private AcceptListener acceptListener;
@@ -63,7 +65,6 @@ public class NMysqlServer extends MysqlServer {
 
     // start MySQL protocol service
     // return true if success, otherwise false
-    @Override
     public boolean start() {
         try {
             server = xnioWorker.createStreamConnectionServer(new InetSocketAddress(port), acceptListener,
@@ -78,7 +79,6 @@ public class NMysqlServer extends MysqlServer {
         }
     }
 
-    @Override
     public void stop() {
         if (running) {
             running = false;
@@ -90,8 +90,4 @@ public class NMysqlServer extends MysqlServer {
             }
         }
     }
-
-    public void setTaskService(ExecutorService taskService) {
-        this.taskService = taskService;
-    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeService.java b/fe/fe-core/src/main/java/org/apache/doris/qe/QeService.java
index 90dd240c1f..45bec953d3 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeService.java
@@ -17,7 +17,6 @@
 
 package org.apache.doris.qe;
 
-import org.apache.doris.mysql.MysqlServer;
 import org.apache.doris.mysql.nio.NMysqlServer;
 
 import org.apache.logging.log4j.LogManager;
@@ -32,21 +31,16 @@ public class QeService {
 
     private int port;
     // MySQL protocol service
-    private MysqlServer mysqlServer;
+    private NMysqlServer mysqlServer;
 
     @Deprecated
     public QeService(int port) {
         this.port = port;
     }
 
-    public QeService(int port, boolean nioEnabled, ConnectScheduler scheduler) {
-
+    public QeService(int port, ConnectScheduler scheduler) {
         this.port = port;
-        if (nioEnabled) {
-            mysqlServer = new NMysqlServer(port, scheduler);
-        } else {
-            mysqlServer = new MysqlServer(port, scheduler);
-        }
+        this.mysqlServer = new NMysqlServer(port, scheduler);
     }
 
     public void start() throws Exception {
@@ -64,13 +58,4 @@ public class QeService {
         }
         LOG.info("QE service start.");
     }
-
-    public MysqlServer getMysqlServer() {
-        return mysqlServer;
-    }
-
-    public void setMysqlServer(MysqlServer mysqlServer) {
-        this.mysqlServer = mysqlServer;
-    }
-
 }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/mysql/MysqlServerTest.java b/fe/fe-core/src/test/java/org/apache/doris/mysql/MysqlServerTest.java
index 00eb1a82ef..353b137cb6 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/mysql/MysqlServerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/mysql/MysqlServerTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.mysql;
 
+import org.apache.doris.mysql.nio.NMysqlServer;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.ConnectScheduler;
 
@@ -83,7 +84,7 @@ public class MysqlServerTest {
         int port = socket.getLocalPort();
         socket.close();
 
-        MysqlServer server = new MysqlServer(port, scheduler);
+        NMysqlServer server = new NMysqlServer(port, scheduler);
         Assert.assertTrue(server.start());
 
         // submit
@@ -102,32 +103,21 @@ public class MysqlServerTest {
 
         // stop and join
         server.stop();
-        server.join();
 
         Assert.assertEquals(2, submitNum);
     }
 
-    @Test
-    public void testInvalidParam() throws IOException {
-        ServerSocket socket = new ServerSocket(0);
-        int port = socket.getLocalPort();
-        socket.close();
-        MysqlServer server = new MysqlServer(port, null);
-        Assert.assertFalse(server.start());
-    }
-
     @Test
     public void testBindFail() throws IOException {
         ServerSocket socket = new ServerSocket(0);
         int port = socket.getLocalPort();
         socket.close();
-        MysqlServer server = new MysqlServer(port, scheduler);
+        NMysqlServer server = new NMysqlServer(port, scheduler);
         Assert.assertTrue(server.start());
-        MysqlServer server1 = new MysqlServer(port, scheduler);
+        NMysqlServer server1 = new NMysqlServer(port, scheduler);
         Assert.assertFalse(server1.start());
 
         server.stop();
-        server.join();
     }
 
     @Test
@@ -135,7 +125,7 @@ public class MysqlServerTest {
         ServerSocket socket = new ServerSocket(0);
         int port = socket.getLocalPort();
         socket.close();
-        MysqlServer server = new MysqlServer(port, badScheduler);
+        NMysqlServer server = new NMysqlServer(port, badScheduler);
         Assert.assertTrue(server.start());
 
         // submit
@@ -154,7 +144,6 @@ public class MysqlServerTest {
 
         // stop and join
         server.stop();
-        server.join();
 
         Assert.assertEquals(2, submitFailNum);
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org