You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jk...@apache.org on 2014/01/28 20:16:30 UTC
[4/7] KAFKA-1227 New producer!
http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/protocol/types/Field.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/protocol/types/Field.java b/clients/src/main/java/kafka/common/protocol/types/Field.java
new file mode 100644
index 0000000..d018a12
--- /dev/null
+++ b/clients/src/main/java/kafka/common/protocol/types/Field.java
@@ -0,0 +1,48 @@
+package kafka.common.protocol.types;
+
+/**
+ * A field in a schema
+ */
+public class Field {
+
+ public static final Object NO_DEFAULT = new Object();
+
+ final int index;
+ public final String name;
+ public final Type type;
+ public final Object defaultValue;
+ public final String doc;
+ final Schema schema;
+
+ public Field(int index, String name, Type type, String doc, Object defaultValue, Schema schema) {
+ this.index = index;
+ this.name = name;
+ this.type = type;
+ this.doc = doc;
+ this.defaultValue = defaultValue;
+ this.schema = schema;
+ if (defaultValue != NO_DEFAULT)
+ type.validate(defaultValue);
+ }
+
+ public Field(int index, String name, Type type, String doc, Object defaultValue) {
+ this(index, name, type, doc, defaultValue, null);
+ }
+
+ public Field(String name, Type type, String doc, Object defaultValue) {
+ this(-1, name, type, doc, defaultValue);
+ }
+
+ public Field(String name, Type type, String doc) {
+ this(name, type, doc, NO_DEFAULT);
+ }
+
+ public Field(String name, Type type) {
+ this(name, type, "");
+ }
+
+ public Type type() {
+ return type;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/protocol/types/Schema.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/protocol/types/Schema.java b/clients/src/main/java/kafka/common/protocol/types/Schema.java
new file mode 100644
index 0000000..b7b1c75
--- /dev/null
+++ b/clients/src/main/java/kafka/common/protocol/types/Schema.java
@@ -0,0 +1,134 @@
+package kafka.common.protocol.types;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * The schema for a compound record definition
+ */
+public class Schema extends Type {
+
+ private final Field[] fields;
+ private final Map<String, Field> fieldsByName;
+
+ public Schema(Field... fs) {
+ this.fields = new Field[fs.length];
+ this.fieldsByName = new HashMap<String, Field>();
+ for (int i = 0; i < this.fields.length; i++) {
+ Field field = fs[i];
+ if (fieldsByName.containsKey(field.name))
+ throw new SchemaException("Schema contains a duplicate field: " + field.name);
+ this.fields[i] = new Field(i, field.name, field.type, field.doc, field.defaultValue, this);
+ this.fieldsByName.put(fs[i].name, this.fields[i]);
+ }
+ }
+
+ /**
+ * Write a struct to the buffer
+ */
+ public void write(ByteBuffer buffer, Object o) {
+ Struct r = (Struct) o;
+ for (int i = 0; i < fields.length; i++) {
+ Field f = fields[i];
+ try {
+ Object value = f.type().validate(r.get(f));
+ f.type.write(buffer, value);
+ } catch (Exception e) {
+ throw new SchemaException("Error writing field '" + f.name + "': " + e.getMessage() == null ? e.getMessage() : e.getClass()
+ .getName());
+ }
+ }
+ }
+
+ /**
+ * Read a struct from the buffer
+ */
+ public Object read(ByteBuffer buffer) {
+ Object[] objects = new Object[fields.length];
+ for (int i = 0; i < fields.length; i++)
+ objects[i] = fields[i].type.read(buffer);
+ return new Struct(this, objects);
+ }
+
+ /**
+ * The size of the given record
+ */
+ public int sizeOf(Object o) {
+ int size = 0;
+ Struct r = (Struct) o;
+ for (int i = 0; i < fields.length; i++)
+ size += fields[i].type.sizeOf(r.get(fields[i]));
+ return size;
+ }
+
+ /**
+ * The number of fields in this schema
+ */
+ public int numFields() {
+ return this.fields.length;
+ }
+
+ /**
+ * Get a field by its slot in the record array
+ *
+ * @param slot The slot at which this field sits
+ * @return The field
+ */
+ public Field get(int slot) {
+ return this.fields[slot];
+ }
+
+ /**
+ * Get a field by its name
+ *
+ * @param name The name of the field
+ * @return The field
+ */
+ public Field get(String name) {
+ return this.fieldsByName.get(name);
+ }
+
+ /**
+ * Get all the fields in this schema
+ */
+ public Field[] fields() {
+ return this.fields;
+ }
+
+ /**
+ * Display a string representation of the schema
+ */
+ public String toString() {
+ StringBuilder b = new StringBuilder();
+ b.append('{');
+ for (int i = 0; i < this.fields.length; i++) {
+ b.append(this.fields[i].name);
+ b.append(':');
+ b.append(this.fields[i].type());
+ if (i < this.fields.length - 1)
+ b.append(',');
+ }
+ b.append("}");
+ return b.toString();
+ }
+
+ @Override
+ public Struct validate(Object item) {
+ try {
+ Struct struct = (Struct) item;
+ for (int i = 0; i < this.fields.length; i++) {
+ Field field = this.fields[i];
+ try {
+ field.type.validate(struct.get(field));
+ } catch (SchemaException e) {
+ throw new SchemaException("Invalid value for field '" + field.name + "': " + e.getMessage());
+ }
+ }
+ return struct;
+ } catch (ClassCastException e) {
+ throw new SchemaException("Not a Struct.");
+ }
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/protocol/types/SchemaException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/protocol/types/SchemaException.java b/clients/src/main/java/kafka/common/protocol/types/SchemaException.java
new file mode 100644
index 0000000..a2a2d50
--- /dev/null
+++ b/clients/src/main/java/kafka/common/protocol/types/SchemaException.java
@@ -0,0 +1,13 @@
+package kafka.common.protocol.types;
+
+import kafka.common.KafkaException;
+
+/**
+ * Thrown when a value, field or struct does not conform to its schema
+ */
+public class SchemaException extends KafkaException {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * @param message A description of the schema violation
+ */
+ public SchemaException(String message) {
+ super(message);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/protocol/types/Struct.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/protocol/types/Struct.java b/clients/src/main/java/kafka/common/protocol/types/Struct.java
new file mode 100644
index 0000000..c83aefa
--- /dev/null
+++ b/clients/src/main/java/kafka/common/protocol/types/Struct.java
@@ -0,0 +1,227 @@
+package kafka.common.protocol.types;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+/**
+ * A record that can be serialized and deserialized according to a pre-defined schema
+ */
+public class Struct {
+ private final Schema schema;
+ private final Object[] values;
+
+ /**
+ * Create a struct over an existing value array; the array is used as-is, not copied.
+ */
+ Struct(Schema schema, Object[] values) {
+ this.schema = schema;
+ this.values = values;
+ }
+
+ /**
+ * Create an empty struct with one (initially null) slot per field of the schema.
+ */
+ public Struct(Schema schema) {
+ this.schema = schema;
+ this.values = new Object[this.schema.numFields()];
+ }
+
+ /**
+ * The schema for this struct.
+ */
+ public Schema schema() {
+ return this.schema;
+ }
+
+ /**
+ * Return the value of the given pre-validated field, or if the value is missing return the default value.
+ *
+ * @param field The field for which to get the default value
+ * @throws SchemaException if the field has no value and has no default.
+ */
+ private Object getFieldOrDefault(Field field) {
+ Object value = this.values[field.index];
+ if (value != null)
+ return value;
+ else if (field.defaultValue != Field.NO_DEFAULT)
+ return field.defaultValue;
+ else
+ throw new SchemaException("Missing value for field '" + field.name + "' which has no default value.");
+ }
+
+ /**
+ * Get the value for the field directly by the field index with no lookup needed (faster!)
+ *
+ * @param field The field to look up
+ * @return The value for that field.
+ * @throws SchemaException if the field belongs to a different schema, or has neither a value nor a default
+ */
+ public Object get(Field field) {
+ validateField(field);
+ return getFieldOrDefault(field);
+ }
+
+ /**
+ * Get the record value for the field with the given name by doing a hash table lookup (slower!)
+ *
+ * @param name The name of the field
+ * @return The value in the record
+ * @throws SchemaException if there is no such field, or it has neither a value nor a default
+ */
+ public Object get(String name) {
+ Field field = schema.get(name);
+ if (field == null)
+ throw new SchemaException("No such field: " + name);
+ return getFieldOrDefault(field);
+ }
+
+ public Struct getStruct(Field field) {
+ return (Struct) get(field);
+ }
+
+ public Struct getStruct(String name) {
+ return (Struct) get(name);
+ }
+
+ public Short getShort(Field field) {
+ return (Short) get(field);
+ }
+
+ public Short getShort(String name) {
+ return (Short) get(name);
+ }
+
+ public Integer getInt(Field field) {
+ return (Integer) get(field);
+ }
+
+ public Integer getInt(String name) {
+ return (Integer) get(name);
+ }
+
+ public Object[] getArray(Field field) {
+ return (Object[]) get(field);
+ }
+
+ public Object[] getArray(String name) {
+ return (Object[]) get(name);
+ }
+
+ public String getString(Field field) {
+ return (String) get(field);
+ }
+
+ public String getString(String name) {
+ return (String) get(name);
+ }
+
+ /**
+ * Set the given field to the specified value. The value is stored without being validated against the field type.
+ *
+ * @param field The field
+ * @param value The value
+ * @return This struct, to allow chaining
+ * @throws SchemaException if the field belongs to a different schema
+ */
+ public Struct set(Field field, Object value) {
+ validateField(field);
+ this.values[field.index] = value;
+ return this;
+ }
+
+ /**
+ * Set the field specified by the given name to the value
+ *
+ * @param name The name of the field
+ * @param value The value to set
+ * @return This struct, to allow chaining
+ * @throws SchemaException if there is no field with that name
+ */
+ public Struct set(String name, Object value) {
+ Field field = this.schema.get(name);
+ if (field == null)
+ throw new SchemaException("Unknown field: " + name);
+ this.values[field.index] = value;
+ return this;
+ }
+
+ /**
+ * Create a struct for the schema of a container type (struct or array)
+ *
+ * @param field The field to create an instance of
+ * @return The struct
+ * @throws SchemaException if the field belongs to a different schema or is neither a schema nor an array type
+ */
+ public Struct instance(Field field) {
+ validateField(field);
+ if (field.type() instanceof Schema) {
+ return new Struct((Schema) field.type());
+ } else if (field.type() instanceof ArrayOf) {
+ ArrayOf array = (ArrayOf) field.type();
+ return new Struct((Schema) array.type());
+ } else {
+ throw new SchemaException("Field " + field.name + " is not a container type, it is of type " + field.type());
+ }
+ }
+
+ /**
+ * Create a struct instance for the given field which must be a container type (struct or array)
+ *
+ * @param field The name of the field to create (field must be a schema type)
+ * @return The struct
+ * @throws SchemaException if there is no field with that name or it is not a container type
+ */
+ public Struct instance(String field) {
+ Field f = schema.get(field);
+ // An unknown name yields null from the schema lookup; report it the same way get(String) does
+ if (f == null)
+ throw new SchemaException("No such field: " + field);
+ return instance(f);
+ }
+
+ /**
+ * Empty all the values from this record
+ */
+ public void clear() {
+ Arrays.fill(this.values, null);
+ }
+
+ /**
+ * Get the serialized size of this object
+ */
+ public int sizeOf() {
+ return this.schema.sizeOf(this);
+ }
+
+ /**
+ * Write this struct to a buffer
+ */
+ public void writeTo(ByteBuffer buffer) {
+ this.schema.write(buffer, this);
+ }
+
+ /**
+ * Ensure the user doesn't try to access fields from the wrong schema
+ *
+ * @throws SchemaException if the field is bound to another schema instance or its index is outside the value array
+ */
+ private void validateField(Field field) {
+ if (this.schema != field.schema)
+ throw new SchemaException("Attempt to access field '" + field.name + "' from a different schema instance.");
+ // Valid slots are 0 .. values.length - 1, so an index equal to the length is already out of range
+ if (field.index >= values.length)
+ throw new SchemaException("Invalid field index: " + field.index);
+ }
+
+ /**
+ * Validate the contents of this struct against its schema
+ */
+ public void validate() {
+ this.schema.validate(this);
+ }
+
+ /**
+ * Create a byte buffer containing the serialized form of the values in this struct. This method can choose to break
+ * the struct into multiple ByteBuffers if need be. The current implementation always returns a single buffer,
+ * which is left positioned at the end of the written data.
+ */
+ public ByteBuffer[] toBytes() {
+ ByteBuffer buffer = ByteBuffer.allocate(sizeOf());
+ writeTo(buffer);
+ return new ByteBuffer[] { buffer };
+ }
+
+ /**
+ * Display the raw stored values (defaults are not substituted) in the form {name=value,name=value}
+ */
+ @Override
+ public String toString() {
+ StringBuilder b = new StringBuilder();
+ b.append('{');
+ for (int i = 0; i < this.values.length; i++) {
+ b.append(this.schema.get(i).name);
+ b.append('=');
+ b.append(this.values[i]);
+ if (i < this.values.length - 1)
+ b.append(',');
+ }
+ b.append('}');
+ return b.toString();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/protocol/types/Type.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/protocol/types/Type.java b/clients/src/main/java/kafka/common/protocol/types/Type.java
new file mode 100644
index 0000000..f4c93e3
--- /dev/null
+++ b/clients/src/main/java/kafka/common/protocol/types/Type.java
@@ -0,0 +1,216 @@
+package kafka.common.protocol.types;
+
+import java.nio.ByteBuffer;
+
+import kafka.common.utils.Utils;
+
+/**
+ * A serializable type
+ */
+public abstract class Type {
+
+ /**
+ * Serialize the given value into the buffer at the buffer's current position.
+ */
+ public abstract void write(ByteBuffer buffer, Object o);
+
+ /**
+ * Deserialize a value from the buffer at the buffer's current position.
+ */
+ public abstract Object read(ByteBuffer buffer);
+
+ /**
+ * The number of bytes the given value occupies in serialized form.
+ */
+ public abstract int sizeOf(Object o);
+
+ /**
+ * Check that the given object is a legal value for this type and return it.
+ *
+ * @throws SchemaException if the object is not a legal value for this type
+ */
+ public abstract Object validate(Object o);
+
+ /** A single byte, represented as a {@link Byte}. */
+ public static final Type INT8 = new Type() {
+ @Override
+ public void write(ByteBuffer buffer, Object o) {
+ buffer.put((Byte) o);
+ }
+
+ @Override
+ public Object read(ByteBuffer buffer) {
+ return buffer.get();
+ }
+
+ @Override
+ public int sizeOf(Object o) {
+ return 1;
+ }
+
+ @Override
+ public String toString() {
+ return "INT8";
+ }
+
+ @Override
+ public Byte validate(Object item) {
+ if (item instanceof Byte)
+ return (Byte) item;
+ else
+ throw new SchemaException(item + " is not a Byte.");
+ }
+ };
+
+ /** A two-byte integer, represented as a {@link Short}. */
+ public static final Type INT16 = new Type() {
+ @Override
+ public void write(ByteBuffer buffer, Object o) {
+ buffer.putShort((Short) o);
+ }
+
+ @Override
+ public Object read(ByteBuffer buffer) {
+ return buffer.getShort();
+ }
+
+ @Override
+ public int sizeOf(Object o) {
+ return 2;
+ }
+
+ @Override
+ public String toString() {
+ return "INT16";
+ }
+
+ @Override
+ public Short validate(Object item) {
+ if (item instanceof Short)
+ return (Short) item;
+ else
+ throw new SchemaException(item + " is not a Short.");
+ }
+ };
+
+ /** A four-byte integer, represented as an {@link Integer}. */
+ public static final Type INT32 = new Type() {
+ @Override
+ public void write(ByteBuffer buffer, Object o) {
+ buffer.putInt((Integer) o);
+ }
+
+ @Override
+ public Object read(ByteBuffer buffer) {
+ return buffer.getInt();
+ }
+
+ @Override
+ public int sizeOf(Object o) {
+ return 4;
+ }
+
+ @Override
+ public String toString() {
+ return "INT32";
+ }
+
+ @Override
+ public Integer validate(Object item) {
+ if (item instanceof Integer)
+ return (Integer) item;
+ else
+ throw new SchemaException(item + " is not an Integer.");
+ }
+ };
+
+ /** An eight-byte integer, represented as a {@link Long}. */
+ public static final Type INT64 = new Type() {
+ @Override
+ public void write(ByteBuffer buffer, Object o) {
+ buffer.putLong((Long) o);
+ }
+
+ @Override
+ public Object read(ByteBuffer buffer) {
+ return buffer.getLong();
+ }
+
+ @Override
+ public int sizeOf(Object o) {
+ return 8;
+ }
+
+ @Override
+ public String toString() {
+ return "INT64";
+ }
+
+ @Override
+ public Long validate(Object item) {
+ if (item instanceof Long)
+ return (Long) item;
+ else
+ throw new SchemaException(item + " is not a Long.");
+ }
+ };
+
+ /** A string written as a two-byte length followed by that many bytes produced by Utils.utf8. */
+ public static final Type STRING = new Type() {
+ @Override
+ public void write(ByteBuffer buffer, Object o) {
+ byte[] bytes = Utils.utf8((String) o);
+ if (bytes.length > Short.MAX_VALUE)
+ throw new SchemaException("String is longer than the maximum string length.");
+ buffer.putShort((short) bytes.length);
+ buffer.put(bytes);
+ }
+
+ @Override
+ public Object read(ByteBuffer buffer) {
+ int length = buffer.getShort();
+ // The length prefix is read as a signed short; a negative value cannot size the byte array below
+ if (length < 0)
+ throw new SchemaException("String length " + length + " cannot be negative");
+ byte[] bytes = new byte[length];
+ buffer.get(bytes);
+ return Utils.utf8(bytes);
+ }
+
+ @Override
+ public int sizeOf(Object o) {
+ return 2 + Utils.utf8Length((String) o);
+ }
+
+ @Override
+ public String toString() {
+ return "STRING";
+ }
+
+ @Override
+ public String validate(Object item) {
+ if (item instanceof String)
+ return (String) item;
+ else
+ throw new SchemaException(item + " is not a String.");
+ }
+ };
+
+ /** A byte sequence written as a four-byte length followed by the bytes, represented as a {@link ByteBuffer}. */
+ public static final Type BYTES = new Type() {
+ @Override
+ public void write(ByteBuffer buffer, Object o) {
+ ByteBuffer arg = (ByteBuffer) o;
+ // put(ByteBuffer) advances the source position, so remember it and restore it afterwards
+ int pos = arg.position();
+ buffer.putInt(arg.remaining());
+ buffer.put(arg);
+ arg.position(pos);
+ }
+
+ @Override
+ public Object read(ByteBuffer buffer) {
+ int size = buffer.getInt();
+ // Reject lengths the slice below cannot honour, with an error that names the offending size
+ if (size < 0)
+ throw new SchemaException("Bytes size " + size + " cannot be negative");
+ if (size > buffer.remaining())
+ throw new SchemaException("Error reading bytes of size " + size + ", only " + buffer.remaining() + " bytes available");
+ // The returned buffer shares content with the input buffer rather than copying it
+ ByteBuffer val = buffer.slice();
+ val.limit(size);
+ buffer.position(buffer.position() + size);
+ return val;
+ }
+
+ @Override
+ public int sizeOf(Object o) {
+ ByteBuffer buffer = (ByteBuffer) o;
+ return 4 + buffer.remaining();
+ }
+
+ @Override
+ public String toString() {
+ return "BYTES";
+ }
+
+ @Override
+ public ByteBuffer validate(Object item) {
+ if (item instanceof ByteBuffer)
+ return (ByteBuffer) item;
+ else
+ throw new SchemaException(item + " is not a java.nio.ByteBuffer.");
+ }
+ };
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/record/CompressionType.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/record/CompressionType.java b/clients/src/main/java/kafka/common/record/CompressionType.java
new file mode 100644
index 0000000..f6d9026
--- /dev/null
+++ b/clients/src/main/java/kafka/common/record/CompressionType.java
@@ -0,0 +1,40 @@
+package kafka.common.record;
+
+/**
+ * The compression type to use
+ */
+public enum CompressionType {
+ NONE(0, "none"), GZIP(1, "gzip"), SNAPPY(2, "snappy");
+
+ /** The numeric id of this compression type */
+ public final int id;
+ /** The lower-case textual name of this compression type */
+ public final String name;
+
+ private CompressionType(int id, String name) {
+ this.id = id;
+ this.name = name;
+ }
+
+ /**
+ * Look up a compression type by its numeric id
+ *
+ * @param id The id to look up
+ * @return The compression type with that id
+ * @throws IllegalArgumentException if no compression type has that id
+ */
+ public static CompressionType forId(int id) {
+ for (CompressionType candidate : values()) {
+ if (candidate.id == id) {
+ return candidate;
+ }
+ }
+ throw new IllegalArgumentException("Unknown compression type id: " + id);
+ }
+
+ /**
+ * Look up a compression type by its textual name (exact, case-sensitive match)
+ *
+ * @param name The name to look up
+ * @return The compression type with that name
+ * @throws IllegalArgumentException if no compression type has that name
+ */
+ public static CompressionType forName(String name) {
+ for (CompressionType candidate : values()) {
+ if (candidate.name.equals(name)) {
+ return candidate;
+ }
+ }
+ throw new IllegalArgumentException("Unknown compression name: " + name);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/record/InvalidRecordException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/record/InvalidRecordException.java b/clients/src/main/java/kafka/common/record/InvalidRecordException.java
new file mode 100644
index 0000000..97fbe50
--- /dev/null
+++ b/clients/src/main/java/kafka/common/record/InvalidRecordException.java
@@ -0,0 +1,11 @@
+package kafka.common.record;
+
+/**
+ * Thrown to report an invalid record, for example one whose stored CRC does not match its contents
+ */
+public class InvalidRecordException extends RuntimeException {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * @param s A description of what is wrong with the record
+ */
+ public InvalidRecordException(String s) {
+ super(s);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/record/LogEntry.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/record/LogEntry.java b/clients/src/main/java/kafka/common/record/LogEntry.java
new file mode 100644
index 0000000..f5e99c9
--- /dev/null
+++ b/clients/src/main/java/kafka/common/record/LogEntry.java
@@ -0,0 +1,28 @@
+package kafka.common.record;
+
+/**
+ * An offset and record pair
+ */
+public final class LogEntry {
+
+ private final long offset;
+ private final Record record;
+
+ public LogEntry(long offset, Record record) {
+ this.offset = offset;
+ this.record = record;
+ }
+
+ public long offset() {
+ return this.offset;
+ }
+
+ public Record record() {
+ return this.record;
+ }
+
+ @Override
+ public String toString() {
+ return "LogEntry(" + offset + ", " + record + ")";
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/record/MemoryRecords.java b/clients/src/main/java/kafka/common/record/MemoryRecords.java
new file mode 100644
index 0000000..ec98226
--- /dev/null
+++ b/clients/src/main/java/kafka/common/record/MemoryRecords.java
@@ -0,0 +1,102 @@
+package kafka.common.record;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.GatheringByteChannel;
+import java.util.Iterator;
+
+import kafka.common.utils.AbstractIterator;
+
+/**
+ * A {@link Records} implementation backed by a ByteBuffer.
+ */
+public class MemoryRecords implements Records {
+
+ private final ByteBuffer buffer;
+
+ /**
+ * Create a record set backed by a newly allocated buffer of the given capacity
+ */
+ public MemoryRecords(int size) {
+ this(ByteBuffer.allocate(size));
+ }
+
+ /**
+ * Create a record set backed by the given buffer (used directly, not copied)
+ */
+ public MemoryRecords(ByteBuffer buffer) {
+ this.buffer = buffer;
+ }
+
+ /**
+ * Append the given record and offset to the buffer: the offset, then the record size, then the record bytes. The
+ * record's own buffer is rewound afterwards.
+ */
+ public void append(long offset, Record record) {
+ ByteBuffer recordBytes = record.buffer();
+ buffer.putLong(offset);
+ buffer.putInt(record.size());
+ buffer.put(recordBytes);
+ recordBytes.rewind();
+ }
+
+ /**
+ * Append a new record and offset to the buffer, serializing the record directly into the backing buffer
+ */
+ public void append(long offset, byte[] key, byte[] value, CompressionType type) {
+ buffer.putLong(offset);
+ int recordBytes = Record.recordSize(key, value);
+ buffer.putInt(recordBytes);
+ Record.write(this.buffer, key, value, type);
+ }
+
+ /**
+ * Check if we have room for a new record containing the given key/value pair, including the per-entry overhead
+ */
+ public boolean hasRoomFor(byte[] key, byte[] value) {
+ int needed = Records.LOG_OVERHEAD + Record.recordSize(key, value);
+ return needed <= this.buffer.remaining();
+ }
+
+ /**
+ * Write the messages in this set to the given channel.
+ *
+ * NOTE(review): this hands the backing buffer to the channel as-is, so the bytes written are those between its
+ * current position and limit; presumably callers prepare the buffer for reading first - confirm against callers.
+ */
+ @Override
+ public int writeTo(GatheringByteChannel channel) throws IOException {
+ return channel.write(buffer);
+ }
+
+ /**
+ * The size of this record set, taken from the current position of the backing buffer
+ */
+ @Override
+ public int sizeInBytes() {
+ return buffer.position();
+ }
+
+ /**
+ * Get a duplicate of the byte buffer that backs this records instance (shared content, independent position)
+ */
+ public ByteBuffer buffer() {
+ return this.buffer.duplicate();
+ }
+
+ @Override
+ public Iterator<LogEntry> iterator() {
+ return new RecordsIterator(buffer);
+ }
+
+ /* TODO: allow reuse of the buffer used for iteration */
+ public static class RecordsIterator extends AbstractIterator<LogEntry> {
+ private final ByteBuffer buffer;
+
+ /**
+ * Iterate over the entries written so far; works on a flipped duplicate so the given buffer is left untouched
+ */
+ public RecordsIterator(ByteBuffer buffer) {
+ this.buffer = buffer.duplicate();
+ this.buffer.flip();
+ }
+
+ /**
+ * Read the next offset/size header and slice out the record; iteration ends at the first incomplete entry
+ */
+ @Override
+ protected LogEntry makeNext() {
+ boolean headerAvailable = buffer.remaining() >= Records.LOG_OVERHEAD;
+ if (!headerAvailable)
+ return allDone();
+ final long offset = buffer.getLong();
+ final int size = buffer.getInt();
+ if (size < 0)
+ throw new IllegalStateException("Message with size " + size);
+ if (size > buffer.remaining())
+ return allDone();
+ ByteBuffer rec = buffer.slice();
+ rec.limit(size);
+ int next = this.buffer.position() + size;
+ this.buffer.position(next);
+ return new LogEntry(offset, new Record(rec));
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/record/Record.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/record/Record.java b/clients/src/main/java/kafka/common/record/Record.java
new file mode 100644
index 0000000..835a0a4
--- /dev/null
+++ b/clients/src/main/java/kafka/common/record/Record.java
@@ -0,0 +1,286 @@
+package kafka.common.record;
+
+import java.nio.ByteBuffer;
+
+import kafka.common.utils.Utils;
+
+/**
+ * A record: a serialized key and value along with the associated CRC and other fields
+ */
+public final class Record {
+
+ /**
+ * The current offset and size for all the fixed-length fields
+ */
+ public static final int CRC_OFFSET = 0;
+ public static final int CRC_LENGTH = 4;
+ public static final int MAGIC_OFFSET = CRC_OFFSET + CRC_LENGTH;
+ public static final int MAGIC_LENGTH = 1;
+ public static final int ATTRIBUTES_OFFSET = MAGIC_OFFSET + MAGIC_LENGTH;
+ public static final int ATTRIBUTE_LENGTH = 1;
+ public static final int KEY_SIZE_OFFSET = ATTRIBUTES_OFFSET + ATTRIBUTE_LENGTH;
+ public static final int KEY_SIZE_LENGTH = 4;
+ public static final int KEY_OFFSET = KEY_SIZE_OFFSET + KEY_SIZE_LENGTH;
+ public static final int VALUE_SIZE_LENGTH = 4;
+
+ /** The amount of overhead bytes in a record */
+ public static final int RECORD_OVERHEAD = KEY_OFFSET + VALUE_SIZE_LENGTH;
+
+ /**
+ * The minimum valid size for the record header
+ */
+ public static final int MIN_HEADER_SIZE = CRC_LENGTH + MAGIC_LENGTH + ATTRIBUTE_LENGTH + KEY_SIZE_LENGTH + VALUE_SIZE_LENGTH;
+
+ /**
+ * The current "magic" value
+ */
+ public static final byte CURRENT_MAGIC_VALUE = 0;
+
+ /**
+ * Specifies the mask for the compression code. 2 bits to hold the compression codec. 0 is reserved to indicate no
+ * compression
+ */
+ public static final int COMPRESSION_CODEC_MASK = 0x03;
+
+ /**
+ * Compression code for uncompressed records
+ */
+ public static final int NO_COMPRESSION = 0;
+
+ private final ByteBuffer buffer;
+
+ /**
+ * Wrap an already serialized record. The buffer is used directly (not copied); absolute offsets such as
+ * {@link #CRC_OFFSET} are applied from index 0 and {@link #size()} is the buffer's limit.
+ */
+ public Record(ByteBuffer buffer) {
+ this.buffer = buffer;
+ }
+
+ /**
+ * A constructor to create a record by serializing the given key and value into a newly allocated buffer
+ *
+ * @param key The key of the record (null, if none)
+ * @param value The record value
+ * @param codec The compression codec used on the contents of the record (if any)
+ * @param valueOffset The offset into the payload array used to extract payload
+ * @param valueSize The size of the payload to use; a negative value means "everything from valueOffset on"
+ */
+ public Record(byte[] key, byte[] value, CompressionType codec, int valueOffset, int valueSize) {
+ this(ByteBuffer.allocate(recordSize(key == null ? 0 : key.length, value == null ? 0 : valueSize >= 0 ? valueSize
+ : value.length - valueOffset)));
+ write(this.buffer, key, value, codec, valueOffset, valueSize);
+ this.buffer.rewind();
+ }
+
+ public Record(byte[] key, byte[] value, CompressionType codec) {
+ this(key, value, codec, 0, -1);
+ }
+
+ public Record(byte[] value, CompressionType codec) {
+ this(null, value, codec);
+ }
+
+ public Record(byte[] key, byte[] value) {
+ this(key, value, CompressionType.NONE);
+ }
+
+ public Record(byte[] value) {
+ this(null, value, CompressionType.NONE);
+ }
+
+ /**
+ * Serialize a record into the buffer starting at the buffer's current position: crc, magic, attributes, key size
+ * and key, value size and value. A null key or value is written as size -1 with no bytes. The buffer must be
+ * array-backed because the checksum is computed over its backing array.
+ *
+ * @param buffer The buffer to write to; its position is left just past the record
+ * @param key The key (null, if none)
+ * @param value The value (null, if none)
+ * @param codec The compression codec to record in the attributes byte
+ * @param valueOffset The offset into the value array at which the payload starts
+ * @param valueSize The number of payload bytes to write; a negative value means "everything from valueOffset on"
+ */
+ public static void write(ByteBuffer buffer, byte[] key, byte[] value, CompressionType codec, int valueOffset, int valueSize) {
+ // skip crc, we will fill that in at the end
+ int pos = buffer.position();
+ buffer.position(pos + MAGIC_OFFSET);
+ buffer.put(CURRENT_MAGIC_VALUE);
+ byte attributes = 0;
+ if (codec.id > 0)
+ attributes = (byte) (attributes | (COMPRESSION_CODEC_MASK & codec.id));
+ buffer.put(attributes);
+ // write the key
+ if (key == null) {
+ buffer.putInt(-1);
+ } else {
+ buffer.putInt(key.length);
+ buffer.put(key, 0, key.length);
+ }
+ // write the value
+ if (value == null) {
+ buffer.putInt(-1);
+ } else {
+ int size = valueSize >= 0 ? valueSize : (value.length - valueOffset);
+ buffer.putInt(size);
+ buffer.put(value, valueOffset, size);
+ }
+
+ // now compute the checksum over everything after the crc field and fill it in; positions here are
+ // buffer-relative, computeChecksum applies the backing-array offset itself
+ long crc = computeChecksum(buffer, pos + MAGIC_OFFSET, buffer.position() - pos - MAGIC_OFFSET);
+ Utils.writeUnsignedInt(buffer, pos + CRC_OFFSET, crc);
+ }
+
+ public static void write(ByteBuffer buffer, byte[] key, byte[] value, CompressionType codec) {
+ write(buffer, key, value, codec, 0, -1);
+ }
+
+ /**
+ * The serialized size of a record with the given key and value (null counts as zero bytes)
+ */
+ public static int recordSize(byte[] key, byte[] value) {
+ return recordSize(key == null ? 0 : key.length, value == null ? 0 : value.length);
+ }
+
+ /**
+ * The serialized size of a record with a key and value of the given lengths
+ */
+ public static int recordSize(int keySize, int valueSize) {
+ return CRC_LENGTH + MAGIC_LENGTH + ATTRIBUTE_LENGTH + KEY_SIZE_LENGTH + keySize + VALUE_SIZE_LENGTH + valueSize;
+ }
+
+ /**
+ * The buffer backing this record (the record's own buffer, not a copy)
+ */
+ public ByteBuffer buffer() {
+ return this.buffer;
+ }
+
+ /**
+ * Compute the checksum of a range of an array-backed buffer
+ *
+ * @param buffer The buffer holding the bytes; must be array-backed
+ * @param position The buffer-relative index of the first byte to checksum
+ * @param size The number of bytes to checksum
+ * @return The CRC of those bytes
+ */
+ public static long computeChecksum(ByteBuffer buffer, int position, int size) {
+ // arrayOffset() only shifts where the range starts in the backing array; it does not change its length
+ return Utils.crc32(buffer.array(), buffer.arrayOffset() + position, size);
+ }
+
+ /**
+ * Compute the checksum of the record from the record contents (everything after the crc field)
+ */
+ public long computeChecksum() {
+ return computeChecksum(buffer, MAGIC_OFFSET, buffer.limit() - MAGIC_OFFSET);
+ }
+
+ /**
+ * Retrieve the previously computed CRC for this record
+ */
+ public long checksum() {
+ return Utils.readUnsignedInt(buffer, CRC_OFFSET);
+ }
+
+ /**
+ * Returns true if the crc stored with the record matches the crc computed off the record contents
+ */
+ public boolean isValid() {
+ return checksum() == computeChecksum();
+ }
+
+ /**
+ * Throw an InvalidRecordException if isValid is false for this record
+ */
+ public void ensureValid() {
+ if (!isValid())
+ throw new InvalidRecordException("Record is corrupt (stored crc = " + checksum()
+ + ", computed crc = "
+ + computeChecksum()
+ + ")");
+ }
+
+ /**
+ * The complete serialized size of this record in bytes (including crc, header attributes, etc)
+ */
+ public int size() {
+ return buffer.limit();
+ }
+
+ /**
+ * The length of the key in bytes (negative when the record has no key)
+ */
+ public int keySize() {
+ return buffer.getInt(KEY_SIZE_OFFSET);
+ }
+
+ /**
+ * Does the record have a key?
+ */
+ public boolean hasKey() {
+ return keySize() >= 0;
+ }
+
+ /**
+ * The position where the value size is stored
+ */
+ private int valueSizeOffset() {
+ // an absent key is stored as a negative size and occupies no bytes
+ return KEY_OFFSET + Math.max(0, keySize());
+ }
+
+ /**
+ * The length of the value in bytes (negative when the record has no value)
+ */
+ public int valueSize() {
+ return buffer.getInt(valueSizeOffset());
+ }
+
+ /**
+ * The magic version of this record
+ */
+ public byte magic() {
+ return buffer.get(MAGIC_OFFSET);
+ }
+
+ /**
+ * The attributes stored with this record
+ */
+ public byte attributes() {
+ return buffer.get(ATTRIBUTES_OFFSET);
+ }
+
+ /**
+ * The compression codec used with this record
+ */
+ public CompressionType compressionType() {
+ return CompressionType.forId(buffer.get(ATTRIBUTES_OFFSET) & COMPRESSION_CODEC_MASK);
+ }
+
+ /**
+ * A ByteBuffer containing the value of this record, or null if the record has no value
+ */
+ public ByteBuffer value() {
+ return sliceDelimited(valueSizeOffset());
+ }
+
+ /**
+ * A ByteBuffer containing the message key, or null if the record has no key
+ */
+ public ByteBuffer key() {
+ return sliceDelimited(KEY_SIZE_OFFSET);
+ }
+
+ /**
+ * Read a size-delimited byte buffer starting at the given offset
+ *
+ * @return A buffer sharing this record's content and covering exactly the delimited bytes, or null when the
+ * stored size is negative
+ */
+ private ByteBuffer sliceDelimited(int start) {
+ int size = buffer.getInt(start);
+ if (size < 0) {
+ return null;
+ } else {
+ ByteBuffer b = buffer.duplicate();
+ b.position(start + 4);
+ b = b.slice();
+ b.limit(size);
+ b.rewind();
+ return b;
+ }
+ }
+
+ @Override
+ public String toString() {
+ // key() and value() return null for absent fields; report those as 0 bytes instead of dereferencing null
+ ByteBuffer k = key();
+ ByteBuffer v = value();
+ return String.format("Message(magic = %d, attributes = %d, crc = %d, key = %d bytes, value = %d bytes)",
+ magic(),
+ attributes(),
+ checksum(),
+ k == null ? 0 : k.limit(),
+ v == null ? 0 : v.limit());
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other)
+ return true;
+ if (other == null)
+ return false;
+ if (!other.getClass().equals(Record.class))
+ return false;
+ Record record = (Record) other;
+ return this.buffer.equals(record.buffer);
+ }
+
+ @Override
+ public int hashCode() {
+ return buffer.hashCode();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/record/Records.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/record/Records.java b/clients/src/main/java/kafka/common/record/Records.java
new file mode 100644
index 0000000..6531ca0
--- /dev/null
+++ b/clients/src/main/java/kafka/common/record/Records.java
@@ -0,0 +1,29 @@
+package kafka.common.record;
+
+import java.io.IOException;
+import java.nio.channels.GatheringByteChannel;
+
+/**
+ * A binary format for a sequence of log entries. Besides the record bytes, each entry carries a 4 byte size and
+ * an 8 byte offset. See {@link MemoryRecords} for the in-memory representation.
+ */
+public interface Records extends Iterable<LogEntry> {
+
+    /** Number of bytes used by the size field */
+    int SIZE_LENGTH = 4;
+
+    /** Number of bytes used by the offset field */
+    int OFFSET_LENGTH = 8;
+
+    /** Combined number of bytes used by the size and offset fields */
+    int LOG_OVERHEAD = SIZE_LENGTH + OFFSET_LENGTH;
+
+    /**
+     * Write these records to the given channel
+     *
+     * @param channel The channel to write to
+     * @return The number of bytes written
+     * @throws IOException If the write fails.
+     */
+    int writeTo(GatheringByteChannel channel) throws IOException;
+
+    /**
+     * The size of these records in bytes
+     *
+     * @return The size in bytes
+     */
+    int sizeInBytes();
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/requests/RequestHeader.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/requests/RequestHeader.java b/clients/src/main/java/kafka/common/requests/RequestHeader.java
new file mode 100644
index 0000000..4ce67f8
--- /dev/null
+++ b/clients/src/main/java/kafka/common/requests/RequestHeader.java
@@ -0,0 +1,68 @@
+package kafka.common.requests;
+
+import static kafka.common.protocol.Protocol.REQUEST_HEADER;
+
+import java.nio.ByteBuffer;
+
+import kafka.common.protocol.ProtoUtils;
+import kafka.common.protocol.Protocol;
+import kafka.common.protocol.types.Field;
+import kafka.common.protocol.types.Struct;
+
+/**
+ * The header for a request in the Kafka protocol. It wraps a {@link Struct} using the
+ * {@link Protocol#REQUEST_HEADER} schema and exposes its four fields: api key, api version, client id and
+ * correlation id.
+ */
+public class RequestHeader {
+
+    // Field handles are looked up once and never reassigned, so they are constants.
+    private static final Field API_KEY_FIELD = REQUEST_HEADER.get("api_key");
+    private static final Field API_VERSION_FIELD = REQUEST_HEADER.get("api_version");
+    private static final Field CLIENT_ID_FIELD = REQUEST_HEADER.get("client_id");
+    private static final Field CORRELATION_ID_FIELD = REQUEST_HEADER.get("correlation_id");
+
+    private final Struct header;
+
+    /**
+     * Wrap an existing header struct
+     *
+     * @param header The struct holding the header fields
+     */
+    public RequestHeader(Struct header) {
+        this.header = header;
+    }
+
+    /**
+     * Create a header using the version that {@link ProtoUtils#latestVersion} returns for the given api key
+     *
+     * @param apiKey The api key of the request
+     * @param client The client id
+     * @param correlation The correlation id
+     */
+    public RequestHeader(short apiKey, String client, int correlation) {
+        this(apiKey, ProtoUtils.latestVersion(apiKey), client, correlation);
+    }
+
+    /**
+     * Create a header with every field given explicitly
+     *
+     * @param apiKey The api key of the request
+     * @param version The api version of the request
+     * @param client The client id
+     * @param correlation The correlation id
+     */
+    public RequestHeader(short apiKey, short version, String client, int correlation) {
+        this(new Struct(Protocol.REQUEST_HEADER));
+        this.header.set(API_KEY_FIELD, apiKey);
+        this.header.set(API_VERSION_FIELD, version);
+        this.header.set(CLIENT_ID_FIELD, client);
+        this.header.set(CORRELATION_ID_FIELD, correlation);
+    }
+
+    /**
+     * The api key stored in this header
+     */
+    public short apiKey() {
+        return (Short) this.header.get(API_KEY_FIELD);
+    }
+
+    /**
+     * The api version stored in this header
+     */
+    public short apiVersion() {
+        return (Short) this.header.get(API_VERSION_FIELD);
+    }
+
+    /**
+     * The client id stored in this header
+     */
+    public String clientId() {
+        return (String) this.header.get(CLIENT_ID_FIELD);
+    }
+
+    /**
+     * The correlation id stored in this header
+     */
+    public int correlationId() {
+        return (Integer) this.header.get(CORRELATION_ID_FIELD);
+    }
+
+    /**
+     * Read a request header from the given buffer using the {@link Protocol#REQUEST_HEADER} schema
+     *
+     * @param buffer The buffer to read from
+     * @return The parsed header
+     */
+    public static RequestHeader parse(ByteBuffer buffer) {
+        return new RequestHeader((Struct) Protocol.REQUEST_HEADER.read(buffer));
+    }
+
+    /**
+     * Write this header to the given buffer
+     *
+     * @param buffer The buffer to write to
+     */
+    public void writeTo(ByteBuffer buffer) {
+        header.writeTo(buffer);
+    }
+
+    /**
+     * The size of this header as reported by the underlying struct
+     */
+    public int sizeOf() {
+        return header.sizeOf();
+    }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/requests/RequestSend.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/requests/RequestSend.java b/clients/src/main/java/kafka/common/requests/RequestSend.java
new file mode 100644
index 0000000..f6a9a86
--- /dev/null
+++ b/clients/src/main/java/kafka/common/requests/RequestSend.java
@@ -0,0 +1,38 @@
+package kafka.common.requests;
+
+import java.nio.ByteBuffer;
+
+import kafka.common.network.NetworkSend;
+import kafka.common.protocol.types.Struct;
+
+/**
+ * A {@link NetworkSend} whose payload is a request header followed by a request body
+ */
+public class RequestSend extends NetworkSend {
+
+    private final RequestHeader header;
+    private final Struct body;
+
+    /**
+     * @param destination The destination passed through to {@link NetworkSend}
+     * @param header The request header
+     * @param body The request body
+     */
+    public RequestSend(int destination, RequestHeader header, Struct body) {
+        super(destination, serialize(header, body));
+        this.header = header;
+        this.body = body;
+    }
+
+    /**
+     * Write the header and then the body into a newly allocated buffer sized to hold exactly both, and rewind
+     * it so it is ready to be read from the start.
+     */
+    private static ByteBuffer serialize(RequestHeader requestHeader, Struct requestBody) {
+        final int totalSize = requestHeader.sizeOf() + requestBody.sizeOf();
+        final ByteBuffer serialized = ByteBuffer.allocate(totalSize);
+        requestHeader.writeTo(serialized);
+        requestBody.writeTo(serialized);
+        serialized.rewind();
+        return serialized;
+    }
+
+    /**
+     * The header this send was built from
+     */
+    public RequestHeader header() {
+        return header;
+    }
+
+    /**
+     * The body this send was built from
+     */
+    public Struct body() {
+        return this.body;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/requests/ResponseHeader.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/requests/ResponseHeader.java b/clients/src/main/java/kafka/common/requests/ResponseHeader.java
new file mode 100644
index 0000000..1ef8e15
--- /dev/null
+++ b/clients/src/main/java/kafka/common/requests/ResponseHeader.java
@@ -0,0 +1,45 @@
+package kafka.common.requests;
+
+import static kafka.common.protocol.Protocol.RESPONSE_HEADER;
+
+import java.nio.ByteBuffer;
+
+import kafka.common.protocol.Protocol;
+import kafka.common.protocol.types.Field;
+import kafka.common.protocol.types.Struct;
+
+/**
+ * A response header in the kafka protocol. It wraps a {@link Struct} using the {@link Protocol#RESPONSE_HEADER}
+ * schema and exposes its correlation id field.
+ */
+public class ResponseHeader {
+
+    // Looked up once and never reassigned, so it is a constant.
+    private static final Field CORRELATION_ID_FIELD = RESPONSE_HEADER.get("correlation_id");
+
+    private final Struct header;
+
+    /**
+     * Wrap an existing header struct
+     *
+     * @param header The struct holding the header fields
+     */
+    public ResponseHeader(Struct header) {
+        this.header = header;
+    }
+
+    /**
+     * Create a header carrying the given correlation id
+     *
+     * @param correlationId The correlation id
+     */
+    public ResponseHeader(int correlationId) {
+        this(new Struct(Protocol.RESPONSE_HEADER));
+        this.header.set(CORRELATION_ID_FIELD, correlationId);
+    }
+
+    /**
+     * The correlation id stored in this header
+     */
+    public int correlationId() {
+        return (Integer) header.get(CORRELATION_ID_FIELD);
+    }
+
+    /**
+     * Write this header to the given buffer
+     *
+     * @param buffer The buffer to write to
+     */
+    public void writeTo(ByteBuffer buffer) {
+        header.writeTo(buffer);
+    }
+
+    /**
+     * The size of this header as reported by the underlying struct
+     */
+    public int sizeOf() {
+        return header.sizeOf();
+    }
+
+    /**
+     * Read a response header from the given buffer using the {@link Protocol#RESPONSE_HEADER} schema
+     *
+     * @param buffer The buffer to read from
+     * @return The parsed header
+     */
+    public static ResponseHeader parse(ByteBuffer buffer) {
+        return new ResponseHeader((Struct) Protocol.RESPONSE_HEADER.read(buffer));
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/utils/AbstractIterator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/utils/AbstractIterator.java b/clients/src/main/java/kafka/common/utils/AbstractIterator.java
new file mode 100644
index 0000000..f3190d7
--- /dev/null
+++ b/clients/src/main/java/kafka/common/utils/AbstractIterator.java
@@ -0,0 +1,72 @@
+package kafka.common.utils;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * A base class that simplifies implementing an iterator. Subclasses implement {@link #makeNext()} to produce
+ * the next item and return {@link #allDone()} from it once there are no more items.
+ *
+ * @param <T> The type of thing we are iterating over
+ */
+public abstract class AbstractIterator<T> implements Iterator<T> {
+
+    private enum State {
+        READY, NOT_READY, DONE, FAILED
+    }
+
+    private State state = State.NOT_READY;
+    private T next;
+
+    /**
+     * Check whether another item is available, computing it via {@link #makeNext()} if necessary.
+     *
+     * @throws IllegalStateException If a previous call to makeNext() exited with an exception
+     */
+    @Override
+    public boolean hasNext() {
+        switch (state) {
+            case FAILED:
+                throw new IllegalStateException("Iterator is in failed state");
+            case DONE:
+                return false;
+            case READY:
+                return true;
+            default:
+                return maybeComputeNext();
+        }
+    }
+
+    /**
+     * Return the next item and advance the iterator.
+     *
+     * @throws NoSuchElementException If there are no more items
+     * @throws IllegalStateException If makeNext() produced null without calling allDone()
+     */
+    @Override
+    public T next() {
+        if (!hasNext())
+            throw new NoSuchElementException();
+        // Mark the cached item as consumed so the following hasNext() computes a new one.
+        state = State.NOT_READY;
+        if (next == null)
+            throw new IllegalStateException("Expected item but none found.");
+        return next;
+    }
+
+    /**
+     * Removal is not supported.
+     *
+     * @throws UnsupportedOperationException Always
+     */
+    @Override
+    public void remove() {
+        throw new UnsupportedOperationException("Removal not supported");
+    }
+
+    /**
+     * Return the next item without advancing the iterator.
+     *
+     * @throws NoSuchElementException If there are no more items
+     */
+    public T peek() {
+        if (!hasNext())
+            throw new NoSuchElementException();
+        return next;
+    }
+
+    /**
+     * Mark the iteration as finished. Intended to be used as the return value of {@link #makeNext()}.
+     *
+     * @return Always null
+     */
+    protected T allDone() {
+        state = State.DONE;
+        return null;
+    }
+
+    /**
+     * Produce the next item, or return {@link #allDone()} when there are no more items.
+     */
+    protected abstract T makeNext();
+
+    /**
+     * Compute and cache the next item. Returns a primitive boolean so hasNext() does no boxing.
+     */
+    private boolean maybeComputeNext() {
+        // Assume failure up front: if makeNext() throws, the iterator stays in the FAILED state.
+        state = State.FAILED;
+        next = makeNext();
+        if (state == State.DONE)
+            return false;
+        state = State.READY;
+        return true;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/utils/CopyOnWriteMap.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/utils/CopyOnWriteMap.java b/clients/src/main/java/kafka/common/utils/CopyOnWriteMap.java
new file mode 100644
index 0000000..e45df98
--- /dev/null
+++ b/clients/src/main/java/kafka/common/utils/CopyOnWriteMap.java
@@ -0,0 +1,130 @@
+package kafka.common.utils;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * A simple read-optimized map implementation that synchronizes only writes and does a full copy on each
+ * modification. Reads go to the current snapshot held in a volatile field; every mutating method is
+ * synchronized and installs a new unmodifiable copy.
+ */
+public class CopyOnWriteMap<K, V> implements ConcurrentMap<K, V> {
+
+    private volatile Map<K, V> map;
+
+    /**
+     * Create an empty map
+     */
+    public CopyOnWriteMap() {
+        this.map = Collections.emptyMap();
+    }
+
+    /**
+     * Create a map holding a copy of the given entries
+     *
+     * @param map The initial entries
+     */
+    public CopyOnWriteMap(Map<K, V> map) {
+        // Copy instead of wrapping: later changes to the caller's map must not show up in our snapshot.
+        this.map = Collections.unmodifiableMap(new HashMap<K, V>(map));
+    }
+
+    @Override
+    public boolean containsKey(Object k) {
+        return map.containsKey(k);
+    }
+
+    @Override
+    public boolean containsValue(Object v) {
+        return map.containsValue(v);
+    }
+
+    @Override
+    public Set<java.util.Map.Entry<K, V>> entrySet() {
+        return map.entrySet();
+    }
+
+    @Override
+    public V get(Object k) {
+        return map.get(k);
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return map.isEmpty();
+    }
+
+    @Override
+    public Set<K> keySet() {
+        return map.keySet();
+    }
+
+    @Override
+    public int size() {
+        return map.size();
+    }
+
+    @Override
+    public Collection<V> values() {
+        return map.values();
+    }
+
+    @Override
+    public synchronized void clear() {
+        this.map = Collections.emptyMap();
+    }
+
+    @Override
+    public synchronized V put(K k, V v) {
+        Map<K, V> copy = new HashMap<K, V>(this.map);
+        V prev = copy.put(k, v);
+        this.map = Collections.unmodifiableMap(copy);
+        return prev;
+    }
+
+    @Override
+    public synchronized void putAll(Map<? extends K, ? extends V> entries) {
+        Map<K, V> copy = new HashMap<K, V>(this.map);
+        copy.putAll(entries);
+        this.map = Collections.unmodifiableMap(copy);
+    }
+
+    @Override
+    public synchronized V remove(Object key) {
+        Map<K, V> copy = new HashMap<K, V>(this.map);
+        V prev = copy.remove(key);
+        this.map = Collections.unmodifiableMap(copy);
+        return prev;
+    }
+
+    @Override
+    public synchronized V putIfAbsent(K k, V v) {
+        if (!containsKey(k))
+            return put(k, v);
+        else
+            return get(k);
+    }
+
+    @Override
+    public synchronized boolean remove(Object k, Object v) {
+        if (containsKey(k) && sameValue(get(k), v)) {
+            remove(k);
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    @Override
+    public synchronized boolean replace(K k, V original, V replacement) {
+        if (containsKey(k) && sameValue(get(k), original)) {
+            put(k, replacement);
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    @Override
+    public synchronized V replace(K k, V v) {
+        if (containsKey(k)) {
+            return put(k, v);
+        } else {
+            return null;
+        }
+    }
+
+    /**
+     * Null-safe equality: the backing HashMap can hold null values, so the stored value may be null.
+     */
+    private static boolean sameValue(Object stored, Object expected) {
+        return stored == null ? expected == null : stored.equals(expected);
+    }
+
+}