You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/03/07 00:43:05 UTC
[incubator-pulsar] branch master updated: Log phases in two phase
compaction (#1335)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9c6e124 Log phases in two phase compaction (#1335)
9c6e124 is described below
commit 9c6e124e2077c6a59d81b3b4de7866b9ac79a998
Author: Ivan Kelly <iv...@apache.org>
AuthorDate: Wed Mar 7 01:43:03 2018 +0100
Log phases in two phase compaction (#1335)
Log when each phase in two phase compaction starts so that if it
stalls we have something to debug by.
---
.../src/main/java/org/apache/pulsar/client/api/RawReader.java | 7 +++++++
.../java/org/apache/pulsar/client/impl/RawReaderImpl.java | 11 +++++++++++
.../java/org/apache/pulsar/compaction/TwoPhaseCompactor.java | 9 +++++++--
3 files changed, 25 insertions(+), 2 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java
index 7f7cd40..f9d297f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java
@@ -38,6 +38,13 @@ public interface RawReader {
}
/**
+ * Get the topic for the reader
+ *
+ * @return topic for the reader
+ */
+ String getTopic();
+
+ /**
* Seek to a location in the topic. After the seek, the first message read will be the one with
* with the specified message ID.
* @param messageId the message ID to seek to
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
index 75cbbbc..61a1fc2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -60,6 +60,12 @@ public class RawReaderImpl implements RawReader {
}
@Override
+ public String getTopic() {
+ return consumerConfiguration.getTopicNames().stream()
+ .findFirst().orElse(null);
+ }
+
+ @Override
public CompletableFuture<Void> seekAsync(MessageId messageId) {
return consumer.seekAsync(messageId);
}
@@ -84,6 +90,11 @@ public class RawReaderImpl implements RawReader {
return consumer.getLastMessageIdAsync();
}
+ @Override
+ public String toString() {
+ return "RawReader(topic=" + getTopic() + ")";
+ }
+
static class RawConsumerImpl extends ConsumerImpl<byte[]> {
final BlockingQueue<RawMessageAndCnx> incomingRawMessages;
final Queue<CompletableFuture<RawMessage>> pendingRawReceives;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index 7ae788c..3e2d259 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -83,6 +83,7 @@ public class TwoPhaseCompactor extends Compactor {
if (exception != null) {
loopPromise.completeExceptionally(exception);
} else {
+ log.info("Commencing phase one of compaction for {}, reading to {}", reader, lastMessageId);
phaseOneLoop(reader, Optional.empty(), lastMessageId, latestForKey, loopPromise);
}
});
@@ -136,8 +137,12 @@ public class TwoPhaseCompactor extends Compactor {
private CompletableFuture<Long> phaseTwo(RawReader reader, MessageId from, MessageId to,
Map<String,MessageId> latestForKey, BookKeeper bk) {
- return createLedger(bk).thenCompose(
- (ledger) -> phaseTwoSeekThenLoop(reader, from, to, latestForKey, bk, ledger));
+
+ return createLedger(bk).thenCompose((ledger) -> {
+ log.info("Commencing phase two of compaction for {}, from {} to {}, compacting {} keys to ledger {}",
+ reader, from, to, latestForKey.size(), ledger.getId());
+ return phaseTwoSeekThenLoop(reader, from, to, latestForKey, bk, ledger);
+ });
}
private CompletableFuture<Long> phaseTwoSeekThenLoop(RawReader reader, MessageId from, MessageId to,
--
To stop receiving notification emails like this one, please contact
mmerli@apache.org.