You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2015/08/27 20:58:54 UTC
[4/5] kafka git commit: KAFKA-2367; Add Copycat runtime data API.
http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/api/src/test/java/org/apache/kafka/copycat/data/FieldTest.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/test/java/org/apache/kafka/copycat/data/FieldTest.java b/copycat/api/src/test/java/org/apache/kafka/copycat/data/FieldTest.java
new file mode 100644
index 0000000..d5458bc
--- /dev/null
+++ b/copycat/api/src/test/java/org/apache/kafka/copycat/data/FieldTest.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.data;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+public class FieldTest {
+
+ @Test
+ public void testEquality() {
+ Field field1 = new Field("name", 0, Schema.INT8_SCHEMA);
+ Field field2 = new Field("name", 0, Schema.INT8_SCHEMA);
+ Field differentName = new Field("name2", 0, Schema.INT8_SCHEMA);
+ Field differentIndex = new Field("name", 1, Schema.INT8_SCHEMA);
+ Field differentSchema = new Field("name", 0, Schema.INT16_SCHEMA);
+
+ assertEquals(field1, field2);
+ assertNotEquals(field1, differentName);
+ assertNotEquals(field1, differentIndex);
+ assertNotEquals(field1, differentSchema);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/api/src/test/java/org/apache/kafka/copycat/data/SchemaBuilderTest.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/test/java/org/apache/kafka/copycat/data/SchemaBuilderTest.java b/copycat/api/src/test/java/org/apache/kafka/copycat/data/SchemaBuilderTest.java
new file mode 100644
index 0000000..fca1c10
--- /dev/null
+++ b/copycat/api/src/test/java/org/apache/kafka/copycat/data/SchemaBuilderTest.java
@@ -0,0 +1,287 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.data;
+
+import org.apache.kafka.copycat.errors.SchemaBuilderException;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class SchemaBuilderTest {
+ private static final String NAME = "name";
+ private static final Integer VERSION = 2;
+ private static final String DOC = "doc";
+
+ @Test
+ public void testInt8Builder() {
+ Schema schema = SchemaBuilder.int8().build();
+ assertTypeAndDefault(schema, Schema.Type.INT8, false, null);
+ assertNoMetadata(schema);
+
+ schema = SchemaBuilder.int8().name(NAME).optional().defaultValue((byte) 12)
+ .version(VERSION).doc(DOC).build();
+ assertTypeAndDefault(schema, Schema.Type.INT8, true, (byte) 12);
+ assertMetadata(schema, NAME, VERSION, DOC);
+ }
+
+ @Test(expected = SchemaBuilderException.class)
+ public void testInt8BuilderInvalidDefault() {
+ SchemaBuilder.int8().defaultValue("invalid");
+ }
+
+ @Test
+ public void testInt16Builder() {
+ Schema schema = SchemaBuilder.int16().build();
+ assertTypeAndDefault(schema, Schema.Type.INT16, false, null);
+ assertNoMetadata(schema);
+
+ schema = SchemaBuilder.int16().name(NAME).optional().defaultValue((short) 12)
+ .version(VERSION).doc(DOC).build();
+ assertTypeAndDefault(schema, Schema.Type.INT16, true, (short) 12);
+ assertMetadata(schema, NAME, VERSION, DOC);
+ }
+
+ @Test(expected = SchemaBuilderException.class)
+ public void testInt16BuilderInvalidDefault() {
+ SchemaBuilder.int16().defaultValue("invalid");
+ }
+
+ @Test
+ public void testInt32Builder() {
+ Schema schema = SchemaBuilder.int32().build();
+ assertTypeAndDefault(schema, Schema.Type.INT32, false, null);
+ assertNoMetadata(schema);
+
+ schema = SchemaBuilder.int32().name(NAME).optional().defaultValue(12)
+ .version(VERSION).doc(DOC).build();
+ assertTypeAndDefault(schema, Schema.Type.INT32, true, 12);
+ assertMetadata(schema, NAME, VERSION, DOC);
+ }
+
+ @Test(expected = SchemaBuilderException.class)
+ public void testInt32BuilderInvalidDefault() {
+ SchemaBuilder.int32().defaultValue("invalid");
+ }
+
+ @Test
+ public void testInt64Builder() {
+ Schema schema = SchemaBuilder.int64().build();
+ assertTypeAndDefault(schema, Schema.Type.INT64, false, null);
+ assertNoMetadata(schema);
+
+ schema = SchemaBuilder.int64().name(NAME).optional().defaultValue((long) 12)
+ .version(VERSION).doc(DOC).build();
+ assertTypeAndDefault(schema, Schema.Type.INT64, true, (long) 12);
+ assertMetadata(schema, NAME, VERSION, DOC);
+ }
+
+ @Test(expected = SchemaBuilderException.class)
+ public void testInt64BuilderInvalidDefault() {
+ SchemaBuilder.int64().defaultValue("invalid");
+ }
+
+ @Test
+ public void testFloatBuilder() {
+ Schema schema = SchemaBuilder.float32().build();
+ assertTypeAndDefault(schema, Schema.Type.FLOAT32, false, null);
+ assertNoMetadata(schema);
+
+ schema = SchemaBuilder.float32().name(NAME).optional().defaultValue(12.f)
+ .version(VERSION).doc(DOC).build();
+ assertTypeAndDefault(schema, Schema.Type.FLOAT32, true, 12.f);
+ assertMetadata(schema, NAME, VERSION, DOC);
+ }
+
+ @Test(expected = SchemaBuilderException.class)
+ public void testFloatBuilderInvalidDefault() {
+ SchemaBuilder.float32().defaultValue("invalid");
+ }
+
+ @Test
+ public void testDoubleBuilder() {
+ Schema schema = SchemaBuilder.float64().build();
+ assertTypeAndDefault(schema, Schema.Type.FLOAT64, false, null);
+ assertNoMetadata(schema);
+
+ schema = SchemaBuilder.float64().name(NAME).optional().defaultValue(12.0)
+ .version(VERSION).doc(DOC).build();
+ assertTypeAndDefault(schema, Schema.Type.FLOAT64, true, 12.0);
+ assertMetadata(schema, NAME, VERSION, DOC);
+ }
+
+ @Test(expected = SchemaBuilderException.class)
+ public void testDoubleBuilderInvalidDefault() {
+ SchemaBuilder.float64().defaultValue("invalid");
+ }
+
+ @Test
+ public void testBooleanBuilder() {
+ Schema schema = SchemaBuilder.bool().build();
+ assertTypeAndDefault(schema, Schema.Type.BOOLEAN, false, null);
+ assertNoMetadata(schema);
+
+ schema = SchemaBuilder.bool().name(NAME).optional().defaultValue(true)
+ .version(VERSION).doc(DOC).build();
+ assertTypeAndDefault(schema, Schema.Type.BOOLEAN, true, true);
+ assertMetadata(schema, NAME, VERSION, DOC);
+ }
+
+ @Test(expected = SchemaBuilderException.class)
+ public void testBooleanBuilderInvalidDefault() {
+ SchemaBuilder.bool().defaultValue("invalid");
+ }
+
+ @Test
+ public void testStringBuilder() {
+ Schema schema = SchemaBuilder.string().build();
+ assertTypeAndDefault(schema, Schema.Type.STRING, false, null);
+ assertNoMetadata(schema);
+
+ schema = SchemaBuilder.string().name(NAME).optional().defaultValue("a default string")
+ .version(VERSION).doc(DOC).build();
+ assertTypeAndDefault(schema, Schema.Type.STRING, true, "a default string");
+ assertMetadata(schema, NAME, VERSION, DOC);
+ }
+
+ @Test(expected = SchemaBuilderException.class)
+ public void testStringBuilderInvalidDefault() {
+ SchemaBuilder.string().defaultValue(true);
+ }
+
+ @Test
+ public void testBytesBuilder() {
+ Schema schema = SchemaBuilder.bytes().build();
+ assertTypeAndDefault(schema, Schema.Type.BYTES, false, null);
+ assertNoMetadata(schema);
+
+ schema = SchemaBuilder.bytes().name(NAME).optional().defaultValue("a default byte array".getBytes())
+ .version(VERSION).doc(DOC).build();
+ assertTypeAndDefault(schema, Schema.Type.BYTES, true, "a default byte array".getBytes());
+ assertMetadata(schema, NAME, VERSION, DOC);
+ }
+
+ @Test(expected = SchemaBuilderException.class)
+ public void testBytesBuilderInvalidDefault() {
+ SchemaBuilder.bytes().defaultValue("a string, not bytes");
+ }
+
+
+
+ @Test
+ public void testStructBuilder() {
+ Schema schema = SchemaBuilder.struct()
+ .field("field1", Schema.INT8_SCHEMA)
+ .field("field2", Schema.INT8_SCHEMA)
+ .build();
+ assertTypeAndDefault(schema, Schema.Type.STRUCT, false, null);
+ assertEquals(2, schema.fields().size());
+ assertEquals("field1", schema.fields().get(0).name());
+ assertEquals(0, schema.fields().get(0).index());
+ assertEquals(Schema.INT8_SCHEMA, schema.fields().get(0).schema());
+ assertEquals("field2", schema.fields().get(1).name());
+ assertEquals(1, schema.fields().get(1).index());
+ assertEquals(Schema.INT8_SCHEMA, schema.fields().get(1).schema());
+ assertNoMetadata(schema);
+ }
+
+ @Test(expected = SchemaBuilderException.class)
+ public void testNonStructCantHaveFields() {
+ SchemaBuilder.int8().field("field", SchemaBuilder.int8().build());
+ }
+
+
+ @Test
+ public void testArrayBuilder() {
+ Schema schema = SchemaBuilder.array(Schema.INT8_SCHEMA).build();
+ assertTypeAndDefault(schema, Schema.Type.ARRAY, false, null);
+ assertEquals(schema.valueSchema(), Schema.INT8_SCHEMA);
+ assertNoMetadata(schema);
+
+ // Default value
+ List<Byte> defArray = Arrays.asList((byte) 1, (byte) 2);
+ schema = SchemaBuilder.array(Schema.INT8_SCHEMA).defaultValue(defArray).build();
+ assertTypeAndDefault(schema, Schema.Type.ARRAY, false, defArray);
+ assertEquals(schema.valueSchema(), Schema.INT8_SCHEMA);
+ assertNoMetadata(schema);
+ }
+
+ @Test(expected = SchemaBuilderException.class)
+ public void testArrayBuilderInvalidDefault() {
+ // Array, but wrong embedded type
+ SchemaBuilder.array(Schema.INT8_SCHEMA).defaultValue(Arrays.asList("string")).build();
+ }
+
+ @Test
+ public void testMapBuilder() {
+ Schema schema = SchemaBuilder.map(Schema.INT8_SCHEMA, Schema.INT8_SCHEMA).build();
+ assertTypeAndDefault(schema, Schema.Type.MAP, false, null);
+ assertEquals(schema.keySchema(), Schema.INT8_SCHEMA);
+ assertEquals(schema.valueSchema(), Schema.INT8_SCHEMA);
+ assertNoMetadata(schema);
+
+ // Default value
+ Map<Byte, Byte> defMap = Collections.singletonMap((byte) 5, (byte) 10);
+ schema = SchemaBuilder.map(Schema.INT8_SCHEMA, Schema.INT8_SCHEMA)
+ .defaultValue(defMap).build();
+ assertTypeAndDefault(schema, Schema.Type.MAP, false, defMap);
+ assertEquals(schema.keySchema(), Schema.INT8_SCHEMA);
+ assertEquals(schema.valueSchema(), Schema.INT8_SCHEMA);
+ assertNoMetadata(schema);
+ }
+
+ @Test(expected = SchemaBuilderException.class)
+ public void testMapBuilderInvalidDefault() {
+ // Map, but wrong embedded type
+ Map<Byte, String> defMap = Collections.singletonMap((byte) 5, "foo");
+ SchemaBuilder.map(Schema.INT8_SCHEMA, Schema.INT8_SCHEMA)
+ .defaultValue(defMap).build();
+ }
+
+
+
+ private void assertTypeAndDefault(Schema schema, Schema.Type type, boolean optional, Object defaultValue) {
+ assertEquals(type, schema.type());
+ assertEquals(optional, schema.isOptional());
+ if (type == Schema.Type.BYTES) {
+ // byte[] is not comparable, need to wrap to check correctly
+ if (defaultValue == null)
+ assertNull(schema.defaultValue());
+ else
+ assertEquals(ByteBuffer.wrap((byte[]) defaultValue), ByteBuffer.wrap((byte[]) schema.defaultValue()));
+ } else {
+ assertEquals(defaultValue, schema.defaultValue());
+ }
+ }
+
+ private void assertMetadata(Schema schema, String name, Integer version, String doc) {
+ assertEquals(name, schema.name());
+ assertEquals(version, schema.version());
+ assertEquals(doc, schema.doc());
+ }
+
+ private void assertNoMetadata(Schema schema) {
+ assertMetadata(schema, null, null, null);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/api/src/test/java/org/apache/kafka/copycat/data/StructTest.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/test/java/org/apache/kafka/copycat/data/StructTest.java b/copycat/api/src/test/java/org/apache/kafka/copycat/data/StructTest.java
new file mode 100644
index 0000000..162396b
--- /dev/null
+++ b/copycat/api/src/test/java/org/apache/kafka/copycat/data/StructTest.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.data;
+
+import org.apache.kafka.copycat.errors.DataException;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+public class StructTest {
+
+ private static final Schema FLAT_STRUCT_SCHEMA = SchemaBuilder.struct()
+ .field("int8", Schema.INT8_SCHEMA)
+ .field("int16", Schema.INT16_SCHEMA)
+ .field("int32", Schema.INT32_SCHEMA)
+ .field("int64", Schema.INT64_SCHEMA)
+ .field("float32", Schema.FLOAT32_SCHEMA)
+ .field("float64", Schema.FLOAT64_SCHEMA)
+ .field("boolean", Schema.BOOLEAN_SCHEMA)
+ .field("string", Schema.STRING_SCHEMA)
+ .field("bytes", Schema.BYTES_SCHEMA)
+ .build();
+
+ private static final Schema ARRAY_SCHEMA = SchemaBuilder.array(Schema.INT8_SCHEMA).build();
+ private static final Schema MAP_SCHEMA = SchemaBuilder.map(
+ Schema.INT32_SCHEMA,
+ Schema.STRING_SCHEMA
+ ).build();
+ private static final Schema NESTED_CHILD_SCHEMA = SchemaBuilder.struct()
+ .field("int8", Schema.INT8_SCHEMA)
+ .build();
+ private static final Schema NESTED_SCHEMA = SchemaBuilder.struct()
+ .field("array", ARRAY_SCHEMA)
+ .field("map", MAP_SCHEMA)
+ .field("nested", NESTED_CHILD_SCHEMA)
+ .build();
+
+ private static final Schema REQUIRED_FIELD_SCHEMA = Schema.INT8_SCHEMA;
+ private static final Schema OPTIONAL_FIELD_SCHEMA = SchemaBuilder.int8().optional().build();
+ private static final Schema DEFAULT_FIELD_SCHEMA = SchemaBuilder.int8().defaultValue((byte) 0).build();
+
+ @Test
+ public void testFlatStruct() {
+ Struct struct = new Struct(FLAT_STRUCT_SCHEMA)
+ .put("int8", (byte) 12)
+ .put("int16", (short) 12)
+ .put("int32", 12)
+ .put("int64", (long) 12)
+ .put("float32", 12.f)
+ .put("float64", 12.)
+ .put("boolean", true)
+ .put("string", "foobar")
+ .put("bytes", "foobar".getBytes());
+
+ // Test equality, and also the type-specific getters
+ assertEquals((byte) 12, (byte) struct.getInt8("int8"));
+ assertEquals((short) 12, (short) struct.getInt16("int16"));
+ assertEquals(12, (int) struct.getInt32("int32"));
+ assertEquals((long) 12, (long) struct.getInt64("int64"));
+ assertEquals((Float) 12.f, struct.getFloat32("float32"));
+ assertEquals((Double) 12., struct.getFloat64("float64"));
+ assertEquals(true, struct.getBoolean("boolean"));
+ assertEquals("foobar", struct.getString("string"));
+ assertEquals(ByteBuffer.wrap("foobar".getBytes()), ByteBuffer.wrap(struct.getBytes("bytes")));
+
+ struct.validate();
+ }
+
+ @Test
+ public void testComplexStruct() {
+ List<Byte> array = Arrays.asList((byte) 1, (byte) 2);
+ Map<Integer, String> map = Collections.singletonMap(1, "string");
+ Struct struct = new Struct(NESTED_SCHEMA)
+ .put("array", array)
+ .put("map", map)
+ .put("nested", new Struct(NESTED_CHILD_SCHEMA).put("int8", (byte) 12));
+
+ // Separate the call to get the array and map to validate the typed get methods work properly
+ List<Byte> arrayExtracted = struct.getArray("array");
+ assertEquals(array, arrayExtracted);
+ Map<Byte, Byte> mapExtracted = struct.getMap("map");
+ assertEquals(map, mapExtracted);
+ assertEquals((byte) 12, struct.getStruct("nested").get("int8"));
+
+ struct.validate();
+ }
+
+
+ // These don't test all the ways validation can fail, just one for each element. See more extensive validation
+ // tests in SchemaTest. These are meant to ensure that we are invoking the same code path and that we do deeper
+ // inspection than just checking the class of the object
+
+ @Test(expected = DataException.class)
+ public void testInvalidFieldType() {
+ new Struct(FLAT_STRUCT_SCHEMA).put("int8", "should fail because this is a string, not int8");
+ }
+
+ @Test(expected = DataException.class)
+ public void testInvalidArrayFieldElements() {
+ new Struct(NESTED_SCHEMA).put("array", Arrays.asList("should fail since elements should be int8s"));
+ }
+
+ @Test(expected = DataException.class)
+ public void testInvalidMapKeyElements() {
+ new Struct(NESTED_SCHEMA).put("map", Collections.singletonMap("should fail because keys should be int8s", (byte) 12));
+ }
+
+ @Test(expected = DataException.class)
+ public void testInvalidStructFieldSchema() {
+ new Struct(NESTED_SCHEMA).put("nested", new Struct(MAP_SCHEMA));
+ }
+
+ @Test(expected = DataException.class)
+ public void testInvalidStructFieldValue() {
+ new Struct(NESTED_SCHEMA).put("nested", new Struct(NESTED_CHILD_SCHEMA));
+ }
+
+
+ @Test(expected = DataException.class)
+ public void testMissingFieldValidation() {
+ // Required int8 field
+ Schema schema = SchemaBuilder.struct().field("field", REQUIRED_FIELD_SCHEMA).build();
+ Struct struct = new Struct(schema);
+ struct.validate();
+ }
+
+ @Test
+ public void testMissingOptionalFieldValidation() {
+ Schema schema = SchemaBuilder.struct().field("field", OPTIONAL_FIELD_SCHEMA).build();
+ Struct struct = new Struct(schema);
+ struct.validate();
+ }
+
+ @Test
+ public void testMissingFieldWithDefaultValidation() {
+ Schema schema = SchemaBuilder.struct().field("field", DEFAULT_FIELD_SCHEMA).build();
+ Struct struct = new Struct(schema);
+ struct.validate();
+ }
+
+
+ @Test
+ public void testEquals() {
+ Struct struct1 = new Struct(FLAT_STRUCT_SCHEMA)
+ .put("int8", (byte) 12)
+ .put("int16", (short) 12)
+ .put("int32", 12)
+ .put("int64", (long) 12)
+ .put("float32", 12.f)
+ .put("float64", 12.)
+ .put("boolean", true)
+ .put("string", "foobar")
+ .put("bytes", ByteBuffer.wrap("foobar".getBytes()));
+ Struct struct2 = new Struct(FLAT_STRUCT_SCHEMA)
+ .put("int8", (byte) 12)
+ .put("int16", (short) 12)
+ .put("int32", 12)
+ .put("int64", (long) 12)
+ .put("float32", 12.f)
+ .put("float64", 12.)
+ .put("boolean", true)
+ .put("string", "foobar")
+ .put("bytes", ByteBuffer.wrap("foobar".getBytes()));
+ Struct struct3 = new Struct(FLAT_STRUCT_SCHEMA)
+ .put("int8", (byte) 12)
+ .put("int16", (short) 12)
+ .put("int32", 12)
+ .put("int64", (long) 12)
+ .put("float32", 12.f)
+ .put("float64", 12.)
+ .put("boolean", true)
+ .put("string", "mismatching string")
+ .put("bytes", ByteBuffer.wrap("foobar".getBytes()));
+
+ assertEquals(struct1, struct2);
+ assertNotEquals(struct1, struct3);
+
+ List<Byte> array = Arrays.asList((byte) 1, (byte) 2);
+ Map<Integer, String> map = Collections.singletonMap(1, "string");
+ struct1 = new Struct(NESTED_SCHEMA)
+ .put("array", array)
+ .put("map", map)
+ .put("nested", new Struct(NESTED_CHILD_SCHEMA).put("int8", (byte) 12));
+ List<Byte> array2 = Arrays.asList((byte) 1, (byte) 2);
+ Map<Integer, String> map2 = Collections.singletonMap(1, "string");
+ struct2 = new Struct(NESTED_SCHEMA)
+ .put("array", array2)
+ .put("map", map2)
+ .put("nested", new Struct(NESTED_CHILD_SCHEMA).put("int8", (byte) 12));
+ List<Byte> array3 = Arrays.asList((byte) 1, (byte) 2, (byte) 3);
+ Map<Integer, String> map3 = Collections.singletonMap(2, "string");
+ struct3 = new Struct(NESTED_SCHEMA)
+ .put("array", array3)
+ .put("map", map3)
+ .put("nested", new Struct(NESTED_CHILD_SCHEMA).put("int8", (byte) 13));
+
+ assertEquals(struct1, struct2);
+ assertNotEquals(struct1, struct3);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/data/src/main/java/org/apache/kafka/copycat/data/DataRuntimeException.java
----------------------------------------------------------------------
diff --git a/copycat/data/src/main/java/org/apache/kafka/copycat/data/DataRuntimeException.java b/copycat/data/src/main/java/org/apache/kafka/copycat/data/DataRuntimeException.java
deleted file mode 100644
index 855c0fd..0000000
--- a/copycat/data/src/main/java/org/apache/kafka/copycat/data/DataRuntimeException.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-
-
-package org.apache.kafka.copycat.data;
-
-/** Base Avro exception. */
-public class DataRuntimeException extends RuntimeException {
- public DataRuntimeException(Throwable cause) {
- super(cause);
- }
-
- public DataRuntimeException(String message) {
- super(message);
- }
-
- public DataRuntimeException(String message, Throwable cause) {
- super(message, cause);
- }
-}
-
http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/data/src/main/java/org/apache/kafka/copycat/data/DataTypeException.java
----------------------------------------------------------------------
diff --git a/copycat/data/src/main/java/org/apache/kafka/copycat/data/DataTypeException.java b/copycat/data/src/main/java/org/apache/kafka/copycat/data/DataTypeException.java
deleted file mode 100644
index 6a74d88..0000000
--- a/copycat/data/src/main/java/org/apache/kafka/copycat/data/DataTypeException.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-
-
-package org.apache.kafka.copycat.data;
-
-
-/** Thrown when an illegal type is used. */
-public class DataTypeException extends DataRuntimeException {
- public DataTypeException(String message) {
- super(message);
- }
-
- public DataTypeException(String message, Throwable cause) {
- super(message, cause);
- }
-}
-
http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/data/src/main/java/org/apache/kafka/copycat/data/ObjectProperties.java
----------------------------------------------------------------------
diff --git a/copycat/data/src/main/java/org/apache/kafka/copycat/data/ObjectProperties.java b/copycat/data/src/main/java/org/apache/kafka/copycat/data/ObjectProperties.java
deleted file mode 100644
index e995b7f..0000000
--- a/copycat/data/src/main/java/org/apache/kafka/copycat/data/ObjectProperties.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-
-
-package org.apache.kafka.copycat.data;
-
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * Base class for objects that have Object-valued properties.
- */
-public abstract class ObjectProperties {
- public static class Null {
- private Null() {
- }
- }
-
- /** A value representing a JSON <code>null</code>. */
- public static final Null NULL_VALUE = new Null();
-
- Map<String, Object> props = new LinkedHashMap<String, Object>(1);
-
- private Set<String> reserved;
-
- ObjectProperties(Set<String> reserved) {
- this.reserved = reserved;
- }
-
- /**
- * Returns the value of the named, string-valued property in this schema.
- * Returns <tt>null</tt> if there is no string-valued property with that name.
- */
- public String getProp(String name) {
- Object value = getObjectProp(name);
- return (value instanceof String) ? (String) value : null;
- }
-
- /**
- * Returns the value of the named property in this schema.
- * Returns <tt>null</tt> if there is no property with that name.
- */
- public synchronized Object getObjectProp(String name) {
- return props.get(name);
- }
-
- /**
- * Adds a property with the given name <tt>name</tt> and
- * value <tt>value</tt>. Neither <tt>name</tt> nor <tt>value</tt> can be
- * <tt>null</tt>. It is illegal to add a property if another with
- * the same name but different value already exists in this schema.
- *
- * @param name The name of the property to add
- * @param value The value for the property to add
- */
- public synchronized void addProp(String name, Object value) {
- if (reserved.contains(name))
- throw new DataRuntimeException("Can't set reserved property: " + name);
-
- if (value == null)
- throw new DataRuntimeException("Can't set a property to null: " + name);
-
- Object old = props.get(name);
- if (old == null)
- props.put(name, value);
- else if (!old.equals(value))
- throw new DataRuntimeException("Can't overwrite property: " + name);
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/data/src/main/java/org/apache/kafka/copycat/data/Schema.java
----------------------------------------------------------------------
diff --git a/copycat/data/src/main/java/org/apache/kafka/copycat/data/Schema.java b/copycat/data/src/main/java/org/apache/kafka/copycat/data/Schema.java
deleted file mode 100644
index 04906c3..0000000
--- a/copycat/data/src/main/java/org/apache/kafka/copycat/data/Schema.java
+++ /dev/null
@@ -1,1054 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-
-
-package org.apache.kafka.copycat.data;
-
-import java.nio.ByteBuffer;
-import java.util.*;
-
-/** An abstract data type.
- * <p>A schema may be one of:
- * <ul>
- * <li>A <i>record</i>, mapping field names to field value data;
- * <li>An <i>enum</i>, containing one of a small set of symbols;
- * <li>An <i>array</i> of values, all of the same schema;
- * <li>A <i>map</i>, containing string/value pairs, of a declared schema;
- * <li>A <i>union</i> of other schemas;
- * <li>A <i>fixed</i> sized binary object;
- * <li>A unicode <i>string</i>;
- * <li>A sequence of <i>bytes</i>;
- * <li>A 32-bit signed <i>int</i>;
- * <li>A 64-bit signed <i>long</i>;
- * <li>A 32-bit IEEE single-<i>float</i>; or
- * <li>A 64-bit IEEE <i>double</i>-float; or
- * <li>A <i>boolean</i>; or
- * <li><i>null</i>.
- * </ul>
- *
- * A schema can be constructed using one of its static <tt>createXXX</tt>
- * methods, or more conveniently using {@link SchemaBuilder}. The schema objects are
- * <i>logically</i> immutable.
- * There are only two mutating methods - {@link #setFields(List)} and
- * {@link #addProp(String, Object)}. The following restrictions apply on these
- * two methods.
- * <ul>
- * <li> {@link #setFields(List)}, can be called at most once. This method exists
- * in order to enable clients to build recursive schemas.
- * <li> {@link #addProp(String, Object)} can be called with property names
- * that are not present already. It is not possible to change or delete an
- * existing property.
- * </ul>
- */
-public abstract class Schema extends ObjectProperties {
- private static final int NO_HASHCODE = Integer.MIN_VALUE;
-
- /** The type of a schema. */
- public enum Type {
- ENUM {
- @Override
- public Object defaultValue(Schema schema) {
- return null;
- }
- },
- ARRAY {
- @Override
- public Object defaultValue(Schema schema) {
- return new ArrayList<>();
- }
- },
- MAP {
- @Override
- public Object defaultValue(Schema schema) {
- return new HashMap<Object, Object>();
- }
- },
- UNION {
- @Override
- public Object defaultValue(Schema schema) {
- Schema firstSchema = schema.getTypes().get(0);
- return firstSchema.getType().defaultValue(firstSchema);
- }
- },
- STRING {
- @Override
- public Object defaultValue(Schema schema) {
- return "";
- }
- },
- BYTES {
- @Override
- public Object defaultValue(Schema schema) {
- return new byte[0];
- }
- },
- INT {
- @Override
- public Object defaultValue(Schema schema) {
- return 0;
- }
- },
- LONG {
- @Override
- public Object defaultValue(Schema schema) {
- return 0;
- }
- },
- FLOAT {
- @Override
- public Object defaultValue(Schema schema) {
- return 0;
- }
- },
- DOUBLE {
- @Override
- public Object defaultValue(Schema schema) {
- return 0;
- }
- },
- BOOLEAN {
- @Override
- public Object defaultValue(Schema schema) {
- return false;
- }
- },
- NULL {
- @Override
- public Object defaultValue(Schema schema) {
- return null;
- }
- };
- private String name;
-
- private Type() {
- this.name = this.name().toLowerCase();
- }
-
- public String getName() {
- return name;
- }
-
- public abstract Object defaultValue(Schema schema);
- }
-
- private final Type type;
-
- Schema(Type type) {
- super(SCHEMA_RESERVED);
- this.type = type;
- }
-
- /** Create a schema for a primitive type. */
- public static Schema create(Type type) {
- switch (type) {
- case STRING:
- return new StringSchema();
- case BYTES:
- return new BytesSchema();
- case INT:
- return new IntSchema();
- case LONG:
- return new LongSchema();
- case FLOAT:
- return new FloatSchema();
- case DOUBLE:
- return new DoubleSchema();
- case BOOLEAN:
- return new BooleanSchema();
- case NULL:
- return new NullSchema();
- default:
- throw new DataRuntimeException("Can't create a: " + type);
- }
- }
-
- private static final Set<String> SCHEMA_RESERVED = new HashSet<String>();
-
- static {
- Collections.addAll(SCHEMA_RESERVED,
- "doc", "fields", "items", "name", "namespace",
- "size", "symbols", "values", "type", "aliases");
- }
-
- int hashCode = NO_HASHCODE;
-
- @Override
- public void addProp(String name, Object value) {
- super.addProp(name, value);
- hashCode = NO_HASHCODE;
- }
-
- /** Create an enum schema. */
- public static Schema createEnum(String name, String doc, String namespace,
- List<String> values) {
- return new EnumSchema(new Name(name, namespace), doc,
- new LockableArrayList<String>(values));
- }
-
- /** Create an array schema. */
- public static Schema createArray(Schema elementType) {
- return new ArraySchema(elementType);
- }
-
- /** Create a map schema. */
- public static Schema createMap(Schema valueType) {
- return new MapSchema(valueType);
- }
-
- /** Create a union schema. */
- public static Schema createUnion(List<Schema> types) {
- return new UnionSchema(new LockableArrayList<Schema>(types));
- }
-
- /** Create a union schema. */
- public static Schema createUnion(Schema... types) {
- return createUnion(new LockableArrayList<Schema>(types));
- }
-
- /** Return the type of this schema. */
- public Type getType() {
- return type;
- }
-
- /**
- * If this is a record, returns the Field with the
- * given name <tt>fieldName</tt>. If there is no field by that name, a
- * <tt>null</tt> is returned.
- */
- public Field getField(String fieldname) {
- throw new DataRuntimeException("Not a record: " + this);
- }
-
- /**
- * If this is a record, returns the fields in it. The returned
- * list is in the order of their positions.
- */
- public List<Field> getFields() {
- throw new DataRuntimeException("Not a record: " + this);
- }
-
- /**
- * If this is a record, set its fields. The fields can be set
- * only once in a schema.
- */
- public void setFields(List<Field> fields) {
- throw new DataRuntimeException("Not a record: " + this);
- }
-
- /** If this is an enum, return its symbols. */
- public List<String> getEnumSymbols() {
- throw new DataRuntimeException("Not an enum: " + this);
- }
-
- /** If this is an enum, return a symbol's ordinal value. */
- public int getEnumOrdinal(String symbol) {
- throw new DataRuntimeException("Not an enum: " + this);
- }
-
- /** If this is an enum, returns true if it contains given symbol. */
- public boolean hasEnumSymbol(String symbol) {
- throw new DataRuntimeException("Not an enum: " + this);
- }
-
- /** If this is a record, enum or fixed, returns its name, otherwise the name
- * of the primitive type. */
- public String getName() {
- return type.name;
- }
-
- /** If this is a record, enum, or fixed, returns its docstring,
- * if available. Otherwise, returns null. */
- public String getDoc() {
- return null;
- }
-
- /** If this is a record, enum or fixed, returns its namespace, if any. */
- public String getNamespace() {
- throw new DataRuntimeException("Not a named type: " + this);
- }
-
- /** If this is a record, enum or fixed, returns its namespace-qualified name,
- * otherwise returns the name of the primitive type. */
- public String getFullName() {
- return getName();
- }
-
- /** If this is a record, enum or fixed, add an alias. */
- public void addAlias(String alias) {
- throw new DataRuntimeException("Not a named type: " + this);
- }
-
- /** If this is a record, enum or fixed, add an alias. */
- public void addAlias(String alias, String space) {
- throw new DataRuntimeException("Not a named type: " + this);
- }
-
- /** If this is a record, enum or fixed, return its aliases, if any. */
- public Set<String> getAliases() {
- throw new DataRuntimeException("Not a named type: " + this);
- }
-
- /** Returns true if this record is an error type. */
- public boolean isError() {
- throw new DataRuntimeException("Not a record: " + this);
- }
-
- /** If this is an array, returns its element type. */
- public Schema getElementType() {
- throw new DataRuntimeException("Not an array: " + this);
- }
-
- /** If this is a map, returns its value type. */
- public Schema getValueType() {
- throw new DataRuntimeException("Not a map: " + this);
- }
-
- /** If this is a union, returns its types. */
- public List<Schema> getTypes() {
- throw new DataRuntimeException("Not a union: " + this);
- }
-
- /** If this is a union, return the branch with the provided full name. */
- public Integer getIndexNamed(String name) {
- throw new DataRuntimeException("Not a union: " + this);
- }
-
- /** If this is fixed, returns its size. */
- public int getFixedSize() {
- throw new DataRuntimeException("Not fixed: " + this);
- }
-
- @Override
- public String toString() {
- // FIXME A more JSON-like output showing the details would be nice
- return "Schema:" + this.getType() + ":" + getFullName();
- }
-
- public boolean equals(Object o) {
- if (o == this) return true;
- if (!(o instanceof Schema)) return false;
- Schema that = (Schema) o;
- if (!(this.type == that.type)) return false;
- return equalCachedHash(that) && props.equals(that.props);
- }
-
- public final int hashCode() {
- if (hashCode == NO_HASHCODE)
- hashCode = computeHash();
- return hashCode;
- }
-
- int computeHash() {
- return getType().hashCode() + props.hashCode();
- }
-
- final boolean equalCachedHash(Schema other) {
- return (hashCode == other.hashCode)
- || (hashCode == NO_HASHCODE)
- || (other.hashCode == NO_HASHCODE);
- }
-
- private static final Set<String> FIELD_RESERVED = new HashSet<String>();
-
- static {
- Collections.addAll(FIELD_RESERVED,
- "default", "doc", "name", "order", "type", "aliases");
- }
-
- /** A field within a record. */
- public static class Field extends ObjectProperties {
-
- /** How values of this field should be ordered when sorting records. */
- public enum Order {
- ASCENDING, DESCENDING, IGNORE;
- private String name;
-
- private Order() {
- this.name = this.name().toLowerCase();
- }
- }
-
-
- private final String name; // name of the field.
- private int position = -1;
- private final Schema schema;
- private final String doc;
- private final Object defaultValue;
- private final Order order;
- private Set<String> aliases;
-
- public Field(String name, Schema schema, String doc,
- Object defaultValue) {
- this(name, schema, doc, defaultValue, Order.ASCENDING);
- }
-
- public Field(String name, Schema schema, String doc,
- Object defaultValue, Order order) {
- super(FIELD_RESERVED);
- this.name = validateName(name);
- this.schema = schema;
- this.doc = doc;
- this.defaultValue = validateDefault(name, schema, defaultValue);
- this.order = order;
- }
-
- public String name() {
- return name;
- }
-
-
- /** The position of this field within the record. */
- public int pos() {
- return position;
- }
-
- /** This field's {@link Schema}. */
- public Schema schema() {
- return schema;
- }
-
- /** Field's documentation within the record, if set. May return null. */
- public String doc() {
- return doc;
- }
-
- public Object defaultValue() {
- return defaultValue;
- }
-
- public Order order() {
- return order;
- }
-
- public void addAlias(String alias) {
- if (aliases == null)
- this.aliases = new LinkedHashSet<String>();
- aliases.add(alias);
- }
-
- /** Return the defined aliases as an unmodifieable Set. */
- public Set<String> aliases() {
- if (aliases == null)
- return Collections.emptySet();
- return Collections.unmodifiableSet(aliases);
- }
-
- public boolean equals(Object other) {
- if (other == this) return true;
- if (!(other instanceof Field)) return false;
- Field that = (Field) other;
- return (name.equals(that.name)) &&
- (schema.equals(that.schema)) &&
- defaultValueEquals(that.defaultValue) &&
- (order == that.order) &&
- props.equals(that.props);
- }
-
- public int hashCode() {
- return name.hashCode() + schema.computeHash();
- }
-
- /** Do any possible implicit conversions to double, or return 0 if there isn't a
- * valid conversion */
- private double doubleValue(Object v) {
- if (v instanceof Integer)
- return (double) (Integer) v;
- else if (v instanceof Long)
- return (double) (Long) v;
- else if (v instanceof Float)
- return (double) (Float) v;
- else if (v instanceof Double)
- return (double) (Double) v;
- else
- return 0;
- }
-
- private boolean defaultValueEquals(Object thatDefaultValue) {
- if (defaultValue == null)
- return thatDefaultValue == null;
- if (Double.isNaN(doubleValue(defaultValue)))
- return Double.isNaN(doubleValue(thatDefaultValue));
- return defaultValue.equals(thatDefaultValue);
- }
-
- @Override
- public String toString() {
- return name + " type:" + schema.type + " pos:" + position;
- }
- }
-
- static class Name {
- private final String name;
- private final String space;
- private final String full;
-
- public Name(String name, String space) {
- if (name == null) { // anonymous
- this.name = this.space = this.full = null;
- return;
- }
- int lastDot = name.lastIndexOf('.');
- if (lastDot < 0) { // unqualified name
- this.name = validateName(name);
- } else { // qualified name
- space = name.substring(0, lastDot); // get space from name
- this.name = validateName(name.substring(lastDot + 1, name.length()));
- }
- if ("".equals(space))
- space = null;
- this.space = space;
- this.full = (this.space == null) ? this.name : this.space + "." + this.name;
- }
-
- public boolean equals(Object o) {
- if (o == this) return true;
- if (!(o instanceof Name)) return false;
- Name that = (Name) o;
- return full == null ? that.full == null : full.equals(that.full);
- }
-
- public int hashCode() {
- return full == null ? 0 : full.hashCode();
- }
-
- public String toString() {
- return full;
- }
-
- public String getQualified(String defaultSpace) {
- return (space == null || space.equals(defaultSpace)) ? name : full;
- }
- }
-
- private static abstract class NamedSchema extends Schema {
- final Name name;
- final String doc;
- Set<Name> aliases;
-
- public NamedSchema(Type type, Name name, String doc) {
- super(type);
- this.name = name;
- this.doc = doc;
- if (PRIMITIVES.containsKey(name.full)) {
- throw new DataTypeException("Schemas may not be named after primitives: " + name.full);
- }
- }
-
- public String getName() {
- return name.name;
- }
-
- public String getDoc() {
- return doc;
- }
-
- public String getNamespace() {
- return name.space;
- }
-
- public String getFullName() {
- return name.full;
- }
-
- public void addAlias(String alias) {
- addAlias(alias, null);
- }
-
- public void addAlias(String name, String space) {
- if (aliases == null)
- this.aliases = new LinkedHashSet<Name>();
- if (space == null)
- space = this.name.space;
- aliases.add(new Name(name, space));
- }
-
- public Set<String> getAliases() {
- Set<String> result = new LinkedHashSet<String>();
- if (aliases != null)
- for (Name alias : aliases)
- result.add(alias.full);
- return result;
- }
-
- public boolean equalNames(NamedSchema that) {
- return this.name.equals(that.name);
- }
-
- @Override
- int computeHash() {
- return super.computeHash() + name.hashCode();
- }
- }
-
- private static class SeenPair {
- private Object s1;
- private Object s2;
-
- private SeenPair(Object s1, Object s2) {
- this.s1 = s1;
- this.s2 = s2;
- }
-
- public boolean equals(Object o) {
- return this.s1 == ((SeenPair) o).s1 && this.s2 == ((SeenPair) o).s2;
- }
-
- public int hashCode() {
- return System.identityHashCode(s1) + System.identityHashCode(s2);
- }
- }
-
- private static final ThreadLocal<Set> SEEN_EQUALS = new ThreadLocal<Set>() {
- protected Set initialValue() {
- return new HashSet();
- }
- };
- private static final ThreadLocal<Map> SEEN_HASHCODE = new ThreadLocal<Map>() {
- protected Map initialValue() {
- return new IdentityHashMap();
- }
- };
-
- private static class EnumSchema extends NamedSchema {
- private final List<String> symbols;
- private final Map<String, Integer> ordinals;
-
- public EnumSchema(Name name, String doc,
- LockableArrayList<String> symbols) {
- super(Type.ENUM, name, doc);
- this.symbols = symbols.lock();
- this.ordinals = new HashMap<String, Integer>();
- int i = 0;
- for (String symbol : symbols)
- if (ordinals.put(validateName(symbol), i++) != null)
- throw new SchemaParseException("Duplicate enum symbol: " + symbol);
- }
-
- public List<String> getEnumSymbols() {
- return symbols;
- }
-
- public boolean hasEnumSymbol(String symbol) {
- return ordinals.containsKey(symbol);
- }
-
- public int getEnumOrdinal(String symbol) {
- return ordinals.get(symbol);
- }
-
- public boolean equals(Object o) {
- if (o == this) return true;
- if (!(o instanceof EnumSchema)) return false;
- EnumSchema that = (EnumSchema) o;
- return equalCachedHash(that)
- && equalNames(that)
- && symbols.equals(that.symbols)
- && props.equals(that.props);
- }
-
- @Override
- int computeHash() {
- return super.computeHash() + symbols.hashCode();
- }
- }
-
- private static class ArraySchema extends Schema {
- private final Schema elementType;
-
- public ArraySchema(Schema elementType) {
- super(Type.ARRAY);
- this.elementType = elementType;
- }
-
- public Schema getElementType() {
- return elementType;
- }
-
- public boolean equals(Object o) {
- if (o == this) return true;
- if (!(o instanceof ArraySchema)) return false;
- ArraySchema that = (ArraySchema) o;
- return equalCachedHash(that)
- && elementType.equals(that.elementType)
- && props.equals(that.props);
- }
-
- @Override
- int computeHash() {
- return super.computeHash() + elementType.computeHash();
- }
- }
-
- private static class MapSchema extends Schema {
- private final Schema valueType;
-
- public MapSchema(Schema valueType) {
- super(Type.MAP);
- this.valueType = valueType;
- }
-
- public Schema getValueType() {
- return valueType;
- }
-
- public boolean equals(Object o) {
- if (o == this) return true;
- if (!(o instanceof MapSchema)) return false;
- MapSchema that = (MapSchema) o;
- return equalCachedHash(that)
- && valueType.equals(that.valueType)
- && props.equals(that.props);
- }
-
- @Override
- int computeHash() {
- return super.computeHash() + valueType.computeHash();
- }
- }
-
- private static class UnionSchema extends Schema {
- private final List<Schema> types;
- private final Map<String, Integer> indexByName
- = new HashMap<String, Integer>();
-
- public UnionSchema(LockableArrayList<Schema> types) {
- super(Type.UNION);
- this.types = types.lock();
- int index = 0;
- for (Schema type : types) {
- if (type.getType() == Type.UNION)
- throw new DataRuntimeException("Nested union: " + this);
- String name = type.getFullName();
- if (name == null)
- throw new DataRuntimeException("Nameless in union:" + this);
- if (indexByName.put(name, index++) != null)
- throw new DataRuntimeException("Duplicate in union:" + name);
- }
- }
-
- public List<Schema> getTypes() {
- return types;
- }
-
- public Integer getIndexNamed(String name) {
- return indexByName.get(name);
- }
-
- public boolean equals(Object o) {
- if (o == this) return true;
- if (!(o instanceof UnionSchema)) return false;
- UnionSchema that = (UnionSchema) o;
- return equalCachedHash(that)
- && types.equals(that.types)
- && props.equals(that.props);
- }
-
- @Override
- int computeHash() {
- int hash = super.computeHash();
- for (Schema type : types)
- hash += type.computeHash();
- return hash;
- }
- }
-
- private static class StringSchema extends Schema {
- public StringSchema() {
- super(Type.STRING);
- }
- }
-
- private static class BytesSchema extends Schema {
- public BytesSchema() {
- super(Type.BYTES);
- }
- }
-
- private static class IntSchema extends Schema {
- public IntSchema() {
- super(Type.INT);
- }
- }
-
- private static class LongSchema extends Schema {
- public LongSchema() {
- super(Type.LONG);
- }
- }
-
- private static class FloatSchema extends Schema {
- public FloatSchema() {
- super(Type.FLOAT);
- }
- }
-
- private static class DoubleSchema extends Schema {
- public DoubleSchema() {
- super(Type.DOUBLE);
- }
- }
-
- private static class BooleanSchema extends Schema {
- public BooleanSchema() {
- super(Type.BOOLEAN);
- }
- }
-
- private static class NullSchema extends Schema {
- public NullSchema() {
- super(Type.NULL);
- }
- }
-
- static final Map<String, Type> PRIMITIVES = new HashMap<String, Type>();
-
- static {
- PRIMITIVES.put("string", Type.STRING);
- PRIMITIVES.put("bytes", Type.BYTES);
- PRIMITIVES.put("int", Type.INT);
- PRIMITIVES.put("long", Type.LONG);
- PRIMITIVES.put("float", Type.FLOAT);
- PRIMITIVES.put("double", Type.DOUBLE);
- PRIMITIVES.put("boolean", Type.BOOLEAN);
- PRIMITIVES.put("null", Type.NULL);
- }
-
- static class Names extends LinkedHashMap<Name, Schema> {
- private String space; // default namespace
-
- public Names() {
- }
-
- public Names(String space) {
- this.space = space;
- }
-
- public String space() {
- return space;
- }
-
- public void space(String space) {
- this.space = space;
- }
-
- @Override
- public Schema get(Object o) {
- Name name;
- if (o instanceof String) {
- Type primitive = PRIMITIVES.get((String) o);
- if (primitive != null) return Schema.create(primitive);
- name = new Name((String) o, space);
- if (!containsKey(name)) // if not in default
- name = new Name((String) o, ""); // try anonymous
- } else {
- name = (Name) o;
- }
- return super.get(name);
- }
-
- public boolean contains(Schema schema) {
- return get(((NamedSchema) schema).name) != null;
- }
-
- public void add(Schema schema) {
- put(((NamedSchema) schema).name, schema);
- }
-
- @Override
- public Schema put(Name name, Schema schema) {
- if (containsKey(name))
- throw new SchemaParseException("Can't redefine: " + name);
- return super.put(name, schema);
- }
- }
-
- private static ThreadLocal<Boolean> validateNames
- = new ThreadLocal<Boolean>() {
- @Override
- protected Boolean initialValue() {
- return true;
- }
- };
-
- private static String validateName(String name) {
- if (!validateNames.get()) return name; // not validating names
- int length = name.length();
- if (length == 0)
- throw new SchemaParseException("Empty name");
- char first = name.charAt(0);
- if (!(Character.isLetter(first) || first == '_'))
- throw new SchemaParseException("Illegal initial character: " + name);
- for (int i = 1; i < length; i++) {
- char c = name.charAt(i);
- if (!(Character.isLetterOrDigit(c) || c == '_'))
- throw new SchemaParseException("Illegal character in: " + name);
- }
- return name;
- }
-
- private static final ThreadLocal<Boolean> VALIDATE_DEFAULTS
- = new ThreadLocal<Boolean>() {
- @Override
- protected Boolean initialValue() {
- return false;
- }
- };
-
- private static Object validateDefault(String fieldName, Schema schema,
- Object defaultValue) {
- if ((defaultValue != null)
- && !isValidDefault(schema, defaultValue)) { // invalid default
- String message = "Invalid default for field " + fieldName
- + ": " + defaultValue + " not a " + schema;
- if (VALIDATE_DEFAULTS.get())
- throw new DataTypeException(message); // throw exception
- System.err.println("[WARNING] Avro: " + message); // or log warning
- }
- return defaultValue;
- }
-
- private static boolean isValidDefault(Schema schema, Object defaultValue) {
- switch (schema.getType()) {
- case STRING:
- case ENUM:
- return (defaultValue instanceof String);
- case BYTES:
- case INT:
- return (defaultValue instanceof Integer);
- case LONG:
- return (defaultValue instanceof Long);
- case FLOAT:
- return (defaultValue instanceof Float);
- case DOUBLE:
- return (defaultValue instanceof Double);
- case BOOLEAN:
- return (defaultValue instanceof Boolean);
- case NULL:
- return defaultValue == null;
- case ARRAY:
- if (!(defaultValue instanceof Collection))
- return false;
- for (Object element : (Collection<Object>) defaultValue)
- if (!isValidDefault(schema.getElementType(), element))
- return false;
- return true;
- case MAP:
- if (!(defaultValue instanceof Map))
- return false;
- for (Object value : ((Map<Object, Object>) defaultValue).values())
- if (!isValidDefault(schema.getValueType(), value))
- return false;
- return true;
- case UNION: // union default: first branch
- return isValidDefault(schema.getTypes().get(0), defaultValue);
- default:
- return false;
- }
- }
-
- /**
- * No change is permitted on LockableArrayList once lock() has been
- * called on it.
- * @param <E>
- */
-
- /*
- * This class keeps a boolean variable <tt>locked</tt> which is set
- * to <tt>true</tt> in the lock() method. It's legal to call
- * lock() any number of times. Any lock() other than the first one
- * is a no-op.
- *
- * This class throws <tt>IllegalStateException</tt> if a mutating
- * operation is performed after being locked. Since modifications through
- * iterator also use the list's mutating operations, this effectively
- * blocks all modifications.
- */
- static class LockableArrayList<E> extends ArrayList<E> {
- private static final long serialVersionUID = 1L;
- private boolean locked = false;
-
- public LockableArrayList() {
- }
-
- public LockableArrayList(int size) {
- super(size);
- }
-
- public LockableArrayList(List<E> types) {
- super(types);
- }
-
- public LockableArrayList(E... types) {
- super(types.length);
- Collections.addAll(this, types);
- }
-
- public List<E> lock() {
- locked = true;
- return this;
- }
-
- private void ensureUnlocked() {
- if (locked) {
- throw new IllegalStateException();
- }
- }
-
- public boolean add(E e) {
- ensureUnlocked();
- return super.add(e);
- }
-
- public boolean remove(Object o) {
- ensureUnlocked();
- return super.remove(o);
- }
-
- public E remove(int index) {
- ensureUnlocked();
- return super.remove(index);
- }
-
- public boolean addAll(Collection<? extends E> c) {
- ensureUnlocked();
- return super.addAll(c);
- }
-
- public boolean addAll(int index, Collection<? extends E> c) {
- ensureUnlocked();
- return super.addAll(index, c);
- }
-
- public boolean removeAll(Collection<?> c) {
- ensureUnlocked();
- return super.removeAll(c);
- }
-
- public boolean retainAll(Collection<?> c) {
- ensureUnlocked();
- return super.retainAll(c);
- }
-
- public void clear() {
- ensureUnlocked();
- super.clear();
- }
-
- }
-
-}