You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/08/08 08:59:26 UTC
[pulsar] branch master updated: [PIP-88] Replicate schemas across
clusters (#11441)
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a09bd68 [PIP-88] Replicate schemas across clusters (#11441)
a09bd68 is described below
commit a09bd68bfb1b6397cf2682d52835d902d0e34336
Author: lipenghui <pe...@apache.org>
AuthorDate: Sun Aug 8 16:58:45 2021 +0800
[PIP-88] Replicate schemas across clusters (#11441)
* [PIP-88] Replicate schemas across clusters
Here is the proposal: https://github.com/apache/pulsar/wiki/PIP-88%3A-Replicate-schemas-across-multiple
For the implementation, we just need to set the correct SchemaInfo for the replicated message and use
the AutoProduceBytes schema (Schema.AUTO_PRODUCE_BYTES()) for the producer of the replicator.
---
.../pulsar/broker/service/AbstractReplicator.java | 10 ++--
.../nonpersistent/NonPersistentReplicator.java | 3 +-
.../service/nonpersistent/NonPersistentTopic.java | 3 +-
.../service/persistent/PersistentReplicator.java | 23 ++++++++-
.../broker/service/persistent/PersistentTopic.java | 2 +-
.../pulsar/broker/service/PersistentTopicTest.java | 1 +
.../pulsar/broker/service/ReplicatorTest.java | 57 ++++++++++++++++++++++
.../org/apache/pulsar/client/impl/MessageImpl.java | 25 +++++++++-
.../apache/pulsar/client/impl/ProducerImpl.java | 4 +-
.../pulsar/client/impl/PulsarClientImpl.java | 2 +-
10 files changed, 118 insertions(+), 12 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
index 34d2b6e..b7fdb24 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
@@ -23,11 +23,13 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.bookkeeper.mledger.Position;
+import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
@@ -41,6 +43,7 @@ public abstract class AbstractReplicator {
protected final String topicName;
protected final String localCluster;
protected final String remoteCluster;
+ protected final PulsarClientImpl replicationClient;
protected final PulsarClientImpl client;
protected volatile ProducerImpl producer;
@@ -63,18 +66,19 @@ public abstract class AbstractReplicator {
}
public AbstractReplicator(String topicName, String replicatorPrefix, String localCluster, String remoteCluster,
- BrokerService brokerService) throws NamingException {
+ BrokerService brokerService) throws NamingException, PulsarServerException {
validatePartitionedTopic(topicName, brokerService);
this.brokerService = brokerService;
this.topicName = topicName;
this.replicatorPrefix = replicatorPrefix;
this.localCluster = localCluster.intern();
this.remoteCluster = remoteCluster.intern();
- this.client = (PulsarClientImpl) brokerService.getReplicationClient(remoteCluster);
+ this.replicationClient = (PulsarClientImpl) brokerService.getReplicationClient(remoteCluster);
+ this.client = (PulsarClientImpl) brokerService.pulsar().getClient();
this.producer = null;
this.producerQueueSize = brokerService.pulsar().getConfiguration().getReplicationProducerQueueSize();
- this.producerBuilder = client.newProducer() //
+ this.producerBuilder = replicationClient.newProducer(Schema.AUTO_PRODUCE_BYTES()) //
.topic(topicName)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.enableBatching(false)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
index b003db8..dd57fd9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
@@ -25,6 +25,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
+import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.service.AbstractReplicator;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
@@ -48,7 +49,7 @@ public class NonPersistentReplicator extends AbstractReplicator implements Repli
private final NonPersistentReplicatorStatsImpl stats = new NonPersistentReplicatorStatsImpl();
public NonPersistentReplicator(NonPersistentTopic topic, String localCluster, String remoteCluster,
- BrokerService brokerService) throws NamingException {
+ BrokerService brokerService) throws NamingException, PulsarServerException {
super(topic.getName(), topic.getReplicatorPrefix(), localCluster, remoteCluster, brokerService);
producerBuilder.blockIfQueueFull(false);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index e3255fe..a9c0f7f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -38,6 +38,7 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
+import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.AbstractTopic;
@@ -561,7 +562,7 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
replicators.computeIfAbsent(remoteCluster, r -> {
try {
return new NonPersistentReplicator(NonPersistentTopic.this, localCluster, remoteCluster, brokerService);
- } catch (NamingException e) {
+ } catch (NamingException | PulsarServerException e) {
isReplicatorStarted.set(false);
log.error("[{}] Replicator startup failed due to partitioned-topic {}", topic, remoteCluster);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index 0f3f18c..aa53e15 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
@@ -41,6 +42,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.CursorAlreadyClosedE
import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.service.AbstractReplicator;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
@@ -56,6 +58,7 @@ import org.apache.pulsar.client.impl.SendCallback;
import org.apache.pulsar.common.api.proto.MarkerType;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl;
+import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.stats.Rate;
import org.apache.pulsar.common.util.Codec;
import org.slf4j.Logger;
@@ -102,7 +105,7 @@ public class PersistentReplicator extends AbstractReplicator
private final ReplicatorStatsImpl stats = new ReplicatorStatsImpl();
public PersistentReplicator(PersistentTopic topic, ManagedCursor cursor, String localCluster, String remoteCluster,
- BrokerService brokerService) throws NamingException {
+ BrokerService brokerService) throws NamingException, PulsarServerException {
super(topic.getName(), topic.getReplicatorPrefix(), localCluster, remoteCluster, brokerService);
this.topic = topic;
this.cursor = cursor;
@@ -358,7 +361,15 @@ public class PersistentReplicator extends AbstractReplicator
headersAndPayload.retain();
- producer.sendAsync(msg, ProducerSendCallback.create(this, entry, msg));
+ getSchemaInfo(msg).thenAccept(schemaInfo -> {
+ msg.setSchemaInfoForReplicator(schemaInfo);
+ producer.sendAsync(msg, ProducerSendCallback.create(this, entry, msg));
+ }).exceptionally(ex -> {
+ log.error("[{}][{} -> {}] Failed to get schema from local cluster", topicName,
+ localCluster, remoteCluster, ex);
+ return null;
+ });
+
atLeastOneMessageSentForReplication = true;
}
} catch (Exception e) {
@@ -379,6 +390,14 @@ public class PersistentReplicator extends AbstractReplicator
}
}
+ private CompletableFuture<SchemaInfo> getSchemaInfo(MessageImpl msg) throws ExecutionException {
+ if (msg.getSchemaVersion() == null || msg.getSchemaVersion().length == 0) {
+ return CompletableFuture.completedFuture(null);
+ }
+ return client.getSchemaProviderLoadingCache().get(topicName)
+ .getSchemaByVersion(msg.getSchemaVersion());
+ }
+
public void updateCursorState() {
if (this.cursor != null) {
if (producer != null && producer.isConnected()) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index db79988..c03457c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1431,7 +1431,7 @@ public class PersistentTopic extends AbstractTopic
try {
return new PersistentReplicator(PersistentTopic.this, cursor, localCluster, remoteCluster,
brokerService);
- } catch (NamingException e) {
+ } catch (NamingException | PulsarServerException e) {
isReplicatorStarted.set(false);
log.error("[{}] Replicator startup failed due to partitioned-topic {}", topic, remoteCluster);
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 9b74331..5c074fc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -181,6 +181,7 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
mlFactoryMock = mock(ManagedLedgerFactory.class);
doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory();
+ doReturn(mock(PulsarClientImpl.class)).when(pulsar).getClient();
ZooKeeper mockZk = createMockZooKeeper();
doReturn(mockZk).when(pulsar).getZkClient();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index 071160b0..527fd2c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -90,6 +90,7 @@ import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.apache.pulsar.schema.Schemas;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.slf4j.Logger;
@@ -376,6 +377,62 @@ public class ReplicatorTest extends ReplicatorTestBase {
}
@Test
+ public void testReplicationWithSchema() throws Exception {
+ PulsarClient client1 = pulsar1.getClient();
+ PulsarClient client2 = pulsar2.getClient();
+ PulsarClient client3 = pulsar3.getClient();
+ final TopicName topic = TopicName
+ .get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/testReplicationWithSchema"));
+
+ final String subName = "my-sub";
+
+ @Cleanup
+ Producer<Schemas.PersonOne> producer1 = client1.newProducer(Schema.AVRO(Schemas.PersonOne.class))
+ .topic(topic.toString())
+ .create();
+ @Cleanup
+ Producer<Schemas.PersonOne> producer2 = client2.newProducer(Schema.AVRO(Schemas.PersonOne.class))
+ .topic(topic.toString())
+ .create();
+ @Cleanup
+ Producer<Schemas.PersonOne> producer3 = client3.newProducer(Schema.AVRO(Schemas.PersonOne.class))
+ .topic(topic.toString())
+ .create();
+
+ List<Producer<Schemas.PersonOne>> producers = Lists.newArrayList(producer1, producer2, producer3);
+
+ @Cleanup
+ Consumer<Schemas.PersonOne> consumer1 = client1.newConsumer(Schema.AVRO(Schemas.PersonOne.class))
+ .topic(topic.toString())
+ .subscriptionName(subName)
+ .subscribe();
+
+ @Cleanup
+ Consumer<Schemas.PersonOne> consumer2 = client2.newConsumer(Schema.AVRO(Schemas.PersonOne.class))
+ .topic(topic.toString())
+ .subscriptionName(subName)
+ .subscribe();
+
+ @Cleanup
+ Consumer<Schemas.PersonOne> consumer3 = client3.newConsumer(Schema.AVRO(Schemas.PersonOne.class))
+ .topic(topic.toString())
+ .subscriptionName(subName)
+ .subscribe();
+
+ for (int i = 0; i < 3; i++) {
+ producers.get(i).send(new Schemas.PersonOne(i));
+ Message<Schemas.PersonOne> msg1 = consumer1.receive();
+ Message<Schemas.PersonOne> msg2 = consumer2.receive();
+ Message<Schemas.PersonOne> msg3 = consumer3.receive();
+ assertTrue(msg1 != null && msg2 != null && msg3 != null);
+ assertTrue(msg1.getValue().equals(msg2.getValue()) && msg2.getValue().equals(msg3.getValue()));
+ consumer1.acknowledge(msg1);
+ consumer2.acknowledge(msg2);
+ consumer3.acknowledge(msg3);
+ }
+ }
+
+ @Test
public void testReplicationOverrides() throws Exception {
log.info("--- Starting ReplicatorTest::testReplicationOverrides ---");
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index 1af6915..0297b14 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -64,6 +64,7 @@ public class MessageImpl<T> implements Message<T> {
private ByteBuf payload;
private Schema<T> schema;
+ private SchemaInfo schemaInfoForReplicator;
private SchemaState schemaState = SchemaState.None;
private Optional<EncryptionContext> encryptionCtx = Optional.empty();
@@ -418,7 +419,10 @@ public class MessageImpl<T> implements Message<T> {
}
}
- private SchemaInfo getSchemaInfo() {
+ public SchemaInfo getSchemaInfo() {
+ if (schema == null) {
+ return null;
+ }
ensureSchemaIsLoaded();
if (schema instanceof AutoConsumeSchema) {
return ((AutoConsumeSchema) schema).getSchemaInfo(getSchemaVersion());
@@ -426,6 +430,18 @@ public class MessageImpl<T> implements Message<T> {
return schema.getSchemaInfo();
}
+ public void setSchemaInfoForReplicator(SchemaInfo schemaInfo) {
+ if (msgMetadata.hasReplicatedFrom()) {
+ this.schemaInfoForReplicator = schemaInfo;
+ } else {
+ throw new IllegalArgumentException("Only allowed to set schemaInfoForReplicator for a replicated message.");
+ }
+ }
+
+ public SchemaInfo getSchemaInfoForReplicator() {
+ return msgMetadata.hasReplicatedFrom() ? this.schemaInfoForReplicator : null;
+ }
+
@Override
public T getValue() {
SchemaInfo schemaInfo = getSchemaInfo();
@@ -671,6 +687,10 @@ public class MessageImpl<T> implements Message<T> {
return msgMetadata.getReplicateTosList();
}
+ public boolean hasReplicateFrom() {
+ return msgMetadata.hasReplicatedFrom();
+ }
+
void setMessageId(MessageIdImpl messageId) {
this.messageId = messageId;
}
@@ -690,6 +710,9 @@ public class MessageImpl<T> implements Message<T> {
}
SchemaState getSchemaState() {
+ if (getSchemaInfo() == null) {
+ return SchemaState.Ready;
+ }
return schemaState;
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 7d4e8a7..9513caa 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -610,8 +610,8 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
if (!changeToRegisteringSchemaState()) {
return;
}
- SchemaInfo schemaInfo = Optional.ofNullable(msg.getSchemaInternal())
- .map(Schema::getSchemaInfo)
+ SchemaInfo schemaInfo = msg.hasReplicateFrom() ? msg.getSchemaInfoForReplicator() : msg.getSchemaInfo();
+ schemaInfo = Optional.ofNullable(schemaInfo)
.filter(si -> si.getType().getValue() > 0)
.orElse(Schema.BYTES.getSchemaInfo());
getOrCreateSchemaAsync(cnx, schemaInfo).handle((v, ex) -> {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 00c90fb..894f67d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -988,7 +988,7 @@ public class PulsarClientImpl implements PulsarClient {
return new MultiVersionSchemaInfoProvider(TopicName.get(topicName), this);
}
- protected LoadingCache<String, SchemaInfoProvider> getSchemaProviderLoadingCache() {
+ public LoadingCache<String, SchemaInfoProvider> getSchemaProviderLoadingCache() {
return schemaProviderLoadingCache;
}