You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2019/11/02 15:42:55 UTC

[kafka] branch trunk updated: HOTFIX: Try to complete Send even if no bytes were written (#7622)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7bdbdf1  HOTFIX: Try to complete Send even if no bytes were written (#7622)
7bdbdf1 is described below

commit 7bdbdf19009709e83cb8c679a0c1c0972746555c
Author: Ismael Juma <is...@juma.me.uk>
AuthorDate: Sat Nov 2 08:42:30 2019 -0700

    HOTFIX: Try to complete Send even if no bytes were written (#7622)
    
    If there are pending bytes in the transport layer, we may
    complete a send even if no bytes were recorded as written.
    We assume bytes are written when they are in the netWriteBuffer,
    but we only consider the send as completed when it's in
    the socket channel buffer.
    
    This fixes a regression introduced via 0971f66ff546. The impact is
    that we would sometimes throw the following exception in
    `MultiRecordsSend.writeTo`:
    
    ```java
    if (completed())
        throw new KafkaException("This operation cannot be invoked on a complete request.");
    ```
    
    Added unit test verifying the bug fix. While in the area, I simplified one of the
    `SslSelectorTest` methods.
    
    Reviewers: Jun Rao <ju...@gmail.com>, Rajini Sivaram <ra...@googlemail.com>
---
 .../org/apache/kafka/common/network/Selector.java  | 52 +++++++++++++---------
 .../apache/kafka/common/network/SelectorTest.java  | 11 +++++
 .../kafka/common/network/SslSelectorTest.java      | 16 +++----
 3 files changed, 49 insertions(+), 30 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 5096f12..072406e 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -584,26 +584,11 @@ public class Selector implements Selectable, AutoCloseable {
                 /* if channel is ready write to any sockets that have space in their buffer and for which we have data */
 
                 long nowNanos = channelStartTimeNanos != 0 ? channelStartTimeNanos : currentTimeNanos;
-                if (channel.hasSend()
-                        && channel.ready()
-                        && key.isWritable()
-                        && !channel.maybeBeginClientReauthentication(() -> nowNanos)) {
-                    try {
-                        long bytesSent = channel.write();
-                        if (bytesSent > 0) {
-                            long currentTimeMs = time.milliseconds();
-                            this.sensors.recordBytesSent(nodeId, bytesSent, currentTimeMs);
-
-                            Send send = channel.maybeCompleteSend();
-                            if (send != null) {
-                                this.completedSends.add(send);
-                                this.sensors.recordCompletedSend(nodeId, send.size(), currentTimeMs);
-                            }
-                        }
-                    } catch (Exception e) {
-                        sendFailed = true;
-                        throw e;
-                    }
+                try {
+                    attemptWrite(key, channel, nowNanos);
+                } catch (Exception e) {
+                    sendFailed = true;
+                    throw e;
                 }
 
                 /* cancel any defunct sockets */
@@ -639,6 +624,33 @@ public class Selector implements Selectable, AutoCloseable {
         }
     }
 
+    private void attemptWrite(SelectionKey key, KafkaChannel channel, long nowNanos) throws IOException {
+        if (channel.hasSend()
+                && channel.ready()
+                && key.isWritable()
+                && !channel.maybeBeginClientReauthentication(() -> nowNanos)) {
+            write(channel);
+        }
+    }
+
+    // package-private for testing
+    void write(KafkaChannel channel) throws IOException {
+        String nodeId = channel.id();
+        long bytesSent = channel.write();
+        Send send = channel.maybeCompleteSend();
+        // We may complete the send with bytesSent < 1 if `TransportLayer.hasPendingWrites` was true and `channel.write()`
+        // caused the pending writes to be written to the socket channel buffer
+        if (bytesSent > 0 || send != null) {
+            long currentTimeMs = time.milliseconds();
+            if (bytesSent > 0)
+                this.sensors.recordBytesSent(nodeId, bytesSent, currentTimeMs);
+            if (send != null) {
+                this.completedSends.add(send);
+                this.sensors.recordCompletedSend(nodeId, send.size(), currentTimeMs);
+            }
+        }
+    }
+
     private Collection<SelectionKey> determineHandlingOrder(Set<SelectionKey> selectionKeys) {
         //it is possible that the iteration order over selectionKeys is the same every invocation.
         //this may cause starvation of reads when memory is low. to address this we shuffle the keys if memory is low.
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
index 785e86d..85df3a5 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
@@ -53,6 +53,7 @@ import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import static java.util.Arrays.asList;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -773,6 +774,16 @@ public class SelectorTest {
         assertEquals(1, metrics.metrics().size());
     }
 
+    @Test
+    public void testWriteCompletesSendWithNoBytesWritten() throws IOException {
+        KafkaChannel channel = mock(KafkaChannel.class);
+        when(channel.id()).thenReturn("1");
+        when(channel.write()).thenReturn(0L);
+        ByteBufferSend send = new ByteBufferSend("destination", ByteBuffer.allocate(0));
+        when(channel.maybeCompleteSend()).thenReturn(send);
+        selector.write(channel);
+        assertEquals(asList(send), selector.completedSends());
+    }
 
     private String blockingRequest(String node, String s) throws IOException {
         selector.send(createSend(node, s));
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
index 19e7a8b..6889a4d 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
@@ -31,7 +31,6 @@ import org.apache.kafka.common.security.ssl.mock.TestTrustManagerFactory;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestSslUtils;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -157,15 +156,12 @@ public class SslSelectorTest extends SelectorTest {
     }
 
     private void waitForBytesBuffered(Selector selector, String node) throws Exception {
-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                try {
-                    selector.poll(0L);
-                    return selector.channel(node).hasBytesBuffered();
-                } catch (IOException e) {
-                    throw new RuntimeException(e);
-                }
+        TestUtils.waitForCondition(() -> {
+            try {
+                selector.poll(0L);
+                return selector.channel(node).hasBytesBuffered();
+            } catch (IOException e) {
+                throw new RuntimeException(e);
             }
         }, 2000L, "Failed to reach socket state with bytes buffered");
     }