Posted to commits@kafka.apache.org by ij...@apache.org on 2017/11/24 20:01:05 UTC

kafka git commit: KAFKA-6261; Fix exception thrown by request logging if acks=0

Repository: kafka
Updated Branches:
  refs/heads/trunk 49b773bfc -> be2918b3a


KAFKA-6261; Fix exception thrown by request logging if acks=0

Only expect `responseAsString` to be set if request logging is
enabled _and_ `responseSend` is defined.

Also fixed a couple of issues that would manifest if trace logging
were enabled:

- `MemoryRecords.toString` should not throw an exception if the data
is corrupted
- Generate `responseString` correctly if an unsupported ApiVersions
request is received.

Unit tests were added for every issue fixed. Also changed
`SocketServerTest` to run with trace logging enabled, as request
logging breakage has been a common issue.
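
For context, the failure mode: with acks=0 the broker completes the
produce request via `NoOpAction`, so `responseSend` (and therefore
`responseAsString`) is `None`, and the old logging code threw an
`IllegalStateException`. A minimal standalone Scala sketch of the
corrected guard (the simplified `Response` case class here is
illustrative, not the real `RequestChannel.Response`):

    object LoggingGuardSketch {
      // Stripped-down stand-in for RequestChannel.Response
      final case class Response(responseSend: Option[AnyRef], responseAsString: Option[String])

      def responseStringForLog(response: Response): String =
        if (response.responseSend.isDefined)
          response.responseAsString.getOrElse(
            throw new IllegalStateException(
              "responseAsString should always be defined if request logging is enabled"))
        else "" // acks=0 / NoOpAction: nothing was sent, so log an empty response

      def main(args: Array[String]): Unit = {
        // acks=0 produce: no send and no string form -> previously threw, now logs ""
        println(responseStringForLog(Response(None, None)))
        // Normal path: a send plus its string form
        println(responseStringForLog(Response(Some(new AnyRef), Some("ProduceResponse(...)"))))
      }
    }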

Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Rajini Sivaram <ra...@googlemail.com>

Closes #4250 from ijuma/fix-issues-when-trace-logging-is-enabled


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/be2918b3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/be2918b3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/be2918b3

Branch: refs/heads/trunk
Commit: be2918b3a04f669a1264f5d1b0be3cdb6915b2fa
Parents: 49b773b
Author: Ismael Juma <is...@juma.me.uk>
Authored: Fri Nov 24 20:00:00 2017 +0000
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Fri Nov 24 20:00:00 2017 +0000

----------------------------------------------------------------------
 .../kafka/common/record/MemoryRecords.java      | 31 +++++++++++----
 .../kafka/common/requests/RequestContext.java   | 15 +++----
 .../kafka/common/record/MemoryRecordsTest.java  | 31 +++++++++++++++
 .../common/record/SimpleMemoryRecordsTest.java  | 41 ++++++++++++++++++++
 .../common/requests/RequestContextTest.java     |  1 +
 .../scala/kafka/network/RequestChannel.scala    |  7 +++-
 .../src/main/scala/kafka/server/KafkaApis.scala |  2 +-
 .../scala/kafka/server/ReplicaManager.scala     |  2 +-
 .../unit/kafka/network/SocketServerTest.scala   | 41 ++++++++++++++++----
 9 files changed, 144 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/be2918b3/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index 2a25aad..932d4b6 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.common.record;
 
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention;
 import org.apache.kafka.common.utils.ByteBufferOutputStream;
@@ -280,22 +281,36 @@ public class MemoryRecords extends AbstractRecords {
 
     @Override
     public String toString() {
-        Iterator<Record> iter = records().iterator();
         StringBuilder builder = new StringBuilder();
         builder.append('[');
-        while (iter.hasNext()) {
-            Record record = iter.next();
-            builder.append('(');
-            builder.append("record=");
-            builder.append(record);
-            builder.append(")");
-            if (iter.hasNext())
+
+        Iterator<MutableRecordBatch> batchIterator = batches.iterator();
+        while (batchIterator.hasNext()) {
+            RecordBatch batch = batchIterator.next();
+            try (CloseableIterator<Record> recordsIterator = batch.streamingIterator(BufferSupplier.create())) {
+                while (recordsIterator.hasNext()) {
+                    Record record = recordsIterator.next();
+                    appendRecordToStringBuilder(builder, record.toString());
+                    if (recordsIterator.hasNext())
+                        builder.append(", ");
+                }
+            } catch (KafkaException e) {
+                appendRecordToStringBuilder(builder, "CORRUPTED");
+            }
+            if (batchIterator.hasNext())
                 builder.append(", ");
         }
         builder.append(']');
         return builder.toString();
     }
 
+    private void appendRecordToStringBuilder(StringBuilder builder, String recordAsString) {
+        builder.append('(')
+            .append("record=")
+            .append(recordAsString)
+            .append(")");
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o)
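
As a usage sketch of the hardened `toString` (mirroring the new
`SimpleMemoryRecordsTest` below, and assuming the kafka-clients jar
from this commit is on the classpath), corrupting the compressed
payload now yields a CORRUPTED marker instead of an exception:

    import org.apache.kafka.common.record.{CompressionType, DefaultRecordBatch, MemoryRecords, SimpleRecord}

    object CorruptedToStringSketch {
      def main(args: Array[String]): Unit = {
        val records = MemoryRecords.withRecords(CompressionType.LZ4,
          new SimpleRecord(1000000L, "key1".getBytes, "value1".getBytes))
        // Zero a byte of the LZ4 frame header checksum so that decompression fails
        records.buffer().array()(DefaultRecordBatch.RECORD_BATCH_OVERHEAD + 6) = 0
        // Previously this threw KafkaException from the record iterator;
        // now it prints "[(record=CORRUPTED)]"
        println(records.toString)
      }
    }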

http://git-wip-us.apache.org/repos/asf/kafka/blob/be2918b3/clients/src/main/java/org/apache/kafka/common/requests/RequestContext.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RequestContext.java b/clients/src/main/java/org/apache/kafka/common/requests/RequestContext.java
index 232c18a..663d746 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/RequestContext.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/RequestContext.java
@@ -75,17 +75,18 @@ public class RequestContext {
 
     public Send buildResponse(AbstractResponse body) {
         ResponseHeader responseHeader = header.toResponseHeader();
-        short version = header.apiVersion();
-
-        // Use v0 when serializing an unhandled ApiVersion response
-        if (isUnsupportedApiVersionsRequest())
-            version = 0;
-
-        return body.toSend(connectionId, responseHeader, version);
+        return body.toSend(connectionId, responseHeader, apiVersion());
     }
 
     private boolean isUnsupportedApiVersionsRequest() {
         return header.apiKey() == API_VERSIONS && !API_VERSIONS.isVersionSupported(header.apiVersion());
     }
 
+    public short apiVersion() {
+        // Use v0 when serializing an unhandled ApiVersion response
+        if (isUnsupportedApiVersionsRequest())
+            return 0;
+        return header.apiVersion();
+    }
+
 }
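
The extracted `apiVersion()` is also what `KafkaApis` now uses when
stringifying the response for the request log (see the change further
down), so serialization and logging agree on v0 for unsupported
ApiVersions requests. A small Scala sketch of the fallback, mirroring
the assertion added to `RequestContextTest`:

    import java.net.InetAddress
    import org.apache.kafka.common.network.ListenerName
    import org.apache.kafka.common.protocol.ApiKeys
    import org.apache.kafka.common.requests.{RequestContext, RequestHeader}
    import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}

    object ApiVersionFallbackSketch {
      def main(args: Array[String]): Unit = {
        // An ApiVersionsRequest version this broker does not support
        val header = new RequestHeader(ApiKeys.API_VERSIONS, Short.MaxValue, "", 0)
        val context = new RequestContext(header, "0", InetAddress.getLocalHost, KafkaPrincipal.ANONYMOUS,
          new ListenerName("ssl"), SecurityProtocol.SASL_SSL)
        // Falls back to v0, the one version every broker can serialize
        assert(context.apiVersion == 0)
      }
    }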

http://git-wip-us.apache.org/repos/asf/kafka/blob/be2918b3/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
index de00378..e1409e0 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
@@ -37,6 +37,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 @RunWith(value = Parameterized.class)
 public class MemoryRecordsTest {
@@ -607,6 +608,36 @@ public class MemoryRecordsTest {
     }
 
     @Test
+    public void testToString() {
+        long timestamp = 1000000;
+        MemoryRecords memoryRecords = MemoryRecords.withRecords(magic, compression,
+                new SimpleRecord(timestamp, "key1".getBytes(), "value1".getBytes()),
+                new SimpleRecord(timestamp + 1, "key2".getBytes(), "value2".getBytes()));
+        switch (magic) {
+            case RecordBatch.MAGIC_VALUE_V0:
+                assertEquals("[(record=LegacyRecordBatch(offset=0, Record(magic=0, attributes=0, compression=NONE, " +
+                                "crc=1978725405, key=4 bytes, value=6 bytes))), (record=LegacyRecordBatch(offset=1, Record(magic=0, " +
+                                "attributes=0, compression=NONE, crc=1964753830, key=4 bytes, value=6 bytes)))]",
+                        memoryRecords.toString());
+                break;
+            case RecordBatch.MAGIC_VALUE_V1:
+                assertEquals("[(record=LegacyRecordBatch(offset=0, Record(magic=1, attributes=0, compression=NONE, " +
+                        "crc=97210616, CreateTime=1000000, key=4 bytes, value=6 bytes))), (record=LegacyRecordBatch(offset=1, " +
+                        "Record(magic=1, attributes=0, compression=NONE, crc=3535988507, CreateTime=1000001, key=4 bytes, " +
+                        "value=6 bytes)))]",
+                        memoryRecords.toString());
+                break;
+            case RecordBatch.MAGIC_VALUE_V2:
+                assertEquals("[(record=DefaultRecord(offset=0, timestamp=1000000, key=4 bytes, value=6 bytes)), " +
+                                "(record=DefaultRecord(offset=1, timestamp=1000001, key=4 bytes, value=6 bytes))]",
+                        memoryRecords.toString());
+                break;
+            default:
+                fail("Unexpected magic " + magic);
+        }
+    }
+
+    @Test
     public void testFilterTo() {
         ByteBuffer buffer = ByteBuffer.allocate(2048);
         MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 0L);

http://git-wip-us.apache.org/repos/asf/kafka/blob/be2918b3/clients/src/test/java/org/apache/kafka/common/record/SimpleMemoryRecordsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/SimpleMemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/SimpleMemoryRecordsTest.java
new file mode 100644
index 0000000..2909566
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/record/SimpleMemoryRecordsTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.record;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Non-parameterized MemoryRecords tests.
+ */
+public class SimpleMemoryRecordsTest {
+
+    @Test
+    public void testToStringIfLz4ChecksumIsCorrupted() {
+        long timestamp = 1000000;
+        MemoryRecords memoryRecords = MemoryRecords.withRecords(CompressionType.LZ4,
+                new SimpleRecord(timestamp, "key1".getBytes(), "value1".getBytes()),
+                new SimpleRecord(timestamp + 1, "key2".getBytes(), "value2".getBytes()));
+        // Change the lz4 checksum value (not the kafka record crc) so that it doesn't match the contents
+        int lz4ChecksumOffset = 6;
+        memoryRecords.buffer().array()[DefaultRecordBatch.RECORD_BATCH_OVERHEAD + lz4ChecksumOffset] = 0;
+        assertEquals("[(record=CORRUPTED)]", memoryRecords.toString());
+    }
+
+}
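
(For reference: assuming Kafka's LZ4 writer emits a standard LZ4 frame
header without a content-size field, the byte at offset 6 past
`RECORD_BATCH_OVERHEAD` is the frame's header checksum (HC) byte, i.e.
magic (4 bytes) + FLG + BD precede it, which is why zeroing it breaks
decompression without touching the Kafka record CRC.)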

http://git-wip-us.apache.org/repos/asf/kafka/blob/be2918b3/clients/src/test/java/org/apache/kafka/common/requests/RequestContextTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestContextTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestContextTest.java
index baf0faf..ed50b93 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestContextTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestContextTest.java
@@ -41,6 +41,7 @@ public class RequestContextTest {
         RequestHeader header = new RequestHeader(ApiKeys.API_VERSIONS, Short.MAX_VALUE, "", correlationId);
         RequestContext context = new RequestContext(header, "0", InetAddress.getLocalHost(), KafkaPrincipal.ANONYMOUS,
                 new ListenerName("ssl"), SecurityProtocol.SASL_SSL);
+        assertEquals(0, context.apiVersion());
 
         // Write some garbage to the request buffer. This should be ignored since we will treat
         // the unknown version type as v0 which has an empty request body.

http://git-wip-us.apache.org/repos/asf/kafka/blob/be2918b3/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 7cc8619..561ec8d 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -177,8 +177,11 @@ object RequestChannel extends Logging {
 
       if (isRequestLoggingEnabled) {
         val detailsEnabled = requestLogger.underlying.isTraceEnabled
-        val responseString = response.responseAsString.getOrElse(
-          throw new IllegalStateException("responseAsString should always be defined if request logging is enabled"))
+        val responseString =
+          if (response.responseSend.isDefined)
+            response.responseAsString.getOrElse(
+              throw new IllegalStateException("responseAsString should always be defined if request logging is enabled"))
+          else ""
 
         val builder = new StringBuilder(256)
         builder.append("Completed request:").append(requestDesc(detailsEnabled))

http://git-wip-us.apache.org/repos/asf/kafka/blob/be2918b3/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index a31b6c3..f52a720 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -2076,7 +2076,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       case Some(response) =>
         val responseSend = request.context.buildResponse(response)
         val responseString =
-          if (RequestChannel.isRequestLoggingEnabled) Some(response.toString(request.context.header.apiVersion))
+          if (RequestChannel.isRequestLoggingEnabled) Some(response.toString(request.context.apiVersion))
           else None
         requestChannel.sendResponse(new RequestChannel.Response(request, Some(responseSend), SendAction, responseString))
       case None =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/be2918b3/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 1344660..61c430a 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -724,7 +724,7 @@ class ReplicaManager(val config: KafkaConfig,
                                isFromClient: Boolean,
                                entriesPerPartition: Map[TopicPartition, MemoryRecords],
                                requiredAcks: Short): Map[TopicPartition, LogAppendResult] = {
-    trace("Append [%s] to local log ".format(entriesPerPartition))
+    trace(s"Append [$entriesPerPartition] to local log")
     entriesPerPartition.map { case (topicPartition, records) =>
       brokerTopicStats.topicStats(topicPartition.topic).totalProduceRequestRate.mark()
       brokerTopicStats.allTopicsStats.totalProduceRequestRate.mark()

http://git-wip-us.apache.org/repos/asf/kafka/blob/be2918b3/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 024e7f9..f1f302a 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -39,9 +39,11 @@ import org.apache.kafka.common.record.MemoryRecords
 import org.apache.kafka.common.requests.{AbstractRequest, ProduceRequest, RequestHeader}
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.common.utils.{LogContext, MockTime, Time}
+import org.apache.log4j.Level
 import org.junit.Assert._
 import org.junit._
 import org.scalatest.junit.JUnitSuite
+import org.slf4j.{Logger, LoggerFactory}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -70,6 +72,22 @@ class SocketServerTest extends JUnitSuite {
   val server = new SocketServer(config, metrics, Time.SYSTEM, credentialProvider)
   server.startup()
   val sockets = new ArrayBuffer[Socket]
+  private var logLevelToRestore: Level = _
+
+  @Before
+  def setUp(): Unit = {
+    // Run the tests with TRACE logging to exercise request logging path
+    logLevelToRestore = org.apache.log4j.LogManager.getRootLogger.getLevel
+    org.apache.log4j.LogManager.getLogger("kafka").setLevel(Level.TRACE)
+  }
+
+  @After
+  def tearDown() {
+    shutdownServerAndMetrics(server)
+    sockets.foreach(_.close())
+    sockets.clear()
+    org.apache.log4j.LogManager.getLogger("kafka").setLevel(logLevelToRestore)
+  }
 
   def sendRequest(socket: Socket, request: Array[Byte], id: Option[Short] = None, flush: Boolean = true) {
     val outgoing = new DataOutputStream(socket.getOutputStream)
@@ -111,7 +129,7 @@ class SocketServerTest extends JUnitSuite {
     byteBuffer.rewind()
 
     val send = new NetworkSend(request.context.connectionId, byteBuffer)
-    channel.sendResponse(new RequestChannel.Response(request, Some(send), SendAction, None))
+    channel.sendResponse(new RequestChannel.Response(request, Some(send), SendAction, Some(request.header.toString)))
   }
 
   def connect(s: SocketServer = server, protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT) = {
@@ -138,13 +156,6 @@ class SocketServerTest extends JUnitSuite {
     server.metrics.close()
   }
 
-  @After
-  def tearDown() {
-    shutdownServerAndMetrics(server)
-    sockets.foreach(_.close())
-    sockets.clear()
-  }
-
   private def producerRequestBytes: Array[Byte] = {
     val correlationId = -1
     val clientId = ""
@@ -207,6 +218,20 @@ class SocketServerTest extends JUnitSuite {
   }
 
   @Test
+  def testNoOpAction(): Unit = {
+    val plainSocket = connect(protocol = SecurityProtocol.PLAINTEXT)
+    val serializedBytes = producerRequestBytes
+
+    for (_ <- 0 until 3)
+      sendRequest(plainSocket, serializedBytes)
+    for (_ <- 0 until 3) {
+      val request = receiveRequest(server.requestChannel)
+      assertNotNull("receiveRequest timed out", request)
+      server.requestChannel.sendResponse(new RequestChannel.Response(request, None, RequestChannel.NoOpAction, None))
+    }
+  }
+
+  @Test
   def testConnectionId() {
     val sockets = (1 to 5).map(_ => connect(protocol = SecurityProtocol.PLAINTEXT))
     val serializedBytes = producerRequestBytes