You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/12/09 04:53:44 UTC

[GitHub] [kafka] guozhangwang commented on a change in pull request #9716: KAFKA-10826; Ensure raft io thread respects linger timeout

guozhangwang commented on a change in pull request #9716:
URL: https://github.com/apache/kafka/pull/9716#discussion_r538997549



##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##########
@@ -271,22 +272,19 @@ public int epoch() {
         }
     }
 
+    public boolean isEmpty() {
+        // The linger timer begins running when we have pending batches.
+        // We use this to infer when the accumulator is empty to avoid the

Review comment:
       Nice one.

##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##########
@@ -271,22 +272,19 @@ public int epoch() {
         }
     }
 
+    public boolean isEmpty() {
+        // The linger timer begins running when we have pending batches.
+        // We use this to infer when the accumulator is empty to avoid the
+        // need to acquire the append lock.
+        return !lingerTimer.isRunning();
+    }
+
     /**
      * Get the number of batches including the one that is currently being
      * written to (if it exists).
      */
-    public int count() {
-        appendLock.lock();
-        try {
-            int count = completed.size();
-            if (currentBatch != null) {
-                return count + 1;
-            } else {
-                return count;
-            }
-        } finally {
-            appendLock.unlock();
-        }
+    public int numCompletedBatches() {

Review comment:
       Does the javadoc above need updates?

##########
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##########
@@ -477,6 +477,74 @@ public void testAccumulatorClearedAfterBecomingUnattached() throws Exception {
         Mockito.verify(memoryPool).release(buffer);
     }
 
+    @Test
+    public void testChannelWokenUpIfLingerTimeoutReachedWithoutAppend() throws Exception {
+        // This test verifies that the client will set its poll timeout accounting
+        // for the lingerMs of a pending append
+
+        int localId = 0;
+        int otherNodeId = 1;
+        int lingerMs = 50;
+        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+            .withAppendLingerMs(lingerMs)
+            .build();
+
+        context.becomeLeader();
+        assertEquals(OptionalInt.of(localId), context.currentLeader());
+        assertEquals(1L, context.log.endOffset().offset);
+
+        int epoch = context.currentEpoch();
+        assertEquals(1L, context.client.scheduleAppend(epoch, singletonList("a")));
+        assertTrue(context.channel.wakeupRequested());
+
+        context.client.poll();
+        assertEquals(OptionalLong.of(lingerMs), context.channel.lastReceiveTimeout());
+
+        context.time.sleep(25);

Review comment:
       nit: maybe we split 50 to 20/30 to avoid some reading difficulty? :)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org