You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2015/08/27 20:58:55 UTC
[5/5] kafka git commit: KAFKA-2367; Add Copycat runtime data API.
KAFKA-2367; Add Copycat runtime data API.
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Reviewers: Gwen Shapira, Jay Kreps
Closes #163 from ewencp/kafka-2367-copycat-runtime-data-api
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/492bfdfa
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/492bfdfa
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/492bfdfa
Branch: refs/heads/trunk
Commit: 492bfdfa873db649049382b9785cefef2d6d1eca
Parents: 8c88d19
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Authored: Thu Aug 27 11:58:42 2015 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Thu Aug 27 11:58:42 2015 -0700
----------------------------------------------------------------------
bin/kafka-run-class.sh | 2 +-
build.gradle | 49 +-
.../kafka/common/config/AbstractConfig.java | 8 +
.../kafka/common/protocol/types/Struct.java | 10 +-
.../kafka/copycat/connector/Connector.java | 4 +-
.../kafka/copycat/connector/CopycatRecord.java | 37 +-
.../kafka/copycat/data/CopycatSchema.java | 235 ++
.../org/apache/kafka/copycat/data/Field.java | 77 +
.../org/apache/kafka/copycat/data/Schema.java | 157 ++
.../kafka/copycat/data/SchemaAndValue.java | 62 +
.../kafka/copycat/data/SchemaBuilder.java | 371 +++
.../org/apache/kafka/copycat/data/Struct.java | 265 ++
.../kafka/copycat/errors/DataException.java | 35 +
.../copycat/errors/SchemaBuilderException.java | 32 +
.../apache/kafka/copycat/sink/SinkRecord.java | 7 +-
.../kafka/copycat/sink/SinkTaskContext.java | 4 +-
.../kafka/copycat/source/SourceRecord.java | 45 +-
.../kafka/copycat/source/SourceTaskContext.java | 2 +-
.../apache/kafka/copycat/storage/Converter.java | 13 +-
.../copycat/storage/OffsetStorageReader.java | 7 +-
.../connector/ConnectorReconfigurationTest.java | 4 +-
.../kafka/copycat/data/CopycatSchemaTest.java | 272 ++
.../apache/kafka/copycat/data/FieldTest.java | 40 +
.../kafka/copycat/data/SchemaBuilderTest.java | 287 +++
.../apache/kafka/copycat/data/StructTest.java | 222 ++
.../copycat/data/DataRuntimeException.java | 36 -
.../kafka/copycat/data/DataTypeException.java | 33 -
.../kafka/copycat/data/ObjectProperties.java | 85 -
.../org/apache/kafka/copycat/data/Schema.java | 1054 --------
.../kafka/copycat/data/SchemaBuilder.java | 2415 ------------------
.../copycat/data/SchemaParseException.java | 32 -
.../copycat/file/FileStreamSinkConnector.java | 4 +-
.../kafka/copycat/file/FileStreamSinkTask.java | 2 +-
.../copycat/file/FileStreamSourceConnector.java | 4 +-
.../copycat/file/FileStreamSourceTask.java | 43 +-
.../file/FileStreamSinkConnectorTest.java | 6 +-
.../copycat/file/FileStreamSinkTaskTest.java | 7 +-
.../file/FileStreamSourceConnectorTest.java | 8 +-
.../copycat/file/FileStreamSourceTaskTest.java | 31 +-
.../kafka/copycat/json/JsonConverter.java | 451 +++-
.../apache/kafka/copycat/json/JsonSchema.java | 79 +-
.../kafka/copycat/json/JsonSerializer.java | 3 +
.../kafka/copycat/json/JsonConverterTest.java | 200 +-
.../apache/kafka/copycat/cli/WorkerConfig.java | 10 -
.../kafka/copycat/runtime/ConnectorConfig.java | 14 -
.../apache/kafka/copycat/runtime/Worker.java | 6 +-
.../kafka/copycat/runtime/WorkerSinkTask.java | 15 +-
.../copycat/runtime/WorkerSinkTaskThread.java | 12 +-
.../kafka/copycat/runtime/WorkerSourceTask.java | 14 +-
.../runtime/standalone/StandaloneHerder.java | 6 +-
.../storage/OffsetStorageReaderImpl.java | 19 +-
.../copycat/storage/OffsetStorageWriter.java | 13 +-
.../kafka/copycat/util/ConnectorTaskId.java | 4 +-
.../kafka/copycat/util/FutureCallback.java | 6 +-
.../copycat/runtime/WorkerSinkTaskTest.java | 26 +-
.../copycat/runtime/WorkerSourceTaskTest.java | 16 +-
.../standalone/StandaloneHerderTest.java | 4 +-
.../storage/OffsetStorageWriterTest.java | 19 +-
settings.gradle | 2 +-
59 files changed, 2844 insertions(+), 4082 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/bin/kafka-run-class.sh
----------------------------------------------------------------------
diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh
index b689b2e..dd37df4 100755
--- a/bin/kafka-run-class.sh
+++ b/bin/kafka-run-class.sh
@@ -66,7 +66,7 @@ do
CLASSPATH=$CLASSPATH:$file
done
-for cc_pkg in "data" "api" "runtime" "file" "json"
+for cc_pkg in "api" "runtime" "file" "json"
do
for file in $base_dir/copycat/${cc_pkg}/build/libs/copycat-${cc_pkg}*.jar $base_dir/copycat/${cc_pkg}/build/dependant-libs/*.jar;
do
http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 3fd54cf..3a11670 100644
--- a/build.gradle
+++ b/build.gradle
@@ -29,7 +29,7 @@ buildscript {
def slf4jlog4j='org.slf4j:slf4j-log4j12:1.7.6'
def slf4japi="org.slf4j:slf4j-api:1.7.6"
-def junit='junit:junit:4.6'
+def junit='junit:junit:4.11'
def easymock='org.easymock:easymock:3.3.1'
def powermock='org.powermock:powermock-module-junit4:1.6.2'
def powermock_easymock='org.powermock:powermock-api-easymock:1.6.2'
@@ -214,7 +214,7 @@ for ( sv in ['2_10_5', '2_11_7'] ) {
}
}
-def copycatPkgs = ['copycat:data', 'copycat:api', 'copycat:runtime', 'copycat:json', 'copycat:file']
+def copycatPkgs = ['copycat:api', 'copycat:runtime', 'copycat:json', 'copycat:file']
def pkgs = ['clients', 'examples', 'contrib:hadoop-consumer', 'contrib:hadoop-producer', 'log4j-appender', 'tools'] + copycatPkgs
tasks.create(name: "jarCopycat", dependsOn: copycatPkgs.collect { it + ":jar" }) {}
@@ -558,56 +558,13 @@ project(':log4j-appender') {
test.dependsOn('checkstyleMain', 'checkstyleTest')
}
-project(':copycat:data') {
- apply plugin: 'checkstyle'
- archivesBaseName = "copycat-data"
-
- dependencies {
- compile project(':clients')
- compile "$slf4japi"
-
- testCompile "$junit"
- testRuntime "$slf4jlog4j"
- }
-
- task testJar(type: Jar) {
- classifier = 'test'
- from sourceSets.test.output
- }
-
- test {
- testLogging {
- events "passed", "skipped", "failed"
- exceptionFormat = 'full'
- }
- }
-
- javadoc {
- include "**/org/apache/kafka/copycat/data/*"
- }
-
- artifacts {
- archives testJar
- }
-
- configurations {
- archives.extendsFrom (testCompile)
- }
-
- /* FIXME Re-enable this with KAFKA-2367 when the placeholder data API is replaced
- checkstyle {
- configFile = new File(rootDir, "checkstyle/checkstyle.xml")
- }
- test.dependsOn('checkstyleMain', 'checkstyleTest') */
-}
-
project(':copycat:api') {
apply plugin: 'checkstyle'
archivesBaseName = "copycat-api"
dependencies {
- compile project(':copycat:data')
compile "$slf4japi"
+ compile project(':clients')
testCompile "$junit"
testRuntime "$slf4jlog4j"
http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index 156ec14..774701a 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -101,6 +101,14 @@ public class AbstractConfig {
return keys;
}
+ public Properties unusedProperties() {
+ Set<String> unusedKeys = this.unused();
+ Properties unusedProps = new Properties();
+ for (String key : unusedKeys)
+ unusedProps.put(key, this.originals().get(key));
+ return unusedProps;
+ }
+
public Map<String, Object> originals() {
Map<String, Object> copy = new HashMap<String, Object>();
copy.putAll(originals);
http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
index 92de6a9..ef2525e 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
@@ -139,11 +139,17 @@ public class Struct {
}
public ByteBuffer getBytes(Field field) {
- return (ByteBuffer) get(field);
+ Object result = get(field);
+ if (result instanceof byte[])
+ return ByteBuffer.wrap((byte[]) result);
+ return (ByteBuffer) result;
}
public ByteBuffer getBytes(String name) {
- return (ByteBuffer) get(name);
+ Object result = get(name);
+ if (result instanceof byte[])
+ return ByteBuffer.wrap((byte[]) result);
+ return (ByteBuffer) result;
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Connector.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Connector.java b/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Connector.java
index 2ea3c95..ae141c4 100644
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Connector.java
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Connector.java
@@ -99,7 +99,7 @@ public abstract class Connector {
/**
* Returns the Task implementation for this Connector.
*/
- public abstract Class<? extends Task> getTaskClass();
+ public abstract Class<? extends Task> taskClass();
/**
* Returns a set of configurations for Tasks based on the current configuration,
@@ -108,7 +108,7 @@ public abstract class Connector {
* @param maxTasks maximum number of configurations to generate
* @return configurations for Tasks
*/
- public abstract List<Properties> getTaskConfigs(int maxTasks);
+ public abstract List<Properties> taskConfigs(int maxTasks);
/**
* Stop this connector.
http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/api/src/main/java/org/apache/kafka/copycat/connector/CopycatRecord.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/connector/CopycatRecord.java b/copycat/api/src/main/java/org/apache/kafka/copycat/connector/CopycatRecord.java
index 576904a..0d3e8dc 100644
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/connector/CopycatRecord.java
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/connector/CopycatRecord.java
@@ -18,6 +18,7 @@
package org.apache.kafka.copycat.connector;
import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.copycat.data.Schema;
/**
* <p>
@@ -31,36 +32,48 @@ import org.apache.kafka.common.annotation.InterfaceStability;
public abstract class CopycatRecord {
private final String topic;
private final Integer kafkaPartition;
+ private final Schema keySchema;
private final Object key;
+ private final Schema valueSchema;
private final Object value;
- public CopycatRecord(String topic, Integer kafkaPartition, Object value) {
- this(topic, kafkaPartition, null, value);
+ public CopycatRecord(String topic, Integer kafkaPartition, Schema valueSchema, Object value) {
+ this(topic, kafkaPartition, null, null, valueSchema, value);
}
- public CopycatRecord(String topic, Integer kafkaPartition, Object key, Object value) {
+ public CopycatRecord(String topic, Integer kafkaPartition, Schema keySchema, Object key, Schema valueSchema, Object value) {
this.topic = topic;
this.kafkaPartition = kafkaPartition;
+ this.keySchema = keySchema;
this.key = key;
+ this.valueSchema = valueSchema;
this.value = value;
}
- public String getTopic() {
+ public String topic() {
return topic;
}
- public Integer getKafkaPartition() {
+ public Integer kafkaPartition() {
return kafkaPartition;
}
- public Object getKey() {
+ public Object key() {
return key;
}
- public Object getValue() {
+ public Schema keySchema() {
+ return keySchema;
+ }
+
+ public Object value() {
return value;
}
+ public Schema valueSchema() {
+ return valueSchema;
+ }
+
@Override
public String toString() {
return "CopycatRecord{" +
@@ -80,12 +93,16 @@ public abstract class CopycatRecord {
CopycatRecord that = (CopycatRecord) o;
- if (key != null ? !key.equals(that.key) : that.key != null)
- return false;
if (kafkaPartition != null ? !kafkaPartition.equals(that.kafkaPartition) : that.kafkaPartition != null)
return false;
if (topic != null ? !topic.equals(that.topic) : that.topic != null)
return false;
+ if (keySchema != null ? !keySchema.equals(that.keySchema) : that.keySchema != null)
+ return false;
+ if (key != null ? !key.equals(that.key) : that.key != null)
+ return false;
+ if (valueSchema != null ? !valueSchema.equals(that.valueSchema) : that.valueSchema != null)
+ return false;
if (value != null ? !value.equals(that.value) : that.value != null)
return false;
@@ -96,7 +113,9 @@ public abstract class CopycatRecord {
public int hashCode() {
int result = topic != null ? topic.hashCode() : 0;
result = 31 * result + (kafkaPartition != null ? kafkaPartition.hashCode() : 0);
+ result = 31 * result + (keySchema != null ? keySchema.hashCode() : 0);
result = 31 * result + (key != null ? key.hashCode() : 0);
+ result = 31 * result + (valueSchema != null ? valueSchema.hashCode() : 0);
result = 31 * result + (value != null ? value.hashCode() : 0);
return result;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/api/src/main/java/org/apache/kafka/copycat/data/CopycatSchema.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/data/CopycatSchema.java b/copycat/api/src/main/java/org/apache/kafka/copycat/data/CopycatSchema.java
new file mode 100644
index 0000000..c823f28
--- /dev/null
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/data/CopycatSchema.java
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.data;
+
+import org.apache.kafka.copycat.errors.DataException;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+public class CopycatSchema implements Schema {
+ private static final Map<Type, Class<?>> SCHEMA_TYPE_CLASSES = new HashMap<>();
+ static {
+ SCHEMA_TYPE_CLASSES.put(Type.INT8, Byte.class);
+ SCHEMA_TYPE_CLASSES.put(Type.INT16, Short.class);
+ SCHEMA_TYPE_CLASSES.put(Type.INT32, Integer.class);
+ SCHEMA_TYPE_CLASSES.put(Type.INT64, Long.class);
+ SCHEMA_TYPE_CLASSES.put(Type.FLOAT32, Float.class);
+ SCHEMA_TYPE_CLASSES.put(Type.FLOAT64, Double.class);
+ SCHEMA_TYPE_CLASSES.put(Type.BOOLEAN, Boolean.class);
+ SCHEMA_TYPE_CLASSES.put(Type.STRING, String.class);
+ SCHEMA_TYPE_CLASSES.put(Type.ARRAY, List.class);
+ SCHEMA_TYPE_CLASSES.put(Type.MAP, Map.class);
+ SCHEMA_TYPE_CLASSES.put(Type.STRUCT, Struct.class);
+ // Bytes are handled as a special case
+ }
+
+ // The type of the field
+ private final Type type;
+ private final boolean optional;
+ private final Object defaultValue;
+
+ private final List<Field> fields;
+ private final Map<String, Field> fieldsByName;
+
+ private final Schema keySchema;
+ private final Schema valueSchema;
+
+ // Optional name and version provide a built-in way to indicate what type of data is included. Most
+ // useful for structs to indicate the semantics of the struct and map it to some existing underlying
+ // serializer-specific schema. However, can also be useful in specifying other logical types (e.g. a set is an array
+ // with additional constraints).
+ private final String name;
+ private final Integer version;
+ // Optional human readable documentation describing this schema.
+ private final String doc;
+
+ /**
+ * Construct a Schema. Most users should not construct schemas manually, preferring {@link SchemaBuilder} instead.
+ */
+ public CopycatSchema(Type type, boolean optional, Object defaultValue, String name, Integer version, String doc, List<Field> fields, Schema keySchema, Schema valueSchema) {
+ this.type = type;
+ this.optional = optional;
+ this.defaultValue = defaultValue;
+ this.name = name;
+ this.version = version;
+ this.doc = doc;
+
+ this.fields = fields;
+ if (this.fields != null && this.type == Type.STRUCT) {
+ this.fieldsByName = new HashMap<>();
+ for (Field field : fields)
+ fieldsByName.put(field.name(), field);
+ } else {
+ this.fieldsByName = null;
+ }
+
+ this.keySchema = keySchema;
+ this.valueSchema = valueSchema;
+ }
+
+ @Override
+ public Type type() {
+ return type;
+ }
+
+ @Override
+ public boolean isOptional() {
+ return optional;
+ }
+
+ @Override
+ public Object defaultValue() {
+ return defaultValue;
+ }
+
+ @Override
+ public String name() {
+ return name;
+ }
+
+ @Override
+ public Integer version() {
+ return version;
+ }
+
+ @Override
+ public String doc() {
+ return doc;
+ }
+
+
+
+ @Override
+ public List<Field> fields() {
+ if (type != Type.STRUCT)
+ throw new DataException("Cannot list fields on non-struct type");
+ return fields;
+ }
+
+ public Field field(String fieldName) {
+ if (type != Type.STRUCT)
+ throw new DataException("Cannot look up fields on non-struct type");
+ return fieldsByName.get(fieldName);
+ }
+
+ @Override
+ public Schema keySchema() {
+ if (type != Type.MAP)
+ throw new DataException("Cannot look up key schema on non-map type");
+ return keySchema;
+ }
+
+ @Override
+ public Schema valueSchema() {
+ if (type != Type.MAP && type != Type.ARRAY)
+ throw new DataException("Cannot look up value schema on non-array and non-map type");
+ return valueSchema;
+ }
+
+
+
+ /**
+ * Validate that the value can be used with the schema, i.e. that its type matches the schema type and nullability
+ * requirements. Throws a DataException if the value is invalid. Returns normally if the value is valid.
+ * @param schema Schema to test
+ * @param value value to test
+ */
+ public static void validateValue(Schema schema, Object value) {
+ if (value == null) {
+ if (!schema.isOptional())
+ throw new DataException("Invalid value: null used for required field");
+ else
+ return;
+ }
+
+ // Special case for bytes. byte[] causes problems because it doesn't handle equals()/hashCode() like we want
+ // objects to, so we support both byte[] and ByteBuffer. Using plain byte[] can cause those methods to fail, so
+ // ByteBuffers are recommended
+ if (schema.type() == Type.BYTES && (value instanceof byte[] || value instanceof ByteBuffer))
+ return;
+ Class<?> expectedClass = SCHEMA_TYPE_CLASSES.get(schema.type());
+ if (expectedClass == null || !expectedClass.isInstance(value))
+ throw new DataException("Invalid value: expected " + expectedClass + " for type " + schema.type() + " but tried to use " + value.getClass());
+
+ switch (schema.type()) {
+ case STRUCT:
+ Struct struct = (Struct) value;
+ if (!struct.schema().equals(schema))
+ throw new DataException("Struct schemas do not match.");
+ struct.validate();
+ break;
+ case ARRAY:
+ List<?> array = (List<?>) value;
+ for (Object entry : array)
+ validateValue(schema.valueSchema(), entry);
+ break;
+ case MAP:
+ Map<?, ?> map = (Map<?, ?>) value;
+ for (Map.Entry<?, ?> entry : map.entrySet()) {
+ validateValue(schema.keySchema(), entry.getKey());
+ validateValue(schema.valueSchema(), entry.getValue());
+ }
+ break;
+ }
+ }
+
+ /**
+ * Validate that the value can be used for this schema, i.e. that its type matches the schema type and optional
+ * requirements. Throws a DataException if the value is invalid.
+ * @param value the value to validate
+ */
+ public void validateValue(Object value) {
+ validateValue(this, value);
+ }
+
+ @Override
+ public CopycatSchema schema() {
+ return this;
+ }
+
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ CopycatSchema schema = (CopycatSchema) o;
+ return Objects.equals(optional, schema.optional) &&
+ Objects.equals(type, schema.type) &&
+ Objects.equals(defaultValue, schema.defaultValue) &&
+ Objects.equals(fields, schema.fields) &&
+ Objects.equals(keySchema, schema.keySchema) &&
+ Objects.equals(valueSchema, schema.valueSchema) &&
+ Objects.equals(name, schema.name) &&
+ Objects.equals(version, schema.version) &&
+ Objects.equals(doc, schema.doc);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(type, optional, defaultValue, fields, keySchema, valueSchema, name, version, doc);
+ }
+
+ @Override
+ public String toString() {
+ if (name != null)
+ return "Schema{" + name + ":" + type + "}";
+ else
+ return "Schema{" + type + "}";
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/api/src/main/java/org/apache/kafka/copycat/data/Field.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/data/Field.java b/copycat/api/src/main/java/org/apache/kafka/copycat/data/Field.java
new file mode 100644
index 0000000..c71cdb4
--- /dev/null
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/data/Field.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.data;
+
+import java.util.Objects;
+
+/**
+ * <p>
+ * A field in a {@link Struct}, consisting of a field name, index, and {@link Schema} for the field value.
+ * </p>
+ */
+public class Field {
+ private final String name;
+ private final int index;
+ private final Schema schema;
+
+ public Field(String name, int index, Schema schema) {
+ this.name = name;
+ this.index = index;
+ this.schema = schema;
+ }
+
+ /**
+ * Get the name of this field.
+ * @return the name of this field
+ */
+ public String name() {
+ return name;
+ }
+
+
+ /**
+ * Get the index of this field within the struct.
+ * @return the index of this field
+ */
+ public int index() {
+ return index;
+ }
+
+ /**
+ * Get the schema of this field
+ * @return the schema of values of this field
+ */
+ public Schema schema() {
+ return schema;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ Field field = (Field) o;
+ return Objects.equals(index, field.index) &&
+ Objects.equals(name, field.name) &&
+ Objects.equals(schema, field.schema);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(name, index, schema);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/api/src/main/java/org/apache/kafka/copycat/data/Schema.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/data/Schema.java b/copycat/api/src/main/java/org/apache/kafka/copycat/data/Schema.java
new file mode 100644
index 0000000..5ceb57d
--- /dev/null
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/data/Schema.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.data;
+
+import java.util.List;
+
+/**
+ * <p>
+ * Definition of an abstract data type. Data types can be primitive types (integer types, floating point types,
+ * boolean, strings, and bytes) or complex types (typed arrays, maps with one key schema and value schema,
+ * and structs that have a fixed set of field names each with an associated value schema). Any type can be specified
+ * as optional, allowing it to be omitted (resulting in null values when it is missing) and can specify a default
+ * value.
+ * </p>
+ * <p>
+ * All schemas may have some associated metadata: a name, version, and documentation. These are all considered part
+ * of the schema itself and included when comparing schemas. Besides adding important metadata, these fields enable
+ * the specification of logical types that specify additional constraints and semantics (e.g. UNIX timestamps are
+ * just an int64, but the user needs to know about the additional semantics to interpret it properly).
+ * </p>
+ * <p>
+ * Schemas can be created directly, but in most cases using {@link SchemaBuilder} will be simpler.
+ * </p>
+ */
+public interface Schema {
+ /**
+ * The type of a schema. These only include the core types; logical types must be determined by checking the schema name.
+ */
+ enum Type {
+ INT8, INT16, INT32, INT64, FLOAT32, FLOAT64, BOOLEAN, STRING, BYTES, ARRAY, MAP, STRUCT;
+
+ private String name;
+
+ Type() {
+ this.name = this.name().toLowerCase();
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public boolean isPrimitive() {
+ switch (this) {
+ case INT8:
+ case INT16:
+ case INT32:
+ case INT64:
+ case FLOAT32:
+ case FLOAT64:
+ case BOOLEAN:
+ case STRING:
+ case BYTES:
+ return true;
+ }
+ return false;
+ }
+ }
+
+
+ Schema INT8_SCHEMA = SchemaBuilder.int8().build();
+ Schema INT16_SCHEMA = SchemaBuilder.int16().build();
+ Schema INT32_SCHEMA = SchemaBuilder.int32().build();
+ Schema INT64_SCHEMA = SchemaBuilder.int64().build();
+ Schema FLOAT32_SCHEMA = SchemaBuilder.float32().build();
+ Schema FLOAT64_SCHEMA = SchemaBuilder.float64().build();
+ Schema BOOLEAN_SCHEMA = SchemaBuilder.bool().build();
+ Schema STRING_SCHEMA = SchemaBuilder.string().build();
+ Schema BYTES_SCHEMA = SchemaBuilder.bytes().build();
+
+ Schema OPTIONAL_INT8_SCHEMA = SchemaBuilder.int8().optional().build();
+ Schema OPTIONAL_INT16_SCHEMA = SchemaBuilder.int16().optional().build();
+ Schema OPTIONAL_INT32_SCHEMA = SchemaBuilder.int32().optional().build();
+ Schema OPTIONAL_INT64_SCHEMA = SchemaBuilder.int64().optional().build();
+ Schema OPTIONAL_FLOAT32_SCHEMA = SchemaBuilder.float32().optional().build();
+ Schema OPTIONAL_FLOAT64_SCHEMA = SchemaBuilder.float64().optional().build();
+ Schema OPTIONAL_BOOLEAN_SCHEMA = SchemaBuilder.bool().optional().build();
+ Schema OPTIONAL_STRING_SCHEMA = SchemaBuilder.string().optional().build();
+ Schema OPTIONAL_BYTES_SCHEMA = SchemaBuilder.bytes().optional().build();
+
+
+ /**
+ * @return the type of this schema
+ */
+ Type type();
+
+ /**
+ * @return true if this field is optional, false otherwise
+ */
+ boolean isOptional();
+
+ /**
+ * @return the default value for this schema
+ */
+ Object defaultValue();
+
+ /**
+ * @return the name of this schema
+ */
+ String name();
+
+ /**
+ * Get the optional version of the schema. If a version is included, newer versions *must* be larger than older ones.
+ * @return the version of this schema
+ */
+ Integer version();
+
+ /**
+ * @return the documentation for this schema
+ */
+ String doc();
+
+ /**
+ * Get the key schema for this map schema. Throws a DataException if this schema is not a map.
+ * @return the key schema
+ */
+ Schema keySchema();
+
+ /**
+ * Get the value schema for this map or array schema. Throws a DataException if this schema is not a map or array.
+ * @return the value schema
+ */
+ Schema valueSchema();
+
+ /**
+ * Get the list of fields for this Schema. Throws a DataException if this schema is not a struct.
+ * @return the list of fields for this Schema
+ */
+ List<Field> fields();
+
+ /**
+ * Get a field for this Schema by name. Throws a DataException if this schema is not a struct.
+ * @param fieldName the name of the field to look up
+ * @return the Field object for the specified field, or null if there is no field with the given name
+ */
+ Field field(String fieldName);
+
+ /**
+ * Return a concrete instance of the {@link Schema}
+ * @return the {@link Schema}
+ */
+ Schema schema();
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaAndValue.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaAndValue.java b/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaAndValue.java
new file mode 100644
index 0000000..368a8cf
--- /dev/null
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaAndValue.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.data;
+
+import java.util.Objects;
+
+public class SchemaAndValue {
+ private final Schema schema;
+ private final Object value;
+
+ public static final SchemaAndValue NULL = new SchemaAndValue(null, null);
+
+ public SchemaAndValue(Schema schema, Object value) {
+ this.value = value;
+ this.schema = schema;
+ }
+
+ public Schema schema() {
+ return schema;
+ }
+
+ public Object value() {
+ return value;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ SchemaAndValue that = (SchemaAndValue) o;
+ return Objects.equals(schema, that.schema) &&
+ Objects.equals(value, that.value);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(schema, value);
+ }
+
+ @Override
+ public String toString() {
+ return "SchemaAndValue{" +
+ "schema=" + schema +
+ ", value=" + value +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaBuilder.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaBuilder.java b/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaBuilder.java
new file mode 100644
index 0000000..fe9d474
--- /dev/null
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaBuilder.java
@@ -0,0 +1,371 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.data;
+
+import org.apache.kafka.copycat.errors.DataException;
+import org.apache.kafka.copycat.errors.SchemaBuilderException;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * <p>
+ * SchemaBuilder provides a fluent API for constructing {@link Schema} objects. It allows you to set each of the
+ * properties for the schema and each call returns the SchemaBuilder so the calls can be chained. When nested types
+ * are required, use one of the predefined schemas from {@link Schema} or use a second SchemaBuilder inline.
+ * </p>
+ * <p>
+ * Here is an example of building a struct schema:
+ * <pre>
+ * Schema dateSchema = SchemaBuilder.struct()
+ * .name("com.example.CalendarDate").version(2).doc("A calendar date including month, day, and year.")
+ * .field("month", Schema.STRING_SCHEMA)
+ * .field("day", Schema.INT8_SCHEMA)
+ * .field("year", Schema.INT16_SCHEMA)
+ * .build();
+ * </pre>
+ * </p>
+ * <p>
+ * Here is an example of using a second SchemaBuilder to construct complex, nested types:
+ * <pre>
+ * Schema userListSchema = SchemaBuilder.array(
+ * SchemaBuilder.struct().name("com.example.User").field("username", Schema.STRING_SCHEMA).field("id", Schema.INT64_SCHEMA).build()
+ * ).build();
+ * </pre>
+ * </p>
+ */
+public class SchemaBuilder implements Schema {
+    // Property names used in the messages produced by checkNull/checkNotNull.
+    private static final String TYPE_FIELD = "type";
+    private static final String OPTIONAL_FIELD = "optional";
+    private static final String DEFAULT_FIELD = "default";
+    private static final String NAME_FIELD = "name";
+    private static final String VERSION_FIELD = "version";
+    private static final String DOC_FIELD = "doc";
+
+    private final Type type;
+    // null means "not set explicitly yet"; isOptional() reports that state as required (false).
+    private Boolean optional = null;
+    private Object defaultValue = null;
+
+    // Created lazily by the first field(String, Schema) call; stays null until then.
+    private List<Field> fields = null;
+    // Only assigned by the map(Schema, Schema) factory.
+    private Schema keySchema = null;
+    // Only assigned by the array(Schema) and map(Schema, Schema) factories.
+    private Schema valueSchema = null;
+
+    private String name;
+    private Integer version;
+    // Optional human readable documentation describing this schema.
+    private String doc;
+
+    private SchemaBuilder(Type type) {
+        this.type = type;
+    }
+
+    // Common/metadata fields
+
+    /**
+     * @return true only if {@link #optional()} has been called; false if {@link #required()} was called or
+     *         neither has been called yet
+     */
+    @Override
+    public boolean isOptional() {
+        return optional == null ? false : optional;
+    }
+
+    /**
+     * Set this schema as optional. Throws a {@link SchemaBuilderException} if {@link #optional()} or
+     * {@link #required()} has already been called.
+     * @return the SchemaBuilder
+     */
+    public SchemaBuilder optional() {
+        checkNull(OPTIONAL_FIELD, optional);
+        optional = true;
+        return this;
+    }
+
+    /**
+     * Set this schema as required. This is the default, but this method can be used to make this choice explicit.
+     * Throws a {@link SchemaBuilderException} if {@link #optional()} or {@link #required()} has already been called.
+     * @return the SchemaBuilder
+     */
+    public SchemaBuilder required() {
+        checkNull(OPTIONAL_FIELD, optional);
+        optional = false;
+        return this;
+    }
+
+    /**
+     * @return the default value set so far, or null if none has been set
+     */
+    @Override
+    public Object defaultValue() {
+        return defaultValue;
+    }
+
+    /**
+     * Set the default value for this schema. The value is validated against the schema type, throwing a
+     * {@link SchemaBuilderException} if it does not match. Also throws a {@link SchemaBuilderException} if a
+     * default value has already been set or if this builder has no type.
+     * @param value the default value
+     * @return the SchemaBuilder
+     */
+    public SchemaBuilder defaultValue(Object value) {
+        checkNull(DEFAULT_FIELD, defaultValue);
+        checkNotNull(TYPE_FIELD, type, DEFAULT_FIELD);
+        try {
+            // The builder is itself a Schema, so it can be validated against in its current state.
+            CopycatSchema.validateValue(this, value);
+        } catch (DataException e) {
+            throw new SchemaBuilderException("Invalid default value", e);
+        }
+        defaultValue = value;
+        return this;
+    }
+
+    /**
+     * @return the schema name set so far, or null if none has been set
+     */
+    @Override
+    public String name() {
+        return name;
+    }
+
+    /**
+     * Set the name of this schema. Throws a {@link SchemaBuilderException} if a name has already been set.
+     * @param name the schema name
+     * @return the SchemaBuilder
+     */
+    public SchemaBuilder name(String name) {
+        checkNull(NAME_FIELD, this.name);
+        this.name = name;
+        return this;
+    }
+
+    /**
+     * @return the schema version set so far, or null if none has been set
+     */
+    @Override
+    public Integer version() {
+        return version;
+    }
+
+    /**
+     * Set the version of this schema. Schema versions are integers which, if provided, must indicate which schema is
+     * newer and which is older by their ordering. Throws a {@link SchemaBuilderException} if a version has already
+     * been set.
+     * @param version the schema version
+     * @return the SchemaBuilder
+     */
+    public SchemaBuilder version(Integer version) {
+        checkNull(VERSION_FIELD, this.version);
+        this.version = version;
+        return this;
+    }
+
+    /**
+     * @return the documentation set so far, or null if none has been set
+     */
+    @Override
+    public String doc() {
+        return doc;
+    }
+
+    /**
+     * Set the documentation for this schema. Throws a {@link SchemaBuilderException} if the documentation has
+     * already been set.
+     * @param doc the documentation
+     * @return the SchemaBuilder
+     */
+    public SchemaBuilder doc(String doc) {
+        checkNull(DOC_FIELD, this.doc);
+        this.doc = doc;
+        return this;
+    }
+
+    /**
+     * @return the type this builder was created with
+     */
+    @Override
+    public Type type() {
+        return type;
+    }
+
+    /**
+     * Create a SchemaBuilder for the specified type.
+     *
+     * Usually it will be simpler to use one of the variants like {@link #string()} or {@link #struct()}, but this form
+     * can be useful when generating schemas dynamically.
+     *
+     * @param type the schema type
+     * @return a new SchemaBuilder
+     */
+    public static SchemaBuilder type(Type type) {
+        return new SchemaBuilder(type);
+    }
+
+    // Primitive types
+
+    /**
+     * @return a new {@link Type#INT8} SchemaBuilder
+     */
+    public static SchemaBuilder int8() {
+        return new SchemaBuilder(Type.INT8);
+    }
+
+    /**
+     * @return a new {@link Type#INT16} SchemaBuilder
+     */
+    public static SchemaBuilder int16() {
+        return new SchemaBuilder(Type.INT16);
+    }
+
+    /**
+     * @return a new {@link Type#INT32} SchemaBuilder
+     */
+    public static SchemaBuilder int32() {
+        return new SchemaBuilder(Type.INT32);
+    }
+
+    /**
+     * @return a new {@link Type#INT64} SchemaBuilder
+     */
+    public static SchemaBuilder int64() {
+        return new SchemaBuilder(Type.INT64);
+    }
+
+    /**
+     * @return a new {@link Type#FLOAT32} SchemaBuilder
+     */
+    public static SchemaBuilder float32() {
+        return new SchemaBuilder(Type.FLOAT32);
+    }
+
+    /**
+     * @return a new {@link Type#FLOAT64} SchemaBuilder
+     */
+    public static SchemaBuilder float64() {
+        return new SchemaBuilder(Type.FLOAT64);
+    }
+
+    /**
+     * @return a new {@link Type#BOOLEAN} SchemaBuilder
+     */
+    public static SchemaBuilder bool() {
+        return new SchemaBuilder(Type.BOOLEAN);
+    }
+
+    /**
+     * @return a new {@link Type#STRING} SchemaBuilder
+     */
+    public static SchemaBuilder string() {
+        return new SchemaBuilder(Type.STRING);
+    }
+
+    /**
+     * @return a new {@link Type#BYTES} SchemaBuilder
+     */
+    public static SchemaBuilder bytes() {
+        return new SchemaBuilder(Type.BYTES);
+    }
+
+    // Structs
+
+    /**
+     * @return a new {@link Type#STRUCT} SchemaBuilder
+     */
+    public static SchemaBuilder struct() {
+        return new SchemaBuilder(Type.STRUCT);
+    }
+
+    /**
+     * Add a field to this struct schema. Throws a SchemaBuilderException if this is not a struct schema.
+     * The field's index is its position in the order in which fields are added.
+     * @param fieldName the name of the field to add
+     * @param fieldSchema the Schema for the field's value
+     * @return the SchemaBuilder
+     */
+    public SchemaBuilder field(String fieldName, Schema fieldSchema) {
+        if (type != Type.STRUCT)
+            throw new SchemaBuilderException("Cannot create fields on type " + type);
+        if (fields == null)
+            fields = new ArrayList<>();
+        int fieldIndex = fields.size();
+        fields.add(new Field(fieldName, fieldIndex, fieldSchema));
+        return this;
+    }
+
+    /**
+     * Get the list of fields for this Schema. Throws a DataException if this schema is not a struct.
+     * @return the builder's internal list of fields, or null if no field has been added yet
+     */
+    @Override
+    public List<Field> fields() {
+        if (type != Type.STRUCT)
+            throw new DataException("Cannot list fields on non-struct type");
+        return fields;
+    }
+
+    /**
+     * Look up a field of this struct schema by name. Throws a DataException if this schema is not a struct.
+     * @param fieldName the name of the field to look up
+     * @return the first field added with that name, or null if there is no such field
+     */
+    @Override
+    public Field field(String fieldName) {
+        if (type != Type.STRUCT)
+            throw new DataException("Cannot look up fields on non-struct type");
+        // The list is only created by the first field(String, Schema) call, so there may be nothing to search.
+        if (fields == null)
+            return null;
+        for (Field field : fields) {
+            String candidate = field.name();
+            // Compare names by value; == would only match when both references are the very same String object.
+            if (candidate == null ? fieldName == null : candidate.equals(fieldName))
+                return field;
+        }
+        return null;
+    }
+
+    // Maps & Arrays
+
+    /**
+     * @param valueSchema the schema for elements of the array
+     * @return a new {@link Type#ARRAY} SchemaBuilder
+     */
+    public static SchemaBuilder array(Schema valueSchema) {
+        SchemaBuilder builder = new SchemaBuilder(Type.ARRAY);
+        builder.valueSchema = valueSchema;
+        return builder;
+    }
+
+    /**
+     * @param keySchema the schema for keys in the map
+     * @param valueSchema the schema for values in the map
+     * @return a new {@link Type#MAP} SchemaBuilder
+     */
+    public static SchemaBuilder map(Schema keySchema, Schema valueSchema) {
+        SchemaBuilder builder = new SchemaBuilder(Type.MAP);
+        builder.keySchema = keySchema;
+        builder.valueSchema = valueSchema;
+        return builder;
+    }
+
+    /**
+     * @return the key schema given to {@link #map(Schema, Schema)}, or null for builders created any other way
+     */
+    @Override
+    public Schema keySchema() {
+        return keySchema;
+    }
+
+    /**
+     * @return the value schema given to {@link #array(Schema)} or {@link #map(Schema, Schema)}, or null for
+     *         builders created any other way
+     */
+    @Override
+    public Schema valueSchema() {
+        return valueSchema;
+    }
+
+    /**
+     * Build the Schema using the current settings
+     * @return the {@link Schema}
+     */
+    public Schema build() {
+        return new CopycatSchema(type, isOptional(), defaultValue, name, version, doc, fields, keySchema, valueSchema);
+    }
+
+    /**
+     * Return a concrete instance of the {@link Schema} specified by this builder
+     * @return the {@link Schema}
+     */
+    @Override
+    public Schema schema() {
+        return build();
+    }
+
+    // Guards a setter: fails if the property already holds a value.
+    private static void checkNull(String fieldName, Object val) {
+        if (val != null)
+            throw new SchemaBuilderException("Invalid SchemaBuilder call: " + fieldName + " has already been set.");
+    }
+
+    // Guards a setter that depends on another property: fails if that property has no value.
+    private static void checkNotNull(String fieldName, Object val, String fieldToSet) {
+        if (val == null)
+            throw new SchemaBuilderException("Invalid SchemaBuilder call: " + fieldName + " must be specified to set " + fieldToSet);
+    }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/api/src/main/java/org/apache/kafka/copycat/data/Struct.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/data/Struct.java b/copycat/api/src/main/java/org/apache/kafka/copycat/data/Struct.java
new file mode 100644
index 0000000..bd757c4
--- /dev/null
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/data/Struct.java
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.data;
+
+import org.apache.kafka.copycat.errors.DataException;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * <p>
+ * A structured record containing a set of named fields with values, each field using an independent {@link Schema}.
+ * Struct objects must specify a complete {@link Schema} up front, and only fields specified in the Schema may be set.
+ * </p>
+ * <p>
+ * The Struct's {@link #put(String, Object)} method returns the Struct itself to provide a fluent API for constructing
+ * complete objects:
+ * <pre>
+ * Schema schema = SchemaBuilder.struct().name("com.example.Person")
+ * .field("name", Schema.STRING_SCHEMA).field("age", Schema.INT32_SCHEMA).build()
+ * Struct struct = new Struct(schema).put("name", "Bobby McGee").put("age", 21)
+ * </pre>
+ * </p>
+ */
+public class Struct {
+
+ private final Schema schema;
+ private final Object[] values;
+
+ /**
+ * Create a new Struct for this {@link Schema}
+ * @param schema the {@link Schema} for the Struct
+ */
+ public Struct(Schema schema) {
+ if (schema.type() != Schema.Type.STRUCT)
+ throw new DataException("Not a struct schema: " + schema);
+ this.schema = schema;
+ this.values = new Object[schema.fields().size()];
+ }
+
+ /**
+ * Get the schema for this Struct.
+ * @return the Struct's schema
+ */
+ public Schema schema() {
+ return schema;
+ }
+
+ /**
+ * Get the value of a field, returning the default value if no value has been set yet and a default value is specified
+ * in the field's schema. Because this handles fields of all types, the value is returned as an {@link Object} and
+ * must be cast to a more specific type.
+ * @param fieldName the field name to lookup
+ * @return the value for the field
+ */
+ public Object get(String fieldName) {
+ Field field = lookupField(fieldName);
+ return get(field);
+ }
+
+ /**
+ * Get the value of a field, returning the default value if no value has been set yet and a default value is specified
+ * in the field's schema. Because this handles fields of all types, the value is returned as an {@link Object} and
+ * must be cast to a more specific type.
+ * @param field the field to lookup
+ * @return the value for the field
+ */
+ public Object get(Field field) {
+ Object val = values[field.index()];
+ if (val == null && schema.defaultValue() != null) {
+ val = schema.defaultValue();
+ }
+ return val;
+ }
+
+ /**
+ * Get the underlying raw value for the field without accounting for default values.
+ * @param fieldName the field to get the value of
+ * @return the raw value
+ */
+ public Object getWithoutDefault(String fieldName) {
+ Field field = lookupField(fieldName);
+ return values[field.index()];
+ }
+
+ // Note that all getters have to have boxed return types since the fields might be optional
+
+ /**
+ * Equivalent to calling {@link #get(String)} and casting the result to a Byte.
+ */
+ public Byte getInt8(String fieldName) {
+ return (Byte) getCheckType(fieldName, Schema.Type.INT8);
+ }
+
+ /**
+ * Equivalent to calling {@link #get(String)} and casting the result to a Short.
+ */
+ public Short getInt16(String fieldName) {
+ return (Short) getCheckType(fieldName, Schema.Type.INT16);
+ }
+
+ /**
+ * Equivalent to calling {@link #get(String)} and casting the result to a Integer.
+ */
+ public Integer getInt32(String fieldName) {
+ return (Integer) getCheckType(fieldName, Schema.Type.INT32);
+ }
+
+ /**
+ * Equivalent to calling {@link #get(String)} and casting the result to a Long.
+ */
+ public Long getInt64(String fieldName) {
+ return (Long) getCheckType(fieldName, Schema.Type.INT64);
+ }
+
+ /**
+ * Equivalent to calling {@link #get(String)} and casting the result to a Float.
+ */
+ public Float getFloat32(String fieldName) {
+ return (Float) getCheckType(fieldName, Schema.Type.FLOAT32);
+ }
+
+ /**
+ * Equivalent to calling {@link #get(String)} and casting the result to a Double.
+ */
+ public Double getFloat64(String fieldName) {
+ return (Double) getCheckType(fieldName, Schema.Type.FLOAT64);
+ }
+
+ /**
+ * Equivalent to calling {@link #get(String)} and casting the result to a Boolean.
+ */
+ public Boolean getBoolean(String fieldName) {
+ return (Boolean) getCheckType(fieldName, Schema.Type.BOOLEAN);
+ }
+
+ /**
+ * Equivalent to calling {@link #get(String)} and casting the result to a String.
+ */
+ public String getString(String fieldName) {
+ return (String) getCheckType(fieldName, Schema.Type.STRING);
+ }
+
+ /**
+ * Equivalent to calling {@link #get(String)} and casting the result to a byte[].
+ */
+ public byte[] getBytes(String fieldName) {
+ Object bytes = getCheckType(fieldName, Schema.Type.BYTES);
+ if (bytes instanceof ByteBuffer)
+ return ((ByteBuffer) bytes).array();
+ return (byte[]) bytes;
+ }
+
+ /**
+ * Equivalent to calling {@link #get(String)} and casting the result to a List.
+ */
+ public <T> List<T> getArray(String fieldName) {
+ return (List<T>) getCheckType(fieldName, Schema.Type.ARRAY);
+ }
+
+ /**
+ * Equivalent to calling {@link #get(String)} and casting the result to a Map.
+ */
+ public <K, V> Map<K, V> getMap(String fieldName) {
+ return (Map<K, V>) getCheckType(fieldName, Schema.Type.MAP);
+ }
+
+ /**
+ * Equivalent to calling {@link #get(String)} and casting the result to a Struct.
+ */
+ public Struct getStruct(String fieldName) {
+ return (Struct) getCheckType(fieldName, Schema.Type.STRUCT);
+ }
+
+ /**
+ * Set the value of a field. Validates the value, throwing a {@link DataException} if it does not match the field's
+ * {@link Schema}.
+ * @param fieldName the name of the field to set
+ * @param value the value of the field
+ * @return the Struct, to allow chaining of {@link #put(String, Object)} calls
+ */
+ public Struct put(String fieldName, Object value) {
+ Field field = lookupField(fieldName);
+ return put(field, value);
+ }
+
+ /**
+ * Set the value of a field. Validates the value, throwing a {@link DataException} if it does not match the field's
+ * {@link Schema}.
+ * @param field the field to set
+ * @param value the value of the field
+ * @return the Struct, to allow chaining of {@link #put(String, Object)} calls
+ */
+ public Struct put(Field field, Object value) {
+ CopycatSchema.validateValue(field.schema(), value);
+ values[field.index()] = value;
+ return this;
+ }
+
+
+ /**
+ * Validates that this struct has filled in all the necessary data with valid values. For required fields
+ * without defaults, this validates that a value has been set and has matching types/schemas. If any validation
+ * fails, throws a DataException.
+ */
+ public void validate() {
+ for (Field field : schema.fields()) {
+ Schema fieldSchema = field.schema();
+ Object value = values[field.index()];
+ if (value == null && (fieldSchema.isOptional() || fieldSchema.defaultValue() != null))
+ continue;
+ CopycatSchema.validateValue(fieldSchema, value);
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ Struct struct = (Struct) o;
+ return Objects.equals(schema, struct.schema) &&
+ Arrays.equals(values, struct.values);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(schema, Arrays.hashCode(values));
+ }
+
+ private Field lookupField(String fieldName) {
+ Field field = schema.field(fieldName);
+ if (field == null)
+ throw new DataException(fieldName + " is not a valid field name");
+ return field;
+ }
+
+ // Get the field's value, but also check that the field matches the specified type, throwing an exception if it doesn't.
+ // Used to implement the get*() methods that return typed data instead of Object
+ private Object getCheckType(String fieldName, Schema.Type type) {
+ Field field = lookupField(fieldName);
+ if (field.schema().type() != type)
+ throw new DataException("Field '" + fieldName + "' is not of type " + type);
+ return values[field.index()];
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/api/src/main/java/org/apache/kafka/copycat/errors/DataException.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/errors/DataException.java b/copycat/api/src/main/java/org/apache/kafka/copycat/errors/DataException.java
new file mode 100644
index 0000000..11139a4
--- /dev/null
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/errors/DataException.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.errors;
+
+/**
+ * Base class for all Copycat data API exceptions. Each constructor forwards its arguments unchanged to the
+ * matching {@link CopycatException} constructor.
+ */
+public class DataException extends CopycatException {
+    /**
+     * @param message the message forwarded to the superclass
+     */
+    public DataException(String message) {
+        super(message);
+    }
+
+    /**
+     * @param message the message forwarded to the superclass
+     * @param cause the throwable forwarded to the superclass
+     */
+    public DataException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * @param cause the throwable forwarded to the superclass
+     */
+    public DataException(Throwable cause) {
+        super(cause);
+    }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/api/src/main/java/org/apache/kafka/copycat/errors/SchemaBuilderException.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/errors/SchemaBuilderException.java b/copycat/api/src/main/java/org/apache/kafka/copycat/errors/SchemaBuilderException.java
new file mode 100644
index 0000000..b5a93af
--- /dev/null
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/errors/SchemaBuilderException.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.errors;
+
+/**
+ * A {@link DataException} subtype for schema-building errors. Each constructor forwards its arguments unchanged
+ * to the matching {@link DataException} constructor.
+ */
+public class SchemaBuilderException extends DataException {
+    /**
+     * @param message the message forwarded to the superclass
+     */
+    public SchemaBuilderException(String message) {
+        super(message);
+    }
+
+    /**
+     * @param message the message forwarded to the superclass
+     * @param cause the throwable forwarded to the superclass
+     */
+    public SchemaBuilderException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * @param cause the throwable forwarded to the superclass
+     */
+    public SchemaBuilderException(Throwable cause) {
+        super(cause);
+    }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkRecord.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkRecord.java b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkRecord.java
index e3775b3..79ac725 100644
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkRecord.java
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkRecord.java
@@ -19,6 +19,7 @@ package org.apache.kafka.copycat.sink;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.copycat.connector.CopycatRecord;
+import org.apache.kafka.copycat.data.Schema;
/**
* SinkRecord is a CopycatRecord that has been read from Kafka and includes the kafkaOffset of
@@ -29,12 +30,12 @@ import org.apache.kafka.copycat.connector.CopycatRecord;
public class SinkRecord extends CopycatRecord {
private final long kafkaOffset;
- public SinkRecord(String topic, int partition, Object key, Object value, long kafkaOffset) {
- super(topic, partition, key, value);
+ public SinkRecord(String topic, int partition, Schema keySchema, Object key, Schema valueSchema, Object value, long kafkaOffset) {
+ super(topic, partition, keySchema, key, valueSchema, value);
this.kafkaOffset = kafkaOffset;
}
- public long getKafkaOffset() {
+ public long kafkaOffset() {
return kafkaOffset;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java
index 7cc6109..67c045f 100644
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java
@@ -45,7 +45,7 @@ public abstract class SinkTaskContext {
*
* @param offsets map of offsets for topic partitions
*/
- public void resetOffset(Map<TopicPartition, Long> offsets) {
+ public void offset(Map<TopicPartition, Long> offsets) {
this.offsets = offsets;
}
@@ -53,7 +53,7 @@ public abstract class SinkTaskContext {
* Get offsets that the SinkTask has submitted to be reset. Used by the Copycat framework.
* @return the map of offsets
*/
- public Map<TopicPartition, Long> getOffsets() {
+ public Map<TopicPartition, Long> offsets() {
return offsets;
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceRecord.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceRecord.java b/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceRecord.java
index 2085f66..05286a1 100644
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceRecord.java
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceRecord.java
@@ -19,6 +19,7 @@ package org.apache.kafka.copycat.source;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.copycat.connector.CopycatRecord;
+import org.apache.kafka.copycat.data.Schema;
/**
* <p>
@@ -40,29 +41,47 @@ import org.apache.kafka.copycat.connector.CopycatRecord;
*/
@InterfaceStability.Unstable
public class SourceRecord extends CopycatRecord {
+ private final Schema sourcePartitionSchema;
private final Object sourcePartition;
+ private final Schema sourceOffsetSchema;
private final Object sourceOffset;
- public SourceRecord(Object sourcePartition, Object sourceOffset, String topic, Integer partition, Object value) {
- this(sourcePartition, sourceOffset, topic, partition, null, value);
+ public SourceRecord(Schema sourcePartitionSchema, Object sourcePartition,
+ Schema sourceOffsetSchema, Object sourceOffset,
+ String topic, Integer partition, Schema valueSchema, Object value) {
+ this(sourcePartitionSchema, sourcePartition, sourceOffsetSchema, sourceOffset, topic, partition, null, null, valueSchema, value);
}
- public SourceRecord(Object sourcePartition, Object sourceOffset, String topic, Object value) {
- this(sourcePartition, sourceOffset, topic, null, null, value);
+ public SourceRecord(Schema sourcePartitionSchema, Object sourcePartition,
+ Schema sourceOffsetSchema, Object sourceOffset,
+ String topic, Schema valueSchema, Object value) {
+ this(sourcePartitionSchema, sourcePartition, sourceOffsetSchema, sourceOffset, topic, null, null, null, valueSchema, value);
}
- public SourceRecord(Object sourcePartition, Object sourceOffset, String topic, Integer partition,
- Object key, Object value) {
- super(topic, partition, key, value);
+ public SourceRecord(Schema sourcePartitionSchema, Object sourcePartition,
+ Schema sourceOffsetSchema, Object sourceOffset,
+ String topic, Integer partition,
+ Schema keySchema, Object key, Schema valueSchema, Object value) {
+ super(topic, partition, keySchema, key, valueSchema, value);
+ this.sourcePartitionSchema = sourcePartitionSchema;
this.sourcePartition = sourcePartition;
+ this.sourceOffsetSchema = sourceOffsetSchema;
this.sourceOffset = sourceOffset;
}
- public Object getSourcePartition() {
+ public Schema sourcePartitionSchema() {
+ return sourcePartitionSchema;
+ }
+
+ public Object sourcePartition() {
return sourcePartition;
}
- public Object getSourceOffset() {
+ public Schema sourceOffsetSchema() {
+ return sourceOffsetSchema;
+ }
+
+ public Object sourceOffset() {
return sourceOffset;
}
@@ -77,10 +96,14 @@ public class SourceRecord extends CopycatRecord {
SourceRecord that = (SourceRecord) o;
- if (sourceOffset != null ? !sourceOffset.equals(that.sourceOffset) : that.sourceOffset != null)
+ if (sourcePartitionSchema != null ? !sourcePartitionSchema.equals(that.sourcePartitionSchema) : that.sourcePartitionSchema != null)
+ return false;
+ if (sourceOffsetSchema != null ? !sourceOffsetSchema.equals(that.sourceOffsetSchema) : that.sourceOffsetSchema != null)
return false;
if (sourcePartition != null ? !sourcePartition.equals(that.sourcePartition) : that.sourcePartition != null)
return false;
+ if (sourceOffset != null ? !sourceOffset.equals(that.sourceOffset) : that.sourceOffset != null)
+ return false;
return true;
}
@@ -88,6 +111,8 @@ public class SourceRecord extends CopycatRecord {
@Override
public int hashCode() {
int result = super.hashCode();
+ result = 31 * result + (sourcePartitionSchema != null ? sourcePartitionSchema.hashCode() : 0);
+ result = 31 * result + (sourceOffsetSchema != null ? sourceOffsetSchema.hashCode() : 0);
result = 31 * result + (sourcePartition != null ? sourcePartition.hashCode() : 0);
result = 31 * result + (sourceOffset != null ? sourceOffset.hashCode() : 0);
return result;
http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTaskContext.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTaskContext.java b/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTaskContext.java
index d52fd62..a3875e7 100644
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTaskContext.java
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTaskContext.java
@@ -34,7 +34,7 @@ public class SourceTaskContext {
/**
* Get the OffsetStorageReader for this SourceTask.
*/
- public OffsetStorageReader getOffsetStorageReader() {
+ public OffsetStorageReader offsetStorageReader() {
return reader;
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/api/src/main/java/org/apache/kafka/copycat/storage/Converter.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/storage/Converter.java b/copycat/api/src/main/java/org/apache/kafka/copycat/storage/Converter.java
index c50aee7..dd2068d 100644
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/storage/Converter.java
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/storage/Converter.java
@@ -18,6 +18,8 @@
package org.apache.kafka.copycat.storage;
import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.copycat.data.Schema;
+import org.apache.kafka.copycat.data.SchemaAndValue;
/**
* The Converter interface provides support for translating between Copycat's runtime data format
@@ -31,15 +33,16 @@ public interface Converter<T> {
/**
* Convert a Copycat data object to a native object for serialization.
- * @param value
+ * @param schema the schema for the value
+ * @param value the value to convert
* @return
*/
- T fromCopycatData(Object value);
+ T fromCopycatData(Schema schema, Object value);
/**
* Convert a native object to a Copycat data object.
- * @param value
- * @return
+ * @param value the value to convert
+ * @return an object containing the {@link Schema} and the converted value
*/
- Object toCopycatData(T value);
+ SchemaAndValue toCopycatData(T value);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/api/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReader.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReader.java b/copycat/api/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReader.java
index 785660d..b51fbde 100644
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReader.java
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReader.java
@@ -18,6 +18,7 @@
package org.apache.kafka.copycat.storage;
import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.copycat.data.SchemaAndValue;
import java.util.Collection;
import java.util.Map;
@@ -36,12 +37,12 @@ public interface OffsetStorageReader {
* @param partition object uniquely identifying the partition of data
* @return object uniquely identifying the offset in the partition of data
*/
- Object getOffset(Object partition);
+ SchemaAndValue offset(SchemaAndValue partition);
/**
* <p>
* Get a set of offsets for the specified partition identifiers. This may be more efficient
- * than calling {@link #getOffset(Object)} repeatedly.
+ * than calling {@link #offset(SchemaAndValue)} repeatedly.
* </p>
* <p>
* Note that when errors occur, this method omits the associated data and tries to return as
@@ -55,5 +56,5 @@ public interface OffsetStorageReader {
* @param partitions set of identifiers for partitions of data
* @return a map of partition identifiers to decoded offsets
*/
- Map<Object, Object> getOffsets(Collection<Object> partitions);
+ Map<SchemaAndValue, SchemaAndValue> offsets(Collection<SchemaAndValue> partitions);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/api/src/test/java/org/apache/kafka/copycat/connector/ConnectorReconfigurationTest.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/test/java/org/apache/kafka/copycat/connector/ConnectorReconfigurationTest.java b/copycat/api/src/test/java/org/apache/kafka/copycat/connector/ConnectorReconfigurationTest.java
index e7ad2f3..cbaf866 100644
--- a/copycat/api/src/test/java/org/apache/kafka/copycat/connector/ConnectorReconfigurationTest.java
+++ b/copycat/api/src/test/java/org/apache/kafka/copycat/connector/ConnectorReconfigurationTest.java
@@ -57,12 +57,12 @@ public class ConnectorReconfigurationTest {
}
@Override
- public Class<? extends Task> getTaskClass() {
+ public Class<? extends Task> taskClass() {
return null;
}
@Override
- public List<Properties> getTaskConfigs(int count) {
+ public List<Properties> taskConfigs(int count) {
return null;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/api/src/test/java/org/apache/kafka/copycat/data/CopycatSchemaTest.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/test/java/org/apache/kafka/copycat/data/CopycatSchemaTest.java b/copycat/api/src/test/java/org/apache/kafka/copycat/data/CopycatSchemaTest.java
new file mode 100644
index 0000000..f400863
--- /dev/null
+++ b/copycat/api/src/test/java/org/apache/kafka/copycat/data/CopycatSchemaTest.java
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.data;
+
+import org.apache.kafka.copycat.errors.DataException;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNull;
+
+public class CopycatSchemaTest {
+ private static final Schema MAP_INT_STRING_SCHEMA = SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.STRING_SCHEMA).build();
+ private static final Schema FLAT_STRUCT_SCHEMA = SchemaBuilder.struct()
+ .field("field", Schema.INT32_SCHEMA)
+ .build();
+ private static final Schema STRUCT_SCHEMA = SchemaBuilder.struct()
+ .field("first", Schema.INT32_SCHEMA)
+ .field("second", Schema.STRING_SCHEMA)
+ .field("array", SchemaBuilder.array(Schema.INT32_SCHEMA).build())
+ .field("map", SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.STRING_SCHEMA).build())
+ .field("nested", FLAT_STRUCT_SCHEMA)
+ .build();
+ private static final Schema PARENT_STRUCT_SCHEMA = SchemaBuilder.struct()
+ .field("nested", FLAT_STRUCT_SCHEMA)
+ .build();
+
+ @Test
+ public void testFieldsOnStructSchema() {
+ Schema schema = SchemaBuilder.struct()
+ .field("foo", Schema.BOOLEAN_SCHEMA)
+ .field("bar", Schema.INT32_SCHEMA)
+ .build();
+
+ assertEquals(2, schema.fields().size());
+ // Validate field lookup by name
+ Field foo = schema.field("foo");
+ assertEquals(0, foo.index());
+ Field bar = schema.field("bar");
+ assertEquals(1, bar.index());
+ // Looking up any other field name should return null
+ assertNull(schema.field("other"));
+ }
+
+
+ @Test(expected = DataException.class)
+ public void testFieldsOnlyValidForStructs() {
+ Schema.INT8_SCHEMA.fields();
+ }
+
+ @Test
+ public void testValidateValueMatchingType() {
+ CopycatSchema.validateValue(Schema.INT8_SCHEMA, (byte) 1);
+ CopycatSchema.validateValue(Schema.INT16_SCHEMA, (short) 1);
+ CopycatSchema.validateValue(Schema.INT32_SCHEMA, 1);
+ CopycatSchema.validateValue(Schema.INT64_SCHEMA, (long) 1);
+ CopycatSchema.validateValue(Schema.FLOAT32_SCHEMA, 1.f);
+ CopycatSchema.validateValue(Schema.FLOAT64_SCHEMA, 1.);
+ CopycatSchema.validateValue(Schema.BOOLEAN_SCHEMA, true);
+ CopycatSchema.validateValue(Schema.STRING_SCHEMA, "a string");
+ CopycatSchema.validateValue(Schema.BYTES_SCHEMA, "a byte array".getBytes());
+ CopycatSchema.validateValue(Schema.BYTES_SCHEMA, ByteBuffer.wrap("a byte array".getBytes()));
+ CopycatSchema.validateValue(SchemaBuilder.array(Schema.INT32_SCHEMA).build(), Arrays.asList(1, 2, 3));
+ CopycatSchema.validateValue(
+ SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.STRING_SCHEMA).build(),
+ Collections.singletonMap(1, "value")
+ );
+ // Struct tests the basic struct layout + complex field types + nested structs
+ Struct structValue = new Struct(STRUCT_SCHEMA)
+ .put("first", 1)
+ .put("second", "foo")
+ .put("array", Arrays.asList(1, 2, 3))
+ .put("map", Collections.singletonMap(1, "value"))
+ .put("nested", new Struct(FLAT_STRUCT_SCHEMA).put("field", 12));
+ CopycatSchema.validateValue(STRUCT_SCHEMA, structValue);
+ }
+
+ // To keep the number of tests manageable, each schema type gets a single mismatch test which, where possible,
+ // uses a value of a similar (but incorrect) type
+
+ @Test(expected = DataException.class)
+ public void testValidateValueMismatchInt8() {
+ CopycatSchema.validateValue(Schema.INT8_SCHEMA, 1);
+ }
+
+ @Test(expected = DataException.class)
+ public void testValidateValueMismatchInt16() {
+ CopycatSchema.validateValue(Schema.INT16_SCHEMA, 1);
+ }
+
+ @Test(expected = DataException.class)
+ public void testValidateValueMismatchInt32() {
+ CopycatSchema.validateValue(Schema.INT32_SCHEMA, (long) 1);
+ }
+
+ @Test(expected = DataException.class)
+ public void testValidateValueMismatchInt64() {
+ CopycatSchema.validateValue(Schema.INT64_SCHEMA, 1);
+ }
+
+ @Test(expected = DataException.class)
+ public void testValidateValueMismatchFloat() {
+ CopycatSchema.validateValue(Schema.FLOAT32_SCHEMA, 1.0);
+ }
+
+ @Test(expected = DataException.class)
+ public void testValidateValueMismatchDouble() {
+ CopycatSchema.validateValue(Schema.FLOAT64_SCHEMA, 1.f);
+ }
+
+ @Test(expected = DataException.class)
+ public void testValidateValueMismatchBoolean() {
+ CopycatSchema.validateValue(Schema.BOOLEAN_SCHEMA, 1.f);
+ }
+
+ @Test(expected = DataException.class)
+ public void testValidateValueMismatchString() {
+ // CharSequence is a similar type (supertype of String), but we restrict to String.
+ CharBuffer cbuf = CharBuffer.wrap("abc");
+ CopycatSchema.validateValue(Schema.STRING_SCHEMA, cbuf);
+ }
+
+ @Test(expected = DataException.class)
+ public void testValidateValueMismatchBytes() {
+ CopycatSchema.validateValue(Schema.BYTES_SCHEMA, new Object[]{1, "foo"});
+ }
+
+ @Test(expected = DataException.class)
+ public void testValidateValueMismatchArray() {
+ CopycatSchema.validateValue(SchemaBuilder.array(Schema.INT32_SCHEMA).build(), Arrays.asList("a", "b", "c"));
+ }
+
+ @Test(expected = DataException.class)
+ public void testValidateValueMismatchArraySomeMatch() {
+ // Even if some elements match the right type, validation should fail if any element mismatches. Type erasure hides
+ // the fact that the list is actually a List<Object>, so checking only the first element would miss the mismatch
+ CopycatSchema.validateValue(SchemaBuilder.array(Schema.INT32_SCHEMA).build(), Arrays.asList(1, 2, "c"));
+ }
+
+ @Test(expected = DataException.class)
+ public void testValidateValueMismatchMapKey() {
+ CopycatSchema.validateValue(MAP_INT_STRING_SCHEMA, Collections.singletonMap("wrong key type", "value"));
+ }
+
+ @Test(expected = DataException.class)
+ public void testValidateValueMismatchMapValue() {
+ CopycatSchema.validateValue(MAP_INT_STRING_SCHEMA, Collections.singletonMap(1, 2));
+ }
+
+ @Test(expected = DataException.class)
+ public void testValidateValueMismatchMapSomeKeys() {
+ Map<Object, String> data = new HashMap<>();
+ data.put(1, "abc");
+ data.put("wrong", "it's as easy as one two three");
+ CopycatSchema.validateValue(MAP_INT_STRING_SCHEMA, data);
+ }
+
+ @Test(expected = DataException.class)
+ public void testValidateValueMismatchMapSomeValues() {
+ Map<Integer, Object> data = new HashMap<>();
+ data.put(1, "abc");
+ data.put(2, "wrong".getBytes());
+ CopycatSchema.validateValue(MAP_INT_STRING_SCHEMA, data);
+ }
+
+ @Test(expected = DataException.class)
+ public void testValidateValueMismatchStructWrongSchema() {
+ // Completely mismatching schemas
+ CopycatSchema.validateValue(
+ FLAT_STRUCT_SCHEMA,
+ new Struct(SchemaBuilder.struct().field("x", Schema.INT32_SCHEMA).build()).put("x", 1)
+ );
+ }
+
+ @Test(expected = DataException.class)
+ public void testValidateValueMismatchStructWrongNestedSchema() {
+ // Top-level schema matches, but nested does not.
+ CopycatSchema.validateValue(
+ PARENT_STRUCT_SCHEMA,
+ new Struct(PARENT_STRUCT_SCHEMA)
+ .put("nested", new Struct(SchemaBuilder.struct().field("x", Schema.INT32_SCHEMA).build()).put("x", 1))
+ );
+ }
+
+
+ @Test
+ public void testPrimitiveEquality() {
+ // Test that equality for primitive types, which only needs to consider the type and metadata fields, is handled correctly
+ CopycatSchema s1 = new CopycatSchema(Schema.Type.INT8, false, null, "name", 2, "doc", null, null, null);
+ CopycatSchema s2 = new CopycatSchema(Schema.Type.INT8, false, null, "name", 2, "doc", null, null, null);
+ CopycatSchema differentType = new CopycatSchema(Schema.Type.INT16, false, null, "name", 2, "doc", null, null, null);
+ CopycatSchema differentOptional = new CopycatSchema(Schema.Type.INT8, true, null, "name", 2, "doc", null, null, null);
+ CopycatSchema differentDefault = new CopycatSchema(Schema.Type.INT8, false, true, "name", 2, "doc", null, null, null);
+ CopycatSchema differentName = new CopycatSchema(Schema.Type.INT8, false, null, "otherName", 2, "doc", null, null, null);
+ CopycatSchema differentVersion = new CopycatSchema(Schema.Type.INT8, false, null, "name", 4, "doc", null, null, null);
+ CopycatSchema differentDoc = new CopycatSchema(Schema.Type.INT8, false, null, "name", 2, "other doc", null, null, null);
+
+ assertEquals(s1, s2);
+ assertNotEquals(s1, differentType);
+ assertNotEquals(s1, differentOptional);
+ assertNotEquals(s1, differentDefault);
+ assertNotEquals(s1, differentName);
+ assertNotEquals(s1, differentVersion);
+ assertNotEquals(s1, differentDoc);
+ }
+
+ @Test
+ public void testArrayEquality() {
+ // Validate that the value schema of the array is included in the equality check. This test never reuses the same
+ // schema object, to ensure we're actually checking equality rather than identity
+ CopycatSchema s1 = new CopycatSchema(Schema.Type.ARRAY, false, null, null, null, null, null, null, SchemaBuilder.int8().build());
+ CopycatSchema s2 = new CopycatSchema(Schema.Type.ARRAY, false, null, null, null, null, null, null, SchemaBuilder.int8().build());
+ CopycatSchema differentValueSchema = new CopycatSchema(Schema.Type.ARRAY, false, null, null, null, null, null, null, SchemaBuilder.int16().build());
+
+ assertEquals(s1, s2);
+ assertNotEquals(s1, differentValueSchema);
+ }
+
+ @Test
+ public void testMapEquality() {
+ // Same as testArrayEquality, but for both key and value schemas
+ CopycatSchema s1 = new CopycatSchema(Schema.Type.MAP, false, null, null, null, null, null, SchemaBuilder.int8().build(), SchemaBuilder.int16().build());
+ CopycatSchema s2 = new CopycatSchema(Schema.Type.MAP, false, null, null, null, null, null, SchemaBuilder.int8().build(), SchemaBuilder.int16().build());
+ CopycatSchema differentKeySchema = new CopycatSchema(Schema.Type.MAP, false, null, null, null, null, null, SchemaBuilder.string().build(), SchemaBuilder.int16().build());
+ CopycatSchema differentValueSchema = new CopycatSchema(Schema.Type.MAP, false, null, null, null, null, null, SchemaBuilder.int8().build(), SchemaBuilder.string().build());
+
+ assertEquals(s1, s2);
+ assertNotEquals(s1, differentKeySchema);
+ assertNotEquals(s1, differentValueSchema);
+ }
+
+ @Test
+ public void testStructEquality() {
+ // Same as testArrayEquality, but checks differences in fields. This is only a simple check; it relies on the tests
+ // of Field's equals() method to validate that all variations in the list of fields are detected
+ CopycatSchema s1 = new CopycatSchema(Schema.Type.STRUCT, false, null, null, null, null,
+ Arrays.asList(new Field("field", 0, SchemaBuilder.int8().build()),
+ new Field("field2", 1, SchemaBuilder.int16().build())), null, null);
+ CopycatSchema s2 = new CopycatSchema(Schema.Type.STRUCT, false, null, null, null, null,
+ Arrays.asList(new Field("field", 0, SchemaBuilder.int8().build()),
+ new Field("field2", 1, SchemaBuilder.int16().build())), null, null);
+ CopycatSchema differentField = new CopycatSchema(Schema.Type.STRUCT, false, null, null, null, null,
+ Arrays.asList(new Field("field", 0, SchemaBuilder.int8().build()),
+ new Field("different field name", 1, SchemaBuilder.int16().build())), null, null);
+
+ assertEquals(s1, s2);
+ assertNotEquals(s1, differentField);
+ }
+
+}