You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/03/25 04:19:58 UTC
[pulsar] branch master updated: Refactored ReplicatorTest to be more easily debuggable (#3892)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1921bd9 Refactored ReplicatorTest to be more easily debuggable (#3892)
1921bd9 is described below
commit 1921bd98f20991248892717bbcfc102545e7c49c
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Sun Mar 24 21:19:54 2019 -0700
Refactored ReplicatorTest to be more easily debuggable (#3892)
---
.../pulsar/broker/service/ReplicatorTest.java | 429 ++++------
.../pulsar/broker/service/ReplicatorTestBase.java | 71 +-
.../broker/service/v1/V1_ReplicatorTest.java | 937 ---------------------
.../broker/service/v1/V1_ReplicatorTestBase.java | 377 ---------
4 files changed, 207 insertions(+), 1607 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index a66179b..3150983 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -38,6 +38,8 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -85,7 +87,7 @@ import com.scurrilous.circe.checksum.Crc32cIntChecksum;
import io.netty.buffer.ByteBuf;
/**
- * Starts 2 brokers that are in 2 different clusters
+ * Starts 3 brokers that are in 3 different clusters
*/
public class ReplicatorTest extends ReplicatorTestBase {
@@ -113,7 +115,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
}
- @Test(enabled = true, timeOut = 30000)
+ @Test
public void testConfigChange() throws Exception {
log.info("--- Starting ReplicatorTest::testConfigChange ---");
// This test is to verify that the config change on global namespace is successfully applied in broker during
@@ -127,18 +129,16 @@ public class ReplicatorTest extends ReplicatorTestBase {
@Override
public Void call() throws Exception {
+ @Cleanup
MessageProducer producer = new MessageProducer(url1, dest);
log.info("--- Starting producer --- " + url1);
+ @Cleanup
MessageConsumer consumer = new MessageConsumer(url1, dest);
log.info("--- Starting Consumer --- " + url1);
producer.produce(2);
-
consumer.receive(2);
-
- producer.close();
- consumer.close();
return null;
}
}));
@@ -207,6 +207,8 @@ public class ReplicatorTest extends ReplicatorTestBase {
admin1.namespaces().createNamespace(namespace);
admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));
final TopicName topicName = TopicName.get(String.format("persistent://" + namespace + "/topic-%d", 0));
+
+ @Cleanup
PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS)
.build();
Producer<byte[]> producer = client1.newProducer().topic(topicName.toString())
@@ -243,220 +245,154 @@ public class ReplicatorTest extends ReplicatorTestBase {
Mockito.verify(pulsarClient, Mockito.times(1)).createProducerAsync(Mockito.any(ProducerConfigurationData.class),
Mockito.any(Schema.class), eq(null));
- client1.shutdown();
+ executor.shutdown();
}
- @Test(enabled = false, timeOut = 30000)
- public void testConfigChangeNegativeCases() throws Exception {
- log.info("--- Starting ReplicatorTest::testConfigChangeNegativeCases ---");
- // Negative test cases for global namespace config change. Verify that the namespace config change can not be
- // updated when the namespace is being unloaded.
- // Set up field access to internal namespace state in NamespaceService
- Field ownershipCacheField = NamespaceService.class.getDeclaredField("ownershipCache");
- ownershipCacheField.setAccessible(true);
- OwnershipCache ownerCache = (OwnershipCache) ownershipCacheField.get(pulsar1.getNamespaceService());
- Assert.assertNotNull(pulsar1, "pulsar1 is null");
- Assert.assertNotNull(pulsar1.getNamespaceService(), "pulsar1.getNamespaceService() is null");
- NamespaceBundle globalNsBundle = pulsar1.getNamespaceService().getNamespaceBundleFactory()
- .getFullBundle(NamespaceName.get("pulsar/ns"));
- ownerCache.tryAcquiringOwnership(globalNsBundle);
- Assert.assertNotNull(ownerCache.getOwnedBundle(globalNsBundle),
- "pulsar1.getNamespaceService().getOwnedServiceUnit(NamespaceName.get(\"pulsar/ns\")) is null");
- Field stateField = OwnedBundle.class.getDeclaredField("isActive");
- stateField.setAccessible(true);
- // set the namespace to be disabled
- ownerCache.disableOwnership(globalNsBundle);
-
- // Make sure the namespace config update failed
- try {
- admin1.namespaces().setNamespaceReplicationClusters("pulsar/ns", Sets.newHashSet("r1"));
- fail("Should have raised exception");
- } catch (PreconditionFailedException pfe) {
- // OK
- }
-
- // restore the namespace state
- ownerCache.removeOwnership(globalNsBundle).get();
- ownerCache.tryAcquiringOwnership(globalNsBundle);
+ @DataProvider(name = "namespace")
+ public Object[][] namespaceNameProvider() {
+ return new Object[][] { { "pulsar/ns" }, { "pulsar/global/ns" } };
}
- @Test(enabled = true, timeOut = 30000)
- public void testReplication() throws Exception {
-
+ @Test(dataProvider = "namespace")
+ public void testReplication(String namespace) throws Exception {
log.info("--- Starting ReplicatorTest::testReplication ---");
// This test is to verify that the config change on global namespace is successfully applied in broker during
// runtime.
// Run a set of producer tasks to create the topics
- SortedSet<String> testDests = new TreeSet<String>();
- List<Future<Void>> results = Lists.newArrayList();
- for (int i = 0; i < 3; i++) {
- final TopicName dest = TopicName.get(String.format("persistent://pulsar/ns/repltopic-%d", i));
- testDests.add(dest.toString());
-
- results.add(executor.submit(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
+ final TopicName dest = TopicName
+ .get(String.format("persistent://%s/repltopic-%d", namespace, System.nanoTime()));
- MessageProducer producer1 = new MessageProducer(url1, dest);
- log.info("--- Starting producer --- " + url1);
-
- MessageProducer producer2 = new MessageProducer(url2, dest);
- log.info("--- Starting producer --- " + url2);
-
- MessageProducer producer3 = new MessageProducer(url3, dest);
- log.info("--- Starting producer --- " + url3);
+ @Cleanup
+ MessageProducer producer1 = new MessageProducer(url1, dest);
+ log.info("--- Starting producer --- " + url1);
- MessageConsumer consumer1 = new MessageConsumer(url1, dest);
- log.info("--- Starting Consumer --- " + url1);
+ @Cleanup
+ MessageProducer producer2 = new MessageProducer(url2, dest);
+ log.info("--- Starting producer --- " + url2);
- MessageConsumer consumer2 = new MessageConsumer(url2, dest);
- log.info("--- Starting Consumer --- " + url2);
+ @Cleanup
+ MessageProducer producer3 = new MessageProducer(url3, dest);
+ log.info("--- Starting producer --- " + url3);
- MessageConsumer consumer3 = new MessageConsumer(url3, dest);
- log.info("--- Starting Consumer --- " + url3);
+ @Cleanup
+ MessageConsumer consumer1 = new MessageConsumer(url1, dest);
+ log.info("--- Starting Consumer --- " + url1);
- // Produce from cluster1 and consume from the rest
- producer1.produce(2);
+ @Cleanup
+ MessageConsumer consumer2 = new MessageConsumer(url2, dest);
+ log.info("--- Starting Consumer --- " + url2);
- consumer1.receive(2);
+ @Cleanup
+ MessageConsumer consumer3 = new MessageConsumer(url3, dest);
+ log.info("--- Starting Consumer --- " + url3);
- consumer2.receive(2);
+ // Produce from cluster1 and consume from the rest
+ producer1.produce(2);
- consumer3.receive(2);
+ consumer1.receive(2);
- // Produce from cluster2 and consume from the rest
- producer2.produce(2);
+ consumer2.receive(2);
- consumer1.receive(2);
+ consumer3.receive(2);
- consumer2.receive(2);
+ // Produce from cluster2 and consume from the rest
+ producer2.produce(2);
- consumer3.receive(2);
+ consumer1.receive(2);
- // Produce from cluster3 and consume from the rest
- producer3.produce(2);
+ consumer2.receive(2);
- consumer1.receive(2);
+ consumer3.receive(2);
- consumer2.receive(2);
+ // Produce from cluster3 and consume from the rest
+ producer3.produce(2);
- consumer3.receive(2);
+ consumer1.receive(2);
- // Produce from cluster1&2 and consume from cluster3
- producer1.produce(1);
- producer2.produce(1);
+ consumer2.receive(2);
- consumer1.receive(1);
+ consumer3.receive(2);
- consumer2.receive(1);
+ // Produce from cluster1&2 and consume from cluster3
+ producer1.produce(1);
+ producer2.produce(1);
- consumer3.receive(1);
+ consumer1.receive(1);
- consumer1.receive(1);
+ consumer2.receive(1);
- consumer2.receive(1);
+ consumer3.receive(1);
- consumer3.receive(1);
+ consumer1.receive(1);
- producer1.close();
- producer2.close();
- producer3.close();
- consumer1.close();
- consumer2.close();
- consumer3.close();
- return null;
- }
- }));
- }
+ consumer2.receive(1);
- for (Future<Void> result : results) {
- try {
- result.get();
- } catch (Exception e) {
- log.error("exception in getting future result ", e);
- fail(String.format("replication test failed with %s exception", e.getMessage()));
- }
- }
+ consumer3.receive(1);
}
- @Test(enabled = false, timeOut = 30000)
+ @Test
public void testReplicationOverrides() throws Exception {
-
log.info("--- Starting ReplicatorTest::testReplicationOverrides ---");
// This test is to verify that the config change on global namespace is successfully applied in broker during
// runtime.
// Run a set of producer tasks to create the topics
- SortedSet<String> testDests = new TreeSet<String>();
- List<Future<Void>> results = Lists.newArrayList();
for (int i = 0; i < 10; i++) {
- final TopicName dest = TopicName.get(String.format("persistent://pulsar/ns/repltopic-%d", i));
- testDests.add(dest.toString());
+ final TopicName dest = TopicName
+ .get(String.format("persistent://pulsar/ns/repltopic-%d", System.nanoTime()));
- results.add(executor.submit(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
+ @Cleanup
+ MessageProducer producer1 = new MessageProducer(url1, dest);
+ log.info("--- Starting producer --- " + url1);
- MessageProducer producer1 = new MessageProducer(url1, dest);
- log.info("--- Starting producer --- " + url1);
+ @Cleanup
+ MessageProducer producer2 = new MessageProducer(url2, dest);
+ log.info("--- Starting producer --- " + url2);
- MessageProducer producer2 = new MessageProducer(url2, dest);
- log.info("--- Starting producer --- " + url2);
+ @Cleanup
+ MessageProducer producer3 = new MessageProducer(url3, dest);
+ log.info("--- Starting producer --- " + url3);
- MessageProducer producer3 = new MessageProducer(url3, dest);
- log.info("--- Starting producer --- " + url3);
+ @Cleanup
+ MessageConsumer consumer1 = new MessageConsumer(url1, dest);
+ log.info("--- Starting Consumer --- " + url1);
- MessageConsumer consumer1 = new MessageConsumer(url1, dest);
- log.info("--- Starting Consumer --- " + url1);
+ @Cleanup
+ MessageConsumer consumer2 = new MessageConsumer(url2, dest);
+ log.info("--- Starting Consumer --- " + url2);
- MessageConsumer consumer2 = new MessageConsumer(url2, dest);
- log.info("--- Starting Consumer --- " + url2);
-
- MessageConsumer consumer3 = new MessageConsumer(url3, dest);
- log.info("--- Starting Consumer --- " + url3);
-
- // Produce a message that isn't replicated
- producer1.produce(1, producer1.newMessage().disableReplication());
-
- // Produce a message not replicated to r2
- producer1.produce(1, producer1.newMessage().replicationClusters(Lists.newArrayList("r1", "r3")));
-
- // Produce a default replicated message
- producer1.produce(1);
-
- consumer1.receive(3);
- consumer2.receive(1);
- if (!consumer2.drained()) {
- throw new Exception("consumer2 - unexpected message in queue");
- }
- consumer3.receive(2);
- if (!consumer3.drained()) {
- throw new Exception("consumer3 - unexpected message in queue");
- }
-
- producer1.close();
- producer2.close();
- producer3.close();
- consumer1.close();
- consumer2.close();
- consumer3.close();
- return null;
- }
- }));
- }
+ @Cleanup
+ MessageConsumer consumer3 = new MessageConsumer(url3, dest);
+ log.info("--- Starting Consumer --- " + url3);
- for (Future<Void> result : results) {
- try {
- result.get();
- } catch (Exception e) {
- log.error("exception in getting future result ", e);
- fail(String.format("replication test failed with %s exception", e.getMessage()));
- }
+ // Produce a message that isn't replicated
+ producer1.produce(1, producer1.newMessage().disableReplication());
+
+ consumer1.receive(1);
+ assertTrue(consumer2.drained());
+ assertTrue(consumer3.drained());
+
+ // Produce a message not replicated to r2
+ producer1.produce(1, producer1.newMessage().replicationClusters(Lists.newArrayList("r1", "r3")));
+ consumer1.receive(1);
+ assertTrue(consumer2.drained());
+ consumer3.receive(1);
+
+ // Produce a default replicated message
+ producer1.produce(1);
+
+ consumer1.receive(1);
+ consumer2.receive(1);
+ consumer3.receive(1);
+
+ assertTrue(consumer1.drained());
+ assertTrue(consumer2.drained());
+ assertTrue(consumer3.drained());
}
}
- @Test(enabled = true, timeOut = 30000)
+ @Test()
public void testFailures() throws Exception {
log.info("--- Starting ReplicatorTest::testFailures ---");
@@ -479,17 +415,16 @@ public class ReplicatorTest extends ReplicatorTestBase {
@Test(timeOut = 30000)
public void testReplicatePeekAndSkip() throws Exception {
- SortedSet<String> testDests = new TreeSet<String>();
-
final TopicName dest = TopicName.get("persistent://pulsar/ns/peekAndSeekTopic");
- testDests.add(dest.toString());
+ @Cleanup
MessageProducer producer1 = new MessageProducer(url1, dest);
+
+ @Cleanup
MessageConsumer consumer1 = new MessageConsumer(url3, dest);
// Produce from cluster1 and consume from the rest
producer1.produce(2);
- producer1.close();
PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(dest.toString()).get();
PersistentReplicator replicator = (PersistentReplicator) topic.getReplicators()
.get(topic.getReplicators().keys().get(0));
@@ -497,7 +432,6 @@ public class ReplicatorTest extends ReplicatorTestBase {
CompletableFuture<Entry> result = replicator.peekNthMessage(1);
Entry entry = result.get(50, TimeUnit.MILLISECONDS);
assertNull(entry);
- consumer1.close();
}
@Test(timeOut = 30000)
@@ -509,12 +443,14 @@ public class ReplicatorTest extends ReplicatorTestBase {
final TopicName dest = TopicName.get("persistent://pulsar/ns/clearBacklogTopic");
testDests.add(dest.toString());
+ @Cleanup
MessageProducer producer1 = new MessageProducer(url1, dest);
+
+ @Cleanup
MessageConsumer consumer1 = new MessageConsumer(url3, dest);
// Produce from cluster1 and consume from the rest
producer1.produce(2);
- producer1.close();
PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(dest.toString()).get();
PersistentReplicator replicator = (PersistentReplicator) spy(
topic.getReplicators().get(topic.getReplicators().keys().get(0)));
@@ -525,7 +461,6 @@ public class ReplicatorTest extends ReplicatorTestBase {
replicator.expireMessages(1); // for code-coverage
ReplicatorStats status = replicator.getStats();
assertTrue(status.replicationBacklog == 0);
- consumer1.close();
}
@Test(enabled = true, timeOut = 30000)
@@ -534,116 +469,73 @@ public class ReplicatorTest extends ReplicatorTestBase {
log.info("--- Starting ReplicatorTest::testResetCursorNotFail ---");
// This test is to verify that reset cursor fails on global topic
- SortedSet<String> testDests = new TreeSet<String>();
- List<Future<Void>> results = Lists.newArrayList();
- for (int i = 0; i < 1; i++) {
- final TopicName dest = TopicName.get(String.format("persistent://pulsar/ns/resetrepltopic-%d", i));
- testDests.add(dest.toString());
-
- results.add(executor.submit(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
-
- MessageProducer producer1 = new MessageProducer(url1, dest);
- log.info("--- Starting producer --- " + url1);
+ final TopicName dest = TopicName
+ .get(String.format("persistent://pulsar/ns/resetrepltopic-%d", System.nanoTime()));
- MessageConsumer consumer1 = new MessageConsumer(url1, dest);
- log.info("--- Starting Consumer --- " + url1);
+ @Cleanup
+ MessageProducer producer1 = new MessageProducer(url1, dest);
+ log.info("--- Starting producer --- " + url1);
- // Produce from cluster1 and consume from the rest
- producer1.produce(2);
+ @Cleanup
+ MessageConsumer consumer1 = new MessageConsumer(url1, dest);
+ log.info("--- Starting Consumer --- " + url1);
- consumer1.receive(2);
+ // Produce from cluster1 and consume from the rest
+ producer1.produce(2);
- producer1.close();
- consumer1.close();
- return null;
- }
- }));
- }
+ consumer1.receive(2);
- for (Future<Void> result : results) {
- try {
- result.get();
- } catch (Exception e) {
- log.error("exception in getting future result ", e);
- fail(String.format("replication test failed with %s exception", e.getMessage()));
- }
- }
- admin1.topics().resetCursor(testDests.first(), "sub-id", System.currentTimeMillis());
+ admin1.topics().resetCursor(dest.toString(), "sub-id", System.currentTimeMillis());
}
- @Test(enabled = true, timeOut = 30000)
+ @Test
public void testReplicationForBatchMessages() throws Exception {
-
log.info("--- Starting ReplicatorTest::testReplicationForBatchMessages ---");
// Run a set of producer tasks to create the topics
- SortedSet<String> testDests = new TreeSet<String>();
- List<Future<Void>> results = Lists.newArrayList();
- for (int i = 0; i < 3; i++) {
- final TopicName dest = TopicName.get(String.format("persistent://pulsar/ns/repltopicbatch-%d", i));
- testDests.add(dest.toString());
-
- results.add(executor.submit(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
+ final TopicName dest = TopicName.get(String.format("persistent://pulsar/ns/repltopicbatch-%d", System.nanoTime()));
- MessageProducer producer1 = new MessageProducer(url1, dest, true);
- log.info("--- Starting producer --- " + url1);
-
- MessageProducer producer2 = new MessageProducer(url2, dest, true);
- log.info("--- Starting producer --- " + url2);
+ @Cleanup
+ MessageProducer producer1 = new MessageProducer(url1, dest, true);
+ log.info("--- Starting producer --- " + url1);
- MessageProducer producer3 = new MessageProducer(url3, dest, true);
- log.info("--- Starting producer --- " + url3);
+ @Cleanup
+ MessageProducer producer2 = new MessageProducer(url2, dest, true);
+ log.info("--- Starting producer --- " + url2);
- MessageConsumer consumer1 = new MessageConsumer(url1, dest);
- log.info("--- Starting Consumer --- " + url1);
+ @Cleanup
+ MessageProducer producer3 = new MessageProducer(url3, dest, true);
+ log.info("--- Starting producer --- " + url3);
- MessageConsumer consumer2 = new MessageConsumer(url2, dest);
- log.info("--- Starting Consumer --- " + url2);
+ @Cleanup
+ MessageConsumer consumer1 = new MessageConsumer(url1, dest);
+ log.info("--- Starting Consumer --- " + url1);
- MessageConsumer consumer3 = new MessageConsumer(url3, dest);
- log.info("--- Starting Consumer --- " + url3);
+ @Cleanup
+ MessageConsumer consumer2 = new MessageConsumer(url2, dest);
+ log.info("--- Starting Consumer --- " + url2);
- // Produce from cluster1 and consume from the rest
- producer1.produceBatch(10);
+ @Cleanup
+ MessageConsumer consumer3 = new MessageConsumer(url3, dest);
+ log.info("--- Starting Consumer --- " + url3);
- consumer1.receive(10);
+ // Produce from cluster1 and consume from the rest
+ producer1.produceBatch(10);
- consumer2.receive(10);
+ consumer1.receive(10);
- consumer3.receive(10);
+ consumer2.receive(10);
- // Produce from cluster2 and consume from the rest
- producer2.produceBatch(10);
+ consumer3.receive(10);
- consumer1.receive(10);
+ // Produce from cluster2 and consume from the rest
+ producer2.produceBatch(10);
- consumer2.receive(10);
+ consumer1.receive(10);
- consumer3.receive(10);
+ consumer2.receive(10);
- producer1.close();
- producer2.close();
- producer3.close();
- consumer1.close();
- consumer2.close();
- consumer3.close();
- return null;
- }
- }));
- }
-
- for (Future<Void> result : results) {
- try {
- result.get(5, TimeUnit.SECONDS);
- } catch (Exception e) {
- log.error("exception in getting future result ", e);
- fail(String.format("replication test failed with %s exception", e.getMessage()));
- }
- }
+ consumer3.receive(10);
}
/**
@@ -657,6 +549,8 @@ public class ReplicatorTest extends ReplicatorTestBase {
log.info("--- Starting ReplicatorTest::testDeleteReplicatorFailure ---");
final String topicName = "persistent://pulsar/ns/repltopicbatch";
final TopicName dest = TopicName.get(topicName);
+
+ @Cleanup
MessageProducer producer1 = new MessageProducer(url1, dest);
PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(topicName).get();
final String replicatorClusterName = topic.getReplicators().keys().get(0);
@@ -688,8 +582,6 @@ public class ReplicatorTest extends ReplicatorTestBase {
assertNull(topic.getPersistentReplicator(replicatorClusterName));
return null;
});
-
- producer1.close();
}
@SuppressWarnings("unchecked")
@@ -698,6 +590,8 @@ public class ReplicatorTest extends ReplicatorTestBase {
log.info("--- Starting ReplicatorTest::testDeleteReplicatorFailure ---");
final String topicName = "persistent://pulsar/ns/repltopicbatch";
final TopicName dest = TopicName.get(topicName);
+
+ @Cleanup
MessageProducer producer1 = new MessageProducer(url1, dest);
PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(topicName).get();
final String replicatorClusterName = topic.getReplicators().keys().get(0);
@@ -710,7 +604,6 @@ public class ReplicatorTest extends ReplicatorTestBase {
field.setAccessible(true);
ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) field.get(replicator);
assertNull(producer);
- producer1.close();
}
/**
@@ -737,9 +630,11 @@ public class ReplicatorTest extends ReplicatorTestBase {
.get(String.format("persistent://pulsar/ns1/%s-%d", policy, System.currentTimeMillis()));
// Producer on r1
+ @Cleanup
MessageProducer producer1 = new MessageProducer(url1, dest);
// Consumer on r2
+ @Cleanup
MessageConsumer consumer2 = new MessageConsumer(url2, dest);
// Replicator for r1 -> r2
@@ -780,9 +675,6 @@ public class ReplicatorTest extends ReplicatorTestBase {
}
assertEquals(replicator.getStats().replicationBacklog, 0);
-
- producer1.close();
- consumer2.close();
}
}
@@ -796,10 +688,13 @@ public class ReplicatorTest extends ReplicatorTestBase {
public void testCloseReplicatorStartProducer() throws Exception {
TopicName dest = TopicName.get("persistent://pulsar/ns1/closeCursor");
// Producer on r1
+ @Cleanup
MessageProducer producer1 = new MessageProducer(url1, dest);
// Consumer on r1
+ @Cleanup
MessageConsumer consumer1 = new MessageConsumer(url1, dest);
// Consumer on r2
+ @Cleanup
MessageConsumer consumer2 = new MessageConsumer(url2, dest);
// Replicator for r1 -> r2
@@ -833,10 +728,6 @@ public class ReplicatorTest extends ReplicatorTestBase {
@SuppressWarnings("unchecked")
ProducerImpl<byte[]> replicatorProducer = (ProducerImpl<byte[]>) producerField.get(replicator);
assertEquals(replicatorProducer, null);
-
- producer1.close();
- consumer1.close();
- consumer2.close();
}
@Test(timeOut = 30000)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
index 6aac08a..73b9b91 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
@@ -19,13 +19,15 @@
package org.apache.pulsar.broker.service;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
import com.google.common.collect.Sets;
+import io.netty.util.concurrent.DefaultThreadFactory;
+
import java.net.URL;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
+import java.util.Set;
+import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
@@ -37,16 +39,15 @@ import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
-import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.apache.pulsar.zookeeper.ZookeeperServerTest;
import org.slf4j.Logger;
@@ -80,7 +81,8 @@ public class ReplicatorTestBase {
ZookeeperServerTest globalZkS;
- ExecutorService executor = new ThreadPoolExecutor(5, 20, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
+ ExecutorService executor = new ThreadPoolExecutor(5, 20, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),
+ new DefaultThreadFactory("ReplicatorTestBase"));
static final int TIME_TO_CHECK_BACKLOG_QUOTA = 5;
@@ -211,13 +213,10 @@ public class ReplicatorTestBase {
admin1.clusters().createCluster("r3", new ClusterData(url3.toString(), urlTls3.toString(),
pulsar3.getBrokerServiceUrl(), pulsar3.getBrokerServiceUrlTls()));
- admin1.clusters().createCluster("global", new ClusterData("http://global:8080", "https://global:8443"));
admin1.tenants().createTenant("pulsar",
new TenantInfo(Sets.newHashSet("appid1", "appid2", "appid3"), Sets.newHashSet("r1", "r2", "r3")));
- admin1.namespaces().createNamespace("pulsar/ns");
- admin1.namespaces().setNamespaceReplicationClusters("pulsar/ns", Sets.newHashSet("r1", "r2", "r3"));
- admin1.namespaces().createNamespace("pulsar/ns1");
- admin1.namespaces().setNamespaceReplicationClusters("pulsar/ns1", Sets.newHashSet("r1", "r2"));
+ admin1.namespaces().createNamespace("pulsar/ns", Sets.newHashSet("r1", "r2", "r3"));
+ admin1.namespaces().createNamespace("pulsar/ns1", Sets.newHashSet("r1", "r2"));
assertEquals(admin2.clusters().getCluster("r1").getServiceUrl(), url1.toString());
assertEquals(admin2.clusters().getCluster("r2").getServiceUrl(), url2.toString());
@@ -226,6 +225,11 @@ public class ReplicatorTestBase {
assertEquals(admin2.clusters().getCluster("r2").getBrokerServiceUrl(), pulsar2.getBrokerServiceUrl());
assertEquals(admin2.clusters().getCluster("r3").getBrokerServiceUrl(), pulsar3.getBrokerServiceUrl());
+ // Also create V1 namespace for compatibility check
+ admin1.clusters().createCluster("global", new ClusterData("http://global:8080", "https://global:8443"));
+ admin1.namespaces().createNamespace("pulsar/global/ns");
+ admin1.namespaces().setNamespaceReplicationClusters("pulsar/global/ns", Sets.newHashSet("r1", "r2", "r3"));
+
Thread.sleep(100);
log.info("--- ReplicatorTestBase::setup completed ---");
@@ -253,7 +257,7 @@ public class ReplicatorTestBase {
globalZkS.stop();
}
- static class MessageProducer {
+ static class MessageProducer implements AutoCloseable {
URL url;
String namespace;
String topicName;
@@ -289,13 +293,12 @@ public class ReplicatorTestBase {
void produceBatch(int messages) throws Exception {
log.info("Start sending batch messages");
- List<CompletableFuture<MessageId>> futureList = new ArrayList<>();
for (int i = 0; i < messages; i++) {
- futureList.add(producer.sendAsync(("test-" + i).getBytes()));
+ producer.sendAsync(("test-" + i).getBytes());
log.info("queued message {}", ("test-" + i));
}
- FutureUtil.waitForAll(futureList).get();
+ producer.flush();
}
void produce(int messages) throws Exception {
@@ -315,19 +318,23 @@ public class ReplicatorTestBase {
void produce(int messages, TypedMessageBuilder<byte[]> messageBuilder) throws Exception {
log.info("Start sending messages");
for (int i = 0; i < messages; i++) {
- final String m = new String("test-builder-" + i);
+ final String m = new String("test-" + i);
messageBuilder.value(m.getBytes()).send();
log.info("Sent message {}", m);
}
}
- void close() throws Exception {
- client.close();
+ public void close() {
+ try {
+ client.close();
+ } catch (PulsarClientException e) {
+ log.warn("Failed to close client", e);
+ }
}
}
- static class MessageConsumer {
+ static class MessageConsumer implements AutoCloseable {
final URL url;
final String namespace;
final String topicName;
@@ -357,12 +364,24 @@ public class ReplicatorTestBase {
log.info("Start receiving messages");
Message<byte[]> msg;
- for (int i = 0; i < messages; i++) {
- msg = consumer.receive();
+ Set<String> receivedMessages = new TreeSet<>();
+
+ int i = 0;
+ while (i < messages) {
+ msg = consumer.receive(10, TimeUnit.SECONDS);
+ assertNotNull(msg);
consumer.acknowledge(msg);
+
String msgData = new String(msg.getData());
- assertEquals(msgData, "test-" + i);
log.info("Received message {}", msgData);
+
+ boolean added = receivedMessages.add(msgData);
+ if (added) {
+ assertEquals(msgData, "test-" + i);
+ i++;
+ } else {
+ log.info("Ignoring duplicate {}", msgData);
+ }
}
}
@@ -370,8 +389,12 @@ public class ReplicatorTestBase {
return consumer.receive(0, TimeUnit.MICROSECONDS) == null;
}
- void close() throws Exception {
- client.close();
+ public void close() {
+ try {
+ client.close();
+ } catch (PulsarClientException e) {
+ log.warn("Failed to close client", e);
+ }
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/v1/V1_ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/v1/V1_ReplicatorTest.java
deleted file mode 100644
index 5d51db8..0000000
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/v1/V1_ReplicatorTest.java
+++ /dev/null
@@ -1,937 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.service.v1;
-
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.spy;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNull;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
-
-import com.google.common.collect.Sets;
-import com.scurrilous.circe.checksum.Crc32cIntChecksum;
-
-import io.netty.buffer.ByteBuf;
-
-import java.lang.reflect.Field;
-import java.lang.reflect.Method;
-import java.util.List;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
-import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
-import org.apache.bookkeeper.mledger.Entry;
-import org.apache.bookkeeper.mledger.ManagedCursor;
-import org.apache.bookkeeper.mledger.ManagedLedgerException;
-import org.apache.bookkeeper.mledger.ManagedLedgerException.CursorAlreadyClosedException;
-import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
-import org.apache.pulsar.broker.namespace.NamespaceService;
-import org.apache.pulsar.broker.namespace.OwnedBundle;
-import org.apache.pulsar.broker.namespace.OwnershipCache;
-import org.apache.pulsar.broker.service.AbstractReplicator;
-import org.apache.pulsar.broker.service.BrokerService;
-import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
-import org.apache.pulsar.broker.service.Replicator;
-import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
-import org.apache.pulsar.broker.service.persistent.PersistentTopic;
-import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.RawMessage;
-import org.apache.pulsar.client.api.RawReader;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.impl.ProducerImpl;
-import org.apache.pulsar.client.impl.PulsarClientImpl;
-import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
-import org.apache.pulsar.common.api.Commands;
-import org.apache.pulsar.common.naming.NamespaceBundle;
-import org.apache.pulsar.common.naming.NamespaceName;
-import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.policies.data.BacklogQuota;
-import org.apache.pulsar.common.policies.data.BacklogQuota.RetentionPolicy;
-import org.apache.pulsar.common.policies.data.ReplicatorStats;
-import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
-import org.mockito.Mockito;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
-import org.testng.collections.Lists;
-
-/**
- * Starts 2 brokers that are in 2 different clusters
- */
-public class V1_ReplicatorTest extends V1_ReplicatorTestBase {
-
- protected String methodName;
-
- @BeforeMethod
- public void beforeMethod(Method m) {
- methodName = m.getName();
- }
-
- @Override
- @BeforeClass(timeOut = 60000)
- void setup() throws Exception {
- super.setup();
- }
-
- @Override
- @AfterClass(timeOut = 60000)
- void shutdown() throws Exception {
- super.shutdown();
- }
-
- @DataProvider(name = "partitionedTopic")
- public Object[][] partitionedTopicProvider() {
- return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
- }
-
- @Test(enabled = true, timeOut = 30000)
- public void testConfigChange() throws Exception {
- log.info("--- Starting V1_ReplicatorTest::testConfigChange ---");
- // This test is to verify that the config change on global namespace is successfully applied in broker during
- // runtime.
- // Run a set of producer tasks to create the topics
- List<Future<Void>> results = Lists.newArrayList();
- for (int i = 0; i < 10; i++) {
- final TopicName dest = TopicName.get(String.format("persistent://pulsar/global/ns/topic-%d", i));
-
- results.add(executor.submit(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
-
- MessageProducer producer = new MessageProducer(url1, dest);
- log.info("--- Starting producer --- " + url1);
-
- MessageConsumer consumer = new MessageConsumer(url1, dest);
- log.info("--- Starting Consumer --- " + url1);
-
- producer.produce(2);
-
- consumer.receive(2);
-
- producer.close();
- consumer.close();
- return null;
- }
- }));
- }
-
- for (Future<Void> result : results) {
- try {
- result.get();
- } catch (Exception e) {
- log.error("exception in getting future result ", e);
- fail(String.format("replication test failed with %s exception", e.getMessage()));
- }
- }
-
- Thread.sleep(1000L);
- // Make sure that the internal replicators map contains remote cluster info
- ConcurrentOpenHashMap<String, PulsarClient> replicationClients1 = ns1.getReplicationClients();
- ConcurrentOpenHashMap<String, PulsarClient> replicationClients2 = ns2.getReplicationClients();
- ConcurrentOpenHashMap<String, PulsarClient> replicationClients3 = ns3.getReplicationClients();
-
- Assert.assertNotNull(replicationClients1.get("r2"));
- Assert.assertNotNull(replicationClients1.get("r3"));
- Assert.assertNotNull(replicationClients2.get("r1"));
- Assert.assertNotNull(replicationClients2.get("r3"));
- Assert.assertNotNull(replicationClients3.get("r1"));
- Assert.assertNotNull(replicationClients3.get("r2"));
-
- // Case 1: Update the global namespace replication configuration to only contains the local cluster itself
- admin1.namespaces().setNamespaceReplicationClusters("pulsar/global/ns", Sets.newHashSet("r1"));
-
- // Wait for config changes to be updated.
- Thread.sleep(1000L);
-
- // Make sure that the internal replicators map still contains remote cluster info
- Assert.assertNotNull(replicationClients1.get("r2"));
- Assert.assertNotNull(replicationClients1.get("r3"));
- Assert.assertNotNull(replicationClients2.get("r1"));
- Assert.assertNotNull(replicationClients2.get("r3"));
- Assert.assertNotNull(replicationClients3.get("r1"));
- Assert.assertNotNull(replicationClients3.get("r2"));
-
- // Case 2: Update the configuration back
- admin1.namespaces().setNamespaceReplicationClusters("pulsar/global/ns", Sets.newHashSet("r1", "r2", "r3"));
-
- // Wait for config changes to be updated.
- Thread.sleep(1000L);
-
- // Make sure that the internal replicators map still contains remote cluster info
- Assert.assertNotNull(replicationClients1.get("r2"));
- Assert.assertNotNull(replicationClients1.get("r3"));
- Assert.assertNotNull(replicationClients2.get("r1"));
- Assert.assertNotNull(replicationClients2.get("r3"));
- Assert.assertNotNull(replicationClients3.get("r1"));
- Assert.assertNotNull(replicationClients3.get("r2"));
-
- // Case 3: TODO: Once automatic cleanup is implemented, add tests case to verify auto removal of clusters
- }
-
- @SuppressWarnings("unchecked")
- @Test(timeOut = 30000)
- public void testConcurrentReplicator() throws Exception {
-
- log.info("--- Starting V1_ReplicatorTest::testConcurrentReplicator ---");
-
- final String namespace = "pulsar/global/concurrent";
- admin1.namespaces().createNamespace(namespace);
- admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));
- final TopicName topicName = TopicName.get(String.format("persistent://" + namespace + "/topic-%d", 0));
- PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS)
- .build();
- Producer<byte[]> producer = client1.newProducer().topic(topicName.toString()).create();
- producer.close();
-
- PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getOrCreateTopic(topicName.toString()).get();
-
- PulsarClientImpl pulsarClient = spy((PulsarClientImpl) pulsar1.getBrokerService().getReplicationClient("r3"));
- final Method startRepl = PersistentTopic.class.getDeclaredMethod("startReplicator", String.class);
- startRepl.setAccessible(true);
-
- Field replClientField = BrokerService.class.getDeclaredField("replicationClients");
- replClientField.setAccessible(true);
- ConcurrentOpenHashMap<String, PulsarClient> replicationClients = (ConcurrentOpenHashMap<String, PulsarClient>) replClientField
- .get(pulsar1.getBrokerService());
- replicationClients.put("r3", pulsarClient);
-
- admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2", "r3"));
- ExecutorService executor = Executors.newFixedThreadPool(5);
- for (int i = 0; i < 5; i++) {
- executor.submit(() -> {
- try {
- startRepl.invoke(topic, "r3");
- } catch (Exception e) {
- fail("setting replicator failed", e);
- }
- });
- }
- Thread.sleep(3000);
-
- Mockito.verify(pulsarClient, Mockito.times(1)).createProducerAsync(Mockito.any(ProducerConfigurationData.class),
- Mockito.any(Schema.class), eq(null));
-
- client1.shutdown();
- }
-
- @Test(enabled = false, timeOut = 30000)
- public void testConfigChangeNegativeCases() throws Exception {
- log.info("--- Starting V1_ReplicatorTest::testConfigChangeNegativeCases ---");
- // Negative test cases for global namespace config change. Verify that the namespace config change can not be
- // updated when the namespace is being unloaded.
- // Set up field access to internal namespace state in NamespaceService
- Field ownershipCacheField = NamespaceService.class.getDeclaredField("ownershipCache");
- ownershipCacheField.setAccessible(true);
- OwnershipCache ownerCache = (OwnershipCache) ownershipCacheField.get(pulsar1.getNamespaceService());
- Assert.assertNotNull(pulsar1, "pulsar1 is null");
- Assert.assertNotNull(pulsar1.getNamespaceService(), "pulsar1.getNamespaceService() is null");
- NamespaceBundle globalNsBundle = pulsar1.getNamespaceService().getNamespaceBundleFactory()
- .getFullBundle(NamespaceName.get("pulsar/global/ns"));
- ownerCache.tryAcquiringOwnership(globalNsBundle);
- Assert.assertNotNull(ownerCache.getOwnedBundle(globalNsBundle),
- "pulsar1.getNamespaceService().getOwnedServiceUnit(NamespaceName.get(\"pulsar/global/ns\")) is null");
- Field stateField = OwnedBundle.class.getDeclaredField("isActive");
- stateField.setAccessible(true);
- // set the namespace to be disabled
- ownerCache.disableOwnership(globalNsBundle);
-
- // Make sure the namespace config update failed
- try {
- admin1.namespaces().setNamespaceReplicationClusters("pulsar/global/ns", Sets.newHashSet("r1"));
- fail("Should have raised exception");
- } catch (PreconditionFailedException pfe) {
- // OK
- }
-
- // restore the namespace state
- ownerCache.removeOwnership(globalNsBundle).get();
- ownerCache.tryAcquiringOwnership(globalNsBundle);
- }
-
- @Test(enabled = true, timeOut = 30000)
- public void testReplication() throws Exception {
-
- log.info("--- Starting V1_ReplicatorTest::testReplication ---");
-
- // This test is to verify that the config change on global namespace is successfully applied in broker during
- // runtime.
- // Run a set of producer tasks to create the topics
- SortedSet<String> testDests = new TreeSet<String>();
- List<Future<Void>> results = Lists.newArrayList();
- for (int i = 0; i < 3; i++) {
- final TopicName dest = TopicName.get(String.format("persistent://pulsar/global/ns/repltopic-%d", i));
- testDests.add(dest.toString());
-
- results.add(executor.submit(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
-
- MessageProducer producer1 = new MessageProducer(url1, dest);
- log.info("--- Starting producer --- " + url1);
-
- MessageProducer producer2 = new MessageProducer(url2, dest);
- log.info("--- Starting producer --- " + url2);
-
- MessageProducer producer3 = new MessageProducer(url3, dest);
- log.info("--- Starting producer --- " + url3);
-
- MessageConsumer consumer1 = new MessageConsumer(url1, dest);
- log.info("--- Starting Consumer --- " + url1);
-
- MessageConsumer consumer2 = new MessageConsumer(url2, dest);
- log.info("--- Starting Consumer --- " + url2);
-
- MessageConsumer consumer3 = new MessageConsumer(url3, dest);
- log.info("--- Starting Consumer --- " + url3);
-
- // Produce from cluster1 and consume from the rest
- producer1.produce(2);
-
- consumer1.receive(2);
-
- consumer2.receive(2);
-
- consumer3.receive(2);
-
- // Produce from cluster2 and consume from the rest
- producer2.produce(2);
-
- consumer1.receive(2);
-
- consumer2.receive(2);
-
- consumer3.receive(2);
-
- // Produce from cluster3 and consume from the rest
- producer3.produce(2);
-
- consumer1.receive(2);
-
- consumer2.receive(2);
-
- consumer3.receive(2);
-
- // Produce from cluster1&2 and consume from cluster3
- producer1.produce(1);
- producer2.produce(1);
-
- consumer1.receive(1);
-
- consumer2.receive(1);
-
- consumer3.receive(1);
-
- consumer1.receive(1);
-
- consumer2.receive(1);
-
- consumer3.receive(1);
-
- producer1.close();
- producer2.close();
- producer3.close();
- consumer1.close();
- consumer2.close();
- consumer3.close();
- return null;
- }
- }));
- }
-
- for (Future<Void> result : results) {
- try {
- result.get();
- } catch (Exception e) {
- log.error("exception in getting future result ", e);
- fail(String.format("replication test failed with %s exception", e.getMessage()));
- }
- }
- }
-
- @Test(enabled = false, timeOut = 30000)
- public void testReplicationOverrides() throws Exception {
-
- log.info("--- Starting V1_ReplicatorTest::testReplicationOverrides ---");
-
- // This test is to verify that the config change on global namespace is successfully applied in broker during
- // runtime.
- // Run a set of producer tasks to create the topics
- SortedSet<String> testDests = new TreeSet<String>();
- List<Future<Void>> results = Lists.newArrayList();
- for (int i = 0; i < 10; i++) {
- final TopicName dest = TopicName.get(String.format("persistent://pulsar/global/ns/repltopic-%d", i));
- testDests.add(dest.toString());
-
- results.add(executor.submit(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
-
- MessageProducer producer1 = new MessageProducer(url1, dest);
- log.info("--- Starting producer --- " + url1);
-
- MessageProducer producer2 = new MessageProducer(url2, dest);
- log.info("--- Starting producer --- " + url2);
-
- MessageProducer producer3 = new MessageProducer(url3, dest);
- log.info("--- Starting producer --- " + url3);
-
- MessageConsumer consumer1 = new MessageConsumer(url1, dest);
- log.info("--- Starting Consumer --- " + url1);
-
- MessageConsumer consumer2 = new MessageConsumer(url2, dest);
- log.info("--- Starting Consumer --- " + url2);
-
- MessageConsumer consumer3 = new MessageConsumer(url3, dest);
- log.info("--- Starting Consumer --- " + url3);
-
- // Produce a message that isn't replicated
- producer1.produce(1, producer1.newMessage().disableReplication());
-
- // Produce a message not replicated to r2
- producer1.produce(1,
- producer1.newMessage().replicationClusters(Lists.newArrayList("r1", "r3")));
-
- // Produce a default replicated message
- producer1.produce(1);
-
- consumer1.receive(3);
- consumer2.receive(1);
- if (!consumer2.drained()) {
- throw new Exception("consumer2 - unexpected message in queue");
- }
- consumer3.receive(2);
- if (!consumer3.drained()) {
- throw new Exception("consumer3 - unexpected message in queue");
- }
-
- producer1.close();
- producer2.close();
- producer3.close();
- consumer1.close();
- consumer2.close();
- consumer3.close();
- return null;
- }
- }));
- }
-
- for (Future<Void> result : results) {
- try {
- result.get();
- } catch (Exception e) {
- log.error("exception in getting future result ", e);
- fail(String.format("replication test failed with %s exception", e.getMessage()));
- }
- }
- }
-
- @Test(enabled = true, timeOut = 30000)
- public void testFailures() throws Exception {
-
- log.info("--- Starting V1_ReplicatorTest::testFailures ---");
-
- try {
- // 1. Create a consumer using the reserved consumer id prefix "pulsar.repl."
-
- final TopicName dest = TopicName.get(String.format("persistent://pulsar/global/ns/res-cons-id"));
-
- // Create another consumer using replication prefix as sub id
- MessageConsumer consumer = new MessageConsumer(url2, dest, "pulsar.repl.");
- consumer.close();
-
- } catch (Exception e) {
- // SUCCESS
- }
-
- }
-
- @Test(timeOut = 30000)
- public void testReplicatePeekAndSkip() throws Exception {
-
- SortedSet<String> testDests = new TreeSet<String>();
-
- final TopicName dest = TopicName.get("persistent://pulsar/global/ns/peekAndSeekTopic");
- testDests.add(dest.toString());
-
- MessageProducer producer1 = new MessageProducer(url1, dest);
- MessageConsumer consumer1 = new MessageConsumer(url3, dest);
-
- // Produce from cluster1 and consume from the rest
- producer1.produce(2);
- producer1.close();
- PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(dest.toString()).get();
- PersistentReplicator replicator = (PersistentReplicator) topic.getReplicators()
- .get(topic.getReplicators().keys().get(0));
- replicator.skipMessages(2);
- CompletableFuture<Entry> result = replicator.peekNthMessage(1);
- Entry entry = result.get(50, TimeUnit.MILLISECONDS);
- assertNull(entry);
- consumer1.close();
- }
-
- @Test(timeOut = 30000)
- public void testReplicatorClearBacklog() throws Exception {
-
- // This test is to verify that reset cursor fails on global topic
- SortedSet<String> testDests = new TreeSet<String>();
-
- final TopicName dest = TopicName.get("persistent://pulsar/global/ns/clearBacklogTopic");
- testDests.add(dest.toString());
-
- MessageProducer producer1 = new MessageProducer(url1, dest);
- MessageConsumer consumer1 = new MessageConsumer(url3, dest);
-
- // Produce from cluster1 and consume from the rest
- producer1.produce(2);
- producer1.close();
- PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(dest.toString()).get();
- PersistentReplicator replicator = (PersistentReplicator) spy(
- topic.getReplicators().get(topic.getReplicators().keys().get(0)));
- replicator.readEntriesFailed(new ManagedLedgerException.InvalidCursorPositionException("failed"), null);
- replicator.clearBacklog().get();
- Thread.sleep(100);
- replicator.updateRates(); // for code-coverage
- replicator.expireMessages(1); // for code-coverage
- ReplicatorStats status = replicator.getStats();
- assertTrue(status.replicationBacklog == 0);
- consumer1.close();
- }
-
- @SuppressWarnings("deprecation")
- @Test(enabled = true, timeOut = 30000)
- public void testResetCursorNotFail() throws Exception {
-
- log.info("--- Starting V1_ReplicatorTest::testResetCursorNotFail ---");
-
- // This test is to verify that reset cursor fails on global topic
- SortedSet<String> testDests = new TreeSet<String>();
- List<Future<Void>> results = Lists.newArrayList();
- for (int i = 0; i < 1; i++) {
- final TopicName dest = TopicName.get(String.format("persistent://pulsar/global/ns/resetrepltopic-%d", i));
- testDests.add(dest.toString());
-
- results.add(executor.submit(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
-
- MessageProducer producer1 = new MessageProducer(url1, dest);
- log.info("--- Starting producer --- " + url1);
-
- MessageConsumer consumer1 = new MessageConsumer(url1, dest);
- log.info("--- Starting Consumer --- " + url1);
-
- // Produce from cluster1 and consume from the rest
- producer1.produce(2);
-
- consumer1.receive(2);
-
- producer1.close();
- consumer1.close();
- return null;
- }
- }));
- }
-
- for (Future<Void> result : results) {
- try {
- result.get();
- } catch (Exception e) {
- log.error("exception in getting future result ", e);
- fail(String.format("replication test failed with %s exception", e.getMessage()));
- }
- }
- admin1.persistentTopics().resetCursor(testDests.first(), "sub-id", System.currentTimeMillis());
- }
-
- @Test(enabled = true, timeOut = 30000)
- public void testReplicationForBatchMessages() throws Exception {
-
- log.info("--- Starting V1_ReplicatorTest::testReplicationForBatchMessages ---");
-
- // Run a set of producer tasks to create the topics
- SortedSet<String> testDests = new TreeSet<String>();
- List<Future<Void>> results = Lists.newArrayList();
- for (int i = 0; i < 3; i++) {
- final TopicName dest = TopicName.get(String.format("persistent://pulsar/global/ns/repltopicbatch-%d", i));
- testDests.add(dest.toString());
-
- results.add(executor.submit(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
-
- MessageProducer producer1 = new MessageProducer(url1, dest, true);
- log.info("--- Starting producer --- " + url1);
-
- MessageProducer producer2 = new MessageProducer(url2, dest, true);
- log.info("--- Starting producer --- " + url2);
-
- MessageProducer producer3 = new MessageProducer(url3, dest, true);
- log.info("--- Starting producer --- " + url3);
-
- MessageConsumer consumer1 = new MessageConsumer(url1, dest);
- log.info("--- Starting Consumer --- " + url1);
-
- MessageConsumer consumer2 = new MessageConsumer(url2, dest);
- log.info("--- Starting Consumer --- " + url2);
-
- MessageConsumer consumer3 = new MessageConsumer(url3, dest);
- log.info("--- Starting Consumer --- " + url3);
-
- // Produce from cluster1 and consume from the rest
- producer1.produceBatch(10);
-
- consumer1.receive(10);
-
- consumer2.receive(10);
-
- consumer3.receive(10);
-
- // Produce from cluster2 and consume from the rest
- producer2.produceBatch(10);
-
- consumer1.receive(10);
-
- consumer2.receive(10);
-
- consumer3.receive(10);
-
- producer1.close();
- producer2.close();
- producer3.close();
- consumer1.close();
- consumer2.close();
- consumer3.close();
- return null;
- }
- }));
- }
-
- for (Future<Void> result : results) {
- try {
- result.get(5, TimeUnit.SECONDS);
- } catch (Exception e) {
- log.error("exception in getting future result ", e);
- fail(String.format("replication test failed with %s exception", e.getMessage()));
- }
- }
- }
-
- /**
- * It verifies that: if it fails while removing replicator-cluster-cursor: it should not restart the replicator and
- * it should have cleaned up from the list
- *
- * @throws Exception
- */
- @Test(timeOut = 30000)
- public void testDeleteReplicatorFailure() throws Exception {
- log.info("--- Starting V1_ReplicatorTest::testDeleteReplicatorFailure ---");
- final String topicName = "persistent://pulsar/global/ns/repltopicbatch";
- final TopicName dest = TopicName.get(topicName);
- MessageProducer producer1 = new MessageProducer(url1, dest);
- PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(topicName).get();
- final String replicatorClusterName = topic.getReplicators().keys().get(0);
- ManagedLedgerImpl ledger = (ManagedLedgerImpl) topic.getManagedLedger();
- CountDownLatch latch = new CountDownLatch(1);
- // delete cursor already : so next time if topic.removeReplicator will get exception but then it should
- // remove-replicator from the list even with failure
- ledger.asyncDeleteCursor("pulsar.repl." + replicatorClusterName, new DeleteCursorCallback() {
- @Override
- public void deleteCursorComplete(Object ctx) {
- latch.countDown();
- }
-
- @Override
- public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
- latch.countDown();
- }
- }, null);
- latch.await();
-
- Method removeReplicator = PersistentTopic.class.getDeclaredMethod("removeReplicator", String.class);
- removeReplicator.setAccessible(true);
- // invoke removeReplicator : it fails as cursor is not present: but still it should remove the replicator from
- // list without restarting it
- @SuppressWarnings("unchecked")
- CompletableFuture<Void> result = (CompletableFuture<Void>) removeReplicator.invoke(topic,
- replicatorClusterName);
- result.thenApply((v) -> {
- assertNull(topic.getPersistentReplicator(replicatorClusterName));
- return null;
- });
-
- producer1.close();
- }
-
- @SuppressWarnings("unchecked")
- @Test(priority = 5, timeOut = 30000)
- public void testReplicatorProducerClosing() throws Exception {
- log.info("--- Starting V1_ReplicatorTest::testDeleteReplicatorFailure ---");
- final String topicName = "persistent://pulsar/global/ns/repltopicbatch";
- final TopicName dest = TopicName.get(topicName);
- MessageProducer producer1 = new MessageProducer(url1, dest);
- PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(topicName).get();
- final String replicatorClusterName = topic.getReplicators().keys().get(0);
- Replicator replicator = topic.getPersistentReplicator(replicatorClusterName);
- pulsar2.close();
- pulsar3.close();
- replicator.disconnect(false);
- Thread.sleep(100);
- Field field = AbstractReplicator.class.getDeclaredField("producer");
- field.setAccessible(true);
- ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) field.get(replicator);
- assertNull(producer);
- producer1.close();
- }
-
- /**
- * Issue #199
- *
- * It verifies that: if the remote cluster reaches backlog quota limit, replicator temporarily stops and once the
- * backlog drains it should resume replication.
- *
- * @throws Exception
- */
-
- @Test(timeOut = 60000, enabled = true, priority = -1)
- public void testResumptionAfterBacklogRelaxed() throws Exception {
- List<RetentionPolicy> policies = Lists.newArrayList();
- policies.add(RetentionPolicy.producer_exception);
- policies.add(RetentionPolicy.producer_request_hold);
-
- for (RetentionPolicy policy : policies) {
- // Use 1Mb quota by default
- admin1.namespaces().setBacklogQuota("pulsar/global/ns1", new BacklogQuota(1 * 1024 * 1024, policy));
- Thread.sleep(200);
-
- TopicName dest = TopicName
- .get(String.format("persistent://pulsar/global/ns1/%s-%d", policy, System.currentTimeMillis()));
-
- // Producer on r1
- MessageProducer producer1 = new MessageProducer(url1, dest);
-
- // Consumer on r2
- MessageConsumer consumer2 = new MessageConsumer(url2, dest);
-
- // Replicator for r1 -> r2
- PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(dest.toString()).get();
- Replicator replicator = topic.getPersistentReplicator("r2");
-
- // Produce 1 message in r1. This message will be replicated immediately into r2 and it will become part of
- // local backlog
- producer1.produce(1);
-
- Thread.sleep(500);
-
- // Restrict backlog quota limit to 1 byte to stop replication
- admin1.namespaces().setBacklogQuota("pulsar/global/ns1", new BacklogQuota(1, policy));
-
- Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000);
-
- assertEquals(replicator.getStats().replicationBacklog, 0);
-
- // Next message will not be replicated, because r2 has reached the quota
- producer1.produce(1);
-
- Thread.sleep(500);
-
- assertEquals(replicator.getStats().replicationBacklog, 1);
-
- // Consumer will now drain 1 message and the replication backlog will be cleared
- consumer2.receive(1);
-
- // Wait until the 2nd message got delivered to consumer
- consumer2.receive(1);
-
- int retry = 10;
- for (int i = 0; i < retry && replicator.getStats().replicationBacklog > 0; i++) {
- if (i != retry - 1) {
- Thread.sleep(100);
- }
- }
-
- assertEquals(replicator.getStats().replicationBacklog, 0);
-
- producer1.close();
- consumer2.close();
- }
- }
-
- /**
- * It verifies that PersistentReplicator considers CursorAlreadyClosedException as non-retriable-read exception and
- * it should closed the producer as cursor is already closed because replicator is already deleted.
- *
- * @throws Exception
- */
- @Test(timeOut = 15000)
- public void testCloseReplicatorStartProducer() throws Exception {
-
- TopicName dest = TopicName.get("persistent://pulsar/global/ns1/closeCursor");
- // Producer on r1
- MessageProducer producer1 = new MessageProducer(url1, dest);
- // Consumer on r1
- MessageConsumer consumer1 = new MessageConsumer(url1, dest);
- // Consumer on r2
- MessageConsumer consumer2 = new MessageConsumer(url2, dest);
-
- // Replicator for r1 -> r2
- PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(dest.toString()).get();
- PersistentReplicator replicator = (PersistentReplicator) topic.getPersistentReplicator("r2");
-
- // close the cursor
- Field cursorField = PersistentReplicator.class.getDeclaredField("cursor");
- cursorField.setAccessible(true);
- ManagedCursor cursor = (ManagedCursor) cursorField.get(replicator);
- cursor.close();
- // try to read entries
- CountDownLatch latch = new CountDownLatch(1);
- producer1.produce(10);
- cursor.asyncReadEntriesOrWait(10, new ReadEntriesCallback() {
- @Override
- public void readEntriesComplete(List<Entry> entries, Object ctx) {
- latch.countDown();
- fail("it should have been failed");
- }
-
- @Override
- public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
- latch.countDown();
- assertTrue(exception instanceof CursorAlreadyClosedException);
- }
- }, null);
-
- // replicator-readException: cursorAlreadyClosed
- replicator.readEntriesFailed(new CursorAlreadyClosedException("Cursor already closed exception"), null);
-
- // wait replicator producer to be closed
- Thread.sleep(1000);
-
- // Replicator producer must be closed
- Field producerField = AbstractReplicator.class.getDeclaredField("producer");
- producerField.setAccessible(true);
- @SuppressWarnings("unchecked")
- ProducerImpl<byte[]> replicatorProducer = (ProducerImpl<byte[]>) producerField.get(replicator);
- assertEquals(replicatorProducer, null);
-
- producer1.close();
- consumer1.close();
- consumer2.close();
- }
-
- @Test(timeOut = 30000)
- public void verifyChecksumAfterReplication() throws Exception {
- final String topicName = "persistent://pulsar/global/ns/checksumAfterReplication";
-
- PulsarClient c1 = PulsarClient.builder().serviceUrl(url1.toString()).build();
- Producer<byte[]> p1 = c1.newProducer().topic(topicName).create();
-
- PulsarClient c2 = PulsarClient.builder().serviceUrl(url2.toString()).build();
- RawReader reader2 = RawReader.create(c2, topicName, "sub").get();
-
- p1.send("Hello".getBytes());
-
- RawMessage msg = reader2.readNextAsync().get();
-
- ByteBuf b = msg.getHeadersAndPayload();
-
- assertTrue(Commands.hasChecksum(b));
- int parsedChecksum = Commands.readChecksum(b);
- int computedChecksum = Crc32cIntChecksum.computeChecksum(b);
-
- assertEquals(parsedChecksum, computedChecksum);
-
- p1.close();
- reader2.closeAsync().get();
- }
-
- /**
- * It verifies that broker should not start replicator for partitioned-topic (topic without -partition postfix)
- *
- * @param isPartitionedTopic
- * @throws Exception
- */
- @SuppressWarnings("deprecation")
- @Test(dataProvider = "partitionedTopic")
- public void testReplicatorOnPartitionedTopic(boolean isPartitionedTopic) throws Exception {
-
- log.info("--- Starting V1_ReplicatorTest::{} --- ", methodName);
-
- final String namespace = "pulsar/global/partitionedNs-" + isPartitionedTopic;
- final String persistentTopicName = "persistent://" + namespace + "/partTopic-" + isPartitionedTopic;
- final String nonPersistentTopicName = "non-persistent://" + namespace + "/partTopic-" + isPartitionedTopic;
- BrokerService brokerService = pulsar1.getBrokerService();
-
- admin1.namespaces().createNamespace(namespace);
- admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2", "r3"));
-
- if (isPartitionedTopic) {
- admin1.persistentTopics().createPartitionedTopic(persistentTopicName, 5);
- admin1.nonPersistentTopics().createPartitionedTopic(nonPersistentTopicName, 5);
- }
-
- // load namespace with dummy topic on ns
- PulsarClient client = PulsarClient.builder().serviceUrl(url1.toString()).build();
- client.newProducer().topic("persistent://" + namespace + "/dummyTopic").create();
-
- // persistent topic test
- try {
- brokerService.getOrCreateTopic(persistentTopicName).get();
- if (isPartitionedTopic) {
- fail("Topic creation fails with partitioned topic as replicator init fails");
- }
- } catch (Exception e) {
- if (!isPartitionedTopic) {
- fail("Topic creation should not fail without any partitioned topic");
- }
- assertTrue(e.getCause() instanceof NamingException);
- }
-
- // non-persistent topic test
- try {
- brokerService.getOrCreateTopic(nonPersistentTopicName).get();
- if (isPartitionedTopic) {
- fail("Topic creation fails with partitioned topic as replicator init fails");
- }
- } catch (Exception e) {
- if (!isPartitionedTopic) {
- fail("Topic creation should not fail without any partitioned topic");
- }
- assertTrue(e.getCause() instanceof NamingException);
- }
-
- }
-
- private static final Logger log = LoggerFactory.getLogger(V1_ReplicatorTest.class);
-
-}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/v1/V1_ReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/v1/V1_ReplicatorTestBase.java
deleted file mode 100644
index 4026632..0000000
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/v1/V1_ReplicatorTestBase.java
+++ /dev/null
@@ -1,377 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.service.v1;
-
-import static org.testng.Assert.assertEquals;
-
-import com.google.common.collect.Sets;
-
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.bookkeeper.test.PortManager;
-import org.apache.pulsar.broker.PulsarService;
-import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.broker.service.BrokerService;
-import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerBuilder;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.TypedMessageBuilder;
-import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.TenantInfo;
-import org.apache.pulsar.common.util.FutureUtil;
-import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
-import org.apache.pulsar.zookeeper.ZookeeperServerTest;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class V1_ReplicatorTestBase {
- URL url1;
- URL urlTls1;
- ServiceConfiguration config1 = new ServiceConfiguration();
- PulsarService pulsar1;
- BrokerService ns1;
-
- PulsarAdmin admin1;
- LocalBookkeeperEnsemble bkEnsemble1;
-
- URL url2;
- URL urlTls2;
- ServiceConfiguration config2 = new ServiceConfiguration();
- PulsarService pulsar2;
- BrokerService ns2;
- PulsarAdmin admin2;
- LocalBookkeeperEnsemble bkEnsemble2;
-
- URL url3;
- URL urlTls3;
- ServiceConfiguration config3 = new ServiceConfiguration();
- PulsarService pulsar3;
- BrokerService ns3;
- PulsarAdmin admin3;
- LocalBookkeeperEnsemble bkEnsemble3;
-
- ZookeeperServerTest globalZkS;
-
- ExecutorService executor = new ThreadPoolExecutor(5, 20, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
-
- static final int TIME_TO_CHECK_BACKLOG_QUOTA = 5;
-
- protected final static String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/certificate/server.crt";
- protected final static String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/certificate/server.key";
-
- // Default frequency
- public int getBrokerServicePurgeInactiveFrequency() {
- return 60;
- }
-
- public boolean isBrokerServicePurgeInactiveTopic() {
- return false;
- }
-
- void setup() throws Exception {
- log.info("--- Starting V1_ReplicatorTestBase::setup ---");
- int globalZKPort = PortManager.nextFreePort();
- globalZkS = new ZookeeperServerTest(globalZKPort);
- globalZkS.start();
-
- // Start region 1
- int zkPort1 = PortManager.nextFreePort();
- bkEnsemble1 = new LocalBookkeeperEnsemble(3, zkPort1, () -> PortManager.nextFreePort());
- bkEnsemble1.start();
-
- int webServicePort1 = PortManager.nextFreePort();
- int webServicePortTls1 = PortManager.nextFreePort();
-
- // NOTE: we have to instantiate a new copy of System.getProperties() to make sure pulsar1 and pulsar2 have
- // completely
- // independent config objects instead of referring to the same properties object
- config1.setClusterName("r1");
- config1.setAdvertisedAddress("localhost");
- config1.setWebServicePort(webServicePort1);
- config1.setWebServicePortTls(webServicePortTls1);
- config1.setZookeeperServers("127.0.0.1:" + zkPort1);
- config1.setConfigurationStoreServers("127.0.0.1:" + globalZKPort + "/foo");
- config1.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic());
- config1.setBrokerServicePurgeInactiveFrequencyInSeconds(
- inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
- config1.setBrokerServicePort(PortManager.nextFreePort());
- config1.setBrokerServicePortTls(PortManager.nextFreePort());
- config1.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
- config1.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
- config1.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
- config1.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
- config1.setDefaultNumberOfNamespaceBundles(1);
- pulsar1 = new PulsarService(config1);
- pulsar1.start();
- ns1 = pulsar1.getBrokerService();
-
- url1 = new URL("http://localhost:" + webServicePort1);
- urlTls1 = new URL("https://localhost:" + webServicePortTls1);
- admin1 = PulsarAdmin.builder().serviceHttpUrl(url1.toString()).build();
-
- // Start region 2
-
- // Start zk & bks
- int zkPort2 = PortManager.nextFreePort();
- bkEnsemble2 = new LocalBookkeeperEnsemble(3, zkPort2, () -> PortManager.nextFreePort());
- bkEnsemble2.start();
-
- int webServicePort2 = PortManager.nextFreePort();
- int webServicePortTls2 = PortManager.nextFreePort();
- config2.setClusterName("r2");
- config2.setAdvertisedAddress("localhost");
- config2.setWebServicePort(webServicePort2);
- config2.setWebServicePortTls(webServicePortTls2);
- config2.setZookeeperServers("127.0.0.1:" + zkPort2);
- config2.setConfigurationStoreServers("127.0.0.1:" + globalZKPort + "/foo");
- config2.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic());
- config2.setBrokerServicePurgeInactiveFrequencyInSeconds(
- inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
- config2.setBrokerServicePort(PortManager.nextFreePort());
- config2.setBrokerServicePortTls(PortManager.nextFreePort());
- config2.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
- config2.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
- config2.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
- config2.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
- config2.setDefaultNumberOfNamespaceBundles(1);
- pulsar2 = new PulsarService(config2);
- pulsar2.start();
- ns2 = pulsar2.getBrokerService();
-
- url2 = new URL("http://localhost:" + webServicePort2);
- urlTls2 = new URL("https://localhost:" + webServicePortTls2);
- admin2 = PulsarAdmin.builder().serviceHttpUrl(url2.toString()).build();
-
- // Start region 3
-
- // Start zk & bks
- int zkPort3 = PortManager.nextFreePort();
- bkEnsemble3 = new LocalBookkeeperEnsemble(3, zkPort3, () -> PortManager.nextFreePort());
- bkEnsemble3.start();
-
- int webServicePort3 = PortManager.nextFreePort();
- int webServicePortTls3 = PortManager.nextFreePort();
- config3.setClusterName("r3");
- config3.setAdvertisedAddress("localhost");
- config3.setWebServicePort(webServicePort3);
- config3.setWebServicePortTls(webServicePortTls3);
- config3.setZookeeperServers("127.0.0.1:" + zkPort3);
- config3.setConfigurationStoreServers("127.0.0.1:" + globalZKPort + "/foo");
- config3.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic());
- config3.setBrokerServicePurgeInactiveFrequencyInSeconds(
- inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
- config3.setBrokerServicePort(PortManager.nextFreePort());
- config3.setBrokerServicePortTls(PortManager.nextFreePort());
- config3.setTlsEnabled(true);
- config3.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
- config3.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
- config3.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
- config3.setDefaultNumberOfNamespaceBundles(1);
- pulsar3 = new PulsarService(config3);
- pulsar3.start();
- ns3 = pulsar3.getBrokerService();
-
- url3 = new URL("http://localhost:" + webServicePort3);
- urlTls3 = new URL("https://localhost:" + webServicePortTls3);
- admin3 = PulsarAdmin.builder().serviceHttpUrl(url3.toString()).build();
-
- // Provision the global namespace
- admin1.clusters().createCluster("r1", new ClusterData(url1.toString(), urlTls1.toString(),
- pulsar1.getBrokerServiceUrl(), pulsar1.getBrokerServiceUrlTls()));
- admin1.clusters().createCluster("r2", new ClusterData(url2.toString(), urlTls2.toString(),
- pulsar2.getBrokerServiceUrl(), pulsar2.getBrokerServiceUrlTls()));
- admin1.clusters().createCluster("r3", new ClusterData(url3.toString(), urlTls3.toString(),
- pulsar3.getBrokerServiceUrl(), pulsar3.getBrokerServiceUrlTls()));
-
- admin1.clusters().createCluster("global", new ClusterData("http://global:8080", "https://global:8443"));
- admin1.tenants().createTenant("pulsar",
- new TenantInfo(Sets.newHashSet("appid1", "appid2", "appid3"), Sets.newHashSet("r1", "r2", "r3")));
- admin1.namespaces().createNamespace("pulsar/global/ns");
- admin1.namespaces().setNamespaceReplicationClusters("pulsar/global/ns", Sets.newHashSet("r1", "r2", "r3"));
- admin1.namespaces().createNamespace("pulsar/global/ns1");
- admin1.namespaces().setNamespaceReplicationClusters("pulsar/global/ns1", Sets.newHashSet("r1", "r2"));
-
- assertEquals(admin2.clusters().getCluster("r1").getServiceUrl(), url1.toString());
- assertEquals(admin2.clusters().getCluster("r2").getServiceUrl(), url2.toString());
- assertEquals(admin2.clusters().getCluster("r3").getServiceUrl(), url3.toString());
- assertEquals(admin2.clusters().getCluster("r1").getBrokerServiceUrl(), pulsar1.getBrokerServiceUrl());
- assertEquals(admin2.clusters().getCluster("r2").getBrokerServiceUrl(), pulsar2.getBrokerServiceUrl());
- assertEquals(admin2.clusters().getCluster("r3").getBrokerServiceUrl(), pulsar3.getBrokerServiceUrl());
-
- Thread.sleep(100);
- log.info("--- V1_ReplicatorTestBase::setup completed ---");
-
- }
-
- private int inSec(int time, TimeUnit unit) {
- return (int) TimeUnit.SECONDS.convert(time, unit);
- }
-
- void shutdown() throws Exception {
- log.info("--- Shutting down ---");
- executor.shutdown();
-
- admin1.close();
- admin2.close();
- admin3.close();
-
- pulsar3.close();
- pulsar2.close();
- pulsar1.close();
-
- bkEnsemble1.stop();
- bkEnsemble2.stop();
- bkEnsemble3.stop();
- globalZkS.stop();
- }
-
- static class MessageProducer {
- URL url;
- String namespace;
- String topicName;
- PulsarClient client;
- Producer<byte[]> producer;
-
- MessageProducer(URL url, final TopicName dest) throws Exception {
- this.url = url;
- this.namespace = dest.getNamespace();
- this.topicName = dest.toString();
- client = PulsarClient.builder().serviceUrl(url.toString()).statsInterval(0, TimeUnit.SECONDS).build();
- producer = client.newProducer().topic(topicName).create();
-
- }
-
- MessageProducer(URL url, final TopicName dest, boolean batch) throws Exception {
- this.url = url;
- this.namespace = dest.getNamespace();
- this.topicName = dest.toString();
- client = PulsarClient.builder().serviceUrl(url.toString()).statsInterval(0, TimeUnit.SECONDS).build();
- ProducerBuilder<byte[]> producerBuilder = client.newProducer().topic(topicName);
- if (batch) {
- producerBuilder.enableBatching(true);
- producerBuilder.batchingMaxPublishDelay(1, TimeUnit.SECONDS);
- producerBuilder.batchingMaxMessages(5);
- }
- producer = producerBuilder.create();
-
- }
-
- void produceBatch(int messages) throws Exception {
- log.info("Start sending batch messages");
- List<CompletableFuture<MessageId>> futureList = new ArrayList<>();
-
- for (int i = 0; i < messages; i++) {
- futureList.add(producer.sendAsync(("test-" + i).getBytes()));
- log.info("queued message {}", ("test-" + i));
- }
- FutureUtil.waitForAll(futureList).get();
- }
-
- void produce(int messages) throws Exception {
-
- log.info("Start sending messages");
- for (int i = 0; i < messages; i++) {
- producer.send(("test-" + i).getBytes());
- log.info("Sent message {}", ("test-" + i));
- }
-
- }
-
- void produce(int messages, TypedMessageBuilder<byte[]> messageBuilder) throws Exception {
- log.info("Start sending messages");
- for (int i = 0; i < messages; i++) {
- final String m = new String("test-builder-" + i);
- messageBuilder.value(m.getBytes());
- messageBuilder.send();
- log.info("Sent message {}", m);
- }
- }
-
- TypedMessageBuilder<byte[]> newMessage() {
- return producer.newMessage();
- }
-
- void close() throws Exception {
- client.close();
- }
-
- }
-
- static class MessageConsumer {
- final URL url;
- final String namespace;
- final String topicName;
- final PulsarClient client;
- final Consumer<byte[]> consumer;
-
- MessageConsumer(URL url, final TopicName dest) throws Exception {
- this(url, dest, "sub-id");
- }
-
- MessageConsumer(URL url, final TopicName dest, String subId) throws Exception {
- this.url = url;
- this.namespace = dest.getNamespace();
- this.topicName = dest.toString();
-
- client = PulsarClient.builder().serviceUrl(url.toString()).statsInterval(0, TimeUnit.SECONDS).build();
-
- try {
- consumer = client.newConsumer().topic(topicName).subscriptionName(subId).subscribe();
- } catch (Exception e) {
- client.close();
- throw e;
- }
- }
-
- void receive(int messages) throws Exception {
- log.info("Start receiving messages");
- Message<byte[]> msg = null;
-
- for (int i = 0; i < messages; i++) {
- msg = consumer.receive();
- consumer.acknowledge(msg);
- String msgData = new String(msg.getData());
- assertEquals(msgData, "test-" + i);
- log.info("Received message {}", msgData);
- }
- }
-
- boolean drained() throws Exception {
- return consumer.receive(0, TimeUnit.MICROSECONDS) == null;
- }
-
- void close() throws Exception {
- client.close();
- }
- }
-
- private static final Logger log = LoggerFactory.getLogger(V1_ReplicatorTestBase.class);
-}