You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text copy).
Posted to commits@pulsar.apache.org by te...@apache.org on 2024/01/16 08:34:00 UTC

(pulsar) branch branch-3.2 updated: [improve][broker] defer the ownership checks if the owner is inactive (ExtensibleLoadManager) (#21811)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new ffa6871a3be [improve][broker] defer the ownership checks if the owner is inactive (ExtensibleLoadManager) (#21811)
ffa6871a3be is described below

commit ffa6871a3be31a5a50e874eec0e150080b42a3a8
Author: Heesung Sohn <10...@users.noreply.github.com>
AuthorDate: Sat Dec 30 23:05:29 2023 -0800

    [improve][broker] defer the ownership checks if the owner is inactive (ExtensibleLoadManager) (#21811)
---
 .../extensions/ExtensibleLoadManagerWrapper.java   |  2 -
 .../channel/ServiceUnitStateChannelImpl.java       | 86 ++++++++++++++--------
 .../channel/ServiceUnitStateChannelTest.java       | 75 +++++++++++++++++++
 .../loadbalance/ExtensibleLoadManagerTest.java     | 39 +++++++---
 4 files changed, 161 insertions(+), 41 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java
index 18e949537de..cd1561cb70e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java
@@ -118,13 +118,11 @@ public class ExtensibleLoadManagerWrapper implements LoadManager {
     @Override
     public void writeLoadReportOnZookeeper() throws Exception {
         // No-op, this operation is not useful, the load data reporter will automatically write.
-        throw new UnsupportedOperationException();
     }
 
     @Override
     public void writeResourceQuotasToZooKeeper() throws Exception {
         // No-op, this operation is not useful, the load data reporter will automatically write.
-        throw new UnsupportedOperationException();
     }
 
     @Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index 713d98b7250..bd571284346 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -487,6 +487,27 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
         return isOwner(serviceUnit, lookupServiceAddress);
     }
 
+    private CompletableFuture<Optional<String>> getActiveOwnerAsync(
+            String serviceUnit,
+            ServiceUnitState state,
+            Optional<String> owner) {
+        CompletableFuture<Optional<String>> activeOwner = owner.isPresent()
+                ? brokerRegistry.lookupAsync(owner.get()).thenApply(lookupData -> lookupData.flatMap(__ -> owner))
+                : CompletableFuture.completedFuture(Optional.empty());
+
+        return activeOwner
+                .thenCompose(broker -> broker
+                        .map(__ -> activeOwner)
+                        .orElseGet(() -> deferGetOwnerRequest(serviceUnit).thenApply(Optional::ofNullable)))
+                .whenComplete((__, e) -> {
+                    if (e != null) {
+                        log.error("Failed to get active owner broker. serviceUnit:{}, state:{}, owner:{}",
+                                serviceUnit, state, owner, e);
+                        ownerLookUpCounters.get(state).getFailure().incrementAndGet();
+                    }
+                });
+    }
+
     public CompletableFuture<Optional<String>> getOwnerAsync(String serviceUnit) {
         if (!validateChannelState(Started, true)) {
             return CompletableFuture.failedFuture(
@@ -498,18 +519,13 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
         ownerLookUpCounters.get(state).getTotal().incrementAndGet();
         switch (state) {
             case Owned -> {
-                return CompletableFuture.completedFuture(Optional.of(data.dstBroker()));
+                return getActiveOwnerAsync(serviceUnit, state, Optional.of(data.dstBroker()));
             }
             case Splitting -> {
-                return CompletableFuture.completedFuture(Optional.of(data.sourceBroker()));
+                return getActiveOwnerAsync(serviceUnit, state, Optional.of(data.sourceBroker()));
             }
             case Assigning, Releasing -> {
-                return deferGetOwnerRequest(serviceUnit).whenComplete((__, e) -> {
-                    if (e != null) {
-                        ownerLookUpCounters.get(state).getFailure().incrementAndGet();
-                    }
-                }).thenApply(
-                        broker -> broker == null ? Optional.empty() : Optional.of(broker));
+                return getActiveOwnerAsync(serviceUnit, state, Optional.empty());
             }
             case Init, Free -> {
                 return CompletableFuture.completedFuture(Optional.empty());
@@ -812,9 +828,14 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
         if (getOwnerRequest != null) {
             getOwnerRequest.complete(null);
         }
-        stateChangeListeners.notify(serviceUnit, data, null);
+
         if (isTargetBroker(data.sourceBroker())) {
-            log(null, serviceUnit, data, null);
+            stateChangeListeners.notifyOnCompletion(
+                            data.force() ? closeServiceUnit(serviceUnit, true)
+                                    : CompletableFuture.completedFuture(0), serviceUnit, data)
+                    .whenComplete((__, e) -> log(e, serviceUnit, data, null));
+        } else {
+            stateChangeListeners.notify(serviceUnit, data, null);
         }
     }
 
@@ -1202,38 +1223,43 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
 
 
     private ServiceUnitStateData getOverrideInactiveBrokerStateData(ServiceUnitStateData orphanData,
-                                                                    String selectedBroker,
+                                                                    Optional<String> selectedBroker,
                                                                     String inactiveBroker) {
+
+
+        if (selectedBroker.isEmpty()) {
+            return new ServiceUnitStateData(Free, null, inactiveBroker,
+                    true, getNextVersionId(orphanData));
+        }
+
         if (orphanData.state() == Splitting) {
-            return new ServiceUnitStateData(Splitting, orphanData.dstBroker(), selectedBroker,
+            return new ServiceUnitStateData(Splitting, orphanData.dstBroker(), selectedBroker.get(),
                     Map.copyOf(orphanData.splitServiceUnitToDestBroker()),
                     true, getNextVersionId(orphanData));
         } else {
-            return new ServiceUnitStateData(Owned, selectedBroker, inactiveBroker,
+            return new ServiceUnitStateData(Owned, selectedBroker.get(), inactiveBroker,
                     true, getNextVersionId(orphanData));
         }
     }
 
     private void overrideOwnership(String serviceUnit, ServiceUnitStateData orphanData, String inactiveBroker) {
         Optional<String> selectedBroker = selectBroker(serviceUnit, inactiveBroker);
-        if (selectedBroker.isPresent()) {
-            var override = getOverrideInactiveBrokerStateData(
-                    orphanData, selectedBroker.get(), inactiveBroker);
-            log.info("Overriding ownership serviceUnit:{} from orphanData:{} to overrideData:{}",
-                    serviceUnit, orphanData, override);
-            publishOverrideEventAsync(serviceUnit, orphanData, override)
-                    .exceptionally(e -> {
-                        log.error(
-                                "Failed to override the ownership serviceUnit:{} orphanData:{}. "
-                                        + "Failed to publish override event. totalCleanupErrorCnt:{}",
-                                serviceUnit, orphanData, totalCleanupErrorCnt.incrementAndGet());
-                        return null;
-                    });
-        } else {
-            log.error("Failed to override the ownership serviceUnit:{} orphanData:{}. Empty selected broker. "
+        if (selectedBroker.isEmpty()) {
+            log.warn("Empty selected broker for ownership serviceUnit:{} orphanData:{}."
                             + "totalCleanupErrorCnt:{}",
                     serviceUnit, orphanData, totalCleanupErrorCnt.incrementAndGet());
         }
+        var override = getOverrideInactiveBrokerStateData(orphanData, selectedBroker, inactiveBroker);
+        log.info("Overriding ownership serviceUnit:{} from orphanData:{} to overrideData:{}",
+                serviceUnit, orphanData, override);
+        publishOverrideEventAsync(serviceUnit, orphanData, override)
+                .exceptionally(e -> {
+                    log.error(
+                            "Failed to override the ownership serviceUnit:{} orphanData:{}. "
+                                    + "Failed to publish override event. totalCleanupErrorCnt:{}",
+                            serviceUnit, orphanData, totalCleanupErrorCnt.incrementAndGet());
+                    return null;
+                });
     }
 
     private void waitForCleanups(String broker, boolean excludeSystemTopics, int maxWaitTimeInMillis) {
@@ -1335,7 +1361,7 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
                 broker,
                 cleanupTime,
                 orphanServiceUnitCleanupCnt,
-                totalCleanupErrorCntStart - totalCleanupErrorCnt.get(),
+                totalCleanupErrorCnt.get() - totalCleanupErrorCntStart,
                 printCleanupMetrics());
 
     }
@@ -1524,7 +1550,7 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
                     inactiveBrokers, inactiveBrokers.size(),
                     orphanServiceUnitCleanupCnt,
                     serviceUnitTombstoneCleanupCnt,
-                    totalCleanupErrorCntStart - totalCleanupErrorCnt.get(),
+                    totalCleanupErrorCnt.get() - totalCleanupErrorCntStart,
                     printCleanupMetrics());
         }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
index f99a167ff48..7bd12d66704 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
@@ -39,6 +39,7 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertThrows;
+import static org.testng.Assert.expectThrows;
 import static org.testng.AssertJUnit.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
@@ -1560,6 +1561,80 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
         cleanTableViews();
     }
 
+    @Test(priority = 19)
+    public void testActiveGetOwner() throws Exception {
+
+
+        // set the bundle owner is the broker
+        String broker = lookupServiceAddress2;
+        String bundle = "public/owned/0xfffffff0_0xffffffff";
+        overrideTableViews(bundle,
+                new ServiceUnitStateData(Owned, broker, null, 1));
+        var owner = channel1.getOwnerAsync(bundle).get(5, TimeUnit.SECONDS).get();
+        assertEquals(owner, broker);
+
+        // simulate the owner is inactive
+        var spyRegistry = spy(new BrokerRegistryImpl(pulsar));
+        doReturn(CompletableFuture.completedFuture(Optional.empty()))
+                .when(spyRegistry).lookupAsync(eq(broker));
+        FieldUtils.writeDeclaredField(channel1,
+                "brokerRegistry", spyRegistry , true);
+        FieldUtils.writeDeclaredField(channel1,
+                "inFlightStateWaitingTimeInMillis", 1000, true);
+
+
+        // verify getOwnerAsync times out because the owner is inactive now.
+        long start = System.currentTimeMillis();
+        var ex = expectThrows(ExecutionException.class, () -> channel1.getOwnerAsync(bundle).get());
+        assertTrue(ex.getCause() instanceof TimeoutException);
+        assertTrue(System.currentTimeMillis() - start >= 1000);
+
+        // simulate ownership cleanup(no selected owner) by the leader channel
+        doReturn(CompletableFuture.completedFuture(Optional.empty()))
+                .when(loadManager).selectAsync(any(), any());
+        var leaderChannel = channel1;
+        String leader1 = channel1.getChannelOwnerAsync().get(2, TimeUnit.SECONDS).get();
+        String leader2 = channel2.getChannelOwnerAsync().get(2, TimeUnit.SECONDS).get();
+        assertEquals(leader1, leader2);
+        if (leader1.equals(lookupServiceAddress2)) {
+            leaderChannel = channel2;
+        }
+        leaderChannel.handleMetadataSessionEvent(SessionReestablished);
+        FieldUtils.writeDeclaredField(leaderChannel, "lastMetadataSessionEventTimestamp",
+                System.currentTimeMillis() - (MAX_CLEAN_UP_DELAY_TIME_IN_SECS * 1000 + 1000), true);
+        leaderChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted);
+
+        // verify the ownership cleanup, and channel's getOwnerAsync returns empty result without timeout
+        FieldUtils.writeDeclaredField(channel1,
+                "inFlightStateWaitingTimeInMillis", 20 * 1000, true);
+        start = System.currentTimeMillis();
+        assertTrue(channel1.getOwnerAsync(bundle).get().isEmpty());
+        assertTrue(System.currentTimeMillis() - start < 20_000);
+
+        // simulate ownership cleanup(lookupServiceAddress1 selected owner) by the leader channel
+        overrideTableViews(bundle,
+                new ServiceUnitStateData(Owned, broker, null, 1));
+        doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress1)))
+                .when(loadManager).selectAsync(any(), any());
+        leaderChannel.handleMetadataSessionEvent(SessionReestablished);
+        FieldUtils.writeDeclaredField(leaderChannel, "lastMetadataSessionEventTimestamp",
+                System.currentTimeMillis() - (MAX_CLEAN_UP_DELAY_TIME_IN_SECS * 1000 + 1000), true);
+        getCleanupJobs(leaderChannel).clear();
+        leaderChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted);
+
+        // verify the ownership cleanup, and channel's getOwnerAsync returns lookupServiceAddress1 without timeout
+        start = System.currentTimeMillis();
+        assertEquals(lookupServiceAddress1, channel1.getOwnerAsync(bundle).get().get());
+        assertTrue(System.currentTimeMillis() - start < 20_000);
+
+        // test clean-up
+        FieldUtils.writeDeclaredField(channel1,
+                "inFlightStateWaitingTimeInMillis", 30 * 1000, true);
+        FieldUtils.writeDeclaredField(channel1,
+                "brokerRegistry", registry , true);
+        cleanTableViews();
+
+    }
 
     private static ConcurrentHashMap<String, CompletableFuture<Optional<String>>> getOwnerRequests(
             ServiceUnitStateChannel channel) throws IllegalAccessException {
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
index 64a05503e19..954c1aa3773 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
@@ -125,6 +125,26 @@ public class ExtensibleLoadManagerTest extends TestRetrySupport {
                     brokerContainer.start();
                 }
             });
