You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2020/12/25 08:54:36 UTC
[incubator-tubemq] branch master updated: [TUBEMQ-482] Add offset
query api
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/master by this push:
new 6294926 [TUBEMQ-482] Add offset query api
6294926 is described below
commit 6294926e88676b21b1266d929850a873cdf3453e
Author: gosonzhang <go...@tencent.com>
AuthorDate: Fri Dec 25 16:37:14 2020 +0800
[TUBEMQ-482] Add offset query api
---
.../server/broker/metadata/TopicMetadata.java | 29 ++++++++
.../server/broker/msgstore/MessageStore.java | 11 ++-
.../broker/msgstore/MessageStoreManager.java | 62 +++++++++++++++-
.../server/broker/msgstore/StoreService.java | 6 ++
.../server/broker/offset/DefaultOffsetManager.java | 46 +++++++-----
.../tubemq/server/broker/offset/OffsetService.java | 6 +-
.../server/broker/utils/GroupOffsetInfo.java | 85 ++++++++++++++++++++++
.../server/broker/utils/TopicPubStoreInfo.java | 45 ++++++++++++
.../server/broker/web/BrokerAdminServlet.java | 79 +++++++++++++++-----
9 files changed, 323 insertions(+), 46 deletions(-)
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/TopicMetadata.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/TopicMetadata.java
index c582606..800254b 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/TopicMetadata.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/TopicMetadata.java
@@ -17,7 +17,9 @@
package org.apache.tubemq.server.broker.metadata;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.Map;
import java.util.Set;
import org.apache.tubemq.corebase.TBaseConstants;
@@ -251,6 +253,33 @@ public class TopicMetadata {
return partIds;
}
+    /**
+     * Builds the partition-id set of every store of this topic.
+     *
+     * @return a new map keyed by store id (0 to numTopicStores - 1); each value holds
+     *         numPartitions ids computed as storeId * META_STORE_INS_BASE + index
+     */
+    public Map<Integer, Set<Integer>> getStorePartIdMap() {
+        Map<Integer, Set<Integer>> partIdsByStore = new HashMap<>();
+        for (int storeId = 0; storeId < numTopicStores; storeId++) {
+            // first partition id owned by this store
+            int firstPartId = storeId * TBaseConstants.META_STORE_INS_BASE;
+            Set<Integer> storeParts = new HashSet<>();
+            for (int index = 0; index < numPartitions; index++) {
+                storeParts.add(firstPartId + index);
+            }
+            partIdsByStore.put(storeId, storeParts);
+        }
+        return partIdsByStore;
+    }
+
+    /**
+     * Gets the id of the store that owns the given partition.
+     *
+     * Partition ids are generated as storeId * META_STORE_INS_BASE + index, so the
+     * store id is the quotient of dividing by the base; the remainder is only the
+     * index of the partition inside its store.
+     *
+     * @param partitionId  the partition id
+     * @return the id of the store the partition belongs to
+     */
+    public int getStoreIdByPartitionId(int partitionId) {
+        return partitionId / TBaseConstants.META_STORE_INS_BASE;
+    }
+
+    /**
+     * Gets the partition ids served by one store of this topic.
+     *
+     * @param storeId  the store id
+     * @return a new set with numPartitions ids computed as
+     *         storeId * META_STORE_INS_BASE + index, or an empty set when the
+     *         store id is outside 0 to numTopicStores - 1
+     */
+    public Set<Integer> getPartIdsByStoreId(int storeId) {
+        Set<Integer> storeParts = new HashSet<>();
+        if (storeId < 0 || storeId >= numTopicStores) {
+            return storeParts;
+        }
+        int firstPartId = storeId * TBaseConstants.META_STORE_INS_BASE;
+        for (int index = 0; index < numPartitions; index++) {
+            storeParts.add(firstPartId + index);
+        }
+        return storeParts;
+    }
+
public int getStatusId() {
return statusId;
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java
index a827723..e610a8b 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java
@@ -133,9 +133,9 @@ public class MessageStore implements Closeable {
this.writeCacheFlushIntvl = topicMetadata.getMemCacheFlushIntvl();
int tmpIndexReadCnt = tubeConfig.getIndexTransCount() * partitionNum;
memMaxIndexReadCnt.set(tmpIndexReadCnt <= 6000
- ? 6000 : (tmpIndexReadCnt >= 10000 ? 10000 : tmpIndexReadCnt));
+ ? 6000 : (Math.min(tmpIndexReadCnt, 10000)));
fileMaxIndexReadCnt.set(tmpIndexReadCnt < 8000
- ? 8000 : (tmpIndexReadCnt >= 13500 ? 13500 : tmpIndexReadCnt));
+ ? 8000 : (Math.min(tmpIndexReadCnt, 13500)));
memMaxFilterIndexReadCnt.set(memMaxIndexReadCnt.get() * 2);
fileMaxFilterIndexReadCnt.set(fileMaxIndexReadCnt.get() * 3);
fileLowReqMaxFilterIndexReadCnt.set(fileMaxFilterIndexReadCnt.get() * 10);
@@ -250,8 +250,7 @@ public class MessageStore implements Closeable {
}
}
// before read from file, adjust request's offset.
- long reqNewOffset = requestOffset < this.msgFileStore.getIndexMinOffset()
- ? this.msgFileStore.getIndexMinOffset() : requestOffset;
+ long reqNewOffset = Math.max(requestOffset, this.msgFileStore.getIndexMinOffset());
if (reqSwitch <= 1 && reqNewOffset >= getFileIndexMaxOffset()) {
return new GetMessageResult(false, TErrCodeConstants.NOT_FOUND,
reqNewOffset, 0, "current offset is exceed max file offset");
@@ -409,9 +408,9 @@ public class MessageStore implements Closeable {
maxFileValidDurMs.set(parseDeletePolicy(topicMetadata.getDeletePolicy()));
int tmpIndexReadCnt = tubeConfig.getIndexTransCount() * partitionNum;
memMaxIndexReadCnt.set(tmpIndexReadCnt <= 6000
- ? 6000 : (tmpIndexReadCnt >= 10000 ? 10000 : tmpIndexReadCnt));
+ ? 6000 : (Math.min(tmpIndexReadCnt, 10000)));
fileMaxIndexReadCnt.set(tmpIndexReadCnt < 8000
- ? 8000 : (tmpIndexReadCnt >= 13500 ? 13500 : tmpIndexReadCnt));
+ ? 8000 : (Math.min(tmpIndexReadCnt, 13500)));
memMaxFilterIndexReadCnt.set(memMaxIndexReadCnt.get() * 2);
fileMaxFilterIndexReadCnt.set(fileMaxIndexReadCnt.get() * 3);
fileLowReqMaxFilterIndexReadCnt.set(fileMaxFilterIndexReadCnt.get() * 10);
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStoreManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStoreManager.java
index 6275f3a..2d55ba1 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStoreManager.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStoreManager.java
@@ -24,6 +24,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -51,6 +52,7 @@ import org.apache.tubemq.server.broker.metadata.TopicMetadata;
import org.apache.tubemq.server.broker.msgstore.disk.GetMessageResult;
import org.apache.tubemq.server.broker.nodeinfo.ConsumerNodeInfo;
import org.apache.tubemq.server.broker.utils.DataStoreUtils;
+import org.apache.tubemq.server.broker.utils.TopicPubStoreInfo;
import org.apache.tubemq.server.common.TStatusConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -90,8 +92,7 @@ public class MessageStoreManager implements StoreService {
this.metadataManager = this.tubeBroker.getMetadataManager();
this.isRemovingTopic.set(false);
this.maxMsgTransferSize =
- tubeConfig.getTransferSize() > DataStoreUtils.MAX_MSG_TRANSFER_SIZE
- ? DataStoreUtils.MAX_MSG_TRANSFER_SIZE : tubeConfig.getTransferSize();
+ Math.min(tubeConfig.getTransferSize(), DataStoreUtils.MAX_MSG_TRANSFER_SIZE);
this.metadataManager.addPropertyChangeListener("topicConfigMap", new PropertyChangeListener() {
@Override
public void propertyChange(final PropertyChangeEvent evt) {
@@ -385,6 +386,63 @@ public class MessageStoreManager implements StoreService {
return Collections.unmodifiableMap(this.dataStores);
}
+    /***
+     * Collects the publish (store offset) information of the requested topics.
+     *
+     * @param topicSet  the topics to query; null or empty selects every configured
+     *                  topic, names without a configuration are ignored
+     *
+     * @return a map of topic name to (partition id to store info); topics without
+     *         a configuration or without an entry in the data stores are left out
+     */
+    @Override
+    public Map<String, Map<Integer, TopicPubStoreInfo>> getTopicPublishInfos(
+            Set<String> topicSet) {
+        Map<String, TopicMetadata> topicConfigs = metadataManager.getTopicConfigMap();
+        // select the topics to report
+        Set<String> targetTopics = new HashSet<>();
+        if (topicSet != null && !topicSet.isEmpty()) {
+            for (String name : topicSet) {
+                if (topicConfigs.containsKey(name)) {
+                    targetTopics.add(name);
+                }
+            }
+        } else {
+            targetTopics.addAll(topicConfigs.keySet());
+        }
+        Map<String, Map<Integer, TopicPubStoreInfo>> pubInfos = new HashMap<>();
+        for (String name : targetTopics) {
+            TopicMetadata metadata = topicConfigs.get(name);
+            if (metadata == null) {
+                continue;
+            }
+            Map<Integer, MessageStore> stores = dataStores.get(name);
+            if (stores == null) {
+                continue;
+            }
+            Map<Integer, TopicPubStoreInfo> partInfos = new HashMap<>();
+            for (Map.Entry<Integer, MessageStore> storeEntry : stores.entrySet()) {
+                if (storeEntry == null) {
+                    continue;
+                }
+                Integer storeId = storeEntry.getKey();
+                MessageStore msgStore = storeEntry.getValue();
+                if (storeId == null || msgStore == null) {
+                    continue;
+                }
+                // every partition of a store reports that store's offsets
+                for (Integer partId : metadata.getPartIdsByStoreId(storeId)) {
+                    partInfos.put(partId,
+                            new TopicPubStoreInfo(name, storeId, partId,
+                                    msgStore.getIndexMinOffset(), msgStore.getIndexMaxOffset(),
+                                    msgStore.getDataMinOffset(), msgStore.getDataMaxOffset()));
+                }
+            }
+            pubInfos.put(name, partInfos);
+        }
+        return pubInfos;
+    }
+
private Set<File> getLogDirSet(final BrokerConfig tubeConfig) throws IOException {
TopicMetadata topicMetadata = null;
final Set<String> paths = new HashSet<>();
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/StoreService.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/StoreService.java
index d5f7f32..184fb4d 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/StoreService.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/StoreService.java
@@ -20,6 +20,10 @@ package org.apache.tubemq.server.broker.msgstore;
import java.util.Collection;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.tubemq.server.broker.utils.TopicPubStoreInfo;
+
/***
* Store service interface.
@@ -35,4 +39,6 @@ public interface StoreService {
MessageStore getOrCreateMessageStore(final String topic,
final int partition) throws Throwable;
+
+ Map<String, Map<Integer, TopicPubStoreInfo>> getTopicPublishInfos(Set<String> topicSet);
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java
index bdd85b3..df3afc4 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.tubemq.corebase.TBaseConstants;
import org.apache.tubemq.corebase.daemon.AbstractDaemonService;
import org.apache.tubemq.corebase.utils.TStringUtils;
+import org.apache.tubemq.corebase.utils.Tuple2;
import org.apache.tubemq.server.broker.BrokerConfig;
import org.apache.tubemq.server.broker.msgstore.MessageStore;
import org.apache.tubemq.server.broker.msgstore.MessageStoreManager;
@@ -399,22 +400,26 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
* @return group offset info in memory or zk
*/
@Override
- public Map<String, Map<Integer, Long>> queryGroupOffset(
+ public Map<String, Map<Integer, Tuple2<Long, Long>>> queryGroupOffset(
String group, Map<String, Set<Integer>> topicPartMap) {
- Map<String, Map<Integer, Long>> result = new HashMap<>();
+ Map<String, Map<Integer, Tuple2<Long, Long>>> result = new HashMap<>();
// search group from memory
Map<String, OffsetStorageInfo> topicPartOffsetMap = cfmOffsetMap.get(group);
if (topicPartOffsetMap == null) {
// query from zookeeper
for (Map.Entry<String, Set<Integer>> entry : topicPartMap.entrySet()) {
+ if (entry == null || entry.getKey() == null || entry.getValue() == null) {
+ continue;
+ }
Map<Integer, Long> qryResult =
- zkOffsetStorage.queryGroupOffsetInfo(
- group, entry.getKey(), entry.getValue());
- Map<Integer, Long> offsetMap = new HashMap<>();
+ zkOffsetStorage.queryGroupOffsetInfo(group,
+ entry.getKey(), entry.getValue());
+ Map<Integer, Tuple2<Long, Long>> offsetMap = new HashMap<>();
for (Map.Entry<Integer, Long> item : qryResult.entrySet()) {
- if (item.getValue() != null) {
- offsetMap.put(item.getKey(), item.getValue());
+ if (item == null || item.getKey() == null || item.getValue() == null) {
+ continue;
}
+ offsetMap.put(item.getKey(), new Tuple2<>(item.getValue(), 0L));
}
if (!offsetMap.isEmpty()) {
result.put(entry.getKey(), offsetMap);
@@ -422,14 +427,18 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
}
} else {
// found in memory, get offset values
+ Map<String, Long> tmpPartOffsetMap = tmpOffsetMap.get(group);
for (Map.Entry<String, Set<Integer>> entry : topicPartMap.entrySet()) {
- Map<Integer, Long> offsetMap = new HashMap<>();
+ Map<Integer, Tuple2<Long, Long>> offsetMap = new HashMap<>();
for (Integer partitionId : entry.getValue()) {
String offsetCacheKey =
getOffsetCacheKey(entry.getKey(), partitionId);
OffsetStorageInfo offsetInfo = topicPartOffsetMap.get(offsetCacheKey);
+ Long tmpOffset = tmpPartOffsetMap.get(offsetCacheKey);
if (offsetInfo != null) {
- offsetMap.put(partitionId, offsetInfo.getOffset());
+ offsetMap.put(partitionId,
+ new Tuple2<>(offsetInfo.getOffset(),
+ (tmpOffset == null ? 0 : tmpOffset)));
}
}
if (!offsetMap.isEmpty()) {
@@ -451,9 +460,9 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
* @return at least one record modified
*/
@Override
- public boolean modifyGroupOffset(MessageStoreManager storeManager, Set<String> groups,
- Map<String, Map<Integer, Long>> topicPartOffsetMap,
- String modifier) {
+ public boolean modifyGroupOffset(
+ MessageStoreManager storeManager, Set<String> groups,
+ Map<String, Map<Integer, Tuple2<Long, Long>>> topicPartOffsetMap, String modifier) {
long oldOffset = -1;
long reSetOffset = -1;
boolean changed = false;
@@ -461,17 +470,18 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
StringBuilder strBuidler = new StringBuilder(512);
// set offset by group
for (String group : groups) {
- for (Map.Entry<String, Map<Integer, Long>> entry : topicPartOffsetMap.entrySet()) {
- Map<Integer, Long> partOffsetMap = entry.getValue();
+ for (Map.Entry<String, Map<Integer, Tuple2<Long, Long>>> entry
+ : topicPartOffsetMap.entrySet()) {
+ Map<Integer, Tuple2<Long, Long>> partOffsetMap = entry.getValue();
if (partOffsetMap == null) {
continue;
}
// set offset
- for (Map.Entry<Integer, Long> entry1 : partOffsetMap.entrySet()) {
+ for (Map.Entry<Integer, Tuple2<Long, Long>> entry1 : partOffsetMap.entrySet()) {
if (entry1.getValue() == null) {
continue;
}
- reSetOffset = entry1.getValue();
+ Tuple2<Long, Long> offsetTuple = entry1.getValue();
// get topic store
try {
store = storeManager.getOrCreateMessageStore(
@@ -485,8 +495,8 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
long firstOffset = store.getIndexMinOffset();
long lastOffset = store.getIndexMaxOffset();
// adjust reseted offset value
- reSetOffset = reSetOffset < firstOffset
- ? firstOffset : Math.min(reSetOffset, lastOffset);
+ reSetOffset = offsetTuple.f0 < firstOffset
+ ? firstOffset : Math.min(offsetTuple.f0, lastOffset);
String offsetCacheKey =
getOffsetCacheKey(entry.getKey(), entry1.getKey());
getAndResetTmpOffset(group, offsetCacheKey);
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/OffsetService.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/OffsetService.java
index 05f0724..fcebdfc 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/OffsetService.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/OffsetService.java
@@ -19,6 +19,8 @@ package org.apache.tubemq.server.broker.offset;
import java.util.Map;
import java.util.Set;
+
+import org.apache.tubemq.corebase.utils.Tuple2;
import org.apache.tubemq.server.broker.msgstore.MessageStore;
import org.apache.tubemq.server.broker.msgstore.MessageStoreManager;
import org.apache.tubemq.server.common.offsetstorage.OffsetStorageInfo;
@@ -63,10 +65,10 @@ public interface OffsetService {
Set<String> getGroupSubInfo(String group);
- Map<String, Map<Integer, Long>> queryGroupOffset(
+ Map<String, Map<Integer, Tuple2<Long, Long>>> queryGroupOffset(
String group, Map<String, Set<Integer>> topicPartMap);
boolean modifyGroupOffset(MessageStoreManager storeManager, Set<String> groups,
- Map<String, Map<Integer, Long>> topicPartOffsetMap,
+ Map<String, Map<Integer, Tuple2<Long, Long>>> topicPartOffsetMap,
String modifier);
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/utils/GroupOffsetInfo.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/utils/GroupOffsetInfo.java
new file mode 100644
index 0000000..9a4abe3
--- /dev/null
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/utils/GroupOffsetInfo.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.server.broker.utils;
+
+import org.apache.tubemq.corebase.TBaseConstants;
+import org.apache.tubemq.corebase.utils.Tuple2;
+
+
+/**
+ * Offset view of one partition for one consume group: the store's offset range,
+ * the group's consume position and the lag between them.
+ *
+ * Every value stays at META_VALUE_UNDEFINED until the matching setter supplies it.
+ */
+public class GroupOffsetInfo {
+    public int partitionId = TBaseConstants.META_VALUE_UNDEFINED;
+    public long offsetMin = TBaseConstants.META_VALUE_UNDEFINED;
+    public long offsetMax = TBaseConstants.META_VALUE_UNDEFINED;
+    public long dataMin = TBaseConstants.META_VALUE_UNDEFINED;
+    public long dataMax = TBaseConstants.META_VALUE_UNDEFINED;
+    public long curOffset = TBaseConstants.META_VALUE_UNDEFINED;
+    public long flightOffset = TBaseConstants.META_VALUE_UNDEFINED;
+    public long offsetLag = TBaseConstants.META_VALUE_UNDEFINED;
+    public long curDataOffset = TBaseConstants.META_VALUE_UNDEFINED;
+    public long dataLag = TBaseConstants.META_VALUE_UNDEFINED;
+
+    public GroupOffsetInfo(int partitionId) {
+        this.partitionId = partitionId;
+    }
+
+    /**
+     * Copies the index and data offset ranges from the store info.
+     *
+     * @param pubStoreInfo  the store info of the partition; null leaves the fields untouched
+     */
+    public void setPartPubStoreInfo(TopicPubStoreInfo pubStoreInfo) {
+        if (pubStoreInfo == null) {
+            return;
+        }
+        offsetMin = pubStoreInfo.indexStart;
+        offsetMax = pubStoreInfo.indexEnd;
+        dataMin = pubStoreInfo.dataStart;
+        dataMax = pubStoreInfo.dataEnd;
+    }
+
+    /**
+     * Records the group's consume position.
+     *
+     * @param offsetInfo  f0 is stored as curOffset and f1 as flightOffset;
+     *                    null leaves the fields untouched
+     */
+    public void setConsumeOffsetInfo(Tuple2<Long, Long> offsetInfo) {
+        if (offsetInfo == null) {
+            return;
+        }
+        curOffset = offsetInfo.f0;
+        flightOffset = offsetInfo.f1;
+    }
+
+    /**
+     * Records the data offset last read by the group.
+     *
+     * @param curDataOffset  the data offset; negative values are ignored
+     */
+    public void setConsumeDataOffsetInfo(long curDataOffset) {
+        if (curDataOffset < 0) {
+            return;
+        }
+        this.curDataOffset = curDataOffset;
+    }
+
+    /**
+     * Computes offsetLag and dataLag; a lag is only computed when both of its
+     * operands have been set.
+     */
+    public void calculateLag() {
+        boolean indexKnown = offsetMax != TBaseConstants.META_VALUE_UNDEFINED
+                && curOffset != TBaseConstants.META_VALUE_UNDEFINED;
+        if (indexKnown) {
+            offsetLag = offsetMax - curOffset;
+        }
+        boolean dataKnown = dataMax != TBaseConstants.META_VALUE_UNDEFINED
+                && curDataOffset != TBaseConstants.META_VALUE_UNDEFINED;
+        if (dataKnown) {
+            dataLag = dataMax - curDataOffset;
+        }
+    }
+
+    /**
+     * Appends this record as a JSON object to the builder.
+     *
+     * @param sBuilder  the builder to append to
+     * @return the same builder, for chaining
+     */
+    public StringBuilder buildOffsetInfo(StringBuilder sBuilder) {
+        sBuilder.append("{\"partitionId\":").append(partitionId);
+        sBuilder.append(",\"curOffset\":").append(curOffset);
+        sBuilder.append(",\"flightOffset\":").append(flightOffset);
+        sBuilder.append(",\"curDataOffset\":").append(curDataOffset);
+        sBuilder.append(",\"offsetLag\":").append(offsetLag);
+        sBuilder.append(",\"dataLag\":").append(dataLag);
+        sBuilder.append(",\"offsetMax\":").append(offsetMax);
+        sBuilder.append(",\"dataMax\":").append(dataMax);
+        sBuilder.append("}");
+        return sBuilder;
+    }
+}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/utils/TopicPubStoreInfo.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/utils/TopicPubStoreInfo.java
new file mode 100644
index 0000000..b2257dd
--- /dev/null
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/utils/TopicPubStoreInfo.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.server.broker.utils;
+
+import org.apache.tubemq.corebase.TBaseConstants;
+
+
+
+/**
+ * Publish-side offsets of one partition: the index and data offset ranges of the
+ * store that serves it.
+ */
+public class TopicPubStoreInfo {
+
+    public String topicName;
+    public int storeId = TBaseConstants.META_VALUE_UNDEFINED;
+    public int partitionId = TBaseConstants.META_VALUE_UNDEFINED;
+    public long indexStart;
+    public long indexEnd;
+    public long dataStart;
+    public long dataEnd;
+
+    /**
+     * @param topicName    the topic name
+     * @param storeId      the id of the store serving the partition
+     * @param partitionId  the partition id
+     * @param indexStart   stored as indexStart
+     * @param indexEnd     stored as indexEnd
+     * @param dataStart    stored as dataStart
+     * @param dataEnd      stored as dataEnd
+     */
+    public TopicPubStoreInfo(String topicName, int storeId, int partitionId,
+                             long indexStart, long indexEnd,
+                             long dataStart, long dataEnd) {
+        this.topicName = topicName;
+        this.storeId = storeId;
+        this.partitionId = partitionId;
+        // index file range
+        this.indexStart = indexStart;
+        this.indexEnd = indexEnd;
+        // data file range
+        this.dataStart = dataStart;
+        this.dataEnd = dataEnd;
+    }
+
+}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java
index 91bfd23..9a0506e 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java
@@ -27,12 +27,15 @@ import java.util.concurrent.ConcurrentHashMap;
import javax.servlet.http.HttpServletRequest;
import org.apache.tubemq.corebase.TokenConstants;
import org.apache.tubemq.corebase.utils.TStringUtils;
+import org.apache.tubemq.corebase.utils.Tuple2;
import org.apache.tubemq.server.broker.TubeBroker;
import org.apache.tubemq.server.broker.metadata.TopicMetadata;
import org.apache.tubemq.server.broker.msgstore.MessageStore;
import org.apache.tubemq.server.broker.msgstore.MessageStoreManager;
import org.apache.tubemq.server.broker.nodeinfo.ConsumerNodeInfo;
import org.apache.tubemq.server.broker.offset.OffsetService;
+import org.apache.tubemq.server.broker.utils.GroupOffsetInfo;
+import org.apache.tubemq.server.broker.utils.TopicPubStoreInfo;
import org.apache.tubemq.server.common.fielddef.WebFieldDef;
import org.apache.tubemq.server.common.utils.ProcessResult;
import org.apache.tubemq.server.common.utils.WebParameterUtils;
@@ -672,42 +675,34 @@ public class BrokerAdminServlet extends AbstractWebHandler {
Set<String> topicSet = (Set<String>) result.retData1;
// verify the acquired Topic set and
// query the corresponding offset information
- Map<String, Map<String, Map<Integer, Long>>> groupOffsetMaps = new HashMap<>();
- for (String group : qryGroupNameSet) {
- Map<String, Set<Integer>> topicPartMap =
- validAndGetPartitions(group, topicSet);
- Map<String, Map<Integer, Long>> groupOffsetMap =
- broker.getOffsetManager().queryGroupOffset(group, topicPartMap);
- groupOffsetMaps.put(group, groupOffsetMap);
- }
+ Map<String, Map<String, Map<Integer, GroupOffsetInfo>>> groupOffsetMaps =
+ getGroupOffsetInfo(qryGroupNameSet, topicSet);
// builder result
int totalCnt = 0;
sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"Success!\",\"dataSet\":[");
- for (Map.Entry<String, Map<String, Map<Integer, Long>>> entry
+ for (Map.Entry<String, Map<String, Map<Integer, GroupOffsetInfo>>> entry
: groupOffsetMaps.entrySet()) {
if (totalCnt++ > 0) {
sBuilder.append(",");
}
- Map<String, Map<Integer, Long>> topicPartMap = entry.getValue();
+ Map<String, Map<Integer, GroupOffsetInfo>> topicPartMap = entry.getValue();
sBuilder.append("{\"groupName\":\"").append(entry.getKey())
.append("\",\"subInfo\":[");
int topicCnt = 0;
- for (Map.Entry<String, Map<Integer, Long>> entry1 : topicPartMap.entrySet()) {
+ for (Map.Entry<String, Map<Integer, GroupOffsetInfo>> entry1 : topicPartMap.entrySet()) {
if (topicCnt++ > 0) {
sBuilder.append(",");
}
- Map<Integer, Long> partOffMap = entry1.getValue();
+ Map<Integer, GroupOffsetInfo> partOffMap = entry1.getValue();
sBuilder.append("{\"topicName\":\"").append(entry1.getKey())
.append("\",\"offsets\":[");
int partCnt = 0;
- for (Map.Entry<Integer, Long> entry2 : partOffMap.entrySet()) {
+ for (Map.Entry<Integer, GroupOffsetInfo> entry2 : partOffMap.entrySet()) {
if (partCnt++ > 0) {
sBuilder.append(",");
}
- sBuilder.append("{\"").append(this.broker.getTubeConfig().getBrokerId())
- .append(TokenConstants.ATTR_SEP).append(entry1.getKey())
- .append(TokenConstants.ATTR_SEP).append(entry2.getKey())
- .append("\":").append(entry2.getValue()).append("}");
+ GroupOffsetInfo offsetInfo = entry2.getValue();
+ offsetInfo.buildOffsetInfo(sBuilder);
}
sBuilder.append("],\"partCount\":").append(partCnt).append("}");
}
@@ -777,7 +772,7 @@ public class BrokerAdminServlet extends AbstractWebHandler {
return;
}
// query offset from source group
- Map<String, Map<Integer, Long>> srcGroupOffsets =
+ Map<String, Map<Integer, Tuple2<Long, Long>>> srcGroupOffsets =
broker.getOffsetManager().queryGroupOffset(srcGroupName, topicPartMap);
boolean changed = broker.getOffsetManager().modifyGroupOffset(
broker.getStoreManager(), tgtGroupNameSet, srcGroupOffsets, modifier);
@@ -785,6 +780,48 @@ public class BrokerAdminServlet extends AbstractWebHandler {
sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}");
}
+    /**
+     * Builds the offset information of each group on the given topics.
+     *
+     * For every partition returned by validAndGetPartitions the result holds the
+     * store's offset range, the group's booked offset, the data offset last read by
+     * the group's consumer (when one is registered) and the resulting lag.
+     *
+     * @param groupSet  the groups to query
+     * @param topicSet  the topics to query, validated per group
+     * @return group name to (topic name to (partition id to offset info))
+     */
+    private Map<String, Map<String, Map<Integer, GroupOffsetInfo>>> getGroupOffsetInfo(
+            Set<String> groupSet, Set<String> topicSet) {
+        Map<String, Map<String, Map<Integer, GroupOffsetInfo>>> groupOffsetMaps = new HashMap<>();
+        for (String group : groupSet) {
+            Map<String, Map<Integer, GroupOffsetInfo>> topicOffsetRet = new HashMap<>();
+            // valid and get topic's partitionIds
+            Map<String, Set<Integer>> topicPartMap = validAndGetPartitions(group, topicSet);
+            // get topic's publish info
+            Map<String, Map<Integer, TopicPubStoreInfo>> topicStorePubInfoMap =
+                    broker.getStoreManager().getTopicPublishInfos(topicPartMap.keySet());
+            // get group's booked offset info
+            Map<String, Map<Integer, Tuple2<Long, Long>>> groupOffsetMap =
+                    broker.getOffsetManager().queryGroupOffset(group, topicPartMap);
+            // get offset info array
+            for (Map.Entry<String, Set<Integer>> entry : topicPartMap.entrySet()) {
+                String topic = entry.getKey();
+                Map<Integer, GroupOffsetInfo> partOffsetRet = new HashMap<>();
+                // Either lookup may miss: a topic without an opened store is not
+                // reported by getTopicPublishInfos, and queryGroupOffset leaves out
+                // topics for which no offset was found. The setters accept null.
+                Map<Integer, TopicPubStoreInfo> storeInfoMap = topicStorePubInfoMap.get(topic);
+                Map<Integer, Tuple2<Long, Long>> partBookedMap = groupOffsetMap.get(topic);
+                for (Integer partitionId : entry.getValue()) {
+                    GroupOffsetInfo offsetInfo = new GroupOffsetInfo(partitionId);
+                    offsetInfo.setPartPubStoreInfo(
+                            storeInfoMap == null ? null : storeInfoMap.get(partitionId));
+                    offsetInfo.setConsumeOffsetInfo(
+                            partBookedMap == null ? null : partBookedMap.get(partitionId));
+                    String queryKey = buildQueryID(group, topic, partitionId);
+                    ConsumerNodeInfo nodeInfo = broker.getConsumerNodeInfo(queryKey);
+                    if (nodeInfo != null) {
+                        offsetInfo.setConsumeDataOffsetInfo(nodeInfo.getLastDataRdOffset());
+                    }
+                    offsetInfo.calculateLag();
+                    partOffsetRet.put(partitionId, offsetInfo);
+                }
+                topicOffsetRet.put(topic, partOffsetRet);
+            }
+            groupOffsetMaps.put(group, topicOffsetRet);
+        }
+        return groupOffsetMaps;
+    }
+
+
private Map<String, Set<Integer>> validAndGetPartitions(String group, Set<String> topicSet) {
Map<String, Set<Integer>> topicPartMap = new HashMap<>();
// query stored topic set stored in memory or zk
@@ -807,4 +844,10 @@ public class BrokerAdminServlet extends AbstractWebHandler {
return topicPartMap;
}
+    /**
+     * Builds the lookup key of a consumer node: group, topic and partition id
+     * joined by TokenConstants.ATTR_SEP.
+     *
+     * @param group        the consume group name
+     * @param topic        the topic name
+     * @param partitionId  the partition id
+     * @return the joined key
+     */
+    private String buildQueryID(String group, String topic, int partitionId) {
+        return group + TokenConstants.ATTR_SEP + topic
+                + TokenConstants.ATTR_SEP + partitionId;
+    }
+
}