You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/09/22 02:57:08 UTC
[pulsar] branch master updated: [feat][broker]PIP-180 ShadowTopic - Part IV - Add Shadow Replicator (#17371)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d9c9d737577 [feat][broker]PIP-180 ShadowTopic - Part IV - Add Shadow Replicator (#17371)
d9c9d737577 is described below
commit d9c9d7375776c1b79a7d8f4430d650123d715047
Author: JiangHaiting <ji...@apache.org>
AuthorDate: Thu Sep 22 10:57:00 2022 +0800
[feat][broker]PIP-180 ShadowTopic - Part IV - Add Shadow Replicator (#17371)
---
.../pulsar/broker/service/BrokerService.java | 10 ++
.../org/apache/pulsar/broker/service/Topic.java | 4 +
.../service/nonpersistent/NonPersistentTopic.java | 10 ++
.../service/persistent/PersistentReplicator.java | 3 +-
.../broker/service/persistent/PersistentTopic.java | 149 ++++++++++++++++++++-
.../service/persistent/ShadowReplicator.java | 136 +++++++++++++++++++
.../service/persistent/ShadowReplicatorTest.java | 136 +++++++++++++++++++
.../util/collections/ConcurrentOpenHashMap.java | 5 +
8 files changed, 448 insertions(+), 5 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 9289b2cf141..a3cef4bd027 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1058,6 +1058,14 @@ public class BrokerService implements Closeable {
new IllegalStateException("Delete forbidden topic is replicated on clusters " + clusters));
}
+ // shadow topic should be deleted first.
+ if (t.isShadowReplicated()) {
+ final List<String> shadowTopics = t.getShadowReplicators().keys();
+ log.error("Delete forbidden. Topic {} is replicated to shadow topics: {}", topic, shadowTopics);
+ return FutureUtil.failedFuture(new IllegalStateException(
+ "Delete forbidden. Topic " + topic + " is replicated to shadow topics."));
+ }
+
return t.delete();
}
@@ -2600,6 +2608,8 @@ public class BrokerService implements Closeable {
}
topic.getReplicators().forEach((name, persistentReplicator) ->
persistentReplicator.updateRateLimiter());
+ topic.getShadowReplicators().forEach((name, persistentReplicator) ->
+ persistentReplicator.updateRateLimiter());
}
);
});
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index 312468933f5..72e5d6a9228 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -246,6 +246,8 @@ public interface Topic {
boolean isReplicated();
+ boolean isShadowReplicated(); // true when the topic currently has at least one shadow replicator (see implementations)
+
EntryFilters getEntryFiltersPolicy();
ImmutableMap<String, EntryFilterWithClassLoader> getEntryFilters();
@@ -260,6 +262,8 @@ public interface Topic {
ConcurrentOpenHashMap<String, ? extends Replicator> getReplicators();
+ ConcurrentOpenHashMap<String, ? extends Replicator> getShadowReplicators(); // replicators keyed by shadow topic name
+
TopicStatsImpl getStats(boolean getPreciseBacklog, boolean subscriptionBacklogSize,
boolean getEarliestTimeInBacklog);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 0400326748d..a3da80080ff 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -669,6 +669,11 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol
return replicators;
}
+ @Override
+ public ConcurrentOpenHashMap<String, ? extends Replicator> getShadowReplicators() {
+ return ConcurrentOpenHashMap.emptyMap(); // no shadow replicators are tracked by this topic type; always the empty map
+ }
+
@Override
public Subscription getSubscription(String subscription) {
return subscriptions.get(subscription);
@@ -1091,6 +1096,11 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol
return replicators.size() > 1;
}
+ @Override
+ public boolean isShadowReplicated() {
+ return false; // unconditionally false: this topic type reports no shadow replication
+ }
+
@Override
public CompletableFuture<Void> unsubscribe(String subscriptionName) {
// checkInactiveSubscriptions iterates over subscriptions map and removing from the map with the same thread.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index 87f9ef5480c..4f50a4c8fdc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -50,6 +50,7 @@ import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyExceptio
import org.apache.pulsar.broker.service.Replicator;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.MessageImpl;
@@ -132,7 +133,7 @@ public abstract class PersistentReplicator extends AbstractReplicator
}
@Override
- protected void readEntries(org.apache.pulsar.client.api.Producer<byte[]> producer) {
+ protected void readEntries(Producer<byte[]> producer) {
// Rewind the cursor to be sure to read again all non-acked messages sent while restarting
cursor.rewind();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 6f73602996d..30150341b36 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -74,6 +74,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.net.BookieId;
+import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.namespace.NamespaceService;
@@ -172,7 +173,10 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
// Subscriptions to this topic
private final ConcurrentOpenHashMap<String, PersistentSubscription> subscriptions;
- private final ConcurrentOpenHashMap<String, Replicator> replicators;
+ private final ConcurrentOpenHashMap<String/*RemoteCluster*/, Replicator> replicators;
+ private final ConcurrentOpenHashMap<String/*ShadowTopic*/, Replicator> shadowReplicators;
+ @Getter
+ private volatile List<String> shadowTopics;
static final String DEDUPLICATION_CURSOR_NAME = "pulsar.dedup";
private static final String TOPIC_EPOCH_PROPERTY_NAME = "pulsar.topic.epoch";
@@ -259,6 +263,10 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
.expectedItems(16)
.concurrencyLevel(1)
.build();
+ this.shadowReplicators = ConcurrentOpenHashMap.<String, Replicator>newBuilder()
+ .expectedItems(16)
+ .concurrencyLevel(1)
+ .build();
this.backloggedCursorThresholdEntries =
brokerService.pulsar().getConfiguration().getManagedLedgerCursorBackloggedThreshold();
registerTopicPolicyListener();
@@ -360,6 +368,10 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
.expectedItems(16)
.concurrencyLevel(1)
.build();
+ this.shadowReplicators = ConcurrentOpenHashMap.<String, Replicator>newBuilder()
+ .expectedItems(16)
+ .concurrencyLevel(1)
+ .build();
this.compactedTopic = new CompactedTopicImpl(brokerService.pulsar().getBookKeeperClient());
this.backloggedCursorThresholdEntries =
brokerService.pulsar().getConfiguration().getManagedLedgerCursorBackloggedThreshold();
@@ -659,12 +671,14 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
public CompletableFuture<Void> stopReplProducers() {
List<CompletableFuture<Void>> closeFutures = Lists.newArrayList();
replicators.forEach((region, replicator) -> closeFutures.add(replicator.disconnect()));
+ shadowReplicators.forEach((__, replicator) -> closeFutures.add(replicator.disconnect())); // shadow replicators are disconnected together with the cluster replicators
return FutureUtil.waitForAll(closeFutures);
}
private synchronized CompletableFuture<Void> closeReplProducersIfNoBacklog() {
List<CompletableFuture<Void>> closeFutures = Lists.newArrayList();
replicators.forEach((region, replicator) -> closeFutures.add(replicator.disconnect(true)));
+ shadowReplicators.forEach((__, replicator) -> closeFutures.add(replicator.disconnect(true))); // same disconnect(true) call as for the cluster replicators above
return FutureUtil.waitForAll(closeFutures);
}
@@ -1150,6 +1164,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
if (closeIfClientsConnected) {
List<CompletableFuture<Void>> futures = Lists.newArrayList();
replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect()));
+ shadowReplicators.forEach((__, replicator) -> futures.add(replicator.disconnect()));
producers.values().forEach(producer -> futures.add(producer.disconnect()));
subscriptions.forEach((s, sub) -> futures.add(sub.disconnect()));
FutureUtil.waitForAll(futures).thenRun(() -> {
@@ -1275,6 +1290,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
futures.add(transactionBuffer.closeAsync());
replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect()));
+ shadowReplicators.forEach((__, replicator) -> futures.add(replicator.disconnect()));
producers.values().forEach(producer -> futures.add(producer.disconnect()));
if (topicPublishRateLimiter != null) {
topicPublishRateLimiter.close();
@@ -1393,7 +1409,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
}
List<String> configuredClusters = topicPolicies.getReplicationClusters().get();
- int newMessageTTLinSeconds = topicPolicies.getMessageTTLInSeconds().get();
+ int newMessageTTLInSeconds = topicPolicies.getMessageTTLInSeconds().get();
String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
@@ -1420,7 +1436,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
// Check for replicators to be stopped
replicators.forEach((cluster, replicator) -> {
// Update message TTL
- ((PersistentReplicator) replicator).updateMessageTTL(newMessageTTLinSeconds);
+ ((PersistentReplicator) replicator).updateMessageTTL(newMessageTTLInSeconds);
if (!cluster.equals(localCluster)) {
if (!configuredClusters.contains(cluster)) {
futures.add(removeReplicator(cluster));
@@ -1428,6 +1444,38 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
}
});
+ futures.add(checkShadowReplication());
+
+ return FutureUtil.waitForAll(futures);
+ }
+
+ private CompletableFuture<Void> checkShadowReplication() {
+ if (CollectionUtils.isEmpty(shadowTopics)) {
+ return CompletableFuture.completedFuture(null);
+ }
+ List<String> configuredShadowTopics = shadowTopics;
+ int newMessageTTLInSeconds = topicPolicies.getMessageTTLInSeconds().get();
+
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Checking shadow replication status, shadowTopics={}", topic, configuredShadowTopics);
+ }
+ List<CompletableFuture<Void>> futures = Lists.newArrayList();
+
+ // Check for missing replicators
+ for (String shadowTopic : configuredShadowTopics) {
+ if (!shadowReplicators.containsKey(shadowTopic)) {
+ futures.add(startShadowReplicator(shadowTopic));
+ }
+ }
+
+ // Check for replicators to be stopped
+ shadowReplicators.forEach((shadowTopic, replicator) -> {
+ // Update message TTL
+ ((PersistentReplicator) replicator).updateMessageTTL(newMessageTTLInSeconds);
+ if (!configuredShadowTopics.contains(shadowTopic)) {
+ futures.add(removeShadowReplicator(shadowTopic));
+ }
+ });
return FutureUtil.waitForAll(futures);
}
@@ -1438,6 +1486,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
subscriptions.forEach((__, sub) -> sub.expireMessages(messageTtlInSeconds));
replicators.forEach((__, replicator)
-> ((PersistentReplicator) replicator).expireMessages(messageTtlInSeconds));
+ shadowReplicators.forEach((__, replicator)
+ -> ((PersistentReplicator) replicator).expireMessages(messageTtlInSeconds));
}
}
@@ -1584,6 +1634,79 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
return future;
}
+ CompletableFuture<Void> startShadowReplicator(String shadowTopic) {
+ log.info("[{}] Starting shadow topic replicator to remote: {}", topic, shadowTopic);
+
+ String name = ShadowReplicator.getShadowReplicatorName(replicatorPrefix, shadowTopic);
+ ManagedCursor cursor;
+ try {
+ cursor = ledger.newNonDurableCursor(PositionImpl.LATEST, name);
+ } catch (ManagedLedgerException e) {
+ log.error("[{}]Open non-durable cursor for shadow replicator failed, name={}", topic, name, e);
+ return FutureUtil.failedFuture(e);
+ }
+ CompletableFuture<Void> future = addShadowReplicationCluster(shadowTopic, cursor);
+ future.exceptionally(ex -> {
+ log.error("[{}] Add shadow replication cluster failed, shadowTopic={}", topic, shadowTopic, ex);
+ return null;
+ });
+ return future;
+ }
+
+ protected CompletableFuture<Void> addShadowReplicationCluster(String shadowTopic, ManagedCursor cursor) { // registers a ShadowReplicator for shadowTopic reading from the given cursor
+ String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
+ return AbstractReplicator.validatePartitionedTopicAsync(PersistentTopic.this.getName(), brokerService)
+ .thenCompose(__ -> brokerService.pulsar().getPulsarResources().getClusterResources()
+ .getClusterAsync(localCluster) // cluster data is looked up for the local cluster, not a remote one
+ .thenApply(clusterData -> brokerService.getReplicationClient(localCluster, clusterData)))
+ .thenAccept(replicationClient -> {
+ Replicator replicator = shadowReplicators.computeIfAbsent(shadowTopic, r -> { // an already registered replicator is kept as is
+ try {
+ return new ShadowReplicator(shadowTopic, PersistentTopic.this, cursor, brokerService,
+ (PulsarClientImpl) replicationClient);
+ } catch (PulsarServerException e) {
+ log.error("[{}] ShadowReplicator startup failed {}", topic, shadowTopic, e);
+ }
+ return null; // construction failed; the null result is handled by the cleanup below
+ });
+
+ // clean up the map entry if replicator startup failed
+ if (replicator == null) {
+ shadowReplicators.removeNullValue(shadowTopic);
+ }
+ });
+ }
+
+ /**
+  * Disconnects the shadow replicator for {@code shadowTopic}, deletes its cursor and, once the
+  * cursor is deleted, removes the replicator from {@code shadowReplicators}.
+  *
+  * @param shadowTopic name of the shadow topic whose replicator should be removed
+  * @return a future completed when the replicator has been removed; completed exceptionally if
+  *         the producer cannot be closed or the cursor cannot be deleted. If no replicator is
+  *         registered for {@code shadowTopic} the future is already completed successfully.
+  */
+ CompletableFuture<Void> removeShadowReplicator(String shadowTopic) {
+     log.info("[{}] Removing shadow topic replicator to {}", topic, shadowTopic);
+     final Replicator replicator = shadowReplicators.get(shadowTopic);
+     if (replicator == null) {
+         // Already removed (e.g. by a concurrent policy check): report success through the
+         // future instead of throwing a NullPointerException at the caller.
+         log.info("[{}] Shadow topic replicator to {} is already removed", topic, shadowTopic);
+         return CompletableFuture.completedFuture(null);
+     }
+     final CompletableFuture<Void> future = new CompletableFuture<>();
+     String name = ShadowReplicator.getShadowReplicatorName(replicatorPrefix, shadowTopic);
+     replicator.disconnect().thenRun(() -> {
+
+         ledger.asyncDeleteCursor(name, new DeleteCursorCallback() {
+             @Override
+             public void deleteCursorComplete(Object ctx) {
+                 // Unregister only after the cursor is gone.
+                 shadowReplicators.remove(shadowTopic);
+                 future.complete(null);
+             }
+
+             @Override
+             public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
+                 log.error("[{}] Failed to delete shadow topic replication cursor {} {}",
+                         topic, name, exception.getMessage(), exception);
+                 future.completeExceptionally(new PersistenceException(exception));
+             }
+         }, null);
+
+     }).exceptionally(e -> {
+         log.error("[{}] Failed to close shadow topic replication producer {} {}", topic, name, e.getMessage(), e);
+         future.completeExceptionally(e);
+         return null;
+     });
+
+     return future;
+ }
+
public boolean isDeduplicationEnabled() {
return messageDeduplication.isEnabled();
}
@@ -1617,6 +1740,11 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
return replicators;
}
+ @Override
+ public ConcurrentOpenHashMap<String, Replicator> getShadowReplicators() {
+ return shadowReplicators; // the live internal map (not a copy), keyed by shadow topic name
+ }
+
public Replicator getPersistentReplicator(String remoteCluster) {
return replicators.get(remoteCluster);
}
@@ -2448,6 +2576,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
return FutureUtil.waitForAll(producerCheckFutures).thenCompose((__) -> {
return updateSubscriptionsDispatcherRateLimiter().thenCompose((___) -> {
replicators.forEach((name, replicator) -> replicator.updateRateLimiter());
+ shadowReplicators.forEach((name, replicator) -> replicator.updateRateLimiter());
checkMessageExpiry();
CompletableFuture<Void> replicationFuture = checkReplicationAndRetryOnFailure();
CompletableFuture<Void> dedupFuture = checkDeduplicationStatus();
@@ -2617,6 +2746,11 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
return !replicators.isEmpty();
}
+ @Override
+ public boolean isShadowReplicated() {
+ return !shadowReplicators.isEmpty(); // based on running replicators, not on the configured shadowTopics list
+ }
+
public CompletableFuture<MessageId> terminate() {
CompletableFuture<MessageId> future = new CompletableFuture<>();
ledger.asyncTerminate(new TerminateCallback() {
@@ -2672,6 +2806,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
List<CompletableFuture<Void>> futures = Lists.newArrayList();
List<String> cursors = getSubscriptions().keys();
cursors.addAll(getReplicators().keys());
+ cursors.addAll(getShadowReplicators().keys());
for (String cursor : cursors) {
futures.add(clearBacklog(cursor));
}
@@ -2699,6 +2834,11 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
return repl.clearBacklog();
}
+ repl = (PersistentReplicator) shadowReplicators.get(cursorName);
+ if (repl != null) {
+ return repl.clearBacklog();
+ }
+
return FutureUtil.failedFuture(new BrokerServiceException("Cursor not found"));
}
@@ -3069,12 +3209,13 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
return;
}
updateTopicPolicy(policies);
-
+ shadowTopics = policies.getShadowTopics();
updateDispatchRateLimiter();
updateSubscriptionsDispatcherRateLimiter().thenRun(() -> {
updatePublishDispatcher();
updateSubscribeRateLimiter();
replicators.forEach((name, replicator) -> replicator.updateRateLimiter());
+ shadowReplicators.forEach((name, replicator) -> replicator.updateRateLimiter());
checkMessageExpiry();
checkReplicationAndRetryOnFailure();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java
new file mode 100644
index 00000000000..a7c56acbbfb
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.broker.service.persistent;
+
+
+import io.netty.buffer.ByteBuf;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.util.Codec;
+
+/**
+ * Replicate messages to shadow topic.
+ *
+ * <p>The constructor passes the broker's configured cluster name to the superclass twice
+ * (presumably as local and remote cluster -- confirm against PersistentReplicator) together with
+ * the shadow topic name.
+ *
+ * <p>{@code replicateEntries} walks the given entries in order. Entries that cannot be
+ * deserialized or whose message is expired are deleted from the cursor and released. Once the
+ * replicator is found not to be in the {@code Started} state, that entry and every remaining one
+ * is released without being deleted. All other messages are sent asynchronously through the
+ * producer, and the method returns whether at least one message was sent.
+ */
+@Slf4j
+public class ShadowReplicator extends PersistentReplicator {
+
+ public ShadowReplicator(String shadowTopic, PersistentTopic sourceTopic, ManagedCursor cursor,
+ BrokerService brokerService, PulsarClientImpl replicationClient)
+ throws PulsarServerException {
+ super(brokerService.pulsar().getConfiguration().getClusterName(), sourceTopic, cursor,
+ brokerService.pulsar().getConfiguration().getClusterName(), shadowTopic, brokerService,
+ replicationClient);
+ }
+
+ /**
+ * @return Producer name format : replicatorPrefix-localTopic-->remoteTopic
+ */
+ @Override
+ protected String getProducerName() {
+ return replicatorPrefix + "-" + localTopicName + REPL_PRODUCER_NAME_DELIMITER + remoteTopicName;
+ }
+
+ @Override
+ protected boolean replicateEntries(List<Entry> entries) {
+ boolean atLeastOneMessageSentForReplication = false;
+
+ try {
+ // Set to true once a message is dropped because the replicator is not in the Started state;
+ // every remaining entry of this batch is then dropped as well.
+ boolean isLocalMessageSkippedOnce = false;
+ for (int i = 0; i < entries.size(); i++) {
+ Entry entry = entries.get(i);
+ int length = entry.getLength();
+ ByteBuf headersAndPayload = entry.getDataBuffer();
+ MessageImpl msg;
+ try {
+ msg = MessageImpl.deserializeSkipBrokerEntryMetaData(headersAndPayload);
+ } catch (Throwable t) {
+ log.error("[{}] Failed to deserialize message at {} (buffer size: {}): {}", replicatorId,
+ entry.getPosition(), length, t.getMessage(), t);
+ cursor.asyncDelete(entry.getPosition(), this, entry.getPosition()); // delete the undecodable entry's position and move on
+ entry.release();
+ continue;
+ }
+
+ if (msg.isExpired(messageTTLInSeconds)) {
+ msgExpired.recordEvent(0 /* no value stat */);
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Discarding expired message at position {}, replicateTo {}",
+ replicatorId, entry.getPosition(), msg.getReplicateTo());
+ }
+ cursor.asyncDelete(entry.getPosition(), this, entry.getPosition());
+ entry.release();
+ msg.recycle();
+ continue;
+ }
+
+ if (STATE_UPDATER.get(this) != State.Started || isLocalMessageSkippedOnce) {
+ // The producer is not ready yet after having stopped/restarted. Drop the message because it will
+ // be recovered when the producer is ready (the entry is released here but not deleted from the cursor)
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Dropping read message at {} because producer is not ready",
+ replicatorId, entry.getPosition());
+ }
+ isLocalMessageSkippedOnce = true;
+ entry.release();
+ msg.recycle();
+ continue;
+ }
+
+ dispatchRateLimiter.ifPresent(rateLimiter -> rateLimiter.tryDispatchPermit(1, entry.getLength())); // the result of tryDispatchPermit is not checked here
+
+ // Increment pending messages for messages produced locally
+ PENDING_MESSAGES_UPDATER.incrementAndGet(this);
+
+ msgOut.recordEvent(headersAndPayload.readableBytes());
+
+ msg.setReplicatedFrom(localCluster);
+
+ msg.setMessageId(new MessageIdImpl(entry.getLedgerId(), entry.getEntryId(), -1)); // source ledger/entry id; third argument -1 is presumably the partition index -- confirm
+
+ headersAndPayload.retain(); // extra reference taken before the buffer is handed to the async send
+
+ producer.sendAsync(msg, ProducerSendCallback.create(this, entry, msg));
+ atLeastOneMessageSentForReplication = true;
+ }
+ } catch (Exception e) {
+ log.error("[{}] Unexpected exception: {}", replicatorId, e.getMessage(), e); // NOTE(review): entries not yet processed at this point are not released by this method
+ }
+ return atLeastOneMessageSentForReplication;
+ }
+
+ /**
+ * Cursor name for this shadow replicator.
+ * @param replicatorPrefix prefix placed in front of the name
+ * @param shadowTopic shadow topic name; it is encoded with {@code Codec.encode} before being appended
+ * @return {@code replicatorPrefix + "-" + Codec.encode(shadowTopic)}
+ */
+ public static String getShadowReplicatorName(String replicatorPrefix, String shadowTopic) {
+ return replicatorPrefix + "-" + Codec.encode(shadowTopic);
+ }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowReplicatorTest.java
new file mode 100644
index 00000000000..822627f12b4
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowReplicatorTest.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.broker.service.persistent;
+
+import com.google.common.collect.Sets;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.assertj.core.util.Lists;
+import org.awaitility.Awaitility;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Verifies that a message published to a source topic is replicated by {@link ShadowReplicator}
+ * to the topics configured as its shadow topics, with payload and metadata preserved.
+ */
+@Slf4j
+@Test(groups = "broker-replication")
+public class ShadowReplicatorTest extends BrokerTestBase {
+
+    @BeforeClass(alwaysRun = true)
+    @Override
+    protected void setup() throws Exception {
+        super.baseSetup();
+        admin.tenants().createTenant("prop1",
+                new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet("test")));
+        admin.namespaces().createNamespace("prop1/ns-source");
+        admin.namespaces().createNamespace("prop1/ns-shadow");
+    }
+
+    // Teardown must run after the tests of this class; annotating it with @BeforeClass would run
+    // the cleanup before any test and never afterwards.
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testShadowReplication() throws Exception {
+        String sourceTopicName = "persistent://prop1/ns-source/source-topic";
+        String shadowTopicName = "persistent://prop1/ns-shadow/shadow-topic";
+        String shadowTopicName2 = "persistent://prop1/ns-shadow/shadow-topic-2";
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(sourceTopicName).create();
+        // NOTE: shadow topic is not ready yet. So use normal topic instead.
+        // The only difference for consumer should be that the message id is changed.
+        @Cleanup
+        Consumer<byte[]> shadowConsumer =
+                pulsarClient.newConsumer().topic(shadowTopicName).subscriptionName("shadow-sub")
+                        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
+        @Cleanup
+        Consumer<byte[]> shadowConsumer2 =
+                pulsarClient.newConsumer().topic(shadowTopicName2).subscriptionName("shadow-sub")
+                        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
+
+        PersistentTopic sourceTopic =
+                (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(sourceTopicName).get().get();
+
+        admin.topics().setShadowTopics(sourceTopicName, Lists.newArrayList(shadowTopicName, shadowTopicName2));
+
+        // One replicator per configured shadow topic is expected.
+        Awaitility.await().untilAsserted(()->Assert.assertEquals(sourceTopic.getShadowReplicators().size(), 2));
+
+        ShadowReplicator
+                replicator = (ShadowReplicator) sourceTopic.getShadowReplicators().get(shadowTopicName);
+        Awaitility.await().untilAsserted(() ->
+                Assert.assertEquals(String.valueOf(replicator.getState()), "Started"));
+
+        @Cleanup
+        Consumer<byte[]> sourceConsumer =
+                pulsarClient.newConsumer().topic(sourceTopicName).subscriptionName("source-sub")
+                        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
+        byte[] data = "test-shadow-topic".getBytes(StandardCharsets.UTF_8);
+        MessageId sourceMessageId = producer.newMessage()
+                .sequenceId(1)
+                .key("K")
+                .property("PK", "PV")
+                .eventTime(123)
+                .value(data)
+                .send();
+
+        Message<byte[]> sourceMessage = sourceConsumer.receive();
+        Assert.assertEquals(sourceMessage.getMessageId(), sourceMessageId);
+
+        //Wait until msg is replicated to shadow topic.
+        Awaitility.await().until(() -> {
+            replicator.msgOut.calculateRate();
+            return replicator.msgOut.getCount() >= 1;
+        });
+        Awaitility.await().until(() -> PersistentReplicator.PENDING_MESSAGES_UPDATER.get(replicator) == 0);
+
+        PersistentTopic shadowTopic =
+                (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(shadowTopicName).get().get();
+        Assert.assertNotNull(shadowTopic);
+
+        Message<byte[]> shadowMessage = shadowConsumer.receive(5, TimeUnit.SECONDS);
+        // receive(timeout) returns null on timeout; fail with a clear assertion instead of an NPE below.
+        Assert.assertNotNull(shadowMessage);
+
+        Assert.assertEquals(shadowMessage.getData(), sourceMessage.getData());
+        Assert.assertEquals(shadowMessage.getSequenceId(), sourceMessage.getSequenceId());
+        Assert.assertEquals(shadowMessage.getEventTime(), sourceMessage.getEventTime());
+        Assert.assertEquals(shadowMessage.getProperties(), sourceMessage.getProperties());
+        Assert.assertEquals(shadowMessage.getKey(), sourceMessage.getKey());
+        Assert.assertEquals(shadowMessage.getOrderingKey(), sourceMessage.getOrderingKey());
+        Assert.assertEquals(shadowMessage.getSchemaVersion(), sourceMessage.getSchemaVersion());
+        Assert.assertEquals(shadowMessage.getPublishTime(), sourceMessage.getPublishTime());
+        Assert.assertEquals(shadowMessage.getBrokerPublishTime(), sourceMessage.getBrokerPublishTime());
+        Assert.assertEquals(shadowMessage.getIndex(), sourceMessage.getIndex());
+
+        //`replicatedFrom` is set as localClusterName in shadow topic.
+        Assert.assertNotEquals(shadowMessage.getReplicatedFrom(), sourceMessage.getReplicatedFrom());
+        //Currently, msg is copied in BK. So the message id is not the same.
+        Assert.assertNotEquals(shadowMessage.getMessageId(), sourceMessage.getMessageId());
+    }
+}
\ No newline at end of file
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java
index 618b3e539a9..924d4893578 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java
@@ -42,6 +42,7 @@ public class ConcurrentOpenHashMap<K, V> {
private static final Object EmptyKey = null;
private static final Object DeletedKey = new Object();
+ private static final ConcurrentOpenHashMap EmptyMap = new ConcurrentOpenHashMap<>(1, 1);
/**
* This object is used to delete empty value in this map.
@@ -173,6 +174,10 @@ public class ConcurrentOpenHashMap<K, V> {
}
}
+ /**
+  * Returns the shared empty map instance.
+  *
+  * <p>Every call returns the same static {@code EmptyMap} object, which is an ordinary
+  * {@code ConcurrentOpenHashMap} and not an immutable view. Callers must treat the result as
+  * read-only: an entry put into it would become visible to every other caller.
+  *
+  * @param <K> key type expected by the caller
+  * @param <V> value type expected by the caller
+  * @return the shared empty map, cast to the requested type parameters
+  */
+ @SuppressWarnings("unchecked") // safe as long as the shared instance stays empty: no element is ever read as K or V
+ public static <K, V> ConcurrentOpenHashMap<K, V> emptyMap() {
+     return (ConcurrentOpenHashMap<K, V>) EmptyMap;
+ }
+
long getUsedBucketCount() {
long usedBucketCount = 0;
for (Section<K, V> s : sections) {