You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/03/08 22:26:28 UTC

kafka git commit: MINOR: Add varint serde utilities for new message format

Repository: kafka
Updated Branches:
  refs/heads/trunk 29084a9b2 -> f7354e779


MINOR: Add varint serde utilities for new message format

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Ismael Juma <is...@juma.me.uk>

Closes #2647 from hachikuji/add-varint-serdes


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f7354e77
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f7354e77
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f7354e77

Branch: refs/heads/trunk
Commit: f7354e779cb058985dd385b2b1209db71eb7a5e8
Parents: 29084a9
Author: Jason Gustafson <ja...@confluent.io>
Authored: Wed Mar 8 22:25:58 2017 +0000
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Wed Mar 8 22:25:58 2017 +0000

----------------------------------------------------------------------
 .../kafka/common/protocol/types/Type.java       |  57 ++++
 .../common/record/ByteBufferInputStream.java    |  48 ---
 .../common/record/ByteBufferLogInputStream.java |   4 +-
 .../common/record/ByteBufferOutputStream.java   |  58 ----
 .../kafka/common/record/CompressionType.java    |   2 +
 .../common/record/KafkaLZ4BlockInputStream.java |  10 +-
 .../record/KafkaLZ4BlockOutputStream.java       |  10 +-
 .../common/record/MemoryRecordsBuilder.java     |   1 +
 .../org/apache/kafka/common/record/Record.java  |   6 +-
 .../kafka/common/record/RecordsIterator.java    |   1 +
 .../common/utils/ByteBufferInputStream.java     |  48 +++
 .../common/utils/ByteBufferOutputStream.java    |  58 ++++
 .../apache/kafka/common/utils/ByteUtils.java    | 324 +++++++++++++++++++
 .../org/apache/kafka/common/utils/Utils.java    | 105 +-----
 .../clients/consumer/internals/FetcherTest.java |   2 +-
 .../types/ProtocolSerializationTest.java        |   8 +
 .../common/record/CompressionTypeTest.java      |   2 +
 .../kafka/common/record/SimpleRecordTest.java   |   1 +
 .../kafka/common/utils/ByteUtilsTest.java       | 222 +++++++++++++
 .../apache/kafka/common/utils/UtilsTest.java    |  80 +----
 core/src/main/scala/kafka/message/Message.scala |   7 +-
 .../scala/unit/kafka/message/MessageTest.scala  |   5 +-
 .../server/AbstractFetcherThreadTest.scala      |   4 +-
 .../internals/assignment/AssignmentInfo.java    |   2 +-
 24 files changed, 772 insertions(+), 293 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f7354e77/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
