You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/02/10 00:16:50 UTC
[pulsar] branch master updated: Expire message by position. (#9514)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 301d42c Expire message by position. (#9514)
301d42c is described below
commit 301d42ce04ceb20797cc2a60267e6ec39ff05e95
Author: Marvin Cai <zx...@streamnative.io>
AuthorDate: Tue Feb 9 16:16:29 2021 -0800
Expire message by position. (#9514)
Fixes #2736
### Motivation
Add ability to expire message for subscription by position to admin client and admin rest API.
### Modifications
Update PersistentMessageExpiryMonitor to able to expire message by position and expose to admin client and admin rest api.
### Verifying this change
This change added tests and can be verified as follows:
- Added unit test for expire message by position in PersistentMessageFinderTest.
- Added test for expire message by position in admin client/admin rest api.
---
.../broker/admin/impl/PersistentTopicsBase.java | 258 ++++++++++++++-------
.../pulsar/broker/admin/v1/PersistentTopics.java | 41 +++-
.../pulsar/broker/admin/v2/PersistentTopics.java | 42 +++-
.../apache/pulsar/broker/service/Subscription.java | 2 +
.../nonpersistent/NonPersistentSubscription.java | 6 +
.../persistent/PersistentMessageExpiryMonitor.java | 27 +++
.../service/persistent/PersistentReplicator.java | 12 +
.../service/persistent/PersistentSubscription.java | 5 +
.../apache/pulsar/broker/admin/AdminApiTest.java | 95 ++++++--
.../service/PersistentMessageFinderTest.java | 86 ++++++-
.../api/AuthorizationProducerConsumerTest.java | 2 +
.../org/apache/pulsar/client/admin/Topics.java | 34 +++
.../pulsar/client/admin/internal/TopicsImpl.java | 27 +++
.../pulsar/admin/cli/PulsarAdminToolTest.java | 5 +
.../org/apache/pulsar/admin/cli/CliCommand.java | 6 +-
.../org/apache/pulsar/admin/cli/CmdTopics.java | 31 ++-
.../apache/pulsar/client/impl/ResetCursorData.java | 2 +
17 files changed, 567 insertions(+), 114 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 82a0e5c..2f01b04 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -1838,7 +1838,7 @@ public class PersistentTopicsBase extends AdminResource {
topic.getReplicators().forEach((subName, replicator) -> {
try {
- internalExpireMessagesForSinglePartition(subName, expireTimeInSeconds, authoritative);
+ internalExpireMessagesByTimestampForSinglePartition(subName, expireTimeInSeconds, authoritative);
} catch (Throwable t) {
exception.set(t);
}
@@ -1846,7 +1846,7 @@ public class PersistentTopicsBase extends AdminResource {
topic.getSubscriptions().forEach((subName, subscriber) -> {
try {
- internalExpireMessagesForSinglePartition(subName, expireTimeInSeconds, authoritative);
+ internalExpireMessagesByTimestampForSinglePartition(subName, expireTimeInSeconds, authoritative);
} catch (Throwable t) {
exception.set(t);
}
@@ -2198,82 +2198,9 @@ public class PersistentTopicsBase extends AdminResource {
return;
}
CompletableFuture<Integer> batchSizeFuture = new CompletableFuture<>();
- if (batchIndex >= 0) {
- try {
- ManagedLedgerImpl ledger = (ManagedLedgerImpl) topic.getManagedLedger();
- ledger.asyncReadEntry(new PositionImpl(messageId.getLedgerId(),
- messageId.getEntryId()), new AsyncCallbacks.ReadEntryCallback() {
- @Override
- public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
- // Since we can't read the message from the storage layer,
- // it might be an already delete message ID or an invalid message ID
- // We should fall back to non batch index seek.
- batchSizeFuture.complete(0);
- }
-
- @Override
- public void readEntryComplete(Entry entry, Object ctx) {
- try {
- try {
- if (entry == null) {
- batchSizeFuture.complete(0);
- } else {
- MessageMetadata metadata =
- Commands.parseMessageMetadata(entry.getDataBuffer());
- batchSizeFuture.complete(metadata.getNumMessagesInBatch());
- }
- } catch (Exception e) {
- batchSizeFuture.completeExceptionally(new RestException(e));
- }
- } finally {
- if (entry != null) {
- entry.release();
- }
- }
- }
- }, null);
- } catch (NullPointerException npe) {
- batchSizeFuture.completeExceptionally(new RestException(Status.NOT_FOUND, "Message not found"));
- } catch (Exception exception) {
- log.error("[{}] Failed to get message with ledgerId {} entryId {} from {}",
- clientAppId(), messageId.getLedgerId(), messageId.getEntryId(), topicName, exception);
- batchSizeFuture.completeExceptionally(new RestException(exception));
- }
- } else {
- batchSizeFuture.complete(0);
- }
+ getEntryBatchSize(batchSizeFuture, topic, messageId, batchIndex);
batchSizeFuture.thenAccept(bi -> {
- PositionImpl seekPosition;
- if (bi > 0) {
- long[] ackSet;
- BitSetRecyclable bitSet = BitSetRecyclable.create();
- bitSet.set(0, bi);
- if (isExcluded) {
- bitSet.clear(0, Math.max(batchIndex + 1, 0));
- if (bitSet.length() > 0) {
- ackSet = bitSet.toLongArray();
- seekPosition = PositionImpl.get(messageId.getLedgerId(),
- messageId.getEntryId(), ackSet);
- } else {
- seekPosition = PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId());
- seekPosition = seekPosition.getNext();
- }
- } else {
- if (batchIndex - 1 >= 0) {
- bitSet.clear(0, batchIndex);
- ackSet = bitSet.toLongArray();
- seekPosition = PositionImpl.get(messageId.getLedgerId(),
- messageId.getEntryId(), ackSet);
- } else {
- seekPosition = PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId());
- }
- }
- bitSet.recycle();
- } else {
- seekPosition = PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId());
- seekPosition = isExcluded ? seekPosition.getNext() : seekPosition;
- }
-
+ PositionImpl seekPosition = calculatePositionAckSet(isExcluded, bi, batchIndex, messageId);
sub.resetCursor(seekPosition).thenRun(() -> {
log.info("[{}][{}] successfully reset cursor on subscription {} to position {}", clientAppId(),
topicName, subName, messageId);
@@ -2305,6 +2232,89 @@ public class PersistentTopicsBase extends AdminResource {
}
}
+ private void getEntryBatchSize(CompletableFuture<Integer> batchSizeFuture, PersistentTopic topic,
+ MessageIdImpl messageId, int batchIndex) {
+ if (batchIndex >= 0) {
+ try {
+ ManagedLedgerImpl ledger = (ManagedLedgerImpl) topic.getManagedLedger();
+ ledger.asyncReadEntry(new PositionImpl(messageId.getLedgerId(),
+ messageId.getEntryId()), new AsyncCallbacks.ReadEntryCallback() {
+ @Override
+ public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
+ // Since we can't read the message from the storage layer,
+ // it might be an already delete message ID or an invalid message ID
+ // We should fall back to non batch index seek.
+ batchSizeFuture.complete(0);
+ }
+
+ @Override
+ public void readEntryComplete(Entry entry, Object ctx) {
+ try {
+ try {
+ if (entry == null) {
+ batchSizeFuture.complete(0);
+ } else {
+ MessageMetadata metadata =
+ Commands.parseMessageMetadata(entry.getDataBuffer());
+ batchSizeFuture.complete(metadata.getNumMessagesInBatch());
+ }
+ } catch (Exception e) {
+ batchSizeFuture.completeExceptionally(new RestException(e));
+ }
+ } finally {
+ if (entry != null) {
+ entry.release();
+ }
+ }
+ }
+ }, null);
+ } catch (NullPointerException npe) {
+ batchSizeFuture.completeExceptionally(new RestException(Status.NOT_FOUND, "Message not found"));
+ } catch (Exception exception) {
+ log.error("[{}] Failed to get message with ledgerId {} entryId {} from {}",
+ clientAppId(), messageId.getLedgerId(), messageId.getEntryId(), topicName, exception);
+ batchSizeFuture.completeExceptionally(new RestException(exception));
+ }
+ } else {
+ batchSizeFuture.complete(0);
+ }
+ }
+
+ private PositionImpl calculatePositionAckSet(boolean isExcluded, int batchSize,
+ int batchIndex, MessageIdImpl messageId) {
+ PositionImpl seekPosition;
+ if (batchSize > 0) {
+ long[] ackSet;
+ BitSetRecyclable bitSet = BitSetRecyclable.create();
+ bitSet.set(0, batchSize);
+ if (isExcluded) {
+ bitSet.clear(0, Math.max(batchIndex + 1, 0));
+ if (bitSet.length() > 0) {
+ ackSet = bitSet.toLongArray();
+ seekPosition = PositionImpl.get(messageId.getLedgerId(),
+ messageId.getEntryId(), ackSet);
+ } else {
+ seekPosition = PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId());
+ seekPosition = seekPosition.getNext();
+ }
+ } else {
+ if (batchIndex - 1 >= 0) {
+ bitSet.clear(0, batchIndex);
+ ackSet = bitSet.toLongArray();
+ seekPosition = PositionImpl.get(messageId.getLedgerId(),
+ messageId.getEntryId(), ackSet);
+ } else {
+ seekPosition = PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId());
+ }
+ }
+ bitSet.recycle();
+ } else {
+ seekPosition = PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId());
+ seekPosition = isExcluded ? seekPosition.getNext() : seekPosition;
+ }
+ return seekPosition;
+ }
+
protected void internalGetMessageById(AsyncResponse asyncResponse, long ledgerId, long entryId,
boolean authoritative) {
try {
@@ -2931,16 +2941,15 @@ public class PersistentTopicsBase extends AdminResource {
}
}
-
- protected void internalExpireMessages(AsyncResponse asyncResponse, String subName, int expireTimeInSeconds,
- boolean authoritative) {
+ protected void internalExpireMessagesByTimestamp(AsyncResponse asyncResponse, String subName,
+ int expireTimeInSeconds, boolean authoritative) {
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
// If the topic name is a partition name, no need to get partition topic metadata again
if (topicName.isPartitioned()) {
try {
- internalExpireMessagesForSinglePartition(subName, expireTimeInSeconds, authoritative);
+ internalExpireMessagesByTimestampForSinglePartition(subName, expireTimeInSeconds, authoritative);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
return;
@@ -2991,7 +3000,7 @@ public class PersistentTopicsBase extends AdminResource {
});
} else {
try {
- internalExpireMessagesForSinglePartition(subName, expireTimeInSeconds, authoritative);
+ internalExpireMessagesByTimestampForSinglePartition(subName, expireTimeInSeconds, authoritative);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
return;
@@ -3004,7 +3013,7 @@ public class PersistentTopicsBase extends AdminResource {
}
}
- private void internalExpireMessagesForSinglePartition(String subName, int expireTimeInSeconds,
+ private void internalExpireMessagesByTimestampForSinglePartition(String subName, int expireTimeInSeconds,
boolean authoritative) {
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
@@ -3048,6 +3057,89 @@ public class PersistentTopicsBase extends AdminResource {
}
}
+ protected void internalExpireMessagesByPosition(AsyncResponse asyncResponse, String subName, boolean authoritative,
+ MessageIdImpl messageId, boolean isExcluded, int batchIndex) {
+ if (topicName.isGlobal()) {
+ try {
+ validateGlobalNamespaceOwnership(namespaceName);
+ } catch (Exception e) {
+ log.warn("[{}][{}] Failed to expire messages on subscription {} to position {}: {}", clientAppId(),
+ topicName, subName, messageId, e.getMessage());
+ resumeAsyncResponseExceptionally(asyncResponse, e);
+ return;
+ }
+ }
+
+ log.info("[{}][{}] received expire messages on subscription {} to position {}", clientAppId(), topicName,
+ subName, messageId);
+
+ // If the topic name is a partition name, no need to get partition topic metadata again
+ if (!topicName.isPartitioned() && getPartitionedTopicMetadata(topicName, authoritative, false).partitions > 0) {
+ log.warn("[{}] Not supported operation expire message up to {} on partitioned-topic {} {}",
+ clientAppId(), messageId, topicName, subName);
+ asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
+ "Expire message at position is not supported for partitioned-topic"));
+ return;
+ } else if (messageId.getPartitionIndex() != topicName.getPartitionIndex()) {
+ log.warn("[{}] Invalid parameter for expire message by position, partition index of passed in message"
+ + " position {} doesn't match partition index of topic requested {}.",
+ clientAppId(), messageId, topicName);
+ asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
+ "Invalid parameter for expire message by position, partition index of message position "
+ + "passed in doesn't match partition index for the topic."));
+ } else {
+ validateAdminAccessForSubscriber(subName, authoritative);
+ validateReadOperationOnTopic(authoritative);
+ PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
+ if (topic == null) {
+ asyncResponse.resume(new RestException(Status.NOT_FOUND, "Topic not found"));
+ return;
+ }
+ try {
+ PersistentSubscription sub = topic.getSubscription(subName);
+ if (sub == null) {
+ asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
+ return;
+ }
+ CompletableFuture<Integer> batchSizeFuture = new CompletableFuture<>();
+ getEntryBatchSize(batchSizeFuture, topic, messageId, batchIndex);
+ batchSizeFuture.thenAccept(bi -> {
+ PositionImpl position = calculatePositionAckSet(isExcluded, bi, batchIndex, messageId);
+ try {
+ if (subName.startsWith(topic.getReplicatorPrefix())) {
+ String remoteCluster = PersistentReplicator.getRemoteCluster(subName);
+ PersistentReplicator repl = (PersistentReplicator)
+ topic.getPersistentReplicator(remoteCluster);
+ checkNotNull(repl);
+ repl.expireMessages(position);
+ } else {
+ checkNotNull(sub);
+ sub.expireMessages(position);
+ }
+ log.info("[{}] Message expire issued up to {} on {} {}", clientAppId(), position, topicName,
+ subName);
+ } catch (NullPointerException npe) {
+ throw new RestException(Status.NOT_FOUND, "Subscription not found");
+ } catch (Exception exception) {
+ log.error("[{}] Failed to expire messages up to {} on {} with subscription {} {}",
+ clientAppId(), position, topicName, subName, exception);
+ throw new RestException(exception);
+ }
+ }).exceptionally(e -> {
+ log.error("[{}] Failed to expire messages up to {} on {} with subscription {} {}", clientAppId(),
+ messageId, topicName, subName, e);
+ asyncResponse.resume(e);
+ return null;
+ });
+ } catch (Exception e) {
+ log.warn("[{}][{}] Failed to expire messages up to {} on subscription {} to position {}",
+ clientAppId(), topicName, messageId, subName, messageId, e);
+ resumeAsyncResponseExceptionally(asyncResponse, e);
+ }
+ }
+ asyncResponse.resume(Response.noContent().build());
+ }
+
protected void internalTriggerCompaction(AsyncResponse asyncResponse, boolean authoritative) {
log.info("[{}] Trigger compaction on topic {}", clientAppId(), topicName);
try {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index 78392c2..46708dd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -462,7 +462,46 @@ public class PersistentTopics extends PersistentTopicsBase {
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
try {
validateTopicName(property, cluster, namespace, encodedTopic);
- internalExpireMessages(asyncResponse, decode(encodedSubName), expireTimeInSeconds, authoritative);
+ internalExpireMessagesByTimestamp(asyncResponse, decode(encodedSubName),
+ expireTimeInSeconds, authoritative);
+ } catch (WebApplicationException wae) {
+ asyncResponse.resume(wae);
+ } catch (Exception e) {
+ asyncResponse.resume(new RestException(e));
+ }
+ }
+
+ @POST
+ @Path("/{property}/{cluster}/{namespace}/{topic}/subscription/{subName}/expireMessages")
+ @ApiOperation(value = "Expiry messages on a topic subscription.")
+ @ApiResponses(value = {
+ @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
+ @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or"
+ + "subscriber is not authorized to access this operation"),
+ @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Topic or subscription does not exist"),
+ @ApiResponse(code = 405, message = "Expiry messages on a non-persistent topic is not allowed"),
+ @ApiResponse(code = 500, message = "Internal server error"),
+ @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
+ public void expireTopicMessages(
+ @Suspended final AsyncResponse asyncResponse,
+ @PathParam("property") String property, @PathParam("cluster") String cluster,
+ @ApiParam(value = "Specify the namespace", required = true)
+ @PathParam("namespace") String namespace,
+ @ApiParam(value = "Specify topic name", required = true)
+ @PathParam("topic") @Encoded String encodedTopic,
+ @ApiParam(value = "Subscription to be Expiry messages on")
+ @PathParam("subName") String encodedSubName,
+ @ApiParam(value = "Is authentication required to perform this operation")
+ @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+ @ApiParam(name = "messageId", value = "messageId to reset back to (ledgerId:entryId)")
+ ResetCursorData resetCursorData) {
+ try {
+ validateTopicName(property, cluster, namespace, encodedTopic);
+ internalExpireMessagesByPosition(asyncResponse, decode(encodedSubName), authoritative,
+ new MessageIdImpl(resetCursorData.getLedgerId(),
+ resetCursorData.getEntryId(), resetCursorData.getPartitionIndex())
+ , resetCursorData.isExcluded(), resetCursorData.getBatchIndex());
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index a569339..851e265 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -1197,7 +1197,47 @@ public class PersistentTopics extends PersistentTopicsBase {
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
try {
validateTopicName(tenant, namespace, encodedTopic);
- internalExpireMessages(asyncResponse, decode(encodedSubName), expireTimeInSeconds, authoritative);
+ internalExpireMessagesByTimestamp(asyncResponse, decode(encodedSubName),
+ expireTimeInSeconds, authoritative);
+ } catch (WebApplicationException wae) {
+ asyncResponse.resume(wae);
+ } catch (Exception e) {
+ asyncResponse.resume(new RestException(e));
+ }
+ }
+
+ @POST
+ @Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/expireMessages")
+ @ApiOperation(value = "Expiry messages on a topic subscription.")
+ @ApiResponses(value = {
+ @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
+ @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or"
+ + "subscriber is not authorized to access this operation"),
+ @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Topic or subscription does not exist"),
+ @ApiResponse(code = 405, message = "Expiry messages on a non-persistent topic is not allowed"),
+ @ApiResponse(code = 500, message = "Internal server error"),
+ @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
+ public void expireTopicMessages(
+ @Suspended final AsyncResponse asyncResponse,
+ @ApiParam(value = "Specify the tenant", required = true)
+ @PathParam("tenant") String tenant,
+ @ApiParam(value = "Specify the namespace", required = true)
+ @PathParam("namespace") String namespace,
+ @ApiParam(value = "Specify topic name", required = true)
+ @PathParam("topic") @Encoded String encodedTopic,
+ @ApiParam(value = "Subscription to be Expiry messages on")
+ @PathParam("subName") String encodedSubName,
+ @ApiParam(value = "Is authentication required to perform this operation")
+ @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+ @ApiParam(name = "messageId", value = "messageId to reset back to (ledgerId:entryId)")
+ ResetCursorData resetCursorData) {
+ try {
+ validateTopicName(tenant, namespace, encodedTopic);
+ internalExpireMessagesByPosition(asyncResponse, decode(encodedSubName), authoritative,
+ new MessageIdImpl(resetCursorData.getLedgerId(),
+ resetCursorData.getEntryId(), resetCursorData.getPartitionIndex())
+ , resetCursorData.isExcluded(), resetCursorData.getBatchIndex());
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
index 05b2d28..95a470d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
@@ -85,6 +85,8 @@ public interface Subscription {
void expireMessages(int messageTTLInSeconds);
+ void expireMessages(Position position);
+
void redeliverUnacknowledgedMessages(Consumer consumer);
void redeliverUnacknowledgedMessages(Consumer consumer, List<PositionImpl> positions);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
index 79edaab..c347d36 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
@@ -431,6 +431,12 @@ public class NonPersistentSubscription implements Subscription {
// No-op
}
+ @Override
+ public void expireMessages(Position position) {
+ throw new UnsupportedOperationException("Expire message by position is not supported for"
+ + " non-persistent topic.");
+ }
+
public NonPersistentSubscriptionStats getStats() {
NonPersistentSubscriptionStats subStats = new NonPersistentSubscriptionStats();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
index b66c66d..29848e3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
@@ -28,6 +28,7 @@ import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.NonRecoverableLedgerException;
import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.stats.Rate;
@@ -94,6 +95,32 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback {
}
}
+ public void expireMessages(Position messagePosition) {
+ // If it's beyond last position of this topic, do nothing.
+ if (((PositionImpl) subscription.getTopic().getLastPosition()).compareTo((PositionImpl) messagePosition) < 0) {
+ return;
+ }
+ if (expirationCheckInProgressUpdater.compareAndSet(this, FALSE, TRUE)) {
+ log.info("[{}][{}] Starting message expiry check, position= {} seconds", topicName, subName,
+ messagePosition);
+
+ cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries, entry -> {
+ try {
+ // If given position larger than entry position.
+ return ((PositionImpl) entry.getPosition()).compareTo((PositionImpl) messagePosition) <= 0;
+ } finally {
+ entry.release();
+ }
+ }, this, null);
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}][{}] Ignore expire-message scheduled task, last check is still running", topicName,
+ subName);
+ }
+ }
+ }
+
+
public void updateRates() {
msgExpired.calculateRate();
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index cc28af7..52c1957 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -727,6 +727,18 @@ public class PersistentReplicator extends AbstractReplicator
}
}
+ public void expireMessages(Position position) {
+ if ((cursor.getNumberOfEntriesInBacklog(false) == 0)
+ || (cursor.getNumberOfEntriesInBacklog(false) < MINIMUM_BACKLOG_FOR_EXPIRY_CHECK
+ && !topic.isOldestMessageExpired(cursor, messageTTLInSeconds))) {
+ // don't do anything for almost caught-up connected subscriptions
+ return;
+ }
+ if (expiryMonitor != null) {
+ expiryMonitor.expireMessages(messageTTLInSeconds);
+ }
+ }
+
@Override
public Optional<DispatchRateLimiter> getRateLimiter() {
return dispatchRateLimiter;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 2df6b4c..a666edb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -904,6 +904,11 @@ public class PersistentSubscription implements Subscription {
expiryMonitor.expireMessages(messageTTLInSeconds);
}
+ @Override
+ public void expireMessages(Position position) {
+ expiryMonitor.expireMessages(position);
+ }
+
public double getExpiredMessageRate() {
return expiryMonitor.getMessageExpiryRate();
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index cede9b7..ac8fdab 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -123,6 +123,7 @@ import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.compaction.Compactor;
+import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -1603,16 +1604,17 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
long messageTimestamp = System.currentTimeMillis();
long secondTimestamp = System.currentTimeMillis();
- private void publishMessagesOnPersistentTopic(String topicName, int messages) throws Exception {
- publishMessagesOnPersistentTopic(topicName, messages, 0, false);
+ private List<MessageId> publishMessagesOnPersistentTopic(String topicName, int messages) throws Exception {
+ return publishMessagesOnPersistentTopic(topicName, messages, 0, false);
}
- private void publishNullValueMessageOnPersistentTopic(String topicName, int messages) throws Exception {
- publishMessagesOnPersistentTopic(topicName, messages, 0, true);
+ private List<MessageId> publishNullValueMessageOnPersistentTopic(String topicName, int messages) throws Exception {
+ return publishMessagesOnPersistentTopic(topicName, messages, 0, true);
}
- private void publishMessagesOnPersistentTopic(String topicName, int messages, int startIdx,
+ private List<MessageId> publishMessagesOnPersistentTopic(String topicName, int messages, int startIdx,
boolean nullValue) throws Exception {
+ List<MessageId> messageIds = new ArrayList<>();
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.topic(topicName)
.enableBatching(false)
@@ -1621,14 +1623,15 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
for (int i = startIdx; i < (messages + startIdx); i++) {
if (nullValue) {
- producer.send(null);
+ messageIds.add(producer.send(null));
} else {
String message = "message-" + i;
- producer.send(message.getBytes());
+ messageIds.add(producer.send(message.getBytes()));
}
}
producer.close();
+ return messageIds;
}
@Test
@@ -2069,7 +2072,6 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
*/
@Test
public void testPersistentTopicsExpireMessages() throws Exception {
-
// Force to create a topic
publishMessagesOnPersistentTopic("persistent://prop-xyz/ns1/ds2", 0);
assertEquals(admin.topics().getList("prop-xyz/ns1"),
@@ -2089,21 +2091,21 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
assertEquals(admin.topics().getSubscriptions("persistent://prop-xyz/ns1/ds2").size(), 3);
- publishMessagesOnPersistentTopic("persistent://prop-xyz/ns1/ds2", 10);
+ List<MessageId> messageIds = publishMessagesOnPersistentTopic("persistent://prop-xyz/ns1/ds2", 10);
TopicStats topicStats = admin.topics().getStats("persistent://prop-xyz/ns1/ds2");
assertEquals(topicStats.subscriptions.get("my-sub1").msgBacklog, 10);
assertEquals(topicStats.subscriptions.get("my-sub2").msgBacklog, 10);
assertEquals(topicStats.subscriptions.get("my-sub3").msgBacklog, 10);
- Thread.sleep(1000); // wait for 1 seconds to expire message
+ Thread.sleep(1000);
admin.topics().expireMessages("persistent://prop-xyz/ns1/ds2", "my-sub1", 1);
- Thread.sleep(1000); // wait for 1 seconds to execute expire message as it is async
-
+ // Wait at most 2 seconds for sub1's message to expire.
+ Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> assertTrue(
+ admin.topics().getStats("persistent://prop-xyz/ns1/ds2").subscriptions.get("my-sub1").lastMarkDeleteAdvancedTimestamp > 0L));
topicStats = admin.topics().getStats("persistent://prop-xyz/ns1/ds2");
SubscriptionStats subStats1 = topicStats.subscriptions.get("my-sub1");
assertEquals(subStats1.msgBacklog, 0);
- assertTrue(subStats1.lastMarkDeleteAdvancedTimestamp > 0L);
SubscriptionStats subStats2 = topicStats.subscriptions.get("my-sub2");
assertEquals(subStats2.msgBacklog, 10);
assertEquals(subStats2.lastMarkDeleteAdvancedTimestamp, 0L);
@@ -2111,24 +2113,69 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
assertEquals(subStats3.msgBacklog, 10);
assertEquals(subStats3.lastMarkDeleteAdvancedTimestamp, 0L);
- admin.topics().expireMessagesForAllSubscriptions("persistent://prop-xyz/ns1/ds2", 1);
- Thread.sleep(1000); // wait for 1 seconds to execute expire message as it is async
+ admin.topics().expireMessages("persistent://prop-xyz/ns1/ds2", "my-sub2",
+ messageIds.get(4), false);
+ // Wait at most 2 seconds for sub2's message to expire.
+ Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> assertTrue(
+ admin.topics().getStats("persistent://prop-xyz/ns1/ds2").subscriptions.get("my-sub2").lastMarkDeleteAdvancedTimestamp > 0L));
+ topicStats = admin.topics().getStats("persistent://prop-xyz/ns1/ds2");
+ subStats1 = topicStats.subscriptions.get("my-sub1");
+ assertEquals(subStats1.msgBacklog, 0);
+ assertTrue(subStats1.lastMarkDeleteAdvancedTimestamp > 0L);
+ Long sub2lastMarkDeleteAdvancedTimestamp = subStats1.lastMarkDeleteAdvancedTimestamp;
+ subStats2 = topicStats.subscriptions.get("my-sub2");
+ assertEquals(subStats2.msgBacklog, 5);
+ subStats3 = topicStats.subscriptions.get("my-sub3");
+ assertEquals(subStats3.msgBacklog, 10);
+ assertEquals(subStats3.lastMarkDeleteAdvancedTimestamp, 0L);
+ admin.topics().expireMessagesForAllSubscriptions("persistent://prop-xyz/ns1/ds2", 1);
+ // Wait at most 2 seconds for sub3's message to expire.
+ Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> assertTrue(
+ admin.topics().getStats("persistent://prop-xyz/ns1/ds2").subscriptions.get("my-sub3").lastMarkDeleteAdvancedTimestamp > 0L));
topicStats = admin.topics().getStats("persistent://prop-xyz/ns1/ds2");
- SubscriptionStats newSubStats1 = topicStats.subscriptions.get("my-sub1");
- assertEquals(newSubStats1.msgBacklog, 0);
- assertEquals(newSubStats1.lastMarkDeleteAdvancedTimestamp, subStats1.lastMarkDeleteAdvancedTimestamp);
- SubscriptionStats newSubStats2 = topicStats.subscriptions.get("my-sub2");
- assertEquals(newSubStats2.msgBacklog, 0);
- assertTrue(newSubStats2.lastMarkDeleteAdvancedTimestamp > subStats2.lastMarkDeleteAdvancedTimestamp);
- SubscriptionStats newSubStats3 = topicStats.subscriptions.get("my-sub3");
- assertEquals(newSubStats3.msgBacklog, 0);
- assertTrue(newSubStats3.lastMarkDeleteAdvancedTimestamp > subStats3.lastMarkDeleteAdvancedTimestamp);
+ subStats1 = topicStats.subscriptions.get("my-sub1");
+ assertEquals(subStats1.msgBacklog, 0);
+ assertEquals(subStats1.lastMarkDeleteAdvancedTimestamp, subStats1.lastMarkDeleteAdvancedTimestamp);
+ // Wait at most 2 seconds for rest of sub2's message to expire.
+ subStats2 = topicStats.subscriptions.get("my-sub2");
+ assertEquals(subStats2.msgBacklog, 0);
+ assertTrue(subStats2.lastMarkDeleteAdvancedTimestamp > sub2lastMarkDeleteAdvancedTimestamp);
+ subStats3 = topicStats.subscriptions.get("my-sub3");
+ assertEquals(subStats3.msgBacklog, 0);
consumer1.close();
consumer2.close();
consumer3.close();
+ }
+ @Test
+ public void testPersistentTopicsExpireMessagesInvalidPartitionIndex() throws Exception {
+ // Force to create a topic
+ publishMessagesOnPersistentTopic("persistent://prop-xyz/ns1/ds2-partition-2", 0);
+ assertEquals(admin.topics().getList("prop-xyz/ns1"),
+ Lists.newArrayList("persistent://prop-xyz/ns1/ds2-partition-2"));
+
+ // create consumer and subscription
+ @Cleanup
+ PulsarClient client = PulsarClient.builder()
+ .serviceUrl(pulsar.getWebServiceAddress())
+ .statsInterval(0, TimeUnit.SECONDS)
+ .build();
+ ConsumerBuilder<byte[]> consumerBuilder = client.newConsumer()
+ .topic("persistent://prop-xyz/ns1/ds2-partition-2")
+ .subscriptionType(SubscriptionType.Shared);
+ @Cleanup
+ Consumer<byte[]> consumer = consumerBuilder.clone().subscriptionName("my-sub").subscribe();
+
+ assertEquals(admin.topics().getSubscriptions("persistent://prop-xyz/ns1/ds2-partition-2").size(), 1);
+ publishMessagesOnPersistentTopic("persistent://prop-xyz/ns1/ds2-partition-2", 10);
+ try {
+ admin.topics().expireMessages("persistent://prop-xyz/ns1/ds2-partition-2", "my-sub",
+ new MessageIdImpl(1, 1, 1), false);
+ } catch (Exception e) {
+ assertTrue(e.getMessage().contains("Invalid parameter for expire message by position"));
+ }
}
/**
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
index f7c38f0..d60b4dc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
@@ -19,6 +19,13 @@
package org.apache.pulsar.broker.service;
import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.clearInvocations;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.spy;
+import static org.powermock.api.mockito.PowerMockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
@@ -29,6 +36,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;
import java.lang.reflect.Field;
+import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
@@ -51,6 +59,8 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.Ledge
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor;
import org.apache.pulsar.broker.service.persistent.PersistentMessageFinder;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+import org.apache.pulsar.client.impl.ResetCursorData;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
import org.apache.pulsar.common.api.proto.MessageMetadata;
@@ -58,9 +68,13 @@ import org.apache.pulsar.common.intercept.BrokerEntryMetadataInterceptor;
import org.apache.pulsar.common.intercept.BrokerEntryMetadataUtils;
import org.apache.pulsar.common.protocol.ByteBufPair;
import org.apache.pulsar.common.protocol.Commands;
+import org.awaitility.Awaitility;
import org.testng.annotations.Test;
import org.testng.collections.Sets;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.core.MediaType;
+
/**
*/
public class PersistentMessageFinderTest extends MockedBookKeeperTestCase {
@@ -329,7 +343,7 @@ public class PersistentMessageFinderTest extends MockedBookKeeperTestCase {
* @throws Exception
*/
@Test
- void testMessageExpiryWithNonRecoverableException() throws Exception {
+ void testMessageExpiryWithTimestampNonRecoverableException() throws Exception {
final String ledgerAndCursorName = "testPersistentMessageExpiryWithNonRecoverableLedgers";
final int entriesPerLedger = 2;
@@ -380,4 +394,74 @@ public class PersistentMessageFinderTest extends MockedBookKeeperTestCase {
factory.shutdown();
}
+
+ @Test
+ void testMessageExpiryWithPosition() throws Exception {
+ final String ledgerAndCursorName = "testPersistentMessageExpiryWithPositionNonRecoverableLedgers";
+ final int entriesPerLedger = 5;
+ final int totalEntries = 30;
+ List<Position> positions = new ArrayList<>();
+
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setRetentionSizeInMB(10);
+ config.setMaxEntriesPerLedger(entriesPerLedger);
+ config.setRetentionTime(1, TimeUnit.HOURS);
+ config.setAutoSkipNonRecoverableData(true);
+ ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(ledgerAndCursorName, config);
+ ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor(ledgerAndCursorName);
+
+ PersistentSubscription subscription = mock(PersistentSubscription.class);
+ Topic topic = mock(Topic.class);
+ when(subscription.getTopic()).thenReturn(topic);
+
+ for (int i = 0; i < totalEntries; i++) {
+ positions.add(ledger.addEntry(createMessageWrittenToLedger("msg" + i)));
+ }
+ when(topic.getLastPosition()).thenReturn(positions.get(positions.size() - 1));
+ for (Position p : positions) {
+ System.out.println(p);
+ }
+ PersistentMessageExpiryMonitor monitor = spy(new PersistentMessageExpiryMonitor("topicname",
+ cursor.getName(), cursor, subscription));
+ assertEquals((PositionImpl) cursor.getMarkDeletedPosition(), PositionImpl.get(positions.get(0).getLedgerId(), -1));
+
+ // Expire by position and verify mark delete position of cursor.
+ monitor.expireMessages(positions.get(15));
+ Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> verify(monitor, times(1)).findEntryComplete(any(), any()));
+ assertEquals((PositionImpl) cursor.getMarkDeletedPosition(), PositionImpl.get(positions.get(15).getLedgerId(), positions.get(15).getEntryId()));
+ clearInvocations(monitor);
+
+ // Expire by position beyond last position and nothing should happen.
+ monitor.expireMessages(PositionImpl.get(100, 100));
+ assertEquals((PositionImpl) cursor.getMarkDeletedPosition(), PositionImpl.get(positions.get(15).getLedgerId(), positions.get(15).getEntryId()));
+
+ // Expire by position again and verify mark delete position of cursor didn't change.
+ monitor.expireMessages(positions.get(15));
+ Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> verify(monitor, times(1)).findEntryComplete(any(), any()));
+ assertEquals((PositionImpl) cursor.getMarkDeletedPosition(), PositionImpl.get(positions.get(15).getLedgerId(), positions.get(15).getEntryId()));
+ clearInvocations(monitor);
+
+ // Expire by position before current mark delete position and verify mark delete position of cursor didn't change.
+ monitor.expireMessages(positions.get(10));
+ Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> verify(monitor, times(1)).findEntryComplete(any(), any()));
+ assertEquals((PositionImpl) cursor.getMarkDeletedPosition(), PositionImpl.get(positions.get(15).getLedgerId(), positions.get(15).getEntryId()));
+ clearInvocations(monitor);
+
+ // Expire by position after current mark delete position and verify mark delete position of cursor move to new position.
+ monitor.expireMessages(positions.get(16));
+ Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> verify(monitor, times(1)).findEntryComplete(any(), any()));
+ assertEquals((PositionImpl) cursor.getMarkDeletedPosition(), PositionImpl.get(positions.get(16).getLedgerId(), positions.get(16).getEntryId()));
+ clearInvocations(monitor);
+
+ cursor.close();
+ ledger.close();
+ factory.shutdown();
+ }
+
+ @Test
+ public void test() {
+ ResetCursorData resetCursorData = new ResetCursorData(1, 1);
+ resetCursorData.setExcluded(true);
+ System.out.println(Entity.entity(resetCursorData, MediaType.APPLICATION_JSON));
+ }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
index 7c06837..b5e8251 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
@@ -42,6 +42,7 @@ import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
@@ -210,6 +211,7 @@ public class AuthorizationProducerConsumerTest extends ProducerConsumerBase {
tenantAdmin.topics().skipAllMessages(topicName, subscriptionName);
tenantAdmin.topics().skipMessages(topicName, subscriptionName, 1);
tenantAdmin.topics().expireMessages(topicName, subscriptionName, 10);
+ tenantAdmin.topics().expireMessages(topicName, subscriptionName, new MessageIdImpl(-1, -1, -1), true);
tenantAdmin.topics().peekMessages(topicName, subscriptionName, 1);
tenantAdmin.topics().resetCursor(topicName, subscriptionName, 10);
tenantAdmin.topics().resetCursor(topicName, subscriptionName, MessageId.earliest);
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
index 247b287..576f628 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -1183,6 +1183,40 @@ public interface Topics {
long expireTimeInSeconds);
/**
+ * Expire all messages older than given N (expireTimeInSeconds) seconds for a given subscription.
+ *
+ * @param topic
+ * topic name
+ * @param subscriptionName
+ * Subscription name
+ * @param messageId
+ * Position before which all messages will be expired.
+ * @param isExcluded
+ * Will message at passed in position also be expired.
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ void expireMessages(String topic, String subscriptionName, MessageId messageId, boolean isExcluded)
+ throws PulsarAdminException;
+
+ /**
+ * Expire all messages older than given N (expireTimeInSeconds) seconds for a given subscription asynchronously.
+ *
+ * @param topic
+ * topic name
+ * @param subscriptionName
+ * Subscription name
+ * @param messageId
+ * Position before which all messages will be expired.
+ * @param isExcluded
+ * Will message at passed in position also be expired.
+ * @return
+ * A {@link CompletableFuture} that'll be completed when expire message is done.
+ */
+ CompletableFuture<Void> expireMessagesAsync(String topic, String subscriptionName,
+ MessageId messageId, boolean isExcluded);
+
+ /**
* Expire all messages older than given N seconds for all subscriptions of the persistent-topic.
*
* @param topic
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 8de48ce..218c314 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -937,6 +937,33 @@ public class TopicsImpl extends BaseResource implements Topics {
}
@Override
+ public void expireMessages(String topic, String subscriptionName, MessageId messageId, boolean isExcluded)
+ throws PulsarAdminException {
+ try {
+ expireMessagesAsync(topic, subscriptionName, messageId, isExcluded)
+ .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> expireMessagesAsync(String topic, String subscriptionName,
+ MessageId messageId, boolean isExcluded) {
+ TopicName tn = validateTopic(topic);
+ String encodedSubName = Codec.encode(subscriptionName);
+ ResetCursorData resetCursorData = new ResetCursorData(messageId);
+ resetCursorData.setExcluded(isExcluded);
+ WebTarget path = topicPath(tn, "subscription", encodedSubName, "expireMessages");
+ return asyncPostRequest(path, Entity.entity(resetCursorData, MediaType.APPLICATION_JSON));
+ }
+
+ @Override
public void expireMessagesForAllSubscriptions(String topic, long expireTimeInSeconds) throws PulsarAdminException {
try {
expireMessagesForAllSubscriptionsAsync(topic, expireTimeInSeconds)
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 1b2478e..8c35e01 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -748,6 +748,11 @@ public class PulsarAdminToolTest {
cmdTopics.run(split("expire-messages persistent://myprop/clust/ns1/ds1 -s sub1 -t 100"));
verify(mockTopics).expireMessages("persistent://myprop/clust/ns1/ds1", "sub1", 100);
+ //cmd with option cannot be executed repeatedly.
+ cmdTopics = new CmdTopics(() -> admin);
+ cmdTopics.run(split("expire-messages persistent://myprop/clust/ns1/ds1 -s sub1 -p 1:1 -e"));
+ verify(mockTopics).expireMessages(eq("persistent://myprop/clust/ns1/ds1"), eq("sub1"), eq(new MessageIdImpl(1, 1, -1)), eq(true));
+
cmdTopics.run(split("expire-messages-all-subscriptions persistent://myprop/clust/ns1/ds1 -t 100"));
verify(mockTopics).expireMessagesForAllSubscriptions("persistent://myprop/clust/ns1/ds1", 100);
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CliCommand.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CliCommand.java
index ce48ef4..5113daa 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CliCommand.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CliCommand.java
@@ -104,10 +104,14 @@ abstract class CliCommand {
}
static MessageId validateMessageIdString(String resetMessageIdStr) throws PulsarAdminException {
+ return validateMessageIdString(resetMessageIdStr, -1);
+ }
+
+ static MessageId validateMessageIdString(String resetMessageIdStr, int partitionIndex) throws PulsarAdminException {
String[] messageId = resetMessageIdStr.split(":");
try {
Preconditions.checkArgument(messageId.length == 2);
- return new MessageIdImpl(Long.parseLong(messageId[0]), Long.parseLong(messageId[1]), -1);
+ return new MessageIdImpl(Long.parseLong(messageId[0]), Long.parseLong(messageId[1]), partitionIndex);
} catch (Exception e) {
throw new PulsarAdminException(
"Invalid message id (must be in format: ledgerId:entryId) value " + resetMessageIdStr);
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 9e6eef2..f3319b9 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -39,6 +39,8 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
+
+import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -47,6 +49,7 @@ import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.DispatchRate;
@@ -657,13 +660,35 @@ public class CmdTopics extends CmdBase {
"--subscription" }, description = "Subscription to be skip messages on", required = true)
private String subName;
- @Parameter(names = { "-t", "--expireTime" }, description = "Expire messages older than time in seconds", required = true)
- private long expireTimeInSeconds;
+ @Parameter(names = { "-t", "--expireTime" }, description = "Expire messages older than time in seconds")
+ private long expireTimeInSeconds = -1;
+
+ @Parameter(names = { "--position",
+ "-p" }, description = "message position to reset back to (ledgerId:entryId)", required = false)
+ private String messagePosition;
+
+ @Parameter(names = { "-e", "--exclude-reset-position" },
+ description = "Exclude the reset position, start consume messages from the next position.", required = false)
+ private boolean excludeResetPosition = false;
@Override
void run() throws PulsarAdminException {
+ if (expireTimeInSeconds >= 0 && isNotBlank(messagePosition)) {
+ throw new ParameterException(String.format("Can't expire message by time and " +
+ "by message position at the same time."));
+ }
String topic = validateTopicName(params);
- getTopics().expireMessages(topic, subName, expireTimeInSeconds);
+ if (expireTimeInSeconds >= 0) {
+ getTopics().expireMessages(topic, subName, expireTimeInSeconds);
+ } else if (isNotBlank(messagePosition)) {
+ int partitionIndex = TopicName.get(topic).getPartitionIndex();
+ MessageId messageId = validateMessageIdString(messagePosition, partitionIndex);
+ getTopics().expireMessages(topic, subName, messageId, excludeResetPosition);
+ } else {
+ throw new ParameterException(
+ "Either time (--expireTime) or message position (--position) has to be provided" +
+ " to expire messages");
+ }
}
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ResetCursorData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ResetCursorData.java
index 9311fa0..1bc7b7f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ResetCursorData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ResetCursorData.java
@@ -49,10 +49,12 @@ public class ResetCursorData {
this.ledgerId = batchMessageId.getLedgerId();
this.entryId = batchMessageId.getEntryId();
this.batchIndex = batchMessageId.getBatchIndex();
+ this.partitionIndex = batchMessageId.partitionIndex;
} else if (messageId instanceof MessageIdImpl) {
MessageIdImpl messageIdImpl = (MessageIdImpl) messageId;
this.ledgerId = messageIdImpl.getLedgerId();
this.entryId = messageIdImpl.getEntryId();
+ this.partitionIndex = messageIdImpl.partitionIndex;
} else if (messageId instanceof TopicMessageIdImpl) {
throw new IllegalArgumentException("Not supported operation on partitioned-topic");
}