Posted to commits@kafka.apache.org by ne...@apache.org on 2014/03/27 05:50:37 UTC
git commit: KAFKA-1253 Compression in the new producer: follow up patch to push new files
Repository: kafka
Updated Branches:
refs/heads/trunk 466a83b78 -> 9bc47bc13
KAFKA-1253 Compression in the new producer: follow up patch to push new files
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9bc47bc1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9bc47bc1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9bc47bc1
Branch: refs/heads/trunk
Commit: 9bc47bc1365d17c34f8d43239846de28219a663c
Parents: 466a83b
Author: Neha Narkhede <ne...@gmail.com>
Authored: Wed Mar 26 21:50:30 2014 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Wed Mar 26 21:50:30 2014 -0700
----------------------------------------------------------------------
.../common/record/ByteBufferInputStream.java | 49 ++++
.../common/record/ByteBufferOutputStream.java | 57 +++++
.../apache/kafka/common/record/Compressor.java | 244 +++++++++++++++++++
.../org/apache/kafka/common/utils/CrcTest.java | 59 +++++
.../kafka/api/ProducerCompressionTest.scala | 126 ++++++++++
5 files changed, 535 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/9bc47bc1/clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java
new file mode 100644
index 0000000..12651d4
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * A byte buffer backed input stream
+ */
+public class ByteBufferInputStream extends InputStream {
+
+ private ByteBuffer buffer;
+
+ public ByteBufferInputStream(ByteBuffer buffer) {
+ this.buffer = buffer;
+ }
+
+ public int read() {
+ if (!buffer.hasRemaining()) {
+ return -1;
+ }
+ return buffer.get() & 0xFF;
+ }
+
+ public int read(byte[] bytes, int off, int len) {
+ if (!buffer.hasRemaining()) {
+ return -1;
+ }
+
+ len = Math.min(len, buffer.remaining());
+ buffer.get(bytes, off, len);
+ return len;
+ }
+}
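For illustration, a minimal sketch of how this stream might be used on its own (the wrapper class and sample data here are illustrative, not part of the commit):

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.record.ByteBufferInputStream;

    public class InputStreamSketch {
        public static void main(String[] args) {
            ByteBuffer buffer = ByteBuffer.wrap("hello".getBytes());
            ByteBufferInputStream in = new ByteBufferInputStream(buffer);
            int b;
            // read() returns the next byte as an unsigned int, or -1 once the buffer is drained
            while ((b = in.read()) != -1)
                System.out.print((char) b);
        }
    }

In practice the producer does not use the stream directly; it is handed to Compressor.wrapForInput (below) so that GZIPInputStream or SnappyInputStream can decompress straight out of a record buffer.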
http://git-wip-us.apache.org/repos/asf/kafka/blob/9bc47bc1/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java
new file mode 100644
index 0000000..c7bd2f8
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * A byte buffer backed output stream
+ */
+public class ByteBufferOutputStream extends OutputStream {
+
+ private static final float REALLOCATION_FACTOR = 1.1f;
+
+ private ByteBuffer buffer;
+
+ public ByteBufferOutputStream(ByteBuffer buffer) {
+ this.buffer = buffer;
+ }
+
+ public void write(int b) {
+ if (buffer.remaining() < 1)
+ expandBuffer(buffer.capacity() + 1);
+ buffer.put((byte) b);
+ }
+
+ public void write(byte[] bytes, int off, int len) {
+ if (buffer.remaining() < len)
+ expandBuffer(buffer.capacity() + len);
+ buffer.put(bytes, off, len);
+ }
+
+ public ByteBuffer buffer() {
+ return buffer;
+ }
+
+ private void expandBuffer(int size) {
+ int expandSize = Math.max((int) (buffer.capacity() * REALLOCATION_FACTOR), size);
+ ByteBuffer temp = ByteBuffer.allocate(expandSize);
+ temp.put(buffer.array(), buffer.arrayOffset(), buffer.position());
+ buffer = temp;
+ }
+}
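A caveat worth noting: because expandBuffer allocates a replacement ByteBuffer (the larger of 1.1x the old capacity and the required size) and copies the bytes written so far, the stream may stop writing into the buffer it was constructed with, so callers must re-fetch buffer() after writing. A sketch of that behavior (sizes are illustrative):

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.record.ByteBufferOutputStream;

    public class OutputStreamSketch {
        public static void main(String[] args) {
            ByteBufferOutputStream out = new ByteBufferOutputStream(ByteBuffer.allocate(4));
            out.write(new byte[] {1, 2, 3, 4, 5, 6}, 0, 6);  // exceeds the initial capacity of 4
            ByteBuffer grown = out.buffer();  // re-fetch: a new, larger buffer after reallocation
            System.out.println(grown.position());  // prints 6: all bytes were written
        }
    }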
http://git-wip-us.apache.org/repos/asf/kafka/blob/9bc47bc1/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
new file mode 100644
index 0000000..6ae3d06
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+public class Compressor {
+
+ private static final float COMPRESSION_RATE_DAMPING_FACTOR = 0.9f;
+ private static final float COMPRESSION_RATE_ESTIMATION_FACTOR = 1.05f;
+ private static final int COMPRESSION_DEFAULT_BUFFER_SIZE = 1024;
+
+ private static float[] typeToRate;
+ private static int MAX_TYPE_ID = -1;
+
+ static {
+ for (CompressionType type : CompressionType.values()) {
+ MAX_TYPE_ID = Math.max(MAX_TYPE_ID, type.id);
+ }
+ typeToRate = new float[MAX_TYPE_ID+1];
+ for (CompressionType type : CompressionType.values()) {
+ typeToRate[type.id] = type.rate;
+ }
+ }
+
+ private final CompressionType type;
+ private final DataOutputStream appendStream;
+ private final ByteBufferOutputStream bufferStream;
+ private final int initPos;
+
+ public long writtenUncompressed;
+ public long numRecords;
+
+ public Compressor(ByteBuffer buffer, CompressionType type, int blockSize) {
+ this.type = type;
+ this.initPos = buffer.position();
+
+ this.numRecords = 0;
+ this.writtenUncompressed = 0;
+
+ if (type != CompressionType.NONE) {
+ // for compressed records, leave space for the header and the shallow message metadata
+ // and move the starting position to the value payload offset
+ buffer.position(initPos + Records.LOG_OVERHEAD + Record.RECORD_OVERHEAD);
+ }
+
+ // create the stream
+ bufferStream = new ByteBufferOutputStream(buffer);
+ appendStream = wrapForOutput(bufferStream, type, blockSize);
+ }
+
+ public Compressor(ByteBuffer buffer, CompressionType type) {
+ this(buffer, type, COMPRESSION_DEFAULT_BUFFER_SIZE);
+ }
+
+ public ByteBuffer buffer() {
+ return bufferStream.buffer();
+ }
+
+ public void close() {
+ try {
+ appendStream.close();
+ } catch (IOException e) {
+ throw new KafkaException(e);
+ }
+
+ if (type != CompressionType.NONE) {
+ ByteBuffer buffer = bufferStream.buffer();
+ int pos = buffer.position();
+ // write the header; the end offset is recorded as (number of records - 1)
+ buffer.position(initPos);
+ buffer.putLong(numRecords - 1);
+ buffer.putInt(pos - initPos - Records.LOG_OVERHEAD);
+ // write the shallow message (the crc and value size are not correct yet)
+ Record.write(buffer, null, null, type, 0, -1);
+ // compute and fill the value size
+ int valueSize = pos - initPos - Records.LOG_OVERHEAD - Record.RECORD_OVERHEAD;
+ buffer.putInt(initPos + Records.LOG_OVERHEAD + Record.KEY_OFFSET, valueSize);
+ // compute and fill the crc at the beginning of the message
+ long crc = Record.computeChecksum(buffer,
+ initPos + Records.LOG_OVERHEAD + Record.MAGIC_OFFSET,
+ pos - initPos - Records.LOG_OVERHEAD - Record.MAGIC_OFFSET);
+ Utils.writeUnsignedInt(buffer, initPos + Records.LOG_OVERHEAD + Record.CRC_OFFSET, crc);
+ // reset the position
+ buffer.position(pos);
+
+ // update the compression ratio
+ float compressionRate = (float) buffer.position() / this.writtenUncompressed;
+ typeToRate[type.id] = typeToRate[type.id] * COMPRESSION_RATE_DAMPING_FACTOR +
+ compressionRate * (1 - COMPRESSION_RATE_DAMPING_FACTOR);
+ }
+ }
+
+ // Note that for all the write operations below, IO exceptions should
+ // never be thrown since the underlying ByteBufferOutputStream does not throw IOException;
+ // if one surfaces nonetheless, it is wrapped and rethrown as a KafkaException.
+
+ public void putLong(final long value) {
+ try {
+ appendStream.writeLong(value);
+ } catch (IOException e) {
+ throw new KafkaException("I/O exception when writing to the append stream, closing", e);
+ }
+ }
+
+ public void putInt(final int value) {
+ try {
+ appendStream.writeInt(value);
+ } catch (IOException e) {
+ throw new KafkaException("I/O exception when writing to the append stream, closing", e);
+ }
+ }
+
+ public void put(final ByteBuffer buffer) {
+ try {
+ appendStream.write(buffer.array(), buffer.arrayOffset(), buffer.limit());
+ } catch (IOException e) {
+ throw new KafkaException("I/O exception when writing to the append stream, closing", e);
+ }
+ }
+
+ public void putByte(final byte value) {
+ try {
+ appendStream.write(value);
+ } catch (IOException e) {
+ throw new KafkaException("I/O exception when writing to the append stream, closing", e);
+ }
+ }
+
+ public void put(final byte[] bytes, final int offset, final int len) {
+ try {
+ appendStream.write(bytes, offset, len);
+ } catch (IOException e) {
+ throw new KafkaException("I/O exception when writing to the append stream, closing", e);
+ }
+ }
+
+ public void putRecord(byte[] key, byte[] value, CompressionType type, int valueOffset, int valueSize) {
+ // put a record as uncompressed into the underlying stream
+ long crc = Record.computeChecksum(key, value, type, valueOffset, valueSize);
+ byte attributes = Record.computeAttributes(type);
+ putRecord(crc, attributes, key, value, valueOffset, valueSize);
+ }
+
+ public void putRecord(byte[] key, byte[] value) {
+ putRecord(key, value, CompressionType.NONE, 0, -1);
+ }
+
+ private void putRecord(final long crc, final byte attributes, final byte[] key, final byte[] value, final int valueOffset, final int valueSize) {
+ Record.write(this, crc, attributes, key, value, valueOffset, valueSize);
+ }
+
+ public void recordWritten(int size) {
+ numRecords += 1;
+ writtenUncompressed += size;
+ }
+
+ public long estimatedBytesWritten() {
+ if (type == CompressionType.NONE) {
+ return bufferStream.buffer().position();
+ } else {
+ // estimate the written bytes to the underlying byte buffer based on uncompressed written bytes
+ return (long) (writtenUncompressed * typeToRate[type.id] * COMPRESSION_RATE_ESTIMATION_FACTOR);
+ }
+ }
+
+ // the following two functions also need to be public since they are used in MemoryRecords.iteration
+
+ public static DataOutputStream wrapForOutput(ByteBufferOutputStream buffer, CompressionType type, int bufferSize) {
+ try {
+ switch (type) {
+ case NONE:
+ return new DataOutputStream(buffer);
+ case GZIP:
+ return new DataOutputStream(new GZIPOutputStream(buffer, bufferSize));
+ case SNAPPY:
+ // dynamically load the snappy class to avoid runtime dependency
+ // on snappy if we are not using it
+ try {
+ Class<?> snappyOutputStream = Class.forName("org.xerial.snappy.SnappyOutputStream");
+ OutputStream stream = (OutputStream) snappyOutputStream.getConstructor(OutputStream.class, Integer.TYPE)
+ .newInstance(buffer, bufferSize);
+ return new DataOutputStream(stream);
+ } catch (Exception e) {
+ throw new KafkaException(e);
+ }
+ default:
+ throw new IllegalArgumentException("Unknown compression type: " + type);
+ }
+ } catch (IOException e) {
+ throw new KafkaException(e);
+ }
+ }
+
+ public static DataInputStream wrapForInput(ByteBufferInputStream buffer, CompressionType type) {
+ try {
+ switch (type) {
+ case NONE:
+ return new DataInputStream(buffer);
+ case GZIP:
+ return new DataInputStream(new GZIPInputStream(buffer));
+ case SNAPPY:
+ // dynamically load the snappy class to avoid runtime dependency
+ // on snappy if we are not using it
+ try {
+ Class<?> snappyInputStream = Class.forName("org.xerial.snappy.SnappyInputStream");
+ InputStream stream = (InputStream) snappyInputStream.getConstructor(InputStream.class)
+ .newInstance(buffer);
+ return new DataInputStream(stream);
+ } catch (Exception e) {
+ throw new KafkaException(e);
+ }
+ default:
+ throw new IllegalArgumentException("Unknown compression type: " + type);
+ }
+ } catch (IOException e) {
+ throw new KafkaException(e);
+ }
+ }
+}
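The wrapForOutput/wrapForInput helpers are symmetric, so a round trip is easy to verify. A minimal sketch that compresses a few longs and reads them back (GZIP only, to avoid pulling in the optional snappy dependency; buffer sizes are illustrative):

    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.nio.ByteBuffer;
    import org.apache.kafka.common.record.ByteBufferInputStream;
    import org.apache.kafka.common.record.ByteBufferOutputStream;
    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.Compressor;

    public class RoundTripSketch {
        public static void main(String[] args) throws Exception {
            ByteBufferOutputStream bufferStream = new ByteBufferOutputStream(ByteBuffer.allocate(64));
            DataOutputStream out = Compressor.wrapForOutput(bufferStream, CompressionType.GZIP, 1024);
            for (long i = 0; i < 10; i++)
                out.writeLong(i);
            out.close();  // flushes the GZIP trailer into the underlying buffer

            ByteBuffer compressed = bufferStream.buffer();  // re-fetch: it may have been reallocated
            compressed.flip();
            DataInputStream in = Compressor.wrapForInput(new ByteBufferInputStream(compressed), CompressionType.GZIP);
            for (long i = 0; i < 10; i++)
                System.out.println(in.readLong());  // prints 0 through 9
        }
    }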
http://git-wip-us.apache.org/repos/asf/kafka/blob/9bc47bc1/clients/src/test/java/org/apache/kafka/common/utils/CrcTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/CrcTest.java b/clients/src/test/java/org/apache/kafka/common/utils/CrcTest.java
new file mode 100644
index 0000000..6b32381
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/utils/CrcTest.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import static org.junit.Assert.assertEquals;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+public class CrcTest {
+
+ @Test
+ public void testUpdate() {
+ final byte bytes[] = "Any String you want".getBytes();
+ final int len = bytes.length;
+
+ Crc32 crc1 = new Crc32();
+ Crc32 crc2 = new Crc32();
+ Crc32 crc3 = new Crc32();
+
+ crc1.update(bytes, 0, len);
+ for(int i = 0; i < len; i++)
+ crc2.update(bytes[i]);
+ crc3.update(bytes, 0, len/2);
+ crc3.update(bytes, len/2, len-len/2);
+
+ assertEquals("Crc values should be the same", crc1.getValue(), crc2.getValue());
+ assertEquals("Crc values should be the same", crc1.getValue(), crc3.getValue());
+ }
+
+ @Test
+ public void testUpdateInt() {
+ final int value = 1000;
+ final ByteBuffer buffer = ByteBuffer.allocate(4);
+ buffer.putInt(value);
+
+ Crc32 crc1 = new Crc32();
+ Crc32 crc2 = new Crc32();
+
+ crc1.updateInt(value);
+ crc2.update(buffer.array(), buffer.arrayOffset(), 4);
+
+ assertEquals("Crc values should be the same", crc1.getValue(), crc2.getValue());
+ }
+}
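Note that the test checks Crc32 for internal consistency (bulk, byte-at-a-time, and split updates agree) but not for absolute values. Assuming Kafka's Crc32 implements the standard CRC-32 polynomial, it can also be cross-checked against java.util.zip.CRC32; a sketch of such a check (not part of this commit):

    import java.util.zip.CRC32;
    import org.apache.kafka.common.utils.Crc32;

    public class CrcCrossCheck {
        public static void main(String[] args) {
            byte[] bytes = "Any String you want".getBytes();

            Crc32 kafkaCrc = new Crc32();
            kafkaCrc.update(bytes, 0, bytes.length);

            CRC32 jdkCrc = new CRC32();
            jdkCrc.update(bytes, 0, bytes.length);

            // assumption: both implement the same polynomial, so the values match
            System.out.println(kafkaCrc.getValue() == jdkCrc.getValue());  // expected: true
        }
    }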
http://git-wip-us.apache.org/repos/asf/kafka/blob/9bc47bc1/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
new file mode 100644
index 0000000..1d73aca
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api.test
+
+import java.util.{Properties, Collection, ArrayList}
+
+import org.scalatest.junit.JUnit3Suite
+import org.junit.runners.Parameterized
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized.Parameters
+import org.junit.{After, Before, Test}
+import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer, ProducerConfig}
+import org.junit.Assert._
+
+import kafka.api.FetchRequestBuilder
+import kafka.server.{KafkaConfig, KafkaServer}
+import kafka.consumer.SimpleConsumer
+import kafka.message.Message
+import kafka.zk.ZooKeeperTestHarness
+import kafka.utils.{Utils, TestUtils}
+
+import scala.Array
+
+
+@RunWith(value = classOf[Parameterized])
+class ProducerCompressionTest(compression: String) extends JUnit3Suite with ZooKeeperTestHarness {
+ private val brokerId = 0
+ private val port = TestUtils.choosePort
+ private var server: KafkaServer = null
+
+ private val props = TestUtils.createBrokerConfig(brokerId, port)
+ private val config = new KafkaConfig(props)
+
+ private val topic = "topic"
+ private val numRecords = 100
+
+ @Before
+ override def setUp() {
+ super.setUp()
+ server = TestUtils.createServer(config)
+ }
+
+ @After
+ override def tearDown() {
+ server.shutdown
+ Utils.rm(server.config.logDirs)
+ super.tearDown()
+ }
+
+ /**
+ * testCompression
+ *
+ * Compressed messages should be sent and consumed correctly
+ */
+ @Test
+ def testCompression() {
+
+ val props = new Properties()
+ props.put(ProducerConfig.BROKER_LIST_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config)))
+ props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compression)
+ var producer = new KafkaProducer(props)
+
+ try {
+ // create topic
+ TestUtils.createTopic(zkClient, topic, 1, 1, List(server))
+ val partition = 0
+
+ // prepare the messages
+ val messages = for (i <- 0 until numRecords)
+ yield ("value" + i).getBytes
+
+ // make sure the returned messages are correct
+ val responses = for (message <- messages)
+ yield producer.send(new ProducerRecord(topic, null, null, message))
+ val futures = responses.toList
+ for ((future, offset) <- futures zip (0 until numRecords)) {
+ assertEquals(offset.toLong, future.get.offset)
+ }
+
+ // make sure the fetched message count matches
+ val consumer = new SimpleConsumer("localhost", port, 100, 1024*1024, "")
+ val fetchResponse = consumer.fetch(new FetchRequestBuilder().addFetch(topic, partition, 0, Int.MaxValue).build())
+ val messageSet = fetchResponse.messageSet(topic, partition).iterator.toBuffer
+ assertEquals("Should have fetched " + numRecords + " messages", numRecords, messageSet.size)
+
+ var index = 0
+ for (message <- messages) {
+ assertEquals(new Message(bytes = message), messageSet(index).message)
+ assertEquals(index.toLong, messageSet(index).offset)
+ index += 1
+ }
+ } finally {
+ if (producer != null) {
+ producer.close()
+ producer = null
+ }
+ }
+ }
+}
+
+object ProducerCompressionTest {
+
+ // NOTE: Must return collection of Array[AnyRef] (NOT Array[Any]).
+ @Parameters
+ def parameters: Collection[Array[String]] = {
+ val list = new ArrayList[Array[String]]()
+ list.add(Array("gzip"))
+ list.add(Array("snappy"))
+ list
+ }
+}