You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by cr...@apache.org on 2022/07/22 03:59:54 UTC
[dubbo] branch 3.1 updated: abstraction of port unification server (#10318)
This is an automated email from the ASF dual-hosted git repository.
crazyhzm pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.1 by this push:
new 9e6517a145 abstraction of port unification server (#10318)
9e6517a145 is described below
commit 9e6517a14575ce8d8a957ae08dd4d84450645243
Author: TrueAbc <47...@users.noreply.github.com>
AuthorDate: Fri Jul 22 11:59:40 2022 +0800
abstraction of port unification server (#10318)
* change ByteBuf parameter to ChannelBuffer
* ChannelBuffers test
* Abstract PortUnificationServer and make it work on triple protocol
* adjust test cases
* add license to test file and remove useless code
* resolve conversation
* fix error for CICD check
* update pom file
* comment fix
* adjust config default pu transporter
* delete unused imports
* adjust test cases
* reset SPI default value
* adjust resources file
* fix wrong resources directory
---
...WireProtocol.java => AbstractWireProtocol.java} | 11 ++--
.../dubbo/remoting/api/ProtocolDetector.java | 6 +-
.../api/pu/AbstractPortUnificationServer.java} | 29 +++++----
.../DefaultPuHandler.java} | 32 ++++++---
.../api/pu/PortUnificationTransporter.java} | 27 ++++----
.../dubbo/remoting/buffer/ChannelBuffers.java | 22 +++++++
.../exchange/PortUnificationExchanger.java | 31 ++++++---
.../apache/dubbo/remoting/api/ConnectionTest.java | 66 -------------------
.../dubbo/remoting/buffer/ChannelBuffersTest.java | 9 +++
.../netty4/NettyPortUnificationServer.java} | 75 ++++++++++++----------
.../netty4/NettyPortUnificationServerHandler.java} | 18 ++++--
.../netty4/NettyPortUnificationTransporter.java} | 29 +++++----
...ubbo.remoting.api.pu.PortUnificationTransporter | 1 +
.../remoting/transport/netty4}/ConnectionTest.java | 46 +++----------
.../remoting/transport/netty4/DefaultCodec.java} | 23 +++----
.../transport/netty4/EmptyWireProtocol.java} | 28 +++++---
.../netty4}/PortUnificationExchangerTest.java | 16 ++---
.../netty4}/PortUnificationServerTest.java | 17 ++---
.../internal/org.apache.dubbo.remoting.Codec2 | 1 +
.../org.apache.dubbo.remoting.api.WireProtocol | 1 +
dubbo-rpc/dubbo-rpc-triple/pom.xml | 6 ++
.../dubbo/rpc/protocol/tri/DefaultTriCodec.java | 23 +++----
.../rpc/protocol/tri}/Http2ProtocolDetector.java | 19 +++---
.../rpc/protocol/tri/TripleHttp2Protocol.java | 14 +++-
.../dubbo/rpc/protocol/tri/TripleProtocol.java | 5 +-
.../internal/org.apache.dubbo.remoting.Codec2 | 1 +
.../protocol/tri}/Http2ProtocolDetectorTest.java | 15 +++--
27 files changed, 293 insertions(+), 278 deletions(-)
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/Http2WireProtocol.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/AbstractWireProtocol.java
similarity index 68%
copy from dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/Http2WireProtocol.java
copy to dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/AbstractWireProtocol.java
index 90e617e7d2..675e6f59cd 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/Http2WireProtocol.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/AbstractWireProtocol.java
@@ -16,14 +16,13 @@
*/
package org.apache.dubbo.remoting.api;
-import io.netty.handler.codec.http2.Http2FrameLogger;
+public abstract class AbstractWireProtocol implements WireProtocol {
-import static io.netty.handler.logging.LogLevel.DEBUG;
+ private final ProtocolDetector detector;
-public abstract class Http2WireProtocol implements WireProtocol {
- public static final Http2FrameLogger CLIENT_LOGGER = new Http2FrameLogger(DEBUG, "H2_CLIENT");
- public static final Http2FrameLogger SERVER_LOGGER = new Http2FrameLogger(DEBUG, "H2_SERVER");
- private final ProtocolDetector detector = new Http2ProtocolDetector();
+ public AbstractWireProtocol(ProtocolDetector detector) {
+ this.detector = detector;
+ }
@Override
public ProtocolDetector detector() {
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/ProtocolDetector.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/ProtocolDetector.java
index b868b7aad4..856d0f1631 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/ProtocolDetector.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/ProtocolDetector.java
@@ -17,8 +17,8 @@
package org.apache.dubbo.remoting.api;
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandlerContext;
+import org.apache.dubbo.remoting.buffer.ChannelBuffer;
+
/**
* Determine incoming bytes belong to the specific protocol.
@@ -26,7 +26,7 @@ import io.netty.channel.ChannelHandlerContext;
*/
public interface ProtocolDetector {
- Result detect(final ChannelHandlerContext ctx, final ByteBuf in);
+ Result detect(ChannelBuffer in);
enum Result {
RECOGNIZED, UNRECOGNIZED, NEED_MORE_DATA
diff --git a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/api/PortUnificationServerTest.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/pu/AbstractPortUnificationServer.java
similarity index 53%
copy from dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/api/PortUnificationServerTest.java
copy to dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/pu/AbstractPortUnificationServer.java
index 92bcfb5f09..a38648b4d5 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/api/PortUnificationServerTest.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/pu/AbstractPortUnificationServer.java
@@ -14,25 +14,26 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dubbo.remoting.api;
+package org.apache.dubbo.remoting.api.pu;
import org.apache.dubbo.common.URL;
-import org.apache.dubbo.common.constants.CommonConstants;
-import org.apache.dubbo.common.url.component.ServiceConfigURL;
-import org.apache.dubbo.remoting.Constants;
+import org.apache.dubbo.remoting.ChannelHandler;
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.api.WireProtocol;
+import org.apache.dubbo.remoting.transport.AbstractServer;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
+import java.util.List;
-public class PortUnificationServerTest {
+public abstract class AbstractPortUnificationServer extends AbstractServer {
+ private final List<WireProtocol> protocols;
- @Test
- public void testBind() {
- URL url = new ServiceConfigURL(CommonConstants.TRIPLE, "localhost", 8898,
- new String[]{Constants.BIND_PORT_KEY, String.valueOf(8898)});
+ public AbstractPortUnificationServer(URL url, ChannelHandler handler) throws RemotingException {
+ super(url, handler);
+ this.protocols = url.getOrDefaultFrameworkModel().getExtensionLoader(WireProtocol.class).getActivateExtension(url, new String[0]);
+ }
- final PortUnificationServer server = new PortUnificationServer(url);
- server.bind();
- Assertions.assertTrue(server.isBound());
+ public List<WireProtocol> getProtocols() {
+ return protocols;
}
+
}
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/Http2WireProtocol.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/pu/DefaultPuHandler.java
similarity index 54%
copy from dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/Http2WireProtocol.java
copy to dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/pu/DefaultPuHandler.java
index 90e617e7d2..0e021f934d 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/Http2WireProtocol.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/pu/DefaultPuHandler.java
@@ -14,23 +14,35 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dubbo.remoting.api;
+package org.apache.dubbo.remoting.api.pu;
-import io.netty.handler.codec.http2.Http2FrameLogger;
+import org.apache.dubbo.remoting.Channel;
+import org.apache.dubbo.remoting.ChannelHandler;
+import org.apache.dubbo.remoting.RemotingException;
-import static io.netty.handler.logging.LogLevel.DEBUG;
+public class DefaultPuHandler implements ChannelHandler {
+ @Override
+ public void connected(Channel channel) throws RemotingException {
+
+ }
+
+ @Override
+ public void disconnected(Channel channel) throws RemotingException {
+
+ }
+
+ @Override
+ public void sent(Channel channel, Object message) throws RemotingException {
-public abstract class Http2WireProtocol implements WireProtocol {
- public static final Http2FrameLogger CLIENT_LOGGER = new Http2FrameLogger(DEBUG, "H2_CLIENT");
- public static final Http2FrameLogger SERVER_LOGGER = new Http2FrameLogger(DEBUG, "H2_SERVER");
- private final ProtocolDetector detector = new Http2ProtocolDetector();
+ }
@Override
- public ProtocolDetector detector() {
- return detector;
+ public void received(Channel channel, Object message) throws RemotingException {
+
}
@Override
- public void close() {
+ public void caught(Channel channel, Throwable exception) throws RemotingException {
+
}
}
diff --git a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/api/PortUnificationServerTest.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/pu/PortUnificationTransporter.java
similarity index 54%
copy from dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/api/PortUnificationServerTest.java
copy to dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/pu/PortUnificationTransporter.java
index 92bcfb5f09..655e91f415 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/api/PortUnificationServerTest.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/pu/PortUnificationTransporter.java
@@ -14,25 +14,24 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dubbo.remoting.api;
+package org.apache.dubbo.remoting.api.pu;
import org.apache.dubbo.common.URL;
-import org.apache.dubbo.common.constants.CommonConstants;
-import org.apache.dubbo.common.url.component.ServiceConfigURL;
+import org.apache.dubbo.common.extension.Adaptive;
+import org.apache.dubbo.common.extension.ExtensionScope;
+import org.apache.dubbo.common.extension.SPI;
+import org.apache.dubbo.remoting.ChannelHandler;
+import org.apache.dubbo.remoting.Client;
import org.apache.dubbo.remoting.Constants;
+import org.apache.dubbo.remoting.RemotingException;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
+@SPI(value = "netty", scope = ExtensionScope.FRAMEWORK)
+public interface PortUnificationTransporter {
-public class PortUnificationServerTest {
+ @Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY})
+ AbstractPortUnificationServer bind(URL url, ChannelHandler handler) throws RemotingException;
- @Test
- public void testBind() {
- URL url = new ServiceConfigURL(CommonConstants.TRIPLE, "localhost", 8898,
- new String[]{Constants.BIND_PORT_KEY, String.valueOf(8898)});
+ @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
+ Client connect(URL url, ChannelHandler handler) throws RemotingException;
- final PortUnificationServer server = new PortUnificationServer(url);
- server.bind();
- Assertions.assertTrue(server.isBound());
- }
}
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/buffer/ChannelBuffers.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/buffer/ChannelBuffers.java
index 1c0d9703bd..cc62925da1 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/buffer/ChannelBuffers.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/buffer/ChannelBuffers.java
@@ -114,6 +114,28 @@ public final class ChannelBuffers {
return true;
}
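+ // Returns true only if both buffers have at least count readable bytes and their first count bytes, starting at each reader index, are equal.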
+ public static boolean prefixEquals(ChannelBuffer bufferA, ChannelBuffer bufferB, int count) {
+ final int aLen = bufferA.readableBytes();
+ final int bLen = bufferB.readableBytes();
+ if (aLen < count || bLen < count) {
+ return false;
+ }
+
+ int aIndex = bufferA.readerIndex();
+ int bIndex = bufferB.readerIndex();
+
+ for (int i = count; i > 0; i--) {
+ if (bufferA.getByte(aIndex) != bufferB.getByte(bIndex)) {
+ return false;
+ }
+ aIndex++;
+ bIndex++;
+ }
+
+ return true;
+ }
+
public static int hasCode(ChannelBuffer buffer) {
final int aLen = buffer.readableBytes();
final int byteCount = aLen & 7;
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/PortUnificationExchanger.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/PortUnificationExchanger.java
index fc00d4cd40..f817855fe6 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/PortUnificationExchanger.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/PortUnificationExchanger.java
@@ -19,7 +19,11 @@ package org.apache.dubbo.remoting.exchange;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
-import org.apache.dubbo.remoting.api.PortUnificationServer;
+import org.apache.dubbo.remoting.ChannelHandler;
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.RemotingServer;
+import org.apache.dubbo.remoting.api.pu.AbstractPortUnificationServer;
+import org.apache.dubbo.remoting.api.pu.PortUnificationTransporter;
import java.util.ArrayList;
import java.util.concurrent.ConcurrentHashMap;
@@ -28,20 +32,25 @@ import java.util.concurrent.ConcurrentMap;
public class PortUnificationExchanger {
private static final Logger log = LoggerFactory.getLogger(PortUnificationExchanger.class);
- private static final ConcurrentMap<String, PortUnificationServer> servers = new ConcurrentHashMap<>();
+ private static final ConcurrentMap<String, RemotingServer> servers = new ConcurrentHashMap<>();
- public static void bind(URL url) {
+ public static void bind(URL url, ChannelHandler handler) {
servers.computeIfAbsent(url.getAddress(), addr -> {
- final PortUnificationServer server = new PortUnificationServer(url);
- server.bind();
+ final AbstractPortUnificationServer server;
+ try {
+ server = getTransporter(url).bind(url, handler);
+ } catch (RemotingException e) {
+ throw new RuntimeException(e);
+ }
+ // server.bind();
return server;
});
}
public static void close() {
- final ArrayList<PortUnificationServer> toClose = new ArrayList<>(servers.values());
+ final ArrayList<RemotingServer> toClose = new ArrayList<>(servers.values());
servers.clear();
- for (PortUnificationServer server : toClose) {
+ for (RemotingServer server : toClose) {
try {
server.close();
} catch (Throwable throwable) {
@@ -51,7 +60,13 @@ public class PortUnificationExchanger {
}
// for test
- public static ConcurrentMap<String, PortUnificationServer> getServers() {
+ public static ConcurrentMap<String, RemotingServer> getServers() {
return servers;
}
+
+ public static PortUnificationTransporter getTransporter(URL url) {
+ return url.getOrDefaultFrameworkModel().getExtensionLoader(PortUnificationTransporter.class)
+ .getAdaptiveExtension();
+ }
+
}
diff --git a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/api/ConnectionTest.java b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/api/ConnectionTest.java
index 9f465764ac..5b6bd48bdf 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/api/ConnectionTest.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/api/ConnectionTest.java
@@ -17,14 +17,11 @@
package org.apache.dubbo.remoting.api;
import org.apache.dubbo.common.URL;
-import org.apache.dubbo.common.utils.NetUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
class ConnectionTest {
@@ -59,67 +56,4 @@ class ConnectionTest {
Assertions.assertEquals(0, latch.getCount());
}
- @Test
- public void connectSyncTest() throws Throwable {
- int port = NetUtils.getAvailablePort();
- URL url = URL.valueOf("empty://127.0.0.1:" + port + "?foo=bar");
- PortUnificationServer server = null;
- try {
- server = new PortUnificationServer(url);
- server.bind();
-
- Connection connection = new Connection(url);
- Assertions.assertTrue(connection.isAvailable());
-
- server.close();
- Assertions.assertFalse(connection.isAvailable());
-
- server.bind();
- // auto reconnect
- Assertions.assertTrue(connection.isAvailable());
-
- connection.close();
- Assertions.assertFalse(connection.isAvailable());
- } finally {
- try {
- server.close();
- } catch (Throwable e) {
- // ignored
- }
- }
-
-
- }
-
- @Test
- public void testMultiConnect() throws Throwable {
- int port = NetUtils.getAvailablePort();
- URL url = URL.valueOf("empty://127.0.0.1:" + port + "?foo=bar");
- PortUnificationServer server = null;
- try {
- server = new PortUnificationServer(url);
- server.close();
-
- Connection connection = new Connection(url);
- ExecutorService service = Executors.newFixedThreadPool(10);
- final CountDownLatch latch = new CountDownLatch(10);
- for (int i = 0; i < 10; i++) {
- Runnable runnable = () -> {
- try {
- Assertions.assertTrue(connection.isAvailable());
- latch.countDown();
- } catch (Exception e) {
- // ignore
- }
- };
- service.execute(runnable);
- }
- } finally {
- try {
- server.close();
- } catch (Throwable e) {
- // ignored
- }
- }
- }
}
diff --git a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/buffer/ChannelBuffersTest.java b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/buffer/ChannelBuffersTest.java
index e6811f2bd7..9e71910f9d 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/buffer/ChannelBuffersTest.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/buffer/ChannelBuffersTest.java
@@ -40,6 +40,15 @@ public class ChannelBuffersTest {
Assertions.assertEquals(channelBuffer.capacity(), 32);
}
+ @Test
+ public void testPrefixEquals(){
+ ChannelBuffer bufA = ChannelBuffers.wrappedBuffer("abcedfaf".getBytes());
+ ChannelBuffer bufB = ChannelBuffers.wrappedBuffer("abcedfaa".getBytes());
+ Assertions.assertTrue(ChannelBuffers.equals(bufA, bufB));
+ Assertions.assertTrue(ChannelBuffers.prefixEquals(bufA, bufB, 7));
+ Assertions.assertFalse(ChannelBuffers.prefixEquals(bufA, bufB, 8));
+ }
+
@Test
public void testBuffer() {
ChannelBuffer channelBuffer = ChannelBuffers.buffer(DEFAULT_CAPACITY);
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/PortUnificationServer.java b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyPortUnificationServer.java
similarity index 81%
rename from dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/PortUnificationServer.java
rename to dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyPortUnificationServer.java
index 51125bda34..1c017d577d 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/PortUnificationServer.java
+++ b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyPortUnificationServer.java
@@ -14,20 +14,24 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dubbo.remoting.api;
+package org.apache.dubbo.remoting.transport.netty4;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.config.ConfigurationUtils;
-import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
-import org.apache.dubbo.common.utils.ExecutorUtil;
import org.apache.dubbo.common.utils.NetUtils;
+import org.apache.dubbo.remoting.Channel;
+import org.apache.dubbo.remoting.ChannelHandler;
import org.apache.dubbo.remoting.Constants;
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.api.NettyEventLoopFactory;
+import org.apache.dubbo.remoting.api.SslContexts;
+import org.apache.dubbo.remoting.api.WireProtocol;
+import org.apache.dubbo.remoting.api.pu.AbstractPortUnificationServer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
@@ -40,7 +44,7 @@ import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.net.InetSocketAddress;
-import java.util.List;
+import java.util.Collection;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.dubbo.common.constants.CommonConstants.ANYHOST_KEY;
@@ -53,11 +57,9 @@ import static org.apache.dubbo.remoting.Constants.EVENT_LOOP_WORKER_POOL_NAME;
/**
* PortUnificationServer.
*/
-public class PortUnificationServer {
+public class NettyPortUnificationServer extends AbstractPortUnificationServer {
- private static final Logger logger = LoggerFactory.getLogger(PortUnificationServer.class);
- private final List<WireProtocol> protocols;
- private final URL url;
+ private static final Logger logger = LoggerFactory.getLogger(NettyPortUnificationServer.class);
private final DefaultChannelGroup channels = new DefaultChannelGroup(
GlobalEventExecutor.INSTANCE);
@@ -70,41 +72,35 @@ public class PortUnificationServer {
/**
* the boss channel that receive connections and dispatch these to worker channel.
*/
- private Channel channel;
+ private io.netty.channel.Channel channel;
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
- public PortUnificationServer(URL url) {
+ public NettyPortUnificationServer(URL url, ChannelHandler handler) throws RemotingException {
+ super(url, handler);
+
// you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.
// the handler will be wrapped: MultiMessageHandler->HeartbeatHandler->handler
- this.url = ExecutorUtil.setThreadName(url, "DubboPUServerHandler");
- this.protocols = ExtensionLoader.getExtensionLoader(WireProtocol.class)
- .getActivateExtension(url, new String[0]);
// read config before destroy
serverShutdownTimeoutMills = ConfigurationUtils.getServerShutdownTimeout(
getUrl().getOrDefaultModuleModel());
}
- public URL getUrl() {
- return url;
- }
-
- public void bind() {
- if (channel == null) {
- doOpen();
+ @Override
+ public void close() {
+ if (channel != null) {
+ doClose();
}
}
- public void close() throws Throwable {
- if (channel != null) {
- doClose();
+ public void bind(){
+ if(channel == null) {
+ doOpen();
}
}
- /**
- * Init and start netty server
- */
- protected void doOpen() {
+ @Override
+ public void doOpen() {
bootstrap = new ServerBootstrap();
bossGroup = NettyEventLoopFactory.eventLoopGroup(1, EVENT_LOOP_BOSS_POOL_NAME);
@@ -115,7 +111,7 @@ public class PortUnificationServer {
final boolean enableSsl = getUrl().getParameter(SSL_ENABLED_KEY, false);
final SslContext sslContext;
if (enableSsl) {
- sslContext = SslContexts.buildServerSslContext(url);
+ sslContext = SslContexts.buildServerSslContext(getUrl());
} else {
sslContext = null;
}
@@ -129,8 +125,8 @@ public class PortUnificationServer {
protected void initChannel(SocketChannel ch) throws Exception {
// Do not add idle state handler here, because it should be added in the protocol handler.
final ChannelPipeline p = ch.pipeline();
- final PortUnificationServerHandler puHandler;
- puHandler = new PortUnificationServerHandler(url, sslContext, true, protocols,
+ final NettyPortUnificationServerHandler puHandler;
+ puHandler = new NettyPortUnificationServerHandler(getUrl(), sslContext, true, getProtocols(),
channels);
p.addLast("negotiation-protocol", puHandler);
}
@@ -139,7 +135,7 @@ public class PortUnificationServer {
String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
- if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
+ if (getUrl().getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
bindIp = ANYHOST_VALUE;
}
InetSocketAddress bindAddress = new InetSocketAddress(bindIp, bindPort);
@@ -148,7 +144,8 @@ public class PortUnificationServer {
channel = channelFuture.channel();
}
- protected void doClose() throws Throwable {
+ @Override
+ public void doClose(){
final long st = System.currentTimeMillis();
try {
@@ -165,7 +162,7 @@ public class PortUnificationServer {
logger.warn("Interrupted while shutting down", e);
}
- for (WireProtocol protocol : protocols) {
+ for (WireProtocol protocol : getProtocols()) {
protocol.close();
}
@@ -189,6 +186,16 @@ public class PortUnificationServer {
return channel.isActive();
}
+ @Override
+ public Collection<Channel> getChannels() {
+ return null;
+ }
+
+ @Override
+ public Channel getChannel(InetSocketAddress remoteAddress) {
+ return null;
+ }
+
public InetSocketAddress getLocalAddress() {
return (InetSocketAddress) channel.localAddress();
}
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/PortUnificationServerHandler.java b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyPortUnificationServerHandler.java
similarity index 85%
rename from dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/PortUnificationServerHandler.java
rename to dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyPortUnificationServerHandler.java
index 634b406313..0db96b09fa 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/PortUnificationServerHandler.java
+++ b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyPortUnificationServerHandler.java
@@ -14,12 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dubbo.remoting.api;
+package org.apache.dubbo.remoting.transport.netty4;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.io.Bytes;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.remoting.api.ProtocolDetector;
+import org.apache.dubbo.remoting.api.WireProtocol;
+import org.apache.dubbo.remoting.buffer.ChannelBuffer;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
@@ -32,10 +35,10 @@ import io.netty.handler.ssl.SslHandler;
import java.util.List;
import java.util.Set;
-public class PortUnificationServerHandler extends ByteToMessageDecoder {
+public class NettyPortUnificationServerHandler extends ByteToMessageDecoder {
private static final Logger LOGGER = LoggerFactory.getLogger(
- PortUnificationServerHandler.class);
+ NettyPortUnificationServerHandler.class);
private final ChannelGroup channels;
@@ -44,8 +47,8 @@ public class PortUnificationServerHandler extends ByteToMessageDecoder {
private final boolean detectSsl;
private final List<WireProtocol> protocols;
- public PortUnificationServerHandler(URL url, SslContext sslCtx, boolean detectSsl,
- List<WireProtocol> protocols, ChannelGroup channels) {
+ public NettyPortUnificationServerHandler(URL url, SslContext sslCtx, boolean detectSsl,
+ List<WireProtocol> protocols, ChannelGroup channels) {
this.url = url;
this.sslCtx = sslCtx;
this.protocols = protocols;
@@ -77,7 +80,8 @@ public class PortUnificationServerHandler extends ByteToMessageDecoder {
} else {
for (final WireProtocol protocol : protocols) {
in.markReaderIndex();
- final ProtocolDetector.Result result = protocol.detector().detect(ctx, in);
+ ChannelBuffer buf = new NettyBackedChannelBuffer(in);
+ final ProtocolDetector.Result result = protocol.detector().detect(buf);
in.resetReaderIndex();
switch (result) {
case UNRECOGNIZED:
@@ -111,7 +115,7 @@ public class PortUnificationServerHandler extends ByteToMessageDecoder {
ChannelPipeline p = ctx.pipeline();
p.addLast("ssl", sslCtx.newHandler(ctx.alloc()));
p.addLast("unificationA",
- new PortUnificationServerHandler(url, sslCtx, false, protocols, channels));
+ new NettyPortUnificationServerHandler(url, sslCtx, false, protocols, channels));
p.remove(this);
}
diff --git a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/api/PortUnificationServerTest.java b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyPortUnificationTransporter.java
similarity index 52%
copy from dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/api/PortUnificationServerTest.java
copy to dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyPortUnificationTransporter.java
index 92bcfb5f09..b13dc0f316 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/api/PortUnificationServerTest.java
+++ b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyPortUnificationTransporter.java
@@ -14,25 +14,26 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dubbo.remoting.api;
+package org.apache.dubbo.remoting.transport.netty4;
import org.apache.dubbo.common.URL;
-import org.apache.dubbo.common.constants.CommonConstants;
-import org.apache.dubbo.common.url.component.ServiceConfigURL;
-import org.apache.dubbo.remoting.Constants;
+import org.apache.dubbo.remoting.ChannelHandler;
+import org.apache.dubbo.remoting.Client;
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.api.pu.AbstractPortUnificationServer;
+import org.apache.dubbo.remoting.api.pu.PortUnificationTransporter;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
+public class NettyPortUnificationTransporter implements PortUnificationTransporter {
-public class PortUnificationServerTest {
+ public static final String NAME = "netty";
- @Test
- public void testBind() {
- URL url = new ServiceConfigURL(CommonConstants.TRIPLE, "localhost", 8898,
- new String[]{Constants.BIND_PORT_KEY, String.valueOf(8898)});
+ @Override
+ public AbstractPortUnificationServer bind(URL url, ChannelHandler handler) throws RemotingException {
+ return new NettyPortUnificationServer(url, handler);
+ }
- final PortUnificationServer server = new PortUnificationServer(url);
- server.bind();
- Assertions.assertTrue(server.isBound());
+ @Override
+ public Client connect(URL url, ChannelHandler handler) throws RemotingException {
+ return null;
}
}
diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.api.pu.PortUnificationTransporter b/dubbo-remoting/dubbo-remoting-netty4/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.api.pu.PortUnificationTransporter
new file mode 100644
index 0000000000..69a981496a
--- /dev/null
+++ b/dubbo-remoting/dubbo-remoting-netty4/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.api.pu.PortUnificationTransporter
@@ -0,0 +1 @@
+netty=org.apache.dubbo.remoting.transport.netty4.NettyPortUnificationTransporter
diff --git a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/api/ConnectionTest.java b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/ConnectionTest.java
similarity index 65%
copy from dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/api/ConnectionTest.java
copy to dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/ConnectionTest.java
index 9f465764ac..55aa550639 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/api/ConnectionTest.java
+++ b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/ConnectionTest.java
@@ -14,10 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dubbo.remoting.api;
+package org.apache.dubbo.remoting.transport.netty4;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.utils.NetUtils;
+import org.apache.dubbo.remoting.api.Connection;
+import org.apache.dubbo.remoting.api.pu.DefaultPuHandler;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -26,46 +28,14 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-class ConnectionTest {
-
- @Test
- public void testRefCnt0() throws InterruptedException {
- Connection connection = new Connection(URL.valueOf("empty://127.0.0.1:8080?foo=bar"));
- CountDownLatch latch = new CountDownLatch(1);
- connection.getClosePromise().addListener(future -> latch.countDown());
- connection.release();
- latch.await();
- Assertions.assertEquals(0, latch.getCount());
- }
-
- @Test
- public void testRefCnt1() {
- Connection connection = new Connection(URL.valueOf("empty://127.0.0.1:8080?foo=bar"));
- CountDownLatch latch = new CountDownLatch(1);
- connection.retain();
- connection.getClosePromise().addListener(future -> latch.countDown());
- connection.release();
- Assertions.assertEquals(1, latch.getCount());
- }
-
- @Test
- public void testRefCnt2() throws InterruptedException {
- Connection connection = new Connection(URL.valueOf("empty://127.0.0.1:8080?foo=bar"));
- CountDownLatch latch = new CountDownLatch(1);
- connection.retain();
- connection.getClosePromise().addListener(future -> latch.countDown());
- connection.release(2);
- latch.await();
- Assertions.assertEquals(0, latch.getCount());
- }
-
+public class ConnectionTest {
@Test
public void connectSyncTest() throws Throwable {
int port = NetUtils.getAvailablePort();
URL url = URL.valueOf("empty://127.0.0.1:" + port + "?foo=bar");
- PortUnificationServer server = null;
+ NettyPortUnificationServer server = null;
try {
- server = new PortUnificationServer(url);
+ server = new NettyPortUnificationServer(url, new DefaultPuHandler());
server.bind();
Connection connection = new Connection(url);
@@ -95,9 +65,9 @@ class ConnectionTest {
public void testMultiConnect() throws Throwable {
int port = NetUtils.getAvailablePort();
URL url = URL.valueOf("empty://127.0.0.1:" + port + "?foo=bar");
- PortUnificationServer server = null;
+ NettyPortUnificationServer server = null;
try {
- server = new PortUnificationServer(url);
+ server = new NettyPortUnificationServer(url, new DefaultPuHandler());
server.close();
Connection connection = new Connection(url);
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/ProtocolDetector.java b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/DefaultCodec.java
similarity index 61%
copy from dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/ProtocolDetector.java
copy to dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/DefaultCodec.java
index b868b7aad4..9f33bfaa00 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/ProtocolDetector.java
+++ b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/DefaultCodec.java
@@ -14,21 +14,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dubbo.remoting.api;
+package org.apache.dubbo.remoting.transport.netty4;
+import org.apache.dubbo.remoting.Channel;
+import org.apache.dubbo.remoting.Codec2;
+import org.apache.dubbo.remoting.buffer.ChannelBuffer;
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandlerContext;
+import java.io.IOException;
-/**
- * Determine incoming bytes belong to the specific protocol.
- *
- */
-public interface ProtocolDetector {
+public class DefaultCodec implements Codec2 {
+ @Override
+ public void encode(Channel channel, ChannelBuffer buffer, Object message) throws IOException {
- Result detect(final ChannelHandlerContext ctx, final ByteBuf in);
+ }
- enum Result {
- RECOGNIZED, UNRECOGNIZED, NEED_MORE_DATA
+ @Override
+ public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
+ return null;
}
}
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/Http2WireProtocol.java b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/EmptyWireProtocol.java
similarity index 60%
rename from dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/Http2WireProtocol.java
rename to dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/EmptyWireProtocol.java
index 90e617e7d2..322b524b85 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/Http2WireProtocol.java
+++ b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/EmptyWireProtocol.java
@@ -14,23 +14,33 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dubbo.remoting.api;
+package org.apache.dubbo.remoting.transport.netty4;
-import io.netty.handler.codec.http2.Http2FrameLogger;
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.remoting.api.ProtocolDetector;
+import org.apache.dubbo.remoting.api.WireProtocol;
-import static io.netty.handler.logging.LogLevel.DEBUG;
-
-public abstract class Http2WireProtocol implements WireProtocol {
- public static final Http2FrameLogger CLIENT_LOGGER = new Http2FrameLogger(DEBUG, "H2_CLIENT");
- public static final Http2FrameLogger SERVER_LOGGER = new Http2FrameLogger(DEBUG, "H2_SERVER");
- private final ProtocolDetector detector = new Http2ProtocolDetector();
+import io.netty.channel.ChannelPipeline;
+import io.netty.handler.ssl.SslContext;
+public class EmptyWireProtocol implements WireProtocol {
@Override
public ProtocolDetector detector() {
- return detector;
+ return null;
+ }
+
+ @Override
+ public void configServerPipeline(URL url, ChannelPipeline pipeline, SslContext sslContext) {
+
+ }
+
+ @Override
+ public void configClientPipeline(URL url, ChannelPipeline pipeline, SslContext sslContext) {
+
}
@Override
public void close() {
+
}
}
diff --git a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/PortUnificationExchangerTest.java b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/PortUnificationExchangerTest.java
similarity index 71%
rename from dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/PortUnificationExchangerTest.java
rename to dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/PortUnificationExchangerTest.java
index a842cbb6c1..2fd8232db5 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/PortUnificationExchangerTest.java
+++ b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/PortUnificationExchangerTest.java
@@ -14,12 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dubbo.remoting.exchange;
+package org.apache.dubbo.remoting.transport.netty4;
import org.apache.dubbo.common.URL;
-import org.apache.dubbo.common.constants.CommonConstants;
-import org.apache.dubbo.common.url.component.ServiceConfigURL;
-import org.apache.dubbo.remoting.Constants;
+import org.apache.dubbo.common.utils.NetUtils;
+import org.apache.dubbo.remoting.api.pu.DefaultPuHandler;
+import org.apache.dubbo.remoting.exchange.PortUnificationExchanger;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -28,10 +28,10 @@ public class PortUnificationExchangerTest {
@Test
public void test() {
- URL url = new ServiceConfigURL(CommonConstants.TRIPLE, "localhost", 9103,
- new String[]{Constants.BIND_PORT_KEY, String.valueOf(9103)});
- PortUnificationExchanger.bind(url);
- PortUnificationExchanger.bind(url);
+ int port = NetUtils.getAvailablePort();
+ URL url = URL.valueOf("empty://127.0.0.1:" + port + "?foo=bar");
+ PortUnificationExchanger.bind(url, new DefaultPuHandler());
+ PortUnificationExchanger.bind(url, new DefaultPuHandler());
Assertions.assertEquals(PortUnificationExchanger.getServers().size(), 1);
PortUnificationExchanger.close();
diff --git a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/api/PortUnificationServerTest.java b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/PortUnificationServerTest.java
similarity index 64%
rename from dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/api/PortUnificationServerTest.java
rename to dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/PortUnificationServerTest.java
index 92bcfb5f09..23fdcb9ee3 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/api/PortUnificationServerTest.java
+++ b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/PortUnificationServerTest.java
@@ -14,12 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dubbo.remoting.api;
+package org.apache.dubbo.remoting.transport.netty4;
import org.apache.dubbo.common.URL;
-import org.apache.dubbo.common.constants.CommonConstants;
-import org.apache.dubbo.common.url.component.ServiceConfigURL;
-import org.apache.dubbo.remoting.Constants;
+import org.apache.dubbo.common.utils.NetUtils;
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.api.pu.DefaultPuHandler;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -27,11 +27,12 @@ import org.junit.jupiter.api.Test;
public class PortUnificationServerTest {
@Test
- public void testBind() {
- URL url = new ServiceConfigURL(CommonConstants.TRIPLE, "localhost", 8898,
- new String[]{Constants.BIND_PORT_KEY, String.valueOf(8898)});
+ public void testBind() throws RemotingException {
+ int port = NetUtils.getAvailablePort();
+ URL url = URL.valueOf("empty://127.0.0.1:" + port + "?foo=bar");
- final PortUnificationServer server = new PortUnificationServer(url);
+ // the abstract endpoint needs to get the codec of the URL (which is in the triple package)
+ final NettyPortUnificationServer server = new NettyPortUnificationServer(url, new DefaultPuHandler());
server.bind();
Assertions.assertTrue(server.isBound());
}
diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/test/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.Codec2 b/dubbo-remoting/dubbo-remoting-netty4/src/test/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.Codec2
new file mode 100644
index 0000000000..58df523dd4
--- /dev/null
+++ b/dubbo-remoting/dubbo-remoting-netty4/src/test/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.Codec2
@@ -0,0 +1 @@
+empty=org.apache.dubbo.remoting.transport.netty4.DefaultCodec
diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/test/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.api.WireProtocol b/dubbo-remoting/dubbo-remoting-netty4/src/test/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.api.WireProtocol
new file mode 100644
index 0000000000..74a5075355
--- /dev/null
+++ b/dubbo-remoting/dubbo-remoting-netty4/src/test/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.api.WireProtocol
@@ -0,0 +1 @@
+empty=org.apache.dubbo.remoting.transport.netty4.EmptyWireProtocol
diff --git a/dubbo-rpc/dubbo-rpc-triple/pom.xml b/dubbo-rpc/dubbo-rpc-triple/pom.xml
index 7d8a5c7258..9a2c0eaa3f 100644
--- a/dubbo-rpc/dubbo-rpc-triple/pom.xml
+++ b/dubbo-rpc/dubbo-rpc-triple/pom.xml
@@ -38,6 +38,12 @@
<artifactId>dubbo-rpc-api</artifactId>
<version>${project.parent.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo-remoting-netty4</artifactId>
+ <version>${project.parent.version}</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/ProtocolDetector.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/DefaultTriCodec.java
similarity index 62%
copy from dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/ProtocolDetector.java
copy to dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/DefaultTriCodec.java
index b868b7aad4..259c986fc7 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/ProtocolDetector.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/DefaultTriCodec.java
@@ -14,21 +14,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dubbo.remoting.api;
+package org.apache.dubbo.rpc.protocol.tri;
+import org.apache.dubbo.remoting.Channel;
+import org.apache.dubbo.remoting.Codec2;
+import org.apache.dubbo.remoting.buffer.ChannelBuffer;
+import java.io.IOException;
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandlerContext;
+public class DefaultTriCodec implements Codec2 {
-/**
- * Determine incoming bytes belong to the specific protocol.
- *
- */
-public interface ProtocolDetector {
+ @Override
+ public void encode(Channel channel, ChannelBuffer buffer, Object message) throws IOException {
- Result detect(final ChannelHandlerContext ctx, final ByteBuf in);
+ }
- enum Result {
- RECOGNIZED, UNRECOGNIZED, NEED_MORE_DATA
+ @Override
+ public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
+ return null;
}
}
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/Http2ProtocolDetector.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Http2ProtocolDetector.java
similarity index 70%
rename from dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/Http2ProtocolDetector.java
rename to dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Http2ProtocolDetector.java
index ae5e4c663c..9071ca4393 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/Http2ProtocolDetector.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Http2ProtocolDetector.java
@@ -14,27 +14,28 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dubbo.remoting.api;
+package org.apache.dubbo.rpc.protocol.tri;
+
+import org.apache.dubbo.remoting.api.ProtocolDetector;
+import org.apache.dubbo.remoting.buffer.ByteBufferBackedChannelBuffer;
+import org.apache.dubbo.remoting.buffer.ChannelBuffer;
+import org.apache.dubbo.remoting.buffer.ChannelBuffers;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufUtil;
-import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http2.Http2CodecUtil;
import static java.lang.Math.min;
public class Http2ProtocolDetector implements ProtocolDetector {
- private final ByteBuf clientPrefaceString = Http2CodecUtil.connectionPrefaceBuf();
+ private final ChannelBuffer clientPrefaceString = new ByteBufferBackedChannelBuffer(
+ Http2CodecUtil.connectionPrefaceBuf().nioBuffer());
@Override
- public Result detect(ChannelHandlerContext ctx, ByteBuf in) {
+ public Result detect(ChannelBuffer in) {
int prefaceLen = clientPrefaceString.readableBytes();
int bytesRead = min(in.readableBytes(), prefaceLen);
// If the input so far doesn't match the preface, break the connection.
- if (bytesRead == 0 || !ByteBufUtil.equals(in, 0,
- clientPrefaceString, 0, bytesRead)) {
-
+ if (bytesRead == 0 || !ChannelBuffers.prefixEquals(in, clientPrefaceString, bytesRead)) {
return Result.UNRECOGNIZED;
}
if (bytesRead == prefaceLen) {
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
index 881303f7ef..2c1c051c7e 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
@@ -23,7 +23,7 @@ import org.apache.dubbo.common.config.ConfigurationUtils;
import org.apache.dubbo.common.extension.Activate;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
-import org.apache.dubbo.remoting.api.Http2WireProtocol;
+import org.apache.dubbo.remoting.api.AbstractWireProtocol;
import org.apache.dubbo.rpc.HeaderFilter;
import org.apache.dubbo.rpc.model.ApplicationModel;
import org.apache.dubbo.rpc.model.FrameworkModel;
@@ -39,8 +39,10 @@ import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http2.Http2FrameCodec;
import io.netty.handler.codec.http2.Http2FrameCodecBuilder;
+import io.netty.handler.codec.http2.Http2FrameLogger;
import io.netty.handler.codec.http2.Http2MultiplexHandler;
import io.netty.handler.codec.http2.Http2Settings;
+import io.netty.handler.logging.LogLevel;
import io.netty.handler.ssl.SslContext;
import java.util.Collections;
@@ -56,7 +58,7 @@ import static org.apache.dubbo.rpc.Constants.H2_SETTINGS_MAX_FRAME_SIZE_KEY;
import static org.apache.dubbo.rpc.Constants.H2_SETTINGS_MAX_HEADER_LIST_SIZE_KEY;
@Activate
-public class TripleHttp2Protocol extends Http2WireProtocol implements ScopeModelAware {
+public class TripleHttp2Protocol extends AbstractWireProtocol implements ScopeModelAware {
// 1 MiB
private static final int MIB_1 = 1 << 20;
@@ -67,11 +69,19 @@ public class TripleHttp2Protocol extends Http2WireProtocol implements ScopeModel
private static final int DEFAULT_MAX_FRAME_SIZE = MIB_8;
private static final int DEFAULT_WINDOW_INIT_SIZE = MIB_8;
+ public static final Http2FrameLogger CLIENT_LOGGER = new Http2FrameLogger(LogLevel.DEBUG, "H2_CLIENT");
+
+ public static final Http2FrameLogger SERVER_LOGGER = new Http2FrameLogger(LogLevel.DEBUG, "H2_SERVER");
+
private ExtensionLoader<HeaderFilter> filtersLoader;
private FrameworkModel frameworkModel;
private Configuration config = ConfigurationUtils.getGlobalConfiguration(
ApplicationModel.defaultModel());
+ public TripleHttp2Protocol() {
+ super(new Http2ProtocolDetector());
+ }
+
@Override
public void setFrameworkModel(FrameworkModel frameworkModel) {
this.frameworkModel = frameworkModel;
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java
index e3d2f71697..5d728aa2d5 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java
@@ -22,6 +22,7 @@ import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
import org.apache.dubbo.remoting.api.ConnectionManager;
+import org.apache.dubbo.remoting.api.pu.DefaultPuHandler;
import org.apache.dubbo.remoting.exchange.PortUnificationExchanger;
import org.apache.dubbo.rpc.Exporter;
import org.apache.dubbo.rpc.Invoker;
@@ -107,12 +108,12 @@ public class TripleProtocol extends AbstractProtocol {
.setStatus(url.getServiceKey(), HealthCheckResponse.ServingStatus.SERVING);
triBuiltinService.getHealthStatusManager()
.setStatus(url.getServiceInterface(), HealthCheckResponse.ServingStatus.SERVING);
-
// init
url.getOrDefaultApplicationModel().getExtensionLoader(ExecutorRepository.class)
.getDefaultExtension()
.createExecutorIfAbsent(url);
- PortUnificationExchanger.bind(url);
+
+ PortUnificationExchanger.bind(url, new DefaultPuHandler());
optimizeSerialization(url);
return exporter;
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.Codec2 b/dubbo-rpc/dubbo-rpc-triple/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.Codec2
new file mode 100644
index 0000000000..bd58299466
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.Codec2
@@ -0,0 +1 @@
+tri=org.apache.dubbo.rpc.protocol.tri.DefaultTriCodec
diff --git a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/api/Http2ProtocolDetectorTest.java b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/Http2ProtocolDetectorTest.java
similarity index 74%
rename from dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/api/Http2ProtocolDetectorTest.java
rename to dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/Http2ProtocolDetectorTest.java
index 702bf10319..e336db6226 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/api/Http2ProtocolDetectorTest.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/Http2ProtocolDetectorTest.java
@@ -14,7 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dubbo.remoting.api;
+package org.apache.dubbo.rpc.protocol.tri;
+
+import org.apache.dubbo.remoting.Channel;
+import org.apache.dubbo.remoting.api.ProtocolDetector;
+import org.apache.dubbo.remoting.buffer.ByteBufferBackedChannelBuffer;
+import org.apache.dubbo.remoting.buffer.ChannelBuffer;
+import org.apache.dubbo.remoting.buffer.ChannelBuffers;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
@@ -36,16 +42,17 @@ public class Http2ProtocolDetectorTest {
ByteBuf connectionPrefaceBuf = Http2CodecUtil.connectionPrefaceBuf();
ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer();
- ProtocolDetector.Result result = detector.detect(ctx, byteBuf);
+ ChannelBuffer in = new ByteBufferBackedChannelBuffer(byteBuf.nioBuffer());
+ ProtocolDetector.Result result = detector.detect(in);
Assertions.assertEquals(result, ProtocolDetector.Result.UNRECOGNIZED);
byteBuf.writeBytes(connectionPrefaceBuf);
- result = detector.detect(ctx, byteBuf);
+ result = detector.detect(new ByteBufferBackedChannelBuffer(byteBuf.nioBuffer()));
Assertions.assertEquals(result, ProtocolDetector.Result.RECOGNIZED);
byteBuf.clear();
byteBuf.writeBytes(connectionPrefaceBuf, 0, 1);
- result = detector.detect(ctx, byteBuf);
+ result = detector.detect(new ByteBufferBackedChannelBuffer(byteBuf.nioBuffer()));
Assertions.assertEquals(result, ProtocolDetector.Result.NEED_MORE_DATA);
}