You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shenyu.apache.org by xi...@apache.org on 2023/05/04 04:20:57 UTC
[shenyu] branch master updated: Tcp plugin (#4607)
This is an automated email from the ASF dual-hosted git repository.
xiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shenyu.git
The following commit(s) were added to refs/heads/master by this push:
new 86a4c1c14 Tcp plugin (#4607)
86a4c1c14 is described below
commit 86a4c1c14e3464318e115aaa9d6e3fea14747752
Author: 杨文杰 <31...@users.noreply.github.com>
AuthorDate: Thu May 4 12:20:51 2023 +0800
Tcp plugin (#4607)
* init tcp plugin
* init tcp plugin
* trigger ci
* init tcp
* init tcp
* demo
* add tcp
proxy demo
* change code
* add tcp
proxy demo
* change code
* change code
* change code
* change codes
* tcp proxy
* tcp proxy
* add tcp
proxy
* add tcp
proxy
* change tcp proxy code
* add tcp
proxy
* support hot update
* support hot update
* add tcp
proxy
* add tcp
proxy
* add tcp
proxy
* add tcp
proxy
---
pom.xml | 2 +-
.../shenyu/common/dto/ProxySelectorData.java | 146 ++++++++++++++++++
.../dto/convert/selector/DiscoveryUpstream.java | 56 ++++---
.../org/apache/shenyu/common/enums/PluginEnum.java | 4 +
.../discovery/api/ShenyuDiscoveryService.java | 19 ++-
.../zookeeper/ZookeeperDiscoveryService.java | 17 ++-
shenyu-plugin/pom.xml | 1 +
.../base/handler/ProxySelectorDataHandler.java | 43 +++---
shenyu-plugin/shenyu-plugin-tcp/pom.xml | 51 +++++++
.../plugin/tcp/handler/TcpBootstrapFactory.java | 57 +++++++
.../plugin/tcp/handler/TcpUpstreamDataHandler.java | 72 +++++++++
shenyu-protocol/pom.xml | 1 +
shenyu-protocol/shenyu-protocol-tcp/pom.xml | 71 +++++++++
.../shenyu/protocol/tcp/BootstrapServer.java | 39 +++--
.../shenyu/protocol/tcp/TcpBootstrapServer.java | 121 +++++++++++++++
.../protocol/tcp/TcpServerConfiguration.java | 167 +++++++++++++++++++++
.../shenyu/protocol/tcp/UpstreamProvider.java | 79 ++++++++++
.../tcp/connection/ActivityConnectionObserver.java | 96 ++++++++++++
.../shenyu/protocol/tcp/connection/Bridge.java | 35 ++---
.../connection/ClientConnectionConfigProvider.java | 36 ++---
.../protocol/tcp/connection/ConnectionContext.java | 76 ++++++++++
.../DefaultConnectionConfigProvider.java | 67 +++++++++
.../tcp/connection/TcpConnectionBridge.java | 53 +++++++
23 files changed, 1186 insertions(+), 123 deletions(-)
diff --git a/pom.xml b/pom.xml
index c7344e46e..06f6f9b1b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -112,7 +112,6 @@
<wiremock.version>2.18.0</wiremock.version>
<zookeeper.version>3.5.7</zookeeper.version>
<zkclient.version>0.10</zkclient.version>
- <apollo.version>2.1.0</apollo.version>
<shiro.version>1.8.0</shiro.version>
<jwt.version>3.12.0</jwt.version>
<motan.version>1.2.0</motan.version>
@@ -170,6 +169,7 @@
<caffeine.version>2.9.3</caffeine.version>
<httpasyncclient.version>4.1.5</httpasyncclient.version>
<asm.version>9.2</asm.version>
+ <apollo.version>2.1.0</apollo.version>
</properties>
<dependencyManagement>
diff --git a/shenyu-common/src/main/java/org/apache/shenyu/common/dto/ProxySelectorData.java b/shenyu-common/src/main/java/org/apache/shenyu/common/dto/ProxySelectorData.java
new file mode 100644
index 000000000..09d3d852e
--- /dev/null
+++ b/shenyu-common/src/main/java/org/apache/shenyu/common/dto/ProxySelectorData.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.common.dto;
+
+import java.util.Properties;
+
+/**
+ * ProxySelectorData.
+ */
+public class ProxySelectorData {
+
+ private String id;
+
+ private String name;
+
+ private String pluginName;
+
+ private String type;
+
+ private Integer forwardPort;
+
+ private Properties props = new Properties();
+
+ /**
+ * getId.
+ *
+ * @return id
+ */
+ public String getId() {
+ return id;
+ }
+
+ /**
+ * setId.
+ *
+ * @param id id
+ */
+ public void setId(final String id) {
+ this.id = id;
+ }
+
+ /**
+ * getName.
+ *
+ * @return name
+ */
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * setName.
+ *
+ * @param name name
+ */
+ public void setName(final String name) {
+ this.name = name;
+ }
+
+ /**
+ * getPluginName.
+ *
+ * @return pluginName
+ */
+ public String getPluginName() {
+ return pluginName;
+ }
+
+ /**
+ * setPluginName.
+ *
+ * @param pluginName pluginName
+ */
+ public void setPluginName(final String pluginName) {
+ this.pluginName = pluginName;
+ }
+
+ /**
+ * getType.
+ *
+ * @return type
+ */
+ public String getType() {
+ return type;
+ }
+
+ /**
+ * setType.
+ *
+ * @param type type
+ */
+ public void setType(final String type) {
+ this.type = type;
+ }
+
+ /**
+ * getForwardPort.
+ *
+ * @return forwardPort
+ */
+ public Integer getForwardPort() {
+ return forwardPort;
+ }
+
+ /**
+ * setForwardPort.
+ *
+ * @param forwardPort forwardPort
+ */
+ public void setForwardPort(final Integer forwardPort) {
+ this.forwardPort = forwardPort;
+ }
+
+ /**
+ * getProps.
+ *
+ * @return props
+ */
+ public Properties getProps() {
+ return props;
+ }
+
+ /**
+ * setProps.
+ *
+ * @param props props
+ */
+ public void setProps(final Properties props) {
+ this.props = props;
+ }
+}
diff --git a/shenyu-discovery/shenyu-discovery-api/src/main/java/org/apache/shenyu/discovery/api/ShenyuDiscoveryService.java b/shenyu-common/src/main/java/org/apache/shenyu/common/dto/convert/selector/DiscoveryUpstream.java
similarity index 54%
copy from shenyu-discovery/shenyu-discovery-api/src/main/java/org/apache/shenyu/discovery/api/ShenyuDiscoveryService.java
copy to shenyu-common/src/main/java/org/apache/shenyu/common/dto/convert/selector/DiscoveryUpstream.java
index e286e7015..a81217e6a 100644
--- a/shenyu-discovery/shenyu-discovery-api/src/main/java/org/apache/shenyu/discovery/api/ShenyuDiscoveryService.java
+++ b/shenyu-common/src/main/java/org/apache/shenyu/common/dto/convert/selector/DiscoveryUpstream.java
@@ -15,38 +15,50 @@
* limitations under the License.
*/
-package org.apache.shenyu.discovery.api;
-
-import org.apache.shenyu.discovery.api.config.DiscoveryConfig;
-import org.apache.shenyu.discovery.api.listener.DataChangedEventListener;
-import org.apache.shenyu.spi.SPI;
+package org.apache.shenyu.common.dto.convert.selector;
/**
- * The interface for shenyu discovery service.
+ * DiscoveryUpstream.
*/
-@SPI
-public interface ShenyuDiscoveryService {
-
+public class DiscoveryUpstream extends CommonUpstream {
+
+ private int weight;
+
+ private String prop;
+
+ /**
+ * getWeight.
+ *
+ * @return weight
+ */
+ public int getWeight() {
+ return weight;
+ }
+
/**
- * Init shenyu discovery service .
+ * setWeight.
*
- * @param config the config
+ * @param weight weight
*/
- void init(DiscoveryConfig config);
-
+ public void setWeight(final int weight) {
+ this.weight = weight;
+ }
+
/**
- * Watcher path , fire data changed event.
+ * getProp.
*
- * @param key the key
- * @param listener the listener
+ * @return prop
*/
- void watcher(String key, DataChangedEventListener listener);
-
+ public String getProp() {
+ return prop;
+ }
+
/**
- * Register data.
+ * setProp.
*
- * @param key the key
- * @param value the value
+ * @param prop prop
*/
- void register(String key, String value);
+ public void setProp(final String prop) {
+ this.prop = prop;
+ }
}
diff --git a/shenyu-common/src/main/java/org/apache/shenyu/common/enums/PluginEnum.java b/shenyu-common/src/main/java/org/apache/shenyu/common/enums/PluginEnum.java
index 8e28218de..ea0b0d81e 100644
--- a/shenyu-common/src/main/java/org/apache/shenyu/common/enums/PluginEnum.java
+++ b/shenyu-common/src/main/java/org/apache/shenyu/common/enums/PluginEnum.java
@@ -31,6 +31,10 @@ public enum PluginEnum {
* Global plugin enum.
*/
GLOBAL(-1, 0, "global"),
+ /**
+ * Tcp plugin enum.
+ */
+ TCP(0, 0, "tcp"),
/**
* Mqtt plugin enum.
*/
diff --git a/shenyu-discovery/shenyu-discovery-api/src/main/java/org/apache/shenyu/discovery/api/ShenyuDiscoveryService.java b/shenyu-discovery/shenyu-discovery-api/src/main/java/org/apache/shenyu/discovery/api/ShenyuDiscoveryService.java
index e286e7015..f59b13f64 100644
--- a/shenyu-discovery/shenyu-discovery-api/src/main/java/org/apache/shenyu/discovery/api/ShenyuDiscoveryService.java
+++ b/shenyu-discovery/shenyu-discovery-api/src/main/java/org/apache/shenyu/discovery/api/ShenyuDiscoveryService.java
@@ -26,27 +26,36 @@ import org.apache.shenyu.spi.SPI;
*/
@SPI
public interface ShenyuDiscoveryService {
-
+
/**
* Init shenyu discovery service .
*
* @param config the config
*/
void init(DiscoveryConfig config);
-
+
/**
* Watcher path , fire data changed event.
*
- * @param key the key
+ * @param key the key
* @param listener the listener
*/
void watcher(String key, DataChangedEventListener listener);
-
+
/**
* Register data.
*
- * @param key the key
+ * @param key the key
* @param value the value
*/
void register(String key, String value);
+
+ /**
+ * getData by key.
+ *
+ * @param key key
+ * @return value
+ */
+ String getData(String key);
+
}
diff --git a/shenyu-discovery/shenyu-discovery-zookeeper/src/main/java/org/apache/shenyu/discovery/zookeeper/ZookeeperDiscoveryService.java b/shenyu-discovery/shenyu-discovery-zookeeper/src/main/java/org/apache/shenyu/discovery/zookeeper/ZookeeperDiscoveryService.java
index 0ffa670b9..8e6e2ebae 100644
--- a/shenyu-discovery/shenyu-discovery-zookeeper/src/main/java/org/apache/shenyu/discovery/zookeeper/ZookeeperDiscoveryService.java
+++ b/shenyu-discovery/shenyu-discovery-zookeeper/src/main/java/org/apache/shenyu/discovery/zookeeper/ZookeeperDiscoveryService.java
@@ -27,19 +27,24 @@ import org.apache.shenyu.spi.Join;
*/
@Join
public class ZookeeperDiscoveryService implements ShenyuDiscoveryService {
-
+
@Override
public void init(final DiscoveryConfig config) {
-
+
}
-
+
@Override
public void watcher(final String key, final DataChangedEventListener listener) {
-
+
}
-
+
@Override
public void register(final String key, final String value) {
-
+
+ }
+
+ @Override
+ public String getData(final String key) {
+ return null;
}
}
diff --git a/shenyu-plugin/pom.xml b/shenyu-plugin/pom.xml
index fd9ad504b..81b56c2df 100644
--- a/shenyu-plugin/pom.xml
+++ b/shenyu-plugin/pom.xml
@@ -65,5 +65,6 @@
<module>shenyu-plugin-casdoor</module>
<module>shenyu-plugin-key-auth</module>
<module>shenyu-plugin-brpc</module>
+ <module>shenyu-plugin-tcp</module>
</modules>
</project>
diff --git a/shenyu-discovery/shenyu-discovery-api/src/main/java/org/apache/shenyu/discovery/api/ShenyuDiscoveryService.java b/shenyu-plugin/shenyu-plugin-base/src/main/java/org/apache/shenyu/plugin/base/handler/ProxySelectorDataHandler.java
similarity index 51%
copy from shenyu-discovery/shenyu-discovery-api/src/main/java/org/apache/shenyu/discovery/api/ShenyuDiscoveryService.java
copy to shenyu-plugin/shenyu-plugin-base/src/main/java/org/apache/shenyu/plugin/base/handler/ProxySelectorDataHandler.java
index e286e7015..6ee9bd652 100644
--- a/shenyu-discovery/shenyu-discovery-api/src/main/java/org/apache/shenyu/discovery/api/ShenyuDiscoveryService.java
+++ b/shenyu-plugin/shenyu-plugin-base/src/main/java/org/apache/shenyu/plugin/base/handler/ProxySelectorDataHandler.java
@@ -15,38 +15,39 @@
* limitations under the License.
*/
-package org.apache.shenyu.discovery.api;
+package org.apache.shenyu.plugin.base.handler;
-import org.apache.shenyu.discovery.api.config.DiscoveryConfig;
-import org.apache.shenyu.discovery.api.listener.DataChangedEventListener;
-import org.apache.shenyu.spi.SPI;
+import org.apache.shenyu.common.dto.ProxySelectorData;
+import org.apache.shenyu.common.dto.convert.selector.DiscoveryUpstream;
+
+import java.util.List;
/**
- * The interface for shenyu discovery service.
+ * ProxySelectorDataHandler.
*/
-@SPI
-public interface ShenyuDiscoveryService {
-
+public interface ProxySelectorDataHandler {
+
/**
- * Init shenyu discovery service .
+ * handlerProxySelector.
*
- * @param config the config
+ * @param selectorData selectorData
+ * @param upstreamsList upstreamsList
*/
- void init(DiscoveryConfig config);
-
+ void handlerProxySelector(ProxySelectorData selectorData, List<DiscoveryUpstream> upstreamsList);
+
/**
- * Watcher path , fire data changed event.
+ * updateProxySelector.
*
- * @param key the key
- * @param listener the listener
+ * @param proxySelectorName proxySelectorName
+ * @param upstreamsList upstreamsList
*/
- void watcher(String key, DataChangedEventListener listener);
-
+ void updateProxySelector(String proxySelectorName, List<DiscoveryUpstream> upstreamsList);
+
/**
- * Register data.
+ * removeProxySelector.
*
- * @param key the key
- * @param value the value
+ * @param proxySelectorName proxySelectorName
*/
- void register(String key, String value);
+ void removeProxySelector(String proxySelectorName);
+
}
diff --git a/shenyu-plugin/shenyu-plugin-tcp/pom.xml b/shenyu-plugin/shenyu-plugin-tcp/pom.xml
new file mode 100644
index 000000000..8e73c0f89
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-tcp/pom.xml
@@ -0,0 +1,51 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+~ Licensed to the Apache Software Foundation (ASF) under one or more
+~ contributor license agreements. See the NOTICE file distributed with
+~ this work for additional information regarding copyright ownership.
+~ The ASF licenses this file to You under the Apache License, Version 2.0
+~ (the "License"); you may not use this file except in compliance with
+~ the License. You may obtain a copy of the License at
+~
+~ http://www.apache.org/licenses/LICENSE-2.0
+~
+~ Unless required by applicable law or agreed to in writing, software
+~ distributed under the License is distributed on an "AS IS" BASIS,
+~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+~ See the License for the specific language governing permissions and
+~ limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.shenyu</groupId>
+ <artifactId>shenyu-plugin</artifactId>
+ <version>2.6.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>shenyu-plugin-tcp</artifactId>
+
+ <properties>
+ <maven.compiler.source>8</maven.compiler.source>
+ <maven.compiler.target>8</maven.compiler.target>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.shenyu</groupId>
+ <artifactId>shenyu-plugin-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <artifactId>shenyu-protocol-tcp</artifactId>
+ <groupId>org.apache.shenyu</groupId>
+ <version>${project.version}</version>
+ </dependency>
+
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/shenyu-plugin/shenyu-plugin-tcp/src/main/java/org/apache/shenyu/plugin/tcp/handler/TcpBootstrapFactory.java b/shenyu-plugin/shenyu-plugin-tcp/src/main/java/org/apache/shenyu/plugin/tcp/handler/TcpBootstrapFactory.java
new file mode 100644
index 000000000..671195cd4
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-tcp/src/main/java/org/apache/shenyu/plugin/tcp/handler/TcpBootstrapFactory.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.tcp.handler;
+
+import com.google.common.eventbus.EventBus;
+import org.apache.shenyu.protocol.tcp.BootstrapServer;
+import org.apache.shenyu.protocol.tcp.TcpBootstrapServer;
+import org.apache.shenyu.protocol.tcp.TcpServerConfiguration;
+
+/**
+ * TcpBootstrapFactory.
+ */
+public final class TcpBootstrapFactory {
+
+ private static final TcpBootstrapFactory SINGLETON = new TcpBootstrapFactory();
+
+ private TcpBootstrapFactory() {
+ }
+
+ /**
+ * getSingleton.
+ *
+ * @return TcpBootstrapFactory
+ */
+ public static TcpBootstrapFactory getSingleton() {
+ return SINGLETON;
+ }
+
+ /**
+ * createBootstrapServer.
+ *
+ * @param configuration configuration
+ * @return BootstrapServer
+ */
+ public BootstrapServer createBootstrapServer(final TcpServerConfiguration configuration) {
+ EventBus eventBus = new EventBus();
+ BootstrapServer bootstrapServer = new TcpBootstrapServer(eventBus);
+ bootstrapServer.start(configuration);
+ return bootstrapServer;
+ }
+
+}
diff --git a/shenyu-plugin/shenyu-plugin-tcp/src/main/java/org/apache/shenyu/plugin/tcp/handler/TcpUpstreamDataHandler.java b/shenyu-plugin/shenyu-plugin-tcp/src/main/java/org/apache/shenyu/plugin/tcp/handler/TcpUpstreamDataHandler.java
new file mode 100644
index 000000000..2c1c4cc39
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-tcp/src/main/java/org/apache/shenyu/plugin/tcp/handler/TcpUpstreamDataHandler.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.tcp.handler;
+
+import org.apache.shenyu.common.dto.ProxySelectorData;
+import org.apache.shenyu.common.dto.convert.selector.DiscoveryUpstream;
+import org.apache.shenyu.plugin.base.handler.ProxySelectorDataHandler;
+import org.apache.shenyu.protocol.tcp.BootstrapServer;
+import org.apache.shenyu.protocol.tcp.TcpServerConfiguration;
+import org.apache.shenyu.protocol.tcp.UpstreamProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Handles upstream list data changes for TCP proxy selectors.
+ */
+public class TcpUpstreamDataHandler implements ProxySelectorDataHandler {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TcpUpstreamDataHandler.class);
+
+ private final Map<String, BootstrapServer> cache = new ConcurrentHashMap<>();
+
+ @Override
+ public void handlerProxySelector(final ProxySelectorData proxySelectorData, final List<DiscoveryUpstream> upstreamsList) {
+ String name = proxySelectorData.getName();
+ Integer forwardPort = proxySelectorData.getForwardPort();
+ TcpServerConfiguration tcpServerConfiguration = new TcpServerConfiguration();
+ tcpServerConfiguration.setPort(forwardPort);
+ tcpServerConfiguration.setProps(proxySelectorData.getProps());
+ tcpServerConfiguration.setPluginSelectorName(name);
+ UpstreamProvider.getSingleton().createUpstreams(name, upstreamsList);
+ BootstrapServer bootstrapServer = TcpBootstrapFactory.getSingleton().createBootstrapServer(tcpServerConfiguration);
+ cache.put(name, bootstrapServer);
+ LOG.info("shenyu create TcpBootstrapServer success port is {}", forwardPort);
+ }
+
+ @Override
+ public void updateProxySelector(final String proxySelectorName, final List<DiscoveryUpstream> upstreamsList) {
+ List<DiscoveryUpstream> removed = UpstreamProvider.getSingleton().refreshCache(proxySelectorName, upstreamsList);
+ BootstrapServer bootstrapServer = cache.get(proxySelectorName);
+ bootstrapServer.removeCommonUpstream(removed);
+ LOG.info("shenyu update TcpBootstrapServer success remove is {}", removed);
+ }
+
+ @Override
+ public void removeProxySelector(final String proxySelectorName) {
+ if (cache.containsKey(proxySelectorName)) {
+ cache.remove(proxySelectorName).shutdown();
+ LOG.info("shenyu shutdown {}", proxySelectorName);
+ }
+ }
+
+}
diff --git a/shenyu-protocol/pom.xml b/shenyu-protocol/pom.xml
index 70c1b5567..5190a9f20 100644
--- a/shenyu-protocol/pom.xml
+++ b/shenyu-protocol/pom.xml
@@ -29,5 +29,6 @@
<modules>
<module>shenyu-protocol-grpc</module>
<module>shenyu-protocol-mqtt</module>
+ <module>shenyu-protocol-tcp</module>
</modules>
</project>
\ No newline at end of file
diff --git a/shenyu-protocol/shenyu-protocol-tcp/pom.xml b/shenyu-protocol/shenyu-protocol-tcp/pom.xml
new file mode 100644
index 000000000..497fcb0ee
--- /dev/null
+++ b/shenyu-protocol/shenyu-protocol-tcp/pom.xml
@@ -0,0 +1,71 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+~ Licensed to the Apache Software Foundation (ASF) under one or more
+~ contributor license agreements. See the NOTICE file distributed with
+~ this work for additional information regarding copyright ownership.
+~ The ASF licenses this file to You under the Apache License, Version 2.0
+~ (the "License"); you may not use this file except in compliance with
+~ the License. You may obtain a copy of the License at
+~
+~ http://www.apache.org/licenses/LICENSE-2.0
+~
+~ Unless required by applicable law or agreed to in writing, software
+~ distributed under the License is distributed on an "AS IS" BASIS,
+~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+~ See the License for the specific language governing permissions and
+~ limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>shenyu-protocol</artifactId>
+ <groupId>org.apache.shenyu</groupId>
+ <version>2.6.0-SNAPSHOT</version>
+ </parent>
+ <artifactId>shenyu-protocol-tcp</artifactId>
+
+ <properties>
+ <maven.compiler.source>8</maven.compiler.source>
+ <maven.compiler.target>8</maven.compiler.target>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
+ <dependencies>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-simple</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.shenyu</groupId>
+ <artifactId>shenyu-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>io.projectreactor.netty</groupId>
+ <artifactId>reactor-netty-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.shenyu</groupId>
+ <artifactId>shenyu-loadbalancer</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ <version>2.11.0</version>
+ </dependency>
+
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/shenyu-discovery/shenyu-discovery-api/src/main/java/org/apache/shenyu/discovery/api/ShenyuDiscoveryService.java b/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/BootstrapServer.java
similarity index 54%
copy from shenyu-discovery/shenyu-discovery-api/src/main/java/org/apache/shenyu/discovery/api/ShenyuDiscoveryService.java
copy to shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/BootstrapServer.java
index e286e7015..ff009d567 100644
--- a/shenyu-discovery/shenyu-discovery-api/src/main/java/org/apache/shenyu/discovery/api/ShenyuDiscoveryService.java
+++ b/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/BootstrapServer.java
@@ -15,38 +15,33 @@
* limitations under the License.
*/
-package org.apache.shenyu.discovery.api;
+package org.apache.shenyu.protocol.tcp;
-import org.apache.shenyu.discovery.api.config.DiscoveryConfig;
-import org.apache.shenyu.discovery.api.listener.DataChangedEventListener;
-import org.apache.shenyu.spi.SPI;
+import org.apache.shenyu.common.dto.convert.selector.DiscoveryUpstream;
+
+import java.util.List;
/**
- * The interface for shenyu discovery service.
+ * BootstrapServer.
*/
-@SPI
-public interface ShenyuDiscoveryService {
-
+public interface BootstrapServer {
+
/**
- * Init shenyu discovery service .
+ * start.
*
- * @param config the config
+ * @param tcpServerConfiguration tcpServerConfiguration
*/
- void init(DiscoveryConfig config);
-
+ void start(TcpServerConfiguration tcpServerConfiguration);
+
/**
- * Watcher path , fire data changed event.
+ * removeCommonUpstream.
*
- * @param key the key
- * @param listener the listener
+ * @param removeList removeList
*/
- void watcher(String key, DataChangedEventListener listener);
-
+ void removeCommonUpstream(List<DiscoveryUpstream> removeList);
+
/**
- * Register data.
- *
- * @param key the key
- * @param value the value
+ * shutdown.
*/
- void register(String key, String value);
+ void shutdown();
}
diff --git a/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/TcpBootstrapServer.java b/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/TcpBootstrapServer.java
new file mode 100644
index 000000000..8d3e6fa2b
--- /dev/null
+++ b/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/TcpBootstrapServer.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.protocol.tcp;
+
+import com.google.common.eventbus.EventBus;
+import io.netty.handler.logging.LogLevel;
+import io.netty.handler.logging.LoggingHandler;
+
+import org.apache.shenyu.common.dto.convert.selector.DiscoveryUpstream;
+import org.apache.shenyu.protocol.tcp.connection.Bridge;
+import org.apache.shenyu.protocol.tcp.connection.ConnectionContext;
+import org.apache.shenyu.protocol.tcp.connection.DefaultConnectionConfigProvider;
+import org.apache.shenyu.protocol.tcp.connection.TcpConnectionBridge;
+import org.apache.shenyu.protocol.tcp.connection.ActivityConnectionObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import reactor.core.publisher.Mono;
+import reactor.netty.Connection;
+import reactor.netty.DisposableServer;
+import reactor.netty.resources.LoopResources;
+import reactor.netty.tcp.TcpServer;
+
+import java.net.SocketAddress;
+import java.util.List;
+
+
+/**
+ * TcpBootstrapServer.
+ */
+public class TcpBootstrapServer implements BootstrapServer {
+ private static final Logger LOG = LoggerFactory.getLogger(TcpBootstrapServer.class);
+
+ private Bridge bridge;
+
+ private ConnectionContext connectionContext;
+
+ private DisposableServer server;
+
+ private final EventBus eventBus;
+
+ public TcpBootstrapServer(final EventBus eventBus) {
+ this.eventBus = eventBus;
+ }
+
+ @Override
+ public void start(final TcpServerConfiguration tcpServerConfiguration) {
+ String loadBalanceAlgorithm = tcpServerConfiguration.getProps().getOrDefault("shenyu.tcpPlugin.tcpServerConfiguration.props.loadBalanceAlgorithm", "random").toString();
+ DefaultConnectionConfigProvider connectionConfigProvider = new DefaultConnectionConfigProvider(loadBalanceAlgorithm, tcpServerConfiguration.getPluginSelectorName());
+ this.bridge = new TcpConnectionBridge();
+ connectionContext = new ConnectionContext(connectionConfigProvider);
+ connectionContext.init(tcpServerConfiguration.getProps());
+ LoopResources loopResources = LoopResources.create("shenyu-tcp-bootstrap-server", tcpServerConfiguration.getBossGroupThreadCount(),
+ tcpServerConfiguration.getWorkerGroupThreadCount(), true);
+
+ TcpServer tcpServer = TcpServer.create()
+ .doOnChannelInit((connObserver, channel, remoteAddress) -> {
+ channel.pipeline().addFirst(new LoggingHandler(LogLevel.INFO));
+ })
+ .wiretap(true)
+ .observe((c, s) -> {
+ LOG.info("connection={}|status={}", c, s);
+ })
+ //.childObserve(connectionObserver)
+ .doOnConnection(this::bridgeConnections)
+ .port(tcpServerConfiguration.getPort())
+ .runOn(loopResources);
+ server = tcpServer.bindNow();
+ }
+
+ private void bridgeConnections(final Connection serverConn) {
+ LOG.info("Starting proxy client ={}", serverConn);
+ SocketAddress socketAddress = serverConn.channel().remoteAddress();
+ ActivityConnectionObserver connectionObserver = new ActivityConnectionObserver("TcpClient");
+ eventBus.register(connectionObserver);
+ Mono<Connection> client = connectionContext.getTcpClientConnection(getIp(socketAddress), connectionObserver);
+ client.subscribe(clientConn -> bridge.bridge(serverConn, clientConn));
+ }
+
+ private String getIp(final SocketAddress socketAddress) {
+ if (socketAddress == null) {
+ throw new NullPointerException("remoteAddress is null");
+ }
+ String address = socketAddress.toString();
+ return address.substring(2, address.indexOf(':'));
+ }
+
+ /**
+ * removeCommonUpstream.
+ *
+ * @param removeList removeList
+ */
+ @Override
+ public void removeCommonUpstream(final List<DiscoveryUpstream> removeList) {
+ eventBus.post(removeList);
+ }
+
+
+ /**
+ * shutdown.
+ */
+ @Override
+ public void shutdown() {
+ server.disposeNow();
+ }
+
+}
diff --git a/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/TcpServerConfiguration.java b/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/TcpServerConfiguration.java
new file mode 100644
index 000000000..0e07a54af
--- /dev/null
+++ b/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/TcpServerConfiguration.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.protocol.tcp;
+
+import java.util.Properties;
+
+/**
+ * Mutable settings holder for the TCP bootstrap server.
+ *
+ * <p>Every property has a plain getter/setter pair; the defaults are the field initializers below.
+ */
+public class TcpServerConfiguration {
+
+    /** Name of the plugin selector this server belongs to; unset ({@code null}) by default. */
+    private String pluginSelectorName;
+
+    /** Listening port, 9500 by default. */
+    private int port = 9500;
+
+    /** Boss (acceptor) thread count, 1 by default. */
+    private int bossGroupThreadCount = 1;
+
+    /** Maximum payload size, 65536 by default. */
+    private int maxPayloadSize = 65536;
+
+    /** Worker thread count, 12 by default. */
+    private int workerGroupThreadCount = 12;
+
+    /** Leak detector level name, "DISABLED" by default. */
+    private String leakDetectorLevel = "DISABLED";
+
+    /** Additional free-form properties; empty by default. */
+    private Properties props = new Properties();
+
+    /**
+     * Returns the plugin selector name.
+     *
+     * @return the plugin selector name, or {@code null} when not set
+     */
+    public String getPluginSelectorName() {
+        return this.pluginSelectorName;
+    }
+
+    /**
+     * Assigns the plugin selector name.
+     *
+     * @param pluginSelectorName the new plugin selector name
+     */
+    public void setPluginSelectorName(final String pluginSelectorName) {
+        this.pluginSelectorName = pluginSelectorName;
+    }
+
+    /**
+     * Returns the listening port.
+     *
+     * @return the port
+     */
+    public int getPort() {
+        return this.port;
+    }
+
+    /**
+     * Assigns the listening port.
+     *
+     * @param port the new port
+     */
+    public void setPort(final int port) {
+        this.port = port;
+    }
+
+    /**
+     * Returns the boss group thread count.
+     *
+     * @return the boss group thread count
+     */
+    public int getBossGroupThreadCount() {
+        return this.bossGroupThreadCount;
+    }
+
+    /**
+     * Assigns the boss group thread count.
+     *
+     * @param bossGroupThreadCount the new boss group thread count
+     */
+    public void setBossGroupThreadCount(final int bossGroupThreadCount) {
+        this.bossGroupThreadCount = bossGroupThreadCount;
+    }
+
+    /**
+     * Returns the maximum payload size.
+     *
+     * @return the maximum payload size
+     */
+    public int getMaxPayloadSize() {
+        return this.maxPayloadSize;
+    }
+
+    /**
+     * Assigns the maximum payload size.
+     *
+     * @param maxPayloadSize the new maximum payload size
+     */
+    public void setMaxPayloadSize(final int maxPayloadSize) {
+        this.maxPayloadSize = maxPayloadSize;
+    }
+
+    /**
+     * Returns the worker group thread count.
+     *
+     * @return the worker group thread count
+     */
+    public int getWorkerGroupThreadCount() {
+        return this.workerGroupThreadCount;
+    }
+
+    /**
+     * Assigns the worker group thread count.
+     *
+     * @param workerGroupThreadCount the new worker group thread count
+     */
+    public void setWorkerGroupThreadCount(final int workerGroupThreadCount) {
+        this.workerGroupThreadCount = workerGroupThreadCount;
+    }
+
+    /**
+     * Returns the leak detector level.
+     *
+     * @return the leak detector level name
+     */
+    public String getLeakDetectorLevel() {
+        return this.leakDetectorLevel;
+    }
+
+    /**
+     * Assigns the leak detector level.
+     *
+     * @param leakDetectorLevel the new leak detector level name
+     */
+    public void setLeakDetectorLevel(final String leakDetectorLevel) {
+        this.leakDetectorLevel = leakDetectorLevel;
+    }
+
+    /**
+     * Returns the additional properties.
+     *
+     * @return the properties object currently held (not a copy)
+     */
+    public Properties getProps() {
+        return this.props;
+    }
+
+    /**
+     * Replaces the additional properties.
+     *
+     * @param props the new properties object
+     */
+    public void setProps(final Properties props) {
+        this.props = props;
+    }
+
+}
diff --git a/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/UpstreamProvider.java b/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/UpstreamProvider.java
new file mode 100644
index 000000000..116b1baed
--- /dev/null
+++ b/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/UpstreamProvider.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.protocol.tcp;
+
+import org.apache.shenyu.common.dto.convert.selector.DiscoveryUpstream;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Process-wide registry that maps a plugin selector name to its current list of upstreams.
+ *
+ * <p>Backed by a {@link ConcurrentHashMap}, so individual reads and writes are thread-safe.
+ */
+public final class UpstreamProvider {
+
+    private static final UpstreamProvider SINGLETON = new UpstreamProvider();
+
+    private final Map<String, List<DiscoveryUpstream>> cache = new ConcurrentHashMap<>();
+
+    private UpstreamProvider() {
+    }
+
+    /**
+     * Returns the shared instance.
+     *
+     * @return UpstreamProvider
+     */
+    public static UpstreamProvider getSingleton() {
+        return SINGLETON;
+    }
+
+    /**
+     * Looks up the upstreams registered for a selector.
+     *
+     * @param pluginSelectorName pluginSelectorName
+     * @return the registered upstream list, or {@code null} when nothing is registered for the name
+     */
+    public List<DiscoveryUpstream> provide(final String pluginSelectorName) {
+        return cache.get(pluginSelectorName);
+    }
+
+    /**
+     * Registers (or overwrites) the upstreams of a selector.
+     *
+     * @param pluginSelectorName pluginSelectorName
+     * @param upstreams upstreams
+     */
+    public void createUpstreams(final String pluginSelectorName, final List<DiscoveryUpstream> upstreams) {
+        cache.put(pluginSelectorName, upstreams);
+    }
+
+    /**
+     * Replaces the upstreams of a selector and hands back the list that was registered before.
+     *
+     * <p>The replacement is a single {@code put}, so a concurrent {@link #provide(String)} never
+     * observes a window in which the selector has no mapping (the former remove-then-put did).
+     *
+     * @param pluginSelectorName pluginSelectorName
+     * @param upstreams upstreams
+     * @return the previously registered list (the whole old list, not a diff against {@code upstreams}),
+     *         or {@code null} when the selector had no entry
+     */
+    public List<DiscoveryUpstream> refreshCache(final String pluginSelectorName, final List<DiscoveryUpstream> upstreams) {
+        // put() returns the previous value, exactly what remove() returned before
+        return cache.put(pluginSelectorName, upstreams);
+    }
+}
diff --git a/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/connection/ActivityConnectionObserver.java b/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/connection/ActivityConnectionObserver.java
new file mode 100644
index 000000000..0cbdf5403
--- /dev/null
+++ b/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/connection/ActivityConnectionObserver.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.protocol.tcp.connection;
+
+import com.google.common.eventbus.Subscribe;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.shenyu.common.dto.convert.selector.DiscoveryUpstream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import reactor.netty.Connection;
+import reactor.netty.ConnectionObserver;
+
+import java.net.SocketAddress;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Connection observer that remembers live connections and closes those whose upstream was removed.
+ *
+ * <p>Connections are added on {@code CONNECTED}, dropped on {@code DISCONNECTING}/{@code RELEASED},
+ * and {@link #onRemove(List)} is an event-bus subscriber that disposes every tracked connection
+ * whose remote address matches a removed upstream url.
+ */
+public class ActivityConnectionObserver implements ConnectionObserver {
+    private static final Logger LOG = LoggerFactory.getLogger(ActivityConnectionObserver.class);
+
+    private final Map<Connection, State> cache = new ConcurrentHashMap<>();
+
+    private final String name;
+
+    /**
+     * Creates an observer.
+     *
+     * @param name label used in log messages
+     */
+    public ActivityConnectionObserver(final String name) {
+        this.name = name;
+    }
+
+    /**
+     * Tracks the connection's latest state.
+     *
+     * @param connection the connection whose state changed
+     * @param newState the new state
+     */
+    @Override
+    public void onStateChange(final Connection connection, final State newState) {
+        if (newState == State.CONNECTED) {
+            cache.put(connection, newState);
+            LOG.info("{} add connection into cache ={}", name, connection);
+        } else if (newState == State.DISCONNECTING
+                || newState == State.RELEASED
+        ) {
+            cache.remove(connection);
+            LOG.info("{} remove connection into cache ={}", name, connection);
+        } else {
+            // other states only update connections that are already tracked
+            if (cache.containsKey(connection)) {
+                cache.put(connection, newState);
+            }
+        }
+    }
+
+    /**
+     * Disposes every tracked connection that targets one of the removed upstreams.
+     *
+     * @param remove removeList
+     */
+    @Subscribe
+    public void onRemove(final List<DiscoveryUpstream> remove) {
+        LOG.info("shenyu {} ConnectionObserver do on remove upstreams", name);
+        for (Connection connection : cache.keySet()) {
+            SocketAddress socketAddress = connection.channel().remoteAddress();
+            if (in(remove, socketAddress)) {
+                LOG.info("shenyu dispose {} connection ", connection);
+                connection.disposeNow();
+            }
+        }
+    }
+
+    /**
+     * Tells whether the given remote address belongs to one of the removed upstreams.
+     *
+     * <p>For inet addresses both {@code host:port} and {@code ip:port} are compared with the
+     * upstream url, so an address rendered as {@code "host/ip:port"} still matches. Other address
+     * types keep the former comparison against {@code toString()} without its first character.
+     *
+     * @param removeList removeList
+     * @param cacheSocketAddress remote address of a tracked connection; {@code null} when the
+     *                           channel reports no remote address, which never matches
+     * @return boolean
+     */
+    private boolean in(final List<DiscoveryUpstream> removeList, final SocketAddress cacheSocketAddress) {
+        if (cacheSocketAddress == null) {
+            return false;
+        }
+        // Computed once per connection rather than once per removed upstream.
+        final String hostUrl;
+        final String ipUrl;
+        if (cacheSocketAddress instanceof java.net.InetSocketAddress) {
+            java.net.InetSocketAddress inet = (java.net.InetSocketAddress) cacheSocketAddress;
+            String portSuffix = ":" + inet.getPort();
+            hostUrl = inet.getHostString() + portSuffix;
+            ipUrl = inet.getAddress() == null ? hostUrl : inet.getAddress().getHostAddress() + portSuffix;
+        } else {
+            hostUrl = cacheSocketAddress.toString().substring(1);
+            ipUrl = hostUrl;
+        }
+        return removeList.stream().anyMatch(u -> {
+            String removedUrl = u.getUpstreamUrl();
+            LOG.info("compare {} / {} , {}", hostUrl, ipUrl, removedUrl);
+            return StringUtils.equals(hostUrl, removedUrl) || StringUtils.equals(ipUrl, removedUrl);
+        });
+    }
+}
diff --git a/shenyu-discovery/shenyu-discovery-zookeeper/src/main/java/org/apache/shenyu/discovery/zookeeper/ZookeeperDiscoveryService.java b/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/connection/Bridge.java
similarity index 53%
copy from shenyu-discovery/shenyu-discovery-zookeeper/src/main/java/org/apache/shenyu/discovery/zookeeper/ZookeeperDiscoveryService.java
copy to shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/connection/Bridge.java
index 0ffa670b9..e4d027f63 100644
--- a/shenyu-discovery/shenyu-discovery-zookeeper/src/main/java/org/apache/shenyu/discovery/zookeeper/ZookeeperDiscoveryService.java
+++ b/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/connection/Bridge.java
@@ -15,31 +15,20 @@
* limitations under the License.
*/
-package org.apache.shenyu.discovery.zookeeper;
+package org.apache.shenyu.protocol.tcp.connection;
-import org.apache.shenyu.discovery.api.ShenyuDiscoveryService;
-import org.apache.shenyu.discovery.api.config.DiscoveryConfig;
-import org.apache.shenyu.discovery.api.listener.DataChangedEventListener;
-import org.apache.shenyu.spi.Join;
+import reactor.netty.Connection;
/**
- * The type Zookeeper for shenyu discovery service.
+ * Bridge.
*/
-@Join
-public class ZookeeperDiscoveryService implements ShenyuDiscoveryService {
-
- @Override
- public void init(final DiscoveryConfig config) {
-
- }
-
- @Override
- public void watcher(final String key, final DataChangedEventListener listener) {
-
- }
-
- @Override
- public void register(final String key, final String value) {
-
- }
+public interface Bridge {
+
+ /**
+ * Links an accepted server-side connection with the client connection opened towards the upstream.
+ *
+ * <p>How the two are linked is up to the implementation.
+ *
+ * @param server tcp server connection (the inbound side)
+ * @param client tcp client connection (the upstream side)
+ */
+ void bridge(Connection server, Connection client);
}
diff --git a/shenyu-discovery/shenyu-discovery-zookeeper/src/main/java/org/apache/shenyu/discovery/zookeeper/ZookeeperDiscoveryService.java b/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/connection/ClientConnectionConfigProvider.java
similarity index 53%
copy from shenyu-discovery/shenyu-discovery-zookeeper/src/main/java/org/apache/shenyu/discovery/zookeeper/ZookeeperDiscoveryService.java
copy to shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/connection/ClientConnectionConfigProvider.java
index 0ffa670b9..8fbf41bdf 100644
--- a/shenyu-discovery/shenyu-discovery-zookeeper/src/main/java/org/apache/shenyu/discovery/zookeeper/ZookeeperDiscoveryService.java
+++ b/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/connection/ClientConnectionConfigProvider.java
@@ -15,31 +15,21 @@
* limitations under the License.
*/
-package org.apache.shenyu.discovery.zookeeper;
+package org.apache.shenyu.protocol.tcp.connection;
-import org.apache.shenyu.discovery.api.ShenyuDiscoveryService;
-import org.apache.shenyu.discovery.api.config.DiscoveryConfig;
-import org.apache.shenyu.discovery.api.listener.DataChangedEventListener;
-import org.apache.shenyu.spi.Join;
+import java.net.URI;
/**
- * The type Zookeeper for shenyu discovery service.
+ * ClientConnectionConfigProvider.
*/
-@Join
-public class ZookeeperDiscoveryService implements ShenyuDiscoveryService {
-
- @Override
- public void init(final DiscoveryConfig config) {
-
- }
-
- @Override
- public void watcher(final String key, final DataChangedEventListener listener) {
-
- }
-
- @Override
- public void register(final String key, final String value) {
-
- }
+public interface ClientConnectionConfigProvider {
+
+ /**
+ * Resolves the upstream service that a connection from the given ip should be proxied to.
+ *
+ * @param ip ip of the remote peer of the inbound connection
+ * @return URI of the selected upstream; callers read its host and port
+ */
+ URI getProxiedService(String ip);
+
}
diff --git a/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/connection/ConnectionContext.java b/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/connection/ConnectionContext.java
new file mode 100644
index 000000000..dfcb6ef72
--- /dev/null
+++ b/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/connection/ConnectionContext.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.protocol.tcp.connection;
+
+import reactor.core.publisher.Mono;
+import reactor.netty.Connection;
+import reactor.netty.resources.ConnectionProvider;
+import reactor.netty.tcp.TcpClient;
+
+import java.time.Duration;
+import java.util.Properties;
+
+/**
+ * Holds the pooled connection provider and opens TCP client connections to the selected upstream.
+ */
+public class ConnectionContext {
+
+    private final ClientConnectionConfigProvider connectionConfigProvider;
+
+    private ConnectionProvider connectionProvider;
+
+    /**
+     * Creates a context.
+     *
+     * @param connectionConfigProvider strategy that picks the upstream for a given client ip
+     */
+    public ConnectionContext(final ClientConnectionConfigProvider connectionConfigProvider) {
+        this.connectionConfigProvider = connectionConfigProvider;
+    }
+
+    /**
+     * Builds the connection pool from the given properties.
+     *
+     * <p>Recognised keys (with defaults): {@code tcpProxy.maxConnections} (800),
+     * {@code tcpProxy.maxIdleTimeMs} (100), {@code tcpProxy.Name}
+     * (shenyu-tcp-connection-pool-client) and {@code tcpProxy.disposeTimeoutMs} (2000).
+     *
+     * @param props props
+     */
+    public void init(final Properties props) {
+        final String poolSize = props.getProperty("tcpProxy.maxConnections", "800");
+        final String idleMillis = props.getProperty("tcpProxy.maxIdleTimeMs", "100");
+        final String poolName = props.getProperty("tcpProxy.Name", "shenyu-tcp-connection-pool-client");
+        final String disposeMillis = props.getProperty("tcpProxy.disposeTimeoutMs", "2000");
+        this.connectionProvider = ConnectionProvider.builder(poolName)
+                .maxConnections(Integer.parseInt(poolSize))
+                .maxIdleTime(Duration.ofMillis(Integer.parseInt(idleMillis)))
+                .disposeTimeout(Duration.ofMillis(Integer.parseInt(disposeMillis)))
+                .build();
+    }
+
+    /**
+     * Opens a pooled client connection to the upstream chosen for the given ip.
+     *
+     * <p>The upstream is selected when this method is called; the connection itself is only
+     * established once the returned {@code Mono} is subscribed.
+     *
+     * @param ip ip
+     * @param observer observer notified about state changes of the client connection
+     * @return MonoConnection
+     */
+    public Mono<Connection> getTcpClientConnection(final String ip, final ActivityConnectionObserver observer) {
+        return Mono.just(connectionConfigProvider.getProxiedService(ip))
+                .flatMap(target -> {
+                    TcpClient upstreamClient = TcpClient.create(connectionProvider)
+                            .host(target.getHost())
+                            .port(target.getPort())
+                            .observe(observer);
+                    return upstreamClient.connect();
+                });
+    }
+
+}
diff --git a/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/connection/DefaultConnectionConfigProvider.java b/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/connection/DefaultConnectionConfigProvider.java
new file mode 100644
index 000000000..595601328
--- /dev/null
+++ b/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/connection/DefaultConnectionConfigProvider.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.protocol.tcp.connection;
+
+import org.apache.shenyu.common.exception.ShenyuException;
+import org.apache.shenyu.loadbalancer.entity.Upstream;
+import org.apache.shenyu.loadbalancer.factory.LoadBalancerFactory;
+import org.apache.shenyu.protocol.tcp.UpstreamProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+
+/**
+ * Default {@link ClientConnectionConfigProvider}: picks an upstream of the configured selector
+ * through the configured load-balance algorithm.
+ */
+public class DefaultConnectionConfigProvider implements ClientConnectionConfigProvider {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultConnectionConfigProvider.class);
+
+    private final String loadBalanceAlgorithm;
+
+    private final String pluginSelectorName;
+
+    /**
+     * Creates a provider.
+     *
+     * @param loadBalanceAlgorithm name of the load-balance algorithm handed to the load balancer factory
+     * @param pluginSelectorName selector whose upstreams are looked up in {@link UpstreamProvider}
+     */
+    public DefaultConnectionConfigProvider(final String loadBalanceAlgorithm, final String pluginSelectorName) {
+        this.loadBalanceAlgorithm = loadBalanceAlgorithm;
+        this.pluginSelectorName = pluginSelectorName;
+    }
+
+    /**
+     * Selects an upstream for the given client ip and converts it to a URI.
+     *
+     * @param ip client ip, passed to the load balancer
+     * @return URI built as {@code protocol://url} of the selected upstream
+     * @throws ShenyuException if no upstream is available for the selector or the resulting URI is malformed
+     */
+    @Override
+    public URI getProxiedService(final String ip) {
+        // provide() returns null when nothing is registered for the selector; treat that like an empty list
+        List<Upstream> upstreamList = java.util.Optional.ofNullable(UpstreamProvider.getSingleton().provide(this.pluginSelectorName))
+                .orElseGet(java.util.Collections::emptyList)
+                .stream()
+                .map(dp -> Upstream.builder().url(dp.getUpstreamUrl()).status(dp.isStatus()).weight(dp.getWeight()).protocol(dp.getProtocol()).build())
+                .collect(Collectors.toList());
+        if (upstreamList.isEmpty()) {
+            throw new ShenyuException("no upstream available for selector " + this.pluginSelectorName);
+        }
+        Upstream upstream = LoadBalancerFactory.selector(upstreamList, loadBalanceAlgorithm, ip);
+        if (upstream == null) {
+            // defensive: fail with a descriptive error rather than an NPE in cover()
+            throw new ShenyuException("load balancer selected no upstream for selector " + this.pluginSelectorName);
+        }
+        return cover(upstream);
+    }
+
+    /**
+     * Builds {@code protocol://url} from the upstream.
+     *
+     * @param upstream the selected upstream
+     * @return the upstream as URI
+     */
+    private URI cover(final Upstream upstream) {
+        try {
+            return new URI(upstream.getProtocol() + "://" + upstream.getUrl());
+        } catch (URISyntaxException e) {
+            LOG.error("Upstream url is wrong", e);
+            throw new ShenyuException(e);
+        }
+    }
+
+}
diff --git a/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/connection/TcpConnectionBridge.java b/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/connection/TcpConnectionBridge.java
new file mode 100644
index 000000000..66945e9da
--- /dev/null
+++ b/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/connection/TcpConnectionBridge.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.protocol.tcp.connection;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import reactor.core.Disposable;
+import reactor.core.Disposables;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.netty.Connection;
+import reactor.netty.NettyInbound;
+import reactor.netty.NettyOutbound;
+
+/**
+ * {@link Bridge} that copies bytes in both directions between a server and a client connection
+ * and ties their lifecycles together.
+ */
+public class TcpConnectionBridge implements Bridge {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TcpConnectionBridge.class);
+
+    /**
+     * Starts both forwarding directions and arranges for each side's disposal to cancel the
+     * forwarding and close the other side's channel.
+     *
+     * @param server tcp server connection
+     * @param client tcp client connection
+     */
+    @Override
+    public void bridge(final Connection server, final Connection client) {
+        // request path: server#inbound -> client#outbound
+        final Disposable requestPipe = pipe(server.inbound(), client.outbound());
+        // response path: client#inbound -> server#outbound
+        final Disposable responsePipe = pipe(client.inbound(), server.outbound());
+        // when the server connection is disposed the client is closed too ...
+        server.onDispose(Disposables.composite(requestPipe, responsePipe, client.channel()::close));
+        // ... and the other way round
+        client.onDispose(Disposables.composite(requestPipe, responsePipe, server.channel()::close));
+    }
+
+    /**
+     * Forwards every received chunk, in order, from {@code source} to {@code sink}.
+     *
+     * @param source inbound side to read from
+     * @param sink outbound side to write to
+     * @return handle that cancels the forwarding
+     */
+    private Disposable pipe(final NettyInbound source, final NettyOutbound sink) {
+        Flux<byte[]> chunks = source.receive().asByteArray();
+        return chunks
+                .concatMap(chunk -> sink.sendByteArray(Mono.just(chunk)))
+                .subscribe();
+    }
+
+}