You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2015/08/15 01:01:10 UTC
[6/7] kafka git commit: KAFKA-2366; Initial patch for Copycat
http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/data/src/main/java/org/apache/kafka/copycat/data/DataRuntimeException.java
----------------------------------------------------------------------
diff --git a/copycat/data/src/main/java/org/apache/kafka/copycat/data/DataRuntimeException.java b/copycat/data/src/main/java/org/apache/kafka/copycat/data/DataRuntimeException.java
new file mode 100644
index 0000000..855c0fd
--- /dev/null
+++ b/copycat/data/src/main/java/org/apache/kafka/copycat/data/DataRuntimeException.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+
+
+package org.apache.kafka.copycat.data;
+
+/** Base Avro exception. */
+public class DataRuntimeException extends RuntimeException {
+ public DataRuntimeException(Throwable cause) {
+ super(cause);
+ }
+
+ public DataRuntimeException(String message) {
+ super(message);
+ }
+
+ public DataRuntimeException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/data/src/main/java/org/apache/kafka/copycat/data/DataTypeException.java
----------------------------------------------------------------------
diff --git a/copycat/data/src/main/java/org/apache/kafka/copycat/data/DataTypeException.java b/copycat/data/src/main/java/org/apache/kafka/copycat/data/DataTypeException.java
new file mode 100644
index 0000000..6a74d88
--- /dev/null
+++ b/copycat/data/src/main/java/org/apache/kafka/copycat/data/DataTypeException.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+
+
+package org.apache.kafka.copycat.data;
+
+
+/** Thrown when an illegal type is used. */
+public class DataTypeException extends DataRuntimeException {
+ public DataTypeException(String message) {
+ super(message);
+ }
+
+ public DataTypeException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/data/src/main/java/org/apache/kafka/copycat/data/ObjectProperties.java
----------------------------------------------------------------------
diff --git a/copycat/data/src/main/java/org/apache/kafka/copycat/data/ObjectProperties.java b/copycat/data/src/main/java/org/apache/kafka/copycat/data/ObjectProperties.java
new file mode 100644
index 0000000..e995b7f
--- /dev/null
+++ b/copycat/data/src/main/java/org/apache/kafka/copycat/data/ObjectProperties.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+
+
+package org.apache.kafka.copycat.data;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Base class for objects that have Object-valued properties.
+ */
+public abstract class ObjectProperties {
+ public static class Null {
+ private Null() {
+ }
+ }
+
+ /** A value representing a JSON <code>null</code>. */
+ public static final Null NULL_VALUE = new Null();
+
+ Map<String, Object> props = new LinkedHashMap<String, Object>(1);
+
+ private Set<String> reserved;
+
+ ObjectProperties(Set<String> reserved) {
+ this.reserved = reserved;
+ }
+
+ /**
+ * Returns the value of the named, string-valued property in this schema.
+ * Returns <tt>null</tt> if there is no string-valued property with that name.
+ */
+ public String getProp(String name) {
+ Object value = getObjectProp(name);
+ return (value instanceof String) ? (String) value : null;
+ }
+
+ /**
+ * Returns the value of the named property in this schema.
+ * Returns <tt>null</tt> if there is no property with that name.
+ */
+ public synchronized Object getObjectProp(String name) {
+ return props.get(name);
+ }
+
+ /**
+ * Adds a property with the given name <tt>name</tt> and
+ * value <tt>value</tt>. Neither <tt>name</tt> nor <tt>value</tt> can be
+ * <tt>null</tt>. It is illegal to add a property if another with
+ * the same name but different value already exists in this schema.
+ *
+ * @param name The name of the property to add
+ * @param value The value for the property to add
+ */
+ public synchronized void addProp(String name, Object value) {
+ if (reserved.contains(name))
+ throw new DataRuntimeException("Can't set reserved property: " + name);
+
+ if (value == null)
+ throw new DataRuntimeException("Can't set a property to null: " + name);
+
+ Object old = props.get(name);
+ if (old == null)
+ props.put(name, value);
+ else if (!old.equals(value))
+ throw new DataRuntimeException("Can't overwrite property: " + name);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/data/src/main/java/org/apache/kafka/copycat/data/Schema.java
----------------------------------------------------------------------
diff --git a/copycat/data/src/main/java/org/apache/kafka/copycat/data/Schema.java b/copycat/data/src/main/java/org/apache/kafka/copycat/data/Schema.java
new file mode 100644
index 0000000..04906c3
--- /dev/null
+++ b/copycat/data/src/main/java/org/apache/kafka/copycat/data/Schema.java
@@ -0,0 +1,1054 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+
+
+package org.apache.kafka.copycat.data;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+/** An abstract data type.
+ * <p>A schema may be one of:
+ * <ul>
+ * <li>A <i>record</i>, mapping field names to field value data;
+ * <li>An <i>enum</i>, containing one of a small set of symbols;
+ * <li>An <i>array</i> of values, all of the same schema;
+ * <li>A <i>map</i>, containing string/value pairs, of a declared schema;
+ * <li>A <i>union</i> of other schemas;
+ * <li>A <i>fixed</i> sized binary object;
+ * <li>A unicode <i>string</i>;
+ * <li>A sequence of <i>bytes</i>;
+ * <li>A 32-bit signed <i>int</i>;
+ * <li>A 64-bit signed <i>long</i>;
+ * <li>A 32-bit IEEE single-<i>float</i>; or
+ * <li>A 64-bit IEEE <i>double</i>-float; or
+ * <li>A <i>boolean</i>; or
+ * <li><i>null</i>.
+ * </ul>
+ *
+ * A schema can be constructed using one of its static <tt>createXXX</tt>
+ * methods, or more conveniently using {@link SchemaBuilder}. The schema objects are
+ * <i>logically</i> immutable.
+ * There are only two mutating methods - {@link #setFields(List)} and
+ * {@link #addProp(String, Object)}. The following restrictions apply on these
+ * two methods.
+ * <ul>
+ * <li> {@link #setFields(List)}, can be called at most once. This method exists
+ * in order to enable clients to build recursive schemas.
+ * <li> {@link #addProp(String, Object)} can be called with property names
+ * that are not present already. It is not possible to change or delete an
+ * existing property.
+ * </ul>
+ */
+public abstract class Schema extends ObjectProperties {
+ private static final int NO_HASHCODE = Integer.MIN_VALUE;
+
+ /** The type of a schema. */
+ public enum Type {
+ ENUM {
+ @Override
+ public Object defaultValue(Schema schema) {
+ return null;
+ }
+ },
+ ARRAY {
+ @Override
+ public Object defaultValue(Schema schema) {
+ return new ArrayList<>();
+ }
+ },
+ MAP {
+ @Override
+ public Object defaultValue(Schema schema) {
+ return new HashMap<Object, Object>();
+ }
+ },
+ UNION {
+ @Override
+ public Object defaultValue(Schema schema) {
+ Schema firstSchema = schema.getTypes().get(0);
+ return firstSchema.getType().defaultValue(firstSchema);
+ }
+ },
+ STRING {
+ @Override
+ public Object defaultValue(Schema schema) {
+ return "";
+ }
+ },
+ BYTES {
+ @Override
+ public Object defaultValue(Schema schema) {
+ return new byte[0];
+ }
+ },
+ INT {
+ @Override
+ public Object defaultValue(Schema schema) {
+ return 0;
+ }
+ },
+ LONG {
+ @Override
+ public Object defaultValue(Schema schema) {
+ return 0;
+ }
+ },
+ FLOAT {
+ @Override
+ public Object defaultValue(Schema schema) {
+ return 0;
+ }
+ },
+ DOUBLE {
+ @Override
+ public Object defaultValue(Schema schema) {
+ return 0;
+ }
+ },
+ BOOLEAN {
+ @Override
+ public Object defaultValue(Schema schema) {
+ return false;
+ }
+ },
+ NULL {
+ @Override
+ public Object defaultValue(Schema schema) {
+ return null;
+ }
+ };
+ private String name;
+
+ private Type() {
+ this.name = this.name().toLowerCase();
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public abstract Object defaultValue(Schema schema);
+ }
+
+ private final Type type;
+
+ Schema(Type type) {
+ super(SCHEMA_RESERVED);
+ this.type = type;
+ }
+
+ /** Create a schema for a primitive type. */
+ public static Schema create(Type type) {
+ switch (type) {
+ case STRING:
+ return new StringSchema();
+ case BYTES:
+ return new BytesSchema();
+ case INT:
+ return new IntSchema();
+ case LONG:
+ return new LongSchema();
+ case FLOAT:
+ return new FloatSchema();
+ case DOUBLE:
+ return new DoubleSchema();
+ case BOOLEAN:
+ return new BooleanSchema();
+ case NULL:
+ return new NullSchema();
+ default:
+ throw new DataRuntimeException("Can't create a: " + type);
+ }
+ }
+
+ private static final Set<String> SCHEMA_RESERVED = new HashSet<String>();
+
+ static {
+ Collections.addAll(SCHEMA_RESERVED,
+ "doc", "fields", "items", "name", "namespace",
+ "size", "symbols", "values", "type", "aliases");
+ }
+
+ int hashCode = NO_HASHCODE;
+
+ @Override
+ public void addProp(String name, Object value) {
+ super.addProp(name, value);
+ hashCode = NO_HASHCODE;
+ }
+
+ /** Create an enum schema. */
+ public static Schema createEnum(String name, String doc, String namespace,
+ List<String> values) {
+ return new EnumSchema(new Name(name, namespace), doc,
+ new LockableArrayList<String>(values));
+ }
+
+ /** Create an array schema. */
+ public static Schema createArray(Schema elementType) {
+ return new ArraySchema(elementType);
+ }
+
+ /** Create a map schema. */
+ public static Schema createMap(Schema valueType) {
+ return new MapSchema(valueType);
+ }
+
+ /** Create a union schema. */
+ public static Schema createUnion(List<Schema> types) {
+ return new UnionSchema(new LockableArrayList<Schema>(types));
+ }
+
+ /** Create a union schema. */
+ public static Schema createUnion(Schema... types) {
+ return createUnion(new LockableArrayList<Schema>(types));
+ }
+
+ /** Return the type of this schema. */
+ public Type getType() {
+ return type;
+ }
+
+ /**
+ * If this is a record, returns the Field with the
+ * given name <tt>fieldName</tt>. If there is no field by that name, a
+ * <tt>null</tt> is returned.
+ */
+ public Field getField(String fieldname) {
+ throw new DataRuntimeException("Not a record: " + this);
+ }
+
+ /**
+ * If this is a record, returns the fields in it. The returned
+ * list is in the order of their positions.
+ */
+ public List<Field> getFields() {
+ throw new DataRuntimeException("Not a record: " + this);
+ }
+
+ /**
+ * If this is a record, set its fields. The fields can be set
+ * only once in a schema.
+ */
+ public void setFields(List<Field> fields) {
+ throw new DataRuntimeException("Not a record: " + this);
+ }
+
+ /** If this is an enum, return its symbols. */
+ public List<String> getEnumSymbols() {
+ throw new DataRuntimeException("Not an enum: " + this);
+ }
+
+ /** If this is an enum, return a symbol's ordinal value. */
+ public int getEnumOrdinal(String symbol) {
+ throw new DataRuntimeException("Not an enum: " + this);
+ }
+
+ /** If this is an enum, returns true if it contains given symbol. */
+ public boolean hasEnumSymbol(String symbol) {
+ throw new DataRuntimeException("Not an enum: " + this);
+ }
+
+ /** If this is a record, enum or fixed, returns its name, otherwise the name
+ * of the primitive type. */
+ public String getName() {
+ return type.name;
+ }
+
+ /** If this is a record, enum, or fixed, returns its docstring,
+ * if available. Otherwise, returns null. */
+ public String getDoc() {
+ return null;
+ }
+
+ /** If this is a record, enum or fixed, returns its namespace, if any. */
+ public String getNamespace() {
+ throw new DataRuntimeException("Not a named type: " + this);
+ }
+
+ /** If this is a record, enum or fixed, returns its namespace-qualified name,
+ * otherwise returns the name of the primitive type. */
+ public String getFullName() {
+ return getName();
+ }
+
+ /** If this is a record, enum or fixed, add an alias. */
+ public void addAlias(String alias) {
+ throw new DataRuntimeException("Not a named type: " + this);
+ }
+
+ /** If this is a record, enum or fixed, add an alias. */
+ public void addAlias(String alias, String space) {
+ throw new DataRuntimeException("Not a named type: " + this);
+ }
+
+ /** If this is a record, enum or fixed, return its aliases, if any. */
+ public Set<String> getAliases() {
+ throw new DataRuntimeException("Not a named type: " + this);
+ }
+
+ /** Returns true if this record is an error type. */
+ public boolean isError() {
+ throw new DataRuntimeException("Not a record: " + this);
+ }
+
+ /** If this is an array, returns its element type. */
+ public Schema getElementType() {
+ throw new DataRuntimeException("Not an array: " + this);
+ }
+
+ /** If this is a map, returns its value type. */
+ public Schema getValueType() {
+ throw new DataRuntimeException("Not a map: " + this);
+ }
+
+ /** If this is a union, returns its types. */
+ public List<Schema> getTypes() {
+ throw new DataRuntimeException("Not a union: " + this);
+ }
+
+ /** If this is a union, return the branch with the provided full name. */
+ public Integer getIndexNamed(String name) {
+ throw new DataRuntimeException("Not a union: " + this);
+ }
+
+ /** If this is fixed, returns its size. */
+ public int getFixedSize() {
+ throw new DataRuntimeException("Not fixed: " + this);
+ }
+
+ @Override
+ public String toString() {
+ // FIXME A more JSON-like output showing the details would be nice
+ return "Schema:" + this.getType() + ":" + getFullName();
+ }
+
+ public boolean equals(Object o) {
+ if (o == this) return true;
+ if (!(o instanceof Schema)) return false;
+ Schema that = (Schema) o;
+ if (!(this.type == that.type)) return false;
+ return equalCachedHash(that) && props.equals(that.props);
+ }
+
+ public final int hashCode() {
+ if (hashCode == NO_HASHCODE)
+ hashCode = computeHash();
+ return hashCode;
+ }
+
+ int computeHash() {
+ return getType().hashCode() + props.hashCode();
+ }
+
+ final boolean equalCachedHash(Schema other) {
+ return (hashCode == other.hashCode)
+ || (hashCode == NO_HASHCODE)
+ || (other.hashCode == NO_HASHCODE);
+ }
+
+ private static final Set<String> FIELD_RESERVED = new HashSet<String>();
+
+ static {
+ Collections.addAll(FIELD_RESERVED,
+ "default", "doc", "name", "order", "type", "aliases");
+ }
+
+ /** A field within a record. */
+ public static class Field extends ObjectProperties {
+
+ /** How values of this field should be ordered when sorting records. */
+ public enum Order {
+ ASCENDING, DESCENDING, IGNORE;
+ private String name;
+
+ private Order() {
+ this.name = this.name().toLowerCase();
+ }
+ }
+
+
+ private final String name; // name of the field.
+ private int position = -1;
+ private final Schema schema;
+ private final String doc;
+ private final Object defaultValue;
+ private final Order order;
+ private Set<String> aliases;
+
+ public Field(String name, Schema schema, String doc,
+ Object defaultValue) {
+ this(name, schema, doc, defaultValue, Order.ASCENDING);
+ }
+
+ public Field(String name, Schema schema, String doc,
+ Object defaultValue, Order order) {
+ super(FIELD_RESERVED);
+ this.name = validateName(name);
+ this.schema = schema;
+ this.doc = doc;
+ this.defaultValue = validateDefault(name, schema, defaultValue);
+ this.order = order;
+ }
+
+ public String name() {
+ return name;
+ }
+
+
+ /** The position of this field within the record. */
+ public int pos() {
+ return position;
+ }
+
+ /** This field's {@link Schema}. */
+ public Schema schema() {
+ return schema;
+ }
+
+ /** Field's documentation within the record, if set. May return null. */
+ public String doc() {
+ return doc;
+ }
+
+ public Object defaultValue() {
+ return defaultValue;
+ }
+
+ public Order order() {
+ return order;
+ }
+
+ public void addAlias(String alias) {
+ if (aliases == null)
+ this.aliases = new LinkedHashSet<String>();
+ aliases.add(alias);
+ }
+
+ /** Return the defined aliases as an unmodifieable Set. */
+ public Set<String> aliases() {
+ if (aliases == null)
+ return Collections.emptySet();
+ return Collections.unmodifiableSet(aliases);
+ }
+
+ public boolean equals(Object other) {
+ if (other == this) return true;
+ if (!(other instanceof Field)) return false;
+ Field that = (Field) other;
+ return (name.equals(that.name)) &&
+ (schema.equals(that.schema)) &&
+ defaultValueEquals(that.defaultValue) &&
+ (order == that.order) &&
+ props.equals(that.props);
+ }
+
+ public int hashCode() {
+ return name.hashCode() + schema.computeHash();
+ }
+
+ /** Do any possible implicit conversions to double, or return 0 if there isn't a
+ * valid conversion */
+ private double doubleValue(Object v) {
+ if (v instanceof Integer)
+ return (double) (Integer) v;
+ else if (v instanceof Long)
+ return (double) (Long) v;
+ else if (v instanceof Float)
+ return (double) (Float) v;
+ else if (v instanceof Double)
+ return (double) (Double) v;
+ else
+ return 0;
+ }
+
+ private boolean defaultValueEquals(Object thatDefaultValue) {
+ if (defaultValue == null)
+ return thatDefaultValue == null;
+ if (Double.isNaN(doubleValue(defaultValue)))
+ return Double.isNaN(doubleValue(thatDefaultValue));
+ return defaultValue.equals(thatDefaultValue);
+ }
+
+ @Override
+ public String toString() {
+ return name + " type:" + schema.type + " pos:" + position;
+ }
+ }
+
+ static class Name {
+ private final String name;
+ private final String space;
+ private final String full;
+
+ public Name(String name, String space) {
+ if (name == null) { // anonymous
+ this.name = this.space = this.full = null;
+ return;
+ }
+ int lastDot = name.lastIndexOf('.');
+ if (lastDot < 0) { // unqualified name
+ this.name = validateName(name);
+ } else { // qualified name
+ space = name.substring(0, lastDot); // get space from name
+ this.name = validateName(name.substring(lastDot + 1, name.length()));
+ }
+ if ("".equals(space))
+ space = null;
+ this.space = space;
+ this.full = (this.space == null) ? this.name : this.space + "." + this.name;
+ }
+
+ public boolean equals(Object o) {
+ if (o == this) return true;
+ if (!(o instanceof Name)) return false;
+ Name that = (Name) o;
+ return full == null ? that.full == null : full.equals(that.full);
+ }
+
+ public int hashCode() {
+ return full == null ? 0 : full.hashCode();
+ }
+
+ public String toString() {
+ return full;
+ }
+
+ public String getQualified(String defaultSpace) {
+ return (space == null || space.equals(defaultSpace)) ? name : full;
+ }
+ }
+
+ private static abstract class NamedSchema extends Schema {
+ final Name name;
+ final String doc;
+ Set<Name> aliases;
+
+ public NamedSchema(Type type, Name name, String doc) {
+ super(type);
+ this.name = name;
+ this.doc = doc;
+ if (PRIMITIVES.containsKey(name.full)) {
+ throw new DataTypeException("Schemas may not be named after primitives: " + name.full);
+ }
+ }
+
+ public String getName() {
+ return name.name;
+ }
+
+ public String getDoc() {
+ return doc;
+ }
+
+ public String getNamespace() {
+ return name.space;
+ }
+
+ public String getFullName() {
+ return name.full;
+ }
+
+ public void addAlias(String alias) {
+ addAlias(alias, null);
+ }
+
+ public void addAlias(String name, String space) {
+ if (aliases == null)
+ this.aliases = new LinkedHashSet<Name>();
+ if (space == null)
+ space = this.name.space;
+ aliases.add(new Name(name, space));
+ }
+
+ public Set<String> getAliases() {
+ Set<String> result = new LinkedHashSet<String>();
+ if (aliases != null)
+ for (Name alias : aliases)
+ result.add(alias.full);
+ return result;
+ }
+
+ public boolean equalNames(NamedSchema that) {
+ return this.name.equals(that.name);
+ }
+
+ @Override
+ int computeHash() {
+ return super.computeHash() + name.hashCode();
+ }
+ }
+
+ private static class SeenPair {
+ private Object s1;
+ private Object s2;
+
+ private SeenPair(Object s1, Object s2) {
+ this.s1 = s1;
+ this.s2 = s2;
+ }
+
+ public boolean equals(Object o) {
+ return this.s1 == ((SeenPair) o).s1 && this.s2 == ((SeenPair) o).s2;
+ }
+
+ public int hashCode() {
+ return System.identityHashCode(s1) + System.identityHashCode(s2);
+ }
+ }
+
+ private static final ThreadLocal<Set> SEEN_EQUALS = new ThreadLocal<Set>() {
+ protected Set initialValue() {
+ return new HashSet();
+ }
+ };
+ private static final ThreadLocal<Map> SEEN_HASHCODE = new ThreadLocal<Map>() {
+ protected Map initialValue() {
+ return new IdentityHashMap();
+ }
+ };
+
+ private static class EnumSchema extends NamedSchema {
+ private final List<String> symbols;
+ private final Map<String, Integer> ordinals;
+
+ public EnumSchema(Name name, String doc,
+ LockableArrayList<String> symbols) {
+ super(Type.ENUM, name, doc);
+ this.symbols = symbols.lock();
+ this.ordinals = new HashMap<String, Integer>();
+ int i = 0;
+ for (String symbol : symbols)
+ if (ordinals.put(validateName(symbol), i++) != null)
+ throw new SchemaParseException("Duplicate enum symbol: " + symbol);
+ }
+
+ public List<String> getEnumSymbols() {
+ return symbols;
+ }
+
+ public boolean hasEnumSymbol(String symbol) {
+ return ordinals.containsKey(symbol);
+ }
+
+ public int getEnumOrdinal(String symbol) {
+ return ordinals.get(symbol);
+ }
+
+ public boolean equals(Object o) {
+ if (o == this) return true;
+ if (!(o instanceof EnumSchema)) return false;
+ EnumSchema that = (EnumSchema) o;
+ return equalCachedHash(that)
+ && equalNames(that)
+ && symbols.equals(that.symbols)
+ && props.equals(that.props);
+ }
+
+ @Override
+ int computeHash() {
+ return super.computeHash() + symbols.hashCode();
+ }
+ }
+
+ private static class ArraySchema extends Schema {
+ private final Schema elementType;
+
+ public ArraySchema(Schema elementType) {
+ super(Type.ARRAY);
+ this.elementType = elementType;
+ }
+
+ public Schema getElementType() {
+ return elementType;
+ }
+
+ public boolean equals(Object o) {
+ if (o == this) return true;
+ if (!(o instanceof ArraySchema)) return false;
+ ArraySchema that = (ArraySchema) o;
+ return equalCachedHash(that)
+ && elementType.equals(that.elementType)
+ && props.equals(that.props);
+ }
+
+ @Override
+ int computeHash() {
+ return super.computeHash() + elementType.computeHash();
+ }
+ }
+
+ private static class MapSchema extends Schema {
+ private final Schema valueType;
+
+ public MapSchema(Schema valueType) {
+ super(Type.MAP);
+ this.valueType = valueType;
+ }
+
+ public Schema getValueType() {
+ return valueType;
+ }
+
+ public boolean equals(Object o) {
+ if (o == this) return true;
+ if (!(o instanceof MapSchema)) return false;
+ MapSchema that = (MapSchema) o;
+ return equalCachedHash(that)
+ && valueType.equals(that.valueType)
+ && props.equals(that.props);
+ }
+
+ @Override
+ int computeHash() {
+ return super.computeHash() + valueType.computeHash();
+ }
+ }
+
+ private static class UnionSchema extends Schema {
+ private final List<Schema> types;
+ private final Map<String, Integer> indexByName
+ = new HashMap<String, Integer>();
+
+ public UnionSchema(LockableArrayList<Schema> types) {
+ super(Type.UNION);
+ this.types = types.lock();
+ int index = 0;
+ for (Schema type : types) {
+ if (type.getType() == Type.UNION)
+ throw new DataRuntimeException("Nested union: " + this);
+ String name = type.getFullName();
+ if (name == null)
+ throw new DataRuntimeException("Nameless in union:" + this);
+ if (indexByName.put(name, index++) != null)
+ throw new DataRuntimeException("Duplicate in union:" + name);
+ }
+ }
+
+ public List<Schema> getTypes() {
+ return types;
+ }
+
+ public Integer getIndexNamed(String name) {
+ return indexByName.get(name);
+ }
+
+ public boolean equals(Object o) {
+ if (o == this) return true;
+ if (!(o instanceof UnionSchema)) return false;
+ UnionSchema that = (UnionSchema) o;
+ return equalCachedHash(that)
+ && types.equals(that.types)
+ && props.equals(that.props);
+ }
+
+ @Override
+ int computeHash() {
+ int hash = super.computeHash();
+ for (Schema type : types)
+ hash += type.computeHash();
+ return hash;
+ }
+ }
+
+ private static class StringSchema extends Schema {
+ public StringSchema() {
+ super(Type.STRING);
+ }
+ }
+
+ private static class BytesSchema extends Schema {
+ public BytesSchema() {
+ super(Type.BYTES);
+ }
+ }
+
+ private static class IntSchema extends Schema {
+ public IntSchema() {
+ super(Type.INT);
+ }
+ }
+
+ private static class LongSchema extends Schema {
+ public LongSchema() {
+ super(Type.LONG);
+ }
+ }
+
+ private static class FloatSchema extends Schema {
+ public FloatSchema() {
+ super(Type.FLOAT);
+ }
+ }
+
+ private static class DoubleSchema extends Schema {
+ public DoubleSchema() {
+ super(Type.DOUBLE);
+ }
+ }
+
+ private static class BooleanSchema extends Schema {
+ public BooleanSchema() {
+ super(Type.BOOLEAN);
+ }
+ }
+
+ private static class NullSchema extends Schema {
+ public NullSchema() {
+ super(Type.NULL);
+ }
+ }
+
+ static final Map<String, Type> PRIMITIVES = new HashMap<String, Type>();
+
+ static {
+ PRIMITIVES.put("string", Type.STRING);
+ PRIMITIVES.put("bytes", Type.BYTES);
+ PRIMITIVES.put("int", Type.INT);
+ PRIMITIVES.put("long", Type.LONG);
+ PRIMITIVES.put("float", Type.FLOAT);
+ PRIMITIVES.put("double", Type.DOUBLE);
+ PRIMITIVES.put("boolean", Type.BOOLEAN);
+ PRIMITIVES.put("null", Type.NULL);
+ }
+
+ static class Names extends LinkedHashMap<Name, Schema> {
+ private String space; // default namespace
+
+ public Names() {
+ }
+
+ public Names(String space) {
+ this.space = space;
+ }
+
+ public String space() {
+ return space;
+ }
+
+ public void space(String space) {
+ this.space = space;
+ }
+
+ @Override
+ public Schema get(Object o) {
+ Name name;
+ if (o instanceof String) {
+ Type primitive = PRIMITIVES.get((String) o);
+ if (primitive != null) return Schema.create(primitive);
+ name = new Name((String) o, space);
+ if (!containsKey(name)) // if not in default
+ name = new Name((String) o, ""); // try anonymous
+ } else {
+ name = (Name) o;
+ }
+ return super.get(name);
+ }
+
+ public boolean contains(Schema schema) {
+ return get(((NamedSchema) schema).name) != null;
+ }
+
+ public void add(Schema schema) {
+ put(((NamedSchema) schema).name, schema);
+ }
+
+ @Override
+ public Schema put(Name name, Schema schema) {
+ if (containsKey(name))
+ throw new SchemaParseException("Can't redefine: " + name);
+ return super.put(name, schema);
+ }
+ }
+
+ private static ThreadLocal<Boolean> validateNames
+ = new ThreadLocal<Boolean>() {
+ @Override
+ protected Boolean initialValue() {
+ return true;
+ }
+ };
+
+ private static String validateName(String name) {
+ if (!validateNames.get()) return name; // not validating names
+ int length = name.length();
+ if (length == 0)
+ throw new SchemaParseException("Empty name");
+ char first = name.charAt(0);
+ if (!(Character.isLetter(first) || first == '_'))
+ throw new SchemaParseException("Illegal initial character: " + name);
+ for (int i = 1; i < length; i++) {
+ char c = name.charAt(i);
+ if (!(Character.isLetterOrDigit(c) || c == '_'))
+ throw new SchemaParseException("Illegal character in: " + name);
+ }
+ return name;
+ }
+
+ private static final ThreadLocal<Boolean> VALIDATE_DEFAULTS
+ = new ThreadLocal<Boolean>() {
+ @Override
+ protected Boolean initialValue() {
+ return false;
+ }
+ };
+
+ private static Object validateDefault(String fieldName, Schema schema,
+ Object defaultValue) {
+ if ((defaultValue != null)
+ && !isValidDefault(schema, defaultValue)) { // invalid default
+ String message = "Invalid default for field " + fieldName
+ + ": " + defaultValue + " not a " + schema;
+ if (VALIDATE_DEFAULTS.get())
+ throw new DataTypeException(message); // throw exception
+ System.err.println("[WARNING] Avro: " + message); // or log warning
+ }
+ return defaultValue;
+ }
+
+ private static boolean isValidDefault(Schema schema, Object defaultValue) {
+ switch (schema.getType()) {
+ case STRING:
+ case ENUM:
+ return (defaultValue instanceof String);
+ case BYTES:
+ case INT:
+ return (defaultValue instanceof Integer);
+ case LONG:
+ return (defaultValue instanceof Long);
+ case FLOAT:
+ return (defaultValue instanceof Float);
+ case DOUBLE:
+ return (defaultValue instanceof Double);
+ case BOOLEAN:
+ return (defaultValue instanceof Boolean);
+ case NULL:
+ return defaultValue == null;
+ case ARRAY:
+ if (!(defaultValue instanceof Collection))
+ return false;
+ for (Object element : (Collection<Object>) defaultValue)
+ if (!isValidDefault(schema.getElementType(), element))
+ return false;
+ return true;
+ case MAP:
+ if (!(defaultValue instanceof Map))
+ return false;
+ for (Object value : ((Map<Object, Object>) defaultValue).values())
+ if (!isValidDefault(schema.getValueType(), value))
+ return false;
+ return true;
+ case UNION: // union default: first branch
+ return isValidDefault(schema.getTypes().get(0), defaultValue);
+ default:
+ return false;
+ }
+ }
+
+ /**
+ * No change is permitted on LockableArrayList once lock() has been
+ * called on it.
+ * @param <E>
+ */
+
+ /*
+ * This class keeps a boolean variable <tt>locked</tt> which is set
+ * to <tt>true</tt> in the lock() method. It's legal to call
+ * lock() any number of times. Any lock() other than the first one
+ * is a no-op.
+ *
+ * This class throws <tt>IllegalStateException</tt> if a mutating
+ * operation is performed after being locked. Since modifications through
+ * iterator also use the list's mutating operations, this effectively
+ * blocks all modifications.
+ */
+ static class LockableArrayList<E> extends ArrayList<E> {
+ private static final long serialVersionUID = 1L;
+ private boolean locked = false;
+
+ public LockableArrayList() {
+ }
+
+ public LockableArrayList(int size) {
+ super(size);
+ }
+
+ public LockableArrayList(List<E> types) {
+ super(types);
+ }
+
+ public LockableArrayList(E... types) {
+ super(types.length);
+ Collections.addAll(this, types);
+ }
+
+ public List<E> lock() {
+ locked = true;
+ return this;
+ }
+
+ private void ensureUnlocked() {
+ if (locked) {
+ throw new IllegalStateException();
+ }
+ }
+
+ public boolean add(E e) {
+ ensureUnlocked();
+ return super.add(e);
+ }
+
+ public boolean remove(Object o) {
+ ensureUnlocked();
+ return super.remove(o);
+ }
+
+ public E remove(int index) {
+ ensureUnlocked();
+ return super.remove(index);
+ }
+
+ public boolean addAll(Collection<? extends E> c) {
+ ensureUnlocked();
+ return super.addAll(c);
+ }
+
+ public boolean addAll(int index, Collection<? extends E> c) {
+ ensureUnlocked();
+ return super.addAll(index, c);
+ }
+
+ public boolean removeAll(Collection<?> c) {
+ ensureUnlocked();
+ return super.removeAll(c);
+ }
+
+ public boolean retainAll(Collection<?> c) {
+ ensureUnlocked();
+ return super.retainAll(c);
+ }
+
+ public void clear() {
+ ensureUnlocked();
+ super.clear();
+ }
+
+ }
+
+}