Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/10/11 17:36:26 UTC

[GitHub] [pulsar] poorbarcode opened a new pull request, #18010: [ci-run][improve][broker]make consumer and producer can work even if schema ledger lost

poorbarcode opened a new pull request, #18010:
URL: https://github.com/apache/pulsar/pull/18010

   Related to:
   
   - 1
   
   `/pulsarbot rerun-failure-checks`


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] github-actions[bot] commented on pull request #18010: [ci-run][improve][broker]make consumer and producer can work even if schema ledger lost

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #18010:
URL: https://github.com/apache/pulsar/pull/18010#issuecomment-1275051445

   @poorbarcode Please add the following content to your PR description and select a checkbox:
   ```
   - [ ] `doc` <!-- Your PR contains doc changes -->
   - [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
   - [ ] `doc-not-needed` <!-- Your PR changes do not impact docs -->
   - [ ] `doc-complete` <!-- Docs have been already added -->
   ```




[GitHub] [pulsar] poorbarcode commented on a diff in pull request #18010: [improve] [broker] make consumer and producer can work even if schema ledger lost

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on code in PR #18010:
URL: https://github.com/apache/pulsar/pull/18010#discussion_r1140505643


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java:
##########
@@ -691,7 +691,8 @@ public static Exception bkException(String operation, int rc, long ledgerId, lon
             message += " - entry=" + entryId;
         }
         boolean recoverable = rc != BKException.Code.NoSuchLedgerExistsException
-                && rc != BKException.Code.NoSuchEntryException;
+                && rc != BKException.Code.NoSuchEntryException
+                && rc != BKException.Code.NoSuchLedgerExistsOnMetadataServerException;

Review Comment:
   Yes. We check the `autoSkipNonRecoverableData` config when the BKException is caught, as in this case:
   <img width="772" alt="Screenshot 2023-03-18 01 19 11" src="https://user-images.githubusercontent.com/25195800/225974613-16aee2d3-7217-457d-aaf5-a41889c5e588.png">
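
The screenshot is not reproduced in this archive, so the following is only a rough sketch of the kind of check described: the catch site looks at the exception's recoverable flag and then consults `autoSkipNonRecoverableData`. `SketchSchemaException`, `SkipDecisionSketch`, and the `Action` values are hypothetical stand-ins, not the broker's actual classes.

```java
// Hypothetical stand-in for the broker's SchemaException; only the recoverable flag matters here.
class SketchSchemaException extends Exception {
    private final boolean recoverable;

    SketchSchemaException(boolean recoverable, String message) {
        super(message);
        this.recoverable = recoverable;
    }

    boolean isRecoverable() {
        return recoverable;
    }
}

final class SkipDecisionSketch {
    // Assumed outcomes for illustration: either surface the error or skip the lost data.
    enum Action { PROPAGATE, SKIP_LOST_DATA }

    // The config is consulted only after the failure has been classified as non-recoverable.
    static Action onReadFailure(SketchSchemaException e, boolean autoSkipNonRecoverableData) {
        if (e.isRecoverable()) {
            return Action.PROPAGATE;
        }
        return autoSkipNonRecoverableData ? Action.SKIP_LOST_DATA : Action.PROPAGATE;
    }

    public static void main(String[] args) {
        SketchSchemaException lost = new SketchSchemaException(false, "ledger missing");
        System.out.println(onReadFailure(lost, true));
        System.out.println(onReadFailure(lost, false));
    }
}
```

Under this assumption, lost schema data is skipped only when the exception is non-recoverable and the config flag is enabled; every other combination surfaces the error to the caller.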
   
   





[GitHub] [pulsar] Denovo1998 commented on a diff in pull request #18010: [improve] [broker] make consumer and producer can work even if schema ledger lost

Posted by "Denovo1998 (via GitHub)" <gi...@apache.org>.
Denovo1998 commented on code in PR #18010:
URL: https://github.com/apache/pulsar/pull/18010#discussion_r1140357628


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java:
##########
@@ -691,7 +691,8 @@ public static Exception bkException(String operation, int rc, long ledgerId, lon
             message += " - entry=" + entryId;
         }
         boolean recoverable = rc != BKException.Code.NoSuchLedgerExistsException
-                && rc != BKException.Code.NoSuchEntryException;
+                && rc != BKException.Code.NoSuchEntryException
+                && rc != BKException.Code.NoSuchLedgerExistsOnMetadataServerException;

Review Comment:
   @poorbarcode Maybe we should modify `BookkeeperSchemaStorage.bkException` here: if `autoSkipNonRecoverableData` is false, then `NoSuchLedgerExistsOnMetadataServerException` should not be recoverable. What do you think?
   ```java
       private final boolean autoSkipNonRecoverableData;
   
       BookkeeperSchemaStorage(PulsarService pulsar) {
           ......
           this.autoSkipNonRecoverableData = pulsar.getConfiguration().isAutoSkipNonRecoverableData();
       }
   
       private CompletableFuture<Long> addEntry(LedgerHandle ledgerHandle, SchemaStorageFormat.SchemaEntry entry) {
           ......
           future.completeExceptionally(bkException("Failed to add entry", rc, ledgerHandle.getId(), -1, autoSkipNonRecoverableData));
           ......
       }
   
       private CompletableFuture<LedgerHandle> createLedger(String schemaId) {
           ......
           future.completeExceptionally(bkException("Failed to create ledger", rc, -1, -1, autoSkipNonRecoverableData));
           ......
       }
   
       private CompletableFuture<LedgerHandle> openLedger(Long ledgerId) {
           ......
           future.completeExceptionally(bkException("Failed to open ledger", rc, ledgerId, -1, autoSkipNonRecoverableData));
           ......
       }
   
       private CompletableFuture<Void> closeLedger(LedgerHandle ledgerHandle) {
           ......
           future.completeExceptionally(bkException("Failed to close ledger", rc, ledgerHandle.getId(), -1, autoSkipNonRecoverableData));
           ......
       }
   
       interface Functions {
           // remove getLedgerEntry
       }
   
       // move to here
       private CompletableFuture<LedgerEntry> getLedgerEntry(LedgerHandle ledger, long entry) {
           ......
           future.completeExceptionally(bkException("Failed to read entry", rc, ledger.getId(), entry,
                   autoSkipNonRecoverableData));
           ......
       }
   
       @NotNull
       private CompletableFuture<SchemaStorageFormat.SchemaEntry> readSchemaEntry(SchemaStorageFormat.PositionInfo position) {
           ......
           // Functions.getLedgerEntry(ledger, position.getEntryId())
           getLedgerEntry(ledger, position.getEntryId())
           ......
       }
   
       public static Exception bkException(String operation, int rc, long ledgerId, long entryId, boolean autoSkipNonRecoverableData) {
           String message = org.apache.bookkeeper.client.api.BKException.getMessage(rc)
                   + " -  ledger=" + ledgerId + " - operation=" + operation;
   
           if (entryId != -1) {
               message += " - entry=" + entryId;
           }
   
           // These two codes are never recoverable.
           boolean recoverable = rc != BKException.Code.NoSuchLedgerExistsException
                   && rc != BKException.Code.NoSuchEntryException;
           // The metadata-server code is additionally excluded only when skipping is enabled.
           if (autoSkipNonRecoverableData) {
               recoverable = recoverable
                       && rc != BKException.Code.NoSuchLedgerExistsOnMetadataServerException;
           }
           return new SchemaException(recoverable, message);
       }
   ```
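
To make the effect of the snippet's final `return` easier to see, here is a minimal, self-contained sketch of the same classification. The `Rc` enum is a hypothetical stand-in for the BookKeeper return codes named above (it is not `BKException.Code`), and `RecoverableSketch` is an illustrative name only.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical stand-in for the return codes discussed above; not the real BKException.Code.
enum Rc { OK, NO_SUCH_LEDGER, NO_SUCH_ENTRY, NO_SUCH_LEDGER_ON_METADATA_SERVER, TIMEOUT }

final class RecoverableSketch {
    // Codes treated as non-recoverable regardless of configuration.
    private static final Set<Rc> ALWAYS_LOST = EnumSet.of(Rc.NO_SUCH_LEDGER, Rc.NO_SUCH_ENTRY);

    static boolean isRecoverable(Rc rc, boolean autoSkipNonRecoverableData) {
        if (ALWAYS_LOST.contains(rc)) {
            return false;
        }
        // As in the snippet: the metadata-server code counts as lost only when skipping is enabled.
        if (autoSkipNonRecoverableData && rc == Rc.NO_SUCH_LEDGER_ON_METADATA_SERVER) {
            return false;
        }
        return true;
    }

    public static void main(String[] args) {
        // Print the full decision table for both config values.
        for (boolean skip : new boolean[] {false, true}) {
            for (Rc rc : Rc.values()) {
                System.out.println("autoSkip=" + skip + " rc=" + rc
                        + " recoverable=" + isRecoverable(rc, skip));
            }
        }
    }
}
```

The only row that changes with the flag is `NO_SUCH_LEDGER_ON_METADATA_SERVER`: it is reported as recoverable when the flag is off and as non-recoverable when it is on.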





[GitHub] [pulsar] poorbarcode commented on a diff in pull request #18010: [improve] [broker] make consumer and producer can work even if schema ledger lost

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on code in PR #18010:
URL: https://github.com/apache/pulsar/pull/18010#discussion_r1140494435


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/LedgerBrokenTest.java:
##########
@@ -0,0 +1,401 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker;
+
+import static org.testng.Assert.*;
+import java.io.Serializable;
+import java.net.Inet4Address;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.pulsar.broker.service.schema.SchemaRegistry;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
+import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker")
+public class LedgerBrokenTest {
+
+    // prefer inet4.
+    private static final String LOCALHOST = Inet4Address.getLoopbackAddress().getHostAddress();
+    private static final String CLUSTER = "broken_ledger_test";
+    private static final String DEFAULT_TENANT = "public";
+    private static final String DEFAULT_NAMESPACE = DEFAULT_TENANT + "/default";
+
+    protected LocalBookkeeperEnsemble bkEnsemble;
+    protected ServiceConfiguration pulsarConfig;
+    protected PulsarService pulsarService;
+    protected int brokerWebServicePort;
+    protected int brokerServicePort;
+    protected String metadataServiceUri;
+    protected BookKeeper bookKeeperClient;
+    protected String brokerUrl;
+    protected String brokerServiceUrl;
+    protected PulsarAdmin pulsarAdmin;
+    protected PulsarClient pulsarClient;
+
+    @BeforeClass
+    protected void setup() throws Exception {
+        log.info("--- Start cluster ---");
+        startLocalBookie();
+        initPulsarConfig();
+        startPulsar();
+    }
+
+    @AfterClass
+    protected void cleanup() throws Exception {
+        log.info("--- Shutting down ---");
+        silentStopPulsar();
+        stopLocalBookie();
+    }
+
+    protected void startLocalBookie() throws Exception{
+        log.info("===> Start bookie ");
+        bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+        bkEnsemble.start();
+        metadataServiceUri = String.format("zk:%s:%s", LOCALHOST, bkEnsemble.getZookeeperPort());
+        initBookieClient();
+    }
+
+    protected void initBookieClient() throws Exception {
+        bookKeeperClient = new BookKeeper(String.format("%s:%s", LOCALHOST, bkEnsemble.getZookeeperPort()));
+    }
+
+    protected void stopLocalBookie() {
+        log.info("===> Close bookie client");
+        try {
+            bookKeeperClient.close();
+            // TODO delete bk files.
+            // TODO delete zk files.
+        } catch (Exception e){
+            log.error("Close bookie client fail", e);
+        }
+        log.info("===> Stop bookie ");
+        try {
+            bkEnsemble.stop();
+            // TODO delete bk files.
+            // TODO delete zk files.
+        } catch (Exception e){
+            log.error("Stop bookie fail", e);
+        }
+    }
+
+    protected void initPulsarConfig() throws Exception{
+        pulsarConfig = new ServiceConfiguration();
+        pulsarConfig.setAdvertisedAddress(LOCALHOST);
+        pulsarConfig.setMetadataStoreUrl(metadataServiceUri);
+        pulsarConfig.setClusterName(CLUSTER);
+        pulsarConfig.setTransactionCoordinatorEnabled(false);
+        pulsarConfig.setAllowAutoTopicCreation(true);
+        pulsarConfig.setAllowAutoTopicCreationType("non-partitioned");
+        pulsarConfig.setAutoSkipNonRecoverableData(true);
+    }
+
+    protected void startPulsar() throws Exception {
+        log.info("===> Start pulsar ");
+        pulsarService = new PulsarService(pulsarConfig);
+        pulsarService.start();
+        brokerWebServicePort = pulsarService.getListenPortHTTP().get();
+        brokerServicePort = pulsarService.getBrokerListenPort().get();
+        brokerUrl = String.format("http://%s:%s", LOCALHOST, brokerWebServicePort);
+        brokerServiceUrl = String.format("pulsar://%s:%s", LOCALHOST, brokerServicePort);
+        initPulsarAdmin();
+        initPulsarClient();
+        initDefaultNamespace();
+    }
+
+    protected void silentStopPulsar() throws Exception {
+        log.info("===> Close pulsar client ");
+        try {
+            pulsarClient.close();
+        }catch (Exception e){
+            log.error("===> Close pulsar client fail", e);
+        }
+        log.info("===> Close pulsar admin ");
+        try {
+            pulsarAdmin.close();
+        }catch (Exception e){
+            log.error("===> Close pulsar admin fail", e);
+        }
+        log.info("===> Stop pulsar service ");
+        try {
+            pulsarService.close();
+        }catch (Exception e){
+            log.error("===> Stop pulsar service fail", e);
+        }
+    }
+
+    protected void stopPulsar() throws Exception {
+        log.info("===> Close pulsar client ");
+        pulsarClient.close();
+        log.info("===> Close pulsar admin ");
+        pulsarAdmin.close();
+        log.info("===> Stop pulsar service ");
+        pulsarService.close();
+    }
+
+    protected void initPulsarAdmin() throws Exception {
+        pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(brokerUrl).build();
+    }
+
+    protected void initPulsarClient() throws Exception {
+        pulsarClient = PulsarClient.builder().serviceUrl(brokerServiceUrl).build();
+    }
+
+    protected void initDefaultNamespace() throws Exception {
+        if (!pulsarAdmin.clusters().getClusters().contains(CLUSTER)) {
+            pulsarAdmin.clusters().createCluster(CLUSTER, ClusterData.builder().serviceUrl(brokerServiceUrl).build());

Review Comment:
   Ah, you are right. It is already fixed, thank you.





[GitHub] [pulsar] poorbarcode commented on a diff in pull request #18010: [improve] [broker] make consumer and producer can work even if schema ledger lost

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on code in PR #18010:
URL: https://github.com/apache/pulsar/pull/18010#discussion_r1140507422


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java:
##########
@@ -691,7 +691,8 @@ public static Exception bkException(String operation, int rc, long ledgerId, lon
             message += " - entry=" + entryId;
         }
         boolean recoverable = rc != BKException.Code.NoSuchLedgerExistsException
-                && rc != BKException.Code.NoSuchEntryException;
+                && rc != BKException.Code.NoSuchEntryException
+                && rc != BKException.Code.NoSuchLedgerExistsOnMetadataServerException;

Review Comment:
   Hi @Denovo1998 
   
   I will take some time to rewrite this PR later. Has your environment hit this issue?
   





[GitHub] [pulsar] Denovo1998 commented on a diff in pull request #18010: [improve] [broker] make consumer and producer can work even if schema ledger lost

Posted by "Denovo1998 (via GitHub)" <gi...@apache.org>.
Denovo1998 commented on code in PR #18010:
URL: https://github.com/apache/pulsar/pull/18010#discussion_r1133131049


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/LedgerBrokenTest.java:
##########
@@ -0,0 +1,401 @@
+    protected void initDefaultNamespace() throws Exception {
+        if (!pulsarAdmin.clusters().getClusters().contains(CLUSTER)) {
+            pulsarAdmin.clusters().createCluster(CLUSTER, ClusterData.builder().serviceUrl(brokerServiceUrl).build());

Review Comment:
   `serviceUrl(brokerServiceUrl)` should be set to `serviceUrl(brokerUrl)`.
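
For context, the test above builds `brokerUrl` with an `http://` prefix and `brokerServiceUrl` with a `pulsar://` prefix, and the cluster's `serviceUrl` is meant to hold the web service (HTTP) address. The sketch below shows one way to tell the two kinds of URL apart by scheme; `ClusterUrlSketch` and its methods are hypothetical helpers, and the ports in `main` are illustrative only.

```java
import java.net.URI;

final class ClusterUrlSketch {
    // A web service (admin) URL is expected to use http or https.
    static boolean looksLikeWebServiceUrl(String url) {
        String scheme = URI.create(url).getScheme();
        return "http".equals(scheme) || "https".equals(scheme);
    }

    // A binary-protocol broker URL is expected to use pulsar or pulsar+ssl.
    static boolean looksLikeBrokerServiceUrl(String url) {
        String scheme = URI.create(url).getScheme();
        return "pulsar".equals(scheme) || "pulsar+ssl".equals(scheme);
    }

    public static void main(String[] args) {
        String brokerUrl = "http://127.0.0.1:8080";          // illustrative value
        String brokerServiceUrl = "pulsar://127.0.0.1:6650"; // illustrative value
        System.out.println(looksLikeWebServiceUrl(brokerUrl));
        System.out.println(looksLikeWebServiceUrl(brokerServiceUrl));
    }
}
```

A guard like this in a test setup would flag a `pulsar://` address passed where the HTTP address belongs.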





[GitHub] [pulsar] github-actions[bot] commented on pull request #18010: [improve] [broker] make consumer and producer can work even if schema ledger lost

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #18010:
URL: https://github.com/apache/pulsar/pull/18010#issuecomment-1312319106

   The PR has had no activity for 30 days and is marked with the Stale label.




[GitHub] [pulsar] Denovo1998 commented on a diff in pull request #18010: [improve] [broker] make consumer and producer can work even if schema ledger lost

Posted by "Denovo1998 (via GitHub)" <gi...@apache.org>.
Denovo1998 commented on code in PR #18010:
URL: https://github.com/apache/pulsar/pull/18010#discussion_r1140920052


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java:
##########
@@ -691,7 +691,8 @@ public static Exception bkException(String operation, int rc, long ledgerId, lon
             message += " - entry=" + entryId;
         }
         boolean recoverable = rc != BKException.Code.NoSuchLedgerExistsException
-                && rc != BKException.Code.NoSuchEntryException;
+                && rc != BKException.Code.NoSuchEntryException
+                && rc != BKException.Code.NoSuchLedgerExistsOnMetadataServerException;

Review Comment:
   No, no, no. My company doesn't use Pulsar. I'm just interested in this PR and have some personal opinions.


