You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/28 08:13:52 UTC
[27/34] incubator-rocketmq git commit: ROCKETMQ-18 Reformat all codes.
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java
index c493c70..6a34a69 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.broker.processor;
+import io.netty.channel.ChannelHandlerContext;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.TopicFilterType;
import org.apache.rocketmq.common.constant.LoggerName;
@@ -33,11 +34,9 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageResult;
-import io.netty.channel.ChannelHandlerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
public class EndTransactionProcessor implements NettyRequestProcessor {
private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME);
private final BrokerController brokerController;
@@ -50,36 +49,35 @@ public class EndTransactionProcessor implements NettyRequestProcessor {
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final EndTransactionRequestHeader requestHeader =
- (EndTransactionRequestHeader) request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
-
+ (EndTransactionRequestHeader)request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
if (requestHeader.getFromTransactionCheck()) {
switch (requestHeader.getCommitOrRollback()) {
case MessageSysFlag.TRANSACTION_NOT_TYPE: {
LOGGER.warn("check producer[{}] transaction state, but it's pending status."
- + "RequestHeader: {} Remark: {}",
- RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
- requestHeader.toString(),
- request.getRemark());
+ + "RequestHeader: {} Remark: {}",
+ RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
+ requestHeader.toString(),
+ request.getRemark());
return null;
}
case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {
LOGGER.warn("check producer[{}] transaction state, the producer commit the message."
- + "RequestHeader: {} Remark: {}",
- RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
- requestHeader.toString(),
- request.getRemark());
+ + "RequestHeader: {} Remark: {}",
+ RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
+ requestHeader.toString(),
+ request.getRemark());
break;
}
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {
LOGGER.warn("check producer[{}] transaction state, the producer rollback the message."
- + "RequestHeader: {} Remark: {}",
- RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
- requestHeader.toString(),
- request.getRemark());
+ + "RequestHeader: {} Remark: {}",
+ RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
+ requestHeader.toString(),
+ request.getRemark());
break;
}
default:
@@ -89,10 +87,10 @@ public class EndTransactionProcessor implements NettyRequestProcessor {
switch (requestHeader.getCommitOrRollback()) {
case MessageSysFlag.TRANSACTION_NOT_TYPE: {
LOGGER.warn("the producer[{}] end transaction in sending message, and it's pending status."
- + "RequestHeader: {} Remark: {}",
- RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
- requestHeader.toString(),
- request.getRemark());
+ + "RequestHeader: {} Remark: {}",
+ RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
+ requestHeader.toString(),
+ request.getRemark());
return null;
}
@@ -102,10 +100,10 @@ public class EndTransactionProcessor implements NettyRequestProcessor {
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {
LOGGER.warn("the producer[{}] end transaction in sending message, rollback the message."
- + "RequestHeader: {} Remark: {}",
- RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
- requestHeader.toString(),
- request.getRemark());
+ + "RequestHeader: {} Remark: {}",
+ RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
+ requestHeader.toString(),
+ request.getRemark());
break;
}
default:
@@ -210,8 +208,8 @@ public class EndTransactionProcessor implements NettyRequestProcessor {
MessageAccessor.setProperties(msgInner, msgExt.getProperties());
TopicFilterType topicFilterType =
- (msgInner.getSysFlag() & MessageSysFlag.MULTI_TAGS_FLAG) == MessageSysFlag.MULTI_TAGS_FLAG ? TopicFilterType.MULTI_TAG
- : TopicFilterType.SINGLE_TAG;
+ (msgInner.getSysFlag() & MessageSysFlag.MULTI_TAGS_FLAG) == MessageSysFlag.MULTI_TAGS_FLAG ? TopicFilterType.MULTI_TAG
+ : TopicFilterType.SINGLE_TAG;
long tagsCodeValue = MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());
msgInner.setTagsCode(tagsCodeValue);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/processor/ForwardRequestProcessor.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ForwardRequestProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ForwardRequestProcessor.java
index 67e55a4..2a6482c 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ForwardRequestProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ForwardRequestProcessor.java
@@ -6,36 +6,33 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.broker.processor;
+import io.netty.channel.ChannelHandlerContext;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-import io.netty.channel.ChannelHandlerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
public class ForwardRequestProcessor implements NettyRequestProcessor {
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final BrokerController brokerController;
-
public ForwardRequestProcessor(final BrokerController brokerController) {
this.brokerController = brokerController;
}
-
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
return null;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
index 041037f..7169b9c 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
@@ -16,6 +16,13 @@
*/
package org.apache.rocketmq.broker.processor;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.FileRegion;
+import java.nio.ByteBuffer;
+import java.util.List;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
import org.apache.rocketmq.broker.longpolling.PullRequest;
@@ -49,14 +56,9 @@ import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
-import io.netty.channel.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.nio.ByteBuffer;
-import java.util.List;
-
-
public class PullMessageProcessor implements NettyRequestProcessor {
private static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final BrokerController brokerController;
@@ -77,12 +79,11 @@ public class PullMessageProcessor implements NettyRequestProcessor {
}
private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
- throws RemotingCommandException {
+ throws RemotingCommandException {
RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
- final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
+ final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader)response.readCustomHeader();
final PullMessageRequestHeader requestHeader =
- (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
-
+ (PullMessageRequestHeader)request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
response.setOpaque(request.getOpaque());
@@ -90,24 +91,21 @@ public class PullMessageProcessor implements NettyRequestProcessor {
LOG.debug("receive PullMessage request command, " + request);
}
-
if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] pulling message is forbidden");
return response;
}
-
SubscriptionGroupConfig subscriptionGroupConfig =
- this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());
+ this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());
if (null == subscriptionGroupConfig) {
response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
response.setRemark("subscription group not exist, " + requestHeader.getConsumerGroup() + " "
- + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
+ + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
return response;
}
-
if (!subscriptionGroupConfig.isConsumeEnable()) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark("subscription group no permission, " + requestHeader.getConsumerGroup());
@@ -120,49 +118,45 @@ public class PullMessageProcessor implements NettyRequestProcessor {
final long suspendTimeoutMillisLong = hasSuspendFlag ? requestHeader.getSuspendTimeoutMillis() : 0;
-
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
if (null == topicConfig) {
LOG.error("the topic " + requestHeader.getTopic() + " not exist, consumer: " + RemotingHelper.parseChannelRemoteAddr(channel));
response.setCode(ResponseCode.TOPIC_NOT_EXIST);
response.setRemark(
- "topic[" + requestHeader.getTopic() + "] not exist, apply first please!" + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
+ "topic[" + requestHeader.getTopic() + "] not exist, apply first please!" + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
return response;
}
-
if (!PermName.isReadable(topicConfig.getPerm())) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark("the topic[" + requestHeader.getTopic() + "] pulling message is forbidden");
return response;
}
-
if (requestHeader.getQueueId() < 0 || requestHeader.getQueueId() >= topicConfig.getReadQueueNums()) {
String errorInfo = "queueId[" + requestHeader.getQueueId() + "] is illagal,Topic :" + requestHeader.getTopic()
- + " topicConfig.readQueueNums: " + topicConfig.getReadQueueNums() + " consumer: " + channel.remoteAddress();
+ + " topicConfig.readQueueNums: " + topicConfig.getReadQueueNums() + " consumer: " + channel.remoteAddress();
LOG.warn(errorInfo);
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(errorInfo);
return response;
}
-
SubscriptionData subscriptionData = null;
if (hasSubscriptionFlag) {
try {
subscriptionData = FilterAPI.buildSubscriptionData(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
- requestHeader.getSubscription());
+ requestHeader.getSubscription());
} catch (Exception e) {
LOG.warn("parse the consumer's subscription[{}] failed, group: {}", requestHeader.getSubscription(), //
- requestHeader.getConsumerGroup());
+ requestHeader.getConsumerGroup());
response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
response.setRemark("parse the consumer's subscription failed");
return response;
}
} else {
ConsumerGroupInfo consumerGroupInfo =
- this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());
+ this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());
if (null == consumerGroupInfo) {
LOG.warn("the consumer's group info not exist, group: {}", requestHeader.getConsumerGroup());
response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
@@ -171,7 +165,7 @@ public class PullMessageProcessor implements NettyRequestProcessor {
}
if (!subscriptionGroupConfig.isConsumeBroadcastEnable() //
- && consumerGroupInfo.getMessageModel() == MessageModel.BROADCASTING) {
+ && consumerGroupInfo.getMessageModel() == MessageModel.BROADCASTING) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark("the consumer group[" + requestHeader.getConsumerGroup() + "] can not consume by broadcast way");
return response;
@@ -185,10 +179,9 @@ public class PullMessageProcessor implements NettyRequestProcessor {
return response;
}
-
if (subscriptionData.getSubVersion() < requestHeader.getSubVersion()) {
LOG.warn("the broker's subscription is not latest, group: {} {}", requestHeader.getConsumerGroup(),
- subscriptionData.getSubString());
+ subscriptionData.getSubString());
response.setCode(ResponseCode.SUBSCRIPTION_NOT_LATEST);
response.setRemark("the consumer's subscription not latest");
return response;
@@ -196,15 +189,14 @@ public class PullMessageProcessor implements NettyRequestProcessor {
}
final GetMessageResult getMessageResult =
- this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
- requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), subscriptionData);
+ this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
+ requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), subscriptionData);
if (getMessageResult != null) {
response.setRemark(getMessageResult.getStatus().name());
responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());
responseHeader.setMinOffset(getMessageResult.getMinOffset());
responseHeader.setMaxOffset(getMessageResult.getMaxOffset());
-
if (getMessageResult.isSuggestPullingFromSlave()) {
responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
} else {
@@ -250,11 +242,11 @@ public class PullMessageProcessor implements NettyRequestProcessor {
// XXX: warn and notify me
LOG.info("the broker store no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}", //
- requestHeader.getQueueOffset(), //
- getMessageResult.getNextBeginOffset(), //
- requestHeader.getTopic(), //
- requestHeader.getQueueId(), //
- requestHeader.getConsumerGroup()//
+ requestHeader.getQueueOffset(), //
+ getMessageResult.getNextBeginOffset(), //
+ requestHeader.getTopic(), //
+ requestHeader.getQueueId(), //
+ requestHeader.getConsumerGroup()//
);
} else {
response.setCode(ResponseCode.PULL_NOT_FOUND);
@@ -270,7 +262,7 @@ public class PullMessageProcessor implements NettyRequestProcessor {
response.setCode(ResponseCode.PULL_OFFSET_MOVED);
// XXX: warn and notify me
LOG.info("the request offset: " + requestHeader.getQueueOffset() + " over flow badly, broker max offset: "
- + getMessageResult.getMaxOffset() + ", consumer: " + channel.remoteAddress());
+ + getMessageResult.getMaxOffset() + ", consumer: " + channel.remoteAddress());
break;
case OFFSET_OVERFLOW_ONE:
response.setCode(ResponseCode.PULL_NOT_FOUND);
@@ -278,8 +270,8 @@ public class PullMessageProcessor implements NettyRequestProcessor {
case OFFSET_TOO_SMALL:
response.setCode(ResponseCode.PULL_OFFSET_MOVED);
LOG.info("the request offset too small. group={}, topic={}, requestOffset={}, brokerMinOffset={}, clientIp={}",
- requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueOffset(),
- getMessageResult.getMinOffset(), channel.remoteAddress());
+ requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueOffset(),
+ getMessageResult.getMinOffset(), channel.remoteAddress());
break;
default:
assert false;
@@ -308,7 +300,6 @@ public class PullMessageProcessor implements NettyRequestProcessor {
case ResponseCode.PULL_NOT_FOUND:
if (!brokerAllowSuspend) {
-
context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_EPOLLS);
context.setCommercialRcvTimes(1);
context.setCommercialOwner(owner);
@@ -333,23 +324,23 @@ public class PullMessageProcessor implements NettyRequestProcessor {
case ResponseCode.SUCCESS:
this.brokerController.getBrokerStatsManager().incGroupGetNums(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
- getMessageResult.getMessageCount());
+ getMessageResult.getMessageCount());
this.brokerController.getBrokerStatsManager().incGroupGetSize(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
- getMessageResult.getBufferTotalSize());
+ getMessageResult.getBufferTotalSize());
this.brokerController.getBrokerStatsManager().incBrokerGetNums(getMessageResult.getMessageCount());
if (this.brokerController.getBrokerConfig().isTransferMsgByHeap()) {
final long beginTimeMills = this.brokerController.getMessageStore().now();
final byte[] r = this.readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());
this.brokerController.getBrokerStatsManager().incGroupGetLatency(requestHeader.getConsumerGroup(),
- requestHeader.getTopic(), requestHeader.getQueueId(),
- (int) (this.brokerController.getMessageStore().now() - beginTimeMills));
+ requestHeader.getTopic(), requestHeader.getQueueId(),
+ (int)(this.brokerController.getMessageStore().now() - beginTimeMills));
response.setBody(r);
} else {
try {
FileRegion fileRegion =
- new ManyMessageTransfer(response.encodeHeader(getMessageResult.getBufferTotalSize()), getMessageResult);
+ new ManyMessageTransfer(response.encodeHeader(getMessageResult.getBufferTotalSize()), getMessageResult);
channel.writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
@@ -379,18 +370,17 @@ public class PullMessageProcessor implements NettyRequestProcessor {
long offset = requestHeader.getQueueOffset();
int queueId = requestHeader.getQueueId();
PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
- this.brokerController.getMessageStore().now(), offset, subscriptionData);
+ this.brokerController.getMessageStore().now(), offset, subscriptionData);
this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
response = null;
break;
}
-
case ResponseCode.PULL_RETRY_IMMEDIATELY:
break;
case ResponseCode.PULL_OFFSET_MOVED:
if (this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE
- || this.brokerController.getMessageStoreConfig().isOffsetCheckInSlave()) {
+ || this.brokerController.getMessageStoreConfig().isOffsetCheckInSlave()) {
MessageQueue mq = new MessageQueue();
mq.setTopic(requestHeader.getTopic());
mq.setQueueId(requestHeader.getQueueId());
@@ -403,15 +393,15 @@ public class PullMessageProcessor implements NettyRequestProcessor {
event.setOffsetNew(getMessageResult.getNextBeginOffset());
this.generateOffsetMovedEvent(event);
LOG.warn(
- "PULL_OFFSET_MOVED:correction offset. topic={}, groupId={}, requestOffset={}, newOffset={}, suggestBrokerId={}",
- requestHeader.getTopic(), requestHeader.getConsumerGroup(), event.getOffsetRequest(), event.getOffsetNew(),
- responseHeader.getSuggestWhichBrokerId());
+ "PULL_OFFSET_MOVED:correction offset. topic={}, groupId={}, requestOffset={}, newOffset={}, suggestBrokerId={}",
+ requestHeader.getTopic(), requestHeader.getConsumerGroup(), event.getOffsetRequest(), event.getOffsetNew(),
+ responseHeader.getSuggestWhichBrokerId());
} else {
responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
LOG.warn("PULL_OFFSET_MOVED:none correction. topic={}, groupId={}, requestOffset={}, suggestBrokerId={}",
- requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueOffset(),
- responseHeader.getSuggestWhichBrokerId());
+ requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueOffset(),
+ responseHeader.getSuggestWhichBrokerId());
}
break;
@@ -423,19 +413,17 @@ public class PullMessageProcessor implements NettyRequestProcessor {
response.setRemark("store getMessage return null");
}
-
boolean storeOffsetEnable = brokerAllowSuspend;
storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;
storeOffsetEnable = storeOffsetEnable
- && this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;
+ && this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;
if (storeOffsetEnable) {
this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),
- requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
+ requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
}
return response;
}
-
public boolean hasConsumeMessageHook() {
return consumeMessageHookList != null && !this.consumeMessageHookList.isEmpty();
}
@@ -512,7 +500,7 @@ public class PullMessageProcessor implements NettyRequestProcessor {
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
LOG.error("processRequestWrapper response to " + future.channel().remoteAddress() + " failed",
- future.cause());
+ future.cause());
LOG.error(request.toString());
LOG.error(response.toString());
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java
index 0b6b775..04f206f 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java
@@ -6,16 +6,20 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.broker.processor;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.FileRegion;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.pagecache.OneMessageTransfer;
import org.apache.rocketmq.broker.pagecache.QueryMessageTransfer;
@@ -31,28 +35,21 @@ import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.QueryMessageResult;
import org.apache.rocketmq.store.SelectMappedBufferResult;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.FileRegion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
public class QueryMessageProcessor implements NettyRequestProcessor {
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final BrokerController brokerController;
-
public QueryMessageProcessor(final BrokerController brokerController) {
this.brokerController = brokerController;
}
-
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
- throws RemotingCommandException {
+ throws RemotingCommandException {
switch (request.getCode()) {
case RequestCode.QUERY_MESSAGE:
return this.queryMessage(ctx, request);
@@ -70,44 +67,40 @@ public class QueryMessageProcessor implements NettyRequestProcessor {
return false;
}
-
public RemotingCommand queryMessage(ChannelHandlerContext ctx, RemotingCommand request)
- throws RemotingCommandException {
+ throws RemotingCommandException {
final RemotingCommand response =
- RemotingCommand.createResponseCommand(QueryMessageResponseHeader.class);
+ RemotingCommand.createResponseCommand(QueryMessageResponseHeader.class);
final QueryMessageResponseHeader responseHeader =
- (QueryMessageResponseHeader) response.readCustomHeader();
+ (QueryMessageResponseHeader)response.readCustomHeader();
final QueryMessageRequestHeader requestHeader =
- (QueryMessageRequestHeader) request
- .decodeCommandCustomHeader(QueryMessageRequestHeader.class);
-
+ (QueryMessageRequestHeader)request
+ .decodeCommandCustomHeader(QueryMessageRequestHeader.class);
response.setOpaque(request.getOpaque());
-
String isUniqueKey = request.getExtFields().get(MixAll.UNIQUE_MSG_QUERY_FLAG);
if (isUniqueKey != null && isUniqueKey.equals("true")) {
requestHeader.setMaxNum(this.brokerController.getMessageStoreConfig().getDefaultQueryMaxNum());
}
final QueryMessageResult queryMessageResult =
- this.brokerController.getMessageStore().queryMessage(requestHeader.getTopic(),
- requestHeader.getKey(), requestHeader.getMaxNum(), requestHeader.getBeginTimestamp(),
- requestHeader.getEndTimestamp());
+ this.brokerController.getMessageStore().queryMessage(requestHeader.getTopic(),
+ requestHeader.getKey(), requestHeader.getMaxNum(), requestHeader.getBeginTimestamp(),
+ requestHeader.getEndTimestamp());
assert queryMessageResult != null;
responseHeader.setIndexLastUpdatePhyoffset(queryMessageResult.getIndexLastUpdatePhyoffset());
responseHeader.setIndexLastUpdateTimestamp(queryMessageResult.getIndexLastUpdateTimestamp());
-
if (queryMessageResult.getBufferTotalSize() > 0) {
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
try {
FileRegion fileRegion =
- new QueryMessageTransfer(response.encodeHeader(queryMessageResult
- .getBufferTotalSize()), queryMessageResult);
+ new QueryMessageTransfer(response.encodeHeader(queryMessageResult
+ .getBufferTotalSize()), queryMessageResult);
ctx.channel().writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
@@ -130,26 +123,24 @@ public class QueryMessageProcessor implements NettyRequestProcessor {
return response;
}
-
public RemotingCommand viewMessageById(ChannelHandlerContext ctx, RemotingCommand request)
- throws RemotingCommandException {
+ throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final ViewMessageRequestHeader requestHeader =
- (ViewMessageRequestHeader) request.decodeCommandCustomHeader(ViewMessageRequestHeader.class);
-
+ (ViewMessageRequestHeader)request.decodeCommandCustomHeader(ViewMessageRequestHeader.class);
response.setOpaque(request.getOpaque());
final SelectMappedBufferResult selectMappedBufferResult =
- this.brokerController.getMessageStore().selectOneMessageByOffset(requestHeader.getOffset());
+ this.brokerController.getMessageStore().selectOneMessageByOffset(requestHeader.getOffset());
if (selectMappedBufferResult != null) {
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
try {
FileRegion fileRegion =
- new OneMessageTransfer(response.encodeHeader(selectMappedBufferResult.getSize()),
- selectMappedBufferResult);
+ new OneMessageTransfer(response.encodeHeader(selectMappedBufferResult.getSize()),
+ selectMappedBufferResult);
ctx.channel().writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index 6002df2..1b95205 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -17,11 +17,17 @@
package org.apache.rocketmq.broker.processor;
import io.netty.channel.ChannelHandlerContext;
+import java.net.SocketAddress;
+import java.util.List;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
import org.apache.rocketmq.broker.mqtrace.SendMessageContext;
-import org.apache.rocketmq.common.*;
+import org.apache.rocketmq.common.MQVersion;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.TopicFilterType;
+import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.common.message.MessageAccessor;
@@ -44,10 +50,6 @@ import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
-import java.net.SocketAddress;
-import java.util.List;
-
-
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
private List<ConsumeMessageHook> consumeMessageHookList;
@@ -80,14 +82,14 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
@Override
public boolean rejectRequest() {
return this.brokerController.getMessageStore().isOSPageCacheBusy() ||
- this.brokerController.getMessageStore().isTransientStorePoolDeficient();
+ this.brokerController.getMessageStore().isTransientStorePoolDeficient();
}
private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request)
- throws RemotingCommandException {
+ throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final ConsumerSendMsgBackRequestHeader requestHeader =
- (ConsumerSendMsgBackRequestHeader) request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
+ (ConsumerSendMsgBackRequestHeader)request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
if (this.hasConsumeMessageHook() && !UtilAll.isBlank(requestHeader.getOriginMsgId())) {
@@ -101,24 +103,21 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
this.executeConsumeMessageHookAfter(context);
}
-
SubscriptionGroupConfig subscriptionGroupConfig =
- this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup());
+ this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup());
if (null == subscriptionGroupConfig) {
response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
response.setRemark("subscription group not exist, " + requestHeader.getGroup() + " "
- + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
+ + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
return response;
}
-
if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending message is forbidden");
return response;
}
-
if (subscriptionGroupConfig.getRetryQueueNums() <= 0) {
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
@@ -128,24 +127,21 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();
-
int topicSysFlag = 0;
if (requestHeader.isUnitMode()) {
topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
}
-
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(//
- newTopic, //
- subscriptionGroupConfig.getRetryQueueNums(), //
- PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
+ newTopic, //
+ subscriptionGroupConfig.getRetryQueueNums(), //
+ PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
if (null == topicConfig) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("topic[" + newTopic + "] not exist");
return response;
}
-
if (!PermName.isWriteable(topicConfig.getPerm())) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(String.format("the topic[%s] sending message is forbidden", newTopic));
@@ -159,31 +155,27 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
return response;
}
-
final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
if (null == retryTopic) {
MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());
}
msgExt.setWaitStoreMsgOK(false);
-
int delayLevel = requestHeader.getDelayLevel();
-
int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
}
-
if (msgExt.getReconsumeTimes() >= maxReconsumeTimes//
- || delayLevel < 0) {
+ || delayLevel < 0) {
newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, //
- DLQ_NUMS_PER_GROUP, //
- PermName.PERM_WRITE, 0
+ DLQ_NUMS_PER_GROUP, //
+ PermName.PERM_WRITE, 0
);
if (null == topicConfig) {
response.setCode(ResponseCode.SYSTEM_ERROR);
@@ -247,13 +239,12 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
}
private RemotingCommand sendMessage(final ChannelHandlerContext ctx, //
- final RemotingCommand request, //
- final SendMessageContext sendMessageContext, //
- final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
+ final RemotingCommand request, //
+ final SendMessageContext sendMessageContext, //
+ final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
- final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();
-
+ final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();
response.setOpaque(request.getOpaque());
@@ -296,15 +287,14 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
SubscriptionGroupConfig subscriptionGroupConfig =
- this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName);
+ this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName);
if (null == subscriptionGroupConfig) {
response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
response.setRemark(
- "subscription group not exist, " + groupName + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
+ "subscription group not exist, " + groupName + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
return response;
}
-
int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
@@ -314,8 +304,8 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
newTopic = MixAll.getDLQTopic(groupName);
queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, //
- DLQ_NUMS_PER_GROUP, //
- PermName.PERM_WRITE, 0
+ DLQ_NUMS_PER_GROUP, //
+ PermName.PERM_WRITE, 0
);
if (null == topicConfig) {
response.setCode(ResponseCode.SYSTEM_ERROR);
@@ -344,7 +334,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
if (traFlag != null) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(
- "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending transaction message is forbidden");
+ "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending transaction message is forbidden");
return response;
}
}
@@ -381,12 +371,12 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
case PROPERTIES_SIZE_EXCEEDED:
response.setCode(ResponseCode.MESSAGE_ILLEGAL);
response.setRemark(
- "the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k.");
+ "the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k.");
break;
case SERVICE_NOT_AVAILABLE:
response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE);
response.setRemark(
- "service not available now, maybe disk full, " + diskUtil() + ", maybe your broker machine memory too small.");
+ "service not available now, maybe disk full, " + diskUtil() + ", maybe your broker machine memory too small.");
break;
case OS_PAGECACHE_BUSY:
response.setCode(ResponseCode.SYSTEM_ERROR);
@@ -407,7 +397,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
this.brokerController.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic());
this.brokerController.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(),
- putMessageResult.getAppendMessageResult().getWroteBytes());
+ putMessageResult.getAppendMessageResult().getWroteBytes());
this.brokerController.getBrokerStatsManager().incBrokerPutNums();
response.setRemark(null);
@@ -416,10 +406,8 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
responseHeader.setQueueId(queueIdInt);
responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());
-
doResponse(ctx, request, response);
-
if (hasSendMessageHook()) {
sendMessageContext.setMsgId(responseHeader.getMsgId());
sendMessageContext.setQueueId(responseHeader.getQueueId());
@@ -427,7 +415,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();
int wroteSize = putMessageResult.getAppendMessageResult().getWroteBytes();
- int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount;
+ int incValue = (int)Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount;
sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS);
sendMessageContext.setCommercialSendTimes(incValue);
@@ -438,7 +426,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
} else {
if (hasSendMessageHook()) {
int wroteSize = request.getBody().length;
- int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT);
+ int incValue = (int)Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT);
sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_FAILURE);
sendMessageContext.setCommercialSendTimes(incValue);
@@ -479,11 +467,11 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
String storePathLogis =
- StorePathConfigHelper.getStorePathConsumeQueue(this.brokerController.getMessageStoreConfig().getStorePathRootDir());
+ StorePathConfigHelper.getStorePathConsumeQueue(this.brokerController.getMessageStoreConfig().getStorePathRootDir());
double logisRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogis);
String storePathIndex =
- StorePathConfigHelper.getStorePathIndex(this.brokerController.getMessageStoreConfig().getStorePathRootDir());
+ StorePathConfigHelper.getStorePathIndex(this.brokerController.getMessageStoreConfig().getStorePathRootDir());
double indexRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathIndex);
return String.format("CL: %5.2f CQ: %5.2f INDEX: %5.2f", physicRatio, logisRatio, indexRatio);
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
index 2db2317..2545f1f 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
@@ -6,16 +6,17 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.broker.slave;
+import java.io.IOException;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
import org.apache.rocketmq.common.MixAll;
@@ -27,30 +28,23 @@ import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-
-
public class SlaveSynchronize {
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final BrokerController brokerController;
private volatile String masterAddr = null;
-
public SlaveSynchronize(BrokerController brokerController) {
this.brokerController = brokerController;
}
-
public String getMasterAddr() {
return masterAddr;
}
-
public void setMasterAddr(String masterAddr) {
this.masterAddr = masterAddr;
}
-
public void syncAll() {
this.syncTopicConfig();
this.syncConsumerOffset();
@@ -58,21 +52,20 @@ public class SlaveSynchronize {
this.syncSubscriptionGroupConfig();
}
-
private void syncTopicConfig() {
String masterAddrBak = this.masterAddr;
if (masterAddrBak != null) {
try {
TopicConfigSerializeWrapper topicWrapper =
- this.brokerController.getBrokerOuterAPI().getAllTopicConfig(masterAddrBak);
+ this.brokerController.getBrokerOuterAPI().getAllTopicConfig(masterAddrBak);
if (!this.brokerController.getTopicConfigManager().getDataVersion()
- .equals(topicWrapper.getDataVersion())) {
+ .equals(topicWrapper.getDataVersion())) {
this.brokerController.getTopicConfigManager().getDataVersion()
- .assignNewOne(topicWrapper.getDataVersion());
+ .assignNewOne(topicWrapper.getDataVersion());
this.brokerController.getTopicConfigManager().getTopicConfigTable().clear();
this.brokerController.getTopicConfigManager().getTopicConfigTable()
- .putAll(topicWrapper.getTopicConfigTable());
+ .putAll(topicWrapper.getTopicConfigTable());
this.brokerController.getTopicConfigManager().persist();
log.info("update slave topic config from master, {}", masterAddrBak);
@@ -83,15 +76,14 @@ public class SlaveSynchronize {
}
}
-
private void syncConsumerOffset() {
String masterAddrBak = this.masterAddr;
if (masterAddrBak != null) {
try {
ConsumerOffsetSerializeWrapper offsetWrapper =
- this.brokerController.getBrokerOuterAPI().getAllConsumerOffset(masterAddrBak);
+ this.brokerController.getBrokerOuterAPI().getAllConsumerOffset(masterAddrBak);
this.brokerController.getConsumerOffsetManager().getOffsetTable()
- .putAll(offsetWrapper.getOffsetTable());
+ .putAll(offsetWrapper.getOffsetTable());
this.brokerController.getConsumerOffsetManager().persist();
log.info("update slave consumer offset from master, {}", masterAddrBak);
} catch (Exception e) {
@@ -100,18 +92,17 @@ public class SlaveSynchronize {
}
}
-
private void syncDelayOffset() {
String masterAddrBak = this.masterAddr;
if (masterAddrBak != null) {
try {
String delayOffset =
- this.brokerController.getBrokerOuterAPI().getAllDelayOffset(masterAddrBak);
+ this.brokerController.getBrokerOuterAPI().getAllDelayOffset(masterAddrBak);
if (delayOffset != null) {
String fileName =
- StorePathConfigHelper.getDelayOffsetStorePath(this.brokerController
- .getMessageStoreConfig().getStorePathRootDir());
+ StorePathConfigHelper.getDelayOffsetStorePath(this.brokerController
+ .getMessageStoreConfig().getStorePathRootDir());
try {
MixAll.string2File(delayOffset, fileName);
} catch (IOException e) {
@@ -125,24 +116,23 @@ public class SlaveSynchronize {
}
}
-
private void syncSubscriptionGroupConfig() {
String masterAddrBak = this.masterAddr;
if (masterAddrBak != null) {
try {
SubscriptionGroupWrapper subscriptionWrapper =
- this.brokerController.getBrokerOuterAPI()
- .getAllSubscriptionGroupConfig(masterAddrBak);
+ this.brokerController.getBrokerOuterAPI()
+ .getAllSubscriptionGroupConfig(masterAddrBak);
if (!this.brokerController.getSubscriptionGroupManager().getDataVersion()
- .equals(subscriptionWrapper.getDataVersion())) {
+ .equals(subscriptionWrapper.getDataVersion())) {
SubscriptionGroupManager subscriptionGroupManager =
- this.brokerController.getSubscriptionGroupManager();
+ this.brokerController.getSubscriptionGroupManager();
subscriptionGroupManager.getDataVersion().assignNewOne(
- subscriptionWrapper.getDataVersion());
+ subscriptionWrapper.getDataVersion());
subscriptionGroupManager.getSubscriptionGroupTable().clear();
subscriptionGroupManager.getSubscriptionGroupTable().putAll(
- subscriptionWrapper.getSubscriptionGroupTable());
+ subscriptionWrapper.getSubscriptionGroupTable());
subscriptionGroupManager.persist();
log.info("update slave Subscription Group from master, {}", masterAddrBak);
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
index 7865bc7..f77249a 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
@@ -16,6 +16,10 @@
*/
package org.apache.rocketmq.broker.subscription;
+import java.io.File;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.BrokerPathConfigHelper;
import org.apache.rocketmq.common.ConfigManager;
@@ -27,25 +31,23 @@ import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.util.Iterator;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-
-
public class SubscriptionGroupManager extends ConfigManager {
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final ConcurrentHashMap<String, SubscriptionGroupConfig> subscriptionGroupTable =
- new ConcurrentHashMap<String, SubscriptionGroupConfig>(1024);
+ new ConcurrentHashMap<String, SubscriptionGroupConfig>(1024);
private final DataVersion dataVersion = new DataVersion();
private transient BrokerController brokerController;
-
public SubscriptionGroupManager() {
this.init();
}
+ public SubscriptionGroupManager(BrokerController brokerController) {
+ this.brokerController = brokerController;
+ this.init();
+ }
+
private void init() {
{
SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
@@ -94,13 +96,6 @@ public class SubscriptionGroupManager extends ConfigManager {
}
}
-
- public SubscriptionGroupManager(BrokerController brokerController) {
- this.brokerController = brokerController;
- this.init();
- }
-
-
public void updateSubscriptionGroupConfig(final SubscriptionGroupConfig config) {
SubscriptionGroupConfig old = this.subscriptionGroupTable.put(config.getGroupName(), config);
if (old != null) {
@@ -122,7 +117,6 @@ public class SubscriptionGroupManager extends ConfigManager {
}
}
-
public SubscriptionGroupConfig findSubscriptionGroupConfig(final String group) {
SubscriptionGroupConfig subscriptionGroupConfig = this.subscriptionGroupTable.get(group);
if (null == subscriptionGroupConfig) {
@@ -141,7 +135,6 @@ public class SubscriptionGroupManager extends ConfigManager {
return subscriptionGroupConfig;
}
-
@Override
public String encode() {
return this.encode(false);
@@ -181,12 +174,10 @@ public class SubscriptionGroupManager extends ConfigManager {
return subscriptionGroupTable;
}
-
public DataVersion getDataVersion() {
return dataVersion;
}
-
public void deleteSubscriptionGroupConfig(final String groupName) {
SubscriptionGroupConfig old = this.subscriptionGroupTable.remove(groupName);
if (old != null) {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
index 9e14332..e826d24 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
@@ -16,6 +16,16 @@
*/
package org.apache.rocketmq.broker.topic;
+import java.io.File;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.BrokerPathConfigHelper;
import org.apache.rocketmq.common.ConfigManager;
@@ -30,34 +40,20 @@ import org.apache.rocketmq.common.sysflag.TopicSysFlag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-
public class TopicConfigManager extends ConfigManager {
private static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final long LOCK_TIMEOUT_MILLIS = 3000;
private transient final Lock lockTopicConfigTable = new ReentrantLock();
private final ConcurrentHashMap<String, TopicConfig> topicConfigTable =
- new ConcurrentHashMap<String, TopicConfig>(1024);
+ new ConcurrentHashMap<String, TopicConfig>(1024);
private final DataVersion dataVersion = new DataVersion();
private final Set<String> systemTopicList = new HashSet<String>();
private transient BrokerController brokerController;
-
public TopicConfigManager() {
}
-
public TopicConfigManager(BrokerController brokerController) {
this.brokerController = brokerController;
{
@@ -76,9 +72,9 @@ public class TopicConfigManager extends ConfigManager {
TopicConfig topicConfig = new TopicConfig(topic);
this.systemTopicList.add(topic);
topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig()
- .getDefaultTopicQueueNums());
+ .getDefaultTopicQueueNums());
topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig()
- .getDefaultTopicQueueNums());
+ .getDefaultTopicQueueNums());
int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE;
topicConfig.setPerm(perm);
this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
@@ -130,29 +126,24 @@ public class TopicConfigManager extends ConfigManager {
}
}
-
public boolean isSystemTopic(final String topic) {
return this.systemTopicList.contains(topic);
}
-
public Set<String> getSystemTopic() {
return this.systemTopicList;
}
-
public boolean isTopicCanSendMessage(final String topic) {
return !topic.equals(MixAll.DEFAULT_TOPIC);
}
-
public TopicConfig selectTopicConfig(final String topic) {
return this.topicConfigTable.get(topic);
}
-
public TopicConfig createTopicInSendMessageMethod(final String topic, final String defaultTopic,
- final String remoteAddress, final int clientDefaultTopicQueueNums, final int topicSysFlag) {
+ final String remoteAddress, final int clientDefaultTopicQueueNums, final int topicSysFlag) {
TopicConfig topicConfig = null;
boolean createNew = false;
@@ -175,8 +166,8 @@ public class TopicConfigManager extends ConfigManager {
topicConfig = new TopicConfig(topic);
int queueNums =
- clientDefaultTopicQueueNums > defaultTopicConfig.getWriteQueueNums() ? defaultTopicConfig
- .getWriteQueueNums() : clientDefaultTopicQueueNums;
+ clientDefaultTopicQueueNums > defaultTopicConfig.getWriteQueueNums() ? defaultTopicConfig
+ .getWriteQueueNums() : clientDefaultTopicQueueNums;
if (queueNums < 0) {
queueNums = 0;
@@ -191,17 +182,17 @@ public class TopicConfigManager extends ConfigManager {
topicConfig.setTopicFilterType(defaultTopicConfig.getTopicFilterType());
} else {
LOG.warn("create new topic failed, because the default topic[" + defaultTopic
- + "] no perm, " + defaultTopicConfig.getPerm() + " producer: "
- + remoteAddress);
+ + "] no perm, " + defaultTopicConfig.getPerm() + " producer: "
+ + remoteAddress);
}
} else {
LOG.warn("create new topic failed, because the default topic[" + defaultTopic
- + "] not exist." + " producer: " + remoteAddress);
+ + "] not exist." + " producer: " + remoteAddress);
}
if (topicConfig != null) {
LOG.info("create new topic by default topic[" + defaultTopic + "], " + topicConfig
- + " producer: " + remoteAddress);
+ + " producer: " + remoteAddress);
this.topicConfigTable.put(topic, topicConfig);
@@ -227,10 +218,10 @@ public class TopicConfigManager extends ConfigManager {
}
public TopicConfig createTopicInSendMessageBackMethod(
- final String topic,
- final int clientDefaultTopicQueueNums,
- final int perm,
- final int topicSysFlag) {
+ final String topic,
+ final int clientDefaultTopicQueueNums,
+ final int perm,
+ final int topicSysFlag) {
TopicConfig topicConfig = this.topicConfigTable.get(topic);
if (topicConfig != null)
return topicConfig;
@@ -282,7 +273,7 @@ public class TopicConfigManager extends ConfigManager {
}
LOG.info("update topic sys flag. oldTopicSysFlag={}, newTopicSysFlag", oldTopicSysFlag,
- topicConfig.getTopicSysFlag());
+ topicConfig.getTopicSysFlag());
this.topicConfigTable.put(topic, topicConfig);
@@ -302,7 +293,7 @@ public class TopicConfigManager extends ConfigManager {
}
LOG.info("update topic sys flag. oldTopicSysFlag={}, newTopicSysFlag", oldTopicSysFlag,
- topicConfig.getTopicSysFlag());
+ topicConfig.getTopicSysFlag());
this.topicConfigTable.put(topic, topicConfig);
@@ -326,7 +317,6 @@ public class TopicConfigManager extends ConfigManager {
this.persist();
}
-
public void updateOrderTopicConfig(final KVTable orderKVTableFromNs) {
if (orderKVTableFromNs != null && orderKVTableFromNs.getTable() != null) {
@@ -403,7 +393,7 @@ public class TopicConfigManager extends ConfigManager {
public void decode(String jsonString) {
if (jsonString != null) {
TopicConfigSerializeWrapper topicConfigSerializeWrapper =
- TopicConfigSerializeWrapper.fromJson(jsonString, TopicConfigSerializeWrapper.class);
+ TopicConfigSerializeWrapper.fromJson(jsonString, TopicConfigSerializeWrapper.class);
if (topicConfigSerializeWrapper != null) {
this.topicConfigTable.putAll(topicConfigSerializeWrapper.getTopicConfigTable());
this.dataVersion.assignNewOne(topicConfigSerializeWrapper.getDataVersion());
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionRecord.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionRecord.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionRecord.java
index 68256d9..830a0c5 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionRecord.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionRecord.java
@@ -6,13 +6,13 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.broker.transaction;
@@ -22,22 +22,18 @@ public class TransactionRecord {
private long offset;
private String producerGroup;
-
public long getOffset() {
return offset;
}
-
public void setOffset(long offset) {
this.offset = offset;
}
-
public String getProducerGroup() {
return producerGroup;
}
-
public void setProducerGroup(String producerGroup) {
this.producerGroup = producerGroup;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionStore.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionStore.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionStore.java
index d6e897a..f9b56d5 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionStore.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionStore.java
@@ -6,41 +6,33 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.broker.transaction;
import java.util.List;
-
public interface TransactionStore {
boolean open();
-
void close();
-
boolean put(final List<TransactionRecord> trs);
-
void remove(final List<Long> pks);
-
List<TransactionRecord> traverse(final long pk, final int nums);
-
long totalRecords();
-
long minPK();
-
long maxPK();
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/transaction/jdbc/JDBCTransactionStore.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/jdbc/JDBCTransactionStore.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/jdbc/JDBCTransactionStore.java
index 4bf73d2..240e141 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/jdbc/JDBCTransactionStore.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/jdbc/JDBCTransactionStore.java
@@ -6,17 +6,27 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.broker.transaction.jdbc;
+import java.net.URL;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.broker.transaction.TransactionRecord;
import org.apache.rocketmq.broker.transaction.TransactionStore;
import org.apache.rocketmq.common.MixAll;
@@ -24,13 +34,6 @@ import org.apache.rocketmq.common.constant.LoggerName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.net.URL;
-import java.sql.*;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicLong;
-
-
public class JDBCTransactionStore implements TransactionStore {
private static final Logger log = LoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME);
private final JDBCTransactionStoreConfig jdbcTransactionStoreConfig;
@@ -50,11 +53,10 @@ public class JDBCTransactionStore implements TransactionStore {
try {
this.connection =
- DriverManager.getConnection(this.jdbcTransactionStoreConfig.getJdbcURL(), props);
+ DriverManager.getConnection(this.jdbcTransactionStoreConfig.getJdbcURL(), props);
this.connection.setAutoCommit(false);
-
if (!this.computeTotalRecords()) {
return this.createDB();
}
@@ -72,7 +74,7 @@ public class JDBCTransactionStore implements TransactionStore {
try {
Class.forName(this.jdbcTransactionStoreConfig.getJdbcDriverClass()).newInstance();
log.info("Loaded the appropriate driver, {}",
- this.jdbcTransactionStoreConfig.getJdbcDriverClass());
+ this.jdbcTransactionStoreConfig.getJdbcDriverClass());
return true;
} catch (Exception e) {
log.info("Loaded the appropriate driver Exception", e);
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/transaction/jdbc/JDBCTransactionStoreConfig.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/jdbc/JDBCTransactionStoreConfig.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/jdbc/JDBCTransactionStoreConfig.java
index 5789329..86c1ec8 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/jdbc/JDBCTransactionStoreConfig.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/jdbc/JDBCTransactionStoreConfig.java
@@ -6,13 +6,13 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.broker.transaction.jdbc;
@@ -23,42 +23,34 @@ public class JDBCTransactionStoreConfig {
private String jdbcUser = "xxx";
private String jdbcPassword = "xxx";
-
public String getJdbcDriverClass() {
return jdbcDriverClass;
}
-
public void setJdbcDriverClass(String jdbcDriverClass) {
this.jdbcDriverClass = jdbcDriverClass;
}
-
public String getJdbcURL() {
return jdbcURL;
}
-
public void setJdbcURL(String jdbcURL) {
this.jdbcURL = jdbcURL;
}
-
public String getJdbcUser() {
return jdbcUser;
}
-
public void setJdbcUser(String jdbcUser) {
this.jdbcUser = jdbcUser;
}
-
public String getJdbcPassword() {
return jdbcPassword;
}
-
public void setJdbcPassword(String jdbcPassword) {
this.jdbcPassword = jdbcPassword;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java
----------------------------------------------------------------------
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java
index 6e7b9b0..f7675c2 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java
@@ -6,13 +6,13 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.broker;
@@ -27,9 +27,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class BrokerControllerTest {
- protected Logger logger = LoggerFactory.getLogger(BrokerControllerTest.class);
-
private static final int RESTART_NUM = 3;
+ protected Logger logger = LoggerFactory.getLogger(BrokerControllerTest.class);
/**
* Tests if the controller can be properly stopped and started.
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/test/java/org/apache/rocketmq/broker/BrokerTestHarness.java
----------------------------------------------------------------------
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/BrokerTestHarness.java b/broker/src/test/java/org/apache/rocketmq/broker/BrokerTestHarness.java
index 4fd7a5b..5e944d8 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/BrokerTestHarness.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/BrokerTestHarness.java
@@ -6,13 +6,15 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * $Id: SendMessageTest.java 1831 2013-05-16 01:39:51Z vintagewang@apache.org $
*/
/**
@@ -20,6 +22,8 @@
*/
package org.apache.rocketmq.broker;
+import java.io.File;
+import java.util.Random;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
@@ -30,15 +34,11 @@ import org.junit.Before;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.util.Random;
-
public class BrokerTestHarness {
+ public final String BROKER_NAME = "TestBrokerName";
protected BrokerController brokerController = null;
-
protected Random random = new Random();
- public final String BROKER_NAME = "TestBrokerName";
protected String brokerAddr = "";
protected Logger logger = LoggerFactory.getLogger(BrokerTestHarness.class);
protected BrokerConfig brokerConfig = new BrokerConfig();