Posted to commits@kafka.apache.org by ne...@apache.org on 2014/03/27 05:50:37 UTC

git commit: KAFKA-1253 Compression in the new producer: follow up patch to push new files

Repository: kafka
Updated Branches:
  refs/heads/trunk 466a83b78 -> 9bc47bc13


KAFKA-1253 Compression in the new producer: follow up patch to push new files


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9bc47bc1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9bc47bc1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9bc47bc1

Branch: refs/heads/trunk
Commit: 9bc47bc1365d17c34f8d43239846de28219a663c
Parents: 466a83b
Author: Neha Narkhede <ne...@gmail.com>
Authored: Wed Mar 26 21:50:30 2014 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Wed Mar 26 21:50:30 2014 -0700

----------------------------------------------------------------------
 .../common/record/ByteBufferInputStream.java    |  49 ++++
 .../common/record/ByteBufferOutputStream.java   |  57 +++++
 .../apache/kafka/common/record/Compressor.java  | 244 +++++++++++++++++++
 .../org/apache/kafka/common/utils/CrcTest.java  |  59 +++++
 .../kafka/api/ProducerCompressionTest.scala     | 126 ++++++++++
 5 files changed, 535 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9bc47bc1/clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java
new file mode 100644
index 0000000..12651d4
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * A byte buffer backed input stream
+ */
+public class ByteBufferInputStream extends InputStream {
+
+    private ByteBuffer buffer;
+
+    public ByteBufferInputStream(ByteBuffer buffer) {
+        this.buffer = buffer;
+    }
+
+    public int read() {
+        if (!buffer.hasRemaining()) {
+            return -1;
+        }
+        return buffer.get() & 0xFF;
+    }
+
+    public int read(byte[] bytes, int off, int len) {
+        if (!buffer.hasRemaining()) {
+            return -1;
+        }
+
+        len = Math.min(len, buffer.remaining());
+        buffer.get(bytes, off, len);
+        return len;
+    }
+}
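
For illustration, a minimal usage sketch (not part of the commit; it only assumes the class above is on the classpath): the stream drains the ByteBuffer from its current position to its limit and returns -1 once the buffer is exhausted, so it can feed any InputStream-based reader such as DataInputStream.

    import java.io.DataInputStream;
    import java.io.IOException;
    import java.nio.ByteBuffer;

    import org.apache.kafka.common.record.ByteBufferInputStream;

    public class ByteBufferInputStreamExample {
        public static void main(String[] args) throws IOException {
            // fill a buffer with two ints, then flip it for reading
            ByteBuffer buffer = ByteBuffer.allocate(8);
            buffer.putInt(42);
            buffer.putInt(7);
            buffer.flip();

            // read them back through the InputStream view of the buffer
            DataInputStream in = new DataInputStream(new ByteBufferInputStream(buffer));
            System.out.println(in.readInt()); // 42
            System.out.println(in.readInt()); // 7
            System.out.println(in.read());    // -1: buffer exhausted
        }
    }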

http://git-wip-us.apache.org/repos/asf/kafka/blob/9bc47bc1/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java
new file mode 100644
index 0000000..c7bd2f8
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * A byte buffer backed output stream
+ */
+public class ByteBufferOutputStream extends OutputStream {
+
+    private static float REALLOCATION_FACTOR = 1.1f;
+
+    private ByteBuffer buffer;
+
+    public ByteBufferOutputStream(ByteBuffer buffer) {
+        this.buffer = buffer;
+    }
+
+    public void write(int b) {
+        if (buffer.remaining() < 1)
+            expandBuffer(buffer.capacity() + 1);
+        buffer.put((byte) b);
+    }
+
+    public void write(byte[] bytes, int off, int len) {
+        if (buffer.remaining() < len)
+            expandBuffer(buffer.capacity() + len);
+        buffer.put(bytes, off, len);
+    }
+
+    public ByteBuffer buffer() {
+        return buffer;
+    }
+
+    private void expandBuffer(int size) {
+        int expandSize = Math.max((int) (buffer.capacity() * REALLOCATION_FACTOR), size);
+        ByteBuffer temp = ByteBuffer.allocate(expandSize);
+        temp.put(buffer.array(), buffer.arrayOffset(), buffer.position());
+        buffer = temp;
+    }
+}
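
For illustration, a minimal sketch of the reallocation behavior (not part of the commit): when a write would overflow the buffer, expandBuffer allocates a new ByteBuffer sized at max(capacity * 1.1, requested size) and copies the bytes written so far, so callers must re-fetch the buffer via buffer() after writing.

    import java.nio.ByteBuffer;

    import org.apache.kafka.common.record.ByteBufferOutputStream;

    public class ByteBufferOutputStreamExample {
        public static void main(String[] args) {
            // start with a deliberately small buffer to force expansion
            ByteBufferOutputStream out = new ByteBufferOutputStream(ByteBuffer.allocate(4));
            out.write(new byte[16], 0, 16); // 16 > 4 remaining, triggers expandBuffer

            // the stream now holds a new, larger ByteBuffer; always re-read it via buffer()
            ByteBuffer result = out.buffer();
            System.out.println(result.capacity()); // 20 here: old capacity (4) + len (16)
            System.out.println(result.position()); // 16 bytes written
        }
    }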

http://git-wip-us.apache.org/repos/asf/kafka/blob/9bc47bc1/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
new file mode 100644
index 0000000..6ae3d06
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+public class Compressor {
+
+    static private final float COMPRESSION_RATE_DAMPING_FACTOR = 0.9f;
+    static private final float COMPRESSION_RATE_ESTIMATION_FACTOR = 1.05f;
+    static private final int COMPRESSION_DEFAULT_BUFFER_SIZE = 1024;
+
+    private static float[] typeToRate;
+    private static int MAX_TYPE_ID = -1;
+
+    static {
+        for (CompressionType type : CompressionType.values()) {
+            MAX_TYPE_ID = Math.max(MAX_TYPE_ID, type.id);
+        }
+        typeToRate = new float[MAX_TYPE_ID+1];
+        for (CompressionType type : CompressionType.values()) {
+            typeToRate[type.id] = type.rate;
+        }
+    }
+
+    private final CompressionType type;
+    private final DataOutputStream appendStream;
+    private final ByteBufferOutputStream bufferStream;
+    private final int initPos;
+
+    public long writtenUncompressed;
+    public long numRecords;
+
+    public Compressor(ByteBuffer buffer, CompressionType type, int blockSize) {
+        this.type = type;
+        this.initPos = buffer.position();
+
+        this.numRecords = 0;
+        this.writtenUncompressed = 0;
+
+        if (type != CompressionType.NONE) {
+            // for compressed records, leave space for the header and the shallow message metadata
+            // and move the starting position to the value payload offset
+            buffer.position(initPos + Records.LOG_OVERHEAD + Record.RECORD_OVERHEAD);
+        }
+
+        // create the stream
+        bufferStream = new ByteBufferOutputStream(buffer);
+        appendStream = wrapForOutput(bufferStream, type, blockSize);
+    }
+
+    public Compressor(ByteBuffer buffer, CompressionType type) {
+        this(buffer, type, COMPRESSION_DEFAULT_BUFFER_SIZE);
+    }
+
+    public ByteBuffer buffer() {
+        return bufferStream.buffer();
+    }
+
+    public void close() {
+        try {
+            appendStream.close();
+        } catch (IOException e) {
+            throw new KafkaException(e);
+        }
+
+        if (type != CompressionType.NONE) {
+            ByteBuffer buffer = bufferStream.buffer();
+            int pos = buffer.position();
+            // write the header; the end offset is written as (number of records - 1)
+            buffer.position(initPos);
+            buffer.putLong(numRecords - 1);
+            buffer.putInt(pos - initPos - Records.LOG_OVERHEAD);
+            // write the shallow message (the crc and value size are not correct yet)
+            Record.write(buffer, null, null, type, 0, -1);
+            // compute and fill the value size
+            int valueSize = pos - initPos - Records.LOG_OVERHEAD - Record.RECORD_OVERHEAD;
+            buffer.putInt(initPos + Records.LOG_OVERHEAD + Record.KEY_OFFSET, valueSize);
+            // compute and fill the crc at the beginning of the message
+            long crc = Record.computeChecksum(buffer,
+                initPos + Records.LOG_OVERHEAD + Record.MAGIC_OFFSET,
+                pos - initPos - Records.LOG_OVERHEAD - Record.MAGIC_OFFSET);
+            Utils.writeUnsignedInt(buffer, initPos + Records.LOG_OVERHEAD + Record.CRC_OFFSET, crc);
+            // reset the position
+            buffer.position(pos);
+
+            // update the compression ratio
+            float compressionRate = (float) buffer.position() / this.writtenUncompressed;
+            typeToRate[type.id] = typeToRate[type.id] * COMPRESSION_RATE_DAMPING_FACTOR +
+                compressionRate * (1 - COMPRESSION_RATE_DAMPING_FACTOR);
+        }
+    }
+
+    // Note that for all the write operations below, IO exceptions should
+    // never be thrown since the underlying ByteBufferOutputStream does not throw IOException;
+    // if one does occur it is rethrown as a KafkaException.
+
+    public void putLong(final long value) {
+        try {
+            appendStream.writeLong(value);
+        } catch (IOException e) {
+            throw new KafkaException("I/O exception when writing to the append stream, closing", e);
+        }
+    }
+
+    public void putInt(final int value) {
+        try {
+            appendStream.writeInt(value);
+        } catch (IOException e) {
+            throw new KafkaException("I/O exception when writing to the append stream, closing", e);
+        }
+    }
+
+    public void put(final ByteBuffer buffer) {
+        try {
+            appendStream.write(buffer.array(), buffer.arrayOffset(), buffer.limit());
+        } catch (IOException e) {
+            throw new KafkaException("I/O exception when writing to the append stream, closing", e);
+        }
+    }
+
+    public void putByte(final byte value) {
+        try {
+            appendStream.write(value);
+        } catch (IOException e) {
+            throw new KafkaException("I/O exception when writing to the append stream, closing", e);
+        }
+    }
+
+    public void put(final byte[] bytes, final int offset, final int len) {
+        try {
+            appendStream.write(bytes, offset, len);
+        } catch (IOException e) {
+            throw new KafkaException("I/O exception when writing to the append stream, closing", e);
+        }
+    }
+
+    public void putRecord(byte[] key, byte[] value, CompressionType type, int valueOffset, int valueSize) {
+        // put a record as un-compressed into the underlying stream
+        long crc = Record.computeChecksum(key, value, type, valueOffset, valueSize);
+        byte attributes = Record.computeAttributes(type);
+        putRecord(crc, attributes, key, value, valueOffset, valueSize);
+    }
+
+    public void putRecord(byte[] key, byte[] value) {
+        putRecord(key, value, CompressionType.NONE, 0, -1);
+    }
+
+    private void putRecord(final long crc, final byte attributes, final byte[] key, final byte[] value, final int valueOffset, final int valueSize) {
+        Record.write(this, crc, attributes, key, value, valueOffset, valueSize);
+    }
+
+    public void recordWritten(int size) {
+        numRecords += 1;
+        writtenUncompressed += size;
+    }
+
+    public long estimatedBytesWritten() {
+        if (type == CompressionType.NONE) {
+            return bufferStream.buffer().position();
+        } else {
+            // estimate the written bytes to the underlying byte buffer based on uncompressed written bytes
+            return (long) (writtenUncompressed * typeToRate[type.id] * COMPRESSION_RATE_ESTIMATION_FACTOR);
+        }
+    }
+
+    // the following two functions also need to be public since they are used in MemoryRecords.iteration
+
+    static public DataOutputStream wrapForOutput(ByteBufferOutputStream buffer, CompressionType type, int bufferSize) {
+        try {
+            switch (type) {
+                case NONE:
+                    return new DataOutputStream(buffer);
+                case GZIP:
+                    return new DataOutputStream(new GZIPOutputStream(buffer, bufferSize));
+                case SNAPPY:
+                    // dynamically load the snappy class to avoid runtime dependency
+                    // on snappy if we are not using it
+                    try {
+                        Class SnappyOutputStream = Class.forName("org.xerial.snappy.SnappyOutputStream");
+                        OutputStream stream = (OutputStream) SnappyOutputStream.getConstructor(OutputStream.class, Integer.TYPE)
+                            .newInstance(buffer, bufferSize);
+                        return new DataOutputStream(stream);
+                    } catch (Exception e) {
+                        throw new KafkaException(e);
+                    }
+                default:
+                    throw new IllegalArgumentException("Unknown compression type: " + type);
+            }
+        } catch (IOException e) {
+            throw new KafkaException(e);
+        }
+    }
+
+    static public DataInputStream wrapForInput(ByteBufferInputStream buffer, CompressionType type) {
+        try {
+            switch (type) {
+                case NONE:
+                    return new DataInputStream(buffer);
+                case GZIP:
+                    return new DataInputStream(new GZIPInputStream(buffer));
+                case SNAPPY:
+                    // dynamically load the snappy class to avoid runtime dependency
+                    // on snappy if we are not using it
+                    try {
+                        Class SnappyInputStream = Class.forName("org.xerial.snappy.SnappyInputStream");
+                        InputStream stream = (InputStream) SnappyInputStream.getConstructor(InputStream.class)
+                            .newInstance(buffer);
+                        return new DataInputStream(stream);
+                    } catch (Exception e) {
+                        throw new KafkaException(e);
+                    }
+                default:
+                    throw new IllegalArgumentException("Unknown compression type: " + type);
+            }
+        } catch (IOException e) {
+            throw new KafkaException(e);
+        }
+    }
+}
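
For illustration, a small round-trip sketch using the helpers above (not part of the commit; CompressionType is assumed to come from the earlier KAFKA-1253 patch in the same package): a few longs are written through wrapForOutput with GZIP, the resulting buffer is flipped, and the values are read back through wrapForInput.

    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.nio.ByteBuffer;

    import org.apache.kafka.common.record.ByteBufferInputStream;
    import org.apache.kafka.common.record.ByteBufferOutputStream;
    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.Compressor;

    public class CompressorRoundTripExample {
        public static void main(String[] args) throws Exception {
            // compress ten longs into an auto-expanding buffer
            ByteBufferOutputStream out = new ByteBufferOutputStream(ByteBuffer.allocate(64));
            DataOutputStream compressed = Compressor.wrapForOutput(out, CompressionType.GZIP, 1024);
            for (long i = 0; i < 10; i++)
                compressed.writeLong(i);
            compressed.close(); // flushes the GZIP trailer into the underlying buffer

            // flip the (possibly reallocated) buffer and decompress the same ten longs
            ByteBuffer buffer = out.buffer();
            buffer.flip();
            DataInputStream decompressed =
                Compressor.wrapForInput(new ByteBufferInputStream(buffer), CompressionType.GZIP);
            for (int i = 0; i < 10; i++)
                System.out.println(decompressed.readLong()); // prints 0 through 9
        }
    }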

http://git-wip-us.apache.org/repos/asf/kafka/blob/9bc47bc1/clients/src/test/java/org/apache/kafka/common/utils/CrcTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/CrcTest.java b/clients/src/test/java/org/apache/kafka/common/utils/CrcTest.java
new file mode 100644
index 0000000..6b32381
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/utils/CrcTest.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import static org.junit.Assert.assertEquals;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+public class CrcTest {
+
+    @Test
+    public void testUpdate() {
+        final byte bytes[] = "Any String you want".getBytes();
+        final int len = bytes.length;
+
+        Crc32 crc1 = new Crc32();
+        Crc32 crc2 = new Crc32();
+        Crc32 crc3 = new Crc32();
+
+        crc1.update(bytes, 0, len);
+        for(int i = 0; i < len; i++)
+            crc2.update(bytes[i]);
+        crc3.update(bytes, 0, len/2);
+        crc3.update(bytes, len/2, len-len/2);
+
+        assertEquals("Crc values should be the same", crc1.getValue(), crc2.getValue());
+        assertEquals("Crc values should be the same", crc1.getValue(), crc3.getValue());
+    }
+
+    @Test
+    public void testUpdateInt() {
+        final int value = 1000;
+        final ByteBuffer buffer = ByteBuffer.allocate(4);
+        buffer.putInt(value);
+
+        Crc32 crc1 = new Crc32();
+        Crc32 crc2 = new Crc32();
+
+        crc1.updateInt(value);
+        crc2.update(buffer.array(), buffer.arrayOffset(), 4);
+
+        assertEquals("Crc values should be the same", crc1.getValue(), crc2.getValue());
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9bc47bc1/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
new file mode 100644
index 0000000..1d73aca
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api.test
+
+import java.util.{Properties, Collection, ArrayList}
+
+import org.scalatest.junit.JUnit3Suite
+import org.junit.runners.Parameterized
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized.Parameters
+import org.junit.{After, Before, Test}
+import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer, ProducerConfig}
+import org.junit.Assert._
+
+import kafka.api.FetchRequestBuilder
+import kafka.server.{KafkaConfig, KafkaServer}
+import kafka.consumer.SimpleConsumer
+import kafka.message.Message
+import kafka.zk.ZooKeeperTestHarness
+import kafka.utils.{Utils, TestUtils}
+
+import scala.Array
+
+
+@RunWith(value = classOf[Parameterized])
+class ProducerCompressionTest(compression: String) extends JUnit3Suite with ZooKeeperTestHarness {
+  private val brokerId = 0
+  private val port = TestUtils.choosePort
+  private var server: KafkaServer = null
+
+  private val props = TestUtils.createBrokerConfig(brokerId, port)
+  private val config = new KafkaConfig(props)
+
+  private val topic = "topic"
+  private val numRecords = 100
+
+  @Before
+  override def setUp() {
+    super.setUp()
+    server = TestUtils.createServer(config)
+  }
+
+  @After
+  override def tearDown() {
+    server.shutdown
+    Utils.rm(server.config.logDirs)
+    super.tearDown()
+  }
+
+  /**
+   * testCompression
+   *
+   * Compressed messages should be able to be sent and consumed correctly
+   */
+  @Test
+  def testCompression() {
+
+    val props = new Properties()
+    props.put(ProducerConfig.BROKER_LIST_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config)))
+    props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compression)
+    var producer = new KafkaProducer(props)
+
+    try {
+      // create topic
+      TestUtils.createTopic(zkClient, topic, 1, 1, List(server))
+      val partition = 0
+
+      // prepare the messages
+      val messages = for (i <- 0 until numRecords)
+        yield ("value" + i).getBytes
+
+      // make sure the returned messages are correct
+      val responses = for (message <- messages)
+        yield producer.send(new ProducerRecord(topic, null, null, message))
+      val futures = responses.toList
+      for ((future, offset) <- futures zip (0 until numRecords)) {
+        assertEquals(offset.toLong, future.get.offset)
+      }
+
+      // make sure the fetched message count match
+      val consumer = new SimpleConsumer("localhost", port, 100, 1024*1024, "")
+      val fetchResponse = consumer.fetch(new FetchRequestBuilder().addFetch(topic, partition, 0, Int.MaxValue).build())
+      val messageSet = fetchResponse.messageSet(topic, partition).iterator.toBuffer
+      assertEquals("Should have fetched " + numRecords + " messages", numRecords, messageSet.size)
+
+      var index = 0
+      for (message <- messages) {
+        assertEquals(new Message(bytes = message), messageSet(index).message)
+        assertEquals(index.toLong, messageSet(index).offset)
+        index += 1
+      }
+    } finally {
+      if (producer != null) {
+        producer.close()
+        producer = null
+      }
+    }
+  }
+}
+
+object ProducerCompressionTest {
+
+  // NOTE: Must return collection of Array[AnyRef] (NOT Array[Any]).
+  @Parameters
+  def parameters: Collection[Array[String]] = {
+    val list = new ArrayList[Array[String]]()
+    list.add(Array("gzip"))
+    list.add(Array("snappy"))
+    list
+  }
+}