You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2020/05/11 02:21:08 UTC
[incubator-tubemq] branch master updated: [TUBEMQ-95] Substitute
the parameterized type for server module (#74)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/master by this push:
new c3a1f95 [TUBEMQ-95] Substitute the parameterized type for server module (#74)
c3a1f95 is described below
commit c3a1f952d1135960bf31186ad65fcc7e68f05f78
Author: Tboy <gu...@immomo.com>
AuthorDate: Mon May 11 10:21:00 2020 +0800
[TUBEMQ-95] Substitute the parameterized type for server module (#74)
---
.../tubemq/server/broker/BrokerServiceServer.java | 8 +--
.../broker/metadata/BrokerMetadataManager.java | 22 +++---
.../server/broker/msgstore/MessageStore.java | 4 +-
.../broker/msgstore/MessageStoreManager.java | 20 +++---
.../broker/msgstore/disk/FileSegmentList.java | 2 +-
.../broker/msgstore/disk/GetMessageResult.java | 4 +-
.../server/broker/msgstore/disk/MsgFileStore.java | 4 +-
.../server/broker/msgstore/mem/MsgMemStore.java | 6 +-
.../server/broker/msgstore/ssd/MsgSSDSegment.java | 6 +-
.../broker/msgstore/ssd/MsgSSDStoreManager.java | 20 +++---
.../server/broker/nodeinfo/ConsumerNodeInfo.java | 4 +-
.../server/broker/offset/DefaultOffsetManager.java | 10 +--
.../server/broker/stats/GroupCountService.java | 2 +-
.../server/broker/web/BrokerAdminServlet.java | 2 +-
.../aaaserver/SimpleCertificateBrokerHandler.java | 4 +-
.../server/common/heartbeat/HeartbeatManager.java | 8 +--
.../zookeeper/RecoverableZooKeeper.java | 2 +-
.../offsetstorage/zookeeper/ZooKeeperWatcher.java | 2 +-
.../server/common/paramcheck/PBParameterUtils.java | 2 +-
.../apache/tubemq/server/common/utils/RowLock.java | 4 +-
.../server/common/utils/WebParameterUtils.java | 10 +--
.../org/apache/tubemq/server/master/TMaster.java | 36 +++++-----
.../server/master/balance/DefaultLoadBalancer.java | 46 ++++++------
.../master/bdbstore/DefaultBdbStoreService.java | 32 ++++-----
.../nodemanage/nodebroker/BrokerConfManager.java | 84 ++++++++++------------
.../nodemanage/nodebroker/BrokerInfoHolder.java | 10 +--
.../nodebroker/BrokerSyncStatusInfo.java | 10 +--
.../nodemanage/nodebroker/TopicPSInfoManager.java | 22 +++---
.../nodemanage/nodeconsumer/ConsumerBandInfo.java | 20 +++---
.../nodeconsumer/ConsumerEventManager.java | 12 ++--
.../nodeconsumer/ConsumerInfoHolder.java | 8 +--
.../nodemanage/nodeconsumer/RebProcessInfo.java | 4 +-
.../nodeproducer/ProducerInfoHolder.java | 2 +-
.../server/master/web/action/screen/Master.java | 2 +-
.../server/master/web/action/screen/Webapi.java | 6 +-
.../web/action/screen/cluster/ClusterManager.java | 2 +-
.../web/action/screen/config/BrokerList.java | 2 +-
.../web/handler/WebAdminFlowRuleHandler.java | 16 ++---
.../web/handler/WebAdminGroupCtrlHandler.java | 8 +--
.../web/handler/WebAdminTopicAuthHandler.java | 4 +-
.../web/handler/WebBrokerDefConfHandler.java | 4 +-
.../web/handler/WebBrokerTopicConfHandler.java | 22 +++---
.../master/web/simplemvc/RequestDispatcher.java | 10 +--
.../master/web/simplemvc/conf/WebConfig.java | 8 +--
.../apache/tubemq/server/tools/BdbGroupAdmin.java | 2 +-
.../tubemq/server/tools/StoreRepairAdmin.java | 6 +-
46 files changed, 257 insertions(+), 267 deletions(-)
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerServiceServer.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerServiceServer.java
index 3de42d1..6cd99b1 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerServiceServer.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerServiceServer.java
@@ -89,7 +89,7 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
private final BrokerConfig tubeConfig;
// registered consumers. format : consumer group - topic - partition id --> consumer info
private final ConcurrentHashMap<String/* group:topic-partitionId */, ConsumerNodeInfo> consumerRegisterMap =
- new ConcurrentHashMap<String, ConsumerNodeInfo>();
+ new ConcurrentHashMap<>();
// metadata manager.
private final MetadataManager metadataManager;
// offset storage manager.
@@ -555,7 +555,7 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
.append(topicName).append(")!\"}");
return sb;
} else {
- List<String> transferMessageList = new ArrayList<String>();
+ List<String> transferMessageList = new ArrayList<>();
List<TransferedMessage> tmpMsgList = getMessageResult.transferedMessageList;
List<Message> messageList = DataConverterUtil.convertMessage(topicName, tmpMsgList);
int startPos = messageList.size() - msgCount < 0 ? 0 : messageList.size() - msgCount;
@@ -755,7 +755,7 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
final String groupName = (String) paramCheckResult.checkData;
boolean isRegister = (request.getOpType() == RpcConstants.MSG_OPTYPE_REGISTER);
- Set<String> filterCondSet = new HashSet<String>();
+ Set<String> filterCondSet = new HashSet<>();
if (request.getFilterCondStrList() != null && !request.getFilterCondStrList().isEmpty()) {
for (String filterCond : request.getFilterCondStrList()) {
if (TStringUtils.isNotBlank(filterCond)) {
@@ -1022,7 +1022,7 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
DataConverterUtil.convertPartitionInfo(request.getPartitionInfoList());
CertifiedResult authorizeResult = null;
boolean isAuthorized = false;
- List<String> failureInfo = new ArrayList<String>();
+ List<String> failureInfo = new ArrayList<>();
for (Partition partition : partitions) {
String topic = partition.getTopic();
int partitionId = partition.getPartitionId();
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/BrokerMetadataManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/BrokerMetadataManager.java
index 0e5cec4..984291d 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/BrokerMetadataManager.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/BrokerMetadataManager.java
@@ -50,20 +50,20 @@ public class BrokerMetadataManager implements MetadataManager {
// broker's metadata in String format.
private String brokerDefMetaConfInfo = "";
// broker's topic's config list.
- private List<String> topicMetaConfInfoLst = new ArrayList<String>();
+ private List<String> topicMetaConfInfoLst = new ArrayList<>();
// topic in this broker.
- private List<String> topics = new ArrayList<String>();
+ private List<String> topics = new ArrayList<>();
// broker's default metadata.
private BrokerDefMetadata brokerDefMetadata = new BrokerDefMetadata();
// topic with custom config.
private ConcurrentHashMap<String/* topic */, TopicMetadata> topicConfigMap =
- new ConcurrentHashMap<String, TopicMetadata>();
+ new ConcurrentHashMap<>();
// topics will be closed.
private Map<String/* topic */, Integer> closedTopicMap =
- new ConcurrentHashMap<String, Integer>();
+ new ConcurrentHashMap<>();
// topics will be removed.
private Map<String/* topic */, TopicMetadata> removedTopicConfigMap =
- new ConcurrentHashMap<String, TopicMetadata>();
+ new ConcurrentHashMap<>();
private long lastRptBrokerMetaConfId = 0;
public BrokerMetadataManager() {
@@ -174,7 +174,7 @@ public class BrokerMetadataManager implements MetadataManager {
*/
@Override
public List<String> getHardRemovedTopics() {
- List<String> targetTopics = new ArrayList<String>();
+ List<String> targetTopics = new ArrayList<>();
for (Map.Entry<String, TopicMetadata> entry
: this.removedTopicConfigMap.entrySet()) {
if (entry.getKey() == null || entry.getValue() == null) {
@@ -242,11 +242,11 @@ public class BrokerMetadataManager implements MetadataManager {
logger.error("[Metadata Manage] received broker topic info is Blank, not update");
return;
}
- List<String> newTopics = new ArrayList<String>();
+ List<String> newTopics = new ArrayList<>();
Map<String/* topic */, Integer> tmpInvalidTopicMap =
- new ConcurrentHashMap<String, Integer>();
+ new ConcurrentHashMap<>();
ConcurrentHashMap<String/* topic */, TopicMetadata> newTopicConfigMap =
- new ConcurrentHashMap<String, TopicMetadata>();
+ new ConcurrentHashMap<>();
for (String strTopicConfInfo : newTopicMetaConfInfoLst) {
if (TStringUtils.isBlank(strTopicConfInfo)) {
continue;
@@ -287,7 +287,7 @@ public class BrokerMetadataManager implements MetadataManager {
// 该部分根据Master上的指示进行对应Topic的删除操作
boolean needProcess = false;
if (isTakeRemoveTopics) {
- List<String> origTopics = new ArrayList<String>();
+ List<String> origTopics = new ArrayList<>();
if (rmvTopicMetaConfInfoLst != null
&& !rmvTopicMetaConfInfoLst.isEmpty()) {
for (String tmpTopicMetaConfInfo : rmvTopicMetaConfInfoLst) {
@@ -307,7 +307,7 @@ public class BrokerMetadataManager implements MetadataManager {
origTopics.add(topicMetadata.getTopic());
}
}
- List<String> tmpTopics = new ArrayList<String>();
+ List<String> tmpTopics = new ArrayList<>();
for (Map.Entry<String, TopicMetadata> entry : removedTopicConfigMap.entrySet()) {
if (entry.getKey() == null || entry.getValue() == null) {
continue;
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java
index 5e9f31b..7dbef6d 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java
@@ -213,9 +213,9 @@ public class MessageStore implements Closeable {
// return not found when data is under memory sink operation.
if (memMsgRlt.isSuccess) {
HashMap<String, CountItem> countMap =
- new HashMap<String, CountItem>();
+ new HashMap<>();
List<ClientBroker.TransferedMessage> transferedMessageList =
- new ArrayList<ClientBroker.TransferedMessage>();
+ new ArrayList<>();
if (!memMsgRlt.cacheMsgList.isEmpty()) {
final StringBuilder strBuffer = new StringBuilder(512);
for (ByteBuffer dataBuffer : memMsgRlt.cacheMsgList) {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStoreManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStoreManager.java
index 74e0d91..daa8ba7 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStoreManager.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStoreManager.java
@@ -70,7 +70,7 @@ public class MessageStoreManager implements StoreService {
// storeId to store on each topic.
private final ConcurrentHashMap<String/* topic */,
ConcurrentHashMap<Integer/* storeId */, MessageStore>> dataStores =
- new ConcurrentHashMap<String, ConcurrentHashMap<Integer, MessageStore>>();
+ new ConcurrentHashMap<>();
// ssd store manager
private final MsgSSDStoreManager msgSsdStoreManager;
// store service status
@@ -204,13 +204,13 @@ public class MessageStoreManager implements StoreService {
}
try {
List<String> removedTopics =
- new ArrayList<String>();
+ new ArrayList<>();
Map<String, TopicMetadata> removedTopicMap =
this.metadataManager.getRemovedTopicConfigMap();
if (removedTopicMap.isEmpty()) {
return removedTopics;
}
- Set<String> targetTopics = new HashSet<String>();
+ Set<String> targetTopics = new HashSet<>();
for (Map.Entry<String, TopicMetadata> entry : removedTopicMap.entrySet()) {
if (entry.getKey() == null || entry.getValue() == null) {
continue;
@@ -310,7 +310,7 @@ public class MessageStoreManager implements StoreService {
ConcurrentHashMap<Integer, MessageStore> dataMap = dataStores.get(topic);
if (dataMap == null) {
ConcurrentHashMap<Integer, MessageStore> tmpTopicMap =
- new ConcurrentHashMap<Integer, MessageStore>();
+ new ConcurrentHashMap<>();
dataMap = this.dataStores.putIfAbsent(topic, tmpTopicMap);
if (dataMap == null) {
dataMap = tmpTopicMap;
@@ -454,7 +454,7 @@ public class MessageStoreManager implements StoreService {
private Set<File> getLogDirSet(final BrokerConfig tubeConfig) throws IOException {
TopicMetadata topicMetadata = null;
- final Set<String> paths = new HashSet<String>();
+ final Set<String> paths = new HashSet<>();
paths.add(tubeConfig.getPrimaryPath());
for (final String topic : metadataManager.getTopics()) {
topicMetadata = metadataManager.getTopicMetadata(topic);
@@ -463,7 +463,7 @@ public class MessageStoreManager implements StoreService {
paths.add(topicMetadata.getDataPath());
}
}
- final Set<File> fileSet = new HashSet<File>();
+ final Set<File> fileSet = new HashSet<>();
for (final String path : paths) {
final File dir = new File(path);
if (!dir.exists() && !dir.mkdirs()) {
@@ -497,7 +497,7 @@ public class MessageStoreManager implements StoreService {
final long start = System.currentTimeMillis();
final AtomicInteger errCnt = new AtomicInteger(0);
final AtomicInteger finishCnt = new AtomicInteger(0);
- List<Callable<MessageStore>> tasks = new ArrayList<Callable<MessageStore>>();
+ List<Callable<MessageStore>> tasks = new ArrayList<>();
for (final File dir : this.getLogDirSet(tubeConfig)) {
if (dir == null) {
continue;
@@ -542,7 +542,7 @@ public class MessageStoreManager implements StoreService {
ConcurrentHashMap<Integer, MessageStore> map =
dataStores.get(msgStore.getTopic());
if (map == null) {
- map = new ConcurrentHashMap<Integer, MessageStore>();
+ map = new ConcurrentHashMap<>();
ConcurrentHashMap<Integer, MessageStore> oldmap =
dataStores.putIfAbsent(msgStore.getTopic(), map);
if (oldmap != null) {
@@ -594,7 +594,7 @@ public class MessageStoreManager implements StoreService {
ExecutorService executor =
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1);
CompletionService<MessageStore> completionService =
- new ExecutorCompletionService<MessageStore>(executor);
+ new ExecutorCompletionService<>(executor);
for (Callable<MessageStore> task : tasks) {
completionService.submit(task);
}
@@ -707,7 +707,7 @@ public class MessageStoreManager implements StoreService {
}
private Set<String> getExpiredTopicSet(final StringBuilder sb) {
- Set<String> expiredTopic = new HashSet<String>();
+ Set<String> expiredTopic = new HashSet<>();
for (Map<Integer, MessageStore> storeMap : dataStores.values()) {
if (storeMap == null || storeMap.isEmpty()) {
continue;
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/FileSegmentList.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/FileSegmentList.java
index 04f52af..6020641 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/FileSegmentList.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/FileSegmentList.java
@@ -30,7 +30,7 @@ public class FileSegmentList implements SegmentList {
LoggerFactory.getLogger(FileSegmentList.class);
// list of segments.
private AtomicReference<Segment[]> segmentList =
- new AtomicReference<Segment[]>();
+ new AtomicReference<>();
public FileSegmentList(final Segment[] s) {
super();
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/GetMessageResult.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/GetMessageResult.java
index 304ff33..84b6d6d 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/GetMessageResult.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/GetMessageResult.java
@@ -37,8 +37,8 @@ public class GetMessageResult {
public long waitTime = -1;
public boolean isSlowFreq = false;
public boolean isFromSsdFile = false;
- public HashMap<String, CountItem> tmpCounters = new HashMap<String, CountItem>();
- public List<TransferedMessage> transferedMessageList = new ArrayList<TransferedMessage>();
+ public HashMap<String, CountItem> tmpCounters = new HashMap<>();
+ public List<TransferedMessage> transferedMessageList = new ArrayList<>();
public GetMessageResult(boolean isSuccess, int retCode, final String errInfo,
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/MsgFileStore.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/MsgFileStore.java
index 4440806..9bbd492 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/MsgFileStore.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/MsgFileStore.java
@@ -256,7 +256,7 @@ public class MsgFileStore implements Closeable {
ByteBuffer dataBuffer =
ByteBuffer.allocate(TServerConstants.CFG_STORE_DEFAULT_MSG_READ_UNIT);
List<ClientBroker.TransferedMessage> transferedMessageList =
- new ArrayList<ClientBroker.TransferedMessage>();
+ new ArrayList<>();
// read data file by index.
for (curIndexOffset = 0; curIndexOffset < indexBuffer.remaining();
curIndexOffset += DataStoreUtils.STORE_INDEX_HEAD_LEN) {
@@ -513,7 +513,7 @@ public class MsgFileStore implements Closeable {
.append(segTypeStr).append(" segments ")
.append(segListDir.getAbsolutePath()).toString());
sBuilder.delete(0, sBuilder.length());
- final List<Segment> accum = new ArrayList<Segment>();
+ final List<Segment> accum = new ArrayList<>();
final File[] ls = segListDir.listFiles();
if (ls != null) {
for (final File file : ls) {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/mem/MsgMemStore.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/mem/MsgMemStore.java
index 6ba9012..6f14f4f 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/mem/MsgMemStore.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/mem/MsgMemStore.java
@@ -51,10 +51,10 @@ public class MsgMemStore implements Closeable {
private final ReentrantLock writeLock = new ReentrantLock();
// partitionId to index position, accelerate query
private final ConcurrentHashMap<Integer, Integer> queuesMap =
- new ConcurrentHashMap<Integer, Integer>(20);
+ new ConcurrentHashMap<>(20);
// key to index position, used for filter consume
private final ConcurrentHashMap<Integer, Integer> keysMap =
- new ConcurrentHashMap<Integer, Integer>(100);
+ new ConcurrentHashMap<>(100);
// where messages in memory will sink to disk
private long writeDataStartPos = -1;
private ByteBuffer cacheDataSegment;
@@ -147,7 +147,7 @@ public class MsgMemStore implements Closeable {
Integer lastWritePos = 0;
boolean hasMsg = false;
// judge memory contains the given offset or not.
- List<ByteBuffer> cacheMsgList = new ArrayList<ByteBuffer>();
+ List<ByteBuffer> cacheMsgList = new ArrayList<>();
if (lastOffset < this.writeIndexStartPos) {
return new GetCacheMsgResult(false, TErrCodeConstants.MOVED,
lastOffset, "Request offset lower than cache minOffset");
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/ssd/MsgSSDSegment.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/ssd/MsgSSDSegment.java
index a485374..c3adb5c 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/ssd/MsgSSDSegment.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/ssd/MsgSSDSegment.java
@@ -46,7 +46,7 @@ public class MsgSSDSegment implements Closeable {
private static final Logger logger = LoggerFactory.getLogger(MsgSSDSegment.class);
private final Segment dataSegment;
private final ConcurrentHashMap<String, SSDVisitInfo> visitMap =
- new ConcurrentHashMap<String, SSDVisitInfo>();
+ new ConcurrentHashMap<>();
private final String topic;
private final String storeKey;
private AtomicLong lastReadTime = new AtomicLong(System.currentTimeMillis());
@@ -100,11 +100,11 @@ public class MsgSSDSegment implements Closeable {
long lastRdDataOffset = -1L;
final long curDataMaxOffset = getDataMaxOffset();
final long curDataMinOffset = getDataMinOffset();
- HashMap<String, CountItem> countMap = new HashMap<String, CountItem>();
+ HashMap<String, CountItem> countMap = new HashMap<>();
ByteBuffer dataBuffer =
ByteBuffer.allocate(TServerConstants.CFG_STORE_DEFAULT_MSG_READ_UNIT);
List<ClientBroker.TransferedMessage> transferedMessageList =
- new ArrayList<ClientBroker.TransferedMessage>();
+ new ArrayList<>();
if (this.dataSegment == null) {
logger.error(strBuffer.append("[SSD Store] Found SSD fileRecordView is null! storeKey=")
.append(this.storeKey).toString());
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/ssd/MsgSSDStoreManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/ssd/MsgSSDStoreManager.java
index 974f28e..0be8806 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/ssd/MsgSSDStoreManager.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/ssd/MsgSSDStoreManager.java
@@ -69,7 +69,7 @@ public class MsgSSDStoreManager implements Closeable {
// ssd data directory
private final File ssdBaseDataDir;
private final BlockingQueue<SSDSegEvent> reqSSDEvents
- = new ArrayBlockingQueue<SSDSegEvent>(60);
+ = new ArrayBlockingQueue<>(60);
private final ExecutorService statusCheckExecutor = Executors.newSingleThreadExecutor();
private final ExecutorService reqExecutor = Executors.newSingleThreadExecutor();
// total ssd files size
@@ -82,9 +82,9 @@ public class MsgSSDStoreManager implements Closeable {
private AtomicInteger firstChecked = new AtomicInteger(3);
// ssd segments
private ConcurrentHashMap<String, ConcurrentHashMap<Long, MsgSSDSegment>> ssdSegmentsMap =
- new ConcurrentHashMap<String, ConcurrentHashMap<Long, MsgSSDSegment>>(60);
+ new ConcurrentHashMap<>(60);
private ConcurrentHashMap<String, ConcurrentHashSet<SSDSegIndex>> ssdPartStrMap =
- new ConcurrentHashMap<String, ConcurrentHashSet<SSDSegIndex>>(60);
+ new ConcurrentHashMap<>(60);
private long startTime = System.currentTimeMillis();
private long lastCheckTime = System.currentTimeMillis();
@@ -269,7 +269,7 @@ public class MsgSSDStoreManager implements Closeable {
&& totalSsdFileSize.get() < (long) (tubeConfig.getMaxSSDTotalFileSizes() * 0.7))) {
return true;
}
- Set<SSDSegIndex> msgSsdIndexSet = new HashSet<SSDSegIndex>();
+ Set<SSDSegIndex> msgSsdIndexSet = new HashSet<>();
for (Map.Entry<String, ConcurrentHashMap<Long, MsgSSDSegment>> entry : ssdSegmentsMap.entrySet()) {
if (entry.getKey() == null || entry.getValue() == null) {
continue;
@@ -323,7 +323,7 @@ public class MsgSSDStoreManager implements Closeable {
ConcurrentHashMap<String, SSDVisitInfo> ssdVisitInfoMap =
msgSsdSegment.getVisitMap();
if (ssdVisitInfoMap != null) {
- List<String> rmvPartStrList = new ArrayList<String>();
+ List<String> rmvPartStrList = new ArrayList<>();
for (String partStr : ssdVisitInfoMap.keySet()) {
ConsumerNodeInfo consumerNodeInfo =
consumerNodeInfoMap.get(partStr);
@@ -494,7 +494,7 @@ public class MsgSSDStoreManager implements Closeable {
ssdSegmentsMap.get(storeKey);
if (msgSsdSegMap == null) {
ConcurrentHashMap<Long, MsgSSDSegment> tmpMsgSsdSegMap =
- new ConcurrentHashMap<Long, MsgSSDSegment>();
+ new ConcurrentHashMap<>();
msgSsdSegMap = ssdSegmentsMap.putIfAbsent(storeKey, tmpMsgSsdSegMap);
if (msgSsdSegMap == null) {
msgSsdSegMap = tmpMsgSsdSegMap;
@@ -575,7 +575,7 @@ public class MsgSSDStoreManager implements Closeable {
}
ConcurrentHashSet<SSDSegIndex> ssdSegIndices = this.ssdPartStrMap.get(partStr);
if (ssdSegIndices == null) {
- ConcurrentHashSet<SSDSegIndex> tmpSsdSegIndices = new ConcurrentHashSet<SSDSegIndex>();
+ ConcurrentHashSet<SSDSegIndex> tmpSsdSegIndices = new ConcurrentHashSet<>();
ssdSegIndices = this.ssdPartStrMap.putIfAbsent(partStr, tmpSsdSegIndices);
if (ssdSegIndices == null) {
ssdSegIndices = tmpSsdSegIndices;
@@ -601,7 +601,7 @@ public class MsgSSDStoreManager implements Closeable {
ssdSegmentsMap.get(ssdSegEvent.storeKey);
if (msgSegmentMap == null) {
ConcurrentHashMap<Long, MsgSSDSegment> tmpMsgSsdSegMap =
- new ConcurrentHashMap<Long, MsgSSDSegment>();
+ new ConcurrentHashMap<>();
msgSegmentMap =
ssdSegmentsMap.putIfAbsent(ssdSegEvent.storeKey, tmpMsgSsdSegMap);
if (msgSegmentMap == null) {
@@ -616,7 +616,7 @@ public class MsgSSDStoreManager implements Closeable {
ssdPartStrMap.get(ssdSegEvent.partStr);
if (ssdSegIndices == null) {
ConcurrentHashSet<SSDSegIndex> tmpSsdSegIndices =
- new ConcurrentHashSet<SSDSegIndex>();
+ new ConcurrentHashSet<>();
ssdSegIndices =
ssdPartStrMap.putIfAbsent(ssdSegEvent.partStr, tmpSsdSegIndices);
if (ssdSegIndices == null) {
@@ -673,7 +673,7 @@ public class MsgSSDStoreManager implements Closeable {
this.ssdSegmentsMap.get(storeKey);
if (msgSsdSegMap == null) {
ConcurrentHashMap<Long, MsgSSDSegment> tmpMsgSsdSegMap =
- new ConcurrentHashMap<Long, MsgSSDSegment>();
+ new ConcurrentHashMap<>();
msgSsdSegMap =
ssdSegmentsMap.putIfAbsent(storeKey, tmpMsgSsdSegMap);
if (msgSsdSegMap == null) {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/nodeinfo/ConsumerNodeInfo.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/nodeinfo/ConsumerNodeInfo.java
index 786390c..a66f3af 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/nodeinfo/ConsumerNodeInfo.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/nodeinfo/ConsumerNodeInfo.java
@@ -44,9 +44,9 @@ public class ConsumerNodeInfo {
// is filter consumer or not
private boolean isFilterConsume = false;
// filter conditions in string format
- private Set<String> filterCondStrs = new HashSet<String>(10);
+ private Set<String> filterCondStrs = new HashSet<>(10);
// filter conditions in int format
- private Set<Integer> filterCondCode = new HashSet<Integer>(10);
+ private Set<Integer> filterCondCode = new HashSet<>(10);
// consumer's address
private String rmtAddrInfo;
private boolean isSupportLimit = false;
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java
index b4228ed..82a758c 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java
@@ -41,10 +41,10 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
private final OffsetStorage zkOffsetStorage;
private final ConcurrentHashMap<String/* group */,
ConcurrentHashMap<String/* topic - partitionId*/, OffsetStorageInfo>> cfmOffsetMap =
- new ConcurrentHashMap<String, ConcurrentHashMap<String/* topic */, OffsetStorageInfo>>();
+ new ConcurrentHashMap<>();
private final ConcurrentHashMap<String/* group */,
ConcurrentHashMap<String/* topic - partitionId*/, Long>> tmpOffsetMap =
- new ConcurrentHashMap<String, ConcurrentHashMap<String, Long>>();
+ new ConcurrentHashMap<>();
public DefaultOffsetManager(final BrokerConfig brokerConfig) {
super("[Offset Manager]", brokerConfig.getZkConfig().getZkCommitPeriodMs());
@@ -334,7 +334,7 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
long tmpOffset = origOffset - origOffset % DataStoreUtils.STORE_INDEX_HEAD_LEN;
ConcurrentHashMap<String, Long> partTmpOffsetMap = tmpOffsetMap.get(group);
if (partTmpOffsetMap == null) {
- ConcurrentHashMap<String, Long> tmpMap = new ConcurrentHashMap<String, Long>();
+ ConcurrentHashMap<String, Long> tmpMap = new ConcurrentHashMap<>();
partTmpOffsetMap = tmpOffsetMap.putIfAbsent(group, tmpMap);
if (partTmpOffsetMap == null) {
partTmpOffsetMap = tmpMap;
@@ -351,7 +351,7 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
private long getAndResetTmpOffset(final String group, final String offsetCacheKey) {
ConcurrentHashMap<String, Long> partTmpOffsetMap = tmpOffsetMap.get(group);
if (partTmpOffsetMap == null) {
- ConcurrentHashMap<String, Long> tmpMap = new ConcurrentHashMap<String, Long>();
+ ConcurrentHashMap<String, Long> tmpMap = new ConcurrentHashMap<>();
partTmpOffsetMap = tmpOffsetMap.putIfAbsent(group, tmpMap);
if (partTmpOffsetMap == null) {
partTmpOffsetMap = tmpMap;
@@ -416,7 +416,7 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
ConcurrentHashMap<String, OffsetStorageInfo> regInfoMap = cfmOffsetMap.get(group);
if (regInfoMap == null) {
ConcurrentHashMap<String, OffsetStorageInfo> tmpRegInfoMap
- = new ConcurrentHashMap<String, OffsetStorageInfo>();
+ = new ConcurrentHashMap<>();
regInfoMap = cfmOffsetMap.putIfAbsent(group, tmpRegInfoMap);
if (regInfoMap == null) {
regInfoMap = tmpRegInfoMap;
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/stats/GroupCountService.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/stats/GroupCountService.java
index 1f863d4..b0de32e 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/stats/GroupCountService.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/stats/GroupCountService.java
@@ -142,6 +142,6 @@ public class GroupCountService extends AbstractDaemonService implements CountSer
private class CountSet {
public AtomicLong refCnt = new AtomicLong(0);
public ConcurrentHashMap<String, CountItem> counterItem =
- new ConcurrentHashMap<String, CountItem>();
+ new ConcurrentHashMap<>();
}
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java
index cb2be0f..eb3d586 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java
@@ -282,7 +282,7 @@ public class BrokerAdminServlet extends HttpServlet {
*/
private StringBuilder adminGetMemStoreStatisInfo(HttpServletRequest req) throws Exception {
StringBuilder sBuilder = new StringBuilder(1024);
- Set<String> batchTopicNames = new HashSet<String>();
+ Set<String> batchTopicNames = new HashSet<>();
String inputTopicName = req.getParameter("topicName");
if (TStringUtils.isNotBlank(inputTopicName)) {
inputTopicName = String.valueOf(inputTopicName).trim();
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/aaaserver/SimpleCertificateBrokerHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/aaaserver/SimpleCertificateBrokerHandler.java
index 73f8708..d190500 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/aaaserver/SimpleCertificateBrokerHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/aaaserver/SimpleCertificateBrokerHandler.java
@@ -39,7 +39,7 @@ public class SimpleCertificateBrokerHandler implements CertificateBrokerHandler
private long inValidTokenCheckTimeMs = 120000; // 2 minutes
private final TubeBroker tubeBroker;
private final AtomicReference<List<Long>> visitTokenList =
- new AtomicReference<List<Long>>();
+ new AtomicReference<>();
private String lastUpdatedVisitTokens = "";
private boolean enableVisitTokenCheck = false;
private boolean enableProduceAuthenticate = false;
@@ -93,7 +93,7 @@ public class SimpleCertificateBrokerHandler implements CertificateBrokerHandler
if (currList.contains(curVisitToken)) {
break;
}
- List<Long> updateList = new ArrayList<Long>(currList);
+ List<Long> updateList = new ArrayList<>(currList);
while (updateList.size() >= MAX_VISIT_TOKEN_SIZE) {
updateList.remove(0);
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/heartbeat/HeartbeatManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/heartbeat/HeartbeatManager.java
index 44f734b..c000b31 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/heartbeat/HeartbeatManager.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/heartbeat/HeartbeatManager.java
@@ -34,11 +34,11 @@ public class HeartbeatManager {
private static final Logger logger = LoggerFactory.getLogger(HeartbeatManager.class);
private final ConcurrentHashMap<String, TimeoutInfo> brokerRegMap =
- new ConcurrentHashMap<String, TimeoutInfo>();
+ new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, TimeoutInfo> producerRegMap =
- new ConcurrentHashMap<String, TimeoutInfo>();
+ new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, TimeoutInfo> consumerRegMap =
- new ConcurrentHashMap<String, TimeoutInfo>();
+ new ConcurrentHashMap<>();
private final ExecutorService timeoutScanService = Executors.newCachedThreadPool();
private long brokerTimeoutDlt = 0;
private long producerTimeoutDlt = 0;
@@ -128,7 +128,7 @@ public class HeartbeatManager {
while (!isStopped) {
try {
long currentTime = System.currentTimeMillis();
- Set<String> removedNodeKey = new HashSet<String>();
+ Set<String> removedNodeKey = new HashSet<>();
for (Map.Entry<String, TimeoutInfo> entry : nodeMap.entrySet()) {
if (TStringUtils.isBlank(entry.getKey()) || entry.getValue() == null) {
continue;
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/zookeeper/RecoverableZooKeeper.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/zookeeper/RecoverableZooKeeper.java
index 13e7008..c9e27ea 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/zookeeper/RecoverableZooKeeper.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/zookeeper/RecoverableZooKeeper.java
@@ -84,7 +84,7 @@ public class RecoverableZooKeeper {
* @return list of every element that starts with one of the prefixes
*/
private static synchronized List<String> filterByPrefix(List<String> nodes, String... prefixes) {
- List<String> lockChildren = new ArrayList<String>();
+ List<String> lockChildren = new ArrayList<>();
for (String child : nodes) {
for (String prefix : prefixes) {
if (child.startsWith(prefix)) {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/zookeeper/ZooKeeperWatcher.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/zookeeper/ZooKeeperWatcher.java
index 4d62e4c..996e2f6 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/zookeeper/ZooKeeperWatcher.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/zookeeper/ZooKeeperWatcher.java
@@ -63,7 +63,7 @@ public class ZooKeeperWatcher implements Watcher, Abortable {
private static final Logger logger = LoggerFactory.getLogger(ZooKeeperWatcher.class);
protected final ZKConfig conf;
// listeners to be notified
- private final List<ZooKeeperListener> listeners = new CopyOnWriteArrayList<ZooKeeperListener>();
+ private final List<ZooKeeperListener> listeners = new CopyOnWriteArrayList<>();
private final Exception constructorCaller;
// abortable in case of zk failure
private Abortable abortable;
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/paramcheck/PBParameterUtils.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/paramcheck/PBParameterUtils.java
index 9a4e4f8..cb93dff 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/paramcheck/PBParameterUtils.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/paramcheck/PBParameterUtils.java
@@ -126,7 +126,7 @@ public class PBParameterUtils {
final Set<String> reqTopicSet,
final String requiredParts,
final StringBuilder strBuffer) {
- Map<String, Long> requiredPartMap = new HashMap<String, Long>();
+ Map<String, Long> requiredPartMap = new HashMap<>();
ParamCheckResult retResult = new ParamCheckResult();
if (!isReqConsumeBand) {
retResult.setCheckData(requiredPartMap);
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/RowLock.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/RowLock.java
index 66c0817..874fd2b 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/RowLock.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/RowLock.java
@@ -34,9 +34,9 @@ public class RowLock {
private static final Logger logger = LoggerFactory.getLogger(RowLock.class);
private static Random rand = new Random();
private final ConcurrentHashMap<HashedBytes, CountDownLatch> lockedRows =
- new ConcurrentHashMap<HashedBytes, CountDownLatch>();
+ new ConcurrentHashMap<>();
private final ConcurrentHashMap<Integer, HashedBytes> lockIds =
- new ConcurrentHashMap<Integer, HashedBytes>();
+ new ConcurrentHashMap<>();
private final AtomicInteger lockIdGenerator = new AtomicInteger(1);
private final int rowLockWaitDuration;
private final String name;
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
index 9b9b60f..1791f43 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
@@ -512,7 +512,7 @@ public class WebParameterUtils {
boolean checkResToken,
Set<String> resTokens,
final StringBuilder sb) throws Exception {
- Set<String> batchOpGroupNames = new HashSet<String>();
+ Set<String> batchOpGroupNames = new HashSet<>();
if (TStringUtils.isNotBlank(inputGroupName)) {
inputGroupName = escDoubleQuotes(inputGroupName.trim());
}
@@ -566,7 +566,7 @@ public class WebParameterUtils {
boolean checkRange,
Set<String> checkedTopicList,
final StringBuilder sb) throws Exception {
- Set<String> batchOpTopicNames = new HashSet<String>();
+ Set<String> batchOpTopicNames = new HashSet<>();
if (TStringUtils.isNotBlank(inputTopicName)) {
inputTopicName = escDoubleQuotes(inputTopicName.trim());
}
@@ -615,7 +615,7 @@ public class WebParameterUtils {
public static Set<String> getBatchBrokerIpSet(String inStrBrokerIps,
boolean checkEmpty) throws Exception {
- Set<String> batchBrokerIps = new HashSet<String>();
+ Set<String> batchBrokerIps = new HashSet<>();
if (TStringUtils.isNotBlank(inStrBrokerIps)) {
inStrBrokerIps = escDoubleQuotes(inStrBrokerIps.trim());
}
@@ -648,7 +648,7 @@ public class WebParameterUtils {
public static Set<Integer> getBatchBrokerIdSet(String inStrBrokerIds,
boolean checkEmpty) throws Exception {
- Set<Integer> batchBrokerIdSet = new HashSet<Integer>();
+ Set<Integer> batchBrokerIdSet = new HashSet<>();
if (TStringUtils.isNotBlank(inStrBrokerIds)) {
inStrBrokerIds = escDoubleQuotes(inStrBrokerIds.trim());
}
@@ -684,7 +684,7 @@ public class WebParameterUtils {
BrokerConfManager webMaster,
boolean checkEmpty,
final StringBuilder sb) throws Exception {
- Set<BdbBrokerConfEntity> batchBrokerIdSet = new HashSet<BdbBrokerConfEntity>();
+ Set<BdbBrokerConfEntity> batchBrokerIdSet = new HashSet<>();
if (TStringUtils.isNotBlank(inStrBrokerIds)) {
inStrBrokerIds = escDoubleQuotes(inStrBrokerIds.trim());
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java
index 8a35d25..41056b9 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java
@@ -120,7 +120,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
private static final int MAX_BALANCE_DELAY_TIME = 10;
private final ConcurrentHashMap<String/* consumerId */, Map<String/* topic */, Map<String, Partition>>>
- currentSubInfo = new ConcurrentHashMap<String, Map<String, Map<String, Partition>>>();
+ currentSubInfo = new ConcurrentHashMap<>();
private final RpcServiceFactory rpcServiceFactory = //rpc service factory
new RpcServiceFactory();
private final ConsumerEventManager consumerEventManager; //consumer event manager
@@ -337,7 +337,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
}
heartbeatManager.regProducerNode(producerId);
producerHolder.setProducerInfo(producerId,
- new HashSet<String>(transTopicSet), hostName, overtls);
+ new HashSet<>(transTopicSet), hostName, overtls);
builder.setBrokerCheckSum(this.defaultBrokerConfManager.getBrokerInfoCheckSum());
builder.addAllBrokerInfos(this.defaultBrokerConfManager.getBrokersMap(overtls).values());
builder.setAuthorizedInfo(genAuthorizedInfo(certResult.authorizedToken, false).build());
@@ -618,7 +618,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
ConcurrentHashSet<String> groupSet =
topicPSInfoManager.getTopicSubInfo(topic);
if (groupSet == null) {
- groupSet = new ConcurrentHashSet<String>();
+ groupSet = new ConcurrentHashSet<>();
topicPSInfoManager.setTopicSubInfo(topic, groupSet);
}
if (!groupSet.contains(groupName)) {
@@ -627,7 +627,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
}
if (CollectionUtils.isNotEmpty(subscribeList)) {
Map<String, Map<String, Partition>> topicPartSubMap =
- new HashMap<String, Map<String, Partition>>();
+ new HashMap<>();
currentSubInfo.put(consumerId, topicPartSubMap);
for (SubscribeInfo info : subscribeList) {
Map<String, Partition> partMap = topicPartSubMap.get(info.getTopic());
@@ -765,7 +765,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
Map<String, Map<String, Partition>> topicPartSubList =
currentSubInfo.get(clientId);
if (topicPartSubList == null) {
- topicPartSubList = new HashMap<String, Map<String, Partition>>();
+ topicPartSubList = new HashMap<>();
Map<String, Map<String, Partition>> tmpTopicPartSubList =
currentSubInfo.putIfAbsent(clientId, topicPartSubList);
if (tmpTopicPartSubList != null) {
@@ -1375,7 +1375,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
* @return
*/
private Map<String, String> getProducerTopicPartitionInfo(String producerId) {
- Map<String, String> topicPartStrMap = new HashMap<String, String>();
+ Map<String, String> topicPartStrMap = new HashMap<>();
ProducerInfo producerInfo = producerHolder.getProducerInfo(producerId);
if (producerInfo == null) {
return topicPartStrMap;
@@ -1387,7 +1387,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
return topicPartStrMap;
}
Map<String, StringBuilder> topicPartStrBuilderMap =
- new HashMap<String, StringBuilder>();
+ new HashMap<>();
for (String topic : producerInfoTopicSet) {
if (topic == null) {
continue;
@@ -1441,7 +1441,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
Map<String/* topicName */, TopicInfo> newTopicInfoMap,
boolean requirePartUpdate, boolean requireAcceptPublish,
boolean requireAcceptSubscribe) {
- List<TopicInfo> needAddTopicList = new ArrayList<TopicInfo>();
+ List<TopicInfo> needAddTopicList = new ArrayList<>();
for (Map.Entry<String, TopicInfo> entry : newTopicInfoMap.entrySet()) {
TopicInfo newTopicInfo = entry.getValue();
TopicInfo oldTopicInfo = null;
@@ -1482,7 +1482,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
private void deleteTopics(BrokerInfo brokerInfo, final StringBuilder strBuffer,
Map<String/* topicName */, TopicInfo> curTopicInfoMap,
Map<String/* topicName */, TopicInfo> newTopicInfoMap) {
- List<TopicInfo> needRmvTopicList = new ArrayList<TopicInfo>();
+ List<TopicInfo> needRmvTopicList = new ArrayList<>();
if (curTopicInfoMap != null) {
for (Map.Entry<String, TopicInfo> entry : curTopicInfoMap.entrySet()) {
if (newTopicInfoMap.get(entry.getKey()) == null) {
@@ -1757,7 +1757,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
}
}
ConcurrentHashMap<String/* topic */, TopicInfo> newTopicInfoMap =
- new ConcurrentHashMap<String, TopicInfo>();
+ new ConcurrentHashMap<>();
// according to broker status and default config, topic config, make up current status record
for (String strTopicConfInfo : brokerTopicSetConfInfo) {
if (TStringUtils.isBlank(strTopicConfInfo)) {
@@ -1823,7 +1823,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
private void updateTopicsInternal(BrokerInfo broker,
List<TopicInfo> topicList,
EventType type) {
- List<TopicInfo> cloneTopicList = new ArrayList<TopicInfo>();
+ List<TopicInfo> cloneTopicList = new ArrayList<>();
for (TopicInfo topicInfo : topicList) {
cloneTopicList.add(topicInfo.clone());
}
@@ -1834,7 +1834,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
ConcurrentHashMap<BrokerInfo, TopicInfo> topicInfoMap =
topicPSInfoManager.getBrokerPubInfo(topicInfo.getTopic());
if (topicInfoMap == null) {
- topicInfoMap = new ConcurrentHashMap<BrokerInfo, TopicInfo>();
+ topicInfoMap = new ConcurrentHashMap<>();
topicPSInfoManager.setBrokerPubInfo(topicInfo.getTopic(), topicInfoMap);
}
if (EventType.CONNECT == type) {
@@ -1919,8 +1919,8 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
}
List<String> blackTopicList = this.defaultBrokerConfManager.getBdbBlackTopicList(tupleInfo.groupName);
Map<String, List<Partition>> topicSubPartMap = entry.getValue();
- List<SubscribeInfo> deletedSubInfoList = new ArrayList<SubscribeInfo>();
- List<SubscribeInfo> addedSubInfoList = new ArrayList<SubscribeInfo>();
+ List<SubscribeInfo> deletedSubInfoList = new ArrayList<>();
+ List<SubscribeInfo> addedSubInfoList = new ArrayList<>();
for (Map.Entry<String, List<Partition>> topicEntry : topicSubPartMap.entrySet()) {
String topic = topicEntry.getKey();
List<Partition> finalPartList = topicEntry.getValue();
@@ -2071,8 +2071,8 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
List<String> blackTopicList =
this.defaultBrokerConfManager.getBdbBlackTopicList(tupleInfo.groupName);
Map<String, Map<String, Partition>> topicSubPartMap = entry.getValue();
- List<SubscribeInfo> deletedSubInfoList = new ArrayList<SubscribeInfo>();
- List<SubscribeInfo> addedSubInfoList = new ArrayList<SubscribeInfo>();
+ List<SubscribeInfo> deletedSubInfoList = new ArrayList<>();
+ List<SubscribeInfo> addedSubInfoList = new ArrayList<>();
for (Map.Entry<String, Map<String, Partition>> topicEntry : topicSubPartMap.entrySet()) {
String topic = topicEntry.getKey();
Map<String, Partition> finalPartMap = topicEntry.getValue();
@@ -2208,8 +2208,8 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
* @return
*/
private List<String> getNeedToBalanceGroupList(final StringBuilder strBuffer) {
- List<String> groupsNeedToBalance = new ArrayList<String>();
- Set<String> groupHasUnfinishedEvent = new HashSet<String>();
+ List<String> groupsNeedToBalance = new ArrayList<>();
+ Set<String> groupHasUnfinishedEvent = new HashSet<>();
if (consumerEventManager.hasEvent()) {
Set<String> consumerIdSet =
consumerEventManager.getUnProcessedIdSet();
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/balance/DefaultLoadBalancer.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/balance/DefaultLoadBalancer.java
index 07c487e..860dd05 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/balance/DefaultLoadBalancer.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/balance/DefaultLoadBalancer.java
@@ -83,10 +83,10 @@ public class DefaultLoadBalancer implements LoadBalancer {
// load balance according to group
Map<String/* consumer */,
Map<String/* topic */, List<Partition>>> finalSubInfoMap =
- new HashMap<String, Map<String, List<Partition>>>();
- Map<String, RebProcessInfo> rejGroupClientINfoMap = new HashMap<String, RebProcessInfo>();
- Set<String> onlineOfflineGroupSet = new HashSet<String>();
- Set<String> bandGroupSet = new HashSet<String>();
+ new HashMap<>();
+ Map<String, RebProcessInfo> rejGroupClientINfoMap = new HashMap<>();
+ Set<String> onlineOfflineGroupSet = new HashSet<>();
+ Set<String> bandGroupSet = new HashSet<>();
for (String group : groupSet) {
if (group == null) {
continue;
@@ -109,7 +109,7 @@ public class DefaultLoadBalancer implements LoadBalancer {
bandGroupSet.add(group);
continue;
}
- List<ConsumerInfo> newConsumerList = new ArrayList<ConsumerInfo>();
+ List<ConsumerInfo> newConsumerList = new ArrayList<>();
for (ConsumerInfo consumerInfo : consumerList) {
if (consumerInfo != null) {
newConsumerList.add(consumerInfo);
@@ -159,11 +159,11 @@ public class DefaultLoadBalancer implements LoadBalancer {
rejGroupClientINfoMap.put(group, rebProcessInfo);
}
}
- List<ConsumerInfo> newConsumerList2 = new ArrayList<ConsumerInfo>();
+ List<ConsumerInfo> newConsumerList2 = new ArrayList<>();
Map<String, Partition> psMap = topicPSInfoManager.getPartitionMap(topicSet);
Map<String, NodeRebInfo> rebProcessInfoMap = consumerBandInfo.getRebalanceMap();
for (ConsumerInfo consumer : newConsumerList) {
- Map<String, List<Partition>> partitions = new HashMap<String, List<Partition>>();
+ Map<String, List<Partition>> partitions = new HashMap<>();
finalSubInfoMap.put(consumer.getConsumerId(), partitions);
Map<String, Map<String, Partition>> relation = clusterState.get(consumer.getConsumerId());
if (relation != null) {
@@ -183,7 +183,7 @@ public class DefaultLoadBalancer implements LoadBalancer {
}
newConsumerList2.add(consumer);
for (Entry<String, Map<String, Partition>> entry : relation.entrySet()) {
- List<Partition> ps = new ArrayList<Partition>();
+ List<Partition> ps = new ArrayList<>();
partitions.put(entry.getKey(), ps);
Map<String, Partition> partitionMap = entry.getValue();
if (partitionMap != null && !partitionMap.isEmpty()) {
@@ -210,7 +210,7 @@ public class DefaultLoadBalancer implements LoadBalancer {
if (onlineOfflineGroupSet.size() == 0) {
groupsNeedToBalance = groupSet;
} else {
- groupsNeedToBalance = new ArrayList<String>();
+ groupsNeedToBalance = new ArrayList<>();
for (String group : groupSet) {
if (group == null) {
continue;
@@ -255,7 +255,7 @@ public class DefaultLoadBalancer implements LoadBalancer {
continue;
}
// filter consumer which don't need to handle
- List<ConsumerInfo> consumerList = new ArrayList<ConsumerInfo>();
+ List<ConsumerInfo> consumerList = new ArrayList<>();
List<ConsumerInfo> consumerList1 = consumerBandInfo.getConsumerInfoList();
RebProcessInfo rebProcessInfo = rejGroupClientInfoMap.get(group);
if (rebProcessInfo != null) {
@@ -265,7 +265,7 @@ public class DefaultLoadBalancer implements LoadBalancer {
Map<String, List<Partition>> partitions2 =
clusterState.get(consumerInfo.getConsumerId());
if (partitions2 == null) {
- partitions2 = new HashMap<String, List<Partition>>();
+ partitions2 = new HashMap<>();
clusterState.put(consumerInfo.getConsumerId(), partitions2);
}
Map<String, Map<String, Partition>> relation =
@@ -297,7 +297,7 @@ public class DefaultLoadBalancer implements LoadBalancer {
Map<String, List<Partition>> partitions =
clusterState.get(consumer.getConsumerId());
if (partitions == null) {
- partitions = new HashMap<String, List<Partition>>();
+ partitions = new HashMap<>();
}
int load = 0;
for (List<Partition> entry : partitions.values()) {
@@ -365,7 +365,7 @@ public class DefaultLoadBalancer implements LoadBalancer {
String consumerId) {
Map<String, List<Partition>> partitions = clusterState.get(consumerId);
if (partitions == null) {
- partitions = new HashMap<String, List<Partition>>();
+ partitions = new HashMap<>();
clusterState.put(consumerId, partitions);
}
List<Partition> ps = partitions.get(partition.getTopic());
@@ -422,12 +422,12 @@ public class DefaultLoadBalancer implements LoadBalancer {
}
Map<String, List<Partition>> partitions = clusterState.get(consumer.getConsumerId());
if (partitions == null) {
- partitions = new HashMap<String, List<Partition>>();
+ partitions = new HashMap<>();
clusterState.put(consumer.getConsumerId(), partitions);
}
List<Partition> ps = partitions.get(partition.getTopic());
if (ps == null) {
- ps = new ArrayList<Partition>();
+ ps = new ArrayList<>();
partitions.put(partition.getTopic(), ps);
}
ps.add(partition);
@@ -447,7 +447,7 @@ public class DefaultLoadBalancer implements LoadBalancer {
if (partitions.isEmpty() || consumers.isEmpty()) {
return null;
}
- Map<String, List<Partition>> assignments = new TreeMap<String, List<Partition>>();
+ Map<String, List<Partition>> assignments = new TreeMap<>();
int numPartitions = partitions.size();
int numServers = consumers.size();
int max = (int) Math.ceil((float) numPartitions / numServers);
@@ -458,7 +458,7 @@ public class DefaultLoadBalancer implements LoadBalancer {
int partitionIdx = 0;
for (int j = 0; j < numServers; j++) {
String server = consumers.get((j + serverIdx) % numServers);
- List<Partition> serverPartitions = new ArrayList<Partition>(max);
+ List<Partition> serverPartitions = new ArrayList<>(max);
for (int i = partitionIdx; i < numPartitions; i += numServers) {
serverPartitions.add(partitions.get(i % numPartitions));
}
@@ -564,12 +564,12 @@ public class DefaultLoadBalancer implements LoadBalancer {
Map<String, List<Partition>> topicSubPartMap =
finalSubInfoMap.get(consumerId);
if (topicSubPartMap == null) {
- topicSubPartMap = new HashMap<String, List<Partition>>();
+ topicSubPartMap = new HashMap<>();
finalSubInfoMap.put(consumerId, topicSubPartMap);
}
List<Partition> partList = topicSubPartMap.get(topic);
if (partList == null) {
- partList = new ArrayList<Partition>();
+ partList = new ArrayList<>();
topicSubPartMap.put(topic, partList);
}
int startIndex = partsPerConsumer * i + Math.min(i, consumersWithExtraPart);
@@ -637,7 +637,7 @@ public class DefaultLoadBalancer implements LoadBalancer {
final StringBuilder strBuffer) {
// band consume reset offset
Map<String, Map<String, Map<String, Partition>>> finalSubInfoMap =
- new HashMap<String, Map<String, Map<String, Partition>>>();
+ new HashMap<>();
// according group
for (String group : groupSet) {
if (group == null) {
@@ -686,10 +686,10 @@ public class DefaultLoadBalancer implements LoadBalancer {
}
// actual reset offset
Map<String, Long> partsOffsetMap = consumerBandInfo.getPartOffsetMap();
- List<OffsetStorageInfo> offsetInfoList = new ArrayList<OffsetStorageInfo>();
+ List<OffsetStorageInfo> offsetInfoList = new ArrayList<>();
Set<Partition> partPubList =
topicPSInfoManager.getPartitions(consumerBandInfo.getTopicSet());
- Map<String, Partition> partitionMap = new HashMap<String, Partition>();
+ Map<String, Partition> partitionMap = new HashMap<>();
for (Partition partition : partPubList) {
partitionMap.put(partition.getPartitionKey(), partition);
}
@@ -706,7 +706,7 @@ public class DefaultLoadBalancer implements LoadBalancer {
Map<String, Map<String, Partition>> topicSubPartMap =
finalSubInfoMap.get(consumerId);
if (topicSubPartMap == null) {
- topicSubPartMap = new HashMap<String, Map<String, Partition>>();
+ topicSubPartMap = new HashMap<>();
finalSubInfoMap.put(consumerId, topicSubPartMap);
}
Map<String, Partition> partMap = topicSubPartMap.get(foundPart.getTopic());
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/DefaultBdbStoreService.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/DefaultBdbStoreService.java
index 6cdb2d7..1a57d8d 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/DefaultBdbStoreService.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/DefaultBdbStoreService.java
@@ -93,7 +93,7 @@ public class DefaultBdbStoreService implements BdbStoreService, Server {
// simple log print
private final BdbStoreSamplePrint bdbStoreSamplePrint =
new BdbStoreSamplePrint(logger);
- private Set<String> replicas4Transfer = new HashSet<String>();
+ private Set<String> replicas4Transfer = new HashSet<>();
private String masterNodeName;
private int connectNodeFailCount = 0;
private long masterStartTime = Long.MAX_VALUE;
@@ -107,12 +107,12 @@ public class DefaultBdbStoreService implements BdbStoreService, Server {
private EntityStore brokerConfStore;
private PrimaryIndex<Integer/* brokerId */, BdbBrokerConfEntity> brokerConfIndex;
private ConcurrentHashMap<Integer/* brokerId */, BdbBrokerConfEntity> brokerConfigMap =
- new ConcurrentHashMap<Integer, BdbBrokerConfEntity>();
+ new ConcurrentHashMap<>();
// topic config store
private EntityStore topicConfStore;
private PrimaryIndex<String/* recordKey */, BdbTopicConfEntity> topicConfIndex;
private ConcurrentHashMap<Integer/* brokerId */, ConcurrentHashMap<String/* topicName */, BdbTopicConfEntity>>
- brokerIdTopicEntityMap = new ConcurrentHashMap<Integer, ConcurrentHashMap<String, BdbTopicConfEntity>>();
+ brokerIdTopicEntityMap = new ConcurrentHashMap<>();
// consumer group store
private EntityStore consumerGroupStore;
private PrimaryIndex<String/* recordKey */, BdbConsumerGroupEntity> consumerGroupIndex;
@@ -120,7 +120,7 @@ public class DefaultBdbStoreService implements BdbStoreService, Server {
String/* topicName */,
ConcurrentHashMap<String /* consumerGroup */, BdbConsumerGroupEntity>>
consumerGroupTopicMap =
- new ConcurrentHashMap<String, ConcurrentHashMap<String, BdbConsumerGroupEntity>>();
+ new ConcurrentHashMap<>();
//consumer group black list store
private EntityStore blackGroupStore;
private PrimaryIndex<String/* recordKey */, BdbBlackGroupEntity> blackGroupIndex;
@@ -128,12 +128,12 @@ public class DefaultBdbStoreService implements BdbStoreService, Server {
String/* consumerGroup */,
ConcurrentHashMap<String /* topicName */, BdbBlackGroupEntity>>
blackGroupTopicMap =
- new ConcurrentHashMap<String, ConcurrentHashMap<String, BdbBlackGroupEntity>>();
+ new ConcurrentHashMap<>();
// topic auth config store
private EntityStore topicAuthControlStore;
private PrimaryIndex<String/* recordKey */, BdbTopicAuthControlEntity> topicAuthControlIndex;
private ConcurrentHashMap<String/* topicName */, BdbTopicAuthControlEntity> topicAuthControlMap =
- new ConcurrentHashMap<String, BdbTopicAuthControlEntity>();
+ new ConcurrentHashMap<>();
// consumer group filter condition store
private EntityStore groupFilterCondStore;
private PrimaryIndex<String/* recordKey */, BdbGroupFilterCondEntity> groupFilterCondIndex;
@@ -141,24 +141,24 @@ public class DefaultBdbStoreService implements BdbStoreService, Server {
String/* topicName */,
ConcurrentHashMap<String /* consumerGroup */, BdbGroupFilterCondEntity>>
groupFilterCondMap =
- new ConcurrentHashMap<String, ConcurrentHashMap<String, BdbGroupFilterCondEntity>>();
+ new ConcurrentHashMap<>();
// consumer group flow control store
private EntityStore groupFlowCtrlStore;
private PrimaryIndex<String/* groupName */, BdbGroupFlowCtrlEntity> groupFlowCtrlIndex;
private ConcurrentHashMap<String/* groupName */, BdbGroupFlowCtrlEntity> groupFlowCtrlMap =
- new ConcurrentHashMap<String, BdbGroupFlowCtrlEntity>();
+ new ConcurrentHashMap<>();
// consumer group setting store
private EntityStore consumeGroupSettingStore;
private PrimaryIndex<String/* recordKey */, BdbConsumeGroupSettingEntity> consumeGroupSettingIndex;
private ConcurrentHashMap<String/* consumeGroup */, BdbConsumeGroupSettingEntity> consumeGroupSettingMap =
- new ConcurrentHashMap<String, BdbConsumeGroupSettingEntity>();
+ new ConcurrentHashMap<>();
// service status
private AtomicBoolean isStarted = new AtomicBoolean(false);
// master role flag
private volatile boolean isMaster;
// master node list
private ConcurrentHashMap<String/* nodeName */, MasterNodeInfo> masterNodeInfoMap =
- new ConcurrentHashMap<String, MasterNodeInfo>();
+ new ConcurrentHashMap<>();
private String nodeHost;
private BDBConfig bdbConfig;
private Listener listener = new Listener();
@@ -168,7 +168,7 @@ public class DefaultBdbStoreService implements BdbStoreService, Server {
this.tMaster = tMaster;
this.nodeHost = masterConfig.getHostName();
this.bdbConfig = masterConfig.getBdbConfig();
- Set<InetSocketAddress> helpers = new HashSet<InetSocketAddress>();
+ Set<InetSocketAddress> helpers = new HashSet<>();
for (int i = 1; i <= 3; i++) {
InetSocketAddress helper = new InetSocketAddress(this.nodeHost, bdbConfig.getBdbNodePort() + i);
helpers.add(helper);
@@ -251,7 +251,7 @@ public class DefaultBdbStoreService implements BdbStoreService, Server {
clusterGroupVO.setPrimaryNodeActive(isPrimaryNodeActive());
int count = 0;
boolean hasMaster = false;
- List<ClusterNodeVO> clusterNodeVOList = new ArrayList<ClusterNodeVO>();
+ List<ClusterNodeVO> clusterNodeVOList = new ArrayList<>();
for (ReplicationNode node : replicationGroup.getNodes()) {
ClusterNodeVO clusterNodeVO = new ClusterNodeVO();
clusterNodeVO.setHostName(node.getHostName());
@@ -838,7 +838,7 @@ public class DefaultBdbStoreService implements BdbStoreService, Server {
}
int activeNodes = 0;
boolean isMasterActive = false;
- Set<String> tmp = new HashSet<String>();
+ Set<String> tmp = new HashSet<>();
for (ReplicationNode node : replicationGroup.getNodes()) {
MasterNodeInfo masterNodeInfo =
new MasterNodeInfo(replicationGroup.getName(),
@@ -1178,7 +1178,7 @@ public class DefaultBdbStoreService implements BdbStoreService, Server {
consumerGroupTopicMap.get(topicName);
if (consumerGroupMap == null) {
consumerGroupMap =
- new ConcurrentHashMap<String, BdbConsumerGroupEntity>();
+ new ConcurrentHashMap<>();
consumerGroupTopicMap.put(topicName, consumerGroupMap);
}
consumerGroupMap.put(consumerGroupName, bdbEntity);
@@ -1221,7 +1221,7 @@ public class DefaultBdbStoreService implements BdbStoreService, Server {
groupFilterCondMap.get(topicName);
if (filterCondMap == null) {
filterCondMap =
- new ConcurrentHashMap<String, BdbGroupFilterCondEntity>();
+ new ConcurrentHashMap<>();
groupFilterCondMap.put(topicName, filterCondMap);
}
filterCondMap.put(consumerGroupName, bdbEntity);
@@ -1299,7 +1299,7 @@ public class DefaultBdbStoreService implements BdbStoreService, Server {
blackGroupTopicMap.get(consumerGroupName);
if (blackGroupMap == null) {
blackGroupMap =
- new ConcurrentHashMap<String, BdbBlackGroupEntity>();
+ new ConcurrentHashMap<>();
blackGroupTopicMap.put(consumerGroupName, blackGroupMap);
}
blackGroupMap.put(topicName, bdbEntity);
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerConfManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerConfManager.java
index 4503e96..e244feb 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerConfManager.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerConfManager.java
@@ -63,43 +63,33 @@ public class BrokerConfManager implements Server {
private final BDBConfig bdbConfig;
private final ScheduledExecutorService scheduledExecutorService;
private final ConcurrentHashMap<Integer, String> brokersMap =
- new ConcurrentHashMap<Integer, String>();
+ new ConcurrentHashMap<>();
private final ConcurrentHashMap<Integer, String> brokersTLSMap =
- new ConcurrentHashMap<Integer, String>();
+ new ConcurrentHashMap<>();
private final MasterGroupStatus masterGroupStatus = new MasterGroupStatus();
- private ConcurrentHashMap<Integer/* brokerId */, BdbBrokerConfEntity> brokerConfStoreMap =
- new ConcurrentHashMap<Integer, BdbBrokerConfEntity>();
+ private ConcurrentHashMap<Integer/* brokerId */, BdbBrokerConfEntity> brokerConfStoreMap;
private ConcurrentHashMap<Integer/* brokerId */, BrokerSyncStatusInfo> brokerRunSyncManageMap =
- new ConcurrentHashMap<Integer, BrokerSyncStatusInfo>();
+ new ConcurrentHashMap<>();
private ConcurrentHashMap<Integer/* brokerId */, ConcurrentHashMap<String/* topicName */, BdbTopicConfEntity>>
- brokerTopicEntityStoreMap = new ConcurrentHashMap<Integer, ConcurrentHashMap<String, BdbTopicConfEntity>>();
+ brokerTopicEntityStoreMap;
private ConcurrentHashMap<Integer/* brokerId */, ConcurrentHashMap<String/* topicName */, TopicInfo>>
- brokerRunTopicInfoStoreMap = new ConcurrentHashMap<Integer, ConcurrentHashMap<String, TopicInfo>>();
+ brokerRunTopicInfoStoreMap = new ConcurrentHashMap<>();
private volatile boolean isStarted = false;
private volatile boolean isStopped = false;
private DefaultBdbStoreService mBdbStoreManagerService;
- private ConcurrentHashMap<String, BdbTopicAuthControlEntity> topicAuthControlEnableMap =
- new ConcurrentHashMap<String, BdbTopicAuthControlEntity>();
+ private ConcurrentHashMap<String, BdbTopicAuthControlEntity> topicAuthControlEnableMap;
private ConcurrentHashMap<
String /* topicName */,
- ConcurrentHashMap<String /* consumerGroup */, BdbConsumerGroupEntity>>
- consumerGroupTopicMap =
- new ConcurrentHashMap<String, ConcurrentHashMap<String, BdbConsumerGroupEntity>>();
+ ConcurrentHashMap<String /* consumerGroup */, BdbConsumerGroupEntity>> consumerGroupTopicMap;
private ConcurrentHashMap<
String /* consumerGroup */,
- ConcurrentHashMap<String /* topicName */, BdbBlackGroupEntity>>
- blackGroupTopicMap =
- new ConcurrentHashMap<String, ConcurrentHashMap<String, BdbBlackGroupEntity>>();
+ ConcurrentHashMap<String /* topicName */, BdbBlackGroupEntity>> blackGroupTopicMap;
private ConcurrentHashMap<
String /* topicName */,
- ConcurrentHashMap<String /* consumerGroup */, BdbGroupFilterCondEntity>>
- groupFilterCondTopicMap =
- new ConcurrentHashMap<String, ConcurrentHashMap<String, BdbGroupFilterCondEntity>>();
- private ConcurrentHashMap<String /* groupName */, BdbGroupFlowCtrlEntity> consumeGroupFlowCtrlMap =
- new ConcurrentHashMap<String, BdbGroupFlowCtrlEntity>();
- private ConcurrentHashMap<String /* consumeGroup */, BdbConsumeGroupSettingEntity> consumeGroupSettingMap =
- new ConcurrentHashMap<String, BdbConsumeGroupSettingEntity>();
+ ConcurrentHashMap<String /* consumerGroup */, BdbGroupFilterCondEntity>> groupFilterCondTopicMap;
+ private ConcurrentHashMap<String /* groupName */, BdbGroupFlowCtrlEntity> consumeGroupFlowCtrlMap;
+ private ConcurrentHashMap<String /* consumeGroup */, BdbConsumeGroupSettingEntity> consumeGroupSettingMap;
private AtomicLong brokerInfoCheckSum = new AtomicLong(System.currentTimeMillis());
private long lastBrokerUpdatedTime = System.currentTimeMillis();
private long serviceStartTime = System.currentTimeMillis();
@@ -280,7 +270,7 @@ public class BrokerConfManager implements Server {
if ((reqTopicConditions != null) && (!reqTopicConditions.isEmpty())) {
// check if request topic set is all in the filter topic set
Set<String> condTopics = reqTopicConditions.keySet();
- List<String> unSetTopic = new ArrayList<String>();
+ List<String> unSetTopic = new ArrayList<>();
for (String topic : condTopics) {
if (!reqTopicSet.contains(topic)) {
unSetTopic.add(topic);
@@ -296,7 +286,7 @@ public class BrokerConfManager implements Server {
}
}
// check if consumer group is in the blacklist
- List<String> fbdTopicList = new ArrayList<String>();
+ List<String> fbdTopicList = new ArrayList<>();
ConcurrentHashMap<String, BdbBlackGroupEntity> blackGroupEntityMap =
this.blackGroupTopicMap.get(consumerGroupName);
if (blackGroupEntityMap != null) {
@@ -313,8 +303,8 @@ public class BrokerConfManager implements Server {
.append(fbdTopicList).toString());
}
// check if topic enabled authorization
- ArrayList<String> enableAuthTopicList = new ArrayList<String>();
- ArrayList<String> unAuthTopicList = new ArrayList<String>();
+ ArrayList<String> enableAuthTopicList = new ArrayList<>();
+ ArrayList<String> unAuthTopicList = new ArrayList<>();
for (String topicItem : reqTopicSet) {
if (TStringUtils.isBlank(topicItem)) {
continue;
@@ -408,14 +398,14 @@ public class BrokerConfManager implements Server {
break;
} else {
Map<String, List<String>> unAuthorizedCondMap =
- new HashMap<String, List<String>>();
+ new HashMap<>();
for (String item : condItemSet) {
if (!allowedConds.contains(sb.append(TokenConstants.ARRAY_SEP)
.append(item).append(TokenConstants.ARRAY_SEP).toString())) {
isAllowed = false;
List<String> unAuthConds = unAuthorizedCondMap.get(tmpTopic);
if (unAuthConds == null) {
- unAuthConds = new ArrayList<String>();
+ unAuthConds = new ArrayList<>();
unAuthorizedCondMap.put(tmpTopic, unAuthConds);
}
unAuthConds.add(item);
@@ -483,7 +473,7 @@ public class BrokerConfManager implements Server {
// #lizard forgives
// find broker info
validMasterStatus();
- List<BdbBrokerConfEntity> bdbBrokerEntities = new ArrayList<BdbBrokerConfEntity>();
+ List<BdbBrokerConfEntity> bdbBrokerEntities = new ArrayList<>();
for (BdbBrokerConfEntity entity : brokerConfStoreMap.values()) {
if (bdbQueryEntity == null) {
bdbBrokerEntities.add(entity);
@@ -730,7 +720,7 @@ public class BrokerConfManager implements Server {
}
public Set<String> getTotalConfiguredTopicNames() {
- Set<String> totalTopics = new HashSet<String>(50);
+ Set<String> totalTopics = new HashSet<>(50);
for (ConcurrentHashMap<String, BdbTopicConfEntity> tmpTopicCfgMap
: this.brokerTopicEntityStoreMap.values()) {
if (tmpTopicCfgMap != null) {
@@ -898,7 +888,7 @@ public class BrokerConfManager implements Server {
BdbTopicConfEntity bdbQueryEntity) {
// #lizard forgives
ConcurrentHashMap<String, List<BdbTopicConfEntity>> bdbTopicEntityMap =
- new ConcurrentHashMap<String, List<BdbTopicConfEntity>>();
+ new ConcurrentHashMap<>();
for (ConcurrentHashMap<String, BdbTopicConfEntity> topicEntityMap
: brokerTopicEntityStoreMap.values()) {
for (BdbTopicConfEntity entity : topicEntityMap.values()) {
@@ -906,7 +896,7 @@ public class BrokerConfManager implements Server {
bdbTopicEntityMap.get(entity.getTopicName());
if (bdbQueryEntity == null) {
if (bdbTopicEntities == null) {
- bdbTopicEntities = new ArrayList<BdbTopicConfEntity>();
+ bdbTopicEntities = new ArrayList<>();
bdbTopicEntityMap.put(entity.getTopicName(), bdbTopicEntities);
}
bdbTopicEntities.add(entity);
@@ -946,7 +936,7 @@ public class BrokerConfManager implements Server {
continue;
}
if (bdbTopicEntities == null) {
- bdbTopicEntities = new ArrayList<BdbTopicConfEntity>();
+ bdbTopicEntities = new ArrayList<>();
bdbTopicEntityMap.put(entity.getTopicName(), bdbTopicEntities);
}
bdbTopicEntities.add(entity);
@@ -988,7 +978,7 @@ public class BrokerConfManager implements Server {
if (putResult) {
if (brokerTopicConfMap == null) {
brokerTopicConfMap =
- new ConcurrentHashMap<String, BdbTopicConfEntity>();
+ new ConcurrentHashMap<>();
ConcurrentHashMap<String, BdbTopicConfEntity> tmpBrokerTopicConfMap =
brokerTopicEntityStoreMap.putIfAbsent(bdbEntity.getBrokerId(), brokerTopicConfMap);
if (tmpBrokerTopicConfMap != null) {
@@ -1052,7 +1042,7 @@ public class BrokerConfManager implements Server {
private List<String> innGetTopicStrConfigInfo(BdbBrokerConfEntity brokerConfEntity,
boolean isRemoved) {
- List<String> brokerTopicStrConfSet = new ArrayList<String>();
+ List<String> brokerTopicStrConfSet = new ArrayList<>();
ConcurrentHashMap<String, BdbTopicConfEntity> topicBdbEntytyMap =
brokerTopicEntityStoreMap.get(brokerConfEntity.getBrokerId());
if (topicBdbEntytyMap != null) {
@@ -1208,7 +1198,7 @@ public class BrokerConfManager implements Server {
BdbTopicAuthControlEntity bdbQueryEntity) throws Exception {
validMasterStatus();
List<BdbTopicAuthControlEntity> bdbTopicAuthControlEntities =
- new ArrayList<BdbTopicAuthControlEntity>();
+ new ArrayList<>();
for (BdbTopicAuthControlEntity entity : this.topicAuthControlEnableMap.values()) {
if (bdbQueryEntity == null) {
bdbTopicAuthControlEntities.add(entity);
@@ -1258,7 +1248,7 @@ public class BrokerConfManager implements Server {
if (putResult) {
if (groupFilterCondEntityMap == null) {
groupFilterCondEntityMap =
- new ConcurrentHashMap<String, BdbGroupFilterCondEntity>();
+ new ConcurrentHashMap<>();
ConcurrentHashMap<String, BdbGroupFilterCondEntity> tmpGroupFilterCondEntityMap =
groupFilterCondTopicMap.putIfAbsent(bdbEntity.getTopicName(), groupFilterCondEntityMap);
if (tmpGroupFilterCondEntityMap != null) {
@@ -1317,7 +1307,7 @@ public class BrokerConfManager implements Server {
*/
public List<BdbGroupFilterCondEntity> getBdbAllowedGroupFilterConds(String topicName) {
List<BdbGroupFilterCondEntity> bdbGroupFilterCondEntities =
- new ArrayList<BdbGroupFilterCondEntity>();
+ new ArrayList<>();
ConcurrentHashMap<String, BdbGroupFilterCondEntity> groupFilterCondMap =
groupFilterCondTopicMap.get(topicName);
if (groupFilterCondMap != null) {
@@ -1355,7 +1345,7 @@ public class BrokerConfManager implements Server {
BdbGroupFilterCondEntity bdbQueryEntity) throws Exception {
validMasterStatus();
List<BdbGroupFilterCondEntity> bdbGroupFilterCondEntities =
- new ArrayList<BdbGroupFilterCondEntity>();
+ new ArrayList<>();
for (ConcurrentHashMap<String, BdbGroupFilterCondEntity> groupFilterCondEntityMap
: groupFilterCondTopicMap.values()) {
for (BdbGroupFilterCondEntity entity : groupFilterCondEntityMap.values()) {
@@ -1454,7 +1444,7 @@ public class BrokerConfManager implements Server {
mBdbStoreManagerService.putBdbConsumerGroupConfEntity(bdbEntity, true);
if (putResult) {
if (consumerGroupEntityMap == null) {
- consumerGroupEntityMap = new ConcurrentHashMap<String, BdbConsumerGroupEntity>();
+ consumerGroupEntityMap = new ConcurrentHashMap<>();
ConcurrentHashMap<String, BdbConsumerGroupEntity> tmpConsumerGroupEntityMap =
consumerGroupTopicMap.putIfAbsent(bdbEntity.getGroupTopicName(), consumerGroupEntityMap);
if (tmpConsumerGroupEntityMap != null) {
@@ -1477,7 +1467,7 @@ public class BrokerConfManager implements Server {
*/
public List<BdbConsumerGroupEntity> getBdbAllowedConsumerGroups(String topicName) {
List<BdbConsumerGroupEntity> bdbConsumerGroupEntities =
- new ArrayList<BdbConsumerGroupEntity>();
+ new ArrayList<>();
ConcurrentHashMap<String, BdbConsumerGroupEntity> consumerGroupMap =
consumerGroupTopicMap.get(topicName);
if (consumerGroupMap != null) {
@@ -1490,7 +1480,7 @@ public class BrokerConfManager implements Server {
BdbConsumerGroupEntity bdbQueryEntity) throws Exception {
validMasterStatus();
List<BdbConsumerGroupEntity> bdbConsumerGroupEntities =
- new ArrayList<BdbConsumerGroupEntity>();
+ new ArrayList<>();
for (ConcurrentHashMap<String, BdbConsumerGroupEntity> consumerGroupEntityMap
: consumerGroupTopicMap.values()) {
for (BdbConsumerGroupEntity entity : consumerGroupEntityMap.values()) {
@@ -1585,7 +1575,7 @@ public class BrokerConfManager implements Server {
if (putResult) {
if (blackGroupEntityMap == null) {
blackGroupEntityMap =
- new ConcurrentHashMap<String, BdbBlackGroupEntity>();
+ new ConcurrentHashMap<>();
blackGroupTopicMap.put(bdbEntity.getBlackGroupName(), blackGroupEntityMap);
}
blackGroupEntityMap.put(bdbEntity.getTopicName(), bdbEntity);
@@ -1606,7 +1596,7 @@ public class BrokerConfManager implements Server {
public List<BdbBlackGroupEntity> confGetBdbBlackConsumerGroupSet(
BdbBlackGroupEntity bdbQueryEntity) throws Exception {
validMasterStatus();
- List<BdbBlackGroupEntity> bdbBlackGroupEntities = new ArrayList<BdbBlackGroupEntity>();
+ List<BdbBlackGroupEntity> bdbBlackGroupEntities = new ArrayList<>();
for (ConcurrentHashMap<String, BdbBlackGroupEntity> blackGroupEntityMap
: blackGroupTopicMap.values()) {
for (BdbBlackGroupEntity entity : blackGroupEntityMap.values()) {
@@ -1679,7 +1669,7 @@ public class BrokerConfManager implements Server {
}
public List<String> getBdbBlackTopicList(String consumerGroupName) {
- ArrayList<String> blackTopicList = new ArrayList<String>();
+ ArrayList<String> blackTopicList = new ArrayList<>();
ConcurrentHashMap<String/* topicname */, BdbBlackGroupEntity> blackGroupEntityMap =
this.blackGroupTopicMap.get(consumerGroupName);
if (blackGroupEntityMap != null) {
@@ -1786,7 +1776,7 @@ public class BrokerConfManager implements Server {
BdbConsumeGroupSettingEntity bdbQueryEntity) throws Exception {
validMasterStatus();
List<BdbConsumeGroupSettingEntity> bdbOffsetRstGroupEntities =
- new ArrayList<BdbConsumeGroupSettingEntity>();
+ new ArrayList<>();
for (BdbConsumeGroupSettingEntity entity
: consumeGroupSettingMap.values()) {
if (entity == null) {
@@ -1897,7 +1887,7 @@ public class BrokerConfManager implements Server {
BdbGroupFlowCtrlEntity bdbQueryEntity) throws Exception {
validMasterStatus();
List<BdbGroupFlowCtrlEntity> bdbGroupFlowCtrlEntities =
- new ArrayList<BdbGroupFlowCtrlEntity>();
+ new ArrayList<>();
for (BdbGroupFlowCtrlEntity ctrlEntity : consumeGroupFlowCtrlMap.values()) {
if (ctrlEntity == null) {
continue;
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerInfoHolder.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerInfoHolder.java
index 2177260..c644c5d 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerInfoHolder.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerInfoHolder.java
@@ -42,11 +42,11 @@ public class BrokerInfoHolder {
private static final Logger logger =
LoggerFactory.getLogger(BrokerInfoHolder.class);
private final ConcurrentHashMap<Integer/* brokerId */, BrokerInfo> brokerInfoMap =
- new ConcurrentHashMap<Integer, BrokerInfo>();
+ new ConcurrentHashMap<>();
private final ConcurrentHashMap<Integer/* brokerId */, BrokerAbnInfo> brokerAbnormalMap =
- new ConcurrentHashMap<Integer, BrokerAbnInfo>();
+ new ConcurrentHashMap<>();
private final ConcurrentHashMap<Integer/* brokerId */, BrokerFbdInfo> brokerForbiddenMap =
- new ConcurrentHashMap<Integer, BrokerFbdInfo>();
+ new ConcurrentHashMap<>();
private final int maxAutoForbiddenCnt;
private final BrokerConfManager brokerConfManager;
private AtomicInteger brokerTotalCount = new AtomicInteger(0);
@@ -183,7 +183,7 @@ public class BrokerInfoHolder {
}
public Map<Integer, BrokerInfo> getBrokerInfos(Collection<Integer> brokerIds) {
- HashMap<Integer, BrokerInfo> brokerMap = new HashMap<Integer, BrokerInfo>();
+ HashMap<Integer, BrokerInfo> brokerMap = new HashMap<>();
for (Integer brokerId : brokerIds) {
brokerMap.put(brokerId, brokerInfoMap.get(brokerId));
}
@@ -307,7 +307,7 @@ public class BrokerInfoHolder {
if (brokerIdSet == null || brokerIdSet.isEmpty()) {
return;
}
- List<BrokerFbdInfo> brokerFbdInfos = new ArrayList<BrokerFbdInfo>();
+ List<BrokerFbdInfo> brokerFbdInfos = new ArrayList<>();
for (Integer brokerId : brokerIdSet) {
BrokerFbdInfo fbdInfo = this.brokerForbiddenMap.remove(brokerId);
if (fbdInfo != null) {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerSyncStatusInfo.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerSyncStatusInfo.java
index 461a5f4..cfe8e0c 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerSyncStatusInfo.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerSyncStatusInfo.java
@@ -63,20 +63,20 @@ public class BrokerSyncStatusInfo {
private long lastDataPushInMills = 0;
private String lastPushBrokerDefaultConfInfo;
private List<String> lastPushBrokerTopicSetConfInfo =
- new ArrayList<String>();
+ new ArrayList<>();
private long reportedBrokerConfId = -2;
private int reportedBrokerCheckSumId = -2;
private long lastDataReportInMills = 0;
private String reportedBrokerDefaultConfInfo;
private List<String> reportedBrokerTopicSetConfInfo =
- new ArrayList<String>();
+ new ArrayList<>();
private AtomicLong currBrokerConfId = new AtomicLong(0);
private int currBrokerCheckSumId = 0;
private String curBrokerDefaultConfInfo;
private List<String> curBrokerTopicSetConfInfo =
- new ArrayList<String>();
+ new ArrayList<>();
private int numPartitions = 1; //partition number
private int numTopicStores = 1; //store number
@@ -165,7 +165,7 @@ public class BrokerSyncStatusInfo {
this.reportedBrokerCheckSumId = -2;
this.reportedBrokerDefaultConfInfo = "";
this.reportedBrokerTopicSetConfInfo =
- new ArrayList<String>();
+ new ArrayList<>();
this.isBrokerRegister = false;
this.isBrokerOnline = false;
this.isFastStart = false;
@@ -196,7 +196,7 @@ public class BrokerSyncStatusInfo {
if (isTackData) {
this.reportedBrokerDefaultConfInfo = reportDefaultConfInfo;
if (reportTopicSetConfInfo == null) {
- this.reportedBrokerTopicSetConfInfo = new ArrayList<String>();
+ this.reportedBrokerTopicSetConfInfo = new ArrayList<>();
} else {
this.reportedBrokerTopicSetConfInfo = reportTopicSetConfInfo;
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/TopicPSInfoManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/TopicPSInfoManager.java
index 02cf875..2b0ffc9 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/TopicPSInfoManager.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/TopicPSInfoManager.java
@@ -37,13 +37,13 @@ public class TopicPSInfoManager {
private final ConcurrentHashMap<String/* topic */,
ConcurrentHashMap<BrokerInfo, TopicInfo>> brokerPubInfoMap =
- new ConcurrentHashMap<String/* topic */, ConcurrentHashMap<BrokerInfo, TopicInfo>>();
+ new ConcurrentHashMap<>();
private final ConcurrentHashMap<String/* topic */,
ConcurrentHashSet<String/* producerId */>> topicPubInfoMap =
- new ConcurrentHashMap<String, ConcurrentHashSet<String>>();
+ new ConcurrentHashMap<>();
private final ConcurrentHashMap<String/* topic */,
ConcurrentHashSet<String/* group */>> topicSubInfoMap =
- new ConcurrentHashMap<String, ConcurrentHashSet<String>>();
+ new ConcurrentHashMap<>();
/**
* Get groups according to topic
@@ -111,7 +111,7 @@ public class TopicPSInfoManager {
topicPubInfoMap.get(topic);
if (producerIdSet == null) {
ConcurrentHashSet<String> tmpProducerIdSet =
- new ConcurrentHashSet<String>();
+ new ConcurrentHashSet<>();
producerIdSet =
topicPubInfoMap.putIfAbsent(topic, tmpProducerIdSet);
if (producerIdSet == null) {
@@ -172,7 +172,7 @@ public class TopicPSInfoManager {
}
public Set<Partition> getPartitions() {
- Set<Partition> partitions = new HashSet<Partition>();
+ Set<Partition> partitions = new HashSet<>();
for (Map<BrokerInfo, TopicInfo> broker
: this.brokerPubInfoMap.values()) {
for (Map.Entry<BrokerInfo, TopicInfo> entry
@@ -191,7 +191,7 @@ public class TopicPSInfoManager {
}
public Set<Partition> getPartitions(Set<String> topics) {
- Set<Partition> partList = new HashSet<Partition>();
+ Set<Partition> partList = new HashSet<>();
for (String topic : topics) {
partList.addAll(getPartitionSet(topic));
}
@@ -199,7 +199,7 @@ public class TopicPSInfoManager {
}
public Map<String, Partition> getPartitionMap(Set<String> topics) {
- Map<String, Partition> partMap = new HashMap<String, Partition>();
+ Map<String, Partition> partMap = new HashMap<>();
for (String topic : topics) {
ConcurrentHashMap<BrokerInfo, TopicInfo> topicInfoMap =
brokerPubInfoMap.get(topic);
@@ -225,11 +225,11 @@ public class TopicPSInfoManager {
}
public Set<Partition> getPartitionSet(String topic) {
- Set<Partition> partSet = new HashSet<Partition>();
+ Set<Partition> partSet = new HashSet<>();
ConcurrentHashMap<BrokerInfo, TopicInfo> topicInfoMap =
brokerPubInfoMap.get(topic);
if (topicInfoMap == null) {
- return new HashSet<Partition>();
+ return new HashSet<>();
}
for (Map.Entry<BrokerInfo, TopicInfo> entry
: topicInfoMap.entrySet()) {
@@ -250,7 +250,7 @@ public class TopicPSInfoManager {
}
public List<Partition> getPartitionList(String topic) {
- List<Partition> partList = new ArrayList<Partition>(getPartitionSet(topic));
+ List<Partition> partList = new ArrayList<>(getPartitionSet(topic));
return partList;
}
@@ -264,7 +264,7 @@ public class TopicPSInfoManager {
}
public List<TopicInfo> getBrokerPubInfoList(BrokerInfo broker) {
- List<TopicInfo> topicInfoList = new ArrayList<TopicInfo>();
+ List<TopicInfo> topicInfoList = new ArrayList<>();
for (Map.Entry<String, ConcurrentHashMap<BrokerInfo, TopicInfo>> pubEntry
: brokerPubInfoMap.entrySet()) {
ConcurrentHashMap<BrokerInfo, TopicInfo> topicPubMap =
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodeconsumer/ConsumerBandInfo.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodeconsumer/ConsumerBandInfo.java
index 7d03ca5..2f33cc9 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodeconsumer/ConsumerBandInfo.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodeconsumer/ConsumerBandInfo.java
@@ -46,15 +46,15 @@ public class ConsumerBandInfo {
private long createTime = System.currentTimeMillis(); //create time
private Set<String> topicSet = new HashSet<>(); //topic set
private Map<String, TreeSet<String>> topicConditions = //filter condition set
- new HashMap<String, TreeSet<String>>();
+ new HashMap<>();
private ConcurrentHashMap<String, ConsumerInfo> consumerInfoMap = //consumer info
- new ConcurrentHashMap<String, ConsumerInfo>();
+ new ConcurrentHashMap<>();
private ConcurrentHashMap<String, String> partitionInfoMap = //partition info
- new ConcurrentHashMap<String, String>();
+ new ConcurrentHashMap<>();
private ConcurrentHashMap<String, Long> partOffsetMap = //partition offset
- new ConcurrentHashMap<String, Long>();
+ new ConcurrentHashMap<>();
private ConcurrentHashMap<String, NodeRebInfo> rebalanceMap = //load balance
- new ConcurrentHashMap<String, NodeRebInfo>();
+ new ConcurrentHashMap<>();
private int defBClientRate = -2; //default broker/client ratio
private int confBClientRate = -2; //config broker/client ratio
private int curBClientRate = -2; //current broker/client ratio
@@ -205,7 +205,7 @@ public class ConsumerBandInfo {
return null;
}
this.rebalanceMap.remove(consumerId);
- List<String> partKeyList = new ArrayList<String>();
+ List<String> partKeyList = new ArrayList<>();
for (Map.Entry<String, String> entry : partitionInfoMap.entrySet()) {
if (entry.getValue() != null) {
if (entry.getValue().equals(consumerId)) {
@@ -274,9 +274,9 @@ public class ConsumerBandInfo {
}
public RebProcessInfo getNeedRebNodeList() {
- List<String> needProcessList = new ArrayList<String>();
- List<String> needEscapeList = new ArrayList<String>();
- List<String> needRemoved = new ArrayList<String>();
+ List<String> needProcessList = new ArrayList<>();
+ List<String> needEscapeList = new ArrayList<>();
+ List<String> needRemoved = new ArrayList<>();
for (NodeRebInfo nodeRebInfo : this.rebalanceMap.values()) {
if (nodeRebInfo.getStatus() == 0) {
nodeRebInfo.setStatus(1);
@@ -305,7 +305,7 @@ public class ConsumerBandInfo {
|| this.rebalanceMap.isEmpty()) {
return;
}
- List<String> needRemoved = new ArrayList<String>();
+ List<String> needRemoved = new ArrayList<>();
for (NodeRebInfo nodeRebInfo : this.rebalanceMap.values()) {
if (processList.contains(nodeRebInfo.getClientId())) {
if (nodeRebInfo.getReqType() == 0) {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodeconsumer/ConsumerEventManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodeconsumer/ConsumerEventManager.java
index 57f19d6..3ca4dd2 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodeconsumer/ConsumerEventManager.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodeconsumer/ConsumerEventManager.java
@@ -36,11 +36,11 @@ public class ConsumerEventManager {
private static final Logger logger = LoggerFactory.getLogger(ConsumerEventManager.class);
private final ConcurrentHashMap<String/* consumerId */, LinkedList<ConsumerEvent>> disconnectEventMap =
- new ConcurrentHashMap<String, LinkedList<ConsumerEvent>>();
+ new ConcurrentHashMap<>();
private final ConcurrentHashMap<String/* consumerId */, LinkedList<ConsumerEvent>> connectEventMap =
- new ConcurrentHashMap<String, LinkedList<ConsumerEvent>>();
+ new ConcurrentHashMap<>();
private final ConcurrentHashMap<String/* group */, AtomicInteger> groupUnfinishedCountMap =
- new ConcurrentHashMap<String, AtomicInteger>();
+ new ConcurrentHashMap<>();
private final ConsumerInfoHolder consumerHolder;
@@ -53,7 +53,7 @@ public class ConsumerEventManager {
LinkedList<ConsumerEvent> eventList =
disconnectEventMap.get(consumerId);
if (eventList == null) {
- eventList = new LinkedList<ConsumerEvent>();
+ eventList = new LinkedList<>();
LinkedList<ConsumerEvent> tmptList =
disconnectEventMap.putIfAbsent(consumerId, eventList);
if (tmptList != null) {
@@ -70,7 +70,7 @@ public class ConsumerEventManager {
LinkedList<ConsumerEvent> eventList =
connectEventMap.get(consumerId);
if (eventList == null) {
- eventList = new LinkedList<ConsumerEvent>();
+ eventList = new LinkedList<>();
LinkedList<ConsumerEvent> tmptList =
connectEventMap.putIfAbsent(consumerId, eventList);
if (tmptList != null) {
@@ -240,7 +240,7 @@ public class ConsumerEventManager {
* @return consumer id set
*/
public Set<String> getUnProcessedIdSet() {
- Set<String> consumerIdSet = new HashSet<String>();
+ Set<String> consumerIdSet = new HashSet<>();
for (Map.Entry<String, LinkedList<ConsumerEvent>> entry
: disconnectEventMap.entrySet()) {
if (!entry.getValue().isEmpty()) {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfoHolder.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfoHolder.java
index 8e9bc49..5c78858 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfoHolder.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfoHolder.java
@@ -31,9 +31,9 @@ import org.apache.tubemq.corebase.cluster.ConsumerInfo;
public class ConsumerInfoHolder {
private final ConcurrentHashMap<String/* group */, ConsumerBandInfo> groupInfoMap =
- new ConcurrentHashMap<String, ConsumerBandInfo>();
+ new ConcurrentHashMap<>();
private final ConcurrentHashMap<String/* consumerId */, String/* group */> consumerIndexMap =
- new ConcurrentHashMap<String, String>();
+ new ConcurrentHashMap<>();
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
/**
@@ -55,7 +55,7 @@ public class ConsumerInfoHolder {
oldConsumeBandInfo.getConsumerInfoList();
if (oldConsumerList != null) {
List<ConsumerInfo> consumerList =
- new ArrayList<ConsumerInfo>(oldConsumerList.size());
+ new ArrayList<>(oldConsumerList.size());
for (ConsumerInfo consumer : oldConsumerList) {
if (consumer != null) {
consumerList.add(consumer.clone());
@@ -392,7 +392,7 @@ public class ConsumerInfoHolder {
return Collections.emptyList();
} else {
List<String> groupList =
- new ArrayList<String>(groupInfoMap.size());
+ new ArrayList<>(groupInfoMap.size());
groupList.addAll(groupInfoMap.keySet());
return groupList;
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodeconsumer/RebProcessInfo.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodeconsumer/RebProcessInfo.java
index c661d9a..5d72b86 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodeconsumer/RebProcessInfo.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodeconsumer/RebProcessInfo.java
@@ -24,9 +24,9 @@ import java.util.List;
public class RebProcessInfo {
public List<String> needProcessList =
- new ArrayList<String>();
+ new ArrayList<>();
public List<String> needEscapeList =
- new ArrayList<String>();
+ new ArrayList<>();
public RebProcessInfo() {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodeproducer/ProducerInfoHolder.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodeproducer/ProducerInfoHolder.java
index dae39e9..63e6586 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodeproducer/ProducerInfoHolder.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodeproducer/ProducerInfoHolder.java
@@ -25,7 +25,7 @@ import org.apache.tubemq.corebase.cluster.ProducerInfo;
public class ProducerInfoHolder {
final ConcurrentHashMap<String/* producerId */, ProducerInfo> producerInfoMap =
- new ConcurrentHashMap<String, ProducerInfo>();
+ new ConcurrentHashMap<>();
public ProducerInfo getProducerInfo(String producerId) {
return producerInfoMap.get(producerId);
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/Master.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/Master.java
index 9db6045..7ac535c 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/Master.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/Master.java
@@ -224,7 +224,7 @@ public class Master implements Action {
brokerInfoMap = master.getBrokerHolder().getBrokerInfoMap();
} else {
String[] brokerIdArr = brokerIds.split(",");
- List<Integer> idList = new ArrayList<Integer>(brokerIdArr.length);
+ List<Integer> idList = new ArrayList<>(brokerIdArr.length);
for (String strId : brokerIdArr) {
idList.add(Integer.parseInt(strId));
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/Webapi.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/Webapi.java
index 7c52308..63e7367 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/Webapi.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/Webapi.java
@@ -428,9 +428,9 @@ public class Webapi implements Action {
int minRequireClientCnt = -2;
int rebalanceStatus = -2;
Set<String> topicSet = new HashSet<>();
- List<ConsumerInfo> consumerList = new ArrayList<ConsumerInfo>();
- Map<String, NodeRebInfo> nodeRebInfoMap = new ConcurrentHashMap<String, NodeRebInfo>();
- Map<String, TreeSet<String>> existedTopicCondtions = new HashMap<String, TreeSet<String>>();
+ List<ConsumerInfo> consumerList = new ArrayList<>();
+ Map<String, NodeRebInfo> nodeRebInfoMap = new ConcurrentHashMap<>();
+ Map<String, TreeSet<String>> existedTopicCondtions = new HashMap<>();
ConsumerInfoHolder consumerHolder = master.getConsumerHolder();
ConsumerBandInfo consumerBandInfo = consumerHolder.getConsumerBandInfo(strConsumeGroup);
if (consumerBandInfo != null) {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/cluster/ClusterManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/cluster/ClusterManager.java
index 1fdd8f3..192254c 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/cluster/ClusterManager.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/cluster/ClusterManager.java
@@ -42,7 +42,7 @@ public class ClusterManager implements Action {
public void execute(RequestContext context) {
HttpServletRequest req = context.getReq();
BrokerConfManager brokerConfManager = this.master.getMasterTopicManager();
- List<ClusterGroupVO> clusterGroupVOList = new ArrayList<ClusterGroupVO>();
+ List<ClusterGroupVO> clusterGroupVOList = new ArrayList<>();
ClusterGroupVO clusterGroupVO = brokerConfManager.getGroupAddressStrInfo();
if (clusterGroupVO != null) {
clusterGroupVOList.add(clusterGroupVO);
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/config/BrokerList.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/config/BrokerList.java
index e7fab62..909a888 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/config/BrokerList.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/config/BrokerList.java
@@ -84,7 +84,7 @@ public class BrokerList implements Action {
fromIndex + pageSize > brokerInfoList.size() ? brokerInfoList.size() : fromIndex
+ pageSize;
List<BrokerInfo> firstPageList = brokerInfoList.subList(fromIndex, toIndex);
- brokerVOList = new ArrayList<BrokerVO>(brokerInfoList.size());
+ brokerVOList = new ArrayList<>(brokerInfoList.size());
TopicPSInfoManager psInfoManager = master.getTopicPSInfoManager();
for (BrokerInfo brokerInfo : firstPageList) {
BrokerVO brokerVO = new BrokerVO();
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminFlowRuleHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminFlowRuleHandler.java
index 83dd0eb..e6a91a1 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminFlowRuleHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminFlowRuleHandler.java
@@ -84,13 +84,13 @@ public class WebAdminFlowRuleHandler {
WebParameterUtils.validBooleanDataParameter("needSSDProc",
req.getParameter("needSSDProc"),
false, false);
- Set<String> batchGroupNames = new HashSet<String>();
+ Set<String> batchGroupNames = new HashSet<>();
if (opType == 1) {
batchGroupNames.add(TServerConstants.TOKEN_DEFAULT_FLOW_CONTROL);
} else {
// get groupname info if rule is set to consume group
boolean checkResToken = opType > 1;
- Set<String> resTokenSet = new HashSet<String>();
+ Set<String> resTokenSet = new HashSet<>();
if (checkResToken) {
resTokenSet.add(TServerConstants.TOKEN_DEFAULT_FLOW_CONTROL);
}
@@ -147,12 +147,12 @@ public class WebAdminFlowRuleHandler {
WebParameterUtils.validDateParameter("createDate",
req.getParameter("createDate"),
TBaseConstants.META_MAX_DATEVALUE_LENGTH, false, new Date());
- Set<String> batchGroupNames = new HashSet<String>();
+ Set<String> batchGroupNames = new HashSet<>();
if (opType == 1) {
batchGroupNames.add(TServerConstants.TOKEN_DEFAULT_FLOW_CONTROL);
} else {
boolean checkResToken = opType > 1;
- Set<String> resTokenSet = new HashSet<String>();
+ Set<String> resTokenSet = new HashSet<>();
if (checkResToken) {
resTokenSet.add(TServerConstants.TOKEN_DEFAULT_FLOW_CONTROL);
}
@@ -193,13 +193,13 @@ public class WebAdminFlowRuleHandler {
WebParameterUtils.validDateParameter("createDate",
req.getParameter("createDate"),
TBaseConstants.META_MAX_DATEVALUE_LENGTH, false, new Date());
- Set<String> batchGroupNames = new HashSet<String>();
+ Set<String> batchGroupNames = new HashSet<>();
// check optype
if (opType == 1) {
batchGroupNames.add(TServerConstants.TOKEN_DEFAULT_FLOW_CONTROL);
} else {
boolean checkResToken = opType > 1;
- Set<String> resTokenSet = new HashSet<String>();
+ Set<String> resTokenSet = new HashSet<>();
if (checkResToken) {
resTokenSet.add(TServerConstants.TOKEN_DEFAULT_FLOW_CONTROL);
}
@@ -302,12 +302,12 @@ public class WebAdminFlowRuleHandler {
.setQryPriorityId(WebParameterUtils.validIntDataParameter("qryPriorityId",
req.getParameter("qryPriorityId"),
false, TBaseConstants.META_VALUE_UNDEFINED, 0));
- Set<String> batchGroupNames = new HashSet<String>();
+ Set<String> batchGroupNames = new HashSet<>();
if (opType == 1) {
batchGroupNames.add(TServerConstants.TOKEN_DEFAULT_FLOW_CONTROL);
} else {
boolean checkResToken = opType > 1;
- Set<String> resTokenSet = new HashSet<String>();
+ Set<String> resTokenSet = new HashSet<>();
if (checkResToken) {
resTokenSet.add(TServerConstants.TOKEN_DEFAULT_FLOW_CONTROL);
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java
index 982f942..7c33c7b 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java
@@ -170,7 +170,7 @@ public class WebAdminGroupCtrlHandler {
}
Set<String> confgiuredTopicSet = brokerConfManager.getTotalConfiguredTopicNames();
HashMap<String, BdbGroupFilterCondEntity> inGroupFilterCondEntityMap =
- new HashMap<String, BdbGroupFilterCondEntity>();
+ new HashMap<>();
for (int j = 0; j < filterJsonArray.size(); j++) {
Map<String, String> groupObject = filterJsonArray.get(j);
try {
@@ -368,7 +368,7 @@ public class WebAdminGroupCtrlHandler {
if ((jsonArray == null) || (jsonArray.isEmpty())) {
throw new Exception("Null value of filterCondJsonSet, please set the value first!");
}
- Set<String> batchRecords = new HashSet<String>();
+ Set<String> batchRecords = new HashSet<>();
List<BdbGroupFilterCondEntity> modifyFilterCondEntities = new ArrayList<>();
for (int j = 0; j < jsonArray.size(); j++) {
Map<String, String> groupObject = jsonArray.get(j);
@@ -780,7 +780,7 @@ public class WebAdminGroupCtrlHandler {
}
Set<String> confgiuredTopicSet = brokerConfManager.getTotalConfiguredTopicNames();
HashMap<String, BdbConsumerGroupEntity> inGroupAuthConfEntityMap =
- new HashMap<String, BdbConsumerGroupEntity>();
+ new HashMap<>();
for (int j = 0; j < jsonArray.size(); j++) {
Map<String, String> groupObject = jsonArray.get(j);
try {
@@ -1294,7 +1294,7 @@ public class WebAdminGroupCtrlHandler {
throw new Exception("Null value of groupNameJsonSet, please set the value first!");
}
HashMap<String, BdbConsumeGroupSettingEntity> inOffsetRstGroupEntityMap =
- new HashMap<String, BdbConsumeGroupSettingEntity>();
+ new HashMap<>();
for (int j = 0; j < groupNameJsonArray.size(); j++) {
Map<String, String> groupObject = groupNameJsonArray.get(j);
try {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminTopicAuthHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminTopicAuthHandler.java
index f2d561f..77beb91 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminTopicAuthHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminTopicAuthHandler.java
@@ -119,9 +119,9 @@ public class WebAdminTopicAuthHandler {
}
Set<String> configuredTopicSet = brokerConfManager.getTotalConfiguredTopicNames();
HashMap<String, BdbTopicAuthControlEntity> inTopicAuthConfEntityMap =
- new HashMap<String, BdbTopicAuthControlEntity>();
+ new HashMap<>();
HashMap<String, BdbConsumerGroupEntity> inGroupAuthConfEntityMap =
- new HashMap<String, BdbConsumerGroupEntity>();
+ new HashMap<>();
for (int count = 0; count < topicJsonArray.size(); count++) {
Map<String, String> jsonObject = topicJsonArray.get(count);
try {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerDefConfHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerDefConfHandler.java
index 128fbde..73a9989 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerDefConfHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerDefConfHandler.java
@@ -601,7 +601,7 @@ public class WebBrokerDefConfHandler {
// Check the current status and status after the change, to see if there are changes.
// If yes, check if the current status complies with the change.
// If it complies, record the change.
- Set<BdbBrokerConfEntity> newBrokerEntitySet = new HashSet<BdbBrokerConfEntity>();
+ Set<BdbBrokerConfEntity> newBrokerEntitySet = new HashSet<>();
for (BdbBrokerConfEntity oldEntity : batchBrokerEntitySet) {
if (oldEntity == null) {
continue;
@@ -1026,7 +1026,7 @@ public class WebBrokerDefConfHandler {
WebParameterUtils.getBatchBrokerIdSet(req.getParameter("brokerId"),
brokerConfManager, true, strBuffer);
Set<BdbBrokerConfEntity> newBrokerEntitys =
- new HashSet<BdbBrokerConfEntity>();
+ new HashSet<>();
for (BdbBrokerConfEntity oldEntity : batchBrokerEntities) {
if (oldEntity == null) {
continue;
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerTopicConfHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerTopicConfHandler.java
index c0d932c..bb90522 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerTopicConfHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerTopicConfHandler.java
@@ -97,8 +97,8 @@ public class WebBrokerTopicConfHandler {
Set<BdbBrokerConfEntity> batchBrokerEntitySet =
WebParameterUtils.getBatchBrokerIdSet(req.getParameter("brokerId"), brokerConfManager, true,
strBuffer);
- List<BdbTopicConfEntity> batchAddBdbTopicEntities = new ArrayList<BdbTopicConfEntity>();
- List<BdbTopicAuthControlEntity> batchAddBdbTopicAuthControls = new ArrayList<BdbTopicAuthControlEntity>();
+ List<BdbTopicConfEntity> batchAddBdbTopicEntities = new ArrayList<>();
+ List<BdbTopicAuthControlEntity> batchAddBdbTopicAuthControls = new ArrayList<>();
// for each topic
for (String topicName : batchAddTopicNames) {
BdbTopicAuthControlEntity tmpTopicAuthControl =
@@ -238,10 +238,10 @@ public class WebBrokerTopicConfHandler {
if ((topicJsonArray == null) || (topicJsonArray.isEmpty())) {
throw new Exception("Null value of topicJsonSet, please set the value first!");
}
- Set<String> batchAddTopicNames = new HashSet<String>();
- Set<String> batchAddItemKeys = new HashSet<String>();
- List<BdbTopicAuthControlEntity> batchTopicAuthInfos = new ArrayList<BdbTopicAuthControlEntity>();
- List<BdbTopicConfEntity> batchAddBdbTopicEntities = new ArrayList<BdbTopicConfEntity>();
+ Set<String> batchAddTopicNames = new HashSet<>();
+ Set<String> batchAddItemKeys = new HashSet<>();
+ List<BdbTopicAuthControlEntity> batchTopicAuthInfos = new ArrayList<>();
+ List<BdbTopicConfEntity> batchAddBdbTopicEntities = new ArrayList<>();
for (int count = 0; count < topicJsonArray.size(); count++) {
Map<String, String> jsonObject = topicJsonArray.get(count);
try {
@@ -722,9 +722,9 @@ public class WebBrokerTopicConfHandler {
Set<BdbBrokerConfEntity> batchInputTopicEntitySet =
WebParameterUtils.getBatchBrokerIdSet(req.getParameter("brokerId"),
brokerConfManager, true, strBuffer);
- Set<Integer> changedBrokerSet = new HashSet<Integer>();
+ Set<Integer> changedBrokerSet = new HashSet<>();
Set<BdbTopicConfEntity> batchRmvBdbTopicEntitySet =
- new HashSet<BdbTopicConfEntity>();
+ new HashSet<>();
// For the broker to perform, check its status
// and check the config of the topic to see if the action could be performed
@@ -866,7 +866,7 @@ public class WebBrokerTopicConfHandler {
Set<BdbBrokerConfEntity> batchBrokerEntitySet =
WebParameterUtils.getBatchBrokerIdSet(req.getParameter("brokerId"), brokerConfManager, true,
strBuffer);
- List<BdbTopicConfEntity> batchRmvBdbTopicEntities = new ArrayList<BdbTopicConfEntity>();
+ List<BdbTopicConfEntity> batchRmvBdbTopicEntities = new ArrayList<>();
for (BdbBrokerConfEntity brokerConfEntity : batchBrokerEntitySet) {
if (brokerConfEntity == null) {
continue;
@@ -913,7 +913,7 @@ public class WebBrokerTopicConfHandler {
}
}
try {
- Set<Integer> changedBrokerSet = new HashSet<Integer>();
+ Set<Integer> changedBrokerSet = new HashSet<>();
for (BdbTopicConfEntity itemTopicEntity : batchRmvBdbTopicEntities) {
BdbBrokerConfEntity brokerConfEntity =
brokerConfManager.getBrokerDefaultConfigStoreInfo(itemTopicEntity.getBrokerId());
@@ -1211,7 +1211,7 @@ public class WebBrokerTopicConfHandler {
WebParameterUtils.validIntDataParameter("memCacheFlushIntvl",
req.getParameter("memCacheFlushIntvl"), false, TBaseConstants.META_VALUE_UNDEFINED, 4000);
int unFlushDataHold = unflushThreshold;
- List<BdbTopicConfEntity> batchModBdbTopicEntities = new ArrayList<BdbTopicConfEntity>();
+ List<BdbTopicConfEntity> batchModBdbTopicEntities = new ArrayList<>();
for (BdbBrokerConfEntity tgtEntity : batchBrokerEntitySet) {
if (tgtEntity == null) {
continue;
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/simplemvc/RequestDispatcher.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/simplemvc/RequestDispatcher.java
index f772ba4..a5895c5 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/simplemvc/RequestDispatcher.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/simplemvc/RequestDispatcher.java
@@ -44,9 +44,9 @@ public class RequestDispatcher {
private static final String SCREEN_PLACEHOLDER = "screen_placeholder";
private final WebConfig config;
private final TemplateEngine engine;
- private final HashMap<String, Action> actions = new HashMap<String, Action>();
- private final HashMap<String, String> templates = new HashMap<String, String>();
- private final ThreadLocal<ControlTool> controlTools = new ThreadLocal<ControlTool>();
+ private final HashMap<String, Action> actions = new HashMap<>();
+ private final HashMap<String, String> templates = new HashMap<>();
+ private final ThreadLocal<ControlTool> controlTools = new ThreadLocal<>();
private GenericXmlApplicationContext applicationContext;
/**
@@ -251,7 +251,7 @@ public class RequestDispatcher {
}
private List<Class> getActionClasses(String packageName) throws Exception {
- List<Class> classList = new ArrayList<Class>();
+ List<Class> classList = new ArrayList<>();
String path = packageName.replaceAll("\\.", "/");
URL url = this.getClass().getClassLoader().getResource(path);
if (url == null) {
@@ -271,7 +271,7 @@ public class RequestDispatcher {
private List<File> getFileList(String directory, String type) throws Exception {
File file = new File(directory);
- List<File> fileList = new ArrayList<File>();
+ List<File> fileList = new ArrayList<>();
getFile(file, fileList, type);
return fileList;
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/simplemvc/conf/WebConfig.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/simplemvc/conf/WebConfig.java
index 26623e7..62aa55e 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/simplemvc/conf/WebConfig.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/simplemvc/conf/WebConfig.java
@@ -28,9 +28,9 @@ import org.apache.tubemq.server.master.web.simplemvc.Action;
public class WebConfig {
- private final HashMap<String, Object> tools = new HashMap<String, Object>();
- private final HashMap<String, Action> actions = new HashMap<String, Action>();
- private final HashSet<String> types = new HashSet<String>();
+ private final HashMap<String, Object> tools = new HashMap<>();
+ private final HashMap<String, Action> actions = new HashMap<>();
+ private final HashSet<String> types = new HashSet<>();
private String resourcePath;
private String templatePath;
private String actionPackage;
@@ -38,7 +38,7 @@ public class WebConfig {
private String supportedTypes = ".htm,.html";
private String defaultPage = "index.htm";
private boolean springSupported = false;
- private List<String> beanFilePathList = new ArrayList<String>();
+ private List<String> beanFilePathList = new ArrayList<>();
private boolean standalone = false;
public WebConfig() {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/tools/BdbGroupAdmin.java b/tubemq-server/src/main/java/org/apache/tubemq/server/tools/BdbGroupAdmin.java
index c248b2d..8730eca 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/tools/BdbGroupAdmin.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/tools/BdbGroupAdmin.java
@@ -58,7 +58,7 @@ public class BdbGroupAdmin {
return;
}
- Set<InetSocketAddress> helpers = new HashSet<InetSocketAddress>();
+ Set<InetSocketAddress> helpers = new HashSet<>();
String group = args[0];
String[] hostAndPort = args[1].split(":");
if (hostAndPort.length != 2) {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/tools/StoreRepairAdmin.java b/tubemq-server/src/main/java/org/apache/tubemq/server/tools/StoreRepairAdmin.java
index 2f5f8ce..8fd1f5b 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/tools/StoreRepairAdmin.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/tools/StoreRepairAdmin.java
@@ -87,7 +87,7 @@ public class StoreRepairAdmin {
int count = 0;
ExecutorService executor =
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1);
- List<Callable<IndexReparStore>> tasks = new ArrayList<Callable<IndexReparStore>>();
+ List<Callable<IndexReparStore>> tasks = new ArrayList<>();
if (ls != null) {
for (final File subDir : ls) {
if (subDir == null) {
@@ -127,7 +127,7 @@ public class StoreRepairAdmin {
}
if (count > 0) {
CompletionService<IndexReparStore> completionService =
- new ExecutorCompletionService<IndexReparStore>(executor);
+ new ExecutorCompletionService<>(executor);
for (Callable<IndexReparStore> task : tasks) {
completionService.submit(task);
}
@@ -218,7 +218,7 @@ public class StoreRepairAdmin {
}
private void loaderSegments() throws IOException {
- final List<Segment> accum = new ArrayList<Segment>();
+ final List<Segment> accum = new ArrayList<>();
String fileSuffix = DataStoreUtils.DATA_FILE_SUFFIX;
final File[] ls = topicDir.listFiles();
if (ls != null) {