You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by ga...@apache.org on 2017/09/07 12:23:27 UTC

[2/2] avro git commit: AVRO-1933: Add more specific error details to SchemaCompatibility class

AVRO-1933: Add more specific error details to SchemaCompatibility class

Closes #200

Signed-off-by: Sriharsha Chintalapani <ha...@hortonworks.com>
Signed-off-by: Anna Szonyi <sz...@cloudera.com>
Signed-off-by: Nandor Kollar <nk...@cloudera.com>
Signed-off-by: Gabor Szadovszky <ga...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/avro/repo
Commit: http://git-wip-us.apache.org/repos/asf/avro/commit/db8ed216
Tree: http://git-wip-us.apache.org/repos/asf/avro/tree/db8ed216
Diff: http://git-wip-us.apache.org/repos/asf/avro/diff/db8ed216

Branch: refs/heads/master
Commit: db8ed216eaf13c8f3862eb31152185f0504c4467
Parents: 189368e
Author: Anders Sundelin <an...@ericsson.com>
Authored: Thu Mar 2 11:12:03 2017 +0100
Committer: Gabor Szadovszky <ga...@apache.org>
Committed: Thu Sep 7 14:05:19 2017 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +
 .../org/apache/avro/SchemaCompatibility.java    | 434 ++++++++++++++-----
 .../apache/avro/TestSchemaCompatibility.java    | 326 +++++---------
 ...estSchemaCompatibilityFixedSizeMismatch.java |  59 +++
 ...stSchemaCompatibilityMissingEnumSymbols.java |  67 +++
 ...stSchemaCompatibilityMissingUnionBranch.java | 111 +++++
 .../TestSchemaCompatibilityNameMismatch.java    |  70 +++
 ...atibilityReaderFieldMissingDefaultValue.java |  59 +++
 .../TestSchemaCompatibilityTypeMismatch.java    | 128 ++++++
 .../org/apache/avro/TestSchemaValidation.java   | 222 +++++++++-
 .../test/java/org/apache/avro/TestSchemas.java  | 139 ++++++
 11 files changed, 1273 insertions(+), 345 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/avro/blob/db8ed216/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 58bf78c..e78adc4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -10,6 +10,9 @@ Trunk (not yet released)
 
     AVRO-2061: Improve Invalid File Format Error Message (Beluga Behr via gabor)
 
+    AVRO-1933: Add more specific error details to SchemaCompatibility class
+    (Anders Sundelin via gabor)
+
   NEW FEATURES
 
     AVRO-1704: Java: Add support for single-message encoding. (blue)

http://git-wip-us.apache.org/repos/asf/avro/blob/db8ed216/lang/java/avro/src/main/java/org/apache/avro/SchemaCompatibility.java
----------------------------------------------------------------------
diff --git a/lang/java/avro/src/main/java/org/apache/avro/SchemaCompatibility.java b/lang/java/avro/src/main/java/org/apache/avro/SchemaCompatibility.java
index c713c32..4b454f6 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/SchemaCompatibility.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/SchemaCompatibility.java
@@ -20,10 +20,10 @@ package org.apache.avro;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeSet;
 
 import org.apache.avro.Schema.Field;
 import org.apache.avro.Schema.Type;
