You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/09/19 13:45:19 UTC

[GitHub] [pulsar] codelipenghui commented on a diff in pull request #17371: [feat][broker]PIP-180 ShadowTopic - Part IV - Add Shadow Replicator

codelipenghui commented on code in PR #17371:
URL: https://github.com/apache/pulsar/pull/17371#discussion_r974210605


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -1578,6 +1628,86 @@ public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
         return future;
     }
 
+    CompletableFuture<Void> startShadowReplicator(String shadowTopic) {
+        log.info("[{}] Starting replicator to remote: {}", topic, shadowTopic);

Review Comment:
   ```suggestion
           log.info("[{}] Starting shadow topic replicator to remote: {}", topic, shadowTopic);
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -1578,6 +1628,86 @@ public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
         return future;
     }
 
+    CompletableFuture<Void> startShadowReplicator(String shadowTopic) {
+        log.info("[{}] Starting replicator to remote: {}", topic, shadowTopic);
+        final CompletableFuture<Void> future = new CompletableFuture<>();
+
+        String name = ShadowReplicator.getShadowReplicatorName(replicatorPrefix, shadowTopic);
+        ledger.asyncOpenCursor(name, new OpenCursorCallback() {
+            @Override
+            public void openCursorComplete(ManagedCursor cursor, Object ctx) {
+                addShadowReplicationCluster(shadowTopic, cursor).whenComplete((__, ex) -> {
+                    if (ex == null) {
+                        future.complete(null);
+                    } else {
+                        future.completeExceptionally(ex);
+                    }
+                });
+            }
+
+            @Override
+            public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
+                future.completeExceptionally(new PersistenceException(exception));
+            }
+
+        }, null);
+
+        return future;
+    }
+
+    protected CompletableFuture<Void> addShadowReplicationCluster(String shadowTopic, ManagedCursor cursor) {
+        String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
+        return AbstractReplicator.validatePartitionedTopicAsync(PersistentTopic.this.getName(), brokerService)
+                .thenCompose(__ -> brokerService.pulsar().getPulsarResources().getClusterResources()
+                        .getClusterAsync(localCluster)
+                        .thenApply(clusterData -> brokerService.getReplicationClient(localCluster, clusterData)))
+                .thenAccept(replicationClient -> {
+                    Replicator replicator = shadowReplicators.computeIfAbsent(shadowTopic, r -> {
+                        try {
+                            return new ShadowReplicator(shadowTopic, PersistentTopic.this, cursor, brokerService,
+                                    (PulsarClientImpl) replicationClient);
+                        } catch (PulsarServerException e) {
+                            log.error("[{}] ShadowReplicator startup failed {}", topic, shadowTopic, e);
+                        }
+                        return null;
+                    });
+
+                    // clean up replicator if startup is failed
+                    if (replicator == null) {
+                        shadowReplicators.removeNullValue(shadowTopic);
+                    }
+                });
+    }
+
+    CompletableFuture<Void> removeShadowReplicator(String shadowTopic) {
+        log.info("[{}] Removing replicator to {}", topic, shadowTopic);

Review Comment:
   ```suggestion
           log.info("[{}] Removing shadow topic replicator to {}", topic, shadowTopic);
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -1578,6 +1628,86 @@ public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
         return future;
     }
 
