You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2019/06/28 11:19:49 UTC
[rocketmq-remoting] 14/39: Clean code and polish protocol description
This is an automated email from the ASF dual-hosted git repository.
yukon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-remoting.git
commit 58fcc75a6c1206a33909e3ba76b0d1e4c29a8b21
Author: yukon <yu...@apache.org>
AuthorDate: Fri May 17 20:08:47 2019 +0800
Clean code and polish protocol description
---
.../api/channel/ChannelHandlerContextWrapper.java | 21 -------------
.../rocketmq/remoting/common/ResponseFuture.java | 9 ------
.../remoting/common/SemaphoreReleaseOnlyOnce.java | 4 ---
.../rocketmq/remoting/external/ThreadUtils.java | 3 +-
.../channel/ChannelHandlerContextWrapperImpl.java | 33 --------------------
.../impl/command/RemotingSysResponseCode.java | 2 --
.../remoting/impl/netty/NettyRemotingAbstract.java | 10 ++++++-
.../org/apache/rocketmq/remoting/package-info.java | 35 +++++++++-------------
8 files changed, 24 insertions(+), 93 deletions(-)
diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/channel/ChannelHandlerContextWrapper.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/channel/ChannelHandlerContextWrapper.java
deleted file mode 100644
index 05c3b18..0000000
--- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/channel/ChannelHandlerContextWrapper.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.remoting.api.channel;
-
-public interface ChannelHandlerContextWrapper<T> {
- T getContext();
-}
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseFuture.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseFuture.java
index 014dd78..76f3472 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseFuture.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseFuture.java
@@ -42,7 +42,6 @@ public class ResponseFuture {
private SemaphoreReleaseOnlyOnce once;
private RemotingCommand requestCommand;
- private InterceptorGroup interceptorGroup;
private String remoteAddr;
public ResponseFuture(int requestId, long timeoutMillis, AsyncHandler asyncHandler,
@@ -139,14 +138,6 @@ public class ResponseFuture {
this.requestCommand = requestCommand;
}
- public InterceptorGroup getInterceptorGroup() {
- return interceptorGroup;
- }
-
- public void setInterceptorGroup(InterceptorGroup interceptorGroup) {
- this.interceptorGroup = interceptorGroup;
- }
-
public String getRemoteAddr() {
return remoteAddr;
}
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/SemaphoreReleaseOnlyOnce.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/SemaphoreReleaseOnlyOnce.java
index 1c5849b..9babace 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/SemaphoreReleaseOnlyOnce.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/SemaphoreReleaseOnlyOnce.java
@@ -33,8 +33,4 @@ public class SemaphoreReleaseOnlyOnce {
this.semaphore.release();
}
}
-
- public Semaphore getSemaphore() {
- return semaphore;
- }
}
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/external/ThreadUtils.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/external/ThreadUtils.java
index a4a7487..3f43b62 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/external/ThreadUtils.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/external/ThreadUtils.java
@@ -168,8 +168,7 @@ public final class ThreadUtils {
executor.shutdown();
try {
// Wait a while for existing tasks to terminate.
- if (!executor
- .awaitTermination(timeout, timeUnit)) {
+ if (!executor.awaitTermination(timeout, timeUnit)) {
executor.shutdownNow();
// Wait a while for tasks to respond to being cancelled.
if (!executor.awaitTermination(timeout, timeUnit)) {
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/channel/ChannelHandlerContextWrapperImpl.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/channel/ChannelHandlerContextWrapperImpl.java
deleted file mode 100644
index bbd33ea..0000000
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/channel/ChannelHandlerContextWrapperImpl.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.remoting.impl.channel;
-
-import org.apache.rocketmq.remoting.api.channel.ChannelHandlerContextWrapper;
-
-public class ChannelHandlerContextWrapperImpl<ChannelHandlerContext> implements ChannelHandlerContextWrapper {
-
- private io.netty.channel.ChannelHandlerContext context;
-
- public ChannelHandlerContextWrapperImpl(io.netty.channel.ChannelHandlerContext context) {
- this.context = context;
- }
-
- public io.netty.channel.ChannelHandlerContext getContext() {
- return context;
- }
-}
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingSysResponseCode.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingSysResponseCode.java
index ae76c6f..f47fd72 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingSysResponseCode.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingSysResponseCode.java
@@ -26,6 +26,4 @@ public class RemotingSysResponseCode {
public static final short SYSTEM_BUSY = 2;
public static final short REQUEST_CODE_NOT_SUPPORTED = 3;
-
- public static final short TRANSACTION_FAILED = 4;
}
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
index 4c72b78..38059a8 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
@@ -147,7 +147,7 @@ public abstract class NettyRemotingAbstract implements RemotingService {
processResponseCommand(ctx, command);
break;
default:
- LOG.warn("Not supported The traffic type {} !", command.trafficType());
+ LOG.warn("The traffic type {} is NOT supported!", command.trafficType());
break;
}
}
@@ -156,6 +156,14 @@ public abstract class NettyRemotingAbstract implements RemotingService {
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
Pair<RequestProcessor, ExecutorService> processorExecutorPair = this.processorTables.get(cmd.cmdCode());
+ if (processorExecutorPair == null) {
+ final RemotingCommand response = commandFactory().createResponse(cmd);
+ response.opCode(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED);
+ ctx.writeAndFlush(response);
+ LOG.warn("The command code {} is NOT supported!", cmd.cmdCode());
+ return;
+ }
+
RemotingChannel channel = new NettyChannelImpl(ctx.channel());
Runnable run = buildProcessorTask(ctx, cmd, processorExecutorPair, channel);
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/package-info.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/package-info.java
index e64f66b..28de0af 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/package-info.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/package-info.java
@@ -21,28 +21,21 @@
* Remoting wire-format protocol description:
*
* <pre>
- * 2015-04-29 16:07:14 v1.0
- * 2016-04-23 16:18:05 v2.0
- * 2016-05-31 09:33:11 v3.0
- * 2016-11-10 09:33:11 v3.1 remove deprecated tag field
- *
- *
- * 1.Protocol Type 1 byte
- * 2.Total Length 4 byte,exclude protocol type size
- * 3.RequestID 4 byte,used for repeatable requests,connection reuse.an requestID string
+ * 1.Protocol Magic 1 byte(0x14)
+ * 2.Total Length 4 byte, excluding the protocol magic size
+ * 3.Command Code 2 byte, command key
+ * 4.Command Version 2 byte, command version
+ * 5.RequestID 4 byte, used for repeatable requests and connection reuse; a requestID string
* representing a client-generated, globally unique for some time unit, identifier for the request
- * 4.Serializer Type 1 byte
- * 5.Traffic Type 1 byte,0-sync;1-async;2-oneway;3-response
- * 6.OpCode Length 2 byte
- * 7.OpCode variant length,utf8 string
- * 8.Remark Length 2 byte
- * 9.Remark variant length,utf8 string
- * 10.Properties Size 2 byte
- * Property Length 2 byte
- * Property Body variant length,utf8,Key\nValue
- * 11.Inbound or OutBound payload length 4 byte
- * 12.Inbound or OutBound payload variant length, max size limitation is 16M
- * 13.Extra payload variant length
+ * 6.Traffic Type 1 byte,0-sync;1-async;2-oneway;3-response
+ * 7.OpCode 2 byte, operation result code(success or error)
+ * 8.Remark Length 2 byte
+ * 9.Remark variant length,utf8 string
+ * 10.Properties Size 2 byte
+ * 11.Property Length 2 byte
+ * 12.Property Body variant length,utf8,Key\nValue
+ * 13.Inbound or OutBound payload length 4 byte
+ * 14.Inbound or OutBound payload variant length, max size limitation is 16M
*
* </pre>
*/