You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2015/08/27 20:58:55 UTC

[5/5] kafka git commit: KAFKA-2367; Add Copycat runtime data API.

KAFKA-2367; Add Copycat runtime data API.

Author: Ewen Cheslack-Postava <me...@ewencp.org>

Reviewers: Gwen Shapira, Jay Kreps

Closes #163 from ewencp/kafka-2367-copycat-runtime-data-api


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/492bfdfa
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/492bfdfa
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/492bfdfa

Branch: refs/heads/trunk
Commit: 492bfdfa873db649049382b9785cefef2d6d1eca
Parents: 8c88d19
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Authored: Thu Aug 27 11:58:42 2015 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Thu Aug 27 11:58:42 2015 -0700

----------------------------------------------------------------------
 bin/kafka-run-class.sh                          |    2 +-
 build.gradle                                    |   49 +-
 .../kafka/common/config/AbstractConfig.java     |    8 +
 .../kafka/common/protocol/types/Struct.java     |   10 +-
 .../kafka/copycat/connector/Connector.java      |    4 +-
 .../kafka/copycat/connector/CopycatRecord.java  |   37 +-
 .../kafka/copycat/data/CopycatSchema.java       |  235 ++
 .../org/apache/kafka/copycat/data/Field.java    |   77 +
 .../org/apache/kafka/copycat/data/Schema.java   |  157 ++
 .../kafka/copycat/data/SchemaAndValue.java      |   62 +
 .../kafka/copycat/data/SchemaBuilder.java       |  371 +++
 .../org/apache/kafka/copycat/data/Struct.java   |  265 ++
 .../kafka/copycat/errors/DataException.java     |   35 +
 .../copycat/errors/SchemaBuilderException.java  |   32 +
 .../apache/kafka/copycat/sink/SinkRecord.java   |    7 +-
 .../kafka/copycat/sink/SinkTaskContext.java     |    4 +-
 .../kafka/copycat/source/SourceRecord.java      |   45 +-
 .../kafka/copycat/source/SourceTaskContext.java |    2 +-
 .../apache/kafka/copycat/storage/Converter.java |   13 +-
 .../copycat/storage/OffsetStorageReader.java    |    7 +-
 .../connector/ConnectorReconfigurationTest.java |    4 +-
 .../kafka/copycat/data/CopycatSchemaTest.java   |  272 ++
 .../apache/kafka/copycat/data/FieldTest.java    |   40 +
 .../kafka/copycat/data/SchemaBuilderTest.java   |  287 +++
 .../apache/kafka/copycat/data/StructTest.java   |  222 ++
 .../copycat/data/DataRuntimeException.java      |   36 -
 .../kafka/copycat/data/DataTypeException.java   |   33 -
 .../kafka/copycat/data/ObjectProperties.java    |   85 -
 .../org/apache/kafka/copycat/data/Schema.java   | 1054 --------
 .../kafka/copycat/data/SchemaBuilder.java       | 2415 ------------------
 .../copycat/data/SchemaParseException.java      |   32 -
 .../copycat/file/FileStreamSinkConnector.java   |    4 +-
 .../kafka/copycat/file/FileStreamSinkTask.java  |    2 +-
 .../copycat/file/FileStreamSourceConnector.java |    4 +-
 .../copycat/file/FileStreamSourceTask.java      |   43 +-
 .../file/FileStreamSinkConnectorTest.java       |    6 +-
 .../copycat/file/FileStreamSinkTaskTest.java    |    7 +-
 .../file/FileStreamSourceConnectorTest.java     |    8 +-
 .../copycat/file/FileStreamSourceTaskTest.java  |   31 +-
 .../kafka/copycat/json/JsonConverter.java       |  451 +++-
 .../apache/kafka/copycat/json/JsonSchema.java   |   79 +-
 .../kafka/copycat/json/JsonSerializer.java      |    3 +
 .../kafka/copycat/json/JsonConverterTest.java   |  200 +-
 .../apache/kafka/copycat/cli/WorkerConfig.java  |   10 -
 .../kafka/copycat/runtime/ConnectorConfig.java  |   14 -
 .../apache/kafka/copycat/runtime/Worker.java    |    6 +-
 .../kafka/copycat/runtime/WorkerSinkTask.java   |   15 +-
 .../copycat/runtime/WorkerSinkTaskThread.java   |   12 +-
 .../kafka/copycat/runtime/WorkerSourceTask.java |   14 +-
 .../runtime/standalone/StandaloneHerder.java    |    6 +-
 .../storage/OffsetStorageReaderImpl.java        |   19 +-
 .../copycat/storage/OffsetStorageWriter.java    |   13 +-
 .../kafka/copycat/util/ConnectorTaskId.java     |    4 +-
 .../kafka/copycat/util/FutureCallback.java      |    6 +-
 .../copycat/runtime/WorkerSinkTaskTest.java     |   26 +-
 .../copycat/runtime/WorkerSourceTaskTest.java   |   16 +-
 .../standalone/StandaloneHerderTest.java        |    4 +-
 .../storage/OffsetStorageWriterTest.java        |   19 +-
 settings.gradle                                 |    2 +-
 59 files changed, 2844 insertions(+), 4082 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/bin/kafka-run-class.sh
----------------------------------------------------------------------
diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh
index b689b2e..dd37df4 100755
--- a/bin/kafka-run-class.sh
+++ b/bin/kafka-run-class.sh
@@ -66,7 +66,7 @@ do
   CLASSPATH=$CLASSPATH:$file
 done
 
