You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2020/05/11 02:21:08 UTC

[incubator-tubemq] branch master updated: [TUBEMQ-95] Substitute the parameterized type for server module (#74)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git


The following commit(s) were added to refs/heads/master by this push:
     new c3a1f95  [TUBEMQ-95] Substitute the parameterized type for server module (#74)
c3a1f95 is described below

commit c3a1f952d1135960bf31186ad65fcc7e68f05f78
Author: Tboy <gu...@immomo.com>
AuthorDate: Mon May 11 10:21:00 2020 +0800

    [TUBEMQ-95] Substitute the parameterized type for server module (#74)
---
 .../tubemq/server/broker/BrokerServiceServer.java  |  8 +--
 .../broker/metadata/BrokerMetadataManager.java     | 22 +++---
 .../server/broker/msgstore/MessageStore.java       |  4 +-
 .../broker/msgstore/MessageStoreManager.java       | 20 +++---
 .../broker/msgstore/disk/FileSegmentList.java      |  2 +-
 .../broker/msgstore/disk/GetMessageResult.java     |  4 +-
 .../server/broker/msgstore/disk/MsgFileStore.java  |  4 +-
 .../server/broker/msgstore/mem/MsgMemStore.java    |  6 +-
 .../server/broker/msgstore/ssd/MsgSSDSegment.java  |  6 +-
 .../broker/msgstore/ssd/MsgSSDStoreManager.java    | 20 +++---
 .../server/broker/nodeinfo/ConsumerNodeInfo.java   |  4 +-
 .../server/broker/offset/DefaultOffsetManager.java | 10 +--
 .../server/broker/stats/GroupCountService.java     |  2 +-
 .../server/broker/web/BrokerAdminServlet.java      |  2 +-
 .../aaaserver/SimpleCertificateBrokerHandler.java  |  4 +-
 .../server/common/heartbeat/HeartbeatManager.java  |  8 +--
 .../zookeeper/RecoverableZooKeeper.java            |  2 +-
 .../offsetstorage/zookeeper/ZooKeeperWatcher.java  |  2 +-
 .../server/common/paramcheck/PBParameterUtils.java |  2 +-
 .../apache/tubemq/server/common/utils/RowLock.java |  4 +-
 .../server/common/utils/WebParameterUtils.java     | 10 +--
 .../org/apache/tubemq/server/master/TMaster.java   | 36 +++++-----
 .../server/master/balance/DefaultLoadBalancer.java | 46 ++++++------
 .../master/bdbstore/DefaultBdbStoreService.java    | 32 ++++-----
 .../nodemanage/nodebroker/BrokerConfManager.java   | 84 ++++++++++------------
 .../nodemanage/nodebroker/BrokerInfoHolder.java    | 10 +--
 .../nodebroker/BrokerSyncStatusInfo.java           | 10 +--
 .../nodemanage/nodebroker/TopicPSInfoManager.java  | 22 +++---
 .../nodemanage/nodeconsumer/ConsumerBandInfo.java  | 20 +++---
 .../nodeconsumer/ConsumerEventManager.java         | 12 ++--
 .../nodeconsumer/ConsumerInfoHolder.java           |  8 +--
 .../nodemanage/nodeconsumer/RebProcessInfo.java    |  4 +-
 .../nodeproducer/ProducerInfoHolder.java           |  2 +-
 .../server/master/web/action/screen/Master.java    |  2 +-
 .../server/master/web/action/screen/Webapi.java    |  6 +-
 .../web/action/screen/cluster/ClusterManager.java  |  2 +-
 .../web/action/screen/config/BrokerList.java       |  2 +-
 .../web/handler/WebAdminFlowRuleHandler.java       | 16 ++---
 .../web/handler/WebAdminGroupCtrlHandler.java      |  8 +--
 .../web/handler/WebAdminTopicAuthHandler.java      |  4 +-
 .../web/handler/WebBrokerDefConfHandler.java       |  4 +-
 .../web/handler/WebBrokerTopicConfHandler.java     | 22 +++---
 .../master/web/simplemvc/RequestDispatcher.java    | 10 +--
 .../master/web/simplemvc/conf/WebConfig.java       |  8 +--
 .../apache/tubemq/server/tools/BdbGroupAdmin.java  |  2 +-
 .../tubemq/server/tools/StoreRepairAdmin.java      |  6 +-
 46 files changed, 257 insertions(+), 267 deletions(-)

diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerServiceServer.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerServiceServer.java
index 3de42d1..6cd99b1 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerServiceServer.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerServiceServer.java
@@ -89,7 +89,7 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
     private final BrokerConfig tubeConfig;
     // registered consumers. format : consumer group - topic - partition id  --> consumer info
     private final ConcurrentHashMap<String/* group:topic-partitionId */, ConsumerNodeInfo> consumerRegisterMap =
-            new ConcurrentHashMap<String, ConsumerNodeInfo>();
+            new ConcurrentHashMap<>();
     // metadata manager.
     private final MetadataManager metadataManager;
     // offset storage manager.
@@ -555,7 +555,7 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
                         .append(topicName).append(")!\"}");
                 return sb;
             } else {
-                List<String> transferMessageList = new ArrayList<String>();
+                List<String> transferMessageList = new ArrayList<>();
                 List<TransferedMessage> tmpMsgList = getMessageResult.transferedMessageList;
                 List<Message> messageList = DataConverterUtil.convertMessage(topicName, tmpMsgList);
                 int startPos = messageList.size() - msgCount < 0 ? 0 : messageList.size() - msgCount;
@@ -755,7 +755,7 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
         final String groupName = (String) paramCheckResult.checkData;
 
         boolean isRegister = (request.getOpType() == RpcConstants.MSG_OPTYPE_REGISTER);
-        Set<String> filterCondSet = new HashSet<String>();
+        Set<String> filterCondSet = new HashSet<>();
         if (request.getFilterCondStrList() != null && !request.getFilterCondStrList().isEmpty()) {
             for (String filterCond : request.getFilterCondStrList()) {
                 if (TStringUtils.isNotBlank(filterCond)) {
@@ -1022,7 +1022,7 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
                 DataConverterUtil.convertPartitionInfo(request.getPartitionInfoList());
         CertifiedResult authorizeResult = null;
         boolean isAuthorized = false;
-        List<String> failureInfo = new ArrayList<String>();
+        List<String> failureInfo = new ArrayList<>();
         for (Partition partition : partitions) {
             String topic = partition.getTopic();
             int partitionId = partition.getPartitionId();
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/BrokerMetadataManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/BrokerMetadataManager.java
index 0e5cec4..984291d 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/BrokerMetadataManager.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/BrokerMetadataManager.java
@@ -50,20 +50,20 @@ public class BrokerMetadataManager implements MetadataManager {
     // broker's metadata in String format.
     private String brokerDefMetaConfInfo = "";
     // broker's topic's config list.
-    private List<String> topicMetaConfInfoLst = new ArrayList<String>();
+    private List<String> topicMetaConfInfoLst = new ArrayList<>();
     // topic in this broker.
-    private List<String> topics = new ArrayList<String>();
+    private List<String> topics = new ArrayList<>();
     // broker's default metadata.
     private BrokerDefMetadata brokerDefMetadata = new BrokerDefMetadata();
     // topic with custom config.
     private ConcurrentHashMap<String/* topic */, TopicMetadata> topicConfigMap =
-            new ConcurrentHashMap<String, TopicMetadata>();
+            new ConcurrentHashMap<>();
     // topics will be closed.
     private Map<String/* topic */, Integer> closedTopicMap =
-            new ConcurrentHashMap<String, Integer>();
+            new ConcurrentHashMap<>();
     // topics will be removed.
     private Map<String/* topic */, TopicMetadata> removedTopicConfigMap =
-            new ConcurrentHashMap<String, TopicMetadata>();
+            new ConcurrentHashMap<>();
     private long lastRptBrokerMetaConfId = 0;
 
     public BrokerMetadataManager() {
@@ -174,7 +174,7 @@ public class BrokerMetadataManager implements MetadataManager {
      */
     @Override
     public List<String> getHardRemovedTopics() {
-        List<String> targetTopics = new ArrayList<String>();
+        List<String> targetTopics = new ArrayList<>();
         for (Map.Entry<String, TopicMetadata> entry
                 : this.removedTopicConfigMap.entrySet()) {
             if (entry.getKey() == null || entry.getValue() == null) {
@@ -242,11 +242,11 @@ public class BrokerMetadataManager implements MetadataManager {
             logger.error("[Metadata Manage] received broker topic info is Blank, not update");
             return;
         }
-        List<String> newTopics = new ArrayList<String>();
+        List<String> newTopics = new ArrayList<>();
         Map<String/* topic */, Integer> tmpInvalidTopicMap =
-                new ConcurrentHashMap<String, Integer>();
+                new ConcurrentHashMap<>();
         ConcurrentHashMap<String/* topic */, TopicMetadata> newTopicConfigMap =
-                new ConcurrentHashMap<String, TopicMetadata>();
+                new ConcurrentHashMap<>();
         for (String strTopicConfInfo : newTopicMetaConfInfoLst) {
             if (TStringUtils.isBlank(strTopicConfInfo)) {
                 continue;
@@ -287,7 +287,7 @@ public class BrokerMetadataManager implements MetadataManager {
         // 该部分根据Master上的指示进行对应Topic的删除操作
         boolean needProcess = false;
         if (isTakeRemoveTopics) {
-            List<String> origTopics = new ArrayList<String>();
+            List<String> origTopics = new ArrayList<>();
             if (rmvTopicMetaConfInfoLst != null
                     && !rmvTopicMetaConfInfoLst.isEmpty()) {
                 for (String tmpTopicMetaConfInfo : rmvTopicMetaConfInfoLst) {
@@ -307,7 +307,7 @@ public class BrokerMetadataManager implements MetadataManager {
                     origTopics.add(topicMetadata.getTopic());
                 }
             }
-            List<String> tmpTopics = new ArrayList<String>();
+            List<String> tmpTopics = new ArrayList<>();
             for (Map.Entry<String, TopicMetadata> entry : removedTopicConfigMap.entrySet()) {
                 if (entry.getKey() == null || entry.getValue() == null) {
                     continue;
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java
index 5e9f31b..7dbef6d 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java
@@ -213,9 +213,9 @@ public class MessageStore implements Closeable {
                 // return not found when data is under memory sink operation.
                 if (memMsgRlt.isSuccess) {
                     HashMap<String, CountItem> countMap =
-                            new HashMap<String, CountItem>();
+                            new HashMap<>();
                     List<ClientBroker.TransferedMessage> transferedMessageList =
-                            new ArrayList<ClientBroker.TransferedMessage>();
+                            new ArrayList<>();
                     if (!memMsgRlt.cacheMsgList.isEmpty()) {
                         final StringBuilder strBuffer = new StringBuilder(512);
                         for (ByteBuffer dataBuffer : memMsgRlt.cacheMsgList) {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStoreManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStoreManager.java
index 74e0d91..daa8ba7 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStoreManager.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStoreManager.java
@@ -70,7 +70,7 @@ public class MessageStoreManager implements StoreService {
     // storeId to store on each topic.
     private final ConcurrentHashMap<String/* topic */,
             ConcurrentHashMap<Integer/* storeId */, MessageStore>> dataStores =
-            new ConcurrentHashMap<String, ConcurrentHashMap<Integer, MessageStore>>();
+            new ConcurrentHashMap<>();
     // ssd store manager
     private final MsgSSDStoreManager msgSsdStoreManager;
     // store service status
@@ -204,13 +204,13 @@ public class MessageStoreManager implements StoreService {
         }
         try {
             List<String> removedTopics =
-                    new ArrayList<String>();
+                    new ArrayList<>();
             Map<String, TopicMetadata> removedTopicMap =
                     this.metadataManager.getRemovedTopicConfigMap();
             if (removedTopicMap.isEmpty()) {
                 return removedTopics;
             }
-            Set<String> targetTopics = new HashSet<String>();
+            Set<String> targetTopics = new HashSet<>();
             for (Map.Entry<String, TopicMetadata> entry : removedTopicMap.entrySet()) {
                 if (entry.getKey() == null || entry.getValue() == null) {
                     continue;
@@ -310,7 +310,7 @@ public class MessageStoreManager implements StoreService {
         ConcurrentHashMap<Integer, MessageStore> dataMap = dataStores.get(topic);
         if (dataMap == null) {
             ConcurrentHashMap<Integer, MessageStore> tmpTopicMap =
-                    new ConcurrentHashMap<Integer, MessageStore>();
+                    new ConcurrentHashMap<>();
             dataMap = this.dataStores.putIfAbsent(topic, tmpTopicMap);
             if (dataMap == null) {
                 dataMap = tmpTopicMap;
@@ -454,7 +454,7 @@ public class MessageStoreManager implements StoreService {
 
     private Set<File> getLogDirSet(final BrokerConfig tubeConfig) throws IOException {
         TopicMetadata topicMetadata = null;
-        final Set<String> paths = new HashSet<String>();
+        final Set<String> paths = new HashSet<>();
         paths.add(tubeConfig.getPrimaryPath());
         for (final String topic : metadataManager.getTopics()) {
             topicMetadata = metadataManager.getTopicMetadata(topic);
@@ -463,7 +463,7 @@ public class MessageStoreManager implements StoreService {
                 paths.add(topicMetadata.getDataPath());
             }
         }
-        final Set<File> fileSet = new HashSet<File>();
+        final Set<File> fileSet = new HashSet<>();
         for (final String path : paths) {
             final File dir = new File(path);
             if (!dir.exists() && !dir.mkdirs()) {
@@ -497,7 +497,7 @@ public class MessageStoreManager implements StoreService {
         final long start = System.currentTimeMillis();
         final AtomicInteger errCnt = new AtomicInteger(0);
         final AtomicInteger finishCnt = new AtomicInteger(0);
-        List<Callable<MessageStore>> tasks = new ArrayList<Callable<MessageStore>>();
+        List<Callable<MessageStore>> tasks = new ArrayList<>();
         for (final File dir : this.getLogDirSet(tubeConfig)) {
             if (dir == null) {
                 continue;
@@ -542,7 +542,7 @@ public class MessageStoreManager implements StoreService {
                             ConcurrentHashMap<Integer, MessageStore> map =
                                     dataStores.get(msgStore.getTopic());
                             if (map == null) {
-                                map = new ConcurrentHashMap<Integer, MessageStore>();
+                                map = new ConcurrentHashMap<>();
                                 ConcurrentHashMap<Integer, MessageStore> oldmap =
                                         dataStores.putIfAbsent(msgStore.getTopic(), map);
                                 if (oldmap != null) {
@@ -594,7 +594,7 @@ public class MessageStoreManager implements StoreService {
         ExecutorService executor =
                 Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1);
         CompletionService<MessageStore> completionService =
-                new ExecutorCompletionService<MessageStore>(executor);
+                new ExecutorCompletionService<>(executor);
         for (Callable<MessageStore> task : tasks) {
             completionService.submit(task);
         }
@@ -707,7 +707,7 @@ public class MessageStoreManager implements StoreService {
         }
 
         private Set<String> getExpiredTopicSet(final StringBuilder sb) {
-            Set<String> expiredTopic = new HashSet<String>();
+            Set<String> expiredTopic = new HashSet<>();
             for (Map<Integer, MessageStore> storeMap : dataStores.values()) {
                 if (storeMap == null || storeMap.isEmpty()) {
                     continue;
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/FileSegmentList.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/FileSegmentList.java
index 04f52af..6020641 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/FileSegmentList.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/FileSegmentList.java
@@ -30,7 +30,7 @@ public class FileSegmentList implements SegmentList {
             LoggerFactory.getLogger(FileSegmentList.class);
     // list of segments.
     private AtomicReference<Segment[]> segmentList =
-            new AtomicReference<Segment[]>();
+            new AtomicReference<>();
 
     public FileSegmentList(final Segment[] s) {
         super();
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/GetMessageResult.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/GetMessageResult.java
index 304ff33..84b6d6d 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/GetMessageResult.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/GetMessageResult.java
@@ -37,8 +37,8 @@ public class GetMessageResult {
     public long waitTime = -1;
     public boolean isSlowFreq = false;
     public boolean isFromSsdFile = false;
-    public HashMap<String, CountItem> tmpCounters = new HashMap<String, CountItem>();
-    public List<TransferedMessage> transferedMessageList = new ArrayList<TransferedMessage>();
+    public HashMap<String, CountItem> tmpCounters = new HashMap<>();
+    public List<TransferedMessage> transferedMessageList = new ArrayList<>();
 
 
     public GetMessageResult(boolean isSuccess, int retCode, final String errInfo,
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/MsgFileStore.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/MsgFileStore.java
index 4440806..9bbd492 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/MsgFileStore.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/MsgFileStore.java
@@ -256,7 +256,7 @@ public class MsgFileStore implements Closeable {
         ByteBuffer dataBuffer =
                 ByteBuffer.allocate(TServerConstants.CFG_STORE_DEFAULT_MSG_READ_UNIT);
         List<ClientBroker.TransferedMessage> transferedMessageList =
-                new ArrayList<ClientBroker.TransferedMessage>();
+                new ArrayList<>();
         // read data file by index.
         for (curIndexOffset = 0; curIndexOffset < indexBuffer.remaining();
              curIndexOffset += DataStoreUtils.STORE_INDEX_HEAD_LEN) {
@@ -513,7 +513,7 @@ public class MsgFileStore implements Closeable {
                 .append(segTypeStr).append(" segments ")
                 .append(segListDir.getAbsolutePath()).toString());
         sBuilder.delete(0, sBuilder.length());
-        final List<Segment> accum = new ArrayList<Segment>();
+        final List<Segment> accum = new ArrayList<>();
         final File[] ls = segListDir.listFiles();
         if (ls != null) {
             for (final File file : ls) {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/mem/MsgMemStore.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/mem/MsgMemStore.java
index 6ba9012..6f14f4f 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/mem/MsgMemStore.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/mem/MsgMemStore.java
@@ -51,10 +51,10 @@ public class MsgMemStore implements Closeable {
     private final ReentrantLock writeLock = new ReentrantLock();
     // partitionId to index position, accelerate query
     private final ConcurrentHashMap<Integer, Integer> queuesMap =
-            new ConcurrentHashMap<Integer, Integer>(20);
+            new ConcurrentHashMap<>(20);
     // key to index position, used for filter consume
     private final ConcurrentHashMap<Integer, Integer> keysMap =
-            new ConcurrentHashMap<Integer, Integer>(100);
+            new ConcurrentHashMap<>(100);
     // where messages in memory will sink to disk
     private long writeDataStartPos = -1;
     private ByteBuffer cacheDataSegment;
@@ -147,7 +147,7 @@ public class MsgMemStore implements Closeable {
         Integer lastWritePos = 0;
         boolean hasMsg = false;
         // judge memory contains the given offset or not.
-        List<ByteBuffer> cacheMsgList = new ArrayList<ByteBuffer>();
+        List<ByteBuffer> cacheMsgList = new ArrayList<>();
         if (lastOffset < this.writeIndexStartPos) {
             return new GetCacheMsgResult(false, TErrCodeConstants.MOVED,
                     lastOffset, "Request offset lower than cache minOffset");
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/ssd/MsgSSDSegment.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/ssd/MsgSSDSegment.java
index a485374..c3adb5c 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/ssd/MsgSSDSegment.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/ssd/MsgSSDSegment.java
@@ -46,7 +46,7 @@ public class MsgSSDSegment implements Closeable {
     private static final Logger logger = LoggerFactory.getLogger(MsgSSDSegment.class);
     private final Segment dataSegment;
     private final ConcurrentHashMap<String, SSDVisitInfo> visitMap =
-        new ConcurrentHashMap<String, SSDVisitInfo>();
+        new ConcurrentHashMap<>();
     private final String topic;
     private final String storeKey;
     private AtomicLong lastReadTime = new AtomicLong(System.currentTimeMillis());
@@ -100,11 +100,11 @@ public class MsgSSDSegment implements Closeable {
         long lastRdDataOffset = -1L;
         final long curDataMaxOffset = getDataMaxOffset();
         final long curDataMinOffset = getDataMinOffset();
-        HashMap<String, CountItem> countMap = new HashMap<String, CountItem>();
+        HashMap<String, CountItem> countMap = new HashMap<>();
         ByteBuffer dataBuffer =
             ByteBuffer.allocate(TServerConstants.CFG_STORE_DEFAULT_MSG_READ_UNIT);
         List<ClientBroker.TransferedMessage> transferedMessageList =
-            new ArrayList<ClientBroker.TransferedMessage>();
+            new ArrayList<>();
         if (this.dataSegment == null) {
             logger.error(strBuffer.append("[SSD Store] Found SSD fileRecordView is null! storeKey=")
                 .append(this.storeKey).toString());
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/ssd/MsgSSDStoreManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/ssd/MsgSSDStoreManager.java
index 974f28e..0be8806 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/ssd/MsgSSDStoreManager.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/ssd/MsgSSDStoreManager.java
@@ -69,7 +69,7 @@ public class MsgSSDStoreManager implements Closeable {
     // ssd data directory
     private final File ssdBaseDataDir;
     private final BlockingQueue<SSDSegEvent> reqSSDEvents
-            = new ArrayBlockingQueue<SSDSegEvent>(60);
+            = new ArrayBlockingQueue<>(60);
     private final ExecutorService statusCheckExecutor = Executors.newSingleThreadExecutor();
     private final ExecutorService reqExecutor = Executors.newSingleThreadExecutor();
     // total ssd files size
@@ -82,9 +82,9 @@ public class MsgSSDStoreManager implements Closeable {
     private AtomicInteger firstChecked = new AtomicInteger(3);
     // ssd segments
     private ConcurrentHashMap<String, ConcurrentHashMap<Long, MsgSSDSegment>> ssdSegmentsMap =
-            new ConcurrentHashMap<String, ConcurrentHashMap<Long, MsgSSDSegment>>(60);
+            new ConcurrentHashMap<>(60);
     private ConcurrentHashMap<String, ConcurrentHashSet<SSDSegIndex>> ssdPartStrMap =
-            new ConcurrentHashMap<String, ConcurrentHashSet<SSDSegIndex>>(60);
+            new ConcurrentHashMap<>(60);
     private long startTime = System.currentTimeMillis();
     private long lastCheckTime = System.currentTimeMillis();
 
@@ -269,7 +269,7 @@ public class MsgSSDStoreManager implements Closeable {
                 && totalSsdFileSize.get() < (long) (tubeConfig.getMaxSSDTotalFileSizes() * 0.7))) {
             return true;
         }
-        Set<SSDSegIndex> msgSsdIndexSet = new HashSet<SSDSegIndex>();
+        Set<SSDSegIndex> msgSsdIndexSet = new HashSet<>();
         for (Map.Entry<String, ConcurrentHashMap<Long, MsgSSDSegment>> entry : ssdSegmentsMap.entrySet()) {
             if (entry.getKey() == null || entry.getValue() == null) {
                 continue;
@@ -323,7 +323,7 @@ public class MsgSSDStoreManager implements Closeable {
             ConcurrentHashMap<String, SSDVisitInfo> ssdVisitInfoMap =
                     msgSsdSegment.getVisitMap();
             if (ssdVisitInfoMap != null) {
-                List<String> rmvPartStrList = new ArrayList<String>();
+                List<String> rmvPartStrList = new ArrayList<>();
                 for (String partStr : ssdVisitInfoMap.keySet()) {
                     ConsumerNodeInfo consumerNodeInfo =
                             consumerNodeInfoMap.get(partStr);
@@ -494,7 +494,7 @@ public class MsgSSDStoreManager implements Closeable {
                 ssdSegmentsMap.get(storeKey);
         if (msgSsdSegMap == null) {
             ConcurrentHashMap<Long, MsgSSDSegment> tmpMsgSsdSegMap =
-                    new ConcurrentHashMap<Long, MsgSSDSegment>();
+                    new ConcurrentHashMap<>();
             msgSsdSegMap = ssdSegmentsMap.putIfAbsent(storeKey, tmpMsgSsdSegMap);
             if (msgSsdSegMap == null) {
                 msgSsdSegMap = tmpMsgSsdSegMap;
@@ -575,7 +575,7 @@ public class MsgSSDStoreManager implements Closeable {
         }
         ConcurrentHashSet<SSDSegIndex> ssdSegIndices = this.ssdPartStrMap.get(partStr);
         if (ssdSegIndices == null) {
-            ConcurrentHashSet<SSDSegIndex> tmpSsdSegIndices = new ConcurrentHashSet<SSDSegIndex>();
+            ConcurrentHashSet<SSDSegIndex> tmpSsdSegIndices = new ConcurrentHashSet<>();
             ssdSegIndices = this.ssdPartStrMap.putIfAbsent(partStr, tmpSsdSegIndices);
             if (ssdSegIndices == null) {
                 ssdSegIndices = tmpSsdSegIndices;
@@ -601,7 +601,7 @@ public class MsgSSDStoreManager implements Closeable {
                 ssdSegmentsMap.get(ssdSegEvent.storeKey);
         if (msgSegmentMap == null) {
             ConcurrentHashMap<Long, MsgSSDSegment> tmpMsgSsdSegMap =
-                    new ConcurrentHashMap<Long, MsgSSDSegment>();
+                    new ConcurrentHashMap<>();
             msgSegmentMap =
                     ssdSegmentsMap.putIfAbsent(ssdSegEvent.storeKey, tmpMsgSsdSegMap);
             if (msgSegmentMap == null) {
@@ -616,7 +616,7 @@ public class MsgSSDStoreManager implements Closeable {
                     ssdPartStrMap.get(ssdSegEvent.partStr);
             if (ssdSegIndices == null) {
                 ConcurrentHashSet<SSDSegIndex> tmpSsdSegIndices =
-                        new ConcurrentHashSet<SSDSegIndex>();
+                        new ConcurrentHashSet<>();
                 ssdSegIndices =
                         ssdPartStrMap.putIfAbsent(ssdSegEvent.partStr, tmpSsdSegIndices);
                 if (ssdSegIndices == null) {
@@ -673,7 +673,7 @@ public class MsgSSDStoreManager implements Closeable {
                         this.ssdSegmentsMap.get(storeKey);
                 if (msgSsdSegMap == null) {
                     ConcurrentHashMap<Long, MsgSSDSegment> tmpMsgSsdSegMap =
-                            new ConcurrentHashMap<Long, MsgSSDSegment>();
+                            new ConcurrentHashMap<>();
                     msgSsdSegMap =
                             ssdSegmentsMap.putIfAbsent(storeKey, tmpMsgSsdSegMap);
                     if (msgSsdSegMap == null) {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/nodeinfo/ConsumerNodeInfo.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/nodeinfo/ConsumerNodeInfo.java
index 786390c..a66f3af 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/nodeinfo/ConsumerNodeInfo.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/nodeinfo/ConsumerNodeInfo.java
@@ -44,9 +44,9 @@ public class ConsumerNodeInfo {
     // is filter consumer or not
     private boolean isFilterConsume = false;
     // filter conditions in string format
-    private Set<String> filterCondStrs = new HashSet<String>(10);
+    private Set<String> filterCondStrs = new HashSet<>(10);
     // filter conditions in int format
-    private Set<Integer> filterCondCode = new HashSet<Integer>(10);
+    private Set<Integer> filterCondCode = new HashSet<>(10);
     // consumer's address
     private String rmtAddrInfo;
     private boolean isSupportLimit = false;
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java
index b4228ed..82a758c 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java
@@ -41,10 +41,10 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
     private final OffsetStorage zkOffsetStorage;
     private final ConcurrentHashMap<String/* group */,
             ConcurrentHashMap<String/* topic - partitionId*/, OffsetStorageInfo>> cfmOffsetMap =
-            new ConcurrentHashMap<String, ConcurrentHashMap<String/* topic */, OffsetStorageInfo>>();
+            new ConcurrentHashMap<>();
     private final ConcurrentHashMap<String/* group */,
             ConcurrentHashMap<String/* topic - partitionId*/, Long>> tmpOffsetMap =
-            new ConcurrentHashMap<String, ConcurrentHashMap<String, Long>>();
+            new ConcurrentHashMap<>();
 
     public DefaultOffsetManager(final BrokerConfig brokerConfig) {
         super("[Offset Manager]", brokerConfig.getZkConfig().getZkCommitPeriodMs());
@@ -334,7 +334,7 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
         long tmpOffset = origOffset - origOffset % DataStoreUtils.STORE_INDEX_HEAD_LEN;
         ConcurrentHashMap<String, Long> partTmpOffsetMap = tmpOffsetMap.get(group);
         if (partTmpOffsetMap == null) {
-            ConcurrentHashMap<String, Long> tmpMap = new ConcurrentHashMap<String, Long>();
+            ConcurrentHashMap<String, Long> tmpMap = new ConcurrentHashMap<>();
             partTmpOffsetMap = tmpOffsetMap.putIfAbsent(group, tmpMap);
             if (partTmpOffsetMap == null) {
                 partTmpOffsetMap = tmpMap;
@@ -351,7 +351,7 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
     private long getAndResetTmpOffset(final String group, final String offsetCacheKey) {
         ConcurrentHashMap<String, Long> partTmpOffsetMap = tmpOffsetMap.get(group);
         if (partTmpOffsetMap == null) {
-            ConcurrentHashMap<String, Long> tmpMap = new ConcurrentHashMap<String, Long>();
+            ConcurrentHashMap<String, Long> tmpMap = new ConcurrentHashMap<>();
             partTmpOffsetMap = tmpOffsetMap.putIfAbsent(group, tmpMap);
             if (partTmpOffsetMap == null) {
                 partTmpOffsetMap = tmpMap;
@@ -416,7 +416,7 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
         ConcurrentHashMap<String, OffsetStorageInfo> regInfoMap = cfmOffsetMap.get(group);
         if (regInfoMap == null) {
             ConcurrentHashMap<String, OffsetStorageInfo> tmpRegInfoMap
-                    = new ConcurrentHashMap<String, OffsetStorageInfo>();
+                    = new ConcurrentHashMap<>();
             regInfoMap = cfmOffsetMap.putIfAbsent(group, tmpRegInfoMap);
             if (regInfoMap == null) {
                 regInfoMap = tmpRegInfoMap;
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/stats/GroupCountService.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/stats/GroupCountService.java
index 1f863d4..b0de32e 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/stats/GroupCountService.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/stats/GroupCountService.java
@@ -142,6 +142,6 @@ public class GroupCountService extends AbstractDaemonService implements CountSer
     private class CountSet {
         public AtomicLong refCnt = new AtomicLong(0);
         public ConcurrentHashMap<String, CountItem> counterItem =
-                new ConcurrentHashMap<String, CountItem>();
+                new ConcurrentHashMap<>();
     }
 }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java
index cb2be0f..eb3d586 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java
@@ -282,7 +282,7 @@ public class BrokerAdminServlet extends HttpServlet {
      */
     private StringBuilder adminGetMemStoreStatisInfo(HttpServletRequest req) throws Exception {
         StringBuilder sBuilder = new StringBuilder(1024);
-        Set<String> batchTopicNames = new HashSet<String>();
+        Set<String> batchTopicNames = new HashSet<>();
         String inputTopicName = req.getParameter("topicName");
         if (TStringUtils.isNotBlank(inputTopicName)) {
             inputTopicName = String.valueOf(inputTopicName).trim();
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/aaaserver/SimpleCertificateBrokerHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/aaaserver/SimpleCertificateBrokerHandler.java
index 73f8708..d190500 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/aaaserver/SimpleCertificateBrokerHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/aaaserver/SimpleCertificateBrokerHandler.java
@@ -39,7 +39,7 @@ public class SimpleCertificateBrokerHandler implements CertificateBrokerHandler
     private long inValidTokenCheckTimeMs = 120000; // 2 minutes
     private final TubeBroker tubeBroker;
     private final AtomicReference<List<Long>> visitTokenList =
-            new AtomicReference<List<Long>>();
+            new AtomicReference<>();
     private String lastUpdatedVisitTokens = "";
     private boolean enableVisitTokenCheck = false;
     private boolean enableProduceAuthenticate = false;
@@ -93,7 +93,7 @@ public class SimpleCertificateBrokerHandler implements CertificateBrokerHandler
                         if (currList.contains(curVisitToken)) {
                             break;
                         }
-                        List<Long> updateList = new ArrayList<Long>(currList);
+                        List<Long> updateList = new ArrayList<>(currList);
                         while (updateList.size() >= MAX_VISIT_TOKEN_SIZE) {
                             updateList.remove(0);
                         }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/heartbeat/HeartbeatManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/heartbeat/HeartbeatManager.java
index 44f734b..c000b31 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/heartbeat/HeartbeatManager.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/heartbeat/HeartbeatManager.java
@@ -34,11 +34,11 @@ public class HeartbeatManager {
     private static final Logger logger = LoggerFactory.getLogger(HeartbeatManager.class);
 
     private final ConcurrentHashMap<String, TimeoutInfo> brokerRegMap =
-            new ConcurrentHashMap<String, TimeoutInfo>();
+            new ConcurrentHashMap<>();
     private final ConcurrentHashMap<String, TimeoutInfo> producerRegMap =
-            new ConcurrentHashMap<String, TimeoutInfo>();
+            new ConcurrentHashMap<>();
     private final ConcurrentHashMap<String, TimeoutInfo> consumerRegMap =
-            new ConcurrentHashMap<String, TimeoutInfo>();
+            new ConcurrentHashMap<>();
     private final ExecutorService timeoutScanService = Executors.newCachedThreadPool();
     private long brokerTimeoutDlt = 0;
     private long producerTimeoutDlt = 0;
@@ -128,7 +128,7 @@ public class HeartbeatManager {
                 while (!isStopped) {
                     try {
                         long currentTime = System.currentTimeMillis();
-                        Set<String> removedNodeKey = new HashSet<String>();
+                        Set<String> removedNodeKey = new HashSet<>();
                         for (Map.Entry<String, TimeoutInfo> entry : nodeMap.entrySet()) {
                             if (TStringUtils.isBlank(entry.getKey()) || entry.getValue() == null) {
                                 continue;
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/zookeeper/RecoverableZooKeeper.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/zookeeper/RecoverableZooKeeper.java
index 13e7008..c9e27ea 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/zookeeper/RecoverableZooKeeper.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/zookeeper/RecoverableZooKeeper.java
@@ -84,7 +84,7 @@ public class RecoverableZooKeeper {
      * @return list of every element that starts with one of the prefixes
      */
     private static synchronized List<String> filterByPrefix(List<String> nodes, String... prefixes) {
-        List<String> lockChildren = new ArrayList<String>();
+        List<String> lockChildren = new ArrayList<>();
         for (String child : nodes) {
             for (String prefix : prefixes) {
                 if (child.startsWith(prefix)) {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/zookeeper/ZooKeeperWatcher.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/zookeeper/ZooKeeperWatcher.java
index 4d62e4c..996e2f6 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/zookeeper/ZooKeeperWatcher.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/zookeeper/ZooKeeperWatcher.java
@@ -63,7 +63,7 @@ public class ZooKeeperWatcher implements Watcher, Abortable {
     private static final Logger logger = LoggerFactory.getLogger(ZooKeeperWatcher.class);
     protected final ZKConfig conf;
     // listeners to be notified
-    private final List<ZooKeeperListener> listeners = new CopyOnWriteArrayList<ZooKeeperListener>();
+    private final List<ZooKeeperListener> listeners = new CopyOnWriteArrayList<>();
     private final Exception constructorCaller;
     // abortable in case of zk failure
     private Abortable abortable;
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/paramcheck/PBParameterUtils.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/paramcheck/PBParameterUtils.java
index 9a4e4f8..cb93dff 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/paramcheck/PBParameterUtils.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/paramcheck/PBParameterUtils.java
@@ -126,7 +126,7 @@ public class PBParameterUtils {
                                                               final Set<String> reqTopicSet,
                                                               final String requiredParts,
                                                               final StringBuilder strBuffer) {
-        Map<String, Long> requiredPartMap = new HashMap<String, Long>();
+        Map<String, Long> requiredPartMap = new HashMap<>();
         ParamCheckResult retResult = new ParamCheckResult();
         if (!isReqConsumeBand) {
             retResult.setCheckData(requiredPartMap);
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/RowLock.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/RowLock.java
index 66c0817..874fd2b 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/RowLock.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/RowLock.java
@@ -34,9 +34,9 @@ public class RowLock {
     private static final Logger logger = LoggerFactory.getLogger(RowLock.class);
     private static Random rand = new Random();
     private final ConcurrentHashMap<HashedBytes, CountDownLatch> lockedRows =
-            new ConcurrentHashMap<HashedBytes, CountDownLatch>();
+            new ConcurrentHashMap<>();
     private final ConcurrentHashMap<Integer, HashedBytes> lockIds =
-            new ConcurrentHashMap<Integer, HashedBytes>();
+            new ConcurrentHashMap<>();
     private final AtomicInteger lockIdGenerator = new AtomicInteger(1);
     private final int rowLockWaitDuration;
     private final String name;
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
index 9b9b60f..1791f43 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
@@ -512,7 +512,7 @@ public class WebParameterUtils {
                                                  boolean checkResToken,
                                                  Set<String> resTokens,
                                                  final StringBuilder sb) throws Exception {
-        Set<String> batchOpGroupNames = new HashSet<String>();
+        Set<String> batchOpGroupNames = new HashSet<>();
         if (TStringUtils.isNotBlank(inputGroupName)) {
             inputGroupName = escDoubleQuotes(inputGroupName.trim());
         }
@@ -566,7 +566,7 @@ public class WebParameterUtils {
                                                  boolean checkRange,
                                                  Set<String> checkedTopicList,
                                                  final StringBuilder sb) throws Exception {
-        Set<String> batchOpTopicNames = new HashSet<String>();
+        Set<String> batchOpTopicNames = new HashSet<>();
         if (TStringUtils.isNotBlank(inputTopicName)) {
             inputTopicName = escDoubleQuotes(inputTopicName.trim());
         }
@@ -615,7 +615,7 @@ public class WebParameterUtils {
 
     public static Set<String> getBatchBrokerIpSet(String inStrBrokerIps,
                                                   boolean checkEmpty) throws Exception {
-        Set<String> batchBrokerIps = new HashSet<String>();
+        Set<String> batchBrokerIps = new HashSet<>();
         if (TStringUtils.isNotBlank(inStrBrokerIps)) {
             inStrBrokerIps = escDoubleQuotes(inStrBrokerIps.trim());
         }
@@ -648,7 +648,7 @@ public class WebParameterUtils {
 
     public static Set<Integer> getBatchBrokerIdSet(String inStrBrokerIds,
                                                    boolean checkEmpty) throws Exception {
-        Set<Integer> batchBrokerIdSet = new HashSet<Integer>();
+        Set<Integer> batchBrokerIdSet = new HashSet<>();
         if (TStringUtils.isNotBlank(inStrBrokerIds)) {
             inStrBrokerIds = escDoubleQuotes(inStrBrokerIds.trim());
         }
@@ -684,7 +684,7 @@ public class WebParameterUtils {
                                                                BrokerConfManager webMaster,
                                                                boolean checkEmpty,
                                                                final StringBuilder sb) throws Exception {
-        Set<BdbBrokerConfEntity> batchBrokerIdSet = new HashSet<BdbBrokerConfEntity>();
+        Set<BdbBrokerConfEntity> batchBrokerIdSet = new HashSet<>();
         if (TStringUtils.isNotBlank(inStrBrokerIds)) {
             inStrBrokerIds = escDoubleQuotes(inStrBrokerIds.trim());
         }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java
index 8a35d25..41056b9 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java
@@ -120,7 +120,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
     private static final int MAX_BALANCE_DELAY_TIME = 10;
 
     private final ConcurrentHashMap<String/* consumerId */, Map<String/* topic */, Map<String, Partition>>>
-            currentSubInfo = new ConcurrentHashMap<String, Map<String, Map<String, Partition>>>();
+            currentSubInfo = new ConcurrentHashMap<>();
     private final RpcServiceFactory rpcServiceFactory =     //rpc service factory
             new RpcServiceFactory();
     private final ConsumerEventManager consumerEventManager;    //consumer event manager
@@ -337,7 +337,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
         }
         heartbeatManager.regProducerNode(producerId);
         producerHolder.setProducerInfo(producerId,
-                new HashSet<String>(transTopicSet), hostName, overtls);
+                new HashSet<>(transTopicSet), hostName, overtls);
         builder.setBrokerCheckSum(this.defaultBrokerConfManager.getBrokerInfoCheckSum());
         builder.addAllBrokerInfos(this.defaultBrokerConfManager.getBrokersMap(overtls).values());
         builder.setAuthorizedInfo(genAuthorizedInfo(certResult.authorizedToken, false).build());
@@ -618,7 +618,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
                 ConcurrentHashSet<String> groupSet =
                         topicPSInfoManager.getTopicSubInfo(topic);
                 if (groupSet == null) {
-                    groupSet = new ConcurrentHashSet<String>();
+                    groupSet = new ConcurrentHashSet<>();
                     topicPSInfoManager.setTopicSubInfo(topic, groupSet);
                 }
                 if (!groupSet.contains(groupName)) {
@@ -627,7 +627,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
             }
             if (CollectionUtils.isNotEmpty(subscribeList)) {
                 Map<String, Map<String, Partition>> topicPartSubMap =
-                        new HashMap<String, Map<String, Partition>>();
+                        new HashMap<>();
                 currentSubInfo.put(consumerId, topicPartSubMap);
                 for (SubscribeInfo info : subscribeList) {
                     Map<String, Partition> partMap = topicPartSubMap.get(info.getTopic());
@@ -765,7 +765,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
         Map<String, Map<String, Partition>> topicPartSubList =
                 currentSubInfo.get(clientId);
         if (topicPartSubList == null) {
-            topicPartSubList = new HashMap<String, Map<String, Partition>>();
+            topicPartSubList = new HashMap<>();
             Map<String, Map<String, Partition>> tmpTopicPartSubList =
                     currentSubInfo.putIfAbsent(clientId, topicPartSubList);
             if (tmpTopicPartSubList != null) {
@@ -1375,7 +1375,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
      * @return
      */
     private Map<String, String> getProducerTopicPartitionInfo(String producerId) {
-        Map<String, String> topicPartStrMap = new HashMap<String, String>();
+        Map<String, String> topicPartStrMap = new HashMap<>();
         ProducerInfo producerInfo = producerHolder.getProducerInfo(producerId);
         if (producerInfo == null) {
             return topicPartStrMap;
@@ -1387,7 +1387,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
             return topicPartStrMap;
         }
         Map<String, StringBuilder> topicPartStrBuilderMap =
-                new HashMap<String, StringBuilder>();
+                new HashMap<>();
         for (String topic : producerInfoTopicSet) {
             if (topic == null) {
                 continue;
@@ -1441,7 +1441,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
                               Map<String/* topicName */, TopicInfo> newTopicInfoMap,
                               boolean requirePartUpdate, boolean requireAcceptPublish,
                               boolean requireAcceptSubscribe) {
-        List<TopicInfo> needAddTopicList = new ArrayList<TopicInfo>();
+        List<TopicInfo> needAddTopicList = new ArrayList<>();
         for (Map.Entry<String, TopicInfo> entry : newTopicInfoMap.entrySet()) {
             TopicInfo newTopicInfo = entry.getValue();
             TopicInfo oldTopicInfo = null;
@@ -1482,7 +1482,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
     private void deleteTopics(BrokerInfo brokerInfo, final StringBuilder strBuffer,
                               Map<String/* topicName */, TopicInfo> curTopicInfoMap,
                               Map<String/* topicName */, TopicInfo> newTopicInfoMap) {
-        List<TopicInfo> needRmvTopicList = new ArrayList<TopicInfo>();
+        List<TopicInfo> needRmvTopicList = new ArrayList<>();
         if (curTopicInfoMap != null) {
             for (Map.Entry<String, TopicInfo> entry : curTopicInfoMap.entrySet()) {
                 if (newTopicInfoMap.get(entry.getKey()) == null) {
@@ -1757,7 +1757,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
             }
         }
         ConcurrentHashMap<String/* topic */, TopicInfo> newTopicInfoMap =
-                new ConcurrentHashMap<String, TopicInfo>();
+                new ConcurrentHashMap<>();
         // according to broker status and default config, topic config, make up current status record
         for (String strTopicConfInfo : brokerTopicSetConfInfo) {
             if (TStringUtils.isBlank(strTopicConfInfo)) {
@@ -1823,7 +1823,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
     private void updateTopicsInternal(BrokerInfo broker,
                                       List<TopicInfo> topicList,
                                       EventType type) {
-        List<TopicInfo> cloneTopicList = new ArrayList<TopicInfo>();
+        List<TopicInfo> cloneTopicList = new ArrayList<>();
         for (TopicInfo topicInfo : topicList) {
             cloneTopicList.add(topicInfo.clone());
         }
@@ -1834,7 +1834,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
                 ConcurrentHashMap<BrokerInfo, TopicInfo> topicInfoMap =
                         topicPSInfoManager.getBrokerPubInfo(topicInfo.getTopic());
                 if (topicInfoMap == null) {
-                    topicInfoMap = new ConcurrentHashMap<BrokerInfo, TopicInfo>();
+                    topicInfoMap = new ConcurrentHashMap<>();
                     topicPSInfoManager.setBrokerPubInfo(topicInfo.getTopic(), topicInfoMap);
                 }
                 if (EventType.CONNECT == type) {
@@ -1919,8 +1919,8 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
             }
             List<String> blackTopicList = this.defaultBrokerConfManager.getBdbBlackTopicList(tupleInfo.groupName);
             Map<String, List<Partition>> topicSubPartMap = entry.getValue();
-            List<SubscribeInfo> deletedSubInfoList = new ArrayList<SubscribeInfo>();
-            List<SubscribeInfo> addedSubInfoList = new ArrayList<SubscribeInfo>();
+            List<SubscribeInfo> deletedSubInfoList = new ArrayList<>();
+            List<SubscribeInfo> addedSubInfoList = new ArrayList<>();
             for (Map.Entry<String, List<Partition>> topicEntry : topicSubPartMap.entrySet()) {
                 String topic = topicEntry.getKey();
                 List<Partition> finalPartList = topicEntry.getValue();
@@ -2071,8 +2071,8 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
             List<String> blackTopicList =
                     this.defaultBrokerConfManager.getBdbBlackTopicList(tupleInfo.groupName);
             Map<String, Map<String, Partition>> topicSubPartMap = entry.getValue();
-            List<SubscribeInfo> deletedSubInfoList = new ArrayList<SubscribeInfo>();
-            List<SubscribeInfo> addedSubInfoList = new ArrayList<SubscribeInfo>();
+            List<SubscribeInfo> deletedSubInfoList = new ArrayList<>();
+            List<SubscribeInfo> addedSubInfoList = new ArrayList<>();
             for (Map.Entry<String, Map<String, Partition>> topicEntry : topicSubPartMap.entrySet()) {
                 String topic = topicEntry.getKey();
                 Map<String, Partition> finalPartMap = topicEntry.getValue();
@@ -2208,8 +2208,8 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
      * @return
      */
     private List<String> getNeedToBalanceGroupList(final StringBuilder strBuffer) {
-        List<String> groupsNeedToBalance = new ArrayList<String>();
-        Set<String> groupHasUnfinishedEvent = new HashSet<String>();
+        List<String> groupsNeedToBalance = new ArrayList<>();
+        Set<String> groupHasUnfinishedEvent = new HashSet<>();
         if (consumerEventManager.hasEvent()) {
             Set<String> consumerIdSet =
                     consumerEventManager.getUnProcessedIdSet();
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/balance/DefaultLoadBalancer.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/balance/DefaultLoadBalancer.java
index 07c487e..860dd05 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/balance/DefaultLoadBalancer.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/balance/DefaultLoadBalancer.java
@@ -83,10 +83,10 @@ public class DefaultLoadBalancer implements LoadBalancer {
         // load balance according to group
         Map<String/* consumer */,
                 Map<String/* topic */, List<Partition>>> finalSubInfoMap =
-                new HashMap<String, Map<String, List<Partition>>>();
-        Map<String, RebProcessInfo> rejGroupClientINfoMap = new HashMap<String, RebProcessInfo>();
-        Set<String> onlineOfflineGroupSet = new HashSet<String>();
-        Set<String> bandGroupSet = new HashSet<String>();
+                new HashMap<>();
+        Map<String, RebProcessInfo> rejGroupClientINfoMap = new HashMap<>();
+        Set<String> onlineOfflineGroupSet = new HashSet<>();
+        Set<String> bandGroupSet = new HashSet<>();
         for (String group : groupSet) {
             if (group == null) {
                 continue;
@@ -109,7 +109,7 @@ public class DefaultLoadBalancer implements LoadBalancer {
                 bandGroupSet.add(group);
                 continue;
             }
-            List<ConsumerInfo> newConsumerList = new ArrayList<ConsumerInfo>();
+            List<ConsumerInfo> newConsumerList = new ArrayList<>();
             for (ConsumerInfo consumerInfo : consumerList) {
                 if (consumerInfo != null) {
                     newConsumerList.add(consumerInfo);
@@ -159,11 +159,11 @@ public class DefaultLoadBalancer implements LoadBalancer {
                     rejGroupClientINfoMap.put(group, rebProcessInfo);
                 }
             }
-            List<ConsumerInfo> newConsumerList2 = new ArrayList<ConsumerInfo>();
+            List<ConsumerInfo> newConsumerList2 = new ArrayList<>();
             Map<String, Partition> psMap = topicPSInfoManager.getPartitionMap(topicSet);
             Map<String, NodeRebInfo> rebProcessInfoMap = consumerBandInfo.getRebalanceMap();
             for (ConsumerInfo consumer : newConsumerList) {
-                Map<String, List<Partition>> partitions = new HashMap<String, List<Partition>>();
+                Map<String, List<Partition>> partitions = new HashMap<>();
                 finalSubInfoMap.put(consumer.getConsumerId(), partitions);
                 Map<String, Map<String, Partition>> relation = clusterState.get(consumer.getConsumerId());
                 if (relation != null) {
@@ -183,7 +183,7 @@ public class DefaultLoadBalancer implements LoadBalancer {
                     }
                     newConsumerList2.add(consumer);
                     for (Entry<String, Map<String, Partition>> entry : relation.entrySet()) {
-                        List<Partition> ps = new ArrayList<Partition>();
+                        List<Partition> ps = new ArrayList<>();
                         partitions.put(entry.getKey(), ps);
                         Map<String, Partition> partitionMap = entry.getValue();
                         if (partitionMap != null && !partitionMap.isEmpty()) {
@@ -210,7 +210,7 @@ public class DefaultLoadBalancer implements LoadBalancer {
         if (onlineOfflineGroupSet.size() == 0) {
             groupsNeedToBalance = groupSet;
         } else {
-            groupsNeedToBalance = new ArrayList<String>();
+            groupsNeedToBalance = new ArrayList<>();
             for (String group : groupSet) {
                 if (group == null) {
                     continue;
@@ -255,7 +255,7 @@ public class DefaultLoadBalancer implements LoadBalancer {
                 continue;
             }
             // filter consumer which don't need to handle
-            List<ConsumerInfo> consumerList = new ArrayList<ConsumerInfo>();
+            List<ConsumerInfo> consumerList = new ArrayList<>();
             List<ConsumerInfo> consumerList1 = consumerBandInfo.getConsumerInfoList();
             RebProcessInfo rebProcessInfo = rejGroupClientInfoMap.get(group);
             if (rebProcessInfo != null) {
@@ -265,7 +265,7 @@ public class DefaultLoadBalancer implements LoadBalancer {
                         Map<String, List<Partition>> partitions2 =
                                 clusterState.get(consumerInfo.getConsumerId());
                         if (partitions2 == null) {
-                            partitions2 = new HashMap<String, List<Partition>>();
+                            partitions2 = new HashMap<>();
                             clusterState.put(consumerInfo.getConsumerId(), partitions2);
                         }
                         Map<String, Map<String, Partition>> relation =
@@ -297,7 +297,7 @@ public class DefaultLoadBalancer implements LoadBalancer {
                 Map<String, List<Partition>> partitions =
                         clusterState.get(consumer.getConsumerId());
                 if (partitions == null) {
-                    partitions = new HashMap<String, List<Partition>>();
+                    partitions = new HashMap<>();
                 }
                 int load = 0;
                 for (List<Partition> entry : partitions.values()) {
@@ -365,7 +365,7 @@ public class DefaultLoadBalancer implements LoadBalancer {
                         String consumerId) {
         Map<String, List<Partition>> partitions = clusterState.get(consumerId);
         if (partitions == null) {
-            partitions = new HashMap<String, List<Partition>>();
+            partitions = new HashMap<>();
             clusterState.put(consumerId, partitions);
         }
         List<Partition> ps = partitions.get(partition.getTopic());
@@ -422,12 +422,12 @@ public class DefaultLoadBalancer implements LoadBalancer {
             }
             Map<String, List<Partition>> partitions = clusterState.get(consumer.getConsumerId());
             if (partitions == null) {
-                partitions = new HashMap<String, List<Partition>>();
+                partitions = new HashMap<>();
                 clusterState.put(consumer.getConsumerId(), partitions);
             }
             List<Partition> ps = partitions.get(partition.getTopic());
             if (ps == null) {
-                ps = new ArrayList<Partition>();
+                ps = new ArrayList<>();
                 partitions.put(partition.getTopic(), ps);
             }
             ps.add(partition);
@@ -447,7 +447,7 @@ public class DefaultLoadBalancer implements LoadBalancer {
         if (partitions.isEmpty() || consumers.isEmpty()) {
             return null;
         }
-        Map<String, List<Partition>> assignments = new TreeMap<String, List<Partition>>();
+        Map<String, List<Partition>> assignments = new TreeMap<>();
         int numPartitions = partitions.size();
         int numServers = consumers.size();
         int max = (int) Math.ceil((float) numPartitions / numServers);
@@ -458,7 +458,7 @@ public class DefaultLoadBalancer implements LoadBalancer {
         int partitionIdx = 0;
         for (int j = 0; j < numServers; j++) {
             String server = consumers.get((j + serverIdx) % numServers);
-            List<Partition> serverPartitions = new ArrayList<Partition>(max);
+            List<Partition> serverPartitions = new ArrayList<>(max);
             for (int i = partitionIdx; i < numPartitions; i += numServers) {
                 serverPartitions.add(partitions.get(i % numPartitions));
             }
@@ -564,12 +564,12 @@ public class DefaultLoadBalancer implements LoadBalancer {
                     Map<String, List<Partition>> topicSubPartMap =
                             finalSubInfoMap.get(consumerId);
                     if (topicSubPartMap == null) {
-                        topicSubPartMap = new HashMap<String, List<Partition>>();
+                        topicSubPartMap = new HashMap<>();
                         finalSubInfoMap.put(consumerId, topicSubPartMap);
                     }
                     List<Partition> partList = topicSubPartMap.get(topic);
                     if (partList == null) {
-                        partList = new ArrayList<Partition>();
+                        partList = new ArrayList<>();
                         topicSubPartMap.put(topic, partList);
                     }
                     int startIndex = partsPerConsumer * i + Math.min(i, consumersWithExtraPart);
@@ -637,7 +637,7 @@ public class DefaultLoadBalancer implements LoadBalancer {
             final StringBuilder strBuffer) {
         // band consume reset offset
         Map<String, Map<String, Map<String, Partition>>> finalSubInfoMap =
-                new HashMap<String, Map<String, Map<String, Partition>>>();
+                new HashMap<>();
         // according group
         for (String group : groupSet) {
             if (group == null) {
@@ -686,10 +686,10 @@ public class DefaultLoadBalancer implements LoadBalancer {
             }
             // actual reset offset
             Map<String, Long> partsOffsetMap = consumerBandInfo.getPartOffsetMap();
-            List<OffsetStorageInfo> offsetInfoList = new ArrayList<OffsetStorageInfo>();
+            List<OffsetStorageInfo> offsetInfoList = new ArrayList<>();
             Set<Partition> partPubList =
                     topicPSInfoManager.getPartitions(consumerBandInfo.getTopicSet());
-            Map<String, Partition> partitionMap = new HashMap<String, Partition>();
+            Map<String, Partition> partitionMap = new HashMap<>();
             for (Partition partition : partPubList) {
                 partitionMap.put(partition.getPartitionKey(), partition);
             }
@@ -706,7 +706,7 @@ public class DefaultLoadBalancer implements LoadBalancer {
                     Map<String, Map<String, Partition>> topicSubPartMap =
                             finalSubInfoMap.get(consumerId);
                     if (topicSubPartMap == null) {
-                        topicSubPartMap = new HashMap<String, Map<String, Partition>>();
+                        topicSubPartMap = new HashMap<>();
                         finalSubInfoMap.put(consumerId, topicSubPartMap);
                     }
                     Map<String, Partition> partMap = topicSubPartMap.get(foundPart.getTopic());
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/DefaultBdbStoreService.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/DefaultBdbStoreService.java
index 6cdb2d7..1a57d8d 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/DefaultBdbStoreService.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/DefaultBdbStoreService.java
@@ -93,7 +93,7 @@ public class DefaultBdbStoreService implements BdbStoreService, Server {
     // simple log print
     private final BdbStoreSamplePrint bdbStoreSamplePrint =
             new BdbStoreSamplePrint(logger);
-    private Set<String> replicas4Transfer = new HashSet<String>();
+    private Set<String> replicas4Transfer = new HashSet<>();
     private String masterNodeName;
     private int connectNodeFailCount = 0;
     private long masterStartTime = Long.MAX_VALUE;
@@ -107,12 +107,12 @@ public class DefaultBdbStoreService implements BdbStoreService, Server {
     private EntityStore brokerConfStore;
     private PrimaryIndex<Integer/* brokerId */, BdbBrokerConfEntity> brokerConfIndex;
     private ConcurrentHashMap<Integer/* brokerId */, BdbBrokerConfEntity> brokerConfigMap =
-            new ConcurrentHashMap<Integer, BdbBrokerConfEntity>();
+            new ConcurrentHashMap<>();
     // topic config store
     private EntityStore topicConfStore;
     private PrimaryIndex<String/* recordKey */, BdbTopicConfEntity> topicConfIndex;
     private ConcurrentHashMap<Integer/* brokerId */, ConcurrentHashMap<String/* topicName */, BdbTopicConfEntity>>
-            brokerIdTopicEntityMap = new ConcurrentHashMap<Integer, ConcurrentHashMap<String, BdbTopicConfEntity>>();
+            brokerIdTopicEntityMap = new ConcurrentHashMap<>();
     // consumer group store
     private EntityStore consumerGroupStore;
     private PrimaryIndex<String/* recordKey */, BdbConsumerGroupEntity> consumerGroupIndex;
@@ -120,7 +120,7 @@ public class DefaultBdbStoreService implements BdbStoreService, Server {
             String/* topicName */,
             ConcurrentHashMap<String /* consumerGroup */, BdbConsumerGroupEntity>>
             consumerGroupTopicMap =
-            new ConcurrentHashMap<String, ConcurrentHashMap<String, BdbConsumerGroupEntity>>();
+            new ConcurrentHashMap<>();
     //consumer group black list store
     private EntityStore blackGroupStore;
     private PrimaryIndex<String/* recordKey */, BdbBlackGroupEntity> blackGroupIndex;
@@ -128,12 +128,12 @@ public class DefaultBdbStoreService implements BdbStoreService, Server {
             String/* consumerGroup */,
             ConcurrentHashMap<String /* topicName */, BdbBlackGroupEntity>>
             blackGroupTopicMap =
-            new ConcurrentHashMap<String, ConcurrentHashMap<String, BdbBlackGroupEntity>>();
+            new ConcurrentHashMap<>();
     // topic auth config store
     private EntityStore topicAuthControlStore;
     private PrimaryIndex<String/* recordKey */, BdbTopicAuthControlEntity> topicAuthControlIndex;
     private ConcurrentHashMap<String/* topicName */, BdbTopicAuthControlEntity> topicAuthControlMap =
-            new ConcurrentHashMap<String, BdbTopicAuthControlEntity>();
+            new ConcurrentHashMap<>();
     // consumer group filter condition store
     private EntityStore groupFilterCondStore;
     private PrimaryIndex<String/* recordKey */, BdbGroupFilterCondEntity> groupFilterCondIndex;
@@ -141,24 +141,24 @@ public class DefaultBdbStoreService implements BdbStoreService, Server {
             String/* topicName */,
             ConcurrentHashMap<String /* consumerGroup */, BdbGroupFilterCondEntity>>
             groupFilterCondMap =
-            new ConcurrentHashMap<String, ConcurrentHashMap<String, BdbGroupFilterCondEntity>>();
+            new ConcurrentHashMap<>();
     // consumer group flow control store
     private EntityStore groupFlowCtrlStore;
     private PrimaryIndex<String/* groupName */, BdbGroupFlowCtrlEntity> groupFlowCtrlIndex;
     private ConcurrentHashMap<String/* groupName */, BdbGroupFlowCtrlEntity> groupFlowCtrlMap =
-            new ConcurrentHashMap<String, BdbGroupFlowCtrlEntity>();
+            new ConcurrentHashMap<>();
     // consumer group setting store
     private EntityStore consumeGroupSettingStore;
     private PrimaryIndex<String/* recordKey */, BdbConsumeGroupSettingEntity> consumeGroupSettingIndex;
     private ConcurrentHashMap<String/* consumeGroup */, BdbConsumeGroupSettingEntity> consumeGroupSettingMap =
-            new ConcurrentHashMap<String, BdbConsumeGroupSettingEntity>();
+            new ConcurrentHashMap<>();
     // service status
     private AtomicBoolean isStarted = new AtomicBoolean(false);
     // master role flag
     private volatile boolean isMaster;
     // master node list
     private ConcurrentHashMap<String/* nodeName */, MasterNodeInfo> masterNodeInfoMap =
-            new ConcurrentHashMap<String, MasterNodeInfo>();
+            new ConcurrentHashMap<>();
     private String nodeHost;
     private BDBConfig bdbConfig;
     private Listener listener = new Listener();
@@ -168,7 +168,7 @@ public class DefaultBdbStoreService implements BdbStoreService, Server {
         this.tMaster = tMaster;
         this.nodeHost = masterConfig.getHostName();
         this.bdbConfig = masterConfig.getBdbConfig();
-        Set<InetSocketAddress> helpers = new HashSet<InetSocketAddress>();
+        Set<InetSocketAddress> helpers = new HashSet<>();
         for (int i = 1; i <= 3; i++) {
             InetSocketAddress helper = new InetSocketAddress(this.nodeHost, bdbConfig.getBdbNodePort() + i);
             helpers.add(helper);
@@ -251,7 +251,7 @@ public class DefaultBdbStoreService implements BdbStoreService, Server {
             clusterGroupVO.setPrimaryNodeActive(isPrimaryNodeActive());
             int count = 0;
             boolean hasMaster = false;
-            List<ClusterNodeVO> clusterNodeVOList = new ArrayList<ClusterNodeVO>();
+            List<ClusterNodeVO> clusterNodeVOList = new ArrayList<>();
             for (ReplicationNode node : replicationGroup.getNodes()) {
                 ClusterNodeVO clusterNodeVO = new ClusterNodeVO();
                 clusterNodeVO.setHostName(node.getHostName());
@@ -838,7 +838,7 @@ public class DefaultBdbStoreService implements BdbStoreService, Server {
         }
         int activeNodes = 0;
         boolean isMasterActive = false;
-        Set<String> tmp = new HashSet<String>();
+        Set<String> tmp = new HashSet<>();
         for (ReplicationNode node : replicationGroup.getNodes()) {
             MasterNodeInfo masterNodeInfo =
                     new MasterNodeInfo(replicationGroup.getName(),
@@ -1178,7 +1178,7 @@ public class DefaultBdbStoreService implements BdbStoreService, Server {
                         consumerGroupTopicMap.get(topicName);
                 if (consumerGroupMap == null) {
                     consumerGroupMap =
-                            new ConcurrentHashMap<String, BdbConsumerGroupEntity>();
+                            new ConcurrentHashMap<>();
                     consumerGroupTopicMap.put(topicName, consumerGroupMap);
                 }
                 consumerGroupMap.put(consumerGroupName, bdbEntity);
@@ -1221,7 +1221,7 @@ public class DefaultBdbStoreService implements BdbStoreService, Server {
                         groupFilterCondMap.get(topicName);
                 if (filterCondMap == null) {
                     filterCondMap =
-                            new ConcurrentHashMap<String, BdbGroupFilterCondEntity>();
+                            new ConcurrentHashMap<>();
                     groupFilterCondMap.put(topicName, filterCondMap);
                 }
                 filterCondMap.put(consumerGroupName, bdbEntity);
@@ -1299,7 +1299,7 @@ public class DefaultBdbStoreService implements BdbStoreService, Server {
                         blackGroupTopicMap.get(consumerGroupName);
                 if (blackGroupMap == null) {
                     blackGroupMap =
-                            new ConcurrentHashMap<String, BdbBlackGroupEntity>();
+                            new ConcurrentHashMap<>();
                     blackGroupTopicMap.put(consumerGroupName, blackGroupMap);
                 }
                 blackGroupMap.put(topicName, bdbEntity);
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerConfManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerConfManager.java
index 4503e96..e244feb 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerConfManager.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerConfManager.java
@@ -63,43 +63,33 @@ public class BrokerConfManager implements Server {
     private final BDBConfig bdbConfig;
     private final ScheduledExecutorService scheduledExecutorService;
     private final ConcurrentHashMap<Integer, String> brokersMap =
-            new ConcurrentHashMap<Integer, String>();
+            new ConcurrentHashMap<>();
     private final ConcurrentHashMap<Integer, String> brokersTLSMap =
-            new ConcurrentHashMap<Integer, String>();
+            new ConcurrentHashMap<>();
 
     private final MasterGroupStatus masterGroupStatus = new MasterGroupStatus();
-    private ConcurrentHashMap<Integer/* brokerId */, BdbBrokerConfEntity> brokerConfStoreMap =
-            new ConcurrentHashMap<Integer, BdbBrokerConfEntity>();
+    private ConcurrentHashMap<Integer/* brokerId */, BdbBrokerConfEntity> brokerConfStoreMap;
     private ConcurrentHashMap<Integer/* brokerId */, BrokerSyncStatusInfo> brokerRunSyncManageMap =
-            new ConcurrentHashMap<Integer, BrokerSyncStatusInfo>();
+            new ConcurrentHashMap<>();
     private ConcurrentHashMap<Integer/* brokerId */, ConcurrentHashMap<String/* topicName */, BdbTopicConfEntity>>
-            brokerTopicEntityStoreMap = new ConcurrentHashMap<Integer, ConcurrentHashMap<String, BdbTopicConfEntity>>();
+            brokerTopicEntityStoreMap;
     private ConcurrentHashMap<Integer/* brokerId */, ConcurrentHashMap<String/* topicName */, TopicInfo>>
-            brokerRunTopicInfoStoreMap = new ConcurrentHashMap<Integer, ConcurrentHashMap<String, TopicInfo>>();
+            brokerRunTopicInfoStoreMap = new ConcurrentHashMap<>();
     private volatile boolean isStarted = false;
     private volatile boolean isStopped = false;
     private DefaultBdbStoreService mBdbStoreManagerService;
-    private ConcurrentHashMap<String, BdbTopicAuthControlEntity> topicAuthControlEnableMap =
-            new ConcurrentHashMap<String, BdbTopicAuthControlEntity>();
+    private ConcurrentHashMap<String, BdbTopicAuthControlEntity> topicAuthControlEnableMap;
     private ConcurrentHashMap<
             String /* topicName */,
-            ConcurrentHashMap<String /* consumerGroup */, BdbConsumerGroupEntity>>
-            consumerGroupTopicMap =
-            new ConcurrentHashMap<String, ConcurrentHashMap<String, BdbConsumerGroupEntity>>();
+            ConcurrentHashMap<String /* consumerGroup */, BdbConsumerGroupEntity>> consumerGroupTopicMap;
     private ConcurrentHashMap<
             String /* consumerGroup */,
-            ConcurrentHashMap<String /* topicName */, BdbBlackGroupEntity>>
-            blackGroupTopicMap =
-            new ConcurrentHashMap<String, ConcurrentHashMap<String, BdbBlackGroupEntity>>();
+            ConcurrentHashMap<String /* topicName */, BdbBlackGroupEntity>> blackGroupTopicMap;
     private ConcurrentHashMap<
             String /* topicName */,
-            ConcurrentHashMap<String /* consumerGroup */, BdbGroupFilterCondEntity>>
-            groupFilterCondTopicMap =
-            new ConcurrentHashMap<String, ConcurrentHashMap<String, BdbGroupFilterCondEntity>>();
-    private ConcurrentHashMap<String /* groupName */, BdbGroupFlowCtrlEntity> consumeGroupFlowCtrlMap =
-            new ConcurrentHashMap<String, BdbGroupFlowCtrlEntity>();
-    private ConcurrentHashMap<String /* consumeGroup */, BdbConsumeGroupSettingEntity> consumeGroupSettingMap =
-            new ConcurrentHashMap<String, BdbConsumeGroupSettingEntity>();
+            ConcurrentHashMap<String /* consumerGroup */, BdbGroupFilterCondEntity>> groupFilterCondTopicMap;
+    private ConcurrentHashMap<String /* groupName */, BdbGroupFlowCtrlEntity> consumeGroupFlowCtrlMap;
+    private ConcurrentHashMap<String /* consumeGroup */, BdbConsumeGroupSettingEntity> consumeGroupSettingMap;
     private AtomicLong brokerInfoCheckSum = new AtomicLong(System.currentTimeMillis());
     private long lastBrokerUpdatedTime = System.currentTimeMillis();
     private long serviceStartTime = System.currentTimeMillis();
@@ -280,7 +270,7 @@ public class BrokerConfManager implements Server {
         if ((reqTopicConditions != null) && (!reqTopicConditions.isEmpty())) {
             // check if request topic set is all in the filter topic set
             Set<String> condTopics = reqTopicConditions.keySet();
-            List<String> unSetTopic = new ArrayList<String>();
+            List<String> unSetTopic = new ArrayList<>();
             for (String topic : condTopics) {
                 if (!reqTopicSet.contains(topic)) {
                     unSetTopic.add(topic);
@@ -296,7 +286,7 @@ public class BrokerConfManager implements Server {
             }
         }
         // check if consumer group is in the blacklist
-        List<String> fbdTopicList = new ArrayList<String>();
+        List<String> fbdTopicList = new ArrayList<>();
         ConcurrentHashMap<String, BdbBlackGroupEntity> blackGroupEntityMap =
                 this.blackGroupTopicMap.get(consumerGroupName);
         if (blackGroupEntityMap != null) {
@@ -313,8 +303,8 @@ public class BrokerConfManager implements Server {
                             .append(fbdTopicList).toString());
         }
         // check if topic enabled authorization
-        ArrayList<String> enableAuthTopicList = new ArrayList<String>();
-        ArrayList<String> unAuthTopicList = new ArrayList<String>();
+        ArrayList<String> enableAuthTopicList = new ArrayList<>();
+        ArrayList<String> unAuthTopicList = new ArrayList<>();
         for (String topicItem : reqTopicSet) {
             if (TStringUtils.isBlank(topicItem)) {
                 continue;
@@ -408,14 +398,14 @@ public class BrokerConfManager implements Server {
                     break;
                 } else {
                     Map<String, List<String>> unAuthorizedCondMap =
-                            new HashMap<String, List<String>>();
+                            new HashMap<>();
                     for (String item : condItemSet) {
                         if (!allowedConds.contains(sb.append(TokenConstants.ARRAY_SEP)
                                 .append(item).append(TokenConstants.ARRAY_SEP).toString())) {
                             isAllowed = false;
                             List<String> unAuthConds = unAuthorizedCondMap.get(tmpTopic);
                             if (unAuthConds == null) {
-                                unAuthConds = new ArrayList<String>();
+                                unAuthConds = new ArrayList<>();
                                 unAuthorizedCondMap.put(tmpTopic, unAuthConds);
                             }
                             unAuthConds.add(item);
@@ -483,7 +473,7 @@ public class BrokerConfManager implements Server {
         // #lizard forgives
         // find broker info
         validMasterStatus();
-        List<BdbBrokerConfEntity> bdbBrokerEntities = new ArrayList<BdbBrokerConfEntity>();
+        List<BdbBrokerConfEntity> bdbBrokerEntities = new ArrayList<>();
         for (BdbBrokerConfEntity entity : brokerConfStoreMap.values()) {
             if (bdbQueryEntity == null) {
                 bdbBrokerEntities.add(entity);
@@ -730,7 +720,7 @@ public class BrokerConfManager implements Server {
     }
 
     public Set<String> getTotalConfiguredTopicNames() {
-        Set<String> totalTopics = new HashSet<String>(50);
+        Set<String> totalTopics = new HashSet<>(50);
         for (ConcurrentHashMap<String, BdbTopicConfEntity> tmpTopicCfgMap
                 : this.brokerTopicEntityStoreMap.values()) {
             if (tmpTopicCfgMap != null) {
@@ -898,7 +888,7 @@ public class BrokerConfManager implements Server {
             BdbTopicConfEntity bdbQueryEntity) {
         // #lizard forgives
         ConcurrentHashMap<String, List<BdbTopicConfEntity>> bdbTopicEntityMap =
-                new ConcurrentHashMap<String, List<BdbTopicConfEntity>>();
+                new ConcurrentHashMap<>();
         for (ConcurrentHashMap<String, BdbTopicConfEntity> topicEntityMap
                 : brokerTopicEntityStoreMap.values()) {
             for (BdbTopicConfEntity entity : topicEntityMap.values()) {
@@ -906,7 +896,7 @@ public class BrokerConfManager implements Server {
                         bdbTopicEntityMap.get(entity.getTopicName());
                 if (bdbQueryEntity == null) {
                     if (bdbTopicEntities == null) {
-                        bdbTopicEntities = new ArrayList<BdbTopicConfEntity>();
+                        bdbTopicEntities = new ArrayList<>();
                         bdbTopicEntityMap.put(entity.getTopicName(), bdbTopicEntities);
                     }
                     bdbTopicEntities.add(entity);
@@ -946,7 +936,7 @@ public class BrokerConfManager implements Server {
                     continue;
                 }
                 if (bdbTopicEntities == null) {
-                    bdbTopicEntities = new ArrayList<BdbTopicConfEntity>();
+                    bdbTopicEntities = new ArrayList<>();
                     bdbTopicEntityMap.put(entity.getTopicName(), bdbTopicEntities);
                 }
                 bdbTopicEntities.add(entity);
@@ -988,7 +978,7 @@ public class BrokerConfManager implements Server {
         if (putResult) {
             if (brokerTopicConfMap == null) {
                 brokerTopicConfMap =
-                        new ConcurrentHashMap<String, BdbTopicConfEntity>();
+                        new ConcurrentHashMap<>();
                 ConcurrentHashMap<String, BdbTopicConfEntity> tmpBrokerTopicConfMap =
                         brokerTopicEntityStoreMap.putIfAbsent(bdbEntity.getBrokerId(), brokerTopicConfMap);
                 if (tmpBrokerTopicConfMap != null) {
@@ -1052,7 +1042,7 @@ public class BrokerConfManager implements Server {
 
     private List<String> innGetTopicStrConfigInfo(BdbBrokerConfEntity brokerConfEntity,
                                                   boolean isRemoved) {
-        List<String> brokerTopicStrConfSet = new ArrayList<String>();
+        List<String> brokerTopicStrConfSet = new ArrayList<>();
         ConcurrentHashMap<String, BdbTopicConfEntity> topicBdbEntytyMap =
                 brokerTopicEntityStoreMap.get(brokerConfEntity.getBrokerId());
         if (topicBdbEntytyMap != null) {
@@ -1208,7 +1198,7 @@ public class BrokerConfManager implements Server {
             BdbTopicAuthControlEntity bdbQueryEntity) throws Exception {
         validMasterStatus();
         List<BdbTopicAuthControlEntity> bdbTopicAuthControlEntities =
-                new ArrayList<BdbTopicAuthControlEntity>();
+                new ArrayList<>();
         for (BdbTopicAuthControlEntity entity : this.topicAuthControlEnableMap.values()) {
             if (bdbQueryEntity == null) {
                 bdbTopicAuthControlEntities.add(entity);
@@ -1258,7 +1248,7 @@ public class BrokerConfManager implements Server {
         if (putResult) {
             if (groupFilterCondEntityMap == null) {
                 groupFilterCondEntityMap =
-                        new ConcurrentHashMap<String, BdbGroupFilterCondEntity>();
+                        new ConcurrentHashMap<>();
                 ConcurrentHashMap<String, BdbGroupFilterCondEntity> tmpGroupFilterCondEntityMap =
                         groupFilterCondTopicMap.putIfAbsent(bdbEntity.getTopicName(), groupFilterCondEntityMap);
                 if (tmpGroupFilterCondEntityMap != null) {
@@ -1317,7 +1307,7 @@ public class BrokerConfManager implements Server {
      */
     public List<BdbGroupFilterCondEntity> getBdbAllowedGroupFilterConds(String topicName) {
         List<BdbGroupFilterCondEntity> bdbGroupFilterCondEntities =
-                new ArrayList<BdbGroupFilterCondEntity>();
+                new ArrayList<>();
         ConcurrentHashMap<String, BdbGroupFilterCondEntity> groupFilterCondMap =
                 groupFilterCondTopicMap.get(topicName);
         if (groupFilterCondMap != null) {
@@ -1355,7 +1345,7 @@ public class BrokerConfManager implements Server {
             BdbGroupFilterCondEntity bdbQueryEntity) throws Exception {
         validMasterStatus();
         List<BdbGroupFilterCondEntity> bdbGroupFilterCondEntities =
-                new ArrayList<BdbGroupFilterCondEntity>();
+                new ArrayList<>();
         for (ConcurrentHashMap<String, BdbGroupFilterCondEntity> groupFilterCondEntityMap
                 : groupFilterCondTopicMap.values()) {
             for (BdbGroupFilterCondEntity entity : groupFilterCondEntityMap.values()) {
@@ -1454,7 +1444,7 @@ public class BrokerConfManager implements Server {
                 mBdbStoreManagerService.putBdbConsumerGroupConfEntity(bdbEntity, true);
         if (putResult) {
             if (consumerGroupEntityMap == null) {
-                consumerGroupEntityMap = new ConcurrentHashMap<String, BdbConsumerGroupEntity>();
+                consumerGroupEntityMap = new ConcurrentHashMap<>();
                 ConcurrentHashMap<String, BdbConsumerGroupEntity> tmpConsumerGroupEntityMap =
                         consumerGroupTopicMap.putIfAbsent(bdbEntity.getGroupTopicName(), consumerGroupEntityMap);
                 if (tmpConsumerGroupEntityMap != null) {
@@ -1477,7 +1467,7 @@ public class BrokerConfManager implements Server {
      */
     public List<BdbConsumerGroupEntity> getBdbAllowedConsumerGroups(String topicName) {
         List<BdbConsumerGroupEntity> bdbConsumerGroupEntities =
-                new ArrayList<BdbConsumerGroupEntity>();
+                new ArrayList<>();
         ConcurrentHashMap<String, BdbConsumerGroupEntity> consumerGroupMap =
                 consumerGroupTopicMap.get(topicName);
         if (consumerGroupMap != null) {
@@ -1490,7 +1480,7 @@ public class BrokerConfManager implements Server {
             BdbConsumerGroupEntity bdbQueryEntity) throws Exception {
         validMasterStatus();
         List<BdbConsumerGroupEntity> bdbConsumerGroupEntities =
-                new ArrayList<BdbConsumerGroupEntity>();
+                new ArrayList<>();
         for (ConcurrentHashMap<String, BdbConsumerGroupEntity> consumerGroupEntityMap
                 : consumerGroupTopicMap.values()) {
             for (BdbConsumerGroupEntity entity : consumerGroupEntityMap.values()) {
@@ -1585,7 +1575,7 @@ public class BrokerConfManager implements Server {
         if (putResult) {
             if (blackGroupEntityMap == null) {
                 blackGroupEntityMap =
-                        new ConcurrentHashMap<String, BdbBlackGroupEntity>();
+                        new ConcurrentHashMap<>();
                 blackGroupTopicMap.put(bdbEntity.getBlackGroupName(), blackGroupEntityMap);
             }
             blackGroupEntityMap.put(bdbEntity.getTopicName(), bdbEntity);
@@ -1606,7 +1596,7 @@ public class BrokerConfManager implements Server {
     public List<BdbBlackGroupEntity> confGetBdbBlackConsumerGroupSet(
             BdbBlackGroupEntity bdbQueryEntity) throws Exception {
         validMasterStatus();
-        List<BdbBlackGroupEntity> bdbBlackGroupEntities = new ArrayList<BdbBlackGroupEntity>();
+        List<BdbBlackGroupEntity> bdbBlackGroupEntities = new ArrayList<>();
         for (ConcurrentHashMap<String, BdbBlackGroupEntity> blackGroupEntityMap
                 : blackGroupTopicMap.values()) {
             for (BdbBlackGroupEntity entity : blackGroupEntityMap.values()) {
@@ -1679,7 +1669,7 @@ public class BrokerConfManager implements Server {
     }
 
     public List<String> getBdbBlackTopicList(String consumerGroupName) {
-        ArrayList<String> blackTopicList = new ArrayList<String>();
+        ArrayList<String> blackTopicList = new ArrayList<>();
         ConcurrentHashMap<String/* topicname */, BdbBlackGroupEntity> blackGroupEntityMap =
                 this.blackGroupTopicMap.get(consumerGroupName);
         if (blackGroupEntityMap != null) {
@@ -1786,7 +1776,7 @@ public class BrokerConfManager implements Server {
             BdbConsumeGroupSettingEntity bdbQueryEntity) throws Exception {
         validMasterStatus();
         List<BdbConsumeGroupSettingEntity> bdbOffsetRstGroupEntities =
-                new ArrayList<BdbConsumeGroupSettingEntity>();
+                new ArrayList<>();
         for (BdbConsumeGroupSettingEntity entity
                 : consumeGroupSettingMap.values()) {
             if (entity == null) {
@@ -1897,7 +1887,7 @@ public class BrokerConfManager implements Server {
             BdbGroupFlowCtrlEntity bdbQueryEntity) throws Exception {
         validMasterStatus();
         List<BdbGroupFlowCtrlEntity> bdbGroupFlowCtrlEntities =
-                new ArrayList<BdbGroupFlowCtrlEntity>();
+                new ArrayList<>();
         for (BdbGroupFlowCtrlEntity ctrlEntity : consumeGroupFlowCtrlMap.values()) {
             if (ctrlEntity == null) {
                 continue;
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerInfoHolder.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerInfoHolder.java
index 2177260..c644c5d 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerInfoHolder.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerInfoHolder.java
@@ -42,11 +42,11 @@ public class BrokerInfoHolder {
     private static final Logger logger =
             LoggerFactory.getLogger(BrokerInfoHolder.class);
     private final ConcurrentHashMap<Integer/* brokerId */, BrokerInfo> brokerInfoMap =
-            new ConcurrentHashMap<Integer, BrokerInfo>();
+            new ConcurrentHashMap<>();
     private final ConcurrentHashMap<Integer/* brokerId */, BrokerAbnInfo> brokerAbnormalMap =
-            new ConcurrentHashMap<Integer, BrokerAbnInfo>();
+            new ConcurrentHashMap<>();
     private final ConcurrentHashMap<Integer/* brokerId */, BrokerFbdInfo> brokerForbiddenMap =
-            new ConcurrentHashMap<Integer, BrokerFbdInfo>();
+            new ConcurrentHashMap<>();
     private final int maxAutoForbiddenCnt;
     private final BrokerConfManager brokerConfManager;
     private AtomicInteger brokerTotalCount = new AtomicInteger(0);
@@ -183,7 +183,7 @@ public class BrokerInfoHolder {
     }
 
     public Map<Integer, BrokerInfo> getBrokerInfos(Collection<Integer> brokerIds) {
-        HashMap<Integer, BrokerInfo> brokerMap = new HashMap<Integer, BrokerInfo>();
+        HashMap<Integer, BrokerInfo> brokerMap = new HashMap<>();
         for (Integer brokerId : brokerIds) {
             brokerMap.put(brokerId, brokerInfoMap.get(brokerId));
         }
@@ -307,7 +307,7 @@ public class BrokerInfoHolder {
         if (brokerIdSet == null || brokerIdSet.isEmpty()) {
             return;
         }
-        List<BrokerFbdInfo> brokerFbdInfos = new ArrayList<BrokerFbdInfo>();
+        List<BrokerFbdInfo> brokerFbdInfos = new ArrayList<>();
         for (Integer brokerId : brokerIdSet) {
             BrokerFbdInfo fbdInfo = this.brokerForbiddenMap.remove(brokerId);
             if (fbdInfo != null) {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerSyncStatusInfo.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerSyncStatusInfo.java
index 461a5f4..cfe8e0c 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerSyncStatusInfo.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerSyncStatusInfo.java
@@ -63,20 +63,20 @@ public class BrokerSyncStatusInfo {
     private long lastDataPushInMills = 0;
     private String lastPushBrokerDefaultConfInfo;
     private List<String> lastPushBrokerTopicSetConfInfo =
-            new ArrayList<String>();
+            new ArrayList<>();
 
     private long reportedBrokerConfId = -2;
     private int reportedBrokerCheckSumId = -2;
     private long lastDataReportInMills = 0;
     private String reportedBrokerDefaultConfInfo;
     private List<String> reportedBrokerTopicSetConfInfo =
-            new ArrayList<String>();
+            new ArrayList<>();
 
     private AtomicLong currBrokerConfId = new AtomicLong(0);
     private int currBrokerCheckSumId = 0;
     private String curBrokerDefaultConfInfo;
     private List<String> curBrokerTopicSetConfInfo =
-            new ArrayList<String>();
+            new ArrayList<>();
 
     private int numPartitions = 1;              //partition number
     private int numTopicStores = 1;             //store number
@@ -165,7 +165,7 @@ public class BrokerSyncStatusInfo {
         this.reportedBrokerCheckSumId = -2;
         this.reportedBrokerDefaultConfInfo = "";
         this.reportedBrokerTopicSetConfInfo =
-                new ArrayList<String>();
+                new ArrayList<>();
         this.isBrokerRegister = false;
         this.isBrokerOnline = false;
         this.isFastStart = false;
@@ -196,7 +196,7 @@ public class BrokerSyncStatusInfo {
         if (isTackData) {
             this.reportedBrokerDefaultConfInfo = reportDefaultConfInfo;
             if (reportTopicSetConfInfo == null) {
-                this.reportedBrokerTopicSetConfInfo = new ArrayList<String>();
+                this.reportedBrokerTopicSetConfInfo = new ArrayList<>();
             } else {
                 this.reportedBrokerTopicSetConfInfo = reportTopicSetConfInfo;
             }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/TopicPSInfoManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/TopicPSInfoManager.java
index 02cf875..2b0ffc9 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/TopicPSInfoManager.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/TopicPSInfoManager.java
@@ -37,13 +37,13 @@ public class TopicPSInfoManager {
 
     private final ConcurrentHashMap<String/* topic */,
             ConcurrentHashMap<BrokerInfo, TopicInfo>> brokerPubInfoMap =
-            new ConcurrentHashMap<String/* topic */, ConcurrentHashMap<BrokerInfo, TopicInfo>>();
+            new ConcurrentHashMap<>();
     private final ConcurrentHashMap<String/* topic */,
             ConcurrentHashSet<String/* producerId */>> topicPubInfoMap =
-            new ConcurrentHashMap<String, ConcurrentHashSet<String>>();
+            new ConcurrentHashMap<>();
     private final ConcurrentHashMap<String/* topic */,
             ConcurrentHashSet<String/* group */>> topicSubInfoMap =
-            new ConcurrentHashMap<String, ConcurrentHashSet<String>>();
+            new ConcurrentHashMap<>();
 
     /**
      * Get groups according to topic
@@ -111,7 +111,7 @@ public class TopicPSInfoManager {
                     topicPubInfoMap.get(topic);
             if (producerIdSet == null) {
                 ConcurrentHashSet<String> tmpProducerIdSet =
-                        new ConcurrentHashSet<String>();
+                        new ConcurrentHashSet<>();
                 producerIdSet =
                         topicPubInfoMap.putIfAbsent(topic, tmpProducerIdSet);
                 if (producerIdSet == null) {
@@ -172,7 +172,7 @@ public class TopicPSInfoManager {
     }
 
     public Set<Partition> getPartitions() {
-        Set<Partition> partitions = new HashSet<Partition>();
+        Set<Partition> partitions = new HashSet<>();
         for (Map<BrokerInfo, TopicInfo> broker
                 : this.brokerPubInfoMap.values()) {
             for (Map.Entry<BrokerInfo, TopicInfo> entry
@@ -191,7 +191,7 @@ public class TopicPSInfoManager {
     }
 
     public Set<Partition> getPartitions(Set<String> topics) {
-        Set<Partition> partList = new HashSet<Partition>();
+        Set<Partition> partList = new HashSet<>();
         for (String topic : topics) {
             partList.addAll(getPartitionSet(topic));
         }
@@ -199,7 +199,7 @@ public class TopicPSInfoManager {
     }
 
     public Map<String, Partition> getPartitionMap(Set<String> topics) {
-        Map<String, Partition> partMap = new HashMap<String, Partition>();
+        Map<String, Partition> partMap = new HashMap<>();
         for (String topic : topics) {
             ConcurrentHashMap<BrokerInfo, TopicInfo> topicInfoMap =
                     brokerPubInfoMap.get(topic);
@@ -225,11 +225,11 @@ public class TopicPSInfoManager {
     }
 
     public Set<Partition> getPartitionSet(String topic) {
-        Set<Partition> partSet = new HashSet<Partition>();
+        Set<Partition> partSet = new HashSet<>();
         ConcurrentHashMap<BrokerInfo, TopicInfo> topicInfoMap =
                 brokerPubInfoMap.get(topic);
         if (topicInfoMap == null) {
-            return new HashSet<Partition>();
+            return new HashSet<>();
         }
         for (Map.Entry<BrokerInfo, TopicInfo> entry
                 : topicInfoMap.entrySet()) {
@@ -250,7 +250,7 @@ public class TopicPSInfoManager {
     }
 
     public List<Partition> getPartitionList(String topic) {
-        List<Partition> partList = new ArrayList<Partition>(getPartitionSet(topic));
+        List<Partition> partList = new ArrayList<>(getPartitionSet(topic));
         return partList;
     }
 
@@ -264,7 +264,7 @@ public class TopicPSInfoManager {
     }
 
     public List<TopicInfo> getBrokerPubInfoList(BrokerInfo broker) {
-        List<TopicInfo> topicInfoList = new ArrayList<TopicInfo>();
+        List<TopicInfo> topicInfoList = new ArrayList<>();
         for (Map.Entry<String, ConcurrentHashMap<BrokerInfo, TopicInfo>> pubEntry
                 : brokerPubInfoMap.entrySet()) {
             ConcurrentHashMap<BrokerInfo, TopicInfo> topicPubMap =
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodeconsumer/ConsumerBandInfo.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodeconsumer/ConsumerBandInfo.java
index 7d03ca5..2f33cc9 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodeconsumer/ConsumerBandInfo.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodeconsumer/ConsumerBandInfo.java
@@ -46,15 +46,15 @@ public class ConsumerBandInfo {
     private long createTime = System.currentTimeMillis();           //create time
     private Set<String> topicSet = new HashSet<>();                 //topic set
     private Map<String, TreeSet<String>> topicConditions =          //filter condition set
-            new HashMap<String, TreeSet<String>>();
+            new HashMap<>();
     private ConcurrentHashMap<String, ConsumerInfo> consumerInfoMap =   //consumer info
-            new ConcurrentHashMap<String, ConsumerInfo>();
+            new ConcurrentHashMap<>();
     private ConcurrentHashMap<String, String> partitionInfoMap =        //partition info
-            new ConcurrentHashMap<String, String>();
+            new ConcurrentHashMap<>();
     private ConcurrentHashMap<String, Long> partOffsetMap =             //partition offset
-            new ConcurrentHashMap<String, Long>();
+            new ConcurrentHashMap<>();
     private ConcurrentHashMap<String, NodeRebInfo> rebalanceMap =       //load balance
-            new ConcurrentHashMap<String, NodeRebInfo>();
+            new ConcurrentHashMap<>();
     private int defBClientRate = -2;            //default broker/client ratio
     private int confBClientRate = -2;           //config broker/client ratio
     private int curBClientRate = -2;            //current broker/client ratio
@@ -205,7 +205,7 @@ public class ConsumerBandInfo {
             return null;
         }
         this.rebalanceMap.remove(consumerId);
-        List<String> partKeyList = new ArrayList<String>();
+        List<String> partKeyList = new ArrayList<>();
         for (Map.Entry<String, String> entry : partitionInfoMap.entrySet()) {
             if (entry.getValue() != null) {
                 if (entry.getValue().equals(consumerId)) {
@@ -274,9 +274,9 @@ public class ConsumerBandInfo {
     }
 
     public RebProcessInfo getNeedRebNodeList() {
-        List<String> needProcessList = new ArrayList<String>();
-        List<String> needEscapeList = new ArrayList<String>();
-        List<String> needRemoved = new ArrayList<String>();
+        List<String> needProcessList = new ArrayList<>();
+        List<String> needEscapeList = new ArrayList<>();
+        List<String> needRemoved = new ArrayList<>();
         for (NodeRebInfo nodeRebInfo : this.rebalanceMap.values()) {
             if (nodeRebInfo.getStatus() == 0) {
                 nodeRebInfo.setStatus(1);
@@ -305,7 +305,7 @@ public class ConsumerBandInfo {
                 || this.rebalanceMap.isEmpty()) {
             return;
         }
-        List<String> needRemoved = new ArrayList<String>();
+        List<String> needRemoved = new ArrayList<>();
         for (NodeRebInfo nodeRebInfo : this.rebalanceMap.values()) {
             if (processList.contains(nodeRebInfo.getClientId())) {
                 if (nodeRebInfo.getReqType() == 0) {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodeconsumer/ConsumerEventManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodeconsumer/ConsumerEventManager.java
index 57f19d6..3ca4dd2 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodeconsumer/ConsumerEventManager.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodeconsumer/ConsumerEventManager.java
@@ -36,11 +36,11 @@ public class ConsumerEventManager {
     private static final Logger logger = LoggerFactory.getLogger(ConsumerEventManager.class);
 
     private final ConcurrentHashMap<String/* consumerId */, LinkedList<ConsumerEvent>> disconnectEventMap =
-            new ConcurrentHashMap<String, LinkedList<ConsumerEvent>>();
+            new ConcurrentHashMap<>();
     private final ConcurrentHashMap<String/* consumerId */, LinkedList<ConsumerEvent>> connectEventMap =
-            new ConcurrentHashMap<String, LinkedList<ConsumerEvent>>();
+            new ConcurrentHashMap<>();
     private final ConcurrentHashMap<String/* group */, AtomicInteger> groupUnfinishedCountMap =
-            new ConcurrentHashMap<String, AtomicInteger>();
+            new ConcurrentHashMap<>();
 
     private final ConsumerInfoHolder consumerHolder;
 
@@ -53,7 +53,7 @@ public class ConsumerEventManager {
         LinkedList<ConsumerEvent> eventList =
                 disconnectEventMap.get(consumerId);
         if (eventList == null) {
-            eventList = new LinkedList<ConsumerEvent>();
+            eventList = new LinkedList<>();
             LinkedList<ConsumerEvent> tmptList =
                     disconnectEventMap.putIfAbsent(consumerId, eventList);
             if (tmptList != null) {
@@ -70,7 +70,7 @@ public class ConsumerEventManager {
         LinkedList<ConsumerEvent> eventList =
                 connectEventMap.get(consumerId);
         if (eventList == null) {
-            eventList = new LinkedList<ConsumerEvent>();
+            eventList = new LinkedList<>();
             LinkedList<ConsumerEvent> tmptList =
                     connectEventMap.putIfAbsent(consumerId, eventList);
             if (tmptList != null) {
@@ -240,7 +240,7 @@ public class ConsumerEventManager {
      * @return consumer id set
      */
     public Set<String> getUnProcessedIdSet() {
-        Set<String> consumerIdSet = new HashSet<String>();
+        Set<String> consumerIdSet = new HashSet<>();
         for (Map.Entry<String, LinkedList<ConsumerEvent>> entry
                 : disconnectEventMap.entrySet()) {
             if (!entry.getValue().isEmpty()) {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfoHolder.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfoHolder.java
index 8e9bc49..5c78858 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfoHolder.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfoHolder.java
@@ -31,9 +31,9 @@ import org.apache.tubemq.corebase.cluster.ConsumerInfo;
 public class ConsumerInfoHolder {
 
     private final ConcurrentHashMap<String/* group */, ConsumerBandInfo> groupInfoMap =
-            new ConcurrentHashMap<String, ConsumerBandInfo>();
+            new ConcurrentHashMap<>();
     private final ConcurrentHashMap<String/* consumerId */, String/* group */> consumerIndexMap =
-            new ConcurrentHashMap<String, String>();
+            new ConcurrentHashMap<>();
     private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
 
     /**
@@ -55,7 +55,7 @@ public class ConsumerInfoHolder {
                         oldConsumeBandInfo.getConsumerInfoList();
                 if (oldConsumerList != null) {
                     List<ConsumerInfo> consumerList =
-                            new ArrayList<ConsumerInfo>(oldConsumerList.size());
+                            new ArrayList<>(oldConsumerList.size());
                     for (ConsumerInfo consumer : oldConsumerList) {
                         if (consumer != null) {
                             consumerList.add(consumer.clone());
@@ -392,7 +392,7 @@ public class ConsumerInfoHolder {
                 return Collections.emptyList();
             } else {
                 List<String> groupList =
-                        new ArrayList<String>(groupInfoMap.size());
+                        new ArrayList<>(groupInfoMap.size());
                 groupList.addAll(groupInfoMap.keySet());
                 return groupList;
             }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodeconsumer/RebProcessInfo.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodeconsumer/RebProcessInfo.java
index c661d9a..5d72b86 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodeconsumer/RebProcessInfo.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodeconsumer/RebProcessInfo.java
@@ -24,9 +24,9 @@ import java.util.List;
 public class RebProcessInfo {
 
     public List<String> needProcessList =
-            new ArrayList<String>();
+            new ArrayList<>();
     public List<String> needEscapeList =
-            new ArrayList<String>();
+            new ArrayList<>();
 
     public RebProcessInfo() {
 
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodeproducer/ProducerInfoHolder.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodeproducer/ProducerInfoHolder.java
index dae39e9..63e6586 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodeproducer/ProducerInfoHolder.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodeproducer/ProducerInfoHolder.java
@@ -25,7 +25,7 @@ import org.apache.tubemq.corebase.cluster.ProducerInfo;
 public class ProducerInfoHolder {
 
     final ConcurrentHashMap<String/* producerId */, ProducerInfo> producerInfoMap =
-            new ConcurrentHashMap<String, ProducerInfo>();
+            new ConcurrentHashMap<>();
 
     public ProducerInfo getProducerInfo(String producerId) {
         return producerInfoMap.get(producerId);
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/Master.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/Master.java
index 9db6045..7ac535c 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/Master.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/Master.java
@@ -224,7 +224,7 @@ public class Master implements Action {
             brokerInfoMap = master.getBrokerHolder().getBrokerInfoMap();
         } else {
             String[] brokerIdArr = brokerIds.split(",");
-            List<Integer> idList = new ArrayList<Integer>(brokerIdArr.length);
+            List<Integer> idList = new ArrayList<>(brokerIdArr.length);
             for (String strId : brokerIdArr) {
                 idList.add(Integer.parseInt(strId));
             }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/Webapi.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/Webapi.java
index 7c52308..63e7367 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/Webapi.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/Webapi.java
@@ -428,9 +428,9 @@ public class Webapi implements Action {
             int minRequireClientCnt = -2;
             int rebalanceStatus = -2;
             Set<String> topicSet = new HashSet<>();
-            List<ConsumerInfo> consumerList = new ArrayList<ConsumerInfo>();
-            Map<String, NodeRebInfo> nodeRebInfoMap = new ConcurrentHashMap<String, NodeRebInfo>();
-            Map<String, TreeSet<String>> existedTopicCondtions = new HashMap<String, TreeSet<String>>();
+            List<ConsumerInfo> consumerList = new ArrayList<>();
+            Map<String, NodeRebInfo> nodeRebInfoMap = new ConcurrentHashMap<>();
+            Map<String, TreeSet<String>> existedTopicCondtions = new HashMap<>();
             ConsumerInfoHolder consumerHolder = master.getConsumerHolder();
             ConsumerBandInfo consumerBandInfo = consumerHolder.getConsumerBandInfo(strConsumeGroup);
             if (consumerBandInfo != null) {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/cluster/ClusterManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/cluster/ClusterManager.java
index 1fdd8f3..192254c 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/cluster/ClusterManager.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/cluster/ClusterManager.java
@@ -42,7 +42,7 @@ public class ClusterManager implements Action {
     public void execute(RequestContext context) {
         HttpServletRequest req = context.getReq();
         BrokerConfManager brokerConfManager = this.master.getMasterTopicManager();
-        List<ClusterGroupVO> clusterGroupVOList = new ArrayList<ClusterGroupVO>();
+        List<ClusterGroupVO> clusterGroupVOList = new ArrayList<>();
         ClusterGroupVO clusterGroupVO = brokerConfManager.getGroupAddressStrInfo();
         if (clusterGroupVO != null) {
             clusterGroupVOList.add(clusterGroupVO);
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/config/BrokerList.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/config/BrokerList.java
index e7fab62..909a888 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/config/BrokerList.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/config/BrokerList.java
@@ -84,7 +84,7 @@ public class BrokerList implements Action {
                     fromIndex + pageSize > brokerInfoList.size() ? brokerInfoList.size() : fromIndex
                             + pageSize;
             List<BrokerInfo> firstPageList = brokerInfoList.subList(fromIndex, toIndex);
-            brokerVOList = new ArrayList<BrokerVO>(brokerInfoList.size());
+            brokerVOList = new ArrayList<>(brokerInfoList.size());
             TopicPSInfoManager psInfoManager = master.getTopicPSInfoManager();
             for (BrokerInfo brokerInfo : firstPageList) {
                 BrokerVO brokerVO = new BrokerVO();
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminFlowRuleHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminFlowRuleHandler.java
index 83dd0eb..e6a91a1 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminFlowRuleHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminFlowRuleHandler.java
@@ -84,13 +84,13 @@ public class WebAdminFlowRuleHandler {
                     WebParameterUtils.validBooleanDataParameter("needSSDProc",
                             req.getParameter("needSSDProc"),
                             false, false);
-            Set<String> batchGroupNames = new HashSet<String>();
+            Set<String> batchGroupNames = new HashSet<>();
             if (opType == 1) {
                 batchGroupNames.add(TServerConstants.TOKEN_DEFAULT_FLOW_CONTROL);
             } else {
                 // get groupname info if rule is set to consume group
                 boolean checkResToken = opType > 1;
-                Set<String> resTokenSet = new HashSet<String>();
+                Set<String> resTokenSet = new HashSet<>();
                 if (checkResToken) {
                     resTokenSet.add(TServerConstants.TOKEN_DEFAULT_FLOW_CONTROL);
                 }
@@ -147,12 +147,12 @@ public class WebAdminFlowRuleHandler {
                     WebParameterUtils.validDateParameter("createDate",
                             req.getParameter("createDate"),
                             TBaseConstants.META_MAX_DATEVALUE_LENGTH, false, new Date());
-            Set<String> batchGroupNames = new HashSet<String>();
+            Set<String> batchGroupNames = new HashSet<>();
             if (opType == 1) {
                 batchGroupNames.add(TServerConstants.TOKEN_DEFAULT_FLOW_CONTROL);
             } else {
                 boolean checkResToken = opType > 1;
-                Set<String> resTokenSet = new HashSet<String>();
+                Set<String> resTokenSet = new HashSet<>();
                 if (checkResToken) {
                     resTokenSet.add(TServerConstants.TOKEN_DEFAULT_FLOW_CONTROL);
                 }
@@ -193,13 +193,13 @@ public class WebAdminFlowRuleHandler {
                     WebParameterUtils.validDateParameter("createDate",
                             req.getParameter("createDate"),
                             TBaseConstants.META_MAX_DATEVALUE_LENGTH, false, new Date());
-            Set<String> batchGroupNames = new HashSet<String>();
+            Set<String> batchGroupNames = new HashSet<>();
             // check optype
             if (opType == 1) {
                 batchGroupNames.add(TServerConstants.TOKEN_DEFAULT_FLOW_CONTROL);
             } else {
                 boolean checkResToken = opType > 1;
-                Set<String> resTokenSet = new HashSet<String>();
+                Set<String> resTokenSet = new HashSet<>();
                 if (checkResToken) {
                     resTokenSet.add(TServerConstants.TOKEN_DEFAULT_FLOW_CONTROL);
                 }
@@ -302,12 +302,12 @@ public class WebAdminFlowRuleHandler {
                     .setQryPriorityId(WebParameterUtils.validIntDataParameter("qryPriorityId",
                             req.getParameter("qryPriorityId"),
                             false, TBaseConstants.META_VALUE_UNDEFINED, 0));
-            Set<String> batchGroupNames = new HashSet<String>();
+            Set<String> batchGroupNames = new HashSet<>();
             if (opType == 1) {
                 batchGroupNames.add(TServerConstants.TOKEN_DEFAULT_FLOW_CONTROL);
             } else {
                 boolean checkResToken = opType > 1;
-                Set<String> resTokenSet = new HashSet<String>();
+                Set<String> resTokenSet = new HashSet<>();
                 if (checkResToken) {
                     resTokenSet.add(TServerConstants.TOKEN_DEFAULT_FLOW_CONTROL);
                 }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java
index 982f942..7c33c7b 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java
@@ -170,7 +170,7 @@ public class WebAdminGroupCtrlHandler {
             }
             Set<String> confgiuredTopicSet = brokerConfManager.getTotalConfiguredTopicNames();
             HashMap<String, BdbGroupFilterCondEntity> inGroupFilterCondEntityMap =
-                    new HashMap<String, BdbGroupFilterCondEntity>();
+                    new HashMap<>();
             for (int j = 0; j < filterJsonArray.size(); j++) {
                 Map<String, String> groupObject = filterJsonArray.get(j);
                 try {
@@ -368,7 +368,7 @@ public class WebAdminGroupCtrlHandler {
             if ((jsonArray == null) || (jsonArray.isEmpty())) {
                 throw new Exception("Null value of filterCondJsonSet, please set the value first!");
             }
-            Set<String> batchRecords = new HashSet<String>();
+            Set<String> batchRecords = new HashSet<>();
             List<BdbGroupFilterCondEntity> modifyFilterCondEntities = new ArrayList<>();
             for (int j = 0; j < jsonArray.size(); j++) {
                 Map<String, String> groupObject = jsonArray.get(j);
@@ -780,7 +780,7 @@ public class WebAdminGroupCtrlHandler {
             }
             Set<String> confgiuredTopicSet = brokerConfManager.getTotalConfiguredTopicNames();
             HashMap<String, BdbConsumerGroupEntity> inGroupAuthConfEntityMap =
-                    new HashMap<String, BdbConsumerGroupEntity>();
+                    new HashMap<>();
             for (int j = 0; j < jsonArray.size(); j++) {
                 Map<String, String> groupObject = jsonArray.get(j);
                 try {
@@ -1294,7 +1294,7 @@ public class WebAdminGroupCtrlHandler {
                 throw new Exception("Null value of groupNameJsonSet, please set the value first!");
             }
             HashMap<String, BdbConsumeGroupSettingEntity> inOffsetRstGroupEntityMap =
-                    new HashMap<String, BdbConsumeGroupSettingEntity>();
+                    new HashMap<>();
             for (int j = 0; j < groupNameJsonArray.size(); j++) {
                 Map<String, String> groupObject = groupNameJsonArray.get(j);
                 try {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminTopicAuthHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminTopicAuthHandler.java
index f2d561f..77beb91 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminTopicAuthHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminTopicAuthHandler.java
@@ -119,9 +119,9 @@ public class WebAdminTopicAuthHandler {
             }
             Set<String> configuredTopicSet = brokerConfManager.getTotalConfiguredTopicNames();
             HashMap<String, BdbTopicAuthControlEntity> inTopicAuthConfEntityMap =
-                    new HashMap<String, BdbTopicAuthControlEntity>();
+                    new HashMap<>();
             HashMap<String, BdbConsumerGroupEntity> inGroupAuthConfEntityMap =
-                    new HashMap<String, BdbConsumerGroupEntity>();
+                    new HashMap<>();
             for (int count = 0; count < topicJsonArray.size(); count++) {
                 Map<String, String> jsonObject = topicJsonArray.get(count);
                 try {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerDefConfHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerDefConfHandler.java
index 128fbde..73a9989 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerDefConfHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerDefConfHandler.java
@@ -601,7 +601,7 @@ public class WebBrokerDefConfHandler {
             // Check the current status and status after the change, to see if there are changes.
             // If yes, check if the current status complies with the change.
             // If it complies, record the change.
-            Set<BdbBrokerConfEntity> newBrokerEntitySet = new HashSet<BdbBrokerConfEntity>();
+            Set<BdbBrokerConfEntity> newBrokerEntitySet = new HashSet<>();
             for (BdbBrokerConfEntity oldEntity : batchBrokerEntitySet) {
                 if (oldEntity == null) {
                     continue;
@@ -1026,7 +1026,7 @@ public class WebBrokerDefConfHandler {
                     WebParameterUtils.getBatchBrokerIdSet(req.getParameter("brokerId"),
                             brokerConfManager, true, strBuffer);
             Set<BdbBrokerConfEntity> newBrokerEntitys =
-                    new HashSet<BdbBrokerConfEntity>();
+                    new HashSet<>();
             for (BdbBrokerConfEntity oldEntity : batchBrokerEntities) {
                 if (oldEntity == null) {
                     continue;
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerTopicConfHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerTopicConfHandler.java
index c0d932c..bb90522 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerTopicConfHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerTopicConfHandler.java
@@ -97,8 +97,8 @@ public class WebBrokerTopicConfHandler {
             Set<BdbBrokerConfEntity> batchBrokerEntitySet =
                     WebParameterUtils.getBatchBrokerIdSet(req.getParameter("brokerId"), brokerConfManager, true,
                             strBuffer);
-            List<BdbTopicConfEntity> batchAddBdbTopicEntities = new ArrayList<BdbTopicConfEntity>();
-            List<BdbTopicAuthControlEntity> batchAddBdbTopicAuthControls = new ArrayList<BdbTopicAuthControlEntity>();
+            List<BdbTopicConfEntity> batchAddBdbTopicEntities = new ArrayList<>();
+            List<BdbTopicAuthControlEntity> batchAddBdbTopicAuthControls = new ArrayList<>();
             // for each topic
             for (String topicName : batchAddTopicNames) {
                 BdbTopicAuthControlEntity tmpTopicAuthControl =
@@ -238,10 +238,10 @@ public class WebBrokerTopicConfHandler {
             if ((topicJsonArray == null) || (topicJsonArray.isEmpty())) {
                 throw new Exception("Null value of topicJsonSet, please set the value first!");
             }
-            Set<String> batchAddTopicNames = new HashSet<String>();
-            Set<String> batchAddItemKeys = new HashSet<String>();
-            List<BdbTopicAuthControlEntity> batchTopicAuthInfos = new ArrayList<BdbTopicAuthControlEntity>();
-            List<BdbTopicConfEntity> batchAddBdbTopicEntities = new ArrayList<BdbTopicConfEntity>();
+            Set<String> batchAddTopicNames = new HashSet<>();
+            Set<String> batchAddItemKeys = new HashSet<>();
+            List<BdbTopicAuthControlEntity> batchTopicAuthInfos = new ArrayList<>();
+            List<BdbTopicConfEntity> batchAddBdbTopicEntities = new ArrayList<>();
             for (int count = 0; count < topicJsonArray.size(); count++) {
                 Map<String, String> jsonObject = topicJsonArray.get(count);
                 try {
@@ -722,9 +722,9 @@ public class WebBrokerTopicConfHandler {
             Set<BdbBrokerConfEntity> batchInputTopicEntitySet =
                     WebParameterUtils.getBatchBrokerIdSet(req.getParameter("brokerId"),
                             brokerConfManager, true, strBuffer);
-            Set<Integer> changedBrokerSet = new HashSet<Integer>();
+            Set<Integer> changedBrokerSet = new HashSet<>();
             Set<BdbTopicConfEntity> batchRmvBdbTopicEntitySet =
-                    new HashSet<BdbTopicConfEntity>();
+                    new HashSet<>();
 
             // For the broker to perform, check its status
             // and check the config of the topic to see if the action could be performed
@@ -866,7 +866,7 @@ public class WebBrokerTopicConfHandler {
             Set<BdbBrokerConfEntity> batchBrokerEntitySet =
                     WebParameterUtils.getBatchBrokerIdSet(req.getParameter("brokerId"), brokerConfManager, true,
                             strBuffer);
-            List<BdbTopicConfEntity> batchRmvBdbTopicEntities = new ArrayList<BdbTopicConfEntity>();
+            List<BdbTopicConfEntity> batchRmvBdbTopicEntities = new ArrayList<>();
             for (BdbBrokerConfEntity brokerConfEntity : batchBrokerEntitySet) {
                 if (brokerConfEntity == null) {
                     continue;
@@ -913,7 +913,7 @@ public class WebBrokerTopicConfHandler {
                 }
             }
             try {
-                Set<Integer> changedBrokerSet = new HashSet<Integer>();
+                Set<Integer> changedBrokerSet = new HashSet<>();
                 for (BdbTopicConfEntity itemTopicEntity : batchRmvBdbTopicEntities) {
                     BdbBrokerConfEntity brokerConfEntity =
                             brokerConfManager.getBrokerDefaultConfigStoreInfo(itemTopicEntity.getBrokerId());
@@ -1211,7 +1211,7 @@ public class WebBrokerTopicConfHandler {
                     WebParameterUtils.validIntDataParameter("memCacheFlushIntvl",
                             req.getParameter("memCacheFlushIntvl"), false, TBaseConstants.META_VALUE_UNDEFINED, 4000);
             int unFlushDataHold = unflushThreshold;
-            List<BdbTopicConfEntity> batchModBdbTopicEntities = new ArrayList<BdbTopicConfEntity>();
+            List<BdbTopicConfEntity> batchModBdbTopicEntities = new ArrayList<>();
             for (BdbBrokerConfEntity tgtEntity : batchBrokerEntitySet) {
                 if (tgtEntity == null) {
                     continue;
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/simplemvc/RequestDispatcher.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/simplemvc/RequestDispatcher.java
index f772ba4..a5895c5 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/simplemvc/RequestDispatcher.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/simplemvc/RequestDispatcher.java
@@ -44,9 +44,9 @@ public class RequestDispatcher {
     private static final String SCREEN_PLACEHOLDER = "screen_placeholder";
     private final WebConfig config;
     private final TemplateEngine engine;
-    private final HashMap<String, Action> actions = new HashMap<String, Action>();
-    private final HashMap<String, String> templates = new HashMap<String, String>();
-    private final ThreadLocal<ControlTool> controlTools = new ThreadLocal<ControlTool>();
+    private final HashMap<String, Action> actions = new HashMap<>();
+    private final HashMap<String, String> templates = new HashMap<>();
+    private final ThreadLocal<ControlTool> controlTools = new ThreadLocal<>();
     private GenericXmlApplicationContext applicationContext;
 
     /**
@@ -251,7 +251,7 @@ public class RequestDispatcher {
     }
 
     private List<Class> getActionClasses(String packageName) throws Exception {
-        List<Class> classList = new ArrayList<Class>();
+        List<Class> classList = new ArrayList<>();
         String path = packageName.replaceAll("\\.", "/");
         URL url = this.getClass().getClassLoader().getResource(path);
         if (url == null) {
@@ -271,7 +271,7 @@ public class RequestDispatcher {
 
     private List<File> getFileList(String directory, String type) throws Exception {
         File file = new File(directory);
-        List<File> fileList = new ArrayList<File>();
+        List<File> fileList = new ArrayList<>();
         getFile(file, fileList, type);
         return fileList;
     }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/simplemvc/conf/WebConfig.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/simplemvc/conf/WebConfig.java
index 26623e7..62aa55e 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/simplemvc/conf/WebConfig.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/simplemvc/conf/WebConfig.java
@@ -28,9 +28,9 @@ import org.apache.tubemq.server.master.web.simplemvc.Action;
 
 public class WebConfig {
 
-    private final HashMap<String, Object> tools = new HashMap<String, Object>();
-    private final HashMap<String, Action> actions = new HashMap<String, Action>();
-    private final HashSet<String> types = new HashSet<String>();
+    private final HashMap<String, Object> tools = new HashMap<>();
+    private final HashMap<String, Action> actions = new HashMap<>();
+    private final HashSet<String> types = new HashSet<>();
     private String resourcePath;
     private String templatePath;
     private String actionPackage;
@@ -38,7 +38,7 @@ public class WebConfig {
     private String supportedTypes = ".htm,.html";
     private String defaultPage = "index.htm";
     private boolean springSupported = false;
-    private List<String> beanFilePathList = new ArrayList<String>();
+    private List<String> beanFilePathList = new ArrayList<>();
     private boolean standalone = false;
 
     public WebConfig() {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/tools/BdbGroupAdmin.java b/tubemq-server/src/main/java/org/apache/tubemq/server/tools/BdbGroupAdmin.java
index c248b2d..8730eca 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/tools/BdbGroupAdmin.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/tools/BdbGroupAdmin.java
@@ -58,7 +58,7 @@ public class BdbGroupAdmin {
             return;
         }
 
-        Set<InetSocketAddress> helpers = new HashSet<InetSocketAddress>();
+        Set<InetSocketAddress> helpers = new HashSet<>();
         String group = args[0];
         String[] hostAndPort = args[1].split(":");
         if (hostAndPort.length != 2) {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/tools/StoreRepairAdmin.java b/tubemq-server/src/main/java/org/apache/tubemq/server/tools/StoreRepairAdmin.java
index 2f5f8ce..8fd1f5b 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/tools/StoreRepairAdmin.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/tools/StoreRepairAdmin.java
@@ -87,7 +87,7 @@ public class StoreRepairAdmin {
         int count = 0;
         ExecutorService executor =
                 Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1);
-        List<Callable<IndexReparStore>> tasks = new ArrayList<Callable<IndexReparStore>>();
+        List<Callable<IndexReparStore>> tasks = new ArrayList<>();
         if (ls != null) {
             for (final File subDir : ls) {
                 if (subDir == null) {
@@ -127,7 +127,7 @@ public class StoreRepairAdmin {
         }
         if (count > 0) {
             CompletionService<IndexReparStore> completionService =
-                    new ExecutorCompletionService<IndexReparStore>(executor);
+                    new ExecutorCompletionService<>(executor);
             for (Callable<IndexReparStore> task : tasks) {
                 completionService.submit(task);
             }
@@ -218,7 +218,7 @@ public class StoreRepairAdmin {
         }
 
         private void loaderSegments() throws IOException {
-            final List<Segment> accum = new ArrayList<Segment>();
+            final List<Segment> accum = new ArrayList<>();
             String fileSuffix = DataStoreUtils.DATA_FILE_SUFFIX;
             final File[] ls = topicDir.listFiles();
             if (ls != null) {