You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2022/05/19 06:25:00 UTC
[rocketmq] branch develop updated: [ISSUE #4327] Init collection size (#4214)
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 455ac6d5a [ISSUE #4327] Init collection size (#4214)
455ac6d5a is described below
commit 455ac6d5ab411642beedef21b664268caa0e5b0d
Author: Oliver <wq...@163.com>
AuthorDate: Thu May 19 14:24:44 2022 +0800
[ISSUE #4327] Init collection size (#4214)
[ISSUE #4327] Init collection size
---
.../org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java | 2 +-
.../apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java | 2 +-
.../rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java | 2 +-
.../main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java | 2 +-
.../rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java | 5 +++--
.../org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java | 2 +-
.../org/apache/rocketmq/client/impl/factory/MQClientInstance.java | 6 +++---
.../java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java | 1 +
8 files changed, 12 insertions(+), 10 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
index 7fb6dc099..8e1c8d15a 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
@@ -640,7 +640,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
*/
@Deprecated
public void setSubscription(Map<String, String> subscription) {
- Map<String, String> subscriptionWithNamespace = new HashMap<String, String>();
+ Map<String, String> subscriptionWithNamespace = new HashMap<String, String>(subscription.size(), 1);
for (Entry<String, String> topicEntry : subscription.entrySet()) {
subscriptionWithNamespace.put(withNamespace(topicEntry.getKey()), topicEntry.getValue());
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java
index d380ba058..f949b75a8 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java
@@ -169,7 +169,7 @@ public class LocalFileOffsetStore implements OffsetStore {
@Override
public Map<MessageQueue, Long> cloneOffsetTable(String topic) {
- Map<MessageQueue, Long> cloneOffsetTable = new HashMap<MessageQueue, Long>();
+ Map<MessageQueue, Long> cloneOffsetTable = new HashMap<MessageQueue, Long>(this.offsetTable.size(), 1);
for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
MessageQueue mq = entry.getKey();
if (!UtilAll.isBlank(topic) && !topic.equals(mq.getTopic())) {
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
index 15b5becfd..409ceab95 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
@@ -174,7 +174,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
@Override
public Map<MessageQueue, Long> cloneOffsetTable(String topic) {
- Map<MessageQueue, Long> cloneOffsetTable = new HashMap<MessageQueue, Long>();
+ Map<MessageQueue, Long> cloneOffsetTable = new HashMap<MessageQueue, Long>(this.offsetTable.size(), 1);
for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
MessageQueue mq = entry.getKey();
if (!UtilAll.isBlank(topic) && !topic.equals(mq.getTopic())) {
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index be70d9f5f..f2d01897c 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -388,7 +388,7 @@ public class MQClientAPIImpl {
clusterAclVersionInfo.setBrokerAddr(responseHeader.getBrokerAddr());
clusterAclVersionInfo.setAclConfigDataVersion(DataVersion.fromJson(responseHeader.getVersion(), DataVersion.class));
HashMap<String, Object> dataVersionMap = JSON.parseObject(responseHeader.getAllAclFileVersion(), HashMap.class);
- Map<String, DataVersion> allAclConfigDataVersion = new HashMap<String, DataVersion>();
+ Map<String, DataVersion> allAclConfigDataVersion = new HashMap<String, DataVersion>(dataVersionMap.size(), 1);
for (Map.Entry<String, Object> entry : dataVersionMap.entrySet()) {
allAclConfigDataVersion.put(entry.getKey(),DataVersion.fromJson(JSON.toJSONString(entry.getValue()), DataVersion.class));
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index a338f7b68..31cd64c01 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -28,6 +28,7 @@ import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.Validators;
@@ -962,8 +963,8 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
for (String topic : rebalanceImpl.getSubscriptionInner().keySet()) {
Set<MessageQueue> mqs = rebalanceImpl.getTopicSubscribeInfoTable().get(topic);
- Map<MessageQueue, Long> offsetTable = new HashMap<MessageQueue, Long>();
- if (mqs != null) {
+ if (CollectionUtils.isNotEmpty(mqs)) {
+ Map<MessageQueue, Long> offsetTable = new HashMap<MessageQueue, Long>(mqs.size(), 1);
for (MessageQueue mq : mqs) {
long offset = searchOffset(mq, timeStamp);
offsetTable.put(mq, offset);
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
index 7677d8b68..f239d7946 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
@@ -117,7 +117,7 @@ public abstract class RebalanceImpl {
}
private HashMap<String/* brokerName */, Set<MessageQueue>> buildProcessQueueTableByBrokerName() {
- HashMap<String, Set<MessageQueue>> result = new HashMap<String, Set<MessageQueue>>();
+ HashMap<String, Set<MessageQueue>> result = new HashMap<String, Set<MessageQueue>>(this.processQueueTable.size(), 1);
for (MessageQueue mq : this.processQueueTable.keySet()) {
Set<MessageQueue> mqs = result.get(mq.getBrokerName());
if (null == mqs) {
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index d5b90979e..1ba3e32f3 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -366,7 +366,7 @@ public class MQClientInstance {
* @return newOffsetTable
*/
public Map<MessageQueue, Long> parseOffsetTableFromBroker(Map<MessageQueue, Long> offsetTable, String namespace) {
- HashMap<MessageQueue, Long> newOffsetTable = new HashMap<MessageQueue, Long>();
+ HashMap<MessageQueue, Long> newOffsetTable = new HashMap<MessageQueue, Long>(offsetTable.size(), 1);
if (StringUtils.isNotEmpty(namespace)) {
for (Entry<MessageQueue, Long> entry : offsetTable.entrySet()) {
MessageQueue queue = entry.getKey();
@@ -387,7 +387,7 @@ public class MQClientInstance {
try {
if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS))
try {
- ConcurrentHashMap<String, HashMap<Long, String>> updatedTable = new ConcurrentHashMap<String, HashMap<Long, String>>();
+ ConcurrentHashMap<String, HashMap<Long, String>> updatedTable = new ConcurrentHashMap<String, HashMap<Long, String>>(this.brokerAddrTable.size(), 1);
Iterator<Entry<String, HashMap<Long, String>>> itBrokerTable = this.brokerAddrTable.entrySet().iterator();
while (itBrokerTable.hasNext()) {
@@ -395,7 +395,7 @@ public class MQClientInstance {
String brokerName = entry.getKey();
HashMap<Long, String> oneTable = entry.getValue();
- HashMap<Long, String> cloneAddrTable = new HashMap<Long, String>();
+ HashMap<Long, String> cloneAddrTable = new HashMap<Long, String>(oneTable.size(), 1);
cloneAddrTable.putAll(oneTable);
Iterator<Entry<Long, String>> it = cloneAddrTable.entrySet().iterator();
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
index 86153f526..7652ee0e5 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
@@ -330,6 +330,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
traceExecutor.submit(asyncDataSendTask);
this.clear();
+
}
}