Posted to commits@kafka.apache.org by jk...@apache.org on 2014/01/28 20:16:30 UTC

[4/7] KAFKA-1227 New producer!

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/protocol/types/Field.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/protocol/types/Field.java b/clients/src/main/java/kafka/common/protocol/types/Field.java
new file mode 100644
index 0000000..d018a12
--- /dev/null
+++ b/clients/src/main/java/kafka/common/protocol/types/Field.java
@@ -0,0 +1,48 @@
+package kafka.common.protocol.types;
+
+/**
+ * A field in a schema
+ */
+public class Field {
+
+    public static final Object NO_DEFAULT = new Object();
+
+    final int index;
+    public final String name;
+    public final Type type;
+    public final Object defaultValue;
+    public final String doc;
+    final Schema schema;
+
+    public Field(int index, String name, Type type, String doc, Object defaultValue, Schema schema) {
+        this.index = index;
+        this.name = name;
+        this.type = type;
+        this.doc = doc;
+        this.defaultValue = defaultValue;
+        this.schema = schema;
+        if (defaultValue != NO_DEFAULT)
+            type.validate(defaultValue);
+    }
+
+    public Field(int index, String name, Type type, String doc, Object defaultValue) {
+        this(index, name, type, doc, defaultValue, null);
+    }
+
+    public Field(String name, Type type, String doc, Object defaultValue) {
+        this(-1, name, type, doc, defaultValue);
+    }
+
+    public Field(String name, Type type, String doc) {
+        this(name, type, doc, NO_DEFAULT);
+    }
+
+    public Field(String name, Type type) {
+        this(name, type, "");
+    }
+
+    public Type type() {
+        return type;
+    }
+
+}
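
Aside (not part of the patch): a minimal sketch of how these constructors compose. Type.STRING and Type.INT16 are the primitive type constants from Type.java later in this commit, and the field names here are invented for illustration. Because NO_DEFAULT is a sentinel object rather than null, a field whose default is null stays distinguishable from a field with no default at all.

    import kafka.common.protocol.types.Field;
    import kafka.common.protocol.types.Type;

    public class FieldExample {
        public static void main(String[] args) {
            // required field: reading it with no value set fails at access time
            Field clientId = new Field("client_id", Type.STRING, "The id of the client.");

            // optional field: the default is validated against the type up front
            Field acks = new Field("acks", Type.INT16, "Number of acknowledgements.", (short) 1);

            System.out.println(clientId.name + " : " + clientId.type());        // client_id : STRING
            System.out.println(acks.name + " defaults to " + acks.defaultValue); // acks defaults to 1
        }
    }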

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/protocol/types/Schema.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/protocol/types/Schema.java b/clients/src/main/java/kafka/common/protocol/types/Schema.java
new file mode 100644
index 0000000..b7b1c75
--- /dev/null
+++ b/clients/src/main/java/kafka/common/protocol/types/Schema.java
@@ -0,0 +1,134 @@
+package kafka.common.protocol.types;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * The schema for a compound record definition
+ */
+public class Schema extends Type {
+
+    private final Field[] fields;
+    private final Map<String, Field> fieldsByName;
+
+    public Schema(Field... fs) {
+        this.fields = new Field[fs.length];
+        this.fieldsByName = new HashMap<String, Field>();
+        for (int i = 0; i < this.fields.length; i++) {
+            Field field = fs[i];
+            if (fieldsByName.containsKey(field.name))
+                throw new SchemaException("Schema contains a duplicate field: " + field.name);
+            this.fields[i] = new Field(i, field.name, field.type, field.doc, field.defaultValue, this);
+            this.fieldsByName.put(fs[i].name, this.fields[i]);
+        }
+    }
+
+    /**
+     * Write a struct to the buffer
+     */
+    public void write(ByteBuffer buffer, Object o) {
+        Struct r = (Struct) o;
+        for (int i = 0; i < fields.length; i++) {
+            Field f = fields[i];
+            try {
+                Object value = f.type().validate(r.get(f));
+                f.type.write(buffer, value);
+            } catch (Exception e) {
+                throw new SchemaException("Error writing field '" + f.name + "': " + e.getMessage() == null ? e.getMessage() : e.getClass()
+                                                                                                                                .getName());
+            }
+        }
+    }
+
+    /**
+     * Read a struct from the buffer
+     */
+    public Object read(ByteBuffer buffer) {
+        Object[] objects = new Object[fields.length];
+        for (int i = 0; i < fields.length; i++)
+            objects[i] = fields[i].type.read(buffer);
+        return new Struct(this, objects);
+    }
+
+    /**
+     * The size of the given record
+     */
+    public int sizeOf(Object o) {
+        int size = 0;
+        Struct r = (Struct) o;
+        for (int i = 0; i < fields.length; i++)
+            size += fields[i].type.sizeOf(r.get(fields[i]));
+        return size;
+    }
+
+    /**
+     * The number of fields in this schema
+     */
+    public int numFields() {
+        return this.fields.length;
+    }
+
+    /**
+     * Get a field by its slot in the record array
+     * 
+     * @param slot The slot at which this field sits
+     * @return The field
+     */
+    public Field get(int slot) {
+        return this.fields[slot];
+    }
+
+    /**
+     * Get a field by its name
+     * 
+     * @param name The name of the field
+     * @return The field
+     */
+    public Field get(String name) {
+        return this.fieldsByName.get(name);
+    }
+
+    /**
+     * Get all the fields in this schema
+     */
+    public Field[] fields() {
+        return this.fields;
+    }
+
+    /**
+     * Display a string representation of the schema
+     */
+    public String toString() {
+        StringBuilder b = new StringBuilder();
+        b.append('{');
+        for (int i = 0; i < this.fields.length; i++) {
+            b.append(this.fields[i].name);
+            b.append(':');
+            b.append(this.fields[i].type());
+            if (i < this.fields.length - 1)
+                b.append(',');
+        }
+        b.append("}");
+        return b.toString();
+    }
+
+    @Override
+    public Struct validate(Object item) {
+        try {
+            Struct struct = (Struct) item;
+            for (int i = 0; i < this.fields.length; i++) {
+                Field field = this.fields[i];
+                try {
+                    field.type.validate(struct.get(field));
+                } catch (SchemaException e) {
+                    throw new SchemaException("Invalid value for field '" + field.name + "': " + e.getMessage());
+                }
+            }
+            return struct;
+        } catch (ClassCastException e) {
+            throw new SchemaException("Not a Struct.");
+        }
+    }
+
+}
\ No newline at end of file
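
Aside (not part of the patch): the intended round-trip through write() and read(), sketched with invented field names. sizeOf() returns the exact serialized size, so the buffer can be allocated to fit precisely.

    import java.nio.ByteBuffer;

    import kafka.common.protocol.types.Field;
    import kafka.common.protocol.types.Schema;
    import kafka.common.protocol.types.Struct;
    import kafka.common.protocol.types.Type;

    public class SchemaRoundTrip {
        public static void main(String[] args) {
            // duplicate field names would throw SchemaException here
            Schema schema = new Schema(new Field("id", Type.INT32),
                                       new Field("name", Type.STRING));

            Struct struct = new Struct(schema).set("id", 42).set("name", "kafka");

            ByteBuffer buffer = ByteBuffer.allocate(schema.sizeOf(struct));
            schema.write(buffer, struct);
            buffer.rewind();

            Struct read = (Struct) schema.read(buffer);
            System.out.println(read); // {id=42,name=kafka}
        }
    }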

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/protocol/types/SchemaException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/protocol/types/SchemaException.java b/clients/src/main/java/kafka/common/protocol/types/SchemaException.java
new file mode 100644
index 0000000..a2a2d50
--- /dev/null
+++ b/clients/src/main/java/kafka/common/protocol/types/SchemaException.java
@@ -0,0 +1,13 @@
+package kafka.common.protocol.types;
+
+import kafka.common.KafkaException;
+
+public class SchemaException extends KafkaException {
+
+    private static final long serialVersionUID = 1L;
+
+    public SchemaException(String message) {
+        super(message);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/protocol/types/Struct.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/protocol/types/Struct.java b/clients/src/main/java/kafka/common/protocol/types/Struct.java
new file mode 100644
index 0000000..c83aefa
--- /dev/null
+++ b/clients/src/main/java/kafka/common/protocol/types/Struct.java
@@ -0,0 +1,227 @@
+package kafka.common.protocol.types;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+/**
+ * A record that can be serialized and deserialized according to a pre-defined schema
+ */
+public class Struct {
+    private final Schema schema;
+    private final Object[] values;
+
+    Struct(Schema schema, Object[] values) {
+        this.schema = schema;
+        this.values = values;
+    }
+
+    public Struct(Schema schema) {
+        this.schema = schema;
+        this.values = new Object[this.schema.numFields()];
+    }
+
+    /**
+     * The schema for this struct.
+     */
+    public Schema schema() {
+        return this.schema;
+    }
+
+    /**
+     * Return the value of the given pre-validated field, or if the value is missing return the default value.
+     * 
+     * @param field The field for which to get the value
+     * @throws SchemaException if the field has no value and has no default.
+     */
+    private Object getFieldOrDefault(Field field) {
+        Object value = this.values[field.index];
+        if (value != null)
+            return value;
+        else if (field.defaultValue != Field.NO_DEFAULT)
+            return field.defaultValue;
+        else
+            throw new SchemaException("Missing value for field '" + field.name + " which has no default value.");
+    }
+
+    /**
+     * Get the value for the field directly by the field index with no lookup needed (faster!)
+     * 
+     * @param field The field to look up
+     * @return The value for that field.
+     */
+    public Object get(Field field) {
+        validateField(field);
+        return getFieldOrDefault(field);
+    }
+
+    /**
+     * Get the record value for the field with the given name by doing a hash table lookup (slower!)
+     * 
+     * @param name The name of the field
+     * @return The value in the record
+     */
+    public Object get(String name) {
+        Field field = schema.get(name);
+        if (field == null)
+            throw new SchemaException("No such field: " + name);
+        return getFieldOrDefault(field);
+    }
+
+    public Struct getStruct(Field field) {
+        return (Struct) get(field);
+    }
+
+    public Struct getStruct(String name) {
+        return (Struct) get(name);
+    }
+
+    public Short getShort(Field field) {
+        return (Short) get(field);
+    }
+
+    public Short getShort(String name) {
+        return (Short) get(name);
+    }
+
+    public Integer getInt(Field field) {
+        return (Integer) get(field);
+    }
+
+    public Integer getInt(String name) {
+        return (Integer) get(name);
+    }
+
+    public Object[] getArray(Field field) {
+        return (Object[]) get(field);
+    }
+
+    public Object[] getArray(String name) {
+        return (Object[]) get(name);
+    }
+
+    public String getString(Field field) {
+        return (String) get(field);
+    }
+
+    public String getString(String name) {
+        return (String) get(name);
+    }
+
+    /**
+     * Set the given field to the specified value
+     * 
+     * @param field The field
+     * @param value The value
+     */
+    public Struct set(Field field, Object value) {
+        validateField(field);
+        this.values[field.index] = value;
+        return this;
+    }
+
+    /**
+     * Set the field specified by the given name to the value
+     * 
+     * @param name The name of the field
+     * @param value The value to set
+     */
+    public Struct set(String name, Object value) {
+        Field field = this.schema.get(name);
+        if (field == null)
+            throw new SchemaException("Unknown field: " + name);
+        this.values[field.index] = value;
+        return this;
+    }
+
+    /**
+     * Create a struct for the schema of a container type (struct or array)
+     * 
+     * @param field The field to create an instance of
+     * @return The struct
+     */
+    public Struct instance(Field field) {
+        validateField(field);
+        if (field.type() instanceof Schema) {
+            return new Struct((Schema) field.type());
+        } else if (field.type() instanceof ArrayOf) {
+            ArrayOf array = (ArrayOf) field.type();
+            return new Struct((Schema) array.type());
+        } else {
+            throw new SchemaException("Field " + field.name + " is not a container type, it is of type " + field.type());
+        }
+    }
+
+    /**
+     * Create a struct instance for the given field which must be a container type (struct or array)
+     * 
+     * @param field The name of the field to create (field must be a schema type)
+     * @return The struct
+     */
+    public Struct instance(String field) {
+        return instance(schema.get(field));
+    }
+
+    /**
+     * Empty all the values from this record
+     */
+    public void clear() {
+        Arrays.fill(this.values, null);
+    }
+
+    /**
+     * Get the serialized size of this object
+     */
+    public int sizeOf() {
+        return this.schema.sizeOf(this);
+    }
+
+    /**
+     * Write this struct to a buffer
+     */
+    public void writeTo(ByteBuffer buffer) {
+        this.schema.write(buffer, this);
+    }
+
+    /**
+     * Ensure the user doesn't try to access fields from the wrong schema
+     */
+    private void validateField(Field field) {
+        if (this.schema != field.schema)
+            throw new SchemaException("Attempt to access field '" + field.name + " from a different schema instance.");
+        if (field.index > values.length)
+            throw new SchemaException("Invalid field index: " + field.index);
+    }
+
+    /**
+     * Validate the contents of this struct against its schema
+     */
+    public void validate() {
+        this.schema.validate(this);
+    }
+
+    /**
+     * Create a byte buffer containing the serialized form of the values in this struct. This method can choose to break
+     * the struct into multiple ByteBuffers if need be.
+     */
+    public ByteBuffer[] toBytes() {
+        ByteBuffer buffer = ByteBuffer.allocate(sizeOf());
+        writeTo(buffer);
+        return new ByteBuffer[] { buffer };
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder b = new StringBuilder();
+        b.append('{');
+        for (int i = 0; i < this.values.length; i++) {
+            b.append(this.schema.get(i).name);
+            b.append('=');
+            b.append(this.values[i]);
+            if (i < this.values.length - 1)
+                b.append(',');
+        }
+        b.append('}');
+        return b.toString();
+    }
+
+}
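
Aside (not part of the patch): a sketch of building a nested struct with instance(), again with invented schema names. Since Schema extends Type, a field's type can itself be a schema, and instance() creates the child Struct for such a field.

    import kafka.common.protocol.types.Field;
    import kafka.common.protocol.types.Schema;
    import kafka.common.protocol.types.Struct;
    import kafka.common.protocol.types.Type;

    public class NestedStructExample {
        public static void main(String[] args) {
            Schema header = new Schema(new Field("correlation_id", Type.INT32));
            Schema request = new Schema(new Field("header", header),
                                        new Field("topic", Type.STRING));

            Struct struct = new Struct(request);
            // instance() builds a child Struct for a field whose type is a Schema
            struct.set("header", struct.instance("header").set("correlation_id", 1));
            struct.set("topic", "test");

            // validate() walks the schema, throwing SchemaException on a bad value
            struct.validate();
            System.out.println(struct.getStruct("header").getInt("correlation_id")); // 1
        }
    }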

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/protocol/types/Type.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/protocol/types/Type.java b/clients/src/main/java/kafka/common/protocol/types/Type.java
new file mode 100644
index 0000000..f4c93e3
--- /dev/null
+++ b/clients/src/main/java/kafka/common/protocol/types/Type.java
@@ -0,0 +1,216 @@
+package kafka.common.protocol.types;
+
+import java.nio.ByteBuffer;
+
+import kafka.common.utils.Utils;
+
+/**
+ * A serializable type
+ */
+public abstract class Type {
+
+    public abstract void write(ByteBuffer buffer, Object o);
+
+    public abstract Object read(ByteBuffer buffer);
+
+    public abstract int sizeOf(Object o);
+
+    public abstract Object validate(Object o);
+
+    public static final Type INT8 = new Type() {
+        @Override
+        public void write(ByteBuffer buffer, Object o) {
+            buffer.put((Byte) o);
+        }
+
+        @Override
+        public Object read(ByteBuffer buffer) {
+            return buffer.get();
+        }
+
+        @Override
+        public int sizeOf(Object o) {
+            return 1;
+        }
+
+        @Override
+        public String toString() {
+            return "INT8";
+        }
+
+        @Override
+        public Byte validate(Object item) {
+            if (item instanceof Byte)
+                return (Byte) item;
+            else
+                throw new SchemaException(item + " is not a Byte.");
+        }
+    };
+
+    public static final Type INT16 = new Type() {
+        @Override
+        public void write(ByteBuffer buffer, Object o) {
+            buffer.putShort((Short) o);
+        }
+
+        @Override
+        public Object read(ByteBuffer buffer) {
+            return buffer.getShort();
+        }
+
+        @Override
+        public int sizeOf(Object o) {
+            return 2;
+        }
+
+        @Override
+        public String toString() {
+            return "INT16";
+        }
+
+        @Override
+        public Short validate(Object item) {
+            if (item instanceof Short)
+                return (Short) item;
+            else
+                throw new SchemaException(item + " is not a Short.");
+        }
+    };
+
+    public static final Type INT32 = new Type() {
+        @Override
+        public void write(ByteBuffer buffer, Object o) {
+            buffer.putInt((Integer) o);
+        }
+
+        @Override
+        public Object read(ByteBuffer buffer) {
+            return buffer.getInt();
+        }
+
+        @Override
+        public int sizeOf(Object o) {
+            return 4;
+        }
+
+        @Override
+        public String toString() {
+            return "INT32";
+        }
+
+        @Override
+        public Integer validate(Object item) {
+            if (item instanceof Integer)
+                return (Integer) item;
+            else
+                throw new SchemaException(item + " is not an Integer.");
+        }
+    };
+
+    public static final Type INT64 = new Type() {
+        @Override
+        public void write(ByteBuffer buffer, Object o) {
+            buffer.putLong((Long) o);
+        }
+
+        @Override
+        public Object read(ByteBuffer buffer) {
+            return buffer.getLong();
+        }
+
+        @Override
+        public int sizeOf(Object o) {
+            return 8;
+        }
+
+        @Override
+        public String toString() {
+            return "INT64";
+        }
+
+        @Override
+        public Long validate(Object item) {
+            if (item instanceof Long)
+                return (Long) item;
+            else
+                throw new SchemaException(item + " is not a Long.");
+        }
+    };
+
+    public static final Type STRING = new Type() {
+        @Override
+        public void write(ByteBuffer buffer, Object o) {
+            byte[] bytes = Utils.utf8((String) o);
+            if (bytes.length > Short.MAX_VALUE)
+                throw new SchemaException("String is longer than the maximum string length.");
+            buffer.putShort((short) bytes.length);
+            buffer.put(bytes);
+        }
+
+        @Override
+        public Object read(ByteBuffer buffer) {
+            int length = buffer.getShort();
+            byte[] bytes = new byte[length];
+            buffer.get(bytes);
+            return Utils.utf8(bytes);
+        }
+
+        @Override
+        public int sizeOf(Object o) {
+            return 2 + Utils.utf8Length((String) o);
+        }
+
+        @Override
+        public String toString() {
+            return "STRING";
+        }
+
+        @Override
+        public String validate(Object item) {
+            if (item instanceof String)
+                return (String) item;
+            else
+                throw new SchemaException(item + " is not a String.");
+        }
+    };
+
+    public static final Type BYTES = new Type() {
+        @Override
+        public void write(ByteBuffer buffer, Object o) {
+            ByteBuffer arg = (ByteBuffer) o;
+            int pos = arg.position();
+            buffer.putInt(arg.remaining());
+            buffer.put(arg);
+            arg.position(pos);
+        }
+
+        @Override
+        public Object read(ByteBuffer buffer) {
+            int size = buffer.getInt();
+            ByteBuffer val = buffer.slice();
+            val.limit(size);
+            buffer.position(buffer.position() + size);
+            return val;
+        }
+
+        @Override
+        public int sizeOf(Object o) {
+            ByteBuffer buffer = (ByteBuffer) o;
+            return 4 + buffer.remaining();
+        }
+
+        @Override
+        public String toString() {
+            return "BYTES";
+        }
+
+        @Override
+        public ByteBuffer validate(Object item) {
+            if (item instanceof ByteBuffer)
+                return (ByteBuffer) item;
+            else
+                throw new SchemaException(item + " is not a java.nio.ByteBuffer.");
+        }
+    };
+
+}
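
Aside (not part of the patch): the type constants can also be driven directly against a ByteBuffer. A small sketch of the wire format each one produces; note that BYTES restores the source buffer's position after the copy, so the argument is not consumed.

    import java.nio.ByteBuffer;

    import kafka.common.protocol.types.Type;

    public class TypeExample {
        public static void main(String[] args) {
            ByteBuffer buffer = ByteBuffer.allocate(64);

            // STRING is prefixed with a signed 16-bit length
            Type.STRING.write(buffer, "hello");

            // BYTES is prefixed with a 32-bit length
            ByteBuffer payload = ByteBuffer.wrap(new byte[] { 1, 2, 3 });
            Type.BYTES.write(buffer, payload);

            buffer.flip();
            System.out.println(Type.STRING.read(buffer));           // hello
            ByteBuffer bytes = (ByteBuffer) Type.BYTES.read(buffer);
            System.out.println(bytes.remaining());                  // 3
        }
    }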

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/record/CompressionType.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/record/CompressionType.java b/clients/src/main/java/kafka/common/record/CompressionType.java
new file mode 100644
index 0000000..f6d9026
--- /dev/null
+++ b/clients/src/main/java/kafka/common/record/CompressionType.java
@@ -0,0 +1,40 @@
+package kafka.common.record;
+
+/**
+ * The compression type to use
+ */
+public enum CompressionType {
+    NONE(0, "none"), GZIP(1, "gzip"), SNAPPY(2, "snappy");
+
+    public final int id;
+    public final String name;
+
+    private CompressionType(int id, String name) {
+        this.id = id;
+        this.name = name;
+    }
+
+    public static CompressionType forId(int id) {
+        switch (id) {
+            case 0:
+                return NONE;
+            case 1:
+                return GZIP;
+            case 2:
+                return SNAPPY;
+            default:
+                throw new IllegalArgumentException("Unknown compression type id: " + id);
+        }
+    }
+
+    public static CompressionType forName(String name) {
+        if (NONE.name.equals(name))
+            return NONE;
+        else if (GZIP.name.equals(name))
+            return GZIP;
+        else if (SNAPPY.name.equals(name))
+            return SNAPPY;
+        else
+            throw new IllegalArgumentException("Unknown compression name: " + name);
+    }
+}
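
Aside (not part of the patch): the two lookups round-trip against the enum constants, e.g.:

    import kafka.common.record.CompressionType;

    public class CompressionTypeExample {
        public static void main(String[] args) {
            System.out.println(CompressionType.forId(1).name);        // gzip
            System.out.println(CompressionType.forName("snappy").id); // 2
        }
    }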

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/record/InvalidRecordException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/record/InvalidRecordException.java b/clients/src/main/java/kafka/common/record/InvalidRecordException.java
new file mode 100644
index 0000000..97fbe50
--- /dev/null
+++ b/clients/src/main/java/kafka/common/record/InvalidRecordException.java
@@ -0,0 +1,11 @@
+package kafka.common.record;
+
+public class InvalidRecordException extends RuntimeException {
+
+    private static final long serialVersionUID = 1;
+
+    public InvalidRecordException(String s) {
+        super(s);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/record/LogEntry.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/record/LogEntry.java b/clients/src/main/java/kafka/common/record/LogEntry.java
new file mode 100644
index 0000000..f5e99c9
--- /dev/null
+++ b/clients/src/main/java/kafka/common/record/LogEntry.java
@@ -0,0 +1,28 @@
+package kafka.common.record;
+
+/**
+ * An offset and record pair
+ */
+public final class LogEntry {
+
+    private final long offset;
+    private final Record record;
+
+    public LogEntry(long offset, Record record) {
+        this.offset = offset;
+        this.record = record;
+    }
+
+    public long offset() {
+        return this.offset;
+    }
+
+    public Record record() {
+        return this.record;
+    }
+
+    @Override
+    public String toString() {
+        return "LogEntry(" + offset + ", " + record + ")";
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/record/MemoryRecords.java b/clients/src/main/java/kafka/common/record/MemoryRecords.java
new file mode 100644
index 0000000..ec98226
--- /dev/null
+++ b/clients/src/main/java/kafka/common/record/MemoryRecords.java
@@ -0,0 +1,102 @@
+package kafka.common.record;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.GatheringByteChannel;
+import java.util.Iterator;
+
+import kafka.common.utils.AbstractIterator;
+
+/**
+ * A {@link Records} implementation backed by a ByteBuffer.
+ */
+public class MemoryRecords implements Records {
+
+    private final ByteBuffer buffer;
+
+    public MemoryRecords(int size) {
+        this(ByteBuffer.allocate(size));
+    }
+
+    public MemoryRecords(ByteBuffer buffer) {
+        this.buffer = buffer;
+    }
+
+    /**
+     * Append the given record and offset to the buffer
+     */
+    public void append(long offset, Record record) {
+        buffer.putLong(offset);
+        buffer.putInt(record.size());
+        buffer.put(record.buffer());
+        record.buffer().rewind();
+    }
+
+    /**
+     * Append a new record and offset to the buffer
+     */
+    public void append(long offset, byte[] key, byte[] value, CompressionType type) {
+        buffer.putLong(offset);
+        buffer.putInt(Record.recordSize(key, value));
+        Record.write(this.buffer, key, value, type);
+    }
+
+    /**
+     * Check if we have room for a new record containing the given key/value pair
+     */
+    public boolean hasRoomFor(byte[] key, byte[] value) {
+        return this.buffer.remaining() >= Records.LOG_OVERHEAD + Record.recordSize(key, value);
+    }
+
+    /** Write the messages in this set to the given channel */
+    public int writeTo(GatheringByteChannel channel) throws IOException {
+        return channel.write(buffer);
+    }
+
+    /**
+     * The size of this record set
+     */
+    public int sizeInBytes() {
+        return this.buffer.position();
+    }
+
+    /**
+     * Get the byte buffer that backs this records instance
+     */
+    public ByteBuffer buffer() {
+        return buffer.duplicate();
+    }
+
+    @Override
+    public Iterator<LogEntry> iterator() {
+        return new RecordsIterator(this.buffer);
+    }
+
+    /* TODO: allow reuse of the buffer used for iteration */
+    public static class RecordsIterator extends AbstractIterator<LogEntry> {
+        private final ByteBuffer buffer;
+
+        public RecordsIterator(ByteBuffer buffer) {
+            ByteBuffer copy = buffer.duplicate();
+            copy.flip();
+            this.buffer = copy;
+        }
+
+        @Override
+        protected LogEntry makeNext() {
+            if (buffer.remaining() < Records.LOG_OVERHEAD)
+                return allDone();
+            long offset = buffer.getLong();
+            int size = buffer.getInt();
+            if (size < 0)
+                throw new IllegalStateException("Message with size " + size);
+            if (buffer.remaining() < size)
+                return allDone();
+            ByteBuffer rec = buffer.slice();
+            rec.limit(size);
+            this.buffer.position(this.buffer.position() + size);
+            return new LogEntry(offset, new Record(rec));
+        }
+    }
+
+}
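
Aside (not part of the patch): a sketch of filling and iterating a record set. The iterator works on a flipped duplicate of the buffer, so it sees exactly the bytes appended so far without disturbing the writer's position.

    import kafka.common.record.CompressionType;
    import kafka.common.record.LogEntry;
    import kafka.common.record.MemoryRecords;

    public class MemoryRecordsExample {
        public static void main(String[] args) {
            MemoryRecords records = new MemoryRecords(1024);

            byte[] key = "key".getBytes();
            byte[] value = "value".getBytes();
            // hasRoomFor accounts for the 12 bytes of offset + size log overhead
            if (records.hasRoomFor(key, value))
                records.append(0L, key, value, CompressionType.NONE);

            for (LogEntry entry : records)
                System.out.println(entry.offset() + ": valid = " + entry.record().isValid());
        }
    }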

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/record/Record.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/record/Record.java b/clients/src/main/java/kafka/common/record/Record.java
new file mode 100644
index 0000000..835a0a4
--- /dev/null
+++ b/clients/src/main/java/kafka/common/record/Record.java
@@ -0,0 +1,286 @@
+package kafka.common.record;
+
+import java.nio.ByteBuffer;
+
+import kafka.common.utils.Utils;
+
+/**
+ * A record: a serialized key and value along with the associated CRC and other fields
+ */
+public final class Record {
+
+    /**
+     * The current offset and size for all the fixed-length fields
+     */
+    public static final int CRC_OFFSET = 0;
+    public static final int CRC_LENGTH = 4;
+    public static final int MAGIC_OFFSET = CRC_OFFSET + CRC_LENGTH;
+    public static final int MAGIC_LENGTH = 1;
+    public static final int ATTRIBUTES_OFFSET = MAGIC_OFFSET + MAGIC_LENGTH;
+    public static final int ATTRIBUTE_LENGTH = 1;
+    public static final int KEY_SIZE_OFFSET = ATTRIBUTES_OFFSET + ATTRIBUTE_LENGTH;
+    public static final int KEY_SIZE_LENGTH = 4;
+    public static final int KEY_OFFSET = KEY_SIZE_OFFSET + KEY_SIZE_LENGTH;
+    public static final int VALUE_SIZE_LENGTH = 4;
+
+    /** The number of overhead bytes in a record */
+    public static final int RECORD_OVERHEAD = KEY_OFFSET + VALUE_SIZE_LENGTH;
+
+    /**
+     * The minimum valid size for the record header
+     */
+    public static final int MIN_HEADER_SIZE = CRC_LENGTH + MAGIC_LENGTH + ATTRIBUTE_LENGTH + KEY_SIZE_LENGTH + VALUE_SIZE_LENGTH;
+
+    /**
+     * The current "magic" value
+     */
+    public static final byte CURRENT_MAGIC_VALUE = 0;
+
+    /**
+     * Specifies the mask for the compression code. 2 bits to hold the compression codec. 0 is reserved to indicate no
+     * compression
+     */
+    public static final int COMPRESSION_CODEC_MASK = 0x03;
+
+    /**
+     * Compression code for uncompressed records
+     */
+    public static final int NO_COMPRESSION = 0;
+
+    private final ByteBuffer buffer;
+
+    public Record(ByteBuffer buffer) {
+        this.buffer = buffer;
+    }
+
+    /**
+     * A constructor to create a Record
+     * 
+     * @param key The key of the record (null, if none)
+     * @param value The record value
+     * @param codec The compression codec used on the contents of the record (if any)
+     * @param valueOffset The offset into the payload array used to extract payload
+     * @param valueSize The size of the payload to use
+     */
+    public Record(byte[] key, byte[] value, CompressionType codec, int valueOffset, int valueSize) {
+        this(ByteBuffer.allocate(recordSize(key == null ? 0 : key.length, value == null ? 0 : valueSize >= 0 ? valueSize
+                                                                                                            : value.length - valueOffset)));
+        write(this.buffer, key, value, codec, valueOffset, valueSize);
+        this.buffer.rewind();
+    }
+
+    public Record(byte[] key, byte[] value, CompressionType codec) {
+        this(key, value, codec, 0, -1);
+    }
+
+    public Record(byte[] value, CompressionType codec) {
+        this(null, value, codec);
+    }
+
+    public Record(byte[] key, byte[] value) {
+        this(key, value, CompressionType.NONE);
+    }
+
+    public Record(byte[] value) {
+        this(null, value, CompressionType.NONE);
+    }
+
+    public static void write(ByteBuffer buffer, byte[] key, byte[] value, CompressionType codec, int valueOffset, int valueSize) {
+        // skip crc, we will fill that in at the end
+        int pos = buffer.position();
+        buffer.position(pos + MAGIC_OFFSET);
+        buffer.put(CURRENT_MAGIC_VALUE);
+        byte attributes = 0;
+        if (codec.id > 0)
+            attributes = (byte) (attributes | (COMPRESSION_CODEC_MASK & codec.id));
+        buffer.put(attributes);
+        // write the key
+        if (key == null) {
+            buffer.putInt(-1);
+        } else {
+            buffer.putInt(key.length);
+            buffer.put(key, 0, key.length);
+        }
+        // write the value
+        if (value == null) {
+            buffer.putInt(-1);
+        } else {
+            int size = valueSize >= 0 ? valueSize : (value.length - valueOffset);
+            buffer.putInt(size);
+            buffer.put(value, valueOffset, size);
+        }
+
+        // now compute the checksum and fill it in
+        long crc = computeChecksum(buffer,
+                                   pos + MAGIC_OFFSET,
+                                   buffer.position() - pos - MAGIC_OFFSET);
+        Utils.writeUnsignedInt(buffer, pos + CRC_OFFSET, crc);
+    }
+
+    public static void write(ByteBuffer buffer, byte[] key, byte[] value, CompressionType codec) {
+        write(buffer, key, value, codec, 0, -1);
+    }
+
+    public static int recordSize(byte[] key, byte[] value) {
+        return recordSize(key == null ? 0 : key.length, value == null ? 0 : value.length);
+    }
+
+    public static int recordSize(int keySize, int valueSize) {
+        return CRC_LENGTH + MAGIC_LENGTH + ATTRIBUTE_LENGTH + KEY_SIZE_LENGTH + keySize + VALUE_SIZE_LENGTH + valueSize;
+    }
+
+    public ByteBuffer buffer() {
+        return this.buffer;
+    }
+
+    /**
+     * Compute the checksum of the record from the record contents
+     */
+    public static long computeChecksum(ByteBuffer buffer, int position, int size) {
+        return Utils.crc32(buffer.array(), buffer.arrayOffset() + position, size);
+    }
+
+    /**
+     * Compute the checksum of the record from the record contents
+     */
+    public long computeChecksum() {
+        return computeChecksum(buffer, MAGIC_OFFSET, buffer.limit() - MAGIC_OFFSET);
+    }
+
+    /**
+     * Retrieve the previously computed CRC for this record
+     */
+    public long checksum() {
+        return Utils.readUnsignedInt(buffer, CRC_OFFSET);
+    }
+
+    /**
+     * Returns true if the crc stored with the record matches the crc computed off the record contents
+     */
+    public boolean isValid() {
+        return checksum() == computeChecksum();
+    }
+
+    /**
+     * Throw an InvalidRecordException if isValid is false for this record
+     */
+    public void ensureValid() {
+        if (!isValid())
+            throw new InvalidRecordException("Record is corrupt (stored crc = " + checksum()
+                                             + ", computed crc = "
+                                             + computeChecksum()
+                                             + ")");
+    }
+
+    /**
+     * The complete serialized size of this record in bytes (including crc, header attributes, etc)
+     */
+    public int size() {
+        return buffer.limit();
+    }
+
+    /**
+     * The length of the key in bytes
+     */
+    public int keySize() {
+        return buffer.getInt(KEY_SIZE_OFFSET);
+    }
+
+    /**
+     * Does the record have a key?
+     */
+    public boolean hasKey() {
+        return keySize() >= 0;
+    }
+
+    /**
+     * The position where the value size is stored
+     */
+    private int valueSizeOffset() {
+        return KEY_OFFSET + Math.max(0, keySize());
+    }
+
+    /**
+     * The length of the value in bytes
+     */
+    public int valueSize() {
+        return buffer.getInt(valueSizeOffset());
+    }
+
+    /**
+     * The magic version of this record
+     */
+    public byte magic() {
+        return buffer.get(MAGIC_OFFSET);
+    }
+
+    /**
+     * The attributes stored with this record
+     */
+    public byte attributes() {
+        return buffer.get(ATTRIBUTES_OFFSET);
+    }
+
+    /**
+     * The compression codec used with this record
+     */
+    public CompressionType compressionType() {
+        return CompressionType.forId(buffer.get(ATTRIBUTES_OFFSET) & COMPRESSION_CODEC_MASK);
+    }
+
+    /**
+     * A ByteBuffer containing the value of this record
+     */
+    public ByteBuffer value() {
+        return sliceDelimited(valueSizeOffset());
+    }
+
+    /**
+     * A ByteBuffer containing the message key
+     */
+    public ByteBuffer key() {
+        return sliceDelimited(KEY_SIZE_OFFSET);
+    }
+
+    /**
+     * Read a size-delimited byte buffer starting at the given offset
+     */
+    private ByteBuffer sliceDelimited(int start) {
+        int size = buffer.getInt(start);
+        if (size < 0) {
+            return null;
+        } else {
+            ByteBuffer b = buffer.duplicate();
+            b.position(start + 4);
+            b = b.slice();
+            b.limit(size);
+            b.rewind();
+            return b;
+        }
+    }
+
+    public String toString() {
+        return String.format("Record(magic = %d, attributes = %d, crc = %d, key = %d bytes, value = %d bytes)",
+                             magic(),
+                             attributes(),
+                             checksum(),
+                             key() == null ? 0 : key().limit(),
+                             value() == null ? 0 : value().limit());
+    }
+
+    public boolean equals(Object other) {
+        if (this == other)
+            return true;
+        if (other == null)
+            return false;
+        if (!other.getClass().equals(Record.class))
+            return false;
+        Record record = (Record) other;
+        return this.buffer.equals(record.buffer);
+    }
+
+    public int hashCode() {
+        return buffer.hashCode();
+    }
+
+}
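
Aside (not part of the patch): from the offsets above, the fixed header is RECORD_OVERHEAD = 14 bytes (4 crc + 1 magic + 1 attributes + 4 key size + 4 value size), so a record with a 3-byte key and a 5-byte value serializes to 14 + 3 + 5 = 22 bytes:

    import kafka.common.record.CompressionType;
    import kafka.common.record.Record;

    public class RecordExample {
        public static void main(String[] args) {
            Record record = new Record("key".getBytes(), "value".getBytes(), CompressionType.NONE);

            System.out.println(record.size());            // 22
            System.out.println(record.isValid());         // true, stored crc matches
            System.out.println(record.compressionType()); // NONE
        }
    }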

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/record/Records.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/record/Records.java b/clients/src/main/java/kafka/common/record/Records.java
new file mode 100644
index 0000000..6531ca0
--- /dev/null
+++ b/clients/src/main/java/kafka/common/record/Records.java
@@ -0,0 +1,29 @@
+package kafka.common.record;
+
+import java.io.IOException;
+import java.nio.channels.GatheringByteChannel;
+
+/**
+ * A binary format which consists of an 8 byte offset, a 4 byte size, and the record bytes. See {@link MemoryRecords}
+ * for the in-memory representation.
+ */
+public interface Records extends Iterable<LogEntry> {
+
+    int SIZE_LENGTH = 4;
+    int OFFSET_LENGTH = 8;
+    int LOG_OVERHEAD = SIZE_LENGTH + OFFSET_LENGTH;
+
+    /**
+     * Write these records to the given channel
+     * @param channel The channel to write to
+     * @return The number of bytes written
+     * @throws IOException If the write fails.
+     */
+    public int writeTo(GatheringByteChannel channel) throws IOException;
+
+    /**
+     * The size of these records in bytes
+     */
+    public int sizeInBytes();
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/requests/RequestHeader.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/requests/RequestHeader.java b/clients/src/main/java/kafka/common/requests/RequestHeader.java
new file mode 100644
index 0000000..4ce67f8
--- /dev/null
+++ b/clients/src/main/java/kafka/common/requests/RequestHeader.java
@@ -0,0 +1,68 @@
+package kafka.common.requests;
+
+import static kafka.common.protocol.Protocol.REQUEST_HEADER;
+
+import java.nio.ByteBuffer;
+
+import kafka.common.protocol.ProtoUtils;
+import kafka.common.protocol.Protocol;
+import kafka.common.protocol.types.Field;
+import kafka.common.protocol.types.Struct;
+
+/**
+ * The header for a request in the Kafka protocol
+ */
+public class RequestHeader {
+
+    private static final Field API_KEY_FIELD = REQUEST_HEADER.get("api_key");
+    private static final Field API_VERSION_FIELD = REQUEST_HEADER.get("api_version");
+    private static final Field CLIENT_ID_FIELD = REQUEST_HEADER.get("client_id");
+    private static final Field CORRELATION_ID_FIELD = REQUEST_HEADER.get("correlation_id");
+
+    private final Struct header;
+
+    public RequestHeader(Struct header) {
+        this.header = header;
+    }
+
+    public RequestHeader(short apiKey, String client, int correlation) {
+        this(apiKey, ProtoUtils.latestVersion(apiKey), client, correlation);
+    }
+
+    public RequestHeader(short apiKey, short version, String client, int correlation) {
+        this(new Struct(Protocol.REQUEST_HEADER));
+        this.header.set(API_KEY_FIELD, apiKey);
+        this.header.set(API_VERSION_FIELD, version);
+        this.header.set(CLIENT_ID_FIELD, client);
+        this.header.set(CORRELATION_ID_FIELD, correlation);
+    }
+
+    public short apiKey() {
+        return (Short) this.header.get(API_KEY_FIELD);
+    }
+
+    public short apiVersion() {
+        return (Short) this.header.get(API_VERSION_FIELD);
+    }
+
+    public String clientId() {
+        return (String) this.header.get(CLIENT_ID_FIELD);
+    }
+
+    public int correlationId() {
+        return (Integer) this.header.get(CORRELATION_ID_FIELD);
+    }
+
+    public static RequestHeader parse(ByteBuffer buffer) {
+        return new RequestHeader((Struct) Protocol.REQUEST_HEADER.read(buffer));
+    }
+
+    public void writeTo(ByteBuffer buffer) {
+        header.writeTo(buffer);
+    }
+
+    public int sizeOf() {
+        return header.sizeOf();
+    }
+}
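
Aside (not part of the patch): a serialization round-trip for the header. The api key and version values below are arbitrary placeholders; the real constants come from the Protocol and ApiKeys definitions elsewhere in this commit.

    import java.nio.ByteBuffer;

    import kafka.common.requests.RequestHeader;

    public class RequestHeaderExample {
        public static void main(String[] args) {
            RequestHeader header = new RequestHeader((short) 0, (short) 0, "my-client", 123);

            ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf());
            header.writeTo(buffer);
            buffer.rewind();

            RequestHeader parsed = RequestHeader.parse(buffer);
            System.out.println(parsed.clientId() + " / " + parsed.correlationId()); // my-client / 123
        }
    }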

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/requests/RequestSend.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/requests/RequestSend.java b/clients/src/main/java/kafka/common/requests/RequestSend.java
new file mode 100644
index 0000000..f6a9a86
--- /dev/null
+++ b/clients/src/main/java/kafka/common/requests/RequestSend.java
@@ -0,0 +1,38 @@
+package kafka.common.requests;
+
+import java.nio.ByteBuffer;
+
+import kafka.common.network.NetworkSend;
+import kafka.common.protocol.types.Struct;
+
+/**
+ * A send object for a kafka request
+ */
+public class RequestSend extends NetworkSend {
+
+    private final RequestHeader header;
+    private final Struct body;
+
+    public RequestSend(int destination, RequestHeader header, Struct body) {
+        super(destination, serialize(header, body));
+        this.header = header;
+        this.body = body;
+    }
+
+    private static ByteBuffer serialize(RequestHeader header, Struct body) {
+        ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + body.sizeOf());
+        header.writeTo(buffer);
+        body.writeTo(buffer);
+        buffer.rewind();
+        return buffer;
+    }
+
+    public RequestHeader header() {
+        return this.header;
+    }
+
+    public Struct body() {
+        return body;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/requests/ResponseHeader.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/requests/ResponseHeader.java b/clients/src/main/java/kafka/common/requests/ResponseHeader.java
new file mode 100644
index 0000000..1ef8e15
--- /dev/null
+++ b/clients/src/main/java/kafka/common/requests/ResponseHeader.java
@@ -0,0 +1,45 @@
+package kafka.common.requests;
+
+import static kafka.common.protocol.Protocol.RESPONSE_HEADER;
+
+import java.nio.ByteBuffer;
+
+import kafka.common.protocol.Protocol;
+import kafka.common.protocol.types.Field;
+import kafka.common.protocol.types.Struct;
+
+/**
+ * A response header in the kafka protocol.
+ */
+public class ResponseHeader {
+
+    private static final Field CORRELATION_KEY_FIELD = RESPONSE_HEADER.get("correlation_id");
+
+    private final Struct header;
+
+    public ResponseHeader(Struct header) {
+        this.header = header;
+    }
+
+    public ResponseHeader(int correlationId) {
+        this(new Struct(Protocol.RESPONSE_HEADER));
+        this.header.set(CORRELATION_KEY_FIELD, correlationId);
+    }
+
+    public int correlationId() {
+        return (Integer) header.get(CORRELATION_KEY_FIELD);
+    }
+
+    public void writeTo(ByteBuffer buffer) {
+        header.writeTo(buffer);
+    }
+
+    public int sizeOf() {
+        return header.sizeOf();
+    }
+
+    public static ResponseHeader parse(ByteBuffer buffer) {
+        return new ResponseHeader(((Struct) Protocol.RESPONSE_HEADER.read(buffer)));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/utils/AbstractIterator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/utils/AbstractIterator.java b/clients/src/main/java/kafka/common/utils/AbstractIterator.java
new file mode 100644
index 0000000..f3190d7
--- /dev/null
+++ b/clients/src/main/java/kafka/common/utils/AbstractIterator.java
@@ -0,0 +1,72 @@
+package kafka.common.utils;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * A base class that simplifies implementing an iterator
+ * @param <T> The type of thing we are iterating over
+ */
+public abstract class AbstractIterator<T> implements Iterator<T> {
+
+    private enum State {
+        READY, NOT_READY, DONE, FAILED
+    }
+
+    private State state = State.NOT_READY;
+    private T next;
+
+    @Override
+    public boolean hasNext() {
+        switch (state) {
+            case FAILED:
+                throw new IllegalStateException("Iterator is in failed state");
+            case DONE:
+                return false;
+            case READY:
+                return true;
+            default:
+                return maybeComputeNext();
+        }
+    }
+
+    @Override
+    public T next() {
+        if (!hasNext())
+            throw new NoSuchElementException();
+        state = State.NOT_READY;
+        if (next == null)
+            throw new IllegalStateException("Expected item but none found.");
+        return next;
+    }
+
+    @Override
+    public void remove() {
+        throw new UnsupportedOperationException("Removal not supported");
+    }
+
+    public T peek() {
+        if (!hasNext())
+            throw new NoSuchElementException();
+        return next;
+    }
+
+    protected T allDone() {
+        state = State.DONE;
+        return null;
+    }
+
+    protected abstract T makeNext();
+
+    private boolean maybeComputeNext() {
+        state = State.FAILED;
+        next = makeNext();
+        if (state == State.DONE) {
+            return false;
+        } else {
+            state = State.READY;
+            return true;
+        }
+    }
+
+}
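
Aside (not part of the patch): subclasses implement only makeNext(), returning allDone() when the stream is exhausted; the base class drives the hasNext()/next() state machine and provides peek() for free. A toy subclass:

    import kafka.common.utils.AbstractIterator;

    public class RangeIterator extends AbstractIterator<Integer> {
        private final int end;
        private int current;

        public RangeIterator(int start, int end) {
            this.current = start;
            this.end = end;
        }

        // return the next element, or allDone() once the range is consumed
        @Override
        protected Integer makeNext() {
            return current < end ? current++ : allDone();
        }

        public static void main(String[] args) {
            RangeIterator it = new RangeIterator(0, 3);
            while (it.hasNext())
                System.out.println(it.next()); // 0, 1, 2
        }
    }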

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/common/utils/CopyOnWriteMap.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/utils/CopyOnWriteMap.java b/clients/src/main/java/kafka/common/utils/CopyOnWriteMap.java
new file mode 100644
index 0000000..e45df98
--- /dev/null
+++ b/clients/src/main/java/kafka/common/utils/CopyOnWriteMap.java
@@ -0,0 +1,130 @@
+package kafka.common.utils;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * A simple read-optimized map implementation that synchronizes only writes and does a full copy on each modification
+ */
+public class CopyOnWriteMap<K, V> implements ConcurrentMap<K, V> {
+
+    private volatile Map<K, V> map;
+
+    public CopyOnWriteMap() {
+        this.map = Collections.emptyMap();
+    }
+
+    public CopyOnWriteMap(Map<K, V> map) {
+        this.map = Collections.unmodifiableMap(map);
+    }
+
+    @Override
+    public boolean containsKey(Object k) {
+        return map.containsKey(k);
+    }
+
+    @Override
+    public boolean containsValue(Object v) {
+        return map.containsValue(v);
+    }
+
+    @Override
+    public Set<java.util.Map.Entry<K, V>> entrySet() {
+        return map.entrySet();
+    }
+
+    @Override
+    public V get(Object k) {
+        return map.get(k);
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return map.isEmpty();
+    }
+
+    @Override
+    public Set<K> keySet() {
+        return map.keySet();
+    }
+
+    @Override
+    public int size() {
+        return map.size();
+    }
+
+    @Override
+    public Collection<V> values() {
+        return map.values();
+    }
+
+    @Override
+    public synchronized void clear() {
+        this.map = Collections.emptyMap();
+    }
+
+    @Override
+    public synchronized V put(K k, V v) {
+        Map<K, V> copy = new HashMap<K, V>(this.map);
+        V prev = copy.put(k, v);
+        this.map = Collections.unmodifiableMap(copy);
+        return prev;
+    }
+
+    @Override
+    public synchronized void putAll(Map<? extends K, ? extends V> entries) {
+        Map<K, V> copy = new HashMap<K, V>(this.map);
+        copy.putAll(entries);
+        this.map = Collections.unmodifiableMap(copy);
+    }
+
+    @Override
+    public synchronized V remove(Object key) {
+        Map<K, V> copy = new HashMap<K, V>(this.map);
+        V prev = copy.remove(key);
+        this.map = Collections.unmodifiableMap(copy);
+        return prev;
+    }
+
+    @Override
+    public synchronized V putIfAbsent(K k, V v) {
+        if (!containsKey(k))
+            return put(k, v);
+        else
+            return get(k);
+    }
+
+    @Override
+    public synchronized boolean remove(Object k, Object v) {
+        if (containsKey(k) && get(k).equals(v)) {
+            remove(k);
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    @Override
+    public synchronized boolean replace(K k, V original, V replacement) {
+        if (containsKey(k) && get(k).equals(original)) {
+            put(k, replacement);
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    @Override
+    public synchronized V replace(K k, V v) {
+        if (containsKey(k)) {
+            return put(k, v);
+        } else {
+            return null;
+        }
+    }
+
+}
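
Aside (not part of the patch): a usage sketch. Writes synchronize and swap in a fresh unmodifiable copy, while reads go straight to the volatile reference with no locking, which is the right trade-off for small, read-mostly maps.

    import java.util.concurrent.ConcurrentMap;

    import kafka.common.utils.CopyOnWriteMap;

    public class CopyOnWriteMapExample {
        public static void main(String[] args) {
            ConcurrentMap<String, Integer> counts = new CopyOnWriteMap<String, Integer>();

            counts.put("a", 1);
            counts.putIfAbsent("a", 2); // no-op, returns the existing 1

            // lock-free read of the current immutable snapshot
            System.out.println(counts.get("a")); // 1
        }
    }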