You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/11/09 11:55:38 UTC
[pulsar] branch branch-2.9 updated: Fix call sync method in an
async callback when enabling geo replicator. (#12590)
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new a8a207b Fix call sync method in an async callback when enabling geo replicator. (#12590)
a8a207b is described below
commit a8a207b1ae11564b98e3332204c0b20cb6c91118
Author: lipenghui <pe...@apache.org>
AuthorDate: Sat Nov 6 22:06:18 2021 +0800
Fix call sync method in an async callback when enabling geo replicator. (#12590)
After enabled geo-replication, we are not able to produce messages to a partitoned non-persistent topic.
Reproduce step:
1. Start a geo-replication instance with 2 clusters, I have 3 brokers for each cluster
2. Create a non-persistent partitioned topic such as 10 partitions
3. Use pulsar-perf to publish messages to the partitioned topic
4. Verify if the message produce throughput is 0, such as `bin/pulsar-perf produce -s 2048 -r 100 -bm 1 non-persistent://public/default/test`
The root cause is there are 2 places calling a sync method in the async method callback.
So the fix is:
1. Async the `validatePartitionedTopicAsync` method
2. Avoid call get cluster sync method when getting the replication client
Integration tests added.
(cherry picked from commit a2b7cae3cfe8d3776483b1ecaf69af47949b70e1)
---
.../pulsar/broker/resources/BaseResources.java | 4 +
.../broker/resources/NamespaceResources.java | 11 +-
.../apache/pulsar/broker/admin/AdminResource.java | 31 ++++--
.../broker/admin/impl/PersistentTopicsBase.java | 11 +-
.../pulsar/broker/service/AbstractReplicator.java | 31 +++---
.../pulsar/broker/service/BrokerService.java | 56 +++++++----
.../nonpersistent/NonPersistentReplicator.java | 7 +-
.../service/nonpersistent/NonPersistentTopic.java | 53 +++++-----
.../service/persistent/PersistentReplicator.java | 8 +-
.../broker/service/persistent/PersistentTopic.java | 85 ++++++++--------
.../pulsar/broker/service/PersistentTopicTest.java | 10 +-
.../service/ReplicatorRemoveClusterTest.java | 4 +-
.../pulsar/broker/service/ReplicatorTest.java | 4 +-
.../integration/messaging/GeoReplicationTest.java | 112 +++++++++++++++++++++
.../integration/topologies/PulsarCluster.java | 59 +++++++----
.../integration/topologies/PulsarGeoCluster.java | 82 +++++++++++++++
.../topologies/PulsarGeoClusterTestBase.java | 92 +++++++++++++++++
.../integration/topologies/PulsarTestBase.java | 9 ++
.../src/test/resources/pulsar-messaging.xml | 1 +
19 files changed, 516 insertions(+), 154 deletions(-)
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
index 9061dd7..1e463fa 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
@@ -167,6 +167,10 @@ public class BaseResources<T> {
}
}
+ protected CompletableFuture<Boolean> existsAsync(String path) {
+ return cache.exists(path);
+ }
+
public int getOperationTimeoutSec() {
return operationTimeoutSec;
}
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
index dcba2ac..2beeab8 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
@@ -18,14 +18,11 @@
*/
package org.apache.pulsar.broker.resources;
-import static org.apache.pulsar.common.policies.path.PolicyPath.path;
import com.fasterxml.jackson.core.type.TypeReference;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.Getter;
@@ -39,7 +36,6 @@ import org.apache.pulsar.common.policies.data.NamespaceIsolationDataImpl;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
import org.apache.pulsar.common.util.Codec;
-import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreException;
@@ -122,7 +118,7 @@ public class NamespaceResources extends BaseResources<Policies> {
}
public static boolean pathIsFromNamespace(String path) {
- return path.startsWith(BASE_POLICIES_PATH);
+ return path.startsWith(BASE_POLICIES_PATH) && path.substring(BASE_POLICIES_PATH.length() + 1).contains("/");
}
public static NamespaceName namespaceFromPath(String path) {
@@ -208,6 +204,11 @@ public class NamespaceResources extends BaseResources<Policies> {
tn.getEncodedLocalName()));
}
+ public CompletableFuture<Boolean> partitionedTopicExistsAsync(TopicName tn) {
+ return existsAsync(joinPath(PARTITIONED_TOPIC_PATH, tn.getNamespace(), tn.getDomain().value(),
+ tn.getEncodedLocalName()));
+ }
+
public CompletableFuture<Void> deletePartitionedTopicAsync(TopicName tn) {
return deleteAsync(joinPath(PARTITIONED_TOPIC_PATH, tn.getNamespace(), tn.getDomain().value(),
tn.getEncodedLocalName()));
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 625f419..f3a94d2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -632,10 +632,7 @@ public abstract class AdminResource extends PulsarWebResource {
return;
}
- List<CompletableFuture<Void>> createFutureList = new ArrayList<>();
-
CompletableFuture<Void> createLocalFuture = new CompletableFuture<>();
- createFutureList.add(createLocalFuture);
checkTopicExistsAsync(topicName).thenAccept(exists -> {
if (exists) {
log.warn("[{}] Failed to create already existing topic {}", clientAppId(), topicName);
@@ -658,7 +655,13 @@ public abstract class AdminResource extends PulsarWebResource {
return null;
});
- FutureUtil.waitForAll(createFutureList).whenComplete((ignored, ex) -> {
+ List<String> replicatedClusters = new ArrayList<>();
+ if (!createLocalTopicOnly && topicName.isGlobal() && isNamespaceReplicated(namespaceName)) {
+ getNamespaceReplicatedClusters(namespaceName)
+ .stream().filter(cluster -> !cluster.equals(pulsar().getConfiguration().getClusterName()))
+ .forEach(replicatedClusters::add);
+ }
+ createLocalFuture.whenComplete((ignored, ex) -> {
if (ex != null) {
log.error("[{}] Failed to create partitions for topic {}", clientAppId(), topicName, ex.getCause());
if (ex.getCause() instanceof RestException) {
@@ -669,14 +672,20 @@ public abstract class AdminResource extends PulsarWebResource {
return;
}
- if (!createLocalTopicOnly && topicName.isGlobal() && isNamespaceReplicated(namespaceName)) {
- getNamespaceReplicatedClusters(namespaceName)
- .stream()
- .filter(cluster -> !cluster.equals(pulsar().getConfiguration().getClusterName()))
- .forEach(cluster -> createFutureList.add(
- ((TopicsImpl) pulsar().getBrokerService().getClusterPulsarAdmin(cluster).topics())
+ if (!replicatedClusters.isEmpty()) {
+ replicatedClusters.forEach(cluster -> {
+ pulsar().getPulsarResources().getClusterResources().getClusterAsync(cluster)
+ .thenAccept(clusterDataOp -> {
+ ((TopicsImpl) pulsar().getBrokerService()
+ .getClusterPulsarAdmin(cluster, clusterDataOp).topics())
.createPartitionedTopicAsync(
- topicName.getPartitionedTopicName(), numPartitions, true)));
+ topicName.getPartitionedTopicName(), numPartitions, true);
+ })
+ .exceptionally(throwable -> {
+ log.error("Failed to create partition topic in cluster {}.", cluster, throwable);
+ return null;
+ });
+ });
}
log.info("[{}] Successfully created partitions for topic {} in cluster {}",
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 0271abd..a6d8f2c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -508,9 +508,14 @@ public class PersistentTopicsBase extends AdminResource {
if (cluster.equals(pulsar().getConfig().getClusterName())) {
return;
}
- results.add(pulsar().getBrokerService().getClusterPulsarAdmin(cluster).topics()
- .updatePartitionedTopicAsync(topicName.toString(),
- numPartitions, true, false));
+ CompletableFuture<Void> updatePartitionTopicFuture =
+ pulsar().getPulsarResources().getClusterResources().getClusterAsync(cluster)
+ .thenApply(clusterDataOp ->
+ pulsar().getBrokerService().getClusterPulsarAdmin(cluster, clusterDataOp))
+ .thenCompose(pulsarAdmin ->
+ pulsarAdmin.topics().updatePartitionedTopicAsync(
+ topicName.toString(), numPartitions, true, false));
+ results.add(updatePartitionTopicFuture);
});
return FutureUtil.waitForAll(results);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
index b7e85da..59ec74f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
@@ -32,6 +32,7 @@ import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -64,14 +65,14 @@ public abstract class AbstractReplicator {
}
public AbstractReplicator(String topicName, String replicatorPrefix, String localCluster, String remoteCluster,
- BrokerService brokerService) throws NamingException, PulsarServerException {
- validatePartitionedTopic(topicName, brokerService);
+ BrokerService brokerService, PulsarClientImpl replicationClient)
+ throws PulsarServerException {
this.brokerService = brokerService;
this.topicName = topicName;
this.replicatorPrefix = replicatorPrefix;
this.localCluster = localCluster.intern();
this.remoteCluster = remoteCluster.intern();
- this.replicationClient = (PulsarClientImpl) brokerService.getReplicationClient(remoteCluster);
+ this.replicationClient = replicationClient;
this.client = (PulsarClientImpl) brokerService.pulsar().getClient();
this.producer = null;
this.producerQueueSize = brokerService.pulsar().getConfiguration().getReplicationProducerQueueSize();
@@ -242,20 +243,18 @@ public abstract class AbstractReplicator {
* @param topic
* @param brokerService
*/
- private void validatePartitionedTopic(String topic, BrokerService brokerService) throws NamingException {
+ public static CompletableFuture<Void> validatePartitionedTopicAsync(String topic, BrokerService brokerService) {
TopicName topicName = TopicName.get(topic);
- boolean isPartitionedTopic = false;
- try {
- isPartitionedTopic =
- brokerService.pulsar().getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
- .partitionedTopicExists(topicName);
- } catch (Exception e) {
- log.warn("Failed to verify partitioned topic {}-{}", topicName, e.getMessage());
- }
- if (isPartitionedTopic) {
- throw new NamingException(
- topicName + " is a partitioned-topic and replication can't be started for partitioned-producer ");
- }
+ return brokerService.pulsar().getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
+ .partitionedTopicExistsAsync(topicName).thenCompose(isPartitionedTopic -> {
+ if (isPartitionedTopic) {
+ String s = topicName
+ + " is a partitioned-topic and replication can't be started for partitioned-producer ";
+ log.error(s);
+ return FutureUtil.failedFuture(new NamingException(s));
+ }
+ return CompletableFuture.completedFuture(null);
+ });
}
private static final Logger log = LoggerFactory.getLogger(AbstractReplicator.class);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index c3a241d..456af1f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -994,6 +994,7 @@ public class BrokerService implements Closeable {
}
private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String topic) {
+ CompletableFuture<Optional<Topic>> topicFuture = new CompletableFuture<>();
if (!pulsar.getConfiguration().isEnableNonPersistentTopics()) {
if (log.isDebugEnabled()) {
log.debug("Broker is unable to load non-persistent topic {}", topic);
@@ -1003,27 +1004,40 @@ public class BrokerService implements Closeable {
}
final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
NonPersistentTopic nonPersistentTopic = new NonPersistentTopic(topic, this);
-
- CompletableFuture<Optional<Topic>> future = nonPersistentTopic.initialize()
- .thenCompose(__ -> nonPersistentTopic.checkReplication())
- .thenApply(__ -> {
- log.info("Created topic {}", nonPersistentTopic);
- long topicLoadLatencyMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - topicCreateTimeMs;
- pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs);
- addTopicToStatsMaps(TopicName.get(topic), nonPersistentTopic);
- return Optional.of(nonPersistentTopic);
- });
-
- future.exceptionally((ex) -> {
- log.warn("Replication check failed. Removing topic from topics list {}, {}", topic, ex);
- nonPersistentTopic.stopReplProducers().whenComplete((v, exception) -> {
- pulsar.getExecutor().execute(() -> topics.remove(topic, future));
+ CompletableFuture<Void> isOwner = checkTopicNsOwnership(topic);
+ isOwner.thenRun(() -> {
+ nonPersistentTopic.initialize()
+ .thenCompose(__ -> nonPersistentTopic.checkReplication())
+ .thenRun(() -> {
+ log.info("Created topic {}", nonPersistentTopic);
+ long topicLoadLatencyMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - topicCreateTimeMs;
+ pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs);
+ addTopicToStatsMaps(TopicName.get(topic), nonPersistentTopic);
+ topicFuture.complete(Optional.of(nonPersistentTopic));
+ }).exceptionally(ex -> {
+ log.warn("Replication check failed. Removing topic from topics list {}, {}", topic, ex.getCause());
+ nonPersistentTopic.stopReplProducers().whenComplete((v, exception) -> {
+ pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
+ topicFuture.completeExceptionally(ex);
+ });
+ return null;
});
-
+ }).exceptionally(e -> {
+ log.warn("CheckTopicNsOwnership fail when createNonPersistentTopic! {}", topic, e.getCause());
+ // CheckTopicNsOwnership fail dont create nonPersistentTopic, when topic do lookup will find the correct
+ // broker. When client get non-persistent-partitioned topic
+ // metadata will the non-persistent-topic will be created.
+ // so we should add checkTopicNsOwnership logic otherwise the topic will be created
+ // if it dont own by this broker,we should return success
+ // otherwise it will keep retrying getPartitionedTopicMetadata
+ topicFuture.complete(Optional.of(nonPersistentTopic));
+ // after get metadata return success, we should delete this topic from this broker, because this topic not
+ // owner by this broker and it don't initialize and checkReplication
+ pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
return null;
});
- return future;
+ return topicFuture;
}
private <T> CompletableFuture<T> futureWithDeadline() {
@@ -1031,7 +1045,7 @@ public class BrokerService implements Closeable {
() -> FUTURE_DEADLINE_TIMEOUT_EXCEPTION);
}
- public PulsarClient getReplicationClient(String cluster) {
+ public PulsarClient getReplicationClient(String cluster, Optional<ClusterData> clusterDataOp) {
PulsarClient client = replicationClients.get(cluster);
if (client != null) {
return client;
@@ -1039,7 +1053,7 @@ public class BrokerService implements Closeable {
return replicationClients.computeIfAbsent(cluster, key -> {
try {
- ClusterData data = pulsar.getPulsarResources().getClusterResources().getCluster(cluster)
+ ClusterData data = clusterDataOp
.orElseThrow(() -> new MetadataStoreException.NotFoundException(cluster));
ClientBuilder clientBuilder = PulsarClient.builder()
.enableTcpNoDelay(false)
@@ -1106,14 +1120,14 @@ public class BrokerService implements Closeable {
}
}
- public PulsarAdmin getClusterPulsarAdmin(String cluster) {
+ public PulsarAdmin getClusterPulsarAdmin(String cluster, Optional<ClusterData> clusterDataOp) {
PulsarAdmin admin = clusterAdmins.get(cluster);
if (admin != null) {
return admin;
}
return clusterAdmins.computeIfAbsent(cluster, key -> {
try {
- ClusterData data = pulsar.getPulsarResources().getClusterResources().getCluster(cluster)
+ ClusterData data = clusterDataOp
.orElseThrow(() -> new MetadataStoreException.NotFoundException(cluster));
ServiceConfiguration conf = pulsar.getConfig();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
index dd57fd9..ce1fe34 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
@@ -28,13 +28,13 @@ import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.service.AbstractReplicator;
import org.apache.pulsar.broker.service.BrokerService;
-import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
import org.apache.pulsar.broker.service.Replicator;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.SendCallback;
import org.apache.pulsar.common.policies.data.stats.NonPersistentReplicatorStatsImpl;
import org.apache.pulsar.common.stats.Rate;
@@ -49,8 +49,9 @@ public class NonPersistentReplicator extends AbstractReplicator implements Repli
private final NonPersistentReplicatorStatsImpl stats = new NonPersistentReplicatorStatsImpl();
public NonPersistentReplicator(NonPersistentTopic topic, String localCluster, String remoteCluster,
- BrokerService brokerService) throws NamingException, PulsarServerException {
- super(topic.getName(), topic.getReplicatorPrefix(), localCluster, remoteCluster, brokerService);
+ BrokerService brokerService, PulsarClientImpl replicationClient) throws PulsarServerException {
+ super(topic.getName(), topic.getReplicatorPrefix(), localCluster, remoteCluster, brokerService,
+ replicationClient);
producerBuilder.blockIfQueueFull(false);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index c1687a3..2c62f2e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -32,13 +32,14 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.resources.NamespaceResources;
+import org.apache.pulsar.broker.service.AbstractReplicator;
import org.apache.pulsar.broker.service.AbstractTopic;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
@@ -61,6 +62,7 @@ import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
import org.apache.pulsar.broker.stats.NamespaceStats;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
@@ -528,14 +530,7 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
}
if (!replicators.containsKey(cluster)) {
- if (!startReplicator(cluster)) {
- // it happens when global topic is a partitioned topic and replicator can't start on
- // original
- // non partitioned-topic (topic without partition prefix)
- return FutureUtil
- .failedFuture(new NamingException(
- topic + " failed to start replicator for " + cluster));
- }
+ futures.add(startReplicator(cluster));
}
}
@@ -553,29 +548,35 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
}
- boolean startReplicator(String remoteCluster) {
+ CompletableFuture<Void> startReplicator(String remoteCluster) {
log.info("[{}] Starting replicator to remote: {}", topic, remoteCluster);
String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
return addReplicationCluster(remoteCluster, NonPersistentTopic.this, localCluster);
}
- protected boolean addReplicationCluster(String remoteCluster, NonPersistentTopic nonPersistentTopic,
+ protected CompletableFuture<Void> addReplicationCluster(String remoteCluster, NonPersistentTopic nonPersistentTopic,
String localCluster) {
- AtomicBoolean isReplicatorStarted = new AtomicBoolean(true);
- replicators.computeIfAbsent(remoteCluster, r -> {
- try {
- return new NonPersistentReplicator(NonPersistentTopic.this, localCluster, remoteCluster, brokerService);
- } catch (NamingException | PulsarServerException e) {
- isReplicatorStarted.set(false);
- log.error("[{}] Replicator startup failed due to partitioned-topic {}", topic, remoteCluster);
- }
- return null;
- });
- // clean up replicator if startup is failed
- if (!isReplicatorStarted.get()) {
- replicators.remove(remoteCluster);
- }
- return isReplicatorStarted.get();
+ return AbstractReplicator.validatePartitionedTopicAsync(nonPersistentTopic.getName(), brokerService)
+ .thenCompose(__ -> brokerService.pulsar().getPulsarResources().getClusterResources()
+ .getClusterAsync(remoteCluster)
+ .thenApply(clusterData ->
+ brokerService.getReplicationClient(remoteCluster, clusterData)))
+ .thenAccept(replicationClient -> {
+ replicators.computeIfAbsent(remoteCluster, r -> {
+ try {
+ return new NonPersistentReplicator(NonPersistentTopic.this, localCluster,
+ remoteCluster, brokerService, (PulsarClientImpl) replicationClient);
+ } catch (PulsarServerException e) {
+ log.error("[{}] Replicator startup failed {}", topic, remoteCluster, e);
+ }
+ return null;
+ });
+
+ // clean up replicator if startup is failed
+ if (replicators.containsKey(remoteCluster) && replicators.get(remoteCluster) == null) {
+ replicators.remove(remoteCluster);
+ }
+ });
}
CompletableFuture<Void> removeReplicator(String remoteCluster) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index aa53e15..4d79c9a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -45,7 +45,6 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.service.AbstractReplicator;
import org.apache.pulsar.broker.service.BrokerService;
-import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
import org.apache.pulsar.broker.service.Replicator;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
@@ -54,6 +53,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.SendCallback;
import org.apache.pulsar.common.api.proto.MarkerType;
import org.apache.pulsar.common.policies.data.Policies;
@@ -105,8 +105,10 @@ public class PersistentReplicator extends AbstractReplicator
private final ReplicatorStatsImpl stats = new ReplicatorStatsImpl();
public PersistentReplicator(PersistentTopic topic, ManagedCursor cursor, String localCluster, String remoteCluster,
- BrokerService brokerService) throws NamingException, PulsarServerException {
- super(topic.getName(), topic.getReplicatorPrefix(), localCluster, remoteCluster, brokerService);
+ BrokerService brokerService, PulsarClientImpl replicationClient)
+ throws PulsarServerException {
+ super(topic.getName(), topic.getReplicatorPrefix(), localCluster, remoteCluster, brokerService,
+ replicationClient);
this.topic = topic;
this.cursor = cursor;
this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 64e4b7f..f74060c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -76,6 +76,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.resources.NamespaceResources.PartitionedTopicResources;
+import org.apache.pulsar.broker.service.AbstractReplicator;
import org.apache.pulsar.broker.service.AbstractTopic;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
@@ -116,6 +117,7 @@ import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
@@ -265,20 +267,8 @@ public class PersistentTopic extends AbstractTopic
this.compactedTopic = new CompactedTopicImpl(brokerService.pulsar().getBookKeeperClient());
for (ManagedCursor cursor : ledger.getCursors()) {
- if (cursor.getName().startsWith(replicatorPrefix)) {
- String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
- String remoteCluster = PersistentReplicator.getRemoteCluster(cursor.getName());
- boolean isReplicatorStarted = false;
- try {
- isReplicatorStarted = addReplicationCluster(remoteCluster, cursor, localCluster);
- } catch (Exception e) {
- log.warn("[{}] failed to start replication", topic, e);
- }
- if (!isReplicatorStarted) {
- throw new NamingException(
- PersistentTopic.this.getName() + " Failed to start replicator " + remoteCluster);
- }
- } else if (cursor.getName().equals(DEDUPLICATION_CURSOR_NAME)) {
+ if (cursor.getName().equals(DEDUPLICATION_CURSOR_NAME)
+ || cursor.getName().startsWith(replicatorPrefix)) {
// This is not a regular subscription, we are going to
// ignore it for now and let the message dedup logic to take care of it
} else {
@@ -309,7 +299,16 @@ public class PersistentTopic extends AbstractTopic
@Override
public CompletableFuture<Void> initialize() {
- return brokerService.pulsar().getPulsarResources().getNamespaceResources()
+ List<CompletableFuture<Void>> futures = new ArrayList<>();
+ for (ManagedCursor cursor : ledger.getCursors()) {
+ if (cursor.getName().startsWith(replicatorPrefix)) {
+ String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
+ String remoteCluster = PersistentReplicator.getRemoteCluster(cursor.getName());
+ futures.add(addReplicationCluster(remoteCluster, cursor, localCluster));
+ }
+ }
+ return FutureUtil.waitForAll(futures).thenCompose(__ ->
+ brokerService.pulsar().getPulsarResources().getNamespaceResources()
.getPoliciesAsync(TopicName.get(topic).getNamespaceObject())
.thenAccept(optPolicies -> {
if (!optPolicies.isPresent()) {
@@ -338,7 +337,7 @@ public class PersistentTopic extends AbstractTopic
updateUnackedMessagesAppliedOnSubscription(null);
updateUnackedMessagesExceededOnConsumer(null);
return null;
- });
+ }));
}
// for testing purposes
@@ -1519,13 +1518,13 @@ public class PersistentTopic extends AbstractTopic
@Override
public void openCursorComplete(ManagedCursor cursor, Object ctx) {
String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
- boolean isReplicatorStarted = addReplicationCluster(remoteCluster, cursor, localCluster);
- if (isReplicatorStarted) {
- future.complete(null);
- } else {
- future.completeExceptionally(new NamingException(
- PersistentTopic.this.getName() + " Failed to start replicator " + remoteCluster));
- }
+ addReplicationCluster(remoteCluster, cursor, localCluster).whenComplete((__, ex) -> {
+ if (ex == null) {
+ future.complete(null);
+ } else {
+ future.completeExceptionally(ex);
+ }
+ });
}
@Override
@@ -1538,23 +1537,29 @@ public class PersistentTopic extends AbstractTopic
return future;
}
- protected boolean addReplicationCluster(String remoteCluster, ManagedCursor cursor, String localCluster) {
- AtomicBoolean isReplicatorStarted = new AtomicBoolean(true);
- replicators.computeIfAbsent(remoteCluster, r -> {
- try {
- return new PersistentReplicator(PersistentTopic.this, cursor, localCluster, remoteCluster,
- brokerService);
- } catch (NamingException | PulsarServerException e) {
- isReplicatorStarted.set(false);
- log.error("[{}] Replicator startup failed due to partitioned-topic {}", topic, remoteCluster);
- }
- return null;
- });
- // clean up replicator if startup is failed
- if (!isReplicatorStarted.get()) {
- replicators.remove(remoteCluster);
- }
- return isReplicatorStarted.get();
+ protected CompletableFuture<Void> addReplicationCluster(String remoteCluster, ManagedCursor cursor,
+ String localCluster) {
+ return AbstractReplicator.validatePartitionedTopicAsync(PersistentTopic.this.getName(), brokerService)
+ .thenCompose(__ -> brokerService.pulsar().getPulsarResources().getClusterResources()
+ .getClusterAsync(remoteCluster)
+ .thenApply(clusterData ->
+ brokerService.getReplicationClient(remoteCluster, clusterData)))
+ .thenAccept(replicationClient -> {
+ replicators.computeIfAbsent(remoteCluster, r -> {
+ try {
+ return new PersistentReplicator(PersistentTopic.this, cursor, localCluster,
+ remoteCluster, brokerService, (PulsarClientImpl) replicationClient);
+ } catch (PulsarServerException e) {
+ log.error("[{}] Replicator startup failed {}", topic, remoteCluster, e);
+ }
+ return null;
+ });
+
+ // clean up replicator if startup is failed
+ if (replicators.containsKey(remoteCluster) && replicators.get(remoteCluster) == null) {
+ replicators.remove(remoteCluster);
+ }
+ });
}
CompletableFuture<Void> removeReplicator(String remoteCluster) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 8603404..543d897 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -1768,7 +1768,10 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
doReturn(remoteCluster).when(cursor).getName();
brokerService.getReplicationClients().put(remoteCluster, client);
PersistentReplicator replicator = spy(
- new PersistentReplicator(topic, cursor, localCluster, remoteCluster, brokerService));
+ new PersistentReplicator(topic, cursor, localCluster, remoteCluster, brokerService,
+ (PulsarClientImpl) brokerService.getReplicationClient(remoteCluster,
+ brokerService.pulsar().getPulsarResources().getClusterResources()
+ .getCluster(remoteCluster))));
replicatorMap.put(remoteReplicatorName, replicator);
// step-1 remove replicator : it will disconnect the producer but it will wait for callback to be completed
@@ -1813,7 +1816,10 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
ManagedCursor cursor = mock(ManagedCursorImpl.class);
doReturn(remoteCluster).when(cursor).getName();
brokerService.getReplicationClients().put(remoteCluster, client);
- PersistentReplicator replicator = new PersistentReplicator(topic, cursor, localCluster, remoteCluster, brokerService);
+ PersistentReplicator replicator = new PersistentReplicator(topic, cursor, localCluster, remoteCluster,
+ brokerService, (PulsarClientImpl) brokerService.getReplicationClient(remoteCluster,
+ brokerService.pulsar().getPulsarResources().getClusterResources()
+ .getCluster(remoteCluster)));
// PersistentReplicator constructor calls startProducer()
verify(clientImpl)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRemoveClusterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRemoveClusterTest.java
index 701ab47..65e9096 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRemoveClusterTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRemoveClusterTest.java
@@ -75,7 +75,9 @@ public class ReplicatorRemoveClusterTest extends ReplicatorTestBase {
admin1.namespaces().createNamespace("pulsar1/ns1", Sets.newHashSet("r1", "r2", "r3"));
- PulsarClient repClient1 = pulsar1.getBrokerService().getReplicationClient("r3");
+ PulsarClient repClient1 = pulsar1.getBrokerService().getReplicationClient("r3",
+ pulsar1.getBrokerService().pulsar().getPulsarResources().getClusterResources()
+ .getCluster("r3"));
Assert.assertNotNull(repClient1);
Assert.assertFalse(repClient1.isClosed());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index 9b4fcc8..11c8e19 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -262,7 +262,9 @@ public class ReplicatorTest extends ReplicatorTestBase {
.getOrCreateTopic(topicName.toString()).get();
PulsarClientImpl pulsarClient = spy((PulsarClientImpl) pulsar1.getBrokerService()
- .getReplicationClient("r3"));
+ .getReplicationClient("r3",
+ pulsar1.getBrokerService().pulsar().getPulsarResources().getClusterResources()
+ .getCluster("r3")));
final Method startRepl = PersistentTopic.class.getDeclaredMethod("startReplicator", String.class);
startRepl.setAccessible(true);
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/GeoReplicationTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/GeoReplicationTest.java
new file mode 100644
index 0000000..75ce363
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/GeoReplicationTest.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.messaging;
+
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.tests.integration.topologies.PulsarGeoClusterTestBase;
+import org.awaitility.Awaitility;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Geo replication test.
+ */
+@Slf4j
+public class GeoReplicationTest extends PulsarGeoClusterTestBase {
+
+ @BeforeClass(alwaysRun = true)
+ public final void setupBeforeClass() throws Exception {
+ setup();
+ }
+
+ @AfterClass(alwaysRun = true)
+ public final void tearDownAfterClass() throws Exception {
+ cleanup();
+ }
+
+ @Test(timeOut = 1000 * 30, dataProvider = "TopicDomain")
+ public void testTopicReplication(String domain) throws Exception {
+ String cluster1 = getGeoCluster().getClusters()[0].getClusterName();
+ String cluster2 = getGeoCluster().getClusters()[1].getClusterName();
+
+ @Cleanup
+ PulsarAdmin admin = PulsarAdmin.builder()
+ .serviceHttpUrl(getGeoCluster().getClusters()[0].getHttpServiceUrl())
+ .requestTimeout(30, TimeUnit.SECONDS)
+ .build();
+
+ String topic = domain + "://public/default/testTopicReplication-" + UUID.randomUUID();
+ Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
+ try {
+ admin.topics().createPartitionedTopic(topic, 10);
+ } catch (Exception e) {
+ log.error("Failed to create partitioned topic {}.", topic, e);
+ Assert.fail("Failed to create partitioned topic " + topic);
+ }
+ Assert.assertEquals(admin.topics().getPartitionedTopicMetadata(topic).partitions, 10);
+ });
+ log.info("Test geo-replication produce and consume for topic {}.", topic);
+
+ @Cleanup
+ PulsarClient client1 = PulsarClient.builder()
+ .serviceUrl(getGeoCluster().getClusters()[0].getPlainTextServiceUrl())
+ .build();
+
+ @Cleanup
+ PulsarClient client2 = PulsarClient.builder()
+ .serviceUrl(getGeoCluster().getClusters()[1].getPlainTextServiceUrl())
+ .build();
+
+ @Cleanup
+ Producer<byte[]> p = client1.newProducer()
+ .topic(topic)
+ .create();
+ log.info("Successfully create producer in cluster {} for topic {}.", cluster1, topic);
+
+ @Cleanup
+ Consumer<byte[]> c = client2.newConsumer()
+ .topic(topic)
+ .subscriptionName("geo-sub")
+ .subscribe();
+ log.info("Successfully create consumer in cluster {} for topic {}.", cluster2, topic);
+
+ for (int i = 0; i < 10; i++) {
+ p.send(String.format("Message [%d]", i).getBytes(StandardCharsets.UTF_8));
+ }
+ log.info("Successfully produce message to cluster {} for topic {}.", cluster1, topic);
+
+ for (int i = 0; i < 10; i++) {
+ Message<byte[]> message = c.receive(10, TimeUnit.SECONDS);
+ Assert.assertNotNull(message);
+ }
+ log.info("Successfully consume message from cluster {} for topic {}.", cluster2, topic);
+ }
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
index 043762c..2191799 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
@@ -70,7 +70,14 @@ public class PulsarCluster {
* @return the built pulsar cluster
*/
public static PulsarCluster forSpec(PulsarClusterSpec spec) {
- return new PulsarCluster(spec);
+ CSContainer csContainer = new CSContainer(spec.clusterName)
+ .withNetwork(Network.newNetwork())
+ .withNetworkAliases(CSContainer.NAME);
+ return new PulsarCluster(spec, csContainer, false);
+ }
+
+ public static PulsarCluster forSpec(PulsarClusterSpec spec, CSContainer csContainer) {
+ return new PulsarCluster(spec, csContainer, true);
}
private final PulsarClusterSpec spec;
@@ -80,6 +87,7 @@ public class PulsarCluster {
private final Network network;
private final ZKContainer zkContainer;
private final CSContainer csContainer;
+ private final boolean sharedCsContainer;
private final Map<String, BKContainer> bookieContainers;
private final Map<String, BrokerContainer> brokerContainers;
private final Map<String, WorkerContainer> workerContainers;
@@ -90,11 +98,12 @@ public class PulsarCluster {
private Map<String, GenericContainer<?>> externalServices = Collections.emptyMap();
private final boolean enablePrestoWorker;
- private PulsarCluster(PulsarClusterSpec spec) {
+ private PulsarCluster(PulsarClusterSpec spec, CSContainer csContainer, boolean sharedCsContainer) {
this.spec = spec;
+ this.sharedCsContainer = sharedCsContainer;
this.clusterName = spec.clusterName();
- this.network = Network.newNetwork();
+ this.network = csContainer.getNetwork();
this.enablePrestoWorker = spec.enablePrestoWorker();
this.sqlFollowWorkerContainers = Maps.newTreeMap();
@@ -109,26 +118,24 @@ public class PulsarCluster {
this.zkContainer = new ZKContainer(clusterName);
this.zkContainer
.withNetwork(network)
- .withNetworkAliases(ZKContainer.NAME)
+ .withNetworkAliases(appendClusterName(ZKContainer.NAME))
.withEnv("clusterName", clusterName)
- .withEnv("zkServers", ZKContainer.NAME)
+ .withEnv("zkServers", appendClusterName(ZKContainer.NAME))
.withEnv("configurationStore", CSContainer.NAME + ":" + CS_PORT)
.withEnv("forceSync", "no")
- .withEnv("pulsarNode", "pulsar-broker-0");
+ .withEnv("pulsarNode", appendClusterName("pulsar-broker-0"));
- this.csContainer = new CSContainer(clusterName)
- .withNetwork(network)
- .withNetworkAliases(CSContainer.NAME);
+ this.csContainer = csContainer;
this.bookieContainers = Maps.newTreeMap();
this.brokerContainers = Maps.newTreeMap();
this.workerContainers = Maps.newTreeMap();
- this.proxyContainer = new ProxyContainer(clusterName, ProxyContainer.NAME)
+ this.proxyContainer = new ProxyContainer(appendClusterName("pulsar-proxy"), ProxyContainer.NAME)
.withNetwork(network)
- .withNetworkAliases("pulsar-proxy")
- .withEnv("zkServers", ZKContainer.NAME)
- .withEnv("zookeeperServers", ZKContainer.NAME)
+ .withNetworkAliases(appendClusterName("pulsar-proxy"))
+ .withEnv("zkServers", appendClusterName(ZKContainer.NAME))
+ .withEnv("zookeeperServers", appendClusterName(ZKContainer.NAME))
.withEnv("configurationStoreServers", CSContainer.NAME + ":" + CS_PORT)
.withEnv("clusterName", clusterName);
if (spec.proxyEnvs != null) {
@@ -142,8 +149,8 @@ public class PulsarCluster {
bookieContainers.putAll(
runNumContainers("bookie", spec.numBookies(), (name) -> new BKContainer(clusterName, name)
.withNetwork(network)
- .withNetworkAliases(name)
- .withEnv("zkServers", ZKContainer.NAME)
+ .withNetworkAliases(appendClusterName(name))
+ .withEnv("zkServers", appendClusterName(ZKContainer.NAME))
.withEnv("useHostNameAsBookieID", "true")
// Disable fsyncs for tests since they're slow within the containers
.withEnv("journalSyncData", "false")
@@ -156,11 +163,11 @@ public class PulsarCluster {
// create brokers
brokerContainers.putAll(
runNumContainers("broker", spec.numBrokers(), (name) -> {
- BrokerContainer brokerContainer = new BrokerContainer(clusterName, name)
+ BrokerContainer brokerContainer = new BrokerContainer(clusterName, appendClusterName(name))
.withNetwork(network)
- .withNetworkAliases(name)
- .withEnv("zkServers", ZKContainer.NAME)
- .withEnv("zookeeperServers", ZKContainer.NAME)
+ .withNetworkAliases(appendClusterName(name))
+ .withEnv("zkServers", appendClusterName(ZKContainer.NAME))
+ .withEnv("zookeeperServers", appendClusterName(ZKContainer.NAME))
.withEnv("configurationStoreServers", CSContainer.NAME + ":" + CS_PORT)
.withEnv("clusterName", clusterName)
.withEnv("brokerServiceCompactionMonitorIntervalInSeconds", "1")
@@ -235,8 +242,10 @@ public class PulsarCluster {
log.info("Successfully started local zookeeper container.");
// start the configuration store
- csContainer.start();
- log.info("Successfully started configuration store container.");
+ if (!sharedCsContainer) {
+ csContainer.start();
+ log.info("Successfully started configuration store container.");
+ }
// init the cluster
zkContainer.execCmd(
@@ -335,9 +344,11 @@ public class PulsarCluster {
if (null != proxyContainer) {
containers.add(proxyContainer);
}
- if (null != csContainer) {
+
+ if (!sharedCsContainer && null != csContainer) {
containers.add(csContainer);
}
+
if (null != zkContainer) {
containers.add(zkContainer);
}
@@ -668,4 +679,8 @@ public class PulsarCluster {
}
}
}
+
+ private String appendClusterName(String name) {
+ return sharedCsContainer ? clusterName + "-" + name : name;
+ }
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarGeoCluster.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarGeoCluster.java
new file mode 100644
index 0000000..9be3c38
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarGeoCluster.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.topologies;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.tests.integration.containers.CSContainer;
+import org.testcontainers.containers.Network;
+
+@Slf4j
+public class PulsarGeoCluster {
+
+ @Getter
+ private final PulsarClusterSpec[] clusterSpecs;
+
+ @Getter
+ private final CSContainer csContainer;
+
+ @Getter
+ private final PulsarCluster[] clusters;
+
+ /**
+ * Pulsar Cluster Spec
+ *
+ * @param specs each pulsar cluster spec.
+ * @return the built a pulsar cluster with geo replication
+ */
+ public static PulsarGeoCluster forSpec(PulsarClusterSpec... specs) {
+ return new PulsarGeoCluster(specs);
+ }
+
+ public PulsarGeoCluster(PulsarClusterSpec... clusterSpecs) {
+ this.clusterSpecs = clusterSpecs;
+ this.clusters = new PulsarCluster[clusterSpecs.length];
+
+ this.csContainer = new CSContainer("geo-cluster")
+ .withNetwork(Network.newNetwork())
+ .withNetworkAliases(CSContainer.NAME);
+
+ for (int i = 0; i < this.clusters.length; i++) {
+ clusters[i] = PulsarCluster.forSpec(this.clusterSpecs[i], this.csContainer);
+ }
+ }
+
+ public void start() throws Exception {
+ // start the configuration store
+ this.csContainer.start();
+ log.info("Successfully started configuration store container.");
+
+ for (PulsarCluster cluster : clusters) {
+ cluster.start();
+ log.info("Successfully started all components for cluster {}.", cluster.getClusterName());
+ }
+ }
+
+ public void stop() throws Exception {
+ for (PulsarCluster cluster : clusters) {
+ cluster.stop();
+ log.info("Successfully stopped all components for cluster {}.", cluster.getClusterName());
+ }
+ // stop the configuration store
+ this.csContainer.stop();
+ log.info("Successfully stopped configuration store container.");
+ }
+
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarGeoClusterTestBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarGeoClusterTestBase.java
new file mode 100644
index 0000000..51c74ee
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarGeoClusterTestBase.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.topologies;
+
+import static java.util.stream.Collectors.joining;
+import java.util.stream.Stream;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class PulsarGeoClusterTestBase extends PulsarTestBase {
+
+ @Override
+ protected final void setup() throws Exception {
+ setupCluster();
+ }
+
+ @Override
+ protected final void cleanup() throws Exception {
+ tearDownCluster();
+ }
+
+ protected void setupCluster() throws Exception {
+ this.setupCluster("");
+ }
+
+ @Getter
+ private PulsarGeoCluster geoCluster;
+
+ public void setupCluster(String namePrefix) throws Exception {
+ PulsarClusterSpec.PulsarClusterSpecBuilder[] specBuilders = new PulsarClusterSpec.PulsarClusterSpecBuilder[2];
+ for (int i = 0; i < 2; i++) {
+ String clusterName = Stream.of(this.getClass().getSimpleName(), namePrefix, String.valueOf(i),
+ randomName(5))
+ .filter(s -> s != null && !s.isEmpty())
+ .collect(joining("-"));
+ specBuilders[i] = PulsarClusterSpec.builder().clusterName(clusterName);
+ }
+ specBuilders = beforeSetupCluster(specBuilders);
+ PulsarClusterSpec[] specs = new PulsarClusterSpec[2];
+ for (int i = 0; i < specBuilders.length; i++) {
+ specs[i] = specBuilders[i].build();
+ }
+ setupCluster0(specs);
+ }
+
+ protected PulsarClusterSpec.PulsarClusterSpecBuilder[] beforeSetupCluster (
+ PulsarClusterSpec.PulsarClusterSpecBuilder... specBuilder) {
+ return specBuilder;
+ }
+
+ protected void setupCluster0(PulsarClusterSpec... specs) throws Exception {
+ incrementSetupNumber();
+ log.info("Setting up geo cluster with {} local clusters}", specs.length);
+
+ this.geoCluster = PulsarGeoCluster.forSpec(specs);
+
+ beforeStartCluster();
+
+ this.geoCluster.start();
+
+ log.info("Geo Cluster is setup!");
+ }
+
+ protected void beforeStartCluster() throws Exception {
+ // no-op
+ }
+
+ public void tearDownCluster() throws Exception {
+ markCurrentSetupNumberCleaned();
+ if (null != this.geoCluster) {
+ this.geoCluster.stop();
+ }
+ }
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarTestBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarTestBase.java
index 9989b15..ebdfbe8 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarTestBase.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarTestBase.java
@@ -34,9 +34,18 @@ import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.tests.TestRetrySupport;
import org.testng.Assert;
+import org.testng.annotations.DataProvider;
public abstract class PulsarTestBase extends TestRetrySupport {
+ @DataProvider(name = "TopicDomain")
+ public Object[][] topicDomain() {
+ return new Object[][] {
+ {"persistent"},
+ {"non-persistent"}
+ };
+ }
+
public static String randomName(int numChars) {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < numChars; i++) {
diff --git a/tests/integration/src/test/resources/pulsar-messaging.xml b/tests/integration/src/test/resources/pulsar-messaging.xml
index aa31852..6421561 100644
--- a/tests/integration/src/test/resources/pulsar-messaging.xml
+++ b/tests/integration/src/test/resources/pulsar-messaging.xml
@@ -24,6 +24,7 @@
<classes>
<class name="org.apache.pulsar.tests.integration.messaging.PersistentTopicMessagingTest" />
<class name="org.apache.pulsar.tests.integration.messaging.NonPersistentTopicMessagingTest" />
+ <class name="org.apache.pulsar.tests.integration.messaging.GeoReplicationTest" />
<class name="org.apache.pulsar.tests.integration.messaging.DelayMessagingTest" />
<class name="org.apache.pulsar.tests.integration.admin.AdminTest" />
<class name="org.apache.pulsar.tests.integration.io.sources.AvroKafkaSourceTest" />