Posted to commits@kafka.apache.org by jg...@apache.org on 2016/12/20 19:17:46 UTC
kafka git commit: KAFKA-4554; Fix ReplicaBuffer.verifyChecksum to use iterators instead of iterables
Repository: kafka
Updated Branches:
refs/heads/trunk 7f4b278c0 -> 3930dd7e7
KAFKA-4554; Fix ReplicaBuffer.verifyChecksum to use iterators instead of iterables
This behaviour was changed in b58b6a1bef0, which caused the `ReplicaVerificationToolTest.test_replica_lags`
system test to start failing.
I also added a unit test and a couple of other minor clean-ups.
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Jason Gustafson <ja...@confluent.io>
Closes #2280 from ijuma/kafka-4554-fix-replica-buffer-verify-checksum
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3930dd7e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3930dd7e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3930dd7e
Branch: refs/heads/trunk
Commit: 3930dd7e7502ab94a7594a0ca41f27638663ae7c
Parents: 7f4b278
Author: Ismael Juma <is...@juma.me.uk>
Authored: Tue Dec 20 11:01:46 2016 -0800
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Tue Dec 20 11:01:46 2016 -0800
----------------------------------------------------------------------
.../java/org/apache/kafka/test/TestUtils.java | 24 --------
.../kafka/tools/ReplicaVerificationTool.scala | 21 +++----
.../tools/ReplicaVerificationToolTest.scala | 60 ++++++++++++++++++++
.../unit/kafka/server/ProduceRequestTest.scala | 19 +++----
.../apache/kafka/streams/KafkaStreamsTest.java | 2 +-
5 files changed, 78 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/3930dd7e/clients/src/test/java/org/apache/kafka/test/TestUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index 428b5a0..c39f402 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -21,12 +21,6 @@ import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.record.CompressionType;
-import org.apache.kafka.common.record.MemoryRecords;
-import org.apache.kafka.common.record.MemoryRecordsBuilder;
-import org.apache.kafka.common.record.Record;
-import org.apache.kafka.common.record.Records;
-import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.Utils;
import javax.xml.bind.DatatypeConverter;
@@ -180,24 +174,6 @@ public class TestUtils {
return file;
}
- /**
- * Create a records buffer including the offset and message size at the start, which is required if the buffer is to
- * be sent as part of `ProduceRequest`. This is the reason why we can't use
- * `Record(long timestamp, byte[] key, byte[] value, CompressionType type, int valueOffset, int valueSize)` as this
- * constructor does not include either of these fields.
- */
- public static ByteBuffer partitionRecordsBuffer(final long offset, final CompressionType compressionType, final Record... records) {
- int bufferSize = 0;
- for (final Record record : records)
- bufferSize += Records.LOG_OVERHEAD + record.sizeInBytes();
- final ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
- MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, compressionType, TimestampType.CREATE_TIME);
- long nextOffset = offset;
- for (final Record record : records)
- builder.append(nextOffset++, record);
- return builder.build().buffer();
- }
-
public static Properties producerConfig(final String bootstrapServers,
final Class keySerializer,
final Class valueSerializer,
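The helper removed above is superseded by `MemoryRecords.withRecords`, which the updated callers later in this diff use directly: it accepts `Record` instances (optionally preceded by an initial offset or a `CompressionType`) and returns a ready-to-send `MemoryRecords`, so callers no longer need to size a `ByteBuffer` by hand or round-trip through `MemoryRecords.readableRecords`. A minimal Scala sketch of the replacement pattern, mirroring the calls in the updated tests (keys and values are illustrative):

    import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Record}

    // Records starting at a chosen initial offset, as the new unit test does.
    val atOffset: MemoryRecords =
      MemoryRecords.withRecords(4L, Record.create("key".getBytes, "value".getBytes))

    // Compressed records, as the updated ProduceRequestTest does.
    val gzipped: MemoryRecords =
      MemoryRecords.withRecords(CompressionType.GZIP,
        Record.create(System.currentTimeMillis(), "key1".getBytes, "value1".getBytes))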
http://git-wip-us.apache.org/repos/asf/kafka/blob/3930dd7e/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
index 98e0414..f6d5153 100644
--- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
+++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
@@ -252,9 +252,8 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa
val offsetRequest = OffsetRequest(initialOffsetMap)
val offsetResponse = consumer.getOffsetsBefore(offsetRequest)
assert(!offsetResponse.hasError, offsetResponseStringWithError(offsetResponse))
- offsetResponse.partitionErrorAndOffsets.foreach{
- case (topicAndPartition, partitionOffsetResponse) =>
- fetchOffsetMap.put(topicAndPartition, partitionOffsetResponse.offsets.head)
+ offsetResponse.partitionErrorAndOffsets.foreach { case (topicAndPartition, partitionOffsetResponse) =>
+ fetchOffsetMap.put(topicAndPartition, partitionOffsetResponse.offsets.head)
}
}
}
@@ -267,7 +266,7 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa
fetchOffsetMap.get(topicAndPartition)
}
- def verifyCheckSum() {
+ def verifyCheckSum(println: String => Unit) {
debug("Begin verification")
maxLag = -1L
for ((topicAndPartition, fetchResponsePerReplica) <- messageSetCache) {
@@ -275,8 +274,8 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa
assert(fetchResponsePerReplica.size == expectedReplicasPerTopicAndPartition(topicAndPartition),
"fetched " + fetchResponsePerReplica.size + " replicas for " + topicAndPartition + ", but expected "
+ expectedReplicasPerTopicAndPartition(topicAndPartition) + " replicas")
- val logEntriesMap = fetchResponsePerReplica.map { case (replicaId, fetchResponse) =>
- replicaId -> fetchResponse.messages.asInstanceOf[ByteBufferMessageSet].asRecords.shallowEntries.asScala
+ val logEntryIteratorMap = fetchResponsePerReplica.map { case (replicaId, fetchResponse) =>
+ replicaId -> fetchResponse.messages.asInstanceOf[ByteBufferMessageSet].asRecords.shallowEntries.iterator
}
val maxHw = fetchResponsePerReplica.values.map(_.hw).max
@@ -284,9 +283,8 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa
var isMessageInAllReplicas = true
while (isMessageInAllReplicas) {
var messageInfoFromFirstReplicaOpt: Option[MessageInfo] = None
- for ((replicaId, logEntries) <- logEntriesMap) {
+ for ((replicaId, logEntriesIterator) <- logEntryIteratorMap) {
try {
- val logEntriesIterator = logEntries.iterator
if (logEntriesIterator.hasNext) {
val logEntry = logEntriesIterator.next()
@@ -378,9 +376,8 @@ private class ReplicaFetcher(name: String, sourceBroker: BrokerEndPoint, topicAn
}
if (response != null) {
- response.data.foreach {
- case(topicAndPartition, partitionData) =>
- replicaBuffer.addFetchedData(topicAndPartition, sourceBroker.id, partitionData)
+ response.data.foreach { case (topicAndPartition, partitionData) =>
+ replicaBuffer.addFetchedData(topicAndPartition, sourceBroker.id, partitionData)
}
} else {
for (topicAndPartition <- topicAndPartitions)
@@ -397,7 +394,7 @@ private class ReplicaFetcher(name: String, sourceBroker: BrokerEndPoint, topicAn
// one of the fetchers will do the verification
if (doVerification) {
debug("Do verification")
- replicaBuffer.verifyCheckSum()
+ replicaBuffer.verifyCheckSum(println)
replicaBuffer.createNewFetcherBarrier()
replicaBuffer.createNewVerificationBarrier()
debug("Created new barrier")
http://git-wip-us.apache.org/repos/asf/kafka/blob/3930dd7e/core/src/test/scala/kafka/tools/ReplicaVerificationToolTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/kafka/tools/ReplicaVerificationToolTest.scala b/core/src/test/scala/kafka/tools/ReplicaVerificationToolTest.scala
new file mode 100644
index 0000000..ffa3474
--- /dev/null
+++ b/core/src/test/scala/kafka/tools/ReplicaVerificationToolTest.scala
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.tools
+
+import kafka.api.FetchResponsePartitionData
+import kafka.common.TopicAndPartition
+import kafka.message.ByteBufferMessageSet
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.record.{MemoryRecords, Record}
+import org.junit.Test
+import org.junit.Assert.assertTrue
+
+class ReplicaVerificationToolTest {
+
+ @Test
+ def testReplicaBufferVerifyChecksum(): Unit = {
+ val sb = new StringBuilder
+
+ val expectedReplicasPerTopicAndPartition = Map(
+ TopicAndPartition("a", 0) -> 3,
+ TopicAndPartition("a", 1) -> 3,
+ TopicAndPartition("b", 0) -> 2
+ )
+
+ val replicaBuffer = new ReplicaBuffer(expectedReplicasPerTopicAndPartition, Map.empty, 2, Map.empty, 0, 0)
+ expectedReplicasPerTopicAndPartition.foreach { case (tp, numReplicas) =>
+ (0 until numReplicas).foreach { replicaId =>
+ val records = (0 to 5).map { index =>
+ Record.create(s"key $index".getBytes, s"value $index".getBytes)
+ }
+ val initialOffset = 4
+ val memoryRecords = MemoryRecords.withRecords(initialOffset, records: _*)
+ replicaBuffer.addFetchedData(tp, replicaId, new FetchResponsePartitionData(Errors.NONE.code(), hw = 20,
+ new ByteBufferMessageSet(memoryRecords.buffer)))
+ }
+ }
+
+ replicaBuffer.verifyCheckSum(line => sb.append(s"$line\n"))
+ val output = sb.toString.trim
+
+ assertTrue(s"Max lag information should be in output: `$output`",
+ output.endsWith(": max lag is 10 for partition [a,0] at offset 10 among 3 partitions"))
+ }
+
+}
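The expected value in the assertion follows directly from the test's inputs: each replica holds six records (indices 0 to 5) starting at offset 4, so the offset after the last entry is 10, and with a high watermark of 20 the maximum lag is 20 - 10 = 10, which is what the trailing "max lag is 10 ... at offset 10" message encodes. Restated as a quick check:

    val initialOffset = 4
    val recordCount = 6                          // record indices 0 to 5
    val nextOffset = initialOffset + recordCount // 10: offset after the last entry
    val hw = 20
    val maxLag = hw - nextOffset                 // 10, as the assertion expects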
http://git-wip-us.apache.org/repos/asf/kafka/blob/3930dd7e/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
index 51be54c..4dfe4b5 100644
--- a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
@@ -17,12 +17,9 @@
package kafka.server
-import java.nio.ByteBuffer
-
import kafka.utils.TestUtils
-import org.apache.kafka.test.{TestUtils => JTestUtils}
import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.protocol.{ApiKeys, Errors, ProtoUtils}
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Record}
import org.apache.kafka.common.requests.{ProduceRequest, ProduceResponse}
import org.junit.Assert._
@@ -40,9 +37,9 @@ class ProduceRequestTest extends BaseRequestTest {
def testSimpleProduceRequest() {
val (partition, leader) = createTopicAndFindPartitionWithLeader("topic")
- def sendAndCheck(recordBuffer: ByteBuffer, expectedOffset: Long): ProduceResponse.PartitionResponse = {
+ def sendAndCheck(memoryRecords: MemoryRecords, expectedOffset: Long): ProduceResponse.PartitionResponse = {
val topicPartition = new TopicPartition("topic", partition)
- val partitionRecords = Map(topicPartition -> MemoryRecords.readableRecords(recordBuffer))
+ val partitionRecords = Map(topicPartition -> memoryRecords)
val produceResponse = sendProduceRequest(leader, new ProduceRequest(-1, 3000, partitionRecords.asJava))
assertEquals(1, produceResponse.responses.size)
val (tp, partitionResponse) = produceResponse.responses.asScala.head
@@ -53,10 +50,10 @@ class ProduceRequestTest extends BaseRequestTest {
partitionResponse
}
- sendAndCheck(JTestUtils.partitionRecordsBuffer(0, CompressionType.NONE,
+ sendAndCheck(MemoryRecords.withRecords(
Record.create(System.currentTimeMillis(), "key".getBytes, "value".getBytes)), 0)
- sendAndCheck(JTestUtils.partitionRecordsBuffer(0, CompressionType.GZIP,
+ sendAndCheck(MemoryRecords.withRecords(CompressionType.GZIP,
Record.create(System.currentTimeMillis(), "key1".getBytes, "value1".getBytes),
Record.create(System.currentTimeMillis(), "key2".getBytes, "value2".getBytes)), 1)
}
@@ -73,12 +70,12 @@ class ProduceRequestTest extends BaseRequestTest {
def testCorruptLz4ProduceRequest() {
val (partition, leader) = createTopicAndFindPartitionWithLeader("topic")
val timestamp = 1000000
- val recordBuffer = JTestUtils.partitionRecordsBuffer(0, CompressionType.LZ4,
+ val memoryRecords = MemoryRecords.withRecords(CompressionType.LZ4,
Record.create(timestamp, "key".getBytes, "value".getBytes))
// Change the lz4 checksum value so that it doesn't match the contents
- recordBuffer.array.update(40, 0)
+ memoryRecords.buffer.array.update(40, 0)
val topicPartition = new TopicPartition("topic", partition)
- val partitionRecords = Map(topicPartition -> MemoryRecords.readableRecords(recordBuffer))
+ val partitionRecords = Map(topicPartition -> memoryRecords)
val produceResponse = sendProduceRequest(leader, new ProduceRequest(-1, 3000, partitionRecords.asJava))
assertEquals(1, produceResponse.responses.size)
val (tp, partitionResponse) = produceResponse.responses.asScala.head
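The corrupt-LZ4 case is semantically unchanged by this refactor: `memoryRecords.buffer` exposes the same backing array that the old intermediate `ByteBuffer` did, so flipping the byte at position 40 still invalidates the LZ4 checksum (per the comment above) and the produce request is still expected to fail the broker's validation.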
http://git-wip-us.apache.org/repos/asf/kafka/blob/3930dd7e/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 37809bf..5804407 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -271,7 +271,7 @@ public class KafkaStreamsTest {
@Override
public void onChange(final KafkaStreams.State newState, final KafkaStreams.State oldState) {
- Long prevCount = this.mapStates.containsKey(newState) ? this.mapStates.get(newState) : 0;
+ long prevCount = this.mapStates.containsKey(newState) ? this.mapStates.get(newState) : 0;
this.numChanges++;
this.oldState = oldState;
this.newState = newState;
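The `KafkaStreamsTest` tweak swaps a boxed `Long` for a primitive `long`: assuming `mapStates` maps states to `Long` counts, the mixed-type conditional (a `Long` on one branch, the `int` literal `0` on the other) already unboxes its result to a primitive, so declaring `prevCount` as `long` avoids immediately re-boxing the value.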