index 3341f3e..39e46fd 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer;
 import org.apache.kafka.common.record.FileRecords;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.utils.ByteUtils;
 import org.apache.kafka.common.utils.Utils;
 
 /**
@@ -473,4 +474,60 @@ public abstract class Type {
         }
     };
 
+    public static final Type VARINT = new Type() {
+        @Override
+        public void write(ByteBuffer buffer, Object o) {
+            ByteUtils.writeVarint((Integer) o, buffer);
+        }
+
+        @Override
+        public Integer read(ByteBuffer buffer) {
+            return ByteUtils.readVarint(buffer);
+        }
+
+        @Override
+        public Integer validate(Object item) {
+            if (item instanceof Integer)
+                return (Integer) item;
+            throw new SchemaException(item + " is not an integer");
+        }
+
+        public String toString() {
+            return "VARINT";
+        }
+
+        @Override
+        public int sizeOf(Object o) {
+            return ByteUtils.sizeOfVarint((Integer) o);
+        }
+    };
+
+    public static final Type VARLONG = new Type() {
+        @Override
+        public void write(ByteBuffer buffer, Object o) {
+            ByteUtils.writeVarlong((Long) o, buffer);
+        }
+
+        @Override
+        public Long read(ByteBuffer buffer) {
+            return ByteUtils.readVarlong(buffer);
+        }
+
+        @Override
+        public Long validate(Object item) {
+            if (item instanceof Long)
+                return (Long) item;
+            throw new SchemaException(item + " is not a long");
+        }
+
+        public String toString() {
+            return "VARLONG";
+        }
+
+        @Override
+        public int sizeOf(Object o) {
+            return ByteUtils.sizeOfVarlong((Long) o);
+        }
+    };
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7354e77/clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java
deleted file mode 100644
index c033b6c..0000000
--- a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.record;
-
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-
-/**
- * A byte buffer backed input inputStream
- */
-public final class ByteBufferInputStream extends InputStream {
-    private final ByteBuffer buffer;
-
-    public ByteBufferInputStream(ByteBuffer buffer) {
-        this.buffer = buffer;
-    }
-
-    public int read() {
-        if (!buffer.hasRemaining()) {
-            return -1;
-        }
-        return buffer.get() & 0xFF;
-    }
-
-    public int read(byte[] bytes, int off, int len) {
-        if (!buffer.hasRemaining()) {
-            return -1;
-        }
-
-        len = Math.min(len, buffer.remaining());
-        buffer.get(bytes, off, len);
-        return len;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7354e77/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java
index bdda998..f4a3da4 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java
@@ -17,7 +17,7 @@
 package org.apache.kafka.common.record;
 
 import org.apache.kafka.common.errors.CorruptRecordException;
-import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.common.utils.ByteUtils;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -106,7 +106,7 @@ class ByteBufferLogInputStream implements LogInputStream<ByteBufferLogInputStrea
             buffer.put(LOG_OVERHEAD + Record.ATTRIBUTES_OFFSET, timestampType.updateAttributes(attributes));
             buffer.putLong(LOG_OVERHEAD + Record.TIMESTAMP_OFFSET, timestamp);
             long crc = record.computeChecksum();
-            Utils.writeUnsignedInt(buffer, LOG_OVERHEAD + Record.CRC_OFFSET, crc);
+            ByteUtils.writeUnsignedInt(buffer, LOG_OVERHEAD + Record.CRC_OFFSET, crc);
         }
 
         public ByteBuffer buffer() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7354e77/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java
deleted file mode 100644
index 4eee605..0000000
--- a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.record;
-
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-
-/**
- * A byte buffer backed output outputStream
- */
-public class ByteBufferOutputStream extends OutputStream {
-
-    private static final float REALLOCATION_FACTOR = 1.1f;
-
-    private ByteBuffer buffer;
-
-    public ByteBufferOutputStream(ByteBuffer buffer) {
-        this.buffer = buffer;
-    }
-
-    public void write(int b) {
-        if (buffer.remaining() < 1)
-            expandBuffer(buffer.capacity() + 1);
-        buffer.put((byte) b);
-    }
-
-    public void write(byte[] bytes, int off, int len) {
-        if (buffer.remaining() < len)
-            expandBuffer(buffer.capacity() + len);
-        buffer.put(bytes, off, len);
-    }
-
-    public ByteBuffer buffer() {
-        return buffer;
-    }
-
-    private void expandBuffer(int size) {
-        int expandSize = Math.max((int) (buffer.capacity() * REALLOCATION_FACTOR), size);
-        ByteBuffer temp = ByteBuffer.allocate(expandSize);
-        temp.put(buffer.array(), buffer.arrayOffset(), buffer.position());
-        buffer = temp;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7354e77/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
index 658e50c..d88c530 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
@@ -17,6 +17,8 @@
 package org.apache.kafka.common.record;
 
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.utils.ByteBufferInputStream;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
 
 import java.io.InputStream;
 import java.io.OutputStream;

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7354e77/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java
index 6544d13..a53690c 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java
@@ -26,7 +26,7 @@ import java.io.InputStream;
 
 import org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.BD;
 import org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.FLG;
-import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.common.utils.ByteUtils;
 
 import net.jpountz.lz4.LZ4Exception;
 import net.jpountz.lz4.LZ4Factory;
@@ -112,7 +112,7 @@ public final class KafkaLZ4BlockInputStream extends FilterInputStream {
             throw new IOException(PREMATURE_EOS);
         }
 
-        if (MAGIC != Utils.readUnsignedIntLE(header, headerOffset - 6)) {
+        if (MAGIC != ByteUtils.readUnsignedIntLE(header, headerOffset - 6)) {
             throw new IOException(NOT_SUPPORTED);
         }
         flg = FLG.fromByte(header[headerOffset - 2]);
@@ -145,13 +145,13 @@ public final class KafkaLZ4BlockInputStream extends FilterInputStream {
      * @throws IOException
      */
     private void readBlock() throws IOException {
-        int blockSize = Utils.readUnsignedIntLE(in);
+        int blockSize = ByteUtils.readUnsignedIntLE(in);
 
         // Check for EndMark
         if (blockSize == 0) {
             finished = true;
             if (flg.isContentChecksumSet())
-                Utils.readUnsignedIntLE(in); // TODO: verify this content checksum
+                ByteUtils.readUnsignedIntLE(in); // TODO: verify this content checksum
             return;
         } else if (blockSize > maxBlockSize) {
             throw new IOException(String.format("Block size %s exceeded max: %s", blockSize, maxBlockSize));
@@ -172,7 +172,7 @@ public final class KafkaLZ4BlockInputStream extends FilterInputStream {
         }
 
         // verify checksum
-        if (flg.isBlockChecksumSet() && Utils.readUnsignedIntLE(in) != checksum.hash(bufferToRead, 0, blockSize, 0)) {
+        if (flg.isBlockChecksumSet() && ByteUtils.readUnsignedIntLE(in) != checksum.hash(bufferToRead, 0, blockSize, 0)) {
             throw new IOException(BLOCK_HASH_MISMATCH);
         }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7354e77/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java
index 0a64d43..034b945 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java
@@ -20,7 +20,7 @@ import java.io.FilterOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 
-import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.common.utils.ByteUtils;
 
 import net.jpountz.lz4.LZ4Compressor;
 import net.jpountz.lz4.LZ4Factory;
@@ -138,7 +138,7 @@ public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
      * @throws IOException
      */
     private void writeHeader() throws IOException {
-        Utils.writeUnsignedIntLE(buffer, 0, MAGIC);
+        ByteUtils.writeUnsignedIntLE(buffer, 0, MAGIC);
         bufferOffset = 4;
         buffer[bufferOffset++] = flg.toByte();
         buffer[bufferOffset++] = bd.toByte();
@@ -182,13 +182,13 @@ public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
         }
 
         // Write content
-        Utils.writeUnsignedIntLE(out, compressedLength | compressMethod);
+        ByteUtils.writeUnsignedIntLE(out, compressedLength | compressMethod);
         out.write(bufferToWrite, 0, compressedLength);
 
         // Calculate and write block checksum
         if (flg.isBlockChecksumSet()) {
             int hash = checksum.hash(bufferToWrite, 0, compressedLength, 0);
-            Utils.writeUnsignedIntLE(out, hash);
+            ByteUtils.writeUnsignedIntLE(out, hash);
         }
         bufferOffset = 0;
     }
@@ -200,7 +200,7 @@ public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
      * @throws IOException
      */
     private void writeEndMark() throws IOException {
-        Utils.writeUnsignedIntLE(out, 0);
+        ByteUtils.writeUnsignedIntLE(out, 0);
         // TODO implement content checksum, update flg.validate()
         finished = true;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7354e77/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index 32ddcee..f3cf43c 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.common.record;
 
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
 
 import java.io.DataOutputStream;
 import java.io.IOException;

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7354e77/clients/src/main/java/org/apache/kafka/common/record/Record.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Record.java b/clients/src/main/java/org/apache/kafka/common/record/Record.java
index a54d65d..9932238 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Record.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Record.java
@@ -17,6 +17,8 @@
 package org.apache.kafka.common.record;
 
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.apache.kafka.common.utils.ByteUtils;
 import org.apache.kafka.common.utils.Crc32;
 import org.apache.kafka.common.utils.Utils;
 
@@ -118,7 +120,7 @@ public final class Record {
      * Retrieve the previously computed CRC for this record
      */
     public long checksum() {
-        return Utils.readUnsignedInt(buffer, CRC_OFFSET);
+        return ByteUtils.readUnsignedInt(buffer, CRC_OFFSET);
     }
 
     /**
@@ -468,7 +470,7 @@ public final class Record {
 
         // compute and fill the crc from the beginning of the message
         long crc = Utils.computeChecksum(buffer, recordPosition + MAGIC_OFFSET, recordSize - MAGIC_OFFSET);
-        Utils.writeUnsignedInt(buffer, recordPosition + CRC_OFFSET, crc);
+        ByteUtils.writeUnsignedInt(buffer, recordPosition + CRC_OFFSET, crc);
     }
 
     private static void write(ByteBuffer buffer,

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7354e77/clients/src/main/java/org/apache/kafka/common/record/RecordsIterator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordsIterator.java b/clients/src/main/java/org/apache/kafka/common/record/RecordsIterator.java
index b6f049c..710ce3b 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/RecordsIterator.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/RecordsIterator.java
@@ -19,6 +19,7 @@ package org.apache.kafka.common.record;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.errors.CorruptRecordException;
 import org.apache.kafka.common.utils.AbstractIterator;
+import org.apache.kafka.common.utils.ByteBufferInputStream;
 import org.apache.kafka.common.utils.Utils;
 
 import java.io.DataInputStream;

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7354e77/clients/src/main/java/org/apache/kafka/common/utils/ByteBufferInputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/ByteBufferInputStream.java b/clients/src/main/java/org/apache/kafka/common/utils/ByteBufferInputStream.java
new file mode 100644
index 0000000..094a1a7
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/utils/ByteBufferInputStream.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * A byte buffer backed input stream
+ */
+public final class ByteBufferInputStream extends InputStream {
+    private final ByteBuffer buffer;
+
+    public ByteBufferInputStream(ByteBuffer buffer) {
+        this.buffer = buffer;
+    }
+
+    public int read() {
+        if (!buffer.hasRemaining()) {
+            return -1;
+        }
+        return buffer.get() & 0xFF;
+    }
+
+    public int read(byte[] bytes, int off, int len) {
+        if (!buffer.hasRemaining()) {
+            return -1;
+        }
+
+        len = Math.min(len, buffer.remaining());
+        buffer.get(bytes, off, len);
+        return len;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7354e77/clients/src/main/java/org/apache/kafka/common/utils/ByteBufferOutputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/ByteBufferOutputStream.java b/clients/src/main/java/org/apache/kafka/common/utils/ByteBufferOutputStream.java
new file mode 100644
index 0000000..9480c6d
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/utils/ByteBufferOutputStream.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * A byte buffer backed output stream
+ */
+public class ByteBufferOutputStream extends OutputStream {
+
+    private static final float REALLOCATION_FACTOR = 1.1f;
+
+    private ByteBuffer buffer;
+
+    public ByteBufferOutputStream(ByteBuffer buffer) {
+        this.buffer = buffer;
+    }
+
+    public void write(int b) {
+        if (buffer.remaining() < 1)
+            expandBuffer(buffer.capacity() + 1);
+        buffer.put((byte) b);
+    }
+
+    public void write(byte[] bytes, int off, int len) {
+        if (buffer.remaining() < len)
+            expandBuffer(buffer.capacity() + len);
+        buffer.put(bytes, off, len);
+    }
+
+    public ByteBuffer buffer() {
+        return buffer;
+    }
+
+    private void expandBuffer(int size) {
+        int expandSize = Math.max((int) (buffer.capacity() * REALLOCATION_FACTOR), size);
+        ByteBuffer temp = ByteBuffer.allocate(expandSize);
+        temp.put(buffer.array(), buffer.arrayOffset(), buffer.position());
+        buffer = temp;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7354e77/clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java b/clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java
new file mode 100644
index 0000000..50c90a8
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * This class exposes low-level methods for reading/writing from byte streams or buffers.
+ */
+public final class ByteUtils {
+
+    private ByteUtils() {}
+
+    /**
+     * Read an unsigned integer from the given position without modifying the buffer's position
+     *
+     * @param buffer the buffer to read from
+     * @param index the index from which to read the integer
+     * @return The integer read, as a long to avoid signedness
+     */
+    public static long readUnsignedInt(ByteBuffer buffer, int index) {
+        return buffer.getInt(index) & 0xffffffffL;
+    }
+
+    /**
+     * Read an unsigned integer stored in little-endian format from the {@link InputStream}.
+     *
+     * @param in The stream to read from
+     * @return The integer read (MUST BE TREATED WITH SPECIAL CARE TO AVOID SIGNEDNESS)
+     */
+    public static int readUnsignedIntLE(InputStream in) throws IOException {
+        return in.read()
+                | (in.read() << 8)
+                | (in.read() << 16)
+                | (in.read() << 24);
+    }
+
+    /**
+     * Read an unsigned integer stored in little-endian format from a byte array
+     * at a given offset.
+     *
+     * @param buffer The byte array to read from
+     * @param offset The position in buffer to read from
+     * @return The integer read (MUST BE TREATED WITH SPECIAL CARE TO AVOID SIGNEDNESS)
+     */
+    public static int readUnsignedIntLE(byte[] buffer, int offset) {
+        return (buffer[offset] << 0 & 0xff)
+                | ((buffer[offset + 1] & 0xff) << 8)
+                | ((buffer[offset + 2] & 0xff) << 16)
+                | ((buffer[offset + 3] & 0xff) << 24);
+    }
+
+    /**
+     * Write the given long value as a 4 byte unsigned integer. Overflow is ignored.
+     *
+     * @param buffer The buffer to write to
+     * @param index The position in the buffer at which to begin writing
+     * @param value The value to write
+     */
+    public static void writeUnsignedInt(ByteBuffer buffer, int index, long value) {
+        buffer.putInt(index, (int) (value & 0xffffffffL));
+    }
+
+    /**
+     * Write the given long value as a 4 byte unsigned integer. Overflow is ignored.
+     *
+     * @param buffer The buffer to write to
+     * @param value The value to write
+     */
+    public static void writeUnsignedInt(ByteBuffer buffer, long value) {
+        buffer.putInt((int) (value & 0xffffffffL));
+    }
+
+    /**
+     * Write an unsigned integer in little-endian format to the {@link OutputStream}.
+     *
+     * @param out The stream to write to
+     * @param value The value to write
+     */
+    public static void writeUnsignedIntLE(OutputStream out, int value) throws IOException {
+        out.write(value);
+        out.write(value >>> 8);
+        out.write(value >>> 16);
+        out.write(value >>> 24);
+    }
+
+    /**
+     * Write an unsigned integer in little-endian format to a byte array
+     * at a given offset.
+     *
+     * @param buffer The byte array to write to
+     * @param offset The position in buffer to write to
+     * @param value The value to write
+     */
+    public static void writeUnsignedIntLE(byte[] buffer, int offset, int value) {
+        buffer[offset] = (byte) value;
+        buffer[offset + 1] = (byte) (value >>> 8);
+        buffer[offset + 2] = (byte) (value >>> 16);
+        buffer[offset + 3]   = (byte) (value >>> 24);
+    }
+
+    /**
+     * Read an integer stored in variable-length format using zig-zag decoding from
+     * <a href="http://code.google.com/apis/protocolbuffers/docs/encoding.html"> Google Protocol Buffers</a>.
+     *
+     * @param buffer The buffer to read from
+     * @return The integer read
+     *
+     * @throws IllegalArgumentException if variable-length value does not terminate after 5 bytes have been read
+     */
+    public static int readVarint(ByteBuffer buffer) {
+        int value = 0;
+        int i = 0;
+        int b;
+        while (((b = buffer.get()) & 0x80) != 0) {
+            value |= (b & 0x7f) << i;
+            i += 7;
+            if (i > 28)
+                throw illegalVarintException(value);
+        }
+        value |= b << i;
+        return (value >>> 1) ^ -(value & 1);
+    }
+
+    /**
+     * Read an integer stored in variable-length format using zig-zag decoding from
+     * <a href="http://code.google.com/apis/protocolbuffers/docs/encoding.html"> Google Protocol Buffers</a>.
+     *
+     * @param in The input to read from
+     * @return The integer read
+     *
+     * @throws IllegalArgumentException if variable-length value does not terminate after 5 bytes have been read
+     * @throws IOException              if {@link DataInput} throws {@link IOException}
+     */
+    public static int readVarint(DataInput in) throws IOException {
+        int value = 0;
+        int i = 0;
+        int b;
+        while (((b = in.readByte()) & 0x80) != 0) {
+            value |= (b & 0x7f) << i;
+            i += 7;
+            if (i > 28)
+                throw illegalVarintException(value);
+        }
+        value |= b << i;
+        return (value >>> 1) ^ -(value & 1);
+    }
+
+    /**
+     * Decode a long that was written in the zig-zag, variable-length format of
+     * <a href="http://code.google.com/apis/protocolbuffers/docs/encoding.html"> Google Protocol Buffers</a>.
+     *
+     * @param in The input to read from
+     * @return The decoded long value
+     *
+     * @throws IllegalArgumentException if the continuation bit is still set on the 10th byte read
+     * @throws IOException              if {@link DataInput} throws {@link IOException}
+     */
+    public static long readVarlong(DataInput in) throws IOException {
+        long raw = 0L;
+        int shift = 0;
+        long current = in.readByte();
+        for (; (current & 0x80) != 0; current = in.readByte()) { // high bit set: more bytes follow
+            raw |= (current & 0x7f) << shift;
+            shift += 7;
+            if (shift > 63)
+                throw illegalVarlongException(raw);
+        }
+        raw |= current << shift; // final byte has its high bit clear, so no mask is needed
+        return (raw >>> 1) ^ -(raw & 1); // zig-zag decode: the lowest bit carries the sign
+    }
+
+    /**
+     * Decode a long that was written in the zig-zag, variable-length format of
+     * <a href="http://code.google.com/apis/protocolbuffers/docs/encoding.html"> Google Protocol Buffers</a>.
+     *
+     * @param buffer The buffer to read from
+     * @return The decoded long value
+     *
+     * @throws IllegalArgumentException if the continuation bit is still set on the 10th byte read
+     */
+    public static long readVarlong(ByteBuffer buffer) {
+        long raw = 0L;
+        int shift = 0;
+        long current = buffer.get();
+        for (; (current & 0x80) != 0; current = buffer.get()) { // high bit set: more bytes follow
+            raw |= (current & 0x7f) << shift;
+            shift += 7;
+            if (shift > 63)
+                throw illegalVarlongException(raw);
+        }
+        raw |= current << shift; // final byte has its high bit clear, so no mask is needed
+        return (raw >>> 1) ^ -(raw & 1); // zig-zag decode: the lowest bit carries the sign
+    }
+
+    /**
+     * Zig-zag encode the given integer and write it to the output in the variable-length format of
+     * <a href="http://code.google.com/apis/protocolbuffers/docs/encoding.html"> Google Protocol Buffers</a>.
+     *
+     * @param value The value to write
+     * @param out The output to write to
+     * @throws IOException if {@link DataOutput} throws {@link IOException}
+     */
+    public static void writeVarint(int value, DataOutput out) throws IOException {
+        int remaining = (value << 1) ^ (value >> 31);
+        while ((remaining & ~0x7f) != 0) {
+            out.writeByte(0x80 | (remaining & 0x7f));
+            remaining >>>= 7;
+        }
+        out.writeByte(remaining);
+    }
+
+    /**
+     * Zig-zag encode the given integer and write it, in the variable-length format of
+     * <a href="http://code.google.com/apis/protocolbuffers/docs/encoding.html"> Google Protocol Buffers</a>,
+     * into the buffer at its current position.
+     *
+     * @param value The value to write
+     * @param buffer The buffer to write to
+     */
+    public static void writeVarint(int value, ByteBuffer buffer) {
+        int remaining = (value << 1) ^ (value >> 31);
+        // Emit 7 bits at a time, flagging every byte except the last with the continuation bit.
+        while ((remaining & ~0x7f) != 0) {
+            buffer.put((byte) (0x80 | (remaining & 0x7f)));
+            remaining >>>= 7;
+        }
+        buffer.put((byte) remaining);
+    }
+
+    /**
+     * Zig-zag encode the given long and write it to the output in the variable-length format of
+     * <a href="http://code.google.com/apis/protocolbuffers/docs/encoding.html"> Google Protocol Buffers</a>.
+     *
+     * @param value The value to write
+     * @param out The output to write to
+     * @throws IOException if {@link DataOutput} throws {@link IOException}
+     */
+    public static void writeVarlong(long value, DataOutput out) throws IOException {
+        long remaining = (value << 1) ^ (value >> 63);
+        while ((remaining & ~0x7fL) != 0L) {
+            out.writeByte(0x80 | ((int) remaining & 0x7f));
+            remaining >>>= 7;
+        }
+        out.writeByte((int) remaining);
+    }
+
+    /**
+     * Zig-zag encode the given long and write it, in the variable-length format of
+     * <a href="http://code.google.com/apis/protocolbuffers/docs/encoding.html"> Google Protocol Buffers</a>,
+     * into the buffer at its current position.
+     *
+     * @param value The value to write
+     * @param buffer The buffer to write to
+     */
+    public static void writeVarlong(long value, ByteBuffer buffer) {
+        long remaining = (value << 1) ^ (value >> 63);
+        // Emit 7 bits at a time, flagging every byte except the last with the continuation bit.
+        while ((remaining & ~0x7fL) != 0L) {
+            buffer.put((byte) (0x80 | (remaining & 0x7f)));
+            remaining >>>= 7;
+        }
+        buffer.put((byte) remaining);
+    }
+
+    /**
+     * Compute how many bytes the variable-length zig-zag encoding of an integer occupies.
+     *
+     * @param value The signed value
+     * @return The encoded size in bytes (between 1 and 5)
+     */
+    public static int sizeOfVarint(int value) {
+        int remaining = (value << 1) ^ (value >> 31);
+        int size = 1;
+        // one extra byte for every 7-bit group beyond the first
+        for (; (remaining & ~0x7f) != 0; remaining >>>= 7)
+            size++;
+        return size;
+    }
+
+    /**
+     * Compute how many bytes the variable-length zig-zag encoding of a long occupies.
+     *
+     * @param value The signed value
+     * @return The encoded size in bytes (between 1 and 10)
+     */
+    public static int sizeOfVarlong(long value) {
+        long remaining = (value << 1) ^ (value >> 63);
+        int size = 1;
+        // one extra byte for every 7-bit group beyond the first
+        for (; (remaining & ~0x7fL) != 0L; remaining >>>= 7)
+            size++;
+        return size;
+    }
+
+    private static IllegalArgumentException illegalVarintException(int value) { // built here, thrown by the caller
+        return new IllegalArgumentException("Varint is too long, the most significant bit in the 5th byte is set, " +
+                "converted value: " + Integer.toHexString(value));
+    }
+
+    private static IllegalArgumentException illegalVarlongException(long value) { // built here, thrown by the caller
+        return new IllegalArgumentException("Varlong is too long, most significant bit in the 10th byte is set, " +
+                "converted value: " + Long.toHexString(value));
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7354e77/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index ed5eddb..6dce978 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -16,19 +16,24 @@
  */
 package org.apache.kafka.common.utils;
 
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
+import org.apache.kafka.common.KafkaException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.Closeable;
+import java.io.EOFException;
 import java.io.File;
 import java.io.FileInputStream;
-import java.io.OutputStream;
-import java.io.UnsupportedEncodingException;
 import java.io.FileNotFoundException;
-import java.io.StringWriter;
+import java.io.IOException;
+import java.io.InputStream;
 import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
 import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.charset.Charset;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.StandardCopyOption;
@@ -41,16 +46,10 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
-import java.util.Properties;
-import java.nio.channels.FileChannel;
-import java.nio.charset.Charset;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.kafka.common.KafkaException;
 
 public class Utils {
 
@@ -103,85 +102,6 @@ public class Utils {
     }
 
     /**
-     * Read an unsigned integer from the given position without modifying the buffers position
-     *
-     * @param buffer the buffer to read from
-     * @param index the index from which to read the integer
-     * @return The integer read, as a long to avoid signedness
-     */
-    public static long readUnsignedInt(ByteBuffer buffer, int index) {
-        return buffer.getInt(index) & 0xffffffffL;
-    }
-
-    /**
-     * Read an unsigned integer stored in little-endian format from the {@link InputStream}.
-     *
-     * @param in The stream to read from
-     * @return The integer read (MUST BE TREATED WITH SPECIAL CARE TO AVOID SIGNEDNESS)
-     */
-    public static int readUnsignedIntLE(InputStream in) throws IOException {
-        return in.read()
-                | (in.read() << 8)
-                | (in.read() << 16)
-                | (in.read() << 24);
-    }
-
-    /**
-     * Read an unsigned integer stored in little-endian format from a byte array
-     * at a given offset.
-     *
-     * @param buffer The byte array to read from
-     * @param offset The position in buffer to read from
-     * @return The integer read (MUST BE TREATED WITH SPECIAL CARE TO AVOID SIGNEDNESS)
-     */
-    public static int readUnsignedIntLE(byte[] buffer, int offset) {
-        return (buffer[offset] << 0 & 0xff)
-                | ((buffer[offset + 1] & 0xff) << 8)
-                | ((buffer[offset + 2] & 0xff) << 16)
-                | ((buffer[offset + 3] & 0xff) << 24);
-    }
-
-    /**
-     * Write the given long value as a 4 byte unsigned integer. Overflow is ignored.
-     *
-     * @param buffer The buffer to write to
-     * @param index The position in the buffer at which to begin writing
-     * @param value The value to write
-     */
-    public static void writeUnsignedInt(ByteBuffer buffer, int index, long value) {
-        buffer.putInt(index, (int) (value & 0xffffffffL));
-    }
-
-    /**
-     * Write an unsigned integer in little-endian format to the {@link OutputStream}.
-     *
-     * @param out The stream to write to
-     * @param value The value to write
-     */
-    public static void writeUnsignedIntLE(OutputStream out, int value) throws IOException {
-        out.write(value);
-        out.write(value >>> 8);
-        out.write(value >>> 16);
-        out.write(value >>> 24);
-    }
-
-    /**
-     * Write an unsigned integer in little-endian format to a byte array
-     * at a given offset.
-     *
-     * @param buffer The byte array to write to
-     * @param offset The position in buffer to write to
-     * @param value The value to write
-     */
-    public static void writeUnsignedIntLE(byte[] buffer, int offset, int value) {
-        buffer[offset] = (byte) value;
-        buffer[offset + 1] = (byte) (value >>> 8);
-        buffer[offset + 2] = (byte) (value >>> 16);
-        buffer[offset + 3]   = (byte) (value >>> 24);
-    }
-
-
-    /**
      * Get the absolute value of the given number. If the number is Int.MinValue return 0. This is different from
      * java.lang.Math.abs or scala.math.abs in that they return Int.MinValue (!).
      */
@@ -824,4 +744,5 @@ public class Utils {
             currentPosition += bytesRead;
         } while (bytesRead != -1 && destinationBuffer.hasRemaining());
     }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7354e77/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 3f65caf..de527f8 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -40,7 +40,7 @@ import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.record.ByteBufferOutputStream;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.MemoryRecordsBuilder;

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7354e77/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java b/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java
index 1c14e82..136b55a 100644
--- a/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java
@@ -40,6 +40,8 @@ public class ProtocolSerializationTest {
                                  new Field("int16", Type.INT16),
                                  new Field("int32", Type.INT32),
                                  new Field("int64", Type.INT64),
+                                 new Field("varint", Type.VARINT),
+                                 new Field("varlong", Type.VARLONG),
                                  new Field("string", Type.STRING),
                                  new Field("nullable_string", Type.NULLABLE_STRING),
                                  new Field("bytes", Type.BYTES),
@@ -52,6 +54,8 @@ public class ProtocolSerializationTest {
                                              .set("int16", (short) 1)
                                              .set("int32", 1)
                                              .set("int64", 1L)
+                                             .set("varint", 300)
+                                             .set("varlong", 500L)
                                              .set("string", "1")
                                              .set("nullable_string", null)
                                              .set("bytes", ByteBuffer.wrap("1".getBytes()))
@@ -80,6 +84,10 @@ public class ProtocolSerializationTest {
         check(Type.NULLABLE_BYTES, null);
         check(Type.NULLABLE_BYTES, ByteBuffer.allocate(0));
         check(Type.NULLABLE_BYTES, ByteBuffer.wrap("abcd".getBytes()));
+        check(Type.VARINT, Integer.MAX_VALUE);
+        check(Type.VARINT, Integer.MIN_VALUE);
+        check(Type.VARLONG, Long.MAX_VALUE);
+        check(Type.VARLONG, Long.MIN_VALUE);
         check(new ArrayOf(Type.INT32), new Object[] {1, 2, 3, 4});
         check(new ArrayOf(Type.STRING), new Object[] {});
         check(new ArrayOf(Type.STRING), new Object[] {"hello", "there", "beautiful"});

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7354e77/clients/src/test/java/org/apache/kafka/common/record/CompressionTypeTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/CompressionTypeTest.java b/clients/src/test/java/org/apache/kafka/common/record/CompressionTypeTest.java
index 5a1d5f4..21cbfb7 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/CompressionTypeTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/CompressionTypeTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.common.record;
 
+import org.apache.kafka.common.utils.ByteBufferInputStream;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
 import org.junit.Test;
 
 import java.nio.ByteBuffer;

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7354e77/clients/src/test/java/org/apache/kafka/common/record/SimpleRecordTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/SimpleRecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/SimpleRecordTest.java
index 4bb90cd..aa77ca4 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/SimpleRecordTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/SimpleRecordTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.common.record;
 
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
 import org.apache.kafka.common.utils.Utils;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7354e77/clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java
new file mode 100644
index 0000000..0a082fb
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+public class ByteUtilsTest {
+    private final byte x00 = 0x00; // named byte constants keep the expected-encoding tables below readable
+    private final byte x01 = 0x01;
+    private final byte x02 = 0x02;
+    private final byte x0F = 0x0f;
+    private final byte x7E = 0x7E;
+    private final byte x7F = 0x7F;
+    private final byte xFF = (byte) 0xff;
+    private final byte x80 = (byte) 0x80;
+    private final byte x81 = (byte) 0x81;
+    private final byte xFE = (byte) 0xfe;
+
+    @Test
+    public void testReadUnsignedIntLEFromArray() {
+        byte[] array1 = {0x01, 0x02, 0x03, 0x04, 0x05};
+        assertEquals(0x04030201, ByteUtils.readUnsignedIntLE(array1, 0));
+        assertEquals(0x05040302, ByteUtils.readUnsignedIntLE(array1, 1));
+
+        byte[] array2 = {(byte) 0xf1, (byte) 0xf2, (byte) 0xf3, (byte) 0xf4, (byte) 0xf5, (byte) 0xf6};
+        assertEquals(0xf4f3f2f1, ByteUtils.readUnsignedIntLE(array2, 0));
+        assertEquals(0xf6f5f4f3, ByteUtils.readUnsignedIntLE(array2, 2));
+    }
+
+    @Test
+    public void testReadUnsignedIntLEFromInputStream() throws IOException {
+        byte[] array1 = {0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09};
+        ByteArrayInputStream is1 = new ByteArrayInputStream(array1);
+        assertEquals(0x04030201, ByteUtils.readUnsignedIntLE(is1));
+        assertEquals(0x08070605, ByteUtils.readUnsignedIntLE(is1));
+
+        byte[] array2 = {(byte) 0xf1, (byte) 0xf2, (byte) 0xf3, (byte) 0xf4, (byte) 0xf5, (byte) 0xf6, (byte) 0xf7, (byte) 0xf8};
+        ByteArrayInputStream is2 = new ByteArrayInputStream(array2);
+        assertEquals(0xf4f3f2f1, ByteUtils.readUnsignedIntLE(is2));
+        assertEquals(0xf8f7f6f5, ByteUtils.readUnsignedIntLE(is2));
+    }
+
+    @Test
+    public void testWriteUnsignedIntLEToArray() {
+        int value1 = 0x04030201;
+
+        byte[] array1 = new byte[4];
+        ByteUtils.writeUnsignedIntLE(array1, 0, value1);
+        assertArrayEquals(new byte[] {0x01, 0x02, 0x03, 0x04}, array1);
+
+        array1 = new byte[8];
+        ByteUtils.writeUnsignedIntLE(array1, 2, value1);
+        assertArrayEquals(new byte[] {0, 0, 0x01, 0x02, 0x03, 0x04, 0, 0}, array1);
+
+        int value2 = 0xf4f3f2f1;
+
+        byte[] array2 = new byte[4];
+        ByteUtils.writeUnsignedIntLE(array2, 0, value2);
+        assertArrayEquals(new byte[] {(byte) 0xf1, (byte) 0xf2, (byte) 0xf3, (byte) 0xf4}, array2);
+
+        array2 = new byte[8];
+        ByteUtils.writeUnsignedIntLE(array2, 2, value2);
+        assertArrayEquals(new byte[] {0, 0, (byte) 0xf1, (byte) 0xf2, (byte) 0xf3, (byte) 0xf4, 0, 0}, array2);
+    }
+
+    @Test
+    public void testWriteUnsignedIntLEToOutputStream() throws IOException {
+        int value1 = 0x04030201;
+        ByteArrayOutputStream os1 = new ByteArrayOutputStream();
+        ByteUtils.writeUnsignedIntLE(os1, value1);
+        ByteUtils.writeUnsignedIntLE(os1, value1);
+        assertArrayEquals(new byte[] {0x01, 0x02, 0x03, 0x04, 0x01, 0x02, 0x03, 0x04}, os1.toByteArray());
+
+        int value2 = 0xf4f3f2f1;
+        ByteArrayOutputStream os2 = new ByteArrayOutputStream();
+        ByteUtils.writeUnsignedIntLE(os2, value2);
+        assertArrayEquals(new byte[] {(byte) 0xf1, (byte) 0xf2, (byte) 0xf3, (byte) 0xf4}, os2.toByteArray());
+    }
+
+    @Test
+    public void testVarintSerde() throws Exception {
+        assertVarintSerde(0, new byte[] {x00}); // pairs below sit on either side of each encoded-length boundary
+        assertVarintSerde(-1, new byte[] {x01});
+        assertVarintSerde(1, new byte[] {x02});
+        assertVarintSerde(63, new byte[] {x7E});
+        assertVarintSerde(-64, new byte[] {x7F});
+        assertVarintSerde(64, new byte[] {x80, x01});
+        assertVarintSerde(-65, new byte[] {x81, x01});
+        assertVarintSerde(8191, new byte[] {xFE, x7F});
+        assertVarintSerde(-8192, new byte[] {xFF, x7F});
+        assertVarintSerde(8192, new byte[] {x80, x80, x01});
+        assertVarintSerde(-8193, new byte[] {x81, x80, x01});
+        assertVarintSerde(1048575, new byte[] {xFE, xFF, x7F});
+        assertVarintSerde(-1048576, new byte[] {xFF, xFF, x7F});
+        assertVarintSerde(1048576, new byte[] {x80, x80, x80, x01});
+        assertVarintSerde(-1048577, new byte[] {x81, x80, x80, x01});
+        assertVarintSerde(134217727, new byte[] {xFE, xFF, xFF, x7F});
+        assertVarintSerde(-134217728, new byte[] {xFF, xFF, xFF, x7F});
+        assertVarintSerde(134217728, new byte[] {x80, x80, x80, x80, x01});
+        assertVarintSerde(-134217729, new byte[] {x81, x80, x80, x80, x01});
+        assertVarintSerde(Integer.MAX_VALUE, new byte[] {xFE, xFF, xFF, xFF, x0F});
+        assertVarintSerde(Integer.MIN_VALUE, new byte[] {xFF, xFF, xFF, xFF, x0F});
+    }
+
+    @Test
+    public void testVarlongSerde() throws Exception {
+        assertVarlongSerde(0, new byte[] {x00}); // pairs below sit on either side of each encoded-length boundary
+        assertVarlongSerde(-1, new byte[] {x01});
+        assertVarlongSerde(1, new byte[] {x02});
+        assertVarlongSerde(63, new byte[] {x7E});
+        assertVarlongSerde(-64, new byte[] {x7F});
+        assertVarlongSerde(64, new byte[] {x80, x01});
+        assertVarlongSerde(-65, new byte[] {x81, x01});
+        assertVarlongSerde(8191, new byte[] {xFE, x7F});
+        assertVarlongSerde(-8192, new byte[] {xFF, x7F});
+        assertVarlongSerde(8192, new byte[] {x80, x80, x01});
+        assertVarlongSerde(-8193, new byte[] {x81, x80, x01});
+        assertVarlongSerde(1048575, new byte[] {xFE, xFF, x7F});
+        assertVarlongSerde(-1048576, new byte[] {xFF, xFF, x7F});
+        assertVarlongSerde(1048576, new byte[] {x80, x80, x80, x01});
+        assertVarlongSerde(-1048577, new byte[] {x81, x80, x80, x01});
+        assertVarlongSerde(134217727, new byte[] {xFE, xFF, xFF, x7F});
+        assertVarlongSerde(-134217728, new byte[] {xFF, xFF, xFF, x7F});
+        assertVarlongSerde(134217728, new byte[] {x80, x80, x80, x80, x01});
+        assertVarlongSerde(-134217729, new byte[] {x81, x80, x80, x80, x01});
+        assertVarlongSerde(Integer.MAX_VALUE, new byte[] {xFE, xFF, xFF, xFF, x0F});
+        assertVarlongSerde(Integer.MIN_VALUE, new byte[] {xFF, xFF, xFF, xFF, x0F});
+        assertVarlongSerde(17179869183L, new byte[] {xFE, xFF, xFF, xFF, x7F});
+        assertVarlongSerde(-17179869184L, new byte[] {xFF, xFF, xFF, xFF, x7F});
+        assertVarlongSerde(17179869184L, new byte[] {x80, x80, x80, x80, x80, x01});
+        assertVarlongSerde(-17179869185L, new byte[] {x81, x80, x80, x80, x80, x01});
+        assertVarlongSerde(2199023255551L, new byte[] {xFE, xFF, xFF, xFF, xFF, x7F});
+        assertVarlongSerde(-2199023255552L, new byte[] {xFF, xFF, xFF, xFF, xFF, x7F});
+        assertVarlongSerde(2199023255552L, new byte[] {x80, x80, x80, x80, x80, x80, x01});
+        assertVarlongSerde(-2199023255553L, new byte[] {x81, x80, x80, x80, x80, x80, x01});
+        assertVarlongSerde(281474976710655L, new byte[] {xFE, xFF, xFF, xFF, xFF, xFF, x7F});
+        assertVarlongSerde(-281474976710656L, new byte[] {xFF, xFF, xFF, xFF, xFF, xFF, x7F});
+        assertVarlongSerde(281474976710656L, new byte[] {x80, x80, x80, x80, x80, x80, x80, x01});
+        assertVarlongSerde(-281474976710657L, new byte[] {x81, x80, x80, x80, x80, x80, x80, x01});
+        assertVarlongSerde(36028797018963967L, new byte[] {xFE, xFF, xFF, xFF, xFF, xFF, xFF, x7F});
+        assertVarlongSerde(-36028797018963968L, new byte[] {xFF, xFF, xFF, xFF, xFF, xFF, xFF, x7F});
+        assertVarlongSerde(36028797018963968L, new byte[] {x80, x80, x80, x80, x80, x80, x80, x80, x01});
+        assertVarlongSerde(-36028797018963969L, new byte[] {x81, x80, x80, x80, x80, x80, x80, x80, x01});
+        assertVarlongSerde(4611686018427387903L, new byte[] {xFE, xFF, xFF, xFF, xFF, xFF, xFF, xFF, x7F});
+        assertVarlongSerde(-4611686018427387904L, new byte[] {xFF, xFF, xFF, xFF, xFF, xFF, xFF, xFF, x7F});
+        assertVarlongSerde(4611686018427387904L, new byte[] {x80, x80, x80, x80, x80, x80, x80, x80, x80, x01});
+        assertVarlongSerde(-4611686018427387905L, new byte[] {x81, x80, x80, x80, x80, x80, x80, x80, x80, x01});
+        assertVarlongSerde(Long.MAX_VALUE, new byte[] {xFE, xFF, xFF, xFF, xFF, xFF, xFF, xFF, xFF, x01});
+        assertVarlongSerde(Long.MIN_VALUE, new byte[] {xFF, xFF, xFF, xFF, xFF, xFF, xFF, xFF, xFF, x01});
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testInvalidVarint() {
+        // six bytes: the continuation bit is still set on the 5th byte, which readVarint must reject
+        ByteBuffer buf = ByteBuffer.wrap(new byte[] {xFF, xFF, xFF, xFF, xFF, x01});
+        ByteUtils.readVarint(buf);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testInvalidVarlong() {
+        // eleven bytes: the continuation bit is still set on the 10th byte, which readVarlong must reject
+        ByteBuffer buf = ByteBuffer.wrap(new byte[] {xFF, xFF, xFF, xFF, xFF, xFF, xFF, xFF, xFF, xFF, x01});
+        ByteUtils.readVarlong(buf);
+    }
+
+    private void assertVarintSerde(int value, byte[] expectedEncoding) throws IOException {
+        ByteBuffer buf = ByteBuffer.allocate(32); // first pass: the ByteBuffer overloads
+        ByteUtils.writeVarint(value, buf);
+        buf.flip();
+        assertArrayEquals(expectedEncoding, Utils.toArray(buf));
+        assertEquals(value, ByteUtils.readVarint(buf.duplicate()));
+
+        buf.rewind(); // second pass: the DataOutput/DataInput overloads, reusing the same buffer
+        DataOutputStream out = new DataOutputStream(new ByteBufferOutputStream(buf));
+        ByteUtils.writeVarint(value, out);
+        buf.flip();
+        assertArrayEquals(expectedEncoding, Utils.toArray(buf));
+        DataInputStream in = new DataInputStream(new ByteBufferInputStream(buf));
+        assertEquals(value, ByteUtils.readVarint(in));
+    }
+
+    private void assertVarlongSerde(long value, byte[] expectedEncoding) throws IOException {
+        ByteBuffer buf = ByteBuffer.allocate(32); // first pass: the ByteBuffer overloads
+        ByteUtils.writeVarlong(value, buf);
+        buf.flip();
+        assertEquals(value, ByteUtils.readVarlong(buf.duplicate()));
+        assertArrayEquals(expectedEncoding, Utils.toArray(buf));
+
+        buf.rewind(); // second pass: the DataOutput/DataInput overloads, reusing the same buffer
+        DataOutputStream out = new DataOutputStream(new ByteBufferOutputStream(buf));
+        ByteUtils.writeVarlong(value, out);
+        buf.flip();
+        assertArrayEquals(expectedEncoding, Utils.toArray(buf));
+        DataInputStream in = new DataInputStream(new ByteBufferInputStream(buf));
+        assertEquals(value, ByteUtils.readVarlong(in));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7354e77/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
index 5f36c1c..7672335 100755
--- a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
@@ -16,28 +16,24 @@
  */
 package org.apache.kafka.common.utils;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
+import org.apache.kafka.test.TestUtils;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.junit.Test;
+
+import java.io.Closeable;
 import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.file.StandardOpenOption;
 import java.util.Arrays;
 import java.util.Collections;
-import java.io.Closeable;
-import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.Random;
 
-import org.apache.kafka.test.TestUtils;
-import org.easymock.EasyMock;
-import org.easymock.IAnswer;
-import org.junit.Test;
-
-
 import static org.apache.kafka.common.utils.Utils.formatAddress;
 import static org.apache.kafka.common.utils.Utils.getHost;
 import static org.apache.kafka.common.utils.Utils.getPort;
-import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -337,64 +333,4 @@ public class UtilsTest {
         }
     }
 
-    @Test
-    public void testReadUnsignedIntLEFromArray() {
-        byte[] array1 = {0x01, 0x02, 0x03, 0x04, 0x05};
-        assertEquals(0x04030201, Utils.readUnsignedIntLE(array1, 0));
-        assertEquals(0x05040302, Utils.readUnsignedIntLE(array1, 1));
-
-        byte[] array2 = {(byte) 0xf1, (byte) 0xf2, (byte) 0xf3, (byte) 0xf4, (byte) 0xf5, (byte) 0xf6};
-        assertEquals(0xf4f3f2f1, Utils.readUnsignedIntLE(array2, 0));
-        assertEquals(0xf6f5f4f3, Utils.readUnsignedIntLE(array2, 2));
-    }
-
-    @Test
-    public void testReadUnsignedIntLEFromInputStream() throws IOException {
-        byte[] array1 = {0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09};
-        ByteArrayInputStream is1 = new ByteArrayInputStream(array1);
-        assertEquals(0x04030201, Utils.readUnsignedIntLE(is1));
-        assertEquals(0x08070605, Utils.readUnsignedIntLE(is1));
-
-        byte[] array2 = {(byte) 0xf1, (byte) 0xf2, (byte) 0xf3, (byte) 0xf4, (byte) 0xf5, (byte) 0xf6, (byte) 0xf7, (byte) 0xf8};
-        ByteArrayInputStream is2 = new ByteArrayInputStream(array2);
-        assertEquals(0xf4f3f2f1, Utils.readUnsignedIntLE(is2));
-        assertEquals(0xf8f7f6f5, Utils.readUnsignedIntLE(is2));
-    }
-
-    @Test
-    public void testWriteUnsignedIntLEToArray() {
-        int value1 = 0x04030201;
-
-        byte[] array1 = new byte[4];
-        Utils.writeUnsignedIntLE(array1, 0, value1);
-        assertArrayEquals(new byte[] {0x01, 0x02, 0x03, 0x04}, array1);
-
-        array1 = new byte[8];
-        Utils.writeUnsignedIntLE(array1, 2, value1);
-        assertArrayEquals(new byte[] {0, 0, 0x01, 0x02, 0x03, 0x04, 0, 0}, array1);
-
-        int value2 = 0xf4f3f2f1;
-
-        byte[] array2 = new byte[4];
-        Utils.writeUnsignedIntLE(array2, 0, value2);
-        assertArrayEquals(new byte[] {(byte) 0xf1, (byte) 0xf2, (byte) 0xf3, (byte) 0xf4}, array2);
-
-        array2 = new byte[8];
-        Utils.writeUnsignedIntLE(array2, 2, value2);
-        assertArrayEquals(new byte[] {0, 0, (byte) 0xf1, (byte) 0xf2, (byte) 0xf3, (byte) 0xf4, 0, 0}, array2);
-    }
-
-    @Test
-    public void testWriteUnsignedIntLEToOutputStream() throws IOException {
-        int value1 = 0x04030201;
-        ByteArrayOutputStream os1 = new ByteArrayOutputStream();
-        Utils.writeUnsignedIntLE(os1, value1);
-        Utils.writeUnsignedIntLE(os1, value1);
-        assertArrayEquals(new byte[] {0x01, 0x02, 0x03, 0x04, 0x01, 0x02, 0x03, 0x04}, os1.toByteArray());
-
-        int value2 = 0xf4f3f2f1;
-        ByteArrayOutputStream os2 = new ByteArrayOutputStream();
-        Utils.writeUnsignedIntLE(os2, value2);
-        assertArrayEquals(new byte[] {(byte) 0xf1, (byte) 0xf2, (byte) 0xf3, (byte) 0xf4}, os2.toByteArray());
-    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7354e77/core/src/main/scala/kafka/message/Message.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/Message.scala b/core/src/main/scala/kafka/message/Message.scala
index e0efb3d..474f934 100755
--- a/core/src/main/scala/kafka/message/Message.scala
+++ b/core/src/main/scala/kafka/message/Message.scala
@@ -23,7 +23,8 @@ import org.apache.kafka.common.record.{Record, TimestampType}
 
 import scala.math._
 import kafka.utils._
-import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.utils.ByteUtils.{readUnsignedInt, writeUnsignedInt}
+import org.apache.kafka.common.utils.{ByteUtils, Utils}
 
 /**
  * Constants related to messages
@@ -201,7 +202,7 @@ class Message(val buffer: ByteBuffer,
     buffer.rewind()
 
     // now compute the checksum and fill it in
-    Utils.writeUnsignedInt(buffer, CrcOffset, computeChecksum)
+    ByteUtils.writeUnsignedInt(buffer, CrcOffset, computeChecksum)
   }
   
   def this(bytes: Array[Byte], key: Array[Byte], timestamp: Long, codec: CompressionCodec, magicValue: Byte) =
@@ -228,7 +229,7 @@ class Message(val buffer: ByteBuffer,
   /**
    * Retrieve the previously computed CRC for this message
    */
-  def checksum: Long = Utils.readUnsignedInt(buffer, CrcOffset)
+  def checksum: Long = ByteUtils.readUnsignedInt(buffer, CrcOffset)
   
     /**
    * Returns true if the crc stored with the message matches the crc computed off the message contents

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7354e77/core/src/test/scala/unit/kafka/message/MessageTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/message/MessageTest.scala b/core/src/test/scala/unit/kafka/message/MessageTest.scala
index 46c25af..75a86d2 100755
--- a/core/src/test/scala/unit/kafka/message/MessageTest.scala
+++ b/core/src/test/scala/unit/kafka/message/MessageTest.scala
@@ -19,6 +19,7 @@ package kafka.message
 
 import java.nio._
 import java.util.HashMap
+
 import org.apache.kafka.common.protocol.Errors
 
 import scala.collection._
@@ -26,7 +27,7 @@ import org.junit.Assert._
 import org.scalatest.junit.JUnitSuite
 import org.junit.{Before, Test}
 import kafka.utils.TestUtils
-import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.utils.ByteUtils
 
 case class MessageTestVal(key: Array[Byte],
                           payload: Array[Byte],
@@ -89,7 +90,7 @@ class MessageTest extends JUnitSuite {
       assertTrue("Auto-computed checksum should be valid", v.message.isValid)
       // garble checksum
       val badChecksum: Int = (v.message.checksum + 1 % Int.MaxValue).toInt
-      Utils.writeUnsignedInt(v.message.buffer, Message.CrcOffset, badChecksum)
+      ByteUtils.writeUnsignedInt(v.message.buffer, Message.CrcOffset, badChecksum)
       assertFalse("Message with invalid checksum should be invalid", v.message.isValid)
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7354e77/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
index f279c85..a5d8102 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
@@ -23,7 +23,7 @@ import kafka.server.AbstractFetcherThread.{FetchRequest, PartitionData}
 import kafka.utils.TestUtils
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.utils.ByteUtils
 import org.apache.kafka.common.record.{MemoryRecords, Record}
 import org.junit.Assert.{assertFalse, assertTrue}
 import org.junit.{Before, Test}
@@ -184,7 +184,7 @@ class AbstractFetcherThreadTest {
         val corruptedRecord = Record.create("hello".getBytes())
         val badChecksum = (corruptedRecord.checksum + 1 % Int.MaxValue).toInt
         // Garble checksum
-        Utils.writeUnsignedInt(corruptedRecord.buffer, Record.CRC_OFFSET, badChecksum)
+        ByteUtils.writeUnsignedInt(corruptedRecord.buffer, Record.CRC_OFFSET, badChecksum)
         val records = MemoryRecords.withRecords(corruptedRecord)
         fetchRequest.offsets.mapValues(_ => new TestPartitionData(records)).toSeq
       } else

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7354e77/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
index f35de40..77fb58a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
-import org.apache.kafka.common.record.ByteBufferInputStream;
+import org.apache.kafka.common.utils.ByteBufferInputStream;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.streams.errors.TaskAssignmentException;
 import org.apache.kafka.streams.processor.TaskId;