You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/09/22 02:57:08 UTC

[pulsar] branch master updated: [feat][broker]PIP-180 ShadowTopic - Part IV - Add Shadow Replicator (#17371)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d9c9d737577 [feat][broker]PIP-180 ShadowTopic - Part IV - Add Shadow Replicator (#17371)
d9c9d737577 is described below

commit d9c9d7375776c1b79a7d8f4430d650123d715047
Author: JiangHaiting <ji...@apache.org>
AuthorDate: Thu Sep 22 10:57:00 2022 +0800

    [feat][broker]PIP-180 ShadowTopic - Part IV - Add Shadow Replicator (#17371)
---
 .../pulsar/broker/service/BrokerService.java       |  10 ++
 .../org/apache/pulsar/broker/service/Topic.java    |   4 +
 .../service/nonpersistent/NonPersistentTopic.java  |  10 ++
 .../service/persistent/PersistentReplicator.java   |   3 +-
 .../broker/service/persistent/PersistentTopic.java | 149 ++++++++++++++++++++-
 .../service/persistent/ShadowReplicator.java       | 136 +++++++++++++++++++
 .../service/persistent/ShadowReplicatorTest.java   | 136 +++++++++++++++++++
 .../util/collections/ConcurrentOpenHashMap.java    |   5 +
 8 files changed, 448 insertions(+), 5 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 9289b2cf141..a3cef4bd027 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1058,6 +1058,14 @@ public class BrokerService implements Closeable {
                         new IllegalStateException("Delete forbidden topic is replicated on clusters " + clusters));
             }
 
+            // shadow topic should be deleted first.
+            if (t.isShadowReplicated()) {
+                final List<String> shadowTopics = t.getShadowReplicators().keys();
+                log.error("Delete forbidden. Topic {} is replicated to shadow topics: {}", topic, shadowTopics);
+                return FutureUtil.failedFuture(new IllegalStateException(
+                        "Delete forbidden. Topic " + topic + " is replicated to shadow topics."));
+            }
+
             return t.delete();
         }
 
@@ -2600,6 +2608,8 @@ public class BrokerService implements Closeable {
                     }
                     topic.getReplicators().forEach((name, persistentReplicator) ->
                         persistentReplicator.updateRateLimiter());
