You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/03/08 22:26:28 UTC
kafka git commit: MINOR: Add varint serde utilities for new message
format
Repository: kafka
Updated Branches:
refs/heads/trunk 29084a9b2 -> f7354e779
MINOR: Add varint serde utilities for new message format
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Ismael Juma <is...@juma.me.uk>
Closes #2647 from hachikuji/add-varint-serdes
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f7354e77
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f7354e77
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f7354e77
Branch: refs/heads/trunk
Commit: f7354e779cb058985dd385b2b1209db71eb7a5e8
Parents: 29084a9
Author: Jason Gustafson <ja...@confluent.io>
Authored: Wed Mar 8 22:25:58 2017 +0000
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Wed Mar 8 22:25:58 2017 +0000
----------------------------------------------------------------------
.../kafka/common/protocol/types/Type.java | 57 ++++
.../common/record/ByteBufferInputStream.java | 48 ---
.../common/record/ByteBufferLogInputStream.java | 4 +-
.../common/record/ByteBufferOutputStream.java | 58 ----
.../kafka/common/record/CompressionType.java | 2 +
.../common/record/KafkaLZ4BlockInputStream.java | 10 +-
.../record/KafkaLZ4BlockOutputStream.java | 10 +-
.../common/record/MemoryRecordsBuilder.java | 1 +
.../org/apache/kafka/common/record/Record.java | 6 +-
.../kafka/common/record/RecordsIterator.java | 1 +
.../common/utils/ByteBufferInputStream.java | 48 +++
.../common/utils/ByteBufferOutputStream.java | 58 ++++
.../apache/kafka/common/utils/ByteUtils.java | 324 +++++++++++++++++++
.../org/apache/kafka/common/utils/Utils.java | 105 +-----
.../clients/consumer/internals/FetcherTest.java | 2 +-
.../types/ProtocolSerializationTest.java | 8 +
.../common/record/CompressionTypeTest.java | 2 +
.../kafka/common/record/SimpleRecordTest.java | 1 +
.../kafka/common/utils/ByteUtilsTest.java | 222 +++++++++++++
.../apache/kafka/common/utils/UtilsTest.java | 80 +----
core/src/main/scala/kafka/message/Message.scala | 7 +-
.../scala/unit/kafka/message/MessageTest.scala | 5 +-
.../server/AbstractFetcherThreadTest.scala | 4 +-
.../internals/assignment/AssignmentInfo.java | 2 +-
24 files changed, 772 insertions(+), 293 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7354e77/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
index 3341f3e..39e46fd 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.utils.ByteUtils;
import org.apache.kafka.common.utils.Utils;
/**
@@ -473,4 +474,60 @@ public abstract class Type {
}
};
+ public static final Type VARINT = new Type() {
+ @Override
+ public void write(ByteBuffer buffer, Object o) {
+ ByteUtils.writeVarint((Integer) o, buffer);
+ }
+
+ @Override
+ public Integer read(ByteBuffer buffer) {
+ return ByteUtils.readVarint(buffer);
+ }
+
+ @Override
+ public Integer validate(Object item) {
+ if (item instanceof Integer)
+ return (Integer) item;
+ throw new SchemaException(item + " is not an integer");
+ }
+
+ public String toString() {
+ return "VARINT";
+ }
+
+ @Override
+ public int sizeOf(Object o) {
+ return ByteUtils.sizeOfVarint((Integer) o);
+ }
+ };
+
+ public static final Type VARLONG = new Type() {
+ @Override
+ public void write(ByteBuffer buffer, Object o) {
+ ByteUtils.writeVarlong((Long) o, buffer);
+ }
+
+ @Override
+ public Long read(ByteBuffer buffer) {
+ return ByteUtils.readVarlong(buffer);
+ }
+
+ @Override
+ public Long validate(Object item) {
+ if (item instanceof Long)
+ return (Long) item;
+ throw new SchemaException(item + " is not a long");
+ }
+
+ public String toString() {
+ return "VARLONG";
+ }
+
+ @Override
+ public int sizeOf(Object o) {
+ return ByteUtils.sizeOfVarlong((Long) o);
+ }
+ };
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7354e77/clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java
deleted file mode 100644
index c033b6c..0000000
--- a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.record;
-
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-
-/**
- * A byte buffer backed input inputStream
- */
-public final class ByteBufferInputStream extends InputStream {
- private final ByteBuffer buffer;
-
- public ByteBufferInputStream(ByteBuffer buffer) {
- this.buffer = buffer;
- }
-
- public int read() {
- if (!buffer.hasRemaining()) {
- return -1;
- }
- return buffer.get() & 0xFF;
- }
-
- public int read(byte[] bytes, int off, int len) {
- if (!buffer.hasRemaining()) {
- return -1;
- }
-
- len = Math.min(len, buffer.remaining());
- buffer.get(bytes, off, len);
- return len;
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7354e77/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java
index bdda998..f4a3da4 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java
@@ -17,7 +17,7 @@
package org.apache.kafka.common.record;
import org.apache.kafka.common.errors.CorruptRecordException;
-import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.common.utils.ByteUtils;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -106,7 +106,7 @@ class ByteBufferLogInputStream implements LogInputStream<ByteBufferLogInputStrea
buffer.put(LOG_OVERHEAD + Record.ATTRIBUTES_OFFSET, timestampType.updateAttributes(attributes));
buffer.putLong(LOG_OVERHEAD + Record.TIMESTAMP_OFFSET, timestamp);
long crc = record.computeChecksum();
- Utils.writeUnsignedInt(buffer, LOG_OVERHEAD + Record.CRC_OFFSET, crc);
+ ByteUtils.writeUnsignedInt(buffer, LOG_OVERHEAD + Record.CRC_OFFSET, crc);
}
public ByteBuffer buffer() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7354e77/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java
deleted file mode 100644
index 4eee605..0000000
--- a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.record;
-
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-
-/**
- * A byte buffer backed output outputStream
- */
-public class ByteBufferOutputStream extends OutputStream {
-
- private static final float REALLOCATION_FACTOR = 1.1f;
-
- private ByteBuffer buffer;
-
- public ByteBufferOutputStream(ByteBuffer buffer) {
- this.buffer = buffer;
- }
-
- public void write(int b) {
- if (buffer.remaining() < 1)
- expandBuffer(buffer.capacity() + 1);
- buffer.put((byte) b);
- }
-
- public void write(byte[] bytes, int off, int len) {
- if (buffer.remaining() < len)
- expandBuffer(buffer.capacity() + len);
- buffer.put(bytes, off, len);
- }
-
- public ByteBuffer buffer() {
- return buffer;
- }
-
- private void expandBuffer(int size) {
- int expandSize = Math.max((int) (buffer.capacity() * REALLOCATION_FACTOR), size);
- ByteBuffer temp = ByteBuffer.allocate(expandSize);
- temp.put(buffer.array(), buffer.arrayOffset(), buffer.position());
- buffer = temp;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7354e77/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
index 658e50c..d88c530 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
@@ -17,6 +17,8 @@
package org.apache.kafka.common.record;
import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.utils.ByteBufferInputStream;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7354e77/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java
index 6544d13..a53690c 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java
@@ -26,7 +26,7 @@ import java.io.InputStream;
import org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.BD;
import org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.FLG;
-import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.common.utils.ByteUtils;
import net.jpountz.lz4.LZ4Exception;
import net.jpountz.lz4.LZ4Factory;
@@ -112,7 +112,7 @@ public final class KafkaLZ4BlockInputStream extends FilterInputStream {
throw new IOException(PREMATURE_EOS);
}
- if (MAGIC != Utils.readUnsignedIntLE(header, headerOffset - 6)) {
+ if (MAGIC != ByteUtils.readUnsignedIntLE(header, headerOffset - 6)) {
throw new IOException(NOT_SUPPORTED);
}
flg = FLG.fromByte(header[headerOffset - 2]);
@@ -145,13 +145,13 @@ public final class KafkaLZ4BlockInputStream extends FilterInputStream {
* @throws IOException
*/
private void readBlock() throws IOException {
- int blockSize = Utils.readUnsignedIntLE(in);
+ int blockSize = ByteUtils.readUnsignedIntLE(in);
// Check for EndMark
if (blockSize == 0) {
finished = true;
if (flg.isContentChecksumSet())
- Utils.readUnsignedIntLE(in); // TODO: verify this content checksum
+ ByteUtils.readUnsignedIntLE(in); // TODO: verify this content checksum
return;
} else if (blockSize > maxBlockSize) {
throw new IOException(String.format("Block size %s exceeded max: %s", blockSize, maxBlockSize));
@@ -172,7 +172,7 @@ public final class KafkaLZ4BlockInputStream extends FilterInputStream {
}
// verify checksum
- if (flg.isBlockChecksumSet() && Utils.readUnsignedIntLE(in) != checksum.hash(bufferToRead, 0, blockSize, 0)) {
+ if (flg.isBlockChecksumSet() && ByteUtils.readUnsignedIntLE(in) != checksum.hash(bufferToRead, 0, blockSize, 0)) {
throw new IOException(BLOCK_HASH_MISMATCH);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7354e77/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java
index 0a64d43..034b945 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java
@@ -20,7 +20,7 @@ import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
-import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.common.utils.ByteUtils;
import net.jpountz.lz4.LZ4Compressor;
import net.jpountz.lz4.LZ4Factory;
@@ -138,7 +138,7 @@ public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
* @throws IOException
*/
private void writeHeader() throws IOException {
- Utils.writeUnsignedIntLE(buffer, 0, MAGIC);
+ ByteUtils.writeUnsignedIntLE(buffer, 0, MAGIC);
bufferOffset = 4;
buffer[bufferOffset++] = flg.toByte();
buffer[bufferOffset++] = bd.toByte();
@@ -182,13 +182,13 @@ public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
}
// Write content
- Utils.writeUnsignedIntLE(out, compressedLength | compressMethod);
+ ByteUtils.writeUnsignedIntLE(out, compressedLength | compressMethod);
out.write(bufferToWrite, 0, compressedLength);
// Calculate and write block checksum
if (flg.isBlockChecksumSet()) {
int hash = checksum.hash(bufferToWrite, 0, compressedLength, 0);
- Utils.writeUnsignedIntLE(out, hash);
+ ByteUtils.writeUnsignedIntLE(out, hash);
}
bufferOffset = 0;
}
@@ -200,7 +200,7 @@ public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
* @throws IOException
*/
private void writeEndMark() throws IOException {
- Utils.writeUnsignedIntLE(out, 0);
+ ByteUtils.writeUnsignedIntLE(out, 0);
// TODO implement content checksum, update flg.validate()
finished = true;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7354e77/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index 32ddcee..f3cf43c 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -17,6 +17,7 @@
package org.apache.kafka.common.record;
import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7354e77/clients/src/main/java/org/apache/kafka/common/record/Record.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Record.java b/clients/src/main/java/org/apache/kafka/common/record/Record.java
index a54d65d..9932238 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Record.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Record.java
@@ -17,6 +17,8 @@
package org.apache.kafka.common.record;
import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.apache.kafka.common.utils.ByteUtils;
import org.apache.kafka.common.utils.Crc32;
import org.apache.kafka.common.utils.Utils;
@@ -118,7 +120,7 @@ public final class Record {
* Retrieve the previously computed CRC for this record
*/
public long checksum() {
- return Utils.readUnsignedInt(buffer, CRC_OFFSET);
+ return ByteUtils.readUnsignedInt(buffer, CRC_OFFSET);
}
/**
@@ -468,7 +470,7 @@ public final class Record {
// compute and fill the crc from the beginning of the message
long crc = Utils.computeChecksum(buffer, recordPosition + MAGIC_OFFSET, recordSize - MAGIC_OFFSET);
- Utils.writeUnsignedInt(buffer, recordPosition + CRC_OFFSET, crc);
+ ByteUtils.writeUnsignedInt(buffer, recordPosition + CRC_OFFSET, crc);
}
private static void write(ByteBuffer buffer,
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7354e77/clients/src/main/java/org/apache/kafka/common/record/RecordsIterator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordsIterator.java b/clients/src/main/java/org/apache/kafka/common/record/RecordsIterator.java
index b6f049c..710ce3b 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/RecordsIterator.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/RecordsIterator.java
@@ -19,6 +19,7 @@ package org.apache.kafka.common.record;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.CorruptRecordException;
import org.apache.kafka.common.utils.AbstractIterator;
+import org.apache.kafka.common.utils.ByteBufferInputStream;
import org.apache.kafka.common.utils.Utils;
import java.io.DataInputStream;
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7354e77/clients/src/main/java/org/apache/kafka/common/utils/ByteBufferInputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/ByteBufferInputStream.java b/clients/src/main/java/org/apache/kafka/common/utils/ByteBufferInputStream.java
new file mode 100644
index 0000000..094a1a7
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/utils/ByteBufferInputStream.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * A byte buffer backed input inputStream
+ */
+public final class ByteBufferInputStream extends InputStream {
+ private final ByteBuffer buffer;
+
+ public ByteBufferInputStream(ByteBuffer buffer) {
+ this.buffer = buffer;
+ }
+
+ public int read() {
+ if (!buffer.hasRemaining()) {
+ return -1;
+ }
+ return buffer.get() & 0xFF;
+ }
+
+ public int read(byte[] bytes, int off, int len) {
+ if (!buffer.hasRemaining()) {
+ return -1;
+ }
+
+ len = Math.min(len, buffer.remaining());
+ buffer.get(bytes, off, len);
+ return len;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7354e77/clients/src/main/java/org/apache/kafka/common/utils/ByteBufferOutputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/ByteBufferOutputStream.java b/clients/src/main/java/org/apache/kafka/common/utils/ByteBufferOutputStream.java
new file mode 100644
index 0000000..9480c6d
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/utils/ByteBufferOutputStream.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * A byte buffer backed output outputStream
+ */
+public class ByteBufferOutputStream extends OutputStream {
+
+ private static final float REALLOCATION_FACTOR = 1.1f;
+
+ private ByteBuffer buffer;
+
+ public ByteBufferOutputStream(ByteBuffer buffer) {
+ this.buffer = buffer;
+ }
+
+ public void write(int b) {
+ if (buffer.remaining() < 1)
+ expandBuffer(buffer.capacity() + 1);
+ buffer.put((byte) b);
+ }
+
+ public void write(byte[] bytes, int off, int len) {
+ if (buffer.remaining() < len)
+ expandBuffer(buffer.capacity() + len);
+ buffer.put(bytes, off, len);
+ }
+
+ public ByteBuffer buffer() {
+ return buffer;
+ }
+
+ private void expandBuffer(int size) {
+ int expandSize = Math.max((int) (buffer.capacity() * REALLOCATION_FACTOR), size);
+ ByteBuffer temp = ByteBuffer.allocate(expandSize);
+ temp.put(buffer.array(), buffer.arrayOffset(), buffer.position());
+ buffer = temp;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7354e77/clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java b/clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java
new file mode 100644
index 0000000..50c90a8
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * This classes exposes low-level methods for reading/writing from byte streams or buffers.
+ */
+public final class ByteUtils {
+
+ private ByteUtils() {}
+
+ /**
+ * Read an unsigned integer from the given position without modifying the buffers position
+ *
+ * @param buffer the buffer to read from
+ * @param index the index from which to read the integer
+ * @return The integer read, as a long to avoid signedness
+ */
+ public static long readUnsignedInt(ByteBuffer buffer, int index) {
+ return buffer.getInt(index) & 0xffffffffL;
+ }
+
+ /**
+ * Read an unsigned integer stored in little-endian format from the {@link InputStream}.
+ *
+ * @param in The stream to read from
+ * @return The integer read (MUST BE TREATED WITH SPECIAL CARE TO AVOID SIGNEDNESS)
+ */
+ public static int readUnsignedIntLE(InputStream in) throws IOException {
+ return in.read()
+ | (in.read() << 8)
+ | (in.read() << 16)
+ | (in.read() << 24);
+ }
+
+ /**
+ * Read an unsigned integer stored in little-endian format from a byte array
+ * at a given offset.
+ *
+ * @param buffer The byte array to read from
+ * @param offset The position in buffer to read from
+ * @return The integer read (MUST BE TREATED WITH SPECIAL CARE TO AVOID SIGNEDNESS)
+ */
+ public static int readUnsignedIntLE(byte[] buffer, int offset) {
+ return (buffer[offset] << 0 & 0xff)
+ | ((buffer[offset + 1] & 0xff) << 8)
+ | ((buffer[offset + 2] & 0xff) << 16)
+ | ((buffer[offset + 3] & 0xff) << 24);
+ }
+
+ /**
+ * Write the given long value as a 4 byte unsigned integer. Overflow is ignored.
+ *
+ * @param buffer The buffer to write to
+ * @param index The position in the buffer at which to begin writing
+ * @param value The value to write
+ */
+ public static void writeUnsignedInt(ByteBuffer buffer, int index, long value) {
+ buffer.putInt(index, (int) (value & 0xffffffffL));
+ }
+
+ /**
+ * Write the given long value as a 4 byte unsigned integer. Overflow is ignored.
+ *
+ * @param buffer The buffer to write to
+ * @param value The value to write
+ */
+ public static void writeUnsignedInt(ByteBuffer buffer, long value) {
+ buffer.putInt((int) (value & 0xffffffffL));
+ }
+
+ /**
+ * Write an unsigned integer in little-endian format to the {@link OutputStream}.
+ *
+ * @param out The stream to write to
+ * @param value The value to write
+ */
+ public static void writeUnsignedIntLE(OutputStream out, int value) throws IOException {
+ out.write(value);
+ out.write(value >>> 8);
+ out.write(value >>> 16);
+ out.write(value >>> 24);
+ }
+
+ /**
+ * Write an unsigned integer in little-endian format to a byte array
+ * at a given offset.
+ *
+ * @param buffer The byte array to write to
+ * @param offset The position in buffer to write to
+ * @param value The value to write
+ */
+ public static void writeUnsignedIntLE(byte[] buffer, int offset, int value) {
+ buffer[offset] = (byte) value;
+ buffer[offset + 1] = (byte) (value >>> 8);
+ buffer[offset + 2] = (byte) (value >>> 16);
+ buffer[offset + 3] = (byte) (value >>> 24);
+ }
+
+ /**
+ * Read an integer stored in variable-length format using zig-zag decoding from
+ * <a href="http://code.google.com/apis/protocolbuffers/docs/encoding.html"> Google Protocol Buffers</a>.
+ *
+ * @param buffer The buffer to read from
+ * @return The integer read
+ *
+ * @throws IllegalArgumentException if variable-length value does not terminate after 5 bytes have been read
+ */
+ public static int readVarint(ByteBuffer buffer) {
+ int value = 0;
+ int i = 0;
+ int b;
+ while (((b = buffer.get()) & 0x80) != 0) {
+ value |= (b & 0x7f) << i;
+ i += 7;
+ if (i > 28)
+ throw illegalVarintException(value);
+ }
+ value |= b << i;
+ return (value >>> 1) ^ -(value & 1);
+ }
+
+ /**
+ * Read an integer stored in variable-length format using zig-zag decoding from
+ * <a href="http://code.google.com/apis/protocolbuffers/docs/encoding.html"> Google Protocol Buffers</a>.
+ *
+ * @param in The input to read from
+ * @return The integer read
+ *
+ * @throws IllegalArgumentException if variable-length value does not terminate after 5 bytes have been read
+ * @throws IOException if {@link DataInput} throws {@link IOException}
+ */
+ public static int readVarint(DataInput in) throws IOException {
+ int value = 0;
+ int i = 0;
+ int b;
+ while (((b = in.readByte()) & 0x80) != 0) {
+ value |= (b & 0x7f) << i;
+ i += 7;
+ if (i > 28)
+ throw illegalVarintException(value);
+ }
+ value |= b << i;
+ return (value >>> 1) ^ -(value & 1);
+ }
+
+ /**
+ * Read a long stored in variable-length format using zig-zag decoding from
+ * <a href="http://code.google.com/apis/protocolbuffers/docs/encoding.html"> Google Protocol Buffers</a>.
+ *
+ * @param in The input to read from
+ * @return The long value read
+ *
+ * @throws IllegalArgumentException if variable-length value does not terminate after 10 bytes have been read
+ * @throws IOException if {@link DataInput} throws {@link IOException}
+ */
+ public static long readVarlong(DataInput in) throws IOException {
+ long value = 0L;
+ int i = 0;
+ long b;
+ while (((b = in.readByte()) & 0x80) != 0) {
+ value |= (b & 0x7f) << i;
+ i += 7;
+ if (i > 63)
+ throw illegalVarlongException(value);
+ }
+ value |= b << i;
+ return (value >>> 1) ^ -(value & 1);
+ }
+
+ /**
+ * Read a long stored in variable-length format using zig-zag decoding from
+ * <a href="http://code.google.com/apis/protocolbuffers/docs/encoding.html"> Google Protocol Buffers</a>.
+ *
+ * @param buffer The buffer to read from
+ * @return The long value read
+ *
+ * @throws IllegalArgumentException if variable-length value does not terminate after 10 bytes have been read
+ */
+ public static long readVarlong(ByteBuffer buffer) {
+ long value = 0L;
+ int i = 0;
+ long b;
+ while (((b = buffer.get()) & 0x80) != 0) {
+ value |= (b & 0x7f) << i;
+ i += 7;
+ if (i > 63)
+ throw illegalVarlongException(value);
+ }
+ value |= b << i;
+ return (value >>> 1) ^ -(value & 1);
+ }
+
+ /**
+ * Write the given integer following the variable-length zig-zag encoding from
+ * <a href="http://code.google.com/apis/protocolbuffers/docs/encoding.html"> Google Protocol Buffers</a>
+ * into the output.
+ *
+ * @param value The value to write
+ * @param out The output to write to
+ */
+ public static void writeVarint(int value, DataOutput out) throws IOException {
+ int v = (value << 1) ^ (value >> 31);
+ while ((v & 0xffffff80) != 0L) {
+ out.writeByte((v & 0x7f) | 0x80);
+ v >>>= 7;
+ }
+ out.writeByte((byte) v);
+ }
+
+ /**
+ * Write the given integer following the variable-length zig-zag encoding from
+ * <a href="http://code.google.com/apis/protocolbuffers/docs/encoding.html"> Google Protocol Buffers</a>
+ * into the buffer.
+ *
+ * @param value The value to write
+ * @param buffer The output to write to
+ */
+ public static void writeVarint(int value, ByteBuffer buffer) {
+ int v = (value << 1) ^ (value >> 31);
+ while ((v & 0xffffff80) != 0L) {
+ byte b = (byte) ((v & 0x7f) | 0x80);
+ buffer.put(b);
+ v >>>= 7;
+ }
+ buffer.put((byte) v);
+ }
+
+ /**
+ * Write the given integer following the variable-length zig-zag encoding from
+ * <a href="http://code.google.com/apis/protocolbuffers/docs/encoding.html"> Google Protocol Buffers</a>
+ * into the output.
+ *
+ * @param value The value to write
+ * @param out The output to write to
+ */
+ public static void writeVarlong(long value, DataOutput out) throws IOException {
+ long v = (value << 1) ^ (value >> 63);
+ while ((v & 0xffffffffffffff80L) != 0L) {
+ out.writeByte(((int) v & 0x7f) | 0x80);
+ v >>>= 7;
+ }
+ out.writeByte((byte) v);
+ }
+
+ /**
+ * Write the given integer following the variable-length zig-zag encoding from
+ * <a href="http://code.google.com/apis/protocolbuffers/docs/encoding.html"> Google Protocol Buffers</a>
+ * into the buffer.
+ *
+ * @param value The value to write
+ * @param buffer The buffer to write to
+ */
+ public static void writeVarlong(long value, ByteBuffer buffer) {
+ long v = (value << 1) ^ (value >> 63);
+ while ((v & 0xffffffffffffff80L) != 0L) {
+ byte b = (byte) ((v & 0x7f) | 0x80);
+ buffer.put(b);
+ v >>>= 7;
+ }
+ buffer.put((byte) v);
+ }
+
+ /**
+ * Number of bytes needed to encode an integer in variable-length format.
+ *
+ * @param value The signed value
+ */
+ public static int sizeOfVarint(int value) {
+ int v = (value << 1) ^ (value >> 31);
+ int bytes = 1;
+ while ((v & 0xffffff80) != 0L) {
+ bytes += 1;
+ v >>>= 7;
+ }
+ return bytes;
+ }
+
+ /**
+ * Number of bytes needed to encode a long in variable-length format.
+ *
+ * @param value The signed value
+ */
+ public static int sizeOfVarlong(long value) {
+ long v = (value << 1) ^ (value >> 63);
+ int bytes = 1;
+ while ((v & 0xffffffffffffff80L) != 0L) {
+ bytes += 1;
+ v >>>= 7;
+ }
+ return bytes;
+ }
+
+ private static IllegalArgumentException illegalVarintException(int value) {
+ throw new IllegalArgumentException("Varint is too long, the most significant bit in the 5th byte is set, " +
+ "converted value: " + Integer.toHexString(value));
+ }
+
+ private static IllegalArgumentException illegalVarlongException(long value) {
+ throw new IllegalArgumentException("Varlong is too long, most significant bit in the 10th byte is set, " +
+ "converted value: " + Long.toHexString(value));
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7354e77/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index ed5eddb..6dce978 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -16,19 +16,24 @@
*/
package org.apache.kafka.common.utils;
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
+import org.apache.kafka.common.KafkaException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.Closeable;
+import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
-import java.io.OutputStream;
-import java.io.UnsupportedEncodingException;
import java.io.FileNotFoundException;
-import java.io.StringWriter;
+import java.io.IOException;
+import java.io.InputStream;
import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
@@ -41,16 +46,10 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import java.util.Properties;
-import java.nio.channels.FileChannel;
-import java.nio.charset.Charset;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.kafka.common.KafkaException;
public class Utils {
@@ -103,85 +102,6 @@ public class Utils {
}
/**
- * Read an unsigned integer from the given position without modifying the buffers position
- *
- * @param buffer the buffer to read from
- * @param index the index from which to read the integer
- * @return The integer read, as a long to avoid signedness
- */
- public static long readUnsignedInt(ByteBuffer buffer, int index) {
- return buffer.getInt(index) & 0xffffffffL;
- }
-
- /**
- * Read an unsigned integer stored in little-endian format from the {@link InputStream}.
- *
- * @param in The stream to read from
- * @return The integer read (MUST BE TREATED WITH SPECIAL CARE TO AVOID SIGNEDNESS)
- */
- public static int readUnsignedIntLE(InputStream in) throws IOException {
- return in.read()
- | (in.read() << 8)
- | (in.read() << 16)
- | (in.read() << 24);
- }
-
- /**
- * Read an unsigned integer stored in little-endian format from a byte array
- * at a given offset.
- *
- * @param buffer The byte array to read from
- * @param offset The position in buffer to read from
- * @return The integer read (MUST BE TREATED WITH SPECIAL CARE TO AVOID SIGNEDNESS)
- */
- public static int readUnsignedIntLE(byte[] buffer, int offset) {
- return (buffer[offset] << 0 & 0xff)
- | ((buffer[offset + 1] & 0xff) << 8)
- | ((buffer[offset + 2] & 0xff) << 16)
- | ((buffer[offset + 3] & 0xff) << 24);
- }
-
- /**
- * Write the given long value as a 4 byte unsigned integer. Overflow is ignored.
- *
- * @param buffer The buffer to write to
- * @param index The position in the buffer at which to begin writing
- * @param value The value to write
- */
- public static void writeUnsignedInt(ByteBuffer buffer, int index, long value) {
- buffer.putInt(index, (int) (value & 0xffffffffL));
- }
-
- /**
- * Write an unsigned integer in little-endian format to the {@link OutputStream}.
- *
- * @param out The stream to write to
- * @param value The value to write
- */
- public static void writeUnsignedIntLE(OutputStream out, int value) throws IOException {
- out.write(value);
- out.write(value >>> 8);
- out.write(value >>> 16);
- out.write(value >>> 24);
- }
-
- /**
- * Write an unsigned integer in little-endian format to a byte array
- * at a given offset.
- *
- * @param buffer The byte array to write to
- * @param offset The position in buffer to write to
- * @param value The value to write
- */
- public static void writeUnsignedIntLE(byte[] buffer, int offset, int value) {
- buffer[offset] = (byte) value;
- buffer[offset + 1] = (byte) (value >>> 8);
- buffer[offset + 2] = (byte) (value >>> 16);
- buffer[offset + 3] = (byte) (value >>> 24);
- }
-
-
- /**
* Get the absolute value of the given number. If the number is Int.MinValue return 0. This is different from
* java.lang.Math.abs or scala.math.abs in that they return Int.MinValue (!).
*/
@@ -824,4 +744,5 @@ public class Utils {
currentPosition += bytesRead;
} while (bytesRead != -1 && destinationBuffer.hasRemaining());
}
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7354e77/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 3f65caf..de527f8 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -40,7 +40,7 @@ import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.record.ByteBufferOutputStream;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7354e77/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java b/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java
index 1c14e82..136b55a 100644
--- a/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java
@@ -40,6 +40,8 @@ public class ProtocolSerializationTest {
new Field("int16", Type.INT16),
new Field("int32", Type.INT32),
new Field("int64", Type.INT64),
+ new Field("varint", Type.VARINT),
+ new Field("varlong", Type.VARLONG),
new Field("string", Type.STRING),
new Field("nullable_string", Type.NULLABLE_STRING),
new Field("bytes", Type.BYTES),
@@ -52,6 +54,8 @@ public class ProtocolSerializationTest {
.set("int16", (short) 1)
.set("int32", 1)
.set("int64", 1L)
+ .set("varint", 300)
+ .set("varlong", 500L)
.set("string", "1")
.set("nullable_string", null)
.set("bytes", ByteBuffer.wrap("1".getBytes()))
@@ -80,6 +84,10 @@ public class ProtocolSerializationTest {
check(Type.NULLABLE_BYTES, null);
check(Type.NULLABLE_BYTES, ByteBuffer.allocate(0));
check(Type.NULLABLE_BYTES, ByteBuffer.wrap("abcd".getBytes()));
+ check(Type.VARINT, Integer.MAX_VALUE);
+ check(Type.VARINT, Integer.MIN_VALUE);
+ check(Type.VARLONG, Long.MAX_VALUE);
+ check(Type.VARLONG, Long.MIN_VALUE);
check(new ArrayOf(Type.INT32), new Object[] {1, 2, 3, 4});
check(new ArrayOf(Type.STRING), new Object[] {});
check(new ArrayOf(Type.STRING), new Object[] {"hello", "there", "beautiful"});
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7354e77/clients/src/test/java/org/apache/kafka/common/record/CompressionTypeTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/CompressionTypeTest.java b/clients/src/test/java/org/apache/kafka/common/record/CompressionTypeTest.java
index 5a1d5f4..21cbfb7 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/CompressionTypeTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/CompressionTypeTest.java
@@ -16,6 +16,8 @@
*/
package org.apache.kafka.common.record;
+import org.apache.kafka.common.utils.ByteBufferInputStream;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
import org.junit.Test;
import java.nio.ByteBuffer;
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7354e77/clients/src/test/java/org/apache/kafka/common/record/SimpleRecordTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/SimpleRecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/SimpleRecordTest.java
index 4bb90cd..aa77ca4 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/SimpleRecordTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/SimpleRecordTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.common.record;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
import org.apache.kafka.common.utils.Utils;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7354e77/clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java
new file mode 100644
index 0000000..0a082fb
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+public class ByteUtilsTest {
+ private final byte x00 = 0x00;
+ private final byte x01 = 0x01;
+ private final byte x02 = 0x02;
+ private final byte x0F = 0x0f;
+ private final byte x7E = 0x7E;
+ private final byte x7F = 0x7F;
+ private final byte xFF = (byte) 0xff;
+ private final byte x80 = (byte) 0x80;
+ private final byte x81 = (byte) 0x81;
+ private final byte xFE = (byte) 0xfe;
+
+ @Test
+ public void testReadUnsignedIntLEFromArray() {
+ byte[] array1 = {0x01, 0x02, 0x03, 0x04, 0x05};
+ assertEquals(0x04030201, ByteUtils.readUnsignedIntLE(array1, 0));
+ assertEquals(0x05040302, ByteUtils.readUnsignedIntLE(array1, 1));
+
+ byte[] array2 = {(byte) 0xf1, (byte) 0xf2, (byte) 0xf3, (byte) 0xf4, (byte) 0xf5, (byte) 0xf6};
+ assertEquals(0xf4f3f2f1, ByteUtils.readUnsignedIntLE(array2, 0));
+ assertEquals(0xf6f5f4f3, ByteUtils.readUnsignedIntLE(array2, 2));
+ }
+
+ @Test
+ public void testReadUnsignedIntLEFromInputStream() throws IOException {
+ byte[] array1 = {0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09};
+ ByteArrayInputStream is1 = new ByteArrayInputStream(array1);
+ assertEquals(0x04030201, ByteUtils.readUnsignedIntLE(is1));
+ assertEquals(0x08070605, ByteUtils.readUnsignedIntLE(is1));
+
+ byte[] array2 = {(byte) 0xf1, (byte) 0xf2, (byte) 0xf3, (byte) 0xf4, (byte) 0xf5, (byte) 0xf6, (byte) 0xf7, (byte) 0xf8};
+ ByteArrayInputStream is2 = new ByteArrayInputStream(array2);
+ assertEquals(0xf4f3f2f1, ByteUtils.readUnsignedIntLE(is2));
+ assertEquals(0xf8f7f6f5, ByteUtils.readUnsignedIntLE(is2));
+ }
+
+ @Test
+ public void testWriteUnsignedIntLEToArray() {
+ int value1 = 0x04030201;
+
+ byte[] array1 = new byte[4];
+ ByteUtils.writeUnsignedIntLE(array1, 0, value1);
+ assertArrayEquals(new byte[] {0x01, 0x02, 0x03, 0x04}, array1);
+
+ array1 = new byte[8];
+ ByteUtils.writeUnsignedIntLE(array1, 2, value1);
+ assertArrayEquals(new byte[] {0, 0, 0x01, 0x02, 0x03, 0x04, 0, 0}, array1);
+
+ int value2 = 0xf4f3f2f1;
+
+ byte[] array2 = new byte[4];
+ ByteUtils.writeUnsignedIntLE(array2, 0, value2);
+ assertArrayEquals(new byte[] {(byte) 0xf1, (byte) 0xf2, (byte) 0xf3, (byte) 0xf4}, array2);
+
+ array2 = new byte[8];
+ ByteUtils.writeUnsignedIntLE(array2, 2, value2);
+ assertArrayEquals(new byte[] {0, 0, (byte) 0xf1, (byte) 0xf2, (byte) 0xf3, (byte) 0xf4, 0, 0}, array2);
+ }
+
+ @Test
+ public void testWriteUnsignedIntLEToOutputStream() throws IOException {
+ int value1 = 0x04030201;
+ ByteArrayOutputStream os1 = new ByteArrayOutputStream();
+ ByteUtils.writeUnsignedIntLE(os1, value1);
+ ByteUtils.writeUnsignedIntLE(os1, value1);
+ assertArrayEquals(new byte[] {0x01, 0x02, 0x03, 0x04, 0x01, 0x02, 0x03, 0x04}, os1.toByteArray());
+
+ int value2 = 0xf4f3f2f1;
+ ByteArrayOutputStream os2 = new ByteArrayOutputStream();
+ ByteUtils.writeUnsignedIntLE(os2, value2);
+ assertArrayEquals(new byte[] {(byte) 0xf1, (byte) 0xf2, (byte) 0xf3, (byte) 0xf4}, os2.toByteArray());
+ }
+
+ @Test
+ public void testVarintSerde() throws Exception {
+ assertVarintSerde(0, new byte[] {x00});
+ assertVarintSerde(-1, new byte[] {x01});
+ assertVarintSerde(1, new byte[] {x02});
+ assertVarintSerde(63, new byte[] {x7E});
+ assertVarintSerde(-64, new byte[] {x7F});
+ assertVarintSerde(64, new byte[] {x80, x01});
+ assertVarintSerde(-65, new byte[] {x81, x01});
+ assertVarintSerde(8191, new byte[] {xFE, x7F});
+ assertVarintSerde(-8192, new byte[] {xFF, x7F});
+ assertVarintSerde(8192, new byte[] {x80, x80, x01});
+ assertVarintSerde(-8193, new byte[] {x81, x80, x01});
+ assertVarintSerde(1048575, new byte[] {xFE, xFF, x7F});
+ assertVarintSerde(-1048576, new byte[] {xFF, xFF, x7F});
+ assertVarintSerde(1048576, new byte[] {x80, x80, x80, x01});
+ assertVarintSerde(-1048577, new byte[] {x81, x80, x80, x01});
+ assertVarintSerde(134217727, new byte[] {xFE, xFF, xFF, x7F});
+ assertVarintSerde(-134217728, new byte[] {xFF, xFF, xFF, x7F});
+ assertVarintSerde(134217728, new byte[] {x80, x80, x80, x80, x01});
+ assertVarintSerde(-134217729, new byte[] {x81, x80, x80, x80, x01});
+ assertVarintSerde(Integer.MAX_VALUE, new byte[] {xFE, xFF, xFF, xFF, x0F});
+ assertVarintSerde(Integer.MIN_VALUE, new byte[] {xFF, xFF, xFF, xFF, x0F});
+ }
+
+ @Test
+ public void testVarlongSerde() throws Exception {
+ assertVarlongSerde(0, new byte[] {x00});
+ assertVarlongSerde(-1, new byte[] {x01});
+ assertVarlongSerde(1, new byte[] {x02});
+ assertVarlongSerde(63, new byte[] {x7E});
+ assertVarlongSerde(-64, new byte[] {x7F});
+ assertVarlongSerde(64, new byte[] {x80, x01});
+ assertVarlongSerde(-65, new byte[] {x81, x01});
+ assertVarlongSerde(8191, new byte[] {xFE, x7F});
+ assertVarlongSerde(-8192, new byte[] {xFF, x7F});
+ assertVarlongSerde(8192, new byte[] {x80, x80, x01});
+ assertVarlongSerde(-8193, new byte[] {x81, x80, x01});
+ assertVarlongSerde(1048575, new byte[] {xFE, xFF, x7F});
+ assertVarlongSerde(-1048576, new byte[] {xFF, xFF, x7F});
+ assertVarlongSerde(1048576, new byte[] {x80, x80, x80, x01});
+ assertVarlongSerde(-1048577, new byte[] {x81, x80, x80, x01});
+ assertVarlongSerde(134217727, new byte[] {xFE, xFF, xFF, x7F});
+ assertVarlongSerde(-134217728, new byte[] {xFF, xFF, xFF, x7F});
+ assertVarlongSerde(134217728, new byte[] {x80, x80, x80, x80, x01});
+ assertVarlongSerde(-134217729, new byte[] {x81, x80, x80, x80, x01});
+ assertVarlongSerde(Integer.MAX_VALUE, new byte[] {xFE, xFF, xFF, xFF, x0F});
+ assertVarlongSerde(Integer.MIN_VALUE, new byte[] {xFF, xFF, xFF, xFF, x0F});
+ assertVarlongSerde(17179869183L, new byte[] {xFE, xFF, xFF, xFF, x7F});
+ assertVarlongSerde(-17179869184L, new byte[] {xFF, xFF, xFF, xFF, x7F});
+ assertVarlongSerde(17179869184L, new byte[] {x80, x80, x80, x80, x80, x01});
+ assertVarlongSerde(-17179869185L, new byte[] {x81, x80, x80, x80, x80, x01});
+ assertVarlongSerde(2199023255551L, new byte[] {xFE, xFF, xFF, xFF, xFF, x7F});
+ assertVarlongSerde(-2199023255552L, new byte[] {xFF, xFF, xFF, xFF, xFF, x7F});
+ assertVarlongSerde(2199023255552L, new byte[] {x80, x80, x80, x80, x80, x80, x01});
+ assertVarlongSerde(-2199023255553L, new byte[] {x81, x80, x80, x80, x80, x80, x01});
+ assertVarlongSerde(281474976710655L, new byte[] {xFE, xFF, xFF, xFF, xFF, xFF, x7F});
+ assertVarlongSerde(-281474976710656L, new byte[] {xFF, xFF, xFF, xFF, xFF, xFF, x7F});
+ assertVarlongSerde(281474976710656L, new byte[] {x80, x80, x80, x80, x80, x80, x80, x01});
+ assertVarlongSerde(-281474976710657L, new byte[] {x81, x80, x80, x80, x80, x80, x80, 1});
+ assertVarlongSerde(36028797018963967L, new byte[] {xFE, xFF, xFF, xFF, xFF, xFF, xFF, x7F});
+ assertVarlongSerde(-36028797018963968L, new byte[] {xFF, xFF, xFF, xFF, xFF, xFF, xFF, x7F});
+ assertVarlongSerde(36028797018963968L, new byte[] {x80, x80, x80, x80, x80, x80, x80, x80, x01});
+ assertVarlongSerde(-36028797018963969L, new byte[] {x81, x80, x80, x80, x80, x80, x80, x80, x01});
+ assertVarlongSerde(4611686018427387903L, new byte[] {xFE, xFF, xFF, xFF, xFF, xFF, xFF, xFF, x7F});
+ assertVarlongSerde(-4611686018427387904L, new byte[] {xFF, xFF, xFF, xFF, xFF, xFF, xFF, xFF, x7F});
+ assertVarlongSerde(4611686018427387904L, new byte[] {x80, x80, x80, x80, x80, x80, x80, x80, x80, x01});
+ assertVarlongSerde(-4611686018427387905L, new byte[] {x81, x80, x80, x80, x80, x80, x80, x80, x80, x01});
+ assertVarlongSerde(Long.MAX_VALUE, new byte[] {xFE, xFF, xFF, xFF, xFF, xFF, xFF, xFF, xFF, x01});
+ assertVarlongSerde(Long.MIN_VALUE, new byte[] {xFF, xFF, xFF, xFF, xFF, xFF, xFF, xFF, xFF, x01});
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testInvalidVarint() {
+ // varint encoding has one overflow byte
+ ByteBuffer buf = ByteBuffer.wrap(new byte[] {xFF, xFF, xFF, xFF, xFF, x01});
+ ByteUtils.readVarint(buf);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testInvalidVarlong() {
+ // varlong encoding has one overflow byte
+ ByteBuffer buf = ByteBuffer.wrap(new byte[] {xFF, xFF, xFF, xFF, xFF, xFF, xFF, xFF, xFF, xFF, x01});
+ ByteUtils.readVarlong(buf);
+ }
+
+ private void assertVarintSerde(int value, byte[] expectedEncoding) throws IOException {
+ ByteBuffer buf = ByteBuffer.allocate(32);
+ ByteUtils.writeVarint(value, buf);
+ buf.flip();
+ assertArrayEquals(expectedEncoding, Utils.toArray(buf));
+ assertEquals(value, ByteUtils.readVarint(buf.duplicate()));
+
+ buf.rewind();
+ DataOutputStream out = new DataOutputStream(new ByteBufferOutputStream(buf));
+ ByteUtils.writeVarint(value, out);
+ buf.flip();
+ assertArrayEquals(expectedEncoding, Utils.toArray(buf));
+ DataInputStream in = new DataInputStream(new ByteBufferInputStream(buf));
+ assertEquals(value, ByteUtils.readVarint(in));
+ }
+
+ private void assertVarlongSerde(long value, byte[] expectedEncoding) throws IOException {
+ ByteBuffer buf = ByteBuffer.allocate(32);
+ ByteUtils.writeVarlong(value, buf);
+ buf.flip();
+ assertEquals(value, ByteUtils.readVarlong(buf.duplicate()));
+ assertArrayEquals(expectedEncoding, Utils.toArray(buf));
+
+ buf.rewind();
+ DataOutputStream out = new DataOutputStream(new ByteBufferOutputStream(buf));
+ ByteUtils.writeVarlong(value, out);
+ buf.flip();
+ assertArrayEquals(expectedEncoding, Utils.toArray(buf));
+ DataInputStream in = new DataInputStream(new ByteBufferInputStream(buf));
+ assertEquals(value, ByteUtils.readVarlong(in));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7354e77/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
index 5f36c1c..7672335 100755
--- a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
@@ -16,28 +16,24 @@
*/
package org.apache.kafka.common.utils;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
+import org.apache.kafka.test.TestUtils;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.junit.Test;
+
+import java.io.Closeable;
import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.Collections;
-import java.io.Closeable;
-import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.Random;
-import org.apache.kafka.test.TestUtils;
-import org.easymock.EasyMock;
-import org.easymock.IAnswer;
-import org.junit.Test;
-
-
import static org.apache.kafka.common.utils.Utils.formatAddress;
import static org.apache.kafka.common.utils.Utils.getHost;
import static org.apache.kafka.common.utils.Utils.getPort;
-import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -337,64 +333,4 @@ public class UtilsTest {
}
}
- @Test
- public void testReadUnsignedIntLEFromArray() {
- byte[] array1 = {0x01, 0x02, 0x03, 0x04, 0x05};
- assertEquals(0x04030201, Utils.readUnsignedIntLE(array1, 0));
- assertEquals(0x05040302, Utils.readUnsignedIntLE(array1, 1));
-
- byte[] array2 = {(byte) 0xf1, (byte) 0xf2, (byte) 0xf3, (byte) 0xf4, (byte) 0xf5, (byte) 0xf6};
- assertEquals(0xf4f3f2f1, Utils.readUnsignedIntLE(array2, 0));
- assertEquals(0xf6f5f4f3, Utils.readUnsignedIntLE(array2, 2));
- }
-
- @Test
- public void testReadUnsignedIntLEFromInputStream() throws IOException {
- byte[] array1 = {0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09};
- ByteArrayInputStream is1 = new ByteArrayInputStream(array1);
- assertEquals(0x04030201, Utils.readUnsignedIntLE(is1));
- assertEquals(0x08070605, Utils.readUnsignedIntLE(is1));
-
- byte[] array2 = {(byte) 0xf1, (byte) 0xf2, (byte) 0xf3, (byte) 0xf4, (byte) 0xf5, (byte) 0xf6, (byte) 0xf7, (byte) 0xf8};
- ByteArrayInputStream is2 = new ByteArrayInputStream(array2);
- assertEquals(0xf4f3f2f1, Utils.readUnsignedIntLE(is2));
- assertEquals(0xf8f7f6f5, Utils.readUnsignedIntLE(is2));
- }
-
- @Test
- public void testWriteUnsignedIntLEToArray() {
- int value1 = 0x04030201;
-
- byte[] array1 = new byte[4];
- Utils.writeUnsignedIntLE(array1, 0, value1);
- assertArrayEquals(new byte[] {0x01, 0x02, 0x03, 0x04}, array1);
-
- array1 = new byte[8];
- Utils.writeUnsignedIntLE(array1, 2, value1);
- assertArrayEquals(new byte[] {0, 0, 0x01, 0x02, 0x03, 0x04, 0, 0}, array1);
-
- int value2 = 0xf4f3f2f1;
-
- byte[] array2 = new byte[4];
- Utils.writeUnsignedIntLE(array2, 0, value2);
- assertArrayEquals(new byte[] {(byte) 0xf1, (byte) 0xf2, (byte) 0xf3, (byte) 0xf4}, array2);
-
- array2 = new byte[8];
- Utils.writeUnsignedIntLE(array2, 2, value2);
- assertArrayEquals(new byte[] {0, 0, (byte) 0xf1, (byte) 0xf2, (byte) 0xf3, (byte) 0xf4, 0, 0}, array2);
- }
-
- @Test
- public void testWriteUnsignedIntLEToOutputStream() throws IOException {
- int value1 = 0x04030201;
- ByteArrayOutputStream os1 = new ByteArrayOutputStream();
- Utils.writeUnsignedIntLE(os1, value1);
- Utils.writeUnsignedIntLE(os1, value1);
- assertArrayEquals(new byte[] {0x01, 0x02, 0x03, 0x04, 0x01, 0x02, 0x03, 0x04}, os1.toByteArray());
-
- int value2 = 0xf4f3f2f1;
- ByteArrayOutputStream os2 = new ByteArrayOutputStream();
- Utils.writeUnsignedIntLE(os2, value2);
- assertArrayEquals(new byte[] {(byte) 0xf1, (byte) 0xf2, (byte) 0xf3, (byte) 0xf4}, os2.toByteArray());
- }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7354e77/core/src/main/scala/kafka/message/Message.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/Message.scala b/core/src/main/scala/kafka/message/Message.scala
index e0efb3d..474f934 100755
--- a/core/src/main/scala/kafka/message/Message.scala
+++ b/core/src/main/scala/kafka/message/Message.scala
@@ -23,7 +23,8 @@ import org.apache.kafka.common.record.{Record, TimestampType}
import scala.math._
import kafka.utils._
-import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.utils.ByteUtils.{readUnsignedInt, writeUnsignedInt}
+import org.apache.kafka.common.utils.{ByteUtils, Utils}
/**
* Constants related to messages
@@ -201,7 +202,7 @@ class Message(val buffer: ByteBuffer,
buffer.rewind()
// now compute the checksum and fill it in
- Utils.writeUnsignedInt(buffer, CrcOffset, computeChecksum)
+ ByteUtils.writeUnsignedInt(buffer, CrcOffset, computeChecksum)
}
def this(bytes: Array[Byte], key: Array[Byte], timestamp: Long, codec: CompressionCodec, magicValue: Byte) =
@@ -228,7 +229,7 @@ class Message(val buffer: ByteBuffer,
/**
* Retrieve the previously computed CRC for this message
*/
- def checksum: Long = Utils.readUnsignedInt(buffer, CrcOffset)
+ def checksum: Long = ByteUtils.readUnsignedInt(buffer, CrcOffset)
/**
* Returns true if the crc stored with the message matches the crc computed off the message contents
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7354e77/core/src/test/scala/unit/kafka/message/MessageTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/message/MessageTest.scala b/core/src/test/scala/unit/kafka/message/MessageTest.scala
index 46c25af..75a86d2 100755
--- a/core/src/test/scala/unit/kafka/message/MessageTest.scala
+++ b/core/src/test/scala/unit/kafka/message/MessageTest.scala
@@ -19,6 +19,7 @@ package kafka.message
import java.nio._
import java.util.HashMap
+
import org.apache.kafka.common.protocol.Errors
import scala.collection._
@@ -26,7 +27,7 @@ import org.junit.Assert._
import org.scalatest.junit.JUnitSuite
import org.junit.{Before, Test}
import kafka.utils.TestUtils
-import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.utils.ByteUtils
case class MessageTestVal(key: Array[Byte],
payload: Array[Byte],
@@ -89,7 +90,7 @@ class MessageTest extends JUnitSuite {
assertTrue("Auto-computed checksum should be valid", v.message.isValid)
// garble checksum
val badChecksum: Int = (v.message.checksum + 1 % Int.MaxValue).toInt
- Utils.writeUnsignedInt(v.message.buffer, Message.CrcOffset, badChecksum)
+ ByteUtils.writeUnsignedInt(v.message.buffer, Message.CrcOffset, badChecksum)
assertFalse("Message with invalid checksum should be invalid", v.message.isValid)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7354e77/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
index f279c85..a5d8102 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
@@ -23,7 +23,7 @@ import kafka.server.AbstractFetcherThread.{FetchRequest, PartitionData}
import kafka.utils.TestUtils
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.utils.ByteUtils
import org.apache.kafka.common.record.{MemoryRecords, Record}
import org.junit.Assert.{assertFalse, assertTrue}
import org.junit.{Before, Test}
@@ -184,7 +184,7 @@ class AbstractFetcherThreadTest {
val corruptedRecord = Record.create("hello".getBytes())
val badChecksum = (corruptedRecord.checksum + 1 % Int.MaxValue).toInt
// Garble checksum
- Utils.writeUnsignedInt(corruptedRecord.buffer, Record.CRC_OFFSET, badChecksum)
+ ByteUtils.writeUnsignedInt(corruptedRecord.buffer, Record.CRC_OFFSET, badChecksum)
val records = MemoryRecords.withRecords(corruptedRecord)
fetchRequest.offsets.mapValues(_ => new TestPartitionData(records)).toSeq
} else
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7354e77/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
index f35de40..77fb58a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
@@ -16,7 +16,7 @@
*/
package org.apache.kafka.streams.processor.internals.assignment;
-import org.apache.kafka.common.record.ByteBufferInputStream;
+import org.apache.kafka.common.utils.ByteBufferInputStream;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.errors.TaskAssignmentException;
import org.apache.kafka.streams.processor.TaskId;