You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/08/08 08:59:26 UTC

[pulsar] branch master updated: [PIP-88] Replicate schemas across clusters (#11441)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a09bd68  [PIP-88] Replicate schemas across clusters (#11441)
a09bd68 is described below

commit a09bd68bfb1b6397cf2682d52835d902d0e34336
Author: lipenghui <pe...@apache.org>
AuthorDate: Sun Aug 8 16:58:45 2021 +0800

    [PIP-88] Replicate schemas across clusters (#11441)
    
    * [PIP-88] Replicate schemas across clusters
    
    Here is the proposal: https://github.com/apache/pulsar/wiki/PIP-88%3A-Replicate-schemas-across-multiple
    For the implementation, we just need to set the correct SchemaInfo for the replicated message and use
    the AUTO_PRODUCE_BYTES schema for the replicator's producer.
---
 .../pulsar/broker/service/AbstractReplicator.java  | 10 ++--
 .../nonpersistent/NonPersistentReplicator.java     |  3 +-
 .../service/nonpersistent/NonPersistentTopic.java  |  3 +-
 .../service/persistent/PersistentReplicator.java   | 23 ++++++++-
 .../broker/service/persistent/PersistentTopic.java |  2 +-
 .../pulsar/broker/service/PersistentTopicTest.java |  1 +
 .../pulsar/broker/service/ReplicatorTest.java      | 57 ++++++++++++++++++++++
 .../org/apache/pulsar/client/impl/MessageImpl.java | 25 +++++++++-
 .../apache/pulsar/client/impl/ProducerImpl.java    |  4 +-
 .../pulsar/client/impl/PulsarClientImpl.java       |  2 +-
 10 files changed, 118 insertions(+), 12 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
index 34d2b6e..b7fdb24 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
@@ -23,11 +23,13 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
 import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
@@ -41,6 +43,7 @@ public abstract class AbstractReplicator {
     protected final String topicName;
     protected final String localCluster;
     protected final String remoteCluster;
+    protected final PulsarClientImpl replicationClient;
     protected final PulsarClientImpl client;
 
     protected volatile ProducerImpl producer;
@@ -63,18 +66,19 @@ public abstract class AbstractReplicator {
     }
 
     public AbstractReplicator(String topicName, String replicatorPrefix, String localCluster, String remoteCluster,
-                              BrokerService brokerService) throws NamingException {
+                              BrokerService brokerService) throws NamingException, PulsarServerException {
         validatePartitionedTopic(topicName, brokerService);
         this.brokerService = brokerService;
         this.topicName = topicName;
         this.replicatorPrefix = replicatorPrefix;
         this.localCluster = localCluster.intern();
         this.remoteCluster = remoteCluster.intern();
-        this.client = (PulsarClientImpl) brokerService.getReplicationClient(remoteCluster);
+        this.replicationClient = (PulsarClientImpl) brokerService.getReplicationClient(remoteCluster);
+        this.client = (PulsarClientImpl) brokerService.pulsar().getClient();
         this.producer = null;
         this.producerQueueSize = brokerService.pulsar().getConfiguration().getReplicationProducerQueueSize();
 
-        this.producerBuilder = client.newProducer() //
+        this.producerBuilder = replicationClient.newProducer(Schema.AUTO_PRODUCE_BYTES()) //
                 .topic(topicName)
                 .messageRoutingMode(MessageRoutingMode.SinglePartition)
                 .enableBatching(false)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
index b003db8..dd57fd9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
@@ -25,6 +25,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.service.AbstractReplicator;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
@@ -48,7 +49,7 @@ public class NonPersistentReplicator extends AbstractReplicator implements Repli
     private final NonPersistentReplicatorStatsImpl stats = new NonPersistentReplicatorStatsImpl();
 
     public NonPersistentReplicator(NonPersistentTopic topic, String localCluster, String remoteCluster,
-            BrokerService brokerService) throws NamingException {
+            BrokerService brokerService) throws NamingException, PulsarServerException {
         super(topic.getName(), topic.getReplicatorPrefix(), localCluster, remoteCluster, brokerService);
 
         producerBuilder.blockIfQueueFull(false);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index e3255fe..a9c0f7f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -38,6 +38,7 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.atomic.LongAdder;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.service.AbstractTopic;
@@ -561,7 +562,7 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
         replicators.computeIfAbsent(remoteCluster, r -> {
             try {
                 return new NonPersistentReplicator(NonPersistentTopic.this, localCluster, remoteCluster, brokerService);
-            } catch (NamingException e) {
+            } catch (NamingException | PulsarServerException e) {
                 isReplicatorStarted.set(false);
                 log.error("[{}] Replicator startup failed due to partitioned-topic {}", topic, remoteCluster);
             }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index 0f3f18c..aa53e15 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
@@ -41,6 +42,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.CursorAlreadyClosedE
 import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.service.AbstractReplicator;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
@@ -56,6 +58,7 @@ import org.apache.pulsar.client.impl.SendCallback;
 import org.apache.pulsar.common.api.proto.MarkerType;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl;
+import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.stats.Rate;
 import org.apache.pulsar.common.util.Codec;
 import org.slf4j.Logger;
@@ -102,7 +105,7 @@ public class PersistentReplicator extends AbstractReplicator
     private final ReplicatorStatsImpl stats = new ReplicatorStatsImpl();
 
     public PersistentReplicator(PersistentTopic topic, ManagedCursor cursor, String localCluster, String remoteCluster,
-                                BrokerService brokerService) throws NamingException {
+                                BrokerService brokerService) throws NamingException, PulsarServerException {
         super(topic.getName(), topic.getReplicatorPrefix(), localCluster, remoteCluster, brokerService);
         this.topic = topic;
         this.cursor = cursor;
@@ -358,7 +361,15 @@ public class PersistentReplicator extends AbstractReplicator
 
                 headersAndPayload.retain();
 
-                producer.sendAsync(msg, ProducerSendCallback.create(this, entry, msg));
+                getSchemaInfo(msg).thenAccept(schemaInfo -> {
+                    msg.setSchemaInfoForReplicator(schemaInfo);
+                    producer.sendAsync(msg, ProducerSendCallback.create(this, entry, msg));
+                }).exceptionally(ex -> {
+                    log.error("[{}][{} -> {}] Failed to get schema from local cluster", topicName,
+                            localCluster, remoteCluster, ex);
+                    return null;
+                });
+
                 atLeastOneMessageSentForReplication = true;
             }
         } catch (Exception e) {
@@ -379,6 +390,14 @@ public class PersistentReplicator extends AbstractReplicator
         }
     }
 
+    private CompletableFuture<SchemaInfo> getSchemaInfo(MessageImpl msg) throws ExecutionException {
+        if (msg.getSchemaVersion() == null || msg.getSchemaVersion().length == 0) {
+            return CompletableFuture.completedFuture(null);
+        }
+        return client.getSchemaProviderLoadingCache().get(topicName)
+                .getSchemaByVersion(msg.getSchemaVersion());
+    }
+
     public void updateCursorState() {
         if (this.cursor != null) {
             if (producer != null && producer.isConnected()) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index db79988..c03457c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1431,7 +1431,7 @@ public class PersistentTopic extends AbstractTopic
             try {
                 return new PersistentReplicator(PersistentTopic.this, cursor, localCluster, remoteCluster,
                         brokerService);
-            } catch (NamingException e) {
+            } catch (NamingException | PulsarServerException e) {
                 isReplicatorStarted.set(false);
                 log.error("[{}] Replicator startup failed due to partitioned-topic {}", topic, remoteCluster);
             }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 9b74331..5c074fc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -181,6 +181,7 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
 
         mlFactoryMock = mock(ManagedLedgerFactory.class);
         doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory();
+        doReturn(mock(PulsarClientImpl.class)).when(pulsar).getClient();
 
         ZooKeeper mockZk = createMockZooKeeper();
         doReturn(mockZk).when(pulsar).getZkClient();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index 071160b0..527fd2c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -90,6 +90,7 @@ import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.apache.pulsar.schema.Schemas;
 import org.awaitility.Awaitility;
 import org.mockito.Mockito;
 import org.slf4j.Logger;
@@ -376,6 +377,62 @@ public class ReplicatorTest extends ReplicatorTestBase {
     }
 
     @Test
+    public void testReplicationWithSchema() throws Exception {
+        PulsarClient client1 = pulsar1.getClient();
+        PulsarClient client2 = pulsar2.getClient();
+        PulsarClient client3 = pulsar3.getClient();
+        final TopicName topic = TopicName
+                .get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/testReplicationWithSchema"));
+
+        final String subName = "my-sub";
+
+        @Cleanup
+        Producer<Schemas.PersonOne> producer1 = client1.newProducer(Schema.AVRO(Schemas.PersonOne.class))
+                .topic(topic.toString())
+                .create();
+        @Cleanup
+        Producer<Schemas.PersonOne> producer2 = client2.newProducer(Schema.AVRO(Schemas.PersonOne.class))
+                .topic(topic.toString())
+                .create();
+        @Cleanup
+        Producer<Schemas.PersonOne> producer3 = client3.newProducer(Schema.AVRO(Schemas.PersonOne.class))
+                .topic(topic.toString())
+                .create();
+
+        List<Producer<Schemas.PersonOne>> producers = Lists.newArrayList(producer1, producer2, producer3);
+
+        @Cleanup
+        Consumer<Schemas.PersonOne> consumer1 = client1.newConsumer(Schema.AVRO(Schemas.PersonOne.class))
+                .topic(topic.toString())
+                .subscriptionName(subName)
+                .subscribe();
+
+        @Cleanup
+        Consumer<Schemas.PersonOne> consumer2 = client2.newConsumer(Schema.AVRO(Schemas.PersonOne.class))
+                .topic(topic.toString())
+                .subscriptionName(subName)
+                .subscribe();
+
+        @Cleanup
+        Consumer<Schemas.PersonOne> consumer3 = client3.newConsumer(Schema.AVRO(Schemas.PersonOne.class))
+                .topic(topic.toString())
+                .subscriptionName(subName)
+                .subscribe();
+
+        for (int i = 0; i < 3; i++) {
+            producers.get(i).send(new Schemas.PersonOne(i));
+            Message<Schemas.PersonOne> msg1 = consumer1.receive();
+            Message<Schemas.PersonOne> msg2 = consumer2.receive();
+            Message<Schemas.PersonOne> msg3 = consumer3.receive();
+            assertTrue(msg1 != null && msg2 != null && msg3 != null);
+            assertTrue(msg1.getValue().equals(msg2.getValue()) && msg2.getValue().equals(msg3.getValue()));
+            consumer1.acknowledge(msg1);
+            consumer2.acknowledge(msg2);
+            consumer3.acknowledge(msg3);
+        }
+    }
+
+    @Test
     public void testReplicationOverrides() throws Exception {
         log.info("--- Starting ReplicatorTest::testReplicationOverrides ---");
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index 1af6915..0297b14 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -64,6 +64,7 @@ public class MessageImpl<T> implements Message<T> {
     private ByteBuf payload;
 
     private Schema<T> schema;
+    private SchemaInfo schemaInfoForReplicator;
     private SchemaState schemaState = SchemaState.None;
     private Optional<EncryptionContext> encryptionCtx = Optional.empty();
 
@@ -418,7 +419,10 @@ public class MessageImpl<T> implements Message<T> {
         }
     }
 
-    private SchemaInfo getSchemaInfo() {
+    public SchemaInfo getSchemaInfo() {
+        if (schema == null) {
+            return null;
+        }
         ensureSchemaIsLoaded();
         if (schema instanceof AutoConsumeSchema) {
             return ((AutoConsumeSchema) schema).getSchemaInfo(getSchemaVersion());
@@ -426,6 +430,18 @@ public class MessageImpl<T> implements Message<T> {
         return schema.getSchemaInfo();
     }
 
+    public void setSchemaInfoForReplicator(SchemaInfo schemaInfo) {
+        if (msgMetadata.hasReplicatedFrom()) {
+            this.schemaInfoForReplicator = schemaInfo;
+        } else {
+            throw new IllegalArgumentException("Only allowed to set schemaInfoForReplicator for a replicated message.");
+        }
+    }
+
+    public SchemaInfo getSchemaInfoForReplicator() {
+        return msgMetadata.hasReplicatedFrom() ? this.schemaInfoForReplicator : null;
+    }
+
     @Override
     public T getValue() {
         SchemaInfo schemaInfo = getSchemaInfo();
@@ -671,6 +687,10 @@ public class MessageImpl<T> implements Message<T> {
         return msgMetadata.getReplicateTosList();
     }
 
+    public boolean hasReplicateFrom() {
+        return msgMetadata.hasReplicatedFrom();
+    }
+
     void setMessageId(MessageIdImpl messageId) {
         this.messageId = messageId;
     }
@@ -690,6 +710,9 @@ public class MessageImpl<T> implements Message<T> {
     }
 
     SchemaState getSchemaState() {
+        if (getSchemaInfo() == null) {
+            return SchemaState.Ready;
+        }
         return schemaState;
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 7d4e8a7..9513caa 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -610,8 +610,8 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
         if (!changeToRegisteringSchemaState()) {
             return;
         }
-        SchemaInfo schemaInfo = Optional.ofNullable(msg.getSchemaInternal())
-                                        .map(Schema::getSchemaInfo)
+        SchemaInfo schemaInfo = msg.hasReplicateFrom() ? msg.getSchemaInfoForReplicator() : msg.getSchemaInfo();
+        schemaInfo = Optional.ofNullable(schemaInfo)
                                         .filter(si -> si.getType().getValue() > 0)
                                         .orElse(Schema.BYTES.getSchemaInfo());
         getOrCreateSchemaAsync(cnx, schemaInfo).handle((v, ex) -> {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 00c90fb..894f67d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -988,7 +988,7 @@ public class PulsarClientImpl implements PulsarClient {
         return new MultiVersionSchemaInfoProvider(TopicName.get(topicName), this);
     }
 
-    protected LoadingCache<String, SchemaInfoProvider> getSchemaProviderLoadingCache() {
+    public LoadingCache<String, SchemaInfoProvider> getSchemaProviderLoadingCache() {
         return schemaProviderLoadingCache;
     }