+                    topic.getShadowReplicators().forEach((name, persistentReplicator) ->
+                        persistentReplicator.updateRateLimiter());
                 }
             );
         });
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index 312468933f5..72e5d6a9228 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -246,6 +246,8 @@ public interface Topic {
 
     boolean isReplicated();
 
+    boolean isShadowReplicated();
+
     EntryFilters getEntryFiltersPolicy();
 
     ImmutableMap<String, EntryFilterWithClassLoader> getEntryFilters();
@@ -260,6 +262,8 @@ public interface Topic {
 
     ConcurrentOpenHashMap<String, ? extends Replicator> getReplicators();
 
+    ConcurrentOpenHashMap<String, ? extends Replicator> getShadowReplicators();
+
     TopicStatsImpl getStats(boolean getPreciseBacklog, boolean subscriptionBacklogSize,
                             boolean getEarliestTimeInBacklog);
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 0400326748d..a3da80080ff 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -669,6 +669,11 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol
         return replicators;
     }
 
+    @Override
+    public ConcurrentOpenHashMap<String, ? extends Replicator> getShadowReplicators() {
+        return ConcurrentOpenHashMap.emptyMap();
+    }
+
     @Override
     public Subscription getSubscription(String subscription) {
         return subscriptions.get(subscription);
@@ -1091,6 +1096,11 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol
         return replicators.size() > 1;
     }
 
+    @Override
+    public boolean isShadowReplicated() {
+        return false;
+    }
+
     @Override
     public CompletableFuture<Void> unsubscribe(String subscriptionName) {
         // checkInactiveSubscriptions iterates over subscriptions map and removing from the map with the same thread.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index 87f9ef5480c..4f50a4c8fdc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -50,6 +50,7 @@ import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyExceptio
 import org.apache.pulsar.broker.service.Replicator;
 import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.client.impl.MessageImpl;
@@ -132,7 +133,7 @@ public abstract class PersistentReplicator extends AbstractReplicator
     }
 
     @Override
-    protected void readEntries(org.apache.pulsar.client.api.Producer<byte[]> producer) {
+    protected void readEntries(Producer<byte[]> producer) {
         // Rewind the cursor to be sure to read again all non-acked messages sent while restarting
         cursor.rewind();
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 6f73602996d..30150341b36 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -74,6 +74,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.bookkeeper.net.BookieId;
+import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.namespace.NamespaceService;
@@ -172,7 +173,10 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
     // Subscriptions to this topic
     private final ConcurrentOpenHashMap<String, PersistentSubscription> subscriptions;
 
-    private final ConcurrentOpenHashMap<String, Replicator> replicators;
+    private final ConcurrentOpenHashMap<String/*RemoteCluster*/, Replicator> replicators;
+    private final ConcurrentOpenHashMap<String/*ShadowTopic*/, Replicator> shadowReplicators;
+    @Getter
+    private volatile List<String> shadowTopics;
 
     static final String DEDUPLICATION_CURSOR_NAME = "pulsar.dedup";
     private static final String TOPIC_EPOCH_PROPERTY_NAME = "pulsar.topic.epoch";
@@ -259,6 +263,10 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                 .expectedItems(16)
                 .concurrencyLevel(1)
                 .build();
+        this.shadowReplicators = ConcurrentOpenHashMap.<String, Replicator>newBuilder()
+                .expectedItems(16)
+                .concurrencyLevel(1)
+                .build();
         this.backloggedCursorThresholdEntries =
                 brokerService.pulsar().getConfiguration().getManagedLedgerCursorBackloggedThreshold();
         registerTopicPolicyListener();
@@ -360,6 +368,10 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                 .expectedItems(16)
                 .concurrencyLevel(1)
                 .build();
+        this.shadowReplicators = ConcurrentOpenHashMap.<String, Replicator>newBuilder()
+                .expectedItems(16)
+                .concurrencyLevel(1)
+                .build();
         this.compactedTopic = new CompactedTopicImpl(brokerService.pulsar().getBookKeeperClient());
         this.backloggedCursorThresholdEntries =
                 brokerService.pulsar().getConfiguration().getManagedLedgerCursorBackloggedThreshold();
@@ -659,12 +671,14 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
     public CompletableFuture<Void> stopReplProducers() {
         List<CompletableFuture<Void>> closeFutures = Lists.newArrayList();
         replicators.forEach((region, replicator) -> closeFutures.add(replicator.disconnect()));
+        shadowReplicators.forEach((__, replicator) -> closeFutures.add(replicator.disconnect()));
         return FutureUtil.waitForAll(closeFutures);
     }
 
     private synchronized CompletableFuture<Void> closeReplProducersIfNoBacklog() {
         List<CompletableFuture<Void>> closeFutures = Lists.newArrayList();
         replicators.forEach((region, replicator) -> closeFutures.add(replicator.disconnect(true)));
+        shadowReplicators.forEach((__, replicator) -> closeFutures.add(replicator.disconnect(true)));
         return FutureUtil.waitForAll(closeFutures);
     }
 
@@ -1150,6 +1164,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
             if (closeIfClientsConnected) {
                 List<CompletableFuture<Void>> futures = Lists.newArrayList();
                 replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect()));
+                shadowReplicators.forEach((__, replicator) -> futures.add(replicator.disconnect()));
                 producers.values().forEach(producer -> futures.add(producer.disconnect()));
                 subscriptions.forEach((s, sub) -> futures.add(sub.disconnect()));
                 FutureUtil.waitForAll(futures).thenRun(() -> {
@@ -1275,6 +1290,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
 
         futures.add(transactionBuffer.closeAsync());
         replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect()));
+        shadowReplicators.forEach((__, replicator) -> futures.add(replicator.disconnect()));
         producers.values().forEach(producer -> futures.add(producer.disconnect()));
         if (topicPublishRateLimiter != null) {
             topicPublishRateLimiter.close();
@@ -1393,7 +1409,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         }
 
         List<String> configuredClusters = topicPolicies.getReplicationClusters().get();
-        int newMessageTTLinSeconds = topicPolicies.getMessageTTLInSeconds().get();
+        int newMessageTTLInSeconds = topicPolicies.getMessageTTLInSeconds().get();
 
         String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
 
@@ -1420,7 +1436,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         // Check for replicators to be stopped
         replicators.forEach((cluster, replicator) -> {
             // Update message TTL
-            ((PersistentReplicator) replicator).updateMessageTTL(newMessageTTLinSeconds);
+            ((PersistentReplicator) replicator).updateMessageTTL(newMessageTTLInSeconds);
             if (!cluster.equals(localCluster)) {
                 if (!configuredClusters.contains(cluster)) {
                     futures.add(removeReplicator(cluster));
@@ -1428,6 +1444,38 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
             }
         });
 
+        futures.add(checkShadowReplication());
+
+        return FutureUtil.waitForAll(futures);
+    }
+
+    private CompletableFuture<Void> checkShadowReplication() {
+        if (CollectionUtils.isEmpty(shadowTopics)) {
+            return CompletableFuture.completedFuture(null);
+        }
+        List<String> configuredShadowTopics = shadowTopics;
+        int newMessageTTLInSeconds = topicPolicies.getMessageTTLInSeconds().get();
+
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] Checking shadow replication status, shadowTopics={}", topic, configuredShadowTopics);
+        }
+        List<CompletableFuture<Void>> futures = Lists.newArrayList();
+
+        // Check for missing replicators
+        for (String shadowTopic : configuredShadowTopics) {
+            if (!shadowReplicators.containsKey(shadowTopic)) {
+                futures.add(startShadowReplicator(shadowTopic));
+            }
+        }
+
+        // Check for replicators to be stopped
+        shadowReplicators.forEach((shadowTopic, replicator) -> {
+            // Update message TTL
+            ((PersistentReplicator) replicator).updateMessageTTL(newMessageTTLInSeconds);
+            if (!configuredShadowTopics.contains(shadowTopic)) {
+                futures.add(removeShadowReplicator(shadowTopic));
+            }
+        });
         return FutureUtil.waitForAll(futures);
     }
 
