You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2014/03/27 05:49:32 UTC

git commit: KAFKA-1253 Compression in the new producer; reviewed by Jay Kreps and Jun Rao

Repository: kafka
Updated Branches:
  refs/heads/trunk 455c490f6 -> 466a83b78


KAFKA-1253 Compression in the new producer; reviewed by Jay Kreps and Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/466a83b7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/466a83b7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/466a83b7

Branch: refs/heads/trunk
Commit: 466a83b78c2bfcb9ac3116748394e7845a99bf7a
Parents: 455c490
Author: Guozhang Wang <gu...@linkedin.com>
Authored: Wed Mar 26 21:48:55 2014 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Wed Mar 26 21:49:06 2014 -0700

----------------------------------------------------------------------
 build.gradle                                    |   3 +-
 .../kafka/clients/producer/KafkaProducer.java   |   4 +-
 .../kafka/clients/producer/ProducerConfig.java  |  10 +-
 .../clients/producer/internals/BufferPool.java  |  31 ++--
 .../producer/internals/RecordAccumulator.java   |  43 +++--
 .../clients/producer/internals/RecordBatch.java |   7 +-
 .../clients/tools/ProducerPerformance.java      |   6 +-
 .../kafka/common/record/CompressionType.java    |   7 +-
 .../kafka/common/record/MemoryRecords.java      | 168 +++++++++++++++----
 .../org/apache/kafka/common/record/Record.java  | 122 +++++++++-----
 .../org/apache/kafka/common/utils/Crc32.java    |  36 ++++
 .../org/apache/kafka/common/utils/Utils.java    |  24 ---
 .../kafka/common/record/MemoryRecordsTest.java  |  32 +++-
 .../apache/kafka/common/record/RecordTest.java  |  13 +-
 .../java/org/apache/kafka/test/TestUtils.java   |   2 +-
 .../scala/kafka/producer/ConsoleProducer.scala  |   4 +-
 .../kafka/api/ProducerFailureHandlingTest.scala |   6 +-
 .../kafka/api/ProducerSendTest.scala            |  26 ++-
 .../scala/kafka/perf/ProducerPerformance.scala  |   1 +
 19 files changed, 367 insertions(+), 178 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/466a83b7/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index d6fd287..5432c0c 100644
