You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/02/10 00:16:50 UTC

[pulsar] branch master updated: Expire message by position. (#9514)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 301d42c  Expire message by position. (#9514)
301d42c is described below

commit 301d42ce04ceb20797cc2a60267e6ec39ff05e95
Author: Marvin Cai <zx...@streamnative.io>
AuthorDate: Tue Feb 9 16:16:29 2021 -0800

    Expire message by position. (#9514)
    
    Fixes #2736
    
    ### Motivation
    Add ability to expire message for subscription by position to admin client and admin rest API.
    
    ### Modifications
    Update PersistentMessageExpiryMonitor to able to expire message by position and expose to admin client and admin rest api.
    
    ### Verifying this change
    This change added tests and can be verified as follows:
     - Added unit test for expire message by position in PersistentMessageFinderTest.
     - Added test for expire message by position in admin client/admin rest api.
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 258 ++++++++++++++-------
 .../pulsar/broker/admin/v1/PersistentTopics.java   |  41 +++-
 .../pulsar/broker/admin/v2/PersistentTopics.java   |  42 +++-
 .../apache/pulsar/broker/service/Subscription.java |   2 +
 .../nonpersistent/NonPersistentSubscription.java   |   6 +
 .../persistent/PersistentMessageExpiryMonitor.java |  27 +++
 .../service/persistent/PersistentReplicator.java   |  12 +
 .../service/persistent/PersistentSubscription.java |   5 +
 .../apache/pulsar/broker/admin/AdminApiTest.java   |  95 ++++++--
 .../service/PersistentMessageFinderTest.java       |  86 ++++++-
 .../api/AuthorizationProducerConsumerTest.java     |   2 +
 .../org/apache/pulsar/client/admin/Topics.java     |  34 +++
 .../pulsar/client/admin/internal/TopicsImpl.java   |  27 +++
 .../pulsar/admin/cli/PulsarAdminToolTest.java      |   5 +
 .../org/apache/pulsar/admin/cli/CliCommand.java    |   6 +-
 .../org/apache/pulsar/admin/cli/CmdTopics.java     |  31 ++-
 .../apache/pulsar/client/impl/ResetCursorData.java |   2 +
 17 files changed, 567 insertions(+), 114 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 82a0e5c..2f01b04 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -1838,7 +1838,7 @@ public class PersistentTopicsBase extends AdminResource {
 
         topic.getReplicators().forEach((subName, replicator) -> {
             try {
-                internalExpireMessagesForSinglePartition(subName, expireTimeInSeconds, authoritative);
+                internalExpireMessagesByTimestampForSinglePartition(subName, expireTimeInSeconds, authoritative);
             } catch (Throwable t) {
                 exception.set(t);
             }
@@ -1846,7 +1846,7 @@ public class PersistentTopicsBase extends AdminResource {
 
         topic.getSubscriptions().forEach((subName, subscriber) -> {
             try {
-                internalExpireMessagesForSinglePartition(subName, expireTimeInSeconds, authoritative);
+                internalExpireMessagesByTimestampForSinglePartition(subName, expireTimeInSeconds, authoritative);
             } catch (Throwable t) {
                 exception.set(t);
             }
@@ -2198,82 +2198,9 @@ public class PersistentTopicsBase extends AdminResource {
                     return;
                 }
                 CompletableFuture<Integer> batchSizeFuture = new CompletableFuture<>();
-                if (batchIndex >= 0) {
-                    try {
-                        ManagedLedgerImpl ledger = (ManagedLedgerImpl) topic.getManagedLedger();
-                        ledger.asyncReadEntry(new PositionImpl(messageId.getLedgerId(),
-                                messageId.getEntryId()), new AsyncCallbacks.ReadEntryCallback() {
-                            @Override
-                            public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
-                                // Since we can't read the message from the storage layer,
-                                // it might be an already delete message ID or an invalid message ID
-                                // We should fall back to non batch index seek.
-                                batchSizeFuture.complete(0);
-                            }
-
-                            @Override
-                            public void readEntryComplete(Entry entry, Object ctx) {
-                                try {
-                                    try {
-                                        if (entry == null) {
-                                            batchSizeFuture.complete(0);
-                                        } else {
-                                            MessageMetadata metadata =
-                                                    Commands.parseMessageMetadata(entry.getDataBuffer());
-                                            batchSizeFuture.complete(metadata.getNumMessagesInBatch());
-                                        }
-                                    } catch (Exception e) {
-                                        batchSizeFuture.completeExceptionally(new RestException(e));
-                                    }
-                                } finally {
-                                    if (entry != null) {
-                                        entry.release();
-                                    }
-                                }
-                            }
-                        }, null);
-                    } catch (NullPointerException npe) {
-                        batchSizeFuture.completeExceptionally(new RestException(Status.NOT_FOUND, "Message not found"));
-                    } catch (Exception exception) {
-                        log.error("[{}] Failed to get message with ledgerId {} entryId {} from {}",
-                                clientAppId(), messageId.getLedgerId(), messageId.getEntryId(), topicName, exception);
-                        batchSizeFuture.completeExceptionally(new RestException(exception));
-                    }
-                } else {
-                    batchSizeFuture.complete(0);
-                }
+                getEntryBatchSize(batchSizeFuture, topic, messageId, batchIndex);
                 batchSizeFuture.thenAccept(bi -> {
-                    PositionImpl seekPosition;
-                    if (bi > 0) {
-                        long[] ackSet;
-                        BitSetRecyclable bitSet = BitSetRecyclable.create();
-                        bitSet.set(0, bi);
-                        if (isExcluded) {
-                            bitSet.clear(0, Math.max(batchIndex + 1, 0));
-                            if (bitSet.length() > 0) {
-                                ackSet = bitSet.toLongArray();
-                                seekPosition = PositionImpl.get(messageId.getLedgerId(),
-                                        messageId.getEntryId(), ackSet);
-                            } else {
-                                seekPosition = PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId());
-                                seekPosition = seekPosition.getNext();
-                            }
-                        } else {
-                            if (batchIndex - 1 >= 0) {
-                                bitSet.clear(0, batchIndex);
-                                ackSet = bitSet.toLongArray();
-                                seekPosition = PositionImpl.get(messageId.getLedgerId(),
-                                        messageId.getEntryId(), ackSet);
-                            } else {
-                                seekPosition = PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId());
-                            }
-                        }
-                        bitSet.recycle();
-                    } else {
-                        seekPosition = PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId());
-                        seekPosition = isExcluded ? seekPosition.getNext() : seekPosition;
-                    }
-
+                    PositionImpl seekPosition = calculatePositionAckSet(isExcluded, bi, batchIndex, messageId);
                     sub.resetCursor(seekPosition).thenRun(() -> {
                         log.info("[{}][{}] successfully reset cursor on subscription {} to position {}", clientAppId(),
                                 topicName, subName, messageId);
@@ -2305,6 +2232,89 @@ public class PersistentTopicsBase extends AdminResource {
         }
     }
 
+    private void getEntryBatchSize(CompletableFuture<Integer> batchSizeFuture, PersistentTopic topic,
+                                   MessageIdImpl messageId, int batchIndex) {
+        if (batchIndex >= 0) {
+            try {
+                ManagedLedgerImpl ledger = (ManagedLedgerImpl) topic.getManagedLedger();
+                ledger.asyncReadEntry(new PositionImpl(messageId.getLedgerId(),
+                        messageId.getEntryId()), new AsyncCallbacks.ReadEntryCallback() {
+                    @Override
+                    public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
+                        // Since we can't read the message from the storage layer,
+                        // it might be an already delete message ID or an invalid message ID
+                        // We should fall back to non batch index seek.
+                        batchSizeFuture.complete(0);
+                    }
+
+                    @Override
+                    public void readEntryComplete(Entry entry, Object ctx) {
+                        try {
+                            try {
+                                if (entry == null) {
+                                    batchSizeFuture.complete(0);
+                                } else {
+                                    MessageMetadata metadata =
+                                            Commands.parseMessageMetadata(entry.getDataBuffer());
+                                    batchSizeFuture.complete(metadata.getNumMessagesInBatch());
+                                }
+                            } catch (Exception e) {
+                                batchSizeFuture.completeExceptionally(new RestException(e));
+                            }
+                        } finally {
+                            if (entry != null) {
+                                entry.release();
+                            }
+                        }
+                    }
+                }, null);
+            } catch (NullPointerException npe) {
+                batchSizeFuture.completeExceptionally(new RestException(Status.NOT_FOUND, "Message not found"));
+            } catch (Exception exception) {
+                log.error("[{}] Failed to get message with ledgerId {} entryId {} from {}",
+                        clientAppId(), messageId.getLedgerId(), messageId.getEntryId(), topicName, exception);
+                batchSizeFuture.completeExceptionally(new RestException(exception));
+            }
+        } else {
+            batchSizeFuture.complete(0);
+        }
+    }
+
+    private PositionImpl calculatePositionAckSet(boolean isExcluded, int batchSize,
+                                                 int batchIndex, MessageIdImpl messageId) {
+        PositionImpl seekPosition;
+        if (batchSize > 0) {
+            long[] ackSet;
+            BitSetRecyclable bitSet = BitSetRecyclable.create();
+            bitSet.set(0, batchSize);
+            if (isExcluded) {
+                bitSet.clear(0, Math.max(batchIndex + 1, 0));
+                if (bitSet.length() > 0) {
+                    ackSet = bitSet.toLongArray();
+                    seekPosition = PositionImpl.get(messageId.getLedgerId(),
+                            messageId.getEntryId(), ackSet);
+                } else {
+                    seekPosition = PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId());
+                    seekPosition = seekPosition.getNext();
+                }
+            } else {
+                if (batchIndex - 1 >= 0) {
+                    bitSet.clear(0, batchIndex);
+                    ackSet = bitSet.toLongArray();
+                    seekPosition = PositionImpl.get(messageId.getLedgerId(),
+                            messageId.getEntryId(), ackSet);
+                } else {
+                    seekPosition = PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId());
+                }
+            }
+            bitSet.recycle();
+        } else {
+            seekPosition = PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId());
+            seekPosition = isExcluded ? seekPosition.getNext() : seekPosition;
+        }
+        return seekPosition;
+    }
+
     protected void internalGetMessageById(AsyncResponse asyncResponse, long ledgerId, long entryId,
                                               boolean authoritative) {
         try {
@@ -2931,16 +2941,15 @@ public class PersistentTopicsBase extends AdminResource {
         }
     }
 
-
-    protected void internalExpireMessages(AsyncResponse asyncResponse, String subName, int expireTimeInSeconds,
-            boolean authoritative) {
+    protected void internalExpireMessagesByTimestamp(AsyncResponse asyncResponse, String subName,
+                                                     int expireTimeInSeconds, boolean authoritative) {
         if (topicName.isGlobal()) {
             validateGlobalNamespaceOwnership(namespaceName);
         }
         // If the topic name is a partition name, no need to get partition topic metadata again
         if (topicName.isPartitioned()) {
             try {
-                internalExpireMessagesForSinglePartition(subName, expireTimeInSeconds, authoritative);
+                internalExpireMessagesByTimestampForSinglePartition(subName, expireTimeInSeconds, authoritative);
             } catch (WebApplicationException wae) {
                 asyncResponse.resume(wae);
                 return;
@@ -2991,7 +3000,7 @@ public class PersistentTopicsBase extends AdminResource {
                 });
             } else {
                 try {
-                    internalExpireMessagesForSinglePartition(subName, expireTimeInSeconds, authoritative);
+                    internalExpireMessagesByTimestampForSinglePartition(subName, expireTimeInSeconds, authoritative);
                 } catch (WebApplicationException wae) {
                     asyncResponse.resume(wae);
                     return;
@@ -3004,7 +3013,7 @@ public class PersistentTopicsBase extends AdminResource {
         }
     }
 
-    private void internalExpireMessagesForSinglePartition(String subName, int expireTimeInSeconds,
+    private void internalExpireMessagesByTimestampForSinglePartition(String subName, int expireTimeInSeconds,
             boolean authoritative) {
         if (topicName.isGlobal()) {
             validateGlobalNamespaceOwnership(namespaceName);
@@ -3048,6 +3057,89 @@ public class PersistentTopicsBase extends AdminResource {
         }
     }
 
+    protected void internalExpireMessagesByPosition(AsyncResponse asyncResponse, String subName, boolean authoritative,
+                                                 MessageIdImpl messageId, boolean isExcluded, int batchIndex) {
+        if (topicName.isGlobal()) {
+            try {
+                validateGlobalNamespaceOwnership(namespaceName);
+            } catch (Exception e) {
+                log.warn("[{}][{}] Failed to expire messages on subscription {} to position {}: {}", clientAppId(),
+                        topicName, subName, messageId, e.getMessage());
+                resumeAsyncResponseExceptionally(asyncResponse, e);
+                return;
+            }
+        }
+
+        log.info("[{}][{}] received expire messages on subscription {} to position {}", clientAppId(), topicName,
+                subName, messageId);
+
+        // If the topic name is a partition name, no need to get partition topic metadata again
+        if (!topicName.isPartitioned() && getPartitionedTopicMetadata(topicName, authoritative, false).partitions > 0) {
+            log.warn("[{}] Not supported operation expire message up to {} on partitioned-topic {} {}",
+                    clientAppId(), messageId, topicName, subName);
+            asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
+                    "Expire message at position is not supported for partitioned-topic"));
+            return;
+        } else if (messageId.getPartitionIndex() != topicName.getPartitionIndex()) {
+            log.warn("[{}] Invalid parameter for expire message by position, partition index of passed in message"
+                            + " position {} doesn't match partition index of topic requested {}.",
+                    clientAppId(), messageId, topicName);
+            asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
+                    "Invalid parameter for expire message by position, partition index of message position "
+                            + "passed in doesn't match partition index for the topic."));
+        } else {
+            validateAdminAccessForSubscriber(subName, authoritative);
+            validateReadOperationOnTopic(authoritative);
+            PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
+            if (topic == null) {
+                asyncResponse.resume(new RestException(Status.NOT_FOUND, "Topic not found"));
+                return;
+            }
+            try {
+                PersistentSubscription sub = topic.getSubscription(subName);
+                if (sub == null) {
+                    asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
+                    return;
+                }
+                CompletableFuture<Integer> batchSizeFuture = new CompletableFuture<>();
+                getEntryBatchSize(batchSizeFuture, topic, messageId, batchIndex);
+                batchSizeFuture.thenAccept(bi -> {
+                    PositionImpl position = calculatePositionAckSet(isExcluded, bi, batchIndex, messageId);
+                    try {
+                        if (subName.startsWith(topic.getReplicatorPrefix())) {
+                            String remoteCluster = PersistentReplicator.getRemoteCluster(subName);
+                            PersistentReplicator repl = (PersistentReplicator)
+                                    topic.getPersistentReplicator(remoteCluster);
+                            checkNotNull(repl);
+                            repl.expireMessages(position);
+                        } else {
+                            checkNotNull(sub);
+                            sub.expireMessages(position);
+                        }
+                        log.info("[{}] Message expire issued up to {} on {} {}", clientAppId(), position, topicName,
+                                subName);
+                    } catch (NullPointerException npe) {
+                        throw new RestException(Status.NOT_FOUND, "Subscription not found");
+                    } catch (Exception exception) {
+                        log.error("[{}] Failed to expire messages up to {} on {} with subscription {} {}",
+                                clientAppId(), position, topicName, subName, exception);
+                        throw new RestException(exception);
+                    }
+                }).exceptionally(e -> {
+                    log.error("[{}] Failed to expire messages up to {} on {} with subscription {} {}", clientAppId(),
+                            messageId, topicName, subName, e);
+                    asyncResponse.resume(e);
+                    return null;
+                });
+            } catch (Exception e) {
+                log.warn("[{}][{}] Failed to expire messages up to {} on subscription {} to position {}",
+                        clientAppId(), topicName, messageId, subName, messageId, e);
+                resumeAsyncResponseExceptionally(asyncResponse, e);
+            }
+        }
+        asyncResponse.resume(Response.noContent().build());
+    }
+
     protected void internalTriggerCompaction(AsyncResponse asyncResponse, boolean authoritative) {
         log.info("[{}] Trigger compaction on topic {}", clientAppId(), topicName);
         try {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index 78392c2..46708dd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -462,7 +462,46 @@ public class PersistentTopics extends PersistentTopicsBase {
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         try {
             validateTopicName(property, cluster, namespace, encodedTopic);
-            internalExpireMessages(asyncResponse, decode(encodedSubName), expireTimeInSeconds, authoritative);
+            internalExpireMessagesByTimestamp(asyncResponse, decode(encodedSubName),
+                    expireTimeInSeconds, authoritative);
+        } catch (WebApplicationException wae) {
+            asyncResponse.resume(wae);
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+        }
+    }
+
+    @POST
+    @Path("/{property}/{cluster}/{namespace}/{topic}/subscription/{subName}/expireMessages")
+    @ApiOperation(value = "Expiry messages on a topic subscription.")
+    @ApiResponses(value = {
+            @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
+            @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or"
+                    + "subscriber is not authorized to access this operation"),
+            @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Topic or subscription does not exist"),
+            @ApiResponse(code = 405, message = "Expiry messages on a non-persistent topic is not allowed"),
+            @ApiResponse(code = 500, message = "Internal server error"),
+            @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
+    public void expireTopicMessages(
+            @Suspended final AsyncResponse asyncResponse,
+            @PathParam("property") String property, @PathParam("cluster") String cluster,
+            @ApiParam(value = "Specify the namespace", required = true)
+            @PathParam("namespace") String namespace,
+            @ApiParam(value = "Specify topic name", required = true)
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Subscription to be Expiry messages on")
+            @PathParam("subName") String encodedSubName,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+            @ApiParam(name = "messageId", value = "messageId to reset back to (ledgerId:entryId)")
+                    ResetCursorData resetCursorData) {
+        try {
+            validateTopicName(property, cluster, namespace, encodedTopic);
+            internalExpireMessagesByPosition(asyncResponse, decode(encodedSubName), authoritative,
+                    new MessageIdImpl(resetCursorData.getLedgerId(),
+                            resetCursorData.getEntryId(), resetCursorData.getPartitionIndex())
+                    , resetCursorData.isExcluded(), resetCursorData.getBatchIndex());
         } catch (WebApplicationException wae) {
             asyncResponse.resume(wae);
         } catch (Exception e) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index a569339..851e265 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -1197,7 +1197,47 @@ public class PersistentTopics extends PersistentTopicsBase {
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         try {
             validateTopicName(tenant, namespace, encodedTopic);
-            internalExpireMessages(asyncResponse, decode(encodedSubName), expireTimeInSeconds, authoritative);
+            internalExpireMessagesByTimestamp(asyncResponse, decode(encodedSubName),
+                    expireTimeInSeconds, authoritative);
+        } catch (WebApplicationException wae) {
+            asyncResponse.resume(wae);
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+        }
+    }
+
+    @POST
+    @Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/expireMessages")
+    @ApiOperation(value = "Expiry messages on a topic subscription.")
+    @ApiResponses(value = {
+            @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
+            @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or"
+                    + "subscriber is not authorized to access this operation"),
+            @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Topic or subscription does not exist"),
+            @ApiResponse(code = 405, message = "Expiry messages on a non-persistent topic is not allowed"),
+            @ApiResponse(code = 500, message = "Internal server error"),
+            @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
+    public void expireTopicMessages(
+            @Suspended final AsyncResponse asyncResponse,
+            @ApiParam(value = "Specify the tenant", required = true)
+            @PathParam("tenant") String tenant,
+            @ApiParam(value = "Specify the namespace", required = true)
+            @PathParam("namespace") String namespace,
+            @ApiParam(value = "Specify topic name", required = true)
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Subscription to be Expiry messages on")
+            @PathParam("subName") String encodedSubName,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+            @ApiParam(name = "messageId", value = "messageId to reset back to (ledgerId:entryId)")
+            ResetCursorData resetCursorData) {
+        try {
+            validateTopicName(tenant, namespace, encodedTopic);
+            internalExpireMessagesByPosition(asyncResponse, decode(encodedSubName), authoritative,
+            new MessageIdImpl(resetCursorData.getLedgerId(),
+                    resetCursorData.getEntryId(), resetCursorData.getPartitionIndex())
+            , resetCursorData.isExcluded(), resetCursorData.getBatchIndex());
         } catch (WebApplicationException wae) {
             asyncResponse.resume(wae);
         } catch (Exception e) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
index 05b2d28..95a470d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
@@ -85,6 +85,8 @@ public interface Subscription {
 
     void expireMessages(int messageTTLInSeconds);
 
+    void expireMessages(Position position);
+
     void redeliverUnacknowledgedMessages(Consumer consumer);
 
     void redeliverUnacknowledgedMessages(Consumer consumer, List<PositionImpl> positions);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
index 79edaab..c347d36 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
@@ -431,6 +431,12 @@ public class NonPersistentSubscription implements Subscription {
         // No-op
     }
 
+    @Override
+    public void expireMessages(Position position) {
+        throw new UnsupportedOperationException("Expire message by position is not supported for"
+                + " non-persistent topic.");
+    }
+
     public NonPersistentSubscriptionStats getStats() {
         NonPersistentSubscriptionStats subStats = new NonPersistentSubscriptionStats();
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
index b66c66d..29848e3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
@@ -28,6 +28,7 @@ import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.ManagedLedgerException.NonRecoverableLedgerException;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
 import org.apache.pulsar.common.stats.Rate;
@@ -94,6 +95,32 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback {
         }
     }
 
+    public void expireMessages(Position messagePosition) {
+        // If it's beyond last position of this topic, do nothing.
+        if (((PositionImpl) subscription.getTopic().getLastPosition()).compareTo((PositionImpl) messagePosition) < 0) {
+            return;
+        }
+        if (expirationCheckInProgressUpdater.compareAndSet(this, FALSE, TRUE)) {
+            log.info("[{}][{}] Starting message expiry check, position= {} seconds", topicName, subName,
+                    messagePosition);
+
+            cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries, entry -> {
+                try {
+                    // If given position larger than entry position.
+                    return ((PositionImpl) entry.getPosition()).compareTo((PositionImpl) messagePosition) <= 0;
+                } finally {
+                    entry.release();
+                }
+            }, this, null);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}][{}] Ignore expire-message scheduled task, last check is still running", topicName,
+                        subName);
+            }
+        }
+    }
+
+
     public void updateRates() {
         msgExpired.calculateRate();
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index cc28af7..52c1957 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -727,6 +727,18 @@ public class PersistentReplicator extends AbstractReplicator
         }
     }
 
+    public void expireMessages(Position position) {
+        if ((cursor.getNumberOfEntriesInBacklog(false) == 0)
+                || (cursor.getNumberOfEntriesInBacklog(false) < MINIMUM_BACKLOG_FOR_EXPIRY_CHECK
+                && !topic.isOldestMessageExpired(cursor, messageTTLInSeconds))) {
+            // don't do anything for almost caught-up connected subscriptions
+            return;
+        }
+        if (expiryMonitor != null) {
+            expiryMonitor.expireMessages(messageTTLInSeconds);
+        }
+    }
+
     @Override
     public Optional<DispatchRateLimiter> getRateLimiter() {
         return dispatchRateLimiter;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 2df6b4c..a666edb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -904,6 +904,11 @@ public class PersistentSubscription implements Subscription {
         expiryMonitor.expireMessages(messageTTLInSeconds);
     }
 
+    @Override
+    public void expireMessages(Position position) {
+        expiryMonitor.expireMessages(position);
+    }
+
     public double getExpiredMessageRate() {
         return expiryMonitor.getMessageExpiryRate();
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index cede9b7..ac8fdab 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -123,6 +123,7 @@ import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.compaction.Compactor;
+import org.awaitility.Awaitility;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -1603,16 +1604,17 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
     long messageTimestamp = System.currentTimeMillis();
     long secondTimestamp = System.currentTimeMillis();
 
-    private void publishMessagesOnPersistentTopic(String topicName, int messages) throws Exception {
-        publishMessagesOnPersistentTopic(topicName, messages, 0, false);
+    private List<MessageId> publishMessagesOnPersistentTopic(String topicName, int messages) throws Exception {
+        return publishMessagesOnPersistentTopic(topicName, messages, 0, false);
     }
 
-    private void publishNullValueMessageOnPersistentTopic(String topicName, int messages) throws Exception {
-        publishMessagesOnPersistentTopic(topicName, messages, 0, true);
+    private List<MessageId> publishNullValueMessageOnPersistentTopic(String topicName, int messages) throws Exception {
+        return publishMessagesOnPersistentTopic(topicName, messages, 0, true);
     }
 
-    private void publishMessagesOnPersistentTopic(String topicName, int messages, int startIdx,
+    private List<MessageId> publishMessagesOnPersistentTopic(String topicName, int messages, int startIdx,
                                                   boolean nullValue) throws Exception {
+        List<MessageId> messageIds = new ArrayList<>();
         Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
             .topic(topicName)
             .enableBatching(false)
@@ -1621,14 +1623,15 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
 
         for (int i = startIdx; i < (messages + startIdx); i++) {
             if (nullValue) {
-                producer.send(null);
+                messageIds.add(producer.send(null));
             } else {
                 String message = "message-" + i;
-                producer.send(message.getBytes());
+                messageIds.add(producer.send(message.getBytes()));
             }
         }
 
         producer.close();
+        return messageIds;
     }
 
     @Test
@@ -2069,7 +2072,6 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
      */
     @Test
     public void testPersistentTopicsExpireMessages() throws Exception {
-
         // Force to create a topic
         publishMessagesOnPersistentTopic("persistent://prop-xyz/ns1/ds2", 0);
         assertEquals(admin.topics().getList("prop-xyz/ns1"),
@@ -2089,21 +2091,21 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
 
         assertEquals(admin.topics().getSubscriptions("persistent://prop-xyz/ns1/ds2").size(), 3);
 
-        publishMessagesOnPersistentTopic("persistent://prop-xyz/ns1/ds2", 10);
+        List<MessageId> messageIds = publishMessagesOnPersistentTopic("persistent://prop-xyz/ns1/ds2", 10);
 
         TopicStats topicStats = admin.topics().getStats("persistent://prop-xyz/ns1/ds2");
         assertEquals(topicStats.subscriptions.get("my-sub1").msgBacklog, 10);
         assertEquals(topicStats.subscriptions.get("my-sub2").msgBacklog, 10);
         assertEquals(topicStats.subscriptions.get("my-sub3").msgBacklog, 10);
 
-        Thread.sleep(1000); // wait for 1 seconds to expire message
+        Thread.sleep(1000);
         admin.topics().expireMessages("persistent://prop-xyz/ns1/ds2", "my-sub1", 1);
-        Thread.sleep(1000); // wait for 1 seconds to execute expire message as it is async
-
+        // Wait at most 2 seconds for sub1's message to expire.
+        Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> assertTrue(
+                admin.topics().getStats("persistent://prop-xyz/ns1/ds2").subscriptions.get("my-sub1").lastMarkDeleteAdvancedTimestamp > 0L));
         topicStats = admin.topics().getStats("persistent://prop-xyz/ns1/ds2");
         SubscriptionStats subStats1 = topicStats.subscriptions.get("my-sub1");
         assertEquals(subStats1.msgBacklog, 0);
-        assertTrue(subStats1.lastMarkDeleteAdvancedTimestamp > 0L);
         SubscriptionStats subStats2 = topicStats.subscriptions.get("my-sub2");
         assertEquals(subStats2.msgBacklog, 10);
         assertEquals(subStats2.lastMarkDeleteAdvancedTimestamp, 0L);
@@ -2111,24 +2113,69 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
         assertEquals(subStats3.msgBacklog, 10);
         assertEquals(subStats3.lastMarkDeleteAdvancedTimestamp, 0L);
 
-        admin.topics().expireMessagesForAllSubscriptions("persistent://prop-xyz/ns1/ds2", 1);
-        Thread.sleep(1000); // wait for 1 seconds to execute expire message as it is async
+        admin.topics().expireMessages("persistent://prop-xyz/ns1/ds2", "my-sub2",
+                messageIds.get(4), false);
+        // Wait at most 2 seconds for sub2's message to expire.
+        Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> assertTrue(
+                admin.topics().getStats("persistent://prop-xyz/ns1/ds2").subscriptions.get("my-sub2").lastMarkDeleteAdvancedTimestamp > 0L));
+        topicStats = admin.topics().getStats("persistent://prop-xyz/ns1/ds2");
+        subStats1 = topicStats.subscriptions.get("my-sub1");
+        assertEquals(subStats1.msgBacklog, 0);
+        assertTrue(subStats1.lastMarkDeleteAdvancedTimestamp > 0L);
+        Long sub2lastMarkDeleteAdvancedTimestamp = subStats1.lastMarkDeleteAdvancedTimestamp;
+        subStats2 = topicStats.subscriptions.get("my-sub2");
+        assertEquals(subStats2.msgBacklog, 5);
+        subStats3 = topicStats.subscriptions.get("my-sub3");
+        assertEquals(subStats3.msgBacklog, 10);
+        assertEquals(subStats3.lastMarkDeleteAdvancedTimestamp, 0L);
 
+        admin.topics().expireMessagesForAllSubscriptions("persistent://prop-xyz/ns1/ds2", 1);
+        // Wait at most 2 seconds for sub3's message to expire.
+        Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> assertTrue(
+                admin.topics().getStats("persistent://prop-xyz/ns1/ds2").subscriptions.get("my-sub3").lastMarkDeleteAdvancedTimestamp > 0L));
         topicStats = admin.topics().getStats("persistent://prop-xyz/ns1/ds2");
-        SubscriptionStats newSubStats1 = topicStats.subscriptions.get("my-sub1");
-        assertEquals(newSubStats1.msgBacklog, 0);
-        assertEquals(newSubStats1.lastMarkDeleteAdvancedTimestamp, subStats1.lastMarkDeleteAdvancedTimestamp);
-        SubscriptionStats newSubStats2 = topicStats.subscriptions.get("my-sub2");
-        assertEquals(newSubStats2.msgBacklog, 0);
-        assertTrue(newSubStats2.lastMarkDeleteAdvancedTimestamp > subStats2.lastMarkDeleteAdvancedTimestamp);
-        SubscriptionStats newSubStats3 = topicStats.subscriptions.get("my-sub3");
-        assertEquals(newSubStats3.msgBacklog, 0);
-        assertTrue(newSubStats3.lastMarkDeleteAdvancedTimestamp > subStats3.lastMarkDeleteAdvancedTimestamp);
+        subStats1 = topicStats.subscriptions.get("my-sub1");
+        assertEquals(subStats1.msgBacklog, 0);
+        assertEquals(subStats1.lastMarkDeleteAdvancedTimestamp, subStats1.lastMarkDeleteAdvancedTimestamp);
+        // Wait at most 2 seconds for rest of sub2's message to expire.
+        subStats2 = topicStats.subscriptions.get("my-sub2");
+        assertEquals(subStats2.msgBacklog, 0);
+        assertTrue(subStats2.lastMarkDeleteAdvancedTimestamp > sub2lastMarkDeleteAdvancedTimestamp);
+        subStats3 = topicStats.subscriptions.get("my-sub3");
+        assertEquals(subStats3.msgBacklog, 0);
 
         consumer1.close();
         consumer2.close();
         consumer3.close();
+    }
 
+    @Test
+    public void testPersistentTopicsExpireMessagesInvalidPartitionIndex() throws Exception {
+        // Force to create a topic
+        publishMessagesOnPersistentTopic("persistent://prop-xyz/ns1/ds2-partition-2", 0);
+        assertEquals(admin.topics().getList("prop-xyz/ns1"),
+                Lists.newArrayList("persistent://prop-xyz/ns1/ds2-partition-2"));
+
+        // create consumer and subscription
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getWebServiceAddress())
+                .statsInterval(0, TimeUnit.SECONDS)
+                .build();
+        ConsumerBuilder<byte[]> consumerBuilder = client.newConsumer()
+                .topic("persistent://prop-xyz/ns1/ds2-partition-2")
+                .subscriptionType(SubscriptionType.Shared);
+        @Cleanup
+        Consumer<byte[]> consumer = consumerBuilder.clone().subscriptionName("my-sub").subscribe();
+
+        assertEquals(admin.topics().getSubscriptions("persistent://prop-xyz/ns1/ds2-partition-2").size(), 1);
+        publishMessagesOnPersistentTopic("persistent://prop-xyz/ns1/ds2-partition-2", 10);
+        try {
+            admin.topics().expireMessages("persistent://prop-xyz/ns1/ds2-partition-2", "my-sub",
+                    new MessageIdImpl(1, 1, 1), false);
+        } catch (Exception e) {
+            assertTrue(e.getMessage().contains("Invalid parameter for expire message by position"));
+        }
     }
 
     /**
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
index f7c38f0..d60b4dc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
@@ -19,6 +19,13 @@
 package org.apache.pulsar.broker.service;
 
 import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.clearInvocations;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.spy;
+import static org.powermock.api.mockito.PowerMockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertNotNull;
@@ -29,6 +36,7 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.UnpooledByteBufAllocator;
 
 import java.lang.reflect.Field;
+import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Optional;
@@ -51,6 +59,8 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.Ledge
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
 import org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor;
 import org.apache.pulsar.broker.service.persistent.PersistentMessageFinder;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+import org.apache.pulsar.client.impl.ResetCursorData;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
@@ -58,9 +68,13 @@ import org.apache.pulsar.common.intercept.BrokerEntryMetadataInterceptor;
 import org.apache.pulsar.common.intercept.BrokerEntryMetadataUtils;
 import org.apache.pulsar.common.protocol.ByteBufPair;
 import org.apache.pulsar.common.protocol.Commands;
+import org.awaitility.Awaitility;
 import org.testng.annotations.Test;
 import org.testng.collections.Sets;
 
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.core.MediaType;
+
 /**
  */
 public class PersistentMessageFinderTest extends MockedBookKeeperTestCase {
@@ -329,7 +343,7 @@ public class PersistentMessageFinderTest extends MockedBookKeeperTestCase {
      * @throws Exception
      */
     @Test
-    void testMessageExpiryWithNonRecoverableException() throws Exception {
+    void testMessageExpiryWithTimestampNonRecoverableException() throws Exception {
 
         final String ledgerAndCursorName = "testPersistentMessageExpiryWithNonRecoverableLedgers";
         final int entriesPerLedger = 2;
@@ -380,4 +394,74 @@ public class PersistentMessageFinderTest extends MockedBookKeeperTestCase {
         factory.shutdown();
 
     }
+
+    @Test
+    void testMessageExpiryWithPosition() throws Exception {
+        final String ledgerAndCursorName = "testPersistentMessageExpiryWithPositionNonRecoverableLedgers";
+        final int entriesPerLedger = 5;
+        final int totalEntries = 30;
+        List<Position> positions = new ArrayList<>();
+
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setRetentionSizeInMB(10);
+        config.setMaxEntriesPerLedger(entriesPerLedger);
+        config.setRetentionTime(1, TimeUnit.HOURS);
+        config.setAutoSkipNonRecoverableData(true);
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(ledgerAndCursorName, config);
+        ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor(ledgerAndCursorName);
+
+        PersistentSubscription subscription = mock(PersistentSubscription.class);
+        Topic topic = mock(Topic.class);
+        when(subscription.getTopic()).thenReturn(topic);
+
+        for (int i = 0; i < totalEntries; i++) {
+            positions.add(ledger.addEntry(createMessageWrittenToLedger("msg" + i)));
+        }
+        when(topic.getLastPosition()).thenReturn(positions.get(positions.size() - 1));
+        for (Position p : positions) {
+            System.out.println(p);
+        }
+        PersistentMessageExpiryMonitor monitor = spy(new PersistentMessageExpiryMonitor("topicname",
+                cursor.getName(), cursor, subscription));
+        assertEquals((PositionImpl) cursor.getMarkDeletedPosition(), PositionImpl.get(positions.get(0).getLedgerId(), -1));
+
+        // Expire by position and verify mark delete position of cursor.
+        monitor.expireMessages(positions.get(15));
+        Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> verify(monitor, times(1)).findEntryComplete(any(), any()));
+        assertEquals((PositionImpl) cursor.getMarkDeletedPosition(), PositionImpl.get(positions.get(15).getLedgerId(), positions.get(15).getEntryId()));
+        clearInvocations(monitor);
+
+        // Expire by position beyond last position and nothing should happen.
+        monitor.expireMessages(PositionImpl.get(100, 100));
+        assertEquals((PositionImpl) cursor.getMarkDeletedPosition(), PositionImpl.get(positions.get(15).getLedgerId(), positions.get(15).getEntryId()));
+
+        // Expire by position again and verify mark delete position of cursor didn't change.
+        monitor.expireMessages(positions.get(15));
+        Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> verify(monitor, times(1)).findEntryComplete(any(), any()));
+        assertEquals((PositionImpl) cursor.getMarkDeletedPosition(), PositionImpl.get(positions.get(15).getLedgerId(), positions.get(15).getEntryId()));
+        clearInvocations(monitor);
+
+        // Expire by position before current mark delete position and verify mark delete position of cursor didn't change.
+        monitor.expireMessages(positions.get(10));
+        Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> verify(monitor, times(1)).findEntryComplete(any(), any()));
+        assertEquals((PositionImpl) cursor.getMarkDeletedPosition(), PositionImpl.get(positions.get(15).getLedgerId(), positions.get(15).getEntryId()));
+        clearInvocations(monitor);
+
+        // Expire by position after current mark delete position and verify mark delete position of cursor move to new position.
+        monitor.expireMessages(positions.get(16));
+        Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> verify(monitor, times(1)).findEntryComplete(any(), any()));
+        assertEquals((PositionImpl) cursor.getMarkDeletedPosition(), PositionImpl.get(positions.get(16).getLedgerId(), positions.get(16).getEntryId()));
+        clearInvocations(monitor);
+
+        cursor.close();
+        ledger.close();
+        factory.shutdown();
+    }
+
+    @Test
+    public void test() {
+        ResetCursorData resetCursorData = new ResetCursorData(1, 1);
+        resetCursorData.setExcluded(true);
+        System.out.println(Entity.entity(resetCursorData, MediaType.APPLICATION_JSON));
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
index 7c06837..b5e8251 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
@@ -42,6 +42,7 @@ import org.apache.pulsar.broker.authorization.AuthorizationService;
 import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
 import org.apache.pulsar.broker.cache.ConfigurationCacheService;
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.AuthAction;
@@ -210,6 +211,7 @@ public class AuthorizationProducerConsumerTest extends ProducerConsumerBase {
         tenantAdmin.topics().skipAllMessages(topicName, subscriptionName);
         tenantAdmin.topics().skipMessages(topicName, subscriptionName, 1);
         tenantAdmin.topics().expireMessages(topicName, subscriptionName, 10);
+        tenantAdmin.topics().expireMessages(topicName, subscriptionName, new MessageIdImpl(-1, -1, -1), true);
         tenantAdmin.topics().peekMessages(topicName, subscriptionName, 1);
         tenantAdmin.topics().resetCursor(topicName, subscriptionName, 10);
         tenantAdmin.topics().resetCursor(topicName, subscriptionName, MessageId.earliest);
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
index 247b287..576f628 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -1183,6 +1183,40 @@ public interface Topics {
             long expireTimeInSeconds);
 
     /**
+     * Expire all messages older than given N (expireTimeInSeconds) seconds for a given subscription.
+     *
+     * @param topic
+     *            topic name
+     * @param subscriptionName
+     *            Subscription name
+     * @param messageId
+     *            Position before which all messages will be expired.
+     * @param isExcluded
+     *            Will message at passed in position also be expired.
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void expireMessages(String topic, String subscriptionName, MessageId messageId, boolean isExcluded)
+            throws PulsarAdminException;
+
+    /**
+     * Expire all messages older than given N (expireTimeInSeconds) seconds for a given subscription asynchronously.
+     *
+     * @param topic
+     *            topic name
+     * @param subscriptionName
+     *            Subscription name
+     * @param messageId
+     *            Position before which all messages will be expired.
+     * @param isExcluded
+     *            Will message at passed in position also be expired.
+     * @return
+     *            A {@link CompletableFuture} that'll be completed when expire message is done.
+     */
+    CompletableFuture<Void> expireMessagesAsync(String topic, String subscriptionName,
+                                                MessageId messageId, boolean isExcluded);
+
+    /**
      * Expire all messages older than given N seconds for all subscriptions of the persistent-topic.
      *
      * @param topic
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 8de48ce..218c314 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -937,6 +937,33 @@ public class TopicsImpl extends BaseResource implements Topics {
     }
 
     @Override
+    public void expireMessages(String topic, String subscriptionName, MessageId messageId, boolean isExcluded)
+            throws PulsarAdminException {
+        try {
+            expireMessagesAsync(topic, subscriptionName, messageId, isExcluded)
+                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> expireMessagesAsync(String topic, String subscriptionName,
+                                                       MessageId messageId, boolean isExcluded) {
+        TopicName tn = validateTopic(topic);
+        String encodedSubName = Codec.encode(subscriptionName);
+        ResetCursorData resetCursorData = new ResetCursorData(messageId);
+        resetCursorData.setExcluded(isExcluded);
+        WebTarget path = topicPath(tn, "subscription", encodedSubName, "expireMessages");
+        return asyncPostRequest(path, Entity.entity(resetCursorData, MediaType.APPLICATION_JSON));
+    }
+
+    @Override
     public void expireMessagesForAllSubscriptions(String topic, long expireTimeInSeconds) throws PulsarAdminException {
         try {
             expireMessagesForAllSubscriptionsAsync(topic, expireTimeInSeconds)
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 1b2478e..8c35e01 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -748,6 +748,11 @@ public class PulsarAdminToolTest {
         cmdTopics.run(split("expire-messages persistent://myprop/clust/ns1/ds1 -s sub1 -t 100"));
         verify(mockTopics).expireMessages("persistent://myprop/clust/ns1/ds1", "sub1", 100);
 
+        //cmd with option cannot be executed repeatedly.
+        cmdTopics = new CmdTopics(() -> admin);
+        cmdTopics.run(split("expire-messages persistent://myprop/clust/ns1/ds1 -s sub1 -p 1:1 -e"));
+        verify(mockTopics).expireMessages(eq("persistent://myprop/clust/ns1/ds1"), eq("sub1"), eq(new MessageIdImpl(1, 1, -1)), eq(true));
+
         cmdTopics.run(split("expire-messages-all-subscriptions persistent://myprop/clust/ns1/ds1 -t 100"));
         verify(mockTopics).expireMessagesForAllSubscriptions("persistent://myprop/clust/ns1/ds1", 100);
 
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CliCommand.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CliCommand.java
index ce48ef4..5113daa 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CliCommand.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CliCommand.java
@@ -104,10 +104,14 @@ abstract class CliCommand {
     }
 
     static MessageId validateMessageIdString(String resetMessageIdStr) throws PulsarAdminException {
+        return validateMessageIdString(resetMessageIdStr, -1);
+    }
+
+    static MessageId validateMessageIdString(String resetMessageIdStr, int partitionIndex) throws PulsarAdminException {
         String[] messageId = resetMessageIdStr.split(":");
         try {
             Preconditions.checkArgument(messageId.length == 2);
-            return new MessageIdImpl(Long.parseLong(messageId[0]), Long.parseLong(messageId[1]), -1);
+            return new MessageIdImpl(Long.parseLong(messageId[0]), Long.parseLong(messageId[1]), partitionIndex);
         } catch (Exception e) {
             throw new PulsarAdminException(
                     "Invalid message id (must be in format: ledgerId:entryId) value " + resetMessageIdStr);
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 9e6eef2..f3319b9 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -39,6 +39,8 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
+
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.admin.LongRunningProcessStatus;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -47,6 +49,7 @@ import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
 import org.apache.pulsar.common.policies.data.DispatchRate;
@@ -657,13 +660,35 @@ public class CmdTopics extends CmdBase {
                 "--subscription" }, description = "Subscription to be skip messages on", required = true)
         private String subName;
 
-        @Parameter(names = { "-t", "--expireTime" }, description = "Expire messages older than time in seconds", required = true)
-        private long expireTimeInSeconds;
+        @Parameter(names = { "-t", "--expireTime" }, description = "Expire messages older than time in seconds")
+        private long expireTimeInSeconds = -1;
+
+        @Parameter(names = { "--position",
+                "-p" }, description = "message position to reset back to (ledgerId:entryId)", required = false)
+        private String messagePosition;
+
+        @Parameter(names = { "-e", "--exclude-reset-position" },
+                description = "Exclude the reset position, start consume messages from the next position.", required = false)
+        private boolean excludeResetPosition = false;
 
         @Override
         void run() throws PulsarAdminException {
+            if (expireTimeInSeconds >= 0 && isNotBlank(messagePosition)) {
+                throw new ParameterException(String.format("Can't expire message by time and " +
+                        "by message position at the same time."));
+            }
             String topic = validateTopicName(params);
-            getTopics().expireMessages(topic, subName, expireTimeInSeconds);
+            if (expireTimeInSeconds >= 0) {
+                getTopics().expireMessages(topic, subName, expireTimeInSeconds);
+            } else if (isNotBlank(messagePosition)) {
+                int partitionIndex = TopicName.get(topic).getPartitionIndex();
+                MessageId messageId = validateMessageIdString(messagePosition, partitionIndex);
+                getTopics().expireMessages(topic, subName, messageId, excludeResetPosition);
+            } else {
+                throw new ParameterException(
+                        "Either time (--expireTime) or message position (--position) has to be provided" +
+                                " to expire messages");
+            }
         }
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ResetCursorData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ResetCursorData.java
index 9311fa0..1bc7b7f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ResetCursorData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ResetCursorData.java
@@ -49,10 +49,12 @@ public class ResetCursorData {
             this.ledgerId = batchMessageId.getLedgerId();
             this.entryId = batchMessageId.getEntryId();
             this.batchIndex = batchMessageId.getBatchIndex();
+            this.partitionIndex = batchMessageId.partitionIndex;
         } else if (messageId instanceof MessageIdImpl) {
             MessageIdImpl messageIdImpl = (MessageIdImpl) messageId;
             this.ledgerId = messageIdImpl.getLedgerId();
             this.entryId = messageIdImpl.getEntryId();
+            this.partitionIndex = messageIdImpl.partitionIndex;
         }  else if (messageId instanceof TopicMessageIdImpl) {
             throw new IllegalArgumentException("Not supported operation on partitioned-topic");
         }