You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/09/02 03:05:27 UTC
[pulsar] 02/05: Add compacted topic metrics for TopicStats in CLI
(#11564)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 71cc13be016646da47ba7298c585ae1f484507a5
Author: GuoJiwei <te...@apache.org>
AuthorDate: Sat Aug 21 22:58:20 2021 +0800
Add compacted topic metrics for TopicStats in CLI (#11564)
Add below metrics to help track potential flows or examine the overall condition of compacted topics .
- lastCompactionRemovedEventCount : the removed event count of last compaction
- lastCompactionSucceedTimestamp : the timestamp of last succeed compaction
- lastCompactionFailedTimestamp : the timestamp of last failed compaction
- lastCompactionDurationTimeInMills: the duration time of last compaction
These 4 metrics will be displayed in topic stats CLI :
```
./pulsar-admin topics stats persistent://tenant/ns/topic
```
This patch will add metrics in CLI , which would generate doc automatically.
(cherry picked from commit c0ef593990ce8a7ea9ee6f1def880f71def3fc97)
---
.../org/apache/pulsar/broker/PulsarService.java | 14 ++--
.../pulsar/broker/service/BrokerService.java | 13 ++-
.../broker/service/persistent/PersistentTopic.java | 22 ++++++
.../org/apache/pulsar/compaction/Compactor.java | 29 ++++---
.../apache/pulsar/compaction/CompactorMXBean.java | 57 ++++++++++++++
.../pulsar/compaction/CompactorMXBeanImpl.java | 92 ++++++++++++++++++++++
.../pulsar/compaction/TwoPhaseCompactor.java | 14 +++-
.../pulsar/compaction/CompactorMXBeanImplTest.java | 50 ++++++++++++
.../apache/pulsar/compaction/CompactorTest.java | 20 +++--
.../common/policies/data/CompactionStats.java | 37 +++++++++
.../pulsar/common/policies/data/TopicStats.java | 3 +
.../policies/data/stats/CompactionStatsImpl.java | 47 +++++++++++
.../common/policies/data/stats/TopicStatsImpl.java | 6 +-
.../policies/data/PersistentTopicStatsTest.java | 4 +
14 files changed, 383 insertions(+), 25 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index f7a1337..c09ec45 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -1313,12 +1313,16 @@ public class PulsarService implements AutoCloseable {
// only public so mockito can mock it
public Compactor newCompactor() throws PulsarServerException {
return new TwoPhaseCompactor(this.getConfiguration(),
- getClient(), getBookKeeperClient(),
- getCompactorExecutor());
+ getClient(), getBookKeeperClient(),
+ getCompactorExecutor());
}
public synchronized Compactor getCompactor() throws PulsarServerException {
- if (this.compactor == null) {
+ return getCompactor(true);
+ }
+
+ public synchronized Compactor getCompactor(boolean shouldInitialize) throws PulsarServerException {
+ if (this.compactor == null && shouldInitialize) {
this.compactor = newCompactor();
}
return this.compactor;
@@ -1327,8 +1331,8 @@ public class PulsarService implements AutoCloseable {
protected synchronized OrderedScheduler getOffloaderScheduler(OffloadPoliciesImpl offloadPolicies) {
if (this.offloaderScheduler == null) {
this.offloaderScheduler = OrderedScheduler.newSchedulerBuilder()
- .numThreads(offloadPolicies.getManagedLedgerOffloadMaxThreads())
- .name("offloader").build();
+ .numThreads(offloadPolicies.getManagedLedgerOffloadMaxThreads())
+ .name("offloader").build();
}
return this.offloaderScheduler;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index ce9abcb..74fde84 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -164,6 +164,10 @@ import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.apache.pulsar.common.util.netty.ChannelFutures;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.apache.pulsar.common.util.netty.NettyFutureUtil;
+import org.apache.pulsar.compaction.Compactor;
+import org.apache.pulsar.metadata.api.MetadataCache;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.zookeeper.ZkIsolatedBookieEnsemblePlacementPolicy;
import org.apache.pulsar.zookeeper.ZooKeeperCacheListener;
@@ -1770,8 +1774,15 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
}
}
}
-
topics.remove(topic);
+
+ try {
+ Compactor compactor = pulsar.getCompactor(false);
+ if (compactor != null) {
+ compactor.getStats().removeTopic(topic);
+ }
+ } catch (PulsarServerException ignore) {
+ }
}
public int getNumberOfNamespaceBundles() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 2808021..b017750 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -150,6 +150,8 @@ import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.compaction.CompactedTopic;
import org.apache.pulsar.compaction.CompactedTopicImpl;
+import org.apache.pulsar.compaction.Compactor;
+import org.apache.pulsar.compaction.CompactorMXBean;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.utils.StatsOutputStream;
@@ -1899,9 +1901,29 @@ public class PersistentTopic extends AbstractTopic
stats.lastOffloadLedgerId = ledger.getLastOffloadedLedgerId();
stats.lastOffloadSuccessTimeStamp = ledger.getLastOffloadedSuccessTimestamp();
stats.lastOffloadFailureTimeStamp = ledger.getLastOffloadedFailureTimestamp();
+ Optional<CompactorMXBean> mxBean = getCompactorMXBean();
+ stats.compaction.lastCompactionRemovedEventCount = mxBean.map(stat ->
+ stat.getLastCompactionRemovedEventCount(topic)).orElse(0L);
+ stats.compaction.lastCompactionSucceedTimestamp = mxBean.map(stat ->
+ stat.getLastCompactionSucceedTimestamp(topic)).orElse(0L);
+ stats.compaction.lastCompactionFailedTimestamp = mxBean.map(stat ->
+ stat.getLastCompactionFailedTimestamp(topic)).orElse(0L);
+ stats.compaction.lastCompactionDurationTimeInMills = mxBean.map(stat ->
+ stat.getLastCompactionDurationTimeInMills(topic)).orElse(0L);
+
return stats;
}
+ private Optional<CompactorMXBean> getCompactorMXBean() {
+ Compactor compactor = null;
+ try {
+ compactor = brokerService.pulsar().getCompactor(false);
+ } catch (PulsarServerException ex) {
+ log.warn("get compactor error", ex);
+ }
+ return Optional.ofNullable(compactor).map(c -> c.getStats());
+ }
+
@Override
public CompletableFuture<PersistentTopicInternalStats> getInternalStats(boolean includeLedgerMetadata) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/Compactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/Compactor.java
index 61032d6..cb631e2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/Compactor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/Compactor.java
@@ -42,6 +42,7 @@ public abstract class Compactor {
protected final ScheduledExecutorService scheduler;
private final PulsarClient pulsar;
private final BookKeeper bk;
+ protected final CompactorMXBeanImpl mxBean;
public Compactor(ServiceConfiguration conf,
PulsarClient pulsar,
@@ -51,6 +52,7 @@ public abstract class Compactor {
this.scheduler = scheduler;
this.pulsar = pulsar;
this.bk = bk;
+ this.mxBean = new CompactorMXBeanImpl();
}
public CompletableFuture<Long> compact(String topic) {
@@ -60,23 +62,30 @@ public abstract class Compactor {
private CompletableFuture<Long> compactAndCloseReader(RawReader reader) {
CompletableFuture<Long> promise = new CompletableFuture<>();
+ mxBean.addCompactionStartOp(reader.getTopic());
doCompaction(reader, bk).whenComplete(
(ledgerId, exception) -> {
reader.closeAsync().whenComplete((v, exception2) -> {
- if (exception2 != null) {
- log.warn("Error closing reader handle {}, ignoring", reader, exception2);
- }
- if (exception != null) {
- // complete with original exception
- promise.completeExceptionally(exception);
- } else {
- promise.complete(ledgerId);
- }
- });
+ if (exception2 != null) {
+ log.warn("Error closing reader handle {}, ignoring", reader, exception2);
+ }
+ if (exception != null) {
+ // complete with original exception
+ mxBean.addCompactionEndOp(reader.getTopic(), false);
+ promise.completeExceptionally(exception);
+ } else {
+ mxBean.addCompactionEndOp(reader.getTopic(), true);
+ promise.complete(ledgerId);
+ }
+ });
});
return promise;
}
protected abstract CompletableFuture<Long> doCompaction(RawReader reader, BookKeeper bk);
+
+ public CompactorMXBean getStats() {
+ return this.mxBean;
+ }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorMXBean.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorMXBean.java
new file mode 100644
index 0000000..54ca2e8
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorMXBean.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import org.apache.bookkeeper.common.annotation.InterfaceAudience;
+import org.apache.bookkeeper.common.annotation.InterfaceStability;
+
+/**
+ * JMX Bean interface for Compactor stats.
+ */
+@InterfaceAudience.LimitedPrivate
+@InterfaceStability.Stable
+public interface CompactorMXBean {
+
+ /**
+ * @return the removed event count of last compaction
+ */
+ long getLastCompactionRemovedEventCount(String topic);
+
+ /**
+ * @return the timestamp of last succeed compaction
+ */
+ long getLastCompactionSucceedTimestamp(String topic);
+
+ /**
+ * @return the timestamp of last failed compaction
+ */
+ long getLastCompactionFailedTimestamp(String topic);
+
+ /**
+ * @return the duration time of last compaction
+ */
+ long getLastCompactionDurationTimeInMills(String topic);
+
+ /**
+ * Remove metrics about this topic.
+ * @param topic
+ */
+ void removeTopic(String topic);
+
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorMXBeanImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorMXBeanImpl.java
new file mode 100644
index 0000000..05db2aa
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorMXBeanImpl.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.LongAdder;
+
+public class CompactorMXBeanImpl implements CompactorMXBean {
+
+ private final ConcurrentHashMap<String, CompactRecord> compactRecordOps = new ConcurrentHashMap<>();
+
+ public void addCompactionRemovedEvent(String topic) {
+ compactRecordOps.computeIfAbsent(topic, k -> new CompactRecord()).addCompactionRemovedEvent();
+ }
+
+ public void addCompactionStartOp(String topic) {
+ compactRecordOps.computeIfAbsent(topic, k -> new CompactRecord()).reset();
+ }
+
+ public void addCompactionEndOp(String topic, boolean succeed) {
+ CompactRecord compactRecord = compactRecordOps.computeIfAbsent(topic, k -> new CompactRecord());
+ compactRecord.lastCompactionDurationTimeInMills = System.currentTimeMillis()
+ - compactRecord.lastCompactionStartTimeOp;
+ compactRecord.lastCompactionRemovedEventCount = compactRecord.lastCompactionRemovedEventCountOp.longValue();
+ if (succeed) {
+ compactRecord.lastCompactionSucceedTimestamp = System.currentTimeMillis();
+ } else {
+ compactRecord.lastCompactionFailedTimestamp = System.currentTimeMillis();
+ }
+ }
+
+ @Override
+ public long getLastCompactionRemovedEventCount(String topic) {
+ return compactRecordOps.getOrDefault(topic, new CompactRecord()).lastCompactionRemovedEventCount;
+ }
+
+ @Override
+ public long getLastCompactionSucceedTimestamp(String topic) {
+ return compactRecordOps.getOrDefault(topic, new CompactRecord()).lastCompactionSucceedTimestamp;
+ }
+
+ @Override
+ public long getLastCompactionFailedTimestamp(String topic) {
+ return compactRecordOps.getOrDefault(topic, new CompactRecord()).lastCompactionFailedTimestamp;
+ }
+
+ @Override
+ public long getLastCompactionDurationTimeInMills(String topic) {
+ return compactRecordOps.getOrDefault(topic, new CompactRecord()).lastCompactionDurationTimeInMills;
+ }
+
+ @Override
+ public void removeTopic(String topic) {
+ compactRecordOps.remove(topic);
+ }
+
+ static class CompactRecord {
+
+ private long lastCompactionRemovedEventCount = 0L;
+ private long lastCompactionSucceedTimestamp = 0L;
+ private long lastCompactionFailedTimestamp = 0L;
+ private long lastCompactionDurationTimeInMills = 0L;
+
+ private LongAdder lastCompactionRemovedEventCountOp = new LongAdder();
+ private long lastCompactionStartTimeOp;
+
+ public void addCompactionRemovedEvent() {
+ lastCompactionRemovedEventCountOp.increment();
+ }
+
+ public void reset() {
+ lastCompactionRemovedEventCountOp.reset();
+ lastCompactionStartTimeOp = System.currentTimeMillis();
+ }
+ }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index 0f0f981..49d121f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -124,18 +124,23 @@ public class TwoPhaseCompactor extends Compactor {
try {
MessageId id = m.getMessageId();
boolean deletedMessage = false;
+ boolean replaceMessage = false;
if (RawBatchConverter.isReadableBatch(m)) {
try {
for (ImmutableTriple<MessageId, String, Integer> e : RawBatchConverter
.extractIdsAndKeysAndSize(m)) {
if (e != null) {
if (e.getRight() > 0) {
- latestForKey.put(e.getMiddle(), e.getLeft());
+ MessageId old = latestForKey.put(e.getMiddle(), e.getLeft());
+ replaceMessage = old != null;
} else {
deletedMessage = true;
latestForKey.remove(e.getMiddle());
}
}
+ if (replaceMessage || deletedMessage) {
+ mxBean.addCompactionRemovedEvent(reader.getTopic());
+ }
}
} catch (IOException ioe) {
log.info("Error decoding batch for message {}. Whole batch will be included in output",
@@ -145,14 +150,17 @@ public class TwoPhaseCompactor extends Compactor {
Pair<String, Integer> keyAndSize = extractKeyAndSize(m);
if (keyAndSize != null) {
if (keyAndSize.getRight() > 0) {
- latestForKey.put(keyAndSize.getLeft(), id);
+ MessageId old = latestForKey.put(keyAndSize.getLeft(), id);
+ replaceMessage = old != null;
} else {
deletedMessage = true;
latestForKey.remove(keyAndSize.getLeft());
}
}
+ if (replaceMessage || deletedMessage) {
+ mxBean.addCompactionRemovedEvent(reader.getTopic());
+ }
}
-
MessageId first = firstMessageId.orElse(deletedMessage ? null : id);
MessageId to = deletedMessage ? toMessageId.orElse(null) : id;
if (id.compareTo(lastMessageId) == 0) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorMXBeanImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorMXBeanImplTest.java
new file mode 100644
index 0000000..b865396
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorMXBeanImplTest.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import org.apache.pulsar.broker.service.BrokerService;
+import org.mockito.Mockito;
+import org.testng.annotations.Test;
+
+import java.util.Optional;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@Test(groups = "broker-compaction")
+public class CompactorMXBeanImplTest {
+
+ @Test
+ public void testSimple() throws Exception {
+ CompactorMXBeanImpl mxBean = new CompactorMXBeanImpl();
+ String topic = "topic1";
+ mxBean.addCompactionStartOp(topic);
+ assertEquals(mxBean.getLastCompactionRemovedEventCount(topic), 0, 0);
+ mxBean.addCompactionRemovedEvent(topic);
+ assertEquals(mxBean.getLastCompactionRemovedEventCount(topic), 0, 0);
+ mxBean.addCompactionEndOp(topic, true);
+ mxBean.addCompactionEndOp(topic, false);
+ assertEquals(mxBean.getLastCompactionRemovedEventCount(topic), 1, 0);
+ assertTrue(mxBean.getLastCompactionSucceedTimestamp(topic) > 0L);
+ assertTrue(mxBean.getLastCompactionFailedTimestamp(topic) > 0L);
+ assertTrue(mxBean.getLastCompactionDurationTimeInMills(topic) >= 0L);
+ }
+
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
index 0d1a95c..8197c4a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
@@ -84,7 +84,7 @@ public class CompactorTest extends MockedPulsarServiceBaseTest {
compactionScheduler.shutdownNow();
}
- private List<String> compactAndVerify(String topic, Map<String, byte[]> expected) throws Exception {
+ private List<String> compactAndVerify(String topic, Map<String, byte[]> expected, boolean checkMetrics) throws Exception {
BookKeeper bk = pulsar.getBookKeeperClientFactory().create(
this.conf, null, null, Optional.empty(), null);
Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
@@ -112,6 +112,16 @@ public class CompactorTest extends MockedPulsarServiceBaseTest {
"Compacted version should match expected version");
m.close();
}
+ if (checkMetrics) {
+ long compactedTopicRemovedEventCount = compactor.getStats().getLastCompactionRemovedEventCount(topic);
+ long lastCompactSucceedTimestamp = compactor.getStats().getLastCompactionSucceedTimestamp(topic);
+ long lastCompactFailedTimestamp = compactor.getStats().getLastCompactionFailedTimestamp(topic);
+ long lastCompactDurationTimeInMills = compactor.getStats().getLastCompactionDurationTimeInMills(topic);
+ Assert.assertTrue(compactedTopicRemovedEventCount >= 1);
+ Assert.assertTrue(lastCompactSucceedTimestamp >= 1L);
+ Assert.assertTrue(lastCompactDurationTimeInMills >= 0L);
+ Assert.assertEquals(lastCompactFailedTimestamp, 0L);
+ }
Assert.assertTrue(expected.isEmpty(), "All expected keys should have been found");
return keys;
}
@@ -140,7 +150,7 @@ public class CompactorTest extends MockedPulsarServiceBaseTest {
.send();
expected.put(key, data);
}
- compactAndVerify(topic, expected);
+ compactAndVerify(topic, expected, true);
}
@Test
@@ -169,7 +179,7 @@ public class CompactorTest extends MockedPulsarServiceBaseTest {
expected.put("a", "A_2".getBytes());
expected.put("b", "B_1".getBytes());
- compactAndVerify(topic, new HashMap<>(expected));
+ compactAndVerify(topic, new HashMap<>(expected), false);
producer.newMessage()
.key("b")
@@ -177,7 +187,7 @@ public class CompactorTest extends MockedPulsarServiceBaseTest {
.send();
expected.put("b", "B_2".getBytes());
- compactAndVerify(topic, expected);
+ compactAndVerify(topic, expected, false);
}
@Test
@@ -206,7 +216,7 @@ public class CompactorTest extends MockedPulsarServiceBaseTest {
expected.put("b", "B_1".getBytes());
expected.put("c", "C_1".getBytes());
- List<String> keyOrder = compactAndVerify(topic, expected);
+ List<String> keyOrder = compactAndVerify(topic, expected, false);
Assert.assertEquals(keyOrder, Lists.newArrayList("c", "b", "a"));
}
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/CompactionStats.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/CompactionStats.java
new file mode 100644
index 0000000..0500017
--- /dev/null
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/CompactionStats.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.policies.data;
+
+/**
+ * Statistics about compaction.
+ */
+public interface CompactionStats {
+
+ /** The removed event count of last compaction. */
+ long getLastCompactionRemovedEventCount();
+
+ /** The timestamp of last succeed compaction. */
+ long getLastCompactionSucceedTimestamp();
+
+ /** The timestamp of last failed compaction. */
+ long getLastCompactionFailedTimestamp();
+
+ /** The duration time of last compaction. */
+ long getLastCompactionDurationTimeInMills();
+}
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java
index afc1810..dc1964e 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java
@@ -85,4 +85,7 @@ public interface TopicStats {
/** The serialized size of non-contiguous deleted messages ranges. */
int getNonContiguousDeletedMessagesRangesSerializedSize();
+
+ /** The compaction stats. */
+ CompactionStats getCompaction();
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/CompactionStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/CompactionStatsImpl.java
new file mode 100644
index 0000000..e187f6a
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/CompactionStatsImpl.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.policies.data.stats;
+
+import lombok.Data;
+import org.apache.pulsar.common.policies.data.CompactionStats;
+/**
+ * Statistics about compaction.
+ */
+@Data
+public class CompactionStatsImpl implements CompactionStats {
+
+ /** The removed event count of last compaction. */
+ public long lastCompactionRemovedEventCount;
+
+ /** The timestamp of last succeed compaction. */
+ public long lastCompactionSucceedTimestamp;
+
+ /** The timestamp of last failed compaction. */
+ public long lastCompactionFailedTimestamp;
+
+ /** The duration time of last compaction. */
+ public long lastCompactionDurationTimeInMills;
+
+ public void reset() {
+ this.lastCompactionRemovedEventCount = 0;
+ this.lastCompactionSucceedTimestamp = 0;
+ this.lastCompactionFailedTimestamp = 0;
+ this.lastCompactionDurationTimeInMills = 0;
+ }
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
index 7ad8e83..f3b3944 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
@@ -30,7 +30,6 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.TreeMap;
/**
@@ -114,6 +113,9 @@ public class TopicStatsImpl implements TopicStats {
/** The serialized size of non-contiguous deleted messages ranges. */
public int nonContiguousDeletedMessagesRangesSerializedSize;
+ /** The compaction stats */
+ public CompactionStatsImpl compaction;
+
public List<? extends PublisherStats> getPublishers() {
return publishers;
}
@@ -130,6 +132,7 @@ public class TopicStatsImpl implements TopicStats {
this.publishers = new ArrayList<>();
this.subscriptions = new HashMap<>();
this.replication = new TreeMap<>();
+ this.compaction = new CompactionStatsImpl();
}
public void reset() {
@@ -157,6 +160,7 @@ public class TopicStatsImpl implements TopicStats {
this.lastOffloadLedgerId = 0;
this.lastOffloadFailureTimeStamp = 0;
this.lastOffloadSuccessTimeStamp = 0;
+ this.compaction.reset();
}
// if the stats are added for the 1st time, we will need to make a copy of these stats and add it to the current
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PersistentTopicStatsTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PersistentTopicStatsTest.java
index fa67fb0..9c4c3b2 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PersistentTopicStatsTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PersistentTopicStatsTest.java
@@ -60,6 +60,10 @@ public class PersistentTopicStatsTest {
assertEquals(topicStats.publishers.size(), 1);
assertEquals(topicStats.subscriptions.size(), 1);
assertEquals(topicStats.replication.size(), 1);
+ assertEquals(topicStats.compaction.lastCompactionRemovedEventCount, 0);
+ assertEquals(topicStats.compaction.lastCompactionSucceedTimestamp, 0);
+ assertEquals(topicStats.compaction.lastCompactionFailedTimestamp, 0);
+ assertEquals(topicStats.compaction.lastCompactionDurationTimeInMills, 0);
topicStats.reset();
assertEquals(topicStats.msgRateIn, 0.0);
assertEquals(topicStats.msgThroughputIn, 0.0);