You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shenyu.apache.org by xi...@apache.org on 2023/05/04 04:20:57 UTC

[shenyu] branch master updated: Tcp plugin (#4607)

This is an automated email from the ASF dual-hosted git repository.

xiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shenyu.git


The following commit(s) were added to refs/heads/master by this push:
     new 86a4c1c14 Tcp plugin (#4607)
86a4c1c14 is described below

commit 86a4c1c14e3464318e115aaa9d6e3fea14747752
Author: 杨文杰 <31...@users.noreply.github.com>
AuthorDate: Thu May 4 12:20:51 2023 +0800

    Tcp plugin (#4607)
    
    * init tcp plugin
    
    * init tcp plugin
    
    * tigger ci
    
    * init tcp
    
    * init tcp
    
    * demo
    
    * add tcp
    proxy demo
    
    * change code
    
    * add tcp
    proxy demo
    
    * change code
    
    * change code
    
    * change code
    
    * change codes
    
    * tcp proxy
    
    * tcp proxy
    
    * add tcp
    proxy
    
    * add tcp
    proxy
    
    * change tcp proxy code
    
    * add tcp
    proxy
    
    * support hot update
    
    * support hot update
    
    * add tcp
    proxy
    
    * add tcp
    proxy
    
    * add tcp
    proxy
    
    * add tcp
    proxy
---
 pom.xml                                            |   2 +-
 .../shenyu/common/dto/ProxySelectorData.java       | 146 ++++++++++++++++++
 .../dto/convert/selector/DiscoveryUpstream.java    |  56 ++++---
 .../org/apache/shenyu/common/enums/PluginEnum.java |   4 +
 .../discovery/api/ShenyuDiscoveryService.java      |  19 ++-
 .../zookeeper/ZookeeperDiscoveryService.java       |  17 ++-
 shenyu-plugin/pom.xml                              |   1 +
 .../base/handler/ProxySelectorDataHandler.java     |  43 +++---
 shenyu-plugin/shenyu-plugin-tcp/pom.xml            |  51 +++++++
 .../plugin/tcp/handler/TcpBootstrapFactory.java    |  57 +++++++
 .../plugin/tcp/handler/TcpUpstreamDataHandler.java |  72 +++++++++
 shenyu-protocol/pom.xml                            |   1 +
 shenyu-protocol/shenyu-protocol-tcp/pom.xml        |  71 +++++++++
 .../shenyu/protocol/tcp/BootstrapServer.java       |  39 +++--
 .../shenyu/protocol/tcp/TcpBootstrapServer.java    | 121 +++++++++++++++
 .../protocol/tcp/TcpServerConfiguration.java       | 167 +++++++++++++++++++++
 .../shenyu/protocol/tcp/UpstreamProvider.java      |  79 ++++++++++
 .../tcp/connection/ActivityConnectionObserver.java |  96 ++++++++++++
 .../shenyu/protocol/tcp/connection/Bridge.java     |  35 ++---
 .../connection/ClientConnectionConfigProvider.java |  36 ++---
 .../protocol/tcp/connection/ConnectionContext.java |  76 ++++++++++
 .../DefaultConnectionConfigProvider.java           |  67 +++++++++
 .../tcp/connection/TcpConnectionBridge.java        |  53 +++++++
 23 files changed, 1186 insertions(+), 123 deletions(-)

diff --git a/pom.xml b/pom.xml
index c7344e46e..06f6f9b1b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -112,7 +112,6 @@
         <wiremock.version>2.18.0</wiremock.version>
         <zookeeper.version>3.5.7</zookeeper.version>
         <zkclient.version>0.10</zkclient.version>
-        <apollo.version>2.1.0</apollo.version>
         <shiro.version>1.8.0</shiro.version>
         <jwt.version>3.12.0</jwt.version>
         <motan.version>1.2.0</motan.version>
@@ -170,6 +169,7 @@
         <caffeine.version>2.9.3</caffeine.version>
         <httpasyncclient.version>4.1.5</httpasyncclient.version>
         <asm.version>9.2</asm.version>
+        <apollo.version>2.1.0</apollo.version>
     </properties>
 
     <dependencyManagement>
diff --git a/shenyu-common/src/main/java/org/apache/shenyu/common/dto/ProxySelectorData.java b/shenyu-common/src/main/java/org/apache/shenyu/common/dto/ProxySelectorData.java
new file mode 100644
index 000000000..09d3d852e
--- /dev/null
+++ b/shenyu-common/src/main/java/org/apache/shenyu/common/dto/ProxySelectorData.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.common.dto;
+
+import java.util.Properties;
+
+/**
+ * ProxySelectorData.
+ *
+ * <p>Mutable data holder describing one proxy selector: its identity ({@code id}, {@code name}),
+ * the owning plugin, a type tag, the port to forward on and a free-form {@link Properties} bag.
+ * All values are stored and returned as-is; no validation or copying is performed.
+ */
+public class ProxySelectorData {
+
+    private String id;
+
+    private String name;
+
+    private String pluginName;
+
+    private String type;
+
+    private Integer forwardPort;
+
+    // starts out as an empty bag so getProps() is non-null until setProps(null) is called
+    private Properties props = new Properties();
+
+    /**
+     * getId.
+     *
+     * @return the selector id, or {@code null} if it has not been set
+     */
+    public String getId() {
+        return id;
+    }
+
+    /**
+     * setId.
+     *
+     * @param id the selector id
+     */
+    public void setId(final String id) {
+        this.id = id;
+    }
+
+    /**
+     * getName.
+     *
+     * @return the selector name, or {@code null} if it has not been set
+     */
+    public String getName() {
+        return name;
+    }
+
+    /**
+     * setName.
+     *
+     * @param name the selector name
+     */
+    public void setName(final String name) {
+        this.name = name;
+    }
+
+    /**
+     * getPluginName.
+     *
+     * @return the name of the plugin this selector belongs to, or {@code null} if it has not been set
+     */
+    public String getPluginName() {
+        return pluginName;
+    }
+
+    /**
+     * setPluginName.
+     *
+     * @param pluginName the name of the plugin this selector belongs to
+     */
+    public void setPluginName(final String pluginName) {
+        this.pluginName = pluginName;
+    }
+
+    /**
+     * getType.
+     *
+     * @return the selector type, or {@code null} if it has not been set
+     */
+    public String getType() {
+        return type;
+    }
+
+    /**
+     * setType.
+     *
+     * @param type the selector type
+     */
+    public void setType(final String type) {
+        this.type = type;
+    }
+
+    /**
+     * getForwardPort.
+     *
+     * @return the forward port, or {@code null} if it has not been set (boxed, so callers
+     *         that unbox it must handle the unset case)
+     */
+    public Integer getForwardPort() {
+        return forwardPort;
+    }
+
+    /**
+     * setForwardPort.
+     *
+     * @param forwardPort the forward port
+     */
+    public void setForwardPort(final Integer forwardPort) {
+        this.forwardPort = forwardPort;
+    }
+
+    /**
+     * getProps.
+     *
+     * @return the live properties instance held by this object (not a copy)
+     */
+    public Properties getProps() {
+        return props;
+    }
+
+    /**
+     * setProps.
+     *
+     * @param props the properties to hold; the reference is stored without copying
+     */
+    public void setProps(final Properties props) {
+        this.props = props;
+    }
+}
diff --git a/shenyu-discovery/shenyu-discovery-api/src/main/java/org/apache/shenyu/discovery/api/ShenyuDiscoveryService.java b/shenyu-common/src/main/java/org/apache/shenyu/common/dto/convert/selector/DiscoveryUpstream.java
similarity index 54%
copy from shenyu-discovery/shenyu-discovery-api/src/main/java/org/apache/shenyu/discovery/api/ShenyuDiscoveryService.java
copy to shenyu-common/src/main/java/org/apache/shenyu/common/dto/convert/selector/DiscoveryUpstream.java
index e286e7015..a81217e6a 100644
--- a/shenyu-discovery/shenyu-discovery-api/src/main/java/org/apache/shenyu/discovery/api/ShenyuDiscoveryService.java
+++ b/shenyu-common/src/main/java/org/apache/shenyu/common/dto/convert/selector/DiscoveryUpstream.java
@@ -15,38 +15,50 @@
  * limitations under the License.
  */
 
-package org.apache.shenyu.discovery.api;
-
-import org.apache.shenyu.discovery.api.config.DiscoveryConfig;
-import org.apache.shenyu.discovery.api.listener.DataChangedEventListener;
-import org.apache.shenyu.spi.SPI;
+package org.apache.shenyu.common.dto.convert.selector;
 
 /**
- * The interface for shenyu discovery service.
+ * DiscoveryUpstream.
  */
-@SPI
-public interface ShenyuDiscoveryService {
-    
+public class DiscoveryUpstream extends CommonUpstream {
+
+    private int weight;
+
+    private String prop;
+
+    /**
+     * getWeight.
+     *
+     * @return weight
+     */
+    public int getWeight() {
+        return weight;
+    }
+
     /**
-     * Init shenyu discovery service .
+     * setWeight.
      *
-     * @param config the config
+     * @param weight weight
      */
-    void init(DiscoveryConfig config);
-    
+    public void setWeight(final int weight) {
+        this.weight = weight;
+    }
+
     /**
-     * Watcher path , fire data changed event.
+     * getProp.
      *
-     * @param key the key
-     * @param listener the listener
+     * @return prop
      */
-    void watcher(String key, DataChangedEventListener listener);
-    
+    public String getProp() {
+        return prop;
+    }
+
     /**
-     * Register data.
+     * setProp.
      *
-     * @param key the key
-     * @param value the value
+     * @param prop prop
      */
-    void register(String key, String value);
+    public void setProp(final String prop) {
+        this.prop = prop;
+    }
 }
diff --git a/shenyu-common/src/main/java/org/apache/shenyu/common/enums/PluginEnum.java b/shenyu-common/src/main/java/org/apache/shenyu/common/enums/PluginEnum.java
index 8e28218de..ea0b0d81e 100644
--- a/shenyu-common/src/main/java/org/apache/shenyu/common/enums/PluginEnum.java
+++ b/shenyu-common/src/main/java/org/apache/shenyu/common/enums/PluginEnum.java
@@ -31,6 +31,10 @@ public enum PluginEnum {
      * Global plugin enum.
      */
     GLOBAL(-1, 0, "global"),
+    /**
+     * Tcp plugin enum.
+     */
+    TCP(0, 0, "tcp"),
     /**
      * Mqtt plugin enum.
      */
diff --git a/shenyu-discovery/shenyu-discovery-api/src/main/java/org/apache/shenyu/discovery/api/ShenyuDiscoveryService.java b/shenyu-discovery/shenyu-discovery-api/src/main/java/org/apache/shenyu/discovery/api/ShenyuDiscoveryService.java
index e286e7015..f59b13f64 100644
--- a/shenyu-discovery/shenyu-discovery-api/src/main/java/org/apache/shenyu/discovery/api/ShenyuDiscoveryService.java
+++ b/shenyu-discovery/shenyu-discovery-api/src/main/java/org/apache/shenyu/discovery/api/ShenyuDiscoveryService.java
@@ -26,27 +26,36 @@ import org.apache.shenyu.spi.SPI;
  */
 @SPI
 public interface ShenyuDiscoveryService {
-    
+
     /**
      * Init shenyu discovery service .
      *
      * @param config the config
      */
     void init(DiscoveryConfig config);
-    
+
     /**
      * Watcher path , fire data changed event.
      *
-     * @param key the key
+     * @param key      the key
      * @param listener the listener
      */
     void watcher(String key, DataChangedEventListener listener);
-    
+
     /**
      * Register data.
      *
-     * @param key the key
+     * @param key   the key
      * @param value the value
      */
     void register(String key, String value);
+
+    /**
+     * getData by key.
+     *
+     * @param key key
+     * @return value
+     */
+    String getData(String key);
+
 }
diff --git a/shenyu-discovery/shenyu-discovery-zookeeper/src/main/java/org/apache/shenyu/discovery/zookeeper/ZookeeperDiscoveryService.java b/shenyu-discovery/shenyu-discovery-zookeeper/src/main/java/org/apache/shenyu/discovery/zookeeper/ZookeeperDiscoveryService.java
index 0ffa670b9..8e6e2ebae 100644
--- a/shenyu-discovery/shenyu-discovery-zookeeper/src/main/java/org/apache/shenyu/discovery/zookeeper/ZookeeperDiscoveryService.java
+++ b/shenyu-discovery/shenyu-discovery-zookeeper/src/main/java/org/apache/shenyu/discovery/zookeeper/ZookeeperDiscoveryService.java
@@ -27,19 +27,24 @@ import org.apache.shenyu.spi.Join;
  */
 @Join
 public class ZookeeperDiscoveryService implements ShenyuDiscoveryService {
-    
+
     @Override
     public void init(final DiscoveryConfig config) {
-        
+
     }
-    
+
     @Override
     public void watcher(final String key, final DataChangedEventListener listener) {
-        
+
     }
-    
+
     @Override
     public void register(final String key, final String value) {
-        
+
+    }
+
+    @Override
+    public String getData(final String key) {
+        return null;
     }
 }
diff --git a/shenyu-plugin/pom.xml b/shenyu-plugin/pom.xml
index fd9ad504b..81b56c2df 100644
--- a/shenyu-plugin/pom.xml
+++ b/shenyu-plugin/pom.xml
@@ -65,5 +65,6 @@
         <module>shenyu-plugin-casdoor</module>
         <module>shenyu-plugin-key-auth</module>
         <module>shenyu-plugin-brpc</module>
+        <module>shenyu-plugin-tcp</module>
     </modules>
 </project>
diff --git a/shenyu-discovery/shenyu-discovery-api/src/main/java/org/apache/shenyu/discovery/api/ShenyuDiscoveryService.java b/shenyu-plugin/shenyu-plugin-base/src/main/java/org/apache/shenyu/plugin/base/handler/ProxySelectorDataHandler.java
similarity index 51%
copy from shenyu-discovery/shenyu-discovery-api/src/main/java/org/apache/shenyu/discovery/api/ShenyuDiscoveryService.java
copy to shenyu-plugin/shenyu-plugin-base/src/main/java/org/apache/shenyu/plugin/base/handler/ProxySelectorDataHandler.java
index e286e7015..6ee9bd652 100644
--- a/shenyu-discovery/shenyu-discovery-api/src/main/java/org/apache/shenyu/discovery/api/ShenyuDiscoveryService.java
+++ b/shenyu-plugin/shenyu-plugin-base/src/main/java/org/apache/shenyu/plugin/base/handler/ProxySelectorDataHandler.java
@@ -15,38 +15,39 @@
  * limitations under the License.
  */
 
-package org.apache.shenyu.discovery.api;
+package org.apache.shenyu.plugin.base.handler;
 
-import org.apache.shenyu.discovery.api.config.DiscoveryConfig;
-import org.apache.shenyu.discovery.api.listener.DataChangedEventListener;
-import org.apache.shenyu.spi.SPI;
+import org.apache.shenyu.common.dto.ProxySelectorData;
+import org.apache.shenyu.common.dto.convert.selector.DiscoveryUpstream;
+
+import java.util.List;
 
 /**
- * The interface for shenyu discovery service.
+ * ProxySelectorDataHandler.
  */
-@SPI
-public interface ShenyuDiscoveryService {
-    
+public interface ProxySelectorDataHandler {
+
     /**
-     * Init shenyu discovery service .
+     * handlerProxySelector.
      *
-     * @param config the config
+     * @param selectorData selectorData
+     * @param upstreamsList upstreamsList
      */
-    void init(DiscoveryConfig config);
-    
+    void handlerProxySelector(ProxySelectorData selectorData, List<DiscoveryUpstream> upstreamsList);
+
     /**
-     * Watcher path , fire data changed event.
+     * updateProxySelector.
      *
-     * @param key the key
-     * @param listener the listener
+     * @param proxySelectorName proxySelectorName
+     * @param upstreamsList upstreamsList
      */
-    void watcher(String key, DataChangedEventListener listener);
-    
+    void updateProxySelector(String proxySelectorName, List<DiscoveryUpstream> upstreamsList);
+
     /**
-     * Register data.
+     * removeProxySelector.
      *
-     * @param key the key
-     * @param value the value
+     * @param proxySelectorName proxySelectorName
      */
-    void register(String key, String value);
+    void removeProxySelector(String proxySelectorName);
+
 }
diff --git a/shenyu-plugin/shenyu-plugin-tcp/pom.xml b/shenyu-plugin/shenyu-plugin-tcp/pom.xml
new file mode 100644
index 000000000..8e73c0f89
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-tcp/pom.xml
@@ -0,0 +1,51 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+~ Licensed to the Apache Software Foundation (ASF) under one or more
+~ contributor license agreements.  See the NOTICE file distributed with
+~ this work for additional information regarding copyright ownership.
+~ The ASF licenses this file to You under the Apache License, Version 2.0
+~ (the "License"); you may not use this file except in compliance with
+~ the License.  You may obtain a copy of the License at
+~
+~     http://www.apache.org/licenses/LICENSE-2.0
+~
+~ Unless required by applicable law or agreed to in writing, software
+~ distributed under the License is distributed on an "AS IS" BASIS,
+~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+~ See the License for the specific language governing permissions and
+~ limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.shenyu</groupId>
+        <artifactId>shenyu-plugin</artifactId>
+        <version>2.6.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>shenyu-plugin-tcp</artifactId>
+
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.shenyu</groupId>
+            <artifactId>shenyu-plugin-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <artifactId>shenyu-protocol-tcp</artifactId>
+            <groupId>org.apache.shenyu</groupId>
+            <version>${project.version}</version>
+        </dependency>
+
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/shenyu-plugin/shenyu-plugin-tcp/src/main/java/org/apache/shenyu/plugin/tcp/handler/TcpBootstrapFactory.java b/shenyu-plugin/shenyu-plugin-tcp/src/main/java/org/apache/shenyu/plugin/tcp/handler/TcpBootstrapFactory.java
new file mode 100644
index 000000000..671195cd4
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-tcp/src/main/java/org/apache/shenyu/plugin/tcp/handler/TcpBootstrapFactory.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.tcp.handler;
+
+import com.google.common.eventbus.EventBus;
+import org.apache.shenyu.protocol.tcp.BootstrapServer;
+import org.apache.shenyu.protocol.tcp.TcpBootstrapServer;
+import org.apache.shenyu.protocol.tcp.TcpServerConfiguration;
+
+/**
+ * TcpBootstrapFactory.
+ *
+ * <p>Process-wide factory that builds and starts {@link TcpBootstrapServer} instances.
+ */
+public final class TcpBootstrapFactory {
+
+    private static final TcpBootstrapFactory INSTANCE = new TcpBootstrapFactory();
+
+    private TcpBootstrapFactory() {
+    }
+
+    /**
+     * Returns the shared factory instance.
+     *
+     * @return TcpBootstrapFactory
+     */
+    public static TcpBootstrapFactory getSingleton() {
+        return INSTANCE;
+    }
+
+    /**
+     * Creates a {@link TcpBootstrapServer} backed by its own {@link EventBus} and starts it
+     * with the given configuration before handing it back.
+     *
+     * @param configuration configuration passed to {@link BootstrapServer#start}
+     * @return the started BootstrapServer
+     */
+    public BootstrapServer createBootstrapServer(final TcpServerConfiguration configuration) {
+        final BootstrapServer server = new TcpBootstrapServer(new EventBus());
+        server.start(configuration);
+        return server;
+    }
+
+}
diff --git a/shenyu-plugin/shenyu-plugin-tcp/src/main/java/org/apache/shenyu/plugin/tcp/handler/TcpUpstreamDataHandler.java b/shenyu-plugin/shenyu-plugin-tcp/src/main/java/org/apache/shenyu/plugin/tcp/handler/TcpUpstreamDataHandler.java
new file mode 100644
index 000000000..2c1c4cc39
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-tcp/src/main/java/org/apache/shenyu/plugin/tcp/handler/TcpUpstreamDataHandler.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.tcp.handler;
+
+import org.apache.shenyu.common.dto.ProxySelectorData;
+import org.apache.shenyu.common.dto.convert.selector.DiscoveryUpstream;
+import org.apache.shenyu.plugin.base.handler.ProxySelectorDataHandler;
+import org.apache.shenyu.protocol.tcp.BootstrapServer;
+import org.apache.shenyu.protocol.tcp.TcpServerConfiguration;
+import org.apache.shenyu.protocol.tcp.UpstreamProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * upstreamList data change.
+ *
+ * <p>Keeps one {@link BootstrapServer} per proxy selector name: a server is started when a
+ * selector is handled, told about removed upstreams when the selector is updated, and shut
+ * down when the selector is removed.
+ */
+public class TcpUpstreamDataHandler implements ProxySelectorDataHandler {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TcpUpstreamDataHandler.class);
+
+    /**
+     * Started servers keyed by proxy selector name.
+     */
+    private final Map<String, BootstrapServer> cache = new ConcurrentHashMap<>();
+
+    /**
+     * Registers the selector's upstreams with the {@link UpstreamProvider}, starts a TCP server
+     * on the selector's forward port and caches it under the selector name.
+     *
+     * @param proxySelectorData selector describing name, forward port and props
+     * @param upstreamsList     upstreams handed to the {@link UpstreamProvider} for this selector
+     */
+    @Override
+    public void handlerProxySelector(final ProxySelectorData proxySelectorData, final List<DiscoveryUpstream> upstreamsList) {
+        String name = proxySelectorData.getName();
+        Integer forwardPort = proxySelectorData.getForwardPort();
+        TcpServerConfiguration tcpServerConfiguration = new TcpServerConfiguration();
+        tcpServerConfiguration.setPort(forwardPort);
+        tcpServerConfiguration.setProps(proxySelectorData.getProps());
+        tcpServerConfiguration.setPluginSelectorName(name);
+        UpstreamProvider.getSingleton().createUpstreams(name, upstreamsList);
+        BootstrapServer bootstrapServer = TcpBootstrapFactory.getSingleton().createBootstrapServer(tcpServerConfiguration);
+        BootstrapServer previous = cache.put(name, bootstrapServer);
+        if (previous != null) {
+            // the replaced server would otherwise keep listening with nothing left to shut it down
+            previous.shutdown();
+            LOG.info("shenyu shutdown replaced TcpBootstrapServer {}", name);
+        }
+        LOG.info("shenyu create TcpBootstrapServer success port is {}", forwardPort);
+    }
+
+    /**
+     * Refreshes the upstream cache of the selector and forwards the upstreams reported by
+     * {@code refreshCache} to the selector's server. If no server is cached under the name the
+     * cache is still refreshed, but nothing is forwarded.
+     *
+     * @param proxySelectorName proxySelectorName
+     * @param upstreamsList     the current upstreams of the selector
+     */
+    @Override
+    public void updateProxySelector(final String proxySelectorName, final List<DiscoveryUpstream> upstreamsList) {
+        List<DiscoveryUpstream> removed = UpstreamProvider.getSingleton().refreshCache(proxySelectorName, upstreamsList);
+        BootstrapServer bootstrapServer = cache.get(proxySelectorName);
+        if (bootstrapServer == null) {
+            // an update may arrive for a selector that was never started or is already removed
+            LOG.warn("shenyu update TcpBootstrapServer skipped, no server for {}", proxySelectorName);
+            return;
+        }
+        bootstrapServer.removeCommonUpstream(removed);
+        LOG.info("shenyu update TcpBootstrapServer success remove is {}", removed);
+    }
+
+    /**
+     * Shuts down and forgets the server cached under the given name, if there is one.
+     *
+     * @param proxySelectorName proxySelectorName
+     */
+    @Override
+    public void removeProxySelector(final String proxySelectorName) {
+        // single atomic remove instead of containsKey + remove, so concurrent callers cannot hit null
+        BootstrapServer bootstrapServer = cache.remove(proxySelectorName);
+        if (bootstrapServer != null) {
+            bootstrapServer.shutdown();
+            LOG.info("shenyu shutdown {}", proxySelectorName);
+        }
+    }
+
+}
diff --git a/shenyu-protocol/pom.xml b/shenyu-protocol/pom.xml
index 70c1b5567..5190a9f20 100644
--- a/shenyu-protocol/pom.xml
+++ b/shenyu-protocol/pom.xml
@@ -29,5 +29,6 @@
     <modules>
         <module>shenyu-protocol-grpc</module>
         <module>shenyu-protocol-mqtt</module>
+        <module>shenyu-protocol-tcp</module>
     </modules>
 </project>
\ No newline at end of file
diff --git a/shenyu-protocol/shenyu-protocol-tcp/pom.xml b/shenyu-protocol/shenyu-protocol-tcp/pom.xml
new file mode 100644
index 000000000..497fcb0ee
--- /dev/null
+++ b/shenyu-protocol/shenyu-protocol-tcp/pom.xml
@@ -0,0 +1,71 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+~ Licensed to the Apache Software Foundation (ASF) under one or more
+~ contributor license agreements.  See the NOTICE file distributed with
+~ this work for additional information regarding copyright ownership.
+~ The ASF licenses this file to You under the Apache License, Version 2.0
+~ (the "License"); you may not use this file except in compliance with
+~ the License.  You may obtain a copy of the License at
+~
+~     http://www.apache.org/licenses/LICENSE-2.0
+~
+~ Unless required by applicable law or agreed to in writing, software
+~ distributed under the License is distributed on an "AS IS" BASIS,
+~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+~ See the License for the specific language governing permissions and
+~ limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <artifactId>shenyu-protocol</artifactId>
+        <groupId>org.apache.shenyu</groupId>
+        <version>2.6.0-SNAPSHOT</version>
+    </parent>
+    <artifactId>shenyu-protocol-tcp</artifactId>
+
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+    <dependencies>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.shenyu</groupId>
+            <artifactId>shenyu-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>io.projectreactor.netty</groupId>
+            <artifactId>reactor-netty-core</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.shenyu</groupId>
+            <artifactId>shenyu-loadbalancer</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+            <version>2.11.0</version>
+        </dependency>
+
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/shenyu-discovery/shenyu-discovery-api/src/main/java/org/apache/shenyu/discovery/api/ShenyuDiscoveryService.java b/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/BootstrapServer.java
similarity index 54%
copy from shenyu-discovery/shenyu-discovery-api/src/main/java/org/apache/shenyu/discovery/api/ShenyuDiscoveryService.java
copy to shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/BootstrapServer.java
index e286e7015..ff009d567 100644
--- a/shenyu-discovery/shenyu-discovery-api/src/main/java/org/apache/shenyu/discovery/api/ShenyuDiscoveryService.java
+++ b/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/BootstrapServer.java
@@ -15,38 +15,33 @@
  * limitations under the License.
  */
 
-package org.apache.shenyu.discovery.api;
+package org.apache.shenyu.protocol.tcp;
 
-import org.apache.shenyu.discovery.api.config.DiscoveryConfig;
-import org.apache.shenyu.discovery.api.listener.DataChangedEventListener;
-import org.apache.shenyu.spi.SPI;
+import org.apache.shenyu.common.dto.convert.selector.DiscoveryUpstream;
+
+import java.util.List;
 
 /**
- * The interface for shenyu discovery service.
+ * BootstrapServer.
  */
-@SPI
-public interface ShenyuDiscoveryService {
-    
+public interface BootstrapServer {
+
     /**
-     * Init shenyu discovery service .
+     * start.
      *
-     * @param config the config
+     * @param tcpServerConfiguration tcpServerConfiguration
      */
-    void init(DiscoveryConfig config);
-    
+    void start(TcpServerConfiguration tcpServerConfiguration);
+
     /**
-     * Watcher path , fire data changed event.
+     * removeCommonUpstream.
      *
-     * @param key the key
-     * @param listener the listener
+     * @param removeList removeList
      */
-    void watcher(String key, DataChangedEventListener listener);
-    
+    void removeCommonUpstream(List<DiscoveryUpstream> removeList);
+
     /**
-     * Register data.
-     *
-     * @param key the key
-     * @param value the value
+     * shutdown.
      */
-    void register(String key, String value);
+    void shutdown();
 }
diff --git a/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/TcpBootstrapServer.java b/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/TcpBootstrapServer.java
new file mode 100644
index 000000000..8d3e6fa2b
--- /dev/null
+++ b/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/TcpBootstrapServer.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.protocol.tcp;
+
+import com.google.common.eventbus.EventBus;
+import io.netty.handler.logging.LogLevel;
+import io.netty.handler.logging.LoggingHandler;
+
+import org.apache.shenyu.common.dto.convert.selector.DiscoveryUpstream;
+import org.apache.shenyu.protocol.tcp.connection.Bridge;
+import org.apache.shenyu.protocol.tcp.connection.ConnectionContext;
+import org.apache.shenyu.protocol.tcp.connection.DefaultConnectionConfigProvider;
+import org.apache.shenyu.protocol.tcp.connection.TcpConnectionBridge;
+import org.apache.shenyu.protocol.tcp.connection.ActivityConnectionObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import reactor.core.publisher.Mono;
+import reactor.netty.Connection;
+import reactor.netty.DisposableServer;
+import reactor.netty.resources.LoopResources;
+import reactor.netty.tcp.TcpServer;
+
+import java.net.SocketAddress;
+import java.util.List;
+
+
+/**
+ * TcpBootstrapServer.
+ *
+ * <p>Reactor Netty based {@link BootstrapServer}. Every accepted connection is paired with a
+ * client connection obtained from the {@link ConnectionContext}, and both are handed to a
+ * {@link Bridge}.
+ */
+public class TcpBootstrapServer implements BootstrapServer {
+    private static final Logger LOG = LoggerFactory.getLogger(TcpBootstrapServer.class);
+
+    private Bridge bridge;
+
+    private ConnectionContext connectionContext;
+
+    private DisposableServer server;
+
+    /**
+     * Event loops created in {@link #start}; kept so {@link #shutdown()} can release them.
+     */
+    private LoopResources loopResources;
+
+    private final EventBus eventBus;
+
+    /**
+     * Creates a server that is not yet listening; call {@link #start} to bind it.
+     *
+     * @param eventBus bus on which connection observers are registered and removal lists are posted
+     */
+    public TcpBootstrapServer(final EventBus eventBus) {
+        this.eventBus = eventBus;
+    }
+
+    /**
+     * Builds the connection context from the configuration and binds the TCP server, blocking
+     * until the bind has completed.
+     *
+     * @param tcpServerConfiguration port, thread counts, selector name and props for this server
+     */
+    @Override
+    public void start(final TcpServerConfiguration tcpServerConfiguration) {
+        String loadBalanceAlgorithm = tcpServerConfiguration.getProps().getOrDefault("shenyu.tcpPlugin.tcpServerConfiguration.props.loadBalanceAlgorithm", "random").toString();
+        DefaultConnectionConfigProvider connectionConfigProvider = new DefaultConnectionConfigProvider(loadBalanceAlgorithm, tcpServerConfiguration.getPluginSelectorName());
+        this.bridge = new TcpConnectionBridge();
+        connectionContext = new ConnectionContext(connectionConfigProvider);
+        connectionContext.init(tcpServerConfiguration.getProps());
+        loopResources = LoopResources.create("shenyu-tcp-bootstrap-server", tcpServerConfiguration.getBossGroupThreadCount(),
+                tcpServerConfiguration.getWorkerGroupThreadCount(), true);
+
+        TcpServer tcpServer = TcpServer.create()
+                .doOnChannelInit((connObserver, channel, remoteAddress) -> {
+                    channel.pipeline().addFirst(new LoggingHandler(LogLevel.INFO));
+                })
+                .wiretap(true)
+                .observe((c, s) -> {
+                    LOG.info("connection={}|status={}", c, s);
+                })
+                .doOnConnection(this::bridgeConnections)
+                .port(tcpServerConfiguration.getPort())
+                .runOn(loopResources);
+        server = tcpServer.bindNow();
+    }
+
+    /**
+     * Opens an upstream connection for a freshly accepted connection and bridges the two.
+     *
+     * @param serverConn the accepted (inbound) connection
+     */
+    private void bridgeConnections(final Connection serverConn) {
+        LOG.info("Starting proxy client ={}", serverConn);
+        SocketAddress socketAddress = serverConn.channel().remoteAddress();
+        ActivityConnectionObserver connectionObserver = new ActivityConnectionObserver("TcpClient");
+        // NOTE(review): one observer is registered per connection and never unregistered here;
+        // confirm ActivityConnectionObserver unregisters itself, otherwise the bus grows unbounded.
+        eventBus.register(connectionObserver);
+        Mono<Connection> client = connectionContext.getTcpClientConnection(getIp(socketAddress), connectionObserver);
+        client.subscribe(
+            clientConn -> bridge.bridge(serverConn, clientConn),
+            error -> {
+                // without an error consumer a failed upstream connect is dropped and the
+                // inbound connection would stay open with nothing attached to it
+                LOG.error("failed to open upstream connection for {}", serverConn, error);
+                serverConn.dispose();
+            });
+    }
+
+    /**
+     * Extracts the textual IP of the remote peer.
+     *
+     * <p>{@code InetSocketAddress.toString()} renders as {@code host/ip:port}, usually
+     * {@code /ip:port}, so slicing from index 2 up to the first colon loses the first character
+     * of the IP and truncates IPv6 addresses. The address object is therefore queried directly,
+     * and string slicing is kept only as a fallback for other address types.
+     *
+     * @param socketAddress remote address of the accepted connection
+     * @return the peer IP, or the host string when the address is unresolved
+     * @throws NullPointerException if {@code socketAddress} is null
+     */
+    private String getIp(final SocketAddress socketAddress) {
+        if (socketAddress == null) {
+            throw new NullPointerException("remoteAddress is null");
+        }
+        if (socketAddress instanceof java.net.InetSocketAddress) {
+            java.net.InetSocketAddress inetSocketAddress = (java.net.InetSocketAddress) socketAddress;
+            if (inetSocketAddress.getAddress() != null) {
+                return inetSocketAddress.getAddress().getHostAddress();
+            }
+            return inetSocketAddress.getHostString();
+        }
+        String address = socketAddress.toString();
+        int start = address.indexOf('/') + 1;
+        int end = address.lastIndexOf(':');
+        return end > start ? address.substring(start, end) : address.substring(start);
+    }
+
+    /**
+     * removeCommonUpstream.
+     *
+     * <p>Posts the list on the event bus so that registered connection observers receive it.
+     *
+     * @param removeList removeList
+     */
+    @Override
+    public void removeCommonUpstream(final List<DiscoveryUpstream> removeList) {
+        eventBus.post(removeList);
+    }
+
+
+    /**
+     * shutdown.
+     *
+     * <p>Stops the listening server and releases the event loops created by {@link #start}.
+     * Does nothing for parts that were never started.
+     */
+    @Override
+    public void shutdown() {
+        if (server != null) {
+            server.disposeNow();
+            server = null;
+        }
+        if (loopResources != null) {
+            // each start() allocates its own event-loop threads; release them with the server
+            loopResources.dispose();
+            loopResources = null;
+        }
+    }
+
+}
diff --git a/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/TcpServerConfiguration.java b/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/TcpServerConfiguration.java
new file mode 100644
index 000000000..0e07a54af
--- /dev/null
+++ b/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/TcpServerConfiguration.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.protocol.tcp;
+
+import java.util.Properties;
+
+/**
+ * tcp server configuration.
+ *
+ * <p>Plain mutable holder for the tcp server settings. Every property except
+ * {@code pluginSelectorName} has a default value.
+ */
+public class TcpServerConfiguration {
+
+    private String pluginSelectorName;
+
+    private int port = 9500;
+
+    private int bossGroupThreadCount = 1;
+
+    private int maxPayloadSize = 65536;
+
+    private int workerGroupThreadCount = 12;
+
+    private String leakDetectorLevel = "DISABLED";
+
+    private Properties props = new Properties();
+
+    /**
+     * getPluginSelectorName.
+     *
+     * @return pluginSelectorName, {@code null} until it has been set
+     */
+    public String getPluginSelectorName() {
+        return pluginSelectorName;
+    }
+
+    /**
+     * setPluginSelectorName.
+     *
+     * @param pluginSelectorName pluginSelectorName
+     */
+    public void setPluginSelectorName(final String pluginSelectorName) {
+        this.pluginSelectorName = pluginSelectorName;
+    }
+
+    /**
+     * get port.
+     *
+     * @return port, 9500 by default
+     */
+    public int getPort() {
+        return port;
+    }
+
+    /**
+     * set port.
+     *
+     * @param port port
+     */
+    public void setPort(final int port) {
+        this.port = port;
+    }
+
+    /**
+     * get bossGroupThreadCount.
+     *
+     * @return bossGroupThreadCount, 1 by default
+     */
+    public int getBossGroupThreadCount() {
+        return bossGroupThreadCount;
+    }
+
+    /**
+     * set bossGroupThreadCount.
+     *
+     * @param bossGroupThreadCount bossGroupThreadCount
+     */
+    public void setBossGroupThreadCount(final int bossGroupThreadCount) {
+        this.bossGroupThreadCount = bossGroupThreadCount;
+    }
+
+    /**
+     * get maxPayloadSize.
+     *
+     * @return maxPayloadSize, 65536 by default
+     */
+    public int getMaxPayloadSize() {
+        return maxPayloadSize;
+    }
+
+    /**
+     * set maxPayloadSize.
+     *
+     * @param maxPayloadSize maxPayloadSize
+     */
+    public void setMaxPayloadSize(final int maxPayloadSize) {
+        this.maxPayloadSize = maxPayloadSize;
+    }
+
+    /**
+     * get workerGroupThreadCount.
+     *
+     * @return workerGroupThreadCount, 12 by default
+     */
+    public int getWorkerGroupThreadCount() {
+        return workerGroupThreadCount;
+    }
+
+    /**
+     * set workerGroupThreadCount.
+     *
+     * @param workerGroupThreadCount workerGroupThreadCount
+     */
+    public void setWorkerGroupThreadCount(final int workerGroupThreadCount) {
+        this.workerGroupThreadCount = workerGroupThreadCount;
+    }
+
+    /**
+     * get leakDetectorLevel.
+     *
+     * @return leakDetectorLevel, "DISABLED" by default
+     */
+    public String getLeakDetectorLevel() {
+        return leakDetectorLevel;
+    }
+
+    /**
+     * set leakDetectorLevel.
+     *
+     * @param leakDetectorLevel leakDetectorLevel
+     */
+    public void setLeakDetectorLevel(final String leakDetectorLevel) {
+        this.leakDetectorLevel = leakDetectorLevel;
+    }
+
+    /**
+     * getProps.
+     *
+     * @return props, never {@code null}
+     */
+    public Properties getProps() {
+        return props;
+    }
+
+    /**
+     * setProps.
+     *
+     * @param props props; {@code null} is replaced by an empty {@link Properties}
+     */
+    public void setProps(final Properties props) {
+        // keep getProps() non-null so readers can call getProperty without a null check.
+        this.props = props == null ? new Properties() : props;
+    }
+
+}
diff --git a/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/UpstreamProvider.java b/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/UpstreamProvider.java
new file mode 100644
index 000000000..116b1baed
--- /dev/null
+++ b/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/UpstreamProvider.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.protocol.tcp;
+
+import org.apache.shenyu.common.dto.convert.selector.DiscoveryUpstream;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * UpstreamProvider.
+ *
+ * <p>Singleton registry that maps a plugin selector name to its current upstream list,
+ * backed by a {@link ConcurrentHashMap}.
+ */
+public final class UpstreamProvider {
+
+    private static final UpstreamProvider SINGLETON = new UpstreamProvider();
+
+    private final Map<String, List<DiscoveryUpstream>> cache = new ConcurrentHashMap<>();
+
+    private UpstreamProvider() {
+    }
+
+    /**
+     * getSingleton.
+     *
+     * @return UpstreamProvider
+     */
+    public static UpstreamProvider getSingleton() {
+        return SINGLETON;
+    }
+
+    /**
+     * provide.
+     *
+     * @param pluginSelectorName pluginSelectorName
+     * @return UpstreamList, or {@code null} when nothing is cached for the selector
+     */
+    public List<DiscoveryUpstream> provide(final String pluginSelectorName) {
+        return cache.get(pluginSelectorName);
+    }
+
+    /**
+     * createUpstreams.
+     *
+     * @param pluginSelectorName pluginSelectorName
+     * @param upstreams          upstreams
+     */
+    public void createUpstreams(final String pluginSelectorName, final List<DiscoveryUpstream> upstreams) {
+        cache.put(pluginSelectorName, upstreams);
+    }
+
+    /**
+     * refreshCache.
+     *
+     * <p>Replaces the cached list of the selector in one step and hands back the list it replaced.
+     *
+     * @param pluginSelectorName pluginSelectorName
+     * @param upstreams          upstreams
+     * @return the previously cached list, or {@code null} when there was none
+     */
+    public List<DiscoveryUpstream> refreshCache(final String pluginSelectorName, final List<DiscoveryUpstream> upstreams) {
+        // a single put swaps the value atomically; remove-then-put left a window in which
+        // concurrent provide() calls found no upstreams for the selector.
+        return cache.put(pluginSelectorName, upstreams);
+    }
+}
diff --git a/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/connection/ActivityConnectionObserver.java b/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/connection/ActivityConnectionObserver.java
new file mode 100644
index 000000000..0cbdf5403
--- /dev/null
+++ b/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/connection/ActivityConnectionObserver.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.protocol.tcp.connection;
+
+import com.google.common.eventbus.Subscribe;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.shenyu.common.dto.convert.selector.DiscoveryUpstream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import reactor.netty.Connection;
+import reactor.netty.ConnectionObserver;
+
+import java.net.SocketAddress;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * ActivityConnectionObserver.
+ *
+ * <p>Tracks the connections it observes while they are active and, when a list of removed
+ * upstreams is posted on the event bus, disposes every tracked connection whose remote
+ * address matches one of them.
+ */
+public class ActivityConnectionObserver implements ConnectionObserver {
+    private static final Logger LOG = LoggerFactory.getLogger(ActivityConnectionObserver.class);
+
+    private final Map<Connection, State> cache = new ConcurrentHashMap<>();
+
+    private final String name;
+
+    /**
+     * Creates an observer.
+     *
+     * @param name label used in the log output of this observer
+     */
+    public ActivityConnectionObserver(final String name) {
+        this.name = name;
+    }
+
+    /**
+     * Adds the connection on CONNECTED, drops it on DISCONNECTING / RELEASED and otherwise
+     * only refreshes the stored state of an already tracked connection.
+     *
+     * @param connection the connection whose state changed
+     * @param newState   the new state
+     */
+    @Override
+    public void onStateChange(final Connection connection, final State newState) {
+        if (newState == State.CONNECTED) {
+            cache.put(connection, newState);
+            LOG.info("{} add connection into cache ={}", name, connection);
+        } else if (newState == State.DISCONNECTING
+                || newState == State.RELEASED
+        ) {
+            cache.remove(connection);
+            LOG.info("{} remove connection into cache ={}", name, connection);
+        } else {
+            // atomic update: never re-insert a connection that was removed concurrently.
+            cache.computeIfPresent(connection, (tracked, previous) -> newState);
+        }
+    }
+
+    /**
+     * onRemove.
+     *
+     * @param remove removeList
+     */
+    @Subscribe
+    public void onRemove(final List<DiscoveryUpstream> remove) {
+        LOG.info("shenyu {} ConnectionObserver  do on remove upstreams", name);
+        for (Connection connection : cache.keySet()) {
+            SocketAddress socketAddress = connection.channel().remoteAddress();
+            if (in(remove, socketAddress)) {
+                LOG.info("shenyu dispose {} connection ", connection);
+                connection.disposeNow();
+            }
+        }
+    }
+
+    /**
+     * in.
+     *
+     * <p>The remote address is compared both as "ip:port" and as "host:port", because its
+     * {@code toString()} form is "/ip:port" or "host/ip:port" depending on how it was created.
+     *
+     * @param removeList         removeList
+     * @param cacheSocketAddress cacheSocketAddress, may be {@code null}
+     * @return boolean
+     */
+    private boolean in(final List<DiscoveryUpstream> removeList, final SocketAddress cacheSocketAddress) {
+        if (cacheSocketAddress == null) {
+            // no remote address to compare against.
+            return false;
+        }
+        final String rendered = cacheSocketAddress.toString();
+        // keep only the part after '/', i.e. "ip:port".
+        final String cacheUrl = rendered.substring(rendered.indexOf('/') + 1);
+        final String cacheHostUrl;
+        if (cacheSocketAddress instanceof java.net.InetSocketAddress) {
+            java.net.InetSocketAddress inet = (java.net.InetSocketAddress) cacheSocketAddress;
+            cacheHostUrl = inet.getHostString() + ":" + inet.getPort();
+        } else {
+            cacheHostUrl = cacheUrl;
+        }
+        return removeList.stream().anyMatch(u -> {
+            String removedUrl = u.getUpstreamUrl();
+            LOG.info("compare {} , {}", cacheUrl, removedUrl);
+            return StringUtils.equals(cacheUrl, removedUrl) || StringUtils.equals(cacheHostUrl, removedUrl);
+        });
+    }
+}
diff --git a/shenyu-discovery/shenyu-discovery-zookeeper/src/main/java/org/apache/shenyu/discovery/zookeeper/ZookeeperDiscoveryService.java b/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/connection/Bridge.java
similarity index 53%
copy from shenyu-discovery/shenyu-discovery-zookeeper/src/main/java/org/apache/shenyu/discovery/zookeeper/ZookeeperDiscoveryService.java
copy to shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/connection/Bridge.java
index 0ffa670b9..e4d027f63 100644
--- a/shenyu-discovery/shenyu-discovery-zookeeper/src/main/java/org/apache/shenyu/discovery/zookeeper/ZookeeperDiscoveryService.java
+++ b/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/connection/Bridge.java
@@ -15,31 +15,20 @@
  * limitations under the License.
  */
 
-package org.apache.shenyu.discovery.zookeeper;
+package org.apache.shenyu.protocol.tcp.connection;
 
-import org.apache.shenyu.discovery.api.ShenyuDiscoveryService;
-import org.apache.shenyu.discovery.api.config.DiscoveryConfig;
-import org.apache.shenyu.discovery.api.listener.DataChangedEventListener;
-import org.apache.shenyu.spi.Join;
+import reactor.netty.Connection;
 
 /**
- * The type Zookeeper for shenyu discovery service.
+ * Bridge.
  */
-@Join
-public class ZookeeperDiscoveryService implements ShenyuDiscoveryService {
-    
-    @Override
-    public void init(final DiscoveryConfig config) {
-        
-    }
-    
-    @Override
-    public void watcher(final String key, final DataChangedEventListener listener) {
-        
-    }
-    
-    @Override
-    public void register(final String key, final String value) {
-        
-    }
+public interface Bridge {
+
+    /**
+     * Bridges the given tcp server connection with the given tcp client connection.
+     *
+     * @param server tcp server connection
+     * @param client tcp client connection
+     */
+    void bridge(Connection server, Connection client);
 }
diff --git a/shenyu-discovery/shenyu-discovery-zookeeper/src/main/java/org/apache/shenyu/discovery/zookeeper/ZookeeperDiscoveryService.java b/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/connection/ClientConnectionConfigProvider.java
similarity index 53%
copy from shenyu-discovery/shenyu-discovery-zookeeper/src/main/java/org/apache/shenyu/discovery/zookeeper/ZookeeperDiscoveryService.java
copy to shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/connection/ClientConnectionConfigProvider.java
index 0ffa670b9..8fbf41bdf 100644
--- a/shenyu-discovery/shenyu-discovery-zookeeper/src/main/java/org/apache/shenyu/discovery/zookeeper/ZookeeperDiscoveryService.java
+++ b/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/connection/ClientConnectionConfigProvider.java
@@ -15,31 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.shenyu.discovery.zookeeper;
+package org.apache.shenyu.protocol.tcp.connection;
 
-import org.apache.shenyu.discovery.api.ShenyuDiscoveryService;
-import org.apache.shenyu.discovery.api.config.DiscoveryConfig;
-import org.apache.shenyu.discovery.api.listener.DataChangedEventListener;
-import org.apache.shenyu.spi.Join;
+import java.net.URI;
 
 /**
- * The type Zookeeper for shenyu discovery service.
+ * ClientConnectionConfigProvider.
  */
-@Join
-public class ZookeeperDiscoveryService implements ShenyuDiscoveryService {
-    
-    @Override
-    public void init(final DiscoveryConfig config) {
-        
-    }
-    
-    @Override
-    public void watcher(final String key, final DataChangedEventListener listener) {
-        
-    }
-    
-    @Override
-    public void register(final String key, final String value) {
-        
-    }
+public interface ClientConnectionConfigProvider {
+
+    /**
+     * Resolves the proxied service that a client connection should be opened to.
+     *
+     * @param ip ip the proxied service is looked up for
+     * @return URI of the proxied service; its host and port are the connect target
+     */
+    URI getProxiedService(String ip);
+
 }
diff --git a/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/connection/ConnectionContext.java b/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/connection/ConnectionContext.java
new file mode 100644
index 000000000..dfcb6ef72
--- /dev/null
+++ b/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/connection/ConnectionContext.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.protocol.tcp.connection;
+
+import reactor.core.publisher.Mono;
+import reactor.netty.Connection;
+import reactor.netty.resources.ConnectionProvider;
+import reactor.netty.tcp.TcpClient;
+
+import java.time.Duration;
+import java.util.Properties;
+
+/**
+ * ConnectionContext.
+ *
+ * <p>Owns the client side connection pool and opens tcp client connections to the
+ * service returned by the configured {@link ClientConnectionConfigProvider}.
+ */
+public class ConnectionContext {
+
+    private final ClientConnectionConfigProvider connectionConfigProvider;
+
+    private ConnectionProvider connectionProvider;
+
+    public ConnectionContext(final ClientConnectionConfigProvider connectionConfigProvider) {
+        this.connectionConfigProvider = connectionConfigProvider;
+    }
+
+    /**
+     * Builds the connection pool from the given properties.
+     *
+     * <p>Recognised keys (with defaults): {@code tcpProxy.Name},
+     * {@code tcpProxy.maxConnections} (800), {@code tcpProxy.maxIdleTimeMs} (100) and
+     * {@code tcpProxy.disposeTimeoutMs} (2000).
+     *
+     * @param props props
+     */
+    public void init(final Properties props) {
+        connectionProvider = ConnectionProvider
+                .builder(props.getProperty("tcpProxy.Name", "shenyu-tcp-connection-pool-client"))
+                .maxConnections(intOf(props, "tcpProxy.maxConnections", "800"))
+                .maxIdleTime(Duration.ofMillis(intOf(props, "tcpProxy.maxIdleTimeMs", "100")))
+                .disposeTimeout(Duration.ofMillis(intOf(props, "tcpProxy.disposeTimeoutMs", "2000")))
+                .build();
+    }
+
+    /**
+     * Opens a tcp client connection to the proxied service selected for the given ip.
+     *
+     * @param ip       ip
+     * @param observer observer attached to the created client
+     * @return MonoConnection
+     */
+    public Mono<Connection> getTcpClientConnection(final String ip, final ActivityConnectionObserver observer) {
+        return Mono.just(connectionConfigProvider.getProxiedService(ip))
+                .flatMap(target -> connect(target.getHost(), target.getPort(), observer));
+    }
+
+    /**
+     * Connects a pooled tcp client to the given host and port.
+     */
+    private Mono<? extends Connection> connect(final String host, final int port, final ActivityConnectionObserver observer) {
+        TcpClient tcpClient = TcpClient.create(connectionProvider)
+                .host(host)
+                .port(port)
+                .observe(observer);
+        return tcpClient.connect();
+    }
+
+    /**
+     * Reads an integer property, falling back to the given default text.
+     */
+    private static int intOf(final Properties props, final String key, final String defaultValue) {
+        return Integer.parseInt(props.getProperty(key, defaultValue));
+    }
+
+}
diff --git a/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/connection/DefaultConnectionConfigProvider.java b/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/connection/DefaultConnectionConfigProvider.java
new file mode 100644
index 000000000..595601328
--- /dev/null
+++ b/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/connection/DefaultConnectionConfigProvider.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.protocol.tcp.connection;
+
+import org.apache.shenyu.common.exception.ShenyuException;
+import org.apache.shenyu.loadbalancer.entity.Upstream;
+import org.apache.shenyu.loadbalancer.factory.LoadBalancerFactory;
+import org.apache.shenyu.protocol.tcp.UpstreamProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+
+/**
+ * DefaultConnectionConfigProvider.
+ *
+ * <p>Picks the proxied service for a selector by load balancing over the upstreams that
+ * {@link UpstreamProvider} currently holds for it.
+ */
+public class DefaultConnectionConfigProvider implements ClientConnectionConfigProvider {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultConnectionConfigProvider.class);
+
+    private final String loadBalanceAlgorithm;
+
+    private final String pluginSelectorName;
+
+    public DefaultConnectionConfigProvider(final String loadBalanceAlgorithm, final String pluginSelectorName) {
+        this.loadBalanceAlgorithm = loadBalanceAlgorithm;
+        this.pluginSelectorName = pluginSelectorName;
+    }
+
+    /**
+     * Selects one upstream of the selector via the configured load balance algorithm.
+     *
+     * @param ip ip handed to the load balancer
+     * @return URI built from the selected upstream's protocol and url
+     * @throws ShenyuException if no upstream is available or the upstream url is invalid
+     */
+    @Override
+    public URI getProxiedService(final String ip) {
+        // provide() yields null when nothing is cached for the selector.
+        List<Upstream> upstreamList = requirePresent(UpstreamProvider.getSingleton().provide(this.pluginSelectorName), "upstream list")
+                .stream()
+                .map(dp -> Upstream.builder().url(dp.getUpstreamUrl()).status(dp.isStatus()).weight(dp.getWeight()).protocol(dp.getProtocol()).build())
+                .collect(Collectors.toList());
+        // guard against the selector handing back nothing (e.g. for an empty list).
+        Upstream upstream = requirePresent(LoadBalancerFactory.selector(upstreamList, loadBalanceAlgorithm, ip), "upstream");
+        return cover(upstream);
+    }
+
+    /**
+     * Fails with a descriptive exception instead of a bare NullPointerException.
+     */
+    private <T> T requirePresent(final T value, final String what) {
+        if (value == null) {
+            throw new ShenyuException("no " + what + " available for selector " + pluginSelectorName);
+        }
+        return value;
+    }
+
+    /**
+     * Converts an upstream into a {@code protocol://url} URI.
+     */
+    private URI cover(final Upstream upstream) {
+        try {
+            return new URI(upstream.getProtocol() + "://" + upstream.getUrl());
+        } catch (URISyntaxException e) {
+            LOG.error("Upstream url is wrong", e);
+            throw new ShenyuException(e);
+        }
+    }
+
+}
diff --git a/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/connection/TcpConnectionBridge.java b/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/connection/TcpConnectionBridge.java
new file mode 100644
index 000000000..66945e9da
--- /dev/null
+++ b/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/connection/TcpConnectionBridge.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.protocol.tcp.connection;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import reactor.core.Disposable;
+import reactor.core.Disposables;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.netty.Connection;
+import reactor.netty.NettyInbound;
+import reactor.netty.NettyOutbound;
+
+/**
+ * TcpConnectionBridge.
+ *
+ * <p>Forwards the bytes received on each connection to the outbound of the other one and
+ * ties their disposal together.
+ */
+public class TcpConnectionBridge implements Bridge {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TcpConnectionBridge.class);
+
+    @Override
+    public void bridge(final Connection server, final Connection client) {
+        // request direction: server inbound -> client outbound.
+        final Disposable requestPipe = bridge(server.inbound(), client.outbound());
+        // response direction: client inbound -> server outbound.
+        final Disposable responsePipe = bridge(client.inbound(), server.outbound());
+        // when either side is disposed, stop both pipes and close the other side's channel.
+        final Disposable closeClient = client.channel()::close;
+        server.onDispose(Disposables.composite(requestPipe, responsePipe, closeClient));
+        final Disposable closeServer = server.channel()::close;
+        client.onDispose(Disposables.composite(requestPipe, responsePipe, closeServer));
+    }
+
+    /**
+     * Subscribes a pipe that writes every received byte array to the outbound, in order.
+     */
+    private Disposable bridge(final NettyInbound inbound, final NettyOutbound outbound) {
+        return inbound.receive()
+                .asByteArray()
+                .concatMap(chunk -> outbound.sendByteArray(Mono.just(chunk)))
+                .subscribe();
+    }
+
+}