You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2024/01/16 08:34:00 UTC
(pulsar) branch branch-3.2 updated: [improve][broker] defer the ownership checks if the owner is inactive (ExtensibleLoadManager) (#21811)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new ffa6871a3be [improve][broker] defer the ownership checks if the owner is inactive (ExtensibleLoadManager) (#21811)
ffa6871a3be is described below
commit ffa6871a3be31a5a50e874eec0e150080b42a3a8
Author: Heesung Sohn <10...@users.noreply.github.com>
AuthorDate: Sat Dec 30 23:05:29 2023 -0800
[improve][broker] defer the ownership checks if the owner is inactive (ExtensibleLoadManager) (#21811)
---
.../extensions/ExtensibleLoadManagerWrapper.java | 2 -
.../channel/ServiceUnitStateChannelImpl.java | 86 ++++++++++++++--------
.../channel/ServiceUnitStateChannelTest.java | 75 +++++++++++++++++++
.../loadbalance/ExtensibleLoadManagerTest.java | 39 +++++++---
4 files changed, 161 insertions(+), 41 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java
index 18e949537de..cd1561cb70e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java
@@ -118,13 +118,11 @@ public class ExtensibleLoadManagerWrapper implements LoadManager {
@Override
public void writeLoadReportOnZookeeper() throws Exception {
// No-op, this operation is not useful, the load data reporter will automatically write.
- throw new UnsupportedOperationException();
}
@Override
public void writeResourceQuotasToZooKeeper() throws Exception {
// No-op, this operation is not useful, the load data reporter will automatically write.
- throw new UnsupportedOperationException();
}
@Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index 713d98b7250..bd571284346 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -487,6 +487,27 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
return isOwner(serviceUnit, lookupServiceAddress);
}
+ private CompletableFuture<Optional<String>> getActiveOwnerAsync(
+ String serviceUnit,
+ ServiceUnitState state,
+ Optional<String> owner) {
+ CompletableFuture<Optional<String>> activeOwner = owner.isPresent()
+ ? brokerRegistry.lookupAsync(owner.get()).thenApply(lookupData -> lookupData.flatMap(__ -> owner))
+ : CompletableFuture.completedFuture(Optional.empty());
+
+ return activeOwner
+ .thenCompose(broker -> broker
+ .map(__ -> activeOwner)
+ .orElseGet(() -> deferGetOwnerRequest(serviceUnit).thenApply(Optional::ofNullable)))
+ .whenComplete((__, e) -> {
+ if (e != null) {
+ log.error("Failed to get active owner broker. serviceUnit:{}, state:{}, owner:{}",
+ serviceUnit, state, owner, e);
+ ownerLookUpCounters.get(state).getFailure().incrementAndGet();
+ }
+ });
+ }
+
public CompletableFuture<Optional<String>> getOwnerAsync(String serviceUnit) {
if (!validateChannelState(Started, true)) {
return CompletableFuture.failedFuture(
@@ -498,18 +519,13 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
ownerLookUpCounters.get(state).getTotal().incrementAndGet();
switch (state) {
case Owned -> {
- return CompletableFuture.completedFuture(Optional.of(data.dstBroker()));
+ return getActiveOwnerAsync(serviceUnit, state, Optional.of(data.dstBroker()));
}
case Splitting -> {
- return CompletableFuture.completedFuture(Optional.of(data.sourceBroker()));
+ return getActiveOwnerAsync(serviceUnit, state, Optional.of(data.sourceBroker()));
}
case Assigning, Releasing -> {
- return deferGetOwnerRequest(serviceUnit).whenComplete((__, e) -> {
- if (e != null) {
- ownerLookUpCounters.get(state).getFailure().incrementAndGet();
- }
- }).thenApply(
- broker -> broker == null ? Optional.empty() : Optional.of(broker));
+ return getActiveOwnerAsync(serviceUnit, state, Optional.empty());
}
case Init, Free -> {
return CompletableFuture.completedFuture(Optional.empty());
@@ -812,9 +828,14 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
if (getOwnerRequest != null) {
getOwnerRequest.complete(null);
}
- stateChangeListeners.notify(serviceUnit, data, null);
+
if (isTargetBroker(data.sourceBroker())) {
- log(null, serviceUnit, data, null);
+ stateChangeListeners.notifyOnCompletion(
+ data.force() ? closeServiceUnit(serviceUnit, true)
+ : CompletableFuture.completedFuture(0), serviceUnit, data)
+ .whenComplete((__, e) -> log(e, serviceUnit, data, null));
+ } else {
+ stateChangeListeners.notify(serviceUnit, data, null);
}
}
@@ -1202,38 +1223,43 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
private ServiceUnitStateData getOverrideInactiveBrokerStateData(ServiceUnitStateData orphanData,
- String selectedBroker,
+ Optional<String> selectedBroker,
String inactiveBroker) {
+
+
+ if (selectedBroker.isEmpty()) {
+ return new ServiceUnitStateData(Free, null, inactiveBroker,
+ true, getNextVersionId(orphanData));
+ }
+
if (orphanData.state() == Splitting) {
- return new ServiceUnitStateData(Splitting, orphanData.dstBroker(), selectedBroker,
+ return new ServiceUnitStateData(Splitting, orphanData.dstBroker(), selectedBroker.get(),
Map.copyOf(orphanData.splitServiceUnitToDestBroker()),
true, getNextVersionId(orphanData));
} else {
- return new ServiceUnitStateData(Owned, selectedBroker, inactiveBroker,
+ return new ServiceUnitStateData(Owned, selectedBroker.get(), inactiveBroker,
true, getNextVersionId(orphanData));
}
}
private void overrideOwnership(String serviceUnit, ServiceUnitStateData orphanData, String inactiveBroker) {
Optional<String> selectedBroker = selectBroker(serviceUnit, inactiveBroker);
- if (selectedBroker.isPresent()) {
- var override = getOverrideInactiveBrokerStateData(
- orphanData, selectedBroker.get(), inactiveBroker);
- log.info("Overriding ownership serviceUnit:{} from orphanData:{} to overrideData:{}",
- serviceUnit, orphanData, override);
- publishOverrideEventAsync(serviceUnit, orphanData, override)
- .exceptionally(e -> {
- log.error(
- "Failed to override the ownership serviceUnit:{} orphanData:{}. "
- + "Failed to publish override event. totalCleanupErrorCnt:{}",
- serviceUnit, orphanData, totalCleanupErrorCnt.incrementAndGet());
- return null;
- });
- } else {
- log.error("Failed to override the ownership serviceUnit:{} orphanData:{}. Empty selected broker. "
+ if (selectedBroker.isEmpty()) {
+ log.warn("Empty selected broker for ownership serviceUnit:{} orphanData:{}."
+ "totalCleanupErrorCnt:{}",
serviceUnit, orphanData, totalCleanupErrorCnt.incrementAndGet());
}
+ var override = getOverrideInactiveBrokerStateData(orphanData, selectedBroker, inactiveBroker);
+ log.info("Overriding ownership serviceUnit:{} from orphanData:{} to overrideData:{}",
+ serviceUnit, orphanData, override);
+ publishOverrideEventAsync(serviceUnit, orphanData, override)
+ .exceptionally(e -> {
+ log.error(
+ "Failed to override the ownership serviceUnit:{} orphanData:{}. "
+ + "Failed to publish override event. totalCleanupErrorCnt:{}",
+ serviceUnit, orphanData, totalCleanupErrorCnt.incrementAndGet());
+ return null;
+ });
}
private void waitForCleanups(String broker, boolean excludeSystemTopics, int maxWaitTimeInMillis) {
@@ -1335,7 +1361,7 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
broker,
cleanupTime,
orphanServiceUnitCleanupCnt,
- totalCleanupErrorCntStart - totalCleanupErrorCnt.get(),
+ totalCleanupErrorCnt.get() - totalCleanupErrorCntStart,
printCleanupMetrics());
}
@@ -1524,7 +1550,7 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
inactiveBrokers, inactiveBrokers.size(),
orphanServiceUnitCleanupCnt,
serviceUnitTombstoneCleanupCnt,
- totalCleanupErrorCntStart - totalCleanupErrorCnt.get(),
+ totalCleanupErrorCnt.get() - totalCleanupErrorCntStart,
printCleanupMetrics());
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
index f99a167ff48..7bd12d66704 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
@@ -39,6 +39,7 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertThrows;
+import static org.testng.Assert.expectThrows;
import static org.testng.AssertJUnit.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
@@ -1560,6 +1561,80 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
cleanTableViews();
}
+ @Test(priority = 19)
+ public void testActiveGetOwner() throws Exception {
+
+
+ // set the bundle owner to the broker
+ String broker = lookupServiceAddress2;
+ String bundle = "public/owned/0xfffffff0_0xffffffff";
+ overrideTableViews(bundle,
+ new ServiceUnitStateData(Owned, broker, null, 1));
+ var owner = channel1.getOwnerAsync(bundle).get(5, TimeUnit.SECONDS).get();
+ assertEquals(owner, broker);
+
+ // simulate the owner being inactive
+ var spyRegistry = spy(new BrokerRegistryImpl(pulsar));
+ doReturn(CompletableFuture.completedFuture(Optional.empty()))
+ .when(spyRegistry).lookupAsync(eq(broker));
+ FieldUtils.writeDeclaredField(channel1,
+ "brokerRegistry", spyRegistry , true);
+ FieldUtils.writeDeclaredField(channel1,
+ "inFlightStateWaitingTimeInMillis", 1000, true);
+
+
+ // verify getOwnerAsync times out because the owner is inactive now.
+ long start = System.currentTimeMillis();
+ var ex = expectThrows(ExecutionException.class, () -> channel1.getOwnerAsync(bundle).get());
+ assertTrue(ex.getCause() instanceof TimeoutException);
+ assertTrue(System.currentTimeMillis() - start >= 1000);
+
+ // simulate ownership cleanup(no selected owner) by the leader channel
+ doReturn(CompletableFuture.completedFuture(Optional.empty()))
+ .when(loadManager).selectAsync(any(), any());
+ var leaderChannel = channel1;
+ String leader1 = channel1.getChannelOwnerAsync().get(2, TimeUnit.SECONDS).get();
+ String leader2 = channel2.getChannelOwnerAsync().get(2, TimeUnit.SECONDS).get();
+ assertEquals(leader1, leader2);
+ if (leader1.equals(lookupServiceAddress2)) {
+ leaderChannel = channel2;
+ }
+ leaderChannel.handleMetadataSessionEvent(SessionReestablished);
+ FieldUtils.writeDeclaredField(leaderChannel, "lastMetadataSessionEventTimestamp",
+ System.currentTimeMillis() - (MAX_CLEAN_UP_DELAY_TIME_IN_SECS * 1000 + 1000), true);
+ leaderChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted);
+
+ // verify the ownership cleanup, and channel's getOwnerAsync returns empty result without timeout
+ FieldUtils.writeDeclaredField(channel1,
+ "inFlightStateWaitingTimeInMillis", 20 * 1000, true);
+ start = System.currentTimeMillis();
+ assertTrue(channel1.getOwnerAsync(bundle).get().isEmpty());
+ assertTrue(System.currentTimeMillis() - start < 20_000);
+
+ // simulate ownership cleanup(lookupServiceAddress1 selected owner) by the leader channel
+ overrideTableViews(bundle,
+ new ServiceUnitStateData(Owned, broker, null, 1));
+ doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress1)))
+ .when(loadManager).selectAsync(any(), any());
+ leaderChannel.handleMetadataSessionEvent(SessionReestablished);
+ FieldUtils.writeDeclaredField(leaderChannel, "lastMetadataSessionEventTimestamp",
+ System.currentTimeMillis() - (MAX_CLEAN_UP_DELAY_TIME_IN_SECS * 1000 + 1000), true);
+ getCleanupJobs(leaderChannel).clear();
+ leaderChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted);
+
+ // verify the ownership cleanup, and channel's getOwnerAsync returns lookupServiceAddress1 without timeout
+ start = System.currentTimeMillis();
+ assertEquals(lookupServiceAddress1, channel1.getOwnerAsync(bundle).get().get());
+ assertTrue(System.currentTimeMillis() - start < 20_000);
+
+ // test clean-up
+ FieldUtils.writeDeclaredField(channel1,
+ "inFlightStateWaitingTimeInMillis", 30 * 1000, true);
+ FieldUtils.writeDeclaredField(channel1,
+ "brokerRegistry", registry , true);
+ cleanTableViews();
+
+ }
private static ConcurrentHashMap<String, CompletableFuture<Optional<String>>> getOwnerRequests(
ServiceUnitStateChannel channel) throws IllegalAccessException {
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
index 64a05503e19..954c1aa3773 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
@@ -125,6 +125,26 @@ public class ExtensibleLoadManagerTest extends TestRetrySupport {
brokerContainer.start();
}
});
+ String topicName = "persistent://" + DEFAULT_NAMESPACE + "/startBrokerCheck";
+ Awaitility.await().atMost(120, TimeUnit.SECONDS).ignoreExceptions().until(
+ () -> {
+ for (BrokerContainer brokerContainer : pulsarCluster.getBrokers()) {
+ try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(
+ brokerContainer.getHttpServiceUrl()).build()) {
+ if (admin.brokers().getActiveBrokers(clusterName).size() != NUM_BROKERS) {
+ return false;
+ }
+ try {
+ admin.topics().createPartitionedTopic(topicName, 10);
+ } catch (PulsarAdminException.ConflictException e) {
+ // expected
+ }
+ admin.lookups().lookupPartitionedTopic(topicName);
+ }
+ }
+ return true;
+ }
+ );
}
}
@@ -243,7 +263,7 @@ public class ExtensibleLoadManagerTest extends TestRetrySupport {
assertFalse(admin.namespaces().getNamespaces(DEFAULT_TENANT).contains(namespace));
}
- @Test(timeOut = 40 * 1000)
+ @Test(timeOut = 120 * 1000)
public void testStopBroker() throws Exception {
String topicName = "persistent://" + DEFAULT_NAMESPACE + "/test-stop-broker-topic";
@@ -259,9 +279,11 @@ public class ExtensibleLoadManagerTest extends TestRetrySupport {
}
}
- String broker1 = admin.lookups().lookupTopic(topicName);
+ Awaitility.waitAtMost(60, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
+ String broker1 = admin.lookups().lookupTopic(topicName);
+ assertNotEquals(broker1, broker);
+ });
- assertNotEquals(broker1, broker);
}
@Test(timeOut = 80 * 1000)
@@ -309,7 +331,7 @@ public class ExtensibleLoadManagerTest extends TestRetrySupport {
parameters1.put("min_limit", "1");
parameters1.put("usage_threshold", "100");
- Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(
+ Awaitility.await().atMost(10, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(
() -> {
List<String> activeBrokers = admin.brokers().getActiveBrokers();
assertEquals(activeBrokers.size(), NUM_BROKERS);
@@ -350,14 +372,14 @@ public class ExtensibleLoadManagerTest extends TestRetrySupport {
}
}
- Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(
+ Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(
() -> {
List<String> activeBrokers = admin.brokers().getActiveBrokers();
assertEquals(activeBrokers.size(), 2);
}
);
- Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
+ Awaitility.await().atMost(60, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
String ownerBroker = admin.lookups().lookupTopic(topic);
assertEquals(extractBrokerIndex(ownerBroker), 1);
});
@@ -369,7 +391,7 @@ public class ExtensibleLoadManagerTest extends TestRetrySupport {
}
}
- Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(
+ Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(
() -> {
List<String> activeBrokers = admin.brokers().getActiveBrokers();
assertEquals(activeBrokers.size(), 1);
@@ -380,8 +402,7 @@ public class ExtensibleLoadManagerTest extends TestRetrySupport {
fail();
} catch (Exception ex) {
log.error("Failed to lookup topic: ", ex);
- assertThat(ex.getMessage()).containsAnyOf("Failed to look up a broker",
- "Failed to select the new owner broker for bundle");
+ assertThat(ex.getMessage()).contains("Failed to select the new owner broker for bundle");
}
}