You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2019/01/08 02:00:31 UTC
[rocketmq] branch snode updated: Add async push logic after send message succeed
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch snode
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/snode by this push:
new 41366dd Add async push logic after send message succeed
41366dd is described below
commit 41366dda32f72bc0be5e3d33ef064e0ce9e8902a
Author: duhenglucky <du...@gmail.com>
AuthorDate: Tue Jan 8 09:59:54 2019 +0800
Add async push logic after send message succeed
---
.../apache/rocketmq/broker/BrokerController.java | 6 +-
.../apache/rocketmq/broker/out/BrokerOuterAPI.java | 2 +-
.../broker/processor/PullMessageProcessor.java | 2 +-
.../rocketmq/client/impl/MQClientAPIImpl.java | 1 -
.../client/impl/consumer/RebalancePushImpl.java | 6 ++
.../rocketmq/common/protocol/RequestCode.java | 2 +
.../protocol/header/PullMessageRequestHeader.java | 10 +--
.../common/protocol/heartbeat/ConsumerData.java | 21 ++++++-
.../subscription/SubscriptionGroupConfig.java | 34 +++-------
.../apache/rocketmq/namesrv/NamesrvController.java | 3 +-
.../apache/rocketmq/remoting/RemotingClient.java | 8 +--
.../rocketmq/remoting/RemotingClientFactory.java | 10 ++-
.../rocketmq/remoting/RemotingServerFactory.java | 9 ++-
.../remoting}/interceptor/ExceptionContext.java | 2 +-
.../remoting}/interceptor/Interceptor.java | 6 +-
.../remoting}/interceptor/InterceptorFactory.java | 17 ++---
.../remoting}/interceptor/InterceptorGroup.java | 14 ++---
.../remoting}/interceptor/RequestContext.java | 2 +-
.../remoting}/interceptor/ResponseContext.java | 2 +-
.../remoting/netty/NettyRemotingAbstract.java | 2 +-
.../transport/NettyRemotingClientAbstract.java | 12 +---
.../remoting/transport/rocketmq/NettyDecoder.java | 5 +-
.../transport/rocketmq/NettyRemotingClient.java | 1 -
.../transport/rocketmq/NettyRemotingServer.java | 6 +-
.../org/apache/rocketmq/snode/SnodeController.java | 37 +++++------
.../rocketmq/snode/client/ConsumerGroupInfo.java | 1 -
.../rocketmq/snode/client/ConsumerManager.java | 5 +-
.../rocketmq/snode/client/ProducerManager.java | 4 --
.../rocketmq/snode/client/PushSessionManager.java | 56 -----------------
.../snode/client/SubscriptionGroupManager.java | 1 -
.../apache/rocketmq/snode/config/SnodeConfig.java | 12 ++++
.../snode/processor/HeartbeatProcessor.java | 7 ++-
.../snode/processor/PullMessageProcessor.java | 72 ++++++++++++++--------
.../snode/processor/SendMessageProcessor.java | 36 +++++++++--
.../rocketmq/snode/service/EnodeService.java | 8 ++-
.../apache/rocketmq/snode/service/PushService.java | 6 +-
.../snode/service/impl/EnodeServiceImpl.java | 20 ++----
.../snode/service/impl/PushServiceImpl.java | 57 +++++++++--------
.../snode/service/impl/ScheduledServiceImpl.java | 2 +-
39 files changed, 244 insertions(+), 263 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 61a5008..c12ebe1 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -248,14 +248,12 @@ public class BrokerController {
result = result && this.messageStore.load();
if (result) {
- this.remotingServer = RemotingServerFactory.createInstance();
- this.remotingServer.init(this.nettyServerConfig, this.clientHousekeepingService);
+ this.remotingServer = RemotingServerFactory.getInstance().createRemotingServer().init(this.nettyServerConfig, this.clientHousekeepingService);
// this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
ServerConfig fastConfig = (ServerConfig) this.nettyServerConfig.clone();
fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
// this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
- this.fastRemotingServer = RemotingServerFactory.createInstance();
- this.fastRemotingServer.init(fastConfig, this.clientHousekeepingService);
+ this.fastRemotingServer = RemotingServerFactory.getInstance().createRemotingServer().init(fastConfig, this.clientHousekeepingService);
this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getSendMessageThreadPoolNums(),
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index 2c204ce..4be411a 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -71,7 +71,7 @@ public class BrokerOuterAPI {
}
public BrokerOuterAPI(final ClientConfig nettyClientConfig, RPCHook rpcHook) {
- this.remotingClient = RemotingClientFactory.createInstance().init(nettyClientConfig, null);
+ this.remotingClient = RemotingClientFactory.getInstance().createRemotingClient().init(nettyClientConfig, null);
this.remotingClient.registerRPCHook(rpcHook);
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
index 391b599..60b9ad8 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
@@ -100,7 +100,7 @@ public class PullMessageProcessor implements RequestProcessor {
response.setOpaque(request.getOpaque());
- log.debug("receive PullMessage request command, {}", request);
+ log.info("receive PullMessage request command, {}", request);
if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
response.setCode(ResponseCode.NO_PERMISSION);
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 39f1b61..5e009fd 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -562,7 +562,6 @@ public class MQClientAPIImpl {
final CommunicationMode communicationMode,
final PullCallback pullCallback
) throws RemotingException, MQBrokerException, InterruptedException {
- requestHeader.setEnodeAddr(addr);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SNODE_PULL_MESSAGE, requestHeader);
switch (communicationMode) {
case ONEWAY:
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
index e5166f3..79e1a64 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.client.impl.consumer;
+import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@@ -58,6 +59,11 @@ public class RebalancePushImpl extends RebalanceImpl {
log.info("{} Rebalance changed, also update version: {}, {}", topic, subscriptionData.getSubVersion(), newVersion);
subscriptionData.setSubVersion(newVersion);
+ Set<Integer> queueIdSet = new HashSet<Integer>();
+ for (MessageQueue messageQueue : mqAll) {
+ queueIdSet.add(messageQueue.getQueueId());
+ }
+ subscriptionData.setQueueIdSet(queueIdSet);
int currentQueueCount = this.processQueueTable.size();
if (currentQueueCount != 0) {
int pullThresholdForTopic = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdForTopic();
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
index b36bdb2..f9b097f 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
@@ -172,4 +172,6 @@ public class RequestCode {
public static final int SNODE_PULL_MESSAGE = 351;
+ public static final int SNODE_PUSH_MESSAGE = 352;
+
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java
index 8332307..53be8b2 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java
@@ -48,14 +48,14 @@ public class PullMessageRequestHeader implements CommandCustomHeader {
private Long subVersion;
private String expressionType;
- private String enodeAddr;
+ private String enodeName;
- public String getEnodeAddr() {
- return enodeAddr;
+ public String getEnodeName() {
+ return enodeName;
}
- public void setEnodeAddr(String enodeAddr) {
- this.enodeAddr = enodeAddr;
+ public void setEnodeName(String enodeName) {
+ this.enodeName = enodeName;
}
@Override
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/ConsumerData.java b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/ConsumerData.java
index d4605d0..505eeed 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/ConsumerData.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/ConsumerData.java
@@ -31,6 +31,7 @@ public class ConsumerData {
private ConsumeFromWhere consumeFromWhere;
private Set<SubscriptionData> subscriptionDataSet = new HashSet<SubscriptionData>();
private boolean unitMode;
+ private boolean realPushEnable = true;
public String getGroupName() {
return groupName;
@@ -80,10 +81,24 @@ public class ConsumerData {
this.unitMode = isUnitMode;
}
+ public boolean isRealPushEnable() {
+ return realPushEnable;
+ }
+
+ public void setRealPushEnable(boolean realPushEnable) {
+ this.realPushEnable = realPushEnable;
+ }
+
@Override
public String toString() {
- return "ConsumerData [groupName=" + groupName + ", consumeType=" + consumeType + ", messageModel="
- + messageModel + ", consumeFromWhere=" + consumeFromWhere + ", unitMode=" + unitMode
- + ", subscriptionDataSet=" + subscriptionDataSet + "]";
+ return "ConsumerData{" +
+ "groupName='" + groupName + '\'' +
+ ", consumeType=" + consumeType +
+ ", messageModel=" + messageModel +
+ ", consumeFromWhere=" + consumeFromWhere +
+ ", subscriptionDataSet=" + subscriptionDataSet +
+ ", unitMode=" + unitMode +
+ ", realPushEnable=" + realPushEnable +
+ '}';
}
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/subscription/SubscriptionGroupConfig.java b/common/src/main/java/org/apache/rocketmq/common/subscription/SubscriptionGroupConfig.java
index 158b025..8f4703f 100644
--- a/common/src/main/java/org/apache/rocketmq/common/subscription/SubscriptionGroupConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/subscription/SubscriptionGroupConfig.java
@@ -38,8 +38,6 @@ public class SubscriptionGroupConfig {
private boolean notifyConsumerIdsChangedEnable = true;
- private boolean realPushEnable = false;
-
public String getGroupName() {
return groupName;
}
@@ -112,14 +110,6 @@ public class SubscriptionGroupConfig {
this.notifyConsumerIdsChangedEnable = notifyConsumerIdsChangedEnable;
}
- public boolean isRealPushEnable() {
- return realPushEnable;
- }
-
- public void setRealPushEnable(boolean realPushEnable) {
- this.realPushEnable = realPushEnable;
- }
-
@Override
public int hashCode() {
final int prime = 31;
@@ -128,7 +118,6 @@ public class SubscriptionGroupConfig {
result = prime * result + (consumeBroadcastEnable ? 1231 : 1237);
result = prime * result + (consumeEnable ? 1231 : 1237);
result = prime * result + (consumeFromMinEnable ? 1231 : 1237);
- result = prime * result + (realPushEnable ? 1231 : 1237);
result = prime * result + (notifyConsumerIdsChangedEnable ? 1231 : 1237);
result = prime * result + ((groupName == null) ? 0 : groupName.hashCode());
result = prime * result + retryMaxTimes;
@@ -155,8 +144,6 @@ public class SubscriptionGroupConfig {
return false;
if (consumeFromMinEnable != other.consumeFromMinEnable)
return false;
- if (realPushEnable != other.realPushEnable)
- return false;
if (groupName == null) {
if (other.groupName != null)
return false;
@@ -173,18 +160,13 @@ public class SubscriptionGroupConfig {
return true;
}
- @Override public String toString() {
- return "SubscriptionGroupConfig{" +
- "groupName='" + groupName + '\'' +
- ", consumeEnable=" + consumeEnable +
- ", consumeFromMinEnable=" + consumeFromMinEnable +
- ", consumeBroadcastEnable=" + consumeBroadcastEnable +
- ", retryQueueNums=" + retryQueueNums +
- ", retryMaxTimes=" + retryMaxTimes +
- ", brokerId=" + brokerId +
- ", whichBrokerWhenConsumeSlowly=" + whichBrokerWhenConsumeSlowly +
- ", notifyConsumerIdsChangedEnable=" + notifyConsumerIdsChangedEnable +
- ", realPushEnable=" + realPushEnable +
- '}';
+ @Override
+ public String toString() {
+ return "SubscriptionGroupConfig [groupName=" + groupName + ", consumeEnable=" + consumeEnable
+ + ", consumeFromMinEnable=" + consumeFromMinEnable + ", consumeBroadcastEnable="
+ + consumeBroadcastEnable + ", retryQueueNums=" + retryQueueNums + ", retryMaxTimes="
+ + retryMaxTimes + ", brokerId=" + brokerId + ", whichBrokerWhenConsumeSlowly="
+ + whichBrokerWhenConsumeSlowly + ", notifyConsumerIdsChangedEnable="
+ + notifyConsumerIdsChangedEnable + "]";
}
}
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
index e833ae6..740d502 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
@@ -77,9 +77,8 @@ public class NamesrvController {
this.kvConfigManager.load();
- this.remotingServer = RemotingServerFactory.createInstance();
+ this.remotingServer = RemotingServerFactory.getInstance().createRemotingServer();
this.remotingServer.init(this.nettyServerConfig, this.brokerHousekeepingService);
-// this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
index 84d4241..bc9b61a 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
@@ -26,19 +26,19 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public interface RemotingClient extends RemotingService {
- void updateNameServerAddressList(final List<String> addrs);
+ void updateNameServerAddressList(final List<String> addresses);
List<String> getNameServerAddressList();
- RemotingCommand invokeSync(final String addr, final RemotingCommand request,
+ RemotingCommand invokeSync(final String address, final RemotingCommand request,
final long timeoutMillis) throws InterruptedException, RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException;
- void invokeAsync(final String addr, final RemotingCommand request, final long timeoutMillis,
+ void invokeAsync(final String address, final RemotingCommand request, final long timeoutMillis,
final InvokeCallback invokeCallback) throws InterruptedException, RemotingConnectException,
RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;
- void invokeOneway(final String addr, final RemotingCommand request, final long timeoutMillis)
+ void invokeOneway(final String address, final RemotingCommand request, final long timeoutMillis)
throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException,
RemotingTimeoutException, RemotingSendRequestException;
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClientFactory.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClientFactory.java
index 4df2a70..b5e5a91 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClientFactory.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClientFactory.java
@@ -24,7 +24,11 @@ import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.util.ServiceProvider;
public class RemotingClientFactory {
- private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
+ private static RemotingClientFactory instance = new RemotingClientFactory();
+
+ public static RemotingClientFactory getInstance(){
+ return instance;
+ }
private RemotingClientFactory() {
}
@@ -37,11 +41,11 @@ public class RemotingClientFactory {
paths = ServiceProvider.loadPath(CLIENT_LOCATION);
}
- public static RemotingClient createInstance(String protocol) {
+ public RemotingClient createRemotingClient(String protocol) {
return ServiceProvider.createInstance(paths.get(protocol), RemotingClient.class);
}
- public static RemotingClient createInstance() {
+ public RemotingClient createRemotingClient() {
return ServiceProvider.createInstance(paths.get(RemotingUtil.DEFAULT_PROTOCOL), RemotingClient.class);
}
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServerFactory.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServerFactory.java
index 6dbf22a..231e19a 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServerFactory.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServerFactory.java
@@ -24,8 +24,11 @@ import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.util.ServiceProvider;
public class RemotingServerFactory {
+ private static RemotingServerFactory instance = new RemotingServerFactory();
- private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
+ public static RemotingServerFactory getInstance() {
+ return instance;
+ }
private RemotingServerFactory() {
}
@@ -38,11 +41,11 @@ public class RemotingServerFactory {
protocolPathMap = ServiceProvider.loadPath(SERVER_LOCATION);
}
- public static RemotingServer createInstance(String protocol) {
+ public RemotingServer createRemotingServer(String protocol) {
return ServiceProvider.createInstance(protocolPathMap.get(protocol), RemotingClient.class);
}
- public static RemotingServer createInstance() {
+ public RemotingServer createRemotingServer() {
return ServiceProvider.createInstance(protocolPathMap.get(RemotingUtil.DEFAULT_PROTOCOL), RemotingServer.class);
}
}
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/interceptor/ExceptionContext.java b/remoting/src/main/java/org/apache/rocketmq/remoting/interceptor/ExceptionContext.java
similarity index 96%
rename from snode/src/main/java/org/apache/rocketmq/snode/interceptor/ExceptionContext.java
rename to remoting/src/main/java/org/apache/rocketmq/remoting/interceptor/ExceptionContext.java
index 192f249..bfb9d23 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/interceptor/ExceptionContext.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/interceptor/ExceptionContext.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.snode.interceptor;
+package org.apache.rocketmq.remoting.interceptor;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/interceptor/Interceptor.java b/remoting/src/main/java/org/apache/rocketmq/remoting/interceptor/Interceptor.java
similarity index 84%
rename from snode/src/main/java/org/apache/rocketmq/snode/interceptor/Interceptor.java
rename to remoting/src/main/java/org/apache/rocketmq/remoting/interceptor/Interceptor.java
index 18cf6b3..5baee83 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/interceptor/Interceptor.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/interceptor/Interceptor.java
@@ -14,11 +14,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.snode.interceptor;
+package org.apache.rocketmq.remoting.interceptor;
public interface Interceptor {
- void beforeSendMessage(RequestContext requestContext);
+ void beforeRequest(RequestContext requestContext);
- void afterSendMessage(ResponseContext responseContext);
+ void afterRequest(ResponseContext responseContext);
void onException(ExceptionContext exceptionContext);
}
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/interceptor/InterceptorFactory.java b/remoting/src/main/java/org/apache/rocketmq/remoting/interceptor/InterceptorFactory.java
similarity index 59%
rename from snode/src/main/java/org/apache/rocketmq/snode/interceptor/InterceptorFactory.java
rename to remoting/src/main/java/org/apache/rocketmq/remoting/interceptor/InterceptorFactory.java
index e2b4332..16d5170 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/interceptor/InterceptorFactory.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/interceptor/InterceptorFactory.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.snode.interceptor;
+package org.apache.rocketmq.remoting.interceptor;
import java.util.List;
import org.apache.rocketmq.remoting.util.ServiceProvider;
@@ -26,21 +26,12 @@ public class InterceptorFactory {
return ourInstance;
}
- private final String SEND_MESSAGE_INTERCEPTOR = "META-INF/service/org.apache.rocketmq.snode.interceptor.SendMessageInterceptor";
-
- private final String CONSUME_MESSAGE_INTERCEPTOR = "META-INF/service/org.apache.rocketmq.snode.interceptor.ConsumeMessageInterceptor";
-
private InterceptorFactory() {
}
- public List loadConsumeMessageInterceptors() {
- List<Interceptor> consumeMessageInterceptors = ServiceProvider.loadServiceList(CONSUME_MESSAGE_INTERCEPTOR, Interceptor.class);
- return consumeMessageInterceptors;
- }
-
- public List loadSendMessageInterceptors() {
- List<Interceptor> sendMessageInterceptors = ServiceProvider.loadServiceList(SEND_MESSAGE_INTERCEPTOR, Interceptor.class);
- return sendMessageInterceptors;
+ public List loadInterceptors(String servicePath) {
+ List<Interceptor> interceptors = ServiceProvider.loadServiceList(servicePath, Interceptor.class);
+ return interceptors;
}
}
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/interceptor/InterceptorGroup.java b/remoting/src/main/java/org/apache/rocketmq/remoting/interceptor/InterceptorGroup.java
similarity index 77%
rename from snode/src/main/java/org/apache/rocketmq/snode/interceptor/InterceptorGroup.java
rename to remoting/src/main/java/org/apache/rocketmq/remoting/interceptor/InterceptorGroup.java
index fcee7a5..de80021 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/interceptor/InterceptorGroup.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/interceptor/InterceptorGroup.java
@@ -14,28 +14,28 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.snode.interceptor;
+package org.apache.rocketmq.remoting.interceptor;
import java.util.ArrayList;
import java.util.List;
public class InterceptorGroup {
- private List<Interceptor> interceptors = new ArrayList<>();
+ private List<Interceptor> interceptors = new ArrayList<Interceptor>();
- public void registerInterceptor(Interceptor sendMessageInterceptor) {
- if (sendMessageInterceptor != null) {
- interceptors.add(sendMessageInterceptor);
+ public void registerInterceptor(Interceptor interceptor) {
+ if (interceptor != null) {
+ interceptors.add(interceptor);
}
}
public void beforeRequest(RequestContext requestContext) {
for (Interceptor interceptor : interceptors) {
- interceptor.beforeSendMessage(requestContext);
+ interceptor.beforeRequest(requestContext);
}
}
public void afterRequest(ResponseContext responseContext) {
for (Interceptor interceptor : interceptors) {
- interceptor.afterSendMessage(responseContext);
+ interceptor.afterRequest(responseContext);
}
}
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/interceptor/RequestContext.java b/remoting/src/main/java/org/apache/rocketmq/remoting/interceptor/RequestContext.java
similarity index 96%
rename from snode/src/main/java/org/apache/rocketmq/snode/interceptor/RequestContext.java
rename to remoting/src/main/java/org/apache/rocketmq/remoting/interceptor/RequestContext.java
index 5f32e6b..a0fffbb 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/interceptor/RequestContext.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/interceptor/RequestContext.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.snode.interceptor;
+package org.apache.rocketmq.remoting.interceptor;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/interceptor/ResponseContext.java b/remoting/src/main/java/org/apache/rocketmq/remoting/interceptor/ResponseContext.java
similarity index 96%
rename from snode/src/main/java/org/apache/rocketmq/snode/interceptor/ResponseContext.java
rename to remoting/src/main/java/org/apache/rocketmq/remoting/interceptor/ResponseContext.java
index 9901981..a643b44 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/interceptor/ResponseContext.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/interceptor/ResponseContext.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.snode.interceptor;
+package org.apache.rocketmq.remoting.interceptor;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
index 1e0235d..1555959 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
@@ -468,7 +468,7 @@ public abstract class NettyRemotingAbstract {
public void invokeAsyncImpl(final String addr, final Channel currentChannel, final RemotingCommand request,
final long timeoutMillis,
final InvokeCallback invokeCallback)
- throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
+ throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException {
final long beginStartTime = System.currentTimeMillis();
boolean acquired = semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
if (acquired) {
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/NettyRemotingClientAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/NettyRemotingClientAbstract.java
index 957b053..b61fc63 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/NettyRemotingClientAbstract.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/NettyRemotingClientAbstract.java
@@ -29,10 +29,8 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
-import java.util.Timer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -40,17 +38,11 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
-import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.common.RemotingUtil;
-import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
-import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
-import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import org.apache.rocketmq.remoting.netty.NettyEvent;
import org.apache.rocketmq.remoting.netty.NettyEventType;
import org.apache.rocketmq.remoting.netty.NettyRemotingAbstract;
-import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-import org.apache.rocketmq.remoting.util.ThreadUtils;
public abstract class NettyRemotingClientAbstract extends NettyRemotingAbstract {
private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
@@ -179,7 +171,7 @@ public abstract class NettyRemotingClientAbstract extends NettyRemotingAbstract
@Override
protected Channel getAndCreateChannel(final String addr, long timeout) throws InterruptedException {
if (null == addr) {
- return getAndCreateNameserverChannel(timeout);
+ return getAndCreateNameServerChannel(timeout);
}
ChannelWrapper cw = this.channelTables.get(addr);
@@ -190,7 +182,7 @@ public abstract class NettyRemotingClientAbstract extends NettyRemotingAbstract
return this.createChannel(addr, timeout);
}
- private Channel getAndCreateNameserverChannel(long timeout) throws InterruptedException {
+ private Channel getAndCreateNameServerChannel(long timeout) throws InterruptedException {
String addr = this.namesrvAddrChoosed.get();
if (addr != null) {
ChannelWrapper cw = this.channelTables.get(addr);
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyDecoder.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyDecoder.java
index 70858a9..9240630 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyDecoder.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyDecoder.java
@@ -20,12 +20,11 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import java.nio.ByteBuffer;
-import org.apache.rocketmq.remoting.common.RemotingHelper;
-import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.netty.CodecHelper;
-import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public class NettyDecoder extends LengthFieldBasedFrameDecoder {
private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingClient.java
index a5790df..5e91b7c 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingClient.java
@@ -61,7 +61,6 @@ public class NettyRemotingClient extends NettyRemotingClientAbstract implements
private ExecutorService publicExecutor;
- private ExecutorService asyncExecutor;
/**
* Invoke the callback methods in this executor when process response.
*/
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingServer.java
index 8f81ac2..f7ff842 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingServer.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingServer.java
@@ -20,7 +20,6 @@ import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
@@ -46,6 +45,8 @@ import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.RemotingServer;
+import org.apache.rocketmq.remoting.RequestProcessor;
+import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.remoting.common.Pair;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.common.TlsMode;
@@ -53,10 +54,7 @@ import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import org.apache.rocketmq.remoting.netty.FileRegionEncoder;
-import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl;
import org.apache.rocketmq.remoting.netty.NettyChannelImpl;
-import org.apache.rocketmq.remoting.RequestProcessor;
-import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.remoting.netty.TlsHelper;
import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
index f61b662..944277e 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
@@ -28,23 +28,22 @@ import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.remoting.RemotingClientFactory;
import org.apache.rocketmq.remoting.RemotingServer;
import org.apache.rocketmq.remoting.RemotingServerFactory;
-import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.snode.client.ClientHousekeepingService;
import org.apache.rocketmq.snode.client.ConsumerIdsChangeListener;
import org.apache.rocketmq.snode.client.ConsumerManager;
import org.apache.rocketmq.snode.client.DefaultConsumerIdsChangeListener;
import org.apache.rocketmq.snode.client.ProducerManager;
-import org.apache.rocketmq.snode.client.PushSessionManager;
import org.apache.rocketmq.snode.client.SubscriptionGroupManager;
import org.apache.rocketmq.snode.config.SnodeConfig;
-import org.apache.rocketmq.snode.interceptor.InterceptorFactory;
-import org.apache.rocketmq.snode.interceptor.InterceptorGroup;
-import org.apache.rocketmq.snode.interceptor.Interceptor;
+import org.apache.rocketmq.remoting.interceptor.Interceptor;
+import org.apache.rocketmq.remoting.interceptor.InterceptorFactory;
+import org.apache.rocketmq.remoting.interceptor.InterceptorGroup;
import org.apache.rocketmq.snode.offset.ConsumerOffsetManager;
import org.apache.rocketmq.snode.processor.ConsumerManageProcessor;
import org.apache.rocketmq.snode.processor.HeartbeatProcessor;
@@ -86,7 +85,6 @@ public class SnodeController {
private HeartbeatProcessor hearbeatProcessor;
private InterceptorGroup consumeMessageInterceptorGroup;
private InterceptorGroup sendMessageInterceptorGroup;
- private PushSessionManager pushSessionManager;
private PushService pushService;
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
@@ -101,14 +99,14 @@ public class SnodeController {
this.enodeService = new EnodeServiceImpl(this);
this.nnodeService = new NnodeServiceImpl(this);
this.scheduledService = new ScheduledServiceImpl(this);
- this.remotingClient = RemotingClientFactory.createInstance().init(this.getNettyClientConfig(), null);
+ this.remotingClient = RemotingClientFactory.getInstance().createRemotingClient().init(this.getNettyClientConfig(), null);
this.sendMessageExecutor = ThreadUtils.newThreadPoolExecutor(
snodeConfig.getSnodeSendMessageMinPoolSize(),
snodeConfig.getSnodeSendMessageMaxPoolSize(),
3000,
TimeUnit.MILLISECONDS,
- new ArrayBlockingQueue<Runnable>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()),
+ new ArrayBlockingQueue<>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()),
"SnodeSendMessageThread",
false);
@@ -117,7 +115,7 @@ public class SnodeController {
snodeConfig.getSnodeSendMessageMaxPoolSize(),
3000,
TimeUnit.MILLISECONDS,
- new ArrayBlockingQueue<Runnable>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()),
+ new ArrayBlockingQueue<>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()),
"SnodePullMessageThread",
false);
@@ -126,7 +124,7 @@ public class SnodeController {
snodeConfig.getSnodeHeartBeatMaxPoolSize(),
1000 * 60,
TimeUnit.MILLISECONDS,
- new ArrayBlockingQueue<Runnable>(snodeConfig.getSnodeHeartBeatThreadPoolQueueCapacity()),
+ new ArrayBlockingQueue<>(snodeConfig.getSnodeHeartBeatThreadPoolQueueCapacity()),
"SnodeHeartbeatThread",
true);
@@ -135,7 +133,7 @@ public class SnodeController {
snodeConfig.getSnodeSendMessageMaxPoolSize(),
3000,
TimeUnit.MILLISECONDS,
- new ArrayBlockingQueue<Runnable>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()),
+ new ArrayBlockingQueue<>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()),
"SnodePullMessageThread",
false);
@@ -144,7 +142,7 @@ public class SnodeController {
snodeConfig.getSnodeSendMessageMaxPoolSize(),
3000,
TimeUnit.MILLISECONDS,
- new ArrayBlockingQueue<Runnable>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()),
+ new ArrayBlockingQueue<>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()),
"ConsumerManagerThread",
false);
@@ -164,7 +162,6 @@ public class SnodeController {
this.sendMessageProcessor = new SendMessageProcessor(this);
this.hearbeatProcessor = new HeartbeatProcessor(this);
this.pullMessageProcessor = new PullMessageProcessor(this);
- this.pushSessionManager = new PushSessionManager();
this.pushService = new PushServiceImpl(this);
}
@@ -174,22 +171,22 @@ public class SnodeController {
}
public boolean initialize() {
- this.snodeServer = RemotingServerFactory.createInstance().init(this.nettyServerConfig, this.clientHousekeepingService);
+ this.snodeServer = RemotingServerFactory.getInstance().createRemotingServer().init(this.nettyServerConfig, this.clientHousekeepingService);
this.registerProcessor();
initInterceptorGroup();
return true;
}
private void initInterceptorGroup() {
- List<Interceptor> consumeMessageInterceptors = InterceptorFactory.getInstance().loadConsumeMessageInterceptors();
- if (consumeMessageInterceptors != null) {
+ List<Interceptor> consumeMessageInterceptors = InterceptorFactory.getInstance().loadInterceptors(this.snodeConfig.getConsumeMessageInterceptorPath());
+ if (consumeMessageInterceptors != null && consumeMessageInterceptors.size() > 0) {
this.consumeMessageInterceptorGroup = new InterceptorGroup();
for (Interceptor interceptor : consumeMessageInterceptors) {
this.consumeMessageInterceptorGroup.registerInterceptor(interceptor);
}
}
- List<Interceptor> sendMessageInterceptors = InterceptorFactory.getInstance().loadSendMessageInterceptors();
- if (sendMessageInterceptors != null) {
+ List<Interceptor> sendMessageInterceptors = InterceptorFactory.getInstance().loadInterceptors(this.snodeConfig.getSendMessageInterceptorPath());
+ if (sendMessageInterceptors != null && sendMessageInterceptors.size() > 0) {
this.sendMessageInterceptorGroup = new InterceptorGroup();
for (Interceptor interceptor : sendMessageInterceptors) {
this.sendMessageInterceptorGroup.registerInterceptor(interceptor);
@@ -275,10 +272,6 @@ public class SnodeController {
return sendMessageInterceptorGroup;
}
- public PushSessionManager getPushSessionManager() {
- return pushSessionManager;
- }
-
public PushService getPushService() {
return pushService;
}
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/ConsumerGroupInfo.java b/snode/src/main/java/org/apache/rocketmq/snode/client/ConsumerGroupInfo.java
index c3cd21f..c8140c5 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/client/ConsumerGroupInfo.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/client/ConsumerGroupInfo.java
@@ -16,7 +16,6 @@
*/
package org.apache.rocketmq.snode.client;
-import java.nio.channels.Channel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/ConsumerManager.java b/snode/src/main/java/org/apache/rocketmq/snode/client/ConsumerManager.java
index f44da69..dc3bb02 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/client/ConsumerManager.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/client/ConsumerManager.java
@@ -85,8 +85,7 @@ public class ConsumerManager {
if (info.getChannelInfoTable().isEmpty()) {
ConsumerGroupInfo remove = this.consumerTable.remove(next.getKey());
if (remove != null) {
- log.info("Unregister consumer ok, no any connection, and remove consumer group, {}",
- next.getKey());
+ log.info("Unregister consumer ok, no any connection, and remove consumer group, {}", next.getKey());
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.UNREGISTER, next.getKey());
}
}
@@ -208,7 +207,7 @@ public class ConsumerManager {
}
}
- public ClientChannelInfo getClientInfoTable(String topic, long queueId) {
+ public ClientChannelInfo getClientInfoTable(String topic, Integer queueId) {
ConcurrentHashMap<Integer, ClientChannelInfo> clientChannelInfoMap = this.topicConsumerTable.get(topic);
if (clientChannelInfoMap != null) {
return clientChannelInfoMap.get(queueId);
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/ProducerManager.java b/snode/src/main/java/org/apache/rocketmq/snode/client/ProducerManager.java
index b018dc3..dc2bb21 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/client/ProducerManager.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/client/ProducerManager.java
@@ -16,11 +16,8 @@
*/
package org.apache.rocketmq.snode.client;
-import io.netty.channel.Channel;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
-import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
@@ -31,7 +28,6 @@ import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.common.RemotingHelper;
-import org.apache.rocketmq.remoting.common.RemotingUtil;
public class ProducerManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/PushSessionManager.java b/snode/src/main/java/org/apache/rocketmq/snode/client/PushSessionManager.java
deleted file mode 100644
index f4228c5..0000000
--- a/snode/src/main/java/org/apache/rocketmq/snode/client/PushSessionManager.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.snode.client;
-
-import java.util.concurrent.ConcurrentHashMap;
-
-public class PushSessionManager {
- private final ConcurrentHashMap<String/*Topic*/, ConcurrentHashMap<Integer/*QueueId*/, ClientChannelInfo>> topicConsumerTable = new ConcurrentHashMap<>(2048);
-
- public void updateTopicConsumerTable(String topic, int queueId, ClientChannelInfo clientChannelInfo) {
- ConcurrentHashMap<Integer, ClientChannelInfo> clientChannelInfoMap = this.topicConsumerTable.get(topic);
-
- if (clientChannelInfoMap == null) {
- clientChannelInfoMap = new ConcurrentHashMap<>();
- ConcurrentHashMap prev = this.topicConsumerTable.putIfAbsent(topic, clientChannelInfoMap);
- if (prev != null) {
- clientChannelInfoMap = prev;
- }
- }
- clientChannelInfoMap.put(queueId, clientChannelInfo);
- }
-
- public ClientChannelInfo getClientInfoTable(String topic, long queueId) {
- ConcurrentHashMap<Integer, ClientChannelInfo> clientChannelInfoMap = this.topicConsumerTable.get(topic);
- if (clientChannelInfoMap != null) {
- return clientChannelInfoMap.get(queueId);
- }
- return null;
- }
-
- public void removeConsumerTopicTable(String topic, Integer queueId, ClientChannelInfo clientChannelInfo) {
- ConcurrentHashMap<Integer, ClientChannelInfo> clientChannelInfoMap = this.topicConsumerTable.get(topic);
- if (clientChannelInfoMap != null) {
- ClientChannelInfo old = clientChannelInfoMap.get(queueId);
- //TODO Thread safe issue: wait for the next heartbeat
- if (old == clientChannelInfo) {
- clientChannelInfoMap.remove(queueId, clientChannelInfo);
- }
- }
-
- }
-}
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionGroupManager.java b/snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionGroupManager.java
index 8b977d9..85ed8b1 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionGroupManager.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionGroupManager.java
@@ -20,7 +20,6 @@ import java.util.Iterator;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import org.apache.rocketmq.common.ConfigManager;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/config/SnodeConfig.java b/snode/src/main/java/org/apache/rocketmq/snode/config/SnodeConfig.java
index af14f0e..cd5beab 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/config/SnodeConfig.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/config/SnodeConfig.java
@@ -67,6 +67,10 @@ public class SnodeConfig {
private int snodePushMessageThreadPoolQueueCapacity = 10000;
+ private final String sendMessageInterceptorPath = "META-INF/service/org.apache.rocketmq.snode.interceptor.SendMessageInterceptor";
+
+ private final String consumeMessageInterceptorPath = "META-INF/service/org.apache.rocketmq.snode.interceptor.ConsumeMessageInterceptor";
+
private int listenPort = 11911;
@@ -264,4 +268,12 @@ public class SnodeConfig {
public void setSnodePushMessageThreadPoolQueueCapacity(int snodePushMessageThreadPoolQueueCapacity) {
this.snodePushMessageThreadPoolQueueCapacity = snodePushMessageThreadPoolQueueCapacity;
}
+
+ public String getSendMessageInterceptorPath() {
+ return sendMessageInterceptorPath;
+ }
+
+ public String getConsumeMessageInterceptorPath() {
+ return consumeMessageInterceptorPath;
+ }
}
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java
index f8ba684..2ea53ea 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java
@@ -16,7 +16,6 @@
*/
package org.apache.rocketmq.snode.processor;
-import java.util.List;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
@@ -31,6 +30,8 @@ import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.RequestProcessor;
import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl;
+import org.apache.rocketmq.remoting.netty.NettyChannelImpl;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.client.ClientChannelInfo;
@@ -60,7 +61,7 @@ public class HeartbeatProcessor implements RequestProcessor {
private RemotingCommand heartbeat(RemotingChannel remotingChannel, RemotingCommand request) {
HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
- remotingChannel,
+ new NettyChannelImpl((((NettyChannelHandlerContextImpl) remotingChannel).getChannelHandlerContext().channel())),
heartbeatData.getClientID(),
request.getLanguage(),
request.getVersion()
@@ -99,7 +100,7 @@ public class HeartbeatProcessor implements RequestProcessor {
);
}
- if (subscriptionGroupConfig.isRealPushEnable()) {
+ if (data.isRealPushEnable()) {
this.snodeController.getConsumerManager().updateTopicConsumerTable(data.getSubscriptionDataSet(), clientChannelInfo);
}
}
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java
index 2abc196..4be3450 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java
@@ -15,6 +15,7 @@
* limitations under the License.
*/
package org.apache.rocketmq.snode.processor;
+
import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.help.FAQUrl;
@@ -24,6 +25,7 @@ import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.common.sysflag.PullSysFlag;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingChannel;
@@ -32,9 +34,9 @@ import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.client.ConsumerGroupInfo;
-import org.apache.rocketmq.snode.interceptor.ExceptionContext;
-import org.apache.rocketmq.snode.interceptor.RequestContext;
-import org.apache.rocketmq.snode.interceptor.ResponseContext;
+import org.apache.rocketmq.remoting.interceptor.ExceptionContext;
+import org.apache.rocketmq.remoting.interceptor.RequestContext;
+import org.apache.rocketmq.remoting.interceptor.ResponseContext;
public class PullMessageProcessor implements RequestProcessor {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
@@ -63,11 +65,9 @@ public class PullMessageProcessor implements RequestProcessor {
private RemotingCommand pullMessage(RemotingChannel remotingChannel,
RemotingCommand request) throws RemotingCommandException {
RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
+ response.setOpaque(request.getOpaque());
final PullMessageRequestHeader requestHeader =
(PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
-
- ConsumerGroupInfo consumerGroupInfo = snodeController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());
-
SubscriptionGroupConfig subscriptionGroupConfig =
this.snodeController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());
if (null == subscriptionGroupConfig) {
@@ -82,29 +82,53 @@ public class PullMessageProcessor implements RequestProcessor {
return response;
}
- if (!subscriptionGroupConfig.isConsumeBroadcastEnable()
- && consumerGroupInfo.getMessageModel() == MessageModel.BROADCASTING) {
- response.setCode(ResponseCode.NO_PERMISSION);
- response.setRemark("The consumer group[" + requestHeader.getConsumerGroup() + "] can not consume by broadcast way");
- return response;
- }
- if ((consumerGroupInfo == null) || (consumerGroupInfo.findSubscriptionData(requestHeader.getTopic()) == null)) {
- log.warn("The consumer's subscription not exist, group: {}, topic:{}", requestHeader.getConsumerGroup(), requestHeader.getTopic());
- response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
- response.setRemark("The consumer's subscription not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
+ final boolean hasSubscriptionFlag = PullSysFlag.hasSubscriptionFlag(requestHeader.getSysFlag());
+
+ if (requestHeader.getQueueId() < 0) {
+ String errorInfo = String.format("QueueId[%d] is illegal, topic:[%s] consumer:[%s]",
+ requestHeader.getQueueId(), requestHeader.getTopic(), remotingChannel.remoteAddress());
+ log.warn(errorInfo);
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark(errorInfo);
return response;
}
- SubscriptionData subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());
- if (subscriptionData.getSubVersion() < requestHeader.getSubVersion()) {
- log.warn("The broker's subscription is not latest, group: {} {}", requestHeader.getConsumerGroup(),
- subscriptionData.getSubString());
- response.setCode(ResponseCode.SUBSCRIPTION_NOT_LATEST);
- response.setRemark("The consumer's subscription not latest");
- return response;
+ SubscriptionData subscriptionData;
+ if (!hasSubscriptionFlag) {
+ ConsumerGroupInfo consumerGroupInfo =
+ this.snodeController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());
+ if (null == consumerGroupInfo) {
+ log.warn("The consumer's group info not exist, group: {}", requestHeader.getConsumerGroup());
+ response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
+ response.setRemark("The consumer's group info not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
+ return response;
+ }
+
+ if (!subscriptionGroupConfig.isConsumeBroadcastEnable()
+ && consumerGroupInfo.getMessageModel() == MessageModel.BROADCASTING) {
+ response.setCode(ResponseCode.NO_PERMISSION);
+ response.setRemark("The consumer group[" + requestHeader.getConsumerGroup() + "] can not consume by broadcast way");
+ return response;
+ }
+
+ subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());
+ if (null == subscriptionData) {
+ log.warn("The consumer's subscription not exist, group: {}, topic:{}", requestHeader.getConsumerGroup(), requestHeader.getTopic());
+ response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
+ response.setRemark("The consumer's subscription not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
+ return response;
+ }
+
+ if (subscriptionData.getSubVersion() < requestHeader.getSubVersion()) {
+ log.warn("The broker's subscription is not latest, group: {} {}", requestHeader.getConsumerGroup(),
+ subscriptionData.getSubString());
+ response.setCode(ResponseCode.SUBSCRIPTION_NOT_LATEST);
+ response.setRemark("The consumer's subscription not latest");
+ return response;
+ }
}
- CompletableFuture<RemotingCommand> responseFuture = snodeController.getEnodeService().pullMessage(request);
+ CompletableFuture<RemotingCommand> responseFuture = snodeController.getEnodeService().pullMessage(requestHeader.getEnodeName(), request);
responseFuture.whenComplete((data, ex) -> {
if (ex == null) {
if (this.snodeController.getConsumeMessageInterceptorGroup() != null) {
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java
index 9477895..ecda813 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java
@@ -15,17 +15,22 @@
* limitations under the License.
*/
package org.apache.rocketmq.snode.processor;
+
import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader;
+import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.RequestProcessor;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController;
-import org.apache.rocketmq.snode.interceptor.ExceptionContext;
-import org.apache.rocketmq.snode.interceptor.RequestContext;
-import org.apache.rocketmq.snode.interceptor.ResponseContext;
+import org.apache.rocketmq.remoting.interceptor.ExceptionContext;
+import org.apache.rocketmq.remoting.interceptor.RequestContext;
+import org.apache.rocketmq.remoting.interceptor.ResponseContext;
public class SendMessageProcessor implements RequestProcessor {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
@@ -37,12 +42,30 @@ public class SendMessageProcessor implements RequestProcessor {
}
@Override
- public RemotingCommand processRequest(RemotingChannel remotingChannel, RemotingCommand request) {
+ public RemotingCommand processRequest(RemotingChannel remotingChannel,
+ RemotingCommand request) throws RemotingCommandException {
if (this.snodeController.getSendMessageInterceptorGroup() != null) {
RequestContext requestContext = new RequestContext(request, remotingChannel);
this.snodeController.getSendMessageInterceptorGroup().beforeRequest(requestContext);
}
- CompletableFuture<RemotingCommand> responseFuture = snodeController.getEnodeService().sendMessage(request);
+ String enodeName;
+ SendMessageRequestHeaderV2 sendMessageRequestHeaderV2 = null;
+ boolean isSendBack = false;
+ if (request.getCode() == RequestCode.SEND_MESSAGE_V2) {
+ sendMessageRequestHeaderV2 = (SendMessageRequestHeaderV2) request.decodeCommandCustomHeader(SendMessageRequestHeaderV2.class);
+ enodeName = sendMessageRequestHeaderV2.getN();
+ } else {
+ isSendBack = true;
+ ConsumerSendMsgBackRequestHeader consumerSendMsgBackRequestHeader = (ConsumerSendMsgBackRequestHeader) request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
+ enodeName = consumerSendMsgBackRequestHeader.getEnodeName();
+ }
+
+ CompletableFuture<RemotingCommand> responseFuture = snodeController.getEnodeService().sendMessage(enodeName, request);
+
+ final String topic = sendMessageRequestHeaderV2.getB();
+ final Integer queueId = sendMessageRequestHeaderV2.getE();
+ final byte[] message = request.getBody();
+ final boolean isNeedPush = !isSendBack;
responseFuture.whenComplete((data, ex) -> {
if (ex == null) {
if (this.snodeController.getSendMessageInterceptorGroup() != null) {
@@ -50,6 +73,9 @@ public class SendMessageProcessor implements RequestProcessor {
this.snodeController.getSendMessageInterceptorGroup().afterRequest(responseContext);
}
remotingChannel.reply(data);
+ if (isNeedPush) {
+ this.snodeController.getPushService().pushMessage(topic, queueId, message, data);
+ }
} else {
if (this.snodeController.getSendMessageInterceptorGroup() != null) {
ExceptionContext exceptionContext = new ExceptionContext(request, remotingChannel, ex, null);
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/EnodeService.java b/snode/src/main/java/org/apache/rocketmq/snode/service/EnodeService.java
index 0105027..d6e307f 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/service/EnodeService.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/service/EnodeService.java
@@ -15,6 +15,7 @@
* limitations under the License.
*/
package org.apache.rocketmq.snode.service;
+
import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.common.TopicConfig;
@@ -29,13 +30,14 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public interface EnodeService {
void sendHearbeat(RemotingCommand remotingCommand);
- CompletableFuture<RemotingCommand> sendMessage(final RemotingCommand request);
+ CompletableFuture<RemotingCommand> sendMessage(final String enodeName, final RemotingCommand request);
- CompletableFuture<RemotingCommand> pullMessage(final RemotingCommand request);
+ CompletableFuture<RemotingCommand> pullMessage(final String enodeName, final RemotingCommand request);
void notifyConsumerIdsChanged(final RemotingChannel channel, final String consumerGroup);
- RemotingCommand creatTopic(String enodeName, TopicConfig topicConfig)throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException ;
+ RemotingCommand creatTopic(String enodeName,
+ TopicConfig topicConfig) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
void updateEnodeAddr(String clusterName) throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException, MQBrokerException;
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/PushService.java b/snode/src/main/java/org/apache/rocketmq/snode/service/PushService.java
index 41fc4de..98bd41a 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/service/PushService.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/service/PushService.java
@@ -16,13 +16,15 @@
*/
package org.apache.rocketmq.snode.service;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
public interface PushService {
boolean registerPushSession(String consumerGroup);
void unregisterPushSession(String consumerGroup);
- void pushMessage(final String messageId, final byte[] message, final Integer queueId, final String topic,
- final long queueOffset);
+ void pushMessage(final String topic, final Integer queueId, final byte[] message,
+ final RemotingCommand response);
void start();
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/EnodeServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/EnodeServiceImpl.java
index f6d157a..55866a2 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/EnodeServiceImpl.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/EnodeServiceImpl.java
@@ -15,6 +15,7 @@
* limitations under the License.
*/
package org.apache.rocketmq.snode.service.impl;
+
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
@@ -79,13 +80,12 @@ public class EnodeServiceImpl implements EnodeService {
}
@Override
- public CompletableFuture<RemotingCommand> pullMessage(RemotingCommand request) {
+ public CompletableFuture<RemotingCommand> pullMessage(final String enodeName, final RemotingCommand request) {
CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
try {
- final PullMessageRequestHeader requestHeader =
- (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
- this.snodeController.getRemotingClient().invokeAsync(requestHeader.getEnodeAddr(), request, SnodeConstant.CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, new InvokeCallback() {
+ String enodeAddress = this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false);
+ this.snodeController.getRemotingClient().invokeAsync(enodeAddress, request, SnodeConstant.CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, new InvokeCallback() {
@Override
public void operationComplete(ResponseFuture responseFuture) {
RemotingCommand response = responseFuture.getResponseCommand();
@@ -110,17 +110,9 @@ public class EnodeServiceImpl implements EnodeService {
}
@Override
- public CompletableFuture<RemotingCommand> sendMessage(RemotingCommand request) {
+ public CompletableFuture<RemotingCommand> sendMessage(String enodeName, RemotingCommand request) {
CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
try {
- String enodeName;
- if (request.getCode() == RequestCode.SEND_MESSAGE_V2) {
- SendMessageRequestHeaderV2 sendMessageRequestHeaderV2 = (SendMessageRequestHeaderV2) request.decodeCommandCustomHeader(SendMessageRequestHeaderV2.class);
- enodeName = sendMessageRequestHeaderV2.getN();
- } else {
- ConsumerSendMsgBackRequestHeader consumerSendMsgBackRequestHeader = (ConsumerSendMsgBackRequestHeader) request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
- enodeName = consumerSendMsgBackRequestHeader.getEnodeName();
- }
String enodeAddress = this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false);
this.snodeController.getRemotingClient().invokeAsync(enodeAddress, request, SnodeConstant.defaultTimeoutMills, (responseFuture) -> {
future.complete(responseFuture.getResponseCommand());
@@ -149,7 +141,7 @@ public class EnodeServiceImpl implements EnodeService {
try {
this.snodeController.getSnodeServer().invokeOneway(channel, request, SnodeConstant.oneWaytimeout);
} catch (Exception e) {
- log.error("NotifyConsumerIdsChanged exception, " + consumerGroup, e.getMessage());
+ log.error("NotifyConsumerIdsChanged consumer group: {} exception ", consumerGroup, e);
}
}
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java
index ffaa4df..0476a6b 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java
@@ -21,7 +21,9 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.header.PushMessageHeader;
+import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
@@ -45,57 +47,55 @@ public class PushServiceImpl implements PushService {
this.snodeController.getSnodeConfig().getSnodePushMessageMaxPoolSize(),
3000,
TimeUnit.MILLISECONDS,
- new ArrayBlockingQueue<Runnable>(this.snodeController.getSnodeConfig().getSnodeSendThreadPoolQueueCapacity()),
+ new ArrayBlockingQueue<>(this.snodeController.getSnodeConfig().getSnodeSendThreadPoolQueueCapacity()),
"SnodePushMessageThread",
false);
}
public class PushTask implements Runnable {
- private AtomicBoolean canceled;
- private final String messageId;
+ private AtomicBoolean canceled = new AtomicBoolean(false);
private final byte[] message;
private final Integer queueId;
private final String topic;
- private final long queueOffset;
+ private final RemotingCommand response;
- public PushTask(final String messageId, final byte[] message, final Integer queueId, final String topic,
- final long queueOffset) {
- this.messageId = messageId;
+ public PushTask(final String topic, final Integer queueId, final byte[] message,
+ final RemotingCommand response) {
this.message = message;
this.queueId = queueId;
this.topic = topic;
- this.queueOffset = queueOffset;
+ this.response = response;
}
@Override
public void run() {
if (!canceled.get()) {
- PushMessageHeader pushMessageHeader = new PushMessageHeader();
- pushMessageHeader.setMessageId(this.messageId);
- pushMessageHeader.setQueueOffset(queueOffset);
- pushMessageHeader.setTopic(topic);
- pushMessageHeader.setQueueId(queueId);
- RemotingCommand pushMessage = RemotingCommand.createResponseCommand(PushMessageHeader.class);
- pushMessage.setBody(message);
- pushMessage.setCustomHeader(pushMessageHeader);
try {
- ClientChannelInfo clientChannelInfo = snodeController.getPushSessionManager().getClientInfoTable(topic, queueId);
+ SendMessageResponseHeader sendMessageResponseHeader = (SendMessageResponseHeader) response.decodeCommandCustomHeader(SendMessageResponseHeader.class);
+ PushMessageHeader pushMessageHeader = new PushMessageHeader();
+ pushMessageHeader.setQueueOffset(sendMessageResponseHeader.getQueueOffset());
+ pushMessageHeader.setTopic(topic);
+ pushMessageHeader.setQueueId(queueId);
+ RemotingCommand pushMessage = RemotingCommand.createResponseCommand(PushMessageHeader.class);
+ pushMessage.setBody(message);
+ pushMessage.setCustomHeader(pushMessageHeader);
+ pushMessage.setCode(RequestCode.SNODE_PUSH_MESSAGE);
+ ClientChannelInfo clientChannelInfo = snodeController.getConsumerManager().getClientInfoTable(topic, queueId);
if (clientChannelInfo != null) {
+ log.warn("Push message to topic: {} queueId: {}, message:{}", topic, queueId, pushMessage);
RemotingChannel remotingChannel = clientChannelInfo.getChannel();
snodeController.getSnodeServer().push(remotingChannel, pushMessage, SnodeConstant.defaultTimeoutMills);
} else {
log.warn("Get client info to topic: {} queueId: {} is null", topic, queueId);
}
} catch (Exception ex) {
- log.warn("Push message to topic: {} queueId: {} ex:{}", topic, queueId, ex);
+ log.warn("Push message to topic: {} queueId: {}", topic, queueId, ex);
}
+ } else {
+ log.info("Push message to topic: {} queueId: {} canceled!", topic, queueId);
}
}
- public AtomicBoolean getCanceled() {
- return canceled;
- }
-
public void setCanceled(AtomicBoolean canceled) {
this.canceled = canceled;
}
@@ -113,10 +113,15 @@ public class PushServiceImpl implements PushService {
}
@Override
- public void pushMessage(final String messageId, final byte[] message, final Integer queueId, final String topic,
- final long queueOffset) {
- PushTask pushTask = new PushTask(messageId, message, queueId, topic, queueOffset);
- pushMessageExecutorService.submit(pushTask);
+ public void pushMessage(final String topic, final Integer queueId, final byte[] message,
+ final RemotingCommand response) {
+ ClientChannelInfo clientChannelInfo = this.snodeController.getConsumerManager().getClientInfoTable(topic, queueId);
+ if (clientChannelInfo != null) {
+ PushTask pushTask = new PushTask(topic, queueId, message, response);
+ pushMessageExecutorService.submit(pushTask);
+ } else {
+ log.info("Topic: {} QueueId: {} no need to push", topic, queueId);
+ }
}
@Override
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ScheduledServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ScheduledServiceImpl.java
index 4de080b..21e7fab 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ScheduledServiceImpl.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ScheduledServiceImpl.java
@@ -112,7 +112,7 @@ public class ScheduledServiceImpl implements ScheduledService {
log.warn("Update broker addr error:{}", ex);
}
}
- }, 1000 * 10, Math.max(10000, Math.min(snodeConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
+ }, 0, Math.max(10000, Math.min(snodeConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override