You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by js...@apache.org on 2022/08/12 16:06:39 UTC
[kafka] branch trunk updated: KAFKA-13959: Controller should unfence Broker with busy metadata log (#12274)
This is an automated email from the ASF dual-hosted git repository.
jsancio pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 50e5b32a6d5 KAFKA-13959: Controller should unfence Broker with busy metadata log (#12274)
50e5b32a6d5 is described below
commit 50e5b32a6d5f7940e932c6acb20a452535c0bf60
Author: dengziming <de...@gmail.com>
AuthorDate: Sat Aug 13 00:06:24 2022 +0800
KAFKA-13959: Controller should unfence Broker with busy metadata log (#12274)
The reason for KAFKA-13959 is a little complex; the two keys to this problem are:
KafkaRaftClient.MAX_FETCH_WAIT_MS == MetadataMaxIdleIntervalMs == 500ms. We rely on fetchPurgatory to complete a FetchRequest; in detail, if FetchRequest.fetchOffset >= log.endOffset, we will wait for 500ms before sending a FetchResponse. The follower then needs to send one more FetchRequest to get the HW.
Here are the event sequences:
1. When starting, the leader (active controller) has LEO=m+1 (m is the offset of the last record) and leader HW=m (because we need more than half of the voters to reach m+1)
2. Follower (standby controller) and observer (broker) send FetchRequest(fetchOffset=m)
2.1. The leader receives the FetchRequest, sets leader HW=m, and waits 500ms before sending the FetchResponse
2.2. The leader sends FetchResponse(HW=m)
2.3. The broker receives FetchResponse(HW=m) and sets metadataOffset=m.
3. The leader appends a NoOpRecord, so LEO=m+2 while leader HW=m
4. Looping 1-4
If we change MAX_FETCH_WAIT_MS=200 (less than half of MetadataMaxIdleIntervalMs), this problem can be solved temporarily.
We plan to address this problem in two ways. First, in this PR, we change the controller to unfence a broker when the broker's high-watermark has reached the broker registration record for that broker. Second, we will propagate the HWM to the replicas as quickly as possible in KAFKA-14145.
Reviewers: Luke Chen <sh...@gmail.com>, José Armando García Sancio <js...@users.noreply.github.com>
---
core/src/main/scala/kafka/server/KafkaConfig.scala | 2 +-
.../kafka/controller/BrokerHeartbeatManager.java | 26 ++++++++---------
.../kafka/controller/ClusterControlManager.java | 25 ++++++++++++++--
.../apache/kafka/controller/QuorumController.java | 27 +++++++++++++-----
.../controller/ReplicationControlManager.java | 6 ++--
.../controller/ClusterControlManagerTest.java | 19 +++++++------
.../controller/ProducerIdControlManagerTest.java | 2 +-
.../org/apache/kafka/metadata/RecordTestUtils.java | 33 +++++++++++++---------
.../kafka/raft/internals/FuturePurgatory.java | 4 +--
9 files changed, 92 insertions(+), 52 deletions(-)
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 4e253047ee6..860056f9a3e 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -81,7 +81,7 @@ object Defaults {
val BrokerHeartbeatIntervalMs = 2000
val BrokerSessionTimeoutMs = 9000
val MetadataSnapshotMaxNewRecordBytes = 20 * 1024 * 1024
- val MetadataMaxIdleIntervalMs = 5000
+ val MetadataMaxIdleIntervalMs = 500
/** KRaft mode configs */
val EmptyNodeId: Int = -1
diff --git a/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java b/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java
index f31df917d76..428f1c5833e 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java
@@ -511,17 +511,17 @@ public class BrokerHeartbeatManager {
/**
* Calculate the next broker state for a broker that just sent a heartbeat request.
*
- * @param brokerId The broker id.
- * @param request The incoming heartbeat request.
- * @param lastCommittedOffset The last committed offset of the quorum controller.
- * @param hasLeaderships A callback which evaluates to true if the broker leads
- * at least one partition.
+ * @param brokerId The broker id.
+ * @param request The incoming heartbeat request.
+ * @param registerBrokerRecordOffset The offset of the broker's {@link org.apache.kafka.common.metadata.RegisterBrokerRecord}.
+ * @param hasLeaderships A callback which evaluates to true if the broker leads
+ * at least one partition.
*
- * @return The current and next broker states.
+ * @return The current and next broker states.
*/
BrokerControlStates calculateNextBrokerState(int brokerId,
BrokerHeartbeatRequestData request,
- long lastCommittedOffset,
+ long registerBrokerRecordOffset,
Supplier<Boolean> hasLeaderships) {
BrokerHeartbeatState broker = brokers.getOrDefault(brokerId,
new BrokerHeartbeatState(brokerId));
@@ -533,17 +533,17 @@ public class BrokerHeartbeatManager {
"shutdown.", brokerId);
return new BrokerControlStates(currentState, SHUTDOWN_NOW);
} else if (!request.wantFence()) {
- if (request.currentMetadataOffset() >= lastCommittedOffset) {
+ if (request.currentMetadataOffset() >= registerBrokerRecordOffset) {
log.info("The request from broker {} to unfence has been granted " +
- "because it has caught up with the last committed metadata " +
- "offset {}.", brokerId, lastCommittedOffset);
+ "because it has caught up with the offset of it's register " +
+ "broker record {}.", brokerId, registerBrokerRecordOffset);
return new BrokerControlStates(currentState, UNFENCED);
} else {
if (log.isDebugEnabled()) {
log.debug("The request from broker {} to unfence cannot yet " +
- "be granted because it has not caught up with the last " +
- "committed metadata offset {}. It is still at offset {}.",
- brokerId, lastCommittedOffset, request.currentMetadataOffset());
+ "be granted because it has not caught up with the offset of " +
+ "it's register broker record {}. It is still at offset {}.",
+ brokerId, registerBrokerRecordOffset, request.currentMetadataOffset());
}
return new BrokerControlStates(currentState, FENCED);
}
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
index 235f077cfff..d30f4324217 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
@@ -60,6 +60,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.Optional;
+import java.util.OptionalLong;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -217,6 +218,14 @@ public class ClusterControlManager {
*/
private final TimelineHashMap<Integer, BrokerRegistration> brokerRegistrations;
+ /**
+ * Save the offset of each broker registration record, we will only unfence a
+ * broker when its high watermark has reached its broker registration record,
+ * this is not necessarily the exact offset of each broker registration record
+ * but should not be smaller than it.
+ */
+ private final TimelineHashMap<Integer, Long> registerBrokerRecordOffsets;
+
/**
* A reference to the controller's metrics registry.
*/
@@ -255,6 +264,7 @@ public class ClusterControlManager {
this.sessionTimeoutNs = sessionTimeoutNs;
this.replicaPlacer = replicaPlacer;
this.brokerRegistrations = new TimelineHashMap<>(snapshotRegistry, 0);
+ this.registerBrokerRecordOffsets = new TimelineHashMap<>(snapshotRegistry, 0);
this.heartbeatManager = null;
this.readyBrokersFuture = Optional.empty();
this.controllerMetrics = metrics;
@@ -366,7 +376,15 @@ public class ClusterControlManager {
return ControllerResult.atomicOf(records, new BrokerRegistrationReply(brokerEpoch));
}
- public void replay(RegisterBrokerRecord record) {
+ public OptionalLong registerBrokerRecordOffset(int brokerId) {
+ if (registerBrokerRecordOffsets.containsKey(brokerId)) {
+ return OptionalLong.of(registerBrokerRecordOffsets.get(brokerId));
+ }
+ return OptionalLong.empty();
+ }
+
+ public void replay(RegisterBrokerRecord record, long offset) {
+ registerBrokerRecordOffsets.put(record.brokerId(), offset);
int brokerId = record.brokerId();
List<Endpoint> listeners = new ArrayList<>();
for (BrokerEndpoint endpoint : record.endPoints()) {
@@ -401,14 +419,15 @@ public class ClusterControlManager {
}
public void replay(UnregisterBrokerRecord record) {
+ registerBrokerRecordOffsets.remove(record.brokerId());
int brokerId = record.brokerId();
BrokerRegistration registration = brokerRegistrations.get(brokerId);
if (registration == null) {
throw new RuntimeException(String.format("Unable to replay %s: no broker " +
- "registration found for that id", record.toString()));
+ "registration found for that id", record));
} else if (registration.epoch() != record.brokerEpoch()) {
throw new RuntimeException(String.format("Unable to replay %s: no broker " +
- "registration with that epoch found", record.toString()));
+ "registration with that epoch found", record));
} else {
if (heartbeatManager != null) heartbeatManager.remove(brokerId);
brokerRegistrations.remove(brokerId);
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index ef87248f134..3fee25841ba 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.NotControllerException;
+import org.apache.kafka.common.errors.StaleBrokerEpochException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.message.AllocateProducerIdsRequestData;
@@ -759,7 +760,7 @@ public final class QuorumController implements Controller {
int i = 1;
for (ApiMessageAndVersion message : result.records()) {
try {
- replay(message.message(), Optional.empty());
+ replay(message.message(), Optional.empty(), writeOffset + result.records().size());
} catch (Throwable e) {
String failureMessage = String.format("Unable to apply %s record, which was " +
"%d of %d record(s) in the batch following last writeOffset %d.",
@@ -883,7 +884,7 @@ public final class QuorumController implements Controller {
int i = 1;
for (ApiMessageAndVersion message : messages) {
try {
- replay(message.message(), Optional.empty());
+ replay(message.message(), Optional.empty(), offset);
} catch (Throwable e) {
String failureMessage = String.format("Unable to apply %s record on standby " +
"controller, which was %d of %d record(s) in the batch with baseOffset %d.",
@@ -938,7 +939,7 @@ public final class QuorumController implements Controller {
int i = 1;
for (ApiMessageAndVersion message : messages) {
try {
- replay(message.message(), Optional.of(reader.snapshotId()));
+ replay(message.message(), Optional.of(reader.snapshotId()), reader.lastContainedLogOffset());
} catch (Throwable e) {
String failureMessage = String.format("Unable to apply %s record " +
"from snapshot %s on standby controller, which was %d of " +
@@ -1305,12 +1306,19 @@ public final class QuorumController implements Controller {
}
}
- @SuppressWarnings("unchecked")
- private void replay(ApiMessage message, Optional<OffsetAndEpoch> snapshotId) {
+ /**
+ * Apply the metadata record to its corresponding in-memory state(s)
+ *
+ * @param message The metadata record
+ * @param snapshotId The snapshotId if this record is from a snapshot
+ * @param batchLastOffset The offset of the last record in the log batch, or the lastContainedLogOffset
+ * if this record is from a snapshot, this is used along with RegisterBrokerRecord
+ */
+ private void replay(ApiMessage message, Optional<OffsetAndEpoch> snapshotId, long batchLastOffset) {
MetadataRecordType type = MetadataRecordType.fromId(message.apiKey());
switch (type) {
case REGISTER_BROKER_RECORD:
- clusterControl.replay((RegisterBrokerRecord) message);
+ clusterControl.replay((RegisterBrokerRecord) message, batchLastOffset);
break;
case UNREGISTER_BROKER_RECORD:
clusterControl.replay((UnregisterBrokerRecord) message);
@@ -1874,8 +1882,13 @@ public final class QuorumController implements Controller {
@Override
public ControllerResult<BrokerHeartbeatReply> generateRecordsAndResult() {
+ OptionalLong offsetForRegisterBrokerRecord = clusterControl.registerBrokerRecordOffset(brokerId);
+ if (!offsetForRegisterBrokerRecord.isPresent()) {
+ throw new StaleBrokerEpochException(
+ String.format("Receive a heartbeat from broker %d before registration", brokerId));
+ }
ControllerResult<BrokerHeartbeatReply> result = replicationControl.
- processBrokerHeartbeat(request, lastCommittedOffset);
+ processBrokerHeartbeat(request, offsetForRegisterBrokerRecord.getAsLong());
inControlledShutdown = result.response().inControlledShutdown();
rescheduleMaybeFenceStaleBrokers();
return result;
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index bf3a679d2ce..4ffb339967c 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -1355,13 +1355,13 @@ public class ReplicationControlManager {
}
ControllerResult<BrokerHeartbeatReply> processBrokerHeartbeat(
- BrokerHeartbeatRequestData request, long lastCommittedOffset) {
+ BrokerHeartbeatRequestData request, long registerBrokerRecordOffset) {
int brokerId = request.brokerId();
long brokerEpoch = request.brokerEpoch();
clusterControl.checkBrokerEpoch(brokerId, brokerEpoch);
BrokerHeartbeatManager heartbeatManager = clusterControl.heartbeatManager();
BrokerControlStates states = heartbeatManager.calculateNextBrokerState(brokerId,
- request, lastCommittedOffset, () -> brokersToIsrs.hasLeaderships(brokerId));
+ request, registerBrokerRecordOffset, () -> brokersToIsrs.hasLeaderships(brokerId));
List<ApiMessageAndVersion> records = new ArrayList<>();
if (states.current() != states.next()) {
switch (states.next()) {
@@ -1382,7 +1382,7 @@ public class ReplicationControlManager {
heartbeatManager.touch(brokerId,
states.next().fenced(),
request.currentMetadataOffset());
- boolean isCaughtUp = request.currentMetadataOffset() >= lastCommittedOffset;
+ boolean isCaughtUp = request.currentMetadataOffset() >= registerBrokerRecordOffset;
BrokerHeartbeatReply reply = new BrokerHeartbeatReply(isCaughtUp,
states.next().fenced(),
states.next().inControlledShutdown(),
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
index 97d6c883772..e47def81e6d 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
@@ -96,7 +96,7 @@ public class ClusterControlManagerTest {
setPort((short) 9092).
setName("PLAINTEXT").
setHost("example.com"));
- clusterControl.replay(brokerRecord);
+ clusterControl.replay(brokerRecord, 100L);
clusterControl.checkBrokerEpoch(1, 100);
assertThrows(StaleBrokerEpochException.class,
() -> clusterControl.checkBrokerEpoch(1, 101));
@@ -165,19 +165,20 @@ public class ClusterControlManagerTest {
setPort((short) 9092).
setName("PLAINTEXT").
setHost("example.com"));
- clusterControl.replay(brokerRecord);
+ clusterControl.replay(brokerRecord, 100L);
assertFalse(clusterControl.unfenced(0));
assertTrue(clusterControl.inControlledShutdown(0));
brokerRecord.setInControlledShutdown(false);
- clusterControl.replay(brokerRecord);
+ clusterControl.replay(brokerRecord, 100L);
assertFalse(clusterControl.unfenced(0));
assertFalse(clusterControl.inControlledShutdown(0));
+ assertEquals(100L, clusterControl.registerBrokerRecordOffset(brokerRecord.brokerId()).getAsLong());
brokerRecord.setFenced(false);
- clusterControl.replay(brokerRecord);
+ clusterControl.replay(brokerRecord, 100L);
assertTrue(clusterControl.unfenced(0));
assertFalse(clusterControl.inControlledShutdown(0));
@@ -217,7 +218,7 @@ public class ClusterControlManagerTest {
setPort((short) 9092).
setName("PLAINTEXT").
setHost("example.com"));
- clusterControl.replay(brokerRecord);
+ clusterControl.replay(brokerRecord, 100L);
assertTrue(clusterControl.unfenced(0));
assertFalse(clusterControl.inControlledShutdown(0));
@@ -341,17 +342,19 @@ public class ClusterControlManagerTest {
setFeatureControlManager(featureControl).
build();
clusterControl.activate();
- clusterControl.replay(brokerRecord);
+ clusterControl.replay(brokerRecord, 100L);
assertEquals(new BrokerRegistration(1, 100,
Uuid.fromString("fPZv1VBsRFmnlRvmGcOW9w"), Collections.singletonMap("PLAINTEXT",
new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "example.com", 9092)),
Collections.emptyMap(), Optional.of("arack"), true, false),
clusterControl.brokerRegistrations().get(1));
+ assertEquals(100L, clusterControl.registerBrokerRecordOffset(brokerRecord.brokerId()).getAsLong());
UnregisterBrokerRecord unregisterRecord = new UnregisterBrokerRecord().
setBrokerId(1).
setBrokerEpoch(100);
clusterControl.replay(unregisterRecord);
assertFalse(clusterControl.brokerRegistrations().containsKey(1));
+ assertFalse(clusterControl.registerBrokerRecordOffset(brokerRecord.brokerId()).isPresent());
}
@ParameterizedTest
@@ -382,7 +385,7 @@ public class ClusterControlManagerTest {
setPort((short) 9092).
setName("PLAINTEXT").
setHost("example.com"));
- clusterControl.replay(brokerRecord);
+ clusterControl.replay(brokerRecord, 100L);
UnfenceBrokerRecord unfenceRecord =
new UnfenceBrokerRecord().setId(i).setEpoch(100);
clusterControl.replay(unfenceRecord);
@@ -442,7 +445,7 @@ public class ClusterControlManagerTest {
setPort((short) 9092 + i).
setName("PLAINTEXT").
setHost("example.com"));
- clusterControl.replay(brokerRecord);
+ clusterControl.replay(brokerRecord, 100L);
}
for (int i = 0; i < 2; i++) {
UnfenceBrokerRecord unfenceBrokerRecord =
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java
index ccdd3a5b233..80c5c505ae0 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java
@@ -76,7 +76,7 @@ public class ProducerIdControlManagerTest {
setPort((short) 9092).
setName("PLAINTEXT").
setHost(String.format("broker-%02d.example.org", i)));
- clusterControl.replay(brokerRecord);
+ clusterControl.replay(brokerRecord, 100L);
}
this.producerIdControlManager = new ProducerIdControlManager(clusterControl, snapshotRegistry);
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java b/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java
index f5a8da5f8a2..c21bdb54478 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java
@@ -57,20 +57,25 @@ public class RecordTestUtils {
for (ApiMessageAndVersion recordAndVersion : recordsAndVersions) {
ApiMessage record = recordAndVersion.message();
try {
- Method method = target.getClass().getMethod("replay", record.getClass());
- method.invoke(target, record);
- } catch (NoSuchMethodException e) {
try {
- Method method = target.getClass().getMethod("replay",
- record.getClass(),
- Optional.class);
- method.invoke(target, record, Optional.empty());
- } catch (NoSuchMethodException t) {
- // ignore
- } catch (InvocationTargetException t) {
- throw new RuntimeException(t);
- } catch (IllegalAccessException t) {
- throw new RuntimeException(t);
+ Method method = target.getClass().getMethod("replay", record.getClass());
+ method.invoke(target, record);
+ } catch (NoSuchMethodException e) {
+ try {
+ Method method = target.getClass().getMethod("replay",
+ record.getClass(),
+ Optional.class);
+ method.invoke(target, record, Optional.empty());
+ } catch (NoSuchMethodException t) {
+ try {
+ Method method = target.getClass().getMethod("replay",
+ record.getClass(),
+ long.class);
+ method.invoke(target, record, 0L);
+ } catch (NoSuchMethodException i) {
+ // ignore
+ }
+ }
}
} catch (InvocationTargetException e) {
throw new RuntimeException(e);
@@ -119,7 +124,7 @@ public class RecordTestUtils {
* @param delta the metadata delta on which to replay the records
* @param highestOffset highest offset from the list of record batches
* @param highestEpoch highest epoch from the list of record batches
- * @param recordsAndVersions list of batches of records
+ * @param batches list of batches of records
*/
public static void replayAllBatches(
MetadataDelta delta,
diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/FuturePurgatory.java b/raft/src/main/java/org/apache/kafka/raft/internals/FuturePurgatory.java
index b37fb3a3847..e5dceeaa0c3 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/FuturePurgatory.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/FuturePurgatory.java
@@ -56,8 +56,8 @@ public interface FuturePurgatory<T extends Comparable<T>> {
CompletableFuture<Long> await(T threshold, long maxWaitTimeMs);
/**
- * Complete awaiting futures whose associated values are larger than the given threshold value.
- * The completion callbacks will be triggered from the calling thread.
+ * Complete awaiting futures whose threshold value from {@link FuturePurgatory#await} are smaller
+ * than the given threshold value. The completion callbacks will be triggered from the calling thread.
*
* @param value the threshold value used to determine which futures can be completed
* @param currentTimeMs the current time in milliseconds that will be passed to