You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/10/21 00:51:37 UTC

[GitHub] [kafka] guozhangwang commented on a change in pull request #9418: KAFKA-10601; Add support for append linger to Raft implementation

guozhangwang commented on a change in pull request #9418:
URL: https://github.com/apache/kafka/pull/9418#discussion_r508910810



##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/DataOutputWritable.java
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.utils.ByteUtils;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class DataOutputWritable implements Writable {

Review comment:
       nit: it is not clear why we want an intermediate class instead of just having a `DataOutputStreamWritable` class with its constructor parameter declared as `DataOutputStream`.

##########
File path: core/src/main/scala/kafka/common/RecordValidationException.scala
##########
@@ -23,5 +23,6 @@ import org.apache.kafka.common.requests.ProduceResponse.RecordError
 import scala.collection.Seq
 
 class RecordValidationException(val invalidException: ApiException,
-                                val recordErrors: Seq[RecordError]) extends RuntimeException {
+                                val recordErrors: Seq[RecordError])
+  extends RuntimeException(invalidException) {

Review comment:
       I vaguely remember @rajinisivaram has another PR which may overlap with this fix; I will let her check and confirm whether it is fine.

##########
File path: core/src/main/scala/kafka/tools/TestRaftServer.scala
##########
@@ -19,37 +19,44 @@ package kafka.tools
 
 import java.io.File
 import java.nio.file.Files
-import java.util.concurrent.CountDownLatch
-import java.util.{Properties, Random}
+import java.util
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+import java.util.{Collections, OptionalInt, Random}
 
-import joptsimple.OptionParser
+import com.yammer.metrics.core.MetricName
+import joptsimple.OptionException
 import kafka.log.{Log, LogConfig, LogManager}
 import kafka.network.SocketServer
 import kafka.raft.{KafkaFuturePurgatory, KafkaMetadataLog, KafkaNetworkChannel}
 import kafka.security.CredentialProvider
 import kafka.server.{BrokerTopicStats, KafkaConfig, KafkaRequestHandlerPool, KafkaServer, LogDirFailureChannel}
+import kafka.tools.TestRaftServer.{ByteArraySerde, PendingAppend, ThroughputThrottler, WriteStats}

Review comment:
       Why do we need to import the inner classes?

##########
File path: core/src/main/scala/kafka/tools/TestRaftServer.scala
##########
@@ -272,7 +291,79 @@ class TestRaftServer(val config: KafkaConfig) extends Logging {
     )
   }
 
-  class RaftIoThread(client: KafkaRaftClient) extends ShutdownableThread("raft-io-thread") {
+  class RaftWorkloadGenerator(
+    client: KafkaRaftClient[Array[Byte]],
+    time: Time,
+    brokerId: Int,
+    recordsPerSec: Int,
+    recordSize: Int
+  ) extends ShutdownableThread(name = "raft-workload-generator") with RaftClient.Listener[Array[Byte]] {
+
+    private val stats = new WriteStats(time, printIntervalMs = 5000)
+    private val payload = new Array[Byte](recordSize)
+    private val pendingAppends = new util.ArrayDeque[PendingAppend]()
+
+    private var latestLeaderAndEpoch = new LeaderAndEpoch(OptionalInt.empty, 0)
+    private var isLeader = false
+    private var throttler: ThroughputThrottler = _
+    private var recordCount = 0
+
+    override def doWork(): Unit = {
+      if (latestLeaderAndEpoch != client.currentLeaderAndEpoch()) {
+        latestLeaderAndEpoch = client.currentLeaderAndEpoch()
+        isLeader = latestLeaderAndEpoch.leaderId.orElse(-1) == brokerId
+        if (isLeader) {
+          pendingAppends.clear()
+          throttler = new ThroughputThrottler(time, recordsPerSec)
+          recordCount = 0
+        }
+      }
+
+      if (isLeader) {
+        recordCount += 1
+
+        val startTimeMs = time.milliseconds()
+        val sendTimeMs = if (throttler.maybeThrottle(recordCount, startTimeMs)) {
+          time.milliseconds()
+        } else {
+          startTimeMs
+        }
+
+        val offset = client.scheduleAppend(latestLeaderAndEpoch.epoch, Collections.singletonList(payload))
+        if (offset == null || offset == Long.MaxValue) {
+          time.sleep(10)
+        } else {
+          pendingAppends.offer(PendingAppend(latestLeaderAndEpoch.epoch, offset, sendTimeMs))
+        }
+      } else {
+        time.sleep(500)
+      }
+    }
+
+    override def handleCommit(epoch: Int, lastOffset: Long, records: util.List[Array[Byte]]): Unit = {
+      var offset = lastOffset - records.size() + 1
+      val currentTimeMs = time.milliseconds()
+
+      for (record <- records.asScala) {
+        val pendingAppend = pendingAppends.poll()
+        if (pendingAppend.epoch != epoch || pendingAppend.offset!= offset) {
+          warn(s"Expected next commit at offset ${pendingAppend.offset}, " +

Review comment:
       Should this ever happen? If not, then we could shut down the server after logging the error.

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -1605,100 +1708,18 @@ public void poll() throws IOException {
         }
     }
 
-    private void failPendingAppends(KafkaException exception) {
-        for (UnwrittenAppend unwrittenAppend : unwrittenAppends) {
-            unwrittenAppend.fail(exception);
-        }
-        unwrittenAppends.clear();
-    }
-
-    private void pollPendingAppends(LeaderState state, long currentTimeMs) {
-        int numAppends = 0;
-        int maxNumAppends = unwrittenAppends.size();
-
-        while (!unwrittenAppends.isEmpty() && numAppends < maxNumAppends) {
-            final UnwrittenAppend unwrittenAppend = unwrittenAppends.poll();
-
-            if (unwrittenAppend.future.isDone())
-                continue;
-
-            if (unwrittenAppend.isTimedOut(currentTimeMs)) {
-                unwrittenAppend.fail(new TimeoutException("Request timeout " + unwrittenAppend.requestTimeoutMs
-                    + " expired before the records could be appended to the log"));
-            } else {
-                int epoch = quorum.epoch();
-                LogAppendInfo info = appendAsLeader(unwrittenAppend.records);
-                OffsetAndEpoch offsetAndEpoch = new OffsetAndEpoch(info.lastOffset, epoch);
-                long numRecords = info.lastOffset - info.firstOffset + 1;
-                logger.debug("Completed write of {} records at {}", numRecords, offsetAndEpoch);
-
-                if (unwrittenAppend.ackMode == AckMode.LEADER) {
-                    unwrittenAppend.complete(offsetAndEpoch);
-                } else if (unwrittenAppend.ackMode == AckMode.QUORUM) {
-                    CompletableFuture<Long> future = appendPurgatory.await(
-                        LogOffset.awaitCommitted(offsetAndEpoch.offset),
-                        unwrittenAppend.requestTimeoutMs);
-
-                    future.whenComplete((completionTimeMs, exception) -> {
-                        if (exception != null) {
-                            logger.error("Failed to commit append at {} due to {}", offsetAndEpoch, exception);
-
-                            unwrittenAppend.fail(exception);
-                        } else {
-                            long elapsedTime = Math.max(0, completionTimeMs - currentTimeMs);
-                            double elapsedTimePerRecord = (double) elapsedTime / numRecords;
-                            kafkaRaftMetrics.updateCommitLatency(elapsedTimePerRecord, currentTimeMs);
-                            unwrittenAppend.complete(offsetAndEpoch);
-
-                            logger.debug("Completed commit of {} records at {}", numRecords, offsetAndEpoch);
-                        }
-                    });
-                }
-            }
-
-            numAppends++;
-        }
-
-        if (numAppends > 0) {
-            flushLeaderLog(state, currentTimeMs);
-        }
-    }
-
-    /**
-     * Append a set of records to the log. Successful completion of the future indicates a success of
-     * the append, with the uncommitted base offset and epoch.
-     *
-     * @param records The records to write to the log
-     * @param ackMode The commit mode for the appended records
-     * @param timeoutMs The maximum time to wait for the append operation to complete (including
-     *                  any time needed for replication)
-     * @return The uncommitted base offset and epoch of the appended records
-     */
     @Override
-    public CompletableFuture<OffsetAndEpoch> append(
-        Records records,
-        AckMode ackMode,
-        long timeoutMs
-    ) {
-        if (records.sizeInBytes() == 0)
-            throw new IllegalArgumentException("Attempt to append empty record set");
-
-        if (shutdown.get() != null)
-            throw new IllegalStateException("Cannot append records while we are shutting down");
-
-        if (quorum.isObserver())
-            throw new IllegalStateException("Illegal attempt to write to an observer");
-
-        CompletableFuture<OffsetAndEpoch> future = new CompletableFuture<>();
-        UnwrittenAppend unwrittenAppend = new UnwrittenAppend(
-            records, time.milliseconds(), timeoutMs, ackMode, future);
+    public Long scheduleAppend(int epoch, List<T> records) {
+        BatchAccumulator<T> accumulator = this.accumulator;
+        if (accumulator == null) {
+            return Long.MAX_VALUE;

Review comment:
       Why return MAX_VALUE instead of null here? If we want to use `null` to indicate `memory full` and use `MAX_VALUE` to indicate `not leader`, the javadoc should reflect this.
   
   Anyway, I think returning something like a `combo(Offset, ErrorCode, backoffMs)` would be preferred in the end state.

##########
File path: core/src/main/scala/kafka/tools/TestRaftRequestHandler.scala
##########
@@ -56,73 +47,8 @@ class TestRaftRequestHandler(
              | ApiKeys.END_QUORUM_EPOCH
              | ApiKeys.FETCH =>
           val requestBody = request.body[AbstractRequest]
-          networkChannel.postInboundRequest(
-            request.header,
-            requestBody,
-            response => sendResponse(request, Some(response)))
-
-        case ApiKeys.API_VERSIONS =>

Review comment:
       Could you elaborate a bit on why we would no longer expect these API keys in the test handler?

##########
File path: core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala
##########
@@ -216,11 +216,9 @@ class KafkaNetworkChannel(time: Time,
     endpoints.put(id, node)
   }
 
-  def postInboundRequest(header: RequestHeader,
-                         request: AbstractRequest,
-                         onResponseReceived: ResponseHandler): Unit = {
+  def postInboundRequest(request: AbstractRequest, onResponseReceived: ResponseHandler): Unit = {

Review comment:
       nit: maybe add a comment indicating this function is for testing only (and hence we do not care about the correlation id).

##########
File path: core/src/main/scala/kafka/tools/TestRaftServer.scala
##########
@@ -272,7 +291,79 @@ class TestRaftServer(val config: KafkaConfig) extends Logging {
     )
   }
 
-  class RaftIoThread(client: KafkaRaftClient) extends ShutdownableThread("raft-io-thread") {
+  class RaftWorkloadGenerator(
+    client: KafkaRaftClient[Array[Byte]],
+    time: Time,
+    brokerId: Int,
+    recordsPerSec: Int,
+    recordSize: Int
+  ) extends ShutdownableThread(name = "raft-workload-generator") with RaftClient.Listener[Array[Byte]] {
+
+    private val stats = new WriteStats(time, printIntervalMs = 5000)
+    private val payload = new Array[Byte](recordSize)
+    private val pendingAppends = new util.ArrayDeque[PendingAppend]()
+
+    private var latestLeaderAndEpoch = new LeaderAndEpoch(OptionalInt.empty, 0)
+    private var isLeader = false
+    private var throttler: ThroughputThrottler = _
+    private var recordCount = 0
+
+    override def doWork(): Unit = {
+      if (latestLeaderAndEpoch != client.currentLeaderAndEpoch()) {
+        latestLeaderAndEpoch = client.currentLeaderAndEpoch()
+        isLeader = latestLeaderAndEpoch.leaderId.orElse(-1) == brokerId
+        if (isLeader) {
+          pendingAppends.clear()
+          throttler = new ThroughputThrottler(time, recordsPerSec)
+          recordCount = 0
+        }
+      }
+
+      if (isLeader) {
+        recordCount += 1
+
+        val startTimeMs = time.milliseconds()
+        val sendTimeMs = if (throttler.maybeThrottle(recordCount, startTimeMs)) {
+          time.milliseconds()
+        } else {
+          startTimeMs
+        }
+
+        val offset = client.scheduleAppend(latestLeaderAndEpoch.epoch, Collections.singletonList(payload))
+        if (offset == null || offset == Long.MaxValue) {

Review comment:
       If `offset == Long.MaxValue`, would sleeping save us anything? Should we terminate the testing server then?

##########
File path: core/src/main/scala/kafka/tools/TestRaftServer.scala
##########
@@ -272,7 +291,79 @@ class TestRaftServer(val config: KafkaConfig) extends Logging {
     )
   }
 
-  class RaftIoThread(client: KafkaRaftClient) extends ShutdownableThread("raft-io-thread") {
+  class RaftWorkloadGenerator(
+    client: KafkaRaftClient[Array[Byte]],
+    time: Time,
+    brokerId: Int,
+    recordsPerSec: Int,
+    recordSize: Int
+  ) extends ShutdownableThread(name = "raft-workload-generator") with RaftClient.Listener[Array[Byte]] {
+
+    private val stats = new WriteStats(time, printIntervalMs = 5000)
+    private val payload = new Array[Byte](recordSize)
+    private val pendingAppends = new util.ArrayDeque[PendingAppend]()
+
+    private var latestLeaderAndEpoch = new LeaderAndEpoch(OptionalInt.empty, 0)
+    private var isLeader = false
+    private var throttler: ThroughputThrottler = _
+    private var recordCount = 0
+
+    override def doWork(): Unit = {
+      if (latestLeaderAndEpoch != client.currentLeaderAndEpoch()) {
+        latestLeaderAndEpoch = client.currentLeaderAndEpoch()
+        isLeader = latestLeaderAndEpoch.leaderId.orElse(-1) == brokerId
+        if (isLeader) {
+          pendingAppends.clear()

Review comment:
       Even if the leader has changed back, previously pending appends may still get committed, right? I think it is sufficient to just poll-and-drop them in `handleCommit` when the leader changes, rather than clearing the queue here.

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -148,36 +162,44 @@ public KafkaRaftClient(RaftConfig raftConfig,
             raftConfig.retryBackoffMs(),
             raftConfig.requestTimeoutMs(),
             1000,
+            raftConfig.lingerMs(),
             logContext,
             new Random());
     }
 
-    public KafkaRaftClient(NetworkChannel channel,
-                           ReplicatedLog log,
-                           QuorumState quorum,
-                           Time time,
-                           Metrics metrics,
-                           FuturePurgatory<LogOffset> fetchPurgatory,
-                           FuturePurgatory<LogOffset> appendPurgatory,
-                           Map<Integer, InetSocketAddress> voterAddresses,
-                           int electionBackoffMaxMs,
-                           int retryBackoffMs,
-                           int requestTimeoutMs,
-                           int fetchMaxWaitMs,
-                           LogContext logContext,
-                           Random random) {
+    public KafkaRaftClient(
+        RecordSerde<T> serde,
+        NetworkChannel channel,
+        ReplicatedLog log,
+        QuorumState quorum,
+        MemoryPool memoryPool,
+        Time time,
+        Metrics metrics,
+        FuturePurgatory<LogOffset> fetchPurgatory,
+        FuturePurgatory<LogOffset> appendPurgatory,
+        Map<Integer, InetSocketAddress> voterAddresses,
+        int electionBackoffMaxMs,
+        int retryBackoffMs,
+        int requestTimeoutMs,
+        int fetchMaxWaitMs,
+        int lingerMs,

Review comment:
       nit: `appendLingerMs`?

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -1443,15 +1485,79 @@ private void pollShutdown(GracefulShutdown shutdown) throws IOException {
         }
     }
 
+    private void appendBatch(
+        LeaderState state,
+        BatchAccumulator.CompletedBatch<T> batch,
+        long appendTimeMs
+    ) {
+        try {
+            List<T> records = batch.records;
+            int epoch = state.epoch();
+
+            LogAppendInfo info = appendAsLeader(batch.data);
+            OffsetAndEpoch offsetAndEpoch = new OffsetAndEpoch(info.lastOffset, epoch);
+            CompletableFuture<Long> future = appendPurgatory.await(
+                LogOffset.awaitCommitted(offsetAndEpoch.offset),
+                Integer.MAX_VALUE
+            );
+
+            future.whenComplete((commitTimeMs, exception) -> {
+                int numRecords = batch.records.size();
+                if (exception != null) {
+                    logger.debug("Failed to commit {} records at {}", numRecords, offsetAndEpoch, exception);
+                } else {
+                    long elapsedTime = Math.max(0, commitTimeMs - appendTimeMs);
+                    double elapsedTimePerRecord = (double) elapsedTime / numRecords;
+                    kafkaRaftMetrics.updateCommitLatency(elapsedTimePerRecord, appendTimeMs);
+                    logger.debug("Completed commit of {} records at {}", numRecords, offsetAndEpoch);
+                    listener.handleCommit(epoch, info.lastOffset, records);
+                }
+            });
+        } finally {
+            batch.release();
+        }
+    }
+
+    private long maybeAppendBatches(
+        LeaderState state,
+        long currentTimeMs
+    ) {
+        long timeUnitFlush = accumulator.timeUntilFlush(currentTimeMs);
+        if (timeUnitFlush <= 0) {
+            List<BatchAccumulator.CompletedBatch<T>> batches = accumulator.flush();
+            Iterator<BatchAccumulator.CompletedBatch<T>> iterator = batches.iterator();
+
+            try {
+                while (iterator.hasNext()) {
+                    BatchAccumulator.CompletedBatch<T> batch = iterator.next();
+                    appendBatch(state, batch, currentTimeMs);
+                }
+                flushLeaderLog(state, currentTimeMs);

Review comment:
       For now, since we only have a single thread processing all incoming requests/responses, this is okay; but once request processing becomes multi-threaded this would no longer be safe, since it is possible that some batches get replicated and committed while not yet being flushed locally.

##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchBuilder.java
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import org.apache.kafka.common.protocol.DataOutputStreamWritable;
+import org.apache.kafka.common.record.AbstractRecords;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.DefaultRecord;
+import org.apache.kafka.common.record.DefaultRecordBatch;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.apache.kafka.common.utils.ByteUtils;
+import org.apache.kafka.raft.RecordSerde;
+
+import java.io.DataOutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+public class BatchBuilder<T> {

Review comment:
       nit: can we consolidate this class with the existing one (of course, it needs to be generalized on the record type T) rather than creating a new class? Ditto for BatchAccumulator and BatchMemoryPool.

##########
File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java
##########
@@ -104,29 +62,17 @@ public synchronized boolean isWritable() {
     public synchronized void increment() {
         if (!isWritable())
             throw new KafkaException("Counter is not currently writable");
-        int initialValue = uncommitted.get();
-        int incrementedValue = uncommitted.incrementAndGet();
-        Records records = MemoryRecords.withRecords(CompressionType.NONE, serialize(incrementedValue));
-        client.append(records, AckMode.LEADER, Integer.MAX_VALUE).whenComplete((offsetAndEpoch, throwable) -> {
-            if (offsetAndEpoch != null) {
-                log.debug("Appended increment at offset {}: {} -> {}",
-                    offsetAndEpoch.offset, initialValue, incrementedValue);
-            } else {
-                uncommitted.set(initialValue);
-                log.debug("Failed append of increment: {} -> {}", initialValue, incrementedValue, throwable);
-            }
-        });
-    }
-
-    private SimpleRecord serialize(int value) {
-        ByteBuffer buffer = ByteBuffer.allocate(4);
-        Type.INT32.write(buffer, value);
-        buffer.flip();
-        return new SimpleRecord(buffer);
+        uncommitted += 1;
+        long offset = client.scheduleAppend(currentLeaderAndEpoch.epoch, Collections.singletonList(uncommitted));

Review comment:
       Should we handle `null / max-value` here?

##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import org.apache.kafka.common.memory.MemoryPool;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.apache.kafka.raft.RecordSerde;
+
+import java.io.Closeable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * TODO: Also flush after minimum size limit is reached?
+ */
+public class BatchAccumulator<T> implements Closeable {
+    private final int epoch;
+    private final Time time;
+    private final Timer lingerTimer;
+    private final int lingerMs;
+    private final int maxBatchSize;
+    private final CompressionType compressionType;
+    private final MemoryPool memoryPool;
+    private final ReentrantLock lock;
+    private final RecordSerde<T> serde;
+
+    private long nextOffset;
+    private BatchBuilder<T> currentBatch;
+    private List<CompletedBatch<T>> completed;
+
+    public BatchAccumulator(
+        int epoch,
+        long baseOffset,
+        int lingerMs,
+        int maxBatchSize,
+        MemoryPool memoryPool,
+        Time time,
+        CompressionType compressionType,
+        RecordSerde<T> serde
+    ) {
+        this.epoch = epoch;
+        this.lingerMs = lingerMs;
+        this.maxBatchSize = maxBatchSize;
+        this.memoryPool = memoryPool;
+        this.time = time;
+        this.lingerTimer = time.timer(lingerMs);
+        this.compressionType = compressionType;
+        this.serde = serde;
+        this.nextOffset = baseOffset;
+        this.completed = new ArrayList<>();
+        this.lock = new ReentrantLock();
+    }
+
+    /**
+     * Append a list of records into an atomic batch. We guarantee all records
+     * are included in the same underlying record batch so that either all of
+     * the records become committed or none of them do.
+     *
+     * @param epoch the expected leader epoch
+     * @param records the list of records to include in a batch
+     * @return the offset of the last message or {@link Long#MAX_VALUE} if the epoch
+     *         does not match
+     */
+    public Long append(int epoch, List<T> records) {
+        if (epoch != this.epoch) {
+            // If the epoch does not match, then the state machine probably
+            // has not gotten the notification about the latest epoch change.
+            // In this case, ignore the append and return a large offset value
+            // which will never be committed.
+            return Long.MAX_VALUE;
+        }
+
+        Object serdeContext = serde.newWriteContext();
+        int batchSize = 0;
+        for (T record : records) {
+            batchSize += serde.recordSize(record, serdeContext);
+        }
+
+        if (batchSize > maxBatchSize) {
+            throw new IllegalArgumentException("The total size of " + records + " is " + batchSize +
+                ", which exceeds the maximum allowed batch size of " + maxBatchSize);
+        }
+
+        lock.lock();
+        try {
+            BatchBuilder<T> batch = maybeAllocateBatch(batchSize);
+            if (batch == null) {
+                return null;
+            }
+
+            if (isEmpty()) {
+                lingerTimer.update();
+                lingerTimer.reset(lingerMs);
+            }
+
+            for (T record : records) {
+                batch.appendRecord(record, serdeContext);
+                nextOffset += 1;
+            }
+
+            return nextOffset - 1;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    private BatchBuilder<T> maybeAllocateBatch(int batchSize) {
+        if (currentBatch == null) {
+            startNewBatch();
+        } else if (!currentBatch.hasRoomFor(batchSize)) {
+            completeCurrentBatch();
+        }
+        return currentBatch;
+    }
+
+    private void completeCurrentBatch() {
+        MemoryRecords data = currentBatch.build();
+        completed.add(new CompletedBatch<>(
+            currentBatch.baseOffset(),
+            currentBatch.records(),
+            data,
+            memoryPool,
+            currentBatch.initialBuffer()
+        ));
+        currentBatch = null;
+        startNewBatch();
+    }
+
+    private void startNewBatch() {
+        ByteBuffer buffer = memoryPool.tryAllocate(maxBatchSize);
+        if (buffer != null) {
+            currentBatch = new BatchBuilder<>(
+                buffer,
+                serde,
+                compressionType,
+                nextOffset,
+                time.milliseconds(),
+                false,
+                RecordBatch.NO_PARTITION_LEADER_EPOCH,
+                maxBatchSize
+            );
+        }
+    }
+
+    /**
+     * Check whether there are any batches which need flushing now.
+     *
+     * @param currentTimeMs current time in milliseconds
+     * @return true if there are batches ready to flush, false otherwise
+     */
+    public boolean needsFlush(long currentTimeMs) {
+        return timeUntilFlush(currentTimeMs) <= 0;
+    }
+
+    /**
+     * Check the time remaining until the next needed flush. If the accumulator
+     * is empty, then {@link Long#MAX_VALUE} will be returned.
+     *
+     * @param currentTimeMs current time in milliseconds
+     * @return the delay in milliseconds before the next expected flush
+     */
+    public long timeUntilFlush(long currentTimeMs) {
+        lock.lock();
+        try {
+            lingerTimer.update(currentTimeMs);
+            if (isEmpty()) {
+                return Long.MAX_VALUE;
+            } else {
+                return lingerTimer.remainingMs();
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    private boolean isEmpty() {
+        lock.lock();
+        try {
+            if (currentBatch != null && currentBatch.nonEmpty()) {
+                return false;
+            } else {
+                return completed.isEmpty();
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Get the leader epoch, which is constant for each instance.
+     *
+     * @return the leader epoch
+     */
+    public int epoch() {
+        return epoch;
+    }
+
+    /**
+     * Flush completed batches. The caller is expected to first check whether
+     * a flush is expected using {@link #needsFlush(long)} in order to avoid
+     * unnecessary flushing.
+     *
+     * @return the list of completed batches
+     */
+    public List<CompletedBatch<T>> flush() {
+        lock.lock();
+        try {
+            if (currentBatch != null && currentBatch.nonEmpty()) {
+                completeCurrentBatch();
+            }
+
+            List<CompletedBatch<T>> res = completed;
+            this.completed = new ArrayList<>();
+            return res;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Get the number of batches including the one that is currently being
+     * written to (if it exists).
+     *
+     * @return
+     */
+    public int count() {

Review comment:
       Is this going to be used for non-testing code in the future? If it is only going to be used for metrics purposes, maybe we can allow it to be non-thread-safe so that it does not block other critical paths.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org