@@ -49,7 +49,6 @@ public class SchemaCompatibility {
   /**
    * Validates that the provided reader schema can be used to decode avro data written with the
    * provided writer schema.
-   *
    * @param reader schema to check.
    * @param writer schema to check.
    * @return a result object identifying any compatibility errors.
@@ -58,12 +57,12 @@ public class SchemaCompatibility {
       final Schema reader,
       final Schema writer
   ) {
-    final SchemaCompatibilityType compatibility =
+    final SchemaCompatibilityResult compatibility =
         new ReaderWriterCompatiblityChecker()
             .getCompatibility(reader, writer);
 
     final String message;
-    switch (compatibility) {
+    switch (compatibility.getCompatibility()) {
       case INCOMPATIBLE: {
         message = String.format(
             "Data encoded using writer schema:%n%s%n"
@@ -209,8 +208,8 @@ public class SchemaCompatibility {
    * <p> Provides memoization to handle recursive schemas. </p>
    */
   private static final class ReaderWriterCompatiblityChecker {
-    private final Map<ReaderWriter, SchemaCompatibilityType> mMemoizeMap =
-        new HashMap<ReaderWriter, SchemaCompatibilityType>();
+    private final Map<ReaderWriter, SchemaCompatibilityResult> mMemoizeMap =
+        new HashMap<ReaderWriter, SchemaCompatibilityResult>();
 
     /**
      * Reports the compatibility of a reader/writer schema pair.
@@ -221,24 +220,24 @@ public class SchemaCompatibility {
      * @param writer Writer schema to test.
      * @return the compatibility of the reader/writer schema pair.
      */
-    public SchemaCompatibilityType getCompatibility(
+    public SchemaCompatibilityResult getCompatibility(
         final Schema reader,
         final Schema writer
     ) {
       LOG.debug("Checking compatibility of reader {} with writer {}", reader, writer);
       final ReaderWriter pair = new ReaderWriter(reader, writer);
-      final SchemaCompatibilityType existing = mMemoizeMap.get(pair);
+      final SchemaCompatibilityResult existing = mMemoizeMap.get(pair);
       if (existing != null) {
-        if (existing == SchemaCompatibilityType.RECURSION_IN_PROGRESS) {
+        if (existing.getCompatibility() == SchemaCompatibilityType.RECURSION_IN_PROGRESS) {
           // Break the recursion here.
           // schemas are compatible unless proven incompatible:
-          return SchemaCompatibilityType.COMPATIBLE;
+          return SchemaCompatibilityResult.compatible();
         }
         return existing;
       }
       // Mark this reader/writer pair as "in progress":
-      mMemoizeMap.put(pair, SchemaCompatibilityType.RECURSION_IN_PROGRESS);
-      final SchemaCompatibilityType calculated = calculateCompatibility(reader, writer);
+      mMemoizeMap.put(pair, SchemaCompatibilityResult.recursionInProgress());
+      final SchemaCompatibilityResult calculated = calculateCompatibility(reader, writer);
       mMemoizeMap.put(pair, calculated);
       return calculated;
     }
@@ -254,7 +253,7 @@ public class SchemaCompatibility {
      * @param writer Writer schema to test.
      * @return the compatibility of the reader/writer schema pair.
      */
-    private SchemaCompatibilityType calculateCompatibility(
+    private SchemaCompatibilityResult calculateCompatibility(
         final Schema reader,
         final Schema writer
     ) {
@@ -271,7 +270,7 @@ public class SchemaCompatibility {
           case DOUBLE:
           case BYTES:
           case STRING: {
-            return SchemaCompatibilityType.COMPATIBLE;
+            return SchemaCompatibilityResult.compatible();
           }
           case ARRAY: {
             return getCompatibility(reader.getElementType(), writer.getElementType());
@@ -280,66 +279,39 @@ public class SchemaCompatibility {
             return getCompatibility(reader.getValueType(), writer.getValueType());
           }
           case FIXED: {
-            // fixed size and name must match:
-            if (!schemaNameEquals(reader, writer)) {
-              return SchemaCompatibilityType.INCOMPATIBLE;
-            }
-            if (reader.getFixedSize() != writer.getFixedSize()) {
-              return SchemaCompatibilityType.INCOMPATIBLE;
+            SchemaCompatibilityResult nameCheck = checkSchemaNames(reader, writer);
+            if (nameCheck.getCompatibility() == SchemaCompatibilityType.INCOMPATIBLE) {
+              return nameCheck;
             }
-            return SchemaCompatibilityType.COMPATIBLE;
+            return checkFixedSize(reader, writer);
           }
           case ENUM: {
-            // enum names must match:
-            if (!schemaNameEquals(reader, writer)) {
-              return SchemaCompatibilityType.INCOMPATIBLE;
+            SchemaCompatibilityResult nameCheck = checkSchemaNames(reader, writer);
+            if (nameCheck.getCompatibility() == SchemaCompatibilityType.INCOMPATIBLE) {
+              return nameCheck;
             }
-            // reader symbols must contain all writer symbols:
-            final Set<String> symbols = new HashSet<String>(writer.getEnumSymbols());
-            symbols.removeAll(reader.getEnumSymbols());
-            // TODO: Report a human-readable error.
-            // if (!symbols.isEmpty()) {
-            // }
-            return symbols.isEmpty()
-                ? SchemaCompatibilityType.COMPATIBLE
-                : SchemaCompatibilityType.INCOMPATIBLE;
+            return checkReaderEnumContainsAllWriterEnumSymbols(reader, writer);
           }
           case RECORD: {
-            // record names must match:
-            if (!schemaNameEquals(reader, writer)) {
-              return SchemaCompatibilityType.INCOMPATIBLE;
+            SchemaCompatibilityResult nameCheck = checkSchemaNames(reader, writer);
+            if (nameCheck.getCompatibility() == SchemaCompatibilityType.INCOMPATIBLE) {
+              return nameCheck;
             }
-
-            // Check that each field in the reader record can be populated from the writer record:
-            for (final Field readerField : reader.getFields()) {
-              final Field writerField = lookupWriterField(writer, readerField);
-              if (writerField == null) {
-                // Reader field does not correspond to any field in the writer record schema,
-                // reader field must have a default value.
-                if (readerField.defaultValue() == null) {
-                  // reader field has no default value
-                  return SchemaCompatibilityType.INCOMPATIBLE;
-                }
-              } else {
-                if (getCompatibility(readerField.schema(), writerField.schema())
-                    == SchemaCompatibilityType.INCOMPATIBLE) {
-                  return SchemaCompatibilityType.INCOMPATIBLE;
-                }
-              }
-            }
-
-            // All fields in the reader record can be populated from the writer record:
-            return SchemaCompatibilityType.COMPATIBLE;
+            return checkReaderWriterRecordFields(reader, writer);
           }
           case UNION: {
             // Check that each individual branch of the writer union can be decoded:
             for (final Schema writerBranch : writer.getTypes()) {
-              if (getCompatibility(reader, writerBranch) == SchemaCompatibilityType.INCOMPATIBLE) {
-                return SchemaCompatibilityType.INCOMPATIBLE;
+              SchemaCompatibilityResult compatibility = getCompatibility(reader, writerBranch);
+              if (compatibility.getCompatibility() == SchemaCompatibilityType.INCOMPATIBLE) {
+                String msg = String.format("reader union lacking writer type: %s",
+                    writerBranch.getType());
+                return SchemaCompatibilityResult.incompatible(
+                    SchemaIncompatibilityType.MISSING_UNION_BRANCH, reader, writer, msg);
               }
             }
             // Each schema in the writer union can be decoded with the reader:
-            return SchemaCompatibilityType.COMPATIBLE;
+            return SchemaCompatibilityResult.compatible();
           }
 
           default: {
@@ -353,61 +325,65 @@ public class SchemaCompatibility {
         // Reader compatible with all branches of a writer union is compatible
         if (writer.getType() == Schema.Type.UNION) {
           for (Schema s : writer.getTypes()) {
-            SchemaCompatibilityType compatibility = getCompatibility(reader, s);
-            if (compatibility == SchemaCompatibilityType.INCOMPATIBLE) {
-              return SchemaCompatibilityType.INCOMPATIBLE;
+            SchemaCompatibilityResult compat = getCompatibility(reader, s);
+            if (compat.getCompatibility() == SchemaCompatibilityType.INCOMPATIBLE) {
+              return compat;
             }
           }
-          return SchemaCompatibilityType.COMPATIBLE;
+          return SchemaCompatibilityResult.compatible();
         }
 
         switch (reader.getType()) {
-          case NULL: return SchemaCompatibilityType.INCOMPATIBLE;
-          case BOOLEAN: return SchemaCompatibilityType.INCOMPATIBLE;
-          case INT: return SchemaCompatibilityType.INCOMPATIBLE;
+          case NULL:
+            return typeMismatch(reader, writer);
+          case BOOLEAN:
+            return typeMismatch(reader, writer);
+          case INT:
+            return typeMismatch(reader, writer);
           case LONG: {
-            return (writer.getType() == Type.INT)
-                ? SchemaCompatibilityType.COMPATIBLE
-                : SchemaCompatibilityType.INCOMPATIBLE;
+            return (writer.getType() == Type.INT) ? SchemaCompatibilityResult.compatible()
+                : typeMismatch(reader, writer);
           }
           case FLOAT: {
-            return ((writer.getType() == Type.INT)
-                || (writer.getType() == Type.LONG))
-                ? SchemaCompatibilityType.COMPATIBLE
-                : SchemaCompatibilityType.INCOMPATIBLE;
+            return ((writer.getType() == Type.INT) || (writer.getType() == Type.LONG))
+                ? SchemaCompatibilityResult.compatible() : typeMismatch(reader, writer);
 
           }
           case DOUBLE: {
-            return ((writer.getType() == Type.INT)
-                || (writer.getType() == Type.LONG)
-                || (writer.getType() == Type.FLOAT))
-                ? SchemaCompatibilityType.COMPATIBLE
-                : SchemaCompatibilityType.INCOMPATIBLE;
+            return ((writer.getType() == Type.INT) || (writer.getType() == Type.LONG)
+                || (writer.getType() == Type.FLOAT)) ? SchemaCompatibilityResult.compatible()
+                    : typeMismatch(reader, writer);
           }
           case BYTES: {
-            return (writer.getType() == Type.STRING)
-                      ? SchemaCompatibilityType.COMPATIBLE
-                      : SchemaCompatibilityType.INCOMPATIBLE;
-                }
+            return (writer.getType() == Type.STRING) ? SchemaCompatibilityResult.compatible()
+                : typeMismatch(reader, writer);
+          }
           case STRING: {
-            return (writer.getType() == Type.BYTES)
-                  ? SchemaCompatibilityType.COMPATIBLE
-                  : SchemaCompatibilityType.INCOMPATIBLE;
-            }
+            return (writer.getType() == Type.BYTES) ? SchemaCompatibilityResult.compatible()
+                : typeMismatch(reader, writer);
+          }
 
-          case ARRAY: return SchemaCompatibilityType.INCOMPATIBLE;
-          case MAP: return SchemaCompatibilityType.INCOMPATIBLE;
-          case FIXED: return SchemaCompatibilityType.INCOMPATIBLE;
-          case ENUM: return SchemaCompatibilityType.INCOMPATIBLE;
-          case RECORD: return SchemaCompatibilityType.INCOMPATIBLE;
+          case ARRAY:
+            return typeMismatch(reader, writer);
+          case MAP:
+            return typeMismatch(reader, writer);
+          case FIXED:
+            return typeMismatch(reader, writer);
+          case ENUM:
+            return typeMismatch(reader, writer);
+          case RECORD:
+            return typeMismatch(reader, writer);
           case UNION: {
             for (final Schema readerBranch : reader.getTypes()) {
-              if (getCompatibility(readerBranch, writer) == SchemaCompatibilityType.COMPATIBLE) {
-                return SchemaCompatibilityType.COMPATIBLE;
+              SchemaCompatibilityResult compatibility = getCompatibility(readerBranch, writer);
+              if (compatibility.getCompatibility() == SchemaCompatibilityType.COMPATIBLE) {
+                return SchemaCompatibilityResult.compatible();
               }
             }
             // No branch in the reader union has been found compatible with the writer schema:
-            return SchemaCompatibilityType.INCOMPATIBLE;
+            String msg = String.format("reader union lacking writer type: %s", writer.getType());
+            return SchemaCompatibilityResult
+                .incompatible(SchemaIncompatibilityType.MISSING_UNION_BRANCH, reader, writer, msg);
           }
 
           default: {
@@ -416,12 +392,74 @@ public class SchemaCompatibility {
         }
       }
     }
+
+    private SchemaCompatibilityResult checkReaderWriterRecordFields(final Schema reader,
+        final Schema writer) {
+      // Check that each field in the reader record can be populated from the writer record:
+      for (final Field readerField : reader.getFields()) {
+        final Field writerField = lookupWriterField(writer, readerField);
+        if (writerField == null) {
+          // Reader field does not correspond to any field in the writer record schema, so the
+          // reader field must have a default value.
+          if (readerField.defaultValue() == null) {
+            // reader field has no default value
+            return SchemaCompatibilityResult.incompatible(
+                SchemaIncompatibilityType.READER_FIELD_MISSING_DEFAULT_VALUE, reader, writer,
+                readerField.name());
+          }
+        } else {
+          SchemaCompatibilityResult compatibility = getCompatibility(readerField.schema(),
+              writerField.schema());
+          if (compatibility.getCompatibility() == SchemaCompatibilityType.INCOMPATIBLE) {
+            return compatibility;
+          }
+        }
+      }
+      // All fields in the reader record can be populated from the writer record:
+      return SchemaCompatibilityResult.compatible();
+    }
+
+    private SchemaCompatibilityResult checkReaderEnumContainsAllWriterEnumSymbols(
+        final Schema reader, final Schema writer) {
+      final Set<String> symbols = new TreeSet<>(writer.getEnumSymbols());
+      symbols.removeAll(reader.getEnumSymbols());
+      return symbols.isEmpty() ? SchemaCompatibilityResult.compatible()
+          : SchemaCompatibilityResult.incompatible(SchemaIncompatibilityType.MISSING_ENUM_SYMBOLS,
+              reader, writer, symbols.toString());
+    }
+
+    private SchemaCompatibilityResult checkFixedSize(final Schema reader, final Schema writer) {
+      int actual = reader.getFixedSize();
+      int expected = writer.getFixedSize();
+      if (actual != expected) {
+        String msg = String.format("expected: %d, found: %d", expected, actual);
+        return SchemaCompatibilityResult.incompatible(SchemaIncompatibilityType.FIXED_SIZE_MISMATCH,
+            reader, writer, msg);
+      }
+      return SchemaCompatibilityResult.compatible();
+    }
+
+    private SchemaCompatibilityResult checkSchemaNames(final Schema reader, final Schema writer) {
+      if (!schemaNameEquals(reader, writer)) {
+        String msg = String.format("expected: %s", writer.getFullName());
+        return SchemaCompatibilityResult.incompatible(SchemaIncompatibilityType.NAME_MISMATCH,
+            reader, writer, msg);
+      }
+      return SchemaCompatibilityResult.compatible();
+    }
+
+    private SchemaCompatibilityResult typeMismatch(final Schema reader, final Schema writer) {
+      String msg = String.format("reader type: %s not compatible with writer type: %s",
+          reader.getType(), writer.getType());
+      return SchemaCompatibilityResult.incompatible(SchemaIncompatibilityType.TYPE_MISMATCH, reader,
+          writer, msg);
+    }
   }
 
   /**
    * Identifies the type of a schema compatibility result.
    */
-  public static enum SchemaCompatibilityType {
+  public enum SchemaCompatibilityType {
     COMPATIBLE,
     INCOMPATIBLE,
 
@@ -429,6 +467,180 @@ public class SchemaCompatibility {
     RECURSION_IN_PROGRESS;
   }
 
+  public enum SchemaIncompatibilityType {
+    NAME_MISMATCH,
+    FIXED_SIZE_MISMATCH,
+    MISSING_ENUM_SYMBOLS,
+    READER_FIELD_MISSING_DEFAULT_VALUE,
+    TYPE_MISMATCH,
+    MISSING_UNION_BRANCH;
+  }
+
+  /**
+   * Immutable class representing details about a particular schema pair compatibility check.
+   */
+  public static final class SchemaCompatibilityResult {
+    private final SchemaCompatibilityType mCompatibility;
+    // the below fields are only valid if INCOMPATIBLE
+    private final SchemaIncompatibilityType mSchemaIncompatibilityType;
+    private final Schema mReaderSubset;
+    private final Schema mWriterSubset;
+    private final String mMessage;
+    // cached objects for stateless details
+    private static final SchemaCompatibilityResult COMPATIBLE = new SchemaCompatibilityResult(
+        SchemaCompatibilityType.COMPATIBLE, null, null, null, null);
+    private static final SchemaCompatibilityResult RECURSION_IN_PROGRESS = new SchemaCompatibilityResult(
+        SchemaCompatibilityType.RECURSION_IN_PROGRESS, null, null, null, null);
+
+    private SchemaCompatibilityResult(SchemaCompatibilityType type,
+        SchemaIncompatibilityType errorDetails, Schema readerDetails, Schema writerDetails,
+        String details) {
+      this.mCompatibility = type;
+      this.mSchemaIncompatibilityType = errorDetails;
+      this.mReaderSubset = readerDetails;
+      this.mWriterSubset = writerDetails;
+      this.mMessage = details;
+    }
+
+    /**
+     * Returns a details object representing a compatible schema pair.
+     * @return a SchemaCompatibilityResult object with COMPATIBLE SchemaCompatibilityType, and no
+     *         other state.
+     */
+    public static SchemaCompatibilityResult compatible() {
+      return COMPATIBLE;
+    }
+
+    /**
+     * Returns a details object representing a state indicating that recursion is in progress.
+     * @return a SchemaCompatibilityResult object with RECURSION_IN_PROGRESS
+     *         SchemaCompatibilityType, and no other state.
+     */
+    public static SchemaCompatibilityResult recursionInProgress() {
+      return RECURSION_IN_PROGRESS;
+    }
+
+    /**
+     * Returns a details object representing an incompatible schema pair, including error details.
+     * @return a SchemaCompatibilityResult object with INCOMPATIBLE SchemaCompatibilityType, and
+     *         state representing the violating part.
+     */
+    public static SchemaCompatibilityResult incompatible(SchemaIncompatibilityType error,
+        Schema reader, Schema writer, String details) {
+      return new SchemaCompatibilityResult(SchemaCompatibilityType.INCOMPATIBLE, error, reader,
+          writer, details);
+    }
+
+    /**
+     * Returns the SchemaCompatibilityType, always non-null.
+     * @return a SchemaCompatibilityType instance, always non-null
+     */
+    public SchemaCompatibilityType getCompatibility() {
+      return mCompatibility;
+    }
+
+    /**
+     * If the compatibility is INCOMPATIBLE, returns the SchemaIncompatibilityType (first thing that
+     * was incompatible), otherwise null.
+     * @return a SchemaIncompatibilityType instance, or null
+     */
+    public SchemaIncompatibilityType getIncompatibility() {
+      return mSchemaIncompatibilityType;
+    }
+
+    /**
+     * If the compatibility is INCOMPATIBLE, returns the first part of the reader schema that failed
+     * compatibility check.
+     * @return a Schema instance (part of the reader schema), or null
+     */
+    public Schema getReaderSubset() {
+      return mReaderSubset;
+    }
+
+    /**
+     * If the compatibility is INCOMPATIBLE, returns the first part of the writer schema that failed
+     * compatibility check.
+     * @return a Schema instance (part of the writer schema), or null
+     */
+    public Schema getWriterSubset() {
+      return mWriterSubset;
+    }
+
+    /**
+     * If the compatibility is INCOMPATIBLE, returns a human-readable string with more details about
+     * what failed. Syntax depends on the SchemaIncompatibilityType.
+     * @see #getIncompatibility()
+     * @return a String with details about the incompatibility, or null
+     */
+    public String getMessage() {
+      return mMessage;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + ((mMessage == null) ? 0 : mMessage.hashCode());
+      result = prime * result + ((mReaderSubset == null) ? 0 : mReaderSubset.hashCode());
+      result = prime * result + ((mCompatibility == null) ? 0 : mCompatibility.hashCode());
+      result = prime * result
+          + ((mSchemaIncompatibilityType == null) ? 0 : mSchemaIncompatibilityType.hashCode());
+      result = prime * result + ((mWriterSubset == null) ? 0 : mWriterSubset.hashCode());
+      return result;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (obj == null) {
+        return false;
+      }
+      if (getClass() != obj.getClass()) {
+        return false;
+      }
+      SchemaCompatibilityResult other = (SchemaCompatibilityResult) obj;
+      if (mMessage == null) {
+        if (other.mMessage != null) {
+          return false;
+        }
+      } else if (!mMessage.equals(other.mMessage)) {
+        return false;
+      }
+      if (mReaderSubset == null) {
+        if (other.mReaderSubset != null) {
+          return false;
+        }
+      } else if (!mReaderSubset.equals(other.mReaderSubset)) {
+        return false;
+      }
+      if (mCompatibility != other.mCompatibility) {
+        return false;
+      }
+      if (mSchemaIncompatibilityType != other.mSchemaIncompatibilityType) {
+        return false;
+      }
+      if (mWriterSubset == null) {
+        if (other.mWriterSubset != null) {
+          return false;
+        }
+      } else if (!mWriterSubset.equals(other.mWriterSubset)) {
+        return false;
+      }
+      return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public String toString() {
+      return String.format(
+          "SchemaCompatibilityDetails{compatibility:%s, type:%s, readerSubset:%s, writerSubset:%s, message:%s}",
+          mCompatibility, mSchemaIncompatibilityType, mReaderSubset, mWriterSubset, mMessage);
+    }
+  }
   // -----------------------------------------------------------------------------------------------
 
   /**
@@ -437,8 +649,8 @@ public class SchemaCompatibility {
    * Note: This class represents a one-way relationship from the reader to the writer schema.
    */
   public static final class SchemaPairCompatibility {
-    /** The type of this result. */
-    private final SchemaCompatibilityType mType;
+    /** The details of this result. */
+    private final SchemaCompatibilityResult mResult;
 
     /** Validated reader schema. */
     private final Schema mReader;
@@ -451,18 +663,17 @@ public class SchemaCompatibility {
 
     /**
      * Constructs a new instance.
-     *
-     * @param type of the schema compatibility.
+     * @param result of the schema compatibility.
      * @param reader schema that was validated.
      * @param writer schema that was validated.
      * @param description of this compatibility result.
      */
     public SchemaPairCompatibility(
-        SchemaCompatibilityType type,
+        SchemaCompatibilityResult result,
         Schema reader,
         Schema writer,
         String description) {
-      mType = type;
+      mResult = result;
       mReader = reader;
       mWriter = writer;
       mDescription = description;
@@ -470,11 +681,18 @@ public class SchemaCompatibility {
 
     /**
      * Gets the type of this result.
-     *
      * @return the type of this result.
      */
     public SchemaCompatibilityType getType() {
-      return mType;
+      return mResult.getCompatibility();
+    }
+
+    /**
+     * Gets more details about the compatibility, in particular if getType() is INCOMPATIBLE.
+     * @return the details of this compatibility check.
+     */
+    public SchemaCompatibilityResult getResult() {
+      return mResult;
     }
 
     /**
@@ -508,8 +726,8 @@ public class SchemaCompatibility {
     @Override
     public String toString() {
       return String.format(
-          "SchemaPairCompatibility{type:%s, readerSchema:%s, writerSchema:%s, description:%s}",
-          mType, mReader, mWriter, mDescription);
+          "SchemaPairCompatibility{result:%s, readerSchema:%s, writerSchema:%s, description:%s}",
+          mResult, mReader, mWriter, mDescription);
     }
 
     /** {@inheritDoc} */
@@ -517,7 +735,7 @@ public class SchemaCompatibility {
     public boolean equals(Object other) {
       if ((null != other) && (other instanceof SchemaPairCompatibility)) {
         final SchemaPairCompatibility result = (SchemaPairCompatibility) other;
-        return objectsEqual(result.mType, mType)
+        return objectsEqual(result.mResult, mResult)
             && objectsEqual(result.mReader, mReader)
             && objectsEqual(result.mWriter, mWriter)
             && objectsEqual(result.mDescription, mDescription);
@@ -529,7 +747,7 @@ public class SchemaCompatibility {
     /** {@inheritDoc} */
     @Override
     public int hashCode() {
-      return Arrays.hashCode(new Object[]{mType, mReader, mWriter, mDescription});
+      return Arrays.hashCode(new Object[] { mResult, mReader, mWriter, mDescription });
     }
   }
 

http://git-wip-us.apache.org/repos/asf/avro/blob/db8ed216/lang/java/avro/src/test/java/org/apache/avro/TestSchemaCompatibility.java
----------------------------------------------------------------------
diff --git a/lang/java/avro/src/test/java/org/apache/avro/TestSchemaCompatibility.java b/lang/java/avro/src/test/java/org/apache/avro/TestSchemaCompatibility.java
index e09d211..324c439 100644
--- a/lang/java/avro/src/test/java/org/apache/avro/TestSchemaCompatibility.java
+++ b/lang/java/avro/src/test/java/org/apache/avro/TestSchemaCompatibility.java
@@ -18,16 +18,56 @@
 package org.apache.avro;
 
 import static org.apache.avro.SchemaCompatibility.checkReaderWriterCompatibility;
+import static org.apache.avro.TestSchemas.A_DINT_B_DINT_RECORD1;
+import static org.apache.avro.TestSchemas.A_DINT_RECORD1;
+import static org.apache.avro.TestSchemas.A_INT_B_DINT_RECORD1;
+import static org.apache.avro.TestSchemas.A_INT_B_INT_RECORD1;
+import static org.apache.avro.TestSchemas.A_INT_RECORD1;
+import static org.apache.avro.TestSchemas.A_LONG_RECORD1;
+import static org.apache.avro.TestSchemas.BOOLEAN_SCHEMA;
+import static org.apache.avro.TestSchemas.BYTES_SCHEMA;
+import static org.apache.avro.TestSchemas.BYTES_UNION_SCHEMA;
+import static org.apache.avro.TestSchemas.DOUBLE_SCHEMA;
+import static org.apache.avro.TestSchemas.DOUBLE_UNION_SCHEMA;
+import static org.apache.avro.TestSchemas.EMPTY_RECORD1;
+import static org.apache.avro.TestSchemas.EMPTY_UNION_SCHEMA;
+import static org.apache.avro.TestSchemas.ENUM1_ABC_SCHEMA;
+import static org.apache.avro.TestSchemas.ENUM1_AB_SCHEMA;
+import static org.apache.avro.TestSchemas.ENUM1_BC_SCHEMA;
+import static org.apache.avro.TestSchemas.FIXED_4_BYTES;
+import static org.apache.avro.TestSchemas.FLOAT_SCHEMA;
+import static org.apache.avro.TestSchemas.FLOAT_UNION_SCHEMA;
+import static org.apache.avro.TestSchemas.INT_ARRAY_SCHEMA;
+import static org.apache.avro.TestSchemas.INT_FLOAT_UNION_SCHEMA;
+import static org.apache.avro.TestSchemas.INT_LIST_RECORD;
+import static org.apache.avro.TestSchemas.INT_LONG_FLOAT_DOUBLE_UNION_SCHEMA;
+import static org.apache.avro.TestSchemas.INT_LONG_UNION_SCHEMA;
+import static org.apache.avro.TestSchemas.INT_MAP_SCHEMA;
+import static org.apache.avro.TestSchemas.INT_SCHEMA;
+import static org.apache.avro.TestSchemas.INT_STRING_UNION_SCHEMA;
+import static org.apache.avro.TestSchemas.INT_UNION_SCHEMA;
+import static org.apache.avro.TestSchemas.LONG_ARRAY_SCHEMA;
+import static org.apache.avro.TestSchemas.LONG_LIST_RECORD;
+import static org.apache.avro.TestSchemas.LONG_MAP_SCHEMA;
+import static org.apache.avro.TestSchemas.LONG_SCHEMA;
+import static org.apache.avro.TestSchemas.LONG_UNION_SCHEMA;
+import static org.apache.avro.TestSchemas.NULL_SCHEMA;
+import static org.apache.avro.TestSchemas.STRING_ARRAY_SCHEMA;
+import static org.apache.avro.TestSchemas.STRING_INT_UNION_SCHEMA;
+import static org.apache.avro.TestSchemas.STRING_SCHEMA;
+import static org.apache.avro.TestSchemas.STRING_UNION_SCHEMA;
+import static org.apache.avro.TestSchemas.assertSchemaContains;
+import static org.apache.avro.TestSchemas.list;
 import static org.junit.Assert.assertEquals;
 
 import java.io.ByteArrayOutputStream;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 
-import org.apache.avro.Schema.Field;
+import org.apache.avro.SchemaCompatibility.SchemaCompatibilityResult;
 import org.apache.avro.SchemaCompatibility.SchemaCompatibilityType;
+import org.apache.avro.SchemaCompatibility.SchemaIncompatibilityType;
 import org.apache.avro.SchemaCompatibility.SchemaPairCompatibility;
+import org.apache.avro.TestSchemas.ReaderWriter;
 import org.apache.avro.generic.GenericData.EnumSymbol;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericDatumWriter;
@@ -45,134 +85,6 @@ import org.slf4j.LoggerFactory;
 /** Unit-tests for SchemaCompatibility. */
 public class TestSchemaCompatibility {
   private static final Logger LOG = LoggerFactory.getLogger(TestSchemaCompatibility.class);
-
-  // -----------------------------------------------------------------------------------------------
-
-  private static final Schema NULL_SCHEMA = Schema.create(Schema.Type.NULL);
-  private static final Schema BOOLEAN_SCHEMA = Schema.create(Schema.Type.BOOLEAN);
-  private static final Schema INT_SCHEMA = Schema.create(Schema.Type.INT);
-  private static final Schema LONG_SCHEMA = Schema.create(Schema.Type.LONG);
-  private static final Schema FLOAT_SCHEMA = Schema.create(Schema.Type.FLOAT);
-  private static final Schema DOUBLE_SCHEMA = Schema.create(Schema.Type.DOUBLE);
-  private static final Schema STRING_SCHEMA = Schema.create(Schema.Type.STRING);
-  private static final Schema BYTES_SCHEMA = Schema.create(Schema.Type.BYTES);
-
-  private static final Schema INT_ARRAY_SCHEMA = Schema.createArray(INT_SCHEMA);
-  private static final Schema LONG_ARRAY_SCHEMA = Schema.createArray(LONG_SCHEMA);
-  private static final Schema STRING_ARRAY_SCHEMA = Schema.createArray(STRING_SCHEMA);
-
-  private static final Schema INT_MAP_SCHEMA = Schema.createMap(INT_SCHEMA);
-  private static final Schema LONG_MAP_SCHEMA = Schema.createMap(LONG_SCHEMA);
-  private static final Schema STRING_MAP_SCHEMA = Schema.createMap(STRING_SCHEMA);
-
-  private static final Schema ENUM1_AB_SCHEMA =
-      Schema.createEnum("Enum1", null, null, list("A", "B"));
-  private static final Schema ENUM1_ABC_SCHEMA =
-      Schema.createEnum("Enum1", null, null, list("A", "B", "C"));
-  private static final Schema ENUM1_BC_SCHEMA =
-      Schema.createEnum("Enum1", null, null, list("B", "C"));
-  private static final Schema ENUM2_AB_SCHEMA =
-      Schema.createEnum("Enum2", null, null, list("A", "B"));
-
-  private static final Schema EMPTY_UNION_SCHEMA =
-      Schema.createUnion(new ArrayList<Schema>());
-  private static final Schema NULL_UNION_SCHEMA =
-      Schema.createUnion(list(NULL_SCHEMA));
-  private static final Schema INT_UNION_SCHEMA =
-      Schema.createUnion(list(INT_SCHEMA));
-  private static final Schema LONG_UNION_SCHEMA =
-      Schema.createUnion(list(LONG_SCHEMA));
-  private static final Schema FLOAT_UNION_SCHEMA =
-      Schema.createUnion(list(FLOAT_SCHEMA));
-  private static final Schema DOUBLE_UNION_SCHEMA =
-      Schema.createUnion(list(DOUBLE_SCHEMA));
-  private static final Schema STRING_UNION_SCHEMA =
-      Schema.createUnion(list(STRING_SCHEMA));
-  private static final Schema BYTES_UNION_SCHEMA =
-      Schema.createUnion(list(BYTES_SCHEMA));
-  private static final Schema INT_STRING_UNION_SCHEMA =
-      Schema.createUnion(list(INT_SCHEMA, STRING_SCHEMA));
-  private static final Schema STRING_INT_UNION_SCHEMA =
-      Schema.createUnion(list(STRING_SCHEMA, INT_SCHEMA));
-  private static final Schema INT_FLOAT_UNION_SCHEMA =
-      Schema.createUnion(list(INT_SCHEMA, FLOAT_SCHEMA));
-  private static final Schema INT_LONG_UNION_SCHEMA =
-      Schema.createUnion(list(INT_SCHEMA, LONG_SCHEMA));
-  private static final Schema INT_LONG_FLOAT_DOUBLE_UNION_SCHEMA =
-      Schema.createUnion(list(INT_SCHEMA, LONG_SCHEMA, FLOAT_SCHEMA, DOUBLE_SCHEMA));
-
-  // Non recursive records:
-  private static final Schema EMPTY_RECORD1 =
-      Schema.createRecord("Record1", null, null, false);
-  private static final Schema EMPTY_RECORD2 =
-      Schema.createRecord("Record2", null, null, false);
-  private static final Schema A_INT_RECORD1 =
-      Schema.createRecord("Record1", null, null, false);
-  private static final Schema A_LONG_RECORD1 =
-      Schema.createRecord("Record1", null, null, false);
-  private static final Schema A_INT_B_INT_RECORD1 =
-      Schema.createRecord("Record1", null, null, false);
-  private static final Schema A_DINT_RECORD1 =  // DTYPE means TYPE with default value
-      Schema.createRecord("Record1", null, null, false);
-  private static final Schema A_INT_B_DINT_RECORD1 =
-      Schema.createRecord("Record1", null, null, false);
-  private static final Schema A_DINT_B_DINT_RECORD1 =
-      Schema.createRecord("Record1", null, null, false);
-  static {
-    EMPTY_RECORD1.setFields(Collections.<Field>emptyList());
-    EMPTY_RECORD2.setFields(Collections.<Field>emptyList());
-    A_INT_RECORD1.setFields(list(
-        new Field("a", INT_SCHEMA, null, null)));
-    A_LONG_RECORD1.setFields(list(
-        new Field("a", LONG_SCHEMA, null, null)));
-    A_INT_B_INT_RECORD1.setFields(list(
-        new Field("a", INT_SCHEMA, null, null),
-        new Field("b", INT_SCHEMA, null, null)));
-    A_DINT_RECORD1.setFields(list(
-        new Field("a", INT_SCHEMA, null, 0)));
-    A_INT_B_DINT_RECORD1.setFields(list(
-        new Field("a", INT_SCHEMA, null, null),
-        new Field("b", INT_SCHEMA, null, 0)));
-    A_DINT_B_DINT_RECORD1.setFields(list(
-        new Field("a", INT_SCHEMA, null, 0),
-        new Field("b", INT_SCHEMA, null, 0)));
-  }
-
-  // Recursive records
-  private static final Schema INT_LIST_RECORD =
-      Schema.createRecord("List", null, null, false);
-  private static final Schema LONG_LIST_RECORD =
-      Schema.createRecord("List", null, null, false);
-  static {
-    INT_LIST_RECORD.setFields(list(
-        new Field("head", INT_SCHEMA, null, null),
-        new Field("tail", INT_LIST_RECORD, null, null)));
-    LONG_LIST_RECORD.setFields(list(
-        new Field("head", LONG_SCHEMA, null, null),
-        new Field("tail", LONG_LIST_RECORD, null, null)));
-  }
-
-  // -----------------------------------------------------------------------------------------------
-
-  /** Reader/writer schema pair. */
-  private static final class ReaderWriter {
-    private final Schema mReader;
-    private final Schema mWriter;
-
-    public ReaderWriter(final Schema reader, final Schema writer) {
-      mReader = reader;
-      mWriter = writer;
-    }
-
-    public Schema getReader() {
-      return mReader;
-    }
-
-    public Schema getWriter() {
-      return mWriter;
-    }
-  }
-
   // -----------------------------------------------------------------------------------------------
 
   private static final Schema WRITER_SCHEMA = Schema.createRecord(list(
@@ -186,7 +98,7 @@ public class TestSchemaCompatibility {
     final Schema reader = Schema.createRecord(readerFields);
     final SchemaCompatibility.SchemaPairCompatibility expectedResult =
         new SchemaCompatibility.SchemaPairCompatibility(
-            SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE,
+            SchemaCompatibility.SchemaCompatibilityResult.compatible(),
             reader,
             WRITER_SCHEMA,
             SchemaCompatibility.READER_WRITER_COMPATIBLE_MESSAGE);
@@ -202,7 +114,7 @@ public class TestSchemaCompatibility {
     final Schema reader = Schema.createRecord(readerFields);
     final SchemaCompatibility.SchemaPairCompatibility expectedResult =
         new SchemaCompatibility.SchemaPairCompatibility(
-            SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE,
+            SchemaCompatibility.SchemaCompatibilityResult.compatible(),
             reader,
             WRITER_SCHEMA,
             SchemaCompatibility.READER_WRITER_COMPATIBLE_MESSAGE);
@@ -219,7 +131,7 @@ public class TestSchemaCompatibility {
     final Schema reader = Schema.createRecord(readerFields);
     final SchemaCompatibility.SchemaPairCompatibility expectedResult =
         new SchemaCompatibility.SchemaPairCompatibility(
-            SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE,
+            SchemaCompatibility.SchemaCompatibilityResult.compatible(),
             reader,
             WRITER_SCHEMA,
             SchemaCompatibility.READER_WRITER_COMPATIBLE_MESSAGE);
@@ -236,7 +148,7 @@ public class TestSchemaCompatibility {
     final Schema reader = Schema.createRecord(readerFields);
     final SchemaCompatibility.SchemaPairCompatibility expectedResult =
         new SchemaCompatibility.SchemaPairCompatibility(
-            SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE,
+            SchemaCompatibility.SchemaCompatibilityResult.compatible(),
             reader,
             WRITER_SCHEMA,
             SchemaCompatibility.READER_WRITER_COMPATIBLE_MESSAGE);
@@ -251,19 +163,20 @@ public class TestSchemaCompatibility {
         new Schema.Field("oldfield1", INT_SCHEMA, null, null),
         new Schema.Field("newfield1", INT_SCHEMA, null, null));
     final Schema reader = Schema.createRecord(readerFields);
-    final SchemaCompatibility.SchemaPairCompatibility expectedResult =
-        new SchemaCompatibility.SchemaPairCompatibility(
-            SchemaCompatibility.SchemaCompatibilityType.INCOMPATIBLE,
-            reader,
-            WRITER_SCHEMA,
-            String.format(
-                "Data encoded using writer schema:%n%s%n"
-                + "will or may fail to decode using reader schema:%n%s%n",
-                WRITER_SCHEMA.toString(true),
-                reader.toString(true)));
-
     // Test new field without default value.
-    assertEquals(expectedResult, checkReaderWriterCompatibility(reader, WRITER_SCHEMA));
+    SchemaPairCompatibility compatibility = checkReaderWriterCompatibility(reader, WRITER_SCHEMA);
+    assertEquals(SchemaCompatibility.SchemaCompatibilityType.INCOMPATIBLE, compatibility.getType());
+    assertEquals(SchemaCompatibility.SchemaCompatibilityResult.incompatible(
+        SchemaIncompatibilityType.READER_FIELD_MISSING_DEFAULT_VALUE, reader, WRITER_SCHEMA,
+        "newfield1"), compatibility.getResult());
+    assertEquals(
+        String.format(
+            "Data encoded using writer schema:%n%s%n"
+                + "will or may fail to decode using reader schema:%n%s%n",
+            WRITER_SCHEMA.toString(true), reader.toString(true)),
+        compatibility.getDescription());
+    assertEquals(reader, compatibility.getReader());
+    assertEquals(WRITER_SCHEMA, compatibility.getWriter());
   }
 
   @Test
@@ -272,13 +185,17 @@ public class TestSchemaCompatibility {
     final Schema invalidReader = Schema.createMap(STRING_SCHEMA);
     final SchemaCompatibility.SchemaPairCompatibility validResult =
         new SchemaCompatibility.SchemaPairCompatibility(
-            SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE,
+            SchemaCompatibility.SchemaCompatibilityResult.compatible(),
             validReader,
             STRING_ARRAY_SCHEMA,
             SchemaCompatibility.READER_WRITER_COMPATIBLE_MESSAGE);
     final SchemaCompatibility.SchemaPairCompatibility invalidResult =
         new SchemaCompatibility.SchemaPairCompatibility(
-            SchemaCompatibility.SchemaCompatibilityType.INCOMPATIBLE,
+            SchemaCompatibility.SchemaCompatibilityResult.incompatible(
+            SchemaIncompatibilityType.TYPE_MISMATCH,
+            invalidReader,
+            STRING_ARRAY_SCHEMA,
+            "reader type: MAP not compatible with writer type: ARRAY"),
             invalidReader,
             STRING_ARRAY_SCHEMA,
             String.format(
@@ -300,13 +217,17 @@ public class TestSchemaCompatibility {
     final Schema validReader = Schema.create(Schema.Type.STRING);
     final SchemaCompatibility.SchemaPairCompatibility validResult =
         new SchemaCompatibility.SchemaPairCompatibility(
-            SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE,
+            SchemaCompatibility.SchemaCompatibilityResult.compatible(),
             validReader,
             STRING_SCHEMA,
             SchemaCompatibility.READER_WRITER_COMPATIBLE_MESSAGE);
     final SchemaCompatibility.SchemaPairCompatibility invalidResult =
         new SchemaCompatibility.SchemaPairCompatibility(
-            SchemaCompatibility.SchemaCompatibilityType.INCOMPATIBLE,
+            SchemaCompatibility.SchemaCompatibilityResult.incompatible(
+            SchemaIncompatibilityType.TYPE_MISMATCH,
+            INT_SCHEMA,
+            STRING_SCHEMA,
+            "reader type: INT not compatible with writer type: STRING"),
             INT_SCHEMA,
             STRING_SCHEMA,
             String.format(
@@ -364,7 +285,7 @@ public class TestSchemaCompatibility {
 
       new ReaderWriter(ENUM1_AB_SCHEMA, ENUM1_AB_SCHEMA),
       new ReaderWriter(ENUM1_ABC_SCHEMA, ENUM1_AB_SCHEMA),
-      
+
       // String-to/from-bytes, introduced in Avro 1.7.7
       new ReaderWriter(STRING_SCHEMA, BYTES_SCHEMA),
       new ReaderWriter(BYTES_SCHEMA, STRING_SCHEMA),
@@ -398,6 +319,8 @@ public class TestSchemaCompatibility {
       new ReaderWriter(FLOAT_SCHEMA, FLOAT_UNION_SCHEMA),
       new ReaderWriter(INT_UNION_SCHEMA, INT_SCHEMA),
       new ReaderWriter(INT_SCHEMA, INT_UNION_SCHEMA),
+      // Fixed types
+      new ReaderWriter(FIXED_4_BYTES, FIXED_4_BYTES),
 
       // Tests involving records:
       new ReaderWriter(EMPTY_RECORD1, EMPTY_RECORD1),
@@ -427,58 +350,29 @@ public class TestSchemaCompatibility {
 
   // -----------------------------------------------------------------------------------------------
 
-  /** Collection of reader/writer schema pair that are incompatible. */
-  public static final List<ReaderWriter> INCOMPATIBLE_READER_WRITER_TEST_CASES = list(
-      new ReaderWriter(NULL_SCHEMA, INT_SCHEMA),
-      new ReaderWriter(NULL_SCHEMA, LONG_SCHEMA),
-
-      new ReaderWriter(BOOLEAN_SCHEMA, INT_SCHEMA),
-
-      new ReaderWriter(INT_SCHEMA, NULL_SCHEMA),
-      new ReaderWriter(INT_SCHEMA, BOOLEAN_SCHEMA),
-      new ReaderWriter(INT_SCHEMA, LONG_SCHEMA),
-      new ReaderWriter(INT_SCHEMA, FLOAT_SCHEMA),
-      new ReaderWriter(INT_SCHEMA, DOUBLE_SCHEMA),
-
-      new ReaderWriter(LONG_SCHEMA, FLOAT_SCHEMA),
-      new ReaderWriter(LONG_SCHEMA, DOUBLE_SCHEMA),
-
-      new ReaderWriter(FLOAT_SCHEMA, DOUBLE_SCHEMA),
-
-      new ReaderWriter(STRING_SCHEMA, BOOLEAN_SCHEMA),
-      new ReaderWriter(STRING_SCHEMA, INT_SCHEMA),
-
-      new ReaderWriter(BYTES_SCHEMA, NULL_SCHEMA),
-      new ReaderWriter(BYTES_SCHEMA, INT_SCHEMA),
-
-      new ReaderWriter(INT_ARRAY_SCHEMA, LONG_ARRAY_SCHEMA),
-      new ReaderWriter(INT_MAP_SCHEMA, INT_ARRAY_SCHEMA),
-      new ReaderWriter(INT_ARRAY_SCHEMA, INT_MAP_SCHEMA),
-      new ReaderWriter(INT_MAP_SCHEMA, LONG_MAP_SCHEMA),
-
-      new ReaderWriter(ENUM1_AB_SCHEMA, ENUM1_ABC_SCHEMA),
-      new ReaderWriter(ENUM1_BC_SCHEMA, ENUM1_ABC_SCHEMA),
-
-      new ReaderWriter(ENUM1_AB_SCHEMA, ENUM2_AB_SCHEMA),
-      new ReaderWriter(INT_SCHEMA, ENUM2_AB_SCHEMA),
-      new ReaderWriter(ENUM2_AB_SCHEMA, INT_SCHEMA),
-
-      // Tests involving unions:
-      new ReaderWriter(INT_UNION_SCHEMA, INT_STRING_UNION_SCHEMA),
-      new ReaderWriter(STRING_UNION_SCHEMA, INT_STRING_UNION_SCHEMA),
-      new ReaderWriter(FLOAT_SCHEMA, INT_LONG_FLOAT_DOUBLE_UNION_SCHEMA),
-      new ReaderWriter(LONG_SCHEMA, INT_FLOAT_UNION_SCHEMA),
-      new ReaderWriter(INT_SCHEMA, INT_FLOAT_UNION_SCHEMA),
-
-      new ReaderWriter(EMPTY_RECORD2, EMPTY_RECORD1),
-      new ReaderWriter(A_INT_RECORD1, EMPTY_RECORD1),
-      new ReaderWriter(A_INT_B_DINT_RECORD1, EMPTY_RECORD1),
-
-      new ReaderWriter(INT_LIST_RECORD, LONG_LIST_RECORD),
-
-      // Last check:
-      new ReaderWriter(NULL_SCHEMA, INT_SCHEMA)
-  );
+  /**
+   * Shared check used by the per-error-case test classes that hold the incompatible schema pairs:
+   * runs checkReaderWriterCompatibility(reader, writer) and asserts the reported incompatibility
+   * type, detail message, echoed reader/writer schemas and description against the arguments.
+   */
+  public static void validateIncompatibleSchemas(Schema reader, Schema writer,
+      SchemaIncompatibilityType incompatibility, String details) {
+    SchemaPairCompatibility compatibility = checkReaderWriterCompatibility(reader, writer);
+    SchemaCompatibilityResult compatibilityDetails = compatibility.getResult();
+    assertEquals(incompatibility, compatibilityDetails.getIncompatibility());
+    Schema readerSubset = compatibilityDetails.getReaderSubset();
+    Schema writerSubset = compatibilityDetails.getWriterSubset();
+    assertSchemaContains(readerSubset, reader); // helper statically imported from TestSchemas
+    assertSchemaContains(writerSubset, writer);
+    assertEquals(reader, compatibility.getReader());
+    assertEquals(writer, compatibility.getWriter());
+    assertEquals(details, compatibilityDetails.getMessage());
+    String description = String.format( // overall text expected from getDescription()
+        "Data encoded using writer schema:%n%s%n"
+            + "will or may fail to decode using reader schema:%n%s%n",
+        writer.toString(true), reader.toString(true));
+    assertEquals(description, compatibility.getDescription());
+  }
 
   // -----------------------------------------------------------------------------------------------
 
@@ -498,22 +392,6 @@ public class TestSchemaCompatibility {
     }
   }
 
-  /** Tests the reader/writer incompatibility validation. */
-  @Test
-  public void testReaderWriterIncompatibility() {
-    for (ReaderWriter readerWriter : INCOMPATIBLE_READER_WRITER_TEST_CASES) {
-      final Schema reader = readerWriter.getReader();
-      final Schema writer = readerWriter.getWriter();
-      LOG.debug("Testing incompatibility of reader {} with writer {}.", reader, writer);
-      final SchemaPairCompatibility result =
-          checkReaderWriterCompatibility(reader, writer);
-      assertEquals(String.format(
-          "Expecting reader %s to be incompatible with writer %s, but tested compatible.",
-          reader, writer),
-          SchemaCompatibilityType.INCOMPATIBLE, result.getType());
-    }
-  }
-
   // -----------------------------------------------------------------------------------------------
 
   /**
@@ -633,12 +511,4 @@ public class TestSchemaCompatibility {
           expectedDecodedDatum, decodedDatum);
     }
   }
-
-  /** Borrowed from the Guava library. */
-  private static <E> ArrayList<E> list(E... elements) {
-    final ArrayList<E> list = new ArrayList<E>();
-    Collections.addAll(list, elements);
-    return list;
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/avro/blob/db8ed216/lang/java/avro/src/test/java/org/apache/avro/TestSchemaCompatibilityFixedSizeMismatch.java
----------------------------------------------------------------------
diff --git a/lang/java/avro/src/test/java/org/apache/avro/TestSchemaCompatibilityFixedSizeMismatch.java b/lang/java/avro/src/test/java/org/apache/avro/TestSchemaCompatibilityFixedSizeMismatch.java
new file mode 100644
index 0000000..7403856
--- /dev/null
+++ b/lang/java/avro/src/test/java/org/apache/avro/TestSchemaCompatibilityFixedSizeMismatch.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro;
+
+import static org.apache.avro.TestSchemaCompatibility.validateIncompatibleSchemas;
+import static org.apache.avro.TestSchemas.FIXED_4_BYTES;
+import static org.apache.avro.TestSchemas.FIXED_8_BYTES;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.avro.SchemaCompatibility.SchemaIncompatibilityType;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+public class TestSchemaCompatibilityFixedSizeMismatch {
+  /** Supplies one { reader, writer, expected detail message } row per parameterized run. */
+  @Parameters(name = "r: {0} | w: {1}")
+  public static Iterable<Object[]> data() {
+    Object[][] fields = { // { reader, writer, expected detail message }
+        { FIXED_4_BYTES, FIXED_8_BYTES, "expected: 8, found: 4" },
+        { FIXED_8_BYTES, FIXED_4_BYTES, "expected: 4, found: 8" } };
+    List<Object[]> list = new ArrayList<>(fields.length);
+    for (Object[] schemas : fields) {
+      list.add(schemas);
+    }
+    return list;
+  }
+  // Populated by the Parameterized runner from the matching index of the current data() row.
+  @Parameter(0)
+  public Schema reader;
+  @Parameter(1)
+  public Schema writer;
+  @Parameter(2)
+  public String details;
+  /** Expects FIXED_SIZE_MISMATCH with the row's detail message for the reader/writer pair. */
+  @Test
+  public void testFixedSizeMismatchSchemas() throws Exception {
+    validateIncompatibleSchemas(reader, writer, SchemaIncompatibilityType.FIXED_SIZE_MISMATCH,
+        details);
+  }
+}

http://git-wip-us.apache.org/repos/asf/avro/blob/db8ed216/lang/java/avro/src/test/java/org/apache/avro/TestSchemaCompatibilityMissingEnumSymbols.java
----------------------------------------------------------------------
diff --git a/lang/java/avro/src/test/java/org/apache/avro/TestSchemaCompatibilityMissingEnumSymbols.java b/lang/java/avro/src/test/java/org/apache/avro/TestSchemaCompatibilityMissingEnumSymbols.java
new file mode 100644
index 0000000..d457283
--- /dev/null
+++ b/lang/java/avro/src/test/java/org/apache/avro/TestSchemaCompatibilityMissingEnumSymbols.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro;
+
+import static org.apache.avro.TestSchemaCompatibility.validateIncompatibleSchemas;
+import static org.apache.avro.TestSchemas.ENUM1_ABC_SCHEMA;
+import static org.apache.avro.TestSchemas.ENUM1_AB_SCHEMA;
+import static org.apache.avro.TestSchemas.ENUM1_BC_SCHEMA;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.avro.SchemaCompatibility.SchemaIncompatibilityType;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+public class TestSchemaCompatibilityMissingEnumSymbols {
+  // Two "Record1" schemas differing only in the enum type of field1 (nested-enum case below).
+  private static final Schema RECORD1_WITH_ENUM_AB = SchemaBuilder.record("Record1").fields() //
+      .name("field1").type(ENUM1_AB_SCHEMA).noDefault() //
+      .endRecord();
+  private static final Schema RECORD1_WITH_ENUM_ABC = SchemaBuilder.record("Record1").fields() //
+      .name("field1").type(ENUM1_ABC_SCHEMA).noDefault() //
+      .endRecord();
+  /** Supplies one { reader, writer, expected detail message } row per parameterized run. */
+  @Parameters(name = "r: {0} | w: {1}")
+  public static Iterable<Object[]> data() {
+    Object[][] fields = { // expected message: presumably the writer symbols the reader lacks
+        { ENUM1_AB_SCHEMA, ENUM1_ABC_SCHEMA, "[C]" }, { ENUM1_BC_SCHEMA, ENUM1_ABC_SCHEMA, "[A]" },
+        { RECORD1_WITH_ENUM_AB, RECORD1_WITH_ENUM_ABC, "[C]" } };
+    List<Object[]> list = new ArrayList<>(fields.length);
+    for (Object[] schemas : fields) {
+      list.add(schemas);
+    }
+    return list;
+  }
+  // Populated by the Parameterized runner from the matching index of the current data() row.
+  @Parameter(0)
+  public Schema reader;
+  @Parameter(1)
+  public Schema writer;
+  @Parameter(2)
+  public String details;
+  // NOTE(review): name says "TypeMismatch" but the check is MISSING_ENUM_SYMBOLS; likely copy/paste.
+  @Test
+  public void testTypeMismatchSchemas() throws Exception {
+    validateIncompatibleSchemas(reader, writer, SchemaIncompatibilityType.MISSING_ENUM_SYMBOLS,
+        details);
+  }
+}

http://git-wip-us.apache.org/repos/asf/avro/blob/db8ed216/lang/java/avro/src/test/java/org/apache/avro/TestSchemaCompatibilityMissingUnionBranch.java
----------------------------------------------------------------------
diff --git a/lang/java/avro/src/test/java/org/apache/avro/TestSchemaCompatibilityMissingUnionBranch.java b/lang/java/avro/src/test/java/org/apache/avro/TestSchemaCompatibilityMissingUnionBranch.java
new file mode 100644
index 0000000..1a31078
--- /dev/null
+++ b/lang/java/avro/src/test/java/org/apache/avro/TestSchemaCompatibilityMissingUnionBranch.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro;
+
+import static org.apache.avro.TestSchemaCompatibility.validateIncompatibleSchemas;
+import static org.apache.avro.TestSchemas.BOOLEAN_SCHEMA;
+import static org.apache.avro.TestSchemas.BYTES_UNION_SCHEMA;
+import static org.apache.avro.TestSchemas.DOUBLE_UNION_SCHEMA;
+import static org.apache.avro.TestSchemas.ENUM1_AB_SCHEMA;
+import static org.apache.avro.TestSchemas.FIXED_4_BYTES;
+import static org.apache.avro.TestSchemas.FLOAT_UNION_SCHEMA;
+import static org.apache.avro.TestSchemas.INT_ARRAY_SCHEMA;
+import static org.apache.avro.TestSchemas.INT_LONG_FLOAT_DOUBLE_UNION_SCHEMA;
+import static org.apache.avro.TestSchemas.INT_MAP_SCHEMA;
+import static org.apache.avro.TestSchemas.INT_SCHEMA;
+import static org.apache.avro.TestSchemas.INT_STRING_UNION_SCHEMA;
+import static org.apache.avro.TestSchemas.INT_UNION_SCHEMA;
+import static org.apache.avro.TestSchemas.LONG_UNION_SCHEMA;
+import static org.apache.avro.TestSchemas.NULL_SCHEMA;
+import static org.apache.avro.TestSchemas.STRING_UNION_SCHEMA;
+import static org.apache.avro.TestSchemas.list;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.avro.SchemaCompatibility.SchemaIncompatibilityType;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+public class TestSchemaCompatibilityMissingUnionBranch {
+  // Writer-side unions of INT plus one extra branch type, and the records used to build them.
+  private static final Schema RECORD1_WITH_INT = SchemaBuilder.record("Record1").fields() //
+      .name("field1").type(INT_SCHEMA).noDefault() //
+      .endRecord();
+  private static final Schema RECORD2_WITH_INT = SchemaBuilder.record("Record2").fields() //
+      .name("field1").type(INT_SCHEMA).noDefault() //
+      .endRecord();
+  private static final Schema UNION_INT_RECORD1 = Schema
+      .createUnion(list(INT_SCHEMA, RECORD1_WITH_INT));
+  private static final Schema UNION_INT_RECORD2 = Schema
+      .createUnion(list(INT_SCHEMA, RECORD2_WITH_INT));
+  private static final Schema UNION_INT_ENUM1_AB = Schema
+      .createUnion(list(INT_SCHEMA, ENUM1_AB_SCHEMA));
+  private static final Schema UNION_INT_FIXED_4_BYTES = Schema
+      .createUnion(list(INT_SCHEMA, FIXED_4_BYTES));
+  private static final Schema UNION_INT_BOOLEAN = Schema
+      .createUnion(list(INT_SCHEMA, BOOLEAN_SCHEMA));
+  private static final Schema UNION_INT_ARRAY_INT = Schema
+      .createUnion(list(INT_SCHEMA, INT_ARRAY_SCHEMA));
+  private static final Schema UNION_INT_MAP_INT = Schema
+      .createUnion(list(INT_SCHEMA, INT_MAP_SCHEMA));
+  private static final Schema UNION_INT_NULL = Schema.createUnion(list(INT_SCHEMA, NULL_SCHEMA));
+  /** Supplies one { reader, writer, expected detail message } row per parameterized run. */
+  @Parameters(name = "r: {0} | w: {1}")
+  public static Iterable<Object[]> data() {
+    Object[][] fields = { // { reader, writer, expected detail message }
+        { INT_UNION_SCHEMA, INT_STRING_UNION_SCHEMA, "reader union lacking writer type: STRING" },
+        { STRING_UNION_SCHEMA, INT_STRING_UNION_SCHEMA, "reader union lacking writer type: INT" },
+        { INT_UNION_SCHEMA, UNION_INT_RECORD1, "reader union lacking writer type: RECORD" },
+        { INT_UNION_SCHEMA, UNION_INT_RECORD2, "reader union lacking writer type: RECORD" },
+        // more info in the subset schemas
+        { UNION_INT_RECORD1, UNION_INT_RECORD2, "reader union lacking writer type: RECORD" },
+        { INT_UNION_SCHEMA, UNION_INT_ENUM1_AB, "reader union lacking writer type: ENUM" },
+        { INT_UNION_SCHEMA, UNION_INT_FIXED_4_BYTES, "reader union lacking writer type: FIXED" },
+        { INT_UNION_SCHEMA, UNION_INT_BOOLEAN, "reader union lacking writer type: BOOLEAN" },
+        { INT_UNION_SCHEMA, LONG_UNION_SCHEMA, "reader union lacking writer type: LONG" },
+        { INT_UNION_SCHEMA, FLOAT_UNION_SCHEMA, "reader union lacking writer type: FLOAT" },
+        { INT_UNION_SCHEMA, DOUBLE_UNION_SCHEMA, "reader union lacking writer type: DOUBLE" },
+        { INT_UNION_SCHEMA, BYTES_UNION_SCHEMA, "reader union lacking writer type: BYTES" },
+        { INT_UNION_SCHEMA, UNION_INT_ARRAY_INT, "reader union lacking writer type: ARRAY" },
+        { INT_UNION_SCHEMA, UNION_INT_MAP_INT, "reader union lacking writer type: MAP" },
+        { INT_UNION_SCHEMA, UNION_INT_NULL, "reader union lacking writer type: NULL" },
+        { INT_UNION_SCHEMA, INT_LONG_FLOAT_DOUBLE_UNION_SCHEMA, // message names LONG only
+            "reader union lacking writer type: LONG" }, };
+    List<Object[]> list = new ArrayList<>(fields.length);
+    for (Object[] schemas : fields) {
+      list.add(schemas);
+    }
+    return list;
+  }
+  // Populated by the Parameterized runner from the matching index of the current data() row.
+  @Parameter(0)
+  public Schema reader;
+  @Parameter(1)
+  public Schema writer;
+  @Parameter(2)
+  public String details;
+  /** Expects MISSING_UNION_BRANCH with the row's detail message for the reader/writer pair. */
+  @Test
+  public void testMissingUnionBranch() throws Exception {
+    validateIncompatibleSchemas(reader, writer, SchemaIncompatibilityType.MISSING_UNION_BRANCH,
+        details);
+  }
+}

http://git-wip-us.apache.org/repos/asf/avro/blob/db8ed216/lang/java/avro/src/test/java/org/apache/avro/TestSchemaCompatibilityNameMismatch.java
----------------------------------------------------------------------
diff --git a/lang/java/avro/src/test/java/org/apache/avro/TestSchemaCompatibilityNameMismatch.java b/lang/java/avro/src/test/java/org/apache/avro/TestSchemaCompatibilityNameMismatch.java
new file mode 100644
index 0000000..7d4bf6f
--- /dev/null
+++ b/lang/java/avro/src/test/java/org/apache/avro/TestSchemaCompatibilityNameMismatch.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro;
+
+import static org.apache.avro.TestSchemaCompatibility.validateIncompatibleSchemas;
+import static org.apache.avro.TestSchemas.EMPTY_RECORD1;
+import static org.apache.avro.TestSchemas.EMPTY_RECORD2;
+import static org.apache.avro.TestSchemas.ENUM1_AB_SCHEMA;
+import static org.apache.avro.TestSchemas.ENUM2_AB_SCHEMA;
+import static org.apache.avro.TestSchemas.FIXED_4_BYTES;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.avro.SchemaCompatibility.SchemaIncompatibilityType;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+/** Runs each reader/writer pair from data() through the NAME_MISMATCH incompatibility check. */
+@RunWith(Parameterized.class)
+public class TestSchemaCompatibilityNameMismatch {
+
+  private static final Schema FIXED_4_ANOTHER_NAME = Schema.createFixed("AnotherName", null, null,
+      4);
+  private static final Schema FIXED_4_NAMESPACE_V1 = Schema.createFixed("Fixed", null,
+      "org.apache.avro.tests.v_1_0", 4);
+  private static final Schema FIXED_4_NAMESPACE_V2 = Schema.createFixed("Fixed", null,
+      "org.apache.avro.tests.v_2_0", 4);
+
+  /** Builds the { reader, writer, details } rows that JUnit injects into the fields below. */
+  @Parameters(name = "r: {0} | w: {1}")
+  public static Iterable<Object[]> data() {
+    List<Object[]> cases = new ArrayList<>();
+    cases.add(new Object[] { ENUM1_AB_SCHEMA, ENUM2_AB_SCHEMA, "expected: Enum2" });
+    cases.add(new Object[] { EMPTY_RECORD2, EMPTY_RECORD1, "expected: Record1" });
+    cases.add(new Object[] { FIXED_4_BYTES, FIXED_4_ANOTHER_NAME, "expected: AnotherName" });
+    cases.add(new Object[] { FIXED_4_NAMESPACE_V1, FIXED_4_NAMESPACE_V2,
+        "expected: org.apache.avro.tests.v_2_0.Fixed" });
+    return cases;
+  }
+
+  @Parameter(0)
+  public Schema reader;
+  @Parameter(1)
+  public Schema writer;
+  @Parameter(2)
+  public String details;
+
+  /** Passes the injected row to validateIncompatibleSchemas with the NAME_MISMATCH type. */
+  @Test
+  public void testNameMismatchSchemas() throws Exception {
+    validateIncompatibleSchemas(reader, writer, SchemaIncompatibilityType.NAME_MISMATCH, details);
+  }
+}

http://git-wip-us.apache.org/repos/asf/avro/blob/db8ed216/lang/java/avro/src/test/java/org/apache/avro/TestSchemaCompatibilityReaderFieldMissingDefaultValue.java
----------------------------------------------------------------------
diff --git a/lang/java/avro/src/test/java/org/apache/avro/TestSchemaCompatibilityReaderFieldMissingDefaultValue.java b/lang/java/avro/src/test/java/org/apache/avro/TestSchemaCompatibilityReaderFieldMissingDefaultValue.java
new file mode 100644
index 0000000..06a7dcb
--- /dev/null
+++ b/lang/java/avro/src/test/java/org/apache/avro/TestSchemaCompatibilityReaderFieldMissingDefaultValue.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro;
+
+import static org.apache.avro.TestSchemaCompatibility.validateIncompatibleSchemas;
+import static org.apache.avro.TestSchemas.A_INT_B_DINT_RECORD1;
+import static org.apache.avro.TestSchemas.A_INT_RECORD1;
+import static org.apache.avro.TestSchemas.EMPTY_RECORD1;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.avro.SchemaCompatibility.SchemaIncompatibilityType;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+/** Runs each pair from data() through the READER_FIELD_MISSING_DEFAULT_VALUE check. */
+@RunWith(Parameterized.class)
+public class TestSchemaCompatibilityReaderFieldMissingDefaultValue {
+
+  /** Builds the { reader, writer, details } rows that JUnit injects into the fields below. */
+  @Parameters(name = "r: {0} | w: {1}")
+  public static Iterable<Object[]> data() {
+    List<Object[]> cases = new ArrayList<>();
+    cases.add(new Object[] { A_INT_RECORD1, EMPTY_RECORD1, "a" });
+    cases.add(new Object[] { A_INT_B_DINT_RECORD1, EMPTY_RECORD1, "a" });
+    return cases;
+  }
+
+  @Parameter(0)
+  public Schema reader;
+  @Parameter(1)
+  public Schema writer;
+  @Parameter(2)
+  public String details;
+
+  /** Passes the injected row to validateIncompatibleSchemas with the expected type. */
+  @Test
+  public void testReaderFieldMissingDefaultValueSchemas() throws Exception {
+    validateIncompatibleSchemas(reader, writer,
+        SchemaIncompatibilityType.READER_FIELD_MISSING_DEFAULT_VALUE, details);
+  }
+}

http://git-wip-us.apache.org/repos/asf/avro/blob/db8ed216/lang/java/avro/src/test/java/org/apache/avro/TestSchemaCompatibilityTypeMismatch.java
----------------------------------------------------------------------
diff --git a/lang/java/avro/src/test/java/org/apache/avro/TestSchemaCompatibilityTypeMismatch.java b/lang/java/avro/src/test/java/org/apache/avro/TestSchemaCompatibilityTypeMismatch.java
new file mode 100644
index 0000000..a8772a7
--- /dev/null
+++ b/lang/java/avro/src/test/java/org/apache/avro/TestSchemaCompatibilityTypeMismatch.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro;
+
+import static org.apache.avro.TestSchemaCompatibility.validateIncompatibleSchemas;
+import static org.apache.avro.TestSchemas.A_INT_RECORD1;
+import static org.apache.avro.TestSchemas.BOOLEAN_SCHEMA;
+import static org.apache.avro.TestSchemas.BYTES_SCHEMA;
+import static org.apache.avro.TestSchemas.DOUBLE_SCHEMA;
+import static org.apache.avro.TestSchemas.ENUM2_AB_SCHEMA;
+import static org.apache.avro.TestSchemas.FIXED_4_BYTES;
+import static org.apache.avro.TestSchemas.FLOAT_SCHEMA;
+import static org.apache.avro.TestSchemas.INT_ARRAY_SCHEMA;
+import static org.apache.avro.TestSchemas.INT_FLOAT_UNION_SCHEMA;
+import static org.apache.avro.TestSchemas.INT_LIST_RECORD;
+import static org.apache.avro.TestSchemas.INT_LONG_FLOAT_DOUBLE_UNION_SCHEMA;
+import static org.apache.avro.TestSchemas.INT_MAP_SCHEMA;
+import static org.apache.avro.TestSchemas.INT_SCHEMA;
+import static org.apache.avro.TestSchemas.LONG_ARRAY_SCHEMA;
+import static org.apache.avro.TestSchemas.LONG_LIST_RECORD;
+import static org.apache.avro.TestSchemas.LONG_MAP_SCHEMA;
+import static org.apache.avro.TestSchemas.LONG_SCHEMA;
+import static org.apache.avro.TestSchemas.NULL_SCHEMA;
+import static org.apache.avro.TestSchemas.STRING_SCHEMA;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.avro.SchemaCompatibility.SchemaIncompatibilityType;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+/** Runs each reader/writer pair from data() through the TYPE_MISMATCH incompatibility check. */
+@RunWith(Parameterized.class)
+public class TestSchemaCompatibilityTypeMismatch {
+
+  /** Builds the { reader, writer, details } rows that JUnit injects into the fields below. */
+  @Parameters(name = "r: {0} | w: {1}")
+  public static Iterable<Object[]> data() {
+    Object[][] cases = { //
+        { NULL_SCHEMA, INT_SCHEMA, "reader type: NULL not compatible with writer type: INT" },
+        { NULL_SCHEMA, LONG_SCHEMA, "reader type: NULL not compatible with writer type: LONG" },
+
+        { BOOLEAN_SCHEMA, INT_SCHEMA, "reader type: BOOLEAN not compatible with writer type: INT" },
+
+        { INT_SCHEMA, NULL_SCHEMA, "reader type: INT not compatible with writer type: NULL" },
+        { INT_SCHEMA, BOOLEAN_SCHEMA, "reader type: INT not compatible with writer type: BOOLEAN" },
+        { INT_SCHEMA, LONG_SCHEMA, "reader type: INT not compatible with writer type: LONG" },
+        { INT_SCHEMA, FLOAT_SCHEMA, "reader type: INT not compatible with writer type: FLOAT" },
+        { INT_SCHEMA, DOUBLE_SCHEMA, "reader type: INT not compatible with writer type: DOUBLE" },
+
+        { LONG_SCHEMA, FLOAT_SCHEMA, "reader type: LONG not compatible with writer type: FLOAT" },
+        { LONG_SCHEMA, DOUBLE_SCHEMA, "reader type: LONG not compatible with writer type: DOUBLE" },
+
+        { FLOAT_SCHEMA, DOUBLE_SCHEMA,
+            "reader type: FLOAT not compatible with writer type: DOUBLE" },
+
+        { DOUBLE_SCHEMA, STRING_SCHEMA,
+            "reader type: DOUBLE not compatible with writer type: STRING" },
+
+        { FIXED_4_BYTES, STRING_SCHEMA,
+            "reader type: FIXED not compatible with writer type: STRING" },
+
+        { STRING_SCHEMA, BOOLEAN_SCHEMA,
+            "reader type: STRING not compatible with writer type: BOOLEAN" },
+        { STRING_SCHEMA, INT_SCHEMA, "reader type: STRING not compatible with writer type: INT" },
+
+        { BYTES_SCHEMA, NULL_SCHEMA, "reader type: BYTES not compatible with writer type: NULL" },
+        { BYTES_SCHEMA, INT_SCHEMA, "reader type: BYTES not compatible with writer type: INT" },
+
+        { A_INT_RECORD1, INT_SCHEMA, "reader type: RECORD not compatible with writer type: INT" },
+
+        { INT_ARRAY_SCHEMA, LONG_ARRAY_SCHEMA,
+            "reader type: INT not compatible with writer type: LONG" },
+        { INT_MAP_SCHEMA, INT_ARRAY_SCHEMA,
+            "reader type: MAP not compatible with writer type: ARRAY" },
+        { INT_ARRAY_SCHEMA, INT_MAP_SCHEMA,
+            "reader type: ARRAY not compatible with writer type: MAP" },
+        { INT_MAP_SCHEMA, LONG_MAP_SCHEMA,
+            "reader type: INT not compatible with writer type: LONG" },
+
+        { INT_SCHEMA, ENUM2_AB_SCHEMA, "reader type: INT not compatible with writer type: ENUM" },
+        { ENUM2_AB_SCHEMA, INT_SCHEMA, "reader type: ENUM not compatible with writer type: INT" },
+
+        { FLOAT_SCHEMA, INT_LONG_FLOAT_DOUBLE_UNION_SCHEMA,
+            "reader type: FLOAT not compatible with writer type: DOUBLE" },
+        { LONG_SCHEMA, INT_FLOAT_UNION_SCHEMA,
+            "reader type: LONG not compatible with writer type: FLOAT" },
+        { INT_SCHEMA, INT_FLOAT_UNION_SCHEMA,
+            "reader type: INT not compatible with writer type: FLOAT" },
+
+        { INT_LIST_RECORD, LONG_LIST_RECORD,
+            "reader type: INT not compatible with writer type: LONG" },
+
+        { NULL_SCHEMA, INT_SCHEMA, "reader type: NULL not compatible with writer type: INT" } };
+    List<Object[]> rows = new ArrayList<>(cases.length);
+    java.util.Collections.addAll(rows, cases);
+    return rows;
+  }
+
+  @Parameter(0)
+  public Schema reader;
+  @Parameter(1)
+  public Schema writer;
+  @Parameter(2)
+  public String details;
+
+  @Test
+  public void testTypeMismatchSchemas() throws Exception {
+    validateIncompatibleSchemas(reader, writer, SchemaIncompatibilityType.TYPE_MISMATCH, details);
+  }
+}

http://git-wip-us.apache.org/repos/asf/avro/blob/db8ed216/lang/java/avro/src/test/java/org/apache/avro/TestSchemaValidation.java
----------------------------------------------------------------------
diff --git a/lang/java/avro/src/test/java/org/apache/avro/TestSchemaValidation.java b/lang/java/avro/src/test/java/org/apache/avro/TestSchemaValidation.java
index 838dc91..f0b8230 100644
--- a/lang/java/avro/src/test/java/org/apache/avro/TestSchemaValidation.java
+++ b/lang/java/avro/src/test/java/org/apache/avro/TestSchemaValidation.java
@@ -15,47 +15,229 @@
  * implied.  See the License for the specific language governing
  * permissions and limitations under the License.
  */
-
 package org.apache.avro;
 
+import static org.apache.avro.TestSchemas.A_DINT_B_DINT_RECORD1;
+import static org.apache.avro.TestSchemas.A_DINT_RECORD1;
+import static org.apache.avro.TestSchemas.A_INT_B_DINT_RECORD1;
+import static org.apache.avro.TestSchemas.A_INT_B_INT_RECORD1;
+import static org.apache.avro.TestSchemas.A_INT_RECORD1;
+import static org.apache.avro.TestSchemas.A_LONG_RECORD1;
+import static org.apache.avro.TestSchemas.BOOLEAN_SCHEMA;
+import static org.apache.avro.TestSchemas.BYTES_SCHEMA;
+import static org.apache.avro.TestSchemas.BYTES_UNION_SCHEMA;
+import static org.apache.avro.TestSchemas.DOUBLE_SCHEMA;
+import static org.apache.avro.TestSchemas.DOUBLE_UNION_SCHEMA;
+import static org.apache.avro.TestSchemas.EMPTY_RECORD1;
+import static org.apache.avro.TestSchemas.EMPTY_RECORD2;
+import static org.apache.avro.TestSchemas.EMPTY_UNION_SCHEMA;
+import static org.apache.avro.TestSchemas.ENUM1_ABC_SCHEMA;
+import static org.apache.avro.TestSchemas.ENUM1_AB_SCHEMA;
+import static org.apache.avro.TestSchemas.ENUM1_BC_SCHEMA;
+import static org.apache.avro.TestSchemas.ENUM2_AB_SCHEMA;
+import static org.apache.avro.TestSchemas.FLOAT_SCHEMA;
+import static org.apache.avro.TestSchemas.FLOAT_UNION_SCHEMA;
+import static org.apache.avro.TestSchemas.INT_ARRAY_SCHEMA;
+import static org.apache.avro.TestSchemas.INT_FLOAT_UNION_SCHEMA;
+import static org.apache.avro.TestSchemas.INT_LIST_RECORD;
+import static org.apache.avro.TestSchemas.INT_LONG_FLOAT_DOUBLE_UNION_SCHEMA;
+import static org.apache.avro.TestSchemas.INT_LONG_UNION_SCHEMA;
+import static org.apache.avro.TestSchemas.INT_MAP_SCHEMA;
+import static org.apache.avro.TestSchemas.INT_SCHEMA;
+import static org.apache.avro.TestSchemas.INT_STRING_UNION_SCHEMA;
+import static org.apache.avro.TestSchemas.INT_UNION_SCHEMA;
+import static org.apache.avro.TestSchemas.LONG_ARRAY_SCHEMA;
+import static org.apache.avro.TestSchemas.LONG_LIST_RECORD;
+import static org.apache.avro.TestSchemas.LONG_MAP_SCHEMA;
+import static org.apache.avro.TestSchemas.LONG_SCHEMA;
+import static org.apache.avro.TestSchemas.LONG_UNION_SCHEMA;
+import static org.apache.avro.TestSchemas.NULL_SCHEMA;
+import static org.apache.avro.TestSchemas.STRING_INT_UNION_SCHEMA;
+import static org.apache.avro.TestSchemas.STRING_SCHEMA;
+import static org.apache.avro.TestSchemas.STRING_UNION_SCHEMA;
+import static org.apache.avro.TestSchemas.list;
 import java.util.ArrayList;
 import java.util.Arrays;
-
+import java.util.Collections;
+import java.util.List;
+import org.apache.avro.TestSchemas.ReaderWriter;
 import org.apache.avro.reflect.ReflectData;
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 public class TestSchemaValidation {
 
-  SchemaValidatorBuilder builder = new SchemaValidatorBuilder();
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  /** Collection of reader/writer schema pairs that are compatible. */
+  public static final List<ReaderWriter> COMPATIBLE_READER_WRITER_TEST_CASES = list(
+      new ReaderWriter(BOOLEAN_SCHEMA, BOOLEAN_SCHEMA),
+
+      new ReaderWriter(INT_SCHEMA, INT_SCHEMA),
+
+      new ReaderWriter(LONG_SCHEMA, INT_SCHEMA),
+      new ReaderWriter(LONG_SCHEMA, LONG_SCHEMA),
+
+      // Avro spec says INT/LONG can be promoted to FLOAT/DOUBLE.
+      // This is arguable as this causes a loss of precision.
+      new ReaderWriter(FLOAT_SCHEMA, INT_SCHEMA),
+      new ReaderWriter(FLOAT_SCHEMA, LONG_SCHEMA),
+      new ReaderWriter(DOUBLE_SCHEMA, LONG_SCHEMA),
+
+      new ReaderWriter(DOUBLE_SCHEMA, INT_SCHEMA),
+      new ReaderWriter(DOUBLE_SCHEMA, FLOAT_SCHEMA),
+
+      new ReaderWriter(STRING_SCHEMA, STRING_SCHEMA),
+
+      new ReaderWriter(BYTES_SCHEMA, BYTES_SCHEMA),
+
+      new ReaderWriter(INT_ARRAY_SCHEMA, INT_ARRAY_SCHEMA),
+      new ReaderWriter(LONG_ARRAY_SCHEMA, INT_ARRAY_SCHEMA),
+      new ReaderWriter(INT_MAP_SCHEMA, INT_MAP_SCHEMA),
+      new ReaderWriter(LONG_MAP_SCHEMA, INT_MAP_SCHEMA),
+
+      new ReaderWriter(ENUM1_AB_SCHEMA, ENUM1_AB_SCHEMA),
+      new ReaderWriter(ENUM1_ABC_SCHEMA, ENUM1_AB_SCHEMA),
+
+      // String-to/from-bytes, introduced in Avro 1.7.7
+      new ReaderWriter(STRING_SCHEMA, BYTES_SCHEMA),
+      new ReaderWriter(BYTES_SCHEMA, STRING_SCHEMA),
+
+      // Tests involving unions:
+      new ReaderWriter(EMPTY_UNION_SCHEMA, EMPTY_UNION_SCHEMA),
+      new ReaderWriter(INT_UNION_SCHEMA, INT_UNION_SCHEMA),
+      new ReaderWriter(INT_STRING_UNION_SCHEMA, STRING_INT_UNION_SCHEMA),
+      new ReaderWriter(INT_UNION_SCHEMA, EMPTY_UNION_SCHEMA),
+      new ReaderWriter(LONG_UNION_SCHEMA, INT_UNION_SCHEMA),
+      // float unions cannot read int or long unions
+      // new ReaderWriter(FLOAT_UNION_SCHEMA, INT_UNION_SCHEMA),
+      // new ReaderWriter(FLOAT_UNION_SCHEMA, LONG_UNION_SCHEMA),
+      new ReaderWriter(DOUBLE_UNION_SCHEMA, INT_UNION_SCHEMA),
+      new ReaderWriter(LONG_UNION_SCHEMA, EMPTY_UNION_SCHEMA),
+      new ReaderWriter(DOUBLE_UNION_SCHEMA, LONG_UNION_SCHEMA),
+      new ReaderWriter(FLOAT_UNION_SCHEMA, EMPTY_UNION_SCHEMA),
+      new ReaderWriter(DOUBLE_UNION_SCHEMA, FLOAT_UNION_SCHEMA),
+      new ReaderWriter(STRING_UNION_SCHEMA, EMPTY_UNION_SCHEMA),
+      new ReaderWriter(STRING_UNION_SCHEMA, BYTES_UNION_SCHEMA),
+      new ReaderWriter(BYTES_UNION_SCHEMA, EMPTY_UNION_SCHEMA),
+      new ReaderWriter(BYTES_UNION_SCHEMA, STRING_UNION_SCHEMA),
+      new ReaderWriter(DOUBLE_UNION_SCHEMA, INT_FLOAT_UNION_SCHEMA),
+
+      // Readers capable of reading all branches of a union are compatible
+      new ReaderWriter(FLOAT_SCHEMA, INT_FLOAT_UNION_SCHEMA),
+      new ReaderWriter(LONG_SCHEMA, INT_LONG_UNION_SCHEMA),
+      new ReaderWriter(DOUBLE_SCHEMA, INT_FLOAT_UNION_SCHEMA),
+      new ReaderWriter(DOUBLE_SCHEMA, INT_LONG_FLOAT_DOUBLE_UNION_SCHEMA),
+
+      // Special case of singleton unions:
+      new ReaderWriter(FLOAT_SCHEMA, FLOAT_UNION_SCHEMA),
+      new ReaderWriter(INT_UNION_SCHEMA, INT_SCHEMA),
+      new ReaderWriter(INT_SCHEMA, INT_UNION_SCHEMA),
+
+      // Tests involving records:
+      new ReaderWriter(EMPTY_RECORD1, EMPTY_RECORD1),
+      new ReaderWriter(EMPTY_RECORD1, A_INT_RECORD1),
+
+      new ReaderWriter(A_INT_RECORD1, A_INT_RECORD1),
+      new ReaderWriter(A_DINT_RECORD1, A_INT_RECORD1),
+      new ReaderWriter(A_DINT_RECORD1, A_DINT_RECORD1),
+      new ReaderWriter(A_INT_RECORD1, A_DINT_RECORD1),
+
+      new ReaderWriter(A_LONG_RECORD1, A_INT_RECORD1),
+
+      new ReaderWriter(A_INT_RECORD1, A_INT_B_INT_RECORD1),
+      new ReaderWriter(A_DINT_RECORD1, A_INT_B_INT_RECORD1),
+
+      new ReaderWriter(A_INT_B_DINT_RECORD1, A_INT_RECORD1),
+      new ReaderWriter(A_DINT_B_DINT_RECORD1, EMPTY_RECORD1),
+      new ReaderWriter(A_DINT_B_DINT_RECORD1, A_INT_RECORD1),
+      new ReaderWriter(A_INT_B_INT_RECORD1, A_DINT_B_DINT_RECORD1),
+
+      // The SchemaValidator, unlike the SchemaCompatibility class, cannot cope with recursive schemas
+      // See AVRO-2074
+      // new ReaderWriter(INT_LIST_RECORD, INT_LIST_RECORD),
+      // new ReaderWriter(LONG_LIST_RECORD, LONG_LIST_RECORD),
+      // new ReaderWriter(LONG_LIST_RECORD, INT_LIST_RECORD),
+
+      new ReaderWriter(NULL_SCHEMA, NULL_SCHEMA));
+
+  /** Collection of reader/writer schema pairs that are incompatible. */
+  public static final List<ReaderWriter> INCOMPATIBLE_READER_WRITER_TEST_CASES = list(
+      new ReaderWriter(NULL_SCHEMA, INT_SCHEMA),
+      new ReaderWriter(NULL_SCHEMA, LONG_SCHEMA),
+
+      new ReaderWriter(BOOLEAN_SCHEMA, INT_SCHEMA),
+
+      new ReaderWriter(INT_SCHEMA, NULL_SCHEMA),
+      new ReaderWriter(INT_SCHEMA, BOOLEAN_SCHEMA),
+      new ReaderWriter(INT_SCHEMA, LONG_SCHEMA),
+      new ReaderWriter(INT_SCHEMA, FLOAT_SCHEMA),
+      new ReaderWriter(INT_SCHEMA, DOUBLE_SCHEMA),
 
+      new ReaderWriter(LONG_SCHEMA, FLOAT_SCHEMA),
+      new ReaderWriter(LONG_SCHEMA, DOUBLE_SCHEMA),
+
+      new ReaderWriter(FLOAT_SCHEMA, DOUBLE_SCHEMA),
+
+      new ReaderWriter(STRING_SCHEMA, BOOLEAN_SCHEMA),
+      new ReaderWriter(STRING_SCHEMA, INT_SCHEMA),
+
+      new ReaderWriter(BYTES_SCHEMA, NULL_SCHEMA),
+      new ReaderWriter(BYTES_SCHEMA, INT_SCHEMA),
+
+      new ReaderWriter(INT_ARRAY_SCHEMA, LONG_ARRAY_SCHEMA),
+      new ReaderWriter(INT_MAP_SCHEMA, INT_ARRAY_SCHEMA),
+      new ReaderWriter(INT_ARRAY_SCHEMA, INT_MAP_SCHEMA),
+      new ReaderWriter(INT_MAP_SCHEMA, LONG_MAP_SCHEMA),
+
+      new ReaderWriter(ENUM1_AB_SCHEMA, ENUM1_ABC_SCHEMA),
+      new ReaderWriter(ENUM1_BC_SCHEMA, ENUM1_ABC_SCHEMA),
+
+      new ReaderWriter(ENUM1_AB_SCHEMA, ENUM2_AB_SCHEMA),
+      new ReaderWriter(INT_SCHEMA, ENUM2_AB_SCHEMA),
+      new ReaderWriter(ENUM2_AB_SCHEMA, INT_SCHEMA),
+
+      // Tests involving unions:
+      new ReaderWriter(INT_UNION_SCHEMA, INT_STRING_UNION_SCHEMA),
+      new ReaderWriter(STRING_UNION_SCHEMA, INT_STRING_UNION_SCHEMA),
+      new ReaderWriter(FLOAT_SCHEMA, INT_LONG_FLOAT_DOUBLE_UNION_SCHEMA),
+      new ReaderWriter(LONG_SCHEMA, INT_FLOAT_UNION_SCHEMA),
+      new ReaderWriter(INT_SCHEMA, INT_FLOAT_UNION_SCHEMA),
+
+      new ReaderWriter(EMPTY_RECORD2, EMPTY_RECORD1),
+      new ReaderWriter(A_INT_RECORD1, EMPTY_RECORD1),
+      new ReaderWriter(A_INT_B_DINT_RECORD1, EMPTY_RECORD1),
+
+      new ReaderWriter(INT_LIST_RECORD, LONG_LIST_RECORD),
+
+      new ReaderWriter(NULL_SCHEMA, INT_SCHEMA));
+
+  SchemaValidatorBuilder builder = new SchemaValidatorBuilder();
   Schema rec = SchemaBuilder.record("test.Rec").fields()
       .name("a").type().intType().intDefault(1)
       .name("b").type().longType().noDefault()
       .endRecord();
-
   Schema rec2 = SchemaBuilder.record("test.Rec").fields()
       .name("a").type().intType().intDefault(1)
       .name("b").type().longType().noDefault()
       .name("c").type().intType().intDefault(0)
       .endRecord();
-
   Schema rec3 = SchemaBuilder.record("test.Rec").fields()
       .name("b").type().longType().noDefault()
       .name("c").type().intType().intDefault(0)
       .endRecord();
-
   Schema rec4 = SchemaBuilder.record("test.Rec").fields()
       .name("b").type().longType().noDefault()
       .name("c").type().intType().noDefault()
       .endRecord();
-
   Schema rec5 = SchemaBuilder.record("test.Rec").fields()
       .name("a").type().stringType().stringDefault("") // different type from original
       .name("b").type().longType().noDefault()
       .name("c").type().intType().intDefault(0)
       .endRecord();
-
   @Test
   public void testAllTypes() throws SchemaValidationException {
     Schema s = SchemaBuilder.record("r").fields()
@@ -179,6 +361,29 @@ public class TestSchemaValidation {
         union2, union1);
   }
 
+  @Test
+  public void testSchemaCompatibilitySuccesses() throws SchemaValidationException {
+    // COMPATIBLE_READER_WRITER_TEST_CASES leaves out the float-union-reading-int/long-union
+    // pairs and the recursive-schema pairs; they are commented out in that list.
+    for (ReaderWriter pair : COMPATIBLE_READER_WRITER_TEST_CASES) {
+      SchemaValidator validator = builder.canReadStrategy().validateAll();
+      testValidatorPasses(validator, pair.getReader(), pair.getWriter());
+    }
+  }
+
+  @Test
+  public void testSchemaCompatibilityFailures() throws SchemaValidationException {
+    for (ReaderWriter tc : INCOMPATIBLE_READER_WRITER_TEST_CASES) {
+      SchemaValidator validator = builder.canReadStrategy().validateAll();
+      try { // per-pair catch: an ExpectedException rule would end the test at the first throw
+        validator.validate(tc.getReader(), Collections.singleton(tc.getWriter()));
+        Assert.fail("Expected SchemaValidationException for writer: " + tc.getWriter());
+      } catch (SchemaValidationException e) {
+        Assert.assertTrue(e.getMessage(), e.getMessage().contains("Unable to read schema: \n"));
+      }
+    }
+  }
+
   private void testValidatorPasses(SchemaValidator validator,
       Schema schema, Schema... prev) throws SchemaValidationException {
     ArrayList<Schema> prior = new ArrayList<Schema>();
@@ -203,5 +408,4 @@ public class TestSchemaValidation {
     }
     Assert.assertTrue(threw);
   }
-
 }