You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by la...@apache.org on 2020/05/09 03:27:24 UTC
[incubator-tubemq] branch master updated: [TUBEMQ-91] replace explicit type with <> (#71)
This is an automated email from the ASF dual-hosted git repository.
lamberliu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/master by this push:
new 71df903 [TUBEMQ-91] replace explicit type with <> (#71)
71df903 is described below
commit 71df903e30269bdf22deadc5c59028c1b44b3647
Author: 郭世雄 <37...@users.noreply.github.com>
AuthorDate: Sat May 9 11:27:17 2020 +0800
[TUBEMQ-91] replace explicit type with <> (#71)
Co-authored-by: 郭世雄 <gu...@zmeng123.com>
---
.../client/consumer/BaseMessageConsumer.java | 26 +++++++-------
.../tubemq/client/consumer/ClientSubInfo.java | 8 ++---
.../tubemq/client/consumer/ConsumerResult.java | 2 +-
.../tubemq/client/consumer/FetchContext.java | 2 +-
.../client/consumer/MessageFetchManager.java | 2 +-
.../tubemq/client/consumer/RmtDataCache.java | 42 +++++++++++-----------
.../tubemq/client/consumer/TopicProcessor.java | 2 +-
7 files changed, 42 insertions(+), 42 deletions(-)
diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/BaseMessageConsumer.java b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/BaseMessageConsumer.java
index 7ec0105..ab00fa4 100644
--- a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/BaseMessageConsumer.java
+++ b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/BaseMessageConsumer.java
@@ -87,9 +87,9 @@ public class BaseMessageConsumer implements MessageConsumer {
private final ScheduledExecutorService heartService2Master;
private final Thread rebalanceThread;
private final BlockingQueue<ConsumerEvent> rebalanceEvents =
- new ArrayBlockingQueue<ConsumerEvent>(REBALANCE_QUEUE_SIZE);
+ new ArrayBlockingQueue<>(REBALANCE_QUEUE_SIZE);
private final BlockingQueue<ConsumerEvent> rebalanceResults =
- new ArrayBlockingQueue<ConsumerEvent>(REBALANCE_QUEUE_SIZE);
+ new ArrayBlockingQueue<>(REBALANCE_QUEUE_SIZE);
// flowctrl
private boolean isCurGroupCtrl = false;
private AtomicLong lastCheckTime = new AtomicLong(0);
@@ -102,7 +102,7 @@ public class BaseMessageConsumer implements MessageConsumer {
private final RpcConfig rpcConfig = new RpcConfig();
private AtomicLong visitToken = new AtomicLong(TBaseConstants.META_VALUE_UNDEFINED);
private AtomicReference<String> authAuthorizedTokenRef =
- new AtomicReference<String>("");
+ new AtomicReference<>("");
private ClientAuthenticateHandler authenticateHandler =
new SimpleClientAuthenticateHandler();
private Thread heartBeatThread2Broker;
@@ -615,9 +615,9 @@ public class BaseMessageConsumer implements MessageConsumer {
}
private void disconnectFromBroker(ConsumerEvent event) throws InterruptedException {
- List<String> partKeys = new ArrayList<String>();
+ List<String> partKeys = new ArrayList<>();
HashMap<BrokerInfo, List<Partition>> unRegisterInfoMap =
- new HashMap<BrokerInfo, List<Partition>>();
+ new HashMap<>();
List<SubscribeInfo> subscribeInfoList = event.getSubscribeInfoList();
for (SubscribeInfo info : subscribeInfoList) {
BrokerInfo broker =
@@ -627,7 +627,7 @@ public class BaseMessageConsumer implements MessageConsumer {
List<Partition> unRegisterPartitionList =
unRegisterInfoMap.get(broker);
if (unRegisterPartitionList == null) {
- unRegisterPartitionList = new ArrayList<Partition>();
+ unRegisterPartitionList = new ArrayList<>();
unRegisterInfoMap.put(broker, unRegisterPartitionList);
}
if (!unRegisterPartitionList.contains(partition)) {
@@ -639,7 +639,7 @@ public class BaseMessageConsumer implements MessageConsumer {
return;
}
Map<BrokerInfo, List<PartitionSelectResult>> unNewRegisterInfoMap =
- new HashMap<BrokerInfo, List<PartitionSelectResult>>();
+ new HashMap<>();
try {
if (this.isPullConsume) {
unNewRegisterInfoMap =
@@ -661,14 +661,14 @@ public class BaseMessageConsumer implements MessageConsumer {
private void connect2Broker(ConsumerEvent event) throws InterruptedException {
Map<BrokerInfo, List<Partition>> registerInfoMap =
- new HashMap<BrokerInfo, List<Partition>>();
+ new HashMap<>();
List<SubscribeInfo> subscribeInfoList = event.getSubscribeInfoList();
for (SubscribeInfo info : subscribeInfoList) {
BrokerInfo broker = new BrokerInfo(info.getBrokerId(), info.getHost(), info.getPort());
Partition partition = new Partition(broker, info.getTopic(), info.getPartitionId());
List<Partition> curPartList = registerInfoMap.get(broker);
if (curPartList == null) {
- curPartList = new ArrayList<Partition>();
+ curPartList = new ArrayList<>();
registerInfoMap.put(broker, curPartList);
}
if (!curPartList.contains(partition)) {
@@ -678,7 +678,7 @@ public class BaseMessageConsumer implements MessageConsumer {
if ((isRebalanceStopped()) || (isShutdown())) {
return;
}
- List<Partition> unfinishedPartitions = new ArrayList<Partition>();
+ List<Partition> unfinishedPartitions = new ArrayList<>();
rmtDataCache.filterCachedPartitionInfo(registerInfoMap, unfinishedPartitions);
registerPartitions(registerInfoMap, unfinishedPartitions);
if (this.isFirst.get()) {
@@ -913,7 +913,7 @@ public class BaseMessageConsumer implements MessageConsumer {
private List<String> formatTopicCondInfo(
final ConcurrentHashMap<String, TopicProcessor> topicCondMap) {
final StringBuilder strBuffer = new StringBuilder(512);
- List<String> strTopicCondList = new ArrayList<String>();
+ List<String> strTopicCondList = new ArrayList<>();
if ((topicCondMap != null) && (!topicCondMap.isEmpty())) {
for (Map.Entry<String, TopicProcessor> entry : topicCondMap.entrySet()) {
if (entry.getKey() == null || entry.getValue() == null) {
@@ -1253,7 +1253,7 @@ public class BaseMessageConsumer implements MessageConsumer {
needFilter = true;
}
}
- List<Message> messageList = new ArrayList<Message>();
+ List<Message> messageList = new ArrayList<>();
for (Message message : tmpMessageList) {
if (message == null) {
continue;
@@ -1584,7 +1584,7 @@ public class BaseMessageConsumer implements MessageConsumer {
}
// Send heartbeat request to the broker connect by the client
for (BrokerInfo brokerInfo : rmtDataCache.getAllRegisterBrokers()) {
- List<String> partStrSet = new ArrayList<String>();
+ List<String> partStrSet = new ArrayList<>();
try {
// Handle the heartbeat response for partitions belong to the same broker.
List<Partition> partitions =
diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/ClientSubInfo.java b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/ClientSubInfo.java
index bc4dae7..c1c7017 100644
--- a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/ClientSubInfo.java
+++ b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/ClientSubInfo.java
@@ -28,7 +28,7 @@ import org.apache.tubemq.corebase.TokenConstants;
public class ClientSubInfo {
private final ConcurrentHashMap<String/* topic */, TopicProcessor> topicCondRegistry =
- new ConcurrentHashMap<String, TopicProcessor>();
+ new ConcurrentHashMap<>();
private boolean requireBound = false;
private AtomicBoolean isNotAllocated =
new AtomicBoolean(true);
@@ -37,9 +37,9 @@ public class ClientSubInfo {
private long subscribedTime;
private boolean isSelectBig = true;
private String requiredPartition = "";
- private Set<String> subscribedTopics = new HashSet<String>();
- private Map<String, Long> assignedPartMap = new HashMap<String, Long>();
- private Map<String, Boolean> topicFilterMap = new HashMap<String, Boolean>();
+ private Set<String> subscribedTopics = new HashSet<>();
+ private Map<String, Long> assignedPartMap = new HashMap<>();
+ private Map<String, Boolean> topicFilterMap = new HashMap<>();
public ClientSubInfo() {
diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/ConsumerResult.java b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/ConsumerResult.java
index e36e609..1f4d004 100644
--- a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/ConsumerResult.java
+++ b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/ConsumerResult.java
@@ -32,7 +32,7 @@ public class ConsumerResult {
private String topicName = "";
private PeerInfo peerInfo = new PeerInfo();
private String confirmContext = "";
- private List<Message> messageList = new ArrayList<Message>();
+ private List<Message> messageList = new ArrayList<>();
public ConsumerResult(int errCode, String errMsg) {
diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/FetchContext.java b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/FetchContext.java
index 8a29fcc..f5948c5 100644
--- a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/FetchContext.java
+++ b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/FetchContext.java
@@ -34,7 +34,7 @@ public class FetchContext {
private String errMsg = "";
private long currOffset = TBaseConstants.META_VALUE_UNDEFINED;
private String confirmContext = "";
- private List<Message> messageList = new ArrayList<Message>();
+ private List<Message> messageList = new ArrayList<>();
public FetchContext(PartitionSelectResult selectResult) {
this.partition = selectResult.getPartition();
diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/MessageFetchManager.java b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/MessageFetchManager.java
index 6212bc3..6afb8a9 100644
--- a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/MessageFetchManager.java
+++ b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/MessageFetchManager.java
@@ -34,7 +34,7 @@ public class MessageFetchManager {
private static final Logger logger =
LoggerFactory.getLogger(MessageFetchManager.class);
private final ConcurrentHashMap<Long, Integer> fetchWorkerStatusMap =
- new ConcurrentHashMap<Long, Integer>();
+ new ConcurrentHashMap<>();
private final ConsumerConfig consumerConfig;
private final SimplePushMessageConsumer pushConsumer;
// Manager status:
diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/RmtDataCache.java b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/RmtDataCache.java
index ac643af..17a8a37 100644
--- a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/RmtDataCache.java
+++ b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/RmtDataCache.java
@@ -56,21 +56,21 @@ public class RmtDataCache implements Closeable {
private final FlowCtrlRuleHandler defFlowCtrlRuleHandler;
private final AtomicInteger waitCont = new AtomicInteger(0);
private final ConcurrentHashMap<String, Timeout> timeouts =
- new ConcurrentHashMap<String, Timeout>();
+ new ConcurrentHashMap<>();
private final BlockingQueue<String> indexPartition =
- new LinkedBlockingQueue<String>();
+ new LinkedBlockingQueue<>();
private final ConcurrentHashMap<String /* index */, PartitionExt> partitionMap =
- new ConcurrentHashMap<String, PartitionExt>();
+ new ConcurrentHashMap<>();
private final ConcurrentHashMap<String /* index */, Long> partitionUsedMap =
- new ConcurrentHashMap<String, Long>();
+ new ConcurrentHashMap<>();
private final ConcurrentHashMap<String /* index */, Long> partitionOffsetMap =
- new ConcurrentHashMap<String, Long>();
+ new ConcurrentHashMap<>();
private final ConcurrentHashMap<String /* topic */, ConcurrentLinkedQueue<Partition>> topicPartitionConMap =
- new ConcurrentHashMap<String, ConcurrentLinkedQueue<Partition>>();
+ new ConcurrentHashMap<>();
private final ConcurrentHashMap<BrokerInfo/* broker */, ConcurrentLinkedQueue<Partition>> brokerPartitionConMap =
- new ConcurrentHashMap<BrokerInfo, ConcurrentLinkedQueue<Partition>>();
+ new ConcurrentHashMap<>();
private final ConcurrentHashMap<String/* partitionKey */, Integer> partRegisterBookMap =
- new ConcurrentHashMap<String/* partitionKey */, Integer>();
+ new ConcurrentHashMap<>();
private AtomicBoolean isClosed = new AtomicBoolean(false);
private CountDownLatch dataProcessSync = new CountDownLatch(0);
@@ -90,7 +90,7 @@ public class RmtDataCache implements Closeable {
}
this.defFlowCtrlRuleHandler = defFlowCtrlRuleHandler;
this.groupFlowCtrlRuleHandler = groupFlowCtrlRuleHandler;
- Map<Partition, Long> tmpPartOffsetMap = new HashMap<Partition, Long>();
+ Map<Partition, Long> tmpPartOffsetMap = new HashMap<>();
if (partitionList != null) {
for (Partition partition : partitionList) {
tmpPartOffsetMap.put(partition, -1L);
@@ -294,7 +294,7 @@ public class RmtDataCache implements Closeable {
if (partition == null) {
return;
}
- Map<Partition, Long> tmpPartOffsetMap = new HashMap<Partition, Long>();
+ Map<Partition, Long> tmpPartOffsetMap = new HashMap<>();
tmpPartOffsetMap.put(partition, currOffset);
addPartitionsInfo(tmpPartOffsetMap);
}
@@ -429,7 +429,7 @@ public class RmtDataCache implements Closeable {
* @return subscribe information list
*/
public List<SubscribeInfo> getSubscribeInfoList(String consumerId, String consumeGroup) {
- List<SubscribeInfo> subscribeInfoList = new ArrayList<SubscribeInfo>();
+ List<SubscribeInfo> subscribeInfoList = new ArrayList<>();
for (Partition partition : partitionMap.values()) {
if (partition != null) {
subscribeInfoList.add(new SubscribeInfo(consumerId, consumeGroup, partition));
@@ -444,7 +444,7 @@ public class RmtDataCache implements Closeable {
boolean isWaitTimeoutRollBack) {
StringBuilder sBuilder = new StringBuilder(512);
HashMap<BrokerInfo, List<PartitionSelectResult>> unNewRegisterInfoMap =
- new HashMap<BrokerInfo, List<PartitionSelectResult>>();
+ new HashMap<>();
pauseProcess();
try {
waitPartitions(partitionKeys, inUseWaitPeriodMs);
@@ -491,7 +491,7 @@ public class RmtDataCache implements Closeable {
List<PartitionSelectResult> targetPartitonList =
unNewRegisterInfoMap.get(entry.getKey());
if (targetPartitonList == null) {
- targetPartitonList = new ArrayList<PartitionSelectResult>();
+ targetPartitonList = new ArrayList<>();
unNewRegisterInfoMap.put(entry.getKey(), targetPartitonList);
}
targetPartitonList.add(partitionRet);
@@ -540,7 +540,7 @@ public class RmtDataCache implements Closeable {
*/
public Map<String, ConsumeOffsetInfo> getCurPartitionInfoMap() {
Map<String, ConsumeOffsetInfo> tmpPartitionMap =
- new ConcurrentHashMap<String, ConsumeOffsetInfo>();
+ new ConcurrentHashMap<>();
for (Map.Entry<String, PartitionExt> entry : partitionMap.entrySet()) {
if (entry.getKey() == null || entry.getValue() == null) {
continue;
@@ -553,12 +553,12 @@ public class RmtDataCache implements Closeable {
public Map<BrokerInfo, List<PartitionSelectResult>> getAllPartitionListWithStatus() {
Map<BrokerInfo, List<PartitionSelectResult>> registeredInfoMap =
- new HashMap<BrokerInfo, List<PartitionSelectResult>>();
+ new HashMap<>();
for (PartitionExt partitionExt : partitionMap.values()) {
List<PartitionSelectResult> registerPartitionList =
registeredInfoMap.get(partitionExt.getBroker());
if (registerPartitionList == null) {
- registerPartitionList = new ArrayList<PartitionSelectResult>();
+ registerPartitionList = new ArrayList<>();
registeredInfoMap.put(partitionExt.getBroker(), registerPartitionList);
}
registerPartitionList.add(new PartitionSelectResult(true,
@@ -584,7 +584,7 @@ public class RmtDataCache implements Closeable {
* @return partition list
*/
public List<Partition> getBrokerPartitionList(BrokerInfo brokerInfo) {
- List<Partition> retPartition = new ArrayList<Partition>();
+ List<Partition> retPartition = new ArrayList<>();
ConcurrentLinkedQueue<Partition> partitionList =
brokerPartitionConMap.get(brokerInfo);
if (partitionList != null) {
@@ -595,7 +595,7 @@ public class RmtDataCache implements Closeable {
public void filterCachedPartitionInfo(Map<BrokerInfo, List<Partition>> registerInfoMap,
List<Partition> unRegPartitionList) {
- List<BrokerInfo> brokerInfoList = new ArrayList<BrokerInfo>();
+ List<BrokerInfo> brokerInfoList = new ArrayList<>();
for (Map.Entry<BrokerInfo, List<Partition>> entry : registerInfoMap.entrySet()) {
if (entry.getKey() == null || entry.getValue() == null) {
continue;
@@ -628,7 +628,7 @@ public class RmtDataCache implements Closeable {
public void resumeTimeoutConsumePartitions(long allowedPeriodTimes) {
if (!partitionUsedMap.isEmpty()) {
- List<String> partKeys = new ArrayList<String>();
+ List<String> partKeys = new ArrayList<>();
partKeys.addAll(partitionUsedMap.keySet());
for (String keyId : partKeys) {
Long oldTime = partitionUsedMap.get(keyId);
@@ -689,7 +689,7 @@ public class RmtDataCache implements Closeable {
ConcurrentLinkedQueue<Partition> topicPartitionQue =
topicPartitionConMap.get(partition.getTopic());
if (topicPartitionQue == null) {
- topicPartitionQue = new ConcurrentLinkedQueue<Partition>();
+ topicPartitionQue = new ConcurrentLinkedQueue<>();
ConcurrentLinkedQueue<Partition> tmpTopicPartitionQue =
topicPartitionConMap.putIfAbsent(partition.getTopic(), topicPartitionQue);
if (tmpTopicPartitionQue != null) {
@@ -702,7 +702,7 @@ public class RmtDataCache implements Closeable {
ConcurrentLinkedQueue<Partition> brokerPartitionQue =
brokerPartitionConMap.get(partition.getBroker());
if (brokerPartitionQue == null) {
- brokerPartitionQue = new ConcurrentLinkedQueue<Partition>();
+ brokerPartitionQue = new ConcurrentLinkedQueue<>();
ConcurrentLinkedQueue<Partition> tmpBrokerPartQues =
brokerPartitionConMap.putIfAbsent(partition.getBroker(), brokerPartitionQue);
if (tmpBrokerPartQues != null) {
diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/TopicProcessor.java b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/TopicProcessor.java
index e18eaaf..d85316e 100644
--- a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/TopicProcessor.java
+++ b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/TopicProcessor.java
@@ -26,7 +26,7 @@ import java.util.TreeSet;
public class TopicProcessor {
private MessageListener messageListener;
private TreeSet<String> filterCondStrs;
- private List<Integer> filterCondCodes = new ArrayList<Integer>();
+ private List<Integer> filterCondCodes = new ArrayList<>();
public TopicProcessor(final MessageListener messageListener,
final TreeSet<String> filterConds) {