You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by vi...@apache.org on 2016/12/28 10:02:49 UTC
[42/50] [abbrv] incubator-rocketmq git commit: ROCKETMQ-18 Reformat
all codes.
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
index 0f9954b..3047faf 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
@@ -6,23 +6,17 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.broker.client;
-import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.remoting.common.RemotingHelper;
-import org.apache.rocketmq.remoting.common.RemotingUtil;
import io.netty.channel.Channel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
@@ -30,7 +24,11 @@ import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class ProducerManager {
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
@@ -38,16 +36,14 @@ public class ProducerManager {
private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120;
private final Lock groupChannelLock = new ReentrantLock();
private final HashMap<String /* group name */, HashMap<Channel, ClientChannelInfo>> groupChannelTable =
- new HashMap<String, HashMap<Channel, ClientChannelInfo>>();
-
+ new HashMap<String, HashMap<Channel, ClientChannelInfo>>();
public ProducerManager() {
}
-
public HashMap<String, HashMap<Channel, ClientChannelInfo>> getGroupChannelTable() {
HashMap<String /* group name */, HashMap<Channel, ClientChannelInfo>> newGroupChannelTable =
- new HashMap<String, HashMap<Channel, ClientChannelInfo>>();
+ new HashMap<String, HashMap<Channel, ClientChannelInfo>>();
try {
if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
@@ -62,13 +58,12 @@ public class ProducerManager {
return newGroupChannelTable;
}
-
public void scanNotActiveChannel() {
try {
if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
for (final Map.Entry<String, HashMap<Channel, ClientChannelInfo>> entry : this.groupChannelTable
- .entrySet()) {
+ .entrySet()) {
final String group = entry.getKey();
final HashMap<Channel, ClientChannelInfo> chlMap = entry.getValue();
@@ -82,8 +77,8 @@ public class ProducerManager {
if (diff > CHANNEL_EXPIRED_TIMEOUT) {
it.remove();
log.warn(
- "SCAN: remove expired channel[{}] from ProducerManager groupChannelTable, producer group name: {}",
- RemotingHelper.parseChannelRemoteAddr(info.getChannel()), group);
+ "SCAN: remove expired channel[{}] from ProducerManager groupChannelTable, producer group name: {}",
+ RemotingHelper.parseChannelRemoteAddr(info.getChannel()), group);
RemotingUtil.closeChannel(info.getChannel());
}
}
@@ -99,23 +94,22 @@ public class ProducerManager {
}
}
-
public void doChannelCloseEvent(final String remoteAddr, final Channel channel) {
if (channel != null) {
try {
if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
for (final Map.Entry<String, HashMap<Channel, ClientChannelInfo>> entry : this.groupChannelTable
- .entrySet()) {
+ .entrySet()) {
final String group = entry.getKey();
final HashMap<Channel, ClientChannelInfo> clientChannelInfoTable =
- entry.getValue();
+ entry.getValue();
final ClientChannelInfo clientChannelInfo =
- clientChannelInfoTable.remove(channel);
+ clientChannelInfoTable.remove(channel);
if (clientChannelInfo != null) {
log.info(
- "NETTY EVENT: remove channel[{}][{}] from ProducerManager groupChannelTable, producer group: {}",
- clientChannelInfo.toString(), remoteAddr, group);
+ "NETTY EVENT: remove channel[{}][{}] from ProducerManager groupChannelTable, producer group: {}",
+ clientChannelInfo.toString(), remoteAddr, group);
}
}
@@ -131,7 +125,6 @@ public class ProducerManager {
}
}
-
public void registerProducer(final String group, final ClientChannelInfo clientChannelInfo) {
try {
ClientChannelInfo clientChannelInfoFound = null;
@@ -148,7 +141,7 @@ public class ProducerManager {
if (null == clientChannelInfoFound) {
channelTable.put(clientChannelInfo.getChannel(), clientChannelInfo);
log.info("new producer connected, group: {} channel: {}", group,
- clientChannelInfo.toString());
+ clientChannelInfo.toString());
}
} finally {
this.groupChannelLock.unlock();
@@ -165,7 +158,6 @@ public class ProducerManager {
}
}
-
public void unregisterProducer(final String group, final ClientChannelInfo clientChannelInfo) {
try {
if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
@@ -175,7 +167,7 @@ public class ProducerManager {
ClientChannelInfo old = channelTable.remove(clientChannelInfo.getChannel());
if (old != null) {
log.info("unregister a producer[{}] from groupChannelTable {}", group,
- clientChannelInfo.toString());
+ clientChannelInfo.toString());
}
if (channelTable.isEmpty()) {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java b/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java
index 40eff81..f6d7955 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java
@@ -16,6 +16,16 @@
*/
package org.apache.rocketmq.broker.client.net;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.FileRegion;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
@@ -40,21 +50,9 @@ import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.SelectMappedBufferResult;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.FileRegion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-
-
public class Broker2Client {
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final BrokerController brokerController;
@@ -64,17 +62,17 @@ public class Broker2Client {
}
public void checkProducerTransactionState(
- final Channel channel,
- final CheckTransactionStateRequestHeader requestHeader,
- final SelectMappedBufferResult selectMappedBufferResult) {
+ final Channel channel,
+ final CheckTransactionStateRequestHeader requestHeader,
+ final SelectMappedBufferResult selectMappedBufferResult) {
RemotingCommand request =
- RemotingCommand.createRequestCommand(RequestCode.CHECK_TRANSACTION_STATE, requestHeader);
+ RemotingCommand.createRequestCommand(RequestCode.CHECK_TRANSACTION_STATE, requestHeader);
request.markOnewayRPC();
try {
FileRegion fileRegion =
- new OneMessageTransfer(request.encodeHeader(selectMappedBufferResult.getSize()),
- selectMappedBufferResult);
+ new OneMessageTransfer(request.encodeHeader(selectMappedBufferResult.getSize()),
+ selectMappedBufferResult);
channel.writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
@@ -91,14 +89,14 @@ public class Broker2Client {
}
public RemotingCommand callClient(final Channel channel,
- final RemotingCommand request
+ final RemotingCommand request
) throws RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
return this.brokerController.getRemotingServer().invokeSync(channel, request, 10000);
}
public void notifyConsumerIdsChanged(
- final Channel channel,
- final String consumerGroup) {
+ final Channel channel,
+ final String consumerGroup) {
if (null == consumerGroup) {
log.error("notifyConsumerIdsChanged consumerGroup is null");
return;
@@ -107,7 +105,7 @@ public class Broker2Client {
NotifyConsumerIdsChangedRequestHeader requestHeader = new NotifyConsumerIdsChangedRequestHeader();
requestHeader.setConsumerGroup(consumerGroup);
RemotingCommand request =
- RemotingCommand.createRequestCommand(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, requestHeader);
+ RemotingCommand.createRequestCommand(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, requestHeader);
try {
this.brokerController.getRemotingServer().invokeOneway(channel, request, 10);
@@ -121,7 +119,7 @@ public class Broker2Client {
}
public RemotingCommand resetOffset(String topic, String group, long timeStamp, boolean isForce,
- boolean isC) {
+ boolean isC) {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
@@ -141,7 +139,7 @@ public class Broker2Client {
mq.setQueueId(i);
long consumerOffset =
- this.brokerController.getConsumerOffsetManager().queryOffset(group, topic, i);
+ this.brokerController.getConsumerOffsetManager().queryOffset(group, topic, i);
if (-1 == consumerOffset) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(String.format("THe consumer group <%s> not exist", group));
@@ -173,7 +171,7 @@ public class Broker2Client {
requestHeader.setGroup(group);
requestHeader.setTimestamp(timeStamp);
RemotingCommand request =
- RemotingCommand.createRequestCommand(RequestCode.RESET_CONSUMER_CLIENT_OFFSET, requestHeader);
+ RemotingCommand.createRequestCommand(RequestCode.RESET_CONSUMER_CLIENT_OFFSET, requestHeader);
if (isC) {
// c++ language
ResetOffsetBodyForC body = new ResetOffsetBodyForC();
@@ -188,37 +186,37 @@ public class Broker2Client {
}
ConsumerGroupInfo consumerGroupInfo =
- this.brokerController.getConsumerManager().getConsumerGroupInfo(group);
+ this.brokerController.getConsumerManager().getConsumerGroupInfo(group);
if (consumerGroupInfo != null && !consumerGroupInfo.getAllChannel().isEmpty()) {
ConcurrentHashMap<Channel, ClientChannelInfo> channelInfoTable =
- consumerGroupInfo.getChannelInfoTable();
+ consumerGroupInfo.getChannelInfoTable();
for (Map.Entry<Channel, ClientChannelInfo> entry : channelInfoTable.entrySet()) {
int version = entry.getValue().getVersion();
if (version >= MQVersion.Version.V3_0_7_SNAPSHOT.ordinal()) {
try {
this.brokerController.getRemotingServer().invokeOneway(entry.getKey(), request, 5000);
log.info("[reset-offset] reset offset success. topic={}, group={}, clientId={}",
- topic, group, entry.getValue().getClientId());
+ topic, group, entry.getValue().getClientId());
} catch (Exception e) {
log.error("[reset-offset] reset offset exception. topic={}, group={}",
- new Object[]{topic, group}, e);
+ new Object[] {topic, group}, e);
}
} else {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("the client does not support this feature. version="
- + MQVersion.getVersionDesc(version));
+ + MQVersion.getVersionDesc(version));
log.warn("[reset-offset] the client does not support this feature. version={}",
- RemotingHelper.parseChannelRemoteAddr(entry.getKey()), MQVersion.getVersionDesc(version));
+ RemotingHelper.parseChannelRemoteAddr(entry.getKey()), MQVersion.getVersionDesc(version));
return response;
}
}
} else {
String errorInfo =
- String.format("Consumer not online, so can not reset offset, Group: %s Topic: %s Timestamp: %d",
- requestHeader.getGroup(),
- requestHeader.getTopic(),
- requestHeader.getTimestamp());
+ String.format("Consumer not online, so can not reset offset, Group: %s Topic: %s Timestamp: %d",
+ requestHeader.getGroup(),
+ requestHeader.getTopic(),
+ requestHeader.getTimestamp());
log.error(errorInfo);
response.setCode(ResponseCode.CONSUMER_NOT_ONLINE);
response.setRemark(errorInfo);
@@ -236,7 +234,7 @@ public class Broker2Client {
for (Entry<MessageQueue, Long> entry : table.entrySet()) {
MessageQueue mq = entry.getKey();
MessageQueueForC tmp =
- new MessageQueueForC(mq.getTopic(), mq.getBrokerName(), mq.getQueueId(), entry.getValue());
+ new MessageQueueForC(mq.getTopic(), mq.getBrokerName(), mq.getQueueId(), entry.getValue());
list.add(tmp);
}
return list;
@@ -249,13 +247,13 @@ public class Broker2Client {
requestHeader.setTopic(topic);
requestHeader.setGroup(group);
RemotingCommand request =
- RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT,
- requestHeader);
+ RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT,
+ requestHeader);
Map<String, Map<MessageQueue, Long>> consumerStatusTable =
- new HashMap<String, Map<MessageQueue, Long>>();
+ new HashMap<String, Map<MessageQueue, Long>>();
ConcurrentHashMap<Channel, ClientChannelInfo> channelInfoTable =
- this.brokerController.getConsumerManager().getConsumerGroupInfo(group).getChannelInfoTable();
+ this.brokerController.getConsumerManager().getConsumerGroupInfo(group).getChannelInfoTable();
if (null == channelInfoTable || channelInfoTable.isEmpty()) {
result.setCode(ResponseCode.SYSTEM_ERROR);
result.setRemark(String.format("No Any Consumer online in the consumer group: [%s]", group));
@@ -268,26 +266,26 @@ public class Broker2Client {
if (version < MQVersion.Version.V3_0_7_SNAPSHOT.ordinal()) {
result.setCode(ResponseCode.SYSTEM_ERROR);
result.setRemark("the client does not support this feature. version="
- + MQVersion.getVersionDesc(version));
+ + MQVersion.getVersionDesc(version));
log.warn("[get-consumer-status] the client does not support this feature. version={}",
- RemotingHelper.parseChannelRemoteAddr(entry.getKey()), MQVersion.getVersionDesc(version));
+ RemotingHelper.parseChannelRemoteAddr(entry.getKey()), MQVersion.getVersionDesc(version));
return result;
} else if (UtilAll.isBlank(originClientId) || originClientId.equals(clientId)) {
try {
RemotingCommand response =
- this.brokerController.getRemotingServer().invokeSync(entry.getKey(), request, 5000);
+ this.brokerController.getRemotingServer().invokeSync(entry.getKey(), request, 5000);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
if (response.getBody() != null) {
GetConsumerStatusBody body =
- GetConsumerStatusBody.decode(response.getBody(),
- GetConsumerStatusBody.class);
+ GetConsumerStatusBody.decode(response.getBody(),
+ GetConsumerStatusBody.class);
consumerStatusTable.put(clientId, body.getMessageQueueTable());
log.info(
- "[get-consumer-status] get consumer status success. topic={}, group={}, channelRemoteAddr={}",
- topic, group, clientId);
+ "[get-consumer-status] get consumer status success. topic={}, group={}, channelRemoteAddr={}",
+ topic, group, clientId);
}
}
default:
@@ -295,8 +293,8 @@ public class Broker2Client {
}
} catch (Exception e) {
log.error(
- "[get-consumer-status] get consumer status exception. topic={}, group={}, offset={}",
- new Object[]{topic, group}, e);
+ "[get-consumer-status] get consumer status exception. topic={}, group={}, offset={}",
+ new Object[] {topic, group}, e);
}
if (!UtilAll.isBlank(originClientId) && originClientId.equals(clientId)) {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java
index 82ca014..88c044a 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java
@@ -16,25 +16,23 @@
*/
package org.apache.rocketmq.broker.client.rebalance;
-import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.common.message.MessageQueue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class RebalanceLockManager {
private static final Logger log = LoggerFactory.getLogger(LoggerName.REBALANCE_LOCK_LOGGER_NAME);
private final static long REBALANCE_LOCK_MAX_LIVE_TIME = Long.parseLong(System.getProperty(
- "rocketmq.broker.rebalance.lockMaxLiveTime", "60000"));
+ "rocketmq.broker.rebalance.lockMaxLiveTime", "60000"));
private final Lock lock = new ReentrantLock();
private final ConcurrentHashMap<String/* group */, ConcurrentHashMap<MessageQueue, LockEntry>> mqLockTable =
- new ConcurrentHashMap<String, ConcurrentHashMap<MessageQueue, LockEntry>>(1024);
+ new ConcurrentHashMap<String, ConcurrentHashMap<MessageQueue, LockEntry>>(1024);
public boolean tryLock(final String group, final MessageQueue mq, final String clientId) {
@@ -54,9 +52,9 @@ public class RebalanceLockManager {
lockEntry.setClientId(clientId);
groupValue.put(mq, lockEntry);
log.info("tryLock, message queue not locked, I got it. Group: {} NewClientId: {} {}", //
- group, //
- clientId, //
- mq);
+ group, //
+ clientId, //
+ mq);
}
if (lockEntry.isLocked(clientId)) {
@@ -66,26 +64,24 @@ public class RebalanceLockManager {
String oldClientId = lockEntry.getClientId();
-
if (lockEntry.isExpired()) {
lockEntry.setClientId(clientId);
lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
log.warn(
- "tryLock, message queue lock expired, I got it. Group: {} OldClientId: {} NewClientId: {} {}", //
- group, //
- oldClientId, //
- clientId, //
- mq);
- return true;
- }
-
-
- log.warn(
- "tryLock, message queue locked by other client. Group: {} OtherClientId: {} NewClientId: {} {}", //
+ "tryLock, message queue lock expired, I got it. Group: {} OldClientId: {} NewClientId: {} {}", //
group, //
oldClientId, //
clientId, //
mq);
+ return true;
+ }
+
+ log.warn(
+ "tryLock, message queue locked by other client. Group: {} OtherClientId: {} NewClientId: {} {}", //
+ group, //
+ oldClientId, //
+ clientId, //
+ mq);
return false;
} finally {
this.lock.unlock();
@@ -118,11 +114,10 @@ public class RebalanceLockManager {
}
public Set<MessageQueue> tryLockBatch(final String group, final Set<MessageQueue> mqs,
- final String clientId) {
+ final String clientId) {
Set<MessageQueue> lockedMqs = new HashSet<MessageQueue>(mqs.size());
Set<MessageQueue> notLockedMqs = new HashSet<MessageQueue>(mqs.size());
-
for (MessageQueue mq : mqs) {
if (this.isLocked(group, mq, clientId)) {
lockedMqs.add(mq);
@@ -141,7 +136,6 @@ public class RebalanceLockManager {
this.mqLockTable.put(group, groupValue);
}
-
for (MessageQueue mq : notLockedMqs) {
LockEntry lockEntry = groupValue.get(mq);
if (null == lockEntry) {
@@ -149,13 +143,12 @@ public class RebalanceLockManager {
lockEntry.setClientId(clientId);
groupValue.put(mq, lockEntry);
log.info(
- "tryLockBatch, message queue not locked, I got it. Group: {} NewClientId: {} {}", //
- group, //
- clientId, //
- mq);
+ "tryLockBatch, message queue not locked, I got it. Group: {} NewClientId: {} {}", //
+ group, //
+ clientId, //
+ mq);
}
-
if (lockEntry.isLocked(clientId)) {
lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
lockedMqs.add(mq);
@@ -164,27 +157,25 @@ public class RebalanceLockManager {
String oldClientId = lockEntry.getClientId();
-
if (lockEntry.isExpired()) {
lockEntry.setClientId(clientId);
lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
log.warn(
- "tryLockBatch, message queue lock expired, I got it. Group: {} OldClientId: {} NewClientId: {} {}", //
- group, //
- oldClientId, //
- clientId, //
- mq);
+ "tryLockBatch, message queue lock expired, I got it. Group: {} OldClientId: {} NewClientId: {} {}", //
+ group, //
+ oldClientId, //
+ clientId, //
+ mq);
lockedMqs.add(mq);
continue;
}
-
log.warn(
- "tryLockBatch, message queue locked by other client. Group: {} OtherClientId: {} NewClientId: {} {}", //
- group, //
- oldClientId, //
- clientId, //
- mq);
+ "tryLockBatch, message queue locked by other client. Group: {} OtherClientId: {} NewClientId: {} {}", //
+ group, //
+ oldClientId, //
+ clientId, //
+ mq);
}
} finally {
this.lock.unlock();
@@ -209,27 +200,27 @@ public class RebalanceLockManager {
if (lockEntry.getClientId().equals(clientId)) {
groupValue.remove(mq);
log.info("unlockBatch, Group: {} {} {}",
- group,
- mq,
- clientId);
+ group,
+ mq,
+ clientId);
} else {
log.warn("unlockBatch, but mq locked by other client: {}, Group: {} {} {}",
- lockEntry.getClientId(),
- group,
- mq,
- clientId);
- }
- } else {
- log.warn("unlockBatch, but mq not locked, Group: {} {} {}",
+ lockEntry.getClientId(),
group,
mq,
clientId);
+ }
+ } else {
+ log.warn("unlockBatch, but mq not locked, Group: {} {} {}",
+ group,
+ mq,
+ clientId);
}
}
} else {
log.warn("unlockBatch, group not exist, Group: {} {}",
- group,
- clientId);
+ group,
+ clientId);
}
} finally {
this.lock.unlock();
@@ -243,22 +234,18 @@ public class RebalanceLockManager {
private String clientId;
private volatile long lastUpdateTimestamp = System.currentTimeMillis();
-
public String getClientId() {
return clientId;
}
-
public void setClientId(String clientId) {
this.clientId = clientId;
}
-
public long getLastUpdateTimestamp() {
return lastUpdateTimestamp;
}
-
public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
this.lastUpdateTimestamp = lastUpdateTimestamp;
}
@@ -270,7 +257,7 @@ public class RebalanceLockManager {
public boolean isExpired() {
boolean expired =
- (System.currentTimeMillis() - this.lastUpdateTimestamp) > REBALANCE_LOCK_MAX_LIVE_TIME;
+ (System.currentTimeMillis() - this.lastUpdateTimestamp) > REBALANCE_LOCK_MAX_LIVE_TIME;
return expired;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java
index 5b86d99..c1b67ae 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java
@@ -17,15 +17,7 @@
package org.apache.rocketmq.broker.filtersrv;
-import org.apache.rocketmq.broker.BrokerController;
-import org.apache.rocketmq.broker.BrokerStartup;
-import org.apache.rocketmq.common.ThreadFactoryImpl;
-import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.remoting.common.RemotingUtil;
import io.netty.channel.Channel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@@ -34,18 +26,24 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.BrokerStartup;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class FilterServerManager {
public static final long FILTER_SERVER_MAX_IDLE_TIME_MILLS = 30000;
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final ConcurrentHashMap<Channel, FilterServerInfo> filterServerTable =
- new ConcurrentHashMap<Channel, FilterServerInfo>(16);
+ new ConcurrentHashMap<Channel, FilterServerInfo>(16);
private final BrokerController brokerController;
private ScheduledExecutorService scheduledExecutorService = Executors
- .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("FilterServerManagerScheduledThread"));
+ .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("FilterServerManagerScheduledThread"));
public FilterServerManager(final BrokerController brokerController) {
this.brokerController = brokerController;
@@ -67,7 +65,7 @@ public class FilterServerManager {
public void createFilterServer() {
int more =
- this.brokerController.getBrokerConfig().getFilterServerNums() - this.filterServerTable.size();
+ this.brokerController.getBrokerConfig().getFilterServerNums() - this.filterServerTable.size();
String cmd = this.buildStartCommand();
for (int i = 0; i < more; i++) {
FilterServerUtil.callShell(cmd, log);
@@ -86,12 +84,12 @@ public class FilterServerManager {
if (RemotingUtil.isWindowsPlatform()) {
return String.format("start /b %s\\bin\\mqfiltersrv.exe %s",
- this.brokerController.getBrokerConfig().getRocketmqHome(),
- config);
+ this.brokerController.getBrokerConfig().getRocketmqHome(),
+ config);
} else {
return String.format("sh %s/bin/startfsrv.sh %s",
- this.brokerController.getBrokerConfig().getRocketmqHome(),
- config);
+ this.brokerController.getBrokerConfig().getRocketmqHome(),
+ config);
}
}
@@ -134,7 +132,7 @@ public class FilterServerManager {
FilterServerInfo old = this.filterServerTable.remove(channel);
if (old != null) {
log.warn("The Filter Server<{}> connection<{}> closed, remove it", old.getFilterServerAddr(),
- remoteAddr);
+ remoteAddr);
}
}
@@ -152,22 +150,18 @@ public class FilterServerManager {
private String filterServerAddr;
private long lastUpdateTimestamp;
-
public String getFilterServerAddr() {
return filterServerAddr;
}
-
public void setFilterServerAddr(String filterServerAddr) {
this.filterServerAddr = filterServerAddr;
}
-
public long getLastUpdateTimestamp() {
return lastUpdateTimestamp;
}
-
public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
this.lastUpdateTimestamp = lastUpdateTimestamp;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerUtil.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerUtil.java b/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerUtil.java
index de4cc37..9edfbe4 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerUtil.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerUtil.java
@@ -6,20 +6,19 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.broker.filtersrv;
import org.slf4j.Logger;
-
public class FilterServerUtil {
public static void callShell(final String shellString, final Logger log) {
Process process = null;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
index 5359368..f616e33 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
@@ -6,16 +6,19 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.broker.latency;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
@@ -24,21 +27,27 @@ import org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-
public class BrokerFastFailure {
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
- "BrokerFastFailureScheduledThread"));
+ "BrokerFastFailureScheduledThread"));
private final BrokerController brokerController;
public BrokerFastFailure(final BrokerController brokerController) {
this.brokerController = brokerController;
}
+ public static RequestTask castRunnable(final Runnable runnable) {
+ try {
+ FutureTaskExt object = (FutureTaskExt)runnable;
+ return (RequestTask)object.getRunnable();
+ } catch (Throwable e) {
+ log.error(String.format("castRunnable exception, %s", runnable.getClass().getName()), e);
+ }
+
+ return null;
+ }
+
public void start() {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
@@ -95,17 +104,6 @@ public class BrokerFastFailure {
}
}
- public static RequestTask castRunnable(final Runnable runnable) {
- try {
- FutureTaskExt object = (FutureTaskExt) runnable;
- return (RequestTask) object.getRunnable();
- } catch (Throwable e) {
- log.error(String.format("castRunnable exception, %s", runnable.getClass().getName()), e);
- }
-
- return null;
- }
-
public void shutdown() {
this.scheduledExecutorService.shutdown();
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFixedThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFixedThreadPoolExecutor.java b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFixedThreadPoolExecutor.java
index 8c4c5e8..5a4ad2d 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFixedThreadPoolExecutor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFixedThreadPoolExecutor.java
@@ -17,22 +17,31 @@
package org.apache.rocketmq.broker.latency;
-import java.util.concurrent.*;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
public class BrokerFixedThreadPoolExecutor extends ThreadPoolExecutor {
- public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, final TimeUnit unit, final BlockingQueue<Runnable> workQueue) {
+ public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, final TimeUnit unit,
+ final BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
- public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, final TimeUnit unit, final BlockingQueue<Runnable> workQueue, final ThreadFactory threadFactory) {
+ public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, final TimeUnit unit,
+ final BlockingQueue<Runnable> workQueue, final ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
}
- public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, final TimeUnit unit, final BlockingQueue<Runnable> workQueue, final RejectedExecutionHandler handler) {
+ public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, final TimeUnit unit,
+ final BlockingQueue<Runnable> workQueue, final RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
}
- public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, final TimeUnit unit, final BlockingQueue<Runnable> workQueue, final ThreadFactory threadFactory, final RejectedExecutionHandler handler) {
+ public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, final TimeUnit unit,
+ final BlockingQueue<Runnable> workQueue, final ThreadFactory threadFactory, final RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/longpolling/ManyPullRequest.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/ManyPullRequest.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/ManyPullRequest.java
index e261b40..0ee02ad 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/ManyPullRequest.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/ManyPullRequest.java
@@ -6,37 +6,33 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.broker.longpolling;
import java.util.ArrayList;
import java.util.List;
-
public class ManyPullRequest {
private final ArrayList<PullRequest> pullRequestList = new ArrayList<PullRequest>();
-
public synchronized void addPullRequest(final PullRequest pullRequest) {
this.pullRequestList.add(pullRequest);
}
-
public synchronized void addPullRequest(final List<PullRequest> many) {
this.pullRequestList.addAll(many);
}
-
public synchronized List<PullRequest> cloneListAndClear() {
if (!this.pullRequestList.isEmpty()) {
- List<PullRequest> result = (ArrayList<PullRequest>) this.pullRequestList.clone();
+ List<PullRequest> result = (ArrayList<PullRequest>)this.pullRequestList.clone();
this.pullRequestList.clear();
return result;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java
index f953c1e..97e54f9 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java
@@ -6,29 +6,26 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.broker.longpolling;
import org.apache.rocketmq.store.MessageArrivingListener;
-
public class NotifyMessageArrivingListener implements MessageArrivingListener {
private final PullRequestHoldService pullRequestHoldService;
-
public NotifyMessageArrivingListener(final PullRequestHoldService pullRequestHoldService) {
this.pullRequestHoldService = pullRequestHoldService;
}
-
@Override
public void arriving(String topic, int queueId, long logicOffset, long tagsCode) {
this.pullRequestHoldService.notifyMessageArriving(topic, queueId, logicOffset, tagsCode);
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequest.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequest.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequest.java
index 40716f8..1cceec8 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequest.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequest.java
@@ -6,20 +6,19 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.broker.longpolling;
+import io.netty.channel.Channel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-import io.netty.channel.Channel;
-
public class PullRequest {
private final RemotingCommand requestCommand;
@@ -29,9 +28,8 @@ public class PullRequest {
private final long pullFromThisOffset;
private final SubscriptionData subscriptionData;
-
public PullRequest(RemotingCommand requestCommand, Channel clientChannel, long timeoutMillis, long suspendTimestamp,
- long pullFromThisOffset, SubscriptionData subscriptionData) {
+ long pullFromThisOffset, SubscriptionData subscriptionData) {
this.requestCommand = requestCommand;
this.clientChannel = clientChannel;
this.timeoutMillis = timeoutMillis;
@@ -40,27 +38,22 @@ public class PullRequest {
this.subscriptionData = subscriptionData;
}
-
public RemotingCommand getRequestCommand() {
return requestCommand;
}
-
public Channel getClientChannel() {
return clientChannel;
}
-
public long getTimeoutMillis() {
return timeoutMillis;
}
-
public long getSuspendTimestamp() {
return suspendTimestamp;
}
-
public long getPullFromThisOffset() {
return pullFromThisOffset;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java
index 5182664..0e5be9b 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java
@@ -16,6 +16,9 @@
*/
package org.apache.rocketmq.broker.longpolling;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.SystemClock;
@@ -25,11 +28,6 @@ import org.apache.rocketmq.store.MessageFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-
-
public class PullRequestHoldService extends ServiceThread {
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final String TOPIC_QUEUEID_SEPARATOR = "@";
@@ -37,8 +35,7 @@ public class PullRequestHoldService extends ServiceThread {
private final SystemClock systemClock = new SystemClock();
private final MessageFilter messageFilter = new DefaultMessageFilter();
private ConcurrentHashMap<String/* topic@queueId */, ManyPullRequest> pullRequestTable =
- new ConcurrentHashMap<String, ManyPullRequest>(1024);
-
+ new ConcurrentHashMap<String, ManyPullRequest>(1024);
public PullRequestHoldService(final BrokerController brokerController) {
this.brokerController = brokerController;
@@ -135,7 +132,7 @@ public class PullRequestHoldService extends ServiceThread {
if (this.messageFilter.isMessageMatched(request.getSubscriptionData(), tmp)) {
try {
this.brokerController.getPullMessageProcessor().excuteRequestWhenWakeup(request.getClientChannel(),
- request.getRequestCommand());
+ request.getRequestCommand());
} catch (Throwable e) {
log.error("execute request when wakeup failed.", e);
}
@@ -146,14 +143,13 @@ public class PullRequestHoldService extends ServiceThread {
if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {
try {
this.brokerController.getPullMessageProcessor().excuteRequestWhenWakeup(request.getClientChannel(),
- request.getRequestCommand());
+ request.getRequestCommand());
} catch (Throwable e) {
log.error("execute request when wakeup failed.", e);
}
continue;
}
-
replayList.add(request);
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/ConsumeMessageContext.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/ConsumeMessageContext.java b/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/ConsumeMessageContext.java
index 3a167fa..cc2f218 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/ConsumeMessageContext.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/ConsumeMessageContext.java
@@ -6,20 +6,18 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.broker.mqtrace;
-import org.apache.rocketmq.store.stats.BrokerStatsManager;
-
import java.util.Map;
-
+import org.apache.rocketmq.store.stats.BrokerStatsManager;
public class ConsumeMessageContext {
private String consumerGroup;
@@ -38,102 +36,82 @@ public class ConsumeMessageContext {
private int commercialRcvTimes;
private int commercialRcvSize;
-
public String getConsumerGroup() {
return consumerGroup;
}
-
public void setConsumerGroup(String consumerGroup) {
this.consumerGroup = consumerGroup;
}
-
public String getTopic() {
return topic;
}
-
public void setTopic(String topic) {
this.topic = topic;
}
-
public Integer getQueueId() {
return queueId;
}
-
public void setQueueId(Integer queueId) {
this.queueId = queueId;
}
-
public String getClientHost() {
return clientHost;
}
-
public void setClientHost(String clientHost) {
this.clientHost = clientHost;
}
-
public String getStoreHost() {
return storeHost;
}
-
public void setStoreHost(String storeHost) {
this.storeHost = storeHost;
}
-
public Map<String, Long> getMessageIds() {
return messageIds;
}
-
public void setMessageIds(Map<String, Long> messageIds) {
this.messageIds = messageIds;
}
-
public boolean isSuccess() {
return success;
}
-
public void setSuccess(boolean success) {
this.success = success;
}
-
public String getStatus() {
return status;
}
-
public void setStatus(String status) {
this.status = status;
}
-
public Object getMqTraceContext() {
return mqTraceContext;
}
-
public void setMqTraceContext(Object mqTraceContext) {
this.mqTraceContext = mqTraceContext;
}
-
public int getBodyLength() {
return bodyLength;
}
-
public void setBodyLength(int bodyLength) {
this.bodyLength = bodyLength;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/ConsumeMessageHook.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/ConsumeMessageHook.java b/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/ConsumeMessageHook.java
index c4b7f36..7e724a0 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/ConsumeMessageHook.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/ConsumeMessageHook.java
@@ -6,22 +6,20 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.broker.mqtrace;
public interface ConsumeMessageHook {
String hookName();
-
void consumeMessageBefore(final ConsumeMessageContext context);
-
void consumeMessageAfter(final ConsumeMessageContext context);
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/SendMessageContext.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/SendMessageContext.java b/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/SendMessageContext.java
index ca8121d..8ad7919 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/SendMessageContext.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/SendMessageContext.java
@@ -6,22 +6,20 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.broker.mqtrace;
+import java.util.Properties;
import org.apache.rocketmq.common.message.MessageType;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
-import java.util.Properties;
-
-
public class SendMessageContext {
private String producerGroup;
private String topic;
@@ -92,137 +90,110 @@ public class SendMessageContext {
return producerGroup;
}
-
public void setProducerGroup(String producerGroup) {
this.producerGroup = producerGroup;
}
-
public String getTopic() {
return topic;
}
-
public void setTopic(String topic) {
this.topic = topic;
}
-
public String getMsgId() {
return msgId;
}
-
public void setMsgId(String msgId) {
this.msgId = msgId;
}
-
public String getOriginMsgId() {
return originMsgId;
}
-
public void setOriginMsgId(String originMsgId) {
this.originMsgId = originMsgId;
}
-
public Integer getQueueId() {
return queueId;
}
-
public void setQueueId(Integer queueId) {
this.queueId = queueId;
}
-
public Long getQueueOffset() {
return queueOffset;
}
-
public void setQueueOffset(Long queueOffset) {
this.queueOffset = queueOffset;
}
-
public String getBrokerAddr() {
return brokerAddr;
}
-
public void setBrokerAddr(String brokerAddr) {
this.brokerAddr = brokerAddr;
}
-
public String getBornHost() {
return bornHost;
}
-
public void setBornHost(String bornHost) {
this.bornHost = bornHost;
}
-
public int getBodyLength() {
return bodyLength;
}
-
public void setBodyLength(int bodyLength) {
this.bodyLength = bodyLength;
}
-
public int getCode() {
return code;
}
-
public void setCode(int code) {
this.code = code;
}
-
public String getErrorMsg() {
return errorMsg;
}
-
public void setErrorMsg(String errorMsg) {
this.errorMsg = errorMsg;
}
-
public String getMsgProps() {
return msgProps;
}
-
public void setMsgProps(String msgProps) {
this.msgProps = msgProps;
}
-
public Object getMqTraceContext() {
return mqTraceContext;
}
-
public void setMqTraceContext(Object mqTraceContext) {
this.mqTraceContext = mqTraceContext;
}
-
public Properties getExtProps() {
return extProps;
}
-
public void setExtProps(Properties extProps) {
this.extProps = extProps;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/SendMessageHook.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/SendMessageHook.java b/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/SendMessageHook.java
index 84cbdcb..a84d899 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/SendMessageHook.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/SendMessageHook.java
@@ -6,22 +6,20 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.broker.mqtrace;
public interface SendMessageHook {
public String hookName();
-
public void sendMessageBefore(final SendMessageContext context);
-
public void sendMessageAfter(final SendMessageContext context);
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
index 06ceb36..9346067 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
@@ -6,16 +6,23 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.broker.offset;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.BrokerPathConfigHelper;
import org.apache.rocketmq.common.ConfigManager;
@@ -25,30 +32,22 @@ import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-
-
public class ConsumerOffsetManager extends ConfigManager {
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final String TOPIC_GROUP_SEPARATOR = "@";
private ConcurrentHashMap<String/* topic@group */, ConcurrentHashMap<Integer, Long>> offsetTable =
- new ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>>(512);
+ new ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>>(512);
private transient BrokerController brokerController;
-
public ConsumerOffsetManager() {
}
-
public ConsumerOffsetManager(BrokerController brokerController) {
this.brokerController = brokerController;
}
-
public void scanUnsubscribedTopic() {
Iterator<Entry<String, ConcurrentHashMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator();
while (it.hasNext()) {
@@ -60,7 +59,7 @@ public class ConsumerOffsetManager extends ConfigManager {
String group = arrays[1];
if (null == brokerController.getConsumerManager().findSubscriptionData(group, topic)
- && this.offsetBehindMuchThanData(topic, next.getValue())) {
+ && this.offsetBehindMuchThanData(topic, next.getValue())) {
it.remove();
log.warn("remove topic offset, {}", topicAtGroup);
}
@@ -68,7 +67,6 @@ public class ConsumerOffsetManager extends ConfigManager {
}
}
-
private boolean offsetBehindMuchThanData(final String topic, ConcurrentHashMap<Integer, Long> table) {
Iterator<Entry<Integer, Long>> it = table.entrySet().iterator();
boolean result = !table.isEmpty();
@@ -83,7 +81,6 @@ public class ConsumerOffsetManager extends ConfigManager {
return result;
}
-
public Set<String> whichTopicByConsumer(final String group) {
Set<String> topics = new HashSet<String>();
@@ -102,7 +99,6 @@ public class ConsumerOffsetManager extends ConfigManager {
return topics;
}
-
public Set<String> whichGroupByTopic(final String topic) {
Set<String> groups = new HashSet<String>();
@@ -121,7 +117,6 @@ public class ConsumerOffsetManager extends ConfigManager {
return groups;
}
-
public void commitOffset(final String clientHost, final String group, final String topic, final int queueId, final long offset) {
// topic@group
String key = topic + TOPIC_GROUP_SEPARATOR + group;
@@ -182,12 +177,10 @@ public class ConsumerOffsetManager extends ConfigManager {
return offsetTable;
}
-
public void setOffsetTable(ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> offsetTable) {
this.offsetTable = offsetTable;
}
-
public Map<Integer, Long> queryMinOffsetInAllGroup(final String topic, final String filterGroups) {
Map<Integer, Long> queueMinOffset = new HashMap<Integer, Long>();
@@ -224,14 +217,12 @@ public class ConsumerOffsetManager extends ConfigManager {
return queueMinOffset;
}
-
public Map<Integer, Long> queryOffset(final String group, final String topic) {
// topic@group
String key = topic + TOPIC_GROUP_SEPARATOR + group;
return this.offsetTable.get(key);
}
-
public void cloneOffset(final String srcGroup, final String destGroup, final String topic) {
ConcurrentHashMap<Integer, Long> offsets = this.offsetTable.get(topic + TOPIC_GROUP_SEPARATOR + srcGroup);
if (offsets != null) {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index 3d969c4..25b333a 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -16,6 +16,9 @@
*/
package org.apache.rocketmq.broker.out;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
@@ -23,24 +26,27 @@ import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
import org.apache.rocketmq.common.namesrv.TopAddressing;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
-import org.apache.rocketmq.common.protocol.body.*;
+import org.apache.rocketmq.common.protocol.body.ConsumerOffsetSerializeWrapper;
+import org.apache.rocketmq.common.protocol.body.KVTable;
+import org.apache.rocketmq.common.protocol.body.RegisterBrokerBody;
+import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
+import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerResponseHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.UnRegisterBrokerRequestHeader;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.RemotingClient;
-import org.apache.rocketmq.remoting.exception.*;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.UnsupportedEncodingException;
-import java.util.ArrayList;
-import java.util.List;
-
-
public class BrokerOuterAPI {
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final RemotingClient remotingClient;
@@ -92,15 +98,15 @@ public class BrokerOuterAPI {
}
public RegisterBrokerResult registerBrokerAll(
- final String clusterName,
- final String brokerAddr,
- final String brokerName,
- final long brokerId,
- final String haServerAddr,
- final TopicConfigSerializeWrapper topicConfigWrapper,
- final List<String> filterServerList,
- final boolean oneway,
- final int timeoutMills) {
+ final String clusterName,
+ final String brokerAddr,
+ final String brokerName,
+ final long brokerId,
+ final String haServerAddr,
+ final TopicConfigSerializeWrapper topicConfigWrapper,
+ final List<String> filterServerList,
+ final boolean oneway,
+ final int timeoutMills) {
RegisterBrokerResult registerBrokerResult = null;
List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
@@ -108,7 +114,7 @@ public class BrokerOuterAPI {
for (String namesrvAddr : nameServerAddressList) {
try {
RegisterBrokerResult result = this.registerBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId,
- haServerAddr, topicConfigWrapper, filterServerList, oneway, timeoutMills);
+ haServerAddr, topicConfigWrapper, filterServerList, oneway, timeoutMills);
if (result != null) {
registerBrokerResult = result;
}
@@ -124,18 +130,18 @@ public class BrokerOuterAPI {
}
private RegisterBrokerResult registerBroker(
- final String namesrvAddr,
- final String clusterName,
- final String brokerAddr,
- final String brokerName,
- final long brokerId,
- final String haServerAddr,
- final TopicConfigSerializeWrapper topicConfigWrapper,
- final List<String> filterServerList,
- final boolean oneway,
- final int timeoutMills
+ final String namesrvAddr,
+ final String clusterName,
+ final String brokerAddr,
+ final String brokerName,
+ final long brokerId,
+ final String haServerAddr,
+ final TopicConfigSerializeWrapper topicConfigWrapper,
+ final List<String> filterServerList,
+ final boolean oneway,
+ final int timeoutMills
) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
- InterruptedException {
+ InterruptedException {
RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
requestHeader.setBrokerAddr(brokerAddr);
requestHeader.setBrokerId(brokerId);
@@ -163,7 +169,7 @@ public class BrokerOuterAPI {
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
RegisterBrokerResponseHeader responseHeader =
- (RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);
+ (RegisterBrokerResponseHeader)response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);
RegisterBrokerResult result = new RegisterBrokerResult();
result.setMasterAddr(responseHeader.getMasterAddr());
result.setHaServerAddr(responseHeader.getHaServerAddr());
@@ -181,10 +187,10 @@ public class BrokerOuterAPI {
}
public void unregisterBrokerAll(
- final String clusterName,
- final String brokerAddr,
- final String brokerName,
- final long brokerId
+ final String clusterName,
+ final String brokerAddr,
+ final String brokerName,
+ final long brokerId
) {
List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
if (nameServerAddressList != null) {
@@ -200,11 +206,11 @@ public class BrokerOuterAPI {
}
public void unregisterBroker(
- final String namesrvAddr,
- final String clusterName,
- final String brokerAddr,
- final String brokerName,
- final long brokerId
+ final String namesrvAddr,
+ final String clusterName,
+ final String brokerAddr,
+ final String brokerName,
+ final long brokerId
) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {
UnRegisterBrokerRequestHeader requestHeader = new UnRegisterBrokerRequestHeader();
requestHeader.setBrokerAddr(brokerAddr);
@@ -227,7 +233,7 @@ public class BrokerOuterAPI {
}
public TopicConfigSerializeWrapper getAllTopicConfig(final String addr) throws RemotingConnectException, RemotingSendRequestException,
- RemotingTimeoutException, InterruptedException, MQBrokerException {
+ RemotingTimeoutException, InterruptedException, MQBrokerException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null);
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(true, addr), request, 3000);
@@ -244,7 +250,7 @@ public class BrokerOuterAPI {
}
public ConsumerOffsetSerializeWrapper getAllConsumerOffset(final String addr) throws InterruptedException, RemotingTimeoutException,
- RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+ RemotingSendRequestException, RemotingConnectException, MQBrokerException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_CONSUMER_OFFSET, null);
RemotingCommand response = this.remotingClient.invokeSync(addr, request, 3000);
assert response != null;
@@ -260,7 +266,7 @@ public class BrokerOuterAPI {
}
public String getAllDelayOffset(final String addr) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
- RemotingConnectException, MQBrokerException, UnsupportedEncodingException {
+ RemotingConnectException, MQBrokerException, UnsupportedEncodingException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_DELAY_OFFSET, null);
RemotingCommand response = this.remotingClient.invokeSync(addr, request, 3000);
assert response != null;
@@ -276,7 +282,7 @@ public class BrokerOuterAPI {
}
public SubscriptionGroupWrapper getAllSubscriptionGroupConfig(final String addr) throws InterruptedException, RemotingTimeoutException,
- RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+ RemotingSendRequestException, RemotingConnectException, MQBrokerException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG, null);
RemotingCommand response = this.remotingClient.invokeSync(addr, request, 3000);
assert response != null;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/pagecache/ManyMessageTransfer.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pagecache/ManyMessageTransfer.java b/broker/src/main/java/org/apache/rocketmq/broker/pagecache/ManyMessageTransfer.java
index e4c3045..1b1d5bb 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/pagecache/ManyMessageTransfer.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/pagecache/ManyMessageTransfer.java
@@ -6,38 +6,34 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.broker.pagecache;
-import org.apache.rocketmq.store.GetMessageResult;
import io.netty.channel.FileRegion;
import io.netty.util.AbstractReferenceCounted;
-
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.List;
-
+import org.apache.rocketmq.store.GetMessageResult;
public class ManyMessageTransfer extends AbstractReferenceCounted implements FileRegion {
private final ByteBuffer byteBufferHeader;
private final GetMessageResult getMessageResult;
private long transfered; // the bytes which was transfered already
-
public ManyMessageTransfer(ByteBuffer byteBufferHeader, GetMessageResult getMessageResult) {
this.byteBufferHeader = byteBufferHeader;
this.getMessageResult = getMessageResult;
}
-
@Override
public long position() {
int pos = byteBufferHeader.position();
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/pagecache/OneMessageTransfer.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pagecache/OneMessageTransfer.java b/broker/src/main/java/org/apache/rocketmq/broker/pagecache/OneMessageTransfer.java
index 3f00ece..b4cf0da 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/pagecache/OneMessageTransfer.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/pagecache/OneMessageTransfer.java
@@ -6,37 +6,33 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.broker.pagecache;
-import org.apache.rocketmq.store.SelectMappedBufferResult;
import io.netty.channel.FileRegion;
import io.netty.util.AbstractReferenceCounted;
-
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
-
+import org.apache.rocketmq.store.SelectMappedBufferResult;
public class OneMessageTransfer extends AbstractReferenceCounted implements FileRegion {
private final ByteBuffer byteBufferHeader;
private final SelectMappedBufferResult selectMappedBufferResult;
private long transfered; // the bytes which was transfered already
-
public OneMessageTransfer(ByteBuffer byteBufferHeader, SelectMappedBufferResult selectMappedBufferResult) {
this.byteBufferHeader = byteBufferHeader;
this.selectMappedBufferResult = selectMappedBufferResult;
}
-
@Override
public long position() {
return this.byteBufferHeader.position() + this.selectMappedBufferResult.getByteBuffer().position();