You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by al...@apache.org on 2021/09/28 09:50:47 UTC
[dubbo-spi-extensions] branch master updated: Add quic protocol
support (#71)
This is an automated email from the ASF dual-hosted git repository.
albumenj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo-spi-extensions.git
The following commit(s) were added to refs/heads/master by this push:
new 59b4413 Add quic protocol support (#71)
59b4413 is described below
commit 59b4413efd958035b9388e11d0369120c9e5cdf8
Author: 张志勇 <go...@163.com>
AuthorDate: Tue Sep 28 17:50:42 2021 +0800
Add quic protocol support (#71)
---
.../{ => dubbo-remoting-quic}/pom.xml | 30 +-
.../remoting/transport/quic/NettyCodecAdapter.java | 101 +++++
.../quic/QuicNettyBackedChannelBuffer.java | 448 +++++++++++++++++++++
.../remoting/transport/quic/QuicNettyChannel.java | 304 ++++++++++++++
.../remoting/transport/quic/QuicNettyClient.java | 199 +++++++++
.../transport/quic/QuicNettyClientHandler.java | 157 ++++++++
.../transport/quic/QuicNettyEventLoopFactory.java | 56 +++
.../remoting/transport/quic/QuicNettyServer.java | 198 +++++++++
.../transport/quic/QuicNettyServerHandler.java | 134 ++++++
.../transport/quic/QuicNettyTransporter.java | 40 ++
.../internal/org.apache.dubbo.remoting.Transporter | 1 +
dubbo-remoting-extensions/pom.xml | 4 +
12 files changed, 1670 insertions(+), 2 deletions(-)
diff --git a/dubbo-remoting-extensions/pom.xml b/dubbo-remoting-extensions/dubbo-remoting-quic/pom.xml
similarity index 64%
copy from dubbo-remoting-extensions/pom.xml
copy to dubbo-remoting-extensions/dubbo-remoting-quic/pom.xml
index 805d66d..3658f59 100644
--- a/dubbo-remoting-extensions/pom.xml
+++ b/dubbo-remoting-extensions/dubbo-remoting-quic/pom.xml
@@ -19,14 +19,40 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
+ <artifactId>dubbo-remoting-extensions</artifactId>
<groupId>org.apache.dubbo.extensions</groupId>
- <artifactId>extensions-parent</artifactId>
<version>${revision}</version>
<relativePath>../pom.xml</relativePath>
</parent>
+
+
+
<modelVersion>4.0.0</modelVersion>
- <artifactId>dubbo-remoting-extensions</artifactId>
+ <artifactId>dubbo-remoting-quic</artifactId>
+
+
+ <dependencies>
+
+ <dependency>
+ <groupId>io.netty.incubator</groupId>
+ <artifactId>netty-incubator-codec-quic</artifactId>
+ <version>0.0.14.Final</version>
+ <classifier>${os.detected.classifier}</classifier>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo-remoting-api</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-all</artifactId>
+ </dependency>
+
+
+ </dependencies>
</project>
diff --git a/dubbo-remoting-extensions/dubbo-remoting-quic/src/main/java/org/apache/dubbo/remoting/transport/quic/NettyCodecAdapter.java b/dubbo-remoting-extensions/dubbo-remoting-quic/src/main/java/org/apache/dubbo/remoting/transport/quic/NettyCodecAdapter.java
new file mode 100644
index 0000000..6f17235
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-quic/src/main/java/org/apache/dubbo/remoting/transport/quic/NettyCodecAdapter.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.transport.quic;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.remoting.Codec2;
+import org.apache.dubbo.remoting.buffer.ChannelBuffer;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import io.netty.handler.codec.MessageToByteEncoder;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * NettyCodecAdapter.
+ */
+final public class NettyCodecAdapter {
+
+ private final ChannelHandler encoder = new InternalEncoder();
+
+ private final ChannelHandler decoder = new InternalDecoder();
+
+ private final Codec2 codec;
+
+ private final URL url;
+
+ private final org.apache.dubbo.remoting.ChannelHandler handler;
+
+ public NettyCodecAdapter(Codec2 codec, URL url, org.apache.dubbo.remoting.ChannelHandler handler) {
+ this.codec = codec;
+ this.url = url;
+ this.handler = handler;
+ }
+
+ public ChannelHandler getEncoder() {
+ return encoder;
+ }
+
+ public ChannelHandler getDecoder() {
+ return decoder;
+ }
+
+ private class InternalEncoder extends MessageToByteEncoder {
+
+ @Override
+ protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
+ ChannelBuffer buffer = new QuicNettyBackedChannelBuffer(out);
+ Channel ch = ctx.channel();
+ QuicNettyChannel channel = QuicNettyChannel.getOrAddChannel(ch, url, handler);
+ codec.encode(channel, buffer, msg);
+ }
+ }
+
+ private class InternalDecoder extends ByteToMessageDecoder {
+
+ @Override
+ protected void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) throws Exception {
+
+ ChannelBuffer message = new QuicNettyBackedChannelBuffer(input);
+
+ QuicNettyChannel channel = QuicNettyChannel.getOrAddChannel(ctx.channel(), url, handler);
+
+ // decode object.
+ do {
+ int saveReaderIndex = message.readerIndex();
+ Object msg = codec.decode(channel, message);
+ if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
+ message.readerIndex(saveReaderIndex);
+ break;
+ } else {
+ //is it possible to go here ?
+ if (saveReaderIndex == message.readerIndex()) {
+ throw new IOException("Decode without read data.");
+ }
+ if (msg != null) {
+ out.add(msg);
+ }
+ }
+ } while (message.readable());
+ }
+ }
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-quic/src/main/java/org/apache/dubbo/remoting/transport/quic/QuicNettyBackedChannelBuffer.java b/dubbo-remoting-extensions/dubbo-remoting-quic/src/main/java/org/apache/dubbo/remoting/transport/quic/QuicNettyBackedChannelBuffer.java
new file mode 100644
index 0000000..1249d08
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-quic/src/main/java/org/apache/dubbo/remoting/transport/quic/QuicNettyBackedChannelBuffer.java
@@ -0,0 +1,448 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.transport.quic;
+
+import org.apache.dubbo.common.utils.Assert;
+import org.apache.dubbo.remoting.buffer.ChannelBuffer;
+import org.apache.dubbo.remoting.buffer.ChannelBufferFactory;
+import org.apache.dubbo.remoting.buffer.ChannelBuffers;
+
+import io.netty.buffer.ByteBuf;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+public class QuicNettyBackedChannelBuffer implements ChannelBuffer {
+
+ private ByteBuf buffer;
+
+ public QuicNettyBackedChannelBuffer(ByteBuf buffer) {
+ Assert.notNull(buffer, "buffer == null");
+ this.buffer = buffer;
+ }
+
+
+ @Override
+ public int capacity() {
+ return buffer.capacity();
+ }
+
+
+ @Override
+ public ChannelBuffer copy(int index, int length) {
+ return new QuicNettyBackedChannelBuffer(buffer.copy(index, length));
+ }
+
+ // Not used: this wrapper has no buffer factory, so null is returned.
+ @Override
+ public ChannelBufferFactory factory() {
+ return null;
+ }
+
+
+ @Override
+ public byte getByte(int index) {
+ return buffer.getByte(index);
+ }
+
+
+ @Override
+ public void getBytes(int index, byte[] dst, int dstIndex, int length) {
+ buffer.getBytes(index, dst, dstIndex, length);
+ }
+
+
+ @Override
+ public void getBytes(int index, ByteBuffer dst) {
+ buffer.getBytes(index, dst);
+ }
+
+
+ @Override
+ public void getBytes(int index, ChannelBuffer dst, int dstIndex, int length) {
+ // careful
+ byte[] data = new byte[length];
+ buffer.getBytes(index, data, 0, length);
+ dst.setBytes(dstIndex, data, 0, length);
+ }
+
+
+ @Override
+ public void getBytes(int index, OutputStream dst, int length) throws IOException {
+ buffer.getBytes(index, dst, length);
+ }
+
+
+ @Override
+ public boolean isDirect() {
+ return buffer.isDirect();
+ }
+
+
+ @Override
+ public void setByte(int index, int value) {
+ buffer.setByte(index, value);
+ }
+
+
+ @Override
+ public void setBytes(int index, byte[] src, int srcIndex, int length) {
+ buffer.setBytes(index, src, srcIndex, length);
+ }
+
+
+ @Override
+ public void setBytes(int index, ByteBuffer src) {
+ buffer.setBytes(index, src);
+ }
+
+
+    /**
+     * Copies {@code length} bytes out of {@code src}, starting at {@code srcIndex}, and stores
+     * them in this buffer starting at {@code index}, using absolute-index accessors on both sides.
+     *
+     * @param index    absolute position in this buffer where the bytes are written
+     * @param src      buffer the bytes are read from
+     * @param srcIndex absolute position in {@code src} where reading starts
+     * @param length   number of bytes to transfer
+     */
+    @Override
+    public void setBytes(int index, ChannelBuffer src, int srcIndex, int length) {
+        // Stage through a temporary array because src may not be backed by a netty ByteBuf.
+        byte[] data = new byte[length];
+        // Read from src (not from this buffer) - the source argument must supply the bytes.
+        src.getBytes(srcIndex, data, 0, length);
+        setBytes(index, data, 0, length);
+    }
+
+
+ @Override
+ public int setBytes(int index, InputStream src, int length) throws IOException {
+ return buffer.setBytes(index, src, length);
+ }
+
+
+ @Override
+ public ByteBuffer toByteBuffer(int index, int length) {
+ return buffer.nioBuffer(index, length);
+ }
+
+
+ @Override
+ public byte[] array() {
+ return buffer.array();
+ }
+
+
+ @Override
+ public boolean hasArray() {
+ return buffer.hasArray();
+ }
+
+
+ @Override
+ public int arrayOffset() {
+ return buffer.arrayOffset();
+ }
+
+
+ // AbstractChannelBuffer
+
+
+
+ @Override
+ public void clear() {
+ buffer.clear();
+ }
+
+
+ @Override
+ public ChannelBuffer copy() {
+ return new QuicNettyBackedChannelBuffer(buffer.copy());
+ }
+
+
+ @Override
+ public void discardReadBytes() {
+ buffer.discardReadBytes();
+ }
+
+
+ @Override
+ public void ensureWritableBytes(int writableBytes) {
+ buffer.ensureWritable(writableBytes);
+ }
+
+
+ @Override
+ public void getBytes(int index, byte[] dst) {
+ buffer.getBytes(index, dst);
+ }
+
+
+ @Override
+ public void getBytes(int index, ChannelBuffer dst) {
+ // careful
+ getBytes(index, dst, dst.writableBytes());
+ }
+
+
+ @Override
+ public void getBytes(int index, ChannelBuffer dst, int length) {
+ // careful
+ if (length > dst.writableBytes()) {
+ throw new IndexOutOfBoundsException();
+ }
+ getBytes(index, dst, dst.writerIndex(), length);
+ dst.writerIndex(dst.writerIndex() + length);
+ }
+
+
+ @Override
+ public void markReaderIndex() {
+ buffer.markReaderIndex();
+ }
+
+
+ @Override
+ public void markWriterIndex() {
+ buffer.markWriterIndex();
+ }
+
+
+ @Override
+ public boolean readable() {
+ return buffer.isReadable();
+ }
+
+
+ @Override
+ public int readableBytes() {
+ return buffer.readableBytes();
+ }
+
+
+ @Override
+ public byte readByte() {
+ return buffer.readByte();
+ }
+
+
+ @Override
+ public void readBytes(byte[] dst) {
+ buffer.readBytes(dst);
+ }
+
+
+ @Override
+ public void readBytes(byte[] dst, int dstIndex, int length) {
+ buffer.readBytes(dst, dstIndex, length);
+ }
+
+
+ @Override
+ public void readBytes(ByteBuffer dst) {
+ buffer.readBytes(dst);
+ }
+
+
+ @Override
+ public void readBytes(ChannelBuffer dst) {
+ // careful
+ readBytes(dst, dst.writableBytes());
+ }
+
+
+ @Override
+ public void readBytes(ChannelBuffer dst, int length) {
+ // careful
+ if (length > dst.writableBytes()) {
+ throw new IndexOutOfBoundsException();
+ }
+ readBytes(dst, dst.writerIndex(), length);
+ dst.writerIndex(dst.writerIndex() + length);
+ }
+
+
+ @Override
+ public void readBytes(ChannelBuffer dst, int dstIndex, int length) {
+ // careful
+ if (readableBytes() < length) {
+ throw new IndexOutOfBoundsException();
+ }
+ byte[] data = new byte[length];
+ buffer.readBytes(data, 0, length);
+ dst.setBytes(dstIndex, data, 0, length);
+ }
+
+
+ @Override
+ public ChannelBuffer readBytes(int length) {
+ return new QuicNettyBackedChannelBuffer(buffer.readBytes(length));
+ }
+
+
+ @Override
+ public void resetReaderIndex() {
+ buffer.resetReaderIndex();
+ }
+
+
+ @Override
+ public void resetWriterIndex() {
+ buffer.resetWriterIndex();
+ }
+
+
+ @Override
+ public int readerIndex() {
+ return buffer.readerIndex();
+ }
+
+
+ @Override
+ public void readerIndex(int readerIndex) {
+ buffer.readerIndex(readerIndex);
+ }
+
+
+ @Override
+ public void readBytes(OutputStream dst, int length) throws IOException {
+ buffer.readBytes(dst, length);
+ }
+
+
+ @Override
+ public void setBytes(int index, byte[] src) {
+ buffer.setBytes(index, src);
+ }
+
+
+ @Override
+ public void setBytes(int index, ChannelBuffer src) {
+ // careful
+ setBytes(index, src, src.readableBytes());
+ }
+
+
+ @Override
+ public void setBytes(int index, ChannelBuffer src, int length) {
+ // careful
+ if (length > src.readableBytes()) {
+ throw new IndexOutOfBoundsException();
+ }
+ setBytes(index, src, src.readerIndex(), length);
+ src.readerIndex(src.readerIndex() + length);
+ }
+
+
+ @Override
+ public void setIndex(int readerIndex, int writerIndex) {
+ buffer.setIndex(readerIndex, writerIndex);
+ }
+
+
+ @Override
+ public void skipBytes(int length) {
+ buffer.skipBytes(length);
+ }
+
+
+ @Override
+ public ByteBuffer toByteBuffer() {
+ return buffer.nioBuffer();
+ }
+
+
+ @Override
+ public boolean writable() {
+ return buffer.isWritable();
+ }
+
+
+ @Override
+ public int writableBytes() {
+ return buffer.writableBytes();
+ }
+
+
+ @Override
+ public void writeByte(int value) {
+ buffer.writeByte(value);
+ }
+
+
+ @Override
+ public void writeBytes(byte[] src) {
+ buffer.writeBytes(src);
+ }
+
+
+ @Override
+ public void writeBytes(byte[] src, int index, int length) {
+ buffer.writeBytes(src, index, length);
+ }
+
+
+ @Override
+ public void writeBytes(ByteBuffer src) {
+ buffer.writeBytes(src);
+ }
+
+
+ @Override
+ public void writeBytes(ChannelBuffer src) {
+ // careful
+ writeBytes(src, src.readableBytes());
+ }
+
+
+ @Override
+ public void writeBytes(ChannelBuffer src, int length) {
+ // careful
+ if (length > src.readableBytes()) {
+ throw new IndexOutOfBoundsException();
+ }
+ writeBytes(src, src.readerIndex(), length);
+ src.readerIndex(src.readerIndex() + length);
+ }
+
+
+ @Override
+ public void writeBytes(ChannelBuffer src, int srcIndex, int length) {
+ // careful
+ byte[] data = new byte[length];
+ src.getBytes(srcIndex, data, 0, length);
+ writeBytes(data, 0, length);
+ }
+
+
+ @Override
+ public int writeBytes(InputStream src, int length) throws IOException {
+ return buffer.writeBytes(src, length);
+ }
+
+
+ @Override
+ public int writerIndex() {
+ return buffer.writerIndex();
+ }
+
+
+ @Override
+ public void writerIndex(int writerIndex) {
+ buffer.writerIndex(writerIndex);
+ }
+
+
+ @Override
+ public int compareTo(ChannelBuffer o) {
+ return ChannelBuffers.compare(this, o);
+ }
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-quic/src/main/java/org/apache/dubbo/remoting/transport/quic/QuicNettyChannel.java b/dubbo-remoting-extensions/dubbo-remoting-quic/src/main/java/org/apache/dubbo/remoting/transport/quic/QuicNettyChannel.java
new file mode 100644
index 0000000..2012c63
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-quic/src/main/java/org/apache/dubbo/remoting/transport/quic/QuicNettyChannel.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.transport.quic;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.remoting.ChannelHandler;
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.transport.AbstractChannel;
+import org.apache.dubbo.remoting.utils.PayloadDropper;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.incubator.codec.quic.QuicChannel;
+import io.netty.incubator.codec.quic.QuicStreamChannel;
+
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT;
+import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
+
+/**
+ * NettyChannel maintains the cache of channel.
+ */
+final class QuicNettyChannel extends AbstractChannel {
+
+ private static final Logger logger = LoggerFactory.getLogger(QuicNettyChannel.class);
+ /**
+ * the cache for netty channel and dubbo channel
+ */
+ private static final ConcurrentMap<Channel, QuicNettyChannel> CHANNEL_MAP = new ConcurrentHashMap<Channel, QuicNettyChannel>();
+ /**
+ * netty channel
+ */
+ private final Channel channel;
+
+
+ private final Map<String, Object> attributes = new ConcurrentHashMap<String, Object>();
+
+ private final AtomicBoolean active = new AtomicBoolean(false);
+
+ /**
+ * The constructor of NettyChannel.
+ * It is private so NettyChannel usually create by {@link QuicNettyChannel#getOrAddChannel(Channel, URL, ChannelHandler)}
+ *
+ * @param channel netty channel
+ * @param url
+ * @param handler dubbo handler that contain netty handler
+ */
+ private QuicNettyChannel(Channel channel, URL url, ChannelHandler handler) {
+ super(url, handler);
+ if (channel == null) {
+ throw new IllegalArgumentException("netty channel == null;");
+ }
+ this.channel = channel;
+ }
+
+    /**
+     * Returns the dubbo channel cached for the given netty channel, creating one on a cache miss.
+     * A newly created channel is stored in the cache only while the netty channel is active; for
+     * an inactive netty channel a fresh, uncached instance is returned.
+     *
+     * @param ch      netty channel; {@code null} yields {@code null}
+     * @param url     url handed to a newly created dubbo channel
+     * @param handler dubbo handler handed to a newly created dubbo channel
+     * @return the cached or newly created dubbo channel, or {@code null} when {@code ch} is {@code null}
+     */
+    static QuicNettyChannel getOrAddChannel(Channel ch, URL url, ChannelHandler handler) {
+        if (ch == null) {
+            return null;
+        }
+
+        QuicNettyChannel ret = CHANNEL_MAP.get(ch);
+        if (ret != null) {
+            return ret;
+        }
+
+        QuicNettyChannel nettyChannel = new QuicNettyChannel(ch, url, handler);
+        // The reflective lookup is only needed when a new wrapper is built, so it is skipped on cache hits.
+        nettyChannel.setRemoteAddress(resolveRemoteAddress(ch));
+        if (ch.isActive()) {
+            nettyChannel.markActive(true);
+            // Another thread may have registered a wrapper first; prefer that one if so.
+            ret = CHANNEL_MAP.putIfAbsent(ch, nettyChannel);
+        }
+        return ret != null ? ret : nettyChannel;
+    }
+
+    /**
+     * Reads the {@code remote} field of the parent {@link QuicChannel} of a stream channel via reflection.
+     *
+     * @return the address held by that field, or {@code null} when {@code ch} is not a
+     *         {@link QuicStreamChannel} or the field cannot be read
+     */
+    private static InetSocketAddress resolveRemoteAddress(Channel ch) {
+        if (!(ch instanceof QuicStreamChannel)) {
+            return null;
+        }
+        //FIXME relies on a private field of the QuicChannel implementation class
+        QuicChannel parent = ((QuicStreamChannel) ch).parent();
+        try {
+            Field remoteField = parent.getClass().getDeclaredField("remote");
+            remoteField.setAccessible(true);
+            return (InetSocketAddress) remoteField.get(parent);
+        } catch (NoSuchFieldException | IllegalAccessException e) {
+            // Keep the cause so the failing field access can be diagnosed from the log.
+            logger.error(e.getMessage(), e);
+            return null;
+        }
+    }
+
+ /**
+ * Remove the inactive channel.
+ *
+ * @param ch netty channel
+ */
+ static void removeChannelIfDisconnected(Channel ch) {
+ if (ch != null && !ch.isActive()) {
+ QuicNettyChannel nettyChannel = CHANNEL_MAP.remove(ch);
+ if (nettyChannel != null) {
+ nettyChannel.markActive(false);
+ }
+ }
+ }
+
+ static void removeChannel(Channel ch) {
+ if (ch != null) {
+ QuicNettyChannel nettyChannel = CHANNEL_MAP.remove(ch);
+ if (nettyChannel != null) {
+ nettyChannel.markActive(false);
+ }
+ }
+ }
+
+    /**
+     * Builds a local address from the port of this channel's url; no host is specified.
+     */
+    @Override
+    public InetSocketAddress getLocalAddress() {
+        return new InetSocketAddress(getUrl().getPort());
+    }
+
+ @Override
+ public InetSocketAddress getRemoteAddress() {
+ return this.remoteAddress;
+ }
+
+
+ private InetSocketAddress remoteAddress;
+
+ public void setRemoteAddress(InetSocketAddress remoteAddress) {
+ this.remoteAddress = remoteAddress;
+ }
+
+ @Override
+ public boolean isConnected() {
+ return !isClosed() && active.get();
+ }
+
+ public boolean isActive() {
+ return active.get();
+ }
+
+ public void markActive(boolean isActive) {
+ active.set(isActive);
+ }
+
+    /**
+     * Writes and flushes a message on the netty channel, optionally waiting for the write to finish.
+     *
+     * @param message message to send
+     * @param sent    when {@code true}, wait up to the url's {@code timeout} parameter (milliseconds)
+     *                for the write to complete
+     * @throws RemotingException if the write fails, the wait is interrupted, or the wait times out
+     */
+    @Override
+    public void send(Object message, boolean sent) throws RemotingException {
+        // whether the channel is closed
+        super.send(message, sent);
+
+        boolean success = true;
+        int timeout = 0;
+        try {
+            ChannelFuture future = channel.writeAndFlush(message);
+            if (sent) {
+                // wait timeout ms
+                timeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
+                success = future.await(timeout);
+            }
+            // Surface a write failure that is already known at this point.
+            Throwable cause = future.cause();
+            if (cause != null) {
+                throw cause;
+            }
+        } catch (Throwable e) {
+            // Drop the cache entry if the failure left the netty channel inactive.
+            removeChannelIfDisconnected(channel);
+            throw new RemotingException(this, "Failed to send message " + PayloadDropper.getRequestWithoutData(message) + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
+        }
+        if (!success) {
+            // Leading space added so the address and "in timeout" no longer run together.
+            throw new RemotingException(this, "Failed to send message " + PayloadDropper.getRequestWithoutData(message) + " to " + getRemoteAddress()
+                    + " in timeout(" + timeout + "ms) limit");
+        }
+    }
+
+ @Override
+ public void close() {
+ try {
+ super.close();
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ }
+ try {
+ removeChannelIfDisconnected(channel);
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ }
+ try {
+ attributes.clear();
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ }
+ try {
+ if (logger.isInfoEnabled()) {
+ logger.info("Close netty channel " + channel);
+ }
+ channel.close();
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public boolean hasAttribute(String key) {
+ return attributes.containsKey(key);
+ }
+
+ @Override
+ public Object getAttribute(String key) {
+ return attributes.get(key);
+ }
+
+ @Override
+ public void setAttribute(String key, Object value) {
+ // The null value is unallowed in the ConcurrentHashMap.
+ if (value == null) {
+ attributes.remove(key);
+ } else {
+ attributes.put(key, value);
+ }
+ }
+
+ @Override
+ public void removeAttribute(String key) {
+ attributes.remove(key);
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((channel == null) ? 0 : channel.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+
+ if (obj instanceof QuicNettyClient) {
+ QuicNettyClient client = (QuicNettyClient) obj;
+ return channel.equals(client.getNettyChannel());
+ }
+
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ QuicNettyChannel other = (QuicNettyChannel) obj;
+ if (channel == null) {
+ if (other.channel != null) {
+ return false;
+ }
+ } else if (!channel.equals(other.channel)) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return "NettyChannel [channel=" + channel + "]";
+ }
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-quic/src/main/java/org/apache/dubbo/remoting/transport/quic/QuicNettyClient.java b/dubbo-remoting-extensions/dubbo-remoting-quic/src/main/java/org/apache/dubbo/remoting/transport/quic/QuicNettyClient.java
new file mode 100644
index 0000000..d5ce4d6
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-quic/src/main/java/org/apache/dubbo/remoting/transport/quic/QuicNettyClient.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.transport.quic;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.remoting.ChannelHandler;
+import org.apache.dubbo.remoting.Constants;
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.transport.AbstractClient;
+import org.apache.dubbo.remoting.utils.UrlUtils;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.ChannelInputShutdownReadComplete;
+import io.netty.channel.socket.nio.NioDatagramChannel;
+import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
+import io.netty.handler.timeout.IdleStateHandler;
+import io.netty.incubator.codec.quic.QuicChannel;
+import io.netty.incubator.codec.quic.QuicClientCodecBuilder;
+import io.netty.incubator.codec.quic.QuicSslContext;
+import io.netty.incubator.codec.quic.QuicSslContextBuilder;
+import io.netty.incubator.codec.quic.QuicStreamChannel;
+import io.netty.incubator.codec.quic.QuicStreamType;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * NettyClient.
+ */
+public class QuicNettyClient extends AbstractClient {
+
+ private static final Logger logger = LoggerFactory.getLogger(QuicNettyClient.class);
+ /**
+ * netty client bootstrap
+ */
+ private static final EventLoopGroup EVENT_LOOP_GROUP = QuicNettyEventLoopFactory.eventLoopGroup(Constants.DEFAULT_IO_THREADS, "QuicNettyClientWorker");
+
+
+ private Bootstrap bootstrap;
+
+ /**
+ * current channel. Each successful invocation of {@link QuicNettyClient#doConnect()} will
+ * replace this with new channel and close old channel.
+ * <b>volatile, please copy reference to use.</b>
+ */
+ private volatile Channel qchannel;
+
+ private volatile Channel channel;
+
+ private volatile Channel schannel;
+
+ /**
+ * The constructor of NettyClient.
+ * It will init and start netty.
+ */
+ public QuicNettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
+ // you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.
+ // the handler will be wrapped: MultiMessageHandler->HeartbeatHandler->handler
+ super(url, wrapChannelHandler(url, handler));
+ }
+
+ /**
+ * Builds the QUIC client codec and binds the local datagram channel that connections are later
+ * created on. The bound channel is stored in {@code qchannel}; no remote connection is made here.
+ *
+ * @throws Throwable anything raised while building the SSL context/codec or binding the channel
+ */
+ @Override
+ protected void doOpen() throws Throwable {
+ // NOTE(review): InsecureTrustManagerFactory presumably accepts any server certificate - confirm this is intended outside of testing.
+ QuicSslContext context = QuicSslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).
+ applicationProtocols("http/0.9").build();
+ // NOTE(review): this group is a local created on each call and nothing visible in this class shuts it down;
+ // the static EVENT_LOOP_GROUP and the bootstrap field are not used by this method.
+ NioEventLoopGroup group = new NioEventLoopGroup(1);
+ // Client-side QUIC codec: 5000 ms idle timeout plus fixed flow-control limits.
+ io.netty.channel.ChannelHandler codec = new QuicClientCodecBuilder()
+ .sslContext(context)
+ .maxIdleTimeout(5000, TimeUnit.MILLISECONDS)
+ .initialMaxData(10000000)
+ .initialMaxStreamDataBidirectionalLocal(1000000)
+ .build();
+
+ // Bind a datagram channel on port 0 with the QUIC codec as its handler and block until bound.
+ Bootstrap bs = new Bootstrap();
+ qchannel = bs.group(group)
+ .channel(NioDatagramChannel.class)
+ .handler(codec)
+ .bind(0).sync().channel();
+ logger.info("quic client do open finish");
+ }
+
+ /**
+ * Connects a QUIC channel to {@code getConnectAddress()} over the datagram channel held in
+ * {@code qchannel}, then opens one bidirectional stream and stores it in {@code schannel}.
+ * The stream pipeline is: input-shutdown hook, decoder, encoder, idle-state handler, dubbo handler.
+ * <p>
+ * NOTE(review): a previously stored {@code schannel} / QUIC channel is not closed here - confirm
+ * whether reconnects are expected to release the old one.
+ *
+ * @throws Throwable anything raised while connecting (waits at most 5 seconds) or creating the stream
+ */
+ @Override
+ protected void doConnect() throws Throwable {
+ logger.info("quic client do connect");
+ final QuicNettyClientHandler nettyClientHandler = new QuicNettyClientHandler(getUrl(), this);
+ InetSocketAddress address = getConnectAddress();
+ logger.info("quic connect address:"+address);
+ QuicChannel quicChannel = QuicChannel.newBootstrap(qchannel)
+ // This stream handler closes a stream as soon as it becomes active
+ // (presumably it only applies to streams opened by the remote side - TODO confirm).
+ .streamHandler(new ChannelInboundHandlerAdapter() {
+ @Override
+ public void channelActive(ChannelHandlerContext ctx) {
+ ctx.close();
+ }
+ })
+ .remoteAddress(address)
+ .connect()
+ .get(5, TimeUnit.SECONDS);
+
+ // Open the single bidirectional stream that carries the dubbo traffic for this client.
+ this.schannel = quicChannel.createStream(QuicStreamType.BIDIRECTIONAL,
+ new ChannelInitializer<QuicStreamChannel>() {
+ @Override
+ protected void initChannel(QuicStreamChannel quicStreamChannel) throws Exception {
+
+ NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), QuicNettyClient.this);
+ int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());
+
+ quicStreamChannel.pipeline().addLast(
+ // When the stream's input side reports shutdown-read-complete, close the parent QUIC channel
+ // with error code 0 and the bytes "kthxbye" as the reason.
+ new ChannelInboundHandlerAdapter() {
+ @Override
+ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
+ if (evt == ChannelInputShutdownReadComplete.INSTANCE) {
+ ((QuicChannel) ctx.channel().parent()).close(true, 0,
+ ctx.alloc().directBuffer(16)
+ .writeBytes(new byte[]{'k', 't', 'h', 'x', 'b', 'y', 'e'}));
+ }
+ }
+ }
+ )
+ .addLast("decoder", adapter.getDecoder())
+ .addLast("encoder", adapter.getEncoder())
+ // Reader-idle timeout equals the heartbeat interval; writer/all-idle timeouts are passed as 0.
+ .addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, TimeUnit.MILLISECONDS))
+ .addLast("handler", nettyClientHandler);
+ // The lone semicolon below is an empty statement left in the original code.
+ ;
+ }
+ }
+ ).sync().getNow();
+ }
+
+    /**
+     * Reports whether a stream channel has been created and is currently active.
+     */
+    @Override
+    public boolean isConnected() {
+        // Work on a local copy of the volatile field, as its declaration recommends.
+        final Channel stream = this.schannel;
+        return stream != null && stream.isActive();
+    }
+
+
+
+    /**
+     * Removes the cached dubbo channel for this client's stream once that stream is no longer active.
+     * Failures are logged and not propagated.
+     */
+    @Override
+    protected void doDisConnect() throws Throwable {
+        try {
+            // Use the stream channel: it is the one registered in the channel cache via getChannel(),
+            // whereas the 'channel' field is never assigned in this class.
+            QuicNettyChannel.removeChannelIfDisconnected(schannel);
+        } catch (Throwable t) {
+            // Best-effort cleanup; keep the throwable so the failure is diagnosable.
+            logger.warn(t.getMessage(), t);
+        }
+    }
+
+ @Override
+ protected void doClose() throws Throwable {
+ // can't shutdown nioEventLoopGroup because the method will be invoked when closing one channel but not a client,
+ // but when and how to close the nioEventLoopGroup ?
+ // nioEventLoopGroup.shutdownGracefully();
+ }
+
+ @Override
+ protected org.apache.dubbo.remoting.Channel getChannel() {
+ Channel c = this.schannel;
+ if (c == null) {
+ return null;
+ }
+ return QuicNettyChannel.getOrAddChannel(c, getUrl(), this);
+ }
+
+ Channel getNettyChannel() {
+ return schannel;
+ }
+
+ @Override
+ public boolean canHandleIdle() {
+ return true;
+ }
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-quic/src/main/java/org/apache/dubbo/remoting/transport/quic/QuicNettyClientHandler.java b/dubbo-remoting-extensions/dubbo-remoting-quic/src/main/java/org/apache/dubbo/remoting/transport/quic/QuicNettyClientHandler.java
new file mode 100644
index 0000000..8cc4659
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-quic/src/main/java/org/apache/dubbo/remoting/transport/quic/QuicNettyClientHandler.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.transport.quic;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.Version;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.StringUtils;
+import org.apache.dubbo.remoting.ChannelHandler;
+import org.apache.dubbo.remoting.exchange.Request;
+import org.apache.dubbo.remoting.exchange.Response;
+
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import io.netty.handler.timeout.IdleStateEvent;
+
+import static org.apache.dubbo.common.constants.CommonConstants.HEARTBEAT_EVENT;
+
+/**
+ * NettyClientHandler
+ */
+@io.netty.channel.ChannelHandler.Sharable
+public class QuicNettyClientHandler extends ChannelDuplexHandler {
+ private static final Logger logger = LoggerFactory.getLogger(QuicNettyClientHandler.class);
+
+ private final URL url;
+
+ private final ChannelHandler handler;
+
+ public QuicNettyClientHandler(URL url, ChannelHandler handler) {
+ if (url == null) {
+ throw new IllegalArgumentException("url == null");
+ }
+ if (handler == null) {
+ throw new IllegalArgumentException("handler == null");
+ }
+ this.url = url;
+ this.handler = handler;
+ }
+
+ @Override
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
+ QuicNettyChannel channel = QuicNettyChannel.getOrAddChannel(ctx.channel(), url, handler);
+ handler.connected(channel);
+ if (logger.isInfoEnabled()) {
+ logger.info("The connection of " + channel.getLocalAddress() + " -> " + channel.getRemoteAddress() + " is established.");
+ }
+ }
+
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ QuicNettyChannel channel = QuicNettyChannel.getOrAddChannel(ctx.channel(), url, handler);
+ try {
+ handler.disconnected(channel);
+ } finally {
+ QuicNettyChannel.removeChannel(ctx.channel());
+ }
+
+ if (logger.isInfoEnabled()) {
+ logger.info("The connection of " + channel.getLocalAddress() + " -> " + channel.getRemoteAddress() + " is disconnected.");
+ }
+ }
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ QuicNettyChannel channel = QuicNettyChannel.getOrAddChannel(ctx.channel(), url, handler);
+ handler.received(channel, msg);
+ }
+
+ @Override
+ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
+ super.write(ctx, msg, promise);
+ final QuicNettyChannel channel = QuicNettyChannel.getOrAddChannel(ctx.channel(), url, handler);
+ final boolean isRequest = msg instanceof Request;
+
+ // We add listeners to make sure our out bound event is correct.
+ // If our out bound event has an error (in most cases the encoder fails),
+ // we need to have the request return directly instead of blocking the invoke process.
+ promise.addListener(future -> {
+ if (future.isSuccess()) {
+ // if our future is success, mark the future to sent.
+ handler.sent(channel, msg);
+ return;
+ }
+
+ Throwable t = future.cause();
+ if (t != null && isRequest) {
+ Request request = (Request) msg;
+ Response response = buildErrorResponse(request, t);
+ handler.received(channel, response);
+ }
+ });
+ }
+
+ @Override
+ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+ // send heartbeat when read idle.
+ if (evt instanceof IdleStateEvent) {
+ try {
+ QuicNettyChannel channel = QuicNettyChannel.getOrAddChannel(ctx.channel(), url, handler);
+ if (logger.isDebugEnabled()) {
+ logger.debug("IdleStateEvent triggered, send heartbeat to channel " + channel);
+ }
+ Request req = new Request();
+ req.setVersion(Version.getProtocolVersion());
+ req.setTwoWay(true);
+ req.setEvent(HEARTBEAT_EVENT);
+ channel.send(req);
+ } finally {
+ QuicNettyChannel.removeChannelIfDisconnected(ctx.channel());
+ }
+ } else {
+ super.userEventTriggered(ctx, evt);
+ }
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
+ throws Exception {
+ QuicNettyChannel channel = QuicNettyChannel.getOrAddChannel(ctx.channel(), url, handler);
+ try {
+ handler.caught(channel, cause);
+ } finally {
+ QuicNettyChannel.removeChannelIfDisconnected(ctx.channel());
+ }
+ }
+
+ /**
+ * build a bad request's response
+ *
+ * @param request the request
+ * @param t the throwable. In most cases, serialization fails.
+ * @return the response
+ */
+ private static Response buildErrorResponse(Request request, Throwable t) {
+ Response response = new Response(request.getId(), request.getVersion());
+ response.setStatus(Response.BAD_REQUEST);
+ response.setErrorMessage(StringUtils.toString(t));
+ return response;
+ }
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-quic/src/main/java/org/apache/dubbo/remoting/transport/quic/QuicNettyEventLoopFactory.java b/dubbo-remoting-extensions/dubbo-remoting-quic/src/main/java/org/apache/dubbo/remoting/transport/quic/QuicNettyEventLoopFactory.java
new file mode 100644
index 0000000..42f885b
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-quic/src/main/java/org/apache/dubbo/remoting/transport/quic/QuicNettyEventLoopFactory.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.transport.quic;
+
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.epoll.Epoll;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollServerSocketChannel;
+import io.netty.channel.epoll.EpollSocketChannel;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.ServerSocketChannel;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.util.concurrent.DefaultThreadFactory;
+
+import java.util.concurrent.ThreadFactory;
+
+ /**
+  * Static helpers that choose between the epoll and NIO flavours of Netty's event loop group and
+  * socket channel classes.
+  * <p>
+  * Epoll is chosen only when the system property {@code netty.epoll.enable} is {@code true}, the
+  * {@code os.name} property contains "linux" (case-insensitively) and {@code Epoll.isAvailable()} holds.
+  */
+ public class QuicNettyEventLoopFactory {
+     /**
+      * Creates an event loop group whose threads come from a {@link DefaultThreadFactory}.
+      *
+      * @param threads           number of threads handed to the event loop group
+      * @param threadFactoryName pool name handed to the thread factory
+      * @return an {@code EpollEventLoopGroup} when epoll is selected, otherwise a {@code NioEventLoopGroup}
+      */
+     public static EventLoopGroup eventLoopGroup(int threads, String threadFactoryName) {
+         ThreadFactory threadFactory = new DefaultThreadFactory(threadFactoryName, true);
+         return shouldEpoll() ? new EpollEventLoopGroup(threads, threadFactory) :
+                 new NioEventLoopGroup(threads, threadFactory);
+     }
+
+     /**
+      * @return the client socket channel class matching the selected transport
+      */
+     public static Class<? extends SocketChannel> socketChannelClass() {
+         return shouldEpoll() ? EpollSocketChannel.class : NioSocketChannel.class;
+     }
+
+     /**
+      * @return the server socket channel class matching the selected transport
+      */
+     public static Class<? extends ServerSocketChannel> serverSocketChannelClass() {
+         return shouldEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class;
+     }
+
+     /**
+      * Decides whether the epoll transport should be used.
+      *
+      * @return {@code true} only if epoll is enabled by system property, the OS name contains "linux"
+      *         and Netty reports epoll as available
+      */
+     private static boolean shouldEpoll() {
+         if (!Boolean.parseBoolean(System.getProperty("netty.epoll.enable", "false"))) {
+             return false;
+         }
+         String osName = System.getProperty("os.name");
+         // Locale.ROOT keeps the lower-casing independent of the JVM default locale
+         // (e.g. Turkish casing rules); a missing os.name simply disables epoll.
+         return osName != null
+                 && osName.toLowerCase(java.util.Locale.ROOT).contains("linux")
+                 && Epoll.isAvailable();
+     }
+ }
diff --git a/dubbo-remoting-extensions/dubbo-remoting-quic/src/main/java/org/apache/dubbo/remoting/transport/quic/QuicNettyServer.java b/dubbo-remoting-extensions/dubbo-remoting-quic/src/main/java/org/apache/dubbo/remoting/transport/quic/QuicNettyServer.java
new file mode 100644
index 0000000..2810065
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-quic/src/main/java/org/apache/dubbo/remoting/transport/quic/QuicNettyServer.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.transport.quic;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.ExecutorUtil;
+import org.apache.dubbo.common.utils.NetUtils;
+import org.apache.dubbo.remoting.Channel;
+import org.apache.dubbo.remoting.ChannelHandler;
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.RemotingServer;
+import org.apache.dubbo.remoting.transport.AbstractServer;
+import org.apache.dubbo.remoting.transport.dispatcher.ChannelHandlers;
+import org.apache.dubbo.remoting.utils.UrlUtils;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.socket.nio.NioDatagramChannel;
+import io.netty.handler.ssl.util.SelfSignedCertificate;
+import io.netty.handler.timeout.IdleStateHandler;
+import io.netty.incubator.codec.quic.InsecureQuicTokenHandler;
+import io.netty.incubator.codec.quic.QuicServerCodecBuilder;
+import io.netty.incubator.codec.quic.QuicSslContext;
+import io.netty.incubator.codec.quic.QuicSslContextBuilder;
+import io.netty.incubator.codec.quic.QuicStreamChannel;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * NettyServer.
+ */
+public class QuicNettyServer extends AbstractServer implements RemotingServer {
+
+ private static final Logger logger = LoggerFactory.getLogger(QuicNettyServer.class);
+ /**
+ * the cache for alive worker channel.
+ * <ip:port, dubbo channel>
+ */
+ private Map<String, Channel> channels;
+ /**
+ * netty server bootstrap.
+ */
+ private Bootstrap bootstrap;
+ /**
+ * the boss channel that receive connections and dispatch these to worker channel.
+ */
+ private io.netty.channel.Channel channel;
+
+ private EventLoopGroup bossGroup;
+ private EventLoopGroup workerGroup;
+
+ public QuicNettyServer(URL url, ChannelHandler handler) throws RemotingException {
+ // you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.
+ // the handler will be wrapped: MultiMessageHandler->HeartbeatHandler->handler
+ super(ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME), ChannelHandlers.wrap(handler, url));
+ }
+
+ /**
+ * Init and start netty server
+ *
+ * @throws Throwable
+ */
+ @Override
+ protected void doOpen() throws Throwable {
+
+ bossGroup = QuicNettyEventLoopFactory.eventLoopGroup(1, "QuicNettyServerBoss");
+
+ final QuicNettyServerHandler nettyServerHandler = new QuicNettyServerHandler(getUrl(), this);
+ channels = nettyServerHandler.getChannels();
+
+
+ SelfSignedCertificate selfSignedCertificate = new SelfSignedCertificate();
+ QuicSslContext context = QuicSslContextBuilder.forServer(
+ selfSignedCertificate.privateKey(), null, selfSignedCertificate.certificate())
+ .applicationProtocols("http/0.9").build();
+
+
+ bootstrap = new Bootstrap();
+ io.netty.channel.ChannelHandler codec = new QuicServerCodecBuilder()
+ .sslContext(context)
+ .maxIdleTimeout(5000, TimeUnit.MILLISECONDS)
+ .initialMaxData(10000000)
+ .initialMaxStreamDataBidirectionalLocal(1000000)
+ .initialMaxStreamDataBidirectionalRemote(1000000)
+ .initialMaxStreamsBidirectional(100)
+ .initialMaxStreamsUnidirectional(100)
+ .tokenHandler(InsecureQuicTokenHandler.INSTANCE)
+ .streamHandler(new ChannelInitializer<QuicStreamChannel>() {
+ @Override
+ protected void initChannel(QuicStreamChannel ch) {
+ int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
+ NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), QuicNettyServer.this);
+ ch.pipeline()
+ .addLast("decoder", adapter.getDecoder())
+ .addLast("encoder", adapter.getEncoder())
+ .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, TimeUnit.MILLISECONDS))
+ .addLast("handler", nettyServerHandler);
+ }
+ }).build();
+
+ InetSocketAddress address = getBindAddress();
+ logger.info("bind address:"+address);
+ ChannelFuture channelFuture = bootstrap.group(bossGroup)
+ .channel(NioDatagramChannel.class)
+ .handler(codec)
+ .bind(address);
+ channelFuture.addListener((ChannelFutureListener) channelFuture1 -> logger.info("bind finish:"+channelFuture1));
+ }
+
+ @Override
+ protected void doClose() throws Throwable {
+ try {
+ if (channel != null) {
+ // unbind.
+ channel.close();
+ }
+ } catch (Throwable e) {
+ logger.warn(e.getMessage(), e);
+ }
+ try {
+ Collection<org.apache.dubbo.remoting.Channel> channels = getChannels();
+ if (channels != null && channels.size() > 0) {
+ for (org.apache.dubbo.remoting.Channel channel : channels) {
+ try {
+ channel.close();
+ } catch (Throwable e) {
+ logger.warn(e.getMessage(), e);
+ }
+ }
+ }
+ } catch (Throwable e) {
+ logger.warn(e.getMessage(), e);
+ }
+ try {
+ if (bootstrap != null) {
+ bossGroup.shutdownGracefully().syncUninterruptibly();
+ workerGroup.shutdownGracefully().syncUninterruptibly();
+ }
+ } catch (Throwable e) {
+ logger.warn(e.getMessage(), e);
+ }
+ try {
+ if (channels != null) {
+ channels.clear();
+ }
+ } catch (Throwable e) {
+ logger.warn(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public Collection<Channel> getChannels() {
+ Collection<Channel> chs = new ArrayList<>(this.channels.size());
+ // pick channels from NettyServerHandler ( needless to check connectivity )
+ chs.addAll(this.channels.values());
+ return chs;
+ }
+
+ @Override
+ public Channel getChannel(InetSocketAddress remoteAddress) {
+ return channels.get(NetUtils.toAddressString(remoteAddress));
+ }
+
+ @Override
+ public boolean canHandleIdle() {
+ return true;
+ }
+
+ @Override
+ public boolean isBound() {
+ return channel.isActive();
+ }
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-quic/src/main/java/org/apache/dubbo/remoting/transport/quic/QuicNettyServerHandler.java b/dubbo-remoting-extensions/dubbo-remoting-quic/src/main/java/org/apache/dubbo/remoting/transport/quic/QuicNettyServerHandler.java
new file mode 100644
index 0000000..5dcd472
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-quic/src/main/java/org/apache/dubbo/remoting/transport/quic/QuicNettyServerHandler.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.transport.quic;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.NetUtils;
+import org.apache.dubbo.remoting.Channel;
+import org.apache.dubbo.remoting.ChannelHandler;
+
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import io.netty.handler.timeout.IdleStateEvent;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * NettyServerHandler.
+ */
+@io.netty.channel.ChannelHandler.Sharable
+public class QuicNettyServerHandler extends ChannelDuplexHandler {
+ private static final Logger logger = LoggerFactory.getLogger(QuicNettyServerHandler.class);
+ /**
+ * the cache for alive worker channel.
+ * <ip:port, dubbo channel>
+ */
+ private final Map<String, Channel> channels = new ConcurrentHashMap<String, Channel>();
+
+ private final URL url;
+
+ private final ChannelHandler handler;
+
+ public QuicNettyServerHandler(URL url, ChannelHandler handler) {
+ if (url == null) {
+ throw new IllegalArgumentException("url == null");
+ }
+ if (handler == null) {
+ throw new IllegalArgumentException("handler == null");
+ }
+ this.url = url;
+ this.handler = handler;
+ }
+
+ public Map<String, Channel> getChannels() {
+ return channels;
+ }
+
+ @Override
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
+ QuicNettyChannel channel = QuicNettyChannel.getOrAddChannel(ctx.channel(), url, handler);
+ if (channel != null) {
+ channels.put(NetUtils.toAddressString((InetSocketAddress) ctx.channel().remoteAddress()), channel);
+ }
+ handler.connected(channel);
+
+ if (logger.isInfoEnabled()) {
+ logger.info("The connection of " + channel.getRemoteAddress() + " -> " + channel.getLocalAddress() + " is established.");
+ }
+ }
+
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ QuicNettyChannel channel = QuicNettyChannel.getOrAddChannel(ctx.channel(), url, handler);
+ try {
+ channels.remove(NetUtils.toAddressString((InetSocketAddress) ctx.channel().remoteAddress()));
+ handler.disconnected(channel);
+ } finally {
+ QuicNettyChannel.removeChannel(ctx.channel());
+ }
+
+ if (logger.isInfoEnabled()) {
+ logger.info("The connection of " + channel.getRemoteAddress() + " -> " + channel.getLocalAddress() + " is disconnected.");
+ }
+ }
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ QuicNettyChannel channel = QuicNettyChannel.getOrAddChannel(ctx.channel(), url, handler);
+ handler.received(channel, msg);
+ }
+
+
+ @Override
+ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
+ super.write(ctx, msg, promise);
+ QuicNettyChannel channel = QuicNettyChannel.getOrAddChannel(ctx.channel(), url, handler);
+ handler.sent(channel, msg);
+ }
+
+ @Override
+ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+ // server will close channel when server don't receive any heartbeat from client util timeout.
+ if (evt instanceof IdleStateEvent) {
+ QuicNettyChannel channel = QuicNettyChannel.getOrAddChannel(ctx.channel(), url, handler);
+ try {
+ logger.info("IdleStateEvent triggered, close channel " + channel);
+ channel.close();
+ } finally {
+ QuicNettyChannel.removeChannelIfDisconnected(ctx.channel());
+ }
+ }
+ super.userEventTriggered(ctx, evt);
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
+ throws Exception {
+ QuicNettyChannel channel = QuicNettyChannel.getOrAddChannel(ctx.channel(), url, handler);
+ try {
+ handler.caught(channel, cause);
+ } finally {
+ QuicNettyChannel.removeChannelIfDisconnected(ctx.channel());
+ }
+ }
+
+}
diff --git a/dubbo-remoting-extensions/dubbo-remoting-quic/src/main/java/org/apache/dubbo/remoting/transport/quic/QuicNettyTransporter.java b/dubbo-remoting-extensions/dubbo-remoting-quic/src/main/java/org/apache/dubbo/remoting/transport/quic/QuicNettyTransporter.java
new file mode 100644
index 0000000..0a90393
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-quic/src/main/java/org/apache/dubbo/remoting/transport/quic/QuicNettyTransporter.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.transport.quic;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.remoting.ChannelHandler;
+import org.apache.dubbo.remoting.Client;
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.RemotingServer;
+import org.apache.dubbo.remoting.Transporter;
+
+ /**
+ * {@link Transporter} implementation that creates the QUIC server and client of this module.
+ */
+ public class QuicNettyTransporter implements Transporter {
+
+ /**
+ * Extension name of this transporter; the same key ("quic") is used in the SPI registration file
+ * added by this commit.
+ */
+ public static final String NAME = "quic";
+
+ /**
+ * Creates a new {@link QuicNettyServer} for the given URL.
+ *
+ * @param url the server URL
+ * @param handler the Dubbo handler passed to the server
+ * @return the newly constructed server
+ * @throws RemotingException if the server constructor fails
+ */
+ @Override
+ public RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException {
+ return new QuicNettyServer(url, handler);
+ }
+
+ /**
+ * Creates a new {@link QuicNettyClient} for the given URL.
+ *
+ * @param url the remote URL
+ * @param handler the Dubbo handler passed to the client
+ * @return the newly constructed client
+ * @throws RemotingException if the client constructor fails
+ */
+ @Override
+ public Client connect(URL url, ChannelHandler handler) throws RemotingException {
+ return new QuicNettyClient(url, handler);
+ }
+
+ }
diff --git a/dubbo-remoting-extensions/dubbo-remoting-quic/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.Transporter b/dubbo-remoting-extensions/dubbo-remoting-quic/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.Transporter
new file mode 100644
index 0000000..8b3a733
--- /dev/null
+++ b/dubbo-remoting-extensions/dubbo-remoting-quic/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.Transporter
@@ -0,0 +1 @@
+quic=org.apache.dubbo.remoting.transport.quic.QuicNettyTransporter
diff --git a/dubbo-remoting-extensions/pom.xml b/dubbo-remoting-extensions/pom.xml
index 805d66d..3b65f1b 100644
--- a/dubbo-remoting-extensions/pom.xml
+++ b/dubbo-remoting-extensions/pom.xml
@@ -27,6 +27,10 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>dubbo-remoting-extensions</artifactId>
+ <packaging>pom</packaging>
+ <modules>
+ <module>dubbo-remoting-quic</module>
+ </modules>
</project>