Posted to commits@kafka.apache.org by da...@apache.org on 2024/03/07 15:51:11 UTC

(kafka) branch trunk updated: MINOR: Add read/write all operation (#15462)

This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f5c4d522fd7 MINOR: Add read/write all operation (#15462)
f5c4d522fd7 is described below

commit f5c4d522fd79775692441cd75f6f733324f2e7e9
Author: David Jacot <dj...@confluent.io>
AuthorDate: Thu Mar 7 07:51:04 2024 -0800

    MINOR: Add read/write all operation (#15462)
    
    There are a few cases in the group coordinator service where we want to read from or write to each of the known coordinators (that is, each of the __consumer_offsets partitions). The current implementation must first fetch the list of known coordinators, then schedule the operation on each of them, and finally aggregate the results. This patch streamlines that flow by adding multi-read/write operations to the runtime (sketched below).
    
    Reviewers: Omnia Ibrahim <o....@gmail.com>, Chia-Ping Tsai <ch...@gmail.com>
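
    For illustration, a minimal, self-contained sketch of the fan-out and
    aggregation pattern this patch moves into the runtime. All names here are
    hypothetical stand-ins rather than Kafka APIs; the real signatures appear
    in the diff below.

        import java.util.ArrayList;
        import java.util.Arrays;
        import java.util.List;
        import java.util.concurrent.CompletableFuture;

        public class ReadAllSketch {
            // Stand-in for scheduleReadAllOperation: one future per known
            // coordinator partition.
            static List<CompletableFuture<List<String>>> readAll() {
                return Arrays.asList(
                    CompletableFuture.completedFuture(Arrays.asList("group-0")),
                    CompletableFuture.completedFuture(Arrays.asList("group-1"))
                );
            }

            public static void main(String[] args) throws Exception {
                // Aggregate the per-partition results into a single list, as
                // FutureUtils.combineFutures does in the patch below.
                CompletableFuture<List<String>> all =
                    CompletableFuture.completedFuture(new ArrayList<>());
                for (CompletableFuture<List<String>> f : readAll()) {
                    all = all.thenCombine(f, (acc, part) -> {
                        acc.addAll(part);
                        return acc;
                    });
                }
                System.out.println(all.get()); // prints [group-0, group-1]
            }
        }
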
---
 .../coordinator/group/GroupCoordinatorService.java |  60 ++++-----
 .../group/runtime/CoordinatorRuntime.java          |  67 ++++++++--
 .../group/GroupCoordinatorServiceTest.java         | 148 +++++++--------------
 .../group/runtime/CoordinatorRuntimeTest.java      | 110 +++++++++++++++
 .../org/apache/kafka/server/util/FutureUtils.java  |  19 +++
 5 files changed, 254 insertions(+), 150 deletions(-)

diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index a2363b48222..2fdc128c7b9 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -80,7 +80,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.OptionalInt;
 import java.util.Properties;
-import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
@@ -498,29 +497,24 @@ public class GroupCoordinatorService implements GroupCoordinator {
             );
         }
 
-        final Set<TopicPartition> existingPartitionSet = runtime.partitions();
-
-        if (existingPartitionSet.isEmpty()) {
-            return CompletableFuture.completedFuture(new ListGroupsResponseData());
-        }
-
-        final List<CompletableFuture<List<ListGroupsResponseData.ListedGroup>>> futures =
-            new ArrayList<>();
-
-        for (TopicPartition tp : existingPartitionSet) {
-            futures.add(runtime.scheduleReadOperation(
+        final List<CompletableFuture<List<ListGroupsResponseData.ListedGroup>>> futures = FutureUtils.mapExceptionally(
+            runtime.scheduleReadAllOperation(
                 "list-groups",
-                tp,
-                (coordinator, lastCommittedOffset) -> coordinator.listGroups(request.statesFilter(), request.typesFilter(), lastCommittedOffset)
-            ).exceptionally(exception -> {
+                (coordinator, lastCommittedOffset) -> coordinator.listGroups(
+                    request.statesFilter(),
+                    request.typesFilter(),
+                    lastCommittedOffset
+                )
+            ),
+            exception -> {
                 exception = Errors.maybeUnwrapException(exception);
                 if (exception instanceof NotCoordinatorException) {
                     return Collections.emptyList();
                 } else {
                     throw new CompletionException(exception);
                 }
-            }));
-        }
+            }
+        );
 
         return FutureUtils
             .combineFutures(futures, ArrayList::new, List::addAll)
@@ -963,23 +957,21 @@ public class GroupCoordinatorService implements GroupCoordinator {
     ) throws ExecutionException, InterruptedException {
         throwIfNotActive();
 
-        final Set<TopicPartition> existingPartitionSet = runtime.partitions();
-        final List<CompletableFuture<Void>> futures = new ArrayList<>(existingPartitionSet.size());
-
-        existingPartitionSet.forEach(partition -> futures.add(
-            runtime.scheduleWriteOperation(
-                "on-partition-deleted",
-                partition,
-                Duration.ofMillis(config.offsetCommitTimeoutMs),
-                coordinator -> coordinator.onPartitionsDeleted(topicPartitions)
-            ).exceptionally(exception -> {
-                log.error("Could not delete offsets for deleted partitions {} in coordinator {} due to: {}.",
-                    partition, partition, exception.getMessage(), exception);
-                return null;
-            })
-        ));
-
-        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
+        CompletableFuture.allOf(
+            FutureUtils.mapExceptionally(
+                runtime.scheduleWriteAllOperation(
+                    "on-partition-deleted",
+                    Duration.ofMillis(config.offsetCommitTimeoutMs),
+                    coordinator -> coordinator.onPartitionsDeleted(topicPartitions)
+                ),
+                exception -> {
+                    log.error("Could not delete offsets for deleted partitions {} due to: {}.",
+                        topicPartitions, exception.getMessage(), exception
+                    );
+                    return null;
+                }
+            ).toArray(new CompletableFuture[0])
+        ).get();
     }
 
     /**
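
A note on the error handling above: CompletableFuture.exceptionally applies per
future, so one failed coordinator does not poison the whole aggregation. A
minimal standalone sketch of that behavior (plain Java, no Kafka types; the
"expected" failure below stands in for NotCoordinatorException):

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.CompletionException;

    public class PerFutureRecoverySketch {
        public static void main(String[] args) throws Exception {
            CompletableFuture<List<String>> healthy =
                CompletableFuture.completedFuture(Arrays.asList("group-a"));
            // Simulates a coordinator that moved away.
            CompletableFuture<List<String>> moved = new CompletableFuture<>();
            moved.completeExceptionally(new IllegalStateException("not coordinator"));

            // Map the expected failure to an empty result; rethrow anything
            // else, mirroring the listGroups handler above.
            List<CompletableFuture<List<String>>> mapped = Arrays.asList(
                healthy.exceptionally(ex -> Collections.emptyList()),
                moved.exceptionally(ex -> {
                    if (ex instanceof IllegalStateException
                            || ex.getCause() instanceof IllegalStateException) {
                        return Collections.emptyList();
                    }
                    throw new CompletionException(ex);
                })
            );

            System.out.println(mapped.get(0).get()); // [group-a]
            System.out.println(mapped.get(1).get()); // []
        }
    }
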
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
index ccb4caf04f9..6b98a51dd47 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
@@ -41,12 +41,11 @@ import org.slf4j.Logger;
 
 import java.time.Duration;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.OptionalInt;
 import java.util.OptionalLong;
-import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.RejectedExecutionException;
@@ -54,6 +53,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Consumer;
+import java.util.stream.Collectors;
 
 /**
  * The CoordinatorRuntime provides a framework to implement coordinators such as the group coordinator
@@ -1446,6 +1446,32 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
         return event.future;
     }
 
+    /**
+     * Schedules a write operation for each coordinator.
+     *
+     * @param name      The name of the write operation.
+     * @param timeout   The write operation timeout.
+     * @param op        The write operation.
+     *
+     * @return A list of futures, one per coordinator, each completed with the result of the
+     * write operation once it completes, or completed exceptionally if the write operation failed.
+     *
+     * @param <T> The type of the result.
+     */
+    public <T> List<CompletableFuture<T>> scheduleWriteAllOperation(
+        String name,
+        Duration timeout,
+        CoordinatorWriteOperation<S, T, U> op
+    ) {
+        throwIfNotRunning();
+        log.debug("Scheduled execution of write all operation {}.", name);
+        return coordinators
+            .keySet()
+            .stream()
+            .map(tp -> scheduleWriteOperation(name, tp, timeout, op))
+            .collect(Collectors.toList());
+    }
+
     /**
      * Schedules a transactional write operation.
      *
@@ -1535,12 +1561,12 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
     /**
      * Schedules a read operation.
      *
-     * @param name  The name of the write operation.
+     * @param name  The name of the read operation.
      * @param tp    The address of the coordinator (aka its topic-partition).
      * @param op    The read operation.
      *
      * @return A future that will be completed with the result of the read operation
-     * when the operation is completed or an exception if the write operation failed.
+     * when the operation is completed or an exception if the read operation failed.
      *
      * @param <T> The type of the result.
      */
@@ -1556,6 +1582,30 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
         return event.future;
     }
 
+    /**
+     * Schedules a read operation for each coordinator.
+     *
+     * @param name  The name of the read operation.
+     * @param op    The read operation.
+     *
+     * @return A list of futures, one per coordinator, each completed with the result of the
+     * read operation once it completes, or completed exceptionally if the read operation failed.
+     *
+     * @param <T> The type of the result.
+     */
+    public <T> List<CompletableFuture<T>> scheduleReadAllOperation(
+        String name,
+        CoordinatorReadOperation<S, T> op
+    ) {
+        throwIfNotRunning();
+        log.debug("Scheduled execution of read all operation {}.", name);
+        return coordinators
+            .keySet()
+            .stream()
+            .map(tp -> scheduleReadOperation(name, tp, op))
+            .collect(Collectors.toList());
+    }
+
     /**
      * Schedules an internal event.
      *
@@ -1572,15 +1622,6 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
         enqueue(new CoordinatorInternalEvent(name, tp, op));
     }
 
-    /**
-     * @return The topic partitions of the coordinators currently registered in the
-     * runtime.
-     */
-    public Set<TopicPartition> partitions() {
-        throwIfNotRunning();
-        return new HashSet<>(coordinators.keySet());
-    }
-
     /**
      * Schedules the loading of a coordinator. This is called when the broker is elected as
      * the leader for a partition.
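
One design note on the runtime change above: the removed partitions() accessor
made a defensive HashSet copy, whereas the new *AllOperation methods stream
over coordinators.keySet() directly. Assuming coordinators is a
ConcurrentHashMap (as the imports above suggest), its key-set iteration is
weakly consistent, so no copy is needed. A standalone illustration:

    import java.util.List;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.stream.Collectors;

    public class WeaklyConsistentIterationSketch {
        public static void main(String[] args) {
            ConcurrentHashMap<String, Integer> coordinators = new ConcurrentHashMap<>();
            coordinators.put("__consumer_offsets-0", 0);
            coordinators.put("__consumer_offsets-1", 1);

            // Weakly consistent iteration: never throws
            // ConcurrentModificationException, even if coordinators are
            // loaded or unloaded concurrently while we schedule.
            List<String> scheduled = coordinators.keySet().stream()
                .map(tp -> "scheduled for " + tp)
                .collect(Collectors.toList());
            System.out.println(scheduled);
        }
    }
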
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
index 2404d304bec..0d93f450686 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
@@ -77,7 +77,6 @@ import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.NullSource;
 import org.junit.jupiter.params.provider.ValueSource;
 import org.mockito.ArgumentMatchers;
-import org.mockito.internal.util.collections.Sets;
 
 import java.net.InetAddress;
 import java.time.Duration;
@@ -91,8 +90,6 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
 import static org.apache.kafka.common.requests.JoinGroupRequest.UNKNOWN_MEMBER_ID;
@@ -759,10 +756,7 @@ public class GroupCoordinatorServiceTest {
             runtime,
             new GroupCoordinatorMetrics()
         );
-        int partitionCount = 3;
-        service.startup(() -> partitionCount);
-
-        ListGroupsRequestData request = new ListGroupsRequestData();
+        service.startup(() -> 3);
 
         List<ListGroupsResponseData.ListedGroup> expectedResults = Arrays.asList(
             new ListGroupsResponseData.ListedGroup()
@@ -781,26 +775,22 @@ public class GroupCoordinatorServiceTest {
                 .setGroupState("Dead")
                 .setGroupType("consumer")
         );
-        when(runtime.partitions()).thenReturn(Sets.newSet(
-            new TopicPartition("__consumer_offsets", 0),
-            new TopicPartition("__consumer_offsets", 1),
-            new TopicPartition("__consumer_offsets", 2)
+
+        when(runtime.scheduleReadAllOperation(
+            ArgumentMatchers.eq("list-groups"),
+            ArgumentMatchers.any()
+        )).thenReturn(Arrays.asList(
+            CompletableFuture.completedFuture(Collections.singletonList(expectedResults.get(0))),
+            CompletableFuture.completedFuture(Collections.singletonList(expectedResults.get(1))),
+            CompletableFuture.completedFuture(Collections.singletonList(expectedResults.get(2)))
         ));
-        for (int i = 0; i < partitionCount; i++) {
-            when(runtime.scheduleReadOperation(
-                ArgumentMatchers.eq("list-groups"),
-                ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", i)),
-                ArgumentMatchers.any()
-            )).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(expectedResults.get(i))));
-        }
 
         CompletableFuture<ListGroupsResponseData> responseFuture = service.listGroups(
             requestContext(ApiKeys.LIST_GROUPS),
-            request
+            new ListGroupsRequestData()
         );
 
-        List<ListGroupsResponseData.ListedGroup> actualResults = responseFuture.get(5, TimeUnit.SECONDS).groups();
-        assertEquals(expectedResults, actualResults);
+        assertEquals(expectedResults, responseFuture.get(5, TimeUnit.SECONDS).groups());
     }
 
     @Test
@@ -813,8 +803,7 @@ public class GroupCoordinatorServiceTest {
             runtime,
             new GroupCoordinatorMetrics()
         );
-        int partitionCount = 3;
-        service.startup(() -> partitionCount);
+        service.startup(() -> 3);
 
         List<ListGroupsResponseData.ListedGroup> expectedResults = Arrays.asList(
             new ListGroupsResponseData.ListedGroup()
@@ -829,36 +818,25 @@ public class GroupCoordinatorServiceTest {
                 .setGroupType("consumer")
         );
 
-        ListGroupsRequestData request = new ListGroupsRequestData();
-        when(runtime.partitions()).thenReturn(Sets.newSet(
-            new TopicPartition("__consumer_offsets", 0),
-            new TopicPartition("__consumer_offsets", 1),
-            new TopicPartition("__consumer_offsets", 2)
-        ));
-        for (int i = 0; i < 2; i++) {
-            when(runtime.scheduleReadOperation(
-                ArgumentMatchers.eq("list-groups"),
-                ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", i)),
-                ArgumentMatchers.any()
-            )).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(expectedResults.get(i))));
-        }
-
-        when(runtime.scheduleReadOperation(
+        when(runtime.scheduleReadAllOperation(
             ArgumentMatchers.eq("list-groups"),
-            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 2)),
             ArgumentMatchers.any()
-        )).thenReturn(FutureUtils.failedFuture(new NotCoordinatorException("")));
+        )).thenReturn(Arrays.asList(
+            CompletableFuture.completedFuture(Collections.singletonList(expectedResults.get(0))),
+            CompletableFuture.completedFuture(Collections.singletonList(expectedResults.get(1))),
+            FutureUtils.failedFuture(new NotCoordinatorException(""))
+        ));
 
         CompletableFuture<ListGroupsResponseData> responseFuture = service.listGroups(
             requestContext(ApiKeys.LIST_GROUPS),
-            request
+            new ListGroupsRequestData()
         );
-        List<ListGroupsResponseData.ListedGroup> actualResults = responseFuture.get(5, TimeUnit.SECONDS).groups();
-        assertEquals(expectedResults, actualResults);
+
+        assertEquals(expectedResults, responseFuture.get(5, TimeUnit.SECONDS).groups());
     }
 
     @Test
-    public void testListGroupsFailedImmediately()
+    public void testListGroupsWithFailure()
         throws InterruptedException, ExecutionException, TimeoutException {
         CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
         GroupCoordinatorService service = new GroupCoordinatorService(
@@ -867,37 +845,27 @@ public class GroupCoordinatorServiceTest {
             runtime,
             new GroupCoordinatorMetrics()
         );
-        int partitionCount = 3;
-        service.startup(() -> partitionCount);
-
-        ListGroupsRequestData request = new ListGroupsRequestData();
-        when(runtime.partitions()).thenReturn(Sets.newSet(
-            new TopicPartition("__consumer_offsets", 0),
-            new TopicPartition("__consumer_offsets", 1),
-            new TopicPartition("__consumer_offsets", 2)
-        ));
-        for (int i = 0; i < 2; i++) {
-            when(runtime.scheduleReadOperation(
-                ArgumentMatchers.eq("list-groups"),
-                ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", i)),
-                ArgumentMatchers.any()
-            )).thenReturn(CompletableFuture.completedFuture(Collections.emptyList()));
-        }
+        service.startup(() -> 3);
 
-        when(runtime.scheduleReadOperation(
+        when(runtime.scheduleReadAllOperation(
             ArgumentMatchers.eq("list-groups"),
-            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 2)),
             ArgumentMatchers.any()
-        )).thenReturn(FutureUtils.failedFuture(new CoordinatorLoadInProgressException("")));
+        )).thenReturn(Arrays.asList(
+            CompletableFuture.completedFuture(Collections.emptyList()),
+            CompletableFuture.completedFuture(Collections.emptyList()),
+            FutureUtils.failedFuture(new CoordinatorLoadInProgressException(""))
+        ));
 
         CompletableFuture<ListGroupsResponseData> responseFuture = service.listGroups(
             requestContext(ApiKeys.LIST_GROUPS),
-            request
+            new ListGroupsRequestData()
         );
-        ListGroupsResponseData listGroupsResponseData = responseFuture.get(5, TimeUnit.SECONDS);
 
-        assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS.code(), listGroupsResponseData.errorCode());
-        assertEquals(Collections.emptyList(), listGroupsResponseData.groups());
+        assertEquals(
+            new ListGroupsResponseData()
+                .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()),
+            responseFuture.get(5, TimeUnit.SECONDS)
+        );
     }
 
     @Test
@@ -1703,12 +1671,6 @@ public class GroupCoordinatorServiceTest {
             result1.duplicate()
         ));
 
-        when(runtime.partitions()).thenReturn(Sets.newSet(
-            new TopicPartition("__consumer_offsets", 0),
-            new TopicPartition("__consumer_offsets", 1),
-            new TopicPartition("__consumer_offsets", 2)
-        ));
-
         when(runtime.scheduleWriteOperation(
             ArgumentMatchers.eq("delete-groups"),
             ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 2)),
@@ -2067,7 +2029,6 @@ public class GroupCoordinatorServiceTest {
 
     @Test
     public void testOnPartitionsDeleted() {
-        int partitionCount = 3;
         CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
         GroupCoordinatorService service = new GroupCoordinatorService(
             new LogContext(),
@@ -2075,36 +2036,17 @@ public class GroupCoordinatorServiceTest {
             runtime,
             new GroupCoordinatorMetrics()
         );
+        service.startup(() -> 3);
 
-        service.startup(() -> partitionCount);
-
-        when(runtime.partitions()).thenReturn(
-            IntStream
-                .range(0, partitionCount)
-                .mapToObj(i -> new TopicPartition("__consumer_offsets", i))
-                .collect(Collectors.toSet())
-        );
-
-        List<CompletableFuture<Void>> futures = IntStream
-            .range(0, partitionCount)
-            .mapToObj(__ -> new CompletableFuture<Void>())
-            .collect(Collectors.toList());
-
-        IntStream.range(0, partitionCount).forEach(i -> {
-            CompletableFuture<Void> future = futures.get(i);
-            when(runtime.scheduleWriteOperation(
-                ArgumentMatchers.eq("on-partition-deleted"),
-                ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", i)),
-                ArgumentMatchers.eq(Duration.ofMillis(5000)),
-                ArgumentMatchers.any()
-            )).thenAnswer(__ -> future);
-        });
-
-        IntStream.range(0, partitionCount - 1).forEach(i -> {
-            futures.get(i).complete(null);
-        });
-
-        futures.get(partitionCount - 1).completeExceptionally(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception());
+        when(runtime.scheduleWriteAllOperation(
+            ArgumentMatchers.eq("on-partition-deleted"),
+            ArgumentMatchers.eq(Duration.ofMillis(5000)),
+            ArgumentMatchers.any()
+        )).thenReturn(Arrays.asList(
+            CompletableFuture.completedFuture(null),
+            CompletableFuture.completedFuture(null),
+            FutureUtils.failedFuture(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception())
+        ));
 
         // The exception is logged and swallowed.
         assertDoesNotThrow(() ->
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
index ba1a340e8e1..4e10978d357 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
@@ -43,6 +43,7 @@ import org.junit.jupiter.params.provider.EnumSource;
 import org.mockito.ArgumentMatcher;
 
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
@@ -1145,6 +1146,63 @@ public class CoordinatorRuntimeTest {
         assertFutureThrows(timedOutWrite, org.apache.kafka.common.errors.TimeoutException.class);
     }
 
+    @Test
+    public void testScheduleWriteAllOperation() throws ExecutionException, InterruptedException, TimeoutException {
+        MockTimer timer = new MockTimer();
+        MockPartitionWriter writer = new MockPartitionWriter();
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+                .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+                .build();
+
+        TopicPartition coordinator0 = new TopicPartition("__consumer_offsets", 0);
+        TopicPartition coordinator1 = new TopicPartition("__consumer_offsets", 1);
+        TopicPartition coordinator2 = new TopicPartition("__consumer_offsets", 2);
+
+        // Load coordinators.
+        runtime.scheduleLoadOperation(coordinator0, 10);
+        runtime.scheduleLoadOperation(coordinator1, 10);
+        runtime.scheduleLoadOperation(coordinator2, 10);
+
+        // Writes.
+        AtomicInteger cnt = new AtomicInteger(0);
+        List<CompletableFuture<List<String>>> writes = runtime.scheduleWriteAllOperation("write", DEFAULT_WRITE_TIMEOUT, state -> {
+            int counter = cnt.getAndIncrement();
+            return new CoordinatorResult<>(
+                Collections.singletonList("record#" + counter),
+                Collections.singletonList("response#" + counter)
+            );
+        });
+
+        assertEquals(1L, runtime.contextOrThrow(coordinator0).coordinator.lastWrittenOffset());
+        assertEquals(1L, runtime.contextOrThrow(coordinator1).coordinator.lastWrittenOffset());
+        assertEquals(1L, runtime.contextOrThrow(coordinator2).coordinator.lastWrittenOffset());
+
+        assertEquals(Collections.singletonList(InMemoryPartitionWriter.LogEntry.value("record#0")), writer.entries(coordinator0));
+        assertEquals(Collections.singletonList(InMemoryPartitionWriter.LogEntry.value("record#1")), writer.entries(coordinator1));
+        assertEquals(Collections.singletonList(InMemoryPartitionWriter.LogEntry.value("record#2")), writer.entries(coordinator2));
+
+        // Commit.
+        writer.commit(coordinator0);
+        writer.commit(coordinator1);
+        writer.commit(coordinator2);
+
+        // Verify.
+        assertEquals(
+            Arrays.asList("response#0", "response#1", "response#2"),
+            FutureUtils.combineFutures(writes, ArrayList::new, List::addAll).get(5, TimeUnit.SECONDS)
+        );
+    }
+
     @Test
     public void testScheduleTransactionalWriteOp() {
         MockTimer timer = new MockTimer();
@@ -1743,6 +1801,58 @@ public class CoordinatorRuntimeTest {
         assertFutureThrows(read, IllegalArgumentException.class);
     }
 
+    @Test
+    public void testScheduleReadAllOp() throws ExecutionException, InterruptedException, TimeoutException {
+        MockTimer timer = new MockTimer();
+        MockPartitionWriter writer = new MockPartitionWriter();
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+                .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+                .build();
+
+        TopicPartition coordinator0 = new TopicPartition("__consumer_offsets", 0);
+        TopicPartition coordinator1 = new TopicPartition("__consumer_offsets", 1);
+        TopicPartition coordinator2 = new TopicPartition("__consumer_offsets", 2);
+
+        // Loads the coordinators.
+        runtime.scheduleLoadOperation(coordinator0, 10);
+        runtime.scheduleLoadOperation(coordinator1, 10);
+        runtime.scheduleLoadOperation(coordinator2, 10);
+
+        // Writes
+        runtime.scheduleWriteOperation("write#0", coordinator0, DEFAULT_WRITE_TIMEOUT,
+            state -> new CoordinatorResult<>(Collections.singletonList("record0"), "response0"));
+        runtime.scheduleWriteOperation("write#1", coordinator1, DEFAULT_WRITE_TIMEOUT,
+            state -> new CoordinatorResult<>(Collections.singletonList("record1"), "response1"));
+        runtime.scheduleWriteOperation("write#2", coordinator2, DEFAULT_WRITE_TIMEOUT,
+            state -> new CoordinatorResult<>(Collections.singletonList("record2"), "response2"));
+
+        // Commit writes.
+        writer.commit(coordinator0);
+        writer.commit(coordinator1);
+        writer.commit(coordinator2);
+
+        // Read.
+        List<CompletableFuture<List<String>>> responses = runtime.scheduleReadAllOperation(
+            "read",
+            (state, offset) -> new ArrayList<>(state.records)
+        );
+
+        assertEquals(
+            Arrays.asList("record0", "record1", "record2"),
+            FutureUtils.combineFutures(responses, ArrayList::new, List::addAll).get(5, TimeUnit.SECONDS)
+        );
+    }
+
     @Test
     public void testClose() throws Exception {
         MockCoordinatorLoader loader = spy(new MockCoordinatorLoader());
diff --git a/server-common/src/main/java/org/apache/kafka/server/util/FutureUtils.java b/server-common/src/main/java/org/apache/kafka/server/util/FutureUtils.java
index a3f78a84b17..a95716407b4 100644
--- a/server-common/src/main/java/org/apache/kafka/server/util/FutureUtils.java
+++ b/server-common/src/main/java/org/apache/kafka/server/util/FutureUtils.java
@@ -19,11 +19,13 @@ package org.apache.kafka.server.util;
 import org.apache.kafka.common.utils.Time;
 import org.slf4j.Logger;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 import java.util.function.BiConsumer;
+import java.util.function.Function;
 import java.util.function.Supplier;
 
 
@@ -125,4 +127,21 @@ public class FutureUtils {
             return res;
         });
     }
+
+    /**
+     * Applies the given exception handler to each future in the provided
+     * list and returns a new list of futures.
+     *
+     * @param futures   A list of futures.
+     * @param fn        The handler applied to the exception, if any, of each future.
+     * @return A new list of futures with the exception handler applied.
+     */
+    public static <T> List<CompletableFuture<T>> mapExceptionally(
+        List<CompletableFuture<T>> futures,
+        Function<Throwable, ? extends T> fn
+    ) {
+        final List<CompletableFuture<T>> results = new ArrayList<>(futures.size());
+        futures.forEach(future -> results.add(future.exceptionally(fn)));
+        return results;
+    }
 }
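
Finally, a minimal usage sketch of the new helper. It is self-contained; the
mapExceptionally body is copied from the patch above, while everything else is
a hypothetical harness.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.function.Function;

    public class MapExceptionallySketch {
        // Same body as FutureUtils.mapExceptionally above.
        static <T> List<CompletableFuture<T>> mapExceptionally(
            List<CompletableFuture<T>> futures,
            Function<Throwable, ? extends T> fn
        ) {
            final List<CompletableFuture<T>> results = new ArrayList<>(futures.size());
            futures.forEach(future -> results.add(future.exceptionally(fn)));
            return results;
        }

        public static void main(String[] args) throws Exception {
            CompletableFuture<String> ok = CompletableFuture.completedFuture("ok");
            CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(new RuntimeException("boom"));

            List<CompletableFuture<String>> mapped = mapExceptionally(
                Arrays.asList(ok, failed),
                ex -> "recovered: " + ex.getMessage()
            );

            System.out.println(mapped.get(0).get()); // ok
            System.out.println(mapped.get(1).get()); // recovered: boom
        }
    }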