You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/03/11 00:18:24 UTC
kafka git commit: KAFKA-1910; follow-up on fixing buffer.flip on produce requests
Repository: kafka
Updated Branches:
refs/heads/trunk 0b92cec1e -> 01d2a2523
KAFKA-1910; follow-up on fixing buffer.flip on produce requests
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/01d2a252
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/01d2a252
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/01d2a252
Branch: refs/heads/trunk
Commit: 01d2a25235841feb01eac55f6ea5141750225a21
Parents: 0b92cec
Author: Guozhang Wang <wa...@gmail.com>
Authored: Tue Mar 10 16:18:00 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Mar 10 16:18:00 2015 -0700
----------------------------------------------------------------------
.../clients/producer/internals/Sender.java | 3 +--
.../kafka/common/record/MemoryRecords.java | 20 ++++++++++----------
.../clients/consumer/internals/FetcherTest.java | 2 +-
.../internals/RecordAccumulatorTest.java | 6 +++---
.../kafka/common/record/MemoryRecordsTest.java | 4 ++--
.../kafka/api/ProducerFailureHandlingTest.scala | 7 ++++---
6 files changed, 21 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/01d2a252/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 03df9ea..70954ca 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -298,8 +298,7 @@ public class Sender implements Runnable {
final Map<TopicPartition, RecordBatch> recordsByPartition = new HashMap<TopicPartition, RecordBatch>(batches.size());
for (RecordBatch batch : batches) {
TopicPartition tp = batch.topicPartition;
- batch.records.rewind();
- produceRecordsByPartition.put(tp, batch.records.buffer());
+ produceRecordsByPartition.put(tp, (ByteBuffer) batch.records.buffer().flip());
recordsByPartition.put(tp, batch);
}
ProduceRequest request = new ProduceRequest(acks, timeout, produceRecordsByPartition);
http://git-wip-us.apache.org/repos/asf/kafka/blob/01d2a252/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index a412f61..d20a2e4 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -120,16 +120,6 @@ public class MemoryRecords implements Records {
buffer = compressor.buffer();
}
- /**
- * Rewind the writable records to read mode
- */
- public void rewind() {
- if (writable)
- throw new IllegalStateException("The memory records need to be closed for write before rewinding for read");
-
- buffer.flip();
- }
-
/** Write the records in this set to the given channel */
public int writeTo(GatheringByteChannel channel) throws IOException {
return channel.write(buffer);
@@ -166,6 +156,16 @@ public class MemoryRecords implements Records {
return buffer.duplicate();
}
+ /**
+ * Return a flipped duplicate of the closed buffer for reading records
+ */
+ public ByteBuffer flip() {
+ if (writable)
+ throw new IllegalStateException("The memory records need to be closed for write before rewinding for read");
+
+ return (ByteBuffer) buffer.duplicate().flip();
+ }
+
@Override
public Iterator<LogEntry> iterator() {
ByteBuffer copy = this.buffer.duplicate();
http://git-wip-us.apache.org/repos/asf/kafka/blob/01d2a252/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index de03ff1..4195410 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -90,7 +90,7 @@ public class FetcherTest {
records.append(2L, "key".getBytes(), "value-2".getBytes());
records.append(3L, "key".getBytes(), "value-3".getBytes());
records.close();
- records.rewind();
+ records.flip();
}
@Test
http://git-wip-us.apache.org/repos/asf/kafka/blob/01d2a252/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index d34d27e..7b3fb1d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -76,7 +76,7 @@ public class RecordAccumulatorTest {
List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE, 0).get(node1.id());
assertEquals(1, batches.size());
RecordBatch batch = batches.get(0);
- batch.records.rewind();
+ batch.records.flip();
Iterator<LogEntry> iter = batch.records.iterator();
for (int i = 0; i < appends; i++) {
LogEntry entry = iter.next();
@@ -105,7 +105,7 @@ public class RecordAccumulatorTest {
List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE, 0).get(node1.id());
assertEquals(1, batches.size());
RecordBatch batch = batches.get(0);
- batch.records.rewind();
+ batch.records.flip();
Iterator<LogEntry> iter = batch.records.iterator();
LogEntry entry = iter.next();
assertEquals("Keys should match", ByteBuffer.wrap(key), entry.record().key());
@@ -158,7 +158,7 @@ public class RecordAccumulatorTest {
List<RecordBatch> batches = accum.drain(cluster, nodes, 5 * 1024, 0).get(node1.id());
if (batches != null) {
for (RecordBatch batch : batches) {
- batch.records.rewind();
+ batch.records.flip();
for (LogEntry entry : batch.records)
read++;
accum.deallocate(batch);
http://git-wip-us.apache.org/repos/asf/kafka/blob/01d2a252/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
index 42f8f5e..8ec610a 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
@@ -50,9 +50,9 @@ public class MemoryRecordsTest {
recs2.append(i, toArray(r.key()), toArray(r.value()));
}
recs1.close();
- recs1.rewind();
+ recs1.flip();
recs2.close();
- recs2.rewind();
+ recs2.flip();
for (int iteration = 0; iteration < 2; iteration++) {
for (MemoryRecords recs : Arrays.asList(recs1, recs2)) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/01d2a252/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index 84689e1..7eb6d05 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -24,16 +24,16 @@ import java.lang.Integer
import java.util.{Properties, Random}
import java.util.concurrent.{TimeoutException, TimeUnit, ExecutionException}
-import kafka.api.FetchRequestBuilder
import kafka.common.Topic
import kafka.consumer.SimpleConsumer
-import kafka.server.KafkaConfig
import kafka.integration.KafkaServerTestHarness
+import kafka.server.KafkaConfig
import kafka.utils.{TestZKUtils, ShutdownableThread, TestUtils}
import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.errors.{InvalidTopicException, NotEnoughReplicasException, NotEnoughReplicasAfterAppendException}
import org.apache.kafka.clients.producer._
+import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
class ProducerFailureHandlingTest extends KafkaServerTestHarness {
private val producerBufferSize = 30000
@@ -371,7 +371,8 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
override def doWork(): Unit = {
val responses =
for (i <- sent+1 to sent+numRecords)
- yield producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, null, i.toString.getBytes))
+ yield producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, null, i.toString.getBytes),
+ new ErrorLoggingCallback(topic1, null, null, true))
val futures = responses.toList
try {