--- a/build.gradle
+++ b/build.gradle
@@ -151,7 +151,6 @@ project(':core') {
     compile 'com.101tec:zkclient:0.3'
     compile 'com.yammer.metrics:metrics-core:2.2.0'
     compile 'net.sf.jopt-simple:jopt-simple:3.2'
-    compile 'org.xerial.snappy:snappy-java:1.0.5'
 
     testCompile 'junit:junit:4.1'
     testCompile 'org.easymock:easymock:3.0'
@@ -317,6 +316,8 @@ project(':clients') {
 
   dependencies {
     compile "org.slf4j:slf4j-api:1.7.6"
+    compile 'org.xerial.snappy:snappy-java:1.0.5'
+
     testCompile 'com.novocode:junit-interface:0.9'
     testRuntime "$slf4jlog4j"
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/466a83b7/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 1ac6943..1ff9174 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -69,6 +69,7 @@ public class KafkaProducer implements Producer {
     private final Sender sender;
     private final Metrics metrics;
     private final Thread ioThread;
+    private final CompressionType compressionType;
 
     /**
      * A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
@@ -99,6 +100,7 @@ public class KafkaProducer implements Producer {
                                      config.getLong(ProducerConfig.METADATA_EXPIRY_CONFIG));
         this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
         this.totalMemorySize = config.getLong(ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG);
+        this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
         this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.MAX_PARTITION_SIZE_CONFIG),
                                                  this.totalMemorySize,
                                                  config.getLong(ProducerConfig.LINGER_MS_CONFIG),
@@ -224,7 +226,7 @@ public class KafkaProducer implements Producer {
             ensureValidSize(record.key(), record.value());
             TopicPartition tp = new TopicPartition(record.topic(), partition);
             log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
-            FutureRecordMetadata future = accumulator.append(tp, record.key(), record.value(), CompressionType.NONE, callback);
+            FutureRecordMetadata future = accumulator.append(tp, record.key(), record.value(), compressionType, callback);
             this.sender.wakeup();
             return future;
             // For API exceptions return them in the future;

http://git-wip-us.apache.org/repos/asf/kafka/blob/466a83b7/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 32e12ad..48706ba 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -136,6 +136,11 @@ public class ProducerConfig extends AbstractConfig {
     public static final String RETRY_BACKOFF_MS_CONFIG = "retry.backoff.ms";
 
     /**
+     * The compression type for all data generated. The default is none (i.e. no compression)
+     */
+    public static final String COMPRESSION_TYPE_CONFIG = "compression.type";
+
+    /**
      * Should we register the Kafka metrics as JMX mbeans?
      */
     public static final String ENABLE_JMX_CONFIG = "enable.jmx";
@@ -158,9 +163,10 @@ public class ProducerConfig extends AbstractConfig {
                                 .define(MAX_REQUEST_SIZE_CONFIG, Type.INT, 1 * 1024 * 1024, atLeast(0), "blah blah")
                                 .define(RECONNECT_BACKOFF_MS_CONFIG, Type.LONG, 10L, atLeast(0L), "blah blah")
                                 .define(BLOCK_ON_BUFFER_FULL_CONFIG, Type.BOOLEAN, true, "blah blah")
-                                .define(ENABLE_JMX_CONFIG, Type.BOOLEAN, true, "")
                                 .define(MAX_RETRIES_CONFIG, Type.INT, 0, between(0, Integer.MAX_VALUE), "")
-                                .define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, 100L, atLeast(0L), "blah blah");
+                                .define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, 100L, atLeast(0L), "blah blah")
+                                .define(COMPRESSION_TYPE_CONFIG, Type.STRING, "none", "blah blah")
+                                .define(ENABLE_JMX_CONFIG, Type.BOOLEAN, true, "");
     }
 
     ProducerConfig(Map<? extends Object, ? extends Object> props) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/466a83b7/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
index b69866a..d1d6c4b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
@@ -71,7 +71,7 @@ public final class BufferPool {
      * @param size The buffer size to allocate in bytes
      * @return The buffer
      * @throws InterruptedException If the thread is interrupted while blocked
-     * @throws IllegalArgument if size is larger than the total memory controlled by the pool (and hence we would block
+     * @throws IllegalArgumentException if size is larger than the total memory controlled by the pool (and hence we would block
      *         forever)
      * @throws BufferExhaustedException if the pool is in non-blocking mode and size exceeds the free memory in the pool
      */
@@ -167,28 +167,31 @@ public final class BufferPool {
      * Return buffers to the pool. If they are of the poolable size add them to the free list, otherwise just mark the
      * memory as free.
      * 
-     * @param buffers The buffers to return
+     * @param buffer The buffer to return
+     * @param size The size of the buffer to mark as deallocated; note that this may be smaller than buffer.capacity
+     *             since the buffer may re-allocate itself during in-place compression
      */
-    public void deallocate(ByteBuffer... buffers) {
+    public void deallocate(ByteBuffer buffer, int size) {
         lock.lock();
         try {
-            for (int i = 0; i < buffers.length; i++) {
-                int size = buffers[i].capacity();
-                if (size == this.poolableSize) {
-                    buffers[i].clear();
-                    this.free.add(buffers[i]);
-                } else {
-                    this.availableMemory += size;
-                }
-                Condition moreMem = this.waiters.peekFirst();
-                if (moreMem != null)
-                    moreMem.signal();
+            if (size == this.poolableSize && size == buffer.capacity()) {
+                buffer.clear();
+                this.free.add(buffer);
+            } else {
+                this.availableMemory += size;
             }
+            Condition moreMem = this.waiters.peekFirst();
+            if (moreMem != null)
+                moreMem.signal();
         } finally {
             lock.unlock();
         }
     }
 
+    public void deallocate(ByteBuffer buffer) {
+        deallocate(buffer, buffer.capacity());
+    }
+
     /**
      * the total free memory both unallocated and in the free list
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/466a83b7/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 673b296..50bf95f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -28,8 +28,8 @@ import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.MemoryRecords;
-import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.utils.CopyOnWriteMap;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
@@ -91,26 +91,26 @@ public final class RecordAccumulator {
 
     private void registerMetrics(Metrics metrics) {
         metrics.addMetric("blocked_threads",
-                          "The number of user threads blocked waiting for buffer memory to enqueue their records",
-                          new Measurable() {
+            "The number of user threads blocked waiting for buffer memory to enqueue their records",
+            new Measurable() {
                               public double measure(MetricConfig config, long now) {
                                   return free.queued();
                               }
                           });
         metrics.addMetric("buffer_total_bytes",
-                          "The total amount of buffer memory that is available (not currently used for buffering records).",
-                          new Measurable() {
+            "The total amount of buffer memory that is available (not currently used for buffering records).",
+            new Measurable() {
                               public double measure(MetricConfig config, long now) {
                                   return free.totalMemory();
                               }
                           });
         metrics.addMetric("buffer_available_bytes",
-                          "The total amount of buffer memory that is available (not currently used for buffering records).",
-                          new Measurable() {
-                              public double measure(MetricConfig config, long now) {
-                                  return free.availableMemory();
-                              }
-                          });
+            "The total amount of buffer memory that is available (not currently used for buffering records).",
+            new Measurable() {
+                public double measure(MetricConfig config, long now) {
+                    return free.availableMemory();
+                }
+            });
     }
 
     /**
@@ -132,7 +132,7 @@ public final class RecordAccumulator {
         synchronized (dq) {
             RecordBatch batch = dq.peekLast();
             if (batch != null) {
-                FutureRecordMetadata future = batch.tryAppend(key, value, compression, callback);
+                FutureRecordMetadata future = batch.tryAppend(key, value, callback);
                 if (future != null)
                     return future;
             }
@@ -145,7 +145,7 @@ public final class RecordAccumulator {
         synchronized (dq) {
             RecordBatch last = dq.peekLast();
             if (last != null) {
-                FutureRecordMetadata future = last.tryAppend(key, value, compression, callback);
+                FutureRecordMetadata future = last.tryAppend(key, value, callback);
                 if (future != null) {
                     // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen
                     // often...
@@ -153,8 +153,10 @@ public final class RecordAccumulator {
                     return future;
                 }
             }
-            RecordBatch batch = new RecordBatch(tp, new MemoryRecords(buffer), time.milliseconds());
-            FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, compression, callback));
+            MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression);
+            RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
+            FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, callback));
+
             dq.addLast(batch);
             return future;
         }
@@ -193,7 +195,7 @@ public final class RecordAccumulator {
                 RecordBatch batch = deque.peekFirst();
                 if (batch != null) {
                     boolean backingOff = batch.attempts > 0 && batch.lastAttempt + retryBackoffMs > now;
-                    boolean full = deque.size() > 1 || !batch.records.buffer().hasRemaining();
+                    boolean full = deque.size() > 1 || batch.records.isFull();
                     boolean expired = now - batch.created >= lingerMs;
                     boolean sendable = full || expired || exhausted || closed;
                     if (sendable && !backingOff)
@@ -239,10 +241,15 @@ public final class RecordAccumulator {
             Deque<RecordBatch> deque = dequeFor(tp);
             if (deque != null) {
                 synchronized (deque) {
-                    if (size + deque.peekFirst().records.sizeInBytes() > maxSize) {
+                    RecordBatch first = deque.peekFirst();
+                    if (size + first.records.sizeInBytes() > maxSize && !ready.isEmpty()) {
+                        // there is a rare case that a single batch size is larger than the request size due
+                        // to compression; in this case we will still eventually send this batch in a single
+                        // request
                         return ready;
                     } else {
                         RecordBatch batch = deque.pollFirst();
+                        batch.records.close();
                         size += batch.records.sizeInBytes();
                         ready.add(batch);
                     }
@@ -269,7 +276,7 @@ public final class RecordAccumulator {
      * Deallocate the record batch
      */
     public void deallocate(RecordBatch batch) {
-        free.deallocate(batch.records.buffer());
+        free.deallocate(batch.records.buffer(), batch.records.capacity());
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/466a83b7/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
index 038a05a..35f1d7a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
@@ -17,7 +17,6 @@ import java.util.List;
 
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,11 +53,11 @@ public final class RecordBatch {
      * 
      * @return The RecordSend corresponding to this record or null if there isn't sufficient room.
      */
-    public FutureRecordMetadata tryAppend(byte[] key, byte[] value, CompressionType compression, Callback callback) {
+    public FutureRecordMetadata tryAppend(byte[] key, byte[] value, Callback callback) {
         if (!this.records.hasRoomFor(key, value)) {
             return null;
         } else {
-            this.records.append(0L, key, value, compression);
+            this.records.append(0L, key, value);
             FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount);
             if (callback != null)
                 thunks.add(new Thunk(callback, future));
@@ -71,7 +70,7 @@ public final class RecordBatch {
      * Complete the request
      * 
      * @param baseOffset The base offset of the messages assigned by the server
-     * @param errorCode The error code or 0 if no error
+     * @param exception The exception returned or null if no exception
      */
     public void done(long baseOffset, RuntimeException exception) {
         this.produceFuture.done(topicPartition, baseOffset, exception);

http://git-wip-us.apache.org/repos/asf/kafka/blob/466a83b7/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java b/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
index 3ebbb80..05085e0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
+++ b/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
@@ -29,8 +29,8 @@ import org.apache.kafka.common.record.Records;
 public class ProducerPerformance {
 
     public static void main(String[] args) throws Exception {
-        if (args.length != 5) {
-            System.err.println("USAGE: java " + ProducerPerformance.class.getName() + " url topic_name num_records record_size acks");
+        if (args.length < 5) {
+            System.err.println("USAGE: java " + ProducerPerformance.class.getName() + " url topic_name num_records record_size acks [compression_type]");
             System.exit(1);
         }
         String url = args[0];
@@ -45,6 +45,8 @@ public class ProducerPerformance {
         props.setProperty(ProducerConfig.REQUEST_TIMEOUT_CONFIG, Integer.toString(Integer.MAX_VALUE));
         props.setProperty(ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG, Integer.toString(256 * 1024 * 1024));
         props.setProperty(ProducerConfig.MAX_PARTITION_SIZE_CONFIG, Integer.toString(256 * 1024));
+        if (args.length == 6)
+            props.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, args[5]);
 
         KafkaProducer producer = new KafkaProducer(props);
         Callback callback = new Callback() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/466a83b7/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
index 906da02..c557e44 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
@@ -20,14 +20,16 @@ package org.apache.kafka.common.record;
  * The compression type to use
  */
 public enum CompressionType {
-    NONE(0, "none"), GZIP(1, "gzip"), SNAPPY(2, "snappy");
+    NONE(0, "none", 1.0f), GZIP(1, "gzip", 0.5f), SNAPPY(2, "snappy", 0.5f);
 
     public final int id;
     public final String name;
+    public final float rate;
 
-    private CompressionType(int id, String name) {
+    private CompressionType(int id, String name, float rate) {
         this.id = id;
         this.name = name;
+        this.rate = rate;
     }
 
     public static CompressionType forId(int id) {
@@ -53,4 +55,5 @@ public enum CompressionType {
         else
             throw new IllegalArgumentException("Unknown compression name: " + name);
     }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/466a83b7/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index 9d8935f..428968c 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -16,53 +16,99 @@
  */
 package org.apache.kafka.common.record;
 
+import java.io.DataInputStream;
+import java.io.EOFException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.GatheringByteChannel;
 import java.util.Iterator;
 
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.utils.AbstractIterator;
 
-
 /**
  * A {@link Records} implementation backed by a ByteBuffer.
  */
 public class MemoryRecords implements Records {
 
-    private final ByteBuffer buffer;
+    private final Compressor compressor;
+    private final int capacity;
+    private ByteBuffer buffer;
+    private boolean writable;
+
+    // Construct a memory records instance: writable (appends go through a compressor) or read-only (wraps the buffer)
+    private MemoryRecords(ByteBuffer buffer, CompressionType type, boolean writable) {
+        this.writable = writable;
+        this.capacity = buffer.capacity();
+        if (this.writable) {
+            this.buffer = null;
+            this.compressor = new Compressor(buffer, type);
+        } else {
+            this.buffer = buffer;
+            this.compressor = null;
+        }
+    }
 
-    public MemoryRecords(int size) {
-        this(ByteBuffer.allocate(size));
+    public static MemoryRecords emptyRecords(ByteBuffer buffer, CompressionType type) {
+        return new MemoryRecords(buffer, type, true);
     }
 
-    public MemoryRecords(ByteBuffer buffer) {
-        this.buffer = buffer;
+    public static MemoryRecords iterableRecords(ByteBuffer buffer) {
+        return new MemoryRecords(buffer, CompressionType.NONE, false);
     }
 
     /**
      * Append the given record and offset to the buffer
      */
     public void append(long offset, Record record) {
-        buffer.putLong(offset);
-        buffer.putInt(record.size());
-        buffer.put(record.buffer());
+        if (!writable)
+            throw new IllegalStateException("Memory records is not writable");
+
+        int size = record.size();
+        compressor.putLong(offset);
+        compressor.putInt(size);
+        compressor.put(record.buffer());
+        compressor.recordWritten(size + Records.LOG_OVERHEAD);
         record.buffer().rewind();
     }
 
     /**
      * Append a new record and offset to the buffer
      */
-    public void append(long offset, byte[] key, byte[] value, CompressionType type) {
-        buffer.putLong(offset);
-        buffer.putInt(Record.recordSize(key, value));
-        Record.write(this.buffer, key, value, type);
+    public void append(long offset, byte[] key, byte[] value) {
+        if (!writable)
+            throw new IllegalStateException("Memory records is not writable");
+
+        int size = Record.recordSize(key, value);
+        compressor.putLong(offset);
+        compressor.putInt(size);
+        compressor.putRecord(key, value);
+        compressor.recordWritten(size + Records.LOG_OVERHEAD);
     }
 
     /**
      * Check if we have room for a new record containing the given key/value pair
+     *
+     * Note that the return value is based on the estimate of the bytes written to the compressor,
+     * which may not be accurate if compression is really used. When this happens, the following
+     * append may cause dynamic buffer re-allocation in the underlying byte buffer stream.
      */
     public boolean hasRoomFor(byte[] key, byte[] value) {
-        return this.buffer.remaining() >= Records.LOG_OVERHEAD + Record.recordSize(key, value);
+        return this.writable &&
+            this.capacity >= this.compressor.estimatedBytesWritten() + Records.LOG_OVERHEAD + Record.recordSize(key, value);
+    }
+
+    public boolean isFull() {
+        return !this.writable || this.capacity <= this.compressor.estimatedBytesWritten();
+    }
+
+    /**
+     * Close this batch for no more appends
+     */
+    public void close() {
+        compressor.close();
+        writable = false;
+        buffer = compressor.buffer();
     }
 
     /** Write the records in this set to the given channel */
@@ -74,7 +120,14 @@ public class MemoryRecords implements Records {
      * The size of this record set
      */
     public int sizeInBytes() {
-        return this.buffer.position();
+        return compressor.buffer().position();
+    }
+
+    /**
+     * Return the capacity of the buffer
+     */
+    public int capacity() {
+        return this.capacity;
     }
 
     /**
@@ -86,34 +139,79 @@ public class MemoryRecords implements Records {
 
     @Override
     public Iterator<LogEntry> iterator() {
-        return new RecordsIterator(this.buffer);
+        ByteBuffer copy = (ByteBuffer) this.buffer.duplicate().flip();
+        return new RecordsIterator(copy, CompressionType.NONE, false);
     }
 
-    /* TODO: allow reuse of the buffer used for iteration */
     public static class RecordsIterator extends AbstractIterator<LogEntry> {
         private final ByteBuffer buffer;
-
-        public RecordsIterator(ByteBuffer buffer) {
-            ByteBuffer copy = buffer.duplicate();
-            copy.flip();
-            this.buffer = copy;
+        private final DataInputStream stream;
+        private final CompressionType type;
+        private final boolean shallow;
+        private RecordsIterator innerIter;
+
+        public RecordsIterator(ByteBuffer buffer, CompressionType type, boolean shallow) {
+            this.type = type;
+            this.buffer = buffer;
+            this.shallow = shallow;
+            stream = Compressor.wrapForInput(new ByteBufferInputStream(this.buffer), type);
         }
 
+        /*
+         * Read the next record from the buffer.
+         *
+         * Note that in the compressed message set, each message value size is set as the size
+         * of the un-compressed version of the message value, so when we do de-compression
+         * allocating an array of the specified size for reading compressed value data is sufficient.
+         */
         @Override
         protected LogEntry makeNext() {
-            if (buffer.remaining() < Records.LOG_OVERHEAD)
-                return allDone();
-            long offset = buffer.getLong();
-            int size = buffer.getInt();
-            if (size < 0)
-                throw new IllegalStateException("Record with size " + size);
-            if (buffer.remaining() < size)
-                return allDone();
-            ByteBuffer rec = buffer.slice();
-            rec.limit(size);
-            this.buffer.position(this.buffer.position() + size);
-            return new LogEntry(offset, new Record(rec));
+            if (innerDone()) {
+                try {
+                    // read the offset
+                    long offset = stream.readLong();
+                    // read record size
+                    int size = stream.readInt();
+                    if (size < 0)
+                        throw new IllegalStateException("Record with size " + size);
+                    // read the record; if compression is used we cannot depend on size
+                    // and hence have to do an extra copy
+                    ByteBuffer rec;
+                    if (type == CompressionType.NONE) {
+                        rec = buffer.slice();
+                        buffer.position(buffer.position() + size);
+                        rec.limit(size);
+                    } else {
+                        byte[] recordBuffer = new byte[size];
+                        stream.read(recordBuffer, 0, size);
+                        rec = ByteBuffer.wrap(recordBuffer);
+                    }
+                    LogEntry entry = new LogEntry(offset, new Record(rec));
+                    entry.record().ensureValid();
+
+                    // decide whether to go shallow or deep iteration if it is compressed
+                    CompressionType compression = entry.record().compressionType();
+                    if (compression == CompressionType.NONE || shallow) {
+                        return entry;
+                    } else {
+                        // init the inner iterator with the value payload of the message,
+                        // which will de-compress the payload to a set of messages
+                        ByteBuffer value = entry.record().value();
+                        innerIter = new RecordsIterator(value, compression, true);
+                        return innerIter.next();
+                    }
+                } catch (EOFException e) {
+                    return allDone();
+                } catch (IOException e) {
+                    throw new KafkaException(e);
+                }
+            } else {
+                return innerIter.next();
+            }
         }
-    }
 
+        private boolean innerDone() {
+            return (innerIter == null || !innerIter.hasNext());
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/466a83b7/clients/src/main/java/org/apache/kafka/common/record/Record.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Record.java b/clients/src/main/java/org/apache/kafka/common/record/Record.java
index f1dc977..ce1177e 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Record.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Record.java
@@ -18,6 +18,7 @@ package org.apache.kafka.common.record;
 
 import java.nio.ByteBuffer;
 
+import org.apache.kafka.common.utils.Crc32;
 import org.apache.kafka.common.utils.Utils;
 
 
@@ -40,13 +41,15 @@ public final class Record {
     public static final int KEY_OFFSET = KEY_SIZE_OFFSET + KEY_SIZE_LENGTH;
     public static final int VALUE_SIZE_LENGTH = 4;
 
-    /** The amount of overhead bytes in a record */
-    public static final int RECORD_OVERHEAD = KEY_OFFSET + VALUE_SIZE_LENGTH;
+    /**
+     * The size for the record header
+     */
+    public static final int HEADER_SIZE = CRC_LENGTH + MAGIC_LENGTH + ATTRIBUTE_LENGTH;
 
     /**
-     * The minimum valid size for the record header
+     * The amount of overhead bytes in a record
      */
-    public static final int MIN_HEADER_SIZE = CRC_LENGTH + MAGIC_LENGTH + ATTRIBUTE_LENGTH + KEY_SIZE_LENGTH + VALUE_SIZE_LENGTH;
+    public static final int RECORD_OVERHEAD = HEADER_SIZE + KEY_SIZE_LENGTH + VALUE_SIZE_LENGTH;
 
     /**
      * The current "magic" value
@@ -71,27 +74,29 @@ public final class Record {
     }
 
     /**
-     * A constructor to create a LogRecord
+     * A constructor to create a LogRecord. If the record's compression type is not none, then
+     * its value payload should already be compressed with the specified type; the constructor
+     * always writes the value payload as is and does not do the compression itself.
      * 
      * @param key The key of the record (null, if none)
      * @param value The record value
-     * @param codec The compression codec used on the contents of the record (if any)
+     * @param type The compression type used on the contents of the record (if any)
      * @param valueOffset The offset into the payload array used to extract payload
      * @param valueSize The size of the payload to use
      */
-    public Record(byte[] key, byte[] value, CompressionType codec, int valueOffset, int valueSize) {
-        this(ByteBuffer.allocate(recordSize(key == null ? 0 : key.length, value == null ? 0 : valueSize >= 0 ? valueSize
-                                                                                                            : value.length - valueOffset)));
-        write(this.buffer, key, value, codec, valueOffset, valueSize);
+    public Record(byte[] key, byte[] value, CompressionType type, int valueOffset, int valueSize) {
+        this(ByteBuffer.allocate(recordSize(key == null ? 0 : key.length,
+            value == null ? 0 : valueSize >= 0 ? valueSize : value.length - valueOffset)));
+        write(this.buffer, key, value, type, valueOffset, valueSize);
         this.buffer.rewind();
     }
 
-    public Record(byte[] key, byte[] value, CompressionType codec) {
-        this(key, value, codec, 0, -1);
+    public Record(byte[] key, byte[] value, CompressionType type) {
+        this(key, value, type, 0, -1);
     }
 
-    public Record(byte[] value, CompressionType codec) {
-        this(null, value, codec);
+    public Record(byte[] value, CompressionType type) {
+        this(null, value, type);
     }
 
     public Record(byte[] key, byte[] value) {
@@ -102,40 +107,37 @@ public final class Record {
         this(null, value, CompressionType.NONE);
     }
 
-    public static void write(ByteBuffer buffer, byte[] key, byte[] value, CompressionType codec, int valueOffset, int valueSize) {
-        // skip crc, we will fill that in at the end
-        int pos = buffer.position();
-        buffer.position(pos + MAGIC_OFFSET);
-        buffer.put(CURRENT_MAGIC_VALUE);
-        byte attributes = 0;
-        if (codec.id > 0)
-            attributes = (byte) (attributes | (COMPRESSION_CODEC_MASK & codec.id));
-        buffer.put(attributes);
+    // Write a record to the buffer. If the record's compression type is not none, then
+    // its value payload should already be compressed with the specified type
+    public static void write(ByteBuffer buffer, byte[] key, byte[] value, CompressionType type, int valueOffset, int valueSize) {
+        // construct the compressor with compression type none, since this function will not do any
+        // compression according to the input type; it will just write the record's payload as is
+        Compressor compressor = new Compressor(buffer, CompressionType.NONE, buffer.capacity());
+        compressor.putRecord(key, value, type, valueOffset, valueSize);
+    }
+
+    public static void write(Compressor compressor, long crc, byte attributes, byte[] key, byte[] value, int valueOffset, int valueSize) {
+        // write crc
+        compressor.putInt((int) (crc & 0xffffffffL));
+        // write magic value
+        compressor.putByte(CURRENT_MAGIC_VALUE);
+        // write attributes
+        compressor.putByte(attributes);
         // write the key
         if (key == null) {
-            buffer.putInt(-1);
+            compressor.putInt(-1);
         } else {
-            buffer.putInt(key.length);
-            buffer.put(key, 0, key.length);
+            compressor.putInt(key.length);
+            compressor.put(key, 0, key.length);
         }
         // write the value
         if (value == null) {
-            buffer.putInt(-1);
+            compressor.putInt(-1);
         } else {
             int size = valueSize >= 0 ? valueSize : (value.length - valueOffset);
-            buffer.putInt(size);
-            buffer.put(value, valueOffset, size);
+            compressor.putInt(size);
+            compressor.put(value, valueOffset, size);
         }
-
-        // now compute the checksum and fill it in
-        long crc = computeChecksum(buffer,
-                                   buffer.arrayOffset() + pos + MAGIC_OFFSET,
-                                   buffer.position() - pos - MAGIC_OFFSET - buffer.arrayOffset());
-        Utils.writeUnsignedInt(buffer, pos + CRC_OFFSET, crc);
-    }
-
-    public static void write(ByteBuffer buffer, byte[] key, byte[] value, CompressionType codec) {
-        write(buffer, key, value, codec, 0, -1);
     }
 
     public static int recordSize(byte[] key, byte[] value) {
@@ -150,13 +152,51 @@ public final class Record {
         return this.buffer;
     }
 
+    public static byte computeAttributes(CompressionType type) {
+        byte attributes = 0;
+        if (type.id > 0)
+            attributes = (byte) (attributes | (COMPRESSION_CODEC_MASK & type.id));
+        return attributes;
+    }
+
     /**
      * Compute the checksum of the record from the record contents
      */
     public static long computeChecksum(ByteBuffer buffer, int position, int size) {
-        return Utils.crc32(buffer.array(), buffer.arrayOffset() + position, size - buffer.arrayOffset());
+        Crc32 crc = new Crc32();
+        crc.update(buffer.array(), buffer.arrayOffset() + position, size);
+        return crc.getValue();
+    }
+
+    /**
+     * Compute the checksum of the record from the attributes, key and value payloads
+     */
+    public static long computeChecksum(byte[] key, byte[] value, CompressionType type, int valueOffset, int valueSize) {
+        Crc32 crc = new Crc32();
+        crc.update(CURRENT_MAGIC_VALUE);
+        byte attributes = 0;
+        if (type.id > 0)
+            attributes = (byte) (attributes | (COMPRESSION_CODEC_MASK & type.id));
+        crc.update(attributes);
+        // update for the key
+        if (key == null) {
+            crc.updateInt(-1);
+        } else {
+            crc.updateInt(key.length);
+            crc.update(key, 0, key.length);
+        }
+        // update for the value
+        if (value == null) {
+            crc.updateInt(-1);
+        } else {
+            int size = valueSize >= 0 ? valueSize : (value.length - valueOffset);
+            crc.updateInt(size);
+            crc.update(value, valueOffset, size);
+        }
+        return crc.getValue();
     }
 
+
     /**
      * Compute the checksum of the record from the record contents
      */
@@ -239,7 +279,7 @@ public final class Record {
     }
 
     /**
-     * The compression codec used with this record
+     * The compression type used with this record
      */
     public CompressionType compressionType() {
         return CompressionType.forId(buffer.get(ATTRIBUTES_OFFSET) & COMPRESSION_CODEC_MASK);

http://git-wip-us.apache.org/repos/asf/kafka/blob/466a83b7/clients/src/main/java/org/apache/kafka/common/utils/Crc32.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Crc32.java b/clients/src/main/java/org/apache/kafka/common/utils/Crc32.java
index 153c5a6..047ca98 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Crc32.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Crc32.java
@@ -28,6 +28,30 @@ import java.util.zip.Checksum;
  */
 public class Crc32 implements Checksum {
 
+    /**
+     * Compute the CRC32 of the byte array
+     *
+     * @param bytes The array to compute the checksum for
+     * @return The CRC32
+     */
+    public static long crc32(byte[] bytes) {
+        return crc32(bytes, 0, bytes.length);
+    }
+
+    /**
+     * Compute the CRC32 of the segment of the byte array given by the specified size and offset
+     *
+     * @param bytes The bytes to checksum
+     * @param offset the offset at which to begin checksumming
+     * @param size the number of bytes to checksum
+     * @return The CRC32
+     */
+    public static long crc32(byte[] bytes, int offset, int size) {
+        Crc32 crc = new Crc32();
+        crc.update(bytes, offset, size);
+        return crc.getValue();
+    }
+
     /** the current CRC value, bit-flipped */
     private int crc;
 
@@ -97,6 +121,18 @@ public class Crc32 implements Checksum {
         crc = (crc >>> 8) ^ T[T8_0_start + ((crc ^ b) & 0xff)];
     }
 
+    /**
+     * Update the CRC32 given an integer
+     */
+    final public void updateInt(int input) {
+        update((byte) (input >> 24));
+        update((byte) (input >> 16));
+        update((byte) (input >> 8));
+        update((byte) input /* >> 0 */);
+    }
+
+
+
     /*
      * CRC-32 lookup tables generated by the polynomial 0xEDB88320. See also TestPureJavaCrc32.Table.
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/466a83b7/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 0c6b365..50af601 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -92,30 +92,6 @@ public class Utils {
     }
 
     /**
-     * Compute the CRC32 of the byte array
-     * 
-     * @param bytes The array to compute the checksum for
-     * @return The CRC32
-     */
-    public static long crc32(byte[] bytes) {
-        return crc32(bytes, 0, bytes.length);
-    }
-
-    /**
-     * Compute the CRC32 of the segment of the byte array given by the specificed size and offset
-     * 
-     * @param bytes The bytes to checksum
-     * @param offset the offset at which to begin checksumming
-     * @param size the number of bytes to checksum
-     * @return The CRC32
-     */
-    public static long crc32(byte[] bytes, int offset, int size) {
-        Crc32 crc = new Crc32();
-        crc.update(bytes, offset, size);
-        return crc.getValue();
-    }
-
-    /**
      * Get the absolute value of the given number. If the number is Int.MinValue return 0. This is different from
      * java.lang.Math.abs or scala.math.abs in that they return Int.MinValue (!).
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/466a83b7/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
index b0745b5..94a1112 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
@@ -22,29 +22,35 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
+import java.util.*;
 
-import org.apache.kafka.common.record.LogEntry;
-import org.apache.kafka.common.record.MemoryRecords;
-import org.apache.kafka.common.record.Record;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+@RunWith(value = Parameterized.class)
 public class MemoryRecordsTest {
 
+    private CompressionType compression;
+
+    public MemoryRecordsTest(CompressionType compression) {
+        this.compression = compression;
+    }
+
     @Test
     public void testIterator() {
-        MemoryRecords recs1 = new MemoryRecords(ByteBuffer.allocate(1024));
-        MemoryRecords recs2 = new MemoryRecords(ByteBuffer.allocate(1024));
+        MemoryRecords recs1 = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), compression);
+        MemoryRecords recs2 = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), compression);
         List<Record> list = Arrays.asList(new Record("a".getBytes(), "1".getBytes()),
                                           new Record("b".getBytes(), "2".getBytes()),
                                           new Record("c".getBytes(), "3".getBytes()));
         for (int i = 0; i < list.size(); i++) {
             Record r = list.get(i);
             recs1.append(i, r);
-            recs2.append(i, toArray(r.key()), toArray(r.value()), r.compressionType());
+            recs2.append(i, toArray(r.key()), toArray(r.value()));
         }
+        recs1.close();
+        recs2.close();
 
         for (int iteration = 0; iteration < 2; iteration++) {
             for (MemoryRecords recs : Arrays.asList(recs1, recs2)) {
@@ -54,10 +60,18 @@ public class MemoryRecordsTest {
                     LogEntry entry = iter.next();
                     assertEquals((long) i, entry.offset());
                     assertEquals(list.get(i), entry.record());
+                    entry.record().ensureValid();
                 }
                 assertFalse(iter.hasNext());
             }
         }
     }
 
+    @Parameterized.Parameters
+    public static Collection<Object[]> data() {
+        List<Object[]> values = new ArrayList<Object[]>();
+        for (CompressionType type: CompressionType.values())
+            values.add(new Object[] { type });
+        return values;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/466a83b7/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java
index ae54d67..2765913 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java
@@ -27,9 +27,6 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 
-import org.apache.kafka.common.record.CompressionType;
-import org.apache.kafka.common.record.InvalidRecordException;
-import org.apache.kafka.common.record.Record;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -66,6 +63,10 @@ public class RecordTest {
     @Test
     public void testChecksum() {
         assertEquals(record.checksum(), record.computeChecksum());
+        assertEquals(record.checksum(), record.computeChecksum(
+            this.key == null ? null : this.key.array(),
+            this.value == null ? null : this.value.array(),
+            this.compression, 0, -1));
         assertTrue(record.isValid());
         for (int i = Record.CRC_OFFSET + Record.CRC_LENGTH; i < record.size(); i++) {
             Record copy = copyOf(record);
@@ -95,9 +96,11 @@ public class RecordTest {
 
     @Parameters
     public static Collection<Object[]> data() {
+        byte[] payload = new byte[1000];
+        Arrays.fill(payload, (byte) 1);
         List<Object[]> values = new ArrayList<Object[]>();
-        for (byte[] key : Arrays.asList(null, "".getBytes(), "key".getBytes()))
-            for (byte[] value : Arrays.asList(null, "".getBytes(), "value".getBytes()))
+        for (byte[] key : Arrays.asList(null, "".getBytes(), "key".getBytes(), payload))
+            for (byte[] value : Arrays.asList(null, "".getBytes(), "value".getBytes(), payload))
                 for (CompressionType compression : CompressionType.values())
                     values.add(new Object[] { key, value, compression });
         return values;

http://git-wip-us.apache.org/repos/asf/kafka/blob/466a83b7/clients/src/test/java/org/apache/kafka/test/TestUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index 36cfc0f..76a17e8 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -88,7 +88,7 @@ public class TestUtils {
     /**
      * Generate an array of random bytes
      * 
-     * @param numBytes The size of the array
+     * @param size The size of the array
      */
     public static byte[] randomBytes(int size) {
         byte[] bytes = new byte[size];

http://git-wip-us.apache.org/repos/asf/kafka/blob/466a83b7/core/src/main/scala/kafka/producer/ConsoleProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ConsoleProducer.scala b/core/src/main/scala/kafka/producer/ConsoleProducer.scala
index dd39ff2..57386b1 100644
--- a/core/src/main/scala/kafka/producer/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/producer/ConsoleProducer.scala
@@ -255,8 +255,8 @@ object ConsoleProducer {
   class NewShinyProducer(producerConfig: ProducerConfig) extends Producer {
     val props = new Properties()
     props.put("metadata.broker.list", producerConfig.brokerList)
-    val codec = if(producerConfig.compress) DefaultCompressionCodec.codec else NoCompressionCodec.codec
-    props.put("compression.codec", codec.toString)
+    val compression = if(producerConfig.compress) DefaultCompressionCodec.name else NoCompressionCodec.name
+    props.put("compression.type", compression)
     props.put("send.buffer.bytes", producerConfig.socketBuffer.toString)
     props.put("metadata.fetch.backoff.ms", producerConfig.retryBackoffMs.toString)
     props.put("metadata.expiry.ms", producerConfig.metadataExpiryMs.toString)

http://git-wip-us.apache.org/repos/asf/kafka/blob/466a83b7/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index c002f5e..525a060 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -319,7 +319,6 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness
     producerProps.put(ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG, bufferSize.toString)
     producerProps.put(ProducerConfig.MAX_RETRIES_CONFIG, 10.toString)
     producerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000.toString)
-
     val producer = new KafkaProducer(producerProps)
 
     override def doWork(): Unit = {
@@ -335,5 +334,10 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness
         case e : Exception => failed = true
       }
     }
+
+    override def shutdown(){
+      super.shutdown()
+      producer.close
+    }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/466a83b7/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
index 66ea76b..3c37330 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
@@ -17,21 +17,20 @@
 
 package kafka.api.test
 
-import kafka.server.{KafkaConfig, KafkaServer}
-import kafka.utils.{Utils, TestUtils}
-import kafka.zk.ZooKeeperTestHarness
-import kafka.consumer.SimpleConsumer
-import kafka.api.FetchRequestBuilder
-import kafka.message.Message
+import java.util.Properties
+import java.lang.{Integer, IllegalArgumentException}
 
 import org.apache.kafka.clients.producer._
-
 import org.scalatest.junit.JUnit3Suite
 import org.junit.Test
 import org.junit.Assert._
 
-import java.util.Properties
-import java.lang.{Integer, IllegalArgumentException}
+import kafka.server.{KafkaConfig, KafkaServer}
+import kafka.utils.{Utils, TestUtils}
+import kafka.zk.ZooKeeperTestHarness
+import kafka.consumer.SimpleConsumer
+import kafka.api.FetchRequestBuilder
+import kafka.message.Message
 
 
 class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
@@ -76,15 +75,10 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
     super.tearDown()
   }
 
-  class PrintOffsetCallback extends Callback {
+  class CheckErrorCallback extends Callback {
     def onCompletion(metadata: RecordMetadata, exception: Exception) {
       if (exception != null)
         fail("Send callback returns the following exception", exception)
-      try {
-        System.out.println("The message we just sent is marked as [" + metadata.partition + "] : " + metadata.offset);
-      } catch {
-        case e: Throwable => fail("Should succeed sending the message", e)
-      }
     }
   }
 
@@ -100,7 +94,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
     props.put(ProducerConfig.BROKER_LIST_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
     var producer = new KafkaProducer(props)
 
-    val callback = new PrintOffsetCallback
+    val callback = new CheckErrorCallback
 
     try {
       // create topic

http://git-wip-us.apache.org/repos/asf/kafka/blob/466a83b7/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
----------------------------------------------------------------------
diff --git a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
index 3df0d13..9e4ebaf 100644
--- a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
+++ b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
@@ -212,6 +212,7 @@ object ProducerPerformance extends Logging {
     props.put("request.timeout.ms", config.producerRequestTimeoutMs.toString)
     props.put("request.retries", config.producerNumRetries.toString)
     props.put("retry.backoff.ms", config.producerRetryBackoffMs.toString)
+    props.put("compression.type", config.compressionCodec.name)
     val producer = new KafkaProducer(props)
 
     def send(topic: String, partition: Long, bytes: Array[Byte]) {