+            String topicName = "persistent://" + DEFAULT_NAMESPACE + "/startBrokerCheck";
+            Awaitility.await().atMost(120, TimeUnit.SECONDS).ignoreExceptions().until(
+                    () -> {
+                        for (BrokerContainer brokerContainer : pulsarCluster.getBrokers()) {
+                            try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(
+                                    brokerContainer.getHttpServiceUrl()).build()) {
+                                if (admin.brokers().getActiveBrokers(clusterName).size() != NUM_BROKERS) {
+                                    return false;
+                                }
+                                try {
+                                    admin.topics().createPartitionedTopic(topicName, 10);
+                                } catch (PulsarAdminException.ConflictException e) {
+                                    // expected
+                                }
+                                admin.lookups().lookupPartitionedTopic(topicName);
+                            }
+                        }
+                        return true;
+                    }
+            );
         }
     }
 
@@ -243,7 +263,7 @@ public class ExtensibleLoadManagerTest extends TestRetrySupport {
         assertFalse(admin.namespaces().getNamespaces(DEFAULT_TENANT).contains(namespace));
     }
 
-    @Test(timeOut = 40 * 1000)
+    @Test(timeOut = 120 * 1000)
     public void testStopBroker() throws Exception {
         String topicName = "persistent://" + DEFAULT_NAMESPACE + "/test-stop-broker-topic";
 
@@ -259,9 +279,11 @@ public class ExtensibleLoadManagerTest extends TestRetrySupport {
             }
         }
 
