You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by sh...@apache.org on 2017/07/05 11:39:11 UTC
incubator-rocketmq git commit: ROCKETMQ-6: Use logger for exceptions instead of e.printStackTrace().
Repository: incubator-rocketmq
Updated Branches:
refs/heads/develop 9ad9ad064 -> 246be9eb0
ROCKETMQ-6: Use logger for exceptions instead of e.printStackTrace().
Signed-off-by: shroman <rs...@yahoo.com>
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/246be9eb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/246be9eb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/246be9eb
Branch: refs/heads/develop
Commit: 246be9eb06a8686435f1e8db1e4fac7a0b737e89
Parents: 9ad9ad0
Author: shroman <rs...@yahoo.com>
Authored: Wed Jul 5 20:38:13 2017 +0900
Committer: shroman <rs...@yahoo.com>
Committed: Wed Jul 5 20:38:13 2017 +0900
----------------------------------------------------------------------
.../rocketmq/broker/BrokerController.java | 2 +-
.../broker/processor/PullMessageProcessor.java | 76 ++++++++++----------
.../broker/topic/TopicConfigManager.java | 40 ++++++-----
.../apache/rocketmq/common/BrokerConfig.java | 30 ++++----
.../apache/rocketmq/common/ConfigManager.java | 12 ++--
.../java/org/apache/rocketmq/common/MixAll.java | 27 +++----
.../apache/rocketmq/common/ServiceThread.java | 17 ++---
.../org/apache/rocketmq/common/UtilAll.java | 16 ++++-
.../rocketmq/common/namesrv/NamesrvConfig.java | 2 +-
.../rocketmq/common/namesrv/TopAddressing.java | 1 +
.../common/protocol/MQProtosHelper.java | 7 +-
.../rocketmq/filtersrv/filter/DynaCode.java | 8 +--
.../processor/ClusterTestRequestProcessor.java | 2 +-
.../remoting/common/RemotingHelper.java | 6 +-
.../rocketmq/remoting/common/RemotingUtil.java | 8 +--
.../rocketmq/remoting/common/ServiceThread.java | 17 ++---
.../rocketmq/remoting/netty/NettyDecoder.java | 1 +
.../remoting/netty/NettyRemotingAbstract.java | 54 +++++++-------
.../store/AllocateMappedFileService.java | 2 +-
.../org/apache/rocketmq/store/CommitLog.java | 62 ++++++++--------
.../org/apache/rocketmq/store/ConsumeQueue.java | 2 +-
.../org/apache/rocketmq/store/MappedFile.java | 2 +-
.../apache/rocketmq/store/StoreCheckpoint.java | 2 +-
.../rocketmq/store/ha/WaitNotifyObject.java | 9 ++-
.../apache/rocketmq/store/index/IndexFile.java | 4 +-
.../rocketmq/store/index/IndexService.java | 2 +-
.../store/schedule/ScheduleMessageService.java | 3 +-
.../rocketmq/store/stats/BrokerStats.java | 1 +
28 files changed, 227 insertions(+), 188 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/246be9eb/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 53968fa..c8624c4 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -210,7 +210,7 @@ public class BrokerController {
this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
} catch (IOException e) {
result = false;
- e.printStackTrace();
+ log.error("Failed to initialize", e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/246be9eb/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
index 10945da..fb7ea20 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
@@ -67,7 +67,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PullMessageProcessor implements NettyRequestProcessor {
- private static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final BrokerController brokerController;
private List<ConsumeMessageHook> consumeMessageHookList;
@@ -94,9 +94,7 @@ public class PullMessageProcessor implements NettyRequestProcessor {
response.setOpaque(request.getOpaque());
- if (LOG.isDebugEnabled()) {
- LOG.debug("receive PullMessage request command, {}", request);
- }
+ log.debug("receive PullMessage request command, {}", request);
if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
response.setCode(ResponseCode.NO_PERMISSION);
@@ -126,7 +124,7 @@ public class PullMessageProcessor implements NettyRequestProcessor {
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
if (null == topicConfig) {
- LOG.error("The topic {} not exist, consumer: {} ", requestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(channel));
+ log.error("the topic {} not exist, consumer: {}", requestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(channel));
response.setCode(ResponseCode.TOPIC_NOT_EXIST);
response.setRemark(String.format("topic[%s] not exist, apply first please! %s", requestHeader.getTopic(), FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)));
return response;
@@ -141,7 +139,7 @@ public class PullMessageProcessor implements NettyRequestProcessor {
if (requestHeader.getQueueId() < 0 || requestHeader.getQueueId() >= topicConfig.getReadQueueNums()) {
String errorInfo = String.format("queueId[%d] is illegal, topic:[%s] topicConfig.readQueueNums:[%d] consumer:[%s]",
requestHeader.getQueueId(), requestHeader.getTopic(), topicConfig.getReadQueueNums(), channel.remoteAddress());
- LOG.warn(errorInfo);
+ log.warn(errorInfo);
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(errorInfo);
return response;
@@ -162,7 +160,7 @@ public class PullMessageProcessor implements NettyRequestProcessor {
assert consumerFilterData != null;
}
} catch (Exception e) {
- LOG.warn("Parse the consumer's subscription[{}] failed, group: {}", requestHeader.getSubscription(), //
+ log.warn("Parse the consumer's subscription[{}] failed, group: {}", requestHeader.getSubscription(), //
requestHeader.getConsumerGroup());
response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
response.setRemark("parse the consumer's subscription failed");
@@ -172,7 +170,7 @@ public class PullMessageProcessor implements NettyRequestProcessor {
ConsumerGroupInfo consumerGroupInfo =
this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());
if (null == consumerGroupInfo) {
- LOG.warn("The consumer's group info not exist, group: {}", requestHeader.getConsumerGroup());
+ log.warn("the consumer's group info not exist, group: {}", requestHeader.getConsumerGroup());
response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
response.setRemark("the consumer's group info not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
return response;
@@ -187,14 +185,14 @@ public class PullMessageProcessor implements NettyRequestProcessor {
subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());
if (null == subscriptionData) {
- LOG.warn("The consumer's subscription not exist, group: {}, topic:{}", requestHeader.getConsumerGroup(), requestHeader.getTopic());
+ log.warn("the consumer's subscription not exist, group: {}, topic:{}", requestHeader.getConsumerGroup(), requestHeader.getTopic());
response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
response.setRemark("the consumer's subscription not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
return response;
}
if (subscriptionData.getSubVersion() < requestHeader.getSubVersion()) {
- LOG.warn("The broker's subscription is not latest, group: {} {}", requestHeader.getConsumerGroup(),
+ log.warn("The broker's subscription is not latest, group: {} {}", requestHeader.getConsumerGroup(),
subscriptionData.getSubString());
response.setCode(ResponseCode.SUBSCRIPTION_NOT_LATEST);
response.setRemark("the consumer's subscription not latest");
@@ -209,7 +207,7 @@ public class PullMessageProcessor implements NettyRequestProcessor {
return response;
}
if (consumerFilterData.getClientVersion() < requestHeader.getSubVersion()) {
- LOG.warn("The broker's consumer filter data is not latest, group: {}, topic: {}, serverV: {}, clientV: {}",
+ log.warn("The broker's consumer filter data is not latest, group: {}, topic: {}, serverV: {}, clientV: {}",
requestHeader.getConsumerGroup(), requestHeader.getTopic(), consumerFilterData.getClientVersion(), requestHeader.getSubVersion());
response.setCode(ResponseCode.FILTER_DATA_NOT_LATEST);
response.setRemark("the consumer's consumer filter data not latest");
@@ -287,12 +285,12 @@ public class PullMessageProcessor implements NettyRequestProcessor {
response.setCode(ResponseCode.PULL_OFFSET_MOVED);
// XXX: warn and notify me
- LOG.info("the broker store no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}", //
- requestHeader.getQueueOffset(), //
- getMessageResult.getNextBeginOffset(), //
- requestHeader.getTopic(), //
- requestHeader.getQueueId(), //
- requestHeader.getConsumerGroup()//
+ log.info("the broker store no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}", //
+ requestHeader.getQueueOffset(), //
+ getMessageResult.getNextBeginOffset(), //
+ requestHeader.getTopic(), //
+ requestHeader.getQueueId(), //
+ requestHeader.getConsumerGroup()//
);
} else {
response.setCode(ResponseCode.PULL_NOT_FOUND);
@@ -307,16 +305,17 @@ public class PullMessageProcessor implements NettyRequestProcessor {
case OFFSET_OVERFLOW_BADLY:
response.setCode(ResponseCode.PULL_OFFSET_MOVED);
// XXX: warn and notify me
- LOG.info("The request offset:{} over flow badly, broker max offset:{} , consumer: {}", requestHeader.getQueueOffset(), getMessageResult.getMaxOffset(), channel.remoteAddress());
+ log.info("the request offset: {} over flow badly, broker max offset: {}, consumer: {}",
+ requestHeader.getQueueOffset(), getMessageResult.getMaxOffset(), channel.remoteAddress());
break;
case OFFSET_OVERFLOW_ONE:
response.setCode(ResponseCode.PULL_NOT_FOUND);
break;
case OFFSET_TOO_SMALL:
response.setCode(ResponseCode.PULL_OFFSET_MOVED);
- LOG.info("The request offset is too small. group={}, topic={}, requestOffset={}, brokerMinOffset={}, clientIp={}",
- requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueOffset(),
- getMessageResult.getMinOffset(), channel.remoteAddress());
+ log.info("the request offset too small. group={}, topic={}, requestOffset={}, brokerMinOffset={}, clientIp={}",
+ requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueOffset(),
+ getMessageResult.getMinOffset(), channel.remoteAddress());
break;
default:
assert false;
@@ -391,12 +390,12 @@ public class PullMessageProcessor implements NettyRequestProcessor {
public void operationComplete(ChannelFuture future) throws Exception {
getMessageResult.release();
if (!future.isSuccess()) {
- LOG.error("Fail to transfer messages from page cache to {}", channel.remoteAddress(), future.cause());
+ log.error("transfer many message by pagecache failed, {}", channel.remoteAddress(), future.cause());
}
}
});
} catch (Throwable e) {
- LOG.error("Error occurred when transferring messages from page cache", e);
+ log.error("transfer many message by pagecache exception", e);
getMessageResult.release();
}
@@ -437,16 +436,16 @@ public class PullMessageProcessor implements NettyRequestProcessor {
event.setOffsetRequest(requestHeader.getQueueOffset());
event.setOffsetNew(getMessageResult.getNextBeginOffset());
this.generateOffsetMovedEvent(event);
- LOG.warn(
- "PULL_OFFSET_MOVED:correction offset. topic={}, groupId={}, requestOffset={}, newOffset={}, suggestBrokerId={}",
- requestHeader.getTopic(), requestHeader.getConsumerGroup(), event.getOffsetRequest(), event.getOffsetNew(),
- responseHeader.getSuggestWhichBrokerId());
+ log.warn(
+ "PULL_OFFSET_MOVED:correction offset. topic={}, groupId={}, requestOffset={}, newOffset={}, suggestBrokerId={}",
+ requestHeader.getTopic(), requestHeader.getConsumerGroup(), event.getOffsetRequest(), event.getOffsetNew(),
+ responseHeader.getSuggestWhichBrokerId());
} else {
responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
- LOG.warn("PULL_OFFSET_MOVED:none correction. topic={}, groupId={}, requestOffset={}, suggestBrokerId={}",
- requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueOffset(),
- responseHeader.getSuggestWhichBrokerId());
+ log.warn("PULL_OFFSET_MOVED:none correction. topic={}, groupId={}, requestOffset={}, suggestBrokerId={}",
+ requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueOffset(),
+ responseHeader.getSuggestWhichBrokerId());
}
break;
@@ -525,7 +524,7 @@ public class PullMessageProcessor implements NettyRequestProcessor {
PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
} catch (Exception e) {
- LOG.warn(String.format("GenerateOffsetMovedEvent Exception, %s", event.toString()), e);
+ log.warn(String.format("generateOffsetMovedEvent Exception, %s", event.toString()), e);
}
}
@@ -544,20 +543,21 @@ public class PullMessageProcessor implements NettyRequestProcessor {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
- LOG.error("ProcessRequestWrapper response to {} failed", future.channel().remoteAddress(), future.cause());
- LOG.error(request.toString());
- LOG.error(response.toString());
+ log.error("processRequestWrapper response to {} failed",
+ future.channel().remoteAddress(), future.cause());
+ log.error(request.toString());
+ log.error(response.toString());
}
}
});
} catch (Throwable e) {
- LOG.error("ProcessRequestWrapper process request over, but response failed", e);
- LOG.error(request.toString());
- LOG.error(response.toString());
+ log.error("processRequestWrapper process request over, but response failed", e);
+ log.error(request.toString());
+ log.error(response.toString());
}
}
} catch (RemotingCommandException e1) {
- LOG.error("ExecuteRequestWhenWakeup run", e1);
+ log.error("excuteRequestWhenWakeup run", e1);
}
}
};
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/246be9eb/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
index 3bcafc0..0d10a16 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
@@ -41,7 +41,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TopicConfigManager extends ConfigManager {
- private static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final long LOCK_TIMEOUT_MILLIS = 3000;
private transient final Lock lockTopicConfigTable = new ReentrantLock();
@@ -181,15 +181,17 @@ public class TopicConfigManager extends ConfigManager {
topicConfig.setTopicSysFlag(topicSysFlag);
topicConfig.setTopicFilterType(defaultTopicConfig.getTopicFilterType());
} else {
- LOG.warn("Create new topic failed, because the default topic[{}] has no perm [{}] producer:[{}]",
- defaultTopic, defaultTopicConfig.getPerm(), remoteAddress);
+ log.warn("Create new topic failed, because the default topic[{}] has no perm [{}] producer:[{}]",
+ defaultTopic, defaultTopicConfig.getPerm(), remoteAddress);
}
} else {
- LOG.warn("Create new topic failed, because the default topic[{}] not exist. producer:[{}]", defaultTopic, remoteAddress);
+ log.warn("Create new topic failed, because the default topic[{}] not exist. producer:[{}]",
+ defaultTopic, remoteAddress);
}
if (topicConfig != null) {
- LOG.info("Create new topic by default topic:[{}] config:[{}] producer:[{}]", defaultTopic, topicConfig, remoteAddress);
+ log.info("Create new topic by default topic:[{}] config:[{}] producer:[{}]",
+ defaultTopic, topicConfig, remoteAddress);
this.topicConfigTable.put(topic, topicConfig);
@@ -204,7 +206,7 @@ public class TopicConfigManager extends ConfigManager {
}
}
} catch (InterruptedException e) {
- LOG.error("createTopicInSendMessageMethod exception", e);
+ log.error("createTopicInSendMessageMethod exception", e);
}
if (createNew) {
@@ -238,7 +240,7 @@ public class TopicConfigManager extends ConfigManager {
topicConfig.setPerm(perm);
topicConfig.setTopicSysFlag(topicSysFlag);
- LOG.info("create new topic {}", topicConfig);
+ log.info("create new topic {}", topicConfig);
this.topicConfigTable.put(topic, topicConfig);
createNew = true;
this.dataVersion.nextVersion();
@@ -248,7 +250,7 @@ public class TopicConfigManager extends ConfigManager {
}
}
} catch (InterruptedException e) {
- LOG.error("createTopicInSendMessageBackMethod exception", e);
+ log.error("createTopicInSendMessageBackMethod exception", e);
}
if (createNew) {
@@ -269,8 +271,8 @@ public class TopicConfigManager extends ConfigManager {
topicConfig.setTopicSysFlag(TopicSysFlag.clearUnitFlag(oldTopicSysFlag));
}
- LOG.info("update topic sys flag. oldTopicSysFlag={}, newTopicSysFlag", oldTopicSysFlag,
- topicConfig.getTopicSysFlag());
+ log.info("update topic sys flag. oldTopicSysFlag={}, newTopicSysFlag", oldTopicSysFlag,
+ topicConfig.getTopicSysFlag());
this.topicConfigTable.put(topic, topicConfig);
@@ -289,8 +291,8 @@ public class TopicConfigManager extends ConfigManager {
topicConfig.setTopicSysFlag(TopicSysFlag.setUnitSubFlag(oldTopicSysFlag));
}
- LOG.info("update topic sys flag. oldTopicSysFlag={}, newTopicSysFlag", oldTopicSysFlag,
- topicConfig.getTopicSysFlag());
+ log.info("update topic sys flag. oldTopicSysFlag={}, newTopicSysFlag", oldTopicSysFlag,
+ topicConfig.getTopicSysFlag());
this.topicConfigTable.put(topic, topicConfig);
@@ -304,9 +306,9 @@ public class TopicConfigManager extends ConfigManager {
public void updateTopicConfig(final TopicConfig topicConfig) {
TopicConfig old = this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
if (old != null) {
- LOG.info("update topic config, old:[{}] new:[{}]", old, topicConfig);
+ log.info("update topic config, old:[{}] new:[{}]", old, topicConfig);
} else {
- LOG.info("create new topic [{}]", topicConfig);
+ log.info("create new topic [{}]", topicConfig);
}
this.dataVersion.nextVersion();
@@ -324,7 +326,7 @@ public class TopicConfigManager extends ConfigManager {
if (topicConfig != null && !topicConfig.isOrder()) {
topicConfig.setOrder(true);
isChange = true;
- LOG.info("update order topic config, topic={}, order={}", topic, true);
+ log.info("update order topic config, topic={}, order={}", topic, true);
}
}
@@ -335,7 +337,7 @@ public class TopicConfigManager extends ConfigManager {
if (topicConfig.isOrder()) {
topicConfig.setOrder(false);
isChange = true;
- LOG.info("update order topic config, topic={}, order={}", topic, false);
+ log.info("update order topic config, topic={}, order={}", topic, false);
}
}
}
@@ -359,11 +361,11 @@ public class TopicConfigManager extends ConfigManager {
public void deleteTopicConfig(final String topic) {
TopicConfig old = this.topicConfigTable.remove(topic);
if (old != null) {
- LOG.info("Delete topic config OK, topic:{}", old);
+ log.info("delete topic config OK, topic: {}", old);
this.dataVersion.nextVersion();
this.persist();
} else {
- LOG.warn("Delete topic config failed, topic:{} not exist", topic);
+ log.warn("delete topic config failed, topic: {} not exists", topic);
}
}
@@ -409,7 +411,7 @@ public class TopicConfigManager extends ConfigManager {
Iterator<Entry<String, TopicConfig>> it = tcs.getTopicConfigTable().entrySet().iterator();
while (it.hasNext()) {
Entry<String, TopicConfig> next = it.next();
- LOG.info("load exist local topic, {}", next.getValue().toString());
+ log.info("load exist local topic, {}", next.getValue().toString());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/246be9eb/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 5bce013..f22729c 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -16,13 +16,19 @@
*/
package org.apache.rocketmq.common;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
import org.apache.rocketmq.common.annotation.ImportantField;
+import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
public class BrokerConfig {
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
+
private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
@ImportantField
private String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV));
@@ -121,16 +127,6 @@ public class BrokerConfig {
private boolean filterSupportRetry = false;
private boolean enablePropertyFilter = false;
- public static String localHostName() {
- try {
- return InetAddress.getLocalHost().getHostName();
- } catch (UnknownHostException e) {
- e.printStackTrace();
- }
-
- return "DEFAULT_BROKER";
- }
-
public boolean isTraceOn() {
return traceOn;
}
@@ -179,6 +175,16 @@ public class BrokerConfig {
this.slaveReadEnable = slaveReadEnable;
}
+ public static String localHostName() {
+ try {
+ return InetAddress.getLocalHost().getHostName();
+ } catch (UnknownHostException e) {
+ log.error("Failed to obtain the host name", e);
+ }
+
+ return "DEFAULT_BROKER";
+ }
+
public int getRegisterBrokerTimeoutMills() {
return registerBrokerTimeoutMills;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/246be9eb/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java b/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java
index f826a2e..c33ebdf 100644
--- a/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java
+++ b/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java
@@ -22,7 +22,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class ConfigManager {
- private static final Logger PLOG = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
public abstract String encode();
@@ -36,11 +36,11 @@ public abstract class ConfigManager {
return this.loadBak();
} else {
this.decode(jsonString);
- PLOG.info("load {} OK", fileName);
+ log.info("load {} OK", fileName);
return true;
}
} catch (Exception e) {
- PLOG.error("load " + fileName + " Failed, and try to load backup file", e);
+ log.error("load [{}] failed, and try to load backup file", fileName, e);
return this.loadBak();
}
}
@@ -54,11 +54,11 @@ public abstract class ConfigManager {
String jsonString = MixAll.file2String(fileName + ".bak");
if (jsonString != null && jsonString.length() > 0) {
this.decode(jsonString);
- PLOG.info("load " + fileName + " OK");
+ log.info("load [{}] OK", fileName);
return true;
}
} catch (Exception e) {
- PLOG.error("load " + fileName + " Failed", e);
+ log.error("load [{}] Failed", fileName, e);
return false;
}
@@ -74,7 +74,7 @@ public abstract class ConfigManager {
try {
MixAll.string2File(jsonString, fileName);
} catch (IOException e) {
- PLOG.error("persist file Exception, " + fileName, e);
+ log.error("persist file [{}] exception", fileName, e);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/246be9eb/common/src/main/java/org/apache/rocketmq/common/MixAll.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/MixAll.java b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
index 36d81d0..f8e9b4e 100644
--- a/common/src/main/java/org/apache/rocketmq/common/MixAll.java
+++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
@@ -22,7 +22,6 @@ import java.io.FileInputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
-import java.io.UnsupportedEncodingException;
import java.lang.annotation.Annotation;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
@@ -43,10 +42,14 @@ import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.common.annotation.ImportantField;
+import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.help.FAQUrl;
import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class MixAll {
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
+
public static final String ROCKETMQ_HOME_ENV = "ROCKETMQ_HOME";
public static final String ROCKETMQ_HOME_PROPERTY = "rocketmq.home.dir";
public static final String NAMESRV_ADDR_ENV = "NAMESRV_ADDR";
@@ -243,11 +246,11 @@ public class MixAll {
return url.getPath();
}
- public static void printObjectProperties(final Logger log, final Object object) {
- printObjectProperties(log, object, false);
+ public static void printObjectProperties(final Logger logger, final Object object) {
+ printObjectProperties(logger, object, false);
}
- public static void printObjectProperties(final Logger log, final Object object, final boolean onlyImportantField) {
+ public static void printObjectProperties(final Logger logger, final Object object, final boolean onlyImportantField) {
Field[] fields = object.getClass().getDeclaredFields();
for (Field field : fields) {
if (!Modifier.isStatic(field.getModifiers())) {
@@ -261,7 +264,7 @@ public class MixAll {
value = "";
}
} catch (IllegalAccessException e) {
- e.printStackTrace();
+ log.error("Failed to obtain object properties", e);
}
if (onlyImportantField) {
@@ -271,8 +274,9 @@ public class MixAll {
}
}
- if (log != null) {
- log.info(name + "=" + value);
+ if (logger != null) {
+ logger.info(name + "=" + value);
+ } else {
}
}
}
@@ -294,11 +298,8 @@ public class MixAll {
try {
InputStream in = new ByteArrayInputStream(str.getBytes(DEFAULT_CHARSET));
properties.load(in);
- } catch (UnsupportedEncodingException e) {
- e.printStackTrace();
- return null;
- } catch (IOException e) {
- e.printStackTrace();
+ } catch (Exception e) {
+ log.error("Failed to handle properties", e);
return null;
}
@@ -318,7 +319,7 @@ public class MixAll {
field.setAccessible(true);
value = field.get(object);
} catch (IllegalAccessException e) {
- e.printStackTrace();
+ log.error("Failed to handle properties", e);
}
if (value != null) {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/246be9eb/common/src/main/java/org/apache/rocketmq/common/ServiceThread.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/ServiceThread.java b/common/src/main/java/org/apache/rocketmq/common/ServiceThread.java
index 6a30e4e..7b96880 100644
--- a/common/src/main/java/org/apache/rocketmq/common/ServiceThread.java
+++ b/common/src/main/java/org/apache/rocketmq/common/ServiceThread.java
@@ -23,7 +23,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class ServiceThread implements Runnable {
- private static final Logger STLOG = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
+
private static final long JOIN_TIME = 90 * 1000;
protected final Thread thread;
@@ -47,7 +48,7 @@ public abstract class ServiceThread implements Runnable {
public void shutdown(final boolean interrupt) {
this.stopped = true;
- STLOG.info("shutdown thread " + this.getServiceName() + " interrupt " + interrupt);
+ log.info("shutdown thread " + this.getServiceName() + " interrupt " + interrupt);
if (hasNotified.compareAndSet(false, true)) {
waitPoint.countDown(); // notify
@@ -63,10 +64,10 @@ public abstract class ServiceThread implements Runnable {
this.thread.join(this.getJointime());
}
long eclipseTime = System.currentTimeMillis() - beginTime;
- STLOG.info("join thread " + this.getServiceName() + " eclipse time(ms) " + eclipseTime + " "
- + this.getJointime());
+ log.info("join thread " + this.getServiceName() + " eclipse time(ms) " + eclipseTime + " "
+ + this.getJointime());
} catch (InterruptedException e) {
- e.printStackTrace();
+ log.error("Interrupted", e);
}
}
@@ -80,7 +81,7 @@ public abstract class ServiceThread implements Runnable {
public void stop(final boolean interrupt) {
this.stopped = true;
- STLOG.info("stop thread " + this.getServiceName() + " interrupt " + interrupt);
+ log.info("stop thread " + this.getServiceName() + " interrupt " + interrupt);
if (hasNotified.compareAndSet(false, true)) {
waitPoint.countDown(); // notify
@@ -93,7 +94,7 @@ public abstract class ServiceThread implements Runnable {
public void makeStop() {
this.stopped = true;
- STLOG.info("makestop thread " + this.getServiceName());
+ log.info("makestop thread " + this.getServiceName());
}
public void wakeup() {
@@ -114,7 +115,7 @@ public abstract class ServiceThread implements Runnable {
try {
waitPoint.await(interval, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
- e.printStackTrace();
+ log.error("Interrupted", e);
} finally {
hasNotified.set(false);
this.onWaitEnd();
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/246be9eb/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
index e9d926f..15d4108 100644
--- a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
+++ b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
@@ -36,9 +36,16 @@ import java.util.Map;
import java.util.zip.CRC32;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;
+
+import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
public class UtilAll {
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
+
public static final String YYYY_MM_DD_HH_MM_SS = "yyyy-MM-dd HH:mm:ss";
public static final String YYYY_MM_DD_HH_MM_SS_SSS = "yyyy-MM-dd#HH:mm:ss:SSS";
public static final String YYYYMMDDHHMMSS = "yyyyMMddHHmmss";
@@ -269,15 +276,18 @@ public class UtilAll {
} finally {
try {
byteArrayInputStream.close();
- } catch (IOException ignored) {
+ } catch (IOException e) {
+ log.error("Failed to close the stream", e);
}
try {
inflaterInputStream.close();
- } catch (IOException ignored) {
+ } catch (IOException e) {
+ log.error("Failed to close the stream", e);
}
try {
byteArrayOutputStream.close();
- } catch (IOException ignored) {
+ } catch (IOException e) {
+ log.error("Failed to close the stream", e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/246be9eb/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java b/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java
index 63f0fe0..6f740f7 100644
--- a/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java
@@ -28,8 +28,8 @@ import org.slf4j.LoggerFactory;
public class NamesrvConfig {
private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
- private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
+ private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";
private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties";
private String productEnvName = "center";
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/246be9eb/common/src/main/java/org/apache/rocketmq/common/namesrv/TopAddressing.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/namesrv/TopAddressing.java b/common/src/main/java/org/apache/rocketmq/common/namesrv/TopAddressing.java
index 990e748..57af0e7 100644
--- a/common/src/main/java/org/apache/rocketmq/common/namesrv/TopAddressing.java
+++ b/common/src/main/java/org/apache/rocketmq/common/namesrv/TopAddressing.java
@@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory;
public class TopAddressing {
private static final Logger log = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
+
private String nsAddr;
private String wsAddr;
private String unitName;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/246be9eb/common/src/main/java/org/apache/rocketmq/common/protocol/MQProtosHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/MQProtosHelper.java b/common/src/main/java/org/apache/rocketmq/common/protocol/MQProtosHelper.java
index bff7333..c1cd69c 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/MQProtosHelper.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/MQProtosHelper.java
@@ -17,11 +17,16 @@
package org.apache.rocketmq.common.protocol;
+import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerRequestHeader;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class MQProtosHelper {
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
+
public static boolean registerBrokerToNameServer(final String nsaddr, final String brokerAddr,
final long timeoutMillis) {
RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
@@ -36,7 +41,7 @@ public class MQProtosHelper {
return ResponseCode.SUCCESS == response.getCode();
}
} catch (Exception e) {
- e.printStackTrace();
+ log.error("Failed to register broker", e);
}
return false;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/246be9eb/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/DynaCode.java
----------------------------------------------------------------------
diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/DynaCode.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/DynaCode.java
index e0a94d7..92bbf8d 100644
--- a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/DynaCode.java
+++ b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/DynaCode.java
@@ -44,7 +44,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DynaCode {
- private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME);
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME);
private static final String FILE_SP = System.getProperty("file.separator");
@@ -231,7 +231,7 @@ public class DynaCode {
loadClass.put(getFullClassName(code), null);
}
if (null != srcFile) {
- LOGGER.warn("Dyna Create Java Source File:---->" + srcFile.getAbsolutePath());
+ log.warn("Dyna Create Java Source File:----> {}", srcFile.getAbsolutePath());
srcFileAbsolutePaths.add(srcFile.getAbsolutePath());
srcFile.deleteOnExit();
}
@@ -277,9 +277,9 @@ public class DynaCode {
Class<?> classz = classLoader.loadClass(key);
if (null != classz) {
loadClass.put(key, classz);
- LOGGER.info("Dyna Load Java Class File OK:----> className: " + key);
+ log.info("Dyna Load Java Class File OK:----> className: {}", key);
} else {
- LOGGER.error("Dyna Load Java Class File Fail:----> className: " + key);
+ log.error("Dyna Load Java Class File Fail:----> className: {}", key);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/246be9eb/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessor.java
----------------------------------------------------------------------
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessor.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessor.java
index 2fdd4fb..04cf870 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessor.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessor.java
@@ -45,7 +45,7 @@ public class ClusterTestRequestProcessor extends DefaultRequestProcessor {
try {
adminExt.start();
} catch (MQClientException e) {
- e.printStackTrace();
+ log.error("Failed to start processor", e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/246be9eb/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
index 8c4fab7..40c5943 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
@@ -26,11 +26,15 @@ import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class RemotingHelper {
public static final String ROCKETMQ_REMOTING = "RocketmqRemoting";
public static final String DEFAULT_CHARSET = "UTF-8";
+ private static final Logger log = LoggerFactory.getLogger(ROCKETMQ_REMOTING);
+
public static String exceptionSimpleDesc(final Throwable e) {
StringBuffer sb = new StringBuffer();
if (e != null) {
@@ -126,7 +130,7 @@ public class RemotingHelper {
byteBufferBody.flip();
return RemotingCommand.decode(byteBufferBody);
} catch (IOException e) {
- e.printStackTrace();
+ log.error("invokeSync failure", e);
if (sendRequestOK) {
throw new RemotingTimeoutException(addr, timeoutMillis);
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/246be9eb/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java
index f64f9e1..8d24e76 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java
@@ -26,8 +26,6 @@ import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.SocketAddress;
-import java.net.SocketException;
-import java.net.UnknownHostException;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
@@ -130,10 +128,8 @@ public class RemotingUtil {
//If failed to find,fall back to localhost
final InetAddress localHost = InetAddress.getLocalHost();
return normalizeHostAddress(localHost);
- } catch (SocketException e) {
- e.printStackTrace();
- } catch (UnknownHostException e) {
- e.printStackTrace();
+ } catch (Exception e) {
+ log.error("Failed to obtain local address", e);
}
return null;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/246be9eb/remoting/src/main/java/org/apache/rocketmq/remoting/common/ServiceThread.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/common/ServiceThread.java b/remoting/src/main/java/org/apache/rocketmq/remoting/common/ServiceThread.java
index 7c7e89b..8436189 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/common/ServiceThread.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/common/ServiceThread.java
@@ -23,7 +23,8 @@ import org.slf4j.LoggerFactory;
* Base class for background thread
*/
public abstract class ServiceThread implements Runnable {
- private static final Logger STLOG = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
+ private static final Logger log = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
+
private static final long JOIN_TIME = 90 * 1000;
protected final Thread thread;
protected volatile boolean hasNotified = false;
@@ -45,7 +46,7 @@ public abstract class ServiceThread implements Runnable {
public void shutdown(final boolean interrupt) {
this.stopped = true;
- STLOG.info("shutdown thread " + this.getServiceName() + " interrupt " + interrupt);
+ log.info("shutdown thread " + this.getServiceName() + " interrupt " + interrupt);
synchronized (this) {
if (!this.hasNotified) {
this.hasNotified = true;
@@ -61,10 +62,10 @@ public abstract class ServiceThread implements Runnable {
long beginTime = System.currentTimeMillis();
this.thread.join(this.getJointime());
long eclipseTime = System.currentTimeMillis() - beginTime;
- STLOG.info("join thread " + this.getServiceName() + " eclipse time(ms) " + eclipseTime + " "
- + this.getJointime());
+ log.info("join thread " + this.getServiceName() + " eclipse time(ms) " + eclipseTime + " "
+ + this.getJointime());
} catch (InterruptedException e) {
- e.printStackTrace();
+ log.error("Interrupted", e);
}
}
@@ -78,7 +79,7 @@ public abstract class ServiceThread implements Runnable {
public void stop(final boolean interrupt) {
this.stopped = true;
- STLOG.info("stop thread " + this.getServiceName() + " interrupt " + interrupt);
+ log.info("stop thread " + this.getServiceName() + " interrupt " + interrupt);
synchronized (this) {
if (!this.hasNotified) {
this.hasNotified = true;
@@ -93,7 +94,7 @@ public abstract class ServiceThread implements Runnable {
public void makeStop() {
this.stopped = true;
- STLOG.info("makestop thread " + this.getServiceName());
+ log.info("makestop thread " + this.getServiceName());
}
public void wakeup() {
@@ -116,7 +117,7 @@ public abstract class ServiceThread implements Runnable {
try {
this.wait(interval);
} catch (InterruptedException e) {
- e.printStackTrace();
+ log.error("Interrupted", e);
} finally {
this.hasNotified = false;
this.onWaitEnd();
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/246be9eb/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java
index 33cef33..4ed156d 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java
@@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory;
public class NettyDecoder extends LengthFieldBasedFrameDecoder {
private static final Logger log = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
+
private static final int FRAME_MAX_LENGTH = //
Integer.parseInt(System.getProperty("com.rocketmq.remoting.frameMaxLength", "16777216"));
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/246be9eb/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
index 0ba714a..ba74b53 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
@@ -53,7 +53,7 @@ public abstract class NettyRemotingAbstract {
/**
* Remoting logger instance.
*/
- private static final Logger PLOG = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
+ private static final Logger log = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
/**
* Semaphore to limit maximum number of on-going one-way requests, which protects system memory footprint.
@@ -175,17 +175,17 @@ public abstract class NettyRemotingAbstract {
try {
ctx.writeAndFlush(response);
} catch (Throwable e) {
- PLOG.error("process request over, but response failed", e);
- PLOG.error(cmd.toString());
- PLOG.error(response.toString());
+ log.error("process request over, but response failed", e);
+ log.error(cmd.toString());
+ log.error(response.toString());
}
} else {
}
}
} catch (Throwable e) {
- PLOG.error("process request exception", e);
- PLOG.error(cmd.toString());
+ log.error("process request exception", e);
+ log.error(cmd.toString());
if (!cmd.isOnewayRPC()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, //
@@ -210,7 +210,7 @@ public abstract class NettyRemotingAbstract {
pair.getObject2().submit(requestTask);
} catch (RejectedExecutionException e) {
if ((System.currentTimeMillis() % 10000) == 0) {
- PLOG.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) //
+ log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) //
+ ", too many requests and system thread pool busy, RejectedExecutionException " //
+ pair.getObject2().toString() //
+ " request code: " + cmd.getCode());
@@ -229,7 +229,7 @@ public abstract class NettyRemotingAbstract {
RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
response.setOpaque(opaque);
ctx.writeAndFlush(response);
- PLOG.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
+ log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
}
}
@@ -254,8 +254,8 @@ public abstract class NettyRemotingAbstract {
responseFuture.putResponse(cmd);
}
} else {
- PLOG.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
- PLOG.warn(cmd.toString());
+ log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+ log.warn(cmd.toString());
}
}
@@ -274,13 +274,13 @@ public abstract class NettyRemotingAbstract {
try {
responseFuture.executeInvokeCallback();
} catch (Throwable e) {
- PLOG.warn("execute callback in executor exception, and callback throw", e);
+ log.warn("execute callback in executor exception, and callback throw", e);
}
}
});
} catch (Exception e) {
runInThisThread = true;
- PLOG.warn("execute callback in executor exception, maybe executor busy", e);
+ log.warn("execute callback in executor exception, maybe executor busy", e);
}
} else {
runInThisThread = true;
@@ -290,7 +290,7 @@ public abstract class NettyRemotingAbstract {
try {
responseFuture.executeInvokeCallback();
} catch (Throwable e) {
- PLOG.warn("executeInvokeCallback Exception", e);
+ log.warn("executeInvokeCallback Exception", e);
}
}
}
@@ -324,7 +324,7 @@ public abstract class NettyRemotingAbstract {
rep.release();
it.remove();
rfList.add(rep);
- PLOG.warn("remove timeout request, " + rep);
+ log.warn("remove timeout request, " + rep);
}
}
@@ -332,7 +332,7 @@ public abstract class NettyRemotingAbstract {
try {
executeInvokeCallback(rf);
} catch (Throwable e) {
- PLOG.warn("scanResponseTable, operationComplete Exception", e);
+ log.warn("scanResponseTable, operationComplete Exception", e);
}
}
}
@@ -358,7 +358,7 @@ public abstract class NettyRemotingAbstract {
responseTable.remove(opaque);
responseFuture.setCause(f.cause());
responseFuture.putResponse(null);
- PLOG.warn("send a request command to channel <" + addr + "> failed.");
+ log.warn("send a request command to channel <" + addr + "> failed.");
}
});
@@ -404,17 +404,17 @@ public abstract class NettyRemotingAbstract {
try {
executeInvokeCallback(responseFuture);
} catch (Throwable e) {
- PLOG.warn("excute callback in writeAndFlush addListener, and callback throw", e);
+ log.warn("excute callback in writeAndFlush addListener, and callback throw", e);
} finally {
responseFuture.release();
}
- PLOG.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
+ log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
}
});
} catch (Exception e) {
responseFuture.release();
- PLOG.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
+ log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
}
} else {
@@ -427,7 +427,7 @@ public abstract class NettyRemotingAbstract {
this.semaphoreAsync.getQueueLength(), //
this.semaphoreAsync.availablePermits()//
);
- PLOG.warn(info);
+ log.warn(info);
throw new RemotingTimeoutException(info);
}
}
@@ -445,13 +445,13 @@ public abstract class NettyRemotingAbstract {
public void operationComplete(ChannelFuture f) throws Exception {
once.release();
if (!f.isSuccess()) {
- PLOG.warn("send a request command to channel <" + channel.remoteAddress() + "> failed.");
+ log.warn("send a request command to channel <" + channel.remoteAddress() + "> failed.");
}
}
});
} catch (Exception e) {
once.release();
- PLOG.warn("write send a request command to channel <" + channel.remoteAddress() + "> failed.");
+ log.warn("write send a request command to channel <" + channel.remoteAddress() + "> failed.");
throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
}
} else {
@@ -464,7 +464,7 @@ public abstract class NettyRemotingAbstract {
this.semaphoreOneway.getQueueLength(), //
this.semaphoreOneway.availablePermits()//
);
- PLOG.warn(info);
+ log.warn(info);
throw new RemotingTimeoutException(info);
}
}
@@ -478,13 +478,13 @@ public abstract class NettyRemotingAbstract {
if (this.eventQueue.size() <= maxSize) {
this.eventQueue.add(event);
} else {
- PLOG.warn("event queue size[{}] enough, so drop this event {}", this.eventQueue.size(), event.toString());
+ log.warn("event queue size[{}] enough, so drop this event {}", this.eventQueue.size(), event.toString());
}
}
@Override
public void run() {
- PLOG.info(this.getServiceName() + " service started");
+ log.info(this.getServiceName() + " service started");
final ChannelEventListener listener = NettyRemotingAbstract.this.getChannelEventListener();
@@ -511,11 +511,11 @@ public abstract class NettyRemotingAbstract {
}
}
} catch (Exception e) {
- PLOG.warn(this.getServiceName() + " service has exception. ", e);
+ log.warn(this.getServiceName() + " service has exception. ", e);
}
}
- PLOG.info(this.getServiceName() + " service end");
+ log.info(this.getServiceName() + " service end");
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/246be9eb/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java
index abb8385..ad8e65d 100644
--- a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java
@@ -127,7 +127,7 @@ public class AllocateMappedFileService extends ServiceThread {
try {
this.thread.join(this.getJointime());
} catch (InterruptedException e) {
- e.printStackTrace();
+ log.error("Interrupted", e);
}
for (AllocateRequest req : this.requestTable.values()) {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/246be9eb/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index b44211c..0810d0c 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -903,35 +903,6 @@ public class CommitLog {
- public static class GroupCommitRequest {
- private final long nextOffset;
- private final CountDownLatch countDownLatch = new CountDownLatch(1);
- private volatile boolean flushOK = false;
-
- public GroupCommitRequest(long nextOffset) {
- this.nextOffset = nextOffset;
- }
-
- public long getNextOffset() {
- return nextOffset;
- }
-
- public void wakeupCustomer(final boolean flushOK) {
- this.flushOK = flushOK;
- this.countDownLatch.countDown();
- }
-
- public boolean waitForFlush(long timeout) {
- try {
- this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
- return this.flushOK;
- } catch (InterruptedException e) {
- e.printStackTrace();
- return false;
- }
- }
- }
-
abstract class FlushCommitLogService extends ServiceThread {
protected static final int RETRY_TIMES_OVER = 10;
}
@@ -1070,6 +1041,39 @@ public class CommitLog {
}
}
+ public static class GroupCommitRequest {
+ private final long nextOffset;
+ private final CountDownLatch countDownLatch = new CountDownLatch(1);
+ private volatile boolean flushOK = false;
+
+
+ public GroupCommitRequest(long nextOffset) {
+ this.nextOffset = nextOffset;
+ }
+
+
+ public long getNextOffset() {
+ return nextOffset;
+ }
+
+
+ public void wakeupCustomer(final boolean flushOK) {
+ this.flushOK = flushOK;
+ this.countDownLatch.countDown();
+ }
+
+
+ public boolean waitForFlush(long timeout) {
+ try {
+ this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
+ return this.flushOK;
+ } catch (InterruptedException e) {
+ log.error("Interrupted", e);
+ return false;
+ }
+ }
+ }
+
/**
* GroupCommit Service
*/
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/246be9eb/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
index d03ff0f..275334c 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
@@ -25,9 +25,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ConsumeQueue {
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
public static final int CQ_STORE_UNIT_SIZE = 20;
- private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
private static final Logger LOG_ERROR = LoggerFactory.getLogger(LoggerName.STORE_ERROR_LOGGER_NAME);
private final DefaultMessageStore defaultMessageStore;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/246be9eb/store/src/main/java/org/apache/rocketmq/store/MappedFile.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFile.java b/store/src/main/java/org/apache/rocketmq/store/MappedFile.java
index a9a00a8..4250450 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MappedFile.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MappedFile.java
@@ -512,7 +512,7 @@ public class MappedFile extends ReferenceResource {
try {
Thread.sleep(0);
} catch (InterruptedException e) {
- e.printStackTrace();
+ log.error("Interrupted", e);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/246be9eb/store/src/main/java/org/apache/rocketmq/store/StoreCheckpoint.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/StoreCheckpoint.java b/store/src/main/java/org/apache/rocketmq/store/StoreCheckpoint.java
index 5484dce..c5981c6 100644
--- a/store/src/main/java/org/apache/rocketmq/store/StoreCheckpoint.java
+++ b/store/src/main/java/org/apache/rocketmq/store/StoreCheckpoint.java
@@ -71,7 +71,7 @@ public class StoreCheckpoint {
try {
this.fileChannel.close();
} catch (IOException e) {
- e.printStackTrace();
+ log.error("Failed to properly close the channel", e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/246be9eb/store/src/main/java/org/apache/rocketmq/store/ha/WaitNotifyObject.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/WaitNotifyObject.java b/store/src/main/java/org/apache/rocketmq/store/ha/WaitNotifyObject.java
index fb283d6..6aba375 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/WaitNotifyObject.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/WaitNotifyObject.java
@@ -16,9 +16,14 @@
*/
package org.apache.rocketmq.store.ha;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.HashMap;
public class WaitNotifyObject {
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
protected final HashMap<Long/* thread id */, Boolean/* notified */> waitingThreadTable =
new HashMap<Long, Boolean>(16);
@@ -45,7 +50,7 @@ public class WaitNotifyObject {
try {
this.wait(interval);
} catch (InterruptedException e) {
- e.printStackTrace();
+ log.error("Interrupted", e);
} finally {
this.hasNotified = false;
this.onWaitEnd();
@@ -84,7 +89,7 @@ public class WaitNotifyObject {
try {
this.wait(interval);
} catch (InterruptedException e) {
- e.printStackTrace();
+ log.error("Interrupted", e);
} finally {
this.waitingThreadTable.put(currentThreadId, false);
this.onWaitEnd();
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/246be9eb/store/src/main/java/org/apache/rocketmq/store/index/IndexFile.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/index/IndexFile.java b/store/src/main/java/org/apache/rocketmq/store/index/IndexFile.java
index fb75cf9..54f5732 100644
--- a/store/src/main/java/org/apache/rocketmq/store/index/IndexFile.java
+++ b/store/src/main/java/org/apache/rocketmq/store/index/IndexFile.java
@@ -147,7 +147,7 @@ public class IndexFile {
try {
fileLock.release();
} catch (IOException e) {
- e.printStackTrace();
+ log.error("Failed to release the lock", e);
}
}
}
@@ -254,7 +254,7 @@ public class IndexFile {
try {
fileLock.release();
} catch (IOException e) {
- e.printStackTrace();
+ log.error("Failed to release the lock", e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/246be9eb/store/src/main/java/org/apache/rocketmq/store/index/IndexService.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/index/IndexService.java b/store/src/main/java/org/apache/rocketmq/store/index/IndexService.java
index 1ebf52a..c434df5 100644
--- a/store/src/main/java/org/apache/rocketmq/store/index/IndexService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/index/IndexService.java
@@ -275,7 +275,7 @@ public class IndexService {
log.info("Tried to create index file " + times + " times");
Thread.sleep(1000);
} catch (InterruptedException e) {
- e.printStackTrace();
+ log.error("Interrupted", e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/246be9eb/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
index 012a4f0..25640a4 100644
--- a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
@@ -44,8 +44,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ScheduleMessageService extends ConfigManager {
- public static final String SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";
private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+
+ public static final String SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";
private static final long FIRST_DELAY_TIME = 1000L;
private static final long DELAY_FOR_A_WHILE = 100L;
private static final long DELAY_FOR_A_PERIOD = 10000L;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/246be9eb/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStats.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStats.java b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStats.java
index 0d2747d..5555b8b 100644
--- a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStats.java
+++ b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStats.java
@@ -23,6 +23,7 @@ import org.slf4j.LoggerFactory;
public class BrokerStats {
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+
private final DefaultMessageStore defaultMessageStore;
private volatile long msgPutTotalYesterdayMorning;