You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2020/05/11 02:20:15 UTC

[incubator-tubemq] branch master updated: [TUBEMQ-94] Substitute the parameterized type for core module (#75)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git


The following commit(s) were added to refs/heads/master by this push:
     new aa93150  [TUBEMQ-94] Substitute the parameterized type for core module (#75)
aa93150 is described below

commit aa93150432f25f13a3d16a7b4608e9a0c81ee8b6
Author: viviel <37...@users.noreply.github.com>
AuthorDate: Mon May 11 10:20:09 2020 +0800

    [TUBEMQ-94] Substitute the parameterized type for core module (#75)
---
 .../tubemq/corebase/balance/ConsumerEvent.java     |  2 +-
 .../tubemq/corebase/cluster/ConsumerInfo.java      |  2 +-
 .../apache/tubemq/corebase/cluster/MasterInfo.java |  6 +++---
 .../corebase/policies/FlowCtrlRuleHandler.java     | 12 ++++++------
 .../tubemq/corebase/utils/ConcurrentHashSet.java   |  4 ++--
 .../tubemq/corebase/utils/DataConverterUtil.java   | 16 ++++++++--------
 .../java/org/apache/tubemq/corerpc/RpcConfig.java  |  2 +-
 .../apache/tubemq/corerpc/RpcServiceFactory.java   | 22 +++++++++++-----------
 .../apache/tubemq/corerpc/codec/PbEnDecoder.java   |  4 ++--
 .../corerpc/netty/ByteBufferOutputStream.java      |  2 +-
 .../apache/tubemq/corerpc/netty/NettyClient.java   |  4 ++--
 .../tubemq/corerpc/netty/NettyClientFactory.java   |  2 +-
 .../tubemq/corerpc/netty/NettyProtocolDecoder.java |  6 +++---
 .../tubemq/corerpc/netty/NettyProtocolEncoder.java |  2 +-
 .../tubemq/corerpc/netty/NettyRpcServer.java       |  4 ++--
 .../tubemq/corerpc/protocol/ProtocolFactory.java   |  2 +-
 .../tubemq/corerpc/protocol/RpcProtocol.java       |  6 +++---
 .../corerpc/codec/DataConverterUtilTest.java       |  2 +-
 .../corerpc/netty/NettyProtocolEncoderTest.java    |  2 +-
 19 files changed, 51 insertions(+), 51 deletions(-)

diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corebase/balance/ConsumerEvent.java b/tubemq-core/src/main/java/org/apache/tubemq/corebase/balance/ConsumerEvent.java
index bfa3739..3ca72c5 100644
--- a/tubemq-core/src/main/java/org/apache/tubemq/corebase/balance/ConsumerEvent.java
+++ b/tubemq-core/src/main/java/org/apache/tubemq/corebase/balance/ConsumerEvent.java
@@ -28,7 +28,7 @@ public class ConsumerEvent {
     private EventType type;
     private EventStatus status;
     private List<SubscribeInfo> subscribeInfoList =
-            new ArrayList<SubscribeInfo>();
+            new ArrayList<>();
 
 
     public ConsumerEvent(long rebalanceId,
diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corebase/cluster/ConsumerInfo.java b/tubemq-core/src/main/java/org/apache/tubemq/corebase/cluster/ConsumerInfo.java
index 93e847e..f600647 100644
--- a/tubemq-core/src/main/java/org/apache/tubemq/corebase/cluster/ConsumerInfo.java
+++ b/tubemq-core/src/main/java/org/apache/tubemq/corebase/cluster/ConsumerInfo.java
@@ -57,7 +57,7 @@ public class ConsumerInfo implements Comparable<ConsumerInfo>, Serializable {
         this.topicSet = topicSet;
         if (topicConditions == null) {
             this.topicConditions =
-                    new HashMap<String, TreeSet<String>>();
+                    new HashMap<>();
         } else {
             this.topicConditions = topicConditions;
         }
diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corebase/cluster/MasterInfo.java b/tubemq-core/src/main/java/org/apache/tubemq/corebase/cluster/MasterInfo.java
index 38ff76e..0866394 100644
--- a/tubemq-core/src/main/java/org/apache/tubemq/corebase/cluster/MasterInfo.java
+++ b/tubemq-core/src/main/java/org/apache/tubemq/corebase/cluster/MasterInfo.java
@@ -29,7 +29,7 @@ import org.apache.tubemq.corebase.utils.TStringUtils;
 public class MasterInfo {
 
     private final Map<String/** ip:port */, NodeAddrInfo> addrMap4Failover =
-            new HashMap<String, NodeAddrInfo>();
+            new HashMap<>();
     private List<String> nodeHostPortList;
     private NodeAddrInfo firstNodeAddr = null;
     private String masterClusterStr;
@@ -78,7 +78,7 @@ public class MasterInfo {
                 this.firstNodeAddr = tmpNodeAddrInfo;
             }
         }
-        nodeHostPortList = new ArrayList<String>(addrMap4Failover.size());
+        nodeHostPortList = new ArrayList<>(addrMap4Failover.size());
         nodeHostPortList.addAll(addrMap4Failover.keySet());
         int count = 0;
         Collections.sort(nodeHostPortList);
@@ -100,7 +100,7 @@ public class MasterInfo {
             }
             this.addrMap4Failover.put(entry.getKey(), entry.getValue());
         }
-        this.nodeHostPortList = new ArrayList<String>(addrMap4Failover.size());
+        this.nodeHostPortList = new ArrayList<>(addrMap4Failover.size());
         this.nodeHostPortList.addAll(addrMap4Failover.keySet());
         this.firstNodeAddr = firstNodeAddr;
         this.masterClusterStr = masterClusterStr;
diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corebase/policies/FlowCtrlRuleHandler.java b/tubemq-core/src/main/java/org/apache/tubemq/corebase/policies/FlowCtrlRuleHandler.java
index a06b77a..ade5c55 100644
--- a/tubemq-core/src/main/java/org/apache/tubemq/corebase/policies/FlowCtrlRuleHandler.java
+++ b/tubemq-core/src/main/java/org/apache/tubemq/corebase/policies/FlowCtrlRuleHandler.java
@@ -81,7 +81,7 @@ public class FlowCtrlRuleHandler {
             System.currentTimeMillis();
     // Decoded flow control rules
     private Map<Integer, List<FlowCtrlItem>> flowCtrlRuleSet =
-            new ConcurrentHashMap<Integer, List<FlowCtrlItem>>();
+            new ConcurrentHashMap<>();
 
     public FlowCtrlRuleHandler(boolean isDefault) {
         this.isDefaultHandler = isDefault;
@@ -397,7 +397,7 @@ public class FlowCtrlRuleHandler {
      */
     public Map<Integer, List<FlowCtrlItem>> parseFlowCtrlInfo(final String flowCtrlInfo)
             throws Exception {
-        Map<Integer, List<FlowCtrlItem>> flowCtrlMap = new ConcurrentHashMap<Integer, List<FlowCtrlItem>>();
+        Map<Integer, List<FlowCtrlItem>> flowCtrlMap = new ConcurrentHashMap<>();
         if (TStringUtils.isBlank(flowCtrlInfo)) {
             throw new Exception("Parsing error, flowCtrlInfo value is blank!");
         }
@@ -470,7 +470,7 @@ public class FlowCtrlRuleHandler {
         if (ruleArray == null) {
             throw new Exception("not found rule list in data limit!");
         }
-        ArrayList<FlowCtrlItem> flowCtrlItems = new ArrayList<FlowCtrlItem>();
+        ArrayList<FlowCtrlItem> flowCtrlItems = new ArrayList<>();
         for (int index = 0; index < ruleArray.size(); index++) {
             JsonObject ruleObject = ruleArray.get(index).getAsJsonObject();
             int startTime = validAndGetTimeValue("start",
@@ -550,7 +550,7 @@ public class FlowCtrlRuleHandler {
         if (ruleArray == null) {
             throw new Exception("not found rule list in freq limit!");
         }
-        ArrayList<FlowCtrlItem> flowCtrlItems = new ArrayList<FlowCtrlItem>();
+        ArrayList<FlowCtrlItem> flowCtrlItems = new ArrayList<>();
         for (int index = 0; index < ruleArray.size(); index++) {
             JsonObject ruleObject = ruleArray.get(index).getAsJsonObject();
             if (!ruleObject.has("zeroCnt")) {
@@ -611,7 +611,7 @@ public class FlowCtrlRuleHandler {
         if (ruleArray.size() > 1) {
             throw new Exception("only allow set one rule in low fetch limit!");
         }
-        ArrayList<FlowCtrlItem> flowCtrlItems = new ArrayList<FlowCtrlItem>();
+        ArrayList<FlowCtrlItem> flowCtrlItems = new ArrayList<>();
         for (int index = 0; index < ruleArray.size(); index++) {
             JsonObject ruleObject = ruleArray.get(index).getAsJsonObject();
             int normfreqInMs = 0;
@@ -685,7 +685,7 @@ public class FlowCtrlRuleHandler {
         if (ruleArray == null) {
             throw new Exception("not found rule list in SSD limit!");
         }
-        ArrayList<FlowCtrlItem> flowCtrlItems = new ArrayList<FlowCtrlItem>();
+        ArrayList<FlowCtrlItem> flowCtrlItems = new ArrayList<>();
         for (int index = 0; index < ruleArray.size(); index++) {
             JsonObject ruleObject = ruleArray.get(index).getAsJsonObject();
             int startTime = validAndGetTimeValue("start",
diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/ConcurrentHashSet.java b/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/ConcurrentHashSet.java
index 9860d09..1b938dd 100644
--- a/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/ConcurrentHashSet.java
+++ b/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/ConcurrentHashSet.java
@@ -30,11 +30,11 @@ public class ConcurrentHashSet<E> extends MapBackedSet<E> {
     private static final long serialVersionUID = 8518578988740277828L;
 
     public ConcurrentHashSet() {
-        super(new ConcurrentHashMap<E, Boolean>());
+        super(new ConcurrentHashMap<>());
     }
 
     public ConcurrentHashSet(Collection<E> c) {
-        super(new ConcurrentHashMap<E, Boolean>(), c);
+        super(new ConcurrentHashMap<>(), c);
     }
 
     @Override
diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/DataConverterUtil.java b/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/DataConverterUtil.java
index 1e5b708..b800e1a 100644
--- a/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/DataConverterUtil.java
+++ b/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/DataConverterUtil.java
@@ -47,7 +47,7 @@ public class DataConverterUtil {
      * @param strSubInfoList return a list of SubscribeInfos
      */
     public static List<SubscribeInfo> convertSubInfo(List<String> strSubInfoList) {
-        List<SubscribeInfo> subInfoList = new ArrayList<SubscribeInfo>();
+        List<SubscribeInfo> subInfoList = new ArrayList<>();
         if (strSubInfoList != null) {
             for (String strSubInfo : strSubInfoList) {
                 if (TStringUtils.isNotBlank(strSubInfo)) {
@@ -65,7 +65,7 @@ public class DataConverterUtil {
      * @param subInfoList return a list of String SubscribeInfos
      */
     public static List<String> formatSubInfo(List<SubscribeInfo> subInfoList) {
-        List<String> strSubInfoList = new ArrayList<String>();
+        List<String> strSubInfoList = new ArrayList<>();
         if ((subInfoList != null) && (!subInfoList.isEmpty())) {
             for (SubscribeInfo subInfo : subInfoList) {
                 if (subInfo != null) {
@@ -83,7 +83,7 @@ public class DataConverterUtil {
      * @return return a list of Partition
      */
     public static List<Partition> convertPartitionInfo(List<String> strPartInfoList) {
-        List<Partition> partList = new ArrayList<Partition>();
+        List<Partition> partList = new ArrayList<>();
         if (strPartInfoList != null) {
             for (String partInfo : strPartInfoList) {
                 if (partInfo != null) {
@@ -102,7 +102,7 @@ public class DataConverterUtil {
      */
     public static List<TopicInfo> convertTopicInfo(Map<Integer, BrokerInfo> brokerInfoMap,
                                                    List<String> strTopicInfos) {
-        List<TopicInfo> topicList = new ArrayList<TopicInfo>();
+        List<TopicInfo> topicList = new ArrayList<>();
         if (strTopicInfos != null) {
             for (String info : strTopicInfos) {
                 if (info != null) {
@@ -133,7 +133,7 @@ public class DataConverterUtil {
      */
     public static Map<Integer, BrokerInfo> convertBrokerInfo(List<String> strBrokerInfos) {
         Map<Integer, BrokerInfo> brokerInfoMap =
-                new ConcurrentHashMap<Integer, BrokerInfo>();
+                new ConcurrentHashMap<>();
         if (strBrokerInfos != null) {
             for (String info : strBrokerInfos) {
                 if (info != null) {
@@ -155,7 +155,7 @@ public class DataConverterUtil {
     public static Map<String, TreeSet<String>> convertTopicConditions(
             final List<String> strTopicConditions) {
         Map<String, TreeSet<String>> topicConditions =
-                new HashMap<String, TreeSet<String>>();
+                new HashMap<>();
         if (strTopicConditions == null || strTopicConditions.isEmpty()) {
             return topicConditions;
         }
@@ -172,7 +172,7 @@ public class DataConverterUtil {
             String[] strCondInfo = strInfo[1].split(TokenConstants.ARRAY_SEP);
             TreeSet<String> conditionSet = topicConditions.get(topicName);
             if (conditionSet == null) {
-                conditionSet = new TreeSet<String>();
+                conditionSet = new TreeSet<>();
                 topicConditions.put(topicName, conditionSet);
             }
             for (String cond : strCondInfo) {
@@ -194,7 +194,7 @@ public class DataConverterUtil {
      */
     public static List<Message> convertMessage(final String topicName,
                                                List<ClientBroker.TransferedMessage> transferedMessageList) {
-        List<Message> messageList = new ArrayList<Message>();
+        List<Message> messageList = new ArrayList<>();
         if (transferedMessageList == null || transferedMessageList.isEmpty()) {
             return messageList;
         }
diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corerpc/RpcConfig.java b/tubemq-core/src/main/java/org/apache/tubemq/corerpc/RpcConfig.java
index 7743195..65301c8 100644
--- a/tubemq-core/src/main/java/org/apache/tubemq/corerpc/RpcConfig.java
+++ b/tubemq-core/src/main/java/org/apache/tubemq/corerpc/RpcConfig.java
@@ -24,7 +24,7 @@ import org.apache.tubemq.corebase.utils.TStringUtils;
 
 public class RpcConfig {
 
-    private final Map<String, Object> params = new HashMap<String, Object>();
+    private final Map<String, Object> params = new HashMap<>();
 
     public RpcConfig() {
 
diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corerpc/RpcServiceFactory.java b/tubemq-core/src/main/java/org/apache/tubemq/corerpc/RpcServiceFactory.java
index abdfc80..488baff 100644
--- a/tubemq-core/src/main/java/org/apache/tubemq/corerpc/RpcServiceFactory.java
+++ b/tubemq-core/src/main/java/org/apache/tubemq/corerpc/RpcServiceFactory.java
@@ -51,21 +51,21 @@ public class RpcServiceFactory {
     private static AtomicInteger threadIdGen = new AtomicInteger(0);
     private final ClientFactory clientFactory;
     private final ConcurrentHashMap<Integer, ServiceRpcServer> servers =
-            new ConcurrentHashMap<Integer, ServiceRpcServer>();
+            new ConcurrentHashMap<>();
     private final ConcurrentHashMap<String, ServiceHolder> servicesCache =
-            new ConcurrentHashMap<String, ServiceHolder>();
+            new ConcurrentHashMap<>();
     private final ConcurrentHashMap<String/* addr */, RemoteConErrStats> remoteAddrMap =
-            new ConcurrentHashMap<String, RemoteConErrStats>();
+            new ConcurrentHashMap<>();
     private final ConcurrentHashMap<String/* addr */, Long> forbiddenAddrMap =
-            new ConcurrentHashMap<String, Long>();
+            new ConcurrentHashMap<>();
     private final ConnectionManager connectionManager;
     private final ConcurrentHashMap<String, ConnectionNode> brokerQueue =
-            new ConcurrentHashMap<String, ConnectionNode>();
+            new ConcurrentHashMap<>();
     private final ConcurrentHashMap<String, Long> updateTime =
-            new ConcurrentHashMap<String, Long>();
+            new ConcurrentHashMap<>();
     // Temporary invalid broker map
     private final ConcurrentHashMap<Integer, Long> brokerUnavailableMap =
-        new ConcurrentHashMap<Integer, Long>();
+            new ConcurrentHashMap<>();
     private long unAvailableFbdDurationMs =
         RpcConstants.CFG_UNAVAILABLE_FORBIDDEN_DURATION_MS;
     private AtomicLong lastLogPrintTime = new AtomicLong(0);
@@ -197,7 +197,7 @@ public class RpcServiceFactory {
             if (beforeTime == null) {
                 int totalCount = 0;
                 Long curTime = System.currentTimeMillis();
-                Set<String> expiredAddrs = new HashSet<String>();
+                Set<String> expiredAddrs = new HashSet<>();
                 for (Map.Entry<String, Long> entry : forbiddenAddrMap.entrySet()) {
                     if (entry.getKey() == null || entry.getValue() == null) {
                         continue;
@@ -244,7 +244,7 @@ public class RpcServiceFactory {
 
     public void rmvAllExpiredRecords() {
         long curTime = System.currentTimeMillis();
-        Set<String> expiredAddrs = new HashSet<String>();
+        Set<String> expiredAddrs = new HashSet<>();
         for (Map.Entry<String, RemoteConErrStats> entry : remoteAddrMap.entrySet()) {
             if (entry.getKey() == null || entry.getValue() == null) {
                 continue;
@@ -294,7 +294,7 @@ public class RpcServiceFactory {
 
     public void rmvExpiredUnavailableBrokers() {
         long curTime = System.currentTimeMillis();
-        Set<Integer> expiredBrokers = new HashSet<Integer>();
+        Set<Integer> expiredBrokers = new HashSet<>();
         for (Map.Entry<Integer, Long> entry : brokerUnavailableMap.entrySet()) {
             if (entry.getKey() == null || entry.getValue() == null) {
                 continue;
@@ -566,7 +566,7 @@ public class RpcServiceFactory {
                     }
                     long cur = System.currentTimeMillis();
                     if (cur - lastCheckTime.get() >= 30000) {
-                        ArrayList<String> tmpKeyList = new ArrayList<String>();
+                        ArrayList<String> tmpKeyList = new ArrayList<>();
                         for (Map.Entry<String, Long> entry : updateTime.entrySet()) {
                             if (entry.getKey() == null || entry.getValue() == null) {
                                 continue;
diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corerpc/codec/PbEnDecoder.java b/tubemq-core/src/main/java/org/apache/tubemq/corerpc/codec/PbEnDecoder.java
index df4fae1..0b2c8d0 100644
--- a/tubemq-core/src/main/java/org/apache/tubemq/corerpc/codec/PbEnDecoder.java
+++ b/tubemq-core/src/main/java/org/apache/tubemq/corerpc/codec/PbEnDecoder.java
@@ -30,10 +30,10 @@ import org.apache.tubemq.corerpc.RpcConstants;
 public class PbEnDecoder {
     // The set of methods supported by RPC, only the methods in the map are accepted
     private static final Map<String, Integer> rpcMethodMap =
-            new HashMap<String, Integer>();
+            new HashMap<>();
     // The set of services supported by RPC, only the services in the map are processed.
     private static final Map<String, Integer> rpcServiceMap =
-            new HashMap<String, Integer>();
+            new HashMap<>();
 
     static {
         // The MAP corresponding to the writing of these strings and constants when the system starts up
diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corerpc/netty/ByteBufferOutputStream.java b/tubemq-core/src/main/java/org/apache/tubemq/corerpc/netty/ByteBufferOutputStream.java
index 6625682..994bf37 100644
--- a/tubemq-core/src/main/java/org/apache/tubemq/corerpc/netty/ByteBufferOutputStream.java
+++ b/tubemq-core/src/main/java/org/apache/tubemq/corerpc/netty/ByteBufferOutputStream.java
@@ -70,7 +70,7 @@ public class ByteBufferOutputStream extends OutputStream {
     }
 
     public void reset() {
-        buffers = new LinkedList<ByteBuffer>();
+        buffers = new LinkedList<>();
         buffers.add(ByteBuffer.allocate(RpcConstants.RPC_MAX_BUFFER_SIZE));
     }
 
diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corerpc/netty/NettyClient.java b/tubemq-core/src/main/java/org/apache/tubemq/corerpc/netty/NettyClient.java
index cbadd56..3741a23 100644
--- a/tubemq-core/src/main/java/org/apache/tubemq/corerpc/netty/NettyClient.java
+++ b/tubemq-core/src/main/java/org/apache/tubemq/corerpc/netty/NettyClient.java
@@ -67,9 +67,9 @@ public class NettyClient implements Client {
     private static final AtomicLong init = new AtomicLong(0);
     private static Timer timer;
     private final ConcurrentHashMap<Integer, Callback<ResponseWrapper>> requests =
-            new ConcurrentHashMap<Integer, Callback<ResponseWrapper>>();
+            new ConcurrentHashMap<>();
     private final ConcurrentHashMap<Integer, Timeout> timeouts =
-            new ConcurrentHashMap<Integer, Timeout>();
+            new ConcurrentHashMap<>();
     private final AtomicInteger serialNoGenerator =
             new AtomicInteger(0);
     private AtomicBoolean released = new AtomicBoolean(false);
diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corerpc/netty/NettyClientFactory.java b/tubemq-core/src/main/java/org/apache/tubemq/corerpc/netty/NettyClientFactory.java
index 404d99d..2400b3a 100644
--- a/tubemq-core/src/main/java/org/apache/tubemq/corerpc/netty/NettyClientFactory.java
+++ b/tubemq-core/src/main/java/org/apache/tubemq/corerpc/netty/NettyClientFactory.java
@@ -59,7 +59,7 @@ public class NettyClientFactory implements ClientFactory {
     private static final Logger logger =
             LoggerFactory.getLogger(NettyClientFactory.class);
     protected final ConcurrentHashMap<String, Client> clients =
-            new ConcurrentHashMap<String, Client>();
+            new ConcurrentHashMap<>();
     protected AtomicBoolean shutdown = new AtomicBoolean(true);
     private Timer timer = new HashedWheelTimer();
     private volatile AtomicBoolean init = new AtomicBoolean(true);
diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corerpc/netty/NettyProtocolDecoder.java b/tubemq-core/src/main/java/org/apache/tubemq/corerpc/netty/NettyProtocolDecoder.java
index 78f36ea..46353e7 100644
--- a/tubemq-core/src/main/java/org/apache/tubemq/corerpc/netty/NettyProtocolDecoder.java
+++ b/tubemq-core/src/main/java/org/apache/tubemq/corerpc/netty/NettyProtocolDecoder.java
@@ -37,9 +37,9 @@ public class NettyProtocolDecoder extends FrameDecoder {
     private static final Logger logger =
             LoggerFactory.getLogger(NettyProtocolDecoder.class);
     private static final ConcurrentHashMap<String, AtomicLong> errProtolAddrMap =
-            new ConcurrentHashMap<String, AtomicLong>();
+            new ConcurrentHashMap<>();
     private static final ConcurrentHashMap<String, AtomicLong> errSizeAddrMap =
-            new ConcurrentHashMap<String, AtomicLong>();
+            new ConcurrentHashMap<>();
     private static AtomicLong lastProtolTime = new AtomicLong(0);
     private static AtomicLong lastSizeTime = new AtomicLong(0);
     private boolean packHeaderRead = false;
@@ -61,7 +61,7 @@ public class NettyProtocolDecoder extends FrameDecoder {
             filterIllegalPackageSize(true, tmpListSize,
                     RpcConstants.MAX_FRAME_MAX_LIST_SIZE, channel);
             this.listSize = tmpListSize;
-            this.dataPack = new RpcDataPack(serialNo, new ArrayList<ByteBuffer>(this.listSize));
+            this.dataPack = new RpcDataPack(serialNo, new ArrayList<>(this.listSize));
             this.packHeaderRead = true;
         }
         // get PackBody
diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corerpc/netty/NettyProtocolEncoder.java b/tubemq-core/src/main/java/org/apache/tubemq/corerpc/netty/NettyProtocolEncoder.java
index ae0a69a..b480d3a 100644
--- a/tubemq-core/src/main/java/org/apache/tubemq/corerpc/netty/NettyProtocolEncoder.java
+++ b/tubemq-core/src/main/java/org/apache/tubemq/corerpc/netty/NettyProtocolEncoder.java
@@ -35,7 +35,7 @@ public class NettyProtocolEncoder extends OneToOneEncoder {
                             Channel channel, Object msg) throws Exception {
         RpcDataPack dataPack = (RpcDataPack) msg;
         List<ByteBuffer> origs = dataPack.getDataLst();
-        List<ByteBuffer> bbs = new ArrayList<ByteBuffer>(origs.size() * 2 + 1);
+        List<ByteBuffer> bbs = new ArrayList<>(origs.size() * 2 + 1);
         bbs.add(getPackHeader(dataPack));
         for (ByteBuffer b : origs) {
             bbs.add(getLengthHeader(b));
diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corerpc/netty/NettyRpcServer.java b/tubemq-core/src/main/java/org/apache/tubemq/corerpc/netty/NettyRpcServer.java
index a9b7c5e..abebf4c 100644
--- a/tubemq-core/src/main/java/org/apache/tubemq/corerpc/netty/NettyRpcServer.java
+++ b/tubemq-core/src/main/java/org/apache/tubemq/corerpc/netty/NettyRpcServer.java
@@ -68,10 +68,10 @@ public class NettyRpcServer implements ServiceRpcServer {
     private static final Logger logger =
             LoggerFactory.getLogger(NettyRpcServer.class);
     private static final ConcurrentHashMap<String, AtomicLong> errParseAddrMap =
-            new ConcurrentHashMap<String, AtomicLong>();
+            new ConcurrentHashMap<>();
     private static AtomicLong lastParseTime = new AtomicLong(0);
     private final ConcurrentHashMap<Integer, Protocol> protocols =
-            new ConcurrentHashMap<Integer, Protocol>();
+            new ConcurrentHashMap<>();
     private ServerBootstrap bootstrap;
     private NioServerSocketChannelFactory channelFactory = null;
     private AtomicBoolean started = new AtomicBoolean(false);
diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corerpc/protocol/ProtocolFactory.java b/tubemq-core/src/main/java/org/apache/tubemq/corerpc/protocol/ProtocolFactory.java
index c95cb17..4312a58 100644
--- a/tubemq-core/src/main/java/org/apache/tubemq/corerpc/protocol/ProtocolFactory.java
+++ b/tubemq-core/src/main/java/org/apache/tubemq/corerpc/protocol/ProtocolFactory.java
@@ -24,7 +24,7 @@ import java.util.Map;
 public class ProtocolFactory {
 
     private static final Map<Integer, Class<? extends Protocol>> protocols =
-            new HashMap<Integer, Class<? extends Protocol>>();
+            new HashMap<>();
 
     static {
         registerProtocol(RpcProtocol.RPC_PROTOCOL_TCP, RpcProtocol.class);
diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corerpc/protocol/RpcProtocol.java b/tubemq-core/src/main/java/org/apache/tubemq/corerpc/protocol/RpcProtocol.java
index 4f2c272..e405ff0 100644
--- a/tubemq-core/src/main/java/org/apache/tubemq/corerpc/protocol/RpcProtocol.java
+++ b/tubemq-core/src/main/java/org/apache/tubemq/corerpc/protocol/RpcProtocol.java
@@ -45,11 +45,11 @@ public class RpcProtocol implements Protocol {
     private static final Logger logger =
             LoggerFactory.getLogger(RpcProtocol.class);
     private final Map<Integer, Object> processors =
-            new HashMap<Integer, Object>();
+            new HashMap<>();
     private final Map<Integer, Method> cacheMethods =
-            new HashMap<Integer, Method>();
+            new HashMap<>();
     private final Map<Integer, ExecutorService> threadPools =
-            new HashMap<Integer, ExecutorService>();
+            new HashMap<>();
     private boolean isOverTLS = false;
 
     @Override
diff --git a/tubemq-core/src/test/java/org/apache/tubemq/corerpc/codec/DataConverterUtilTest.java b/tubemq-core/src/test/java/org/apache/tubemq/corerpc/codec/DataConverterUtilTest.java
index 5690a48..28d07e2 100644
--- a/tubemq-core/src/test/java/org/apache/tubemq/corerpc/codec/DataConverterUtilTest.java
+++ b/tubemq-core/src/test/java/org/apache/tubemq/corerpc/codec/DataConverterUtilTest.java
@@ -40,7 +40,7 @@ public class DataConverterUtilTest {
     public void testDataConvert() {
         // broker convert
         BrokerInfo broker = new BrokerInfo(0, "localhost", 1200);
-        List<String> strInfoList = new ArrayList<String>();
+        List<String> strInfoList = new ArrayList<>();
         strInfoList.add("0:localhost:1200");
         Map<Integer, BrokerInfo> brokerMap = DataConverterUtil.convertBrokerInfo(strInfoList);
         assertEquals("broker should be equal", broker, brokerMap.get(broker.getBrokerId()));
diff --git a/tubemq-core/src/test/java/org/apache/tubemq/corerpc/netty/NettyProtocolEncoderTest.java b/tubemq-core/src/test/java/org/apache/tubemq/corerpc/netty/NettyProtocolEncoderTest.java
index 27dea5c..21f4b71 100644
--- a/tubemq-core/src/test/java/org/apache/tubemq/corerpc/netty/NettyProtocolEncoderTest.java
+++ b/tubemq-core/src/test/java/org/apache/tubemq/corerpc/netty/NettyProtocolEncoderTest.java
@@ -37,7 +37,7 @@ public class NettyProtocolEncoderTest {
         RpcDataPack obj = new RpcDataPack();
         // set serial number
         obj.setSerialNo(123);
-        List<ByteBuffer> dataList = new LinkedList<ByteBuffer>();
+        List<ByteBuffer> dataList = new LinkedList<>();
         dataList.add(ByteBuffer.wrap("abc".getBytes()));
         dataList.add(ByteBuffer.wrap("def".getBytes()));
         // append data list.