+    CompletableFuture<Void> startShadowReplicator(String shadowTopic) {
+        log.info("[{}] Starting replicator to remote: {}", topic, shadowTopic);
+        final CompletableFuture<Void> future = new CompletableFuture<>();
+
+        String name = ShadowReplicator.getShadowReplicatorName(replicatorPrefix, shadowTopic);
+        ledger.asyncOpenCursor(name, new OpenCursorCallback() {
+            @Override
+            public void openCursorComplete(ManagedCursor cursor, Object ctx) {
+                addShadowReplicationCluster(shadowTopic, cursor).whenComplete((__, ex) -> {
+                    if (ex == null) {
+                        future.complete(null);
+                    } else {
+                        future.completeExceptionally(ex);
+                    }
+                });
+            }
+
+            @Override
+            public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
+                future.completeExceptionally(new PersistenceException(exception));
+            }
+
+        }, null);
+
+        return future;
+    }
+
+    protected CompletableFuture<Void> addShadowReplicationCluster(String shadowTopic, ManagedCursor cursor) {
+        String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
+        return AbstractReplicator.validatePartitionedTopicAsync(PersistentTopic.this.getName(), brokerService)
+                .thenCompose(__ -> brokerService.pulsar().getPulsarResources().getClusterResources()
+                        .getClusterAsync(localCluster)
+                        .thenApply(clusterData -> brokerService.getReplicationClient(localCluster, clusterData)))
+                .thenAccept(replicationClient -> {
+                    Replicator replicator = shadowReplicators.computeIfAbsent(shadowTopic, r -> {
+                        try {
+                            return new ShadowReplicator(shadowTopic, PersistentTopic.this, cursor, brokerService,
+                                    (PulsarClientImpl) replicationClient);
+                        } catch (PulsarServerException e) {
+                            log.error("[{}] ShadowReplicator startup failed {}", topic, shadowTopic, e);
+                        }
+                        return null;
+                    });
+
+                    // clean up replicator if startup is failed
+                    if (replicator == null) {
+                        shadowReplicators.removeNullValue(shadowTopic);
+                    }
+                });
+    }
+
+    CompletableFuture<Void> removeShadowReplicator(String shadowTopic) {
+        log.info("[{}] Removing replicator to {}", topic, shadowTopic);
+        final CompletableFuture<Void> future = new CompletableFuture<>();
+        String name = ShadowReplicator.getShadowReplicatorName(replicatorPrefix, shadowTopic);
+        shadowReplicators.get(shadowTopic).disconnect().thenRun(() -> {
+
+            ledger.asyncDeleteCursor(name, new DeleteCursorCallback() {
+                @Override
+                public void deleteCursorComplete(Object ctx) {
+                    shadowReplicators.remove(shadowTopic);
+                    future.complete(null);
+                }
+
+                @Override
+                public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
+                    log.error("[{}] Failed to delete cursor {} {}", topic, name, exception.getMessage(), exception);
+                    future.completeExceptionally(new PersistenceException(exception));
+                }
+            }, null);
+
+        }).exceptionally(e -> {
+            log.error("[{}] Failed to close replication producer {} {}", topic, name, e.getMessage(), e);

Review Comment:
   ```suggestion
               log.error("[{}] Failed to close shadow topic replication producer {} {}", topic, name, e.getMessage(), e);
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -1578,6 +1628,86 @@ public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
         return future;
     }
 
+    CompletableFuture<Void> startShadowReplicator(String shadowTopic) {
+        log.info("[{}] Starting replicator to remote: {}", topic, shadowTopic);
+        final CompletableFuture<Void> future = new CompletableFuture<>();
+
+        String name = ShadowReplicator.getShadowReplicatorName(replicatorPrefix, shadowTopic);
+        ledger.asyncOpenCursor(name, new OpenCursorCallback() {
+            @Override
+            public void openCursorComplete(ManagedCursor cursor, Object ctx) {
+                addShadowReplicationCluster(shadowTopic, cursor).whenComplete((__, ex) -> {
+                    if (ex == null) {
+                        future.complete(null);
+                    } else {
+                        future.completeExceptionally(ex);

Review Comment:
   Do we need to add an error log here? I noticed that we don't have an error log here, we are not able to know the cause of each operation because we are using FutureUtil.waitForAll to handle all the operations.



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowReplicatorTest.java:
##########
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.broker.service.persistent;
+
+import com.google.common.collect.Sets;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.assertj.core.util.Lists;
+import org.awaitility.Awaitility;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class ShadowReplicatorTest extends BrokerTestBase {

Review Comment:
   It's better to have test group name



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -1578,6 +1628,86 @@ public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
         return future;
     }
 
+    CompletableFuture<Void> startShadowReplicator(String shadowTopic) {
+        log.info("[{}] Starting replicator to remote: {}", topic, shadowTopic);
+        final CompletableFuture<Void> future = new CompletableFuture<>();
+
+        String name = ShadowReplicator.getShadowReplicatorName(replicatorPrefix, shadowTopic);
+        ledger.asyncOpenCursor(name, new OpenCursorCallback() {
+            @Override
+            public void openCursorComplete(ManagedCursor cursor, Object ctx) {
+                addShadowReplicationCluster(shadowTopic, cursor).whenComplete((__, ex) -> {
+                    if (ex == null) {
+                        future.complete(null);
+                    } else {
+                        future.completeExceptionally(ex);
+                    }
+                });
+            }
+
+            @Override
+            public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
+                future.completeExceptionally(new PersistenceException(exception));

Review Comment:
   Same as the above comment



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -1578,6 +1628,86 @@ public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
         return future;
     }
 
+    CompletableFuture<Void> startShadowReplicator(String shadowTopic) {
+        log.info("[{}] Starting replicator to remote: {}", topic, shadowTopic);
+        final CompletableFuture<Void> future = new CompletableFuture<>();
+
+        String name = ShadowReplicator.getShadowReplicatorName(replicatorPrefix, shadowTopic);
+        ledger.asyncOpenCursor(name, new OpenCursorCallback() {
+            @Override
+            public void openCursorComplete(ManagedCursor cursor, Object ctx) {
+                addShadowReplicationCluster(shadowTopic, cursor).whenComplete((__, ex) -> {
+                    if (ex == null) {
+                        future.complete(null);
+                    } else {
+                        future.completeExceptionally(ex);
+                    }
+                });
+            }
+
+            @Override
+            public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
+                future.completeExceptionally(new PersistenceException(exception));
+            }
+
+        }, null);
+
+        return future;
+    }
+
+    protected CompletableFuture<Void> addShadowReplicationCluster(String shadowTopic, ManagedCursor cursor) {
+        String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
+        return AbstractReplicator.validatePartitionedTopicAsync(PersistentTopic.this.getName(), brokerService)
+                .thenCompose(__ -> brokerService.pulsar().getPulsarResources().getClusterResources()
+                        .getClusterAsync(localCluster)
+                        .thenApply(clusterData -> brokerService.getReplicationClient(localCluster, clusterData)))
+                .thenAccept(replicationClient -> {
+                    Replicator replicator = shadowReplicators.computeIfAbsent(shadowTopic, r -> {
+                        try {
+                            return new ShadowReplicator(shadowTopic, PersistentTopic.this, cursor, brokerService,
+                                    (PulsarClientImpl) replicationClient);
+                        } catch (PulsarServerException e) {
+                            log.error("[{}] ShadowReplicator startup failed {}", topic, shadowTopic, e);
+                        }
+                        return null;
+                    });
+
+                    // clean up replicator if startup is failed
+                    if (replicator == null) {
+                        shadowReplicators.removeNullValue(shadowTopic);
+                    }
+                });
+    }
+
+    CompletableFuture<Void> removeShadowReplicator(String shadowTopic) {
+        log.info("[{}] Removing replicator to {}", topic, shadowTopic);
+        final CompletableFuture<Void> future = new CompletableFuture<>();
+        String name = ShadowReplicator.getShadowReplicatorName(replicatorPrefix, shadowTopic);
+        shadowReplicators.get(shadowTopic).disconnect().thenRun(() -> {
+
+            ledger.asyncDeleteCursor(name, new DeleteCursorCallback() {
+                @Override
+                public void deleteCursorComplete(Object ctx) {
+                    shadowReplicators.remove(shadowTopic);
+                    future.complete(null);
+                }
+
+                @Override
+                public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
+                    log.error("[{}] Failed to delete cursor {} {}", topic, name, exception.getMessage(), exception);

Review Comment:
   ```suggestion
                       log.error("[{}] Failed to delete shadow topic replication cursor {} {}", topic, name, exception.getMessage(), exception);
   ```



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowReplicatorTest.java:
##########
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.broker.service.persistent;
+
+import com.google.common.collect.Sets;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.assertj.core.util.Lists;
+import org.awaitility.Awaitility;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class ShadowReplicatorTest extends BrokerTestBase {
+
+    @BeforeMethod(alwaysRun = true)
+    @Override
+    protected void setup() throws Exception {
+        super.baseSetup();
+        admin.tenants().createTenant("prop1",
+                new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet("test")));
+        admin.namespaces().createNamespace("prop1/ns-source");
+        admin.namespaces().createNamespace("prop1/ns-shadow");
+    }
+
+    @AfterMethod(alwaysRun = true)

Review Comment:
   ```suggestion
       @AfterClass(alwaysRun = true)
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java:
##########
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.broker.service.persistent;
+
+
+import io.netty.buffer.ByteBuf;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.util.Codec;
+
+/**
+ *  Replicate messages to shadow topic.
+ */
+@Slf4j
+public class ShadowReplicator extends PersistentReplicator {
+
+    public ShadowReplicator(String shadowTopic, PersistentTopic sourceTopic, ManagedCursor cursor,
+                            BrokerService brokerService, PulsarClientImpl replicationClient)
+            throws PulsarServerException {
+        super(brokerService.pulsar().getConfiguration().getClusterName(), sourceTopic, cursor,
+                brokerService.pulsar().getConfiguration().getClusterName(), shadowTopic, brokerService,
+                replicationClient);
+    }
+
+    /**
+     * @return Producer name format : replicatorPrefix-localTopic-->remoteTopic
+     */
+    @Override
+    protected String getProducerName() {
+        return replicatorPrefix + "-" + localTopicName + REPL_PRODUCER_NAME_DELIMITER + remoteTopicName;
+    }
+
+    @Override
+    protected void readEntries(Producer<byte[]> producer) {
+        // Shadow replicator should skip all backlog
+        this.cursor.asyncResetCursor(PositionImpl.LATEST, false, new AsyncCallbacks.ResetCursorCallback() {

Review Comment:
   If the shadow topic can skip all the backlogs. Can we use the non-durable cursor?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -1422,6 +1438,38 @@ public CompletableFuture<Void> checkReplication() {
             }
         });
 
+        futures.add(checkShadowReplication());
+
+        return FutureUtil.waitForAll(futures);
+    }
+
+    private CompletableFuture<Void> checkShadowReplication() {
+        if (CollectionUtils.isEmpty(shadowTopics)) {
+            return CompletableFuture.completedFuture(null);
+        }
+        List<String> configuredShadowTopics = shadowTopics;
+        int newMessageTTLinSeconds = topicPolicies.getMessageTTLInSeconds().get();

Review Comment:
   ```suggestion
           int newMessageTTLInSeconds = topicPolicies.getMessageTTLInSeconds().get();
   ```



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowReplicatorTest.java:
##########
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.broker.service.persistent;
+
+import com.google.common.collect.Sets;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.assertj.core.util.Lists;
+import org.awaitility.Awaitility;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class ShadowReplicatorTest extends BrokerTestBase {
+
+    @BeforeMethod(alwaysRun = true)

Review Comment:
   ```suggestion
       @BeforeClass(alwaysRun = true)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org