Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/07/26 19:45:47 UTC

[GitHub] [pulsar] eolivelli commented on a diff in pull request #16425: [PIP-136] Sync Pulsar metadata across multiple clouds

eolivelli commented on code in PR #16425:
URL: https://github.com/apache/pulsar/pull/16425#discussion_r930343921


##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataEvent.java:
##########
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata.api;
+
+import java.util.HashSet;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.ToString;
+import org.apache.pulsar.metadata.api.extended.CreateOption;
+
+/**
+ * Metadata event used by {@link MetadataEventSynchronizer}.
+ *
+ */
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+@ToString
+public class MetadataEvent {
+    private String path;

Review Comment:
   Should we add a 'version' field to better support future protocol enhancements?
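
   To make the suggestion concrete, here is a minimal sketch of a versioned event and a receiver-side check. The class names, the field name protocolVersion (chosen to stay distinct from the metadata node's expectedVersion passed in AbstractMetadataStore.delete()) and the skip-newer policy are all assumptions for illustration, not part of this PR.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not part of PR #16425: an event with an explicit
// protocol version, kept separate from the metadata node's expected version.
class VersionedMetadataEvent {
    // Assumed constant: bumped on every incompatible change to the event format.
    static final int CURRENT_PROTOCOL_VERSION = 1;

    private int protocolVersion;
    private String path;

    // No-arg constructor for serialization frameworks; defaults to the current version.
    VersionedMetadataEvent() {
        this.protocolVersion = CURRENT_PROTOCOL_VERSION;
    }

    VersionedMetadataEvent(int protocolVersion, String path) {
        this.protocolVersion = protocolVersion;
        this.path = path;
    }

    int getProtocolVersion() {
        return protocolVersion;
    }

    String getPath() {
        return path;
    }
}

// Hypothetical receiver-side check: apply events whose version this broker
// understands, and skip newer ones instead of misinterpreting them.
class VersionAwareApplier {
    private final int supportedVersion;
    private final List<String> applied = new ArrayList<>();
    private final List<String> skipped = new ArrayList<>();

    VersionAwareApplier(int supportedVersion) {
        this.supportedVersion = supportedVersion;
    }

    void onEvent(VersionedMetadataEvent event) {
        if (event.getProtocolVersion() > supportedVersion) {
            skipped.add(event.getPath());
        } else {
            applied.add(event.getPath());
        }
    }

    List<String> getApplied() {
        return applied;
    }

    List<String> getSkipped() {
        return skipped;
    }
}
```

   Skipping unknown versions is only one possible policy; because a skipped metadata change leaves the clusters disagreeing, an implementation might prefer to alert and pause consumption until the broker is upgraded.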



##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java:
##########
@@ -221,7 +221,7 @@ public long millis() {
         }
     }
 
-    @Test(timeOut = 100000)
+    @Test//(timeOut = 100000)

Review Comment:
   Please revert



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarMetadataEventSynchronizer.java:
##########
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.Backoff;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.metadata.api.MetadataEvent;
+import org.apache.pulsar.metadata.api.MetadataEventSynchronizer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PulsarMetadataEventSynchronizer implements MetadataEventSynchronizer {
+
+    private static final Logger log = LoggerFactory.getLogger(PulsarMetadataEventSynchronizer.class);
+    protected PulsarService pulsar;
+    protected BrokerService brokerService;
+    protected String topicName;
+    protected PulsarClientImpl client;
+    protected volatile Producer<MetadataEvent> producer;
+    protected volatile Consumer<MetadataEvent> consumer;
+    private final CopyOnWriteArrayList<Function<MetadataEvent, CompletableFuture<Void>>>
+    listeners = new CopyOnWriteArrayList<>();
+
+    private volatile boolean started = false;
+    public static final String SUBSCRIPTION_NAME = "metadata-syncer";
+    private static final int MAX_PRODUCER_PENDING_SIZE = 1000;
+    protected final Backoff backOff = new Backoff(100, TimeUnit.MILLISECONDS, 1, TimeUnit.MINUTES, 0,
+            TimeUnit.MILLISECONDS);
+
+    public PulsarMetadataEventSynchronizer(PulsarService pulsar, String topicName) throws PulsarServerException {
+        this.pulsar = pulsar;
+        this.brokerService = pulsar.getBrokerService();
+        this.topicName = topicName;
+        if (!StringUtils.isNotBlank(topicName)) {
+            log.info("Metadata synchronizer is disabled");
+            return;
+        }
+    }
+
+    public void start() throws PulsarServerException {
+        if (StringUtils.isBlank(topicName)) {
+            log.info("metadata topic doesn't exist.. skipping metadata synchronizer init..");
+            return;
+        }
+        this.client = (PulsarClientImpl) pulsar.getClient();
+        startProducer();
+        startConsumer();
+        log.info("Metadata event synchronizer started on topic {}", topicName);
+    }
+
+    @Override
+    public CompletableFuture<Void> notify(MetadataEvent event) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        publishAsync(event, future);
+        return future;
+    }
+
+    @Override
+    public void registerSyncListener(Function<MetadataEvent, CompletableFuture<Void>> listener) {
+        listeners.add(listener);
+    }
+
+    @Override
+    public String getClusterName() {
+        return pulsar.getConfig().getClusterName();
+    }
+
+    private void publishAsync(MetadataEvent event, CompletableFuture<Void> future) {
+        if (!started) {
+            log.info("Producer is not started on {}, failed to publish {}", topicName, event);
+            future.completeExceptionally(new IllegalStateException("producer is not started yet"));
+        }
+        producer.newMessage().value(event).sendAsync().thenAccept(__ -> {
+            log.info("successfully published metadata change event {}", event);
+            future.complete(null);
+        }).exceptionally(ex -> {
+            log.warn("failed to publish metadata update {}, will retry in {}", topicName, MESSAGE_RATE_BACKOFF_MS, ex);
+            pulsar.getBrokerService().executor().schedule(() -> publishAsync(event, future), MESSAGE_RATE_BACKOFF_MS,
+                    TimeUnit.MILLISECONDS);
+            return null;
+        });
+    }
+
+    private void startProducer() {
+        log.info("[{}] Starting producer", topicName);
+        client.newProducer(AvroSchema.of(MetadataEvent.class)).topic(topicName)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition).enableBatching(false).enableBatching(false)
+                .sendTimeout(0, TimeUnit.SECONDS) //
+                .maxPendingMessages(MAX_PRODUCER_PENDING_SIZE).createAsync().thenAccept(prod -> {
+                    producer = prod;
+                    started = true;
+                    log.info("producer is created successfully {}", topicName);
+                }).exceptionally(ex -> {
+                    long waitTimeMs = backOff.next();
+                    log.warn("[{}] Failed to create producer ({}), retrying in {} s", topicName, ex.getMessage(),
+                            waitTimeMs / 1000.0);
+                    // BackOff before retrying
+                    brokerService.executor().schedule(this::startProducer, waitTimeMs, TimeUnit.MILLISECONDS);
+                    return null;
+                });
+    }
+
+    private void startConsumer() {
+        if (consumer != null) {
+            return;
+        }
+        ConsumerBuilder<MetadataEvent> consumerBuilder = client.newConsumer(AvroSchema.of(MetadataEvent.class))

Review Comment:
   Nit: please use Schema.AVRO().
   
   Using Avro with a POJO that was not generated by the Avro plugin leads to poor performance, because serialization has to rely on reflection.
   
   We could generate the Avro POJO with the Maven plugin. I am 100% sure it is worth it (but it should not be done in this patch).
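
   The performance point can be illustrated without Avro or Pulsar. The sketch below (all class names are hypothetical) contrasts a reflection-based encoder, which discovers and reads a POJO's fields at runtime, with a generated-style encoder that reads each field directly, roughly like the get(int) switch in classes produced by the Avro Maven plugin. Both return the same values; the difference is the per-call lookup work. It does not benchmark anything, and a real reflect-based serializer would typically cache its field lookups, so the naive version here exaggerates the gap.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical POJO standing in for MetadataEvent (two fields only).
class EventPojo {
    String path = "/example/policies/ns";
    String sourceCluster = "us-west";
}

// Reflection-style encoding: fields are discovered and read at runtime,
// the kind of work a reflect-based serializer does for a plain POJO.
final class ReflectiveEncoder {
    static List<Object> encode(Object pojo) {
        Field[] fields = pojo.getClass().getDeclaredFields();
        // getDeclaredFields() has no guaranteed order, so sort by name.
        Arrays.sort(fields, Comparator.comparing(Field::getName));
        List<Object> out = new ArrayList<>();
        for (Field f : fields) {
            if (f.isSynthetic() || Modifier.isStatic(f.getModifiers())) {
                continue; // skip compiler-generated and static fields
            }
            try {
                f.setAccessible(true);
                out.add(f.get(pojo));
            } catch (IllegalAccessException e) {
                throw new IllegalStateException(e);
            }
        }
        return out;
    }
}

// Generated-style encoding: one direct accessor per field, fixed at
// compile time, so no runtime lookup is needed.
final class GeneratedStyleEncoder {
    static List<Object> encode(EventPojo e) {
        List<Object> out = new ArrayList<>();
        out.add(e.path);          // field "path"
        out.add(e.sourceCluster); // field "sourceCluster"
        return out;
    }
}
```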



##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java:
##########
@@ -227,17 +316,28 @@ public final CompletableFuture<Void> delete(String path, Optional<Long> expected
         if (!isValidPath(path)) {
             return FutureUtil.failedFuture(new MetadataStoreException.InvalidPathException(path));
         }
+        if (getMetadataEventSynchronizer().isPresent()) {
+            MetadataEvent event = new MetadataEvent(path, null, new HashSet<>(),
+                    expectedVersion.orElse(null), Instant.now().toEpochMilli(),
+                    getMetadataEventSynchronizer().get().getClusterName(), NotificationType.Deleted);
+            return getMetadataEventSynchronizer().get().notify(event)

Review Comment:
   So we could write to the topic and then fail the local write.
   
   I understand this is a trade-off.
   
   How can we recover?
   For instance, if I am deleting a topic and the local delete fails, the topic may still eventually be deleted on the remote cluster.
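
   The failure mode described here can be reproduced in a toy model. The names are hypothetical; the model assumes the remote cluster applies every published event, and the local failure is injected with a flag.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical two-cluster model of "publish the event, then write locally".
// Assumption: the remote cluster applies every event it receives.
class NotifyThenWriteModel {
    final Map<String, String> localStore = new HashMap<>();
    final Map<String, String> remoteStore = new HashMap<>();

    void createEverywhere(String path, String value) {
        localStore.put(path, value);
        remoteStore.put(path, value);
    }

    // Returns whether the delete succeeded from the caller's point of view.
    boolean delete(String path, boolean localWriteFails) {
        // Step 1: the event is published and later applied by the remote cluster.
        remoteStore.remove(path);
        // Step 2: the local write fails (e.g. a version mismatch), injected via a flag.
        if (localWriteFails) {
            return false;
        }
        localStore.remove(path);
        return true;
    }

    boolean diverged() {
        return !localStore.equals(remoteStore);
    }
}
```

   After a failed delete the caller sees an error, yet the remote store no longer has the path, which is the divergence the question is about.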
   
   I wonder if we should use the topic as a commit log: all the brokers would read from the topic and apply only the changes coming from the log.
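
   A minimal sketch of the commit-log idea, assuming the topic can be modeled as an in-memory append-only list and each broker's metadata store as a HashMap; CommitLog, LogEntry and LogReplica are hypothetical names. Writers only append, and every replica changes its store solely by replaying entries in log order, so any two replicas that have read the same prefix hold the same state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical log entry: a put when value != null, a delete when value == null.
class LogEntry {
    final String path;
    final String value;

    LogEntry(String path, String value) {
        this.path = path;
        this.value = value;
    }
}

// Stands in for the synchronization topic: an ordered, append-only log.
class CommitLog {
    private final List<LogEntry> entries = new ArrayList<>();

    synchronized void append(LogEntry entry) {
        entries.add(entry);
    }

    synchronized List<LogEntry> readFrom(int offset) {
        return new ArrayList<>(entries.subList(offset, entries.size()));
    }
}

// A broker's view of the metadata: it never writes the store directly,
// it only replays log entries in order from its last read position.
class LogReplica {
    private final Map<String, String> store = new HashMap<>();
    private int nextOffset = 0;

    void catchUp(CommitLog commitLog) {
        for (LogEntry e : commitLog.readFrom(nextOffset)) {
            if (e.value == null) {
                store.remove(e.path);
            } else {
                store.put(e.path, e.value);
            }
            nextOffset++;
        }
    }

    Map<String, String> snapshot() {
        return new HashMap<>(store);
    }
}
```

   In this model a failed write cannot leave one cluster with a change the other lacks, because nothing is applied outside the log. The trade-offs are that a broker sees its own write only after reading it back from the topic, and conditional operations such as the expectedVersion check in delete() would have to be evaluated deterministically when an entry is applied rather than when it is appended.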



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org