You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jk...@apache.org on 2014/02/07 01:26:41 UTC

[10/13] Rename client package from kafka.* to org.apache.kafka.*

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/protocol/types/ArrayOf.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/protocol/types/ArrayOf.java b/clients/src/main/java/kafka/common/protocol/types/ArrayOf.java
deleted file mode 100644
index 5daf95b..0000000
--- a/clients/src/main/java/kafka/common/protocol/types/ArrayOf.java
+++ /dev/null
@@ -1,63 +0,0 @@
-package kafka.common.protocol.types;
-
-import java.nio.ByteBuffer;
-
-/**
- * Represents a type for an array of a particular type
- */
-public class ArrayOf extends Type {
-
-    private final Type type;
-
-    public ArrayOf(Type type) {
-        this.type = type;
-    }
-
-    @Override
-    public void write(ByteBuffer buffer, Object o) {
-        Object[] objs = (Object[]) o;
-        int size = objs.length;
-        buffer.putInt(size);
-        for (int i = 0; i < size; i++)
-            type.write(buffer, objs[i]);
-    }
-
-    @Override
-    public Object read(ByteBuffer buffer) {
-        int size = buffer.getInt();
-        Object[] objs = new Object[size];
-        for (int i = 0; i < size; i++)
-            objs[i] = type.read(buffer);
-        return objs;
-    }
-
-    @Override
-    public int sizeOf(Object o) {
-        Object[] objs = (Object[]) o;
-        int size = 4;
-        for (int i = 0; i < objs.length; i++)
-            size += type.sizeOf(objs[i]);
-        return size;
-    }
-
-    public Type type() {
-        return type;
-    }
-
-    @Override
-    public String toString() {
-        return "ARRAY(" + type + ")";
-    }
-
-    @Override
-    public Object[] validate(Object item) {
-        try {
-            Object[] array = (Object[]) item;
-            for (int i = 0; i < array.length; i++)
-                type.validate(array[i]);
-            return array;
-        } catch (ClassCastException e) {
-            throw new SchemaException("Not an Object[].");
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/protocol/types/Field.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/protocol/types/Field.java b/clients/src/main/java/kafka/common/protocol/types/Field.java
deleted file mode 100644
index d018a12..0000000
--- a/clients/src/main/java/kafka/common/protocol/types/Field.java
+++ /dev/null
@@ -1,48 +0,0 @@
-package kafka.common.protocol.types;
-
-/**
- * A field in a schema
- */
-public class Field {
-
-    public static final Object NO_DEFAULT = new Object();
-
-    final int index;
-    public final String name;
-    public final Type type;
-    public final Object defaultValue;
-    public final String doc;
-    final Schema schema;
-
-    public Field(int index, String name, Type type, String doc, Object defaultValue, Schema schema) {
-        this.index = index;
-        this.name = name;
-        this.type = type;
-        this.doc = doc;
-        this.defaultValue = defaultValue;
-        this.schema = schema;
-        if (defaultValue != NO_DEFAULT)
-            type.validate(defaultValue);
-    }
-
-    public Field(int index, String name, Type type, String doc, Object defaultValue) {
-        this(index, name, type, doc, defaultValue, null);
-    }
-
-    public Field(String name, Type type, String doc, Object defaultValue) {
-        this(-1, name, type, doc, defaultValue);
-    }
-
-    public Field(String name, Type type, String doc) {
-        this(name, type, doc, NO_DEFAULT);
-    }
-
-    public Field(String name, Type type) {
-        this(name, type, "");
-    }
-
-    public Type type() {
-        return type;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/protocol/types/Schema.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/protocol/types/Schema.java b/clients/src/main/java/kafka/common/protocol/types/Schema.java
deleted file mode 100644
index b7b1c75..0000000
--- a/clients/src/main/java/kafka/common/protocol/types/Schema.java
+++ /dev/null
@@ -1,134 +0,0 @@
-package kafka.common.protocol.types;
-
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * The schema for a compound record definition
- */
-public class Schema extends Type {
-
-    private final Field[] fields;
-    private final Map<String, Field> fieldsByName;
-
-    public Schema(Field... fs) {
-        this.fields = new Field[fs.length];
-        this.fieldsByName = new HashMap<String, Field>();
-        for (int i = 0; i < this.fields.length; i++) {
-            Field field = fs[i];
-            if (fieldsByName.containsKey(field.name))
-                throw new SchemaException("Schema contains a duplicate field: " + field.name);
-            this.fields[i] = new Field(i, field.name, field.type, field.doc, field.defaultValue, this);
-            this.fieldsByName.put(fs[i].name, this.fields[i]);
-        }
-    }
-
-    /**
-     * Write a struct to the buffer
-     */
-    public void write(ByteBuffer buffer, Object o) {
-        Struct r = (Struct) o;
-        for (int i = 0; i < fields.length; i++) {
-            Field f = fields[i];
-            try {
-                Object value = f.type().validate(r.get(f));
-                f.type.write(buffer, value);
-            } catch (Exception e) {
-                throw new SchemaException("Error writing field '" + f.name + "': " + e.getMessage() == null ? e.getMessage() : e.getClass()
-                                                                                                                                .getName());
-            }
-        }
-    }
-
-    /**
-     * Read a struct from the buffer
-     */
-    public Object read(ByteBuffer buffer) {
-        Object[] objects = new Object[fields.length];
-        for (int i = 0; i < fields.length; i++)
-            objects[i] = fields[i].type.read(buffer);
-        return new Struct(this, objects);
-    }
-
-    /**
-     * The size of the given record
-     */
-    public int sizeOf(Object o) {
-        int size = 0;
-        Struct r = (Struct) o;
-        for (int i = 0; i < fields.length; i++)
-            size += fields[i].type.sizeOf(r.get(fields[i]));
-        return size;
-    }
-
-    /**
-     * The number of fields in this schema
-     */
-    public int numFields() {
-        return this.fields.length;
-    }
-
-    /**
-     * Get a field by its slot in the record array
-     * 
-     * @param slot The slot at which this field sits
-     * @return The field
-     */
-    public Field get(int slot) {
-        return this.fields[slot];
-    }
-
-    /**
-     * Get a field by its name
-     * 
-     * @param name The name of the field
-     * @return The field
-     */
-    public Field get(String name) {
-        return this.fieldsByName.get(name);
-    }
-
-    /**
-     * Get all the fields in this schema
-     */
-    public Field[] fields() {
-        return this.fields;
-    }
-
-    /**
-     * Display a string representation of the schema
-     */
-    public String toString() {
-        StringBuilder b = new StringBuilder();
-        b.append('{');
-        for (int i = 0; i < this.fields.length; i++) {
-            b.append(this.fields[i].name);
-            b.append(':');
-            b.append(this.fields[i].type());
-            if (i < this.fields.length - 1)
-                b.append(',');
-        }
-        b.append("}");
-        return b.toString();
-    }
-
-    @Override
-    public Struct validate(Object item) {
-        try {
-            Struct struct = (Struct) item;
-            for (int i = 0; i < this.fields.length; i++) {
-                Field field = this.fields[i];
-                try {
-                    field.type.validate(struct.get(field));
-                } catch (SchemaException e) {
-                    throw new SchemaException("Invalid value for field '" + field.name + "': " + e.getMessage());
-                }
-            }
-            return struct;
-        } catch (ClassCastException e) {
-            throw new SchemaException("Not a Struct.");
-        }
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/protocol/types/SchemaException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/protocol/types/SchemaException.java b/clients/src/main/java/kafka/common/protocol/types/SchemaException.java
deleted file mode 100644
index a2a2d50..0000000
--- a/clients/src/main/java/kafka/common/protocol/types/SchemaException.java
+++ /dev/null
@@ -1,13 +0,0 @@
-package kafka.common.protocol.types;
-
-import kafka.common.KafkaException;
-
-public class SchemaException extends KafkaException {
-
-    private static final long serialVersionUID = 1L;
-
-    public SchemaException(String message) {
-        super(message);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/protocol/types/Struct.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/protocol/types/Struct.java b/clients/src/main/java/kafka/common/protocol/types/Struct.java
deleted file mode 100644
index c83aefa..0000000
--- a/clients/src/main/java/kafka/common/protocol/types/Struct.java
+++ /dev/null
@@ -1,227 +0,0 @@
-package kafka.common.protocol.types;
-
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-
-/**
- * A record that can be serialized and deserialized according to a pre-defined schema
- */
-public class Struct {
-    private final Schema schema;
-    private final Object[] values;
-
-    Struct(Schema schema, Object[] values) {
-        this.schema = schema;
-        this.values = values;
-    }
-
-    public Struct(Schema schema) {
-        this.schema = schema;
-        this.values = new Object[this.schema.numFields()];
-    }
-
-    /**
-     * The schema for this struct.
-     */
-    public Schema schema() {
-        return this.schema;
-    }
-
-    /**
-     * Return the value of the given pre-validated field, or if the value is missing return the default value.
-     * 
-     * @param field The field for which to get the default value
-     * @throws SchemaException if the field has no value and has no default.
-     */
-    private Object getFieldOrDefault(Field field) {
-        Object value = this.values[field.index];
-        if (value != null)
-            return value;
-        else if (field.defaultValue != Field.NO_DEFAULT)
-            return field.defaultValue;
-        else
-            throw new SchemaException("Missing value for field '" + field.name + " which has no default value.");
-    }
-
-    /**
-     * Get the value for the field directly by the field index with no lookup needed (faster!)
-     * 
-     * @param field The field to look up
-     * @return The value for that field.
-     */
-    public Object get(Field field) {
-        validateField(field);
-        return getFieldOrDefault(field);
-    }
-
-    /**
-     * Get the record value for the field with the given name by doing a hash table lookup (slower!)
-     * 
-     * @param name The name of the field
-     * @return The value in the record
-     */
-    public Object get(String name) {
-        Field field = schema.get(name);
-        if (field == null)
-            throw new SchemaException("No such field: " + name);
-        return getFieldOrDefault(field);
-    }
-
-    public Struct getStruct(Field field) {
-        return (Struct) get(field);
-    }
-
-    public Struct getStruct(String name) {
-        return (Struct) get(name);
-    }
-
-    public Short getShort(Field field) {
-        return (Short) get(field);
-    }
-
-    public Short getShort(String name) {
-        return (Short) get(name);
-    }
-
-    public Integer getInt(Field field) {
-        return (Integer) get(field);
-    }
-
-    public Integer getInt(String name) {
-        return (Integer) get(name);
-    }
-
-    public Object[] getArray(Field field) {
-        return (Object[]) get(field);
-    }
-
-    public Object[] getArray(String name) {
-        return (Object[]) get(name);
-    }
-
-    public String getString(Field field) {
-        return (String) get(field);
-    }
-
-    public String getString(String name) {
-        return (String) get(name);
-    }
-
-    /**
-     * Set the given field to the specified value
-     * 
-     * @param field The field
-     * @param value The value
-     */
-    public Struct set(Field field, Object value) {
-        validateField(field);
-        this.values[field.index] = value;
-        return this;
-    }
-
-    /**
-     * Set the field specified by the given name to the value
-     * 
-     * @param name The name of the field
-     * @param value The value to set
-     */
-    public Struct set(String name, Object value) {
-        Field field = this.schema.get(name);
-        if (field == null)
-            throw new SchemaException("Unknown field: " + name);
-        this.values[field.index] = value;
-        return this;
-    }
-
-    /**
-     * Create a struct for the schema of a container type (struct or array)
-     * 
-     * @param field The field to create an instance of
-     * @return The struct
-     */
-    public Struct instance(Field field) {
-        validateField(field);
-        if (field.type() instanceof Schema) {
-            return new Struct((Schema) field.type());
-        } else if (field.type() instanceof ArrayOf) {
-            ArrayOf array = (ArrayOf) field.type();
-            return new Struct((Schema) array.type());
-        } else {
-            throw new SchemaException("Field " + field.name + " is not a container type, it is of type " + field.type());
-        }
-    }
-
-    /**
-     * Create a struct instance for the given field which must be a container type (struct or array)
-     * 
-     * @param field The name of the field to create (field must be a schema type)
-     * @return The struct
-     */
-    public Struct instance(String field) {
-        return instance(schema.get(field));
-    }
-
-    /**
-     * Empty all the values from this record
-     */
-    public void clear() {
-        Arrays.fill(this.values, null);
-    }
-
-    /**
-     * Get the serialized size of this object
-     */
-    public int sizeOf() {
-        return this.schema.sizeOf(this);
-    }
-
-    /**
-     * Write this struct to a buffer
-     */
-    public void writeTo(ByteBuffer buffer) {
-        this.schema.write(buffer, this);
-    }
-
-    /**
-     * Ensure the user doesn't try to access fields from the wrong schema
-     */
-    private void validateField(Field field) {
-        if (this.schema != field.schema)
-            throw new SchemaException("Attempt to access field '" + field.name + " from a different schema instance.");
-        if (field.index > values.length)
-            throw new SchemaException("Invalid field index: " + field.index);
-    }
-
-    /**
-     * Validate the contents of this struct against its schema
-     */
-    public void validate() {
-        this.schema.validate(this);
-    }
-
-    /**
-     * Create a byte buffer containing the serialized form of the values in this struct. This method can choose to break
-     * the struct into multiple ByteBuffers if need be.
-     */
-    public ByteBuffer[] toBytes() {
-        ByteBuffer buffer = ByteBuffer.allocate(sizeOf());
-        writeTo(buffer);
-        return new ByteBuffer[] { buffer };
-    }
-
-    @Override
-    public String toString() {
-        StringBuilder b = new StringBuilder();
-        b.append('{');
-        for (int i = 0; i < this.values.length; i++) {
-            b.append(this.schema.get(i).name);
-            b.append('=');
-            b.append(this.values[i]);
-            if (i < this.values.length - 1)
-                b.append(',');
-        }
-        b.append('}');
-        return b.toString();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/protocol/types/Type.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/protocol/types/Type.java b/clients/src/main/java/kafka/common/protocol/types/Type.java
deleted file mode 100644
index f4c93e3..0000000
--- a/clients/src/main/java/kafka/common/protocol/types/Type.java
+++ /dev/null
@@ -1,216 +0,0 @@
-package kafka.common.protocol.types;
-
-import java.nio.ByteBuffer;
-
-import kafka.common.utils.Utils;
-
-/**
- * A serializable type
- */
-public abstract class Type {
-
-    public abstract void write(ByteBuffer buffer, Object o);
-
-    public abstract Object read(ByteBuffer buffer);
-
-    public abstract int sizeOf(Object o);
-
-    public abstract Object validate(Object o);
-
-    public static final Type INT8 = new Type() {
-        @Override
-        public void write(ByteBuffer buffer, Object o) {
-            buffer.put((Byte) o);
-        }
-
-        @Override
-        public Object read(ByteBuffer buffer) {
-            return buffer.get();
-        }
-
-        @Override
-        public int sizeOf(Object o) {
-            return 1;
-        }
-
-        @Override
-        public String toString() {
-            return "INT8";
-        }
-
-        @Override
-        public Byte validate(Object item) {
-            if (item instanceof Byte)
-                return (Byte) item;
-            else
-                throw new SchemaException(item + " is not a Byte.");
-        }
-    };
-
-    public static final Type INT16 = new Type() {
-        @Override
-        public void write(ByteBuffer buffer, Object o) {
-            buffer.putShort((Short) o);
-        }
-
-        @Override
-        public Object read(ByteBuffer buffer) {
-            return buffer.getShort();
-        }
-
-        @Override
-        public int sizeOf(Object o) {
-            return 2;
-        }
-
-        @Override
-        public String toString() {
-            return "INT16";
-        }
-
-        @Override
-        public Short validate(Object item) {
-            if (item instanceof Short)
-                return (Short) item;
-            else
-                throw new SchemaException(item + " is not a Short.");
-        }
-    };
-
-    public static final Type INT32 = new Type() {
-        @Override
-        public void write(ByteBuffer buffer, Object o) {
-            buffer.putInt((Integer) o);
-        }
-
-        @Override
-        public Object read(ByteBuffer buffer) {
-            return buffer.getInt();
-        }
-
-        @Override
-        public int sizeOf(Object o) {
-            return 4;
-        }
-
-        @Override
-        public String toString() {
-            return "INT32";
-        }
-
-        @Override
-        public Integer validate(Object item) {
-            if (item instanceof Integer)
-                return (Integer) item;
-            else
-                throw new SchemaException(item + " is not an Integer.");
-        }
-    };
-
-    public static final Type INT64 = new Type() {
-        @Override
-        public void write(ByteBuffer buffer, Object o) {
-            buffer.putLong((Long) o);
-        }
-
-        @Override
-        public Object read(ByteBuffer buffer) {
-            return buffer.getLong();
-        }
-
-        @Override
-        public int sizeOf(Object o) {
-            return 8;
-        }
-
-        @Override
-        public String toString() {
-            return "INT64";
-        }
-
-        @Override
-        public Long validate(Object item) {
-            if (item instanceof Long)
-                return (Long) item;
-            else
-                throw new SchemaException(item + " is not a Long.");
-        }
-    };
-
-    public static final Type STRING = new Type() {
-        @Override
-        public void write(ByteBuffer buffer, Object o) {
-            byte[] bytes = Utils.utf8((String) o);
-            if (bytes.length > Short.MAX_VALUE)
-                throw new SchemaException("String is longer than the maximum string length.");
-            buffer.putShort((short) bytes.length);
-            buffer.put(bytes);
-        }
-
-        @Override
-        public Object read(ByteBuffer buffer) {
-            int length = buffer.getShort();
-            byte[] bytes = new byte[length];
-            buffer.get(bytes);
-            return Utils.utf8(bytes);
-        }
-
-        @Override
-        public int sizeOf(Object o) {
-            return 2 + Utils.utf8Length((String) o);
-        }
-
-        @Override
-        public String toString() {
-            return "STRING";
-        }
-
-        @Override
-        public String validate(Object item) {
-            if (item instanceof String)
-                return (String) item;
-            else
-                throw new SchemaException(item + " is not a String.");
-        }
-    };
-
-    public static final Type BYTES = new Type() {
-        @Override
-        public void write(ByteBuffer buffer, Object o) {
-            ByteBuffer arg = (ByteBuffer) o;
-            int pos = arg.position();
-            buffer.putInt(arg.remaining());
-            buffer.put(arg);
-            arg.position(pos);
-        }
-
-        @Override
-        public Object read(ByteBuffer buffer) {
-            int size = buffer.getInt();
-            ByteBuffer val = buffer.slice();
-            val.limit(size);
-            buffer.position(buffer.position() + size);
-            return val;
-        }
-
-        @Override
-        public int sizeOf(Object o) {
-            ByteBuffer buffer = (ByteBuffer) o;
-            return 4 + buffer.remaining();
-        }
-
-        @Override
-        public String toString() {
-            return "BYTES";
-        }
-
-        @Override
-        public ByteBuffer validate(Object item) {
-            if (item instanceof ByteBuffer)
-                return (ByteBuffer) item;
-            else
-                throw new SchemaException(item + " is not a java.nio.ByteBuffer.");
-        }
-    };
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/record/CompressionType.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/record/CompressionType.java b/clients/src/main/java/kafka/common/record/CompressionType.java
deleted file mode 100644
index f6d9026..0000000
--- a/clients/src/main/java/kafka/common/record/CompressionType.java
+++ /dev/null
@@ -1,40 +0,0 @@
-package kafka.common.record;
-
-/**
- * The compression type to use
- */
-public enum CompressionType {
-    NONE(0, "none"), GZIP(1, "gzip"), SNAPPY(2, "snappy");
-
-    public final int id;
-    public final String name;
-
-    private CompressionType(int id, String name) {
-        this.id = id;
-        this.name = name;
-    }
-
-    public static CompressionType forId(int id) {
-        switch (id) {
-            case 0:
-                return NONE;
-            case 1:
-                return GZIP;
-            case 2:
-                return SNAPPY;
-            default:
-                throw new IllegalArgumentException("Unknown compression type id: " + id);
-        }
-    }
-
-    public static CompressionType forName(String name) {
-        if (NONE.name.equals(name))
-            return NONE;
-        else if (GZIP.name.equals(name))
-            return GZIP;
-        else if (SNAPPY.name.equals(name))
-            return SNAPPY;
-        else
-            throw new IllegalArgumentException("Unknown compression name: " + name);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/record/InvalidRecordException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/record/InvalidRecordException.java b/clients/src/main/java/kafka/common/record/InvalidRecordException.java
deleted file mode 100644
index 97fbe50..0000000
--- a/clients/src/main/java/kafka/common/record/InvalidRecordException.java
+++ /dev/null
@@ -1,11 +0,0 @@
-package kafka.common.record;
-
-public class InvalidRecordException extends RuntimeException {
-
-    private static final long serialVersionUID = 1;
-
-    public InvalidRecordException(String s) {
-        super(s);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/record/LogEntry.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/record/LogEntry.java b/clients/src/main/java/kafka/common/record/LogEntry.java
deleted file mode 100644
index f5e99c9..0000000
--- a/clients/src/main/java/kafka/common/record/LogEntry.java
+++ /dev/null
@@ -1,28 +0,0 @@
-package kafka.common.record;
-
-/**
- * An offset and record pair
- */
-public final class LogEntry {
-
-    private final long offset;
-    private final Record record;
-
-    public LogEntry(long offset, Record record) {
-        this.offset = offset;
-        this.record = record;
-    }
-
-    public long offset() {
-        return this.offset;
-    }
-
-    public Record record() {
-        return this.record;
-    }
-
-    @Override
-    public String toString() {
-        return "LogEntry(" + offset + ", " + record + ")";
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/record/MemoryRecords.java b/clients/src/main/java/kafka/common/record/MemoryRecords.java
deleted file mode 100644
index d3f8426..0000000
--- a/clients/src/main/java/kafka/common/record/MemoryRecords.java
+++ /dev/null
@@ -1,102 +0,0 @@
-package kafka.common.record;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.GatheringByteChannel;
-import java.util.Iterator;
-
-import kafka.common.utils.AbstractIterator;
-
-/**
- * A {@link Records} implementation backed by a ByteBuffer.
- */
-public class MemoryRecords implements Records {
-
-    private final ByteBuffer buffer;
-
-    public MemoryRecords(int size) {
-        this(ByteBuffer.allocate(size));
-    }
-
-    public MemoryRecords(ByteBuffer buffer) {
-        this.buffer = buffer;
-    }
-
-    /**
-     * Append the given record and offset to the buffer
-     */
-    public void append(long offset, Record record) {
-        buffer.putLong(offset);
-        buffer.putInt(record.size());
-        buffer.put(record.buffer());
-        record.buffer().rewind();
-    }
-
-    /**
-     * Append a new record and offset to the buffer
-     */
-    public void append(long offset, byte[] key, byte[] value, CompressionType type) {
-        buffer.putLong(offset);
-        buffer.putInt(Record.recordSize(key, value));
-        Record.write(this.buffer, key, value, type);
-    }
-
-    /**
-     * Check if we have room for a new record containing the given key/value pair
-     */
-    public boolean hasRoomFor(byte[] key, byte[] value) {
-        return this.buffer.remaining() >= Records.LOG_OVERHEAD + Record.recordSize(key, value);
-    }
-
-    /** Write the records in this set to the given channel */
-    public int writeTo(GatheringByteChannel channel) throws IOException {
-        return channel.write(buffer);
-    }
-
-    /**
-     * The size of this record set
-     */
-    public int sizeInBytes() {
-        return this.buffer.position();
-    }
-
-    /**
-     * Get the byte buffer that backs this records instance
-     */
-    public ByteBuffer buffer() {
-        return buffer.duplicate();
-    }
-
-    @Override
-    public Iterator<LogEntry> iterator() {
-        return new RecordsIterator(this.buffer);
-    }
-
-    /* TODO: allow reuse of the buffer used for iteration */
-    public static class RecordsIterator extends AbstractIterator<LogEntry> {
-        private final ByteBuffer buffer;
-
-        public RecordsIterator(ByteBuffer buffer) {
-            ByteBuffer copy = buffer.duplicate();
-            copy.flip();
-            this.buffer = copy;
-        }
-
-        @Override
-        protected LogEntry makeNext() {
-            if (buffer.remaining() < Records.LOG_OVERHEAD)
-                return allDone();
-            long offset = buffer.getLong();
-            int size = buffer.getInt();
-            if (size < 0)
-                throw new IllegalStateException("Record with size " + size);
-            if (buffer.remaining() < size)
-                return allDone();
-            ByteBuffer rec = buffer.slice();
-            rec.limit(size);
-            this.buffer.position(this.buffer.position() + size);
-            return new LogEntry(offset, new Record(rec));
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/record/Record.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/record/Record.java b/clients/src/main/java/kafka/common/record/Record.java
deleted file mode 100644
index b89accf..0000000
--- a/clients/src/main/java/kafka/common/record/Record.java
+++ /dev/null
@@ -1,286 +0,0 @@
-package kafka.common.record;
-
-import java.nio.ByteBuffer;
-
-import kafka.common.utils.Utils;
-
-/**
- * A record: a serialized key and value along with the associated CRC and other fields
- */
-public final class Record {
-
-    /**
-     * The current offset and size for all the fixed-length fields
-     */
-    public static final int CRC_OFFSET = 0;
-    public static final int CRC_LENGTH = 4;
-    public static final int MAGIC_OFFSET = CRC_OFFSET + CRC_LENGTH;
-    public static final int MAGIC_LENGTH = 1;
-    public static final int ATTRIBUTES_OFFSET = MAGIC_OFFSET + MAGIC_LENGTH;
-    public static final int ATTRIBUTE_LENGTH = 1;
-    public static final int KEY_SIZE_OFFSET = ATTRIBUTES_OFFSET + ATTRIBUTE_LENGTH;
-    public static final int KEY_SIZE_LENGTH = 4;
-    public static final int KEY_OFFSET = KEY_SIZE_OFFSET + KEY_SIZE_LENGTH;
-    public static final int VALUE_SIZE_LENGTH = 4;
-
-    /** The amount of overhead bytes in a record */
-    public static final int RECORD_OVERHEAD = KEY_OFFSET + VALUE_SIZE_LENGTH;
-
-    /**
-     * The minimum valid size for the record header
-     */
-    public static final int MIN_HEADER_SIZE = CRC_LENGTH + MAGIC_LENGTH + ATTRIBUTE_LENGTH + KEY_SIZE_LENGTH + VALUE_SIZE_LENGTH;
-
-    /**
-     * The current "magic" value
-     */
-    public static final byte CURRENT_MAGIC_VALUE = 0;
-
-    /**
-     * Specifies the mask for the compression code. 2 bits to hold the compression codec. 0 is reserved to indicate no
-     * compression
-     */
-    public static final int COMPRESSION_CODEC_MASK = 0x03;
-
-    /**
-     * Compression code for uncompressed records
-     */
-    public static final int NO_COMPRESSION = 0;
-
-    private final ByteBuffer buffer;
-
-    public Record(ByteBuffer buffer) {
-        this.buffer = buffer;
-    }
-
-    /**
-     * A constructor to create a LogRecord
-     * 
-     * @param key The key of the record (null, if none)
-     * @param value The record value
-     * @param codec The compression codec used on the contents of the record (if any)
-     * @param valueOffset The offset into the payload array used to extract payload
-     * @param valueSize The size of the payload to use
-     */
-    public Record(byte[] key, byte[] value, CompressionType codec, int valueOffset, int valueSize) {
-        this(ByteBuffer.allocate(recordSize(key == null ? 0 : key.length, value == null ? 0 : valueSize >= 0 ? valueSize
-                                                                                                            : value.length - valueOffset)));
-        write(this.buffer, key, value, codec, valueOffset, valueSize);
-        this.buffer.rewind();
-    }
-
-    public Record(byte[] key, byte[] value, CompressionType codec) {
-        this(key, value, codec, 0, -1);
-    }
-
-    public Record(byte[] value, CompressionType codec) {
-        this(null, value, codec);
-    }
-
-    public Record(byte[] key, byte[] value) {
-        this(key, value, CompressionType.NONE);
-    }
-
-    public Record(byte[] value) {
-        this(null, value, CompressionType.NONE);
-    }
-
-    public static void write(ByteBuffer buffer, byte[] key, byte[] value, CompressionType codec, int valueOffset, int valueSize) {
-        // skip crc, we will fill that in at the end
-        int pos = buffer.position();
-        buffer.position(pos + MAGIC_OFFSET);
-        buffer.put(CURRENT_MAGIC_VALUE);
-        byte attributes = 0;
-        if (codec.id > 0)
-            attributes = (byte) (attributes | (COMPRESSION_CODEC_MASK & codec.id));
-        buffer.put(attributes);
-        // write the key
-        if (key == null) {
-            buffer.putInt(-1);
-        } else {
-            buffer.putInt(key.length);
-            buffer.put(key, 0, key.length);
-        }
-        // write the value
-        if (value == null) {
-            buffer.putInt(-1);
-        } else {
-            int size = valueSize >= 0 ? valueSize : (value.length - valueOffset);
-            buffer.putInt(size);
-            buffer.put(value, valueOffset, size);
-        }
-
-        // now compute the checksum and fill it in
-        long crc = computeChecksum(buffer,
-                                   buffer.arrayOffset() + pos + MAGIC_OFFSET,
-                                   buffer.position() - pos - MAGIC_OFFSET - buffer.arrayOffset());
-        Utils.writeUnsignedInt(buffer, pos + CRC_OFFSET, crc);
-    }
-
-    public static void write(ByteBuffer buffer, byte[] key, byte[] value, CompressionType codec) {
-        write(buffer, key, value, codec, 0, -1);
-    }
-
-    public static int recordSize(byte[] key, byte[] value) {
-        return recordSize(key == null ? 0 : key.length, value == null ? 0 : value.length);
-    }
-
-    public static int recordSize(int keySize, int valueSize) {
-        return CRC_LENGTH + MAGIC_LENGTH + ATTRIBUTE_LENGTH + KEY_SIZE_LENGTH + keySize + VALUE_SIZE_LENGTH + valueSize;
-    }
-
-    public ByteBuffer buffer() {
-        return this.buffer;
-    }
-
-    /**
-     * Compute the checksum of the record from the record contents
-     */
-    public static long computeChecksum(ByteBuffer buffer, int position, int size) {
-        return Utils.crc32(buffer.array(), buffer.arrayOffset() + position, size - buffer.arrayOffset());
-    }
-
-    /**
-     * Compute the checksum of the record from the record contents
-     */
-    public long computeChecksum() {
-        return computeChecksum(buffer, MAGIC_OFFSET, buffer.limit() - MAGIC_OFFSET);
-    }
-
-    /**
-     * Retrieve the previously computed CRC for this record
-     */
-    public long checksum() {
-        return Utils.readUnsignedInt(buffer, CRC_OFFSET);
-    }
-
-    /**
-     * Returns true if the crc stored with the record matches the crc computed off the record contents
-     */
-    public boolean isValid() {
-        return checksum() == computeChecksum();
-    }
-
-    /**
-     * Throw an InvalidRecordException if isValid is false for this record
-     */
-    public void ensureValid() {
-        if (!isValid())
-            throw new InvalidRecordException("Record is corrupt (stored crc = " + checksum()
-                                             + ", computed crc = "
-                                             + computeChecksum()
-                                             + ")");
-    }
-
-    /**
-     * The complete serialized size of this record in bytes (including crc, header attributes, etc)
-     */
-    public int size() {
-        return buffer.limit();
-    }
-
-    /**
-     * The length of the key in bytes
-     */
-    public int keySize() {
-        return buffer.getInt(KEY_SIZE_OFFSET);
-    }
-
-    /**
-     * Does the record have a key?
-     */
-    public boolean hasKey() {
-        return keySize() >= 0;
-    }
-
-    /**
-     * The position where the value size is stored
-     */
-    private int valueSizeOffset() {
-        return KEY_OFFSET + Math.max(0, keySize());
-    }
-
-    /**
-     * The length of the value in bytes
-     */
-    public int valueSize() {
-        return buffer.getInt(valueSizeOffset());
-    }
-
-    /**
-     * The magic version of this record
-     */
-    public byte magic() {
-        return buffer.get(MAGIC_OFFSET);
-    }
-
-    /**
-     * The attributes stored with this record
-     */
-    public byte attributes() {
-        return buffer.get(ATTRIBUTES_OFFSET);
-    }
-
-    /**
-     * The compression codec used with this record
-     */
-    public CompressionType compressionType() {
-        return CompressionType.forId(buffer.get(ATTRIBUTES_OFFSET) & COMPRESSION_CODEC_MASK);
-    }
-
-    /**
-     * A ByteBuffer containing the value of this record
-     */
-    public ByteBuffer value() {
-        return sliceDelimited(valueSizeOffset());
-    }
-
-    /**
-     * A ByteBuffer containing the message key
-     */
-    public ByteBuffer key() {
-        return sliceDelimited(KEY_SIZE_OFFSET);
-    }
-
-    /**
-     * Read a size-delimited byte buffer starting at the given offset
-     */
-    private ByteBuffer sliceDelimited(int start) {
-        int size = buffer.getInt(start);
-        if (size < 0) {
-            return null;
-        } else {
-            ByteBuffer b = buffer.duplicate();
-            b.position(start + 4);
-            b = b.slice();
-            b.limit(size);
-            b.rewind();
-            return b;
-        }
-    }
-
-    public String toString() {
-        return String.format("Record(magic = %d, attributes = %d, crc = %d, key = %d bytes, value = %d bytes)",
-                             magic(),
-                             attributes(),
-                             checksum(),
-                             key().limit(),
-                             value().limit());
-    }
-
-    public boolean equals(Object other) {
-        if (this == other)
-            return true;
-        if (other == null)
-            return false;
-        if (!other.getClass().equals(Record.class))
-            return false;
-        Record record = (Record) other;
-        return this.buffer.equals(record.buffer);
-    }
-
-    public int hashCode() {
-        return buffer.hashCode();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/record/Records.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/record/Records.java b/clients/src/main/java/kafka/common/record/Records.java
deleted file mode 100644
index 6531ca0..0000000
--- a/clients/src/main/java/kafka/common/record/Records.java
+++ /dev/null
@@ -1,29 +0,0 @@
-package kafka.common.record;
-
-import java.io.IOException;
-import java.nio.channels.GatheringByteChannel;
-
-/**
- * A binary format which consists of a 4 byte size, an 8 byte offset, and the record bytes. See {@link MemoryRecords}
- * for the in-memory representation.
- */
-public interface Records extends Iterable<LogEntry> {
-
-    int SIZE_LENGTH = 4;
-    int OFFSET_LENGTH = 8;
-    int LOG_OVERHEAD = SIZE_LENGTH + OFFSET_LENGTH;
-
-    /**
-     * Write these records to the given channel
-     * @param channel The channel to write to
-     * @return The number of bytes written
-     * @throws IOException If the write fails.
-     */
-    public int writeTo(GatheringByteChannel channel) throws IOException;
-
-    /**
-     * The size of these records in bytes
-     */
-    public int sizeInBytes();
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/requests/RequestHeader.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/requests/RequestHeader.java b/clients/src/main/java/kafka/common/requests/RequestHeader.java
deleted file mode 100644
index 4ce67f8..0000000
--- a/clients/src/main/java/kafka/common/requests/RequestHeader.java
+++ /dev/null
@@ -1,68 +0,0 @@
-package kafka.common.requests;
-
-import static kafka.common.protocol.Protocol.REQUEST_HEADER;
-
-import java.nio.ByteBuffer;
-
-import kafka.common.protocol.ProtoUtils;
-import kafka.common.protocol.Protocol;
-import kafka.common.protocol.types.Field;
-import kafka.common.protocol.types.Struct;
-
-/**
- * The header for a request in the Kafka protocol
- */
-public class RequestHeader {
-
-    private static Field API_KEY_FIELD = REQUEST_HEADER.get("api_key");
-    private static Field API_VERSION_FIELD = REQUEST_HEADER.get("api_version");
-    private static Field CLIENT_ID_FIELD = REQUEST_HEADER.get("client_id");
-    private static Field CORRELATION_ID_FIELD = REQUEST_HEADER.get("correlation_id");
-
-    private final Struct header;
-
-    public RequestHeader(Struct header) {
-        super();
-        this.header = header;
-    }
-
-    public RequestHeader(short apiKey, String client, int correlation) {
-        this(apiKey, ProtoUtils.latestVersion(apiKey), client, correlation);
-    }
-
-    public RequestHeader(short apiKey, short version, String client, int correlation) {
-        this(new Struct(Protocol.REQUEST_HEADER));
-        this.header.set(API_KEY_FIELD, apiKey);
-        this.header.set(API_VERSION_FIELD, version);
-        this.header.set(CLIENT_ID_FIELD, client);
-        this.header.set(CORRELATION_ID_FIELD, correlation);
-    }
-
-    public short apiKey() {
-        return (Short) this.header.get(API_KEY_FIELD);
-    }
-
-    public short apiVersion() {
-        return (Short) this.header.get(API_VERSION_FIELD);
-    }
-
-    public String clientId() {
-        return (String) this.header.get(CLIENT_ID_FIELD);
-    }
-
-    public int correlationId() {
-        return (Integer) this.header.get(CORRELATION_ID_FIELD);
-    }
-
-    public static RequestHeader parse(ByteBuffer buffer) {
-        return new RequestHeader((Struct) Protocol.REQUEST_HEADER.read(buffer));
-    }
-
-    public void writeTo(ByteBuffer buffer) {
-        header.writeTo(buffer);
-    }
-
-    public int sizeOf() {
-        return header.sizeOf();
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/requests/RequestSend.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/requests/RequestSend.java b/clients/src/main/java/kafka/common/requests/RequestSend.java
deleted file mode 100644
index f6a9a86..0000000
--- a/clients/src/main/java/kafka/common/requests/RequestSend.java
+++ /dev/null
@@ -1,38 +0,0 @@
-package kafka.common.requests;
-
-import java.nio.ByteBuffer;
-
-import kafka.common.network.NetworkSend;
-import kafka.common.protocol.types.Struct;
-
-/**
- * A send object for a kafka request
- */
-public class RequestSend extends NetworkSend {
-
-    private final RequestHeader header;
-    private final Struct body;
-
-    public RequestSend(int destination, RequestHeader header, Struct body) {
-        super(destination, serialize(header, body));
-        this.header = header;
-        this.body = body;
-    }
-
-    private static ByteBuffer serialize(RequestHeader header, Struct body) {
-        ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + body.sizeOf());
-        header.writeTo(buffer);
-        body.writeTo(buffer);
-        buffer.rewind();
-        return buffer;
-    }
-
-    public RequestHeader header() {
-        return this.header;
-    }
-
-    public Struct body() {
-        return body;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/requests/ResponseHeader.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/requests/ResponseHeader.java b/clients/src/main/java/kafka/common/requests/ResponseHeader.java
deleted file mode 100644
index 1ef8e15..0000000
--- a/clients/src/main/java/kafka/common/requests/ResponseHeader.java
+++ /dev/null
@@ -1,45 +0,0 @@
-package kafka.common.requests;
-
-import static kafka.common.protocol.Protocol.RESPONSE_HEADER;
-
-import java.nio.ByteBuffer;
-
-import kafka.common.protocol.Protocol;
-import kafka.common.protocol.types.Field;
-import kafka.common.protocol.types.Struct;
-
-/**
- * A response header in the kafka protocol.
- */
-public class ResponseHeader {
-
-    private static Field CORRELATION_KEY_FIELD = RESPONSE_HEADER.get("correlation_id");
-
-    private final Struct header;
-
-    public ResponseHeader(Struct header) {
-        this.header = header;
-    }
-
-    public ResponseHeader(int correlationId) {
-        this(new Struct(Protocol.RESPONSE_HEADER));
-        this.header.set(CORRELATION_KEY_FIELD, correlationId);
-    }
-
-    public int correlationId() {
-        return (Integer) header.get(CORRELATION_KEY_FIELD);
-    }
-
-    public void writeTo(ByteBuffer buffer) {
-        header.writeTo(buffer);
-    }
-
-    public int sizeOf() {
-        return header.sizeOf();
-    }
-
-    public static ResponseHeader parse(ByteBuffer buffer) {
-        return new ResponseHeader(((Struct) Protocol.RESPONSE_HEADER.read(buffer)));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/utils/AbstractIterator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/utils/AbstractIterator.java b/clients/src/main/java/kafka/common/utils/AbstractIterator.java
deleted file mode 100644
index f3190d7..0000000
--- a/clients/src/main/java/kafka/common/utils/AbstractIterator.java
+++ /dev/null
@@ -1,72 +0,0 @@
-package kafka.common.utils;
-
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-/**
- * A base class that simplifies implementing an iterator
- * @param <T> The type of thing we are iterating over
- */
-public abstract class AbstractIterator<T> implements Iterator<T> {
-
-    private static enum State {
-        READY, NOT_READY, DONE, FAILED
-    };
-
-    private State state = State.NOT_READY;
-    private T next;
-
-    @Override
-    public boolean hasNext() {
-        switch (state) {
-            case FAILED:
-                throw new IllegalStateException("Iterator is in failed state");
-            case DONE:
-                return false;
-            case READY:
-                return true;
-            default:
-                return maybeComputeNext();
-        }
-    }
-
-    @Override
-    public T next() {
-        if (!hasNext())
-            throw new NoSuchElementException();
-        state = State.NOT_READY;
-        if (next == null)
-            throw new IllegalStateException("Expected item but none found.");
-        return next;
-    }
-
-    @Override
-    public void remove() {
-        throw new UnsupportedOperationException("Removal not supported");
-    }
-
-    public T peek() {
-        if (!hasNext())
-            throw new NoSuchElementException();
-        return next;
-    }
-
-    protected T allDone() {
-        state = State.DONE;
-        return null;
-    }
-
-    protected abstract T makeNext();
-
-    private Boolean maybeComputeNext() {
-        state = State.FAILED;
-        next = makeNext();
-        if (state == State.DONE) {
-            return false;
-        } else {
-            state = State.READY;
-            return true;
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/utils/CopyOnWriteMap.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/utils/CopyOnWriteMap.java b/clients/src/main/java/kafka/common/utils/CopyOnWriteMap.java
deleted file mode 100644
index e45df98..0000000
--- a/clients/src/main/java/kafka/common/utils/CopyOnWriteMap.java
+++ /dev/null
@@ -1,130 +0,0 @@
-package kafka.common.utils;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
-
-/**
- * A simple read-optimized map implementation that synchronizes only writes and does a fully copy on each modification
- */
-public class CopyOnWriteMap<K, V> implements ConcurrentMap<K, V> {
-
-    private volatile Map<K, V> map;
-
-    public CopyOnWriteMap() {
-        this.map = Collections.emptyMap();
-    }
-
-    public CopyOnWriteMap(Map<K, V> map) {
-        this.map = Collections.unmodifiableMap(map);
-    }
-
-    @Override
-    public boolean containsKey(Object k) {
-        return map.containsKey(k);
-    }
-
-    @Override
-    public boolean containsValue(Object v) {
-        return map.containsValue(v);
-    }
-
-    @Override
-    public Set<java.util.Map.Entry<K, V>> entrySet() {
-        return map.entrySet();
-    }
-
-    @Override
-    public V get(Object k) {
-        return map.get(k);
-    }
-
-    @Override
-    public boolean isEmpty() {
-        return map.isEmpty();
-    }
-
-    @Override
-    public Set<K> keySet() {
-        return map.keySet();
-    }
-
-    @Override
-    public int size() {
-        return map.size();
-    }
-
-    @Override
-    public Collection<V> values() {
-        return map.values();
-    }
-
-    @Override
-    public synchronized void clear() {
-        this.map = Collections.emptyMap();
-    }
-
-    @Override
-    public synchronized V put(K k, V v) {
-        Map<K, V> copy = new HashMap<K, V>(this.map);
-        V prev = copy.put(k, v);
-        this.map = Collections.unmodifiableMap(copy);
-        return prev;
-    }
-
-    @Override
-    public synchronized void putAll(Map<? extends K, ? extends V> entries) {
-        Map<K, V> copy = new HashMap<K, V>(this.map);
-        copy.putAll(entries);
-        this.map = Collections.unmodifiableMap(copy);
-    }
-
-    @Override
-    public synchronized V remove(Object key) {
-        Map<K, V> copy = new HashMap<K, V>(this.map);
-        V prev = copy.remove(key);
-        this.map = Collections.unmodifiableMap(copy);
-        return prev;
-    }
-
-    @Override
-    public synchronized V putIfAbsent(K k, V v) {
-        if (!containsKey(k))
-            return put(k, v);
-        else
-            return get(k);
-    }
-
-    @Override
-    public synchronized boolean remove(Object k, Object v) {
-        if (containsKey(k) && get(k).equals(v)) {
-            remove(k);
-            return true;
-        } else {
-            return false;
-        }
-    }
-
-    @Override
-    public synchronized boolean replace(K k, V original, V replacement) {
-        if (containsKey(k) && get(k).equals(original)) {
-            put(k, replacement);
-            return true;
-        } else {
-            return false;
-        }
-    }
-
-    @Override
-    public synchronized V replace(K k, V v) {
-        if (containsKey(k)) {
-            return put(k, v);
-        } else {
-            return null;
-        }
-    }
-
-}