Posted to commits@kafka.apache.org by gu...@apache.org on 2015/03/11 00:18:24 UTC

kafka git commit: KAFKA-1910; follow-up on fixing buffer.flip on produce requests

Repository: kafka
Updated Branches:
  refs/heads/trunk 0b92cec1e -> 01d2a2523


KAFKA-1910; follow-up on fixing buffer.flip on produce requests
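
For context, and not part of the commit itself: java.nio.Buffer.flip()
sets the limit to the current position and resets the position to zero,
mutating the buffer in place. Flipping the same records buffer once per
send attempt therefore truncates it on a retry. A minimal sketch of the
failure mode:

    import java.nio.ByteBuffer;

    public class DoubleFlipSketch {
        public static void main(String[] args) {
            ByteBuffer buf = ByteBuffer.allocate(32);
            buf.put("record-payload".getBytes()); // position = 14

            buf.flip();                           // position = 0, limit = 14: readable
            System.out.println(buf.remaining());  // 14

            buf.flip();                           // position = 0, limit = 0: truncated
            System.out.println(buf.remaining());  // 0 -- a retry would send nothing
        }
    }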


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/01d2a252
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/01d2a252
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/01d2a252

Branch: refs/heads/trunk
Commit: 01d2a25235841feb01eac55f6ea5141750225a21
Parents: 0b92cec
Author: Guozhang Wang <wa...@gmail.com>
Authored: Tue Mar 10 16:18:00 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Mar 10 16:18:00 2015 -0700

----------------------------------------------------------------------
 .../clients/producer/internals/Sender.java      |  3 +--
 .../kafka/common/record/MemoryRecords.java      | 20 ++++++++++----------
 .../clients/consumer/internals/FetcherTest.java |  2 +-
 .../internals/RecordAccumulatorTest.java        |  6 +++---
 .../kafka/common/record/MemoryRecordsTest.java  |  4 ++--
 .../kafka/api/ProducerFailureHandlingTest.scala |  7 ++++---
 6 files changed, 21 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/01d2a252/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 03df9ea..70954ca 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -298,8 +298,7 @@ public class Sender implements Runnable {
         final Map<TopicPartition, RecordBatch> recordsByPartition = new HashMap<TopicPartition, RecordBatch>(batches.size());
         for (RecordBatch batch : batches) {
             TopicPartition tp = batch.topicPartition;
-            batch.records.rewind();
-            produceRecordsByPartition.put(tp, batch.records.buffer());
+            produceRecordsByPartition.put(tp, (ByteBuffer) batch.records.buffer().flip());
             recordsByPartition.put(tp, batch);
         }
         ProduceRequest request = new ProduceRequest(acks, timeout, produceRecordsByPartition);
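
MemoryRecords.buffer() returns a duplicate of the backing buffer (see the
MemoryRecords hunk below), so the flip above mutates only a throwaway
view, and every send attempt can take a fresh, fully readable view of the
same bytes. A sketch of the pattern, with illustrative names:

    import java.nio.ByteBuffer;

    public class FlipDuplicateSketch {
        public static void main(String[] args) {
            ByteBuffer shared = ByteBuffer.allocate(32);
            shared.put("record-payload".getBytes());

            // One independent view per attempt; the shared buffer is never mutated.
            ByteBuffer attempt1 = (ByteBuffer) shared.duplicate().flip();
            ByteBuffer attempt2 = (ByteBuffer) shared.duplicate().flip();

            System.out.println(attempt1.remaining()); // 14
            System.out.println(attempt2.remaining()); // 14 -- a retry still sees all bytes
            System.out.println(shared.position());    // 14 -- original untouched
        }
    }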

http://git-wip-us.apache.org/repos/asf/kafka/blob/01d2a252/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index a412f61..d20a2e4 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -120,16 +120,6 @@ public class MemoryRecords implements Records {
         buffer = compressor.buffer();
     }
 
-    /**
-     * Rewind the writable records to read mode
-     */
-    public void rewind() {
-        if (writable)
-            throw new IllegalStateException("The memory records need to be closed for write before rewinding for read");
-
-        buffer.flip();
-    }
-
     /** Write the records in this set to the given channel */
     public int writeTo(GatheringByteChannel channel) throws IOException {
         return channel.write(buffer);
@@ -166,6 +156,16 @@ public class MemoryRecords implements Records {
         return buffer.duplicate();
     }
 
+    /**
+     * Return a flipped duplicate of the closed buffer for reading the records
+     */
+    public ByteBuffer flip() {
+        if (writable)
+            throw new IllegalStateException("The memory records need to be closed for write before flipping for read");
+
+        return (ByteBuffer) buffer.duplicate().flip();
+    }
+
     @Override
     public Iterator<LogEntry> iterator() {
         ByteBuffer copy = this.buffer.duplicate();
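
Expected usage of the new flip() method, sketched against this file's
API; the emptyRecords factory and append signature are inferred from the
tests below, so treat the exact calls as an assumption:

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.MemoryRecords;

    public class MemoryRecordsFlipSketch {
        public static void main(String[] args) {
            MemoryRecords records =
                MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE);
            records.append(0L, "key".getBytes(), "value".getBytes());

            // flip() throws IllegalStateException while still writable,
            // so close for writes first.
            records.close();

            // Each call returns an independent flipped duplicate; the
            // backing buffer's position and limit are left alone.
            ByteBuffer view1 = records.flip();
            ByteBuffer view2 = records.flip();
            System.out.println(view1.remaining() == view2.remaining()); // true
        }
    }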

http://git-wip-us.apache.org/repos/asf/kafka/blob/01d2a252/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index de03ff1..4195410 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -90,7 +90,7 @@ public class FetcherTest {
         records.append(2L, "key".getBytes(), "value-2".getBytes());
         records.append(3L, "key".getBytes(), "value-3".getBytes());
         records.close();
-        records.rewind();
+        records.flip();
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/01d2a252/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index d34d27e..7b3fb1d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -76,7 +76,7 @@ public class RecordAccumulatorTest {
         List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE, 0).get(node1.id());
         assertEquals(1, batches.size());
         RecordBatch batch = batches.get(0);
-        batch.records.rewind();
+        batch.records.flip();
         Iterator<LogEntry> iter = batch.records.iterator();
         for (int i = 0; i < appends; i++) {
             LogEntry entry = iter.next();
@@ -105,7 +105,7 @@ public class RecordAccumulatorTest {
         List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE, 0).get(node1.id());
         assertEquals(1, batches.size());
         RecordBatch batch = batches.get(0);
-        batch.records.rewind();
+        batch.records.flip();
         Iterator<LogEntry> iter = batch.records.iterator();
         LogEntry entry = iter.next();
         assertEquals("Keys should match", ByteBuffer.wrap(key), entry.record().key());
@@ -158,7 +158,7 @@ public class RecordAccumulatorTest {
             List<RecordBatch> batches = accum.drain(cluster, nodes, 5 * 1024, 0).get(node1.id());
             if (batches != null) {
                 for (RecordBatch batch : batches) {
-                    batch.records.rewind();
+                    batch.records.flip();
                     for (LogEntry entry : batch.records)
                         read++;
                     accum.deallocate(batch);

http://git-wip-us.apache.org/repos/asf/kafka/blob/01d2a252/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
index 42f8f5e..8ec610a 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
@@ -50,9 +50,9 @@ public class MemoryRecordsTest {
             recs2.append(i, toArray(r.key()), toArray(r.value()));
         }
         recs1.close();
-        recs1.rewind();
+        recs1.flip();
         recs2.close();
-        recs2.rewind();
+        recs2.flip();
 
         for (int iteration = 0; iteration < 2; iteration++) {
             for (MemoryRecords recs : Arrays.asList(recs1, recs2)) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/01d2a252/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index 84689e1..7eb6d05 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -24,16 +24,16 @@ import java.lang.Integer
 import java.util.{Properties, Random}
 import java.util.concurrent.{TimeoutException, TimeUnit, ExecutionException}
 
-import kafka.api.FetchRequestBuilder
 import kafka.common.Topic
 import kafka.consumer.SimpleConsumer
-import kafka.server.KafkaConfig
 import kafka.integration.KafkaServerTestHarness
+import kafka.server.KafkaConfig
 import kafka.utils.{TestZKUtils, ShutdownableThread, TestUtils}
 
 import org.apache.kafka.common.KafkaException
 import org.apache.kafka.common.errors.{InvalidTopicException, NotEnoughReplicasException, NotEnoughReplicasAfterAppendException}
 import org.apache.kafka.clients.producer._
+import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
 
 class ProducerFailureHandlingTest extends KafkaServerTestHarness {
   private val producerBufferSize = 30000
@@ -371,7 +371,8 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
     override def doWork(): Unit = {
       val responses =
         for (i <- sent+1 to sent+numRecords)
-        yield producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, null, i.toString.getBytes))
+        yield producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, null, i.toString.getBytes),
+                            new ErrorLoggingCallback(topic1, null, null, true))
       val futures = responses.toList
 
       try {
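
The ErrorLoggingCallback added above surfaces per-record send failures
that the fire-and-forget loop would otherwise only see when the futures
are checked. A hand-rolled equivalent using the public Callback
interface (a sketch, not the internal class itself):

    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.RecordMetadata;

    // Logs any exception reported for a completed send.
    public class LogOnErrorCallback implements Callback {
        private final String topic;

        public LogOnErrorCallback(String topic) {
            this.topic = topic;
        }

        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception != null)
                System.err.println("Error sending to topic " + topic
                                   + ": " + exception.getMessage());
        }
    }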