You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2020/05/11 02:20:15 UTC
[incubator-tubemq] branch master updated: [TUBEMQ-94] Substitute
the parameterized type for core module (#75)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/master by this push:
new aa93150 [TUBEMQ-94] Substitute the parameterized type for core module (#75)
aa93150 is described below
commit aa93150432f25f13a3d16a7b4608e9a0c81ee8b6
Author: viviel <37...@users.noreply.github.com>
AuthorDate: Mon May 11 10:20:09 2020 +0800
[TUBEMQ-94] Substitute the parameterized type for core module (#75)
---
.../tubemq/corebase/balance/ConsumerEvent.java | 2 +-
.../tubemq/corebase/cluster/ConsumerInfo.java | 2 +-
.../apache/tubemq/corebase/cluster/MasterInfo.java | 6 +++---
.../corebase/policies/FlowCtrlRuleHandler.java | 12 ++++++------
.../tubemq/corebase/utils/ConcurrentHashSet.java | 4 ++--
.../tubemq/corebase/utils/DataConverterUtil.java | 16 ++++++++--------
.../java/org/apache/tubemq/corerpc/RpcConfig.java | 2 +-
.../apache/tubemq/corerpc/RpcServiceFactory.java | 22 +++++++++++-----------
.../apache/tubemq/corerpc/codec/PbEnDecoder.java | 4 ++--
.../corerpc/netty/ByteBufferOutputStream.java | 2 +-
.../apache/tubemq/corerpc/netty/NettyClient.java | 4 ++--
.../tubemq/corerpc/netty/NettyClientFactory.java | 2 +-
.../tubemq/corerpc/netty/NettyProtocolDecoder.java | 6 +++---
.../tubemq/corerpc/netty/NettyProtocolEncoder.java | 2 +-
.../tubemq/corerpc/netty/NettyRpcServer.java | 4 ++--
.../tubemq/corerpc/protocol/ProtocolFactory.java | 2 +-
.../tubemq/corerpc/protocol/RpcProtocol.java | 6 +++---
.../corerpc/codec/DataConverterUtilTest.java | 2 +-
.../corerpc/netty/NettyProtocolEncoderTest.java | 2 +-
19 files changed, 51 insertions(+), 51 deletions(-)
diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corebase/balance/ConsumerEvent.java b/tubemq-core/src/main/java/org/apache/tubemq/corebase/balance/ConsumerEvent.java
index bfa3739..3ca72c5 100644
--- a/tubemq-core/src/main/java/org/apache/tubemq/corebase/balance/ConsumerEvent.java
+++ b/tubemq-core/src/main/java/org/apache/tubemq/corebase/balance/ConsumerEvent.java
@@ -28,7 +28,7 @@ public class ConsumerEvent {
private EventType type;
private EventStatus status;
private List<SubscribeInfo> subscribeInfoList =
- new ArrayList<SubscribeInfo>();
+ new ArrayList<>();
public ConsumerEvent(long rebalanceId,
diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corebase/cluster/ConsumerInfo.java b/tubemq-core/src/main/java/org/apache/tubemq/corebase/cluster/ConsumerInfo.java
index 93e847e..f600647 100644
--- a/tubemq-core/src/main/java/org/apache/tubemq/corebase/cluster/ConsumerInfo.java
+++ b/tubemq-core/src/main/java/org/apache/tubemq/corebase/cluster/ConsumerInfo.java
@@ -57,7 +57,7 @@ public class ConsumerInfo implements Comparable<ConsumerInfo>, Serializable {
this.topicSet = topicSet;
if (topicConditions == null) {
this.topicConditions =
- new HashMap<String, TreeSet<String>>();
+ new HashMap<>();
} else {
this.topicConditions = topicConditions;
}
diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corebase/cluster/MasterInfo.java b/tubemq-core/src/main/java/org/apache/tubemq/corebase/cluster/MasterInfo.java
index 38ff76e..0866394 100644
--- a/tubemq-core/src/main/java/org/apache/tubemq/corebase/cluster/MasterInfo.java
+++ b/tubemq-core/src/main/java/org/apache/tubemq/corebase/cluster/MasterInfo.java
@@ -29,7 +29,7 @@ import org.apache.tubemq.corebase.utils.TStringUtils;
public class MasterInfo {
private final Map<String/** ip:port */, NodeAddrInfo> addrMap4Failover =
- new HashMap<String, NodeAddrInfo>();
+ new HashMap<>();
private List<String> nodeHostPortList;
private NodeAddrInfo firstNodeAddr = null;
private String masterClusterStr;
@@ -78,7 +78,7 @@ public class MasterInfo {
this.firstNodeAddr = tmpNodeAddrInfo;
}
}
- nodeHostPortList = new ArrayList<String>(addrMap4Failover.size());
+ nodeHostPortList = new ArrayList<>(addrMap4Failover.size());
nodeHostPortList.addAll(addrMap4Failover.keySet());
int count = 0;
Collections.sort(nodeHostPortList);
@@ -100,7 +100,7 @@ public class MasterInfo {
}
this.addrMap4Failover.put(entry.getKey(), entry.getValue());
}
- this.nodeHostPortList = new ArrayList<String>(addrMap4Failover.size());
+ this.nodeHostPortList = new ArrayList<>(addrMap4Failover.size());
this.nodeHostPortList.addAll(addrMap4Failover.keySet());
this.firstNodeAddr = firstNodeAddr;
this.masterClusterStr = masterClusterStr;
diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corebase/policies/FlowCtrlRuleHandler.java b/tubemq-core/src/main/java/org/apache/tubemq/corebase/policies/FlowCtrlRuleHandler.java
index a06b77a..ade5c55 100644
--- a/tubemq-core/src/main/java/org/apache/tubemq/corebase/policies/FlowCtrlRuleHandler.java
+++ b/tubemq-core/src/main/java/org/apache/tubemq/corebase/policies/FlowCtrlRuleHandler.java
@@ -81,7 +81,7 @@ public class FlowCtrlRuleHandler {
System.currentTimeMillis();
// Decoded flow control rules
private Map<Integer, List<FlowCtrlItem>> flowCtrlRuleSet =
- new ConcurrentHashMap<Integer, List<FlowCtrlItem>>();
+ new ConcurrentHashMap<>();
public FlowCtrlRuleHandler(boolean isDefault) {
this.isDefaultHandler = isDefault;
@@ -397,7 +397,7 @@ public class FlowCtrlRuleHandler {
*/
public Map<Integer, List<FlowCtrlItem>> parseFlowCtrlInfo(final String flowCtrlInfo)
throws Exception {
- Map<Integer, List<FlowCtrlItem>> flowCtrlMap = new ConcurrentHashMap<Integer, List<FlowCtrlItem>>();
+ Map<Integer, List<FlowCtrlItem>> flowCtrlMap = new ConcurrentHashMap<>();
if (TStringUtils.isBlank(flowCtrlInfo)) {
throw new Exception("Parsing error, flowCtrlInfo value is blank!");
}
@@ -470,7 +470,7 @@ public class FlowCtrlRuleHandler {
if (ruleArray == null) {
throw new Exception("not found rule list in data limit!");
}
- ArrayList<FlowCtrlItem> flowCtrlItems = new ArrayList<FlowCtrlItem>();
+ ArrayList<FlowCtrlItem> flowCtrlItems = new ArrayList<>();
for (int index = 0; index < ruleArray.size(); index++) {
JsonObject ruleObject = ruleArray.get(index).getAsJsonObject();
int startTime = validAndGetTimeValue("start",
@@ -550,7 +550,7 @@ public class FlowCtrlRuleHandler {
if (ruleArray == null) {
throw new Exception("not found rule list in freq limit!");
}
- ArrayList<FlowCtrlItem> flowCtrlItems = new ArrayList<FlowCtrlItem>();
+ ArrayList<FlowCtrlItem> flowCtrlItems = new ArrayList<>();
for (int index = 0; index < ruleArray.size(); index++) {
JsonObject ruleObject = ruleArray.get(index).getAsJsonObject();
if (!ruleObject.has("zeroCnt")) {
@@ -611,7 +611,7 @@ public class FlowCtrlRuleHandler {
if (ruleArray.size() > 1) {
throw new Exception("only allow set one rule in low fetch limit!");
}
- ArrayList<FlowCtrlItem> flowCtrlItems = new ArrayList<FlowCtrlItem>();
+ ArrayList<FlowCtrlItem> flowCtrlItems = new ArrayList<>();
for (int index = 0; index < ruleArray.size(); index++) {
JsonObject ruleObject = ruleArray.get(index).getAsJsonObject();
int normfreqInMs = 0;
@@ -685,7 +685,7 @@ public class FlowCtrlRuleHandler {
if (ruleArray == null) {
throw new Exception("not found rule list in SSD limit!");
}
- ArrayList<FlowCtrlItem> flowCtrlItems = new ArrayList<FlowCtrlItem>();
+ ArrayList<FlowCtrlItem> flowCtrlItems = new ArrayList<>();
for (int index = 0; index < ruleArray.size(); index++) {
JsonObject ruleObject = ruleArray.get(index).getAsJsonObject();
int startTime = validAndGetTimeValue("start",
diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/ConcurrentHashSet.java b/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/ConcurrentHashSet.java
index 9860d09..1b938dd 100644
--- a/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/ConcurrentHashSet.java
+++ b/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/ConcurrentHashSet.java
@@ -30,11 +30,11 @@ public class ConcurrentHashSet<E> extends MapBackedSet<E> {
private static final long serialVersionUID = 8518578988740277828L;
public ConcurrentHashSet() {
- super(new ConcurrentHashMap<E, Boolean>());
+ super(new ConcurrentHashMap<>());
}
public ConcurrentHashSet(Collection<E> c) {
- super(new ConcurrentHashMap<E, Boolean>(), c);
+ super(new ConcurrentHashMap<>(), c);
}
@Override
diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/DataConverterUtil.java b/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/DataConverterUtil.java
index 1e5b708..b800e1a 100644
--- a/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/DataConverterUtil.java
+++ b/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/DataConverterUtil.java
@@ -47,7 +47,7 @@ public class DataConverterUtil {
* @param strSubInfoList return a list of SubscribeInfos
*/
public static List<SubscribeInfo> convertSubInfo(List<String> strSubInfoList) {
- List<SubscribeInfo> subInfoList = new ArrayList<SubscribeInfo>();
+ List<SubscribeInfo> subInfoList = new ArrayList<>();
if (strSubInfoList != null) {
for (String strSubInfo : strSubInfoList) {
if (TStringUtils.isNotBlank(strSubInfo)) {
@@ -65,7 +65,7 @@ public class DataConverterUtil {
* @param subInfoList return a list of String SubscribeInfos
*/
public static List<String> formatSubInfo(List<SubscribeInfo> subInfoList) {
- List<String> strSubInfoList = new ArrayList<String>();
+ List<String> strSubInfoList = new ArrayList<>();
if ((subInfoList != null) && (!subInfoList.isEmpty())) {
for (SubscribeInfo subInfo : subInfoList) {
if (subInfo != null) {
@@ -83,7 +83,7 @@ public class DataConverterUtil {
* @return return a list of Partition
*/
public static List<Partition> convertPartitionInfo(List<String> strPartInfoList) {
- List<Partition> partList = new ArrayList<Partition>();
+ List<Partition> partList = new ArrayList<>();
if (strPartInfoList != null) {
for (String partInfo : strPartInfoList) {
if (partInfo != null) {
@@ -102,7 +102,7 @@ public class DataConverterUtil {
*/
public static List<TopicInfo> convertTopicInfo(Map<Integer, BrokerInfo> brokerInfoMap,
List<String> strTopicInfos) {
- List<TopicInfo> topicList = new ArrayList<TopicInfo>();
+ List<TopicInfo> topicList = new ArrayList<>();
if (strTopicInfos != null) {
for (String info : strTopicInfos) {
if (info != null) {
@@ -133,7 +133,7 @@ public class DataConverterUtil {
*/
public static Map<Integer, BrokerInfo> convertBrokerInfo(List<String> strBrokerInfos) {
Map<Integer, BrokerInfo> brokerInfoMap =
- new ConcurrentHashMap<Integer, BrokerInfo>();
+ new ConcurrentHashMap<>();
if (strBrokerInfos != null) {
for (String info : strBrokerInfos) {
if (info != null) {
@@ -155,7 +155,7 @@ public class DataConverterUtil {
public static Map<String, TreeSet<String>> convertTopicConditions(
final List<String> strTopicConditions) {
Map<String, TreeSet<String>> topicConditions =
- new HashMap<String, TreeSet<String>>();
+ new HashMap<>();
if (strTopicConditions == null || strTopicConditions.isEmpty()) {
return topicConditions;
}
@@ -172,7 +172,7 @@ public class DataConverterUtil {
String[] strCondInfo = strInfo[1].split(TokenConstants.ARRAY_SEP);
TreeSet<String> conditionSet = topicConditions.get(topicName);
if (conditionSet == null) {
- conditionSet = new TreeSet<String>();
+ conditionSet = new TreeSet<>();
topicConditions.put(topicName, conditionSet);
}
for (String cond : strCondInfo) {
@@ -194,7 +194,7 @@ public class DataConverterUtil {
*/
public static List<Message> convertMessage(final String topicName,
List<ClientBroker.TransferedMessage> transferedMessageList) {
- List<Message> messageList = new ArrayList<Message>();
+ List<Message> messageList = new ArrayList<>();
if (transferedMessageList == null || transferedMessageList.isEmpty()) {
return messageList;
}
diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corerpc/RpcConfig.java b/tubemq-core/src/main/java/org/apache/tubemq/corerpc/RpcConfig.java
index 7743195..65301c8 100644
--- a/tubemq-core/src/main/java/org/apache/tubemq/corerpc/RpcConfig.java
+++ b/tubemq-core/src/main/java/org/apache/tubemq/corerpc/RpcConfig.java
@@ -24,7 +24,7 @@ import org.apache.tubemq.corebase.utils.TStringUtils;
public class RpcConfig {
- private final Map<String, Object> params = new HashMap<String, Object>();
+ private final Map<String, Object> params = new HashMap<>();
public RpcConfig() {
diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corerpc/RpcServiceFactory.java b/tubemq-core/src/main/java/org/apache/tubemq/corerpc/RpcServiceFactory.java
index abdfc80..488baff 100644
--- a/tubemq-core/src/main/java/org/apache/tubemq/corerpc/RpcServiceFactory.java
+++ b/tubemq-core/src/main/java/org/apache/tubemq/corerpc/RpcServiceFactory.java
@@ -51,21 +51,21 @@ public class RpcServiceFactory {
private static AtomicInteger threadIdGen = new AtomicInteger(0);
private final ClientFactory clientFactory;
private final ConcurrentHashMap<Integer, ServiceRpcServer> servers =
- new ConcurrentHashMap<Integer, ServiceRpcServer>();
+ new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, ServiceHolder> servicesCache =
- new ConcurrentHashMap<String, ServiceHolder>();
+ new ConcurrentHashMap<>();
private final ConcurrentHashMap<String/* addr */, RemoteConErrStats> remoteAddrMap =
- new ConcurrentHashMap<String, RemoteConErrStats>();
+ new ConcurrentHashMap<>();
private final ConcurrentHashMap<String/* addr */, Long> forbiddenAddrMap =
- new ConcurrentHashMap<String, Long>();
+ new ConcurrentHashMap<>();
private final ConnectionManager connectionManager;
private final ConcurrentHashMap<String, ConnectionNode> brokerQueue =
- new ConcurrentHashMap<String, ConnectionNode>();
+ new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, Long> updateTime =
- new ConcurrentHashMap<String, Long>();
+ new ConcurrentHashMap<>();
// Temporary invalid broker map
private final ConcurrentHashMap<Integer, Long> brokerUnavailableMap =
- new ConcurrentHashMap<Integer, Long>();
+ new ConcurrentHashMap<>();
private long unAvailableFbdDurationMs =
RpcConstants.CFG_UNAVAILABLE_FORBIDDEN_DURATION_MS;
private AtomicLong lastLogPrintTime = new AtomicLong(0);
@@ -197,7 +197,7 @@ public class RpcServiceFactory {
if (beforeTime == null) {
int totalCount = 0;
Long curTime = System.currentTimeMillis();
- Set<String> expiredAddrs = new HashSet<String>();
+ Set<String> expiredAddrs = new HashSet<>();
for (Map.Entry<String, Long> entry : forbiddenAddrMap.entrySet()) {
if (entry.getKey() == null || entry.getValue() == null) {
continue;
@@ -244,7 +244,7 @@ public class RpcServiceFactory {
public void rmvAllExpiredRecords() {
long curTime = System.currentTimeMillis();
- Set<String> expiredAddrs = new HashSet<String>();
+ Set<String> expiredAddrs = new HashSet<>();
for (Map.Entry<String, RemoteConErrStats> entry : remoteAddrMap.entrySet()) {
if (entry.getKey() == null || entry.getValue() == null) {
continue;
@@ -294,7 +294,7 @@ public class RpcServiceFactory {
public void rmvExpiredUnavailableBrokers() {
long curTime = System.currentTimeMillis();
- Set<Integer> expiredBrokers = new HashSet<Integer>();
+ Set<Integer> expiredBrokers = new HashSet<>();
for (Map.Entry<Integer, Long> entry : brokerUnavailableMap.entrySet()) {
if (entry.getKey() == null || entry.getValue() == null) {
continue;
@@ -566,7 +566,7 @@ public class RpcServiceFactory {
}
long cur = System.currentTimeMillis();
if (cur - lastCheckTime.get() >= 30000) {
- ArrayList<String> tmpKeyList = new ArrayList<String>();
+ ArrayList<String> tmpKeyList = new ArrayList<>();
for (Map.Entry<String, Long> entry : updateTime.entrySet()) {
if (entry.getKey() == null || entry.getValue() == null) {
continue;
diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corerpc/codec/PbEnDecoder.java b/tubemq-core/src/main/java/org/apache/tubemq/corerpc/codec/PbEnDecoder.java
index df4fae1..0b2c8d0 100644
--- a/tubemq-core/src/main/java/org/apache/tubemq/corerpc/codec/PbEnDecoder.java
+++ b/tubemq-core/src/main/java/org/apache/tubemq/corerpc/codec/PbEnDecoder.java
@@ -30,10 +30,10 @@ import org.apache.tubemq.corerpc.RpcConstants;
public class PbEnDecoder {
// The set of methods supported by RPC, only the methods in the map are accepted
private static final Map<String, Integer> rpcMethodMap =
- new HashMap<String, Integer>();
+ new HashMap<>();
// The set of services supported by RPC, only the services in the map are processed.
private static final Map<String, Integer> rpcServiceMap =
- new HashMap<String, Integer>();
+ new HashMap<>();
static {
// The MAP corresponding to the writing of these strings and constants when the system starts up
diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corerpc/netty/ByteBufferOutputStream.java b/tubemq-core/src/main/java/org/apache/tubemq/corerpc/netty/ByteBufferOutputStream.java
index 6625682..994bf37 100644
--- a/tubemq-core/src/main/java/org/apache/tubemq/corerpc/netty/ByteBufferOutputStream.java
+++ b/tubemq-core/src/main/java/org/apache/tubemq/corerpc/netty/ByteBufferOutputStream.java
@@ -70,7 +70,7 @@ public class ByteBufferOutputStream extends OutputStream {
}
public void reset() {
- buffers = new LinkedList<ByteBuffer>();
+ buffers = new LinkedList<>();
buffers.add(ByteBuffer.allocate(RpcConstants.RPC_MAX_BUFFER_SIZE));
}
diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corerpc/netty/NettyClient.java b/tubemq-core/src/main/java/org/apache/tubemq/corerpc/netty/NettyClient.java
index cbadd56..3741a23 100644
--- a/tubemq-core/src/main/java/org/apache/tubemq/corerpc/netty/NettyClient.java
+++ b/tubemq-core/src/main/java/org/apache/tubemq/corerpc/netty/NettyClient.java
@@ -67,9 +67,9 @@ public class NettyClient implements Client {
private static final AtomicLong init = new AtomicLong(0);
private static Timer timer;
private final ConcurrentHashMap<Integer, Callback<ResponseWrapper>> requests =
- new ConcurrentHashMap<Integer, Callback<ResponseWrapper>>();
+ new ConcurrentHashMap<>();
private final ConcurrentHashMap<Integer, Timeout> timeouts =
- new ConcurrentHashMap<Integer, Timeout>();
+ new ConcurrentHashMap<>();
private final AtomicInteger serialNoGenerator =
new AtomicInteger(0);
private AtomicBoolean released = new AtomicBoolean(false);
diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corerpc/netty/NettyClientFactory.java b/tubemq-core/src/main/java/org/apache/tubemq/corerpc/netty/NettyClientFactory.java
index 404d99d..2400b3a 100644
--- a/tubemq-core/src/main/java/org/apache/tubemq/corerpc/netty/NettyClientFactory.java
+++ b/tubemq-core/src/main/java/org/apache/tubemq/corerpc/netty/NettyClientFactory.java
@@ -59,7 +59,7 @@ public class NettyClientFactory implements ClientFactory {
private static final Logger logger =
LoggerFactory.getLogger(NettyClientFactory.class);
protected final ConcurrentHashMap<String, Client> clients =
- new ConcurrentHashMap<String, Client>();
+ new ConcurrentHashMap<>();
protected AtomicBoolean shutdown = new AtomicBoolean(true);
private Timer timer = new HashedWheelTimer();
private volatile AtomicBoolean init = new AtomicBoolean(true);
diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corerpc/netty/NettyProtocolDecoder.java b/tubemq-core/src/main/java/org/apache/tubemq/corerpc/netty/NettyProtocolDecoder.java
index 78f36ea..46353e7 100644
--- a/tubemq-core/src/main/java/org/apache/tubemq/corerpc/netty/NettyProtocolDecoder.java
+++ b/tubemq-core/src/main/java/org/apache/tubemq/corerpc/netty/NettyProtocolDecoder.java
@@ -37,9 +37,9 @@ public class NettyProtocolDecoder extends FrameDecoder {
private static final Logger logger =
LoggerFactory.getLogger(NettyProtocolDecoder.class);
private static final ConcurrentHashMap<String, AtomicLong> errProtolAddrMap =
- new ConcurrentHashMap<String, AtomicLong>();
+ new ConcurrentHashMap<>();
private static final ConcurrentHashMap<String, AtomicLong> errSizeAddrMap =
- new ConcurrentHashMap<String, AtomicLong>();
+ new ConcurrentHashMap<>();
private static AtomicLong lastProtolTime = new AtomicLong(0);
private static AtomicLong lastSizeTime = new AtomicLong(0);
private boolean packHeaderRead = false;
@@ -61,7 +61,7 @@ public class NettyProtocolDecoder extends FrameDecoder {
filterIllegalPackageSize(true, tmpListSize,
RpcConstants.MAX_FRAME_MAX_LIST_SIZE, channel);
this.listSize = tmpListSize;
- this.dataPack = new RpcDataPack(serialNo, new ArrayList<ByteBuffer>(this.listSize));
+ this.dataPack = new RpcDataPack(serialNo, new ArrayList<>(this.listSize));
this.packHeaderRead = true;
}
// get PackBody
diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corerpc/netty/NettyProtocolEncoder.java b/tubemq-core/src/main/java/org/apache/tubemq/corerpc/netty/NettyProtocolEncoder.java
index ae0a69a..b480d3a 100644
--- a/tubemq-core/src/main/java/org/apache/tubemq/corerpc/netty/NettyProtocolEncoder.java
+++ b/tubemq-core/src/main/java/org/apache/tubemq/corerpc/netty/NettyProtocolEncoder.java
@@ -35,7 +35,7 @@ public class NettyProtocolEncoder extends OneToOneEncoder {
Channel channel, Object msg) throws Exception {
RpcDataPack dataPack = (RpcDataPack) msg;
List<ByteBuffer> origs = dataPack.getDataLst();
- List<ByteBuffer> bbs = new ArrayList<ByteBuffer>(origs.size() * 2 + 1);
+ List<ByteBuffer> bbs = new ArrayList<>(origs.size() * 2 + 1);
bbs.add(getPackHeader(dataPack));
for (ByteBuffer b : origs) {
bbs.add(getLengthHeader(b));
diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corerpc/netty/NettyRpcServer.java b/tubemq-core/src/main/java/org/apache/tubemq/corerpc/netty/NettyRpcServer.java
index a9b7c5e..abebf4c 100644
--- a/tubemq-core/src/main/java/org/apache/tubemq/corerpc/netty/NettyRpcServer.java
+++ b/tubemq-core/src/main/java/org/apache/tubemq/corerpc/netty/NettyRpcServer.java
@@ -68,10 +68,10 @@ public class NettyRpcServer implements ServiceRpcServer {
private static final Logger logger =
LoggerFactory.getLogger(NettyRpcServer.class);
private static final ConcurrentHashMap<String, AtomicLong> errParseAddrMap =
- new ConcurrentHashMap<String, AtomicLong>();
+ new ConcurrentHashMap<>();
private static AtomicLong lastParseTime = new AtomicLong(0);
private final ConcurrentHashMap<Integer, Protocol> protocols =
- new ConcurrentHashMap<Integer, Protocol>();
+ new ConcurrentHashMap<>();
private ServerBootstrap bootstrap;
private NioServerSocketChannelFactory channelFactory = null;
private AtomicBoolean started = new AtomicBoolean(false);
diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corerpc/protocol/ProtocolFactory.java b/tubemq-core/src/main/java/org/apache/tubemq/corerpc/protocol/ProtocolFactory.java
index c95cb17..4312a58 100644
--- a/tubemq-core/src/main/java/org/apache/tubemq/corerpc/protocol/ProtocolFactory.java
+++ b/tubemq-core/src/main/java/org/apache/tubemq/corerpc/protocol/ProtocolFactory.java
@@ -24,7 +24,7 @@ import java.util.Map;
public class ProtocolFactory {
private static final Map<Integer, Class<? extends Protocol>> protocols =
- new HashMap<Integer, Class<? extends Protocol>>();
+ new HashMap<>();
static {
registerProtocol(RpcProtocol.RPC_PROTOCOL_TCP, RpcProtocol.class);
diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corerpc/protocol/RpcProtocol.java b/tubemq-core/src/main/java/org/apache/tubemq/corerpc/protocol/RpcProtocol.java
index 4f2c272..e405ff0 100644
--- a/tubemq-core/src/main/java/org/apache/tubemq/corerpc/protocol/RpcProtocol.java
+++ b/tubemq-core/src/main/java/org/apache/tubemq/corerpc/protocol/RpcProtocol.java
@@ -45,11 +45,11 @@ public class RpcProtocol implements Protocol {
private static final Logger logger =
LoggerFactory.getLogger(RpcProtocol.class);
private final Map<Integer, Object> processors =
- new HashMap<Integer, Object>();
+ new HashMap<>();
private final Map<Integer, Method> cacheMethods =
- new HashMap<Integer, Method>();
+ new HashMap<>();
private final Map<Integer, ExecutorService> threadPools =
- new HashMap<Integer, ExecutorService>();
+ new HashMap<>();
private boolean isOverTLS = false;
@Override
diff --git a/tubemq-core/src/test/java/org/apache/tubemq/corerpc/codec/DataConverterUtilTest.java b/tubemq-core/src/test/java/org/apache/tubemq/corerpc/codec/DataConverterUtilTest.java
index 5690a48..28d07e2 100644
--- a/tubemq-core/src/test/java/org/apache/tubemq/corerpc/codec/DataConverterUtilTest.java
+++ b/tubemq-core/src/test/java/org/apache/tubemq/corerpc/codec/DataConverterUtilTest.java
@@ -40,7 +40,7 @@ public class DataConverterUtilTest {
public void testDataConvert() {
// broker convert
BrokerInfo broker = new BrokerInfo(0, "localhost", 1200);
- List<String> strInfoList = new ArrayList<String>();
+ List<String> strInfoList = new ArrayList<>();
strInfoList.add("0:localhost:1200");
Map<Integer, BrokerInfo> brokerMap = DataConverterUtil.convertBrokerInfo(strInfoList);
assertEquals("broker should be equal", broker, brokerMap.get(broker.getBrokerId()));
diff --git a/tubemq-core/src/test/java/org/apache/tubemq/corerpc/netty/NettyProtocolEncoderTest.java b/tubemq-core/src/test/java/org/apache/tubemq/corerpc/netty/NettyProtocolEncoderTest.java
index 27dea5c..21f4b71 100644
--- a/tubemq-core/src/test/java/org/apache/tubemq/corerpc/netty/NettyProtocolEncoderTest.java
+++ b/tubemq-core/src/test/java/org/apache/tubemq/corerpc/netty/NettyProtocolEncoderTest.java
@@ -37,7 +37,7 @@ public class NettyProtocolEncoderTest {
RpcDataPack obj = new RpcDataPack();
// set serial number
obj.setSerialNo(123);
- List<ByteBuffer> dataList = new LinkedList<ByteBuffer>();
+ List<ByteBuffer> dataList = new LinkedList<>();
dataList.add(ByteBuffer.wrap("abc".getBytes()));
dataList.add(ByteBuffer.wrap("def".getBytes()));
// append data list.