@@ -1438,6 +1486,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
             subscriptions.forEach((__, sub) -> sub.expireMessages(messageTtlInSeconds));
             replicators.forEach((__, replicator)
                     -> ((PersistentReplicator) replicator).expireMessages(messageTtlInSeconds));
+            shadowReplicators.forEach((__, replicator)
+                    -> ((PersistentReplicator) replicator).expireMessages(messageTtlInSeconds));
         }
     }
 
@@ -1584,6 +1634,79 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         return future;
     }
 
+    CompletableFuture<Void> startShadowReplicator(String shadowTopic) {
+        log.info("[{}] Starting shadow topic replicator to remote: {}", topic, shadowTopic);
+
+        String name = ShadowReplicator.getShadowReplicatorName(replicatorPrefix, shadowTopic);
+        ManagedCursor cursor;
+        try {
+            cursor = ledger.newNonDurableCursor(PositionImpl.LATEST, name);
+        } catch (ManagedLedgerException e) {
+            log.error("[{}]Open non-durable cursor for shadow replicator failed, name={}", topic, name, e);
+            return FutureUtil.failedFuture(e);
+        }
+        CompletableFuture<Void> future = addShadowReplicationCluster(shadowTopic, cursor);
+        future.exceptionally(ex -> {
+            log.error("[{}] Add shadow replication cluster failed, shadowTopic={}", topic, shadowTopic, ex);
+            return null;
+        });
+        return future;
+    }
+
+    protected CompletableFuture<Void> addShadowReplicationCluster(String shadowTopic, ManagedCursor cursor) {
+        String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
+        return AbstractReplicator.validatePartitionedTopicAsync(PersistentTopic.this.getName(), brokerService)
+                .thenCompose(__ -> brokerService.pulsar().getPulsarResources().getClusterResources()
+                        .getClusterAsync(localCluster)
+                        .thenApply(clusterData -> brokerService.getReplicationClient(localCluster, clusterData)))
+                .thenAccept(replicationClient -> {
+                    Replicator replicator = shadowReplicators.computeIfAbsent(shadowTopic, r -> {
+                        try {
+                            return new ShadowReplicator(shadowTopic, PersistentTopic.this, cursor, brokerService,
+                                    (PulsarClientImpl) replicationClient);
+                        } catch (PulsarServerException e) {
+                            log.error("[{}] ShadowReplicator startup failed {}", topic, shadowTopic, e);
+                        }
+                        return null;
+                    });
+
+                    // clean up replicator if startup failed
+                    if (replicator == null) {
+                        shadowReplicators.removeNullValue(shadowTopic);
+                    }
+                });
+    }
+
+    CompletableFuture<Void> removeShadowReplicator(String shadowTopic) {
+        log.info("[{}] Removing shadow topic replicator to {}", topic, shadowTopic);
+        final CompletableFuture<Void> future = new CompletableFuture<>();
+        String name = ShadowReplicator.getShadowReplicatorName(replicatorPrefix, shadowTopic);
+        shadowReplicators.get(shadowTopic).disconnect().thenRun(() -> {
+
+            ledger.asyncDeleteCursor(name, new DeleteCursorCallback() {
+                @Override
+                public void deleteCursorComplete(Object ctx) {
+                    shadowReplicators.remove(shadowTopic);
+                    future.complete(null);
+                }
+
+                @Override
+                public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
+                    log.error("[{}] Failed to delete shadow topic replication cursor {} {}",
+                            topic, name, exception.getMessage(), exception);
+                    future.completeExceptionally(new PersistenceException(exception));
+                }
+            }, null);
+
+        }).exceptionally(e -> {
+            log.error("[{}] Failed to close shadow topic replication producer {} {}", topic, name, e.getMessage(), e);
+            future.completeExceptionally(e);
+            return null;
+        });
+
+        return future;
+    }
+
     public boolean isDeduplicationEnabled() {
         return messageDeduplication.isEnabled();
     }
