You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2021/03/04 21:04:30 UTC
[kafka] 01/02: KAFKA-12376: Apply atomic append to the log (#10253)
This is an automated email from the ASF dual-hosted git repository.
davidarthur pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit d69212a42246eeaa123e0a574d08ba44e37e4a1d
Author: José Armando García Sancio <js...@users.noreply.github.com>
AuthorDate: Thu Mar 4 10:55:43 2021 -0800
KAFKA-12376: Apply atomic append to the log (#10253)
---
core/src/main/scala/kafka/raft/RaftManager.scala | 29 ++++++--
.../controller/ClientQuotaControlManager.java | 2 +-
.../kafka/controller/ClusterControlManager.java | 2 +-
.../controller/ConfigurationControlManager.java | 4 +-
.../apache/kafka/controller/ControllerResult.java | 38 ++++++----
.../controller/ControllerResultAndOffset.java | 32 ++++-----
.../kafka/controller/FeatureControlManager.java | 3 +-
.../apache/kafka/controller/QuorumController.java | 23 +++---
.../controller/ReplicationControlManager.java | 12 ++--
.../org/apache/kafka/metalog/LocalLogManager.java | 17 ++++-
.../org/apache/kafka/metalog/MetaLogManager.java | 23 +++++-
.../ConfigurationControlManagerTest.java | 83 ++++++++++++++++------
.../controller/FeatureControlManagerTest.java | 58 ++++++++++-----
.../org/apache/kafka/metalog/LocalLogManager.java | 17 ++++-
.../kafka/raft/metadata/MetaLogRaftShim.java | 28 +++++++-
15 files changed, 268 insertions(+), 103 deletions(-)
diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala b/core/src/main/scala/kafka/raft/RaftManager.scala
index 1881a1d..3b714f3 100644
--- a/core/src/main/scala/kafka/raft/RaftManager.scala
+++ b/core/src/main/scala/kafka/raft/RaftManager.scala
@@ -92,6 +92,11 @@ trait RaftManager[T] {
listener: RaftClient.Listener[T]
): Unit
+ def scheduleAtomicAppend(
+ epoch: Int,
+ records: Seq[T]
+ ): Option[Long]
+
def scheduleAppend(
epoch: Int,
records: Seq[T]
@@ -157,16 +162,32 @@ class KafkaRaftManager[T](
raftClient.register(listener)
}
+ override def scheduleAtomicAppend(
+ epoch: Int,
+ records: Seq[T]
+ ): Option[Long] = {
+ append(epoch, records, true)
+ }
+
override def scheduleAppend(
epoch: Int,
records: Seq[T]
): Option[Long] = {
- val offset: java.lang.Long = raftClient.scheduleAppend(epoch, records.asJava)
- if (offset == null) {
- None
+ append(epoch, records, false)
+ }
+
+ private def append(
+ epoch: Int,
+ records: Seq[T],
+ isAtomic: Boolean
+ ): Option[Long] = {
+ val offset = if (isAtomic) {
+ raftClient.scheduleAtomicAppend(epoch, records.asJava)
} else {
- Some(Long.unbox(offset))
+ raftClient.scheduleAppend(epoch, records.asJava)
}
+
+ Option(offset).map(Long.unbox)
}
override def handleRequest(
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ClientQuotaControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ClientQuotaControlManager.java
index 4aac9e4..9b8e2d6 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ClientQuotaControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ClientQuotaControlManager.java
@@ -86,7 +86,7 @@ public class ClientQuotaControlManager {
}
});
- return new ControllerResult<>(outputRecords, outputResults);
+ return ControllerResult.atomicOf(outputRecords, outputResults);
}
/**
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
index 6e329c7..4748d19 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
@@ -213,7 +213,7 @@ public class ClusterControlManager {
List<ApiMessageAndVersion> records = new ArrayList<>();
records.add(new ApiMessageAndVersion(record, (short) 0));
- return new ControllerResult<>(records, new BrokerRegistrationReply(brokerEpoch));
+ return ControllerResult.of(records, new BrokerRegistrationReply(brokerEpoch));
}
public void replay(RegisterBrokerRecord record) {
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
index 16e58fa..dcfe92d46 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
@@ -83,7 +83,7 @@ public class ConfigurationControlManager {
outputRecords,
outputResults);
}
- return new ControllerResult<>(outputRecords, outputResults);
+ return ControllerResult.atomicOf(outputRecords, outputResults);
}
private void incrementalAlterConfigResource(ConfigResource configResource,
@@ -171,7 +171,7 @@ public class ConfigurationControlManager {
outputRecords,
outputResults);
}
- return new ControllerResult<>(outputRecords, outputResults);
+ return ControllerResult.atomicOf(outputRecords, outputResults);
}
private void legacyAlterConfigResource(ConfigResource configResource,
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ControllerResult.java b/metadata/src/main/java/org/apache/kafka/controller/ControllerResult.java
index 4906c8b..e6ae031 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ControllerResult.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ControllerResult.java
@@ -19,7 +19,7 @@ package org.apache.kafka.controller;
import org.apache.kafka.metadata.ApiMessageAndVersion;
-import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
@@ -28,15 +28,13 @@ import java.util.stream.Collectors;
class ControllerResult<T> {
private final List<ApiMessageAndVersion> records;
private final T response;
+ private final boolean isAtomic;
- public ControllerResult(T response) {
- this(new ArrayList<>(), response);
- }
-
- public ControllerResult(List<ApiMessageAndVersion> records, T response) {
+ protected ControllerResult(List<ApiMessageAndVersion> records, T response, boolean isAtomic) {
Objects.requireNonNull(records);
this.records = records;
this.response = response;
+ this.isAtomic = isAtomic;
}
public List<ApiMessageAndVersion> records() {
@@ -47,6 +45,10 @@ class ControllerResult<T> {
return response;
}
+ public boolean isAtomic() {
+ return isAtomic;
+ }
+
@Override
public boolean equals(Object o) {
if (o == null || (!o.getClass().equals(getClass()))) {
@@ -54,22 +56,34 @@ class ControllerResult<T> {
}
ControllerResult other = (ControllerResult) o;
return records.equals(other.records) &&
- Objects.equals(response, other.response);
+ Objects.equals(response, other.response) &&
+ Objects.equals(isAtomic, other.isAtomic);
}
@Override
public int hashCode() {
- return Objects.hash(records, response);
+ return Objects.hash(records, response, isAtomic);
}
@Override
public String toString() {
- return "ControllerResult(records=" + String.join(",",
- records.stream().map(r -> r.toString()).collect(Collectors.toList())) +
- ", response=" + response + ")";
+ return String.format(
+ "ControllerResult(records=%s, response=%s, isAtomic=%s)",
+ String.join(",", records.stream().map(ApiMessageAndVersion::toString).collect(Collectors.toList())),
+ response,
+ isAtomic
+ );
}
public ControllerResult<T> withoutRecords() {
- return new ControllerResult<>(new ArrayList<>(), response);
+ return new ControllerResult<>(Collections.emptyList(), response, false);
+ }
+
+ public static <T> ControllerResult<T> atomicOf(List<ApiMessageAndVersion> records, T response) {
+ return new ControllerResult<>(records, response, true);
+ }
+
+ public static <T> ControllerResult<T> of(List<ApiMessageAndVersion> records, T response) {
+ return new ControllerResult<>(records, response, false);
}
}
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ControllerResultAndOffset.java b/metadata/src/main/java/org/apache/kafka/controller/ControllerResultAndOffset.java
index 5e483f7..8b8ca8d 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ControllerResultAndOffset.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ControllerResultAndOffset.java
@@ -19,24 +19,15 @@ package org.apache.kafka.controller;
import org.apache.kafka.metadata.ApiMessageAndVersion;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
-class ControllerResultAndOffset<T> extends ControllerResult<T> {
+final class ControllerResultAndOffset<T> extends ControllerResult<T> {
private final long offset;
- public ControllerResultAndOffset(T response) {
- super(new ArrayList<>(), response);
- this.offset = -1;
- }
-
- public ControllerResultAndOffset(long offset,
- List<ApiMessageAndVersion> records,
- T response) {
- super(records, response);
+ private ControllerResultAndOffset(long offset, ControllerResult<T> result) {
+ super(result.records(), result.response(), result.isAtomic());
this.offset = offset;
}
@@ -52,18 +43,27 @@ class ControllerResultAndOffset<T> extends ControllerResult<T> {
ControllerResultAndOffset other = (ControllerResultAndOffset) o;
return records().equals(other.records()) &&
response().equals(other.response()) &&
+ isAtomic() == other.isAtomic() &&
offset == other.offset;
}
@Override
public int hashCode() {
- return Objects.hash(records(), response(), offset);
+ return Objects.hash(records(), response(), isAtomic(), offset);
}
@Override
public String toString() {
- return "ControllerResultAndOffset(records=" + String.join(",",
- records().stream().map(r -> r.toString()).collect(Collectors.toList())) +
- ", response=" + response() + ", offset=" + offset + ")";
+ return String.format(
+ "ControllerResultAndOffset(records=%s, response=%s, isAtomic=%s, offset=%s)",
+ String.join(",", records().stream().map(ApiMessageAndVersion::toString).collect(Collectors.toList())),
+ response(),
+ isAtomic(),
+ offset
+ );
+ }
+
+ public static <T> ControllerResultAndOffset<T> of(long offset, ControllerResult<T> result) {
+ return new ControllerResultAndOffset<>(offset, result);
}
}
diff --git a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
index 25ff3fd..99874ac 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
@@ -69,7 +69,8 @@ public class FeatureControlManager {
results.put(entry.getKey(), updateFeature(entry.getKey(), entry.getValue(),
downgradeables.contains(entry.getKey()), brokerFeatures, records));
}
- return new ControllerResult<>(records, results);
+
+ return ControllerResult.atomicOf(records, results);
}
private ApiError updateFeature(String featureName,
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 3e5a9a8..6ee1b7e 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -17,7 +17,6 @@
package org.apache.kafka.controller;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -268,7 +267,7 @@ public final class QuorumController implements Controller {
class ControlEvent implements EventQueue.Event {
private final String name;
private final Runnable handler;
- private long eventCreatedTimeNs = time.nanoseconds();
+ private final long eventCreatedTimeNs = time.nanoseconds();
private Optional<Long> startProcessingTimeNs = Optional.empty();
ControlEvent(String name, Runnable handler) {
@@ -310,7 +309,7 @@ public final class QuorumController implements Controller {
private final String name;
private final CompletableFuture<T> future;
private final Supplier<T> handler;
- private long eventCreatedTimeNs = time.nanoseconds();
+ private final long eventCreatedTimeNs = time.nanoseconds();
private Optional<Long> startProcessingTimeNs = Optional.empty();
ControllerReadEvent(String name, Supplier<T> handler) {
@@ -392,7 +391,7 @@ public final class QuorumController implements Controller {
private final String name;
private final CompletableFuture<T> future;
private final ControllerWriteOperation<T> op;
- private long eventCreatedTimeNs = time.nanoseconds();
+ private final long eventCreatedTimeNs = time.nanoseconds();
private Optional<Long> startProcessingTimeNs = Optional.empty();
private ControllerResultAndOffset<T> resultAndOffset;
@@ -426,8 +425,7 @@ public final class QuorumController implements Controller {
if (!maybeOffset.isPresent()) {
// If the purgatory is empty, there are no pending operations and no
// uncommitted state. We can return immediately.
- resultAndOffset = new ControllerResultAndOffset<>(-1,
- new ArrayList<>(), result.response());
+ resultAndOffset = ControllerResultAndOffset.of(-1, result);
log.debug("Completing read-only operation {} immediately because " +
"the purgatory is empty.", this);
complete(null);
@@ -435,8 +433,7 @@ public final class QuorumController implements Controller {
}
// If there are operations in the purgatory, we want to wait for the latest
// one to complete before returning our result to the user.
- resultAndOffset = new ControllerResultAndOffset<>(maybeOffset.get(),
- result.records(), result.response());
+ resultAndOffset = ControllerResultAndOffset.of(maybeOffset.get(), result);
log.debug("Read-only operation {} will be completed when the log " +
"reaches offset {}", this, resultAndOffset.offset());
} else {
@@ -444,11 +441,15 @@ public final class QuorumController implements Controller {
// written before we can return our result to the user. Here, we hand off
// the batch of records to the metadata log manager. They will be written
// out asynchronously.
- long offset = logManager.scheduleWrite(controllerEpoch, result.records());
+ final long offset;
+ if (result.isAtomic()) {
+ offset = logManager.scheduleAtomicWrite(controllerEpoch, result.records());
+ } else {
+ offset = logManager.scheduleWrite(controllerEpoch, result.records());
+ }
op.processBatchEndOffset(offset);
writeOffset = offset;
- resultAndOffset = new ControllerResultAndOffset<>(offset,
- result.records(), result.response());
+ resultAndOffset = ControllerResultAndOffset.of(offset, result);
for (ApiMessageAndVersion message : result.records()) {
replay(message.message(), offset);
}
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index 14ff321..8bc0670 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -440,7 +440,7 @@ public class ReplicationControlManager {
resultsPrefix = ", ";
}
log.info("createTopics result(s): {}", resultsBuilder.toString());
- return new ControllerResult<>(records, data);
+ return ControllerResult.atomicOf(records, data);
}
private ApiError createTopic(CreatableTopic topic,
@@ -721,7 +721,7 @@ public class ReplicationControlManager {
setIsr(partitionData.newIsr()));
}
}
- return new ControllerResult<>(records, response);
+ return ControllerResult.of(records, response);
}
/**
@@ -875,7 +875,7 @@ public class ReplicationControlManager {
setErrorMessage(error.message()));
}
}
- return new ControllerResult<>(records, response);
+ return ControllerResult.of(records, response);
}
static boolean electionIsUnclean(byte electionType) {
@@ -970,7 +970,7 @@ public class ReplicationControlManager {
states.next().fenced(),
states.next().inControlledShutdown(),
states.next().shouldShutDown());
- return new ControllerResult<>(records, reply);
+ return ControllerResult.of(records, reply);
}
int bestLeader(int[] replicas, int[] isr, boolean unclean) {
@@ -999,7 +999,7 @@ public class ReplicationControlManager {
}
List<ApiMessageAndVersion> records = new ArrayList<>();
handleBrokerUnregistered(brokerId, registration.epoch(), records);
- return new ControllerResult<>(records, null);
+ return ControllerResult.of(records, null);
}
ControllerResult<Void> maybeFenceStaleBrokers() {
@@ -1011,6 +1011,6 @@ public class ReplicationControlManager {
handleBrokerFenced(brokerId, records);
heartbeatManager.fence(brokerId);
}
- return new ControllerResult<>(records, null);
+ return ControllerResult.of(records, null);
}
}
diff --git a/metadata/src/main/java/org/apache/kafka/metalog/LocalLogManager.java b/metadata/src/main/java/org/apache/kafka/metalog/LocalLogManager.java
index ef85314..99ae3a7 100644
--- a/metadata/src/main/java/org/apache/kafka/metalog/LocalLogManager.java
+++ b/metadata/src/main/java/org/apache/kafka/metalog/LocalLogManager.java
@@ -328,8 +328,21 @@ public final class LocalLogManager implements MetaLogManager, AutoCloseable {
@Override
public long scheduleWrite(long epoch, List<ApiMessageAndVersion> batch) {
- return shared.tryAppend(nodeId, leader.epoch(), new LocalRecordBatch(
- batch.stream().map(r -> r.message()).collect(Collectors.toList())));
+ return scheduleAtomicWrite(epoch, batch);
+ }
+
+ @Override
+ public long scheduleAtomicWrite(long epoch, List<ApiMessageAndVersion> batch) {
+ return shared.tryAppend(
+ nodeId,
+ leader.epoch(),
+ new LocalRecordBatch(
+ batch
+ .stream()
+ .map(ApiMessageAndVersion::message)
+ .collect(Collectors.toList())
+ )
+ );
}
@Override
diff --git a/metadata/src/main/java/org/apache/kafka/metalog/MetaLogManager.java b/metadata/src/main/java/org/apache/kafka/metalog/MetaLogManager.java
index 67a6ca5..9126245 100644
--- a/metadata/src/main/java/org/apache/kafka/metalog/MetaLogManager.java
+++ b/metadata/src/main/java/org/apache/kafka/metalog/MetaLogManager.java
@@ -50,14 +50,31 @@ public interface MetaLogManager {
* offset before renouncing its leadership. The listener should determine this by
* monitoring the committed offsets.
*
- * @param epoch The controller epoch.
- * @param batch The batch of messages to write.
+ * @param epoch the controller epoch
+ * @param batch the batch of messages to write
*
- * @return The offset of the message.
+ * @return the offset of the last message in the batch
+ * @throws IllegalArgumentException if buffer allocation failed and the client should back off
*/
long scheduleWrite(long epoch, List<ApiMessageAndVersion> batch);
/**
+ * Schedule an atomic write to the log.
+ *
+ * The write will be scheduled to happen at some time in the future. All of the messages in batch
+ * will be appended atomically in one batch. The listener may regard the write as successful
+ * if and only if the MetaLogManager reaches the given offset before renouncing its leadership.
+ * The listener should determine this by monitoring the committed offsets.
+ *
+ * @param epoch the controller epoch
+ * @param batch the batch of messages to write
+ *
+ * @return the offset of the last message in the batch
+ * @throws IllegalArgumentException if buffer allocation failed and the client should back off
+ */
+ long scheduleAtomicWrite(long epoch, List<ApiMessageAndVersion> batch);
+
+ /**
* Renounce the leadership.
*
* @param epoch The epoch. If this does not match the current epoch, this
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
index 49a5533..561a25b 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
@@ -135,18 +135,42 @@ public class ConfigurationControlManagerTest {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
ConfigurationControlManager manager =
new ConfigurationControlManager(new LogContext(), snapshotRegistry, CONFIGS);
- assertEquals(new ControllerResult<Map<ConfigResource, ApiError>>(Collections.singletonList(
- new ApiMessageAndVersion(new ConfigRecord().
- setResourceType(TOPIC.id()).setResourceName("mytopic").
- setName("abc").setValue("123"), (short) 0)),
- toMap(entry(BROKER0, new ApiError(
- Errors.INVALID_REQUEST, "A DELETE op was given with a non-null value.")),
- entry(MYTOPIC, ApiError.NONE))),
- manager.incrementalAlterConfigs(toMap(entry(BROKER0, toMap(
- entry("foo.bar", entry(DELETE, "abc")),
- entry("quux", entry(SET, "abc")))),
- entry(MYTOPIC, toMap(
- entry("abc", entry(APPEND, "123")))))));
+ assertEquals(
+ ControllerResult.atomicOf(
+ Collections.singletonList(
+ new ApiMessageAndVersion(
+ new ConfigRecord()
+ .setResourceType(TOPIC.id())
+ .setResourceName("mytopic")
+ .setName("abc")
+ .setValue("123"),
+ (short) 0
+ )
+ ),
+ toMap(
+ entry(
+ BROKER0,
+ new ApiError(
+ Errors.INVALID_REQUEST,
+ "A DELETE op was given with a non-null value."
+ )
+ ),
+ entry(MYTOPIC, ApiError.NONE)
+ )
+ ),
+ manager.incrementalAlterConfigs(
+ toMap(
+ entry(
+ BROKER0,
+ toMap(
+ entry("foo.bar", entry(DELETE, "abc")),
+ entry("quux", entry(SET, "abc"))
+ )
+ ),
+ entry(MYTOPIC, toMap(entry("abc", entry(APPEND, "123"))))
+ )
+ )
+ );
}
@Test
@@ -184,20 +208,33 @@ public class ConfigurationControlManagerTest {
new ApiMessageAndVersion(new ConfigRecord().
setResourceType(TOPIC.id()).setResourceName("mytopic").
setName("def").setValue("901"), (short) 0));
- assertEquals(new ControllerResult<Map<ConfigResource, ApiError>>(
+ assertEquals(
+ ControllerResult.atomicOf(
expectedRecords1,
- toMap(entry(MYTOPIC, ApiError.NONE))),
- manager.legacyAlterConfigs(toMap(entry(MYTOPIC, toMap(
- entry("abc", "456"), entry("def", "901"))))));
+ toMap(entry(MYTOPIC, ApiError.NONE))
+ ),
+ manager.legacyAlterConfigs(
+ toMap(entry(MYTOPIC, toMap(entry("abc", "456"), entry("def", "901"))))
+ )
+ );
for (ApiMessageAndVersion message : expectedRecords1) {
manager.replay((ConfigRecord) message.message());
}
- assertEquals(new ControllerResult<Map<ConfigResource, ApiError>>(Arrays.asList(
- new ApiMessageAndVersion(new ConfigRecord().
- setResourceType(TOPIC.id()).setResourceName("mytopic").
- setName("abc").setValue(null), (short) 0)),
- toMap(entry(MYTOPIC, ApiError.NONE))),
- manager.legacyAlterConfigs(toMap(entry(MYTOPIC, toMap(
- entry("def", "901"))))));
+ assertEquals(
+ ControllerResult.atomicOf(
+ Arrays.asList(
+ new ApiMessageAndVersion(
+ new ConfigRecord()
+ .setResourceType(TOPIC.id())
+ .setResourceName("mytopic")
+ .setName("abc")
+ .setValue(null),
+ (short) 0
+ )
+ ),
+ toMap(entry(MYTOPIC, ApiError.NONE))
+ ),
+ manager.legacyAlterConfigs(toMap(entry(MYTOPIC, toMap(entry("def", "901")))))
+ );
}
}
diff --git a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
index 8687cc8..0670984 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
@@ -18,10 +18,8 @@
package org.apache.kafka.controller;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
@@ -61,11 +59,11 @@ public class FeatureControlManagerTest {
rangeMap("foo", 1, 2), snapshotRegistry);
assertEquals(new FeatureMapAndEpoch(new FeatureMap(Collections.emptyMap()), -1),
manager.finalizedFeatures(-1));
- assertEquals(new ControllerResult<>(Collections.
+ assertEquals(ControllerResult.atomicOf(Collections.emptyList(), Collections.
singletonMap("foo", new ApiError(Errors.INVALID_UPDATE_VERSION,
"The controller does not support the given feature range."))),
manager.updateFeatures(rangeMap("foo", 1, 3),
- new HashSet<>(Arrays.asList("foo")),
+ Collections.singleton("foo"),
Collections.emptyMap()));
ControllerResult<Map<String, ApiError>> result = manager.updateFeatures(
rangeMap("foo", 1, 2, "bar", 1, 1), Collections.emptySet(),
@@ -101,12 +99,24 @@ public class FeatureControlManagerTest {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
FeatureControlManager manager = new FeatureControlManager(
rangeMap("foo", 1, 5, "bar", 1, 2), snapshotRegistry);
- assertEquals(new ControllerResult<>(Collections.
- singletonMap("foo", new ApiError(Errors.INVALID_UPDATE_VERSION,
- "Broker 5 does not support the given feature range."))),
- manager.updateFeatures(rangeMap("foo", 1, 3),
- new HashSet<>(Arrays.asList("foo")),
- Collections.singletonMap(5, rangeMap())));
+
+ assertEquals(
+ ControllerResult.atomicOf(
+ Collections.emptyList(),
+ Collections.singletonMap(
+ "foo",
+ new ApiError(
+ Errors.INVALID_UPDATE_VERSION,
+ "Broker 5 does not support the given feature range."
+ )
+ )
+ ),
+ manager.updateFeatures(
+ rangeMap("foo", 1, 3),
+ Collections.singleton("foo"),
+ Collections.singletonMap(5, rangeMap())
+ )
+ );
ControllerResult<Map<String, ApiError>> result = manager.updateFeatures(
rangeMap("foo", 1, 3), Collections.emptySet(), Collections.emptyMap());
@@ -114,19 +124,31 @@ public class FeatureControlManagerTest {
manager.replay((FeatureLevelRecord) result.records().get(0).message(), 3);
snapshotRegistry.createSnapshot(3);
- assertEquals(new ControllerResult<>(Collections.
+ assertEquals(ControllerResult.atomicOf(Collections.emptyList(), Collections.
singletonMap("foo", new ApiError(Errors.INVALID_UPDATE_VERSION,
"Can't downgrade the maximum version of this feature without " +
"setting downgradable to true."))),
manager.updateFeatures(rangeMap("foo", 1, 2),
Collections.emptySet(), Collections.emptyMap()));
- assertEquals(new ControllerResult<>(
- Collections.singletonList(new ApiMessageAndVersion(new FeatureLevelRecord().
- setName("foo").setMinFeatureLevel((short) 1).setMaxFeatureLevel((short) 2),
- (short) 0)),
- Collections.singletonMap("foo", ApiError.NONE)),
- manager.updateFeatures(rangeMap("foo", 1, 2),
- new HashSet<>(Collections.singletonList("foo")), Collections.emptyMap()));
+ assertEquals(
+ ControllerResult.atomicOf(
+ Collections.singletonList(
+ new ApiMessageAndVersion(
+ new FeatureLevelRecord()
+ .setName("foo")
+ .setMinFeatureLevel((short) 1)
+ .setMaxFeatureLevel((short) 2),
+ (short) 0
+ )
+ ),
+ Collections.singletonMap("foo", ApiError.NONE)
+ ),
+ manager.updateFeatures(
+ rangeMap("foo", 1, 2),
+ Collections.singleton("foo"),
+ Collections.emptyMap()
+ )
+ );
}
}
diff --git a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
index 7b6cf06..590f89c 100644
--- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
+++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
@@ -371,8 +371,21 @@ public final class LocalLogManager implements MetaLogManager, AutoCloseable {
@Override
public long scheduleWrite(long epoch, List<ApiMessageAndVersion> batch) {
- return shared.tryAppend(nodeId, leader.epoch(), new LocalRecordBatch(
- batch.stream().map(r -> r.message()).collect(Collectors.toList())));
+ return scheduleAtomicWrite(epoch, batch);
+ }
+
+ @Override
+ public long scheduleAtomicWrite(long epoch, List<ApiMessageAndVersion> batch) {
+ return shared.tryAppend(
+ nodeId,
+ leader.epoch(),
+ new LocalRecordBatch(
+ batch
+ .stream()
+ .map(ApiMessageAndVersion::message)
+ .collect(Collectors.toList())
+ )
+ );
}
@Override
diff --git a/raft/src/main/java/org/apache/kafka/raft/metadata/MetaLogRaftShim.java b/raft/src/main/java/org/apache/kafka/raft/metadata/MetaLogRaftShim.java
index bf88e7d..1ca63f1 100644
--- a/raft/src/main/java/org/apache/kafka/raft/metadata/MetaLogRaftShim.java
+++ b/raft/src/main/java/org/apache/kafka/raft/metadata/MetaLogRaftShim.java
@@ -53,8 +53,34 @@ public class MetaLogRaftShim implements MetaLogManager {
}
@Override
+ public long scheduleAtomicWrite(long epoch, List<ApiMessageAndVersion> batch) {
+ return write(epoch, batch, true);
+ }
+
+ @Override
public long scheduleWrite(long epoch, List<ApiMessageAndVersion> batch) {
- return client.scheduleAppend((int) epoch, batch);
+ return write(epoch, batch, false);
+ }
+
+ private long write(long epoch, List<ApiMessageAndVersion> batch, boolean isAtomic) {
+ final Long result;
+ if (isAtomic) {
+ result = client.scheduleAtomicAppend((int) epoch, batch);
+ } else {
+ result = client.scheduleAppend((int) epoch, batch);
+ }
+
+ if (result == null) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Unable to alloate a buffer for the schedule write operation: epoch %s, batch %s)",
+ epoch,
+ batch
+ )
+ );
+ } else {
+ return result;
+ }
}
@Override