You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2015/08/15 01:01:10 UTC

[6/7] kafka git commit: KAFKA-2366; Initial patch for Copycat

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/data/src/main/java/org/apache/kafka/copycat/data/DataRuntimeException.java
----------------------------------------------------------------------
diff --git a/copycat/data/src/main/java/org/apache/kafka/copycat/data/DataRuntimeException.java b/copycat/data/src/main/java/org/apache/kafka/copycat/data/DataRuntimeException.java
new file mode 100644
index 0000000..855c0fd
--- /dev/null
+++ b/copycat/data/src/main/java/org/apache/kafka/copycat/data/DataRuntimeException.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+
+
+package org.apache.kafka.copycat.data;
+
+/** Base Avro exception. */
+public class DataRuntimeException extends RuntimeException {
+    public DataRuntimeException(Throwable cause) {
+        super(cause);
+    }
+
+    public DataRuntimeException(String message) {
+        super(message);
+    }
+
+    public DataRuntimeException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/data/src/main/java/org/apache/kafka/copycat/data/DataTypeException.java
----------------------------------------------------------------------
diff --git a/copycat/data/src/main/java/org/apache/kafka/copycat/data/DataTypeException.java b/copycat/data/src/main/java/org/apache/kafka/copycat/data/DataTypeException.java
new file mode 100644
index 0000000..6a74d88
--- /dev/null
+++ b/copycat/data/src/main/java/org/apache/kafka/copycat/data/DataTypeException.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+
+
+package org.apache.kafka.copycat.data;
+
+
+/** Thrown when an illegal type is used. */
+public class DataTypeException extends DataRuntimeException {
+    public DataTypeException(String message) {
+        super(message);
+    }
+
+    public DataTypeException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/data/src/main/java/org/apache/kafka/copycat/data/ObjectProperties.java
----------------------------------------------------------------------
diff --git a/copycat/data/src/main/java/org/apache/kafka/copycat/data/ObjectProperties.java b/copycat/data/src/main/java/org/apache/kafka/copycat/data/ObjectProperties.java
new file mode 100644
index 0000000..e995b7f
--- /dev/null
+++ b/copycat/data/src/main/java/org/apache/kafka/copycat/data/ObjectProperties.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+
+
+package org.apache.kafka.copycat.data;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Base class for objects that have Object-valued properties.
+ */
+public abstract class ObjectProperties {
+    public static class Null {
+        private Null() {
+        }
+    }
+
+    /** A value representing a JSON <code>null</code>. */
+    public static final Null NULL_VALUE = new Null();
+
+    Map<String, Object> props = new LinkedHashMap<String, Object>(1);
+
+    private Set<String> reserved;
+
+    ObjectProperties(Set<String> reserved) {
+        this.reserved = reserved;
+    }
+
+    /**
+     * Returns the value of the named, string-valued property in this schema.
+     * Returns <tt>null</tt> if there is no string-valued property with that name.
+     */
+    public String getProp(String name) {
+        Object value = getObjectProp(name);
+        return (value instanceof String) ? (String) value : null;
+    }
+
+    /**
+     * Returns the value of the named property in this schema.
+     * Returns <tt>null</tt> if there is no property with that name.
+     */
+    public synchronized Object getObjectProp(String name) {
+        return props.get(name);
+    }
+
+    /**
+     * Adds a property with the given name <tt>name</tt> and
+     * value <tt>value</tt>. Neither <tt>name</tt> nor <tt>value</tt> can be
+     * <tt>null</tt>. It is illegal to add a property if another with
+     * the same name but different value already exists in this schema.
+     *
+     * @param name The name of the property to add
+     * @param value The value for the property to add
+     */
+    public synchronized void addProp(String name, Object value) {
+        if (reserved.contains(name))
+            throw new DataRuntimeException("Can't set reserved property: " + name);
+
+        if (value == null)
+            throw new DataRuntimeException("Can't set a property to null: " + name);
+
+        Object old = props.get(name);
+        if (old == null)
+            props.put(name, value);
+        else if (!old.equals(value))
+            throw new DataRuntimeException("Can't overwrite property: " + name);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/data/src/main/java/org/apache/kafka/copycat/data/Schema.java
----------------------------------------------------------------------
diff --git a/copycat/data/src/main/java/org/apache/kafka/copycat/data/Schema.java b/copycat/data/src/main/java/org/apache/kafka/copycat/data/Schema.java
new file mode 100644
index 0000000..04906c3
--- /dev/null
+++ b/copycat/data/src/main/java/org/apache/kafka/copycat/data/Schema.java
@@ -0,0 +1,1054 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+
+
+package org.apache.kafka.copycat.data;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+/** An abstract data type.
+ * <p>A schema may be one of:
+ * <ul>
+ * <li>A <i>record</i>, mapping field names to field value data;
+ * <li>An <i>enum</i>, containing one of a small set of symbols;
+ * <li>An <i>array</i> of values, all of the same schema;
+ * <li>A <i>map</i>, containing string/value pairs, of a declared schema;
+ * <li>A <i>union</i> of other schemas;
+ * <li>A <i>fixed</i> sized binary object;
+ * <li>A unicode <i>string</i>;
+ * <li>A sequence of <i>bytes</i>;
+ * <li>A 32-bit signed <i>int</i>;
+ * <li>A 64-bit signed <i>long</i>;
+ * <li>A 32-bit IEEE single-<i>float</i>; or
+ * <li>A 64-bit IEEE <i>double</i>-float; or
+ * <li>A <i>boolean</i>; or
+ * <li><i>null</i>.
+ * </ul>
+ *
+ * A schema can be constructed using one of its static <tt>createXXX</tt>
+ * methods, or more conveniently using {@link SchemaBuilder}. The schema objects are
+ * <i>logically</i> immutable.
+ * There are only two mutating methods - {@link #setFields(List)} and
+ * {@link #addProp(String, Object)}. The following restrictions apply on these
+ * two methods.
+ * <ul>
+ * <li> {@link #setFields(List)}, can be called at most once. This method exists
+ * in order to enable clients to build recursive schemas.
+ * <li> {@link #addProp(String, Object)} can be called with property names
+ * that are not present already. It is not possible to change or delete an
+ * existing property.
+ * </ul>
+ */
+public abstract class Schema extends ObjectProperties {
+    private static final int NO_HASHCODE = Integer.MIN_VALUE;
+
+    /** The type of a schema. */
+    public enum Type {
+        ENUM {
+            @Override
+            public Object defaultValue(Schema schema) {
+                return null;
+            }
+        },
+        ARRAY {
+            @Override
+            public Object defaultValue(Schema schema) {
+                return new ArrayList<>();
+            }
+        },
+        MAP {
+            @Override
+            public Object defaultValue(Schema schema) {
+                return new HashMap<Object, Object>();
+            }
+        },
+        UNION {
+            @Override
+            public Object defaultValue(Schema schema) {
+                Schema firstSchema = schema.getTypes().get(0);
+                return firstSchema.getType().defaultValue(firstSchema);
+            }
+        },
+        STRING {
+            @Override
+            public Object defaultValue(Schema schema) {
+                return "";
+            }
+        },
+        BYTES {
+            @Override
+            public Object defaultValue(Schema schema) {
+                return new byte[0];
+            }
+        },
+        INT {
+            @Override
+            public Object defaultValue(Schema schema) {
+                return 0;
+            }
+        },
+        LONG {
+            @Override
+            public Object defaultValue(Schema schema) {
+                return 0;
+            }
+        },
+        FLOAT {
+            @Override
+            public Object defaultValue(Schema schema) {
+                return 0;
+            }
+        },
+        DOUBLE {
+            @Override
+            public Object defaultValue(Schema schema) {
+                return 0;
+            }
+        },
+        BOOLEAN {
+            @Override
+            public Object defaultValue(Schema schema) {
+                return false;
+            }
+        },
+        NULL {
+            @Override
+            public Object defaultValue(Schema schema) {
+                return null;
+            }
+        };
+        private String name;
+
+        private Type() {
+            this.name = this.name().toLowerCase();
+        }
+
+        public String getName() {
+            return name;
+        }
+
+        public abstract Object defaultValue(Schema schema);
+    }
+
+    private final Type type;
+
+    Schema(Type type) {
+        super(SCHEMA_RESERVED);
+        this.type = type;
+    }
+
+    /** Create a schema for a primitive type. */
+    public static Schema create(Type type) {
+        switch (type) {
+            case STRING:
+                return new StringSchema();
+            case BYTES:
+                return new BytesSchema();
+            case INT:
+                return new IntSchema();
+            case LONG:
+                return new LongSchema();
+            case FLOAT:
+                return new FloatSchema();
+            case DOUBLE:
+                return new DoubleSchema();
+            case BOOLEAN:
+                return new BooleanSchema();
+            case NULL:
+                return new NullSchema();
+            default:
+                throw new DataRuntimeException("Can't create a: " + type);
+        }
+    }
+
+    private static final Set<String> SCHEMA_RESERVED = new HashSet<String>();
+
+    static {
+        Collections.addAll(SCHEMA_RESERVED,
+                "doc", "fields", "items", "name", "namespace",
+                "size", "symbols", "values", "type", "aliases");
+    }
+
+    int hashCode = NO_HASHCODE;
+
+    @Override
+    public void addProp(String name, Object value) {
+        super.addProp(name, value);
+        hashCode = NO_HASHCODE;
+    }
+
+    /** Create an enum schema. */
+    public static Schema createEnum(String name, String doc, String namespace,
+                                    List<String> values) {
+        return new EnumSchema(new Name(name, namespace), doc,
+                new LockableArrayList<String>(values));
+    }
+
+    /** Create an array schema. */
+    public static Schema createArray(Schema elementType) {
+        return new ArraySchema(elementType);
+    }
+
+    /** Create a map schema. */
+    public static Schema createMap(Schema valueType) {
+        return new MapSchema(valueType);
+    }
+
+    /** Create a union schema. */
+    public static Schema createUnion(List<Schema> types) {
+        return new UnionSchema(new LockableArrayList<Schema>(types));
+    }
+
+    /** Create a union schema. */
+    public static Schema createUnion(Schema... types) {
+        return createUnion(new LockableArrayList<Schema>(types));
+    }
+
+    /** Return the type of this schema. */
+    public Type getType() {
+        return type;
+    }
+
+    /**
+     * If this is a record, returns the Field with the
+     * given name <tt>fieldName</tt>. If there is no field by that name, a
+     * <tt>null</tt> is returned.
+     */
+    public Field getField(String fieldname) {
+        throw new DataRuntimeException("Not a record: " + this);
+    }
+
+    /**
+     * If this is a record, returns the fields in it. The returned
+     * list is in the order of their positions.
+     */
+    public List<Field> getFields() {
+        throw new DataRuntimeException("Not a record: " + this);
+    }
+
+    /**
+     * If this is a record, set its fields. The fields can be set
+     * only once in a schema.
+     */
+    public void setFields(List<Field> fields) {
+        throw new DataRuntimeException("Not a record: " + this);
+    }
+
+    /** If this is an enum, return its symbols. */
+    public List<String> getEnumSymbols() {
+        throw new DataRuntimeException("Not an enum: " + this);
+    }
+
+    /** If this is an enum, return a symbol's ordinal value. */
+    public int getEnumOrdinal(String symbol) {
+        throw new DataRuntimeException("Not an enum: " + this);
+    }
+
+    /** If this is an enum, returns true if it contains given symbol. */
+    public boolean hasEnumSymbol(String symbol) {
+        throw new DataRuntimeException("Not an enum: " + this);
+    }
+
+    /** If this is a record, enum or fixed, returns its name, otherwise the name
+     * of the primitive type. */
+    public String getName() {
+        return type.name;
+    }
+
+    /** If this is a record, enum, or fixed, returns its docstring,
+     * if available.  Otherwise, returns null. */
+    public String getDoc() {
+        return null;
+    }
+
+    /** If this is a record, enum or fixed, returns its namespace, if any. */
+    public String getNamespace() {
+        throw new DataRuntimeException("Not a named type: " + this);
+    }
+
+    /** If this is a record, enum or fixed, returns its namespace-qualified name,
+     * otherwise returns the name of the primitive type. */
+    public String getFullName() {
+        return getName();
+    }
+
+    /** If this is a record, enum or fixed, add an alias. */
+    public void addAlias(String alias) {
+        throw new DataRuntimeException("Not a named type: " + this);
+    }
+
+    /** If this is a record, enum or fixed, add an alias. */
+    public void addAlias(String alias, String space) {
+        throw new DataRuntimeException("Not a named type: " + this);
+    }
+
+    /** If this is a record, enum or fixed, return its aliases, if any. */
+    public Set<String> getAliases() {
+        throw new DataRuntimeException("Not a named type: " + this);
+    }
+
+    /** Returns true if this record is an error type. */
+    public boolean isError() {
+        throw new DataRuntimeException("Not a record: " + this);
+    }
+
+    /** If this is an array, returns its element type. */
+    public Schema getElementType() {
+        throw new DataRuntimeException("Not an array: " + this);
+    }
+
+    /** If this is a map, returns its value type. */
+    public Schema getValueType() {
+        throw new DataRuntimeException("Not a map: " + this);
+    }
+
+    /** If this is a union, returns its types. */
+    public List<Schema> getTypes() {
+        throw new DataRuntimeException("Not a union: " + this);
+    }
+
+    /** If this is a union, return the branch with the provided full name. */
+    public Integer getIndexNamed(String name) {
+        throw new DataRuntimeException("Not a union: " + this);
+    }
+
+    /** If this is fixed, returns its size. */
+    public int getFixedSize() {
+        throw new DataRuntimeException("Not fixed: " + this);
+    }
+
+    @Override
+    public String toString() {
+        // FIXME A more JSON-like output showing the details would be nice
+        return "Schema:" + this.getType() + ":" + getFullName();
+    }
+
+    public boolean equals(Object o) {
+        if (o == this) return true;
+        if (!(o instanceof Schema)) return false;
+        Schema that = (Schema) o;
+        if (!(this.type == that.type)) return false;
+        return equalCachedHash(that) && props.equals(that.props);
+    }
+
+    public final int hashCode() {
+        if (hashCode == NO_HASHCODE)
+            hashCode = computeHash();
+        return hashCode;
+    }
+
+    int computeHash() {
+        return getType().hashCode() + props.hashCode();
+    }
+
+    final boolean equalCachedHash(Schema other) {
+        return (hashCode == other.hashCode)
+                || (hashCode == NO_HASHCODE)
+                || (other.hashCode == NO_HASHCODE);
+    }
+
+    private static final Set<String> FIELD_RESERVED = new HashSet<String>();
+
+    static {
+        Collections.addAll(FIELD_RESERVED,
+                "default", "doc", "name", "order", "type", "aliases");
+    }
+
+    /** A field within a record. */
+    public static class Field extends ObjectProperties {
+
+        /** How values of this field should be ordered when sorting records. */
+        public enum Order {
+            ASCENDING, DESCENDING, IGNORE;
+            private String name;
+
+            private Order() {
+                this.name = this.name().toLowerCase();
+            }
+        }
+
+
+        private final String name;    // name of the field.
+        private int position = -1;
+        private final Schema schema;
+        private final String doc;
+        private final Object defaultValue;
+        private final Order order;
+        private Set<String> aliases;
+
+        public Field(String name, Schema schema, String doc,
+                     Object defaultValue) {
+            this(name, schema, doc, defaultValue, Order.ASCENDING);
+        }
+
+        public Field(String name, Schema schema, String doc,
+                     Object defaultValue, Order order) {
+            super(FIELD_RESERVED);
+            this.name = validateName(name);
+            this.schema = schema;
+            this.doc = doc;
+            this.defaultValue = validateDefault(name, schema, defaultValue);
+            this.order = order;
+        }
+
+        public String name() {
+            return name;
+        }
+
+
+        /** The position of this field within the record. */
+        public int pos() {
+            return position;
+        }
+
+        /** This field's {@link Schema}. */
+        public Schema schema() {
+            return schema;
+        }
+
+        /** Field's documentation within the record, if set. May return null. */
+        public String doc() {
+            return doc;
+        }
+
+        public Object defaultValue() {
+            return defaultValue;
+        }
+
+        public Order order() {
+            return order;
+        }
+
+        public void addAlias(String alias) {
+            if (aliases == null)
+                this.aliases = new LinkedHashSet<String>();
+            aliases.add(alias);
+        }
+
+        /** Return the defined aliases as an unmodifieable Set. */
+        public Set<String> aliases() {
+            if (aliases == null)
+                return Collections.emptySet();
+            return Collections.unmodifiableSet(aliases);
+        }
+
+        public boolean equals(Object other) {
+            if (other == this) return true;
+            if (!(other instanceof Field)) return false;
+            Field that = (Field) other;
+            return (name.equals(that.name)) &&
+                    (schema.equals(that.schema)) &&
+                    defaultValueEquals(that.defaultValue) &&
+                    (order == that.order) &&
+                    props.equals(that.props);
+        }
+
+        public int hashCode() {
+            return name.hashCode() + schema.computeHash();
+        }
+
+        /** Do any possible implicit conversions to double, or return 0 if there isn't a
+         * valid conversion */
+        private double doubleValue(Object v) {
+            if (v instanceof Integer)
+                return (double) (Integer) v;
+            else if (v instanceof Long)
+                return (double) (Long) v;
+            else if (v instanceof Float)
+                return (double) (Float) v;
+            else if (v instanceof Double)
+                return (double) (Double) v;
+            else
+                return 0;
+        }
+
+        private boolean defaultValueEquals(Object thatDefaultValue) {
+            if (defaultValue == null)
+                return thatDefaultValue == null;
+            if (Double.isNaN(doubleValue(defaultValue)))
+                return Double.isNaN(doubleValue(thatDefaultValue));
+            return defaultValue.equals(thatDefaultValue);
+        }
+
+        @Override
+        public String toString() {
+            return name + " type:" + schema.type + " pos:" + position;
+        }
+    }
+
+    static class Name {
+        private final String name;
+        private final String space;
+        private final String full;
+
+        public Name(String name, String space) {
+            if (name == null) {                         // anonymous
+                this.name = this.space = this.full = null;
+                return;
+            }
+            int lastDot = name.lastIndexOf('.');
+            if (lastDot < 0) {                          // unqualified name
+                this.name = validateName(name);
+            } else {                                    // qualified name
+                space = name.substring(0, lastDot);       // get space from name
+                this.name = validateName(name.substring(lastDot + 1, name.length()));
+            }
+            if ("".equals(space))
+                space = null;
+            this.space = space;
+            this.full = (this.space == null) ? this.name : this.space + "." + this.name;
+        }
+
+        public boolean equals(Object o) {
+            if (o == this) return true;
+            if (!(o instanceof Name)) return false;
+            Name that = (Name) o;
+            return full == null ? that.full == null : full.equals(that.full);
+        }
+
+        public int hashCode() {
+            return full == null ? 0 : full.hashCode();
+        }
+
+        public String toString() {
+            return full;
+        }
+
+        public String getQualified(String defaultSpace) {
+            return (space == null || space.equals(defaultSpace)) ? name : full;
+        }
+    }
+
+    private static abstract class NamedSchema extends Schema {
+        final Name name;
+        final String doc;
+        Set<Name> aliases;
+
+        public NamedSchema(Type type, Name name, String doc) {
+            super(type);
+            this.name = name;
+            this.doc = doc;
+            if (PRIMITIVES.containsKey(name.full)) {
+                throw new DataTypeException("Schemas may not be named after primitives: " + name.full);
+            }
+        }
+
+        public String getName() {
+            return name.name;
+        }
+
+        public String getDoc() {
+            return doc;
+        }
+
+        public String getNamespace() {
+            return name.space;
+        }
+
+        public String getFullName() {
+            return name.full;
+        }
+
+        public void addAlias(String alias) {
+            addAlias(alias, null);
+        }
+
+        public void addAlias(String name, String space) {
+            if (aliases == null)
+                this.aliases = new LinkedHashSet<Name>();
+            if (space == null)
+                space = this.name.space;
+            aliases.add(new Name(name, space));
+        }
+
+        public Set<String> getAliases() {
+            Set<String> result = new LinkedHashSet<String>();
+            if (aliases != null)
+                for (Name alias : aliases)
+                    result.add(alias.full);
+            return result;
+        }
+
+        public boolean equalNames(NamedSchema that) {
+            return this.name.equals(that.name);
+        }
+
+        @Override
+        int computeHash() {
+            return super.computeHash() + name.hashCode();
+        }
+    }
+
+    private static class SeenPair {
+        private Object s1;
+        private Object s2;
+
+        private SeenPair(Object s1, Object s2) {
+            this.s1 = s1;
+            this.s2 = s2;
+        }
+
+        public boolean equals(Object o) {
+            return this.s1 == ((SeenPair) o).s1 && this.s2 == ((SeenPair) o).s2;
+        }
+
+        public int hashCode() {
+            return System.identityHashCode(s1) + System.identityHashCode(s2);
+        }
+    }
+
+    private static final ThreadLocal<Set> SEEN_EQUALS = new ThreadLocal<Set>() {
+        protected Set initialValue() {
+            return new HashSet();
+        }
+    };
+    private static final ThreadLocal<Map> SEEN_HASHCODE = new ThreadLocal<Map>() {
+        protected Map initialValue() {
+            return new IdentityHashMap();
+        }
+    };
+
+    private static class EnumSchema extends NamedSchema {
+        private final List<String> symbols;
+        private final Map<String, Integer> ordinals;
+
+        public EnumSchema(Name name, String doc,
+                          LockableArrayList<String> symbols) {
+            super(Type.ENUM, name, doc);
+            this.symbols = symbols.lock();
+            this.ordinals = new HashMap<String, Integer>();
+            int i = 0;
+            for (String symbol : symbols)
+                if (ordinals.put(validateName(symbol), i++) != null)
+                    throw new SchemaParseException("Duplicate enum symbol: " + symbol);
+        }
+
+        public List<String> getEnumSymbols() {
+            return symbols;
+        }
+
+        public boolean hasEnumSymbol(String symbol) {
+            return ordinals.containsKey(symbol);
+        }
+
+        public int getEnumOrdinal(String symbol) {
+            return ordinals.get(symbol);
+        }
+
+        public boolean equals(Object o) {
+            if (o == this) return true;
+            if (!(o instanceof EnumSchema)) return false;
+            EnumSchema that = (EnumSchema) o;
+            return equalCachedHash(that)
+                    && equalNames(that)
+                    && symbols.equals(that.symbols)
+                    && props.equals(that.props);
+        }
+
+        @Override
+        int computeHash() {
+            return super.computeHash() + symbols.hashCode();
+        }
+    }
+
+    private static class ArraySchema extends Schema {
+        private final Schema elementType;
+
+        public ArraySchema(Schema elementType) {
+            super(Type.ARRAY);
+            this.elementType = elementType;
+        }
+
+        public Schema getElementType() {
+            return elementType;
+        }
+
+        public boolean equals(Object o) {
+            if (o == this) return true;
+            if (!(o instanceof ArraySchema)) return false;
+            ArraySchema that = (ArraySchema) o;
+            return equalCachedHash(that)
+                    && elementType.equals(that.elementType)
+                    && props.equals(that.props);
+        }
+
+        @Override
+        int computeHash() {
+            return super.computeHash() + elementType.computeHash();
+        }
+    }
+
+    private static class MapSchema extends Schema {
+        private final Schema valueType;
+
+        public MapSchema(Schema valueType) {
+            super(Type.MAP);
+            this.valueType = valueType;
+        }
+
+        public Schema getValueType() {
+            return valueType;
+        }
+
+        public boolean equals(Object o) {
+            if (o == this) return true;
+            if (!(o instanceof MapSchema)) return false;
+            MapSchema that = (MapSchema) o;
+            return equalCachedHash(that)
+                    && valueType.equals(that.valueType)
+                    && props.equals(that.props);
+        }
+
+        @Override
+        int computeHash() {
+            return super.computeHash() + valueType.computeHash();
+        }
+    }
+
+    private static class UnionSchema extends Schema {
+        private final List<Schema> types;
+        private final Map<String, Integer> indexByName
+                = new HashMap<String, Integer>();
+
+        public UnionSchema(LockableArrayList<Schema> types) {
+            super(Type.UNION);
+            this.types = types.lock();
+            int index = 0;
+            for (Schema type : types) {
+                if (type.getType() == Type.UNION)
+                    throw new DataRuntimeException("Nested union: " + this);
+                String name = type.getFullName();
+                if (name == null)
+                    throw new DataRuntimeException("Nameless in union:" + this);
+                if (indexByName.put(name, index++) != null)
+                    throw new DataRuntimeException("Duplicate in union:" + name);
+            }
+        }
+
+        public List<Schema> getTypes() {
+            return types;
+        }
+
+        public Integer getIndexNamed(String name) {
+            return indexByName.get(name);
+        }
+
+        public boolean equals(Object o) {
+            if (o == this) return true;
+            if (!(o instanceof UnionSchema)) return false;
+            UnionSchema that = (UnionSchema) o;
+            return equalCachedHash(that)
+                    && types.equals(that.types)
+                    && props.equals(that.props);
+        }
+
+        @Override
+        int computeHash() {
+            int hash = super.computeHash();
+            for (Schema type : types)
+                hash += type.computeHash();
+            return hash;
+        }
+    }
+
+    private static class StringSchema extends Schema {
+        public StringSchema() {
+            super(Type.STRING);
+        }
+    }
+
+    private static class BytesSchema extends Schema {
+        public BytesSchema() {
+            super(Type.BYTES);
+        }
+    }
+
+    private static class IntSchema extends Schema {
+        public IntSchema() {
+            super(Type.INT);
+        }
+    }
+
+    private static class LongSchema extends Schema {
+        public LongSchema() {
+            super(Type.LONG);
+        }
+    }
+
+    private static class FloatSchema extends Schema {
+        public FloatSchema() {
+            super(Type.FLOAT);
+        }
+    }
+
+    private static class DoubleSchema extends Schema {
+        public DoubleSchema() {
+            super(Type.DOUBLE);
+        }
+    }
+
+    private static class BooleanSchema extends Schema {
+        public BooleanSchema() {
+            super(Type.BOOLEAN);
+        }
+    }
+
+    private static class NullSchema extends Schema {
+        public NullSchema() {
+            super(Type.NULL);
+        }
+    }
+
+    static final Map<String, Type> PRIMITIVES = new HashMap<String, Type>();
+
+    static {
+        PRIMITIVES.put("string", Type.STRING);
+        PRIMITIVES.put("bytes", Type.BYTES);
+        PRIMITIVES.put("int", Type.INT);
+        PRIMITIVES.put("long", Type.LONG);
+        PRIMITIVES.put("float", Type.FLOAT);
+        PRIMITIVES.put("double", Type.DOUBLE);
+        PRIMITIVES.put("boolean", Type.BOOLEAN);
+        PRIMITIVES.put("null", Type.NULL);
+    }
+
+    static class Names extends LinkedHashMap<Name, Schema> {
+        private String space;                         // default namespace
+
+        public Names() {
+        }
+
+        public Names(String space) {
+            this.space = space;
+        }
+
+        public String space() {
+            return space;
+        }
+
+        public void space(String space) {
+            this.space = space;
+        }
+
+        @Override
+        public Schema get(Object o) {
+            Name name;
+            if (o instanceof String) {
+                Type primitive = PRIMITIVES.get((String) o);
+                if (primitive != null) return Schema.create(primitive);
+                name = new Name((String) o, space);
+                if (!containsKey(name))                   // if not in default
+                    name = new Name((String) o, "");         // try anonymous
+            } else {
+                name = (Name) o;
+            }
+            return super.get(name);
+        }
+
+        public boolean contains(Schema schema) {
+            return get(((NamedSchema) schema).name) != null;
+        }
+
+        public void add(Schema schema) {
+            put(((NamedSchema) schema).name, schema);
+        }
+
+        @Override
+        public Schema put(Name name, Schema schema) {
+            if (containsKey(name))
+                throw new SchemaParseException("Can't redefine: " + name);
+            return super.put(name, schema);
+        }
+    }
+
+    private static ThreadLocal<Boolean> validateNames
+            = new ThreadLocal<Boolean>() {
+        @Override
+        protected Boolean initialValue() {
+            return true;
+        }
+    };
+
+    private static String validateName(String name) {
+        if (!validateNames.get()) return name;        // not validating names
+        int length = name.length();
+        if (length == 0)
+            throw new SchemaParseException("Empty name");
+        char first = name.charAt(0);
+        if (!(Character.isLetter(first) || first == '_'))
+            throw new SchemaParseException("Illegal initial character: " + name);
+        for (int i = 1; i < length; i++) {
+            char c = name.charAt(i);
+            if (!(Character.isLetterOrDigit(c) || c == '_'))
+                throw new SchemaParseException("Illegal character in: " + name);
+        }
+        return name;
+    }
+
+    private static final ThreadLocal<Boolean> VALIDATE_DEFAULTS
+            = new ThreadLocal<Boolean>() {
+        @Override
+        protected Boolean initialValue() {
+            return false;
+        }
+    };
+
+    private static Object validateDefault(String fieldName, Schema schema,
+                                          Object defaultValue) {
+        if ((defaultValue != null)
+                && !isValidDefault(schema, defaultValue)) { // invalid default
+            String message = "Invalid default for field " + fieldName
+                    + ": " + defaultValue + " not a " + schema;
+            if (VALIDATE_DEFAULTS.get())
+                throw new DataTypeException(message);     // throw exception
+            System.err.println("[WARNING] Avro: " + message); // or log warning
+        }
+        return defaultValue;
+    }
+
+    private static boolean isValidDefault(Schema schema, Object defaultValue) {
+        switch (schema.getType()) {
+            case STRING:
+            case ENUM:
+                return (defaultValue instanceof String);
+            case BYTES:
+            case INT:
+                return (defaultValue instanceof Integer);
+            case LONG:
+                return (defaultValue instanceof Long);
+            case FLOAT:
+                return (defaultValue instanceof Float);
+            case DOUBLE:
+                return (defaultValue instanceof Double);
+            case BOOLEAN:
+                return (defaultValue instanceof Boolean);
+            case NULL:
+                return defaultValue == null;
+            case ARRAY:
+                if (!(defaultValue instanceof Collection))
+                    return false;
+                for (Object element : (Collection<Object>) defaultValue)
+                    if (!isValidDefault(schema.getElementType(), element))
+                        return false;
+                return true;
+            case MAP:
+                if (!(defaultValue instanceof Map))
+                    return false;
+                for (Object value : ((Map<Object, Object>) defaultValue).values())
+                    if (!isValidDefault(schema.getValueType(), value))
+                        return false;
+                return true;
+            case UNION:                                   // union default: first branch
+                return isValidDefault(schema.getTypes().get(0), defaultValue);
+            default:
+                return false;
+        }
+    }
+
+    /**
+     * No change is permitted on LockableArrayList once lock() has been
+     * called on it.
+     * @param <E>
+     */
+  
+  /*
+   * This class keeps a boolean variable <tt>locked</tt> which is set
+   * to <tt>true</tt> in the lock() method. It's legal to call
+   * lock() any number of times. Any lock() other than the first one
+   * is a no-op.
+   * 
+   * This class throws <tt>IllegalStateException</tt> if a mutating
+   * operation is performed after being locked. Since modifications through
+   * iterator also use the list's mutating operations, this effectively
+   * blocks all modifications.
+   */
+    static class LockableArrayList<E> extends ArrayList<E> {
+        private static final long serialVersionUID = 1L;
+        private boolean locked = false;
+
+        public LockableArrayList() {
+        }
+
+        public LockableArrayList(int size) {
+            super(size);
+        }
+
+        public LockableArrayList(List<E> types) {
+            super(types);
+        }
+
+        public LockableArrayList(E... types) {
+            super(types.length);
+            Collections.addAll(this, types);
+        }
+
+        public List<E> lock() {
+            locked = true;
+            return this;
+        }
+
+        private void ensureUnlocked() {
+            if (locked) {
+                throw new IllegalStateException();
+            }
+        }
+
+        public boolean add(E e) {
+            ensureUnlocked();
+            return super.add(e);
+        }
+
+        public boolean remove(Object o) {
+            ensureUnlocked();
+            return super.remove(o);
+        }
+
+        public E remove(int index) {
+            ensureUnlocked();
+            return super.remove(index);
+        }
+
+        public boolean addAll(Collection<? extends E> c) {
+            ensureUnlocked();
+            return super.addAll(c);
+        }
+
+        public boolean addAll(int index, Collection<? extends E> c) {
+            ensureUnlocked();
+            return super.addAll(index, c);
+        }
+
+        public boolean removeAll(Collection<?> c) {
+            ensureUnlocked();
+            return super.removeAll(c);
+        }
+
+        public boolean retainAll(Collection<?> c) {
+            ensureUnlocked();
+            return super.retainAll(c);
+        }
+
+        public void clear() {
+            ensureUnlocked();
+            super.clear();
+        }
+
+    }
+
+}