You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/06/27 11:49:18 UTC
[pulsar] branch master updated: [improve][broker][PIP-149]make examineMessage method async (#16226)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 0d03731db69 [improve][broker][PIP-149]make examineMessage method async (#16226)
0d03731db69 is described below
commit 0d03731db698085eeaee4d56218ee739322867a8
Author: Qiang Huang <HQ...@users.noreply.github.com>
AuthorDate: Mon Jun 27 19:49:06 2022 +0800
[improve][broker][PIP-149]make examineMessage method async (#16226)
---
.../broker/admin/impl/PersistentTopicsBase.java | 110 ++++++++++++---------
.../pulsar/broker/admin/v2/PersistentTopics.java | 14 ++-
2 files changed, 76 insertions(+), 48 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 75ba3b2c7db..c95928d9029 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2857,58 +2857,76 @@ public class PersistentTopicsBase extends AdminResource {
});
}
- protected Response internalExamineMessage(String initialPosition, long messagePosition, boolean authoritative) {
+ protected CompletableFuture<Response> internalExamineMessageAsync(String initialPosition, long messagePosition,
+ boolean authoritative) {
+ CompletableFuture<Void> ret;
if (topicName.isGlobal()) {
- validateGlobalNamespaceOwnership(namespaceName);
- }
-
- if (!topicName.isPartitioned() && getPartitionedTopicMetadata(topicName,
- authoritative, false).partitions > 0) {
- throw new RestException(Status.METHOD_NOT_ALLOWED,
- "Examine messages on a partitioned topic is not allowed, "
- + "please try examine message on specific topic partition");
- }
- validateTopicOwnership(topicName, authoritative);
- if (!(getTopicReference(topicName) instanceof PersistentTopic)) {
- log.error("[{}] Not supported operation of non-persistent topic {} ", clientAppId(), topicName);
- throw new RestException(Status.METHOD_NOT_ALLOWED,
- "Examine messages on a non-persistent topic is not allowed");
- }
-
- if (messagePosition < 1) {
- messagePosition = 1;
+ ret = validateGlobalNamespaceOwnershipAsync(namespaceName);
+ } else {
+ ret = CompletableFuture.completedFuture(null);
}
- if (null == initialPosition) {
- initialPosition = "latest";
+ ret = ret.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative));
+ long messagePositionLocal = messagePosition < 1 ? 1 : messagePosition;
+ String initialPositionLocal = initialPosition == null ? "latest" : initialPosition;
+ if (!topicName.isPartitioned()) {
+ ret = ret.thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, authoritative, false))
+ .thenCompose(partitionedTopicMetadata -> {
+ if (partitionedTopicMetadata.partitions > 0) {
+ throw new RestException(Status.METHOD_NOT_ALLOWED,
+ "Examine messages on a partitioned topic is not allowed, "
+ + "please try examine message on specific topic partition");
+ } else {
+ return CompletableFuture.completedFuture(null);
+ }
+ });
}
+ return ret.thenCompose(__ -> getTopicReferenceAsync(topicName))
+ .thenCompose(topic -> {
+ if (!(topic instanceof PersistentTopic)) {
+ log.error("[{}] Not supported operation of non-persistent topic {} ", clientAppId(), topicName);
+ throw new RestException(Status.METHOD_NOT_ALLOWED,
+ "Examine messages on a non-persistent topic is not allowed");
+ }
+ try {
+ PersistentTopic persistentTopic = (PersistentTopic) topic;
+ long totalMessage = persistentTopic.getNumberOfEntries();
+ PositionImpl startPosition = persistentTopic.getFirstPosition();
+
+ long messageToSkip = initialPositionLocal.equals("earliest") ? messagePositionLocal :
+ totalMessage - messagePositionLocal + 1;
+ CompletableFuture<Entry> future = new CompletableFuture<>();
+ PositionImpl readPosition = persistentTopic.getPositionAfterN(startPosition, messageToSkip);
+ persistentTopic.asyncReadEntry(readPosition, new AsyncCallbacks.ReadEntryCallback() {
+ @Override
+ public void readEntryComplete(Entry entry, Object ctx) {
+ future.complete(entry);
+ }
- try {
- PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
- long totalMessage = topic.getNumberOfEntries();
- PositionImpl startPosition = topic.getFirstPosition();
- long messageToSkip =
- initialPosition.equals("earliest") ? messagePosition : totalMessage - messagePosition + 1;
- CompletableFuture<Entry> future = new CompletableFuture<>();
- PositionImpl readPosition = topic.getPositionAfterN(startPosition, messageToSkip);
- topic.asyncReadEntry(readPosition, new AsyncCallbacks.ReadEntryCallback() {
- @Override
- public void readEntryComplete(Entry entry, Object ctx) {
- future.complete(entry);
- }
+ @Override
+ public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
+ future.completeExceptionally(exception);
+ }
+ }, null);
+ return future;
+ } catch (ManagedLedgerException exception) {
+ log.error("[{}] Failed to examine message at position {} from {} due to {}", clientAppId(),
+ messagePosition,
+ topicName, exception);
+ throw new RestException(exception);
+ }
- @Override
- public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
- future.completeExceptionally(exception);
- }
- }, null);
- return generateResponseWithEntry(future.get());
- } catch (Exception exception) {
- exception.printStackTrace();
- log.error("[{}] Failed to examine message at position {} from {} due to {}", clientAppId(), messagePosition,
- topicName, exception);
- throw new RestException(exception);
- }
+ }).thenApply(entry -> {
+ try {
+ return generateResponseWithEntry(entry);
+ } catch (IOException exception) {
+ throw new RestException(exception);
+ } finally {
+ if (entry != null) {
+ entry.release();
+ }
+ }
+ });
}
private Response generateResponseWithEntry(Entry entry) throws IOException {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 55bacfec03c..bfcbd3169d3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -1743,7 +1743,8 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiResponse(code = 405, message = "If given partitioned topic"),
@ApiResponse(code = 412, message = "Topic name is not valid"),
@ApiResponse(code = 500, message = "Internal server error")})
- public Response examineMessage(
+ public void examineMessage(
+ @Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@@ -1761,7 +1762,16 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- return internalExamineMessage(initialPosition, messagePosition, authoritative);
+ internalExamineMessageAsync(initialPosition, messagePosition, authoritative)
+ .thenAccept(asyncResponse::resume)
+ .exceptionally(ex -> {
+ if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to examine a specific message on the topic {}", clientAppId(), topicName,
+ ex);
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@GET