Posted to commits@kafka.apache.org by jg...@apache.org on 2016/12/20 19:17:46 UTC

kafka git commit: KAFKA-4554; Fix ReplicaBuffer.verifyChecksum to use iterators instead of iterables

Repository: kafka
Updated Branches:
  refs/heads/trunk 7f4b278c0 -> 3930dd7e7


KAFKA-4554; Fix ReplicaBuffer.verifyChecksum to use iterators instead of iterables

`ReplicaBuffer.verifyChecksum` was changed to use iterables in b58b6a1bef0, which caused the
`ReplicaVerificationToolTest.test_replica_lags` system test to start failing.
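
For context, a minimal Scala sketch (not part of this commit, illustrative names only) of why
re-creating an iterator from an iterable inside the verification loop prevents progress through
the fetched log entries, and why building the iterator once fixes it:

    // `entries` stands in for the shallow entries fetched from one replica.
    val entries: Iterable[Int] = Seq(1, 2, 3)

    // Broken pattern: a fresh iterator per loop pass always restarts at the head,
    // so the same first entry is examined on every pass.
    for (_ <- 1 to 3) {
      val freshIterator = entries.iterator
      if (freshIterator.hasNext) println(freshIterator.next()) // prints 1, 1, 1
    }

    // Fixed pattern: build the iterator once and keep consuming it across passes.
    val sharedIterator = entries.iterator
    for (_ <- 1 to 3) {
      if (sharedIterator.hasNext) println(sharedIterator.next()) // prints 1, 2, 3
    }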

I also added a unit test and made a couple of other minor clean-ups.
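
The unit test relies on the new `verifyCheckSum(println: String => Unit)` signature: output is
written through an injected function, so a test can capture it in a buffer while the tool keeps
printing to stdout. A rough sketch of that pattern (hypothetical `report` helper, message text
taken from the test's assertion):

    // Hypothetical reporter that writes through an injected output function.
    def report(println: String => Unit): Unit =
      println("max lag is 10 for partition [a,0] at offset 10 among 3 partitions")

    val sb = new StringBuilder
    report(line => sb.append(line).append('\n')) // test: capture output in a buffer
    report(line => println(line))                // tool: print to stdout as before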

Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Jason Gustafson <ja...@confluent.io>

Closes #2280 from ijuma/kafka-4554-fix-replica-buffer-verify-checksum


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3930dd7e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3930dd7e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3930dd7e

Branch: refs/heads/trunk
Commit: 3930dd7e7502ab94a7594a0ca41f27638663ae7c
Parents: 7f4b278
Author: Ismael Juma <is...@juma.me.uk>
Authored: Tue Dec 20 11:01:46 2016 -0800
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Tue Dec 20 11:01:46 2016 -0800

----------------------------------------------------------------------
 .../java/org/apache/kafka/test/TestUtils.java   | 24 --------
 .../kafka/tools/ReplicaVerificationTool.scala   | 21 +++----
 .../tools/ReplicaVerificationToolTest.scala     | 60 ++++++++++++++++++++
 .../unit/kafka/server/ProduceRequestTest.scala  | 19 +++----
 .../apache/kafka/streams/KafkaStreamsTest.java  |  2 +-
 5 files changed, 78 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3930dd7e/clients/src/test/java/org/apache/kafka/test/TestUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index 428b5a0..c39f402 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -21,12 +21,6 @@ import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.record.CompressionType;
-import org.apache.kafka.common.record.MemoryRecords;
-import org.apache.kafka.common.record.MemoryRecordsBuilder;
-import org.apache.kafka.common.record.Record;
-import org.apache.kafka.common.record.Records;
-import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.utils.Utils;
 
 import javax.xml.bind.DatatypeConverter;
@@ -180,24 +174,6 @@ public class TestUtils {
         return file;
     }
 
-    /**
-     * Create a records buffer including the offset and message size at the start, which is required if the buffer is to
-     * be sent as part of `ProduceRequest`. This is the reason why we can't use
-     * `Record(long timestamp, byte[] key, byte[] value, CompressionType type, int valueOffset, int valueSize)` as this
-     * constructor does not include either of these fields.
-     */
-    public static ByteBuffer partitionRecordsBuffer(final long offset, final CompressionType compressionType, final Record... records) {
-        int bufferSize = 0;
-        for (final Record record : records)
-            bufferSize += Records.LOG_OVERHEAD + record.sizeInBytes();
-        final ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
-        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, compressionType, TimestampType.CREATE_TIME);
-        long nextOffset = offset;
-        for (final Record record : records)
-            builder.append(nextOffset++, record);
-        return builder.build().buffer();
-    }
-
     public static Properties producerConfig(final String bootstrapServers,
                                             final Class keySerializer,
                                             final Class valueSerializer,

http://git-wip-us.apache.org/repos/asf/kafka/blob/3930dd7e/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
index 98e0414..f6d5153 100644
--- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
+++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
@@ -252,9 +252,8 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa
       val offsetRequest = OffsetRequest(initialOffsetMap)
       val offsetResponse = consumer.getOffsetsBefore(offsetRequest)
       assert(!offsetResponse.hasError, offsetResponseStringWithError(offsetResponse))
-      offsetResponse.partitionErrorAndOffsets.foreach{
-        case (topicAndPartition, partitionOffsetResponse) =>
-          fetchOffsetMap.put(topicAndPartition, partitionOffsetResponse.offsets.head)
+      offsetResponse.partitionErrorAndOffsets.foreach { case (topicAndPartition, partitionOffsetResponse) =>
+        fetchOffsetMap.put(topicAndPartition, partitionOffsetResponse.offsets.head)
       }
     }
   }
@@ -267,7 +266,7 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa
     fetchOffsetMap.get(topicAndPartition)
   }
 
-  def verifyCheckSum() {
+  def verifyCheckSum(println: String => Unit) {
     debug("Begin verification")
     maxLag = -1L
     for ((topicAndPartition, fetchResponsePerReplica) <- messageSetCache) {
@@ -275,8 +274,8 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa
       assert(fetchResponsePerReplica.size == expectedReplicasPerTopicAndPartition(topicAndPartition),
             "fetched " + fetchResponsePerReplica.size + " replicas for " + topicAndPartition + ", but expected "
             + expectedReplicasPerTopicAndPartition(topicAndPartition) + " replicas")
-      val logEntriesMap = fetchResponsePerReplica.map { case (replicaId, fetchResponse) =>
-        replicaId -> fetchResponse.messages.asInstanceOf[ByteBufferMessageSet].asRecords.shallowEntries.asScala
+      val logEntryIteratorMap = fetchResponsePerReplica.map { case (replicaId, fetchResponse) =>
+        replicaId -> fetchResponse.messages.asInstanceOf[ByteBufferMessageSet].asRecords.shallowEntries.iterator
       }
       val maxHw = fetchResponsePerReplica.values.map(_.hw).max
 
@@ -284,9 +283,8 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa
       var isMessageInAllReplicas = true
       while (isMessageInAllReplicas) {
         var messageInfoFromFirstReplicaOpt: Option[MessageInfo] = None
-        for ((replicaId, logEntries) <- logEntriesMap) {
+        for ((replicaId, logEntriesIterator) <- logEntryIteratorMap) {
           try {
-            val logEntriesIterator = logEntries.iterator
             if (logEntriesIterator.hasNext) {
               val logEntry = logEntriesIterator.next()
 
@@ -378,9 +376,8 @@ private class ReplicaFetcher(name: String, sourceBroker: BrokerEndPoint, topicAn
     }
 
     if (response != null) {
-      response.data.foreach {
-        case(topicAndPartition, partitionData) =>
-          replicaBuffer.addFetchedData(topicAndPartition, sourceBroker.id, partitionData)
+      response.data.foreach { case (topicAndPartition, partitionData) =>
+        replicaBuffer.addFetchedData(topicAndPartition, sourceBroker.id, partitionData)
       }
     } else {
       for (topicAndPartition <- topicAndPartitions)
@@ -397,7 +394,7 @@ private class ReplicaFetcher(name: String, sourceBroker: BrokerEndPoint, topicAn
     // one of the fetchers will do the verification
     if (doVerification) {
       debug("Do verification")
-      replicaBuffer.verifyCheckSum()
+      replicaBuffer.verifyCheckSum(println)
       replicaBuffer.createNewFetcherBarrier()
       replicaBuffer.createNewVerificationBarrier()
       debug("Created new barrier")

http://git-wip-us.apache.org/repos/asf/kafka/blob/3930dd7e/core/src/test/scala/kafka/tools/ReplicaVerificationToolTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/kafka/tools/ReplicaVerificationToolTest.scala b/core/src/test/scala/kafka/tools/ReplicaVerificationToolTest.scala
new file mode 100644
index 0000000..ffa3474
--- /dev/null
+++ b/core/src/test/scala/kafka/tools/ReplicaVerificationToolTest.scala
@@ -0,0 +1,60 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.tools
+
+import kafka.api.FetchResponsePartitionData
+import kafka.common.TopicAndPartition
+import kafka.message.ByteBufferMessageSet
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.record.{MemoryRecords, Record}
+import org.junit.Test
+import org.junit.Assert.assertTrue
+
+class ReplicaVerificationToolTest {
+
+  @Test
+  def testReplicaBufferVerifyChecksum(): Unit = {
+    val sb = new StringBuilder
+
+    val expectedReplicasPerTopicAndPartition = Map(
+      TopicAndPartition("a", 0) -> 3,
+      TopicAndPartition("a", 1) -> 3,
+      TopicAndPartition("b", 0) -> 2
+    )
+
+    val replicaBuffer = new ReplicaBuffer(expectedReplicasPerTopicAndPartition, Map.empty, 2, Map.empty, 0, 0)
+    expectedReplicasPerTopicAndPartition.foreach { case (tp, numReplicas) =>
+      (0 until numReplicas).foreach { replicaId =>
+        val records = (0 to 5).map { index =>
+          Record.create(s"key $index".getBytes, s"value $index".getBytes)
+        }
+        val initialOffset = 4
+        val memoryRecords = MemoryRecords.withRecords(initialOffset, records: _*)
+        replicaBuffer.addFetchedData(tp, replicaId, new FetchResponsePartitionData(Errors.NONE.code(), hw = 20,
+          new ByteBufferMessageSet(memoryRecords.buffer)))
+      }
+    }
+
+    replicaBuffer.verifyCheckSum(line => sb.append(s"$line\n"))
+    val output = sb.toString.trim
+
+    assertTrue(s"Max lag information should be in output: `$output`",
+      output.endsWith(": max lag is 10 for partition [a,0] at offset 10 among 3 partitions"))
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/3930dd7e/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
index 51be54c..4dfe4b5 100644
--- a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
@@ -17,12 +17,9 @@
 
 package kafka.server
 
-import java.nio.ByteBuffer
-
 import kafka.utils.TestUtils
-import org.apache.kafka.test.{TestUtils => JTestUtils}
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.protocol.{ApiKeys, Errors, ProtoUtils}
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Record}
 import org.apache.kafka.common.requests.{ProduceRequest, ProduceResponse}
 import org.junit.Assert._
@@ -40,9 +37,9 @@ class ProduceRequestTest extends BaseRequestTest {
   def testSimpleProduceRequest() {
     val (partition, leader) = createTopicAndFindPartitionWithLeader("topic")
 
-    def sendAndCheck(recordBuffer: ByteBuffer, expectedOffset: Long): ProduceResponse.PartitionResponse = {
+    def sendAndCheck(memoryRecords: MemoryRecords, expectedOffset: Long): ProduceResponse.PartitionResponse = {
       val topicPartition = new TopicPartition("topic", partition)
-      val partitionRecords = Map(topicPartition -> MemoryRecords.readableRecords(recordBuffer))
+      val partitionRecords = Map(topicPartition -> memoryRecords)
       val produceResponse = sendProduceRequest(leader, new ProduceRequest(-1, 3000, partitionRecords.asJava))
       assertEquals(1, produceResponse.responses.size)
       val (tp, partitionResponse) = produceResponse.responses.asScala.head
@@ -53,10 +50,10 @@ class ProduceRequestTest extends BaseRequestTest {
       partitionResponse
     }
 
-    sendAndCheck(JTestUtils.partitionRecordsBuffer(0, CompressionType.NONE,
+    sendAndCheck(MemoryRecords.withRecords(
       Record.create(System.currentTimeMillis(), "key".getBytes, "value".getBytes)), 0)
 
-    sendAndCheck(JTestUtils.partitionRecordsBuffer(0, CompressionType.GZIP,
+    sendAndCheck(MemoryRecords.withRecords(CompressionType.GZIP,
       Record.create(System.currentTimeMillis(), "key1".getBytes, "value1".getBytes),
       Record.create(System.currentTimeMillis(), "key2".getBytes, "value2".getBytes)), 1)
   }
@@ -73,12 +70,12 @@ class ProduceRequestTest extends BaseRequestTest {
   def testCorruptLz4ProduceRequest() {
     val (partition, leader) = createTopicAndFindPartitionWithLeader("topic")
     val timestamp = 1000000
-    val recordBuffer = JTestUtils.partitionRecordsBuffer(0, CompressionType.LZ4,
+    val memoryRecords = MemoryRecords.withRecords(CompressionType.LZ4,
       Record.create(timestamp, "key".getBytes, "value".getBytes))
     // Change the lz4 checksum value so that it doesn't match the contents
-    recordBuffer.array.update(40, 0)
+    memoryRecords.buffer.array.update(40, 0)
     val topicPartition = new TopicPartition("topic", partition)
-    val partitionRecords = Map(topicPartition -> MemoryRecords.readableRecords(recordBuffer))
+    val partitionRecords = Map(topicPartition -> memoryRecords)
     val produceResponse = sendProduceRequest(leader, new ProduceRequest(-1, 3000, partitionRecords.asJava))
     assertEquals(1, produceResponse.responses.size)
     val (tp, partitionResponse) = produceResponse.responses.asScala.head

http://git-wip-us.apache.org/repos/asf/kafka/blob/3930dd7e/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 37809bf..5804407 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -271,7 +271,7 @@ public class KafkaStreamsTest {
 
         @Override
         public void onChange(final KafkaStreams.State newState, final KafkaStreams.State oldState) {
-            Long prevCount = this.mapStates.containsKey(newState) ? this.mapStates.get(newState) : 0;
+            long prevCount = this.mapStates.containsKey(newState) ? this.mapStates.get(newState) : 0;
             this.numChanges++;
             this.oldState = oldState;
             this.newState = newState;