Posted to commits@kafka.apache.org by jq...@apache.org on 2017/05/22 00:31:36 UTC

kafka git commit: KAFKA-3995; KIP-126 Allow KafkaProducer to split and resend oversized batches

Repository: kafka
Updated Branches:
  refs/heads/trunk 1c7fdd284 -> 7fad45557


KAFKA-3995; KIP-126 Allow KafkaProducer to split and resend oversized batches

Author: Jiangjie Qin <be...@gmail.com>

Reviewers: Joel Koshy <jj...@gmail.com>, Ismael Juma <is...@juma.me.uk>

Closes #2638 from becketqin/KAFKA-3995


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7fad4555
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7fad4555
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7fad4555

Branch: refs/heads/trunk
Commit: 7fad45557e4cb7b345f34cec32f910b437c59bc2
Parents: 1c7fdd2
Author: Jiangjie Qin <be...@gmail.com>
Authored: Sun May 21 17:31:31 2017 -0700
Committer: Jiangjie Qin <be...@gmail.com>
Committed: Sun May 21 17:31:31 2017 -0700

----------------------------------------------------------------------
 checkstyle/checkstyle.xml                       |   2 +-
 .../internals/FutureRecordMetadata.java         |  34 +++-
 .../producer/internals/ProducerBatch.java       | 142 ++++++++++++-
 .../producer/internals/RecordAccumulator.java   |  37 +++-
 .../clients/producer/internals/Sender.java      |  29 ++-
 .../kafka/common/record/AbstractRecords.java    |   4 +
 .../record/CompressionRatioEstimator.java       | 111 ++++++++++
 .../kafka/common/record/CompressionType.java    |   6 +-
 .../kafka/common/record/DefaultRecord.java      |   8 +-
 .../kafka/common/record/DefaultRecordBatch.java |   8 +
 .../common/record/MemoryRecordsBuilder.java     |  78 ++++---
 .../internals/RecordAccumulatorTest.java        | 203 ++++++++++++++++++-
 .../clients/producer/internals/SenderTest.java  |  90 ++++++++
 .../common/record/MemoryRecordsBuilderTest.java |   9 +-
 14 files changed, 701 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/7fad4555/checkstyle/checkstyle.xml
----------------------------------------------------------------------
diff --git a/checkstyle/checkstyle.xml b/checkstyle/checkstyle.xml
index 9f9e9ae..cf57a50 100644
--- a/checkstyle/checkstyle.xml
+++ b/checkstyle/checkstyle.xml
@@ -105,7 +105,7 @@
     </module>
     <module name="ClassDataAbstractionCoupling">
       <!-- default is 7 -->
-      <property name="max" value="15"/>
+      <property name="max" value="17"/>
     </module>
     <module name="BooleanExpressionComplexity">
       <!-- default is 3 -->

http://git-wip-us.apache.org/repos/asf/kafka/blob/7fad4555/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
index f8b38e8..1de965f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
@@ -34,6 +34,7 @@ public final class FutureRecordMetadata implements Future<RecordMetadata> {
     private final long checksum;
     private final int serializedKeySize;
     private final int serializedValueSize;
+    private volatile FutureRecordMetadata nextRecordMetadata = null;
 
     public FutureRecordMetadata(ProduceRequestResult result, long relativeOffset, long createTimestamp,
                                 long checksum, int serializedKeySize, int serializedValueSize) {
@@ -58,25 +59,54 @@ public final class FutureRecordMetadata implements Future<RecordMetadata> {
     @Override
     public RecordMetadata get() throws InterruptedException, ExecutionException {
         this.result.await();
+        if (nextRecordMetadata != null)
+            return nextRecordMetadata.get();
         return valueOrError();
     }
 
     @Override
     public RecordMetadata get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+        // Handle overflow.
+        long now = System.currentTimeMillis();
+        long deadline = Long.MAX_VALUE - timeout < now ? Long.MAX_VALUE : now + timeout;
         boolean occurred = this.result.await(timeout, unit);
+        if (nextRecordMetadata != null)
+            return nextRecordMetadata.get(deadline - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
         if (!occurred)
             throw new TimeoutException("Timeout after waiting for " + TimeUnit.MILLISECONDS.convert(timeout, unit) + " ms.");
         return valueOrError();
     }
 
+    /**
+     * This method is used when we have to split a large batch into smaller ones. Chaining the metadata allows a
+     * future that has already been returned to the user to wait on the newly created split batches even after
+     * the old big batch has been deemed done.
+     */
+    void chain(FutureRecordMetadata futureRecordMetadata) {
+        if (nextRecordMetadata == null)
+            nextRecordMetadata = futureRecordMetadata;
+        else
+            nextRecordMetadata.chain(futureRecordMetadata);
+    }
+
     RecordMetadata valueOrError() throws ExecutionException {
         if (this.result.error() != null)
             throw new ExecutionException(this.result.error());
         else
             return value();
     }
-    
+
+    long checksum() {
+        return this.checksum;
+    }
+
+    long relativeOffset() {
+        return this.relativeOffset;
+    }
+
     RecordMetadata value() {
+        if (nextRecordMetadata != null)
+            return nextRecordMetadata.value();
         return new RecordMetadata(result.topicPartition(), this.result.baseOffset(), this.relativeOffset,
                                   timestamp(), this.checksum, this.serializedKeySize, this.serializedValueSize);
     }
@@ -87,6 +117,8 @@ public final class FutureRecordMetadata implements Future<RecordMetadata> {
 
     @Override
     public boolean isDone() {
+        if (nextRecordMetadata != null)
+            return nextRecordMetadata.isDone();
         return this.result.completed();
     }
 

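As context for the chained futures above, here is a minimal standalone sketch of the idea using plain CompletableFuture rather than Kafka's classes (all names and types here are illustrative only, not the actual producer internals): once the old big batch is failed, waiters on the original handle are redirected to the split batch's future.

    import java.util.concurrent.CompletableFuture;

    public class ChainedFutureSketch {
        public static void main(String[] args) throws Exception {
            CompletableFuture<Long> original = new CompletableFuture<>();
            CompletableFuture<Long> afterSplit = new CompletableFuture<>();
            // "Chaining": swallow the failure of the old big batch and delegate
            // to the future of the newly created split batch instead.
            CompletableFuture<Long> userHandle = original
                    .exceptionally(t -> null)
                    .thenCompose(v -> afterSplit);
            original.completeExceptionally(new RuntimeException("batch too large"));
            afterSplit.complete(42L);              // the split batch eventually succeeds
            System.out.println(userHandle.get());  // prints 42, not the error
        }
    }
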
http://git-wip-us.apache.org/repos/asf/kafka/blob/7fad4555/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
index 1c078c8..cdf85ce 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
@@ -16,15 +16,26 @@
  */
 package org.apache.kafka.clients.producer.internals;
 
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Iterator;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.RecordBatchTooLargeException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.record.AbstractRecords;
+import org.apache.kafka.common.record.CompressionRatioEstimator;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.MutableRecordBatch;
+import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.MemoryRecordsBuilder;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.requests.ProduceResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,6 +45,9 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import static org.apache.kafka.common.record.RecordBatch.NO_TIMESTAMP;
+
+
 /**
  * A batch of records that is or will be sent.
  *
@@ -51,6 +65,7 @@ public final class ProducerBatch {
     private final MemoryRecordsBuilder recordsBuilder;
 
     private final AtomicInteger attempts = new AtomicInteger(0);
+    private final boolean isSplitBatch;
     int recordCount;
     int maxRecordSize;
     private long lastAttemptMs;
@@ -61,6 +76,10 @@ public final class ProducerBatch {
     private boolean retry;
 
     public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, long now) {
+        this(tp, recordsBuilder, now, false);
+    }
+
+    public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, long now, boolean isSplitBatch) {
         this.createdMs = now;
         this.lastAttemptMs = now;
         this.recordsBuilder = recordsBuilder;
@@ -69,6 +88,10 @@ public final class ProducerBatch {
         this.produceFuture = new ProduceRequestResult(topicPartition);
         this.completed = new AtomicBoolean();
         this.retry = false;
+        this.isSplitBatch = isSplitBatch;
+        float compressionRatioEstimation = CompressionRatioEstimator.estimation(topicPartition.topic(),
+                                                                                recordsBuilder.compressionType());
+        recordsBuilder.setEstimatedCompressionRatio(compressionRatioEstimation);
     }
 
     /**
@@ -87,14 +110,39 @@ public final class ProducerBatch {
                                                                    timestamp, checksum,
                                                                    key == null ? -1 : key.length,
                                                                    value == null ? -1 : value.length);
-            if (callback != null)
-                thunks.add(new Thunk(callback, future));
+            // we have to keep every future returned to the users in case the batch needs to be
+            // split into several new batches and resent.
+            thunks.add(new Thunk(callback, future));
             this.recordCount++;
             return future;
         }
     }
 
     /**
+     * This method is only used by {@link #split(int)} when splitting a large batch to smaller ones.
+     * @return true if the record has been successfully appended, false otherwise.
+     */
+    private boolean tryAppendForSplit(long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers, Thunk thunk) {
+        if (!recordsBuilder.hasRoomFor(timestamp, key, value)) {
+            return false;
+        } else {
+            // No need to get the CRC.
+            this.recordsBuilder.append(timestamp, key, value);
+            this.maxRecordSize = Math.max(this.maxRecordSize,
+                                          AbstractRecords.sizeInBytesUpperBound(magic(), key, value, headers));
+            FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
+                                                                   timestamp, thunk.future.checksum(),
+                                                                   key == null ? -1 : key.remaining(),
+                                                                   value == null ? -1 : value.remaining());
+            // Chain the future to the original thunk.
+            thunk.future.chain(future);
+            this.thunks.add(thunk);
+            this.recordCount++;
+            return true;
+        }
+    }
+
+    /**
      * Complete the request.
      *
      * @param baseOffset The base offset of the messages assigned by the server
@@ -116,9 +164,11 @@ public final class ProducerBatch {
             try {
                 if (exception == null) {
                     RecordMetadata metadata = thunk.future.value();
-                    thunk.callback.onCompletion(metadata, null);
+                    if (thunk.callback != null)
+                        thunk.callback.onCompletion(metadata, null);
                 } else {
-                    thunk.callback.onCompletion(null, exception);
+                    if (thunk.callback != null)
+                        thunk.callback.onCompletion(null, exception);
                 }
             } catch (Exception e) {
                 log.error("Error executing user-provided callback on message for topic-partition '{}'", topicPartition, e);
@@ -128,6 +178,71 @@ public final class ProducerBatch {
         produceFuture.done();
     }
 
+    public Deque<ProducerBatch> split(int splitBatchSize) {
+        Deque<ProducerBatch> batches = new ArrayDeque<>();
+        MemoryRecords memoryRecords = recordsBuilder.build();
+        Iterator<MutableRecordBatch> recordBatchIter = memoryRecords.batches().iterator();
+        if (!recordBatchIter.hasNext())
+            throw new IllegalStateException("Cannot split an empty producer batch.");
+        RecordBatch recordBatch = recordBatchIter.next();
+        if (recordBatchIter.hasNext())
+            throw new IllegalStateException("A producer batch should only have one record batch.");
+
+        Iterator<Thunk> thunkIter = thunks.iterator();
+        // We always allocate a buffer of at least the batch size because we are already splitting a big batch.
+        // We also retain the create time of the original batch.
+        ProducerBatch batch = null;
+        for (Record record : recordBatch) {
+            assert thunkIter.hasNext();
+            Thunk thunk = thunkIter.next();
+            if (batch == null) {
+                batch = createBatchOffAccumulatorForRecord(this.topicPartition, this.recordsBuilder.compressionType(),
+                                                           record, splitBatchSize, this.createdMs);
+            }
+
+            // A newly created batch can always host the first message.
+            if (!batch.tryAppendForSplit(record.timestamp(), record.key(), record.value(), record.headers(), thunk)) {
+                batches.add(batch);
+                batch = createBatchOffAccumulatorForRecord(this.topicPartition, this.recordsBuilder.compressionType(),
+                                                           record, splitBatchSize, this.createdMs);
+                batch.tryAppendForSplit(record.timestamp(), record.key(), record.value(), record.headers(), thunk);
+            }
+        }
+        // Close the last batch and add it to the batch list after the split.
+        if (batch != null)
+            batches.add(batch);
+
+        produceFuture.set(ProduceResponse.INVALID_OFFSET, NO_TIMESTAMP, new RecordBatchTooLargeException());
+        produceFuture.done();
+        return batches;
+    }
+
+    private ProducerBatch createBatchOffAccumulatorForRecord(TopicPartition tp,
+                                                             CompressionType compressionType,
+                                                             Record record,
+                                                             int batchSize,
+                                                             long createdMs) {
+        int initialSize = Math.max(Records.LOG_OVERHEAD + AbstractRecords.sizeInBytesUpperBound(magic(),
+                                                                                                record.key(),
+                                                                                                record.value(),
+                                                                                                record.headers()),
+                                   batchSize);
+        return createBatchOffAccumulator(tp, compressionType, initialSize, createdMs);
+    }
+
+    // package private for testing purpose.
+    static ProducerBatch createBatchOffAccumulator(TopicPartition tp,
+                                                   CompressionType compressionType,
+                                                   int batchSize,
+                                                   long createdMs) {
+        ByteBuffer buffer = ByteBuffer.allocate(batchSize);
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer,
+                                                             compressionType,
+                                                             TimestampType.CREATE_TIME,
+                                                             batchSize);
+        return new ProducerBatch(tp, builder, createdMs, true);
+    }
+
     /**
      * A callback and the associated FutureRecordMetadata argument to pass to it.
      */
@@ -135,7 +250,7 @@ public final class ProducerBatch {
         final Callback callback;
         final FutureRecordMetadata future;
 
-        public Thunk(Callback callback, FutureRecordMetadata future) {
+        Thunk(Callback callback, FutureRecordMetadata future) {
             this.callback = callback;
             this.future = future;
         }
@@ -155,7 +270,7 @@ public final class ProducerBatch {
      * This methods closes this batch and sets {@code expiryErrorMessage} if the batch has timed out.
      * {@link #expirationDone()} must be invoked to complete the produce future and invoke callbacks.
      */
-    public boolean maybeExpire(int requestTimeoutMs, long retryBackoffMs, long now, long lingerMs, boolean isFull) {
+    boolean maybeExpire(int requestTimeoutMs, long retryBackoffMs, long now, long lingerMs, boolean isFull) {
         if (!this.inRetry() && isFull && requestTimeoutMs < (now - this.lastAppendTime))
             expiryErrorMessage = (now - this.lastAppendTime) + " ms has passed since last append";
         else if (!this.inRetry() && requestTimeoutMs < (createdTimeMs(now) - lingerMs))
@@ -177,7 +292,7 @@ public final class ProducerBatch {
     void expirationDone() {
         if (expiryErrorMessage == null)
             throw new IllegalStateException("Batch has not expired");
-        this.done(-1L, RecordBatch.NO_TIMESTAMP,
+        this.done(-1L, NO_TIMESTAMP,
                   new TimeoutException("Expiring " + recordCount + " record(s) for " + topicPartition + ": " + expiryErrorMessage));
     }
 
@@ -208,6 +323,10 @@ public final class ProducerBatch {
         this.drainedMs = Math.max(drainedMs, nowMs);
     }
 
+    boolean isSplitBatch() {
+        return isSplitBatch;
+    }
+
     /**
      * Returns whether the batch has been retried for sending to Kafka
      */
@@ -223,8 +342,8 @@ public final class ProducerBatch {
         return recordsBuilder.sizeInBytes();
     }
 
-    public double compressionRate() {
-        return recordsBuilder.compressionRate();
+    public double compressionRatio() {
+        return recordsBuilder.compressionRatio();
     }
 
     public boolean isFull() {
@@ -245,6 +364,11 @@ public final class ProducerBatch {
 
     public void close() {
         recordsBuilder.close();
+        if (!recordsBuilder.isControlBatch()) {
+            CompressionRatioEstimator.updateEstimation(topicPartition.topic(),
+                                                       recordsBuilder.compressionType(),
+                                                       (float) recordsBuilder.compressionRatio());
+        }
     }
 
     public void abort() {

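The split(int) loop above packs records greedily, in order, into fresh batches sized off the accumulator. A minimal standalone sketch of the same packing idea, with illustrative types in place of ProducerBatch and Record (the real code additionally carries each record's original Thunk along so the chained futures resolve against the new batches):

    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.Deque;
    import java.util.List;

    public class SplitLoopSketch {
        // Pack records in order into batches of at most splitBatchSize bytes;
        // a record larger than splitBatchSize still gets a batch of its own.
        static Deque<List<byte[]>> split(List<byte[]> records, int splitBatchSize) {
            Deque<List<byte[]>> batches = new ArrayDeque<>();
            List<byte[]> batch = null;
            int bytesInBatch = 0;
            for (byte[] record : records) {
                if (batch == null || bytesInBatch + record.length > splitBatchSize) {
                    batch = new ArrayList<>();  // a new batch can always host its first record
                    batches.add(batch);
                    bytesInBatch = 0;
                }
                batch.add(record);
                bytesInBatch += record.length;
            }
            return batches;
        }
    }
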
http://git-wip-us.apache.org/repos/asf/kafka/blob/7fad4555/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 5b8fb96..e1f04a8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -31,6 +31,7 @@ import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.record.AbstractRecords;
+import org.apache.kafka.common.record.CompressionRatioEstimator;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.record.RecordBatch;
@@ -325,6 +326,30 @@ public final class RecordAccumulator {
     }
 
     /**
+     * Split the big batch that has been rejected and reenqueue the split batches into the accumulator.
+     * @return the number of split batches.
+     */
+    public int splitAndReenqueue(ProducerBatch bigBatch) {
+        // Reset the estimated compression ratio to the initial value or the big batch's compression ratio, whichever
+        // is bigger. There are several ways to do the reset; we chose the most conservative one to ensure
+        // splits don't happen too often.
+        CompressionRatioEstimator.setEstimation(bigBatch.topicPartition.topic(), compression,
+                                                Math.max(1.0f, (float) bigBatch.compressionRatio()));
+        Deque<ProducerBatch> dq = bigBatch.split(this.batchSize);
+        int numSplitBatches = dq.size();
+        Deque<ProducerBatch> partitionDequeue = getOrCreateDeque(bigBatch.topicPartition);
+        while (!dq.isEmpty()) {
+            ProducerBatch batch = dq.pollLast();
+            incomplete.add(batch);
+            // We treat the newly split batches as if they had not been tried at all.
+            synchronized (partitionDequeue) {
+                partitionDequeue.addFirst(batch);
+            }
+        }
+        return numSplitBatches;
+    }
+
+    /**
      * Get a list of nodes whose partitions are ready to be sent, and the earliest time at which any non-sendable
      * partition will be ready; Also return the flag for whether there are any unknown leaders for the accumulated
      * partition batches.
@@ -506,7 +531,17 @@ public final class RecordAccumulator {
      */
     public void deallocate(ProducerBatch batch) {
         incomplete.remove(batch);
-        free.deallocate(batch.buffer(), batch.initialCapacity());
+        // Only deallocate the batch if it is not a split batch, because split batches are allocated outside the
+        // buffer pool.
+        if (!batch.isSplitBatch())
+            free.deallocate(batch.buffer(), batch.initialCapacity());
+    }
+
+    /**
+     * Package private for unit tests. Get the buffer pool's remaining size in bytes.
+     */
+    long bufferPoolAvailableMemory() {
+        return free.availableMemory();
     }
 
     /**

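Note the re-enqueue order in splitAndReenqueue above: the split deque is drained from its tail and each batch is pushed onto the head of the partition deque, which keeps the split batches at the front of the queue in their original record order. A standalone sketch with strings standing in for batches (illustrative only):

    import java.util.ArrayDeque;
    import java.util.Arrays;
    import java.util.Deque;

    public class ReenqueueOrderSketch {
        public static void main(String[] args) {
            Deque<String> partitionQueue = new ArrayDeque<>();
            partitionQueue.add("newer-batch");
            Deque<String> splits = new ArrayDeque<>(Arrays.asList("split-1", "split-2"));
            // Poll from the tail and push onto the head: the split batches end up
            // ahead of any newer batches, still in their original order.
            while (!splits.isEmpty())
                partitionQueue.addFirst(splits.pollLast());
            System.out.println(partitionQueue);  // [split-1, split-2, newer-batch]
        }
    }
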
http://git-wip-us.apache.org/repos/asf/kafka/blob/7fad4555/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 8dea9c6..4c3b99d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -185,7 +185,7 @@ public class Sender implements Runnable {
 
     /**
      * Run a single iteration of sending
-     * 
+     *
      * @param now The current POSIX time in milliseconds
      */
     void run(long now) {
@@ -478,7 +478,7 @@ public class Sender implements Runnable {
 
     /**
      * Complete or retry the given batch of records.
-     * 
+     *
      * @param batch The record batch
      * @param response The produce response
      * @param correlationId The correlation id for the request
@@ -487,7 +487,18 @@ public class Sender implements Runnable {
     private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, long correlationId,
                                long now) {
         Errors error = response.error;
-        if (error != Errors.NONE) {
+        if (error == Errors.MESSAGE_TOO_LARGE && batch.recordCount > 1) {
+            // If the batch is too large, we split the batch and send the split batches again. We do not decrement
+            // the retry attempts in this case.
+            log.warn("Got error produce response with correlation id {} on topic-partition {}, splitting and retrying ({} attempts left). Error: {}",
+                     correlationId,
+                     batch.topicPartition,
+                     this.retries - batch.attempts(),
+                     error);
+            this.accumulator.splitAndReenqueue(batch);
+            this.accumulator.deallocate(batch);
+            this.sensors.recordBatchSplit();
+        } else if (error != Errors.NONE) {
             if (canRetry(batch, error)) {
                 log.warn("Got error produce response with correlation id {} on topic-partition {}, retrying ({} attempts left). Error: {}",
                         correlationId,
@@ -656,6 +667,7 @@ public class Sender implements Runnable {
         public final Sensor compressionRateSensor;
         public final Sensor maxRecordSizeSensor;
         public final Sensor produceThrottleTimeSensor;
+        public final Sensor batchSplitSensor;
 
         public SenderMetrics(Metrics metrics) {
             this.metrics = metrics;
@@ -721,6 +733,10 @@ public class Sender implements Runnable {
                     return (now - metadata.lastSuccessfulUpdate()) / 1000.0;
                 }
             });
+
+            this.batchSplitSensor = metrics.sensor("batch-split-rate");
+            m = metrics.metricName("batch-split-rate", metricGrpName, "The rate of record batch splits");
+            this.batchSplitSensor.add(m, new Rate());
         }
 
         private void maybeRegisterTopicMetrics(String topic) {
@@ -780,12 +796,12 @@ public class Sender implements Runnable {
                     // per-topic compression rate
                     String topicCompressionRateName = "topic." + topic + ".compression-rate";
                     Sensor topicCompressionRate = Utils.notNull(this.metrics.getSensor(topicCompressionRateName));
-                    topicCompressionRate.record(batch.compressionRate());
+                    topicCompressionRate.record(batch.compressionRatio());
 
                     // global metrics
                     this.batchSizeSensor.record(batch.sizeInBytes(), now);
                     this.queueTimeSensor.record(batch.queueTimeMs(), now);
-                    this.compressionRateSensor.record(batch.compressionRate());
+                    this.compressionRateSensor.record(batch.compressionRatio());
                     this.maxRecordSizeSensor.record(batch.maxRecordSize, now);
                     records += batch.recordCount;
                 }
@@ -826,6 +842,9 @@ public class Sender implements Runnable {
             this.produceThrottleTimeSensor.record(throttleTimeMs, time.milliseconds());
         }
 
+        void recordBatchSplit() {
+            this.batchSplitSensor.record();
+        }
     }
 
 }

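The new branch in completeBatch above handles MESSAGE_TOO_LARGE before the generic error path, so splitting does not consume a retry attempt; a single-record batch cannot be split and falls through to the normal handling. A minimal sketch of the branch order, with assumed names rather than the actual Sender types:

    public class CompleteBatchSketch {
        enum Errors { NONE, MESSAGE_TOO_LARGE, OTHER }

        static String completeBatch(Errors error, int recordCount) {
            if (error == Errors.MESSAGE_TOO_LARGE && recordCount > 1)
                return "split and re-enqueue (no retry attempt consumed)";
            else if (error != Errors.NONE)
                return "normal error path (retry if retriable, otherwise fail)";
            return "complete the batch";
        }

        public static void main(String[] args) {
            System.out.println(completeBatch(Errors.MESSAGE_TOO_LARGE, 20));
            System.out.println(completeBatch(Errors.MESSAGE_TOO_LARGE, 1));  // cannot split one record
        }
    }
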
http://git-wip-us.apache.org/repos/asf/kafka/blob/7fad4555/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
index cfda8a4..2771ab7 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
@@ -165,6 +165,10 @@ public abstract class AbstractRecords implements Records {
     }
 
     public static int sizeInBytesUpperBound(byte magic, byte[] key, byte[] value, Header[] headers) {
+        return sizeInBytesUpperBound(magic, Utils.wrapNullable(key), Utils.wrapNullable(value), headers);
+    }
+
+    public static int sizeInBytesUpperBound(byte magic, ByteBuffer key, ByteBuffer value, Header[] headers) {
         if (magic >= RecordBatch.MAGIC_VALUE_V2)
             return DefaultRecordBatch.batchSizeUpperBound(key, value, headers);
         else

http://git-wip-us.apache.org/repos/asf/kafka/blob/7fad4555/clients/src/main/java/org/apache/kafka/common/record/CompressionRatioEstimator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/CompressionRatioEstimator.java b/clients/src/main/java/org/apache/kafka/common/record/CompressionRatioEstimator.java
new file mode 100644
index 0000000..7f11784
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/CompressionRatioEstimator.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.record;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+
+/**
+ * This class helps estimate the compression ratio for each topic and compression type combination.
+ */
+public class CompressionRatioEstimator {
+    // The constant step to decrease the compression ratio estimate when a batch compresses better than expected.
+    public static final float COMPRESSION_RATIO_IMPROVING_STEP = 0.005f;
+    // The minimum step to increase the compression ratio estimate when a batch compresses worse than expected.
+    public static final float COMPRESSION_RATIO_DETERIORATE_STEP = 0.05f;
+    private static final ConcurrentMap<String, float[]> COMPRESSION_RATIO = new ConcurrentHashMap<>();
+
+    /**
+     * Update the compression ratio estimation for a topic and compression type.
+     *
+     * @param topic         the topic to update compression ratio estimation.
+     * @param type          the compression type.
+     * @param observedRatio the observed compression ratio.
+     * @return the compression ratio estimation after the update.
+     */
+    public static float updateEstimation(String topic, CompressionType type, float observedRatio) {
+        float[] compressionRatioForTopic = getAndCreateEstimationIfAbsent(topic);
+        float currentEstimation = compressionRatioForTopic[type.id];
+        synchronized (compressionRatioForTopic) {
+            if (observedRatio > currentEstimation)
+                compressionRatioForTopic[type.id] = Math.max(currentEstimation + COMPRESSION_RATIO_DETERIORATE_STEP, observedRatio);
+            else if (observedRatio < currentEstimation) {
+                compressionRatioForTopic[type.id] = currentEstimation - COMPRESSION_RATIO_IMPROVING_STEP;
+            }
+        }
+        return compressionRatioForTopic[type.id];
+    }
+
+    /**
+     * Get the compression ratio estimation for a topic and compression type.
+     */
+    public static float estimation(String topic, CompressionType type) {
+        float[] compressionRatioForTopic = getAndCreateEstimationIfAbsent(topic);
+        return compressionRatioForTopic[type.id];
+    }
+
+    /**
+     * Reset the compression ratio estimation to the initial values for a topic.
+     */
+    public static void resetEstimation(String topic) {
+        float[] compressionRatioForTopic = getAndCreateEstimationIfAbsent(topic);
+        synchronized (compressionRatioForTopic) {
+            for (CompressionType type : CompressionType.values()) {
+                compressionRatioForTopic[type.id] = type.rate;
+            }
+        }
+    }
+
+    /**
+     * Remove the compression ratio estimation for a topic.
+     */
+    public static void removeEstimation(String topic) {
+        COMPRESSION_RATIO.remove(topic);
+    }
+
+    /**
+     * Set the compression ratio estimation for a topic and compression type combination. This method is for unit testing purposes.
+     */
+    public static void setEstimation(String topic, CompressionType type, float ratio) {
+        float[] compressionRatioForTopic = getAndCreateEstimationIfAbsent(topic);
+        synchronized (compressionRatioForTopic) {
+            compressionRatioForTopic[type.id] = ratio;
+        }
+    }
+
+    private static float[] getAndCreateEstimationIfAbsent(String topic) {
+        float[] compressionRatioForTopic = COMPRESSION_RATIO.get(topic);
+        if (compressionRatioForTopic == null) {
+            compressionRatioForTopic = initialCompressionRatio();
+            float[] existingCompressionRatio = COMPRESSION_RATIO.putIfAbsent(topic, compressionRatioForTopic);
+            // Someone created the compression ratio array before us, use it.
+            if (existingCompressionRatio != null)
+                return existingCompressionRatio;
+        }
+        return compressionRatioForTopic;
+    }
+
+    private static float[] initialCompressionRatio() {
+        float[] compressionRatio = new float[CompressionType.values().length];
+        for (CompressionType type : CompressionType.values()) {
+            compressionRatio[type.id] = type.rate;
+        }
+        return compressionRatio;
+    }
+}
\ No newline at end of file

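The update rule in updateEstimation above is deliberately asymmetric: the estimate worsens quickly (by at least COMPRESSION_RATIO_DETERIORATE_STEP, or straight to the observed ratio) and improves slowly (by COMPRESSION_RATIO_IMPROVING_STEP per batch), biasing the producer toward overestimating on-the-wire size and therefore toward fewer splits. A standalone numeric sketch of the rule (not the Kafka class itself):

    public class EstimatorUpdateSketch {
        static final float IMPROVING_STEP = 0.005f;
        static final float DETERIORATE_STEP = 0.05f;

        static float update(float estimate, float observed) {
            if (observed > estimate)
                return Math.max(estimate + DETERIORATE_STEP, observed);
            else if (observed < estimate)
                return estimate - IMPROVING_STEP;
            return estimate;
        }

        public static void main(String[] args) {
            float est = update(0.5f, 0.9f);   // one poorly compressing batch -> jumps to 0.9
            for (int i = 0; i < 10; i++)
                est = update(est, 0.5f);      // ten good batches recover only 10 * 0.005
            System.out.println(est);          // ~0.85
        }
    }
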
http://git-wip-us.apache.org/repos/asf/kafka/blob/7fad4555/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
index a78c5a2..15b5958 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
@@ -44,7 +44,7 @@ public enum CompressionType {
         }
     },
 
-    GZIP(1, "gzip", 0.5f) {
+    GZIP(1, "gzip", 1.0f) {
         @Override
         public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion, int bufferSize) {
             try {
@@ -64,7 +64,7 @@ public enum CompressionType {
         }
     },
 
-    SNAPPY(2, "snappy", 0.5f) {
+    SNAPPY(2, "snappy", 1.0f) {
         @Override
         public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion, int bufferSize) {
             try {
@@ -84,7 +84,7 @@ public enum CompressionType {
         }
     },
 
-    LZ4(3, "lz4", 0.5f) {
+    LZ4(3, "lz4", 1.0f) {
         @Override
         public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion, int bufferSize) {
             try {

http://git-wip-us.apache.org/repos/asf/kafka/blob/7fad4555/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
index 669c75d..37f92d2 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
@@ -494,8 +494,12 @@ public class DefaultRecord implements Record {
     }
 
     static int recordSizeUpperBound(byte[] key, byte[] value, Header[] headers) {
-        int keySize = key == null ? -1 : key.length;
-        int valueSize = value == null ? -1 : value.length;
+        return recordSizeUpperBound(Utils.wrapNullable(key), Utils.wrapNullable(value), headers);
+    }
+
+    static int recordSizeUpperBound(ByteBuffer key, ByteBuffer value, Header[] headers) {
+        int keySize = key == null ? -1 : key.remaining();
+        int valueSize = value == null ? -1 : value.remaining();
         return MAX_RECORD_OVERHEAD + sizeOf(keySize, valueSize, headers);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/7fad4555/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
index 74bd3c0..589e67c 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
@@ -30,6 +30,7 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
+import org.apache.kafka.common.utils.Utils;
 
 import static org.apache.kafka.common.record.Records.LOG_OVERHEAD;
 
@@ -440,6 +441,13 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
      * Get an upper bound on the size of a batch with only a single record using a given key and value.
      */
     static int batchSizeUpperBound(byte[] key, byte[] value, Header[] headers) {
+        return batchSizeUpperBound(Utils.wrapNullable(key), Utils.wrapNullable(value), headers);
+    }
+
+    /**
+     * Get an upper bound on the size of a batch with only a single record using a given key and value.
+     */
+    static int batchSizeUpperBound(ByteBuffer key, ByteBuffer value, Header[] headers) {
         return RECORD_BATCH_OVERHEAD + DefaultRecord.recordSizeUpperBound(key, value, headers);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/7fad4555/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index 6f90fac..42ae0f8 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -37,22 +37,9 @@ import static org.apache.kafka.common.utils.Utils.wrapNullable;
  * This will release resources like compression buffers that can be relatively large (64 KB for LZ4).
  */
 public class MemoryRecordsBuilder {
-    private static final float COMPRESSION_RATE_DAMPING_FACTOR = 0.9f;
     private static final float COMPRESSION_RATE_ESTIMATION_FACTOR = 1.05f;
     private static final int COMPRESSION_DEFAULT_BUFFER_SIZE = 1024;
 
-    private static final float[] TYPE_TO_RATE;
-
-    static {
-        int maxTypeId = -1;
-        for (CompressionType type : CompressionType.values())
-            maxTypeId = Math.max(maxTypeId, type.id);
-        TYPE_TO_RATE = new float[maxTypeId + 1];
-        for (CompressionType type : CompressionType.values()) {
-            TYPE_TO_RATE[type.id] = type.rate;
-        }
-    }
-
     private final TimestampType timestampType;
     private final CompressionType compressionType;
     // Used to append records, may compress data on the fly
@@ -71,13 +58,15 @@ public class MemoryRecordsBuilder {
     private final int writeLimit;
     private final int initialCapacity;
 
+    private volatile float estimatedCompressionRatio;
+
     private boolean appendStreamIsClosed = false;
     private long producerId;
     private short producerEpoch;
     private int baseSequence;
     private long writtenUncompressed = 0;
     private int numRecords = 0;
-    private float compressionRate = 1;
+    private float actualCompressionRatio = 1;
     private long maxTimestamp = RecordBatch.NO_TIMESTAMP;
     private long offsetOfMaxTimestamp = -1;
     private Long lastOffset = null;
@@ -134,7 +123,7 @@ public class MemoryRecordsBuilder {
         this.initPos = buffer.position();
         this.numRecords = 0;
         this.writtenUncompressed = 0;
-        this.compressionRate = 1;
+        this.actualCompressionRatio = 1;
         this.maxTimestamp = RecordBatch.NO_TIMESTAMP;
         this.producerId = producerId;
         this.producerEpoch = producerEpoch;
@@ -167,8 +156,16 @@ public class MemoryRecordsBuilder {
         return initialCapacity;
     }
 
-    public double compressionRate() {
-        return compressionRate;
+    public double compressionRatio() {
+        return actualCompressionRatio;
+    }
+
+    public CompressionType compressionType() {
+        return compressionType;
+    }
+
+    public boolean isControlBatch() {
+        return isControlBatch;
     }
 
     /**
@@ -284,9 +281,9 @@ public class MemoryRecordsBuilder {
             builtRecords = MemoryRecords.EMPTY;
         } else {
             if (magic > RecordBatch.MAGIC_VALUE_V1)
-                writeDefaultBatchHeader();
+                this.actualCompressionRatio = (float) writeDefaultBatchHeader() / this.writtenUncompressed;
             else if (compressionType != CompressionType.NONE)
-                writeLegacyCompressedWrapperHeader();
+                this.actualCompressionRatio = (float) writeLegacyCompressedWrapperHeader() / this.writtenUncompressed;
 
             ByteBuffer buffer = buffer().duplicate();
             buffer.flip();
@@ -295,12 +292,17 @@ public class MemoryRecordsBuilder {
         }
     }
 
-    private void writeDefaultBatchHeader() {
+    /**
+     * Write the default batch header.
+     * @return the number of compressed bytes written.
+     */
+    private int writeDefaultBatchHeader() {
         ensureOpenForRecordBatchWrite();
         ByteBuffer buffer = bufferStream.buffer();
         int pos = buffer.position();
         buffer.position(initPos);
         int size = pos - initPos;
+        int writtenCompressed = size - DefaultRecordBatch.RECORD_BATCH_OVERHEAD;
         int offsetDelta = (int) (lastOffset - baseOffset);
 
         final long baseTimestamp;
@@ -318,9 +320,14 @@ public class MemoryRecordsBuilder {
                 partitionLeaderEpoch, numRecords);
 
         buffer.position(pos);
+        return writtenCompressed;
     }
 
-    private void writeLegacyCompressedWrapperHeader() {
+    /**
+     * Write the legacy compressed wrapper header.
+     * @return the number of compressed bytes written.
+     */
+    private int writeLegacyCompressedWrapperHeader() {
         ensureOpenForRecordBatchWrite();
         ByteBuffer buffer = bufferStream.buffer();
         int pos = buffer.position();
@@ -334,11 +341,7 @@ public class MemoryRecordsBuilder {
         LegacyRecord.writeCompressedRecordHeader(buffer, magic, wrapperSize, timestamp, compressionType, timestampType);
 
         buffer.position(pos);
-
-        // update the compression ratio
-        this.compressionRate = (float) writtenCompressed / this.writtenUncompressed;
-        TYPE_TO_RATE[compressionType.id] = TYPE_TO_RATE[compressionType.id] * COMPRESSION_RATE_DAMPING_FACTOR +
-            compressionRate * (1 - COMPRESSION_RATE_DAMPING_FACTOR);
+        return writtenCompressed;
     }
 
     private long appendWithOffset(long offset, boolean isControlRecord, long timestamp, ByteBuffer key,
@@ -440,7 +443,7 @@ public class MemoryRecordsBuilder {
     public long append(long timestamp, ByteBuffer key, ByteBuffer value) {
         return append(timestamp, key, value, Record.EMPTY_HEADERS);
     }
-    
+
     /**
      * Append a new record at the next sequential offset.
      * @param timestamp The record timestamp
@@ -636,11 +639,25 @@ public class MemoryRecordsBuilder {
             return buffer().position();
         } else {
             // estimate the written bytes to the underlying byte buffer based on uncompressed written bytes
-            return (int) (writtenUncompressed * TYPE_TO_RATE[compressionType.id] * COMPRESSION_RATE_ESTIMATION_FACTOR);
+            return (int) (writtenUncompressed * estimatedCompressionRatio * COMPRESSION_RATE_ESTIMATION_FACTOR);
         }
     }
 
     /**
+     * Set the estimated compression ratio for the memory records builder.
+     */
+    public void setEstimatedCompressionRatio(float estimatedCompressionRatio) {
+        this.estimatedCompressionRatio = estimatedCompressionRatio;
+    }
+
+    /**
+     * Check if we have room for a new record containing the given key/value pair
+     */
+    public boolean hasRoomFor(long timestamp, byte[] key, byte[] value) {
+        return hasRoomFor(timestamp, wrapNullable(key), wrapNullable(value));
+    }
+
+    /**
      * Check if we have room for a new record containing the given key/value pair
      *
      * Note that the return value is based on the estimate of the bytes written to the compressor, which may not be
@@ -652,7 +669,7 @@ public class MemoryRecordsBuilder {
      * the checking should be based on the capacity of the initialized buffer rather than the write limit in order
      * to accept this single record.
      */
-    public boolean hasRoomFor(long timestamp, byte[] key, byte[] value) {
+    public boolean hasRoomFor(long timestamp, ByteBuffer key, ByteBuffer value) {
         if (isFull())
             return false;
 
@@ -662,9 +679,10 @@ public class MemoryRecordsBuilder {
         } else {
             int nextOffsetDelta = lastOffset == null ? 0 : (int) (lastOffset - baseOffset + 1);
             long timestampDelta = baseTimestamp == null ? 0 : timestamp - baseTimestamp;
-            recordSize = DefaultRecord.sizeInBytes(nextOffsetDelta, timestampDelta, key, value);
+            recordSize = DefaultRecord.sizeInBytes(nextOffsetDelta, timestampDelta, key, value, Record.EMPTY_HEADERS);
         }
 
+        // Be conservative and do not take compression of the new record into consideration.
         return numRecords == 0 ?
                 this.initialCapacity >= recordSize :
                 this.writeLimit >= estimatedBytesWritten() + recordSize;

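For the hasRoomFor check above, the bytes written so far by a compressing builder are only an estimate: uncompressed bytes times the per-topic ratio estimate times COMPRESSION_RATE_ESTIMATION_FACTOR. A standalone sketch of the arithmetic, with illustrative names in place of the builder's fields:

    public class SizeEstimateSketch {
        static final float ESTIMATION_FACTOR = 1.05f;  // safety margin on the estimate

        static boolean hasRoomFor(int writeLimit, long writtenUncompressed,
                                  float estimatedRatio, int nextRecordSize) {
            int estimatedBytesWritten = (int) (writtenUncompressed * estimatedRatio * ESTIMATION_FACTOR);
            return writeLimit >= estimatedBytesWritten + nextRecordSize;
        }

        public static void main(String[] args) {
            // 10 KB appended uncompressed at an estimated ratio of 0.5 is ~5.25 KB
            // on the wire, so a 1 KB record still fits a 16 KB write limit.
            System.out.println(hasRoomFor(16384, 10240, 0.5f, 1024));  // true
        }
    }
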
http://git-wip-us.apache.org/repos/asf/kafka/blob/7fad4555/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index af599ca..b9675c3 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -16,6 +16,9 @@
  */
 package org.apache.kafka.clients.producer.internals;
 
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.NodeApiVersions;
 import org.apache.kafka.clients.producer.Callback;
@@ -28,6 +31,7 @@ import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.record.CompressionRatioEstimator;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.DefaultRecordBatch;
 import org.apache.kafka.common.record.DefaultRecord;
@@ -55,6 +59,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import static java.util.Arrays.asList;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -311,7 +316,7 @@ public class RecordAccumulatorTest {
         assertEquals("Node1 should only have one batch drained.", 1, batches.get(0).size());
         assertEquals("Node1 should only have one batch for partition 0.", tp1, batches.get(0).get(0).topicPartition);
     }
-    
+
     @Test
     public void testFlush() throws Exception {
         long lingerMs = Long.MAX_VALUE;
@@ -321,16 +326,16 @@ public class RecordAccumulatorTest {
             accum.append(new TopicPartition(topic, i % 3), 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs);
         RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
         assertEquals("No nodes should be ready.", 0, result.readyNodes.size());
-        
+
         accum.beginFlush();
         result = accum.ready(cluster, time.milliseconds());
-        
+
         // drain and deallocate all batches
         Map<Integer, List<ProducerBatch>> results = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, time.milliseconds());
         for (List<ProducerBatch> batches: results.values())
             for (ProducerBatch batch: batches)
                 accum.deallocate(batch);
-        
+
         // should be complete with no unsent records.
         accum.awaitFlushCompletion();
         assertFalse(accum.hasUnsent());
@@ -552,6 +557,196 @@ public class RecordAccumulatorTest {
         accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, 0);
     }
 
+    @Test
+    public void testSplitAndReenqueue() throws ExecutionException, InterruptedException {
+        long now = time.milliseconds();
+        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.GZIP, 10, 100L, metrics, time,
+                                                        new ApiVersions(), null);
+        // Create a big batch
+        ProducerBatch batch = ProducerBatch.createBatchOffAccumulator(tp1, CompressionType.NONE, 4096, now);
+        byte[] value = new byte[1024];
+        final AtomicInteger acked = new AtomicInteger(0);
+        Callback cb = new Callback() {
+            @Override
+            public void onCompletion(RecordMetadata metadata, Exception exception) {
+                acked.incrementAndGet();
+            }
+        };
+        // Append two messages so the batch is too big.
+        Future<RecordMetadata> future1 = batch.tryAppend(now, null, value, Record.EMPTY_HEADERS, cb, now);
+        Future<RecordMetadata> future2 = batch.tryAppend(now, null, value, Record.EMPTY_HEADERS, cb, now);
+        assertNotNull(future1);
+        assertNotNull(future2);
+        batch.close();
+        // Enqueue the batch to the accumulator as if the batch had been created by the accumulator.
+        accum.reenqueue(batch, now);
+        time.sleep(101L);
+        // Drain the batch.
+        RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
+        assertTrue("The batch should be ready", result.readyNodes.size() > 0);
+        Map<Integer, List<ProducerBatch>> drained = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, time.milliseconds());
+        assertEquals("Only node1 should be drained", 1, drained.size());
+        assertEquals("Only one batch should be drained", 1, drained.get(node1.id()).size());
+        // Split and reenqueue the batch.
+        accum.splitAndReenqueue(drained.get(node1.id()).get(0));
+        time.sleep(101L);
+
+        drained = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, time.milliseconds());
+        assertFalse(drained.isEmpty());
+        assertFalse(drained.get(node1.id()).isEmpty());
+        drained.get(node1.id()).get(0).done(acked.get(), 100L, null);
+        assertEquals("The first message should have been acked.", 1, acked.get());
+        assertTrue(future1.isDone());
+        assertEquals(0, future1.get().offset());
+
+        drained = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, time.milliseconds());
+        assertFalse(drained.isEmpty());
+        assertFalse(drained.get(node1.id()).isEmpty());
+        drained.get(node1.id()).get(0).done(acked.get(), 100L, null);
+        assertEquals("Both messages should have been acked.", 2, acked.get());
+        assertTrue(future2.isDone());
+        assertEquals(1, future2.get().offset());
+    }
+
+    @Test
+    public void testSplitBatchOffAccumulator() throws InterruptedException {
+        long seed = System.currentTimeMillis();
+        final int batchSize = 1024;
+        final int bufferCapacity = 3 * 1024;
+
+        // First set the compression ratio estimation to be good.
+        CompressionRatioEstimator.setEstimation(tp1.topic(), CompressionType.GZIP, 0.1f);
+        RecordAccumulator accum = new RecordAccumulator(batchSize, bufferCapacity, CompressionType.GZIP, 0L, 100L,
+                                                        metrics, time, new ApiVersions(), null);
+        int numSplitBatches = prepareSplitBatches(accum, seed, 100, 20);
+        assertTrue("There should be some split batches", numSplitBatches > 0);
+        // Drain all the split batches.
+        RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
+        for (int i = 0; i < numSplitBatches; i++) {
+            Map<Integer, List<ProducerBatch>> drained =
+                accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, time.milliseconds());
+            assertFalse(drained.isEmpty());
+            assertFalse(drained.get(node1.id()).isEmpty());
+        }
+        assertTrue("All the batches should have been drained.",
+                   accum.ready(cluster, time.milliseconds()).readyNodes.isEmpty());
+        assertEquals("The split batches should be allocated off the accumulator",
+                     bufferCapacity, accum.bufferPoolAvailableMemory());
+    }
+
+    @Test
+    public void testSplitFrequency() throws InterruptedException {
+        long seed = System.currentTimeMillis();
+        Random random = new Random();
+        random.setSeed(seed);
+        final int batchSize = 1024;
+        final int numMessages = 1000;
+
+        RecordAccumulator accum = new RecordAccumulator(batchSize, 3 * 1024, CompressionType.GZIP, 10, 100L,
+                                                        metrics, time, new ApiVersions(), null);
+        // Adjust the high and low compression ratio message percentage
+        for (int goodCompRatioPercentage = 1; goodCompRatioPercentage < 100; goodCompRatioPercentage++) {
+            int numSplit = 0;
+            int numBatches = 0;
+            CompressionRatioEstimator.resetEstimation(topic);
+            for (int i = 0; i < numMessages; i++) {
+                int dice = random.nextInt(100);
+                byte[] value = (dice < goodCompRatioPercentage) ?
+                        bytesWithGoodCompression(random) : bytesWithPoorCompression(random, 100);
+                accum.append(tp1, 0L, null, value, Record.EMPTY_HEADERS, null, 0);
+                BatchDrainedResult result = completeOrSplitBatches(accum, batchSize);
+                numSplit += result.numSplit;
+                numBatches += result.numBatches;
+            }
+            time.sleep(10);
+            BatchDrainedResult result = completeOrSplitBatches(accum, batchSize);
+            numSplit += result.numSplit;
+            numBatches += result.numBatches;
+            assertTrue(String.format("Total num batches = %d, split batches = %d, more than 10%% of the batches were split. "
+                                         + "Random seed is " + seed,
+                                     numBatches, numSplit), (double) numSplit / numBatches < 0.1f);
+        }
+    }
+
+    private int prepareSplitBatches(RecordAccumulator accum, long seed, int recordSize, int numRecords)
+        throws InterruptedException {
+        Random random = new Random();
+        random.setSeed(seed);
+
+        // First set the compression ratio estimation to be good.
+        CompressionRatioEstimator.setEstimation(tp1.topic(), CompressionType.GZIP, 0.1f);
+        // Appending 20 records of 100 bytes each with a poor compression ratio should make the batch too big.
+        for (int i = 0; i < numRecords; i++) {
+            accum.append(tp1, 0L, null, bytesWithPoorCompression(random, recordSize), Record.EMPTY_HEADERS, null, 0);
+        }
+
+        RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
+        assertFalse(result.readyNodes.isEmpty());
+        Map<Integer, List<ProducerBatch>> batches = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, time.milliseconds());
+        assertEquals(1, batches.size());
+        assertEquals(1, batches.values().iterator().next().size());
+        ProducerBatch batch = batches.values().iterator().next().get(0);
+        int numSplitBatches = accum.splitAndReenqueue(batch);
+        accum.deallocate(batch);
+
+        return numSplitBatches;
+    }
+
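+    /**
+     * Drains all ready batches, splitting and re-enqueuing any batch that exceeds the configured
+     * batch size (plus batch overhead) and completing the rest. Returns the counts of drained
+     * and split batches.
+     */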
+    private BatchDrainedResult completeOrSplitBatches(RecordAccumulator accum, int batchSize) {
+        int numSplit = 0;
+        int numBatches = 0;
+        boolean batchDrained;
+        do {
+            batchDrained = false;
+            RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
+            Map<Integer, List<ProducerBatch>> batches = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, time.milliseconds());
+            for (List<ProducerBatch> batchList : batches.values()) {
+                for (ProducerBatch batch : batchList) {
+                    batchDrained = true;
+                    numBatches++;
+                    if (batch.sizeInBytes() > batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD) {
+                        accum.splitAndReenqueue(batch);
+                        numSplit++;
+                    } else {
+                        batch.done(0L, 0L, null);
+                    }
+                    // Release the resources of the original batch in either case.
+                    accum.deallocate(batch);
+                }
+            }
+        } while (batchDrained);
+        return new BatchDrainedResult(numSplit, numBatches);
+    }
+
+    /**
+     * Generates a payload that compresses at a ratio of about 0.6.
+     */
+    private byte[] bytesWithGoodCompression(Random random) {
+        byte[] value = new byte[100];
+        ByteBuffer buffer = ByteBuffer.wrap(value);
+        while (buffer.remaining() > 0)
+            buffer.putInt(random.nextInt(1000));
+        return value;
+    }
+
+    /**
+     * Generates a payload that compresses at a ratio of about 0.9.
+     */
+    private byte[] bytesWithPoorCompression(Random random, int size) {
+        byte[] value = new byte[size];
+        random.nextBytes(value);
+        return value;
+    }
+
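+    /** Holder for the batch counts collected by completeOrSplitBatches(). */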
+    private class BatchDrainedResult {
+        final int numSplit;
+        final int numBatches;
+        BatchDrainedResult(int numSplit, int numBatches) {
+            this.numBatches = numBatches;
+            this.numSplit = numSplit;
+        }
+    }
+
     /**
      * Return the offset delta.
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/7fad4555/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 1321fba..cc30f4d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -31,6 +31,7 @@ import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionRatioEstimator;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.MutableRecordBatch;
 import org.apache.kafka.common.record.RecordBatch;
@@ -531,6 +532,95 @@ public class SenderTest {
         assertFalse("Expected transaction state to be reset upon receiving an OutOfOrderSequenceException", transactionManager.hasProducerId());
     }
 
+    @Test
+    public void testSplitBatchAndSend() throws Exception {
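+        // Exercises the KIP-126 split-and-resend path end to end: a MESSAGE_TOO_LARGE response
+        // causes the batch to be split, both halves are resent with consecutive sequence numbers,
+        // and the batch-split-rate metric records the split.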
+        int maxRetries = 1;
+        String topic = "testSplitBatchAndSend";
+        // Set a good compression ratio.
+        CompressionRatioEstimator.setEstimation(topic, CompressionType.GZIP, 0.2f);
+        Metrics m = new Metrics();
+        TransactionManager txnManager = new TransactionManager("testSplitBatchAndSend", 0);
+        txnManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(123456L, (short) 0));
+        accumulator = new RecordAccumulator(batchSize, 1024 * 1024, CompressionType.GZIP, 0L, 0L, m, time,
+                                            new ApiVersions(), txnManager);
+        try {
+            Sender sender = new Sender(client,
+                                       metadata,
+                                       this.accumulator,
+                                       true,
+                                       MAX_REQUEST_SIZE,
+                                       ACKS_ALL,
+                                       maxRetries,
+                                       m,
+                                       time,
+                                       REQUEST_TIMEOUT,
+                                       1000L,
+                                       txnManager,
+                                       new ApiVersions());
+            // Create a two-broker cluster, with partition 0 on broker 0 and partition 1 on broker 1
+            Cluster cluster1 = TestUtils.clusterWith(2, topic, 2);
+            metadata.update(cluster1, Collections.<String>emptySet(), time.milliseconds());
+            // Append two messages to the same partition; they will go into the same batch.
+            TopicPartition tp2 = new TopicPartition(topic, 1);
+            Future<RecordMetadata> f1 =
+                    accumulator.append(tp2, 0L, "key1".getBytes(), new byte[batchSize / 2], null, null, MAX_BLOCK_TIMEOUT).future;
+            Future<RecordMetadata> f2 =
+                    accumulator.append(tp2, 0L, "key2".getBytes(), new byte[batchSize / 2], null, null, MAX_BLOCK_TIMEOUT).future;
+            sender.run(time.milliseconds()); // connect
+            sender.run(time.milliseconds()); // send produce request
+            assertEquals("The sequence number should be 0", 0, txnManager.sequenceNumber(tp2).longValue());
+            String id = client.requests().peek().destination();
+            assertEquals(ApiKeys.PRODUCE, client.requests().peek().requestBuilder().apiKey());
+            Node node = new Node(Integer.valueOf(id), "localhost", 0);
+            assertEquals(1, client.inFlightRequestCount());
+            assertTrue("Client ready status should be true", client.isReady(node, 0L));
+
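+            // Reject the batch as too large so the sender splits it and re-enqueues both halves.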
+            Map<TopicPartition, ProduceResponse.PartitionResponse> responseMap = new HashMap<>();
+            responseMap.put(tp2, new ProduceResponse.PartitionResponse(Errors.MESSAGE_TOO_LARGE));
+            client.respond(new ProduceResponse(responseMap));
+            sender.run(time.milliseconds()); // split and reenqueue
+            // The compression ratio estimate should have been improved by one step.
+            assertEquals(CompressionType.GZIP.rate - CompressionRatioEstimator.COMPRESSION_RATIO_IMPROVING_STEP,
+                    CompressionRatioEstimator.estimation(topic, CompressionType.GZIP), 0.01);
+            sender.run(time.milliseconds()); // send produce request
+            assertEquals("The sequence number should be 0", 0, txnManager.sequenceNumber(tp2).longValue());
+            assertFalse("The future shouldn't have been done.", f1.isDone());
+            assertFalse("The future shouldn't have been done.", f2.isDone());
+            id = client.requests().peek().destination();
+            assertEquals(ApiKeys.PRODUCE, client.requests().peek().requestBuilder().apiKey());
+            node = new Node(Integer.valueOf(id), "localhost", 0);
+            assertEquals(1, client.inFlightRequestCount());
+            assertTrue("Client ready status should be true", client.isReady(node, 0L));
+
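+            // Accept the first of the two split batches.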
+            responseMap.put(tp2, new ProduceResponse.PartitionResponse(Errors.NONE, 0L, 0L));
+            client.respond(new ProduceResponse(responseMap));
+            sender.run(time.milliseconds()); // receive
+            assertTrue("The future should have been done.", f1.isDone());
+            assertEquals("The sequence number should be 1", 1, txnManager.sequenceNumber(tp2).longValue());
+            assertFalse("The future shouldn't have been done.", f2.isDone());
+            assertEquals("Offset of the first message should be 0", 0L, f1.get().offset());
+            sender.run(time.milliseconds()); // send produce request
+            id = client.requests().peek().destination();
+            assertEquals(ApiKeys.PRODUCE, client.requests().peek().requestBuilder().apiKey());
+            node = new Node(Integer.valueOf(id), "localhost", 0);
+            assertEquals(1, client.inFlightRequestCount());
+            assertTrue("Client ready status should be true", client.isReady(node, 0L));
+
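+            // Accept the second split batch.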
+            responseMap.put(tp2, new ProduceResponse.PartitionResponse(Errors.NONE, 1L, 0L));
+            client.respond(new ProduceResponse(responseMap));
+            sender.run(time.milliseconds()); // receive
+            assertTrue("The future should have been done.", f2.isDone());
+            assertEquals("The sequence number should be 2", 2, txnManager.sequenceNumber(tp2).longValue());
+            assertEquals("Offset of the first message should be 1", 1L, f2.get().offset());
+            assertTrue("There should be no batch in the accumulator", accumulator.batches().get(tp2).isEmpty());
+
+            assertTrue("There should be a split",
+                    m.metrics().get(m.metricName("batch-split-rate", "producer-metrics")).value() > 0);
+        } finally {
+            m.close();
+        }
+    }
+
     private void completedWithError(Future<RecordMetadata> future, Errors error) throws Exception {
         assertTrue("Request should be completed", future.isDone());
         try {

http://git-wip-us.apache.org/repos/asf/kafka/blob/7fad4555/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
index c08a2f0..58d4371 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
@@ -223,11 +223,11 @@ public class MemoryRecordsBuilderTest {
 
         MemoryRecords built = builder.build();
         if (compressionType == CompressionType.NONE) {
-            assertEquals(1.0, builder.compressionRate(), 0.00001);
+            assertEquals(1.0, builder.compressionRatio(), 0.00001);
         } else {
             int compressedSize = built.sizeInBytes() - Records.LOG_OVERHEAD - LegacyRecord.RECORD_OVERHEAD_V0;
             double computedCompressionRate = (double) compressedSize / uncompressedSize;
-            assertEquals(computedCompressionRate, builder.compressionRate(), 0.00001);
+            assertEquals(computedCompressionRate, builder.compressionRatio(), 0.00001);
         }
     }
 
@@ -254,11 +254,11 @@ public class MemoryRecordsBuilderTest {
 
         MemoryRecords built = builder.build();
         if (compressionType == CompressionType.NONE) {
-            assertEquals(1.0, builder.compressionRate(), 0.00001);
+            assertEquals(1.0, builder.compressionRatio(), 0.00001);
         } else {
             int compressedSize = built.sizeInBytes() - Records.LOG_OVERHEAD - LegacyRecord.RECORD_OVERHEAD_V1;
             double computedCompressionRate = (double) compressedSize / uncompressedSize;
-            assertEquals(computedCompressionRate, builder.compressionRate(), 0.00001);
+            assertEquals(computedCompressionRate, builder.compressionRatio(), 0.00001);
         }
     }
 
@@ -359,6 +359,7 @@ public class MemoryRecordsBuilderTest {
         MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType,
                 TimestampType.CREATE_TIME, 0L, logAppendTime, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
                 false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
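+        // Seed the builder's estimated compression ratio; 0.5f is just a value for this test.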
+        builder.setEstimatedCompressionRatio(0.5f);
         builder.append(0L, "a".getBytes(), "1".getBytes());
         builder.append(1L, "b".getBytes(), "2".getBytes());