@@ -1617,6 +1740,11 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         return replicators;
     }
 
+    @Override
+    public ConcurrentOpenHashMap<String, Replicator> getShadowReplicators() {
+        return shadowReplicators;
+    }
+
     public Replicator getPersistentReplicator(String remoteCluster) {
         return replicators.get(remoteCluster);
     }
@@ -2448,6 +2576,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         return FutureUtil.waitForAll(producerCheckFutures).thenCompose((__) -> {
             return updateSubscriptionsDispatcherRateLimiter().thenCompose((___) -> {
                 replicators.forEach((name, replicator) -> replicator.updateRateLimiter());
+                shadowReplicators.forEach((name, replicator) -> replicator.updateRateLimiter());
                 checkMessageExpiry();
                 CompletableFuture<Void> replicationFuture = checkReplicationAndRetryOnFailure();
                 CompletableFuture<Void> dedupFuture = checkDeduplicationStatus();
@@ -2617,6 +2746,11 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         return !replicators.isEmpty();
     }
 
+    @Override
+    public boolean isShadowReplicated() {
+        return !shadowReplicators.isEmpty();
+    }
+
     public CompletableFuture<MessageId> terminate() {
         CompletableFuture<MessageId> future = new CompletableFuture<>();
         ledger.asyncTerminate(new TerminateCallback() {
@@ -2672,6 +2806,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         List<CompletableFuture<Void>> futures = Lists.newArrayList();
         List<String> cursors = getSubscriptions().keys();
         cursors.addAll(getReplicators().keys());
+        cursors.addAll(getShadowReplicators().keys());
         for (String cursor : cursors) {
             futures.add(clearBacklog(cursor));
         }
@@ -2699,6 +2834,11 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
             return repl.clearBacklog();
         }
 
+        repl = (PersistentReplicator) shadowReplicators.get(cursorName);
+        if (repl != null) {
+            return repl.clearBacklog();
+        }
+
         return FutureUtil.failedFuture(new BrokerServiceException("Cursor not found"));
     }
 
@@ -3069,12 +3209,13 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
             return;
         }
         updateTopicPolicy(policies);
-
+        shadowTopics = policies.getShadowTopics();
         updateDispatchRateLimiter();
         updateSubscriptionsDispatcherRateLimiter().thenRun(() -> {
             updatePublishDispatcher();
             updateSubscribeRateLimiter();
             replicators.forEach((name, replicator) -> replicator.updateRateLimiter());
+            shadowReplicators.forEach((name, replicator) -> replicator.updateRateLimiter());
             checkMessageExpiry();
             checkReplicationAndRetryOnFailure();
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java
new file mode 100644
index 00000000000..a7c56acbbfb
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.broker.service.persistent;
+
+
+import io.netty.buffer.ByteBuf;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.util.Codec;
+
+/**
+ *  Replicate messages to shadow topic.
+ */
+@Slf4j
+public class ShadowReplicator extends PersistentReplicator {
+
+    public ShadowReplicator(String shadowTopic, PersistentTopic sourceTopic, ManagedCursor cursor,
+                            BrokerService brokerService, PulsarClientImpl replicationClient)
+            throws PulsarServerException {
+        super(brokerService.pulsar().getConfiguration().getClusterName(), sourceTopic, cursor,
+                brokerService.pulsar().getConfiguration().getClusterName(), shadowTopic, brokerService,
+                replicationClient);
+    }
+
+    /**
+     * @return Producer name format : replicatorPrefix-localTopic-->remoteTopic
+     */
+    @Override
+    protected String getProducerName() {
+        return replicatorPrefix + "-" + localTopicName + REPL_PRODUCER_NAME_DELIMITER + remoteTopicName;
+    }
+
+    @Override
+    protected boolean replicateEntries(List<Entry> entries) {
+        boolean atLeastOneMessageSentForReplication = false;
+
+        try {
+            // This flag is set to true when we skip at least one local message,
+            // in order to skip remaining local messages.
+            boolean isLocalMessageSkippedOnce = false;
+            for (int i = 0; i < entries.size(); i++) {
+                Entry entry = entries.get(i);
+                int length = entry.getLength();
+                ByteBuf headersAndPayload = entry.getDataBuffer();
+                MessageImpl msg;
+                try {
+                    msg = MessageImpl.deserializeSkipBrokerEntryMetaData(headersAndPayload);
+                } catch (Throwable t) {
+                    log.error("[{}] Failed to deserialize message at {} (buffer size: {}): {}", replicatorId,
+                            entry.getPosition(), length, t.getMessage(), t);
+                    cursor.asyncDelete(entry.getPosition(), this, entry.getPosition());
+                    entry.release();
+                    continue;
+                }
+
+                if (msg.isExpired(messageTTLInSeconds)) {
+                    msgExpired.recordEvent(0 /* no value stat */);
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] Discarding expired message at position {}, replicateTo {}",
+                                replicatorId, entry.getPosition(), msg.getReplicateTo());
+                    }
+                    cursor.asyncDelete(entry.getPosition(), this, entry.getPosition());
+                    entry.release();
+                    msg.recycle();
+                    continue;
+                }
+
+                if (STATE_UPDATER.get(this) != State.Started || isLocalMessageSkippedOnce) {
+                    // The producer is not ready yet after having stopped/restarted. Drop the message because it will
+                    // be recovered when the producer is ready
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] Dropping read message at {} because producer is not ready",
+                                replicatorId, entry.getPosition());
+                    }
+                    isLocalMessageSkippedOnce = true;
+                    entry.release();
+                    msg.recycle();
+                    continue;
+                }
+
+                dispatchRateLimiter.ifPresent(rateLimiter -> rateLimiter.tryDispatchPermit(1, entry.getLength()));
+
+                // Increment pending messages for messages produced locally
+                PENDING_MESSAGES_UPDATER.incrementAndGet(this);
+
+                msgOut.recordEvent(headersAndPayload.readableBytes());
+
+                msg.setReplicatedFrom(localCluster);
+
+                msg.setMessageId(new MessageIdImpl(entry.getLedgerId(), entry.getEntryId(), -1));
+
+                headersAndPayload.retain();
+
+                producer.sendAsync(msg, ProducerSendCallback.create(this, entry, msg));
+                atLeastOneMessageSentForReplication = true;
+            }
+        } catch (Exception e) {
+            log.error("[{}] Unexpected exception: {}", replicatorId, e.getMessage(), e);
+        }
+        return atLeastOneMessageSentForReplication;
+    }
+
+    /**
+     * Cursor name for this shadow replicator.
+     * @param replicatorPrefix
+     * @param shadowTopic
+     * @return
+     */
+    public static String getShadowReplicatorName(String replicatorPrefix, String shadowTopic) {
+        return replicatorPrefix + "-" + Codec.encode(shadowTopic);
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowReplicatorTest.java
new file mode 100644
index 00000000000..822627f12b4
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowReplicatorTest.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.broker.service.persistent;
+
+import com.google.common.collect.Sets;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.assertj.core.util.Lists;
+import org.awaitility.Awaitility;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker-replication")
+public class ShadowReplicatorTest extends BrokerTestBase {
+
+    @BeforeClass(alwaysRun = true)
+    @Override
+    protected void setup() throws Exception {
+        super.baseSetup();
+        admin.tenants().createTenant("prop1",
+                new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet("test")));
+        admin.namespaces().createNamespace("prop1/ns-source");
+        admin.namespaces().createNamespace("prop1/ns-shadow");
+    }
+
+    @BeforeClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testShadowReplication() throws Exception {
+        String sourceTopicName = "persistent://prop1/ns-source/source-topic";
+        String shadowTopicName = "persistent://prop1/ns-shadow/shadow-topic";
+        String shadowTopicName2 = "persistent://prop1/ns-shadow/shadow-topic-2";
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(sourceTopicName).create();
+        // NOTE: shadow topic is not ready yet. So use normal topic instead.
+        // The only difference for consumer should be that the message id is changed.
+        @Cleanup
+        Consumer<byte[]> shadowConsumer =
+                pulsarClient.newConsumer().topic(shadowTopicName).subscriptionName("shadow-sub")
+                        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
+        @Cleanup
+        Consumer<byte[]> shadowConsumer2 =
+                pulsarClient.newConsumer().topic(shadowTopicName2).subscriptionName("shadow-sub")
+                        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
+
+        PersistentTopic sourceTopic =
+                (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(sourceTopicName).get().get();
+
+        admin.topics().setShadowTopics(sourceTopicName, Lists.newArrayList(shadowTopicName, shadowTopicName2));
+
+        Awaitility.await().untilAsserted(()->Assert.assertEquals(sourceTopic.getShadowReplicators().size(), 2));
+
+        ShadowReplicator
+                replicator = (ShadowReplicator) sourceTopic.getShadowReplicators().get(shadowTopicName);
+        Awaitility.await().untilAsserted(() ->
+                Assert.assertEquals(String.valueOf(replicator.getState()), "Started"));
+
+        @Cleanup
+        Consumer<byte[]> sourceConsumer =
+                pulsarClient.newConsumer().topic(sourceTopicName).subscriptionName("source-sub")
+                        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
+        byte[] data = "test-shadow-topic".getBytes(StandardCharsets.UTF_8);
+        MessageId sourceMessageId = producer.newMessage()
+                .sequenceId(1)
+                .key("K")
+                .property("PK", "PV")
+                .eventTime(123)
+                .value(data)
+                .send();
+
+        Message<byte[]> sourceMessage = sourceConsumer.receive();
+        Assert.assertEquals(sourceMessage.getMessageId(), sourceMessageId);
+
+        //Wait until msg is replicated to shadow topic.
+        Awaitility.await().until(() -> {
+            replicator.msgOut.calculateRate();
+            return replicator.msgOut.getCount() >= 1;
+        });
+        Awaitility.await().until(() -> PersistentReplicator.PENDING_MESSAGES_UPDATER.get(replicator) == 0);
+
+        PersistentTopic shadowTopic =
+                (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(shadowTopicName).get().get();
+        Assert.assertNotNull(shadowTopic);
+
+        Message<byte[]> shadowMessage = shadowConsumer.receive(5, TimeUnit.SECONDS);
+
+        Assert.assertEquals(shadowMessage.getData(), sourceMessage.getData());
+        Assert.assertEquals(shadowMessage.getSequenceId(), sourceMessage.getSequenceId());
+        Assert.assertEquals(shadowMessage.getEventTime(), sourceMessage.getEventTime());
+        Assert.assertEquals(shadowMessage.getProperties(), sourceMessage.getProperties());
+        Assert.assertEquals(shadowMessage.getKey(), sourceMessage.getKey());
+        Assert.assertEquals(shadowMessage.getOrderingKey(), sourceMessage.getOrderingKey());
+        Assert.assertEquals(shadowMessage.getSchemaVersion(), sourceMessage.getSchemaVersion());
+        Assert.assertEquals(shadowMessage.getPublishTime(), sourceMessage.getPublishTime());
+        Assert.assertEquals(shadowMessage.getBrokerPublishTime(), sourceMessage.getBrokerPublishTime());
+        Assert.assertEquals(shadowMessage.getIndex(), sourceMessage.getIndex());
+
+        //`replicatedFrom` is set as localClusterName in shadow topic.
+        Assert.assertNotEquals(shadowMessage.getReplicatedFrom(), sourceMessage.getReplicatedFrom());
+        //Currently, msg is copied in BK. So the message id is not the same.
+        Assert.assertNotEquals(shadowMessage.getMessageId(), sourceMessage.getMessageId());
+    }
+}
\ No newline at end of file
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java
index 618b3e539a9..924d4893578 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java
@@ -42,6 +42,7 @@ public class ConcurrentOpenHashMap<K, V> {
 
     private static final Object EmptyKey = null;
     private static final Object DeletedKey = new Object();
+    private static final ConcurrentOpenHashMap EmptyMap = new ConcurrentOpenHashMap<>(1, 1);
 
     /**
      * This object is used to delete empty value in this map.
@@ -173,6 +174,10 @@ public class ConcurrentOpenHashMap<K, V> {
         }
     }
 
+    public static <K, V> ConcurrentOpenHashMap<K, V> emptyMap() {
+        return (ConcurrentOpenHashMap<K, V>) EmptyMap;
+    }
+
     long getUsedBucketCount() {
         long usedBucketCount = 0;
         for (Section<K, V> s : sections) {