-for cc_pkg in "data" "api" "runtime" "file" "json"
+for cc_pkg in "api" "runtime" "file" "json"
 do
   for file in $base_dir/copycat/${cc_pkg}/build/libs/copycat-${cc_pkg}*.jar $base_dir/copycat/${cc_pkg}/build/dependant-libs/*.jar;
   do

http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 3fd54cf..3a11670 100644
--- a/build.gradle
+++ b/build.gradle
@@ -29,7 +29,7 @@ buildscript {
 
 def slf4jlog4j='org.slf4j:slf4j-log4j12:1.7.6'
 def slf4japi="org.slf4j:slf4j-api:1.7.6"
-def junit='junit:junit:4.6'
+def junit='junit:junit:4.11'
 def easymock='org.easymock:easymock:3.3.1'
 def powermock='org.powermock:powermock-module-junit4:1.6.2'
 def powermock_easymock='org.powermock:powermock-api-easymock:1.6.2'
@@ -214,7 +214,7 @@ for ( sv in ['2_10_5', '2_11_7'] ) {
   }
 }
 
-def copycatPkgs = ['copycat:data', 'copycat:api', 'copycat:runtime', 'copycat:json', 'copycat:file']
+def copycatPkgs = ['copycat:api', 'copycat:runtime', 'copycat:json', 'copycat:file']
 def pkgs = ['clients', 'examples', 'contrib:hadoop-consumer', 'contrib:hadoop-producer', 'log4j-appender', 'tools'] + copycatPkgs
 
 tasks.create(name: "jarCopycat", dependsOn: copycatPkgs.collect { it + ":jar" }) {}
@@ -558,56 +558,13 @@ project(':log4j-appender') {
   test.dependsOn('checkstyleMain', 'checkstyleTest')
 }
 
-project(':copycat:data') {
-  apply plugin: 'checkstyle'
-  archivesBaseName = "copycat-data"
-
-  dependencies {
-    compile project(':clients')
-    compile "$slf4japi"
-
-    testCompile "$junit"
-    testRuntime "$slf4jlog4j"
-  }
-
-  task testJar(type: Jar) {
-    classifier = 'test'
-    from sourceSets.test.output
-  }
-
-  test {
-    testLogging {
-      events "passed", "skipped", "failed"
-      exceptionFormat = 'full'
-    }
-  }
-
-  javadoc {
-    include "**/org/apache/kafka/copycat/data/*"
-  }
-
-  artifacts {
-    archives testJar
-  }
-
-  configurations {
-    archives.extendsFrom (testCompile)
-  }
-
-  /* FIXME Re-enable this with KAFKA-2367 when the placeholder data API is replaced
-  checkstyle {
-    configFile = new File(rootDir, "checkstyle/checkstyle.xml")
-  }
-  test.dependsOn('checkstyleMain', 'checkstyleTest') */
-}
-
 project(':copycat:api') {
   apply plugin: 'checkstyle'
   archivesBaseName = "copycat-api"
 
   dependencies {
-    compile project(':copycat:data')
     compile "$slf4japi"
+    compile project(':clients')
 
     testCompile "$junit"
     testRuntime "$slf4jlog4j"

http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index 156ec14..774701a 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -101,6 +101,14 @@ public class AbstractConfig {
         return keys;
     }
 
+    public Properties unusedProperties() {
+        Set<String> unusedKeys = this.unused();
+        Properties unusedProps = new Properties();
+        for (String key : unusedKeys)
+            unusedProps.put(key, this.originals().get(key));
+        return unusedProps;
+    }
+
     public Map<String, Object> originals() {
         Map<String, Object> copy = new HashMap<String, Object>();
         copy.putAll(originals);

http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
index 92de6a9..ef2525e 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
@@ -139,11 +139,17 @@ public class Struct {
     }
 
     public ByteBuffer getBytes(Field field) {
-        return (ByteBuffer) get(field);
+        Object result = get(field);
+        if (result instanceof byte[])
+            return ByteBuffer.wrap((byte[]) result);
+        return (ByteBuffer) result;
     }
 
     public ByteBuffer getBytes(String name) {
-        return (ByteBuffer) get(name);
+        Object result = get(name);
+        if (result instanceof byte[])
+            return ByteBuffer.wrap((byte[]) result);
+        return (ByteBuffer) result;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Connector.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Connector.java b/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Connector.java
index 2ea3c95..ae141c4 100644
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Connector.java
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Connector.java
@@ -99,7 +99,7 @@ public abstract class Connector {
     /**
      * Returns the Task implementation for this Connector.
      */
-    public abstract Class<? extends Task> getTaskClass();
+    public abstract Class<? extends Task> taskClass();
 
     /**
      * Returns a set of configurations for Tasks based on the current configuration,
@@ -108,7 +108,7 @@ public abstract class Connector {
      * @param maxTasks maximum number of configurations to generate
      * @return configurations for Tasks
      */
-    public abstract List<Properties> getTaskConfigs(int maxTasks);
+    public abstract List<Properties> taskConfigs(int maxTasks);
 
     /**
      * Stop this connector.

http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/api/src/main/java/org/apache/kafka/copycat/connector/CopycatRecord.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/connector/CopycatRecord.java b/copycat/api/src/main/java/org/apache/kafka/copycat/connector/CopycatRecord.java
index 576904a..0d3e8dc 100644
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/connector/CopycatRecord.java
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/connector/CopycatRecord.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.copycat.connector;
 
 import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.copycat.data.Schema;
 
 /**
  * <p>
@@ -31,36 +32,48 @@ import org.apache.kafka.common.annotation.InterfaceStability;
 public abstract class CopycatRecord {
     private final String topic;
     private final Integer kafkaPartition;
+    private final Schema keySchema;
     private final Object key;
+    private final Schema valueSchema;
     private final Object value;
 
-    public CopycatRecord(String topic, Integer kafkaPartition, Object value) {
-        this(topic, kafkaPartition, null, value);
+    public CopycatRecord(String topic, Integer kafkaPartition, Schema valueSchema, Object value) {
+        this(topic, kafkaPartition, null, null, valueSchema, value);
     }
 
-    public CopycatRecord(String topic, Integer kafkaPartition, Object key, Object value) {
+    public CopycatRecord(String topic, Integer kafkaPartition, Schema keySchema, Object key, Schema valueSchema, Object value) {
         this.topic = topic;
         this.kafkaPartition = kafkaPartition;
+        this.keySchema = keySchema;
         this.key = key;
+        this.valueSchema = valueSchema;
         this.value = value;
     }
 
-    public String getTopic() {
+    public String topic() {
         return topic;
     }
 
-    public Integer getKafkaPartition() {
+    public Integer kafkaPartition() {
         return kafkaPartition;
     }
 
-    public Object getKey() {
+    public Object key() {
         return key;
     }
 
-    public Object getValue() {
+    public Schema keySchema() {
+        return keySchema;
+    }
+
+    public Object value() {
         return value;
     }
 
+    public Schema valueSchema() {
+        return valueSchema;
+    }
+
     @Override
     public String toString() {
         return "CopycatRecord{" +
@@ -80,12 +93,16 @@ public abstract class CopycatRecord {
 
         CopycatRecord that = (CopycatRecord) o;
 
-        if (key != null ? !key.equals(that.key) : that.key != null)
-            return false;
         if (kafkaPartition != null ? !kafkaPartition.equals(that.kafkaPartition) : that.kafkaPartition != null)
             return false;
         if (topic != null ? !topic.equals(that.topic) : that.topic != null)
             return false;
+        if (keySchema != null ? !keySchema.equals(that.keySchema) : that.keySchema != null)
+            return false;
+        if (key != null ? !key.equals(that.key) : that.key != null)
+            return false;
+        if (valueSchema != null ? !valueSchema.equals(that.valueSchema) : that.valueSchema != null)
+            return false;
         if (value != null ? !value.equals(that.value) : that.value != null)
             return false;
 
@@ -96,7 +113,9 @@ public abstract class CopycatRecord {
     public int hashCode() {
         int result = topic != null ? topic.hashCode() : 0;
         result = 31 * result + (kafkaPartition != null ? kafkaPartition.hashCode() : 0);
+        result = 31 * result + (keySchema != null ? keySchema.hashCode() : 0);
         result = 31 * result + (key != null ? key.hashCode() : 0);
+        result = 31 * result + (valueSchema != null ? valueSchema.hashCode() : 0);
         result = 31 * result + (value != null ? value.hashCode() : 0);
         return result;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/api/src/main/java/org/apache/kafka/copycat/data/CopycatSchema.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/data/CopycatSchema.java b/copycat/api/src/main/java/org/apache/kafka/copycat/data/CopycatSchema.java
new file mode 100644
index 0000000..c823f28
--- /dev/null
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/data/CopycatSchema.java
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.data;
+
+import org.apache.kafka.copycat.errors.DataException;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+public class CopycatSchema implements Schema {
+    private static final Map<Type, Class<?>> SCHEMA_TYPE_CLASSES = new HashMap<>();
+    static {
+        SCHEMA_TYPE_CLASSES.put(Type.INT8, Byte.class);
+        SCHEMA_TYPE_CLASSES.put(Type.INT16, Short.class);
+        SCHEMA_TYPE_CLASSES.put(Type.INT32, Integer.class);
+        SCHEMA_TYPE_CLASSES.put(Type.INT64, Long.class);
+        SCHEMA_TYPE_CLASSES.put(Type.FLOAT32, Float.class);
+        SCHEMA_TYPE_CLASSES.put(Type.FLOAT64, Double.class);
+        SCHEMA_TYPE_CLASSES.put(Type.BOOLEAN, Boolean.class);
+        SCHEMA_TYPE_CLASSES.put(Type.STRING, String.class);
+        SCHEMA_TYPE_CLASSES.put(Type.ARRAY, List.class);
+        SCHEMA_TYPE_CLASSES.put(Type.MAP, Map.class);
+        SCHEMA_TYPE_CLASSES.put(Type.STRUCT, Struct.class);
+        // Bytes are handled as a special case
+    }
+
+    // The type of the field
+    private final Type type;
+    private final boolean optional;
+    private final Object defaultValue;
+
+    private final List<Field> fields;
+    private final Map<String, Field> fieldsByName;
+
+    private final Schema keySchema;
+    private final Schema valueSchema;
+
+    // Optional name and version provide a built-in way to indicate what type of data is included. Most
+    // useful for structs to indicate the semantics of the struct and map it to some existing underlying
+    // serializer-specific schema. However, can also be useful in specifying other logical types (e.g. a set is an array
+    // with additional constraints).
+    private final String name;
+    private final Integer version;
+    // Optional human readable documentation describing this schema.
+    private final String doc;
+
+    /**
+     * Construct a Schema. Most users should not construct schemas manually, preferring {@link SchemaBuilder} instead.
+     */
+    public CopycatSchema(Type type, boolean optional, Object defaultValue, String name, Integer version, String doc, List<Field> fields, Schema keySchema, Schema valueSchema) {
+        this.type = type;
+        this.optional = optional;
+        this.defaultValue = defaultValue;
+        this.name = name;
+        this.version = version;
+        this.doc = doc;
+
+        this.fields = fields;
+        if (this.fields != null && this.type == Type.STRUCT) {
+            this.fieldsByName = new HashMap<>();
+            for (Field field : fields)
+                fieldsByName.put(field.name(), field);
+        } else {
+            this.fieldsByName = null;
+        }
+
+        this.keySchema = keySchema;
+        this.valueSchema = valueSchema;
+    }
+
+    @Override
+    public Type type() {
+        return type;
+    }
+
+    @Override
+    public boolean isOptional() {
+        return optional;
+    }
+
+    @Override
+    public Object defaultValue() {
+        return defaultValue;
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Override
+    public Integer version() {
+        return version;
+    }
+
+    @Override
+    public String doc() {
+        return doc;
+    }
+
+
+
+    @Override
+    public List<Field> fields() {
+        if (type != Type.STRUCT)
+            throw new DataException("Cannot list fields on non-struct type");
+        return fields;
+    }
+
+    public Field field(String fieldName) {
+        if (type != Type.STRUCT)
+            throw new DataException("Cannot look up fields on non-struct type");
+        return fieldsByName.get(fieldName);
+    }
+
+    @Override
+    public Schema keySchema() {
+        if (type != Type.MAP)
+            throw new DataException("Cannot look up key schema on non-map type");
+        return keySchema;
+    }
+
+    @Override
+    public Schema valueSchema() {
+        if (type != Type.MAP && type != Type.ARRAY)
+            throw new DataException("Cannot look up value schema on non-array and non-map type");
+        return valueSchema;
+    }
+
+
+
+    /**
+     * Validate that the value can be used with the schema, i.e. that its type matches the schema type and nullability
+     * requirements. Throws a DataException if the value is invalid.
+     * @param schema Schema to test
+     * @param value value to test
+     */
+    public static void validateValue(Schema schema, Object value) {
+        if (value == null) {
+            if (!schema.isOptional())
+                throw new DataException("Invalid value: null used for required field");
+            else
+                return;
+        }
+
+        // Special case for bytes. byte[] causes problems because it doesn't handle equals()/hashCode() like we want
+        // objects to, so we support both byte[] and ByteBuffer. Using plain byte[] can cause those methods to fail, so
+        // ByteBuffers are recommended
+        if (schema.type() == Type.BYTES && (value instanceof byte[] || value instanceof ByteBuffer))
+            return;
+        Class<?> expectedClass = SCHEMA_TYPE_CLASSES.get(schema.type());
+        if (expectedClass == null || !expectedClass.isInstance(value))
+                throw new DataException("Invalid value: expected " + expectedClass + " for type " + schema.type() + " but tried to use " + value.getClass());
+
+        switch (schema.type()) {
+            case STRUCT:
+                Struct struct = (Struct) value;
+                if (!struct.schema().equals(schema))
+                    throw new DataException("Struct schemas do not match.");
+                struct.validate();
+                break;
+            case ARRAY:
+                List<?> array = (List<?>) value;
+                for (Object entry : array)
+                    validateValue(schema.valueSchema(), entry);
+                break;
+            case MAP:
+                Map<?, ?> map = (Map<?, ?>) value;
+                for (Map.Entry<?, ?> entry : map.entrySet()) {
+                    validateValue(schema.keySchema(), entry.getKey());
+                    validateValue(schema.valueSchema(), entry.getValue());
+                }
+                break;
+        }
+    }
+
+    /**
+     * Validate that the value can be used for this schema, i.e. that its type matches the schema type and optional
+     * requirements. Throws a DataException if the value is invalid.
+     * @param value the value to validate
+     */
+    public void validateValue(Object value) {
+        validateValue(this, value);
+    }
+
+    @Override
+    public CopycatSchema schema() {
+        return this;
+    }
+
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        CopycatSchema schema = (CopycatSchema) o;
+        return Objects.equals(optional, schema.optional) &&
+                Objects.equals(type, schema.type) &&
+                Objects.equals(defaultValue, schema.defaultValue) &&
+                Objects.equals(fields, schema.fields) &&
+                Objects.equals(keySchema, schema.keySchema) &&
+                Objects.equals(valueSchema, schema.valueSchema) &&
+                Objects.equals(name, schema.name) &&
+                Objects.equals(version, schema.version) &&
+                Objects.equals(doc, schema.doc);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(type, optional, defaultValue, fields, keySchema, valueSchema, name, version, doc);
+    }
+
+    @Override
+    public String toString() {
+        if (name != null)
+            return "Schema{" + name + ":" + type + "}";
+        else
+            return "Schema{" + type + "}";
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/api/src/main/java/org/apache/kafka/copycat/data/Field.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/data/Field.java b/copycat/api/src/main/java/org/apache/kafka/copycat/data/Field.java
new file mode 100644
index 0000000..c71cdb4
--- /dev/null
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/data/Field.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.data;
+
+import java.util.Objects;
+
+/**
+ * <p>
+ *     A field in a {@link Struct}, consisting of a field name, index, and {@link Schema} for the field value.
+ * </p>
+ */
+public class Field {
+    private final String name;
+    private final int index;
+    private final Schema schema;
+
+    public Field(String name, int index, Schema schema) {
+        this.name = name;
+        this.index = index;
+        this.schema = schema;
+    }
+
+    /**
+     * Get the name of this field.
+     * @return the name of this field
+     */
+    public String name() {
+        return name;
+    }
+
+
+    /**
+     * Get the index of this field within the struct.
+     * @return the index of this field
+     */
+    public int index() {
+        return index;
+    }
+
+    /**
+     * Get the schema of this field
+     * @return the schema of values of this field
+     */
+    public Schema schema() {
+        return schema;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        Field field = (Field) o;
+        return Objects.equals(index, field.index) &&
+                Objects.equals(name, field.name) &&
+                Objects.equals(schema, field.schema);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(name, index, schema);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/api/src/main/java/org/apache/kafka/copycat/data/Schema.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/data/Schema.java b/copycat/api/src/main/java/org/apache/kafka/copycat/data/Schema.java
new file mode 100644
index 0000000..5ceb57d
--- /dev/null
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/data/Schema.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.data;
+
+import java.util.List;
+
+/**
+ * <p>
+ *     Definition of an abstract data type. Data types can be primitive types (integer types, floating point types,
+ *     boolean, strings, and bytes) or complex types (typed arrays, maps with one key schema and value schema,
+ *     and structs that have a fixed set of field names each with an associated value schema). Any type can be specified
+ *     as optional, allowing it to be omitted (resulting in null values when it is missing) and can specify a default
+ *     value.
+ * </p>
+ * <p>
+ *     All schemas may have some associated metadata: a name, version, and documentation. These are all considered part
+ *     of the schema itself and included when comparing schemas. Besides adding important metadata, these fields enable
+ *     the specification of logical types that specify additional constraints and semantics (e.g. UNIX timestamps are
+ *     just an int64, but the user needs to know about the additional semantics to interpret it properly).
+ * </p>
+ * <p>
+ *     Schemas can be created directly, but in most cases using {@link SchemaBuilder} will be simpler.
+ * </p>
+ */
+public interface Schema {
+    /**
+     * The type of a schema. These only include the core types; logical types must be determined by checking the schema name.
+     */
+    enum Type {
+        INT8, INT16, INT32, INT64, FLOAT32, FLOAT64, BOOLEAN, STRING, BYTES, ARRAY, MAP, STRUCT;
+
+        private String name;
+
+        Type() {
+            this.name = this.name().toLowerCase();
+        }
+
+        public String getName() {
+            return name;
+        }
+
+        public boolean isPrimitive() {
+            switch (this) {
+                case INT8:
+                case INT16:
+                case INT32:
+                case INT64:
+                case FLOAT32:
+                case FLOAT64:
+                case BOOLEAN:
+                case STRING:
+                case BYTES:
+                    return true;
+            }
+            return false;
+        }
+    }
+
+
+    Schema INT8_SCHEMA = SchemaBuilder.int8().build();
+    Schema INT16_SCHEMA = SchemaBuilder.int16().build();
+    Schema INT32_SCHEMA = SchemaBuilder.int32().build();
+    Schema INT64_SCHEMA = SchemaBuilder.int64().build();
+    Schema FLOAT32_SCHEMA = SchemaBuilder.float32().build();
+    Schema FLOAT64_SCHEMA = SchemaBuilder.float64().build();
+    Schema BOOLEAN_SCHEMA = SchemaBuilder.bool().build();
+    Schema STRING_SCHEMA = SchemaBuilder.string().build();
+    Schema BYTES_SCHEMA = SchemaBuilder.bytes().build();
+
+    Schema OPTIONAL_INT8_SCHEMA = SchemaBuilder.int8().optional().build();
+    Schema OPTIONAL_INT16_SCHEMA = SchemaBuilder.int16().optional().build();
+    Schema OPTIONAL_INT32_SCHEMA = SchemaBuilder.int32().optional().build();
+    Schema OPTIONAL_INT64_SCHEMA = SchemaBuilder.int64().optional().build();
+    Schema OPTIONAL_FLOAT32_SCHEMA = SchemaBuilder.float32().optional().build();
+    Schema OPTIONAL_FLOAT64_SCHEMA = SchemaBuilder.float64().optional().build();
+    Schema OPTIONAL_BOOLEAN_SCHEMA = SchemaBuilder.bool().optional().build();
+    Schema OPTIONAL_STRING_SCHEMA = SchemaBuilder.string().optional().build();
+    Schema OPTIONAL_BYTES_SCHEMA = SchemaBuilder.bytes().optional().build();
+
+
+    /**
+     * @return the type of this schema
+     */
+    Type type();
+
+    /**
+     * @return true if this field is optional, false otherwise
+     */
+    boolean isOptional();
+
+    /**
+     * @return the default value for this schema
+     */
+    Object defaultValue();
+
+    /**
+     * @return the name of this schema
+     */
+    String name();
+
+    /**
+     * Get the optional version of the schema. If a version is included, newer versions *must* be larger than older ones.
+     * @return the version of this schema
+     */
+    Integer version();
+
+    /**
+     * @return the documentation for this schema
+     */
+    String doc();
+
+    /**
+     * Get the key schema for this map schema. Throws a DataException if this schema is not a map.
+     * @return the key schema
+     */
+    Schema keySchema();
+
+    /**
+     * Get the value schema for this map or array schema. Throws a DataException if this schema is not a map or array.
+     * @return the value schema
+     */
+    Schema valueSchema();
+
+    /**
+     * Get the list of fields for this Schema. Throws a DataException if this schema is not a struct.
+     * @return the list of fields for this Schema
+     */
+    List<Field> fields();
+
+    /**
+     * Get a field for this Schema by name. Throws a DataException if this schema is not a struct.
+     * @param fieldName the name of the field to look up
+     * @return the Field object for the specified field, or null if there is no field with the given name
+     */
+    Field field(String fieldName);
+
+    /**
+     * Return a concrete instance of the {@link Schema}
+     * @return the {@link Schema}
+     */
+    Schema schema();
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaAndValue.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaAndValue.java b/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaAndValue.java
new file mode 100644
index 0000000..368a8cf
--- /dev/null
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaAndValue.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.data;
+
+import java.util.Objects;
+
+/**
+ * Immutable holder that pairs a value with the {@link Schema} describing it. Neither part is checked by the
+ * constructor, so either may be null.
+ */
+public class SchemaAndValue {
+    /** Shared instance whose schema and value are both null. */
+    public static final SchemaAndValue NULL = new SchemaAndValue(null, null);
+
+    private final Schema schema;
+    private final Object value;
+
+    /**
+     * @param schema the schema describing the value
+     * @param value the value itself
+     */
+    public SchemaAndValue(Schema schema, Object value) {
+        this.schema = schema;
+        this.value = value;
+    }
+
+    /**
+     * @return the schema half of the pair
+     */
+    public Schema schema() {
+        return this.schema;
+    }
+
+    /**
+     * @return the value half of the pair
+     */
+    public Object value() {
+        return this.value;
+    }
+
+    /**
+     * Two instances are equal when they are of the same class and both their schemas and their values are equal.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (o == this)
+            return true;
+        if (o == null || o.getClass() != getClass())
+            return false;
+        SchemaAndValue other = (SchemaAndValue) o;
+        if (!Objects.equals(schema, other.schema))
+            return false;
+        return Objects.equals(value, other.value);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(schema, value);
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder text = new StringBuilder("SchemaAndValue{");
+        text.append("schema=").append(schema);
+        text.append(", value=").append(value);
+        text.append('}');
+        return text.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaBuilder.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaBuilder.java b/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaBuilder.java
new file mode 100644
index 0000000..fe9d474
--- /dev/null
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaBuilder.java
@@ -0,0 +1,371 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.data;
+
+import org.apache.kafka.copycat.errors.DataException;
+import org.apache.kafka.copycat.errors.SchemaBuilderException;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * <p>
+ *     SchemaBuilder provides a fluent API for constructing {@link Schema} objects. It allows you to set each of the
+ *     properties for the schema and each call returns the SchemaBuilder so the calls can be chained. When nested types
+ *     are required, use one of the predefined schemas from {@link Schema} or use a second SchemaBuilder inline.
+ * </p>
+ * <p>
+ *     Here is an example of building a struct schema:
+ *     <pre>
+ *     Schema dateSchema = SchemaBuilder.struct()
+ *         .name("com.example.CalendarDate").version(2).doc("A calendar date including month, day, and year.")
+ *         .field("month", Schema.STRING_SCHEMA)
+ *         .field("day", Schema.INT8_SCHEMA)
+ *         .field("year", Schema.INT16_SCHEMA)
+ *         .build();
+ *     </pre>
+ * </p>
+ * <p>
+ *     Here is an example of using a second SchemaBuilder to construct complex, nested types:
+ *     <pre>
+ *     Schema userListSchema = SchemaBuilder.array(
+ *         SchemaBuilder.struct().name("com.example.User").field("username", Schema.STRING_SCHEMA).field("id", Schema.INT64_SCHEMA).build()
+ *     ).build();
+ *     </pre>
+ * </p>
+ */
+public class SchemaBuilder implements Schema {
+    // Property names, used only when building SchemaBuilderException messages.
+    private static final String TYPE_FIELD = "type";
+    private static final String OPTIONAL_FIELD = "optional";
+    private static final String DEFAULT_FIELD = "default";
+    private static final String NAME_FIELD = "name";
+    private static final String VERSION_FIELD = "version";
+    private static final String DOC_FIELD = "doc";
+
+
+    private final Type type;
+    // Stays null until optional() or required() is called so that a second call can be detected.
+    private Boolean optional = null;
+    private Object defaultValue = null;
+
+    // Struct fields in the order they were added; null until the first field is added.
+    private List<Field> fields = null;
+    // Set only by the map() factory.
+    private Schema keySchema = null;
+    // Set only by the array() and map() factories.
+    private Schema valueSchema = null;
+
+    private String name;
+    private Integer version;
+    // Optional human readable documentation describing this schema.
+    private String doc;
+
+
+    private SchemaBuilder(Type type) {
+        this.type = type;
+    }
+
+    // Common/metadata fields
+
+    /**
+     * @return true only if {@link #optional()} has been called; false if {@link #required()} was called or neither
+     *         has been called yet
+     */
+    @Override
+    public boolean isOptional() {
+        return optional != null && optional;
+    }
+
+    /**
+     * Set this schema as optional. Throws a {@link SchemaBuilderException} if optional/required has already been set.
+     * @return the SchemaBuilder
+     */
+    public SchemaBuilder optional() {
+        checkNull(OPTIONAL_FIELD, optional);
+        optional = true;
+        return this;
+    }
+
+    /**
+     * Set this schema as required. This is the default, but this method can be used to make this choice explicit.
+     * Throws a {@link SchemaBuilderException} if optional/required has already been set.
+     * @return the SchemaBuilder
+     */
+    public SchemaBuilder required() {
+        checkNull(OPTIONAL_FIELD, optional);
+        optional = false;
+        return this;
+    }
+
+    /**
+     * @return the default value set on this builder, or null if none has been set
+     */
+    @Override
+    public Object defaultValue() {
+        return defaultValue;
+    }
+
+    /**
+     * Set the default value for this schema. The value is validated against the schema type, throwing a
+     * {@link SchemaBuilderException} if it does not match or if a default value has already been set.
+     * @param value the default value
+     * @return the SchemaBuilder
+     */
+    public SchemaBuilder defaultValue(Object value) {
+        checkNull(DEFAULT_FIELD, defaultValue);
+        checkNotNull(TYPE_FIELD, type, DEFAULT_FIELD);
+        try {
+            CopycatSchema.validateValue(this, value);
+        } catch (DataException e) {
+            // Report validation failures as builder errors, keeping the original failure as the cause.
+            throw new SchemaBuilderException("Invalid default value", e);
+        }
+        defaultValue = value;
+        return this;
+    }
+
+    /**
+     * @return the name set on this builder, or null if none has been set
+     */
+    @Override
+    public String name() {
+        return name;
+    }
+
+    /**
+     * Set the name of this schema. Throws a {@link SchemaBuilderException} if the name has already been set.
+     * @param name the schema name
+     * @return the SchemaBuilder
+     */
+    public SchemaBuilder name(String name) {
+        checkNull(NAME_FIELD, this.name);
+        this.name = name;
+        return this;
+    }
+
+    /**
+     * @return the version set on this builder, or null if none has been set
+     */
+    @Override
+    public Integer version() {
+        return version;
+    }
+
+    /**
+     * Set the version of this schema. Schema versions are integers which, if provided, must indicate which schema is
+     * newer and which is older by their ordering. Throws a {@link SchemaBuilderException} if the version has already
+     * been set.
+     * @param version the schema version
+     * @return the SchemaBuilder
+     */
+    public SchemaBuilder version(Integer version) {
+        checkNull(VERSION_FIELD, this.version);
+        this.version = version;
+        return this;
+    }
+
+    /**
+     * @return the documentation set on this builder, or null if none has been set
+     */
+    @Override
+    public String doc() {
+        return doc;
+    }
+
+    /**
+     * Set the documentation for this schema. Throws a {@link SchemaBuilderException} if the documentation has
+     * already been set.
+     * @param doc the documentation
+     * @return the SchemaBuilder
+     */
+    public SchemaBuilder doc(String doc) {
+        checkNull(DOC_FIELD, this.doc);
+        this.doc = doc;
+        return this;
+    }
+
+
+    /**
+     * @return the type this builder was created for
+     */
+    @Override
+    public Type type() {
+        return type;
+    }
+
+    /**
+     * Create a SchemaBuilder for the specified type.
+     *
+     * Usually it will be simpler to use one of the variants like {@link #string()} or {@link #struct()}, but this form
+     * can be useful when generating schemas dynamically.
+     *
+     * @param type the schema type
+     * @return a new SchemaBuilder
+     */
+    public static SchemaBuilder type(Type type) {
+        return new SchemaBuilder(type);
+    }
+
+    // Primitive types
+
+    /**
+     * @return a new {@link Type#INT8} SchemaBuilder
+     */
+    public static SchemaBuilder int8() {
+        return new SchemaBuilder(Type.INT8);
+    }
+
+    /**
+     * @return a new {@link Type#INT16} SchemaBuilder
+     */
+    public static SchemaBuilder int16() {
+        return new SchemaBuilder(Type.INT16);
+    }
+
+    /**
+     * @return a new {@link Type#INT32} SchemaBuilder
+     */
+    public static SchemaBuilder int32() {
+        return new SchemaBuilder(Type.INT32);
+    }
+
+    /**
+     * @return a new {@link Type#INT64} SchemaBuilder
+     */
+    public static SchemaBuilder int64() {
+        return new SchemaBuilder(Type.INT64);
+    }
+
+    /**
+     * @return a new {@link Type#FLOAT32} SchemaBuilder
+     */
+    public static SchemaBuilder float32() {
+        return new SchemaBuilder(Type.FLOAT32);
+    }
+
+    /**
+     * @return a new {@link Type#FLOAT64} SchemaBuilder
+     */
+    public static SchemaBuilder float64() {
+        return new SchemaBuilder(Type.FLOAT64);
+    }
+
+    /**
+     * @return a new {@link Type#BOOLEAN} SchemaBuilder
+     */
+    public static SchemaBuilder bool() {
+        return new SchemaBuilder(Type.BOOLEAN);
+    }
+
+    /**
+     * @return a new {@link Type#STRING} SchemaBuilder
+     */
+    public static SchemaBuilder string() {
+        return new SchemaBuilder(Type.STRING);
+    }
+
+    /**
+     * @return a new {@link Type#BYTES} SchemaBuilder
+     */
+    public static SchemaBuilder bytes() {
+        return new SchemaBuilder(Type.BYTES);
+    }
+
+
+    // Structs
+
+    /**
+     * @return a new {@link Type#STRUCT} SchemaBuilder
+     */
+    public static SchemaBuilder struct() {
+        return new SchemaBuilder(Type.STRUCT);
+    }
+
+    /**
+     * Add a field to this struct schema. Throws a SchemaBuilderException if this is not a struct schema.
+     * @param fieldName the name of the field to add
+     * @param fieldSchema the Schema for the field's value
+     * @return the SchemaBuilder
+     */
+    public SchemaBuilder field(String fieldName, Schema fieldSchema) {
+        if (type != Type.STRUCT)
+            throw new SchemaBuilderException("Cannot create fields on type " + type);
+        if (fields == null)
+            fields = new ArrayList<>();
+        // A field's index is its position in insertion order.
+        int fieldIndex = fields.size();
+        fields.add(new Field(fieldName, fieldIndex, fieldSchema));
+        return this;
+    }
+
+    /**
+     * Get the list of fields for this Schema. Throws a DataException if this schema is not a struct.
+     * @return the list of fields for this Schema; null if no field has been added to this builder yet
+     */
+    @Override
+    public List<Field> fields() {
+        if (type != Type.STRUCT)
+            throw new DataException("Cannot list fields on non-struct type");
+        return fields;
+    }
+
+    /**
+     * Get a field of this struct schema by name. Throws a DataException if this schema is not a struct.
+     * @param fieldName the name of the field to look up
+     * @return the Field with the given name, or null if no field with that name has been added
+     */
+    @Override
+    public Field field(String fieldName) {
+        if (type != Type.STRUCT)
+            throw new DataException("Cannot look up fields on non-struct type");
+        // No field has been added yet, so there is nothing to match.
+        if (fields == null)
+            return null;
+        for (Field field : fields) {
+            // Compare names by value: reference equality only matches when both are the very same String instance.
+            String candidate = field.name();
+            if (candidate == null ? fieldName == null : candidate.equals(fieldName))
+                return field;
+        }
+        return null;
+    }
+
+
+
+    // Maps & Arrays
+
+    /**
+     * @param valueSchema the schema for elements of the array
+     * @return a new {@link Type#ARRAY} SchemaBuilder
+     */
+    public static SchemaBuilder array(Schema valueSchema) {
+        SchemaBuilder builder = new SchemaBuilder(Type.ARRAY);
+        builder.valueSchema = valueSchema;
+        return builder;
+    }
+
+    /**
+     * @param keySchema the schema for keys in the map
+     * @param valueSchema the schema for values in the map
+     * @return a new {@link Type#MAP} SchemaBuilder
+     */
+    public static SchemaBuilder map(Schema keySchema, Schema valueSchema) {
+        SchemaBuilder builder = new SchemaBuilder(Type.MAP);
+        builder.keySchema = keySchema;
+        builder.valueSchema = valueSchema;
+        return builder;
+    }
+
+    /**
+     * @return the key schema given to {@link #map(Schema, Schema)}; null for builders created any other way
+     */
+    @Override
+    public Schema keySchema() {
+        return keySchema;
+    }
+
+    /**
+     * @return the value schema given to {@link #array(Schema)} or {@link #map(Schema, Schema)}; null for builders
+     *         created any other way
+     */
+    @Override
+    public Schema valueSchema() {
+        return valueSchema;
+    }
+
+
+    /**
+     * Build the Schema using the current settings
+     * @return the {@link Schema}
+     */
+    public Schema build() {
+        return new CopycatSchema(type, isOptional(), defaultValue, name, version, doc, fields, keySchema, valueSchema);
+    }
+
+    /**
+     * Return a concrete instance of the {@link Schema} specified by this builder
+     * @return the {@link Schema}
+     */
+    @Override
+    public Schema schema() {
+        return build();
+    }
+
+
+    // Guard for setters that may only be called once: fails if the property already holds a value.
+    private static void checkNull(String fieldName, Object val) {
+        if (val != null)
+            throw new SchemaBuilderException("Invalid SchemaBuilder call: " + fieldName + " has already been set.");
+    }
+
+    // Guard for setters that depend on another property: fails if that prerequisite has not been set.
+    private static void checkNotNull(String fieldName, Object val, String fieldToSet) {
+        if (val == null)
+            throw new SchemaBuilderException("Invalid SchemaBuilder call: " + fieldName + " must be specified to set " + fieldToSet);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/api/src/main/java/org/apache/kafka/copycat/data/Struct.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/data/Struct.java b/copycat/api/src/main/java/org/apache/kafka/copycat/data/Struct.java
new file mode 100644
index 0000000..bd757c4
--- /dev/null
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/data/Struct.java
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.data;
+
+import org.apache.kafka.copycat.errors.DataException;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * <p>
+ *     A structured record containing a set of named fields with values, each field using an independent {@link Schema}.
+ *     Struct objects must specify a complete {@link Schema} up front, and only fields specified in the Schema may be set.
+ * </p>
+ * <p>
+ *     The Struct's {@link #put(String, Object)} method returns the Struct itself to provide a fluent API for constructing
+ *     complete objects:
+ *     <pre>
+ *         Schema schema = SchemaBuilder.struct().name("com.example.Person")
+ *             .field("name", Schema.STRING_SCHEMA).field("age", Schema.INT32_SCHEMA).build()
+ *         Struct struct = new Struct(schema).put("name", "Bobby McGee").put("age", 21)
+ *     </pre>
+ * </p>
+ */
+public class Struct {
+
+    private final Schema schema;
+    // One slot per schema field, addressed by Field.index().
+    private final Object[] values;
+
+    /**
+     * Create a new Struct for this {@link Schema}. Throws a {@link DataException} if the schema is not of type STRUCT.
+     * @param schema the {@link Schema} for the Struct
+     */
+    public Struct(Schema schema) {
+        if (schema.type() != Schema.Type.STRUCT)
+            throw new DataException("Not a struct schema: " + schema);
+        this.schema = schema;
+        this.values = new Object[schema.fields().size()];
+    }
+
+    /**
+     * Get the schema for this Struct.
+     * @return the Struct's schema
+     */
+    public Schema schema() {
+        return schema;
+    }
+
+    /**
+     * Get the value of a field, returning the default value if no value has been set yet and a default value is specified
+     * in the field's schema. Because this handles fields of all types, the value is returned as an {@link Object} and
+     * must be cast to a more specific type. Throws a {@link DataException} if there is no field with the given name.
+     * @param fieldName the field name to lookup
+     * @return the value for the field
+     */
+    public Object get(String fieldName) {
+        Field field = lookupField(fieldName);
+        return get(field);
+    }
+
+    /**
+     * Get the value of a field, returning the default value if no value has been set yet and a default value is specified
+     * in the field's schema. Because this handles fields of all types, the value is returned as an {@link Object} and
+     * must be cast to a more specific type.
+     * @param field the field to lookup
+     * @return the value for the field
+     */
+    public Object get(Field field) {
+        Object val = values[field.index()];
+        if (val == null) {
+            // The fallback comes from the field's own schema, not from the schema of the enclosing struct.
+            Object fieldDefault = field.schema().defaultValue();
+            if (fieldDefault != null)
+                val = fieldDefault;
+        }
+        return val;
+    }
+
+    /**
+     * Get the underlying raw value for the field without accounting for default values. Throws a
+     * {@link DataException} if there is no field with the given name.
+     * @param fieldName the field to get the value of
+     * @return the raw value
+     */
+    public Object getWithoutDefault(String fieldName) {
+        Field field = lookupField(fieldName);
+        return values[field.index()];
+    }
+
+    // Note that all getters have to have boxed return types since the fields might be optional.
+    //
+    // Every typed getter below throws a DataException if the field does not exist or its schema is not of the
+    // requested type. They return the stored value as-is: unlike get(String), no schema default is substituted
+    // when the field has not been set.
+
+    /**
+     * Get the stored value of an INT8 field, cast to a Byte.
+     */
+    public Byte getInt8(String fieldName) {
+        return (Byte) getCheckType(fieldName, Schema.Type.INT8);
+    }
+
+    /**
+     * Get the stored value of an INT16 field, cast to a Short.
+     */
+    public Short getInt16(String fieldName) {
+        return (Short) getCheckType(fieldName, Schema.Type.INT16);
+    }
+
+    /**
+     * Get the stored value of an INT32 field, cast to an Integer.
+     */
+    public Integer getInt32(String fieldName) {
+        return (Integer) getCheckType(fieldName, Schema.Type.INT32);
+    }
+
+    /**
+     * Get the stored value of an INT64 field, cast to a Long.
+     */
+    public Long getInt64(String fieldName) {
+        return (Long) getCheckType(fieldName, Schema.Type.INT64);
+    }
+
+    /**
+     * Get the stored value of a FLOAT32 field, cast to a Float.
+     */
+    public Float getFloat32(String fieldName) {
+        return (Float) getCheckType(fieldName, Schema.Type.FLOAT32);
+    }
+
+    /**
+     * Get the stored value of a FLOAT64 field, cast to a Double.
+     */
+    public Double getFloat64(String fieldName) {
+        return (Double) getCheckType(fieldName, Schema.Type.FLOAT64);
+    }
+
+    /**
+     * Get the stored value of a BOOLEAN field, cast to a Boolean.
+     */
+    public Boolean getBoolean(String fieldName) {
+        return (Boolean) getCheckType(fieldName, Schema.Type.BOOLEAN);
+    }
+
+    /**
+     * Get the stored value of a STRING field, cast to a String.
+     */
+    public String getString(String fieldName) {
+        return (String) getCheckType(fieldName, Schema.Type.STRING);
+    }
+
+    /**
+     * Get the stored value of a BYTES field as a byte[]. If the stored value is a {@link ByteBuffer}, the array
+     * returned by {@link ByteBuffer#array()} is returned; otherwise the stored value is cast to byte[].
+     */
+    public byte[] getBytes(String fieldName) {
+        Object bytes = getCheckType(fieldName, Schema.Type.BYTES);
+        if (bytes instanceof ByteBuffer)
+            return ((ByteBuffer) bytes).array();
+        return (byte[]) bytes;
+    }
+
+    /**
+     * Get the stored value of an ARRAY field, cast to a List. The element type is not checked.
+     */
+    @SuppressWarnings("unchecked")
+    public <T> List<T> getArray(String fieldName) {
+        return (List<T>) getCheckType(fieldName, Schema.Type.ARRAY);
+    }
+
+    /**
+     * Get the stored value of a MAP field, cast to a Map. The key and value types are not checked.
+     */
+    @SuppressWarnings("unchecked")
+    public <K, V> Map<K, V> getMap(String fieldName) {
+        return (Map<K, V>) getCheckType(fieldName, Schema.Type.MAP);
+    }
+
+    /**
+     * Get the stored value of a STRUCT field, cast to a Struct.
+     */
+    public Struct getStruct(String fieldName) {
+        return (Struct) getCheckType(fieldName, Schema.Type.STRUCT);
+    }
+
+    /**
+     * Set the value of a field. Validates the value, throwing a {@link DataException} if it does not match the field's
+     * {@link Schema} or if there is no field with the given name.
+     * @param fieldName the name of the field to set
+     * @param value the value of the field
+     * @return the Struct, to allow chaining of {@link #put(String, Object)} calls
+     */
+    public Struct put(String fieldName, Object value) {
+        Field field = lookupField(fieldName);
+        return put(field, value);
+    }
+
+    /**
+     * Set the value of a field. Validates the value, throwing a {@link DataException} if it does not match the field's
+     * {@link Schema}.
+     * @param field the field to set
+     * @param value the value of the field
+     * @return the Struct, to allow chaining of {@link #put(String, Object)} calls
+     */
+    public Struct put(Field field, Object value) {
+        CopycatSchema.validateValue(field.schema(), value);
+        values[field.index()] = value;
+        return this;
+    }
+
+
+    /**
+     * Validates that this struct has filled in all the necessary data with valid values. For required fields
+     * without defaults, this validates that a value has been set and has matching types/schemas. If any validation
+     * fails, throws a DataException.
+     */
+    public void validate() {
+        for (Field field : schema.fields()) {
+            Schema fieldSchema = field.schema();
+            Object value = values[field.index()];
+            // A missing value is acceptable when the field is optional or its schema supplies a default.
+            if (value == null && (fieldSchema.isOptional() || fieldSchema.defaultValue() != null))
+                continue;
+            CopycatSchema.validateValue(fieldSchema, value);
+        }
+    }
+
+    /**
+     * Two Structs are equal when they are of the same class, their schemas are equal and their stored values are
+     * deeply equal, so array-typed values such as byte[] are compared by content.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        Struct struct = (Struct) o;
+        return Objects.equals(schema, struct.schema) &&
+                Arrays.deepEquals(values, struct.values);
+    }
+
+    @Override
+    public int hashCode() {
+        // Content-based hash of the values, consistent with the deep comparison in equals().
+        return Objects.hash(schema, Arrays.deepHashCode(values));
+    }
+
+    // Resolve a field by name against the struct's schema, throwing a DataException if no such field exists.
+    private Field lookupField(String fieldName) {
+        Field field = schema.field(fieldName);
+        if (field == null)
+            throw new DataException(fieldName + " is not a valid field name");
+        return field;
+    }
+
+    // Get the field's value, but also check that the field matches the specified type, throwing an exception if it doesn't.
+    // Used to implement the get*() methods that return typed data instead of Object. Returns the stored slot directly,
+    // without applying the field schema's default value.
+    private Object getCheckType(String fieldName, Schema.Type type) {
+        Field field = lookupField(fieldName);
+        if (field.schema().type() != type)
+            throw new DataException("Field '" + fieldName + "' is not of type " + type);
+        return values[field.index()];
+    }
+
+}
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/api/src/main/java/org/apache/kafka/copycat/errors/DataException.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/errors/DataException.java b/copycat/api/src/main/java/org/apache/kafka/copycat/errors/DataException.java
new file mode 100644
index 0000000..11139a4
--- /dev/null
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/errors/DataException.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.errors;
+
+/**
+ * Common superclass of the exceptions raised by the Copycat data API.
+ */
+public class DataException extends CopycatException {
+    /**
+     * @param message the message handed to {@link CopycatException}
+     */
+    public DataException(String message) {
+        super(message);
+    }
+
+    /**
+     * @param message the message handed to {@link CopycatException}
+     * @param cause the throwable handed to {@link CopycatException}
+     */
+    public DataException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * @param cause the throwable handed to {@link CopycatException}
+     */
+    public DataException(Throwable cause) {
+        super(cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/api/src/main/java/org/apache/kafka/copycat/errors/SchemaBuilderException.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/errors/SchemaBuilderException.java b/copycat/api/src/main/java/org/apache/kafka/copycat/errors/SchemaBuilderException.java
new file mode 100644
index 0000000..b5a93af
--- /dev/null
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/errors/SchemaBuilderException.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.errors;
+
+/**
+ * A {@link DataException} that signals a failure while building a schema.
+ */
+public class SchemaBuilderException extends DataException {
+    /**
+     * @param message the message handed to {@link DataException}
+     */
+    public SchemaBuilderException(String message) {
+        super(message);
+    }
+
+    /**
+     * @param message the message handed to {@link DataException}
+     * @param cause the throwable handed to {@link DataException}
+     */
+    public SchemaBuilderException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * @param cause the throwable handed to {@link DataException}
+     */
+    public SchemaBuilderException(Throwable cause) {
+        super(cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkRecord.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkRecord.java b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkRecord.java
index e3775b3..79ac725 100644
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkRecord.java
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkRecord.java
@@ -19,6 +19,7 @@ package org.apache.kafka.copycat.sink;
 
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.copycat.connector.CopycatRecord;
+import org.apache.kafka.copycat.data.Schema;
 
 /**
  * SinkRecord is a CopycatRecord that has been read from Kafka and includes the kafkaOffset of
@@ -29,12 +30,12 @@ import org.apache.kafka.copycat.connector.CopycatRecord;
 public class SinkRecord extends CopycatRecord {
     private final long kafkaOffset;
 
-    public SinkRecord(String topic, int partition, Object key, Object value, long kafkaOffset) {
-        super(topic, partition, key, value);
+    public SinkRecord(String topic, int partition, Schema keySchema, Object key, Schema valueSchema, Object value, long kafkaOffset) {
+        super(topic, partition, keySchema, key, valueSchema, value);
         this.kafkaOffset = kafkaOffset;
     }
 
-    public long getKafkaOffset() {
+    public long kafkaOffset() {
         return kafkaOffset;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java
index 7cc6109..67c045f 100644
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java
@@ -45,7 +45,7 @@ public abstract class SinkTaskContext {
      *
      * @param offsets map of offsets for topic partitions
      */
-    public void resetOffset(Map<TopicPartition, Long> offsets) {
+    public void offset(Map<TopicPartition, Long> offsets) {
         this.offsets = offsets;
     }
 
@@ -53,7 +53,7 @@ public abstract class SinkTaskContext {
      * Get offsets that the SinkTask has submitted to be reset. Used by the Copycat framework.
      * @return the map of offsets
      */
-    public Map<TopicPartition, Long> getOffsets() {
+    public Map<TopicPartition, Long> offsets() {
         return offsets;
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceRecord.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceRecord.java b/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceRecord.java
index 2085f66..05286a1 100644
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceRecord.java
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceRecord.java
@@ -19,6 +19,7 @@ package org.apache.kafka.copycat.source;
 
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.copycat.connector.CopycatRecord;
+import org.apache.kafka.copycat.data.Schema;
 
 /**
  * <p>
@@ -40,29 +41,47 @@ import org.apache.kafka.copycat.connector.CopycatRecord;
  */
 @InterfaceStability.Unstable
 public class SourceRecord extends CopycatRecord {
+    private final Schema sourcePartitionSchema;
     private final Object sourcePartition;
+    private final Schema sourceOffsetSchema;
     private final Object sourceOffset;
 
-    public SourceRecord(Object sourcePartition, Object sourceOffset, String topic, Integer partition, Object value) {
-        this(sourcePartition, sourceOffset, topic, partition, null, value);
+    public SourceRecord(Schema sourcePartitionSchema, Object sourcePartition,
+                        Schema sourceOffsetSchema, Object sourceOffset,
+                        String topic, Integer partition, Schema valueSchema, Object value) {
+        this(sourcePartitionSchema, sourcePartition, sourceOffsetSchema, sourceOffset, topic, partition, null, null, valueSchema, value);
     }
 
-    public SourceRecord(Object sourcePartition, Object sourceOffset, String topic, Object value) {
-        this(sourcePartition, sourceOffset, topic, null, null, value);
+    public SourceRecord(Schema sourcePartitionSchema, Object sourcePartition,
+                        Schema sourceOffsetSchema, Object sourceOffset,
+                        String topic, Schema valueSchema, Object value) {
+        this(sourcePartitionSchema, sourcePartition, sourceOffsetSchema, sourceOffset, topic, null, null, null, valueSchema, value);
     }
 
-    public SourceRecord(Object sourcePartition, Object sourceOffset, String topic, Integer partition,
-                        Object key, Object value) {
-        super(topic, partition, key, value);
+    public SourceRecord(Schema sourcePartitionSchema, Object sourcePartition,
+                        Schema sourceOffsetSchema, Object sourceOffset,
+                        String topic, Integer partition,
+                        Schema keySchema, Object key, Schema valueSchema, Object value) {
+        super(topic, partition, keySchema, key, valueSchema, value);
+        this.sourcePartitionSchema = sourcePartitionSchema;
         this.sourcePartition = sourcePartition;
+        this.sourceOffsetSchema = sourceOffsetSchema;
         this.sourceOffset = sourceOffset;
     }
 
-    public Object getSourcePartition() {
+    public Schema sourcePartitionSchema() {
+        return sourcePartitionSchema;
+    }
+
+    public Object sourcePartition() {
         return sourcePartition;
     }
 
-    public Object getSourceOffset() {
+    public Schema sourceOffsetSchema() {
+        return sourceOffsetSchema;
+    }
+
+    public Object sourceOffset() {
         return sourceOffset;
     }
 
@@ -77,10 +96,14 @@ public class SourceRecord extends CopycatRecord {
 
         SourceRecord that = (SourceRecord) o;
 
-        if (sourceOffset != null ? !sourceOffset.equals(that.sourceOffset) : that.sourceOffset != null)
+        if (sourcePartitionSchema != null ? !sourcePartitionSchema.equals(that.sourcePartitionSchema) : that.sourcePartitionSchema != null)
+            return false;
+        if (sourceOffsetSchema != null ? !sourceOffsetSchema.equals(that.sourceOffsetSchema) : that.sourceOffsetSchema != null)
             return false;
         if (sourcePartition != null ? !sourcePartition.equals(that.sourcePartition) : that.sourcePartition != null)
             return false;
+        if (sourceOffset != null ? !sourceOffset.equals(that.sourceOffset) : that.sourceOffset != null)
+            return false;
 
         return true;
     }
@@ -88,6 +111,8 @@ public class SourceRecord extends CopycatRecord {
     @Override
     public int hashCode() {
         int result = super.hashCode();
+        result = 31 * result + (sourcePartitionSchema != null ? sourcePartitionSchema.hashCode() : 0);
+        result = 31 * result + (sourceOffsetSchema != null ? sourceOffsetSchema.hashCode() : 0);
         result = 31 * result + (sourcePartition != null ? sourcePartition.hashCode() : 0);
         result = 31 * result + (sourceOffset != null ? sourceOffset.hashCode() : 0);
         return result;

http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTaskContext.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTaskContext.java b/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTaskContext.java
index d52fd62..a3875e7 100644
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTaskContext.java
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTaskContext.java
@@ -34,7 +34,7 @@ public class SourceTaskContext {
     /**
      * Get the OffsetStorageReader for this SourceTask.
      */
-    public OffsetStorageReader getOffsetStorageReader() {
+    public OffsetStorageReader offsetStorageReader() {
         return reader;
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/api/src/main/java/org/apache/kafka/copycat/storage/Converter.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/storage/Converter.java b/copycat/api/src/main/java/org/apache/kafka/copycat/storage/Converter.java
index c50aee7..dd2068d 100644
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/storage/Converter.java
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/storage/Converter.java
@@ -18,6 +18,8 @@
 package org.apache.kafka.copycat.storage;
 
 import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.copycat.data.Schema;
+import org.apache.kafka.copycat.data.SchemaAndValue;
 
 /**
  * The Converter interface provides support for translating between Copycat's runtime data format
@@ -31,15 +33,16 @@ public interface Converter<T> {
 
     /**
      * Convert a Copycat data object to a native object for serialization.
-     * @param value
+     * @param schema the schema for the value
+     * @param value the value to convert
      * @return
      */
-    T fromCopycatData(Object value);
+    T fromCopycatData(Schema schema, Object value);
 
     /**
      * Convert a native object to a Copycat data object.
-     * @param value
-     * @return
+     * @param value the value to convert
+     * @return an object containing the {@link Schema} and the converted value
      */
-    Object toCopycatData(T value);
+    SchemaAndValue toCopycatData(T value);
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/api/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReader.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReader.java b/copycat/api/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReader.java
index 785660d..b51fbde 100644
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReader.java
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReader.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.copycat.storage;
 
 import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.copycat.data.SchemaAndValue;
 
 import java.util.Collection;
 import java.util.Map;
@@ -36,12 +37,12 @@ public interface OffsetStorageReader {
      * @param partition object uniquely identifying the partition of data
      * @return object uniquely identifying the offset in the partition of data
      */
-    Object getOffset(Object partition);
+    SchemaAndValue offset(SchemaAndValue partition);
 
     /**
      * <p>
      * Get a set of offsets for the specified partition identifiers. This may be more efficient
-     * than calling {@link #getOffset(Object)} repeatedly.
+     * than calling {@link #offset(SchemaAndValue)} repeatedly.
      * </p>
      * <p>
      * Note that when errors occur, this method omits the associated data and tries to return as
@@ -55,5 +56,5 @@ public interface OffsetStorageReader {
      * @param partitions set of identifiers for partitions of data
      * @return a map of partition identifiers to decoded offsets
      */
-    Map<Object, Object> getOffsets(Collection<Object> partitions);
+    Map<SchemaAndValue, SchemaAndValue> offsets(Collection<SchemaAndValue> partitions);
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/api/src/test/java/org/apache/kafka/copycat/connector/ConnectorReconfigurationTest.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/test/java/org/apache/kafka/copycat/connector/ConnectorReconfigurationTest.java b/copycat/api/src/test/java/org/apache/kafka/copycat/connector/ConnectorReconfigurationTest.java
index e7ad2f3..cbaf866 100644
--- a/copycat/api/src/test/java/org/apache/kafka/copycat/connector/ConnectorReconfigurationTest.java
+++ b/copycat/api/src/test/java/org/apache/kafka/copycat/connector/ConnectorReconfigurationTest.java
@@ -57,12 +57,12 @@ public class ConnectorReconfigurationTest {
         }
 
         @Override
-        public Class<? extends Task> getTaskClass() {
+        public Class<? extends Task> taskClass() {
             return null;
         }
 
         @Override
-        public List<Properties> getTaskConfigs(int count) {
+        public List<Properties> taskConfigs(int count) {
             return null;
         }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/492bfdfa/copycat/api/src/test/java/org/apache/kafka/copycat/data/CopycatSchemaTest.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/test/java/org/apache/kafka/copycat/data/CopycatSchemaTest.java b/copycat/api/src/test/java/org/apache/kafka/copycat/data/CopycatSchemaTest.java
new file mode 100644
index 0000000..f400863
--- /dev/null
+++ b/copycat/api/src/test/java/org/apache/kafka/copycat/data/CopycatSchemaTest.java
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.data;
+
+import org.apache.kafka.copycat.errors.DataException;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNull;
+
+public class CopycatSchemaTest {
+    private static final Schema MAP_INT_STRING_SCHEMA = SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.STRING_SCHEMA).build();
+    private static final Schema FLAT_STRUCT_SCHEMA = SchemaBuilder.struct()
+            .field("field", Schema.INT32_SCHEMA)
+            .build();
+    private static final Schema STRUCT_SCHEMA = SchemaBuilder.struct()
+            .field("first", Schema.INT32_SCHEMA)
+            .field("second", Schema.STRING_SCHEMA)
+            .field("array", SchemaBuilder.array(Schema.INT32_SCHEMA).build())
+            .field("map", SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.STRING_SCHEMA).build())
+            .field("nested", FLAT_STRUCT_SCHEMA)
+            .build();
+    private static final Schema PARENT_STRUCT_SCHEMA = SchemaBuilder.struct()
+            .field("nested", FLAT_STRUCT_SCHEMA)
+            .build();
+
+    @Test
+    public void testFieldsOnStructSchema() {
+        Schema schema = SchemaBuilder.struct()
+                .field("foo", Schema.BOOLEAN_SCHEMA)
+                .field("bar", Schema.INT32_SCHEMA)
+                .build();
+
+        assertEquals(2, schema.fields().size());
+        // Validate field lookup by name
+        Field foo = schema.field("foo");
+        assertEquals(0, foo.index());
+        Field bar = schema.field("bar");
+        assertEquals(1, bar.index());
+        // Looking up any other field name should return null
+        assertNull(schema.field("other"));
+    }
+
+
+    @Test(expected = DataException.class)
+    public void testFieldsOnlyValidForStructs() {
+        Schema.INT8_SCHEMA.fields();
+    }
+
+    @Test
+    public void testValidateValueMatchingType() {
+        CopycatSchema.validateValue(Schema.INT8_SCHEMA, (byte) 1);
+        CopycatSchema.validateValue(Schema.INT16_SCHEMA, (short) 1);
+        CopycatSchema.validateValue(Schema.INT32_SCHEMA, 1);
+        CopycatSchema.validateValue(Schema.INT64_SCHEMA, (long) 1);
+        CopycatSchema.validateValue(Schema.FLOAT32_SCHEMA, 1.f);
+        CopycatSchema.validateValue(Schema.FLOAT64_SCHEMA, 1.);
+        CopycatSchema.validateValue(Schema.BOOLEAN_SCHEMA, true);
+        CopycatSchema.validateValue(Schema.STRING_SCHEMA, "a string");
+        CopycatSchema.validateValue(Schema.BYTES_SCHEMA, "a byte array".getBytes());
+        CopycatSchema.validateValue(Schema.BYTES_SCHEMA, ByteBuffer.wrap("a byte array".getBytes()));
+        CopycatSchema.validateValue(SchemaBuilder.array(Schema.INT32_SCHEMA).build(), Arrays.asList(1, 2, 3));
+        CopycatSchema.validateValue(
+                SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.STRING_SCHEMA).build(),
+                Collections.singletonMap(1, "value")
+        );
+        // Struct tests the basic struct layout + complex field types + nested structs
+        Struct structValue = new Struct(STRUCT_SCHEMA)
+                .put("first", 1)
+                .put("second", "foo")
+                .put("array", Arrays.asList(1, 2, 3))
+                .put("map", Collections.singletonMap(1, "value"))
+                .put("nested", new Struct(FLAT_STRUCT_SCHEMA).put("field", 12));
+        CopycatSchema.validateValue(STRUCT_SCHEMA, structValue);
+    }
+
+    // To avoid requiring excessive numbers of tests, these checks for invalid types use a similar type where possible
+    // to only include a single test for each type
+
+    @Test(expected = DataException.class)
+    public void testValidateValueMismatchInt8() {
+        CopycatSchema.validateValue(Schema.INT8_SCHEMA, 1);
+    }
+
+    @Test(expected = DataException.class)
+    public void testValidateValueMismatchInt16() {
+        CopycatSchema.validateValue(Schema.INT16_SCHEMA, 1);
+    }
+
+    @Test(expected = DataException.class)
+    public void testValidateValueMismatchInt32() {
+        CopycatSchema.validateValue(Schema.INT32_SCHEMA, (long) 1);
+    }
+
+    @Test(expected = DataException.class)
+    public void testValidateValueMismatchInt64() {
+        CopycatSchema.validateValue(Schema.INT64_SCHEMA, 1);
+    }
+
+    @Test(expected = DataException.class)
+    public void testValidateValueMismatchFloat() {
+        CopycatSchema.validateValue(Schema.FLOAT32_SCHEMA, 1.0);
+    }
+
+    @Test(expected = DataException.class)
+    public void testValidateValueMismatchDouble() {
+        CopycatSchema.validateValue(Schema.FLOAT64_SCHEMA, 1.f);
+    }
+
+    @Test(expected = DataException.class)
+    public void testValidateValueMismatchBoolean() {
+        CopycatSchema.validateValue(Schema.BOOLEAN_SCHEMA, 1.f);
+    }
+
+    @Test(expected = DataException.class)
+    public void testValidateValueMismatchString() {
+        // CharSequence is a similar type (supertype of String), but we restrict to String.
+        CharBuffer cbuf = CharBuffer.wrap("abc");
+        CopycatSchema.validateValue(Schema.STRING_SCHEMA, cbuf);
+    }
+
+    @Test(expected = DataException.class)
+    public void testValidateValueMismatchBytes() {
+        CopycatSchema.validateValue(Schema.BYTES_SCHEMA, new Object[]{1, "foo"});
+    }
+
+    @Test(expected = DataException.class)
+    public void testValidateValueMismatchArray() {
+        CopycatSchema.validateValue(SchemaBuilder.array(Schema.INT32_SCHEMA).build(), Arrays.asList("a", "b", "c"));
+    }
+
+    @Test(expected = DataException.class)
+    public void testValidateValueMismatchArraySomeMatch() {
+        // Even if some elements match the right type, validation should fail if any element mismatches. Type erasure
+        // hides that this list is really a List<Object>, and checking only the first element would miss the mismatch
+        CopycatSchema.validateValue(SchemaBuilder.array(Schema.INT32_SCHEMA).build(), Arrays.asList(1, 2, "c"));
+    }
+
+    @Test(expected = DataException.class)
+    public void testValidateValueMismatchMapKey() {
+        CopycatSchema.validateValue(MAP_INT_STRING_SCHEMA, Collections.singletonMap("wrong key type", "value"));
+    }
+
+    @Test(expected = DataException.class)
+    public void testValidateValueMismatchMapValue() {
+        CopycatSchema.validateValue(MAP_INT_STRING_SCHEMA, Collections.singletonMap(1, 2));
+    }
+
+    @Test(expected = DataException.class)
+    public void testValidateValueMismatchMapSomeKeys() {
+        Map<Object, String> data = new HashMap<>();
+        data.put(1, "abc");
+        data.put("wrong", "it's as easy as one two three");
+        CopycatSchema.validateValue(MAP_INT_STRING_SCHEMA, data);
+    }
+
+    @Test(expected = DataException.class)
+    public void testValidateValueMismatchMapSomeValues() {
+        Map<Integer, Object> data = new HashMap<>();
+        data.put(1, "abc");
+        data.put(2, "wrong".getBytes());
+        CopycatSchema.validateValue(MAP_INT_STRING_SCHEMA, data);
+    }
+
+    @Test(expected = DataException.class)
+    public void testValidateValueMismatchStructWrongSchema() {
+        // Completely mismatching schemas
+        CopycatSchema.validateValue(
+                FLAT_STRUCT_SCHEMA,
+                new Struct(SchemaBuilder.struct().field("x", Schema.INT32_SCHEMA).build()).put("x", 1)
+        );
+    }
+
+    @Test(expected = DataException.class)
+    public void testValidateValueMismatchStructWrongNestedSchema() {
+        // Top-level schema matches, but nested does not.
+        CopycatSchema.validateValue(
+                PARENT_STRUCT_SCHEMA,
+                new Struct(PARENT_STRUCT_SCHEMA)
+                        .put("nested", new Struct(SchemaBuilder.struct().field("x", Schema.INT32_SCHEMA).build()).put("x", 1))
+        );
+    }
+
+
+    @Test
+    public void testPrimitiveEquality() {
+        // Test that primitive types, which only need to consider all the type & metadata fields, handle equality correctly
+        CopycatSchema s1 = new CopycatSchema(Schema.Type.INT8, false, null, "name", 2, "doc", null, null, null);
+        CopycatSchema s2 = new CopycatSchema(Schema.Type.INT8, false, null, "name", 2, "doc", null, null, null);
+        CopycatSchema differentType = new CopycatSchema(Schema.Type.INT16, false, null, "name", 2, "doc", null, null, null);
+        CopycatSchema differentOptional = new CopycatSchema(Schema.Type.INT8, true, null, "name", 2, "doc", null, null, null);
+        CopycatSchema differentDefault = new CopycatSchema(Schema.Type.INT8, false, true, "name", 2, "doc", null, null, null);
+        CopycatSchema differentName = new CopycatSchema(Schema.Type.INT8, false, null, "otherName", 2, "doc", null, null, null);
+        CopycatSchema differentVersion = new CopycatSchema(Schema.Type.INT8, false, null, "name", 4, "doc", null, null, null);
+        CopycatSchema differentDoc = new CopycatSchema(Schema.Type.INT8, false, null, "name", 2, "other doc", null, null, null);
+
+        assertEquals(s1, s2);
+        assertNotEquals(s1, differentType);
+        assertNotEquals(s1, differentOptional);
+        assertNotEquals(s1, differentDefault);
+        assertNotEquals(s1, differentName);
+        assertNotEquals(s1, differentVersion);
+        assertNotEquals(s1, differentDoc);
+    }
+
+    @Test
+    public void testArrayEquality() {
+        // Validate that the value type for the array is tested for equality. This test makes sure the same schema object is
+        // never reused to ensure we're actually checking equality
+        CopycatSchema s1 = new CopycatSchema(Schema.Type.ARRAY, false, null, null, null, null, null, null, SchemaBuilder.int8().build());
+        CopycatSchema s2 = new CopycatSchema(Schema.Type.ARRAY, false, null, null, null, null, null, null, SchemaBuilder.int8().build());
+        CopycatSchema differentValueSchema = new CopycatSchema(Schema.Type.ARRAY, false, null, null, null, null, null, null, SchemaBuilder.int16().build());
+
+        assertEquals(s1, s2);
+        assertNotEquals(s1, differentValueSchema);
+    }
+
+    @Test
+    public void testMapEquality() {
+        // Same as testArrayEquality, but for both key and value schemas
+        CopycatSchema s1 = new CopycatSchema(Schema.Type.MAP, false, null, null, null, null, null, SchemaBuilder.int8().build(), SchemaBuilder.int16().build());
+        CopycatSchema s2 = new CopycatSchema(Schema.Type.MAP, false, null, null, null, null, null, SchemaBuilder.int8().build(), SchemaBuilder.int16().build());
+        CopycatSchema differentKeySchema = new CopycatSchema(Schema.Type.MAP, false, null, null, null, null, null, SchemaBuilder.string().build(), SchemaBuilder.int16().build());
+        CopycatSchema differentValueSchema = new CopycatSchema(Schema.Type.MAP, false, null, null, null, null, null, SchemaBuilder.int8().build(), SchemaBuilder.string().build());
+
+        assertEquals(s1, s2);
+        assertNotEquals(s1, differentKeySchema);
+        assertNotEquals(s1, differentValueSchema);
+    }
+
+    @Test
+    public void testStructEquality() {
+        // Same as testArrayEquality, but checks differences in fields. Only does a simple check, relying on tests of
+        // Field's equals() method to validate all variations in the list of fields will be checked
+        CopycatSchema s1 = new CopycatSchema(Schema.Type.STRUCT, false, null, null, null, null,
+                Arrays.asList(new Field("field", 0, SchemaBuilder.int8().build()),
+                        new Field("field2", 1, SchemaBuilder.int16().build())), null, null);
+        CopycatSchema s2 = new CopycatSchema(Schema.Type.STRUCT, false, null, null, null, null,
+                Arrays.asList(new Field("field", 0, SchemaBuilder.int8().build()),
+                        new Field("field2", 1, SchemaBuilder.int16().build())), null, null);
+        CopycatSchema differentField = new CopycatSchema(Schema.Type.STRUCT, false, null, null, null, null,
+                Arrays.asList(new Field("field", 0, SchemaBuilder.int8().build()),
+                        new Field("different field name", 1, SchemaBuilder.int16().build())), null, null);
+
+        assertEquals(s1, s2);
+        assertNotEquals(s1, differentField);
+    }
+
+}