-        String broker1 = admin.lookups().lookupTopic(topicName);
+        Awaitility.waitAtMost(60, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
+            String broker1 = admin.lookups().lookupTopic(topicName);
+            assertNotEquals(broker1, broker);
+        });
 
-        assertNotEquals(broker1, broker);
     }
 
     @Test(timeOut = 80 * 1000)
@@ -309,7 +331,7 @@ public class ExtensibleLoadManagerTest extends TestRetrySupport {
         parameters1.put("min_limit", "1");
         parameters1.put("usage_threshold", "100");
 
-        Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(
+        Awaitility.await().atMost(10, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(
                 () -> {
                     List<String> activeBrokers = admin.brokers().getActiveBrokers();
                     assertEquals(activeBrokers.size(), NUM_BROKERS);
@@ -350,14 +372,14 @@ public class ExtensibleLoadManagerTest extends TestRetrySupport {
             }
         }
 
-        Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(
+        Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(
                 () -> {
                     List<String> activeBrokers = admin.brokers().getActiveBrokers();
                     assertEquals(activeBrokers.size(), 2);
                 }
         );
 
-        Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
+        Awaitility.await().atMost(60, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
             String ownerBroker = admin.lookups().lookupTopic(topic);
             assertEquals(extractBrokerIndex(ownerBroker), 1);
         });
@@ -369,7 +391,7 @@ public class ExtensibleLoadManagerTest extends TestRetrySupport {
             }
         }
 
-        Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(
+        Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(
             () -> {
                 List<String> activeBrokers = admin.brokers().getActiveBrokers();
                 assertEquals(activeBrokers.size(), 1);
@@ -380,8 +402,7 @@ public class ExtensibleLoadManagerTest extends TestRetrySupport {
             fail();
         } catch (Exception ex) {
             log.error("Failed to lookup topic: ", ex);
-            assertThat(ex.getMessage()).containsAnyOf("Failed to look up a broker",
-                    "Failed to select the new owner broker for bundle");
+            assertThat(ex.getMessage()).contains("Failed to select the new owner broker for bundle");
         }
     }