You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jk...@apache.org on 2014/02/07 01:26:41 UTC
[10/13] Rename client package from kafka.* to org.apache.kafka.*
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/protocol/types/ArrayOf.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/protocol/types/ArrayOf.java b/clients/src/main/java/kafka/common/protocol/types/ArrayOf.java
deleted file mode 100644
index 5daf95b..0000000
--- a/clients/src/main/java/kafka/common/protocol/types/ArrayOf.java
+++ /dev/null
@@ -1,63 +0,0 @@
-package kafka.common.protocol.types;
-
-import java.nio.ByteBuffer;
-
-/**
- * Represents a type for an array of a particular type
- */
-public class ArrayOf extends Type {
-
- private final Type type;
-
- public ArrayOf(Type type) {
- this.type = type;
- }
-
- @Override
- public void write(ByteBuffer buffer, Object o) {
- Object[] objs = (Object[]) o;
- int size = objs.length;
- buffer.putInt(size);
- for (int i = 0; i < size; i++)
- type.write(buffer, objs[i]);
- }
-
- /**
-  * Read an array from the buffer: a 4 byte element count followed by that many elements of the element type
-  *
-  * @param buffer The buffer to read from
-  * @return An Object[] holding the elements in the order they were read
-  * @throws SchemaException if the element count read from the buffer is negative
-  */
- @Override
- public Object read(ByteBuffer buffer) {
-     int size = buffer.getInt();
-     // the count is taken from the buffer unchecked; reject a negative one with a schema error rather than
-     // letting the array allocation fail with NegativeArraySizeException
-     if (size < 0)
-         throw new SchemaException("Array size " + size + " cannot be negative");
-     Object[] objs = new Object[size];
-     for (int i = 0; i < size; i++)
-         objs[i] = type.read(buffer);
-     return objs;
- }
-
- @Override
- public int sizeOf(Object o) {
- Object[] objs = (Object[]) o;
- int size = 4;
- for (int i = 0; i < objs.length; i++)
- size += type.sizeOf(objs[i]);
- return size;
- }
-
- public Type type() {
- return type;
- }
-
- @Override
- public String toString() {
- return "ARRAY(" + type + ")";
- }
-
- @Override
- public Object[] validate(Object item) {
- try {
- Object[] array = (Object[]) item;
- for (int i = 0; i < array.length; i++)
- type.validate(array[i]);
- return array;
- } catch (ClassCastException e) {
- throw new SchemaException("Not an Object[].");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/protocol/types/Field.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/protocol/types/Field.java b/clients/src/main/java/kafka/common/protocol/types/Field.java
deleted file mode 100644
index d018a12..0000000
--- a/clients/src/main/java/kafka/common/protocol/types/Field.java
+++ /dev/null
@@ -1,48 +0,0 @@
-package kafka.common.protocol.types;
-
-/**
- * A field in a schema
- */
-public class Field {
-
- public static final Object NO_DEFAULT = new Object();
-
- final int index;
- public final String name;
- public final Type type;
- public final Object defaultValue;
- public final String doc;
- final Schema schema;
-
- public Field(int index, String name, Type type, String doc, Object defaultValue, Schema schema) {
- this.index = index;
- this.name = name;
- this.type = type;
- this.doc = doc;
- this.defaultValue = defaultValue;
- this.schema = schema;
- if (defaultValue != NO_DEFAULT)
- type.validate(defaultValue);
- }
-
- public Field(int index, String name, Type type, String doc, Object defaultValue) {
- this(index, name, type, doc, defaultValue, null);
- }
-
- public Field(String name, Type type, String doc, Object defaultValue) {
- this(-1, name, type, doc, defaultValue);
- }
-
- public Field(String name, Type type, String doc) {
- this(name, type, doc, NO_DEFAULT);
- }
-
- public Field(String name, Type type) {
- this(name, type, "");
- }
-
- public Type type() {
- return type;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/protocol/types/Schema.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/protocol/types/Schema.java b/clients/src/main/java/kafka/common/protocol/types/Schema.java
deleted file mode 100644
index b7b1c75..0000000
--- a/clients/src/main/java/kafka/common/protocol/types/Schema.java
+++ /dev/null
@@ -1,134 +0,0 @@
-package kafka.common.protocol.types;
-
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * The schema for a compound record definition
- */
-public class Schema extends Type {
-
- private final Field[] fields;
- private final Map<String, Field> fieldsByName;
-
- public Schema(Field... fs) {
- this.fields = new Field[fs.length];
- this.fieldsByName = new HashMap<String, Field>();
- for (int i = 0; i < this.fields.length; i++) {
- Field field = fs[i];
- if (fieldsByName.containsKey(field.name))
- throw new SchemaException("Schema contains a duplicate field: " + field.name);
- this.fields[i] = new Field(i, field.name, field.type, field.doc, field.defaultValue, this);
- this.fieldsByName.put(fs[i].name, this.fields[i]);
- }
- }
-
- /**
-  * Write a struct to the buffer, one field at a time in schema order
-  *
-  * @param buffer The buffer to write to
-  * @param o The value to write; it is cast to {@link Struct}
-  * @throws SchemaException if fetching, validating or writing any field fails; the message names the field
-  */
- public void write(ByteBuffer buffer, Object o) {
-     Struct r = (Struct) o;
-     for (int i = 0; i < fields.length; i++) {
-         Field f = fields[i];
-         try {
-             Object value = f.type().validate(r.get(f));
-             f.type.write(buffer, value);
-         } catch (Exception e) {
-             // Resolve the detail text first. If the ternary is applied to the concatenated string instead, the
-             // null test is made on the whole prefix (never null) and the field name is lost from the message.
-             // The exception class name is only a fallback for exceptions that carry no message.
-             String detail = e.getMessage() == null ? e.getClass().getName() : e.getMessage();
-             throw new SchemaException("Error writing field '" + f.name + "': " + detail);
-         }
-     }
- }
-
- /**
- * Read a struct from the buffer
- */
- public Object read(ByteBuffer buffer) {
- Object[] objects = new Object[fields.length];
- for (int i = 0; i < fields.length; i++)
- objects[i] = fields[i].type.read(buffer);
- return new Struct(this, objects);
- }
-
- /**
- * The size of the given record
- */
- public int sizeOf(Object o) {
- int size = 0;
- Struct r = (Struct) o;
- for (int i = 0; i < fields.length; i++)
- size += fields[i].type.sizeOf(r.get(fields[i]));
- return size;
- }
-
- /**
- * The number of fields in this schema
- */
- public int numFields() {
- return this.fields.length;
- }
-
- /**
- * Get a field by its slot in the record array
- *
- * @param slot The slot at which this field sits
- * @return The field
- */
- public Field get(int slot) {
- return this.fields[slot];
- }
-
- /**
- * Get a field by its name
- *
- * @param name The name of the field
- * @return The field
- */
- public Field get(String name) {
- return this.fieldsByName.get(name);
- }
-
- /**
- * Get all the fields in this schema
- */
- public Field[] fields() {
- return this.fields;
- }
-
- /**
- * Display a string representation of the schema
- */
- public String toString() {
- StringBuilder b = new StringBuilder();
- b.append('{');
- for (int i = 0; i < this.fields.length; i++) {
- b.append(this.fields[i].name);
- b.append(':');
- b.append(this.fields[i].type());
- if (i < this.fields.length - 1)
- b.append(',');
- }
- b.append("}");
- return b.toString();
- }
-
- @Override
- public Struct validate(Object item) {
- try {
- Struct struct = (Struct) item;
- for (int i = 0; i < this.fields.length; i++) {
- Field field = this.fields[i];
- try {
- field.type.validate(struct.get(field));
- } catch (SchemaException e) {
- throw new SchemaException("Invalid value for field '" + field.name + "': " + e.getMessage());
- }
- }
- return struct;
- } catch (ClassCastException e) {
- throw new SchemaException("Not a Struct.");
- }
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/protocol/types/SchemaException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/protocol/types/SchemaException.java b/clients/src/main/java/kafka/common/protocol/types/SchemaException.java
deleted file mode 100644
index a2a2d50..0000000
--- a/clients/src/main/java/kafka/common/protocol/types/SchemaException.java
+++ /dev/null
@@ -1,13 +0,0 @@
-package kafka.common.protocol.types;
-
-import kafka.common.KafkaException;
-
-public class SchemaException extends KafkaException {
-
- private static final long serialVersionUID = 1L;
-
- public SchemaException(String message) {
- super(message);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/protocol/types/Struct.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/protocol/types/Struct.java b/clients/src/main/java/kafka/common/protocol/types/Struct.java
deleted file mode 100644
index c83aefa..0000000
--- a/clients/src/main/java/kafka/common/protocol/types/Struct.java
+++ /dev/null
@@ -1,227 +0,0 @@
-package kafka.common.protocol.types;
-
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-
-/**
- * A record that can be serialized and deserialized according to a pre-defined schema
- */
-public class Struct {
- private final Schema schema;
- private final Object[] values;
-
- Struct(Schema schema, Object[] values) {
- this.schema = schema;
- this.values = values;
- }
-
- public Struct(Schema schema) {
- this.schema = schema;
- this.values = new Object[this.schema.numFields()];
- }
-
- /**
- * The schema for this struct.
- */
- public Schema schema() {
- return this.schema;
- }
-
- /**
-  * Return the value of the given pre-validated field, or if the value is missing return the default value.
-  *
-  * @param field The field whose value (or default) to return
-  * @return The stored value if it is non-null, otherwise the field's default value
-  * @throws SchemaException if the field has no value and has no default.
-  */
- private Object getFieldOrDefault(Field field) {
-     Object value = this.values[field.index];
-     if (value != null)
-         return value;
-     if (field.defaultValue != Field.NO_DEFAULT)
-         return field.defaultValue;
-     // the field name is quoted on both sides so the message reads unambiguously
-     throw new SchemaException("Missing value for field '" + field.name + "' which has no default value.");
- }
-
- /**
- * Get the value for the field directly by the field index with no lookup needed (faster!)
- *
- * @param field The field to look up
- * @return The value for that field.
- */
- public Object get(Field field) {
- validateField(field);
- return getFieldOrDefault(field);
- }
-
- /**
- * Get the record value for the field with the given name by doing a hash table lookup (slower!)
- *
- * @param name The name of the field
- * @return The value in the record
- */
- public Object get(String name) {
- Field field = schema.get(name);
- if (field == null)
- throw new SchemaException("No such field: " + name);
- return getFieldOrDefault(field);
- }
-
- public Struct getStruct(Field field) {
- return (Struct) get(field);
- }
-
- public Struct getStruct(String name) {
- return (Struct) get(name);
- }
-
- public Short getShort(Field field) {
- return (Short) get(field);
- }
-
- public Short getShort(String name) {
- return (Short) get(name);
- }
-
- public Integer getInt(Field field) {
- return (Integer) get(field);
- }
-
- public Integer getInt(String name) {
- return (Integer) get(name);
- }
-
- public Object[] getArray(Field field) {
- return (Object[]) get(field);
- }
-
- public Object[] getArray(String name) {
- return (Object[]) get(name);
- }
-
- public String getString(Field field) {
- return (String) get(field);
- }
-
- public String getString(String name) {
- return (String) get(name);
- }
-
- /**
- * Set the given field to the specified value
- *
- * @param field The field
- * @param value The value
- */
- public Struct set(Field field, Object value) {
- validateField(field);
- this.values[field.index] = value;
- return this;
- }
-
- /**
- * Set the field specified by the given name to the value
- *
- * @param name The name of the field
- * @param value The value to set
- */
- public Struct set(String name, Object value) {
- Field field = this.schema.get(name);
- if (field == null)
- throw new SchemaException("Unknown field: " + name);
- this.values[field.index] = value;
- return this;
- }
-
- /**
- * Create a struct for the schema of a container type (struct or array)
- *
- * @param field The field to create an instance of
- * @return The struct
- */
- public Struct instance(Field field) {
- validateField(field);
- if (field.type() instanceof Schema) {
- return new Struct((Schema) field.type());
- } else if (field.type() instanceof ArrayOf) {
- ArrayOf array = (ArrayOf) field.type();
- return new Struct((Schema) array.type());
- } else {
- throw new SchemaException("Field " + field.name + " is not a container type, it is of type " + field.type());
- }
- }
-
- /**
-  * Create a struct instance for the given field which must be a container type (struct or array)
-  *
-  * @param field The name of the field to create (field must be a schema type)
-  * @return The struct
-  * @throws SchemaException if the schema has no field with that name, or the field is not a container type
-  */
- public Struct instance(String field) {
-     Field f = schema.get(field);
-     // the name lookup returns null for an unknown field; report that as a schema error, as get(String) does,
-     // instead of failing later with a NullPointerException
-     if (f == null)
-         throw new SchemaException("No such field: " + field);
-     return instance(f);
- }
-
- /**
- * Empty all the values from this record
- */
- public void clear() {
- Arrays.fill(this.values, null);
- }
-
- /**
- * Get the serialized size of this object
- */
- public int sizeOf() {
- return this.schema.sizeOf(this);
- }
-
- /**
- * Write this struct to a buffer
- */
- public void writeTo(ByteBuffer buffer) {
- this.schema.write(buffer, this);
- }
-
- /**
-  * Ensure the user doesn't try to access fields from the wrong schema
-  *
-  * @param field The field about to be read or written
-  * @throws SchemaException if the field belongs to a different schema instance, or if its index is not a valid
-  *         slot in this struct's value array
-  */
- private void validateField(Field field) {
-     if (this.schema != field.schema)
-         throw new SchemaException("Attempt to access field '" + field.name + "' from a different schema instance.");
-     // valid slots are 0..values.length-1, so an index equal to the length is already out of range
-     if (field.index < 0 || field.index >= values.length)
-         throw new SchemaException("Invalid field index: " + field.index);
- }
-
- /**
- * Validate the contents of this struct against its schema
- */
- public void validate() {
- this.schema.validate(this);
- }
-
- /**
- * Create a byte buffer containing the serialized form of the values in this struct. This method can choose to break
- * the struct into multiple ByteBuffers if need be.
- */
- public ByteBuffer[] toBytes() {
- ByteBuffer buffer = ByteBuffer.allocate(sizeOf());
- writeTo(buffer);
- return new ByteBuffer[] { buffer };
- }
-
- @Override
- public String toString() {
- StringBuilder b = new StringBuilder();
- b.append('{');
- for (int i = 0; i < this.values.length; i++) {
- b.append(this.schema.get(i).name);
- b.append('=');
- b.append(this.values[i]);
- if (i < this.values.length - 1)
- b.append(',');
- }
- b.append('}');
- return b.toString();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/protocol/types/Type.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/protocol/types/Type.java b/clients/src/main/java/kafka/common/protocol/types/Type.java
deleted file mode 100644
index f4c93e3..0000000
--- a/clients/src/main/java/kafka/common/protocol/types/Type.java
+++ /dev/null
@@ -1,216 +0,0 @@
-package kafka.common.protocol.types;
-
-import java.nio.ByteBuffer;
-
-import kafka.common.utils.Utils;
-
-/**
- * A serializable type
- */
-public abstract class Type {
-
- public abstract void write(ByteBuffer buffer, Object o);
-
- public abstract Object read(ByteBuffer buffer);
-
- public abstract int sizeOf(Object o);
-
- public abstract Object validate(Object o);
-
- public static final Type INT8 = new Type() {
- @Override
- public void write(ByteBuffer buffer, Object o) {
- buffer.put((Byte) o);
- }
-
- @Override
- public Object read(ByteBuffer buffer) {
- return buffer.get();
- }
-
- @Override
- public int sizeOf(Object o) {
- return 1;
- }
-
- @Override
- public String toString() {
- return "INT8";
- }
-
- @Override
- public Byte validate(Object item) {
- if (item instanceof Byte)
- return (Byte) item;
- else
- throw new SchemaException(item + " is not a Byte.");
- }
- };
-
- public static final Type INT16 = new Type() {
- @Override
- public void write(ByteBuffer buffer, Object o) {
- buffer.putShort((Short) o);
- }
-
- @Override
- public Object read(ByteBuffer buffer) {
- return buffer.getShort();
- }
-
- @Override
- public int sizeOf(Object o) {
- return 2;
- }
-
- @Override
- public String toString() {
- return "INT16";
- }
-
- @Override
- public Short validate(Object item) {
- if (item instanceof Short)
- return (Short) item;
- else
- throw new SchemaException(item + " is not a Short.");
- }
- };
-
- public static final Type INT32 = new Type() {
- @Override
- public void write(ByteBuffer buffer, Object o) {
- buffer.putInt((Integer) o);
- }
-
- @Override
- public Object read(ByteBuffer buffer) {
- return buffer.getInt();
- }
-
- @Override
- public int sizeOf(Object o) {
- return 4;
- }
-
- @Override
- public String toString() {
- return "INT32";
- }
-
- @Override
- public Integer validate(Object item) {
- if (item instanceof Integer)
- return (Integer) item;
- else
- throw new SchemaException(item + " is not an Integer.");
- }
- };
-
- public static final Type INT64 = new Type() {
- @Override
- public void write(ByteBuffer buffer, Object o) {
- buffer.putLong((Long) o);
- }
-
- @Override
- public Object read(ByteBuffer buffer) {
- return buffer.getLong();
- }
-
- @Override
- public int sizeOf(Object o) {
- return 8;
- }
-
- @Override
- public String toString() {
- return "INT64";
- }
-
- @Override
- public Long validate(Object item) {
- if (item instanceof Long)
- return (Long) item;
- else
- throw new SchemaException(item + " is not a Long.");
- }
- };
-
- public static final Type STRING = new Type() {
- @Override
- public void write(ByteBuffer buffer, Object o) {
- byte[] bytes = Utils.utf8((String) o);
- if (bytes.length > Short.MAX_VALUE)
- throw new SchemaException("String is longer than the maximum string length.");
- buffer.putShort((short) bytes.length);
- buffer.put(bytes);
- }
-
- @Override
- public Object read(ByteBuffer buffer) {
- int length = buffer.getShort();
- byte[] bytes = new byte[length];
- buffer.get(bytes);
- return Utils.utf8(bytes);
- }
-
- @Override
- public int sizeOf(Object o) {
- return 2 + Utils.utf8Length((String) o);
- }
-
- @Override
- public String toString() {
- return "STRING";
- }
-
- @Override
- public String validate(Object item) {
- if (item instanceof String)
- return (String) item;
- else
- throw new SchemaException(item + " is not a String.");
- }
- };
-
- public static final Type BYTES = new Type() {
- @Override
- public void write(ByteBuffer buffer, Object o) {
- ByteBuffer arg = (ByteBuffer) o;
- int pos = arg.position();
- buffer.putInt(arg.remaining());
- buffer.put(arg);
- arg.position(pos);
- }
-
- @Override
- public Object read(ByteBuffer buffer) {
- int size = buffer.getInt();
- ByteBuffer val = buffer.slice();
- val.limit(size);
- buffer.position(buffer.position() + size);
- return val;
- }
-
- @Override
- public int sizeOf(Object o) {
- ByteBuffer buffer = (ByteBuffer) o;
- return 4 + buffer.remaining();
- }
-
- @Override
- public String toString() {
- return "BYTES";
- }
-
- @Override
- public ByteBuffer validate(Object item) {
- if (item instanceof ByteBuffer)
- return (ByteBuffer) item;
- else
- throw new SchemaException(item + " is not a java.nio.ByteBuffer.");
- }
- };
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/record/CompressionType.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/record/CompressionType.java b/clients/src/main/java/kafka/common/record/CompressionType.java
deleted file mode 100644
index f6d9026..0000000
--- a/clients/src/main/java/kafka/common/record/CompressionType.java
+++ /dev/null
@@ -1,40 +0,0 @@
-package kafka.common.record;
-
-/**
- * The compression type to use
- */
-public enum CompressionType {
- NONE(0, "none"), GZIP(1, "gzip"), SNAPPY(2, "snappy");
-
- public final int id;
- public final String name;
-
- private CompressionType(int id, String name) {
- this.id = id;
- this.name = name;
- }
-
- public static CompressionType forId(int id) {
- switch (id) {
- case 0:
- return NONE;
- case 1:
- return GZIP;
- case 2:
- return SNAPPY;
- default:
- throw new IllegalArgumentException("Unknown compression type id: " + id);
- }
- }
-
- public static CompressionType forName(String name) {
- if (NONE.name.equals(name))
- return NONE;
- else if (GZIP.name.equals(name))
- return GZIP;
- else if (SNAPPY.name.equals(name))
- return SNAPPY;
- else
- throw new IllegalArgumentException("Unknown compression name: " + name);
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/record/InvalidRecordException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/record/InvalidRecordException.java b/clients/src/main/java/kafka/common/record/InvalidRecordException.java
deleted file mode 100644
index 97fbe50..0000000
--- a/clients/src/main/java/kafka/common/record/InvalidRecordException.java
+++ /dev/null
@@ -1,11 +0,0 @@
-package kafka.common.record;
-
-public class InvalidRecordException extends RuntimeException {
-
- private static final long serialVersionUID = 1;
-
- public InvalidRecordException(String s) {
- super(s);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/record/LogEntry.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/record/LogEntry.java b/clients/src/main/java/kafka/common/record/LogEntry.java
deleted file mode 100644
index f5e99c9..0000000
--- a/clients/src/main/java/kafka/common/record/LogEntry.java
+++ /dev/null
@@ -1,28 +0,0 @@
-package kafka.common.record;
-
-/**
- * An offset and record pair
- */
-public final class LogEntry {
-
- private final long offset;
- private final Record record;
-
- public LogEntry(long offset, Record record) {
- this.offset = offset;
- this.record = record;
- }
-
- public long offset() {
- return this.offset;
- }
-
- public Record record() {
- return this.record;
- }
-
- @Override
- public String toString() {
- return "LogEntry(" + offset + ", " + record + ")";
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/record/MemoryRecords.java b/clients/src/main/java/kafka/common/record/MemoryRecords.java
deleted file mode 100644
index d3f8426..0000000
--- a/clients/src/main/java/kafka/common/record/MemoryRecords.java
+++ /dev/null
@@ -1,102 +0,0 @@
-package kafka.common.record;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.GatheringByteChannel;
-import java.util.Iterator;
-
-import kafka.common.utils.AbstractIterator;
-
-/**
- * A {@link Records} implementation backed by a ByteBuffer.
- */
-public class MemoryRecords implements Records {
-
- private final ByteBuffer buffer;
-
- public MemoryRecords(int size) {
- this(ByteBuffer.allocate(size));
- }
-
- public MemoryRecords(ByteBuffer buffer) {
- this.buffer = buffer;
- }
-
- /**
- * Append the given record and offset to the buffer
- */
- public void append(long offset, Record record) {
- buffer.putLong(offset);
- buffer.putInt(record.size());
- buffer.put(record.buffer());
- record.buffer().rewind();
- }
-
- /**
- * Append a new record and offset to the buffer
- */
- public void append(long offset, byte[] key, byte[] value, CompressionType type) {
- buffer.putLong(offset);
- buffer.putInt(Record.recordSize(key, value));
- Record.write(this.buffer, key, value, type);
- }
-
- /**
- * Check if we have room for a new record containing the given key/value pair
- */
- public boolean hasRoomFor(byte[] key, byte[] value) {
- return this.buffer.remaining() >= Records.LOG_OVERHEAD + Record.recordSize(key, value);
- }
-
- /** Write the records in this set to the given channel */
- public int writeTo(GatheringByteChannel channel) throws IOException {
- return channel.write(buffer);
- }
-
- /**
- * The size of this record set
- */
- public int sizeInBytes() {
- return this.buffer.position();
- }
-
- /**
- * Get the byte buffer that backs this records instance
- */
- public ByteBuffer buffer() {
- return buffer.duplicate();
- }
-
- @Override
- public Iterator<LogEntry> iterator() {
- return new RecordsIterator(this.buffer);
- }
-
- /* TODO: allow reuse of the buffer used for iteration */
- public static class RecordsIterator extends AbstractIterator<LogEntry> {
- private final ByteBuffer buffer;
-
- public RecordsIterator(ByteBuffer buffer) {
- ByteBuffer copy = buffer.duplicate();
- copy.flip();
- this.buffer = copy;
- }
-
- @Override
- protected LogEntry makeNext() {
- if (buffer.remaining() < Records.LOG_OVERHEAD)
- return allDone();
- long offset = buffer.getLong();
- int size = buffer.getInt();
- if (size < 0)
- throw new IllegalStateException("Record with size " + size);
- if (buffer.remaining() < size)
- return allDone();
- ByteBuffer rec = buffer.slice();
- rec.limit(size);
- this.buffer.position(this.buffer.position() + size);
- return new LogEntry(offset, new Record(rec));
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/record/Record.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/record/Record.java b/clients/src/main/java/kafka/common/record/Record.java
deleted file mode 100644
index b89accf..0000000
--- a/clients/src/main/java/kafka/common/record/Record.java
+++ /dev/null
@@ -1,286 +0,0 @@
-package kafka.common.record;
-
-import java.nio.ByteBuffer;
-
-import kafka.common.utils.Utils;
-
-/**
- * A record: a serialized key and value along with the associated CRC and other fields
- */
-public final class Record {
-
- /**
- * The current offset and size for all the fixed-length fields
- */
- public static final int CRC_OFFSET = 0;
- public static final int CRC_LENGTH = 4;
- public static final int MAGIC_OFFSET = CRC_OFFSET + CRC_LENGTH;
- public static final int MAGIC_LENGTH = 1;
- public static final int ATTRIBUTES_OFFSET = MAGIC_OFFSET + MAGIC_LENGTH;
- public static final int ATTRIBUTE_LENGTH = 1;
- public static final int KEY_SIZE_OFFSET = ATTRIBUTES_OFFSET + ATTRIBUTE_LENGTH;
- public static final int KEY_SIZE_LENGTH = 4;
- public static final int KEY_OFFSET = KEY_SIZE_OFFSET + KEY_SIZE_LENGTH;
- public static final int VALUE_SIZE_LENGTH = 4;
-
- /** The amount of overhead bytes in a record */
- public static final int RECORD_OVERHEAD = KEY_OFFSET + VALUE_SIZE_LENGTH;
-
- /**
- * The minimum valid size for the record header
- */
- public static final int MIN_HEADER_SIZE = CRC_LENGTH + MAGIC_LENGTH + ATTRIBUTE_LENGTH + KEY_SIZE_LENGTH + VALUE_SIZE_LENGTH;
-
- /**
- * The current "magic" value
- */
- public static final byte CURRENT_MAGIC_VALUE = 0;
-
- /**
- * Specifies the mask for the compression code. 2 bits to hold the compression codec. 0 is reserved to indicate no
- * compression
- */
- public static final int COMPRESSION_CODEC_MASK = 0x03;
-
- /**
- * Compression code for uncompressed records
- */
- public static final int NO_COMPRESSION = 0;
-
- private final ByteBuffer buffer;
-
- public Record(ByteBuffer buffer) {
- this.buffer = buffer;
- }
-
- /**
- * A constructor to create a LogRecord
- *
- * @param key The key of the record (null, if none)
- * @param value The record value
- * @param codec The compression codec used on the contents of the record (if any)
- * @param valueOffset The offset into the payload array used to extract payload
- * @param valueSize The size of the payload to use
- */
- public Record(byte[] key, byte[] value, CompressionType codec, int valueOffset, int valueSize) {
- this(ByteBuffer.allocate(recordSize(key == null ? 0 : key.length, value == null ? 0 : valueSize >= 0 ? valueSize
- : value.length - valueOffset)));
- write(this.buffer, key, value, codec, valueOffset, valueSize);
- this.buffer.rewind();
- }
-
- public Record(byte[] key, byte[] value, CompressionType codec) {
- this(key, value, codec, 0, -1);
- }
-
- public Record(byte[] value, CompressionType codec) {
- this(null, value, codec);
- }
-
- public Record(byte[] key, byte[] value) {
- this(key, value, CompressionType.NONE);
- }
-
- public Record(byte[] value) {
- this(null, value, CompressionType.NONE);
- }
-
- /**
-  * Write a record into the buffer starting at the buffer's current position. A null key or value is encoded as a
-  * size of -1 followed by no bytes.
-  *
-  * @param buffer The buffer to write to; it must be array-backed because the checksum is computed over its array
-  * @param key The key of the record (null, if none)
-  * @param value The record value (null, if none)
-  * @param codec The compression codec; its id is stored in the attributes byte
-  * @param valueOffset The offset into the value array at which the payload starts
-  * @param valueSize The number of payload bytes to write, or a negative number to write through the end of value
-  */
- public static void write(ByteBuffer buffer, byte[] key, byte[] value, CompressionType codec, int valueOffset, int valueSize) {
-     // skip crc, we will fill that in at the end
-     int pos = buffer.position();
-     buffer.position(pos + MAGIC_OFFSET);
-     buffer.put(CURRENT_MAGIC_VALUE);
-     byte attributes = 0;
-     if (codec.id > 0)
-         attributes = (byte) (attributes | (COMPRESSION_CODEC_MASK & codec.id));
-     buffer.put(attributes);
-     // write the key
-     if (key == null) {
-         buffer.putInt(-1);
-     } else {
-         buffer.putInt(key.length);
-         buffer.put(key, 0, key.length);
-     }
-     // write the value
-     if (value == null) {
-         buffer.putInt(-1);
-     } else {
-         int size = valueSize >= 0 ? valueSize : (value.length - valueOffset);
-         buffer.putInt(size);
-         buffer.put(value, valueOffset, size);
-     }
-
-     // now compute the checksum and fill it in. pos and buffer.position() are both buffer-relative, so their
-     // difference is already the region length; the array offset is applied exactly once, to the start index.
-     // (Applying it to the length as well, or to the start twice, breaks buffers that are slices of a larger array.)
-     int start = pos + MAGIC_OFFSET;
-     long crc = Utils.crc32(buffer.array(), buffer.arrayOffset() + start, buffer.position() - start);
-     Utils.writeUnsignedInt(buffer, pos + CRC_OFFSET, crc);
- }
-
- public static void write(ByteBuffer buffer, byte[] key, byte[] value, CompressionType codec) {
- write(buffer, key, value, codec, 0, -1);
- }
-
- public static int recordSize(byte[] key, byte[] value) {
- return recordSize(key == null ? 0 : key.length, value == null ? 0 : value.length);
- }
-
- public static int recordSize(int keySize, int valueSize) {
- return CRC_LENGTH + MAGIC_LENGTH + ATTRIBUTE_LENGTH + KEY_SIZE_LENGTH + keySize + VALUE_SIZE_LENGTH + valueSize;
- }
-
- public ByteBuffer buffer() {
- return this.buffer;
- }
-
- /**
-  * Compute the checksum of a region of the given buffer's backing array
-  *
-  * @param buffer The buffer holding the bytes; it must be array-backed because its array is read directly
-  * @param position The start of the region relative to the start of the buffer (the array offset is added here)
-  * @param size The number of bytes in the region
-  * @return The checksum of the region as computed by Utils.crc32
-  */
- public static long computeChecksum(ByteBuffer buffer, int position, int size) {
-     // arrayOffset() only shifts where the region begins inside the backing array; it does not change how many
-     // bytes the region holds, so it must not be subtracted from size (that truncates the region for slices)
-     return Utils.crc32(buffer.array(), buffer.arrayOffset() + position, size);
- }
-
- /**
- * Compute the checksum of the record from the record contents
- */
- public long computeChecksum() {
- return computeChecksum(buffer, MAGIC_OFFSET, buffer.limit() - MAGIC_OFFSET);
- }
-
- /**
- * Retrieve the previously computed CRC for this record
- */
- public long checksum() {
- return Utils.readUnsignedInt(buffer, CRC_OFFSET);
- }
-
- /**
- * Returns true if the crc stored with the record matches the crc computed off the record contents
- */
- public boolean isValid() {
- return checksum() == computeChecksum();
- }
-
- /**
- * Throw an InvalidRecordException if isValid is false for this record
- */
- public void ensureValid() {
- if (!isValid())
- throw new InvalidRecordException("Record is corrupt (stored crc = " + checksum()
- + ", computed crc = "
- + computeChecksum()
- + ")");
- }
-
- /**
- * The complete serialized size of this record in bytes (including crc, header attributes, etc)
- */
- public int size() {
- return buffer.limit();
- }
-
- /**
- * The length of the key in bytes
- */
- public int keySize() {
- return buffer.getInt(KEY_SIZE_OFFSET);
- }
-
- /**
- * Does the record have a key?
- */
- public boolean hasKey() {
- return keySize() >= 0;
- }
-
- /**
- * The position where the value size is stored
- */
- private int valueSizeOffset() {
- return KEY_OFFSET + Math.max(0, keySize());
- }
-
- /**
- * The length of the value in bytes
- */
- public int valueSize() {
- return buffer.getInt(valueSizeOffset());
- }
-
- /**
- * The magic version of this record
- */
- public byte magic() {
- return buffer.get(MAGIC_OFFSET);
- }
-
- /**
- * The attributes stored with this record
- */
- public byte attributes() {
- return buffer.get(ATTRIBUTES_OFFSET);
- }
-
- /**
- * The compression codec used with this record
- */
- public CompressionType compressionType() {
- return CompressionType.forId(buffer.get(ATTRIBUTES_OFFSET) & COMPRESSION_CODEC_MASK);
- }
-
- /**
- * A ByteBuffer containing the value of this record
- */
- public ByteBuffer value() {
- return sliceDelimited(valueSizeOffset());
- }
-
- /**
- * A ByteBuffer containing the message key
- */
- public ByteBuffer key() {
- return sliceDelimited(KEY_SIZE_OFFSET);
- }
-
- /**
- * Read a size-delimited byte buffer starting at the given offset
- */
- private ByteBuffer sliceDelimited(int start) {
- int size = buffer.getInt(start);
- if (size < 0) {
- return null;
- } else {
- ByteBuffer b = buffer.duplicate();
- b.position(start + 4);
- b = b.slice();
- b.limit(size);
- b.rewind();
- return b;
- }
- }
-
- /**
-  * A one-line summary of this record: magic, attributes, stored crc, and the key and value sizes in bytes. A
-  * missing key or value is reported as 0 bytes.
-  */
- public String toString() {
-     // key() and value() return null when the stored size is negative, so they cannot be dereferenced blindly
-     ByteBuffer k = key();
-     ByteBuffer v = value();
-     return String.format("Record(magic = %d, attributes = %d, crc = %d, key = %d bytes, value = %d bytes)",
-                          magic(),
-                          attributes(),
-                          checksum(),
-                          k == null ? 0 : k.limit(),
-                          v == null ? 0 : v.limit());
- }
-
- public boolean equals(Object other) {
- if (this == other)
- return true;
- if (other == null)
- return false;
- if (!other.getClass().equals(Record.class))
- return false;
- Record record = (Record) other;
- return this.buffer.equals(record.buffer);
- }
-
- public int hashCode() {
- return buffer.hashCode();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/record/Records.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/record/Records.java b/clients/src/main/java/kafka/common/record/Records.java
deleted file mode 100644
index 6531ca0..0000000
--- a/clients/src/main/java/kafka/common/record/Records.java
+++ /dev/null
@@ -1,29 +0,0 @@
-package kafka.common.record;
-
-import java.io.IOException;
-import java.nio.channels.GatheringByteChannel;
-
-/**
- * A binary format which consists of a 4 byte size, an 8 byte offset, and the record bytes. See {@link MemoryRecords}
- * for the in-memory representation.
- */
-public interface Records extends Iterable<LogEntry> {
-
- int SIZE_LENGTH = 4;
- int OFFSET_LENGTH = 8;
- int LOG_OVERHEAD = SIZE_LENGTH + OFFSET_LENGTH;
-
- /**
- * Write these records to the given channel
- * @param channel The channel to write to
- * @return The number of bytes written
- * @throws IOException If the write fails.
- */
- public int writeTo(GatheringByteChannel channel) throws IOException;
-
- /**
- * The size of these records in bytes
- */
- public int sizeInBytes();
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/requests/RequestHeader.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/requests/RequestHeader.java b/clients/src/main/java/kafka/common/requests/RequestHeader.java
deleted file mode 100644
index 4ce67f8..0000000
--- a/clients/src/main/java/kafka/common/requests/RequestHeader.java
+++ /dev/null
@@ -1,68 +0,0 @@
-package kafka.common.requests;
-
-import static kafka.common.protocol.Protocol.REQUEST_HEADER;
-
-import java.nio.ByteBuffer;
-
-import kafka.common.protocol.ProtoUtils;
-import kafka.common.protocol.Protocol;
-import kafka.common.protocol.types.Field;
-import kafka.common.protocol.types.Struct;
-
-/**
- * The header for a request in the Kafka protocol
- */
-public class RequestHeader {
-
- private static Field API_KEY_FIELD = REQUEST_HEADER.get("api_key");
- private static Field API_VERSION_FIELD = REQUEST_HEADER.get("api_version");
- private static Field CLIENT_ID_FIELD = REQUEST_HEADER.get("client_id");
- private static Field CORRELATION_ID_FIELD = REQUEST_HEADER.get("correlation_id");
-
- private final Struct header;
-
- public RequestHeader(Struct header) {
- super();
- this.header = header;
- }
-
- public RequestHeader(short apiKey, String client, int correlation) {
- this(apiKey, ProtoUtils.latestVersion(apiKey), client, correlation);
- }
-
- public RequestHeader(short apiKey, short version, String client, int correlation) {
- this(new Struct(Protocol.REQUEST_HEADER));
- this.header.set(API_KEY_FIELD, apiKey);
- this.header.set(API_VERSION_FIELD, version);
- this.header.set(CLIENT_ID_FIELD, client);
- this.header.set(CORRELATION_ID_FIELD, correlation);
- }
-
- public short apiKey() {
- return (Short) this.header.get(API_KEY_FIELD);
- }
-
- public short apiVersion() {
- return (Short) this.header.get(API_VERSION_FIELD);
- }
-
- public String clientId() {
- return (String) this.header.get(CLIENT_ID_FIELD);
- }
-
- public int correlationId() {
- return (Integer) this.header.get(CORRELATION_ID_FIELD);
- }
-
- public static RequestHeader parse(ByteBuffer buffer) {
- return new RequestHeader((Struct) Protocol.REQUEST_HEADER.read(buffer));
- }
-
- public void writeTo(ByteBuffer buffer) {
- header.writeTo(buffer);
- }
-
- public int sizeOf() {
- return header.sizeOf();
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/requests/RequestSend.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/requests/RequestSend.java b/clients/src/main/java/kafka/common/requests/RequestSend.java
deleted file mode 100644
index f6a9a86..0000000
--- a/clients/src/main/java/kafka/common/requests/RequestSend.java
+++ /dev/null
@@ -1,38 +0,0 @@
-package kafka.common.requests;
-
-import java.nio.ByteBuffer;
-
-import kafka.common.network.NetworkSend;
-import kafka.common.protocol.types.Struct;
-
-/**
- * A send object for a kafka request
- */
-public class RequestSend extends NetworkSend {
-
- private final RequestHeader header;
- private final Struct body;
-
- public RequestSend(int destination, RequestHeader header, Struct body) {
- super(destination, serialize(header, body));
- this.header = header;
- this.body = body;
- }
-
- private static ByteBuffer serialize(RequestHeader header, Struct body) {
- ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + body.sizeOf());
- header.writeTo(buffer);
- body.writeTo(buffer);
- buffer.rewind();
- return buffer;
- }
-
- public RequestHeader header() {
- return this.header;
- }
-
- public Struct body() {
- return body;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/requests/ResponseHeader.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/requests/ResponseHeader.java b/clients/src/main/java/kafka/common/requests/ResponseHeader.java
deleted file mode 100644
index 1ef8e15..0000000
--- a/clients/src/main/java/kafka/common/requests/ResponseHeader.java
+++ /dev/null
@@ -1,45 +0,0 @@
-package kafka.common.requests;
-
-import static kafka.common.protocol.Protocol.RESPONSE_HEADER;
-
-import java.nio.ByteBuffer;
-
-import kafka.common.protocol.Protocol;
-import kafka.common.protocol.types.Field;
-import kafka.common.protocol.types.Struct;
-
-/**
- * A response header in the kafka protocol.
- */
-public class ResponseHeader {
-
- private static Field CORRELATION_KEY_FIELD = RESPONSE_HEADER.get("correlation_id");
-
- private final Struct header;
-
- public ResponseHeader(Struct header) {
- this.header = header;
- }
-
- public ResponseHeader(int correlationId) {
- this(new Struct(Protocol.RESPONSE_HEADER));
- this.header.set(CORRELATION_KEY_FIELD, correlationId);
- }
-
- public int correlationId() {
- return (Integer) header.get(CORRELATION_KEY_FIELD);
- }
-
- public void writeTo(ByteBuffer buffer) {
- header.writeTo(buffer);
- }
-
- public int sizeOf() {
- return header.sizeOf();
- }
-
- public static ResponseHeader parse(ByteBuffer buffer) {
- return new ResponseHeader(((Struct) Protocol.RESPONSE_HEADER.read(buffer)));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/utils/AbstractIterator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/utils/AbstractIterator.java b/clients/src/main/java/kafka/common/utils/AbstractIterator.java
deleted file mode 100644
index f3190d7..0000000
--- a/clients/src/main/java/kafka/common/utils/AbstractIterator.java
+++ /dev/null
@@ -1,72 +0,0 @@
-package kafka.common.utils;
-
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-/**
- * A base class that simplifies implementing an iterator
- * @param <T> The type of thing we are iterating over
- */
-public abstract class AbstractIterator<T> implements Iterator<T> {
-
- private static enum State {
- READY, NOT_READY, DONE, FAILED
- };
-
- private State state = State.NOT_READY;
- private T next;
-
- @Override
- public boolean hasNext() {
- switch (state) {
- case FAILED:
- throw new IllegalStateException("Iterator is in failed state");
- case DONE:
- return false;
- case READY:
- return true;
- default:
- return maybeComputeNext();
- }
- }
-
- @Override
- public T next() {
- if (!hasNext())
- throw new NoSuchElementException();
- state = State.NOT_READY;
- if (next == null)
- throw new IllegalStateException("Expected item but none found.");
- return next;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException("Removal not supported");
- }
-
- public T peek() {
- if (!hasNext())
- throw new NoSuchElementException();
- return next;
- }
-
- protected T allDone() {
- state = State.DONE;
- return null;
- }
-
- protected abstract T makeNext();
-
- private Boolean maybeComputeNext() {
- state = State.FAILED;
- next = makeNext();
- if (state == State.DONE) {
- return false;
- } else {
- state = State.READY;
- return true;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/utils/CopyOnWriteMap.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/utils/CopyOnWriteMap.java b/clients/src/main/java/kafka/common/utils/CopyOnWriteMap.java
deleted file mode 100644
index e45df98..0000000
--- a/clients/src/main/java/kafka/common/utils/CopyOnWriteMap.java
+++ /dev/null
@@ -1,130 +0,0 @@
-package kafka.common.utils;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
-
-/**
- * A simple read-optimized map implementation that synchronizes only writes and does a fully copy on each modification
- */
-public class CopyOnWriteMap<K, V> implements ConcurrentMap<K, V> {
-
- private volatile Map<K, V> map;
-
- public CopyOnWriteMap() {
- this.map = Collections.emptyMap();
- }
-
- public CopyOnWriteMap(Map<K, V> map) {
- this.map = Collections.unmodifiableMap(map);
- }
-
- @Override
- public boolean containsKey(Object k) {
- return map.containsKey(k);
- }
-
- @Override
- public boolean containsValue(Object v) {
- return map.containsValue(v);
- }
-
- @Override
- public Set<java.util.Map.Entry<K, V>> entrySet() {
- return map.entrySet();
- }
-
- @Override
- public V get(Object k) {
- return map.get(k);
- }
-
- @Override
- public boolean isEmpty() {
- return map.isEmpty();
- }
-
- @Override
- public Set<K> keySet() {
- return map.keySet();
- }
-
- @Override
- public int size() {
- return map.size();
- }
-
- @Override
- public Collection<V> values() {
- return map.values();
- }
-
- @Override
- public synchronized void clear() {
- this.map = Collections.emptyMap();
- }
-
- @Override
- public synchronized V put(K k, V v) {
- Map<K, V> copy = new HashMap<K, V>(this.map);
- V prev = copy.put(k, v);
- this.map = Collections.unmodifiableMap(copy);
- return prev;
- }
-
- @Override
- public synchronized void putAll(Map<? extends K, ? extends V> entries) {
- Map<K, V> copy = new HashMap<K, V>(this.map);
- copy.putAll(entries);
- this.map = Collections.unmodifiableMap(copy);
- }
-
- @Override
- public synchronized V remove(Object key) {
- Map<K, V> copy = new HashMap<K, V>(this.map);
- V prev = copy.remove(key);
- this.map = Collections.unmodifiableMap(copy);
- return prev;
- }
-
- @Override
- public synchronized V putIfAbsent(K k, V v) {
- if (!containsKey(k))
- return put(k, v);
- else
- return get(k);
- }
-
- @Override
- public synchronized boolean remove(Object k, Object v) {
- if (containsKey(k) && get(k).equals(v)) {
- remove(k);
- return true;
- } else {
- return false;
- }
- }
-
- @Override
- public synchronized boolean replace(K k, V original, V replacement) {
- if (containsKey(k) && get(k).equals(original)) {
- put(k, replacement);
- return true;
- } else {
- return false;
- }
- }
-
- @Override
- public synchronized V replace(K k, V v) {
- if (containsKey(k)) {
- return put(k, v);
- } else {
- return null;
- }
- }
-
-}