You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by la...@apache.org on 2020/05/09 03:27:24 UTC

[incubator-tubemq] branch master updated: [TUBEMQ-91] replace explicit type with <> (#71)

This is an automated email from the ASF dual-hosted git repository.

lamberliu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git


The following commit(s) were added to refs/heads/master by this push:
     new 71df903  [TUBEMQ-91] replace explicit type with <> (#71)
71df903 is described below

commit 71df903e30269bdf22deadc5c59028c1b44b3647
Author: 郭世雄 <37...@users.noreply.github.com>
AuthorDate: Sat May 9 11:27:17 2020 +0800

    [TUBEMQ-91] replace explicit type with <> (#71)
    
    Co-authored-by: 郭世雄 <gu...@zmeng123.com>
---
 .../client/consumer/BaseMessageConsumer.java       | 26 +++++++-------
 .../tubemq/client/consumer/ClientSubInfo.java      |  8 ++---
 .../tubemq/client/consumer/ConsumerResult.java     |  2 +-
 .../tubemq/client/consumer/FetchContext.java       |  2 +-
 .../client/consumer/MessageFetchManager.java       |  2 +-
 .../tubemq/client/consumer/RmtDataCache.java       | 42 +++++++++++-----------
 .../tubemq/client/consumer/TopicProcessor.java     |  2 +-
 7 files changed, 42 insertions(+), 42 deletions(-)

diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/BaseMessageConsumer.java b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/BaseMessageConsumer.java
index 7ec0105..ab00fa4 100644
--- a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/BaseMessageConsumer.java
+++ b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/BaseMessageConsumer.java
@@ -87,9 +87,9 @@ public class BaseMessageConsumer implements MessageConsumer {
     private final ScheduledExecutorService heartService2Master;
     private final Thread rebalanceThread;
     private final BlockingQueue<ConsumerEvent> rebalanceEvents =
-            new ArrayBlockingQueue<ConsumerEvent>(REBALANCE_QUEUE_SIZE);
+            new ArrayBlockingQueue<>(REBALANCE_QUEUE_SIZE);
     private final BlockingQueue<ConsumerEvent> rebalanceResults =
-            new ArrayBlockingQueue<ConsumerEvent>(REBALANCE_QUEUE_SIZE);
+            new ArrayBlockingQueue<>(REBALANCE_QUEUE_SIZE);
     // flowctrl
     private boolean isCurGroupCtrl = false;
     private AtomicLong lastCheckTime = new AtomicLong(0);
@@ -102,7 +102,7 @@ public class BaseMessageConsumer implements MessageConsumer {
     private final RpcConfig rpcConfig = new RpcConfig();
     private AtomicLong visitToken = new AtomicLong(TBaseConstants.META_VALUE_UNDEFINED);
     private AtomicReference<String> authAuthorizedTokenRef =
-            new AtomicReference<String>("");
+            new AtomicReference<>("");
     private ClientAuthenticateHandler authenticateHandler =
             new SimpleClientAuthenticateHandler();
     private Thread heartBeatThread2Broker;
@@ -615,9 +615,9 @@ public class BaseMessageConsumer implements MessageConsumer {
     }
 
     private void disconnectFromBroker(ConsumerEvent event) throws InterruptedException {
-        List<String> partKeys = new ArrayList<String>();
+        List<String> partKeys = new ArrayList<>();
         HashMap<BrokerInfo, List<Partition>> unRegisterInfoMap =
-                new HashMap<BrokerInfo, List<Partition>>();
+                new HashMap<>();
         List<SubscribeInfo> subscribeInfoList = event.getSubscribeInfoList();
         for (SubscribeInfo info : subscribeInfoList) {
             BrokerInfo broker =
@@ -627,7 +627,7 @@ public class BaseMessageConsumer implements MessageConsumer {
             List<Partition> unRegisterPartitionList =
                     unRegisterInfoMap.get(broker);
             if (unRegisterPartitionList == null) {
-                unRegisterPartitionList = new ArrayList<Partition>();
+                unRegisterPartitionList = new ArrayList<>();
                 unRegisterInfoMap.put(broker, unRegisterPartitionList);
             }
             if (!unRegisterPartitionList.contains(partition)) {
@@ -639,7 +639,7 @@ public class BaseMessageConsumer implements MessageConsumer {
             return;
         }
         Map<BrokerInfo, List<PartitionSelectResult>> unNewRegisterInfoMap =
-                new HashMap<BrokerInfo, List<PartitionSelectResult>>();
+                new HashMap<>();
         try {
             if (this.isPullConsume) {
                 unNewRegisterInfoMap =
@@ -661,14 +661,14 @@ public class BaseMessageConsumer implements MessageConsumer {
 
     private void connect2Broker(ConsumerEvent event) throws InterruptedException {
         Map<BrokerInfo, List<Partition>> registerInfoMap =
-                new HashMap<BrokerInfo, List<Partition>>();
+                new HashMap<>();
         List<SubscribeInfo> subscribeInfoList = event.getSubscribeInfoList();
         for (SubscribeInfo info : subscribeInfoList) {
             BrokerInfo broker = new BrokerInfo(info.getBrokerId(), info.getHost(), info.getPort());
             Partition partition = new Partition(broker, info.getTopic(), info.getPartitionId());
             List<Partition> curPartList = registerInfoMap.get(broker);
             if (curPartList == null) {
-                curPartList = new ArrayList<Partition>();
+                curPartList = new ArrayList<>();
                 registerInfoMap.put(broker, curPartList);
             }
             if (!curPartList.contains(partition)) {
@@ -678,7 +678,7 @@ public class BaseMessageConsumer implements MessageConsumer {
         if ((isRebalanceStopped()) || (isShutdown())) {
             return;
         }
-        List<Partition> unfinishedPartitions = new ArrayList<Partition>();
+        List<Partition> unfinishedPartitions = new ArrayList<>();
         rmtDataCache.filterCachedPartitionInfo(registerInfoMap, unfinishedPartitions);
         registerPartitions(registerInfoMap, unfinishedPartitions);
         if (this.isFirst.get()) {
@@ -913,7 +913,7 @@ public class BaseMessageConsumer implements MessageConsumer {
     private List<String> formatTopicCondInfo(
             final ConcurrentHashMap<String, TopicProcessor> topicCondMap) {
         final StringBuilder strBuffer = new StringBuilder(512);
-        List<String> strTopicCondList = new ArrayList<String>();
+        List<String> strTopicCondList = new ArrayList<>();
         if ((topicCondMap != null) && (!topicCondMap.isEmpty())) {
             for (Map.Entry<String, TopicProcessor> entry : topicCondMap.entrySet()) {
                 if (entry.getKey() == null || entry.getValue() == null) {
@@ -1253,7 +1253,7 @@ public class BaseMessageConsumer implements MessageConsumer {
                             needFilter = true;
                         }
                     }
-                    List<Message> messageList = new ArrayList<Message>();
+                    List<Message> messageList = new ArrayList<>();
                     for (Message message : tmpMessageList) {
                         if (message == null) {
                             continue;
@@ -1584,7 +1584,7 @@ public class BaseMessageConsumer implements MessageConsumer {
                     }
                     // Send heartbeat request to the broker connect by the client
                     for (BrokerInfo brokerInfo : rmtDataCache.getAllRegisterBrokers()) {
-                        List<String> partStrSet = new ArrayList<String>();
+                        List<String> partStrSet = new ArrayList<>();
                         try {
                             // Handle the heartbeat response for partitions belong to the same broker.
                             List<Partition> partitions =
diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/ClientSubInfo.java b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/ClientSubInfo.java
index bc4dae7..c1c7017 100644
--- a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/ClientSubInfo.java
+++ b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/ClientSubInfo.java
@@ -28,7 +28,7 @@ import org.apache.tubemq.corebase.TokenConstants;
 
 public class ClientSubInfo {
     private final ConcurrentHashMap<String/* topic */, TopicProcessor> topicCondRegistry =
-            new ConcurrentHashMap<String, TopicProcessor>();
+            new ConcurrentHashMap<>();
     private boolean requireBound = false;
     private AtomicBoolean isNotAllocated =
             new AtomicBoolean(true);
@@ -37,9 +37,9 @@ public class ClientSubInfo {
     private long subscribedTime;
     private boolean isSelectBig = true;
     private String requiredPartition = "";
-    private Set<String> subscribedTopics = new HashSet<String>();
-    private Map<String, Long> assignedPartMap = new HashMap<String, Long>();
-    private Map<String, Boolean> topicFilterMap = new HashMap<String, Boolean>();
+    private Set<String> subscribedTopics = new HashSet<>();
+    private Map<String, Long> assignedPartMap = new HashMap<>();
+    private Map<String, Boolean> topicFilterMap = new HashMap<>();
 
     public ClientSubInfo() {
 
diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/ConsumerResult.java b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/ConsumerResult.java
index e36e609..1f4d004 100644
--- a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/ConsumerResult.java
+++ b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/ConsumerResult.java
@@ -32,7 +32,7 @@ public class ConsumerResult {
     private String topicName = "";
     private PeerInfo peerInfo = new PeerInfo();
     private String confirmContext = "";
-    private List<Message> messageList = new ArrayList<Message>();
+    private List<Message> messageList = new ArrayList<>();
 
 
     public ConsumerResult(int errCode, String errMsg) {
diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/FetchContext.java b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/FetchContext.java
index 8a29fcc..f5948c5 100644
--- a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/FetchContext.java
+++ b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/FetchContext.java
@@ -34,7 +34,7 @@ public class FetchContext {
     private String errMsg = "";
     private long currOffset = TBaseConstants.META_VALUE_UNDEFINED;
     private String confirmContext = "";
-    private List<Message> messageList = new ArrayList<Message>();
+    private List<Message> messageList = new ArrayList<>();
 
     public FetchContext(PartitionSelectResult selectResult) {
         this.partition = selectResult.getPartition();
diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/MessageFetchManager.java b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/MessageFetchManager.java
index 6212bc3..6afb8a9 100644
--- a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/MessageFetchManager.java
+++ b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/MessageFetchManager.java
@@ -34,7 +34,7 @@ public class MessageFetchManager {
     private static final Logger logger =
             LoggerFactory.getLogger(MessageFetchManager.class);
     private final ConcurrentHashMap<Long, Integer> fetchWorkerStatusMap =
-            new ConcurrentHashMap<Long, Integer>();
+            new ConcurrentHashMap<>();
     private final ConsumerConfig consumerConfig;
     private final SimplePushMessageConsumer pushConsumer;
     // Manager status:
diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/RmtDataCache.java b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/RmtDataCache.java
index ac643af..17a8a37 100644
--- a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/RmtDataCache.java
+++ b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/RmtDataCache.java
@@ -56,21 +56,21 @@ public class RmtDataCache implements Closeable {
     private final FlowCtrlRuleHandler defFlowCtrlRuleHandler;
     private final AtomicInteger waitCont = new AtomicInteger(0);
     private final ConcurrentHashMap<String, Timeout> timeouts =
-            new ConcurrentHashMap<String, Timeout>();
+            new ConcurrentHashMap<>();
     private final BlockingQueue<String> indexPartition =
-            new LinkedBlockingQueue<String>();
+            new LinkedBlockingQueue<>();
     private final ConcurrentHashMap<String /* index */, PartitionExt> partitionMap =
-            new ConcurrentHashMap<String, PartitionExt>();
+            new ConcurrentHashMap<>();
     private final ConcurrentHashMap<String /* index */, Long> partitionUsedMap =
-            new ConcurrentHashMap<String, Long>();
+            new ConcurrentHashMap<>();
     private final ConcurrentHashMap<String /* index */, Long> partitionOffsetMap =
-            new ConcurrentHashMap<String, Long>();
+            new ConcurrentHashMap<>();
     private final ConcurrentHashMap<String /* topic */, ConcurrentLinkedQueue<Partition>> topicPartitionConMap =
-            new ConcurrentHashMap<String, ConcurrentLinkedQueue<Partition>>();
+            new ConcurrentHashMap<>();
     private final ConcurrentHashMap<BrokerInfo/* broker */, ConcurrentLinkedQueue<Partition>> brokerPartitionConMap =
-            new ConcurrentHashMap<BrokerInfo, ConcurrentLinkedQueue<Partition>>();
+            new ConcurrentHashMap<>();
     private final ConcurrentHashMap<String/* partitionKey */, Integer> partRegisterBookMap =
-            new ConcurrentHashMap<String/* partitionKey */, Integer>();
+            new ConcurrentHashMap<>();
     private AtomicBoolean isClosed = new AtomicBoolean(false);
     private CountDownLatch dataProcessSync = new CountDownLatch(0);
 
@@ -90,7 +90,7 @@ public class RmtDataCache implements Closeable {
         }
         this.defFlowCtrlRuleHandler = defFlowCtrlRuleHandler;
         this.groupFlowCtrlRuleHandler = groupFlowCtrlRuleHandler;
-        Map<Partition, Long> tmpPartOffsetMap = new HashMap<Partition, Long>();
+        Map<Partition, Long> tmpPartOffsetMap = new HashMap<>();
         if (partitionList != null) {
             for (Partition partition : partitionList) {
                 tmpPartOffsetMap.put(partition, -1L);
@@ -294,7 +294,7 @@ public class RmtDataCache implements Closeable {
         if (partition == null) {
             return;
         }
-        Map<Partition, Long> tmpPartOffsetMap = new HashMap<Partition, Long>();
+        Map<Partition, Long> tmpPartOffsetMap = new HashMap<>();
         tmpPartOffsetMap.put(partition, currOffset);
         addPartitionsInfo(tmpPartOffsetMap);
     }
@@ -429,7 +429,7 @@ public class RmtDataCache implements Closeable {
      * @return subscribe information list
      */
     public List<SubscribeInfo> getSubscribeInfoList(String consumerId, String consumeGroup) {
-        List<SubscribeInfo> subscribeInfoList = new ArrayList<SubscribeInfo>();
+        List<SubscribeInfo> subscribeInfoList = new ArrayList<>();
         for (Partition partition : partitionMap.values()) {
             if (partition != null) {
                 subscribeInfoList.add(new SubscribeInfo(consumerId, consumeGroup, partition));
@@ -444,7 +444,7 @@ public class RmtDataCache implements Closeable {
             boolean isWaitTimeoutRollBack) {
         StringBuilder sBuilder = new StringBuilder(512);
         HashMap<BrokerInfo, List<PartitionSelectResult>> unNewRegisterInfoMap =
-                new HashMap<BrokerInfo, List<PartitionSelectResult>>();
+                new HashMap<>();
         pauseProcess();
         try {
             waitPartitions(partitionKeys, inUseWaitPeriodMs);
@@ -491,7 +491,7 @@ public class RmtDataCache implements Closeable {
                         List<PartitionSelectResult> targetPartitonList =
                                 unNewRegisterInfoMap.get(entry.getKey());
                         if (targetPartitonList == null) {
-                            targetPartitonList = new ArrayList<PartitionSelectResult>();
+                            targetPartitonList = new ArrayList<>();
                             unNewRegisterInfoMap.put(entry.getKey(), targetPartitonList);
                         }
                         targetPartitonList.add(partitionRet);
@@ -540,7 +540,7 @@ public class RmtDataCache implements Closeable {
      */
     public Map<String, ConsumeOffsetInfo> getCurPartitionInfoMap() {
         Map<String, ConsumeOffsetInfo> tmpPartitionMap =
-                new ConcurrentHashMap<String, ConsumeOffsetInfo>();
+                new ConcurrentHashMap<>();
         for (Map.Entry<String, PartitionExt> entry : partitionMap.entrySet()) {
             if (entry.getKey() == null || entry.getValue() == null) {
                 continue;
@@ -553,12 +553,12 @@ public class RmtDataCache implements Closeable {
 
     public Map<BrokerInfo, List<PartitionSelectResult>> getAllPartitionListWithStatus() {
         Map<BrokerInfo, List<PartitionSelectResult>> registeredInfoMap =
-                new HashMap<BrokerInfo, List<PartitionSelectResult>>();
+                new HashMap<>();
         for (PartitionExt partitionExt : partitionMap.values()) {
             List<PartitionSelectResult> registerPartitionList =
                     registeredInfoMap.get(partitionExt.getBroker());
             if (registerPartitionList == null) {
-                registerPartitionList = new ArrayList<PartitionSelectResult>();
+                registerPartitionList = new ArrayList<>();
                 registeredInfoMap.put(partitionExt.getBroker(), registerPartitionList);
             }
             registerPartitionList.add(new PartitionSelectResult(true,
@@ -584,7 +584,7 @@ public class RmtDataCache implements Closeable {
      * @return partition list
      */
     public List<Partition> getBrokerPartitionList(BrokerInfo brokerInfo) {
-        List<Partition> retPartition = new ArrayList<Partition>();
+        List<Partition> retPartition = new ArrayList<>();
         ConcurrentLinkedQueue<Partition> partitionList =
                 brokerPartitionConMap.get(brokerInfo);
         if (partitionList != null) {
@@ -595,7 +595,7 @@ public class RmtDataCache implements Closeable {
 
     public void filterCachedPartitionInfo(Map<BrokerInfo, List<Partition>> registerInfoMap,
                                           List<Partition> unRegPartitionList) {
-        List<BrokerInfo> brokerInfoList = new ArrayList<BrokerInfo>();
+        List<BrokerInfo> brokerInfoList = new ArrayList<>();
         for (Map.Entry<BrokerInfo, List<Partition>> entry : registerInfoMap.entrySet()) {
             if (entry.getKey() == null || entry.getValue() == null) {
                 continue;
@@ -628,7 +628,7 @@ public class RmtDataCache implements Closeable {
 
     public void resumeTimeoutConsumePartitions(long allowedPeriodTimes) {
         if (!partitionUsedMap.isEmpty()) {
-            List<String> partKeys = new ArrayList<String>();
+            List<String> partKeys = new ArrayList<>();
             partKeys.addAll(partitionUsedMap.keySet());
             for (String keyId : partKeys) {
                 Long oldTime = partitionUsedMap.get(keyId);
@@ -689,7 +689,7 @@ public class RmtDataCache implements Closeable {
             ConcurrentLinkedQueue<Partition> topicPartitionQue =
                     topicPartitionConMap.get(partition.getTopic());
             if (topicPartitionQue == null) {
-                topicPartitionQue = new ConcurrentLinkedQueue<Partition>();
+                topicPartitionQue = new ConcurrentLinkedQueue<>();
                 ConcurrentLinkedQueue<Partition> tmpTopicPartitionQue =
                         topicPartitionConMap.putIfAbsent(partition.getTopic(), topicPartitionQue);
                 if (tmpTopicPartitionQue != null) {
@@ -702,7 +702,7 @@ public class RmtDataCache implements Closeable {
             ConcurrentLinkedQueue<Partition> brokerPartitionQue =
                     brokerPartitionConMap.get(partition.getBroker());
             if (brokerPartitionQue == null) {
-                brokerPartitionQue = new ConcurrentLinkedQueue<Partition>();
+                brokerPartitionQue = new ConcurrentLinkedQueue<>();
                 ConcurrentLinkedQueue<Partition> tmpBrokerPartQues =
                         brokerPartitionConMap.putIfAbsent(partition.getBroker(), brokerPartitionQue);
                 if (tmpBrokerPartQues != null) {
diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/TopicProcessor.java b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/TopicProcessor.java
index e18eaaf..d85316e 100644
--- a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/TopicProcessor.java
+++ b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/TopicProcessor.java
@@ -26,7 +26,7 @@ import java.util.TreeSet;
 public class TopicProcessor {
     private MessageListener messageListener;
     private TreeSet<String> filterCondStrs;
-    private List<Integer> filterCondCodes = new ArrayList<Integer>();
+    private List<Integer> filterCondCodes = new ArrayList<>();
 
     public TopicProcessor(final MessageListener messageListener,
                           final TreeSet<String> filterConds) {