You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ji...@apache.org on 2022/10/03 06:42:21 UTC
[pulsar] branch master updated: [feat][broker]PIP-180 Shadow Topic - Part V - Support shadow topic creation. (#17711)
This is an automated email from the ASF dual-hosted git repository.
jianghaiting pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 11482048d35 [feat][broker]PIP-180 Shadow Topic - Part V - Support shadow topic creation. (#17711)
11482048d35 is described below
commit 11482048d357ccb4e4f1802304a7dd0bfd7b9c26
Author: JiangHaiting <ji...@apache.org>
AuthorDate: Mon Oct 3 14:42:09 2022 +0800
[feat][broker]PIP-180 Shadow Topic - Part V - Support shadow topic creation. (#17711)
---
.../bookkeeper/mledger/ManagedLedgerConfig.java | 7 ++
.../bookkeeper/mledger/ManagedLedgerFactory.java | 5 ++
.../mledger/impl/ManagedLedgerFactoryImpl.java | 17 ++--
.../apache/bookkeeper/mledger/impl/MetaStore.java | 9 ++
.../bookkeeper/mledger/impl/MetaStoreImpl.java | 29 +++++++
.../mledger/impl/ShadowManagedLedgerImpl.java | 58 +++++++++++++
.../pulsar/broker/service/BrokerService.java | 32 ++++++-
.../broker/service/persistent/PersistentTopic.java | 12 +++
.../pulsar/broker/service/PersistentTopicTest.java | 2 +
.../broker/service/persistent/ShadowTopicTest.java | 97 ++++++++++++++++++++++
.../org/apache/pulsar/client/admin/Topics.java | 54 ++++++++++++
.../pulsar/client/admin/internal/TopicsImpl.java | 40 ++++++++-
.../pulsar/admin/cli/PulsarAdminToolTest.java | 19 ++++-
.../org/apache/pulsar/admin/cli/CmdTopics.java | 34 ++++++++
14 files changed, 405 insertions(+), 10 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
index 92c9c911981..0efd1ca2a82 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
@@ -32,6 +32,7 @@ import org.apache.bookkeeper.common.annotation.InterfaceAudience;
import org.apache.bookkeeper.common.annotation.InterfaceStability;
import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
+import org.apache.commons.collections4.MapUtils;
import org.apache.pulsar.common.util.collections.ConcurrentOpenLongPairRangeSet;
/**
@@ -742,4 +743,10 @@ public class ManagedLedgerConfig {
public void setMaxBacklogBetweenCursorsForCaching(int maxBacklogBetweenCursorsForCaching) {
this.maxBacklogBetweenCursorsForCaching = maxBacklogBetweenCursorsForCaching;
}
+
+    /** Key of the managed ledger property that stores the source topic name of a shadow topic. */
+    public static final String PROPERTY_SOURCE_TOPIC_KEY = "PULSAR.SHADOW_SOURCE";
+
+    /**
+     * Look up the shadow source topic in this config's properties.
+     *
+     * @return the value stored under {@link #PROPERTY_SOURCE_TOPIC_KEY}, or {@code null} when there is none
+     */
+    public String getShadowSource() {
+        final String shadowSource = MapUtils.getString(properties, PROPERTY_SOURCE_TOPIC_KEY);
+        return shadowSource;
+    }
}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java
index e42c2581ba1..667d641ac9a 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java
@@ -18,6 +18,7 @@
*/
package org.apache.bookkeeper.mledger;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import org.apache.bookkeeper.common.annotation.InterfaceAudience;
@@ -197,4 +198,8 @@ public interface ManagedLedgerFactory {
* */
long getCacheEvictionTimeThreshold();
+ /**
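+ * Get the properties of the managed ledger with the given name.
+ *
+ * @param name the managed ledger name
+ * @return a future that completes with the properties of this managedLedger.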
+ */
+ CompletableFuture<Map<String, String>> getManagedLedgerPropertiesAsync(String name);
}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index d7596a7468a..e0af1cc6326 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -367,11 +367,13 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
ledgers.computeIfAbsent(name, (mlName) -> {
// Create the managed ledger
CompletableFuture<ManagedLedgerImpl> future = new CompletableFuture<>();
- final ManagedLedgerImpl newledger = new ManagedLedgerImpl(this,
- bookkeeperFactory.get(
- new EnsemblePlacementPolicyConfig(config.getBookKeeperEnsemblePlacementPolicyClassName(),
- config.getBookKeeperEnsemblePlacementPolicyProperties())),
- store, config, scheduledExecutor, name, mlOwnershipChecker);
+ BookKeeper bk = bookkeeperFactory.get(
+ new EnsemblePlacementPolicyConfig(config.getBookKeeperEnsemblePlacementPolicyClassName(),
+ config.getBookKeeperEnsemblePlacementPolicyProperties()));
+ final ManagedLedgerImpl newledger = config.getShadowSource() == null
+ ? new ManagedLedgerImpl(this, bk, store, config, scheduledExecutor, name, mlOwnershipChecker)
+ : new ShadowManagedLedgerImpl(this, bk, store, config, scheduledExecutor, name,
+ mlOwnershipChecker);
PendingInitializeManagedLedger pendingLedger = new PendingInitializeManagedLedger(newledger);
pendingInitializeLedgers.put(name, pendingLedger);
newledger.initialize(new ManagedLedgerInitializeLedgerCallback() {
@@ -954,6 +956,11 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
return future;
}
+    /**
+     * Fetch the properties of the named managed ledger by delegating to the meta store.
+     *
+     * @param name the managed ledger name
+     * @return the future returned by the meta store lookup
+     */
+    @Override
+    public CompletableFuture<Map<String, String>> getManagedLedgerPropertiesAsync(String name) {
+        final CompletableFuture<Map<String, String>> propertiesFuture =
+                store.getManagedLedgerPropertiesAsync(name);
+        return propertiesFuture;
+    }
+
public MetaStore getMetaStore() {
return store;
}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java
index 35f109b21dc..3d60066782a 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java
@@ -158,4 +158,13 @@ public interface MetaStore {
* if the operation succeeds.
*/
CompletableFuture<Boolean> asyncExists(String ledgerName);
+
+
+ /**
+ * Get managed ledger properties from meta store.
+ *
+ * @param name ledgerName
+ * @return a future that represents the result of the operation.
+ */
+ CompletableFuture<Map<String, String>> getManagedLedgerPropertiesAsync(String name);
}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
index a501b9e43dc..2949902ac35 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
@@ -23,6 +23,8 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -148,6 +150,33 @@ public class MetaStoreImpl implements MetaStore {
});
}
+    /**
+     * Read the properties stored in the managed ledger info of the given managed ledger.
+     *
+     * @param name the managed ledger name
+     * @return a future completed with a map of the ledger's properties. It is completed with an empty map
+     *         when the lookup fails with {@code MetadataNotFoundException}, and completed exceptionally on
+     *         any other meta store failure
+     */
+    @Override
+    public CompletableFuture<Map<String, String>> getManagedLedgerPropertiesAsync(String name) {
+        CompletableFuture<Map<String, String>> result = new CompletableFuture<>();
+        getManagedLedgerInfo(name, false, new MetaStoreCallback<>() {
+            @Override
+            public void operationComplete(MLDataFormats.ManagedLedgerInfo mlInfo, Stat stat) {
+                // No emptiness guard is needed: the loop does not run when there are no properties.
+                Map<String, String> propertiesMap = new HashMap<>(mlInfo.getPropertiesCount());
+                for (int i = 0; i < mlInfo.getPropertiesCount(); i++) {
+                    MLDataFormats.KeyValue property = mlInfo.getProperties(i);
+                    propertiesMap.put(property.getKey(), property.getValue());
+                }
+                result.complete(propertiesMap);
+            }
+
+            @Override
+            public void operationFailed(MetaStoreException e) {
+                if (e instanceof MetadataNotFoundException) {
+                    // Missing metadata is reported as "no properties" rather than as an error.
+                    result.complete(Collections.emptyMap());
+                } else {
+                    result.completeExceptionally(e);
+                }
+            }
+        });
+        return result;
+    }
+
@Override
public void asyncUpdateLedgerIds(String ledgerName, ManagedLedgerInfo mlInfo, Stat stat,
MetaStoreCallback<Void> callback) {
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java
new file mode 100644
index 00000000000..346780a2283
--- /dev/null
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.mledger.impl;
+
+import java.util.function.Supplier;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.pulsar.common.naming.TopicName;
+
+/**
+ * Work in progress until <a href="https://github.com/apache/pulsar/issues/16153">PIP-180</a> is finished.
+ * Currently, it behaves no differently from ManagedLedgerImpl.
+ */
+@Slf4j
+public class ShadowManagedLedgerImpl extends ManagedLedgerImpl {
+
+    /** Source topic of this shadow managed ledger, parsed from {@code config.getShadowSource()}. */
+    private final TopicName shadowSource;
+    /** Persistence naming encoding of the source topic. */
+    private final String sourceMLName;
+
+    /**
+     * Create a shadow managed ledger. All arguments are passed unchanged to the {@link ManagedLedgerImpl}
+     * constructor; the source topic is then read from {@code config.getShadowSource()}.
+     */
+    public ShadowManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper,
+                                   MetaStore store, ManagedLedgerConfig config,
+                                   OrderedScheduler scheduledExecutor,
+                                   String name, final Supplier<Boolean> mlOwnershipChecker) {
+        super(factory, bookKeeper, store, config, scheduledExecutor, name, mlOwnershipChecker);
+        final TopicName source = TopicName.get(config.getShadowSource());
+        this.shadowSource = source;
+        this.sourceMLName = source.getPersistenceNamingEncoding();
+    }
+
+    @Override
+    synchronized void initialize(ManagedLedgerInitializeLedgerCallback callback, Object ctx) {
+        // TODO: a shadow managed ledger needs its own, more involved initialization, which will be
+        //  implemented in follow-up PRs. Until then it initializes exactly like its parent class.
+        super.initialize(callback, ctx);
+    }
+
+    /**
+     * @return the source topic of this shadow managed ledger
+     */
+    public TopicName getShadowSource() {
+        return shadowSource;
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 8491615448a..2f6cb020b70 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1405,6 +1405,22 @@ public class BrokerService implements Closeable {
return topicFuture;
}
+    /**
+     * Fetch the stored properties of a topic.
+     *
+     * <p>When {@code topicName.isPartitioned()} is false, the properties are read from the topic's managed
+     * ledger. Otherwise the partitioned topic metadata is fetched first: its properties are returned, unless
+     * the metadata reports {@code NON_PARTITIONED}, in which case the managed ledger is read as well.
+     *
+     * @param topicName the topic whose properties are requested
+     * @return a future completed with the topic properties
+     */
+    CompletableFuture<Map<String, String>> fetchTopicPropertiesAsync(TopicName topicName) {
+        if (!topicName.isPartitioned()) {
+            return managedLedgerFactory.getManagedLedgerPropertiesAsync(topicName.getPersistenceNamingEncoding());
+        }
+        final TopicName baseTopicName = TopicName.get(topicName.getPartitionedTopicName());
+        return fetchPartitionedTopicMetadataAsync(baseTopicName).thenCompose(metadata -> {
+            if (metadata.partitions != PartitionedTopicMetadata.NON_PARTITIONED) {
+                return CompletableFuture.completedFuture(metadata.properties);
+            }
+            // The metadata reports no partitions for the base name: read the managed ledger instead.
+            return managedLedgerFactory.getManagedLedgerPropertiesAsync(
+                    topicName.getPersistenceNamingEncoding());
+        });
+    }
+
private void checkOwnershipAndCreatePersistentTopic(final String topic, boolean createIfMissing,
CompletableFuture<Optional<Topic>> topicFuture,
Map<String, String> properties) {
@@ -1412,7 +1428,21 @@ public class BrokerService implements Closeable {
pulsar.getNamespaceService().isServiceUnitActiveAsync(topicName)
.thenAccept(isActive -> {
if (isActive) {
- createPersistentTopic(topic, createIfMissing, topicFuture, properties);
+ CompletableFuture<Map<String, String>> propertiesFuture;
+ if (properties == null) {
+ //Read properties from storage when loading topic.
+ propertiesFuture = fetchTopicPropertiesAsync(topicName);
+ } else {
+ propertiesFuture = CompletableFuture.completedFuture(properties);
+ }
+ propertiesFuture.thenAccept(finalProperties ->
+ createPersistentTopic(topic, createIfMissing, topicFuture, finalProperties)
+ ).exceptionally(throwable -> {
+ log.warn("[{}] Read topic property failed", topic, throwable);
+ pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
+ topicFuture.completeExceptionally(throwable);
+ return null;
+ });
} else {
// namespace is being unloaded
String msg = String.format("Namespace is being unloaded, cannot add topic %s", topic);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index fb1c521bb00..a3193935e53 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -72,6 +72,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.mledger.impl.ShadowManagedLedgerImpl;
import org.apache.bookkeeper.net.BookieId;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
@@ -176,6 +177,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
private final ConcurrentOpenHashMap<String/*ShadowTopic*/, Replicator> shadowReplicators;
@Getter
private volatile List<String> shadowTopics;
+ private final TopicName shadowSourceTopic;
static final String DEDUPLICATION_CURSOR_NAME = "pulsar.dedup";
private static final String TOPIC_EPOCH_PROPERTY_NAME = "pulsar.topic.epoch";
@@ -302,6 +304,11 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
this.transactionBuffer = new TransactionBufferDisable();
}
transactionBuffer.syncMaxReadPositionForNormalPublish((PositionImpl) ledger.getLastConfirmedEntry());
+ if (ledger instanceof ShadowManagedLedgerImpl) {
+ shadowSourceTopic = ((ShadowManagedLedgerImpl) ledger).getShadowSource();
+ } else {
+ shadowSourceTopic = null;
+ }
}
@Override
@@ -381,6 +388,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
} else {
this.transactionBuffer = new TransactionBufferDisable();
}
+ shadowSourceTopic = null;
}
private void initializeDispatchRateLimiterIfNeeded() {
@@ -3330,4 +3338,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
public long getLastDataMessagePublishedTimestamp() {
return lastDataMessagePublishedTimestamp;
}
+
+    /**
+     * @return the shadow source topic recorded for this topic, or an empty Optional when none was recorded
+     */
+    public Optional<TopicName> getShadowSourceTopic() {
+        return shadowSourceTopic == null ? Optional.empty() : Optional.of(shadowSourceTopic);
+    }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 970bfd763a4..2771808a9fa 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -201,6 +201,8 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory();
doReturn(mock(PulsarClientImpl.class)).when(pulsar).getClient();
+ doAnswer(invocationOnMock -> CompletableFuture.completedFuture(null))
+ .when(mlFactoryMock).getManagedLedgerPropertiesAsync(any());
doAnswer(invocation -> {
DeleteLedgerCallback deleteLedgerCallback = invocation.getArgument(1);
deleteLedgerCallback.deleteLedgerComplete(null);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicTest.java
new file mode 100644
index 00000000000..22bfd70cf88
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicTest.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.broker.service.persistent;
+
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.impl.ShadowManagedLedgerImpl;
+import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.common.naming.TopicName;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Tests that shadow topics are created with the expected source topic, and that the shadow relationship
+ * is still in place when the topic is loaded again after a namespace unload.
+ */
+@Slf4j
+public class ShadowTopicTest extends BrokerTestBase {
+
+    @BeforeClass(alwaysRun = true)
+    @Override
+    protected void setup() throws Exception {
+        baseSetup();
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        internalCleanup();
+    }
+
+    /**
+     * Load the given topic through the broker service and assert that it is backed by a
+     * {@link ShadowManagedLedgerImpl} whose source is {@code sourceTopic}.
+     */
+    private void assertLoadedAsShadowOf(String topicToLoad, String sourceTopic) throws Exception {
+        PersistentTopic loadedTopic =
+                (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topicToLoad).get().get();
+        Assert.assertTrue(loadedTopic.getManagedLedger() instanceof ShadowManagedLedgerImpl);
+        Assert.assertEquals(loadedTopic.getShadowSourceTopic().get().toString(), sourceTopic);
+    }
+
+    @Test
+    public void testNonPartitionedShadowTopicSetup() throws Exception {
+        String sourceTopic = "persistent://prop/ns-abc/source";
+        String shadowTopic = "persistent://prop/ns-abc/shadow";
+        //1. test shadow topic setting in topic creation.
+        admin.topics().createNonPartitionedTopic(sourceTopic);
+        admin.topics().createShadowTopic(shadowTopic, sourceTopic);
+        assertLoadedAsShadowOf(shadowTopic, sourceTopic);
+        Assert.assertEquals(admin.topics().getShadowSource(shadowTopic), sourceTopic);
+
+        //2. test shadow topic could be properly loaded after unload.
+        admin.namespaces().unload("prop/ns-abc");
+        Assert.assertTrue(pulsar.getBrokerService().getTopicReference(shadowTopic).isEmpty());
+        Assert.assertEquals(admin.topics().getShadowSource(shadowTopic), sourceTopic);
+        assertLoadedAsShadowOf(shadowTopic, sourceTopic);
+    }
+
+    @Test
+    public void testPartitionedShadowTopicSetup() throws Exception {
+        String sourceTopic = "persistent://prop/ns-abc/source-p";
+        String shadowTopic = "persistent://prop/ns-abc/shadow-p";
+        String shadowTopicPartition = TopicName.get(shadowTopic).getPartition(0).toString();
+
+        //1. test shadow topic setting in topic creation.
+        admin.topics().createPartitionedTopic(sourceTopic, 2);
+        admin.topics().createShadowTopic(shadowTopic, sourceTopic);
+        pulsarClient.newProducer().topic(shadowTopic).create().close();//trigger loading partitions.
+        assertLoadedAsShadowOf(shadowTopicPartition, sourceTopic);
+        Assert.assertEquals(admin.topics().getShadowSource(shadowTopic), sourceTopic);
+
+        //2. test shadow topic could be properly loaded after unload.
+        admin.namespaces().unload("prop/ns-abc");
+        // Check the partition that this test loaded above: the partitioned base name is never the topic
+        // that is loaded here, so asserting on it would not verify the unload.
+        Assert.assertTrue(pulsar.getBrokerService().getTopicReference(shadowTopicPartition).isEmpty());
+
+        Assert.assertEquals(admin.topics().getShadowSource(shadowTopic), sourceTopic);
+        assertLoadedAsShadowOf(shadowTopicPartition, sourceTopic);
+    }
+
+}
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
index 177cae9a9a4..d2d0754f1d1 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -4397,4 +4397,58 @@ public interface Topics {
* @param sourceTopic source topic name
*/
CompletableFuture<List<String>> getShadowTopicsAsync(String sourceTopic);
+
+ /**
+ * Get the shadow source topic name of the given shadow topic.
+ * @param shadowTopic shadow topic name.
+ * @return The topic name of the source of the shadow topic.
+ */
+ String getShadowSource(String shadowTopic) throws PulsarAdminException;
+
+ /**
+ * Get the shadow source topic name of the given shadow topic asynchronously.
+ * @param shadowTopic shadow topic name.
+ * @return The topic name of the source of the shadow topic.
+ */
+ CompletableFuture<String> getShadowSourceAsync(String shadowTopic);
+
+ /**
+ * Create a new shadow topic as the shadow of the source topic.
+ * The source topic must exist before calling this method.
+ * <p>
+ * For partitioned source topic, the partition number of shadow topic follows the source topic at creation. If
+ * the partition number of the source topic changes, the shadow topic needs to update its partition number
+ * manually.
+ * For non-partitioned source topic, the shadow topic will be created as non-partitioned topic.
+ * </p>
+ *
+ * NOTE: This is still WIP until <a href="https://github.com/apache/pulsar/issues/16153">PIP-180</a> is finished.
+ *
+ * @param shadowTopic shadow topic name, and it must be a persistent topic name.
+ * @param sourceTopic source topic name, and it must be a persistent topic name.
+ * @param properties properties to create the shadow topic with.
+ * @throws PulsarAdminException
+ */
+ void createShadowTopic(String shadowTopic, String sourceTopic, Map<String, String> properties)
+ throws PulsarAdminException;
+
+ /**
+ * Create a new shadow topic, see {@link #createShadowTopic(String, String, Map)} for details.
+ */
+ CompletableFuture<Void> createShadowTopicAsync(String shadowTopic, String sourceTopic,
+ Map<String, String> properties);
+
+ /**
+ * Create a new shadow topic, see {@link #createShadowTopic(String, String, Map)} for details.
+ */
+ default void createShadowTopic(String shadowTopic, String sourceTopic) throws PulsarAdminException {
+ createShadowTopic(shadowTopic, sourceTopic, null);
+ }
+
+ /**
+ * Create a new shadow topic, see {@link #createShadowTopic(String, String, Map)} for details.
+ */
+ default CompletableFuture<Void> createShadowTopicAsync(String shadowTopic, String sourceTopic) {
+ return createShadowTopicAsync(shadowTopic, sourceTopic, null);
+ }
}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index e3b51accdfd..4312080ac22 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -131,6 +131,8 @@ public class TopicsImpl extends BaseResource implements Topics {
private static final String ENCRYPTION_KEYS = "X-Pulsar-Base64-encryption-keys";
// CHECKSTYLE.ON: MemberName
+ public static final String PROPERTY_SHADOW_SOURCE_KEY = "PULSAR.SHADOW_SOURCE";
+
public TopicsImpl(WebTarget web, Authentication auth, long readTimeoutMs) {
super(auth, readTimeoutMs);
adminTopics = web.path("/admin");
@@ -2705,7 +2707,43 @@ public class TopicsImpl extends BaseResource implements Topics {
public CompletableFuture<List<String>> getShadowTopicsAsync(String sourceTopic) {
TopicName tn = validateTopic(sourceTopic);
WebTarget path = topicPath(tn, "shadowTopics");
- return asyncGetRequest(path, new FutureCallback<List<String>>(){});
+ return asyncGetRequest(path, new FutureCallback<List<String>>() {});
+ }
+
+ @Override
+ public String getShadowSource(String shadowTopic) throws PulsarAdminException {
+ return sync(() -> getShadowSourceAsync(shadowTopic));
+ }
+
+    @Override
+    public CompletableFuture<String> getShadowSourceAsync(String shadowTopic) {
+        // The shadow source is stored as a regular topic property under PROPERTY_SHADOW_SOURCE_KEY.
+        return getPropertiesAsync(shadowTopic).thenApply(properties -> {
+            if (properties == null) {
+                return null;
+            }
+            return properties.get(PROPERTY_SHADOW_SOURCE_KEY);
+        });
+    }
+
+ @Override
+ public void createShadowTopic(String shadowTopic, String sourceTopic, Map<String, String> properties)
+ throws PulsarAdminException {
+ sync(() -> createShadowTopicAsync(shadowTopic, sourceTopic, properties));
+ }
+
+    @Override
+    public CompletableFuture<Void> createShadowTopicAsync(String shadowTopic, String sourceTopic,
+                                                          Map<String, String> properties) {
+        checkArgument(TopicName.get(shadowTopic).isPersistent(), "Shadow topic must be persistent");
+        checkArgument(TopicName.get(sourceTopic).isPersistent(), "Source topic must be persistent");
+        // The shadow topic mirrors the partitioning of the source topic at creation time.
+        return getPartitionedTopicMetadataAsync(sourceTopic).thenCompose(sourceMetadata -> {
+            // Copy the caller's properties so that adding the shadow source key never mutates their map.
+            final Map<String, String> shadowProperties = properties == null
+                    ? new HashMap<String, String>()
+                    : new HashMap<String, String>(properties);
+            shadowProperties.put(PROPERTY_SHADOW_SOURCE_KEY, sourceTopic);
+            if (sourceMetadata.partitions != PartitionedTopicMetadata.NON_PARTITIONED) {
+                return createPartitionedTopicAsync(shadowTopic, sourceMetadata.partitions, shadowProperties);
+            }
+            return createNonPartitionedTopicAsync(shadowTopic, shadowProperties);
+        });
+    }
private static final Logger log = LoggerFactory.getLogger(TopicsImpl.class);
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index b9cfbe8c97e..bcb1fca1705 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -34,7 +34,6 @@ import static org.testng.Assert.fail;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
-
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.PrintStream;
@@ -51,7 +50,6 @@ import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
-
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.admin.cli.utils.SchemaExtractor;
import org.apache.pulsar.client.admin.Bookies;
@@ -74,7 +72,6 @@ import org.apache.pulsar.client.admin.TopicPolicies;
import org.apache.pulsar.client.admin.Topics;
import org.apache.pulsar.client.admin.Transactions;
import org.apache.pulsar.client.admin.internal.OffloadProcessStatusImpl;
-import org.apache.pulsar.client.admin.internal.PulsarAdminBuilderImpl;
import org.apache.pulsar.client.admin.internal.PulsarAdminImpl;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.SubscriptionType;
@@ -1946,6 +1943,22 @@ public class PulsarAdminToolTest {
cmdTopics.run(split("remove-shadow-topics persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).removeShadowTopics("persistent://myprop/clust/ns1/ds1");
+ cmdTopics.run(split("create-shadow-topic -s persistent://myprop/clust/ns1/source persistent://myprop/clust/ns1/ds1"));
+ verify(mockTopics).createShadowTopic("persistent://myprop/clust/ns1/ds1", "persistent://myprop/clust/ns1/source", null);
+
+ cmdTopics = new CmdTopics(() -> admin);
+ cmdTopics.run(split("create-shadow-topic -p a=aa,b=bb,c=cc -s persistent://myprop/clust/ns1/source persistent://myprop/clust/ns1/ds1"));
+ HashMap<String, String> p = new HashMap<>();
+ p.put("a","aa");
+ p.put("b","bb");
+ p.put("c","cc");
+ verify(mockTopics).createShadowTopic("persistent://myprop/clust/ns1/ds1", "persistent://myprop/clust/ns1/source", p);
+
+ cmdTopics.run(split("get-shadow-source persistent://myprop/clust/ns1/ds1"));
+ verify(mockTopics).getShadowSource("persistent://myprop/clust/ns1/ds1");
+
+
+
}
private static LedgerInfo newLedger(long id, long entries, long size) {
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index ae37a591bae..245cfc1b855 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -255,6 +255,8 @@ public class CmdTopics extends CmdBase {
jcommander.addCommand("get-shadow-topics", new GetShadowTopics());
jcommander.addCommand("set-shadow-topics", new SetShadowTopics());
jcommander.addCommand("remove-shadow-topics", new RemoveShadowTopics());
+ jcommander.addCommand("create-shadow-topic", new CreateShadowTopic());
+ jcommander.addCommand("get-shadow-source", new GetShadowSource());
jcommander.addCommand("get-schema-validation-enforce", new GetSchemaValidationEnforced());
jcommander.addCommand("set-schema-validation-enforce", new SetSchemaValidationEnforced());
@@ -1714,6 +1716,38 @@ public class CmdTopics extends CmdBase {
}
}
+    /**
+     * CLI command {@code create-shadow-topic}: creates a shadow topic for an existing source topic.
+     * The main parameter is the shadow topic name, {@code -s} names the source topic and {@code -p}
+     * optionally supplies extra topic properties.
+     */
+    @Parameters(commandDescription = "Create a shadow topic for an existing source topic.")
+    private class CreateShadowTopic extends CliCommand {
+
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = {"--source", "-s"}, description = "source topic name", required = true)
+        private String sourceTopic;
+
+        // The example is comma-separated, matching how the list is passed on the command line.
+        @Parameter(names = {"--properties", "-p"}, description = "key value pair properties(eg: a=a,b=b,c=c)")
+        private java.util.List<String> propertyList;
+
+        @Override
+        void run() throws Exception {
+            // A shadow topic must be persistent, so use the persistent-topic validation as get-shadow-source does.
+            String topic = validatePersistentTopic(params);
+            Map<String, String> properties = parseListKeyValueMap(propertyList);
+            getTopics().createShadowTopic(topic, TopicName.get(sourceTopic).toString(), properties);
+        }
+    }
+
+    /** CLI command {@code get-shadow-source}: prints the source topic of the given shadow topic. */
+    @Parameters(commandDescription = "Get the source topic for a shadow topic")
+    private class GetShadowSource extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            final String shadowTopic = validatePersistentTopic(params);
+            final String sourceTopic = getTopics().getShadowSource(shadowTopic);
+            print(sourceTopic);
+        }
+    }
+
@Parameters(commandDescription = "Get the delayed delivery policy for a topic")
private class GetDelayedDelivery extends CliCommand {
@Parameter(description = "tenant/namespace/topic", required = true)