You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2022/02/12 07:52:33 UTC
[incubator-inlong] branch master updated: [INLONG-2478][TubeMQ] Optimize GroupCountService logic implementation (#2479)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new b2d2c08 [INLONG-2478][TubeMQ] Optimize GroupCountService logic implementation (#2479)
b2d2c08 is described below
commit b2d2c08e41891ceb70c242e495b863aa987ec5d7
Author: gosonzhang <46...@qq.com>
AuthorDate: Sat Feb 12 15:52:29 2022 +0800
[INLONG-2478][TubeMQ] Optimize GroupCountService logic implementation (#2479)
---
.../corebase/daemon/AbstractDaemonService.java | 6 +-
.../tubemq/server/broker/BrokerServiceServer.java | 11 +-
.../server/broker/msgstore/MessageStore.java | 4 +-
.../broker/msgstore/disk/GetMessageResult.java | 12 +-
.../server/broker/msgstore/disk/MsgFileStore.java | 4 +-
.../server/broker/stats/GroupCountService.java | 146 ------------
.../stats/{CountItem.java => TrafficInfo.java} | 38 ++--
.../{CountService.java => TrafficService.java} | 29 ++-
.../server/broker/stats/TrafficStatsService.java | 245 +++++++++++++++++++++
.../tubemq/server/broker/utils/DataStoreUtils.java | 16 +-
.../server/broker/stats/GroupCountServiceTest.java | 39 ----
.../broker/stats/ServiceStatsHolderTest.java | 2 +
.../broker/stats/TrafficStatsServiceTest.java | 61 +++++
13 files changed, 379 insertions(+), 234 deletions(-)
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/daemon/AbstractDaemonService.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/daemon/AbstractDaemonService.java
index e76817b..d99006a 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/daemon/AbstractDaemonService.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/daemon/AbstractDaemonService.java
@@ -22,11 +22,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class AbstractDaemonService implements Service, Runnable {
- private static final Logger logger = LoggerFactory.getLogger(AbstractDaemonService.class);
+ private static final Logger logger =
+ LoggerFactory.getLogger(AbstractDaemonService.class);
private final String name;
private final long intervalMs;
private final Thread daemon;
- private AtomicBoolean shutdown = new AtomicBoolean(false);
+ private final AtomicBoolean shutdown =
+ new AtomicBoolean(false);
public AbstractDaemonService(final String serviceName, final long intervalMs) {
this.name = serviceName;
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java
index 75b363d..74144f8 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java
@@ -66,9 +66,8 @@ import org.apache.inlong.tubemq.server.broker.msgstore.disk.GetMessageResult;
import org.apache.inlong.tubemq.server.broker.nodeinfo.ConsumerNodeInfo;
import org.apache.inlong.tubemq.server.broker.offset.OffsetRecordInfo;
import org.apache.inlong.tubemq.server.broker.offset.OffsetService;
-import org.apache.inlong.tubemq.server.broker.stats.CountService;
-import org.apache.inlong.tubemq.server.broker.stats.GroupCountService;
import org.apache.inlong.tubemq.server.broker.stats.ServiceStatsHolder;
+import org.apache.inlong.tubemq.server.broker.stats.TrafficStatsService;
import org.apache.inlong.tubemq.server.common.TServerConstants;
import org.apache.inlong.tubemq.server.common.TStatusConstants;
import org.apache.inlong.tubemq.server.common.aaaserver.CertificateBrokerHandler;
@@ -107,9 +106,9 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
// row lock.
private final RowLock brokerRowLock;
// statistics of produce.
- private final CountService putCounterGroup;
+ private final TrafficStatsService putCounterGroup;
// statistics of consume.
- private final CountService getCounterGroup;
+ private final TrafficStatsService getCounterGroup;
// certificate handler.
private final CertificateBrokerHandler serverAuthHandler;
// consumer timeout listener.
@@ -128,8 +127,8 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
this.serverAuthHandler = tubeBroker.getServerAuthHandler();
ServiceStatusHolder.setStatisParameters(tubeConfig.getAllowedReadIOExcptCnt(),
tubeConfig.getAllowedWriteIOExcptCnt(), tubeConfig.getIoExcptStatsDurationMs());
- this.putCounterGroup = new GroupCountService("PutCounterGroup", "Producer", 60 * 1000);
- this.getCounterGroup = new GroupCountService("GetCounterGroup", "Consumer", 60 * 1000);
+ this.putCounterGroup = new TrafficStatsService("PutCounterGroup", "Producer", 60 * 1000);
+ this.getCounterGroup = new TrafficStatsService("GetCounterGroup", "Consumer", 60 * 1000);
this.heartbeatManager = new HeartbeatManager();
this.brokerRowLock =
new RowLock("Broker-RowLock", this.tubeConfig.getRowLockWaitDurMs());
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStore.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStore.java
index 3764559..61b94f4 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStore.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStore.java
@@ -47,7 +47,7 @@ import org.apache.inlong.tubemq.server.broker.msgstore.mem.GetCacheMsgResult;
import org.apache.inlong.tubemq.server.broker.msgstore.mem.MsgMemStatisInfo;
import org.apache.inlong.tubemq.server.broker.msgstore.mem.MsgMemStore;
import org.apache.inlong.tubemq.server.broker.nodeinfo.ConsumerNodeInfo;
-import org.apache.inlong.tubemq.server.broker.stats.CountItem;
+import org.apache.inlong.tubemq.server.broker.stats.TrafficInfo;
import org.apache.inlong.tubemq.server.broker.utils.DataStoreUtils;
import org.apache.inlong.tubemq.server.common.utils.AppendResult;
import org.apache.inlong.tubemq.server.common.utils.IdWorker;
@@ -219,7 +219,7 @@ public class MessageStore implements Closeable {
if (inMemCache) {
// return not found when data is under memory sink operation.
if (memMsgRlt.isSuccess) {
- HashMap<String, CountItem> countMap =
+ HashMap<String, TrafficInfo> countMap =
new HashMap<>();
List<ClientBroker.TransferedMessage> transferedMessageList =
new ArrayList<>();
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/GetMessageResult.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/GetMessageResult.java
index 5cd6fef..0c0d903 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/GetMessageResult.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/GetMessageResult.java
@@ -22,7 +22,7 @@ import java.util.HashMap;
import java.util.List;
import org.apache.inlong.tubemq.corebase.TBaseConstants;
import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientBroker.TransferedMessage;
-import org.apache.inlong.tubemq.server.broker.stats.CountItem;
+import org.apache.inlong.tubemq.server.broker.stats.TrafficInfo;
/**
* Broker's reply to Consumer's GetMessage request.
@@ -38,14 +38,14 @@ public class GetMessageResult {
public long waitTime = -1;
public boolean isSlowFreq = false;
public boolean isFromSsdFile = false;
- public HashMap<String, CountItem> tmpCounters = new HashMap<>();
+ public HashMap<String, TrafficInfo> tmpCounters = new HashMap<>();
public List<TransferedMessage> transferedMessageList = new ArrayList<>();
public long maxOffset = TBaseConstants.META_VALUE_UNDEFINED;
public GetMessageResult(boolean isSuccess, int retCode, final String errInfo,
final long reqOffset, final int lastReadOffset,
final long lastRdDataOffset, final int totalSize,
- HashMap<String, CountItem> tmpCounters,
+ HashMap<String, TrafficInfo> tmpCounters,
List<TransferedMessage> transferedMessageList) {
this(isSuccess, retCode, errInfo, reqOffset, lastReadOffset,
lastRdDataOffset, totalSize, tmpCounters, transferedMessageList, false);
@@ -54,7 +54,7 @@ public class GetMessageResult {
public GetMessageResult(boolean isSuccess, int retCode, final String errInfo,
final long reqOffset, final int lastReadOffset,
final long lastRdDataOffset, final int totalSize,
- HashMap<String, CountItem> tmpCounters,
+ HashMap<String, TrafficInfo> tmpCounters,
List<TransferedMessage> transferedMessageList,
boolean isFromSsdFile) {
this.isSuccess = isSuccess;
@@ -108,11 +108,11 @@ public class GetMessageResult {
this.waitTime = waitTime;
}
- public HashMap<String, CountItem> getTmpCounters() {
+ public HashMap<String, TrafficInfo> getTmpCounters() {
return tmpCounters;
}
- public void setTmpCounters(HashMap<String, CountItem> tmpCounters) {
+ public void setTmpCounters(HashMap<String, TrafficInfo> tmpCounters) {
this.tmpCounters = tmpCounters;
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java
index f2c5c0d..64ad394 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java
@@ -35,8 +35,8 @@ import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientBroker;
import org.apache.inlong.tubemq.corebase.utils.ServiceStatusHolder;
import org.apache.inlong.tubemq.server.broker.BrokerConfig;
import org.apache.inlong.tubemq.server.broker.msgstore.MessageStore;
-import org.apache.inlong.tubemq.server.broker.stats.CountItem;
import org.apache.inlong.tubemq.server.broker.stats.ServiceStatsHolder;
+import org.apache.inlong.tubemq.server.broker.stats.TrafficInfo;
import org.apache.inlong.tubemq.server.broker.utils.DataStoreUtils;
import org.apache.inlong.tubemq.server.broker.utils.DiskSamplePrint;
import org.apache.inlong.tubemq.server.common.TServerConstants;
@@ -263,7 +263,7 @@ public class MsgFileStore implements Closeable {
final StringBuilder sBuilder = new StringBuilder(512);
final long curDataMaxOffset = getDataMaxOffset();
final long curDataMinOffset = getDataMinOffset();
- HashMap<String, CountItem> countMap = new HashMap<>();
+ HashMap<String, TrafficInfo> countMap = new HashMap<>();
ByteBuffer dataBuffer =
ByteBuffer.allocate(TServerConstants.CFG_STORE_DEFAULT_MSG_READ_UNIT);
List<ClientBroker.TransferedMessage> transferedMessageList =
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/GroupCountService.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/GroupCountService.java
deleted file mode 100644
index a162dd3..0000000
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/GroupCountService.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.tubemq.server.broker.stats;
-
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.inlong.tubemq.corebase.daemon.AbstractDaemonService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Statistics of broker. It use two CountSet alternatively print statistics to log.
- */
-public class GroupCountService extends AbstractDaemonService implements CountService {
- private final Logger logger;
- private final String cntHdr;
- private final CountSet[] countSets = new CountSet[2];
- private AtomicInteger index = new AtomicInteger(0);
-
- public GroupCountService(String logFileName, String countType, long scanIntervalMs) {
- super(logFileName, scanIntervalMs);
- this.cntHdr = countType;
- if (logFileName == null) {
- this.logger = LoggerFactory.getLogger(GroupCountService.class);
- } else {
- this.logger = LoggerFactory.getLogger(logFileName);
- }
- countSets[0] = new CountSet();
- countSets[1] = new CountSet();
- super.start();
- }
-
- @Override
- protected void loopProcess(long intervalMs) {
- int tmpIndex = 0;
- int befIndex = 0;
- AtomicLong curRunCnt;
- ConcurrentHashMap<String, CountItem> counters;
- while (!super.isStopped()) {
- try {
- Thread.sleep(intervalMs);
- befIndex = tmpIndex = index.get();
- if (index.compareAndSet(befIndex, (++tmpIndex) % 2)) {
- curRunCnt = countSets[befIndex].refCnt;
- do {
- try {
- Thread.sleep(10);
- } catch (InterruptedException e) {
- return;
- }
- } while (curRunCnt.get() > 0);
- counters = countSets[befIndex].counterItem;
- if (counters != null) {
- for (Map.Entry<String, CountItem> entry : counters.entrySet()) {
- logger.info("{}#{}#{}#{}", new Object[]{cntHdr, entry.getKey(),
- entry.getValue().getMsgCount(), entry.getValue().getMsgSize()});
- }
- counters.clear();
- }
- }
- } catch (InterruptedException e) {
- return;
- } catch (Throwable t) {
- //
- }
- }
- }
-
- @Override
- public void close(long waitTimeMs) {
- if (super.stop()) {
- return;
- }
- int befIndex = index.get();
- ConcurrentHashMap<String, CountItem> counters;
- for (int i = 0; i < countSets.length; i++) {
- counters = countSets[(++befIndex) % 2].counterItem;
- if (counters != null) {
- for (Map.Entry<String, CountItem> entry : counters.entrySet()) {
- logger.info("{}#{}#{}#{}", new Object[]{cntHdr, entry.getKey(),
- entry.getValue().getMsgCount(), entry.getValue().getMsgSize()});
- }
- counters.clear();
- }
- }
- }
-
- @Override
- public void add(Map<String, CountItem> counterGroup) {
- CountSet countSet = countSets[index.get()];
- countSet.refCnt.incrementAndGet();
- ConcurrentHashMap<String, CountItem> counters = countSet.counterItem;
- for (Entry<String, CountItem> entry : counterGroup.entrySet()) {
- CountItem currData = counters.get(entry.getKey());
- if (currData == null) {
- CountItem tmpData = new CountItem(0L, 0L);
- currData = counters.putIfAbsent(entry.getKey(), tmpData);
- if (currData == null) {
- currData = tmpData;
- }
- }
- currData.appendMsg(entry.getValue().getMsgCount(), entry.getValue().getMsgSize());
- }
- countSet.refCnt.decrementAndGet();
- }
-
- @Override
- public void add(String name, Long delta, int msgSize) {
- CountSet countSet = countSets[index.get()];
- countSet.refCnt.incrementAndGet();
- CountItem currData = countSet.counterItem.get(name);
- if (currData == null) {
- CountItem tmpData = new CountItem(0L, 0L);
- currData = countSet.counterItem.putIfAbsent(name, tmpData);
- if (currData == null) {
- currData = tmpData;
- }
- }
- currData.appendMsg(delta, msgSize);
- countSet.refCnt.decrementAndGet();
- }
-
- private static class CountSet {
- public AtomicLong refCnt = new AtomicLong(0);
- public ConcurrentHashMap<String, CountItem> counterItem =
- new ConcurrentHashMap<>();
- }
-}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/CountItem.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/TrafficInfo.java
similarity index 61%
rename from inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/CountItem.java
rename to inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/TrafficInfo.java
index 58ed92f..0968552 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/CountItem.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/TrafficInfo.java
@@ -17,39 +17,37 @@
package org.apache.inlong.tubemq.server.broker.stats;
-import java.util.concurrent.atomic.AtomicLong;
-
/**
* Statistic of message, contains message's count and message's size.
*/
-public class CountItem {
- AtomicLong msgCount = new AtomicLong(0);
- AtomicLong msgSize = new AtomicLong(0);
-
- public CountItem(long msgCount, long msgSize) {
- this.msgCount.set(msgCount);
- this.msgSize.set(msgSize);
- }
+public class TrafficInfo {
+ private long msgCnt = 0L;
+ private long msgSize = 0L;
- public long getMsgSize() {
- return msgSize.get();
+ public TrafficInfo() {
+ clear();
}
- public void setMsgSize(long msgSize) {
- this.msgSize.set(msgSize);
+ public TrafficInfo(long msgCount, long msgSize) {
+ this.msgCnt = msgCount;
+ this.msgSize = msgSize;
}
public long getMsgCount() {
- return msgCount.get();
+ return msgCnt;
}
- public void setMsgCount(long msgCount) {
- this.msgCount.set(msgCount);
+ public long getMsgSize() {
+ return msgSize;
}
- public void appendMsg(final long msgCount, final long msgSize) {
- this.msgCount.addAndGet(msgCount);
- this.msgSize.addAndGet(msgSize);
+ public void addMsgCntAndSize(long msgCount, long msgSize) {
+ this.msgCnt += msgCount;
+ this.msgSize += msgSize;
}
+ public void clear() {
+ this.msgCnt = 0L;
+ this.msgSize = 0L;
+ }
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/CountService.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/TrafficService.java
similarity index 54%
rename from inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/CountService.java
rename to inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/TrafficService.java
index d502a11..ea6a7b2 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/CountService.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/TrafficService.java
@@ -19,11 +19,34 @@ package org.apache.inlong.tubemq.server.broker.stats;
import java.util.Map;
-public interface CountService {
+/**
+ * TrafficService, incoming and outgoing traffic statistics service
+ *
+ * Supports adding new metric data one by one or in batches,
+ * and outputting metric data to a file at specified intervals.
+ */
+public interface TrafficService {
+ /**
+ * Close service.
+ *
+ * @param waitTimeMs the wait time
+ */
void close(long waitTimeMs);
- void add(Map<String, CountItem> counterGroup);
+ /**
+ * Add traffic information in batches
+ *
+ * @param trafficInfos the traffic information
+ */
+ void add(Map<String, TrafficInfo> trafficInfos);
- void add(String name, Long delta, int msgSize);
+ /**
+ * Add a traffic information record
+ *
+ * @param statsKey the statistical key
+ * @param msgCnt the total message count
+ * @param msgSize the total message size
+ */
+ void add(String statsKey, long msgCnt, long msgSize);
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/TrafficStatsService.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/TrafficStatsService.java
new file mode 100644
index 0000000..34c678b
--- /dev/null
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/TrafficStatsService.java
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.server.broker.stats;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.inlong.tubemq.corebase.daemon.AbstractDaemonService;
+import org.apache.inlong.tubemq.corebase.metric.impl.LongOnlineCounter;
+import org.apache.inlong.tubemq.corebase.metric.impl.LongStatsCounter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * TrafficStatsService, Input and Output traffic statistics Service
+ *
+ * Due to the large amount of traffic-related metric data, this statistics service uses
+ * a daemon thread to periodically refresh the data to the special metric file
+ * for metric data collection.
+ */
+public class TrafficStatsService extends AbstractDaemonService implements TrafficService {
+ // Maximum write wait time
+ private static final long MAX_WRITING_WAIT_DLT = 5000L;
+ // Statistics output log file
+ private final Logger logger;
+ // Statistic category
+ private final String statsCat;
+ // Switchable traffic statistic units
+ private final WritableUnit[] switchableUnits = new WritableUnit[2];
+ // Current writable index
+ private final AtomicInteger writableIndex = new AtomicInteger(0);
+
+ /**
+ * Initial traffic statistics service
+ *
+ * @param logFileName the output file name
+ * @param countType the statistic type
+ * @param scanIntervalMs the snapshot interval
+ */
+ public TrafficStatsService(String logFileName, String countType, long scanIntervalMs) {
+ super(logFileName, scanIntervalMs);
+ this.statsCat = countType;
+ if (logFileName == null) {
+ this.logger = LoggerFactory.getLogger(TrafficStatsService.class);
+ } else {
+ this.logger = LoggerFactory.getLogger(logFileName);
+ }
+ switchableUnits[0] = new WritableUnit();
+ switchableUnits[1] = new WritableUnit();
+ super.start();
+ }
+
+ @Override
+ protected void loopProcess(long intervalMs) {
+ int befIndex;
+ while (!super.isStopped()) {
+ try {
+ Thread.sleep(intervalMs);
+ // Snapshot metric data
+ befIndex = writableIndex.getAndIncrement();
+ // Output 2 file
+ output2file(befIndex);
+ } catch (InterruptedException e) {
+ return;
+ } catch (Throwable t) {
+ //
+ }
+ }
+ }
+
+ @Override
+ public void close(long waitTimeMs) {
+ if (super.stop()) {
+ return;
+ }
+ // Output remain information
+ int index = writableIndex.get();
+ for (int i = 0; i < switchableUnits.length; i++) {
+ output2file(++index);
+ }
+ }
+
+ @Override
+ public void add(Map<String, TrafficInfo> trafficInfos) {
+ TrafficStatsSet tmpStatsSet;
+ TrafficStatsSet trafficStatsSet;
+ // Increment write reference count
+ switchableUnits[getIndex()].refCnt.incValue();
+ try {
+ // Accumulate statistics information
+ ConcurrentHashMap<String, TrafficStatsSet> tmpStatsSetMap =
+ switchableUnits[getIndex()].statsUnitMap;
+ for (Entry<String, TrafficInfo> entry : trafficInfos.entrySet()) {
+ trafficStatsSet = tmpStatsSetMap.get(entry.getKey());
+ if (trafficStatsSet == null) {
+ tmpStatsSet = new TrafficStatsSet();
+ trafficStatsSet = tmpStatsSetMap.putIfAbsent(entry.getKey(), tmpStatsSet);
+ if (trafficStatsSet == null) {
+ trafficStatsSet = tmpStatsSet;
+ }
+ }
+ trafficStatsSet.addMsgCntAndSize(
+ entry.getValue().getMsgCount(), entry.getValue().getMsgSize());
+ }
+ } finally {
+ // Decrement write reference count
+ switchableUnits[getIndex()].refCnt.decValue();
+ }
+ }
+
+ @Override
+ public void add(String statsKey, long msgCnt, long msgSize) {
+ // Increment write reference count
+ switchableUnits[getIndex()].refCnt.incValue();
+ try {
+ // Accumulate statistics information
+ ConcurrentHashMap<String, TrafficStatsSet> tmpStatsSetMap =
+ switchableUnits[getIndex()].statsUnitMap;
+ TrafficStatsSet trafficStatsSet = tmpStatsSetMap.get(statsKey);
+ if (trafficStatsSet == null) {
+ TrafficStatsSet tmpStatsSet = new TrafficStatsSet();
+ trafficStatsSet = tmpStatsSetMap.putIfAbsent(statsKey, tmpStatsSet);
+ if (trafficStatsSet == null) {
+ trafficStatsSet = tmpStatsSet;
+ }
+ }
+ trafficStatsSet.addMsgCntAndSize(msgCnt, msgSize);
+ } finally {
+ // Decrement write reference count
+ switchableUnits[getIndex()].refCnt.decValue();
+ }
+ }
+
+ /**
+ * Print statistics data to file
+ *
+ * @param readIndex the readable index
+ */
+ private void output2file(int readIndex) {
+ WritableUnit selectedUnit =
+ switchableUnits[getIndex(readIndex)];
+ if (selectedUnit == null) {
+ return;
+ }
+ // Wait for the data update operation to complete
+ long startTime = System.currentTimeMillis();
+ do {
+ if (System.currentTimeMillis() - startTime >= MAX_WRITING_WAIT_DLT) {
+ break;
+ }
+ try {
+ Thread.sleep(20);
+ } catch (InterruptedException e) {
+ break;
+ }
+ } while (selectedUnit.refCnt.getValue() > 0);
+ // Output data to file
+ Map<String, TrafficStatsSet> statsMap = selectedUnit.statsUnitMap;
+ for (Entry<String, TrafficStatsSet> entry : statsMap.entrySet()) {
+ logger.info("{}#{}#{}#{}", statsCat, entry.getKey(),
+ entry.getValue().msgCnt.getAndResetValue(),
+ entry.getValue().msgSize.getAndResetValue());
+ }
+ statsMap.clear();
+ }
+
+ /**
+ * Get current writable block index.
+ *
+ * @return the writable block index
+ */
+ private int getIndex() {
+ return getIndex(this.writableIndex.get());
+ }
+
+ /**
+ * Gets the metric block index based on the specified value.
+ *
+ * @param origIndex the specified value
+ * @return the metric block index
+ */
+ private int getIndex(int origIndex) {
+ return Math.abs(origIndex % 2);
+ }
+
+ /**
+ * StatsItemSet, Metric Statistics item set
+ *
+ * Currently includes the total number of messages and bytes
+ * according to the statistics dimension, which can be expanded later as needed
+ */
+ private static class TrafficStatsSet {
+ protected LongStatsCounter msgCnt =
+ new LongStatsCounter("msgCount", null);
+ protected LongStatsCounter msgSize =
+ new LongStatsCounter("msgSize", null);
+
+ public TrafficStatsSet() {
+ //
+ }
+
+ /**
+ * Accumulate the count of messages and message bytes.
+ *
+ * @param msgCount the specified message count
+ * @param msgSize the specified message size
+ */
+ public void addMsgCntAndSize(long msgCount, long msgSize) {
+ this.msgCnt.addValue(msgCount);
+ this.msgSize.addValue(msgSize);
+ }
+ }
+
+ /**
+ * WritableUnit,
+ *
+ * This class is mainly defined to facilitate reading and writing of
+ * statistic set through array operations, which contains a Map of
+ * statistic dimensions and corresponding metric values
+ */
+ private static class WritableUnit {
+ // Current writing thread count
+ public LongOnlineCounter refCnt =
+ new LongOnlineCounter("ref_count", null);
+ // statistic unit map
+ protected ConcurrentHashMap<String, TrafficStatsSet> statsUnitMap =
+ new ConcurrentHashMap<>(512);
+ }
+}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/utils/DataStoreUtils.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/utils/DataStoreUtils.java
index 5c1eabd..f74cd3f 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/utils/DataStoreUtils.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/utils/DataStoreUtils.java
@@ -27,7 +27,7 @@ import org.apache.inlong.tubemq.corebase.TokenConstants;
import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientBroker;
import org.apache.inlong.tubemq.corebase.utils.MessageFlagUtils;
import org.apache.inlong.tubemq.corebase.utils.TStringUtils;
-import org.apache.inlong.tubemq.server.broker.stats.CountItem;
+import org.apache.inlong.tubemq.server.broker.stats.TrafficInfo;
/**
* Storage util. Used for data and index file storage format.
@@ -113,10 +113,10 @@ public class DataStoreUtils {
* @param sBuilder the string buffer
* @return the converted messages
*/
- public static ClientBroker.TransferedMessage getTransferMsg(final ByteBuffer dataBuffer, int dataTotalSize,
- final HashMap<String, CountItem> countMap,
- final String statisKeyBase,
- final StringBuilder sBuilder) {
+ public static ClientBroker.TransferedMessage getTransferMsg(ByteBuffer dataBuffer, int dataTotalSize,
+ HashMap<String, TrafficInfo> countMap,
+ String statisKeyBase,
+ StringBuilder sBuilder) {
if (dataBuffer.array().length < dataTotalSize) {
return null;
}
@@ -187,11 +187,11 @@ public class DataStoreUtils {
String baseKey = sBuilder.append(statisKeyBase)
.append("#").append(messageTime).toString();
sBuilder.delete(0, sBuilder.length());
- CountItem getCount = countMap.get(baseKey);
+ TrafficInfo getCount = countMap.get(baseKey);
if (getCount == null) {
- countMap.put(baseKey, new CountItem(1L, payLoadLen2));
+ countMap.put(baseKey, new TrafficInfo(1L, payLoadLen2));
} else {
- getCount.appendMsg(1L, payLoadLen2);
+ getCount.addMsgCntAndSize(1L, payLoadLen2);
}
ClientBroker.TransferedMessage transferedMessage = dataBuilder.build();
dataBuilder.clear();
diff --git a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/stats/GroupCountServiceTest.java b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/stats/GroupCountServiceTest.java
deleted file mode 100644
index b9e80fe..0000000
--- a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/stats/GroupCountServiceTest.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.tubemq.server.broker.stats;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.junit.Test;
-
-/**
- * GroupCountService test.
- */
-public class GroupCountServiceTest {
-
- @Test
- public void add() {
- GroupCountService groupCountService = new GroupCountService("PutCounterGroup", "Producer", 60 * 1000);
- groupCountService.add("key", 1L, 100);
- Map<String, CountItem> items = new HashMap<>();
- items.put("key1", new CountItem(1L, 1024));
- items.put("key2", new CountItem(1L, 1024));
- // add counts
- groupCountService.add(items);
- }
-}
diff --git a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/stats/ServiceStatsHolderTest.java b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/stats/ServiceStatsHolderTest.java
index f8ae24b..4da079d 100644
--- a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/stats/ServiceStatsHolderTest.java
+++ b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/stats/ServiceStatsHolderTest.java
@@ -75,6 +75,7 @@ public class ServiceStatsHolderTest {
Assert.assertEquals(10, retMap.get("file_sync_dlt_min").longValue());
Assert.assertEquals(1, retMap.get("file_sync_dlt_cell_8t16").longValue());
Assert.assertEquals(1, retMap.get("file_sync_dlt_cell_64t128").longValue());
+ final long sinceTime1 = retMap.get("reset_time");
// verify snapshot
ServiceStatsHolder.snapShort(retMap);
retMap.clear();
@@ -83,6 +84,7 @@ public class ServiceStatsHolderTest {
// add disk sync data, add 1
ServiceStatsHolder.updDiskSyncDataDlt(999);
ServiceStatsHolder.snapShort(retMap);
+ Assert.assertNotEquals(sinceTime1, retMap.get("reset_time").longValue());
Assert.assertEquals(2, retMap.get("consumer_online_cnt").longValue());
Assert.assertEquals(0, retMap.get("consumer_timeout_cnt").longValue());
Assert.assertEquals(0, retMap.get("broker_hb_exc_cnt").longValue());
diff --git a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/stats/TrafficStatsServiceTest.java b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/stats/TrafficStatsServiceTest.java
new file mode 100644
index 0000000..e8e08ea
--- /dev/null
+++ b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/stats/TrafficStatsServiceTest.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.server.broker.stats;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * TrafficStatsService test.
+ */
+public class TrafficStatsServiceTest {
+
+ @Test
+ public void testTrafficInfo() {
+ // case 1
+ TrafficInfo trafficInfo1 = new TrafficInfo();
+ trafficInfo1.addMsgCntAndSize(1, 100);
+ trafficInfo1.addMsgCntAndSize(3, 500);
+ Assert.assertEquals(4, trafficInfo1.getMsgCount());
+ Assert.assertEquals(600, trafficInfo1.getMsgSize());
+ trafficInfo1.clear();
+ trafficInfo1.addMsgCntAndSize(50, 5000);
+ Assert.assertEquals(50, trafficInfo1.getMsgCount());
+ Assert.assertEquals(5000, trafficInfo1.getMsgSize());
+ // case 2
+ TrafficInfo trafficInfo2 = new TrafficInfo(99, 1000);
+ trafficInfo2.addMsgCntAndSize(1, 100);
+ Assert.assertEquals(100, trafficInfo2.getMsgCount());
+ Assert.assertEquals(1100, trafficInfo2.getMsgSize());
+ }
+
+ @Test
+ public void testTrafficStatsService() {
+ TrafficStatsService trafficService =
+ new TrafficStatsService("PutCounterGroup", "Producer", 60 * 1000L);
+ trafficService.add("key", 1L, 100);
+ Map<String, TrafficInfo> items = new HashMap<>();
+ items.put("key1", new TrafficInfo(1L, 1024));
+ items.put("key2", new TrafficInfo(1L, 1024));
+ // add counts
+ trafficService.add(items);
+ trafficService.add("key3", 3L, 500L);
+ }
+}