You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by al...@apache.org on 2022/07/18 09:36:16 UTC
[dubbo-spi-extensions] branch master updated: Enhance remoting module (#131)
This is an automated email from the ASF dual-hosted git repository.
albumenj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo-spi-extensions.git
The following commit(s) were added to refs/heads/master by this push:
new ce25638 Enhance remoting module (#131)
ce25638 is described below
commit ce25638bf8d4eb9ff1eccda7cc23d9fec35bd397
Author: Albumen Kevin <jh...@gmail.com>
AuthorDate: Mon Jul 18 17:36:11 2022 +0800
Enhance remoting module (#131)
* add etcd3 remoting
* add grizzly remoting
* add mina remoting
* add p2p remoting
* add redis remoting
* opt pom
* fix compile
* fix compile
---
dubbo-extensions-dependencies-bom/pom.xml | 84 ++-
.../dubbo-remoting-etcd3/pom.xml | 107 +++
.../dubbo/remoting/etcd/AbstractRetryPolicy.java | 45 ++
.../apache/dubbo/remoting/etcd/ChildListener.java | 25 +
.../org/apache/dubbo/remoting/etcd/Constants.java | 55 ++
.../org/apache/dubbo/remoting/etcd/EtcdClient.java | 191 ++++++
.../dubbo/remoting/etcd/EtcdTransporter.java | 47 ++
.../apache/dubbo/remoting/etcd/RetryPolicy.java | 31 +
.../apache/dubbo/remoting/etcd/StateListener.java | 27 +
.../etcd/jetcd/ConnectionStateListener.java | 31 +
.../dubbo/remoting/etcd/jetcd/JEtcdClient.java | 473 +++++++++++++
.../remoting/etcd/jetcd/JEtcdClientWrapper.java | 752 +++++++++++++++++++++
.../remoting/etcd/jetcd/JEtcdTransporter.java | 30 +
.../dubbo/remoting/etcd/jetcd/RetryLoops.java | 100 +++
.../dubbo/remoting/etcd/jetcd/RetryNTimes.java | 36 +
.../dubbo/remoting/etcd/option/OptionUtil.java | 73 ++
.../remoting/etcd/support/AbstractEtcdClient.java | 194 ++++++
.../org.apache.dubbo.remoting.etcd.EtcdTransporter | 1 +
.../dubbo/remoting/etcd/jetcd/JEtcdClientTest.java | 428 ++++++++++++
.../etcd/jetcd/JEtcdClientWrapperTest.java | 187 +++++
.../dubbo/remoting/etcd/jetcd/LeaseTest.java | 156 +++++
.../{ => dubbo-remoting-grizzly}/pom.xml | 32 +-
.../remoting/transport/grizzly/GrizzlyChannel.java | 198 ++++++
.../remoting/transport/grizzly/GrizzlyClient.java | 112 +++
.../transport/grizzly/GrizzlyCodecAdapter.java | 143 ++++
.../remoting/transport/grizzly/GrizzlyHandler.java | 118 ++++
.../remoting/transport/grizzly/GrizzlyServer.java | 129 ++++
.../transport/grizzly/GrizzlyTransporter.java | 43 ++
.../internal/org.apache.dubbo.remoting.Transporter | 1 +
.../transport/grizzly/GrizzlyTransporterTest.java | 41 ++
.../dubbo-remoting-mina/pom.xml | 51 ++
.../dubbo/remoting/transport/mina/MinaChannel.java | 191 ++++++
.../dubbo/remoting/transport/mina/MinaClient.java | 174 +++++
.../remoting/transport/mina/MinaCodecAdapter.java | 167 +++++
.../dubbo/remoting/transport/mina/MinaHandler.java | 95 +++
.../dubbo/remoting/transport/mina/MinaServer.java | 112 +++
.../remoting/transport/mina/MinaTransporter.java | 40 ++
.../internal/org.apache.dubbo.remoting.Transporter | 1 +
.../transport/mina/ClientToServerTest.java | 92 +++
.../remoting/transport/mina/ClientsTest.java | 65 ++
.../org/apache/remoting/transport/mina/Hello.java | 45 ++
.../transport/mina/MinaClientToServerTest.java | 41 ++
.../org/apache/remoting/transport/mina/World.java | 45 ++
.../remoting/transport/mina/WorldHandler.java | 36 +
.../{ => dubbo-remoting-p2p}/pom.xml | 33 +-
.../java/org/apache/dubbo/remoting/p2p/Group.java | 57 ++
.../org/apache/dubbo/remoting/p2p/Networker.java | 39 ++
.../org/apache/dubbo/remoting/p2p/Networkers.java | 47 ++
.../java/org/apache/dubbo/remoting/p2p/Peer.java | 36 +
.../dubbo/remoting/p2p/exchange/ExchangeGroup.java | 36 +
.../remoting/p2p/exchange/ExchangeNetworker.java | 35 +
.../remoting/p2p/exchange/ExchangeNetworkers.java | 45 ++
.../dubbo/remoting/p2p/exchange/ExchangePeer.java | 26 +
.../exchange/support/AbstractExchangeGroup.java | 128 ++++
.../p2p/exchange/support/ExchangeServerPeer.java | 137 ++++
.../p2p/exchange/support/FileExchangeGroup.java | 135 ++++
.../exchange/support/FileExchangeNetworker.java | 34 +
.../exchange/support/MulticastExchangeGroup.java | 108 +++
.../support/MulticastExchangeNetworker.java | 34 +
.../dubbo/remoting/p2p/support/AbstractGroup.java | 119 ++++
.../dubbo/remoting/p2p/support/FileGroup.java | 133 ++++
.../dubbo/remoting/p2p/support/FileNetworker.java | 34 +
.../dubbo/remoting/p2p/support/MulticastGroup.java | 108 +++
.../remoting/p2p/support/MulticastNetworker.java | 34 +
.../dubbo/remoting/p2p/support/ServerPeer.java | 124 ++++
.../org.apache.dubbo.remoting.p2p.Networker | 2 +
.../support/MulticastExchangeNetworkerTest.java | 81 +++
.../remoting/p2p/support/FileNetworkerTest.java | 83 +++
.../p2p/support/MulticastNetworkerTest.java | 71 ++
.../dubbo-remoting-redis/pom.xml | 53 ++
.../apache/dubbo/remoting/redis/RedisClient.java | 46 ++
.../remoting/redis/jedis/ClusterRedisClient.java | 136 ++++
.../remoting/redis/jedis/MonoRedisClient.java | 119 ++++
.../remoting/redis/jedis/SentinelRedisClient.java | 122 ++++
.../redis/support/AbstractRedisClient.java | 95 +++
dubbo-remoting-extensions/pom.xml | 5 +
76 files changed, 7311 insertions(+), 29 deletions(-)
diff --git a/dubbo-extensions-dependencies-bom/pom.xml b/dubbo-extensions-dependencies-bom/pom.xml
index ffaf218..977804d 100644
--- a/dubbo-extensions-dependencies-bom/pom.xml
+++ b/dubbo-extensions-dependencies-bom/pom.xml
@@ -116,6 +116,13 @@
<kryo_serializers_version>0.42</kryo_serializers_version>
<msgpack_version>0.8.22</msgpack_version>
<protostuff_version>1.5.9</protostuff_version>
+ <mina_version>1.1.7</mina_version>
+ <slf4j_version>1.7.25</slf4j_version>
+ <grizzly_version>2.4.4</grizzly_version>
+ <jetcd_version>0.5.7</jetcd_version>
+ <grpc.version>1.31.1</grpc.version>
+ <etcd_launcher_version>0.5.7</etcd_launcher_version>
+ <netty4_version>4.1.66.Final</netty4_version>
<maven_flatten_version>1.2.5</maven_flatten_version>
</properties>
@@ -313,7 +320,82 @@
<artifactId>protostuff-runtime</artifactId>
<version>${protostuff_version}</version>
</dependency>
-
+ <dependency>
+ <groupId>org.apache.mina</groupId>
+ <artifactId>mina-core</artifactId>
+ <version>${mina_version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>${slf4j_version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.glassfish.grizzly</groupId>
+ <artifactId>grizzly-core</artifactId>
+ <version>${grizzly_version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.etcd</groupId>
+ <artifactId>jetcd-core</artifactId>
+ <version>${jetcd_version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-codec-http2</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-handler-proxy</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-core</artifactId>
+ <version>${grpc.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-netty-shaded</artifactId>
+ <version>${grpc.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-netty</artifactId>
+ <version>${grpc.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-protobuf</artifactId>
+ <version>${grpc.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-stub</artifactId>
+ <version>${grpc.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-grpclb</artifactId>
+ <version>${grpc.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.etcd</groupId>
+ <artifactId>jetcd-launcher</artifactId>
+ <version>${etcd_launcher_version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.github.spotbugs</groupId>
+ <artifactId>spotbugs-annotations</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-all</artifactId>
+ <version>${netty4_version}</version>
+ </dependency>
</dependencies>
</dependencyManagement>
diff --git a/dubbo-remoting-extensions/dubbo-remoting-etcd3/pom.xml b/dubbo-remoting-extensions/dubbo-remoting-etcd3/pom.xml
new file mode 100644
index 0000000..dd7df73
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-etcd3/pom.xml
@@ -0,0 +1,107 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>dubbo-remoting-extensions</artifactId>
+ <groupId>org.apache.dubbo.extensions</groupId>
+ <version>${revision}</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <modelVersion>4.0.0</modelVersion>
+ <version>1.0.0-SNAPSHOT</version>
+ <artifactId>dubbo-remoting-etcd3</artifactId>
+ <packaging>jar</packaging>
+ <name>${project.artifactId}</name>
+ <description>The etcd3 remoting module of Dubbo project</description>
+
+ <properties>
+ <assertj.version>3.13.2</assertj.version>
+ <skipIntegrationTests>true</skipIntegrationTests>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo-remoting-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.etcd</groupId>
+ <artifactId>jetcd-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-netty</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-protobuf</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-stub</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-grpclb</artifactId>
+ </dependency>
+ <!-- https://mvnrepository.com/artifact/io.etcd/jetcd-launcher -->
+ <dependency>
+ <groupId>io.etcd</groupId>
+ <artifactId>jetcd-launcher</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-all</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ <version>${assertj.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <skipTests>${skipIntegrationTests}</skipTests>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+
+</project>
diff --git a/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/AbstractRetryPolicy.java b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/AbstractRetryPolicy.java
new file mode 100644
index 0000000..b084ba3
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/AbstractRetryPolicy.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.etcd;
+
+public abstract class AbstractRetryPolicy implements RetryPolicy {
+
+ private final int maxRetried;
+
+ protected AbstractRetryPolicy(int maxRetried) {
+ this.maxRetried = maxRetried;
+ }
+
+ @Override
+ public boolean shouldRetry(int retried, long elapsed, boolean sleep) {
+ if (retried < maxRetried) {
+ try {
+ if (sleep) {
+ Thread.sleep(getSleepTime(retried, elapsed));
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return false;
+ }
+ return true;
+ }
+ return false;
+ }
+
+ protected abstract long getSleepTime(int retried, long elapsed);
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/ChildListener.java b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/ChildListener.java
new file mode 100644
index 0000000..46a0af8
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/ChildListener.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.etcd;
+
+import java.util.List;
+
+public interface ChildListener {
+
+ void childChanged(String path, List<String> children);
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/Constants.java b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/Constants.java
new file mode 100644
index 0000000..3450bb5
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/Constants.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.remoting.etcd;
+
+import static org.apache.dubbo.remoting.Constants.DEFAULT_IO_THREADS;
+
+public interface Constants {
+ String ETCD3_NOTIFY_MAXTHREADS_KEYS = "etcd3.notify.maxthreads";
+
+ int DEFAULT_ETCD3_NOTIFY_THREADS = DEFAULT_IO_THREADS;
+
+ String DEFAULT_ETCD3_NOTIFY_QUEUES_KEY = "etcd3.notify.queues";
+
+ int DEFAULT_GRPC_QUEUES = 300_0000;
+
+ String RETRY_PERIOD_KEY = "retry.period";
+
+ int DEFAULT_RETRY_PERIOD = 5 * 1000;
+
+ int DEFAULT_SESSION_TIMEOUT = 60 * 1000;
+
+ String HTTP_SUBFIX_KEY = "://";
+
+ String HTTP_KEY = "http://";
+
+ int DEFAULT_KEEPALIVE_TIMEOUT = DEFAULT_SESSION_TIMEOUT / 2;
+
+ String SESSION_TIMEOUT_KEY = "session";
+
+ int DEFAULT_RECONNECT_PERIOD = 3 * 1000;
+
+ String ROUTERS_CATEGORY = "routers";
+
+ String PROVIDERS_CATEGORY = "providers";
+
+ String CONSUMERS_CATEGORY = "consumers";
+
+ String CONFIGURATORS_CATEGORY = "configurators";
+}
+
diff --git a/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdClient.java b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdClient.java
new file mode 100644
index 0000000..45b54b0
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdClient.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.etcd;
+
+import org.apache.dubbo.common.URL;
+
+import java.util.List;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public interface EtcdClient {
+
+ /**
+ * save the specified path to the etcd registry.
+ *
+ * @param path the path to be saved
+ */
+ void create(String path);
+
+ /**
+ * save the specified path to the etcd registry.
+ * if node disconnect from etcd, it will be deleted
+ * automatically by etcd when session timeout.
+ *
+ * @param path the path to be saved
+ * @return the lease of current path.
+ */
+ long createEphemeral(String path);
+
+ /**
+ * remove the specified from etcd registry.
+ *
+ * @param path the path to be removed
+ */
+ void delete(String path);
+
+ /**
+ * find direct children directory, excluding path self,
+ * Never return null.
+ *
+ * @param path the path to be found direct children.
+ * @return direct children directory, contains zero element
+ * list if children directory not exists.
+ */
+ List<String> getChildren(String path);
+
+ /**
+ * register children listener for specified path.
+ *
+ * @param path the path to be watched when children is added, delete or update.
+ * @param listener when children is changed , listener will be triggered.
+ * @return direct children directory, contains zero element
+ * list if children directory not exists.
+ */
+ List<String> addChildListener(String path, ChildListener listener);
+
+ /**
+ * find watcher of the children listener for specified path.
+ *
+ * @param path the path to be watched when children is added, delete or update.
+ * @param listener when children is changed , listener will be triggered.
+ * @return watcher if find else null
+ */
+ <T> T getChildListener(String path, ChildListener listener);
+
+ /**
+ * unregister children lister for specified path.
+ *
+ * @param path the path to be unwatched .
+ * @param listener when children is changed , lister will be triggered.
+ */
+ void removeChildListener(String path, ChildListener listener);
+
+ /**
+ * support connection notify if connection state was changed.
+ *
+ * @param listener if state changed, listener will be triggered.
+ */
+ void addStateListener(StateListener listener);
+
+ /**
+ * remove connection notify if connection state was changed.
+ *
+ * @param listener remove already registered listener, if listener
+ * not exists nothing happened.
+ */
+ void removeStateListener(StateListener listener);
+
+ /**
+ * test if current client is active.
+ *
+ * @return true if connection is active else false.
+ */
+ boolean isConnected();
+
+ /**
+ * close current client and release all resourses.
+ */
+ void close();
+
+ URL getUrl();
+
+ /***
+ * create new lease from specified second ,it should be waiting if failed.<p>
+ *
+ * @param second lease time (support second only).
+ * @return lease id from etcd
+ */
+ long createLease(long second);
+
+ /***
+ * create new lease from specified ttl second before waiting specified timeout.<p>
+ *
+ * @param ttl lease time (support second only).
+ * @param timeout the maximum time to wait
+ * @param unit the time unit of the timeout argument
+ * @throws CancellationException if this future was cancelled
+ * @throws ExecutionException if this future completed exceptionally
+ * @throws InterruptedException if the current thread was interrupted
+ * while waiting
+ * @throws TimeoutException if the wait timed out
+ * @return lease id from etcd
+ */
+ long createLease(long ttl, long timeout, TimeUnit unit)
+ throws InterruptedException, ExecutionException, TimeoutException;
+
+ /**
+ * revoke specified lease, any associated path will removed automatically.
+ *
+ * @param lease to be removed lease
+ */
+ void revokeLease(long lease);
+
+
+ /**
+ * Get the value of the specified key.
+ * @param key the specified key
+ * @return null if the value is not found
+ */
+ String getKVValue(String key);
+
+ /**
+ * Put the key value pair to etcd
+ * @param key the specified key
+ * @param value the paired value
+ * @return true if put success
+ */
+ boolean put(String key, String value);
+
+ /**
+ * Put the key value pair to etcd (Ephemeral)
+ * @param key the specified key
+ * @param value the paired value
+ * @return true if put success
+ */
+ boolean putEphemeral(String key, String value);
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdTransporter.java b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdTransporter.java
new file mode 100644
index 0000000..754ba64
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdTransporter.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.etcd;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.extension.Adaptive;
+import org.apache.dubbo.common.extension.SPI;
+import org.apache.dubbo.remoting.Constants;
+
+@SPI("jetcd")
+public interface EtcdTransporter {
+
+ @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
+ EtcdClient connect(URL url);
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/RetryPolicy.java b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/RetryPolicy.java
new file mode 100644
index 0000000..6133ab7
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/RetryPolicy.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.etcd;
+
+public interface RetryPolicy {
+
+ /**
+ * Whether retry is supported when operation fails.
+ *
+ * @param retried the number of times retried so far
+ * @param elapsed the elapsed time in millisecond since the operation was attempted
+ * @param sleep should be sleep
+ * @return true should be retry
+ */
+ boolean shouldRetry(int retried, long elapsed, boolean sleep);
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/StateListener.java b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/StateListener.java
new file mode 100644
index 0000000..4358083
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/StateListener.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.etcd;
+
+public interface StateListener {
+
+ int DISCONNECTED = 0;
+
+ int CONNECTED = 1;
+
+ void stateChanged(int connected);
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/ConnectionStateListener.java b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/ConnectionStateListener.java
new file mode 100644
index 0000000..729a157
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/ConnectionStateListener.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.remoting.etcd.jetcd;
+
+import io.etcd.jetcd.Client;
+
+public interface ConnectionStateListener {
+
+ /**
+ * Called when there is a state change in the connection
+ *
+ * @param client the client
+ * @param newState the new state
+ */
+ void stateChanged(Client client, int newState);
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java
new file mode 100644
index 0000000..0da56fa
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java
@@ -0,0 +1,473 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.remoting.etcd.jetcd;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.ExecutorUtil;
+import org.apache.dubbo.common.utils.NamedThreadFactory;
+import org.apache.dubbo.remoting.etcd.ChildListener;
+import org.apache.dubbo.remoting.etcd.StateListener;
+import org.apache.dubbo.remoting.etcd.option.OptionUtil;
+import org.apache.dubbo.remoting.etcd.support.AbstractEtcdClient;
+
+import com.google.protobuf.ByteString;
+import io.etcd.jetcd.ByteSequence;
+import io.etcd.jetcd.api.Event;
+import io.etcd.jetcd.api.KeyValue;
+import io.etcd.jetcd.api.WatchCancelRequest;
+import io.etcd.jetcd.api.WatchCreateRequest;
+import io.etcd.jetcd.api.WatchGrpc;
+import io.etcd.jetcd.api.WatchRequest;
+import io.etcd.jetcd.api.WatchResponse;
+import io.etcd.jetcd.common.exception.ClosedClientException;
+import io.grpc.ManagedChannel;
+import io.grpc.Status;
+import io.grpc.stub.StreamObserver;
+import io.netty.util.internal.ConcurrentSet;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.dubbo.common.constants.CommonConstants.PATH_SEPARATOR;
+import static org.apache.dubbo.remoting.etcd.Constants.DEFAULT_ETCD3_NOTIFY_QUEUES_KEY;
+import static org.apache.dubbo.remoting.etcd.Constants.DEFAULT_ETCD3_NOTIFY_THREADS;
+import static org.apache.dubbo.remoting.etcd.Constants.DEFAULT_GRPC_QUEUES;
+import static org.apache.dubbo.remoting.etcd.Constants.DEFAULT_RETRY_PERIOD;
+import static org.apache.dubbo.remoting.etcd.Constants.DEFAULT_SESSION_TIMEOUT;
+import static org.apache.dubbo.remoting.etcd.Constants.ETCD3_NOTIFY_MAXTHREADS_KEYS;
+import static org.apache.dubbo.remoting.etcd.Constants.RETRY_PERIOD_KEY;
+import static org.apache.dubbo.remoting.etcd.jetcd.JEtcdClientWrapper.UTF_8;
+
+/**
+ * etcd3 client.
+ */
+public class JEtcdClient extends AbstractEtcdClient<JEtcdClient.EtcdWatcher> {
+
+ private JEtcdClientWrapper clientWrapper;
+ private ScheduledExecutorService reconnectSchedule;
+
+ private ExecutorService notifyExecutor;
+
+ private int delayPeriod;
+ private Logger logger = LoggerFactory.getLogger(JEtcdClient.class);
+
+ public JEtcdClient(URL url) {
+ super(url);
+ try {
+ clientWrapper = new JEtcdClientWrapper(url);
+ clientWrapper.setConnectionStateListener((client, state) -> {
+ if (state == StateListener.CONNECTED) {
+ JEtcdClient.this.stateChanged(StateListener.CONNECTED);
+ } else if (state == StateListener.DISCONNECTED) {
+ JEtcdClient.this.stateChanged(StateListener.DISCONNECTED);
+ }
+ });
+ delayPeriod = getUrl().getParameter(RETRY_PERIOD_KEY, DEFAULT_RETRY_PERIOD);
+ reconnectSchedule = Executors.newScheduledThreadPool(1,
+ new NamedThreadFactory("etcd3-watch-auto-reconnect"));
+
+ notifyExecutor = new ThreadPoolExecutor(
+ 1
+ , url.getParameter(ETCD3_NOTIFY_MAXTHREADS_KEYS, DEFAULT_ETCD3_NOTIFY_THREADS)
+ , DEFAULT_SESSION_TIMEOUT
+ , TimeUnit.MILLISECONDS
+ , new LinkedBlockingQueue<Runnable>(url.getParameter(DEFAULT_ETCD3_NOTIFY_QUEUES_KEY, DEFAULT_GRPC_QUEUES * 3))
+ , new NamedThreadFactory("etcd3-notify", true));
+
+ clientWrapper.start();
+ } catch (Exception e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public void doCreatePersistent(String path) {
+ clientWrapper.createPersistent(path);
+ }
+
+ @Override
+ public long doCreateEphemeral(String path) {
+ return clientWrapper.createEphemeral(path);
+ }
+
+ @Override
+ public boolean checkExists(String path) {
+ return clientWrapper.checkExists(path);
+ }
+
+ @Override
+ public EtcdWatcher createChildWatcherListener(String path, ChildListener listener) {
+ return new EtcdWatcher(listener);
+ }
+
+ @Override
+ public List<String> addChildWatcherListener(String path, EtcdWatcher etcdWatcher) {
+ return etcdWatcher.forPath(path);
+ }
+
+ @Override
+ public void removeChildWatcherListener(String path, EtcdWatcher etcdWatcher) {
+ etcdWatcher.unwatch();
+ }
+
+ @Override
+ public List<String> getChildren(String path) {
+ return clientWrapper.getChildren(path);
+ }
+
+ @Override
+ public boolean isConnected() {
+ return clientWrapper.isConnected();
+ }
+
+ @Override
+ public long createLease(long second) {
+ return clientWrapper.createLease(second);
+ }
+
+ @Override
+ public long createLease(long ttl, long timeout, TimeUnit unit)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ return clientWrapper.createLease(ttl, timeout, unit);
+ }
+
+ @Override
+ public void delete(String path) {
+ clientWrapper.delete(path);
+ }
+
+ @Override
+ public void revokeLease(long lease) {
+ clientWrapper.revokeLease(lease);
+ }
+
+ @Override
+ public void doClose() {
+ try {
+ if (notifyExecutor != null) {
+ ExecutorUtil.shutdownNow(notifyExecutor, 100);
+ }
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ }
+
+ try {
+ if (reconnectSchedule != null) {
+ ExecutorUtil.shutdownNow(reconnectSchedule, 100);
+ }
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ } finally {
+ clientWrapper.doClose();
+ }
+ }
+
+ @Override
+ public String getKVValue(String key) {
+ return clientWrapper.getKVValue(key);
+ }
+
+ @Override
+ public boolean put(String key, String value) {
+ return clientWrapper.put(key, value);
+ }
+
+ @Override
+ public boolean putEphemeral(String key, String value) {
+ return clientWrapper.putEphemeral(key, value);
+ }
+
+ public ManagedChannel getChannel() {
+ return clientWrapper.getChannel();
+ }
+
+ public class EtcdWatcher implements StreamObserver<WatchResponse> {
+
+ protected WatchGrpc.WatchStub watchStub;
+ protected StreamObserver<WatchRequest> watchRequest;
+ protected long watchId;
+ protected String path;
+ protected Throwable throwable;
+ protected volatile Set<String> urls = new ConcurrentSet<>();
+ private ChildListener listener;
+
+ protected ReentrantLock lock = new ReentrantLock(true);
+
+ public EtcdWatcher(ChildListener listener) {
+ this.listener = listener;
+ }
+
+ @Override
+ public void onNext(WatchResponse response) {
+
+ // prevents grpc on sending watchResponse to a closed watch client.
+ if (!isConnected()) {
+ return;
+ }
+
+ watchId = response.getWatchId();
+
+ if (listener != null) {
+ int modified = 0;
+ String service = null;
+ Iterator<Event> iterator = response.getEventsList().iterator();
+ while (iterator.hasNext()) {
+ Event event = iterator.next();
+ switch (event.getType()) {
+ case PUT: {
+ if (((service = find(event)) != null)
+ && safeUpdate(service, true)) {
+ modified++;
+ }
+ break;
+ }
+ case DELETE: {
+ if (((service = find(event)) != null)
+ && safeUpdate(service, false)) {
+ modified++;
+ }
+ break;
+ }
+ default:
+ break;
+ }
+ }
+ if (modified > 0) {
+ notifyExecutor.execute(() -> listener.childChanged(path, new ArrayList<>(urls)));
+ }
+
+ }
+ }
+
+ @Override
+ public void onError(Throwable e) {
+ tryReconnect(e);
+ }
+
+ public void unwatch() {
+
+ // prevents grpc on sending watchResponse to a closed watch client.
+ if (!isConnected()) {
+ return;
+ }
+
+ try {
+ /**
+ * issue : https://github.com/apache/dubbo/issues/4115
+ *
+ * When the network is reconnected, the listener is empty
+ * and the data cannot be received.
+ */
+ // this.listener = null;
+
+ if (watchRequest != null) {
+ WatchCancelRequest watchCancelRequest =
+ WatchCancelRequest.newBuilder().setWatchId(watchId).build();
+ WatchRequest cancelRequest = WatchRequest.newBuilder()
+ .setCancelRequest(watchCancelRequest).build();
+ this.watchRequest.onNext(cancelRequest);
+ }
+ } catch (Exception ignored) {
+ logger.warn("Failed to cancel watch for path '" + path + "'", ignored);
+ }
+ }
+
+ public List<String> forPath(String path) {
+
+ if (!isConnected()) {
+ throw new ClosedClientException("watch client has been closed, path '" + path + "'");
+ }
+ if (this.path != null) {
+ unwatch();
+ }
+
+ this.path = path;
+
+ lock.lock();
+ try {
+
+ this.watchStub = WatchGrpc.newStub(clientWrapper.getChannel());
+ this.watchRequest = watchStub.watch(this);
+ this.watchRequest.onNext(nextRequest());
+
+ List<String> children = clientWrapper.getChildren(path);
+ /**
+ * caching the current service
+ */
+ if (!children.isEmpty()) {
+ this.urls.addAll(filterChildren(children));
+ }
+
+ return new ArrayList<>(urls);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ private boolean safeUpdate(String service, boolean add) {
+ lock.lock();
+ try {
+ /**
+ * If the collection already contains the specified service, do nothing
+ */
+ return add ? this.urls.add(service) : this.urls.remove(service);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ private String find(Event event) {
+ KeyValue keyValue = event.getKv();
+ String key = keyValue.getKey().toStringUtf8();
+
+ int len = path.length(), index = len, count = 0;
+ if (key.length() >= index) {
+ for (; (index = key.indexOf(PATH_SEPARATOR, index)) != -1; ++index) {
+ if (count++ > 1) {
+ break;
+ }
+ }
+ }
+
+ /**
+ * if children changed , we should refresh invokers
+ */
+ if (count == 1) {
+ /**
+ * remove prefix
+ */
+ return key.substring(len + 1);
+ }
+
+ return null;
+ }
+
+ private List<String> filterChildren(List<String> children) {
+ if (children == null) {
+ return Collections.emptyList();
+ }
+ if (children.size() <= 0) {
+ return children;
+ }
+ final int len = path.length();
+ return children.stream().parallel()
+ .filter(child -> {
+ int index = len, count = 0;
+ if (child.length() > len) {
+ for (; (index = child.indexOf(PATH_SEPARATOR, index)) != -1; ++index) {
+ if (count++ > 1) {
+ break;
+ }
+ }
+ }
+ return count == 1;
+ })
+ .map(child -> child.substring(len + 1))
+ .collect(toList());
+ }
+
+ /**
+ * create new watching request for current path.
+ */
+ protected WatchRequest nextRequest() {
+
+ WatchCreateRequest.Builder builder = WatchCreateRequest.newBuilder()
+ .setKey(ByteString.copyFromUtf8(path))
+ .setRangeEnd(ByteString.copyFrom(
+ OptionUtil.prefixEndOf(ByteSequence.from(path, UTF_8)).getBytes()))
+ .setProgressNotify(true);
+
+ return WatchRequest.newBuilder().setCreateRequest(builder).build();
+ }
+
+ public void tryReconnect(Throwable e) {
+
+ this.throwable = e;
+
+ logger.error("watcher client has error occurred, current path '" + path + "'", e);
+
+ // prevents grpc on sending error to a closed watch client.
+ if (!isConnected()) {
+ return;
+ }
+
+
+ Status status = Status.fromThrowable(e);
+ // system may be recover later, current connect won't be lost
+ if (OptionUtil.isHaltError(status) || OptionUtil.isNoLeaderError(status)) {
+ reconnectSchedule.schedule(this::reconnect, new Random().nextInt(delayPeriod), TimeUnit.MILLISECONDS);
+ return;
+ }
+ // reconnect with a delay; avoiding immediate retry on a long connection downtime.
+ reconnectSchedule.schedule(this::reconnect, new Random().nextInt(delayPeriod), TimeUnit.MILLISECONDS);
+ }
+
+ protected synchronized void reconnect() {
+ this.closeWatchRequest();
+ this.recreateWatchRequest();
+ }
+
+ protected void recreateWatchRequest() {
+ if (watchRequest == null) {
+ this.watchStub = WatchGrpc.newStub(clientWrapper.getChannel());
+ this.watchRequest = watchStub.watch(this);
+ }
+ this.watchRequest.onNext(nextRequest());
+ this.throwable = null;
+ logger.warn("watch client retried connect for path '" + path + "', connection status : " + isConnected());
+ }
+
+ protected void closeWatchRequest() {
+ if (this.watchRequest == null) {
+ return;
+ }
+
+ try {
+ WatchCancelRequest watchCancelRequest =
+ WatchCancelRequest.newBuilder().setWatchId(watchId).build();
+ WatchRequest cancelRequest = WatchRequest.newBuilder()
+ .setCancelRequest(watchCancelRequest).build();
+ watchRequest.onNext(cancelRequest);
+ } finally {
+ this.watchRequest.onCompleted();
+ this.watchRequest = null;
+ }
+ }
+
+ @Override
+ public void onCompleted() {
+ // do not touch this method, if you want terminate this stream.
+ }
+ }
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java
new file mode 100644
index 0000000..286586f
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java
@@ -0,0 +1,752 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.remoting.etcd.jetcd;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.ConcurrentHashSet;
+import org.apache.dubbo.common.utils.NamedThreadFactory;
+import org.apache.dubbo.common.utils.ReflectUtils;
+import org.apache.dubbo.common.utils.StringUtils;
+import org.apache.dubbo.remoting.etcd.RetryPolicy;
+import org.apache.dubbo.remoting.etcd.StateListener;
+
+import io.etcd.jetcd.ByteSequence;
+import io.etcd.jetcd.Client;
+import io.etcd.jetcd.ClientBuilder;
+import io.etcd.jetcd.KeyValue;
+import io.etcd.jetcd.common.exception.ErrorCode;
+import io.etcd.jetcd.common.exception.EtcdException;
+import io.etcd.jetcd.kv.GetResponse;
+import io.etcd.jetcd.kv.PutResponse;
+import io.etcd.jetcd.lease.LeaseKeepAliveResponse;
+import io.etcd.jetcd.options.GetOption;
+import io.etcd.jetcd.options.PutOption;
+import io.etcd.jetcd.support.CloseableClient;
+import io.etcd.jetcd.support.Observers;
+import io.grpc.ConnectivityState;
+import io.grpc.ManagedChannel;
+import io.grpc.Status;
+import io.grpc.stub.StreamObserver;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.dubbo.common.constants.CommonConstants.COMMA_SEPARATOR;
+import static org.apache.dubbo.common.constants.CommonConstants.PATH_SEPARATOR;
+import static org.apache.dubbo.remoting.etcd.Constants.DEFAULT_KEEPALIVE_TIMEOUT;
+import static org.apache.dubbo.remoting.etcd.Constants.DEFAULT_RECONNECT_PERIOD;
+import static org.apache.dubbo.remoting.etcd.Constants.DEFAULT_RETRY_PERIOD;
+import static org.apache.dubbo.remoting.etcd.Constants.HTTP_KEY;
+import static org.apache.dubbo.remoting.etcd.Constants.HTTP_SUBFIX_KEY;
+import static org.apache.dubbo.remoting.etcd.Constants.RETRY_PERIOD_KEY;
+import static org.apache.dubbo.remoting.etcd.Constants.SESSION_TIMEOUT_KEY;
+
+public class JEtcdClientWrapper {
+
+ private Logger logger = LoggerFactory.getLogger(JEtcdClientWrapper.class);
+
+ private final URL url;
+ private volatile Client client;
+ private volatile boolean started = false;
+ private volatile boolean connectState = false;
+ private ScheduledFuture future;
+ private ScheduledExecutorService reconnectNotify;
+ private AtomicReference<ManagedChannel> channel;
+
+ private ConnectionStateListener connectionStateListener;
+
+ private long expirePeriod;
+
+ private CompletableFuture<Client> completableFuture;
+
+ private RetryPolicy retryPolicy;
+
+ private RuntimeException failed;
+
+ private final ScheduledFuture<?> retryFuture;
+ private final ScheduledExecutorService retryExecutor = Executors.newScheduledThreadPool(1,
+ new NamedThreadFactory("Etcd3RegistryKeepAliveFailedRetryTimer", true));
+
+ private final Set<String> failedRegistered = new ConcurrentHashSet<String>();
+
+ private final Set<String> registeredPaths = new ConcurrentHashSet<>();
+ private volatile CloseableClient keepAlive = null;
+
+ /**
+ * Support temporary nodes to reuse the same lease
+ */
+ private volatile long globalLeaseId;
+
+ private volatile boolean cancelKeepAlive = false;
+
+ public static final Charset UTF_8 = StandardCharsets.UTF_8;
+
+ public JEtcdClientWrapper(URL url) {
+ this.url = url;
+ this.expirePeriod = url.getParameter(SESSION_TIMEOUT_KEY, DEFAULT_KEEPALIVE_TIMEOUT) / 1000;
+ if (expirePeriod <= 0) {
+ this.expirePeriod = DEFAULT_KEEPALIVE_TIMEOUT / 1000;
+ }
+ this.channel = new AtomicReference<>();
+ this.completableFuture = CompletableFuture.supplyAsync(() -> prepareClient(url));
+ this.reconnectNotify = Executors.newScheduledThreadPool(1,
+ new NamedThreadFactory("reconnectNotify", true));
+ this.retryPolicy = new RetryNTimes(1, 1000, TimeUnit.MILLISECONDS);
+
+ this.failed = new IllegalStateException("Etcd3 registry is not connected yet, url:" + url);
+ int retryPeriod = url.getParameter(RETRY_PERIOD_KEY, DEFAULT_RETRY_PERIOD);
+
+ this.retryFuture = retryExecutor.scheduleWithFixedDelay(() -> {
+ try {
+ retry();
+ } catch (Throwable t) {
+ logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
+ }
+ }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
+ }
+
+ private Client prepareClient(URL url) {
+
+ int maxInboundSize = DEFAULT_INBOUND_SIZE;
+ if (StringUtils.isNotEmpty(System.getProperty(GRPC_MAX_INBOUND_SIZE_KEY))) {
+ maxInboundSize = Integer.valueOf(System.getProperty(GRPC_MAX_INBOUND_SIZE_KEY));
+ }
+
+ // TODO, uses default pick-first round robin.
+ ClientBuilder clientBuilder = Client.builder()
+ .endpoints(endPoints(url.getBackupAddress()))
+ .maxInboundMessageSize(maxInboundSize);
+
+ return clientBuilder.build();
+ }
+
+ public Client getClient() {
+ return client;
+ }
+
+ /**
+ * try to get current connected channel.
+ *
+ * @return connected channel.
+ */
+ public ManagedChannel getChannel() {
+ if (channel.get() == null || (channel.get().isShutdown() || channel.get().isTerminated())) {
+ channel.set(newChannel(client));
+ }
+ return channel.get();
+ }
+
+ /**
+ * find direct children directory, excluding path self,
+ * Never return null.
+ *
+ * @param path the path to be found direct children.
+ * @return direct children directory, contains zero element
+ * list if children directory not exists.
+ */
+ public List<String> getChildren(String path) {
+ try {
+ return RetryLoops.invokeWithRetry(
+ () -> {
+ requiredNotNull(client, failed);
+ int len = path.length();
+ return client.getKVClient()
+ .get(ByteSequence.from(path, UTF_8),
+ GetOption.newBuilder().withPrefix(ByteSequence.from(path, UTF_8)).build())
+ .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS)
+ .getKvs().stream().parallel()
+ .filter(pair -> {
+ String key = pair.getKey().toString(UTF_8);
+ int index = len, count = 0;
+ if (key.length() > len) {
+ for (; (index = key.indexOf(PATH_SEPARATOR, index)) != -1; ++index) {
+ if (count++ > 1) {
+ break;
+ }
+ }
+ }
+ return count == 1;
+ })
+ .map(pair -> pair.getKey().toString(UTF_8))
+ .collect(toList());
+ }, retryPolicy);
+ } catch (Exception e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ }
+
+ public boolean isConnected() {
+ return ConnectivityState.READY == (getChannel().getState(false))
+ || ConnectivityState.IDLE == (getChannel().getState(false));
+ }
+
+ public long createLease(long second) {
+ try {
+ return RetryLoops.invokeWithRetry(
+ () -> {
+ requiredNotNull(client, failed);
+ return client.getLeaseClient()
+ .grant(second)
+ .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS)
+ .getID();
+ }, retryPolicy);
+ } catch (Exception e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ }
+
+ public void revokeLease(long lease) {
+ try {
+ RetryLoops.invokeWithRetry(
+ (Callable<Void>) () -> {
+ requiredNotNull(client, failed);
+ client.getLeaseClient()
+ .revoke(lease)
+ .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS);
+ return null;
+ }, retryPolicy);
+ } catch (Exception e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ }
+
+ public long createLease(long ttl, long timeout, TimeUnit unit)
+ throws InterruptedException, ExecutionException, TimeoutException {
+
+ if (timeout <= 0) {
+ return createLease(ttl);
+ }
+
+ requiredNotNull(client, failed);
+ return client.getLeaseClient()
+ .grant(ttl)
+ .get(timeout, unit).getID();
+ }
+
+
+ /**
+ * try to check if path exists.
+ */
+ public boolean checkExists(String path) {
+ try {
+ return RetryLoops.invokeWithRetry(
+ () -> {
+ requiredNotNull(client, failed);
+ return client.getKVClient()
+ .get(ByteSequence.from(path, UTF_8), GetOption.newBuilder().withCountOnly(true).build())
+ .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS)
+ .getCount() > 0;
+ }, retryPolicy);
+ } catch (Exception e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ }
+
+ /**
+ * only internal use only, maybe change in the future
+ */
+ protected Long find(String path) {
+ try {
+ return RetryLoops.invokeWithRetry(
+ () -> {
+ requiredNotNull(client, failed);
+ return client.getKVClient()
+ .get(ByteSequence.from(path, UTF_8))
+ .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS)
+ .getKvs().stream()
+ .mapToLong(keyValue -> Long.valueOf(keyValue.getValue().toString(UTF_8)))
+ .findFirst().getAsLong();
+ }, retryPolicy);
+ } catch (Exception e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ }
+
+ public void createPersistent(String path) {
+ try {
+ RetryLoops.invokeWithRetry(
+ (Callable<Void>) () -> {
+ requiredNotNull(client, failed);
+ client.getKVClient()
+ .put(ByteSequence.from(path, UTF_8),
+ ByteSequence.from(String.valueOf(path.hashCode()), UTF_8))
+ .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS);
+ return null;
+ }, retryPolicy);
+ } catch (Exception e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ }
+
+ /**
+ * create new ephemeral path save to etcd .
+ * if node disconnect from etcd, it will be deleted
+ * automatically by etcd when session timeout.
+ *
+ * @param path the path to be saved
+ * @return the lease of current path.
+ */
+ public long createEphemeral(String path) {
+ try {
+ return RetryLoops.invokeWithRetry(
+ () -> {
+ requiredNotNull(client, failed);
+
+ registeredPaths.add(path);
+ keepAlive();
+ final long leaseId = globalLeaseId;
+ client.getKVClient()
+ .put(ByteSequence.from(path, UTF_8)
+ , ByteSequence.from(String.valueOf(leaseId), UTF_8)
+ , PutOption.newBuilder().withLeaseId(leaseId).build())
+ .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS);
+ return leaseId;
+ }, retryPolicy);
+ } catch (Exception e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ }
+
+ // easy for mock
+ public void keepAlive(long lease) {
+ this.keepAlive(lease, null);
+ }
+
+ @SuppressWarnings("unchecked")
+ private <T> void keepAlive(long lease, Consumer<T> onFailed) {
+ final StreamObserver<LeaseKeepAliveResponse> observer = new Observers.Builder()
+ .onError((e) -> {
+ if (e instanceof EtcdException) {
+ EtcdException error = (EtcdException) e;
+ /**
+ * ttl has expired
+ */
+ if (error.getErrorCode() == ErrorCode.NOT_FOUND) {
+ keepAlive0(onFailed);
+ }
+ }
+ }).onCompleted(() -> {
+ /**
+ * deadline reached.
+ */
+ keepAlive0(onFailed);
+ }).build();
+
+ /**
+ * If there is already a keepalive, cancel first
+ */
+ cancelKeepAlive();
+
+ /**
+ * create and set new keepAlive to globalKeepAliveRef
+ */
+ this.keepAlive = client.getLeaseClient().keepAlive(lease, observer);
+ }
+
+ private void keepAlive() throws Exception {
+ if (keepAlive == null) {
+ synchronized (this) {
+ if (keepAlive == null) {
+ this.globalLeaseId = client.getLeaseClient()
+ .grant(expirePeriod)
+ .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS)
+ .getID();
+ /**
+ * If the keepAlive expires, the registration will be re-attempted
+ */
+ keepAlive(globalLeaseId, (NULL) -> recovery());
+ }
+ }
+ }
+ }
+
+ private <T> void keepAlive0(Consumer<T> onFailed) {
+ if (onFailed != null) {
+
+ /**
+ * The following two scenarios will cause the keep-alive failure:
+ *
+ * 1. Service is offline
+ * 2. Local deadline check expired
+ *
+ * The multiplex lease cannot update the local deadline,
+ * causing the extreme scene service to be dropped.
+ *
+ */
+ long leaseId = globalLeaseId;
+ try {
+ if (logger.isWarnEnabled()) {
+ logger.warn("Failed to keep alive for global lease '" + leaseId + "', waiting for retry again.");
+ }
+ onFailed.accept(null);
+ } catch (Exception ignored) {
+ logger.warn("Failed to recover from global lease expired or lease deadline exceeded. lease '" + leaseId + "'", ignored);
+ }
+ }
+ }
+
+ private void recovery() {
+
+ try {
+ /**
+ * The client is processing reconnection
+ */
+ if (cancelKeepAlive) {
+ return;
+ }
+
+ cancelKeepAlive();
+
+ Set<String> ephemeralPaths = new HashSet<String>(registeredPaths);
+ if (!ephemeralPaths.isEmpty()) {
+ for (String path : ephemeralPaths) {
+ try {
+
+ /**
+ * The client is processing reconnection,
+ * cancel remaining service registration
+ */
+ if (cancelKeepAlive) {
+ return;
+ }
+
+ createEphemeral(path);
+ failedRegistered.remove(path);
+ } catch (Exception e) {
+
+ /**
+ * waiting for retry again
+ */
+ failedRegistered.add(path);
+
+ Status status = Status.fromThrowable(e);
+ if (status.getCode() == Status.Code.NOT_FOUND) {
+ cancelKeepAlive();
+ }
+ }
+ }
+ }
+ } catch (Throwable t) {
+ logger.warn("Unexpected error, failed to recover from global lease expired or deadline exceeded.", t);
+ }
+ }
+
+ public void delete(String path) {
+ try {
+ RetryLoops.invokeWithRetry(
+ (Callable<Void>) () -> {
+ requiredNotNull(client, failed);
+ client.getKVClient()
+ .delete(ByteSequence.from(path, UTF_8))
+ .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS);
+ registeredPaths.remove(path);
+ return null;
+ }, retryPolicy);
+ } catch (Exception e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ } finally {
+ /**
+ * Cancel retry
+ */
+ failedRegistered.remove(path);
+ }
+ }
+
+ public String[] endPoints(String backupAddress) {
+ String[] endpoints = backupAddress.split(COMMA_SEPARATOR);
+ List<String> addresses = Arrays.stream(endpoints)
+ .map(address -> address.contains(HTTP_SUBFIX_KEY)
+ ? address
+ : HTTP_KEY + address)
+ .collect(toList());
+ Collections.shuffle(addresses);
+ return addresses.toArray(new String[0]);
+ }
+
+ /**
+ * because jetcd's connection change callback not supported yet, we must
+ * loop to test if connect or disconnect event happened or not. It will be changed
+ * in the future if we found better choice.
+ */
+ public void start() {
+ if (!started) {
+ try {
+ this.client = completableFuture.get(expirePeriod, TimeUnit.SECONDS);
+ this.connectState = isConnected();
+ this.started = true;
+ } catch (Throwable t) {
+ logger.error("Timeout! etcd3 server can not be connected in : " + expirePeriod + " seconds! url: " + url, t);
+
+ completableFuture.whenComplete((c, e) -> {
+ this.client = c;
+ if (e != null) {
+ logger.error("Got an exception when trying to create etcd3 instance, can not connect to etcd3 server, url: " + url, e);
+ }
+ });
+
+ }
+
+ try {
+ this.future = reconnectNotify.scheduleWithFixedDelay(() -> {
+ boolean connected = isConnected();
+ if (connectState != connected) {
+ int notifyState = connected ? StateListener.CONNECTED : StateListener.DISCONNECTED;
+ if (connectionStateListener != null) {
+ try {
+ if (connected) {
+ clearKeepAlive();
+ }
+ connectionStateListener.stateChanged(getClient(), notifyState);
+ } finally {
+ cancelKeepAlive = false;
+ }
+ }
+ connectState = connected;
+ }
+ }, DEFAULT_RECONNECT_PERIOD, DEFAULT_RECONNECT_PERIOD, TimeUnit.MILLISECONDS);
+ } catch (Throwable t) {
+ logger.error("monitor reconnect status failed.", t);
+ }
+ }
+ }
+
+ private void cancelKeepAlive() {
+ try {
+ if (keepAlive != null) {
+ keepAlive.close();
+ }
+ } finally {
+ // help for gc
+ keepAlive = null;
+ }
+ }
+
+ private void clearKeepAlive() {
+ cancelKeepAlive = true;
+ failedRegistered.clear();
+ cancelKeepAlive();
+ }
+
+ protected void doClose() {
+
+ try {
+ cancelKeepAlive = true;
+ if (globalLeaseId != 0) {
+ revokeLease(this.globalLeaseId);
+ }
+ } catch (Exception e) {
+ logger.warn("revoke global lease '" + globalLeaseId + "' failed, registry: " + url, e);
+ }
+
+ try {
+ if (started && future != null) {
+ started = false;
+ future.cancel(true);
+ reconnectNotify.shutdownNow();
+ }
+ } catch (Exception e) {
+ logger.warn("stop reconnect Notify failed, registry: " + url, e);
+ }
+
+ try {
+ retryFuture.cancel(true);
+ retryExecutor.shutdownNow();
+ } catch (Throwable t) {
+ logger.warn(t.getMessage(), t);
+ }
+
+ if (getClient() != null) {
+ getClient().close();
+ }
+ }
+
+ /**
+ * try get client's shared channel, because all fields is private on jetcd,
+ * we must using it by reflect, in the future, jetcd may provider better tools.
+ *
+ * @param client get channel from current client
+ * @return current connection channel
+ */
+ private ManagedChannel newChannel(Client client) {
+ try {
+ Field connectionField = client.getClass().getDeclaredField("connectionManager");
+ connectionField.setAccessible(true);
+ Object connection = connectionField.get(client);
+ Method channel = connection.getClass().getDeclaredMethod("getChannel");
+ ReflectUtils.makeAccessible(channel);
+ return (ManagedChannel) channel.invoke(connection);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to obtain connection channel from " + url.getBackupAddress(), e);
+ }
+ }
+
+ public ConnectionStateListener getConnectionStateListener() {
+ return connectionStateListener;
+ }
+
+ public void setConnectionStateListener(ConnectionStateListener connectionStateListener) {
+ this.connectionStateListener = connectionStateListener;
+ }
+
+ public static void requiredNotNull(Object obj, RuntimeException exception) {
+ if (obj == null) {
+ throw exception;
+ }
+ }
+
+ public String getKVValue(String key) {
+ if (null == key) {
+ return null;
+ }
+
+ CompletableFuture<GetResponse> responseFuture = this.client.getKVClient().get(ByteSequence.from(key, UTF_8));
+
+ try {
+ List<KeyValue> result = responseFuture.get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS).getKvs();
+ if (!result.isEmpty()) {
+ return result.get(0).getValue().toString(UTF_8);
+ }
+ } catch (Exception e) {
+ // ignore
+ }
+
+ return null;
+ }
+
+
+ public boolean put(String key, String value) {
+ if (key == null || value == null) {
+ return false;
+ }
+ CompletableFuture<PutResponse> putFuture =
+ this.client.getKVClient().put(ByteSequence.from(key, UTF_8), ByteSequence.from(value, UTF_8));
+ try {
+ putFuture.get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS);
+ return true;
+ } catch (Exception e) {
+ // ignore
+ }
+ return false;
+ }
+
+ public boolean putEphemeral(final String key, String value) {
+ try {
+ return RetryLoops.invokeWithRetry(
+ () -> {
+ requiredNotNull(client, failed);
+ // recovery an retry
+ keepAlive();
+ final long leaseId = globalLeaseId;
+ client.getKVClient()
+ .put(ByteSequence.from(key, UTF_8)
+ , ByteSequence.from(String.valueOf(value), UTF_8)
+ , PutOption.newBuilder().withLeaseId(leaseId).build())
+ .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS);
+ return true;
+ }, retryPolicy);
+ } catch (Exception e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ }
+
+ private void retry() {
+ if (!failedRegistered.isEmpty()) {
+ Set<String> failed = new HashSet<String>(failedRegistered);
+ if (!failed.isEmpty()) {
+
+ if (cancelKeepAlive) {
+ return;
+ }
+
+ if (logger.isWarnEnabled()) {
+ logger.warn("Retry failed register(keep alive) for path '" + failed
+ + "', path size: " + failed.size());
+ }
+ try {
+ for (String path : failed) {
+ try {
+
+ /**
+ * Is it currently reconnecting ?
+ */
+ if (cancelKeepAlive) {
+ return;
+ }
+
+ createEphemeral(path);
+ failedRegistered.remove(path);
+ } catch (Throwable e) {
+
+ failedRegistered.add(path);
+
+ Status status = Status.fromThrowable(e);
+ if (status.getCode() == Status.Code.NOT_FOUND) {
+ cancelKeepAlive();
+ }
+
+ logger.warn("Failed to retry register(keep alive) for path '" + path + "', waiting for again, cause: " + e.getMessage(), e);
+ }
+ }
+ } catch (Throwable t) {
+ logger.warn("Failed to retry register(keep alive) for path '" + failed + "', waiting for again, cause: " + t.getMessage(), t);
+ }
+ }
+ }
+ }
+
+ /**
+ * default request timeout
+ */
+ public static final long DEFAULT_REQUEST_TIMEOUT = obtainRequestTimeout();
+
+ public static final int DEFAULT_INBOUND_SIZE = 100 * 1024 * 1024;
+
+ public static final String GRPC_MAX_INBOUND_SIZE_KEY = "grpc.max.inbound.size";
+
+ public static final String ETCD_REQUEST_TIMEOUT_KEY = "etcd.request.timeout";
+
+ private static int obtainRequestTimeout() {
+ if (StringUtils.isNotEmpty(System.getProperty(ETCD_REQUEST_TIMEOUT_KEY))) {
+ return Integer.valueOf(System.getProperty(ETCD_REQUEST_TIMEOUT_KEY));
+ }
+ /**
+ * 10 seconds.
+ */
+ return 10 * 1000;
+ }
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdTransporter.java b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdTransporter.java
new file mode 100644
index 0000000..5ddec8e
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdTransporter.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.etcd.jetcd;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.remoting.etcd.EtcdClient;
+import org.apache.dubbo.remoting.etcd.EtcdTransporter;
+
+public class JEtcdTransporter implements EtcdTransporter {
+
+ @Override
+ public EtcdClient connect(URL url) {
+ return new JEtcdClient(url);
+ }
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/RetryLoops.java b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/RetryLoops.java
new file mode 100644
index 0000000..409d243
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/RetryLoops.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.etcd.jetcd;
+
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.remoting.etcd.RetryPolicy;
+import org.apache.dubbo.remoting.etcd.option.OptionUtil;
+
+import io.grpc.Status;
+
+import java.util.concurrent.Callable;
+
+public class RetryLoops {
+
+ private final long startTimeMs = System.currentTimeMillis();
+ private boolean isDone = false;
+ private int retriedCount = 0;
+ private Logger logger = LoggerFactory.getLogger(RetryLoops.class);
+
+ public static <R> R invokeWithRetry(Callable<R> task, RetryPolicy retryPolicy) throws Exception {
+ R result = null;
+ RetryLoops retryLoop = new RetryLoops();
+ while (retryLoop.shouldContinue()) {
+ try {
+ result = task.call();
+ retryLoop.complete();
+ } catch (Exception e) {
+ retryLoop.fireException(e, retryPolicy);
+ }
+ }
+ return result;
+ }
+
+ public void fireException(Exception e, RetryPolicy retryPolicy) throws Exception {
+
+ if (e instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
+
+ boolean rethrow = true;
+ if (isRetryException(e)
+ && retryPolicy.shouldRetry(retriedCount++, System.currentTimeMillis() - startTimeMs, true)) {
+ rethrow = false;
+ }
+
+ if (rethrow) {
+ throw e;
+ }
+ }
+
+ private boolean isRetryException(Throwable e) {
+ Status status = Status.fromThrowable(e);
+ if (OptionUtil.isRecoverable(status)) {
+ return true;
+ }
+
+ return false;
+ }
+
+ public boolean shouldContinue() {
+ return !isDone;
+ }
+
+ public void complete() {
+ isDone = true;
+ }
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/RetryNTimes.java b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/RetryNTimes.java
new file mode 100644
index 0000000..7453208
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/RetryNTimes.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.etcd.jetcd;
+
+import org.apache.dubbo.remoting.etcd.AbstractRetryPolicy;
+
+import java.util.concurrent.TimeUnit;
+
+public class RetryNTimes extends AbstractRetryPolicy {
+
+ private final long sleepMilliseconds;
+
+ public RetryNTimes(int maxRetried, int sleepTime, TimeUnit unit) {
+ super(maxRetried);
+ this.sleepMilliseconds = unit.convert(sleepTime, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ protected long getSleepTime(int retried, long elapsed) {
+ return sleepMilliseconds;
+ }
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/option/OptionUtil.java b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/option/OptionUtil.java
new file mode 100644
index 0000000..fa5955c
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/option/OptionUtil.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.etcd.option;
+
+import io.etcd.jetcd.ByteSequence;
+import io.grpc.Status;
+import io.netty.handler.codec.http2.Http2Exception;
+
+public class OptionUtil {
+
+ public static final byte[] NO_PREFIX_END = {0};
+
+ public static final ByteSequence prefixEndOf(ByteSequence prefix) {
+ byte[] endKey = prefix.getBytes().clone();
+ if (prefix.size() > 0) {
+ endKey[endKey.length - 1] = (byte) (endKey[endKey.length - 1] + 1);
+ return ByteSequence.from(endKey);
+ }
+ return ByteSequence.from(NO_PREFIX_END);
+ }
+
+ public static boolean isRecoverable(Status status) {
+ return isHaltError(status)
+ || isNoLeaderError(status)
+ // ephemeral is expired
+ || status.getCode() == Status.Code.NOT_FOUND;
+ }
+
+ public static boolean isHaltError(Status status) {
+ // Unavailable codes mean the system will be right back.
+ // (e.g., can't connect, lost leader)
+ // Treat Internal codes as if something failed, leaving the
+ // system in an inconsistent state, but retrying could make progress.
+ // (e.g., failed in middle of send, corrupted frame)
+ return status.getCode() != Status.Code.UNAVAILABLE && status.getCode() != Status.Code.INTERNAL;
+ }
+
+ public static boolean isNoLeaderError(Status status) {
+ return status.getCode() == Status.Code.UNAVAILABLE
+ && "etcdserver: no leader".equals(status.getDescription());
+ }
+
+ public static boolean isProtocolError(Throwable e) {
+ if (e == null) {
+ return false;
+ }
+ Throwable cause = e.getCause();
+ while (cause != null) {
+ if (cause instanceof Http2Exception) {
+ Http2Exception t = (Http2Exception) cause;
+ if ("PROTOCOL_ERROR".equals(t.error().name())) {
+ return true;
+ }
+ }
+ cause = cause.getCause();
+ }
+ return false;
+ }
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/support/AbstractEtcdClient.java b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/support/AbstractEtcdClient.java
new file mode 100644
index 0000000..430bc54
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/support/AbstractEtcdClient.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.etcd.support;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.ConcurrentHashSet;
+import org.apache.dubbo.common.utils.StringUtils;
+import org.apache.dubbo.remoting.etcd.ChildListener;
+import org.apache.dubbo.remoting.etcd.EtcdClient;
+import org.apache.dubbo.remoting.etcd.StateListener;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import static org.apache.dubbo.common.constants.CommonConstants.PATH_SEPARATOR;
+import static org.apache.dubbo.remoting.etcd.Constants.CONFIGURATORS_CATEGORY;
+import static org.apache.dubbo.remoting.etcd.Constants.CONSUMERS_CATEGORY;
+import static org.apache.dubbo.remoting.etcd.Constants.PROVIDERS_CATEGORY;
+import static org.apache.dubbo.remoting.etcd.Constants.ROUTERS_CATEGORY;
+
+public abstract class AbstractEtcdClient<WatcherListener> implements EtcdClient {
+
+ protected static final Logger logger = LoggerFactory.getLogger(AbstractEtcdClient.class);
+
+ private final URL url;
+
+ private final Set<StateListener> stateListeners = new ConcurrentHashSet<>();
+
+ private final ConcurrentMap<String, ConcurrentMap<ChildListener, WatcherListener>> childListeners = new ConcurrentHashMap<>();
+ private final List<String> categories = Arrays.asList(PROVIDERS_CATEGORY, CONSUMERS_CATEGORY, ROUTERS_CATEGORY,
+ CONFIGURATORS_CATEGORY);
+ private volatile boolean closed = false;
+
+ public AbstractEtcdClient(URL url) {
+ this.url = url;
+ }
+
+ @Override
+ public URL getUrl() {
+ return url;
+ }
+
+ @Override
+ public void create(String path) {
+ String fixedPath = fixNamespace(path);
+ createParentIfAbsent(fixedPath);
+ doCreatePersistent(fixedPath);
+ }
+
+ @Override
+ public long createEphemeral(String path) {
+ String fixedPath = fixNamespace(path);
+ createParentIfAbsent(fixedPath);
+ return doCreateEphemeral(path);
+ }
+
+ @Override
+ public void addStateListener(StateListener listener) {
+ stateListeners.add(listener);
+ }
+
+ @Override
+ public void removeStateListener(StateListener listener) {
+ stateListeners.remove(listener);
+ }
+
+ public Set<StateListener> getSessionListeners() {
+ return stateListeners;
+ }
+
+ @Override
+ public List<String> addChildListener(String path, final ChildListener listener) {
+ ConcurrentMap<ChildListener, WatcherListener> listeners = childListeners.computeIfAbsent(path, k -> new ConcurrentHashMap<>());
+ WatcherListener targetListener = listeners.computeIfAbsent(listener, k -> createChildWatcherListener(path, k));
+ return addChildWatcherListener(path, targetListener);
+ }
+
+ @Override
+ public WatcherListener getChildListener(String path, ChildListener listener) {
+ ConcurrentMap<ChildListener, WatcherListener> listeners = childListeners.get(path);
+ if (listeners == null) {
+ return null;
+ }
+ return listeners.computeIfAbsent(listener, k -> createChildWatcherListener(path, k));
+ }
+
+ @Override
+ public void removeChildListener(String path, ChildListener listener) {
+ ConcurrentMap<ChildListener, WatcherListener> listeners = childListeners.get(path);
+ if (listeners != null) {
+ WatcherListener targetListener = listeners.remove(listener);
+ if (targetListener != null) {
+ removeChildWatcherListener(path, targetListener);
+ }
+ }
+ }
+
+ protected void stateChanged(int state) {
+ for (StateListener sessionListener : getSessionListeners()) {
+ sessionListener.stateChanged(state);
+ }
+ }
+
+ protected String fixNamespace(String path) {
+ if (StringUtils.isEmpty(path)) {
+ throw new IllegalArgumentException("path is required, actual null or ''");
+ }
+ return (path.charAt(0) != '/') ? (PATH_SEPARATOR + path) : path;
+ }
+
+ protected void createParentIfAbsent(String fixedPath) {
+ int i = fixedPath.lastIndexOf('/');
+ if (i > 0) {
+ String parentPath = fixedPath.substring(0, i);
+ if (categories.stream().anyMatch(c -> fixedPath.endsWith(c))) {
+ if (!checkExists(parentPath)) {
+ this.doCreatePersistent(parentPath);
+ }
+ } else if (categories.stream().anyMatch(c -> parentPath.endsWith(c))) {
+ String grandfather = parentPath.substring(0, parentPath.lastIndexOf('/'));
+ if (!checkExists(grandfather)) {
+ this.doCreatePersistent(grandfather);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void close() {
+ if (closed) {
+ return;
+ }
+ closed = true;
+ try {
+ doClose();
+ } catch (Throwable t) {
+ logger.warn(t.getMessage(), t);
+ }
+ }
+
+ public abstract void doClose();
+
+ public abstract void doCreatePersistent(String path);
+
+ public abstract long doCreateEphemeral(String path);
+
+ @Override
+ public abstract void delete(String path);
+
+ public abstract boolean checkExists(String path);
+
+ public abstract WatcherListener createChildWatcherListener(String path, ChildListener listener);
+
+ public abstract List<String> addChildWatcherListener(String path, WatcherListener listener);
+
+ public abstract void removeChildWatcherListener(String path, WatcherListener listener);
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.etcd.EtcdTransporter b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.etcd.EtcdTransporter
new file mode 100644
index 0000000..c6ce47a
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.etcd.EtcdTransporter
@@ -0,0 +1 @@
+jetcd=org.apache.dubbo.remoting.etcd.jetcd.JEtcdTransporter
diff --git a/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/test/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientTest.java b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/test/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientTest.java
new file mode 100644
index 0000000..15c1634
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/test/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientTest.java
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.etcd.jetcd;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.utils.ReflectUtils;
+import org.apache.dubbo.remoting.etcd.ChildListener;
+
+import com.google.protobuf.ByteString;
+import io.etcd.jetcd.ByteSequence;
+import io.etcd.jetcd.Client;
+import io.etcd.jetcd.Watch;
+import io.etcd.jetcd.api.Event;
+import io.etcd.jetcd.api.WatchCancelRequest;
+import io.etcd.jetcd.api.WatchCreateRequest;
+import io.etcd.jetcd.api.WatchGrpc;
+import io.etcd.jetcd.api.WatchRequest;
+import io.etcd.jetcd.api.WatchResponse;
+import io.etcd.jetcd.common.exception.ClosedClientException;
+import io.etcd.jetcd.watch.WatchEvent;
+import io.grpc.ManagedChannel;
+import io.grpc.Status;
+import io.grpc.stub.StreamObserver;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.dubbo.remoting.etcd.Constants.SESSION_TIMEOUT_KEY;
+
+@Disabled
+public class JEtcdClientTest {
+
+ JEtcdClient client;
+
+ @Test
+ public void test_watch_when_create_path() throws InterruptedException {
+
+ String path = "/dubbo/com.alibaba.dubbo.demo.DemoService/providers";
+ String child = "/dubbo/com.alibaba.dubbo.demo.DemoService/providers/demoService1";
+
+ final CountDownLatch notNotified = new CountDownLatch(1);
+
+ ChildListener childListener = (parent, children) -> {
+ Assertions.assertEquals(1, children.size());
+ Assertions.assertEquals(child.substring(child.lastIndexOf("/") + 1), children.get(0));
+ notNotified.countDown();
+ };
+
+ client.addChildListener(path, childListener);
+
+ client.createEphemeral(child);
+ Assertions.assertTrue(notNotified.await(10, TimeUnit.SECONDS));
+
+ client.removeChildListener(path, childListener);
+ client.delete(child);
+ }
+
+ @Test
+ public void test_watch_when_modify() {
+ String path = "/dubbo/config/jetcd-client-unit-test/configurators";
+ String endpoint = "http://127.0.0.1:2379";
+ CountDownLatch latch = new CountDownLatch(1);
+ ByteSequence key = ByteSequence.from(path, UTF_8);
+
+ Watch.Listener listener = Watch.listener(response -> {
+ for (WatchEvent event : response.getEvents()) {
+ Assertions.assertEquals("PUT", event.getEventType().toString());
+ Assertions.assertEquals(path, event.getKeyValue().getKey().toString(UTF_8));
+ Assertions.assertEquals("Hello", event.getKeyValue().getValue().toString(UTF_8));
+ latch.countDown();
+ }
+
+ });
+
+ try (Client client = Client.builder().endpoints(endpoint).build();
+ Watch watch = client.getWatchClient();
+ Watch.Watcher watcher = watch.watch(key, listener)) {
+ // try to modify the key
+ client.getKVClient().put(ByteSequence.from(path, UTF_8), ByteSequence.from("Hello", UTF_8));
+ latch.await();
+ } catch (Exception e) {
+ Assertions.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testWatchWithGrpc() {
+ String path = "/dubbo/config/test_watch_with_grpc/configurators";
+ String endpoint = "http://127.0.0.1:2379";
+ CountDownLatch latch = new CountDownLatch(1);
+ try (Client client = Client.builder().endpoints(endpoint).build()) {
+ ManagedChannel channel = getChannel(client);
+ StreamObserver<WatchRequest> observer = WatchGrpc.newStub(channel).watch(new StreamObserver<WatchResponse>() {
+ @Override
+ public void onNext(WatchResponse response) {
+ for (Event event : response.getEventsList()) {
+ Assertions.assertEquals("PUT", event.getType().toString());
+ Assertions.assertEquals(path, event.getKv().getKey().toString(UTF_8));
+ Assertions.assertEquals("Hello", event.getKv().getValue().toString(UTF_8));
+ latch.countDown();
+ }
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+
+ }
+
+ @Override
+ public void onCompleted() {
+
+ }
+ });
+ WatchCreateRequest.Builder builder = WatchCreateRequest.newBuilder()
+ .setKey(ByteString.copyFrom(path, UTF_8));
+
+ observer.onNext(WatchRequest.newBuilder().setCreateRequest(builder).build());
+
+ // try to modify the key
+ client.getKVClient().put(ByteSequence.from(path, UTF_8), ByteSequence.from("Hello", UTF_8));
+ latch.await(5, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ Assertions.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testCancelWatchWithGrpc() {
+ String path = "/dubbo/config/testCancelWatchWithGrpc/configurators";
+ String endpoint = "http://127.0.0.1:2379";
+ CountDownLatch updateLatch = new CountDownLatch(1);
+ CountDownLatch cancelLatch = new CountDownLatch(1);
+ final AtomicLong watchID = new AtomicLong(-1L);
+ try (Client client = Client.builder().endpoints(endpoint).build()) {
+ ManagedChannel channel = getChannel(client);
+ StreamObserver<WatchRequest> observer = WatchGrpc.newStub(channel).watch(new StreamObserver<WatchResponse>() {
+ @Override
+ public void onNext(WatchResponse response) {
+ watchID.set(response.getWatchId());
+ for (Event event : response.getEventsList()) {
+ Assertions.assertEquals("PUT", event.getType().toString());
+ Assertions.assertEquals(path, event.getKv().getKey().toString(UTF_8));
+ Assertions.assertEquals("Hello", event.getKv().getValue().toString(UTF_8));
+ updateLatch.countDown();
+ }
+ if (response.getCanceled()) {
+ // received the cancel response
+ cancelLatch.countDown();
+ }
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+
+ }
+
+ @Override
+ public void onCompleted() {
+
+ }
+ });
+ // create
+ WatchCreateRequest.Builder builder = WatchCreateRequest.newBuilder()
+ .setKey(ByteString.copyFrom(path, UTF_8));
+
+ // make the grpc call to watch the key
+ observer.onNext(WatchRequest.newBuilder().setCreateRequest(builder).build());
+
+ // try to put the value
+ client.getKVClient().put(ByteSequence.from(path, UTF_8), ByteSequence.from("Hello", UTF_8));
+
+ // response received, latch counts down to zero
+ updateLatch.await();
+
+ WatchCancelRequest watchCancelRequest =
+ WatchCancelRequest.newBuilder().setWatchId(watchID.get()).build();
+ WatchRequest cancelRequest = WatchRequest.newBuilder()
+ .setCancelRequest(watchCancelRequest).build();
+ observer.onNext(cancelRequest);
+
+ // try to put the value
+ client.getKVClient().put(ByteSequence.from(path, UTF_8), ByteSequence.from("Hello world", UTF_8));
+
+ cancelLatch.await();
+ } catch (Exception e) {
+ Assertions.fail(e.getMessage());
+ }
+
+ }
+
+ @Test
+ public void test_watch_when_create_wrong_path() throws InterruptedException {
+
+ String path = "/dubbo/com.alibaba.dubbo.demo.DemoService/providers";
+ String child = "/dubbo/com.alibaba.dubbo.demo.DemoService/routers/demoService1";
+
+ final CountDownLatch notNotified = new CountDownLatch(1);
+
+ ChildListener childListener = (parent, children) -> {
+ Assertions.assertEquals(1, children.size());
+ Assertions.assertEquals(child, children.get(0));
+ notNotified.countDown();
+ };
+
+ client.addChildListener(path, childListener);
+
+ client.createEphemeral(child);
+ Assertions.assertFalse(notNotified.await(1, TimeUnit.SECONDS));
+
+ client.removeChildListener(path, childListener);
+ client.delete(child);
+ }
+
+ @Test
+ public void test_watch_when_delete_path() throws InterruptedException {
+
+ String path = "/dubbo/com.alibaba.dubbo.demo.DemoService/providers";
+ String child = "/dubbo/com.alibaba.dubbo.demo.DemoService/providers/demoService1";
+
+ final CountDownLatch notNotified = new CountDownLatch(1);
+
+ ChildListener childListener = (parent, children) -> {
+ Assertions.assertEquals(0, children.size());
+ notNotified.countDown();
+ };
+
+ client.createEphemeral(child);
+
+ client.addChildListener(path, childListener);
+ client.delete(child);
+
+ Assertions.assertTrue(notNotified.await(10, TimeUnit.SECONDS));
+ client.removeChildListener(path, childListener);
+ }
+
+ @Test
+ public void test_watch_then_unwatch() throws InterruptedException {
+
+ String path = "/dubbo/com.alibaba.dubbo.demo.DemoService/providers";
+ String child = "/dubbo/com.alibaba.dubbo.demo.DemoService/providers/demoService2";
+
+ final CountDownLatch notNotified = new CountDownLatch(1);
+ final CountDownLatch notTwiceNotified = new CountDownLatch(2);
+
+ final Holder notified = new Holder();
+
+ ChildListener childListener = (parent, children) -> {
+ Assertions.assertEquals(1, children.size());
+ Assertions.assertEquals(child.substring(child.lastIndexOf("/") + 1), children.get(0));
+ notNotified.countDown();
+ notTwiceNotified.countDown();
+ notified.getAndIncrease();
+ };
+
+ client.addChildListener(path, childListener);
+
+ client.createEphemeral(child);
+ Assertions.assertTrue(notNotified.await(15, TimeUnit.SECONDS));
+
+ client.removeChildListener(path, childListener);
+ client.delete(child);
+
+ Assertions.assertFalse(notTwiceNotified.await(5, TimeUnit.SECONDS));
+ Assertions.assertEquals(1, notified.value);
+ client.delete(child);
+ }
+
+ @Test
+ public void test_watch_on_unrecoverable_connection() throws InterruptedException {
+
+ String path = "/dubbo/com.alibaba.dubbo.demo.DemoService/providers";
+ JEtcdClient.EtcdWatcher watcher = null;
+ try {
+ ChildListener childListener = (parent, children) -> {
+ Assertions.assertEquals(path, parent);
+ };
+ client.addChildListener(path, childListener);
+ watcher = client.getChildListener(path, childListener);
+ watcher.watchRequest.onError(Status.ABORTED.withDescription("connection error").asRuntimeException());
+
+ watcher.watchRequest.onNext(watcher.nextRequest());
+ } catch (Exception e) {
+ Assertions.assertTrue(e.getMessage().contains("call was cancelled"));
+ }
+ }
+
+ @Test
+ public void test_watch_on_recoverable_connection() throws InterruptedException {
+
+ String path = "/dubbo/com.alibaba.dubbo.demo.DemoService/connection";
+ String child = "/dubbo/com.alibaba.dubbo.demo.DemoService/connection/demoService1";
+
+ final CountDownLatch notNotified = new CountDownLatch(1);
+ final CountDownLatch notTwiceNotified = new CountDownLatch(2);
+ final Holder notified = new Holder();
+ ChildListener childListener = (parent, children) -> {
+ notTwiceNotified.countDown();
+ switch (notified.increaseAndGet()) {
+ case 1: {
+ notNotified.countDown();
+ Assertions.assertEquals(1, children.size());
+ Assertions.assertEquals(child.substring(child.lastIndexOf("/") + 1), children.get(0));
+ break;
+ }
+ case 2: {
+ Assertions.assertEquals(0, children.size());
+ Assertions.assertEquals(path, parent);
+ break;
+ }
+ default:
+ Assertions.fail("two many callback invoked.");
+ }
+ };
+
+ client.addChildListener(path, childListener);
+ client.createEphemeral(child);
+
+ // make sure first time callback successfully
+ Assertions.assertTrue(notNotified.await(15, TimeUnit.SECONDS));
+
+ // connection error causes client to release all resources including current watcher
+ JEtcdClient.EtcdWatcher watcher = client.getChildListener(path, childListener);
+ watcher.onError(Status.UNAVAILABLE.withDescription("temporary connection issue").asRuntimeException());
+
+ // trigger delete after unavailable
+ client.delete(child);
+ Assertions.assertTrue(notTwiceNotified.await(15, TimeUnit.SECONDS));
+
+ client.removeChildListener(path, childListener);
+ }
+
+ @Test
+ public void test_watch_after_client_closed() throws InterruptedException {
+
+ String path = "/dubbo/com.alibaba.dubbo.demo.DemoService/providers";
+ client.close();
+
+ try {
+ client.addChildListener(path, (parent, children) -> {
+ Assertions.assertEquals(path, parent);
+ });
+ } catch (ClosedClientException e) {
+ Assertions.assertEquals("watch client has been closed, path '" + path + "'", e.getMessage());
+ }
+ }
+
+ @BeforeEach
+ public void setUp() {
+ // timeout in 15 seconds.
+ URL url = URL.valueOf("etcd3://127.0.0.1:2379/com.alibaba.dubbo.registry.RegistryService")
+ .addParameter(SESSION_TIMEOUT_KEY, 15000);
+
+ client = new JEtcdClient(url);
+ }
+
+ @AfterEach
+ public void tearDown() {
+ client.close();
+ }
+
+ static class Holder {
+
+ volatile int value;
+
+ synchronized int getAndIncrease() {
+ return value++;
+ }
+
+ synchronized int increaseAndGet() {
+ return ++value;
+ }
+ }
+
+ private ManagedChannel getChannel(Client client) {
+ try {
+ // hack, use reflection to get the shared channel.
+ Field connectionField = client.getClass().getDeclaredField("connectionManager");
+ connectionField.setAccessible(true);
+ Object connection = connectionField.get(client);
+ Method channelMethod = connection.getClass().getDeclaredMethod("getChannel");
+ ReflectUtils.makeAccessible(channelMethod);
+ ManagedChannel channel = (ManagedChannel) channelMethod.invoke(connection);
+ return channel;
+ } catch (Exception e) {
+ return null;
+ }
+ }
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/test/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapperTest.java b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/test/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapperTest.java
new file mode 100644
index 0000000..337b6de
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/test/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapperTest.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.etcd.jetcd;
+
+import org.apache.dubbo.common.URL;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.LockSupport;
+
+import static org.apache.dubbo.remoting.etcd.Constants.SESSION_TIMEOUT_KEY;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+
+@Disabled
+public class JEtcdClientWrapperTest {
+
+ JEtcdClientWrapper clientWrapper;
+
+ @Test
+ public void test_path_exists() {
+ String path = "/dubbo/org.apache.dubbo.demo.DemoService/providers";
+ clientWrapper.createPersistent(path);
+ Assertions.assertTrue(clientWrapper.checkExists(path));
+ Assertions.assertFalse(clientWrapper.checkExists(path + "/noneexits"));
+ clientWrapper.delete(path);
+ }
+
+ @Test
+ public void test_create_emerphal_path() {
+ String path = "/dubbo/org.apache.dubbo.demo.DemoService/providers";
+ clientWrapper.createEphemeral(path);
+ Assertions.assertTrue(clientWrapper.checkExists(path));
+ clientWrapper.delete(path);
+ }
+
+ @Test
+ public void test_grant_lease_then_revoke() {
+ long lease = clientWrapper.createLease(1);
+ clientWrapper.revokeLease(lease);
+
+ long newLease = clientWrapper.createLease(1);
+ LockSupport.parkNanos(this, TimeUnit.SECONDS.toNanos(2));
+ // test timeout of lease
+ clientWrapper.revokeLease(newLease);
+ }
+
+ @Test
+ public void test_create_emerphal_path_then_timeout() {
+ String path = "/dubbo/org.apache.dubbo.demo.DemoService/providers";
+
+ URL url = URL.valueOf("etcd3://127.0.0.1:2379/org.apache.dubbo.registry.RegistryService")
+ .addParameter(SESSION_TIMEOUT_KEY, 1000);
+
+ JEtcdClientWrapper saved = clientWrapper;
+
+ try {
+ clientWrapper = spy(new JEtcdClientWrapper(url));
+ clientWrapper.start();
+
+ doAnswer(new Answer() {
+ int timeout;
+
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ LockSupport.parkNanos(this, TimeUnit.SECONDS.toNanos(2));
+ if (timeout++ > 0) {
+ throw new TimeoutException();
+ }
+ return null;
+ }
+ }).when(clientWrapper).keepAlive(anyLong());
+
+ try {
+ clientWrapper.createEphemeral(path);
+ } catch (IllegalStateException ex) {
+ Assertions.assertEquals("failed to create ephereral by path '" + path + "'", ex.getMessage());
+ }
+
+ } finally {
+ clientWrapper.doClose();
+ clientWrapper = saved;
+ }
+ }
+
+ @Test
+ public void test_get_emerphal_children_path() {
+ String path = "/dubbo/org.apache.dubbo.demo.DemoService/providers";
+ String[] children = {
+ "/dubbo/org.apache.dubbo.demo.DemoService/providers/service1"
+ , "/dubbo/org.apache.dubbo.demo.DemoService/providers/service2"
+ , "/dubbo/org.apache.dubbo.demo.DemoService/providers/service3"
+ , "/dubbo/org.apache.dubbo.demo.DemoService/providers/service4"
+ , "/dubbo/org.apache.dubbo.demo.DemoService/providers/service5/exclude"
+ };
+
+ Arrays.stream(children).forEach((child) -> {
+ Assertions.assertFalse(clientWrapper.checkExists(child));
+ clientWrapper.createEphemeral(child);
+ });
+
+ List<String> extected = clientWrapper.getChildren(path);
+
+ Assertions.assertEquals(4, extected.size());
+ extected.stream().forEach((child) -> {
+ boolean found = false;
+ for (int i = 0; i < children.length; ++i) {
+ if (child.equals(children[i])) {
+ found = true;
+ break;
+ }
+ }
+ Assertions.assertTrue(found);
+ clientWrapper.delete(child);
+ });
+ }
+
+ @Test
+ public void test_connect_cluster() {
+ URL url = URL.valueOf("etcd3://127.0.0.1:22379/org.apache.dubbo.registry.RegistryService?backup=127.0.0.1:2379,127.0.0.1:32379");
+ JEtcdClientWrapper clientWrapper = new JEtcdClientWrapper(url);
+ try {
+ clientWrapper.start();
+ String path = "/dubbo/org.apache.dubbo.demo.DemoService/providers";
+ clientWrapper.createEphemeral(path);
+ Assertions.assertTrue(clientWrapper.checkExists(path));
+ Assertions.assertFalse(clientWrapper.checkExists(path + "/noneexits"));
+ clientWrapper.delete(path);
+ } finally {
+ clientWrapper.doClose();
+ }
+ }
+
+ @BeforeEach
+ public void setUp() {
+ URL url = URL.valueOf("etcd3://127.0.0.1:2379/org.apache.dubbo.registry.RegistryService");
+ clientWrapper = new JEtcdClientWrapper(url);
+ clientWrapper.start();
+ }
+
+ @AfterEach
+ public void tearDown() {
+ clientWrapper.doClose();
+ }
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/test/java/org/apache/dubbo/remoting/etcd/jetcd/LeaseTest.java b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/test/java/org/apache/dubbo/remoting/etcd/jetcd/LeaseTest.java
new file mode 100644
index 0000000..6188984
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-etcd3/src/test/java/org/apache/dubbo/remoting/etcd/jetcd/LeaseTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.etcd.jetcd;
+
+import com.google.common.base.Charsets;
+import io.etcd.jetcd.ByteSequence;
+import io.etcd.jetcd.Client;
+import io.etcd.jetcd.KV;
+import io.etcd.jetcd.Lease;
+import io.etcd.jetcd.launcher.EtcdCluster;
+import io.etcd.jetcd.launcher.EtcdClusterFactory;
+import io.etcd.jetcd.lease.LeaseKeepAliveResponse;
+import io.etcd.jetcd.options.PutOption;
+import io.etcd.jetcd.support.CloseableClient;
+import io.etcd.jetcd.support.Observers;
+import io.grpc.stub.StreamObserver;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * @author cvictory ON 2019-08-16
+ */
+@Disabled
+public class LeaseTest {
+
+ private static EtcdCluster cluster;
+
+ private KV kvClient;
+ private Client client;
+ private Lease leaseClient;
+
+ private static final ByteSequence KEY = ByteSequence.from("foo", Charsets.UTF_8);
+ private static final ByteSequence KEY_2 = ByteSequence.from("foo2", Charsets.UTF_8);
+ private static final ByteSequence VALUE = ByteSequence.from("bar", Charsets.UTF_8);
+
+ @BeforeAll
+ public static void beforeClass() {
+ cluster = EtcdClusterFactory.buildCluster("etcd-lease", 3, false);
+ cluster.start();
+ }
+
+ @AfterAll
+ public static void afterClass() {
+ cluster.close();
+ }
+
+ @BeforeEach
+ public void setUp() {
+ client = Client.builder().endpoints(cluster.getClientEndpoints()).build();
+ kvClient = client.getKVClient();
+ leaseClient = client.getLeaseClient();
+ }
+
+ @AfterEach
+ public void tearDown() {
+ if (client != null) {
+ client.close();
+ }
+
+ }
+
+ @Test
+ public void testGrant() throws Exception {
+ long leaseID = leaseClient.grant(5).get().getID();
+
+ kvClient.put(KEY, VALUE, PutOption.newBuilder().withLeaseId(leaseID).build()).get();
+ assertThat(kvClient.get(KEY).get().getCount()).isEqualTo(1);
+
+ Thread.sleep(6000);
+ assertThat(kvClient.get(KEY).get().getCount()).isEqualTo(0);
+ }
+
+ @Test
+ public void testRevoke() throws Exception {
+ long leaseID = leaseClient.grant(5).get().getID();
+ kvClient.put(KEY, VALUE, PutOption.newBuilder().withLeaseId(leaseID).build()).get();
+ assertThat(kvClient.get(KEY).get().getCount()).isEqualTo(1);
+ leaseClient.revoke(leaseID).get();
+ assertThat(kvClient.get(KEY).get().getCount()).isEqualTo(0);
+ }
+
+ @Test
+ public void testKeepAliveOnce() throws ExecutionException, InterruptedException {
+ long leaseID = leaseClient.grant(2).get().getID();
+ kvClient.put(KEY, VALUE, PutOption.newBuilder().withLeaseId(leaseID).build()).get();
+ assertThat(kvClient.get(KEY).get().getCount()).isEqualTo(1);
+ LeaseKeepAliveResponse rp = leaseClient.keepAliveOnce(leaseID).get();
+ assertThat(rp.getTTL()).isGreaterThan(0);
+ }
+
+ @Test
+ public void testKeepAlive() throws ExecutionException, InterruptedException {
+ long leaseID = leaseClient.grant(2).get().getID();
+ kvClient.put(KEY, VALUE, PutOption.newBuilder().withLeaseId(leaseID).build()).get();
+ assertThat(kvClient.get(KEY).get().getCount()).isEqualTo(1);
+
+ CountDownLatch latch = new CountDownLatch(1);
+ AtomicReference<LeaseKeepAliveResponse> responseRef = new AtomicReference<>();
+ StreamObserver<LeaseKeepAliveResponse> observer = Observers.observer(response -> {
+ responseRef.set(response);
+ latch.countDown();
+ });
+
+ try (CloseableClient c = leaseClient.keepAlive(leaseID, observer)) {
+ latch.await(5, TimeUnit.SECONDS);
+ LeaseKeepAliveResponse response = responseRef.get();
+ assertThat(response.getTTL()).isGreaterThan(0);
+ }
+
+ Thread.sleep(3000);
+ assertThat(kvClient.get(KEY).get().getCount()).isEqualTo(0);
+ }
+
+}
diff --git a/dubbo-remoting-extensions/pom.xml b/dubbo-remoting-extensions/dubbo-remoting-grizzly/pom.xml
similarity index 54%
copy from dubbo-remoting-extensions/pom.xml
copy to dubbo-remoting-extensions/dubbo-remoting-grizzly/pom.xml
index 6a6b72a..ce69f0e 100644
--- a/dubbo-remoting-extensions/pom.xml
+++ b/dubbo-remoting-extensions/dubbo-remoting-grizzly/pom.xml
@@ -1,4 +1,3 @@
-<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
@@ -15,24 +14,29 @@
See the License for the specific language governing permissions and
limitations under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<parent>
+ <artifactId>dubbo-remoting-extensions</artifactId>
<groupId>org.apache.dubbo.extensions</groupId>
- <artifactId>extensions-parent</artifactId>
<version>${revision}</version>
<relativePath>../pom.xml</relativePath>
</parent>
- <modelVersion>4.0.0</modelVersion>
-
- <artifactId>dubbo-remoting-extensions</artifactId>
- <version>${revision}</version>
- <packaging>pom</packaging>
-
- <modules>
- <module>dubbo-remoting-quic</module>
- </modules>
+ <modelVersion>4.0.0</modelVersion>
+ <version>1.0.0-SNAPSHOT</version>
+ <artifactId>dubbo-remoting-grizzly</artifactId>
+ <packaging>jar</packaging>
+ <name>${project.artifactId}</name>
+ <description>The grizzly remoting module of dubbo project</description>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo-remoting-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.glassfish.grizzly</groupId>
+ <artifactId>grizzly-core</artifactId>
+ </dependency>
+ </dependencies>
</project>
diff --git a/dubbo-remoting-extensions/dubbo-remoting-grizzly/src/main/java/org/apache/dubbo/remoting/transport/grizzly/GrizzlyChannel.java b/dubbo-remoting-extensions/dubbo-remoting-grizzly/src/main/java/org/apache/dubbo/remoting/transport/grizzly/GrizzlyChannel.java
new file mode 100644
index 0000000..b8f1736
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-grizzly/src/main/java/org/apache/dubbo/remoting/transport/grizzly/GrizzlyChannel.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.transport.grizzly;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.remoting.ChannelHandler;
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.transport.AbstractChannel;
+
+import org.glassfish.grizzly.Connection;
+import org.glassfish.grizzly.Grizzly;
+import org.glassfish.grizzly.GrizzlyFuture;
+import org.glassfish.grizzly.attributes.Attribute;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT;
+import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
+
+/**
+ * GrizzlyChannel
+ *
+ *
+ */
+final class GrizzlyChannel extends AbstractChannel {
+
+ private static final Logger logger = LoggerFactory.getLogger(GrizzlyChannel.class);
+
+ private static final String CHANNEL_KEY = GrizzlyChannel.class.getName() + ".CHANNEL";
+
+ private static final Attribute<GrizzlyChannel> ATTRIBUTE = Grizzly.DEFAULT_ATTRIBUTE_BUILDER.createAttribute(CHANNEL_KEY);
+
+ private final Connection<?> connection;
+
+ /**
+ * @param connection
+ * @param url
+ * @param handler
+ */
+ private GrizzlyChannel(Connection<?> connection, URL url, ChannelHandler handler) {
+ super(url, handler);
+ if (connection == null) {
+ throw new IllegalArgumentException("grizzly connection == null");
+ }
+ this.connection = connection;
+ }
+
+ static GrizzlyChannel getOrAddChannel(Connection<?> connection, URL url, ChannelHandler handler) {
+ if (connection == null) {
+ return null;
+ }
+ GrizzlyChannel ret = ATTRIBUTE.get(connection);
+ if (ret == null) {
+ ret = new GrizzlyChannel(connection, url, handler);
+ if (connection.isOpen()) {
+ ATTRIBUTE.set(connection, ret);
+ }
+ }
+ return ret;
+ }
+
+ static void removeChannelIfDisconnected(Connection<?> connection) {
+ if (connection != null && !connection.isOpen()) {
+ ATTRIBUTE.remove(connection);
+ }
+ }
+
+ @Override
+ public InetSocketAddress getRemoteAddress() {
+ return (InetSocketAddress) connection.getPeerAddress();
+ }
+
+ @Override
+ public boolean isConnected() {
+ return connection.isOpen();
+ }
+
+ @Override
+ public InetSocketAddress getLocalAddress() {
+ return (InetSocketAddress) connection.getLocalAddress();
+ }
+
+ @Override
+ @SuppressWarnings("rawtypes")
+ public void send(Object message, boolean sent) throws RemotingException {
+ super.send(message, sent);
+
+ int timeout = 0;
+ try {
+ GrizzlyFuture future = connection.write(message);
+ if (sent) {
+ timeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
+ future.get(timeout, TimeUnit.MILLISECONDS);
+ }
+ } catch (TimeoutException e) {
+ throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
+ + "in timeout(" + timeout + "ms) limit", e);
+ } catch (Throwable e) {
+ throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public void close() {
+ try {
+ super.close();
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ }
+ try {
+ removeChannelIfDisconnected(connection);
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ }
+ try {
+ if (logger.isInfoEnabled()) {
+ logger.info("Close grizzly channel " + connection);
+ }
+ connection.close();
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public boolean hasAttribute(String key) {
+ return getAttribute(key) == null;
+ }
+
+ @Override
+ public Object getAttribute(String key) {
+ return Grizzly.DEFAULT_ATTRIBUTE_BUILDER.createAttribute(key).get(connection);
+ }
+
+ @Override
+ public void setAttribute(String key, Object value) {
+ Grizzly.DEFAULT_ATTRIBUTE_BUILDER.createAttribute(key).set(connection, value);
+ }
+
+ @Override
+ public void removeAttribute(String key) {
+ Grizzly.DEFAULT_ATTRIBUTE_BUILDER.createAttribute(key).remove(connection);
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((connection == null) ? 0 : connection.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ GrizzlyChannel other = (GrizzlyChannel) obj;
+ if (connection == null) {
+ if (other.connection != null) {
+ return false;
+ }
+ } else if (!connection.equals(other.connection)) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return "GrizzlyChannel [connection=" + connection + "]";
+ }
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-grizzly/src/main/java/org/apache/dubbo/remoting/transport/grizzly/GrizzlyClient.java b/dubbo-remoting-extensions/dubbo-remoting-grizzly/src/main/java/org/apache/dubbo/remoting/transport/grizzly/GrizzlyClient.java
new file mode 100644
index 0000000..9c290c7
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-grizzly/src/main/java/org/apache/dubbo/remoting/transport/grizzly/GrizzlyClient.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.transport.grizzly;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.remoting.Channel;
+import org.apache.dubbo.remoting.ChannelHandler;
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.transport.AbstractClient;
+
+import org.glassfish.grizzly.Connection;
+import org.glassfish.grizzly.filterchain.FilterChainBuilder;
+import org.glassfish.grizzly.filterchain.TransportFilter;
+import org.glassfish.grizzly.nio.transport.TCPNIOTransport;
+import org.glassfish.grizzly.nio.transport.TCPNIOTransportBuilder;
+import org.glassfish.grizzly.strategies.SameThreadIOStrategy;
+import org.glassfish.grizzly.threadpool.ThreadPoolConfig;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT;
+import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
+
+/**
+ * GrizzlyClient
+ *
+ *
+ */
+public class GrizzlyClient extends AbstractClient {
+
+ private static final Logger logger = LoggerFactory.getLogger(GrizzlyClient.class);
+
+ private TCPNIOTransport transport;
+
+ private volatile Connection<?> connection; // volatile, please copy reference to use
+
+ public GrizzlyClient(URL url, ChannelHandler handler) throws RemotingException {
+ super(url, handler);
+ }
+
+ @Override
+ protected void doOpen() throws Throwable {
+ FilterChainBuilder filterChainBuilder = FilterChainBuilder.stateless();
+ filterChainBuilder.add(new TransportFilter());
+ filterChainBuilder.add(new GrizzlyCodecAdapter(getCodec(), getUrl(), this));
+ filterChainBuilder.add(new GrizzlyHandler(getUrl(), this));
+ TCPNIOTransportBuilder builder = TCPNIOTransportBuilder.newInstance();
+ ThreadPoolConfig config = builder.getWorkerThreadPoolConfig();
+ config.setPoolName(CLIENT_THREAD_POOL_NAME)
+ .setQueueLimit(-1)
+ .setCorePoolSize(0)
+ .setMaxPoolSize(Integer.MAX_VALUE)
+ .setKeepAliveTime(60L, TimeUnit.SECONDS);
+ builder.setTcpNoDelay(true).setKeepAlive(true)
+ .setConnectionTimeout(getConnectTimeout())
+ .setIOStrategy(SameThreadIOStrategy.getInstance());
+ transport = builder.build();
+ transport.setProcessor(filterChainBuilder.build());
+ transport.start();
+ }
+
+
+ @Override
+ protected void doConnect() throws Throwable {
+ connection = transport.connect(getConnectAddress())
+ .get(getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT), TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ protected void doDisConnect() throws Throwable {
+ try {
+ GrizzlyChannel.removeChannelIfDisconnected(connection);
+ } catch (Throwable t) {
+ logger.warn(t.getMessage());
+ }
+ }
+
+ @Override
+ protected void doClose() throws Throwable {
+ try {
+ transport.stop();
+ } catch (Throwable e) {
+ logger.warn(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ protected Channel getChannel() {
+ Connection<?> c = connection;
+ if (c == null || !c.isOpen()) {
+ return null;
+ }
+ return GrizzlyChannel.getOrAddChannel(c, getUrl(), this);
+ }
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-grizzly/src/main/java/org/apache/dubbo/remoting/transport/grizzly/GrizzlyCodecAdapter.java b/dubbo-remoting-extensions/dubbo-remoting-grizzly/src/main/java/org/apache/dubbo/remoting/transport/grizzly/GrizzlyCodecAdapter.java
new file mode 100644
index 0000000..898d3ce
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-grizzly/src/main/java/org/apache/dubbo/remoting/transport/grizzly/GrizzlyCodecAdapter.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.transport.grizzly;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.remoting.Channel;
+import org.apache.dubbo.remoting.ChannelHandler;
+import org.apache.dubbo.remoting.Codec2;
+import org.apache.dubbo.remoting.buffer.ChannelBuffer;
+import org.apache.dubbo.remoting.buffer.ChannelBuffers;
+import org.apache.dubbo.remoting.buffer.DynamicChannelBuffer;
+
+import org.glassfish.grizzly.Buffer;
+import org.glassfish.grizzly.Connection;
+import org.glassfish.grizzly.filterchain.BaseFilter;
+import org.glassfish.grizzly.filterchain.FilterChainContext;
+import org.glassfish.grizzly.filterchain.NextAction;
+
+import java.io.IOException;
+
+import static org.apache.dubbo.remoting.Constants.BUFFER_KEY;
+import static org.apache.dubbo.remoting.Constants.DEFAULT_BUFFER_SIZE;
+import static org.apache.dubbo.remoting.Constants.MAX_BUFFER_SIZE;
+import static org.apache.dubbo.remoting.Constants.MIN_BUFFER_SIZE;
+
+/**
+ * GrizzlyCodecAdapter
+ */
+public class GrizzlyCodecAdapter extends BaseFilter {
+
+ private final Codec2 codec;
+
+ private final URL url;
+
+ private final ChannelHandler handler;
+
+ private final int bufferSize;
+
+ private ChannelBuffer previousData = ChannelBuffers.EMPTY_BUFFER;
+
+ public GrizzlyCodecAdapter(Codec2 codec, URL url, ChannelHandler handler) {
+ this.codec = codec;
+ this.url = url;
+ this.handler = handler;
+ int b = url.getPositiveParameter(BUFFER_KEY, DEFAULT_BUFFER_SIZE);
+ this.bufferSize = b >= MIN_BUFFER_SIZE && b <= MAX_BUFFER_SIZE ? b : DEFAULT_BUFFER_SIZE;
+ }
+
+ @Override
+ public NextAction handleWrite(FilterChainContext context) throws IOException {
+ Connection<?> connection = context.getConnection();
+ GrizzlyChannel channel = GrizzlyChannel.getOrAddChannel(connection, url, handler);
+ try {
+ ChannelBuffer channelBuffer = ChannelBuffers.dynamicBuffer(1024); // Do not need to close
+
+ Object msg = context.getMessage();
+ codec.encode(channel, channelBuffer, msg);
+
+ GrizzlyChannel.removeChannelIfDisconnected(connection);
+ Buffer buffer = connection.getTransport().getMemoryManager().allocate(channelBuffer.readableBytes());
+ buffer.put(channelBuffer.toByteBuffer());
+ buffer.flip();
+ buffer.allowBufferDispose(true);
+ context.setMessage(buffer);
+ } finally {
+ GrizzlyChannel.removeChannelIfDisconnected(connection);
+ }
+ return context.getInvokeAction();
+ }
+
+ @Override
+ public NextAction handleRead(FilterChainContext context) throws IOException {
+ Object message = context.getMessage();
+ Connection<?> connection = context.getConnection();
+ Channel channel = GrizzlyChannel.getOrAddChannel(connection, url, handler);
+ try {
+ if (message instanceof Buffer) { // receive a new packet
+ Buffer grizzlyBuffer = (Buffer) message; // buffer
+
+ ChannelBuffer frame;
+
+ if (previousData.readable()) {
+ if (previousData instanceof DynamicChannelBuffer) {
+ previousData.writeBytes(grizzlyBuffer.toByteBuffer());
+ frame = previousData;
+ } else {
+ int size = previousData.readableBytes() + grizzlyBuffer.remaining();
+ frame = ChannelBuffers.dynamicBuffer(size > bufferSize ? size : bufferSize);
+ frame.writeBytes(previousData, previousData.readableBytes());
+ frame.writeBytes(grizzlyBuffer.toByteBuffer());
+ }
+ } else {
+ frame = ChannelBuffers.wrappedBuffer(grizzlyBuffer.toByteBuffer());
+ }
+
+ Object msg;
+ int savedReadIndex;
+
+ do {
+ savedReadIndex = frame.readerIndex();
+ try {
+ msg = codec.decode(channel, frame);
+ } catch (Exception e) {
+ previousData = ChannelBuffers.EMPTY_BUFFER;
+ throw new IOException(e.getMessage(), e);
+ }
+ if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
+ frame.readerIndex(savedReadIndex);
+ return context.getStopAction();
+ } else {
+ if (savedReadIndex == frame.readerIndex()) {
+ previousData = ChannelBuffers.EMPTY_BUFFER;
+ throw new IOException("Decode without read data.");
+ }
+ if (msg != null) {
+ context.setMessage(msg);
+ }
+ return context.getInvokeAction();
+ }
+ } while (frame.readable());
+ } else { // Other events are passed down directly
+ return context.getInvokeAction();
+ }
+ } finally {
+ GrizzlyChannel.removeChannelIfDisconnected(connection);
+ }
+ }
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-grizzly/src/main/java/org/apache/dubbo/remoting/transport/grizzly/GrizzlyHandler.java b/dubbo-remoting-extensions/dubbo-remoting-grizzly/src/main/java/org/apache/dubbo/remoting/transport/grizzly/GrizzlyHandler.java
new file mode 100644
index 0000000..f162911
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-grizzly/src/main/java/org/apache/dubbo/remoting/transport/grizzly/GrizzlyHandler.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.transport.grizzly;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.StringUtils;
+import org.apache.dubbo.remoting.ChannelHandler;
+import org.apache.dubbo.remoting.RemotingException;
+
+import org.glassfish.grizzly.Connection;
+import org.glassfish.grizzly.filterchain.BaseFilter;
+import org.glassfish.grizzly.filterchain.FilterChainContext;
+import org.glassfish.grizzly.filterchain.NextAction;
+
+import java.io.IOException;
+
+/**
+ * GrizzlyHandler
+ */
+public class GrizzlyHandler extends BaseFilter {
+
+ private static final Logger logger = LoggerFactory.getLogger(GrizzlyHandler.class);
+
+ private final URL url;
+
+ private final ChannelHandler handler;
+
+ public GrizzlyHandler(URL url, ChannelHandler handler) {
+ this.url = url;
+ this.handler = handler;
+ }
+
+ @Override
+ public NextAction handleConnect(FilterChainContext ctx) throws IOException {
+ Connection<?> connection = ctx.getConnection();
+ GrizzlyChannel channel = GrizzlyChannel.getOrAddChannel(connection, url, handler);
+ try {
+ handler.connected(channel);
+ } catch (RemotingException e) {
+ throw new IOException(StringUtils.toString(e));
+ } finally {
+ GrizzlyChannel.removeChannelIfDisconnected(connection);
+ }
+ return ctx.getInvokeAction();
+ }
+
+ @Override
+ public NextAction handleClose(FilterChainContext ctx) throws IOException {
+ Connection<?> connection = ctx.getConnection();
+ GrizzlyChannel channel = GrizzlyChannel.getOrAddChannel(connection, url, handler);
+ try {
+ handler.disconnected(channel);
+ } catch (RemotingException e) {
+ throw new IOException(StringUtils.toString(e));
+ } finally {
+ GrizzlyChannel.removeChannelIfDisconnected(connection);
+ }
+ return ctx.getInvokeAction();
+ }
+
+ @Override
+ public NextAction handleRead(FilterChainContext ctx) throws IOException {
+ Connection<?> connection = ctx.getConnection();
+ GrizzlyChannel channel = GrizzlyChannel.getOrAddChannel(connection, url, handler);
+ try {
+ handler.received(channel, ctx.getMessage());
+ } catch (RemotingException e) {
+ throw new IOException(StringUtils.toString(e));
+ } finally {
+ GrizzlyChannel.removeChannelIfDisconnected(connection);
+ }
+ return ctx.getInvokeAction();
+ }
+
+ @Override
+ public NextAction handleWrite(FilterChainContext ctx) throws IOException {
+ Connection<?> connection = ctx.getConnection();
+ GrizzlyChannel channel = GrizzlyChannel.getOrAddChannel(connection, url, handler);
+ try {
+ handler.sent(channel, ctx.getMessage());
+ } catch (RemotingException e) {
+ throw new IOException(StringUtils.toString(e));
+ } finally {
+ GrizzlyChannel.removeChannelIfDisconnected(connection);
+ }
+ return ctx.getInvokeAction();
+ }
+
+ @Override
+ public void exceptionOccurred(FilterChainContext ctx, Throwable error) {
+ Connection<?> connection = ctx.getConnection();
+ GrizzlyChannel channel = GrizzlyChannel.getOrAddChannel(connection, url, handler);
+ try {
+ handler.caught(channel, error);
+ } catch (RemotingException e) {
+ logger.error("RemotingException on channel " + channel, e);
+ } finally {
+ GrizzlyChannel.removeChannelIfDisconnected(connection);
+ }
+ }
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-grizzly/src/main/java/org/apache/dubbo/remoting/transport/grizzly/GrizzlyServer.java b/dubbo-remoting-extensions/dubbo-remoting-grizzly/src/main/java/org/apache/dubbo/remoting/transport/grizzly/GrizzlyServer.java
new file mode 100644
index 0000000..b323841
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-grizzly/src/main/java/org/apache/dubbo/remoting/transport/grizzly/GrizzlyServer.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.transport.grizzly;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.NetUtils;
+import org.apache.dubbo.remoting.Channel;
+import org.apache.dubbo.remoting.ChannelHandler;
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.transport.AbstractServer;
+
+import org.glassfish.grizzly.filterchain.FilterChainBuilder;
+import org.glassfish.grizzly.filterchain.TransportFilter;
+import org.glassfish.grizzly.nio.transport.TCPNIOTransport;
+import org.glassfish.grizzly.nio.transport.TCPNIOTransportBuilder;
+import org.glassfish.grizzly.strategies.SameThreadIOStrategy;
+import org.glassfish.grizzly.threadpool.ThreadPoolConfig;
+
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_THREADPOOL;
+import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_THREADS;
+import static org.apache.dubbo.common.constants.CommonConstants.THREADPOOL_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.THREADS_KEY;
+
+/**
+ * GrizzlyServer
+ */
+public class GrizzlyServer extends AbstractServer {
+
+ private static final Logger logger = LoggerFactory.getLogger(GrizzlyServer.class);
+
+ private final Map<String, Channel> channels = new ConcurrentHashMap<String, Channel>(); // <ip:port, channel>
+
+ private TCPNIOTransport transport;
+
+ public GrizzlyServer(URL url, ChannelHandler handler) throws RemotingException {
+ super(url, handler);
+ }
+
+ @Override
+ protected void doOpen() throws Throwable {
+ FilterChainBuilder filterChainBuilder = FilterChainBuilder.stateless();
+ filterChainBuilder.add(new TransportFilter());
+ filterChainBuilder.add(new GrizzlyCodecAdapter(getCodec(), getUrl(), this));
+ filterChainBuilder.add(new GrizzlyHandler(getUrl(), this));
+
+ TCPNIOTransportBuilder builder = TCPNIOTransportBuilder.newInstance();
+ ThreadPoolConfig config = ThreadPoolConfig.defaultConfig();
+ config.setPoolName(SERVER_THREAD_POOL_NAME).setQueueLimit(-1);
+ String threadpool = getUrl().getParameter(THREADPOOL_KEY, DEFAULT_THREADPOOL);
+ if (DEFAULT_THREADPOOL.equals(threadpool)) {
+ int threads = getUrl().getPositiveParameter(THREADS_KEY, DEFAULT_THREADS);
+ config.setCorePoolSize(threads).setMaxPoolSize(threads)
+ .setKeepAliveTime(0L, TimeUnit.SECONDS);
+ } else if ("cached".equals(threadpool)) {
+ int threads = getUrl().getPositiveParameter(THREADS_KEY, Integer.MAX_VALUE);
+ config.setCorePoolSize(0).setMaxPoolSize(threads)
+ .setKeepAliveTime(60L, TimeUnit.SECONDS);
+ } else {
+ throw new IllegalArgumentException("Unsupported threadpool type " + threadpool);
+ }
+ builder.setWorkerThreadPoolConfig(config)
+ .setKeepAlive(true)
+ .setReuseAddress(false)
+ .setIOStrategy(SameThreadIOStrategy.getInstance());
+ transport = builder.build();
+ transport.setProcessor(filterChainBuilder.build());
+ transport.bind(getBindAddress());
+ transport.start();
+ }
+
+ @Override
+ protected void doClose() throws Throwable {
+ try {
+ transport.stop();
+ } catch (Throwable e) {
+ logger.warn(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public boolean isBound() {
+ return !transport.isStopped();
+ }
+
+ @Override
+ public Collection<Channel> getChannels() {
+ return channels.values();
+ }
+
+ @Override
+ public Channel getChannel(InetSocketAddress remoteAddress) {
+ return channels.get(NetUtils.toAddressString(remoteAddress));
+ }
+
+ @Override
+ public void connected(Channel ch) throws RemotingException {
+ channels.put(NetUtils.toAddressString(ch.getRemoteAddress()), ch);
+ super.connected(ch);
+ }
+
+ @Override
+ public void disconnected(Channel ch) throws RemotingException {
+ channels.remove(NetUtils.toAddressString(ch.getRemoteAddress()));
+ super.disconnected(ch);
+ }
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-grizzly/src/main/java/org/apache/dubbo/remoting/transport/grizzly/GrizzlyTransporter.java b/dubbo-remoting-extensions/dubbo-remoting-grizzly/src/main/java/org/apache/dubbo/remoting/transport/grizzly/GrizzlyTransporter.java
new file mode 100644
index 0000000..e0a4e59
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-grizzly/src/main/java/org/apache/dubbo/remoting/transport/grizzly/GrizzlyTransporter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.transport.grizzly;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.remoting.ChannelHandler;
+import org.apache.dubbo.remoting.Client;
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.RemotingServer;
+import org.apache.dubbo.remoting.Transporter;
+
+/**
+ * GrizzlyTransporter
+ */
+public class GrizzlyTransporter implements Transporter {
+
+ public static final String NAME = "grizzly";
+
+ @Override
+ public RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException {
+ return new GrizzlyServer(url, handler);
+ }
+
+ @Override
+ public Client connect(URL url, ChannelHandler handler) throws RemotingException {
+ return new GrizzlyClient(url, handler);
+ }
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-grizzly/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.Transporter b/dubbo-remoting-extensions/dubbo-remoting-grizzly/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.Transporter
new file mode 100644
index 0000000..d276e79
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-grizzly/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.Transporter
@@ -0,0 +1 @@
+grizzly=org.apache.dubbo.remoting.transport.grizzly.GrizzlyTransporter
diff --git a/dubbo-remoting-extensions/dubbo-remoting-grizzly/src/test/java/org/apache/dubbo/remoting/transport/grizzly/GrizzlyTransporterTest.java b/dubbo-remoting-extensions/dubbo-remoting-grizzly/src/test/java/org/apache/dubbo/remoting/transport/grizzly/GrizzlyTransporterTest.java
new file mode 100644
index 0000000..dbd593e
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-grizzly/src/test/java/org/apache/dubbo/remoting/transport/grizzly/GrizzlyTransporterTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.transport.grizzly;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.utils.NetUtils;
+import org.apache.dubbo.remoting.RemotingServer;
+import org.apache.dubbo.remoting.transport.ChannelHandlerAdapter;
+
+import org.junit.jupiter.api.Test;
+
+import static org.apache.dubbo.remoting.Constants.BIND_PORT_KEY;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+public class GrizzlyTransporterTest {
+ @Test
+ public void shouldAbleToBindGrizzly() throws Exception {
+ int port = NetUtils.getAvailablePort();
+ URL url = new URL("telnet", "localhost", port,
+ new String[]{BIND_PORT_KEY, String.valueOf(port)});
+
+ RemotingServer server = new GrizzlyTransporter().bind(url, new ChannelHandlerAdapter());
+
+ assertThat(server.isBound(), is(true));
+ }
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-mina/pom.xml b/dubbo-remoting-extensions/dubbo-remoting-mina/pom.xml
new file mode 100644
index 0000000..b3f1ec1
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-mina/pom.xml
@@ -0,0 +1,51 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <parent>
+ <artifactId>dubbo-remoting-extensions</artifactId>
+ <groupId>org.apache.dubbo.extensions</groupId>
+ <version>${revision}</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <modelVersion>4.0.0</modelVersion>
+ <version>1.0.0-SNAPSHOT</version>
+ <artifactId>dubbo-remoting-mina</artifactId>
+ <packaging>jar</packaging>
+ <name>${project.artifactId}</name>
+ <description>The mina remoting module of dubbo project</description>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo-remoting-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.mina</groupId>
+ <artifactId>mina-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo-serialization-hessian2</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/dubbo-remoting-extensions/dubbo-remoting-mina/src/main/java/org/apache/dubbo/remoting/transport/mina/MinaChannel.java b/dubbo-remoting-extensions/dubbo-remoting-mina/src/main/java/org/apache/dubbo/remoting/transport/mina/MinaChannel.java
new file mode 100644
index 0000000..7229cb1
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-mina/src/main/java/org/apache/dubbo/remoting/transport/mina/MinaChannel.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.transport.mina;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.remoting.ChannelHandler;
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.transport.AbstractChannel;
+
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.WriteFuture;
+
+import java.net.InetSocketAddress;
+
+import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT;
+import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
+
+/**
+ * MinaChannel
+ */
+final class MinaChannel extends AbstractChannel {
+
+ private static final Logger logger = LoggerFactory.getLogger(MinaChannel.class);
+
+ private static final String CHANNEL_KEY = MinaChannel.class.getName() + ".CHANNEL";
+
+ private final IoSession session;
+
+ private MinaChannel(IoSession session, URL url, ChannelHandler handler) {
+ super(url, handler);
+ if (session == null) {
+ throw new IllegalArgumentException("mina session == null");
+ }
+ this.session = session;
+ }
+
+ static MinaChannel getOrAddChannel(IoSession session, URL url, ChannelHandler handler) {
+ if (session == null) {
+ return null;
+ }
+ MinaChannel ret = (MinaChannel) session.getAttribute(CHANNEL_KEY);
+ if (ret == null) {
+ ret = new MinaChannel(session, url, handler);
+ if (session.isConnected()) {
+ MinaChannel old = (MinaChannel) session.setAttribute(CHANNEL_KEY, ret);
+ if (old != null) {
+ session.setAttribute(CHANNEL_KEY, old);
+ ret = old;
+ }
+ }
+ }
+ return ret;
+ }
+
+ static void removeChannelIfDisconnected(IoSession session) {
+ if (session != null && !session.isConnected()) {
+ session.removeAttribute(CHANNEL_KEY);
+ }
+ }
+
+ @Override
+ public InetSocketAddress getLocalAddress() {
+ return (InetSocketAddress) session.getLocalAddress();
+ }
+
+ @Override
+ public InetSocketAddress getRemoteAddress() {
+ return (InetSocketAddress) session.getRemoteAddress();
+ }
+
+ @Override
+ public boolean isConnected() {
+ return session.isConnected();
+ }
+
+ @Override
+ public void send(Object message, boolean sent) throws RemotingException {
+ super.send(message, sent);
+
+ boolean success = true;
+ int timeout = 0;
+ try {
+ WriteFuture future = session.write(message);
+ if (sent) {
+ timeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
+ success = future.join(timeout);
+ }
+ } catch (Throwable e) {
+ throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
+ }
+
+ if (!success) {
+ throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
+ + "in timeout(" + timeout + "ms) limit");
+ }
+ }
+
+ @Override
+ public void close() {
+ try {
+ super.close();
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ }
+ try {
+ removeChannelIfDisconnected(session);
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ }
+ try {
+ if (logger.isInfoEnabled()) {
+ logger.info("CLose mina channel " + session);
+ }
+ session.close();
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public boolean hasAttribute(String key) {
+ return session.containsAttribute(key);
+ }
+
+ @Override
+ public Object getAttribute(String key) {
+ return session.getAttribute(key);
+ }
+
+ @Override
+ public void setAttribute(String key, Object value) {
+ session.setAttribute(key, value);
+ }
+
+ @Override
+ public void removeAttribute(String key) {
+ session.removeAttribute(key);
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((session == null) ? 0 : session.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ MinaChannel other = (MinaChannel) obj;
+ if (session == null) {
+ if (other.session != null) {
+ return false;
+ }
+ } else if (!session.equals(other.session)) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return "MinaChannel [session=" + session + "]";
+ }
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-mina/src/main/java/org/apache/dubbo/remoting/transport/mina/MinaClient.java b/dubbo-remoting-extensions/dubbo-remoting-mina/src/main/java/org/apache/dubbo/remoting/transport/mina/MinaClient.java
new file mode 100644
index 0000000..ad047a0
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-mina/src/main/java/org/apache/dubbo/remoting/transport/mina/MinaClient.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.transport.mina;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.Version;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.NamedThreadFactory;
+import org.apache.dubbo.common.utils.NetUtils;
+import org.apache.dubbo.remoting.Channel;
+import org.apache.dubbo.remoting.ChannelHandler;
+import org.apache.dubbo.remoting.Constants;
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.transport.AbstractClient;
+
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.IoFuture;
+import org.apache.mina.common.IoFutureListener;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.ThreadModel;
+import org.apache.mina.filter.codec.ProtocolCodecFilter;
+import org.apache.mina.transport.socket.nio.SocketConnector;
+import org.apache.mina.transport.socket.nio.SocketConnectorConfig;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Mina client.
+ */
+public class MinaClient extends AbstractClient {
+
+ private static final Logger logger = LoggerFactory.getLogger(MinaClient.class);
+
+ private static final Map<String, SocketConnector> CONNECTORS = new ConcurrentHashMap<String, SocketConnector>();
+
+ private String connectorKey;
+
+ private SocketConnector connector;
+
+ private volatile IoSession session; // volatile, please copy reference to use
+
+ public MinaClient(final URL url, final ChannelHandler handler) throws RemotingException {
+ super(url, wrapChannelHandler(url, handler));
+ }
+
+ @Override
+ protected void doOpen() throws Throwable {
+ connectorKey = getUrl().toFullString();
+ SocketConnector c = CONNECTORS.get(connectorKey);
+ if (c != null) {
+ connector = c;
+ } else {
+ // set thread pool.
+ connector = new SocketConnector(Constants.DEFAULT_IO_THREADS,
+ Executors.newCachedThreadPool(new NamedThreadFactory("MinaClientWorker", true)));
+ // config
+ SocketConnectorConfig cfg = (SocketConnectorConfig) connector.getDefaultConfig();
+ cfg.setThreadModel(ThreadModel.MANUAL);
+ cfg.getSessionConfig().setTcpNoDelay(true);
+ cfg.getSessionConfig().setKeepAlive(true);
+ int timeout = getConnectTimeout();
+ cfg.setConnectTimeout(timeout < 1000 ? 1 : timeout / 1000);
+ // set codec.
+ connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new MinaCodecAdapter(getCodec(), getUrl(), this)));
+ CONNECTORS.put(connectorKey, connector);
+ }
+ }
+
+ @Override
+ protected void doConnect() throws Throwable {
+ ConnectFuture future = connector.connect(getConnectAddress(), new MinaHandler(getUrl(), this));
+ long start = System.currentTimeMillis();
+ final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
+ final CountDownLatch finish = new CountDownLatch(1); // resolve future.awaitUninterruptibly() dead lock
+ future.addListener(new IoFutureListener() {
+ @Override
+ public void operationComplete(IoFuture future) {
+ try {
+ if (future.isReady()) {
+ IoSession newSession = future.getSession();
+ try {
+ // Close old channel
+ IoSession oldSession = MinaClient.this.session; // copy reference
+ if (oldSession != null) {
+ try {
+ if (logger.isInfoEnabled()) {
+ logger.info("Close old mina channel " + oldSession + " on create new mina channel " + newSession);
+ }
+ oldSession.close();
+ } finally {
+ MinaChannel.removeChannelIfDisconnected(oldSession);
+ }
+ }
+ } finally {
+ if (MinaClient.this.isClosed()) {
+ try {
+ if (logger.isInfoEnabled()) {
+ logger.info("Close new mina channel " + newSession + ", because the client closed.");
+ }
+ newSession.close();
+ } finally {
+ MinaClient.this.session = null;
+ MinaChannel.removeChannelIfDisconnected(newSession);
+ }
+ } else {
+ MinaClient.this.session = newSession;
+ }
+ }
+ }
+ } catch (Exception e) {
+ exception.set(e);
+ } finally {
+ finish.countDown();
+ }
+ }
+ });
+ try {
+ finish.await(getConnectTimeout(), TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server " + getRemoteAddress() + " client-side timeout "
+ + getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start)
+ + "ms) from netty client " + NetUtils.getLocalHost() + " using dubbo version "
+ + Version.getVersion() + ", cause: " + e.getMessage(), e);
+ }
+ Throwable e = exception.get();
+ if (e != null) {
+ throw e;
+ }
+ }
+
+ @Override
+ protected void doDisConnect() throws Throwable {
+ try {
+ MinaChannel.removeChannelIfDisconnected(session);
+ } catch (Throwable t) {
+ logger.warn(t.getMessage());
+ }
+ }
+
+ @Override
+ protected void doClose() throws Throwable {
+ //release mina resources.
+ }
+
+ @Override
+ protected Channel getChannel() {
+ IoSession s = session;
+ if (s == null || !s.isConnected()) {
+ return null;
+ }
+ return MinaChannel.getOrAddChannel(s, getUrl(), this);
+ }
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-mina/src/main/java/org/apache/dubbo/remoting/transport/mina/MinaCodecAdapter.java b/dubbo-remoting-extensions/dubbo-remoting-mina/src/main/java/org/apache/dubbo/remoting/transport/mina/MinaCodecAdapter.java
new file mode 100644
index 0000000..7ce9253
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-mina/src/main/java/org/apache/dubbo/remoting/transport/mina/MinaCodecAdapter.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.transport.mina;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.remoting.Channel;
+import org.apache.dubbo.remoting.ChannelHandler;
+import org.apache.dubbo.remoting.Codec2;
+import org.apache.dubbo.remoting.buffer.ChannelBuffer;
+import org.apache.dubbo.remoting.buffer.ChannelBuffers;
+import org.apache.dubbo.remoting.buffer.DynamicChannelBuffer;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.filter.codec.ProtocolCodecFactory;
+import org.apache.mina.filter.codec.ProtocolDecoder;
+import org.apache.mina.filter.codec.ProtocolDecoderOutput;
+import org.apache.mina.filter.codec.ProtocolEncoder;
+import org.apache.mina.filter.codec.ProtocolEncoderOutput;
+
+import static org.apache.dubbo.remoting.Constants.BUFFER_KEY;
+import static org.apache.dubbo.remoting.Constants.DEFAULT_BUFFER_SIZE;
+import static org.apache.dubbo.remoting.Constants.MAX_BUFFER_SIZE;
+import static org.apache.dubbo.remoting.Constants.MIN_BUFFER_SIZE;
+
+/**
+ * MinaCodecAdapter.
+ */
+final class MinaCodecAdapter implements ProtocolCodecFactory {
+
+ private final ProtocolEncoder encoder = new InternalEncoder();
+
+ private final ProtocolDecoder decoder = new InternalDecoder();
+
+ private final Codec2 codec;
+
+ private final URL url;
+
+ private final ChannelHandler handler;
+
+ private final int bufferSize;
+
+ public MinaCodecAdapter(Codec2 codec, URL url, ChannelHandler handler) {
+ this.codec = codec;
+ this.url = url;
+ this.handler = handler;
+ int b = url.getPositiveParameter(BUFFER_KEY, DEFAULT_BUFFER_SIZE);
+ this.bufferSize = b >= MIN_BUFFER_SIZE && b <= MAX_BUFFER_SIZE ? b : DEFAULT_BUFFER_SIZE;
+ }
+
+ @Override
+ public ProtocolEncoder getEncoder() {
+ return encoder;
+ }
+
+ @Override
+ public ProtocolDecoder getDecoder() {
+ return decoder;
+ }
+
+ private class InternalEncoder implements ProtocolEncoder {
+
+ @Override
+ public void dispose(IoSession session) throws Exception {
+ }
+
+ @Override
+ public void encode(IoSession session, Object msg, ProtocolEncoderOutput out) throws Exception {
+ ChannelBuffer buffer = ChannelBuffers.dynamicBuffer(1024);
+ MinaChannel channel = MinaChannel.getOrAddChannel(session, url, handler);
+ try {
+ codec.encode(channel, buffer, msg);
+ } finally {
+ MinaChannel.removeChannelIfDisconnected(session);
+ }
+ out.write(ByteBuffer.wrap(buffer.toByteBuffer()));
+ out.flush();
+ }
+ }
+
+ private class InternalDecoder implements ProtocolDecoder {
+
+ private ChannelBuffer buffer = ChannelBuffers.EMPTY_BUFFER;
+
+ @Override
+ public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception {
+ int readable = in.limit();
+ if (readable <= 0) {
+ return;
+ }
+
+ ChannelBuffer frame;
+
+ if (buffer.readable()) {
+ if (buffer instanceof DynamicChannelBuffer) {
+ buffer.writeBytes(in.buf());
+ frame = buffer;
+ } else {
+ int size = buffer.readableBytes() + in.remaining();
+ frame = ChannelBuffers.dynamicBuffer(size > bufferSize ? size : bufferSize);
+ frame.writeBytes(buffer, buffer.readableBytes());
+ frame.writeBytes(in.buf());
+ }
+ } else {
+ frame = ChannelBuffers.wrappedBuffer(in.buf());
+ }
+
+ Channel channel = MinaChannel.getOrAddChannel(session, url, handler);
+ Object msg;
+ int savedReadIndex;
+
+ try {
+ do {
+ savedReadIndex = frame.readerIndex();
+ try {
+ msg = codec.decode(channel, frame);
+ } catch (Exception e) {
+ buffer = ChannelBuffers.EMPTY_BUFFER;
+ throw e;
+ }
+ if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
+ frame.readerIndex(savedReadIndex);
+ break;
+ } else {
+ if (savedReadIndex == frame.readerIndex()) {
+ buffer = ChannelBuffers.EMPTY_BUFFER;
+ throw new Exception("Decode without read data.");
+ }
+ if (msg != null) {
+ out.write(msg);
+ }
+ }
+ } while (frame.readable());
+ } finally {
+ if (frame.readable()) {
+ frame.discardReadBytes();
+ buffer = frame;
+ } else {
+ buffer = ChannelBuffers.EMPTY_BUFFER;
+ }
+ MinaChannel.removeChannelIfDisconnected(session);
+ }
+ }
+
+ @Override
+ public void dispose(IoSession session) throws Exception {
+ }
+
+ @Override
+ public void finishDecode(IoSession session, ProtocolDecoderOutput out) throws Exception {
+ }
+ }
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-mina/src/main/java/org/apache/dubbo/remoting/transport/mina/MinaHandler.java b/dubbo-remoting-extensions/dubbo-remoting-mina/src/main/java/org/apache/dubbo/remoting/transport/mina/MinaHandler.java
new file mode 100644
index 0000000..3f6ac5e
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-mina/src/main/java/org/apache/dubbo/remoting/transport/mina/MinaHandler.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.transport.mina;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.remoting.ChannelHandler;
+
+import org.apache.mina.common.IoHandlerAdapter;
+import org.apache.mina.common.IoSession;
+
+/**
+ * MinaHandler
+ */
+public class MinaHandler extends IoHandlerAdapter {
+
+ private final URL url;
+
+ private final ChannelHandler handler;
+
+ public MinaHandler(URL url, ChannelHandler handler) {
+ if (url == null) {
+ throw new IllegalArgumentException("url == null");
+ }
+ if (handler == null) {
+ throw new IllegalArgumentException("handler == null");
+ }
+ this.url = url;
+ this.handler = handler;
+ }
+
+ @Override
+ public void sessionOpened(IoSession session) throws Exception {
+ MinaChannel channel = MinaChannel.getOrAddChannel(session, url, handler);
+ try {
+ handler.connected(channel);
+ } finally {
+ MinaChannel.removeChannelIfDisconnected(session);
+ }
+ }
+
+ @Override
+ public void sessionClosed(IoSession session) throws Exception {
+ MinaChannel channel = MinaChannel.getOrAddChannel(session, url, handler);
+ try {
+ handler.disconnected(channel);
+ } finally {
+ MinaChannel.removeChannelIfDisconnected(session);
+ }
+ }
+
+ @Override
+ public void messageReceived(IoSession session, Object message) throws Exception {
+ MinaChannel channel = MinaChannel.getOrAddChannel(session, url, handler);
+ try {
+ handler.received(channel, message);
+ } finally {
+ MinaChannel.removeChannelIfDisconnected(session);
+ }
+ }
+
+ @Override
+ public void messageSent(IoSession session, Object message) throws Exception {
+ MinaChannel channel = MinaChannel.getOrAddChannel(session, url, handler);
+ try {
+ handler.sent(channel, message);
+ } finally {
+ MinaChannel.removeChannelIfDisconnected(session);
+ }
+ }
+
+ @Override
+ public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
+ MinaChannel channel = MinaChannel.getOrAddChannel(session, url, handler);
+ try {
+ handler.caught(channel, cause);
+ } finally {
+ MinaChannel.removeChannelIfDisconnected(session);
+ }
+ }
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-mina/src/main/java/org/apache/dubbo/remoting/transport/mina/MinaServer.java b/dubbo-remoting-extensions/dubbo-remoting-mina/src/main/java/org/apache/dubbo/remoting/transport/mina/MinaServer.java
new file mode 100644
index 0000000..f88b967
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-mina/src/main/java/org/apache/dubbo/remoting/transport/mina/MinaServer.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.transport.mina;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.ExecutorUtil;
+import org.apache.dubbo.common.utils.NamedThreadFactory;
+import org.apache.dubbo.remoting.Channel;
+import org.apache.dubbo.remoting.ChannelHandler;
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.transport.AbstractServer;
+import org.apache.dubbo.remoting.transport.dispatcher.ChannelHandlers;
+
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.ThreadModel;
+import org.apache.mina.filter.codec.ProtocolCodecFilter;
+import org.apache.mina.transport.socket.nio.SocketAcceptor;
+import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
+
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.Executors;
+
+import static org.apache.dubbo.common.constants.CommonConstants.IO_THREADS_KEY;
+import static org.apache.dubbo.remoting.Constants.DEFAULT_IO_THREADS;
+
+/**
+ * MinaServer
+ */
+public class MinaServer extends AbstractServer {
+
+ private static final Logger logger = LoggerFactory.getLogger(MinaServer.class);
+
+ private SocketAcceptor acceptor;
+
+ public MinaServer(URL url, ChannelHandler handler) throws RemotingException {
+ super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
+ }
+
+ @Override
+ protected void doOpen() throws Throwable {
+ // set thread pool.
+ acceptor = new SocketAcceptor(getUrl().getPositiveParameter(IO_THREADS_KEY, DEFAULT_IO_THREADS),
+ Executors.newCachedThreadPool(new NamedThreadFactory("MinaServerWorker",
+ true)));
+ // config
+ SocketAcceptorConfig cfg = acceptor.getDefaultConfig();
+ cfg.setThreadModel(ThreadModel.MANUAL);
+ // set codec.
+ acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new MinaCodecAdapter(getCodec(), getUrl(), this)));
+
+ acceptor.bind(getBindAddress(), new MinaHandler(getUrl(), this));
+ }
+
+ @Override
+ protected void doClose() throws Throwable {
+ try {
+ if (acceptor != null) {
+ acceptor.unbind(getBindAddress());
+ }
+ } catch (Throwable e) {
+ logger.warn(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public Collection<Channel> getChannels() {
+ Set<IoSession> sessions = acceptor.getManagedSessions(getBindAddress());
+ Collection<Channel> channels = new HashSet<Channel>();
+ for (IoSession session : sessions) {
+ if (session.isConnected()) {
+ channels.add(MinaChannel.getOrAddChannel(session, getUrl(), this));
+ }
+ }
+ return channels;
+ }
+
+ @Override
+ public Channel getChannel(InetSocketAddress remoteAddress) {
+ Set<IoSession> sessions = acceptor.getManagedSessions(getBindAddress());
+ for (IoSession session : sessions) {
+ if (session.getRemoteAddress().equals(remoteAddress)) {
+ return MinaChannel.getOrAddChannel(session, getUrl(), this);
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public boolean isBound() {
+ return acceptor.isManaged(getBindAddress());
+ }
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-mina/src/main/java/org/apache/dubbo/remoting/transport/mina/MinaTransporter.java b/dubbo-remoting-extensions/dubbo-remoting-mina/src/main/java/org/apache/dubbo/remoting/transport/mina/MinaTransporter.java
new file mode 100644
index 0000000..ae2b190
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-mina/src/main/java/org/apache/dubbo/remoting/transport/mina/MinaTransporter.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.transport.mina;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.remoting.ChannelHandler;
+import org.apache.dubbo.remoting.Client;
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.RemotingServer;
+import org.apache.dubbo.remoting.Transporter;
+
+public class MinaTransporter implements Transporter {
+
+ public static final String NAME = "mina";
+
+ @Override
+ public RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException {
+ return new MinaServer(url, handler);
+ }
+
+ @Override
+ public Client connect(URL url, ChannelHandler handler) throws RemotingException {
+ return new MinaClient(url, handler);
+ }
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-mina/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.Transporter b/dubbo-remoting-extensions/dubbo-remoting-mina/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.Transporter
new file mode 100644
index 0000000..70b87ac
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-mina/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.Transporter
@@ -0,0 +1 @@
+mina=org.apache.dubbo.remoting.transport.mina.MinaTransporter
diff --git a/dubbo-remoting-extensions/dubbo-remoting-mina/src/test/java/org/apache/remoting/transport/mina/ClientToServerTest.java b/dubbo-remoting-extensions/dubbo-remoting-mina/src/test/java/org/apache/remoting/transport/mina/ClientToServerTest.java
new file mode 100644
index 0000000..d6fd957
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-mina/src/test/java/org/apache/remoting/transport/mina/ClientToServerTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.remoting.transport.mina;
+
+import org.apache.dubbo.common.utils.NetUtils;
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.exchange.ExchangeChannel;
+import org.apache.dubbo.remoting.exchange.ExchangeServer;
+import org.apache.dubbo.remoting.exchange.support.Replier;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * ClientToServer
+ */
+public abstract class ClientToServerTest {
+
+ protected static final String LOCALHOST = "127.0.0.1";
+
+ protected ExchangeServer server;
+
+ protected ExchangeChannel client;
+
+ protected WorldHandler handler = new WorldHandler();
+
+ protected abstract ExchangeServer newServer(int port, Replier<?> receiver) throws RemotingException;
+
+ protected abstract ExchangeChannel newClient(int port) throws RemotingException;
+
+ @BeforeEach
+ protected void setUp() throws Exception {
+ int port = NetUtils.getAvailablePort();
+ server = newServer(port, handler);
+ client = newClient(port);
+ }
+
+ @AfterEach
+ protected void tearDown() {
+ try {
+ if (server != null)
+ server.close();
+ } finally {
+ if (client != null)
+ client.close();
+ }
+ }
+
+ @Test
+ public void testFuture() throws Exception {
+ CompletableFuture<Object> future = client.request(new World("world"));
+ Hello result = (Hello) future.get();
+ Assertions.assertEquals("hello,world", result.getName());
+ }
+
+// @Test
+// public void testCallback() throws Exception {
+// final Object waitter = new Object();
+// client.invoke(new World("world"), new InvokeCallback<Hello>() {
+// public void callback(Hello result) {
+// Assertions.assertEquals("hello,world", result.getName());
+// synchronized (waitter) {
+// waitter.notifyAll();
+// }
+// }
+// public void onException(Throwable exception) {
+// }
+// });
+// synchronized (waitter) {
+// waitter.wait();
+// }
+// }
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-mina/src/test/java/org/apache/remoting/transport/mina/ClientsTest.java b/dubbo-remoting-extensions/dubbo-remoting-mina/src/test/java/org/apache/remoting/transport/mina/ClientsTest.java
new file mode 100644
index 0000000..078f92d
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-mina/src/test/java/org/apache/remoting/transport/mina/ClientsTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.remoting.transport.mina;
+
+import org.apache.dubbo.common.extension.ExtensionLoader;
+import org.apache.dubbo.remoting.Transporter;
+import org.apache.dubbo.remoting.transport.mina.MinaTransporter;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class ClientsTest {
+
+ @Test
+ public void testGetTransportEmpty() {
+ try {
+ ExtensionLoader.getExtensionLoader(Transporter.class).getExtension("");
+ fail();
+ } catch (IllegalArgumentException expected) {
+ assertThat(expected.getMessage(), containsString("Extension name == null"));
+ }
+ }
+
+ @Test
+ public void testGetTransportNull() {
+ Assertions.assertThrows(IllegalArgumentException.class, () -> {
+ String name = null;
+ ExtensionLoader.getExtensionLoader(Transporter.class).getExtension(name);
+ });
+ }
+
+ @Test
+ public void testGetTransport1() {
+ String name = "mina";
+ assertEquals(MinaTransporter.class, ExtensionLoader.getExtensionLoader(Transporter.class).getExtension(name).getClass());
+ }
+
+ @Test
+ public void testGetTransportWrong() {
+ Assertions.assertThrows(IllegalStateException.class, () -> {
+ String name = "nety";
+ assertNull(ExtensionLoader.getExtensionLoader(Transporter.class).getExtension(name).getClass());
+ });
+ }
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-mina/src/test/java/org/apache/remoting/transport/mina/Hello.java b/dubbo-remoting-extensions/dubbo-remoting-mina/src/test/java/org/apache/remoting/transport/mina/Hello.java
new file mode 100644
index 0000000..15ae552
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-mina/src/test/java/org/apache/remoting/transport/mina/Hello.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.remoting.transport.mina;
+
+import java.io.Serializable;
+
+/**
+ * Result
+ */
+public class Hello implements Serializable {
+
+ private static final long serialVersionUID = 8563900571013747774L;
+
+ private String name;
+
+ public Hello() {
+ }
+
+ public Hello(String name) {
+ this.name = name;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-mina/src/test/java/org/apache/remoting/transport/mina/MinaClientToServerTest.java b/dubbo-remoting-extensions/dubbo-remoting-mina/src/test/java/org/apache/remoting/transport/mina/MinaClientToServerTest.java
new file mode 100644
index 0000000..d1c0525
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-mina/src/test/java/org/apache/remoting/transport/mina/MinaClientToServerTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.remoting.transport.mina;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.exchange.ExchangeChannel;
+import org.apache.dubbo.remoting.exchange.ExchangeServer;
+import org.apache.dubbo.remoting.exchange.Exchangers;
+import org.apache.dubbo.remoting.exchange.support.Replier;
+
+/**
+ * MinaServerClientTest
+ */
+public class MinaClientToServerTest extends ClientToServerTest {
+
+ @Override
+ protected ExchangeServer newServer(int port, Replier<?> receiver) throws RemotingException {
+ return Exchangers.bind(URL.valueOf("exchange://localhost:" + port + "?server=mina"), receiver);
+ }
+
+ @Override
+ protected ExchangeChannel newClient(int port) throws RemotingException {
+ return Exchangers.connect(URL.valueOf("exchange://localhost:" + port + "?client=mina&timeout=3000"));
+ }
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-mina/src/test/java/org/apache/remoting/transport/mina/World.java b/dubbo-remoting-extensions/dubbo-remoting-mina/src/test/java/org/apache/remoting/transport/mina/World.java
new file mode 100644
index 0000000..1f64d96
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-mina/src/test/java/org/apache/remoting/transport/mina/World.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.remoting.transport.mina;
+
+import java.io.Serializable;
+
+/**
+ * Data
+ */
+public class World implements Serializable {
+
+ private static final long serialVersionUID = 8563900571013747774L;
+
+ private String name;
+
+ public World() {
+ }
+
+ public World(String name) {
+ this.name = name;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-mina/src/test/java/org/apache/remoting/transport/mina/WorldHandler.java b/dubbo-remoting-extensions/dubbo-remoting-mina/src/test/java/org/apache/remoting/transport/mina/WorldHandler.java
new file mode 100644
index 0000000..9a328b6
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-mina/src/test/java/org/apache/remoting/transport/mina/WorldHandler.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.remoting.transport.mina;
+
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.exchange.ExchangeChannel;
+import org.apache.dubbo.remoting.exchange.support.Replier;
+
+/**
+ * DataHandler
+ */
+public class WorldHandler implements Replier<World> {
+
+ public Class<World> interest() {
+ return World.class;
+ }
+
+ public Object reply(ExchangeChannel channel, World msg) throws RemotingException {
+ return new Hello("hello," + msg.getName());
+ }
+
+}
diff --git a/dubbo-remoting-extensions/pom.xml b/dubbo-remoting-extensions/dubbo-remoting-p2p/pom.xml
similarity index 54%
copy from dubbo-remoting-extensions/pom.xml
copy to dubbo-remoting-extensions/dubbo-remoting-p2p/pom.xml
index 6a6b72a..08a49a7 100644
--- a/dubbo-remoting-extensions/pom.xml
+++ b/dubbo-remoting-extensions/dubbo-remoting-p2p/pom.xml
@@ -1,4 +1,3 @@
-<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
@@ -15,24 +14,30 @@
See the License for the specific language governing permissions and
limitations under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<parent>
+ <artifactId>dubbo-remoting-extensions</artifactId>
<groupId>org.apache.dubbo.extensions</groupId>
- <artifactId>extensions-parent</artifactId>
<version>${revision}</version>
<relativePath>../pom.xml</relativePath>
</parent>
- <modelVersion>4.0.0</modelVersion>
-
- <artifactId>dubbo-remoting-extensions</artifactId>
- <version>${revision}</version>
- <packaging>pom</packaging>
-
- <modules>
- <module>dubbo-remoting-quic</module>
- </modules>
+ <modelVersion>4.0.0</modelVersion>
+ <version>1.0.0-SNAPSHOT</version>
+ <artifactId>dubbo-remoting-p2p</artifactId>
+ <packaging>jar</packaging>
+ <name>${project.artifactId}</name>
+ <description>The p2p remoting module of dubbo project</description>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo-remoting-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo-remoting-netty4</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
</project>
diff --git a/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/Group.java b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/Group.java
new file mode 100644
index 0000000..1c335e2
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/Group.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.p2p;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.remoting.ChannelHandler;
+import org.apache.dubbo.remoting.RemotingException;
+
+/**
+ * Group. (SPI, Prototype, ThreadSafe)
+ * <p>
+ * <a href="http://en.wikipedia.org/wiki/Peer-to-peer">Peer-to-peer</a>
+ */
+public interface Group {
+
+ /**
+ * get group url.
+ *
+ * @return group url.
+ */
+ URL getUrl();
+
+ /**
+ * join.
+ *
+ * @param url
+ */
+ Peer join(URL url, ChannelHandler handler) throws RemotingException;
+
+ /**
+ * leave.
+ *
+ * @param url
+ * @throws RemotingException
+ */
+ void leave(URL url) throws RemotingException;
+
+ /**
+ * close the group.
+ */
+ void close();
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/Networker.java b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/Networker.java
new file mode 100644
index 0000000..c4801cd
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/Networker.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.p2p;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.extension.SPI;
+import org.apache.dubbo.remoting.RemotingException;
+
+/**
+ * Networker. (SPI, Singleton, ThreadSafe)
+ * <p>
+ * <a href="http://en.wikipedia.org/wiki/Peer-to-peer">Peer-to-peer</a>
+ */
+@SPI
+public interface Networker {
+
+ /**
+ * lookup group.
+ *
+ * @param url group url
+ * @return group.
+ */
+ Group lookup(URL url) throws RemotingException;
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/Networkers.java b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/Networkers.java
new file mode 100644
index 0000000..980c3d4
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/Networkers.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.p2p;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.extension.ExtensionLoader;
+import org.apache.dubbo.remoting.ChannelHandler;
+import org.apache.dubbo.remoting.RemotingException;
+
+/**
+ * Networkers. (API, Static, ThreadSafe)
+ * <p>
+ * <a href="http://en.wikipedia.org/wiki/Peer-to-peer">Peer-to-peer</a>
+ */
+public class Networkers {
+
+ public static Peer join(String group, String peer, ChannelHandler handler) throws RemotingException {
+ return join(URL.valueOf(group), URL.valueOf(peer), handler);
+ }
+
+ public static Peer join(URL group, URL peer, ChannelHandler handler) throws RemotingException {
+ return lookup(group).join(peer, handler);
+ }
+
+ public static Group lookup(String group) throws RemotingException {
+ return lookup(URL.valueOf(group));
+ }
+
+ public static Group lookup(URL group) throws RemotingException {
+ Networker networker = ExtensionLoader.getExtensionLoader(Networker.class).getExtension(group.getProtocol());
+ return networker.lookup(group);
+ }
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/Peer.java b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/Peer.java
new file mode 100644
index 0000000..474f795
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/Peer.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.p2p;
+
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.RemotingServer;
+
+/**
+ * Peer. (SPI, Prototype, ThreadSafe)
+ * <p>
+ * <a href="http://en.wikipedia.org/wiki/Peer-to-peer">Peer-to-peer</a>
+ */
+public interface Peer extends RemotingServer {
+
+ /**
+ * leave.
+ *
+ * @throws RemotingException
+ */
+ void leave() throws RemotingException;
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/exchange/ExchangeGroup.java b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/exchange/ExchangeGroup.java
new file mode 100644
index 0000000..7fe21ef
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/exchange/ExchangeGroup.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.p2p.exchange;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.exchange.ExchangeHandler;
+import org.apache.dubbo.remoting.p2p.Group;
+
+/**
+ * Group
+ */
+public interface ExchangeGroup extends Group {
+
+ /**
+ * join.
+ *
+ * @param url
+ */
+ ExchangePeer join(URL url, ExchangeHandler handler) throws RemotingException;
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/exchange/ExchangeNetworker.java b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/exchange/ExchangeNetworker.java
new file mode 100644
index 0000000..ac77d68
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/exchange/ExchangeNetworker.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.p2p.exchange;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.remoting.RemotingException;
+
+/**
+ * Networker
+ */
+public interface ExchangeNetworker {
+
+ /**
+ * lookup group.
+ *
+ * @param url group url
+ * @return group.
+ */
+ ExchangeGroup lookup(URL url) throws RemotingException;
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/exchange/ExchangeNetworkers.java b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/exchange/ExchangeNetworkers.java
new file mode 100644
index 0000000..0c047b7
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/exchange/ExchangeNetworkers.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.p2p.exchange;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.extension.ExtensionLoader;
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.exchange.ExchangeHandler;
+
+/**
+ * Networkers
+ */
+public class ExchangeNetworkers {
+
+ public static ExchangePeer join(String group, String peer, ExchangeHandler handler) throws RemotingException {
+ return join(URL.valueOf(group), URL.valueOf(peer), handler);
+ }
+
+ public static ExchangePeer join(URL group, URL peer, ExchangeHandler handler) throws RemotingException {
+ return lookup(group).join(peer, handler);
+ }
+
+ public static ExchangeGroup lookup(String group) throws RemotingException {
+ return lookup(URL.valueOf(group));
+ }
+
+ public static ExchangeGroup lookup(URL group) throws RemotingException {
+ ExchangeNetworker networker = ExtensionLoader.getExtensionLoader(ExchangeNetworker.class).getExtension(group.getProtocol());
+ return networker.lookup(group);
+ }
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/exchange/ExchangePeer.java b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/exchange/ExchangePeer.java
new file mode 100644
index 0000000..a9f2419
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/exchange/ExchangePeer.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.p2p.exchange;
+
+import org.apache.dubbo.remoting.exchange.ExchangeServer;
+import org.apache.dubbo.remoting.p2p.Peer;
+
+/**
+ * Peer
+ */
+public interface ExchangePeer extends Peer, ExchangeServer {
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/exchange/support/AbstractExchangeGroup.java b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/exchange/support/AbstractExchangeGroup.java
new file mode 100644
index 0000000..37d81a2
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/exchange/support/AbstractExchangeGroup.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.p2p.exchange.support;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.remoting.ChannelHandler;
+import org.apache.dubbo.remoting.Client;
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.RemotingServer;
+import org.apache.dubbo.remoting.exchange.ExchangeClient;
+import org.apache.dubbo.remoting.exchange.ExchangeHandler;
+import org.apache.dubbo.remoting.exchange.ExchangeServer;
+import org.apache.dubbo.remoting.exchange.Exchangers;
+import org.apache.dubbo.remoting.exchange.support.ExchangeHandlerDispatcher;
+import org.apache.dubbo.remoting.p2p.Peer;
+import org.apache.dubbo.remoting.p2p.exchange.ExchangeGroup;
+import org.apache.dubbo.remoting.p2p.exchange.ExchangePeer;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * AbstractGroup
+ */
+public abstract class AbstractExchangeGroup implements ExchangeGroup {
+
+ // log output
+ protected static final Logger logger = LoggerFactory.getLogger(AbstractExchangeGroup.class);
+
+ protected final URL url;
+
+ protected final Map<URL, ExchangeServer> servers = new ConcurrentHashMap<URL, ExchangeServer>();
+
+ protected final Map<URL, ExchangeClient> clients = new ConcurrentHashMap<URL, ExchangeClient>();
+
+ protected final ExchangeHandlerDispatcher dispatcher = new ExchangeHandlerDispatcher();
+
+ public AbstractExchangeGroup(URL url) {
+ if (url == null) {
+ throw new IllegalArgumentException("url == null");
+ }
+ this.url = url;
+ }
+
+ @Override
+ public URL getUrl() {
+ return url;
+ }
+
+ @Override
+ public void close() {
+ for (URL url : new ArrayList<URL>(servers.keySet())) {
+ try {
+ leave(url);
+ } catch (Throwable t) {
+ logger.error(t.getMessage(), t);
+ }
+ }
+ for (URL url : new ArrayList<URL>(clients.keySet())) {
+ try {
+ disconnect(url);
+ } catch (Throwable t) {
+ logger.error(t.getMessage(), t);
+ }
+ }
+ }
+
+ @Override
+ public Peer join(URL url, ChannelHandler handler) throws RemotingException {
+ return join(url, (ExchangeHandler) handler);
+ }
+
+ @Override
+ public ExchangePeer join(URL url, ExchangeHandler handler) throws RemotingException {
+ ExchangeServer server = servers.get(url);
+ if (server == null) { // TODO exist concurrent gap
+ server = Exchangers.bind(url, handler);
+ servers.put(url, server);
+ dispatcher.addChannelHandler(handler);
+ }
+ return new ExchangeServerPeer(server, clients, this);
+ }
+
+ @Override
+ public void leave(URL url) throws RemotingException {
+ RemotingServer server = servers.remove(url);
+ if (server != null) {
+ server.close();
+ }
+ }
+
+ protected Client connect(URL url) throws RemotingException {
+ if (servers.containsKey(url)) {
+ return null;
+ }
+ ExchangeClient client = clients.get(url);
+ if (client == null) { // TODO exist concurrent gap
+ client = Exchangers.connect(url, dispatcher);
+ clients.put(url, client);
+ }
+ return client;
+ }
+
+ protected void disconnect(URL url) throws RemotingException {
+ Client client = clients.remove(url);
+ if (client != null) {
+ client.close();
+ }
+ }
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/exchange/support/ExchangeServerPeer.java b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/exchange/support/ExchangeServerPeer.java
new file mode 100644
index 0000000..5f3aebd
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/exchange/support/ExchangeServerPeer.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.p2p.exchange.support;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.remoting.Channel;
+import org.apache.dubbo.remoting.Client;
+import org.apache.dubbo.remoting.Constants;
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.exchange.ExchangeChannel;
+import org.apache.dubbo.remoting.exchange.ExchangeClient;
+import org.apache.dubbo.remoting.exchange.ExchangeServer;
+import org.apache.dubbo.remoting.exchange.support.ExchangeServerDelegate;
+import org.apache.dubbo.remoting.p2p.exchange.ExchangeGroup;
+import org.apache.dubbo.remoting.p2p.exchange.ExchangePeer;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * ServerPeer
+ */
+public class ExchangeServerPeer extends ExchangeServerDelegate implements ExchangePeer {
+
+ private static final Logger logger = LoggerFactory.getLogger(ExchangeServerPeer.class);
+
+ private final Map<URL, ExchangeClient> clients;
+
+ private final ExchangeGroup group;
+
+ public ExchangeServerPeer(ExchangeServer server, Map<URL, ExchangeClient> clients, ExchangeGroup group) {
+ super(server);
+ this.clients = clients;
+ this.group = group;
+ }
+
+ @Override
+ public void leave() throws RemotingException {
+ group.leave(getUrl());
+ }
+
+ @Override
+ public void close() {
+ try {
+ leave();
+ } catch (RemotingException e) {
+ logger.error(e.getMessage(), e);
+ }
+ }
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ @Override
+ public Collection<Channel> getChannels() {
+ return (Collection) getExchangeChannels();
+ }
+
+ @Override
+ public Channel getChannel(InetSocketAddress remoteAddress) {
+ return getExchangeChannel(remoteAddress);
+ }
+
+ @Override
+ public Collection<ExchangeChannel> getExchangeChannels() {
+ Collection<ExchangeChannel> channels = super.getExchangeChannels();
+ if (clients.size() > 0) {
+ channels = channels == null ? new ArrayList<ExchangeChannel>() : new ArrayList<ExchangeChannel>(channels);
+ channels.addAll(clients.values());
+ }
+ return channels;
+ }
+
+ @Override
+ public ExchangeChannel getExchangeChannel(InetSocketAddress remoteAddress) {
+ String host = remoteAddress.getAddress() != null ? remoteAddress.getAddress().getHostAddress() : remoteAddress.getHostName();
+ int port = remoteAddress.getPort();
+ ExchangeChannel channel = super.getExchangeChannel(remoteAddress);
+ if (channel == null) {
+ for (Map.Entry<URL, ExchangeClient> entry : clients.entrySet()) {
+ URL url = entry.getKey();
+ if (url.getIp().equals(host) && url.getPort() == port) {
+ return entry.getValue();
+ }
+ }
+ }
+ return channel;
+ }
+
+ @Override
+ public void send(Object message) throws RemotingException {
+ send(message, getUrl().getParameter(Constants.SENT_KEY, false));
+ }
+
+ @Override
+ public void send(Object message, boolean sent) throws RemotingException {
+ Throwable last = null;
+ try {
+ super.send(message, sent);
+ } catch (Throwable t) {
+ last = t;
+ }
+ for (Client client : clients.values()) {
+ try {
+ client.send(message, sent);
+ } catch (Throwable t) {
+ last = t;
+ }
+ }
+ if (last != null) {
+ if (last instanceof RemotingException) {
+ throw (RemotingException) last;
+ } else if (last instanceof RuntimeException) {
+ throw (RuntimeException) last;
+ } else {
+ throw new RuntimeException(last.getMessage(), last);
+ }
+ }
+ }
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/exchange/support/FileExchangeGroup.java b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/exchange/support/FileExchangeGroup.java
new file mode 100644
index 0000000..2ea192e
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/exchange/support/FileExchangeGroup.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.p2p.exchange.support;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.utils.ExecutorUtil;
+import org.apache.dubbo.common.utils.IOUtils;
+import org.apache.dubbo.common.utils.NamedThreadFactory;
+import org.apache.dubbo.common.utils.NetUtils;
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.exchange.ExchangeHandler;
+import org.apache.dubbo.remoting.p2p.exchange.ExchangePeer;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * FileGroup
+ */
+public class FileExchangeGroup extends AbstractExchangeGroup {
+
+ private final File file;
+ // scheduled executor service
+ private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3, new NamedThreadFactory("FileGroupModifiedChecker", true));
+ // Reconnect the timer to check whether the connection is available at a time, and when unavailable, an infinite reconnection
+ private final ScheduledFuture<?> checkModifiedFuture;
+ private volatile long last;
+
+ public FileExchangeGroup(URL url) {
+ super(url);
+ String path = url.getHost() + "/" + url.getPath();
+ file = new File(path);
+ if (!file.exists()) {
+ throw new IllegalStateException("The group file not exists. file: " + path);
+ }
+ checkModifiedFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
+ @Override
+ public void run() {
+ // check the file change
+ try {
+ check();
+ } catch (Throwable t) { // Defensive fault tolerance
+ logger.error("Unexpected error occur at reconnect, cause: " + t.getMessage(), t);
+ }
+ }
+ }, 2000, 2000, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public void close() {
+ super.close();
+ try {
+ ExecutorUtil.cancelScheduledFuture(checkModifiedFuture);
+ } catch (Throwable t) {
+ logger.error(t.getMessage(), t);
+ }
+ }
+
+ private void check() throws RemotingException {
+ long modified = file.lastModified();
+ if (modified > last) {
+ last = modified;
+ changed();
+ }
+ }
+
+ private void changed() throws RemotingException {
+ try {
+ String[] lines = IOUtils.readLines(file);
+ for (String line : lines) {
+ connect(URL.valueOf(line));
+ }
+ } catch (IOException e) {
+ throw new RemotingException(new InetSocketAddress(NetUtils.getLocalHost(), 0), getUrl().toInetSocketAddress(), e.getMessage(), e);
+ }
+ }
+
+ public ExchangePeer joinExchange(URL url, ExchangeHandler handler) throws RemotingException {
+ ExchangePeer peer = super.join(url, handler);
+ try {
+ String full = url.toFullString();
+ String[] lines = IOUtils.readLines(file);
+ for (String line : lines) {
+ if (full.equals(line)) {
+ return peer;
+ }
+ }
+ IOUtils.appendLines(file, new String[]{full});
+ } catch (IOException e) {
+ throw new RemotingException(new InetSocketAddress(NetUtils.getLocalHost(), 0), getUrl().toInetSocketAddress(), e.getMessage(), e);
+ }
+ return peer;
+ }
+
+ @Override
+ public void leave(URL url) throws RemotingException {
+ super.leave(url);
+ try {
+ String full = url.toFullString();
+ String[] lines = IOUtils.readLines(file);
+ List<String> saves = new ArrayList<String>();
+ for (String line : lines) {
+ if (full.equals(line)) {
+ return;
+ }
+ saves.add(line);
+ }
+ IOUtils.appendLines(file, saves.toArray(new String[0]));
+ } catch (IOException e) {
+ throw new RemotingException(new InetSocketAddress(NetUtils.getLocalHost(), 0), getUrl().toInetSocketAddress(), e.getMessage(), e);
+ }
+ }
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/exchange/support/FileExchangeNetworker.java b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/exchange/support/FileExchangeNetworker.java
new file mode 100644
index 0000000..d6d90c7
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/exchange/support/FileExchangeNetworker.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.p2p.exchange.support;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.p2p.exchange.ExchangeGroup;
+import org.apache.dubbo.remoting.p2p.exchange.ExchangeNetworker;
+
+/**
+ * FileNetworker
+ */
+public class FileExchangeNetworker implements ExchangeNetworker {
+
+ @Override
+ public ExchangeGroup lookup(URL url) throws RemotingException {
+ return new FileExchangeGroup(url);
+ }
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/exchange/support/MulticastExchangeGroup.java b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/exchange/support/MulticastExchangeGroup.java
new file mode 100644
index 0000000..f21f873
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/exchange/support/MulticastExchangeGroup.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.p2p.exchange.support;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.utils.NetUtils;
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.exchange.ExchangeHandler;
+import org.apache.dubbo.remoting.p2p.exchange.ExchangePeer;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.MulticastSocket;
+
+/**
+ * MulticastGroup
+ */
+public class MulticastExchangeGroup extends AbstractExchangeGroup {
+
+ private static final String JOIN = "join";
+
+ private static final String LEAVE = "leave";
+
+ private InetAddress multicastAddress;
+
+ private MulticastSocket multicastSocket;
+
+ public MulticastExchangeGroup(URL url) {
+ super(url);
+ if (!NetUtils.isMulticastAddress(url.getHost())) {
+ throw new IllegalArgumentException("Invalid multicast address " + url.getHost() + ", scope: 224.0.0.0 - 239.255.255.255");
+ }
+ try {
+ multicastAddress = InetAddress.getByName(url.getHost());
+ multicastSocket = new MulticastSocket(url.getPort());
+ multicastSocket.setLoopbackMode(false);
+ multicastSocket.joinGroup(multicastAddress);
+ Thread thread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ byte[] buf = new byte[1024];
+ DatagramPacket recv = new DatagramPacket(buf, buf.length);
+ while (true) {
+ try {
+ multicastSocket.receive(recv);
+ MulticastExchangeGroup.this.receive(new String(recv.getData()).trim(), (InetSocketAddress) recv.getSocketAddress());
+ } catch (Exception e) {
+ logger.error(e.getMessage(), e);
+ }
+ }
+ }
+ }, "MulticastGroupReceiver");
+ thread.setDaemon(true);
+ thread.start();
+ } catch (IOException e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ }
+
+ private void send(String msg) throws RemotingException {
+ DatagramPacket hi = new DatagramPacket(msg.getBytes(), msg.length(), multicastAddress, multicastSocket.getLocalPort());
+ try {
+ multicastSocket.send(hi);
+ } catch (IOException e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ }
+
+ private void receive(String msg, InetSocketAddress remoteAddress) throws RemotingException {
+ if (msg.startsWith(JOIN)) {
+ String url = msg.substring(JOIN.length()).trim();
+ connect(URL.valueOf(url));
+ } else if (msg.startsWith(LEAVE)) {
+ String url = msg.substring(LEAVE.length()).trim();
+ disconnect(URL.valueOf(url));
+ }
+ }
+
+ @Override
+ public ExchangePeer join(URL url, ExchangeHandler handler) throws RemotingException {
+ ExchangePeer peer = super.join(url, handler);
+ send(JOIN + " " + url.toFullString());
+ return peer;
+ }
+
+ @Override
+ public void leave(URL url) throws RemotingException {
+ super.leave(url);
+ send(LEAVE + " " + url.toFullString());
+ }
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/exchange/support/MulticastExchangeNetworker.java b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/exchange/support/MulticastExchangeNetworker.java
new file mode 100644
index 0000000..18d6186
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/exchange/support/MulticastExchangeNetworker.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.p2p.exchange.support;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.p2p.exchange.ExchangeGroup;
+import org.apache.dubbo.remoting.p2p.exchange.ExchangeNetworker;
+
+/**
+ * MulticastNetworker
+ */
+public class MulticastExchangeNetworker implements ExchangeNetworker {
+
+ @Override
+ public ExchangeGroup lookup(URL url) throws RemotingException {
+ return new MulticastExchangeGroup(url);
+ }
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/support/AbstractGroup.java b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/support/AbstractGroup.java
new file mode 100644
index 0000000..0eec957
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/support/AbstractGroup.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.p2p.support;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.remoting.ChannelHandler;
+import org.apache.dubbo.remoting.Client;
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.RemotingServer;
+import org.apache.dubbo.remoting.Transporters;
+import org.apache.dubbo.remoting.p2p.Group;
+import org.apache.dubbo.remoting.p2p.Peer;
+import org.apache.dubbo.remoting.transport.ChannelHandlerDispatcher;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * AbstractGroup
+ */
+public abstract class AbstractGroup implements Group {
+
+ // log output
+ protected static final Logger logger = LoggerFactory.getLogger(AbstractGroup.class);
+
+ protected final URL url;
+
+ protected final Map<URL, RemotingServer> servers = new ConcurrentHashMap<URL, RemotingServer>();
+
+ protected final Map<URL, Client> clients = new ConcurrentHashMap<URL, Client>();
+
+ protected final ChannelHandlerDispatcher dispatcher = new ChannelHandlerDispatcher();
+
+ public AbstractGroup(URL url) {
+ if (url == null) {
+ throw new IllegalArgumentException("url == null");
+ }
+ this.url = url;
+ }
+
+ @Override
+ public URL getUrl() {
+ return url;
+ }
+
+ @Override
+ public void close() {
+ for (URL url : new ArrayList<URL>(servers.keySet())) {
+ try {
+ leave(url);
+ } catch (Throwable t) {
+ logger.error(t.getMessage(), t);
+ }
+ }
+ for (URL url : new ArrayList<URL>(clients.keySet())) {
+ try {
+ disconnect(url);
+ } catch (Throwable t) {
+ logger.error(t.getMessage(), t);
+ }
+ }
+ }
+
+ @Override
+ public Peer join(URL url, ChannelHandler handler) throws RemotingException {
+ RemotingServer server = servers.get(url);
+ if (server == null) { // TODO exist concurrent gap
+ server = Transporters.bind(url, handler);
+ servers.put(url, server);
+ dispatcher.addChannelHandler(handler);
+ }
+ return new ServerPeer(server, clients, this);
+ }
+
+ @Override
+ public void leave(URL url) throws RemotingException {
+ RemotingServer server = servers.remove(url);
+ if (server != null) {
+ server.close();
+ }
+ }
+
+ protected Client connect(URL url) throws RemotingException {
+ if (servers.containsKey(url)) {
+ return null;
+ }
+ Client client = clients.get(url);
+ if (client == null) { // TODO exist concurrent gap
+ client = Transporters.connect(url, dispatcher);
+ clients.put(url, client);
+ }
+ return client;
+ }
+
+ protected void disconnect(URL url) throws RemotingException {
+ Client client = clients.remove(url);
+ if (client != null) {
+ client.close();
+ }
+ }
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/support/FileGroup.java b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/support/FileGroup.java
new file mode 100644
index 0000000..aa69c08
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/support/FileGroup.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.p2p.support;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.utils.ExecutorUtil;
+import org.apache.dubbo.common.utils.IOUtils;
+import org.apache.dubbo.common.utils.NamedThreadFactory;
+import org.apache.dubbo.common.utils.NetUtils;
+import org.apache.dubbo.remoting.ChannelHandler;
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.p2p.Peer;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * FileGroup
+ */
+public class FileGroup extends AbstractGroup {
+
+ private final File file;
+ // Scheduled executor service
+ private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3, new NamedThreadFactory("FileGroupModifiedChecker", true));
+ // Reconnect the timer to check whether the connection is available at a time, and when unavailable, an infinite reconnection
+ private final ScheduledFuture<?> checkModifiedFuture;
+ private volatile long last;
+
+ public FileGroup(URL url) {
+ super(url);
+ String path = url.getAbsolutePath();
+ file = new File(path);
+ checkModifiedFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
+ @Override
+ public void run() {
+ // Check the file change
+ try {
+ check();
+ } catch (Throwable t) { // Defensive fault tolerance
+ logger.error("Unexpected error occur at reconnect, cause: " + t.getMessage(), t);
+ }
+ }
+ }, 2000, 2000, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public void close() {
+ super.close();
+ try {
+ ExecutorUtil.cancelScheduledFuture(checkModifiedFuture);
+ } catch (Throwable t) {
+ logger.error(t.getMessage(), t);
+ }
+ }
+
+ private void check() throws RemotingException {
+ long modified = file.lastModified();
+ if (modified > last) {
+ last = modified;
+ changed();
+ }
+ }
+
+ private void changed() throws RemotingException {
+ try {
+ String[] lines = IOUtils.readLines(file);
+ for (String line : lines) {
+ connect(URL.valueOf(line));
+ }
+ } catch (IOException e) {
+ throw new RemotingException(new InetSocketAddress(NetUtils.getLocalHost(), 0), getUrl().toInetSocketAddress(), e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public Peer join(URL url, ChannelHandler handler) throws RemotingException {
+ Peer peer = super.join(url, handler);
+ try {
+ String full = url.toFullString();
+ String[] lines = IOUtils.readLines(file);
+ for (String line : lines) {
+ if (full.equals(line)) {
+ return peer;
+ }
+ }
+ IOUtils.appendLines(file, new String[]{full});
+ } catch (IOException e) {
+ throw new RemotingException(new InetSocketAddress(NetUtils.getLocalHost(), 0), getUrl().toInetSocketAddress(), e.getMessage(), e);
+ }
+ return peer;
+ }
+
+ @Override
+ public void leave(URL url) throws RemotingException {
+ super.leave(url);
+ try {
+ String full = url.toFullString();
+ String[] lines = IOUtils.readLines(file);
+ List<String> saves = new ArrayList<String>();
+ for (String line : lines) {
+ if (full.equals(line)) {
+ return;
+ }
+ saves.add(line);
+ }
+ IOUtils.appendLines(file, saves.toArray(new String[0]));
+ } catch (IOException e) {
+ throw new RemotingException(new InetSocketAddress(NetUtils.getLocalHost(), 0), getUrl().toInetSocketAddress(), e.getMessage(), e);
+ }
+ }
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/support/FileNetworker.java b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/support/FileNetworker.java
new file mode 100644
index 0000000..e9fd1b0
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/support/FileNetworker.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.p2p.support;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.p2p.Group;
+import org.apache.dubbo.remoting.p2p.Networker;
+
+/**
+ * FileNetworker
+ */
+public class FileNetworker implements Networker {
+
+ @Override
+ public Group lookup(URL url) throws RemotingException {
+ return new FileGroup(url);
+ }
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/support/MulticastGroup.java b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/support/MulticastGroup.java
new file mode 100644
index 0000000..8af1722
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/support/MulticastGroup.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.p2p.support;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.utils.NetUtils;
+import org.apache.dubbo.remoting.ChannelHandler;
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.p2p.Peer;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.MulticastSocket;
+
+/**
+ * MulticastGroup
+ */
+public class MulticastGroup extends AbstractGroup {
+
+ private static final String JOIN = "join";
+
+ private static final String LEAVE = "leave";
+
+ private InetAddress multicastAddress;
+
+ private MulticastSocket multicastSocket;
+
+ public MulticastGroup(URL url) {
+ super(url);
+ if (!NetUtils.isMulticastAddress(url.getHost())) {
+ throw new IllegalArgumentException("Invalid multicast address " + url.getHost() + ", scope: 224.0.0.0 - 239.255.255.255");
+ }
+ try {
+ multicastAddress = InetAddress.getByName(url.getHost());
+ multicastSocket = new MulticastSocket(url.getPort());
+ multicastSocket.setLoopbackMode(false);
+ multicastSocket.joinGroup(multicastAddress);
+ Thread thread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ byte[] buf = new byte[1024];
+ DatagramPacket recv = new DatagramPacket(buf, buf.length);
+ while (true) {
+ try {
+ multicastSocket.receive(recv);
+ MulticastGroup.this.receive(new String(recv.getData()).trim(), (InetSocketAddress) recv.getSocketAddress());
+ } catch (Exception e) {
+ logger.error(e.getMessage(), e);
+ }
+ }
+ }
+ }, "MulticastGroupReceiver");
+ thread.setDaemon(true);
+ thread.start();
+ } catch (IOException e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ }
+
+ private void send(String msg) throws RemotingException {
+ DatagramPacket hi = new DatagramPacket(msg.getBytes(), msg.length(), multicastAddress, multicastSocket.getLocalPort());
+ try {
+ multicastSocket.send(hi);
+ } catch (IOException e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ }
+
+ private void receive(String msg, InetSocketAddress remoteAddress) throws RemotingException {
+ if (msg.startsWith(JOIN)) {
+ String url = msg.substring(JOIN.length()).trim();
+ connect(URL.valueOf(url));
+ } else if (msg.startsWith(LEAVE)) {
+ String url = msg.substring(LEAVE.length()).trim();
+ disconnect(URL.valueOf(url));
+ }
+ }
+
+ @Override
+ public Peer join(URL url, ChannelHandler handler) throws RemotingException {
+ Peer peer = super.join(url, handler);
+ send(JOIN + " " + url.toFullString());
+ return peer;
+ }
+
+ @Override
+ public void leave(URL url) throws RemotingException {
+ super.leave(url);
+ send(LEAVE + " " + url.toFullString());
+ }
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/support/MulticastNetworker.java b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/support/MulticastNetworker.java
new file mode 100644
index 0000000..c706591
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/support/MulticastNetworker.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.p2p.support;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.p2p.Group;
+import org.apache.dubbo.remoting.p2p.Networker;
+
+/**
+ * MulticastNetworker
+ */
+public class MulticastNetworker implements Networker {
+
+ @Override
+ public Group lookup(URL url) throws RemotingException {
+ return new MulticastGroup(url);
+ }
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/support/ServerPeer.java b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/support/ServerPeer.java
new file mode 100644
index 0000000..d5a2575
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/java/org/apache/dubbo/remoting/p2p/support/ServerPeer.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.p2p.support;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.remoting.Channel;
+import org.apache.dubbo.remoting.Client;
+import org.apache.dubbo.remoting.Constants;
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.RemotingServer;
+import org.apache.dubbo.remoting.p2p.Group;
+import org.apache.dubbo.remoting.p2p.Peer;
+import org.apache.dubbo.remoting.transport.ServerDelegate;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * ServerPeer
+ */
+public class ServerPeer extends ServerDelegate implements Peer {
+
+ private static final Logger logger = LoggerFactory.getLogger(ServerPeer.class);
+
+ private final Map<URL, Client> clients;
+
+ private final Group group;
+
+ public ServerPeer(RemotingServer server, Map<URL, Client> clients, Group group) {
+ super(server);
+ this.clients = clients;
+ this.group = group;
+ }
+
+ @Override
+ public void leave() throws RemotingException {
+ group.leave(getUrl());
+ }
+
+ @Override
+ public void close() {
+ try {
+ leave();
+ } catch (RemotingException e) {
+ logger.error(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public Collection<Channel> getChannels() {
+ Collection<Channel> channels = super.getChannels();
+ if (clients.size() > 0) {
+ channels = channels == null ? new ArrayList<Channel>() : new ArrayList<Channel>(channels);
+ channels.addAll(clients.values());
+ }
+ return channels;
+ }
+
+ @Override
+ public Channel getChannel(InetSocketAddress remoteAddress) {
+ String host = remoteAddress.getAddress() != null ? remoteAddress.getAddress().getHostAddress() : remoteAddress.getHostName();
+ int port = remoteAddress.getPort();
+ Channel channel = super.getChannel(remoteAddress);
+ if (channel == null) {
+ for (Map.Entry<URL, Client> entry : clients.entrySet()) {
+ URL url = entry.getKey();
+ if (url.getIp().equals(host) && url.getPort() == port) {
+ return entry.getValue();
+ }
+ }
+ }
+ return channel;
+ }
+
+ @Override
+ public void send(Object message) throws RemotingException {
+ send(message, getUrl().getParameter(Constants.SENT_KEY, false));
+ }
+
+ @Override
+ public void send(Object message, boolean sent) throws RemotingException {
+ Throwable last = null;
+ try {
+ super.send(message, sent);
+ } catch (Throwable t) {
+ last = t;
+ }
+ for (Client client : clients.values()) {
+ try {
+ client.send(message, sent);
+ } catch (Throwable t) {
+ last = t;
+ }
+ }
+ if (last != null) {
+ if (last instanceof RemotingException) {
+ throw (RemotingException) last;
+ } else if (last instanceof RuntimeException) {
+ throw (RuntimeException) last;
+ } else {
+ throw new RuntimeException(last.getMessage(), last);
+ }
+ }
+ }
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.p2p.Networker b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.p2p.Networker
new file mode 100644
index 0000000..0712e65
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.p2p.Networker
@@ -0,0 +1,2 @@
+multicast=org.apache.dubbo.remoting.p2p.support.MulticastNetworker
+file=org.apache.dubbo.remoting.p2p.support.FileNetworker
diff --git a/dubbo-remoting-extensions/dubbo-remoting-p2p/src/test/java/org/apache/dubbo/remoting/p2p/exchange/support/MulticastExchangeNetworkerTest.java b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/test/java/org/apache/dubbo/remoting/p2p/exchange/support/MulticastExchangeNetworkerTest.java
new file mode 100644
index 0000000..573f245
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/test/java/org/apache/dubbo/remoting/p2p/exchange/support/MulticastExchangeNetworkerTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.p2p.exchange.support;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.utils.NetUtils;
+import org.apache.dubbo.remoting.Channel;
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.exchange.ExchangeChannel;
+import org.apache.dubbo.remoting.exchange.ExchangeHandler;
+import org.apache.dubbo.remoting.exchange.support.ExchangeHandlerAdapter;
+import org.apache.dubbo.remoting.p2p.Group;
+import org.apache.dubbo.remoting.p2p.Networkers;
+import org.apache.dubbo.remoting.p2p.Peer;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Mockito.mock;
+
+public class MulticastExchangeNetworkerTest {
+ @Test
+ public void testJoin() throws RemotingException, InterruptedException {
+ final String groupURL = "multicast://224.5.6.7:1234";
+
+ MulticastExchangeNetworker multicastExchangeNetworker = new MulticastExchangeNetworker();
+
+ final CountDownLatch countDownLatch = new CountDownLatch(1);
+ Peer peer1 = multicastExchangeNetworker.lookup(URL.valueOf(groupURL))
+ .join(URL.valueOf("exchange://0.0.0.0:" + NetUtils.getAvailablePort() + "?exchanger=header"), new ExchangeHandlerAdapter() {
+ @Override
+ public CompletableFuture<Object> reply(ExchangeChannel channel, Object msg) throws RemotingException {
+ countDownLatch.countDown();
+ return super.reply(channel, msg);
+ }
+ });
+ Peer peer2 = multicastExchangeNetworker.lookup(URL.valueOf(groupURL))
+ .join(URL.valueOf("exchange://0.0.0.0:" + NetUtils.getAvailablePort() + "?exchanger=header"), mock(ExchangeHandler.class));
+
+ while (true) {
+ for (Channel channel : peer1.getChannels()) {
+ channel.send("hello multicast exchange network!");
+ }
+ TimeUnit.MILLISECONDS.sleep(50);
+
+ long count = countDownLatch.getCount();
+ if (count > 0) {
+ break;
+ }
+ }
+
+ Group lookup = Networkers.lookup(groupURL);
+ assertThat(lookup, not(nullValue()));
+
+ assertThat(peer1, instanceOf(ExchangeServerPeer.class));
+
+ peer1.close();
+ peer2.close();
+ }
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-p2p/src/test/java/org/apache/dubbo/remoting/p2p/support/FileNetworkerTest.java b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/test/java/org/apache/dubbo/remoting/p2p/support/FileNetworkerTest.java
new file mode 100644
index 0000000..d947761
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/test/java/org/apache/dubbo/remoting/p2p/support/FileNetworkerTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.p2p.support;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.utils.NetUtils;
+import org.apache.dubbo.remoting.Channel;
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.p2p.Group;
+import org.apache.dubbo.remoting.p2p.Peer;
+import org.apache.dubbo.remoting.transport.ChannelHandlerAdapter;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.nio.file.Path;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Mockito.mock;
+
+public class FileNetworkerTest {
+
+ @BeforeEach
+ public void setUp(@TempDir Path folder) throws Exception {
+ folder.toFile().createNewFile();
+ }
+
+ @AfterEach
+ public void tearDown(@TempDir Path folder) {
+ folder.getFileName().toAbsolutePath().toFile().delete();
+ }
+
+ @Test
+ public void testJoin(@TempDir Path folder) throws RemotingException, InterruptedException {
+ final String groupURL = "file:///" + folder.getFileName().toAbsolutePath();
+
+ FileNetworker networker = new FileNetworker();
+ Group group = networker.lookup(URL.valueOf(groupURL));
+
+ final CountDownLatch countDownLatch = new CountDownLatch(1);
+ Peer peer1 = group.join(URL.valueOf("exchange://0.0.0.0:" + NetUtils.getAvailablePort() + "?exchanger=header"), new ChannelHandlerAdapter() {
+ @Override
+ public void received(Channel channel, Object message) {
+ countDownLatch.countDown();
+ }
+ });
+ Peer peer2 = group.join(URL.valueOf("exchange://0.0.0.0:" + NetUtils.getAvailablePort() + "?exchanger=header"),
+ mock(ChannelHandlerAdapter.class));
+
+ while (true) {
+ long count = countDownLatch.getCount();
+ if (count > 0) {
+ break;
+ }
+ for (Channel channel : peer1.getChannels()) {
+ channel.send(0, false);
+ channel.send("hello world!");
+ }
+ TimeUnit.MILLISECONDS.sleep(50);
+ }
+
+
+ peer2.close();
+ peer1.close();
+ }
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-p2p/src/test/java/org/apache/dubbo/remoting/p2p/support/MulticastNetworkerTest.java b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/test/java/org/apache/dubbo/remoting/p2p/support/MulticastNetworkerTest.java
new file mode 100644
index 0000000..88d7e97
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-p2p/src/test/java/org/apache/dubbo/remoting/p2p/support/MulticastNetworkerTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.p2p.support;
+
+import org.apache.dubbo.common.utils.NetUtils;
+import org.apache.dubbo.remoting.Channel;
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.p2p.Group;
+import org.apache.dubbo.remoting.p2p.Networkers;
+import org.apache.dubbo.remoting.p2p.Peer;
+import org.apache.dubbo.remoting.transport.ChannelHandlerAdapter;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Mockito.mock;
+
+public class MulticastNetworkerTest {
+
+ @Test
+ public void testJoin() throws RemotingException, InterruptedException {
+ final String groupURL = "multicast://224.5.6.7:1234";
+ final String peerURL = "exchange://0.0.0.0:" + NetUtils.getAvailablePort() + "?exchanger=header";
+
+ final CountDownLatch countDownLatch = new CountDownLatch(1);
+ Peer peer1 = Networkers.join(groupURL, peerURL, new ChannelHandlerAdapter() {
+ @Override
+ public void received(Channel channel, Object message) {
+ countDownLatch.countDown();
+ }
+ });
+ Peer peer2 = Networkers.join(groupURL, "exchange://0.0.0.0:" + NetUtils.getAvailablePort() + "?exchanger=header",
+ mock(ChannelHandlerAdapter.class));
+
+ while (true) {
+ long count = countDownLatch.getCount();
+ if (count > 0) {
+ break;
+ }
+ for (Channel channel : peer1.getChannels()) {
+ channel.send("hello world!");
+ }
+ TimeUnit.MILLISECONDS.sleep(50);
+ }
+
+ Group lookup = Networkers.lookup(groupURL);
+ assertThat(lookup, not(nullValue()));
+
+ peer2.close();
+ peer1.close();
+ }
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-redis/pom.xml b/dubbo-remoting-extensions/dubbo-remoting-redis/pom.xml
new file mode 100644
index 0000000..e357bd1
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-redis/pom.xml
@@ -0,0 +1,53 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>dubbo-remoting-extensions</artifactId>
+ <groupId>org.apache.dubbo.extensions</groupId>
+ <version>${revision}</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <modelVersion>4.0.0</modelVersion>
+ <version>1.0.0-SNAPSHOT</version>
+ <artifactId>dubbo-remoting-redis</artifactId>
+
+ <packaging>jar</packaging>
+
+ <name>${project.artifactId}</name>
+ <description>The redis remoting module of dubbo project</description>
+
+ <properties>
+ <skip_maven_deploy>false</skip_maven_deploy>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo-remoting-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>redis.clients</groupId>
+ <artifactId>jedis</artifactId>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/dubbo-remoting-extensions/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/RedisClient.java b/dubbo-remoting-extensions/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/RedisClient.java
new file mode 100644
index 0000000..7b2d7b8
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/RedisClient.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.redis;
+
+import redis.clients.jedis.JedisPubSub;
+
+import java.util.Map;
+import java.util.Set;
+
+public interface RedisClient {
+ Long hset(String key, String field, String value);
+
+ Long publish(String channel, String message);
+
+// void clean(String pattern);
+
+ boolean isConnected();
+
+ void destroy();
+
+ Long hdel(final String key, final String... fields);
+
+ Set<String> scan(String pattern);
+
+ Map<String, String> hgetAll(String key);
+
+ void psubscribe(final JedisPubSub jedisPubSub, final String... patterns);
+
+ void disconnect();
+
+ void close();
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/jedis/ClusterRedisClient.java b/dubbo-remoting-extensions/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/jedis/ClusterRedisClient.java
new file mode 100644
index 0000000..e0e4c14
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/jedis/ClusterRedisClient.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.redis.jedis;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.StringUtils;
+import org.apache.dubbo.remoting.redis.RedisClient;
+import org.apache.dubbo.remoting.redis.support.AbstractRedisClient;
+
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisPubSub;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import static org.apache.dubbo.common.constants.CommonConstants.COMMA_SPLIT_PATTERN;
+
+public class ClusterRedisClient extends AbstractRedisClient implements RedisClient {
+ private static final Logger logger = LoggerFactory.getLogger(ClusterRedisClient.class);
+
+ private static final int DEFAULT_TIMEOUT = 2000;
+
+ private static final int DEFAULT_SO_TIMEOUT = 2000;
+
+ private static final int DEFAULT_MAX_ATTEMPTS = 5;
+
+ private JedisCluster jedisCluster;
+ private Pattern COLON_SPLIT_PATTERN = Pattern.compile("\\s*[:]+\\s*");
+
+ public ClusterRedisClient(URL url) {
+ super(url);
+ Set<HostAndPort> nodes = getNodes(url);
+ jedisCluster = new JedisCluster(nodes, url.getParameter("connection.timeout", DEFAULT_TIMEOUT),
+ url.getParameter("so.timeout", DEFAULT_SO_TIMEOUT), url.getParameter("max.attempts", DEFAULT_MAX_ATTEMPTS),
+ url.getPassword(), getConfig());
+ }
+
+ @Override
+ public Long hset(String key, String field, String value) {
+ return jedisCluster.hset(key, field, value);
+ }
+
+ @Override
+ public Long publish(String channel, String message) {
+ return jedisCluster.publish(channel, message);
+ }
+
+ @Override
+ public boolean isConnected() {
+ Map<String, JedisPool> poolMap = jedisCluster.getClusterNodes();
+ for (JedisPool jedisPool : poolMap.values()) {
+ Jedis jedis = jedisPool.getResource();
+ if (jedis.isConnected()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public void destroy() {
+ jedisCluster.close();
+ }
+
+ @Override
+ public Long hdel(String key, String... fields) {
+ return jedisCluster.hdel(key, fields);
+ }
+
+ @Override
+ public Set<String> scan(String pattern) {
+ Map<String, JedisPool> nodes = jedisCluster.getClusterNodes();
+ Set<String> result = new HashSet<>();
+ for (JedisPool jedisPool : nodes.values()) {
+ Jedis jedis = jedisPool.getResource();
+ result.addAll(scan(jedis, pattern));
+ jedis.close();
+ }
+ return result;
+ }
+
+ @Override
+ public Map<String, String> hgetAll(String key) {
+ return jedisCluster.hgetAll(key);
+ }
+
+ @Override
+ public void psubscribe(JedisPubSub jedisPubSub, String... patterns) {
+ jedisCluster.psubscribe(jedisPubSub, patterns);
+ }
+
+ @Override
+ public void disconnect() {
+ jedisCluster.close();
+ }
+
+ @Override
+ public void close() {
+ jedisCluster.close();
+ }
+
+ private Set<HostAndPort> getNodes(URL url) {
+ Set<HostAndPort> hostAndPorts = new HashSet<>();
+ hostAndPorts.add(new HostAndPort(url.getHost(), url.getPort()));
+ String backupAddresses = url.getBackupAddress(6379);
+ String[] nodes = StringUtils.isEmpty(backupAddresses) ? new String[0] : COMMA_SPLIT_PATTERN.split(backupAddresses);
+ if (nodes.length > 0) {
+ for (String node : nodes) {
+ String[] hostAndPort = COLON_SPLIT_PATTERN.split(node);
+ hostAndPorts.add(new HostAndPort(hostAndPort[0], Integer.valueOf(hostAndPort[1])));
+ }
+ }
+ return hostAndPorts;
+ }
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/jedis/MonoRedisClient.java b/dubbo-remoting-extensions/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/jedis/MonoRedisClient.java
new file mode 100644
index 0000000..864f18e
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/jedis/MonoRedisClient.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.redis.jedis;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.remoting.redis.RedisClient;
+import org.apache.dubbo.remoting.redis.support.AbstractRedisClient;
+
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisPubSub;
+
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT;
+import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
+
+public class MonoRedisClient extends AbstractRedisClient implements RedisClient {
+ private static final Logger logger = LoggerFactory.getLogger(MonoRedisClient.class);
+
+ private static final String START_CURSOR = "0";
+
+ private JedisPool jedisPool;
+
+ public MonoRedisClient(URL url) {
+ super(url);
+ jedisPool = new JedisPool(getConfig(), url.getHost(), url.getPort(),
+ url.getParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT), url.getPassword());
+ }
+
+ @Override
+ public Long hset(String key, String field, String value) {
+ Jedis jedis = jedisPool.getResource();
+ Long result = jedis.hset(key, field, value);
+ jedis.close();
+ return result;
+ }
+
+ @Override
+ public Long publish(String channel, String message) {
+ Jedis jedis = jedisPool.getResource();
+ Long result = jedis.publish(channel, message);
+ jedis.close();
+ return result;
+ }
+
+ @Override
+ public boolean isConnected() {
+ Jedis jedis = jedisPool.getResource();
+ boolean connected = jedis.isConnected();
+ jedis.close();
+ return connected;
+ }
+
+ @Override
+ public void destroy() {
+ jedisPool.close();
+ }
+
+ @Override
+ public Long hdel(String key, String... fields) {
+ Jedis jedis = jedisPool.getResource();
+ Long result = jedis.hdel(key, fields);
+ jedis.close();
+ return result;
+ }
+
+ @Override
+ public Set<String> scan(String pattern) {
+ Jedis jedis = jedisPool.getResource();
+ Set<String> result = super.scan(jedis, pattern);
+ jedis.close();
+ return result;
+ }
+
+ @Override
+ public Map<String, String> hgetAll(String key) {
+ Jedis jedis = jedisPool.getResource();
+ Map<String, String> result = jedis.hgetAll(key);
+ jedis.close();
+ return result;
+ }
+
+ @Override
+ public void psubscribe(JedisPubSub jedisPubSub, String... patterns) {
+ Jedis jedis = jedisPool.getResource();
+ jedis.psubscribe(jedisPubSub, patterns);
+ jedis.close();
+ }
+
+ @Override
+ public void disconnect() {
+ jedisPool.close();
+ }
+
+ @Override
+ public void close() {
+ jedisPool.close();
+ }
+
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/jedis/SentinelRedisClient.java b/dubbo-remoting-extensions/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/jedis/SentinelRedisClient.java
new file mode 100644
index 0000000..6a10bf9
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/jedis/SentinelRedisClient.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.redis.jedis;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.constants.RemotingConstants;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.remoting.redis.RedisClient;
+import org.apache.dubbo.remoting.redis.support.AbstractRedisClient;
+
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPubSub;
+import redis.clients.jedis.JedisSentinelPool;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+public class SentinelRedisClient extends AbstractRedisClient implements RedisClient {
+ private static final Logger logger = LoggerFactory.getLogger(SentinelRedisClient.class);
+
+ private JedisSentinelPool sentinelPool;
+
+ public SentinelRedisClient(URL url) {
+ super(url);
+ String masterName = url.getParameter("master.name", "Sentinel-master");
+ String address = (new StringBuilder()).append(url.getAddress()).toString();
+ String[] backupAddresses = url.getParameter(RemotingConstants.BACKUP_KEY, new String[0]);
+ if (backupAddresses.length == 0) {
+ throw new IllegalStateException("Sentinel addresses can not be empty");
+ }
+ Set<String> sentinels = new HashSet<>(Arrays.asList(backupAddresses));
+ sentinels.add(address);
+ sentinelPool = new JedisSentinelPool(masterName, sentinels, getConfig(), url.getPassword());
+ }
+
+ @Override
+ public Long hset(String key, String field, String value) {
+ Jedis jedis = sentinelPool.getResource();
+ Long result = jedis.hset(key, field, value);
+ jedis.close();
+ return result;
+ }
+
+ @Override
+ public Long publish(String channel, String message) {
+ Jedis jedis = sentinelPool.getResource();
+ Long result = jedis.publish(channel, message);
+ jedis.close();
+ return result;
+ }
+
+ @Override
+ public boolean isConnected() {
+ Jedis jedis = sentinelPool.getResource();
+ boolean result = jedis.isConnected();
+ jedis.close();
+ return result;
+ }
+
+ @Override
+ public void destroy() {
+ sentinelPool.close();
+ }
+
+ @Override
+ public Long hdel(String key, String... fields) {
+ Jedis jedis = sentinelPool.getResource();
+ Long result = jedis.hdel(key, fields);
+ jedis.close();
+ return result;
+ }
+
+ @Override
+ public Set<String> scan(String pattern) {
+ Jedis jedis = sentinelPool.getResource();
+ Set<String> result = scan(jedis, pattern);
+ jedis.close();
+ return result;
+ }
+
+ @Override
+ public Map<String, String> hgetAll(String key) {
+ Jedis jedis = sentinelPool.getResource();
+ Map<String, String> result = jedis.hgetAll(key);
+ jedis.close();
+ return result;
+ }
+
+ @Override
+ public void psubscribe(JedisPubSub jedisPubSub, String... patterns) {
+ Jedis jedis = sentinelPool.getResource();
+ jedis.psubscribe(jedisPubSub, patterns);
+ jedis.close();
+ }
+
+ @Override
+ public void disconnect() {
+ sentinelPool.close();
+ }
+
+ @Override
+ public void close() {
+ sentinelPool.close();
+ }
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/support/AbstractRedisClient.java b/dubbo-remoting-extensions/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/support/AbstractRedisClient.java
new file mode 100644
index 0000000..4af640e
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/support/AbstractRedisClient.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.redis.support;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.utils.CollectionUtils;
+import org.apache.dubbo.remoting.redis.RedisClient;
+
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPoolConfig;
+import redis.clients.jedis.ScanParams;
+import redis.clients.jedis.ScanResult;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public abstract class AbstractRedisClient implements RedisClient {
+ private URL url;
+
+ private JedisPoolConfig config;
+
+ public AbstractRedisClient(URL url) {
+ this.url = url;
+ config = new JedisPoolConfig();
+ config.setTestOnBorrow(url.getParameter("test.on.borrow", true));
+ config.setTestOnReturn(url.getParameter("test.on.return", false));
+ config.setTestWhileIdle(url.getParameter("test.while.idle", false));
+ if (url.getParameter("max.idle", 0) > 0) {
+ config.setMaxIdle(url.getParameter("max.idle", 0));
+ }
+ if (url.getParameter("min.idle", 0) > 0) {
+ config.setMinIdle(url.getParameter("min.idle", 0));
+ }
+ if (url.getParameter("max.active", 0) > 0) {
+ config.setMaxTotal(url.getParameter("max.active", 0));
+ }
+ if (url.getParameter("max.total", 0) > 0) {
+ config.setMaxTotal(url.getParameter("max.total", 0));
+ }
+ if (url.getParameter("max.wait", url.getParameter("timeout", 0)) > 0) {
+ config.setMaxWaitMillis(url.getParameter("max.wait", url.getParameter("timeout", 0)));
+ }
+ if (url.getParameter("num.tests.per.eviction.run", 0) > 0) {
+ config.setNumTestsPerEvictionRun(url.getParameter("num.tests.per.eviction.run", 0));
+ }
+ if (url.getParameter("time.between.eviction.runs.millis", 0) > 0) {
+ config.setTimeBetweenEvictionRunsMillis(url.getParameter("time.between.eviction.runs.millis", 0));
+ }
+ if (url.getParameter("min.evictable.idle.time.millis", 0) > 0) {
+ config.setMinEvictableIdleTimeMillis(url.getParameter("min.evictable.idle.time.millis", 0));
+ }
+ }
+
+ protected Set<String> scan(Jedis jedis, String pattern) {
+ Set<String> result = new HashSet<>();
+ String cursor = ScanParams.SCAN_POINTER_START;
+ ScanParams params = new ScanParams();
+ params.match(pattern);
+ while (true) {
+ ScanResult<String> scanResult = jedis.scan(cursor, params);
+ List<String> list = scanResult.getResult();
+ if (CollectionUtils.isNotEmpty(list)) {
+ result.addAll(list);
+ }
+ if (ScanParams.SCAN_POINTER_START.equals(scanResult.getCursor())) {
+ break;
+ }
+ cursor = scanResult.getCursor();
+ }
+ return result;
+ }
+
+ public URL getUrl() {
+ return url;
+ }
+
+ public JedisPoolConfig getConfig() {
+ return config;
+ }
+}
diff --git a/dubbo-remoting-extensions/pom.xml b/dubbo-remoting-extensions/pom.xml
index 6a6b72a..9b52ee1 100644
--- a/dubbo-remoting-extensions/pom.xml
+++ b/dubbo-remoting-extensions/pom.xml
@@ -32,6 +32,11 @@
<modules>
<module>dubbo-remoting-quic</module>
+ <module>dubbo-remoting-etcd3</module>
+ <module>dubbo-remoting-grizzly</module>
+ <module>dubbo-remoting-mina</module>
+ <module>dubbo-remoting-p2p</module>
+ <module>dubbo-remoting-redis</module>
</modules>