You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by om...@apache.org on 2016/08/17 15:49:20 UTC

[2/2] orc git commit: ORC-54: Evolve schemas based on field name rather than index (Mark Wagner via omalley)

ORC-54: Evolve schemas based on field name rather than index (Mark Wagner
via omalley)

Fixes #40
Fixes #55

Signed-off-by: Owen O'Malley <om...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/orc/repo
Commit: http://git-wip-us.apache.org/repos/asf/orc/commit/9ba93782
Tree: http://git-wip-us.apache.org/repos/asf/orc/tree/9ba93782
Diff: http://git-wip-us.apache.org/repos/asf/orc/diff/9ba93782

Branch: refs/heads/master
Commit: 9ba937821ee68fbeea1a2273caa8daf905bbc7d7
Parents: 50729b8
Author: Mark Wagner <mw...@apache.org>
Authored: Tue Aug 16 15:30:51 2016 -0700
Committer: Owen O'Malley <om...@apache.org>
Committed: Wed Aug 17 08:47:19 2016 -0700

----------------------------------------------------------------------
 java/core/src/java/org/apache/orc/OrcConf.java  |   6 +
 java/core/src/java/org/apache/orc/Reader.java   |  37 ++
 .../org/apache/orc/impl/BitFieldReader.java     |   2 +-
 .../java/org/apache/orc/impl/ReaderImpl.java    |  13 +-
 .../org/apache/orc/impl/RecordReaderImpl.java   |  90 +--
 .../apache/orc/impl/RunLengthByteReader.java    |   2 +-
 .../org/apache/orc/impl/SchemaEvolution.java    | 221 +++++--
 .../test/org/apache/orc/TestOrcTimezone1.java   |   2 +-
 .../test/org/apache/orc/TestVectorOrcFile.java  |  12 +-
 .../apache/orc/impl/TestRecordReaderImpl.java   |   2 +-
 .../apache/orc/impl/TestSchemaEvolution.java    | 622 +++++++++++++++++--
 .../org/apache/orc/mapred/OrcInputFormat.java   |   5 +-
 .../apache/orc/mapred/TestOrcFileEvolution.java | 331 ++++++++++
 13 files changed, 1190 insertions(+), 155 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/orc/blob/9ba93782/java/core/src/java/org/apache/orc/OrcConf.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/OrcConf.java b/java/core/src/java/org/apache/orc/OrcConf.java
index 48a1bb4..ac8e3f0 100644
--- a/java/core/src/java/org/apache/orc/OrcConf.java
+++ b/java/core/src/java/org/apache/orc/OrcConf.java
@@ -82,6 +82,12 @@ public enum OrcConf {
       "If ORC reader encounters corrupt data, this value will be used to\n" +
           "determine whether to skip the corrupt data or throw exception.\n" +
           "The default behavior is to throw exception."),
+  TOLERATE_MISSING_SCHEMA("orc.tolerate.missing.schema",
+      "hive.exec.orc.tolerate.missing.schema",
+      true,
+      "Writers earlier than HIVE-4243 may have inaccurate schema metadata.\n"
+          + "This setting will enable best effort schema evolution rather\n"
+          + "than rejecting mismatched schemas"),
   MEMORY_POOL("orc.memory.pool", "hive.exec.orc.memory.pool", 0.5,
       "Maximum fraction of heap that can be used by ORC file writers"),
   DICTIONARY_KEY_SIZE_THRESHOLD("orc.dictionary.key.threshold",

http://git-wip-us.apache.org/repos/asf/orc/blob/9ba93782/java/core/src/java/org/apache/orc/Reader.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/Reader.java b/java/core/src/java/org/apache/orc/Reader.java
index c2d5235..c0e6109 100644
--- a/java/core/src/java/org/apache/orc/Reader.java
+++ b/java/core/src/java/org/apache/orc/Reader.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 
 /**
@@ -157,6 +158,17 @@ public interface Reader {
     private Boolean skipCorruptRecords = null;
     private TypeDescription schema = null;
     private DataReader dataReader = null;
+    private Boolean tolerateMissingSchema = null;
+
+    public Options() {
+      // PASS
+    }
+
+    public Options(Configuration conf) {
+      useZeroCopy = OrcConf.USE_ZEROCOPY.getBoolean(conf);
+      skipCorruptRecords = OrcConf.SKIP_CORRUPT_DATA.getBoolean(conf);
+      tolerateMissingSchema = OrcConf.TOLERATE_MISSING_SCHEMA.getBoolean(conf);
+    }
 
     /**
      * Set the list of columns to read.
@@ -225,6 +237,18 @@ public interface Reader {
       return this;
     }
 
+    /**
+     * Set whether to make a best effort to tolerate schema evolution for files
+     * which do not have an embedded schema because they were written with a
+     * pre-HIVE-4243 writer.
+     * @param value the new tolerance flag
+     * @return this
+     */
+    public Options tolerateMissingSchema(boolean value) {
+      this.tolerateMissingSchema = value;
+      return this;
+    }
+
     public boolean[] getInclude() {
       return include;
     }
@@ -280,6 +304,7 @@ public interface Reader {
       result.useZeroCopy = useZeroCopy;
       result.skipCorruptRecords = skipCorruptRecords;
       result.dataReader = dataReader == null ? null : dataReader.clone();
+      result.tolerateMissingSchema = tolerateMissingSchema;
       return result;
     }
 
@@ -324,9 +349,21 @@ public interface Reader {
       buffer.append("}");
       return buffer.toString();
     }
+
+    public boolean getTolerateMissingSchema() {
+      return tolerateMissingSchema != null ? tolerateMissingSchema :
+          (Boolean) OrcConf.TOLERATE_MISSING_SCHEMA.getDefaultValue();
+    }
   }
 
   /**
+   * Create a default options object that can be customized for creating
+   * a RecordReader.
+   * @return a new default Options object
+   */
+  Options options();
+
+  /**
    * Create a RecordReader that reads everything with the default options.
    * @return a new RecordReader
    * @throws IOException

http://git-wip-us.apache.org/repos/asf/orc/blob/9ba93782/java/core/src/java/org/apache/orc/impl/BitFieldReader.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/BitFieldReader.java b/java/core/src/java/org/apache/orc/impl/BitFieldReader.java
index dda7355..2103306 100644
--- a/java/core/src/java/org/apache/orc/impl/BitFieldReader.java
+++ b/java/core/src/java/org/apache/orc/impl/BitFieldReader.java
@@ -34,7 +34,7 @@ public class BitFieldReader {
   private final int mask;
 
   public BitFieldReader(InStream input,
-      int bitSize) throws IOException {
+                        int bitSize) {
     this.input = new RunLengthByteReader(input);
     this.bitSize = bitSize;
     mask = (1 << bitSize) - 1;

http://git-wip-us.apache.org/repos/asf/orc/blob/9ba93782/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/ReaderImpl.java b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
index baf1335..18aaf61 100644
--- a/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
@@ -568,8 +568,13 @@ public class ReaderImpl implements Reader {
   }
 
   @Override
+  public Options options() {
+    return new Options(conf);
+  }
+
+  @Override
   public RecordReader rows() throws IOException {
-    return rows(new Options());
+    return rows(options());
   }
 
   @Override
@@ -579,7 +584,11 @@ public class ReaderImpl implements Reader {
     // if included columns is null, then include all columns
     if (include == null) {
       options = options.clone();
-      include = new boolean[types.size()];
+      TypeDescription readSchema = options.getSchema();
+      if (readSchema == null) {
+        readSchema = schema;
+      }
+      include = new boolean[readSchema.getMaximumId() + 1];
       Arrays.fill(include, true);
       options.include(include);
     }

http://git-wip-us.apache.org/repos/asf/orc/blob/9ba93782/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
index 7aa6e71..8a492dc 100644
--- a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
@@ -27,10 +27,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.orc.BloomFilterIO;
 import org.apache.orc.BooleanColumnStatistics;
-import org.apache.orc.Reader;
-import org.apache.orc.RecordReader;
-import org.apache.orc.TypeDescription;
 import org.apache.orc.ColumnStatistics;
 import org.apache.orc.CompressionCodec;
 import org.apache.orc.DataReader;
@@ -39,9 +37,13 @@ import org.apache.orc.DecimalColumnStatistics;
 import org.apache.orc.DoubleColumnStatistics;
 import org.apache.orc.IntegerColumnStatistics;
 import org.apache.orc.OrcConf;
+import org.apache.orc.OrcProto;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
 import org.apache.orc.StringColumnStatistics;
 import org.apache.orc.StripeInformation;
 import org.apache.orc.TimestampColumnStatistics;
+import org.apache.orc.TypeDescription;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.Path;
@@ -50,7 +52,6 @@ import org.apache.hadoop.hive.common.io.DiskRangeList;
 import org.apache.hadoop.hive.common.io.DiskRangeList.CreateHelper;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.orc.BloomFilterIO;
 import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue;
@@ -58,7 +59,6 @@ import org.apache.hadoop.hive.serde2.io.DateWritable;
 import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.apache.hadoop.hive.ql.util.TimestampUtils;
 import org.apache.hadoop.io.Text;
-import org.apache.orc.OrcProto;
 
 public class RecordReaderImpl implements RecordReader {
   static final Logger LOG = LoggerFactory.getLogger(RecordReaderImpl.class);
@@ -76,6 +76,7 @@ public class RecordReaderImpl implements RecordReader {
   private final int bufferSize;
   private final SchemaEvolution evolution;
   private final boolean[] included;
+  private final boolean[] writerIncluded;
   private final long rowIndexStride;
   private long rowInStripe = 0;
   private int currentStripe = -1;
@@ -95,17 +96,18 @@ public class RecordReaderImpl implements RecordReader {
   /**
    * Given a list of column names, find the given column and return the index.
    *
-   * @param columnNames the list of potential column names
+   * @param evolution the mapping from reader to file schema
    * @param columnName  the column name to look for
-   * @param rootColumn  offset the result with the rootColumn
-   * @return the column number or -1 if the column wasn't found
+   * @return the file column number or -1 if the column wasn't found
    */
-  static int findColumns(String[] columnNames,
-                         String columnName,
-                         int rootColumn) {
-    for(int i=0; i < columnNames.length; ++i) {
-      if (columnName.equals(columnNames[i])) {
-        return i + rootColumn;
+  static int findColumns(SchemaEvolution evolution,
+                         String columnName) {
+    TypeDescription readerSchema = evolution.getReaderBaseSchema();
+    List<String> fieldNames = readerSchema.getFieldNames();
+    List<TypeDescription> children = readerSchema.getChildren();
+    for (int i = 0; i < fieldNames.size(); ++i) {
+      if (columnName.equals(fieldNames.get(i))) {
+        return evolution.getFileType(children.get(i)).getId();
       }
     }
     return -1;
@@ -114,19 +116,17 @@ public class RecordReaderImpl implements RecordReader {
   /**
    * Find the mapping from predicate leaves to columns.
    * @param sargLeaves the search argument that we need to map
-   * @param columnNames the names of the columns
-   * @param rootColumn the offset of the top level row, which offsets the
-   *                   result
-   * @return an array mapping the sarg leaves to concrete column numbers
+   * @param evolution the mapping from reader to file schema
+   * @return an array mapping the sarg leaves to concrete column numbers in the
+   * file
    */
   public static int[] mapSargColumnsToOrcInternalColIdx(List<PredicateLeaf> sargLeaves,
-                             String[] columnNames,
-                             int rootColumn) {
+                             SchemaEvolution evolution) {
     int[] result = new int[sargLeaves.size()];
     Arrays.fill(result, -1);
     for(int i=0; i < result.length; ++i) {
       String colName = sargLeaves.get(i).getColumnName();
-      result[i] = findColumns(columnNames, colName, rootColumn);
+      result[i] = findColumns(evolution, colName);
     }
     return result;
   }
@@ -140,14 +140,15 @@ public class RecordReaderImpl implements RecordReader {
         LOG.info("Reader schema not provided -- using file schema " +
             fileReader.getSchema());
       }
-      evolution = new SchemaEvolution(fileReader.getSchema(), included);
+      evolution = new SchemaEvolution(fileReader.getSchema(), options);
     } else {
 
       // Now that we are creating a record reader for a file, validate that
       // the schema to read is compatible with the file schema.
       //
       evolution = new SchemaEvolution(fileReader.getSchema(),
-          options.getSchema(),included);
+                                      options.getSchema(),
+                                      options);
       if (LOG.isDebugEnabled() && evolution.hasConversion()) {
         LOG.debug("ORC file " + fileReader.path.toString() +
             " has data type conversion --\n" +
@@ -163,8 +164,9 @@ public class RecordReaderImpl implements RecordReader {
     this.rowIndexStride = fileReader.rowIndexStride;
     SearchArgument sarg = options.getSearchArgument();
     if (sarg != null && rowIndexStride != 0) {
-      sargApp = new SargApplier(sarg, options.getColumnNames(), rowIndexStride,
-          included.length, evolution);
+      sargApp = new SargApplier(sarg, options.getColumnNames(),
+                                rowIndexStride,
+                                included.length, evolution);
     } else {
       sargApp = null;
     }
@@ -210,6 +212,8 @@ public class RecordReaderImpl implements RecordReader {
 
     reader = TreeReaderFactory.createTreeReader(evolution.getReaderSchema(),
         evolution, included, skipCorrupt);
+
+    writerIncluded = evolution.getFileIncluded();
     indexes = new OrcProto.RowIndex[types.size()];
     bloomFilterIndices = new OrcProto.BloomFilterIndex[types.size()];
     advanceToNextRow(reader, 0L, true);
@@ -710,16 +714,21 @@ public class RecordReaderImpl implements RecordReader {
     private final boolean[] sargColumns;
     private SchemaEvolution evolution;
 
-    public SargApplier(SearchArgument sarg, String[] columnNames, long rowIndexStride,
-        int includedCount, final SchemaEvolution evolution) {
+    public SargApplier(SearchArgument sarg, String[] columnNames,
+                       long rowIndexStride,
+                       int includedCount,
+                       SchemaEvolution evolution) {
       this.sarg = sarg;
       sargLeaves = sarg.getLeaves();
-      filterColumns = mapSargColumnsToOrcInternalColIdx(sargLeaves, columnNames, 0);
+      filterColumns = mapSargColumnsToOrcInternalColIdx(sargLeaves,
+                                                        evolution);
       this.rowIndexStride = rowIndexStride;
-      // included will not be null, row options will fill the array with trues if null
+      // included will not be null, row options will fill the array with
+      // trues if null
       sargColumns = new boolean[includedCount];
       for (int i : filterColumns) {
-        // filter columns may have -1 as index which could be partition column in SARG.
+        // filter columns may have -1 as index which could be partition
+        // column in SARG.
         if (i > 0) {
           sargColumns[i] = true;
         }
@@ -797,7 +806,7 @@ public class RecordReaderImpl implements RecordReader {
     if (sargApp == null) {
       return null;
     }
-    readRowIndex(currentStripe, included, sargApp.sargColumns);
+    readRowIndex(currentStripe, writerIncluded, sargApp.sargColumns);
     return sargApp.pickRowGroups(stripes.get(currentStripe), indexes, bloomFilterIndices, false);
   }
 
@@ -840,7 +849,7 @@ public class RecordReaderImpl implements RecordReader {
     // if we haven't skipped the whole stripe, read the data
     if (rowInStripe < rowCountInStripe) {
       // if we aren't projecting columns or filtering rows, just read it all
-      if (included == null && includedRowGroups == null) {
+      if (isFullRead() && includedRowGroups == null) {
         readAllDataStreams(stripe);
       } else {
         readPartialDataStreams(stripe);
@@ -853,6 +862,15 @@ public class RecordReaderImpl implements RecordReader {
     }
   }
 
+  private boolean isFullRead() {
+    for (boolean isColumnPresent : writerIncluded){
+      if (!isColumnPresent){
+        return false;
+      }
+    }
+    return true;
+  }
+
   private StripeInformation beginReadStripe() throws IOException {
     StripeInformation stripe = stripes.get(currentStripe);
     stripeFooter = readStripeFooter(stripe);
@@ -942,7 +960,7 @@ public class RecordReaderImpl implements RecordReader {
     for (OrcProto.Stream streamDesc : streamDescriptions) {
       int column = streamDesc.getColumn();
       if ((includeColumn != null &&
-          (column < included.length && !includeColumn[column])) ||
+          (column < includeColumn.length && !includeColumn[column])) ||
           streamDesc.hasKind() &&
               (StreamName.getArea(streamDesc.getKind()) != StreamName.Area.DATA)) {
         streamOffset += streamDesc.getLength();
@@ -960,7 +978,7 @@ public class RecordReaderImpl implements RecordReader {
   private void readPartialDataStreams(StripeInformation stripe) throws IOException {
     List<OrcProto.Stream> streamList = stripeFooter.getStreamsList();
     DiskRangeList toRead = planReadPartialDataStreams(streamList,
-        indexes, included, includedRowGroups, codec != null,
+        indexes, writerIncluded, includedRowGroups, codec != null,
         stripeFooter.getColumnsList(), types, bufferSize, true);
     if (LOG.isDebugEnabled()) {
       LOG.debug("chunks = " + RecordReaderUtils.stringifyDiskRanges(toRead));
@@ -970,7 +988,7 @@ public class RecordReaderImpl implements RecordReader {
       LOG.debug("merge = " + RecordReaderUtils.stringifyDiskRanges(bufferChunks));
     }
 
-    createStreams(streamList, bufferChunks, included, codec, bufferSize, streams);
+    createStreams(streamList, bufferChunks, writerIncluded, codec, bufferSize, streams);
   }
 
   /**
@@ -1181,7 +1199,7 @@ public class RecordReaderImpl implements RecordReader {
       currentStripe = rightStripe;
       readStripe();
     }
-    readRowIndex(currentStripe, included, sargApp == null ? null : sargApp.sargColumns);
+    readRowIndex(currentStripe, writerIncluded, sargApp == null ? null : sargApp.sargColumns);
 
     // if we aren't to the right row yet, advance in the stripe.
     advanceToNextRow(reader, rowNumber, true);

http://git-wip-us.apache.org/repos/asf/orc/blob/9ba93782/java/core/src/java/org/apache/orc/impl/RunLengthByteReader.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/RunLengthByteReader.java b/java/core/src/java/org/apache/orc/impl/RunLengthByteReader.java
index 24bd051..0a75ca9 100644
--- a/java/core/src/java/org/apache/orc/impl/RunLengthByteReader.java
+++ b/java/core/src/java/org/apache/orc/impl/RunLengthByteReader.java
@@ -35,7 +35,7 @@ public class RunLengthByteReader {
   private int used = 0;
   private boolean repeat = false;
 
-  public RunLengthByteReader(InStream input) throws IOException {
+  public RunLengthByteReader(InStream input) {
     this.input = input;
   }
 

http://git-wip-us.apache.org/repos/asf/orc/blob/9ba93782/java/core/src/java/org/apache/orc/impl/SchemaEvolution.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/SchemaEvolution.java b/java/core/src/java/org/apache/orc/impl/SchemaEvolution.java
index fd5c7c1..364df9e 100644
--- a/java/core/src/java/org/apache/orc/impl/SchemaEvolution.java
+++ b/java/core/src/java/org/apache/orc/impl/SchemaEvolution.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,59 +20,132 @@ package org.apache.orc.impl;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
 
+import org.apache.orc.Reader;
 import org.apache.orc.TypeDescription;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Take the file types and the (optional) configuration column names/types and
- * see if there has been schema evolution.
+ * Infer and track the evolution between the schema as stored in the file and
+ * the schema that has been requested by the reader.
  */
 public class SchemaEvolution {
   // indexed by reader column id
   private final TypeDescription[] readerFileTypes;
   // indexed by reader column id
-  private final boolean[] included;
+  private final boolean[] readerIncluded;
+  // indexed by file column id
+  private final boolean[] fileIncluded;
   private final TypeDescription fileSchema;
   private final TypeDescription readerSchema;
   private boolean hasConversion = false;
+  private final boolean isAcid;
+
   // indexed by reader column id
   private final boolean[] ppdSafeConversion;
 
-  public SchemaEvolution(TypeDescription fileSchema, boolean[] includedCols) {
-    this(fileSchema, null, includedCols);
+  private static final Logger LOG =
+    LoggerFactory.getLogger(SchemaEvolution.class);
+  private static final Pattern missingMetadataPattern =
+    Pattern.compile("_col\\d+");
+
+  public static class IllegalEvolutionException extends RuntimeException {
+    public IllegalEvolutionException(String msg) {
+      super(msg);
+    }
+  }
+
+  public SchemaEvolution(TypeDescription fileSchema,
+                         Reader.Options options) {
+    this(fileSchema, null, options);
   }
 
   public SchemaEvolution(TypeDescription fileSchema,
                          TypeDescription readerSchema,
-                         boolean[] includedCols) {
-    this.included = includedCols == null ? null :
+                         Reader.Options options) {
+    boolean allowMissingMetadata = options.getTolerateMissingSchema();
+    boolean[] includedCols = options.getInclude();
+    this.readerIncluded = includedCols == null ? null :
       Arrays.copyOf(includedCols, includedCols.length);
+    this.fileIncluded = new boolean[fileSchema.getMaximumId() + 1];
     this.hasConversion = false;
     this.fileSchema = fileSchema;
+    isAcid = checkAcidSchema(fileSchema);
     if (readerSchema != null) {
-      if (checkAcidSchema(fileSchema)) {
+      if (isAcid) {
         this.readerSchema = createEventSchema(readerSchema);
       } else {
         this.readerSchema = readerSchema;
       }
-      this.readerFileTypes = new TypeDescription[this.readerSchema.getMaximumId() + 1];
-      buildConversionFileTypesArray(fileSchema, this.readerSchema);
+      this.readerFileTypes =
+        new TypeDescription[this.readerSchema.getMaximumId() + 1];
+      int positionalLevels = 0;
+      if (!hasColumnNames(isAcid? getBaseRow(fileSchema) : fileSchema)){
+        if (!this.fileSchema.equals(this.readerSchema)) {
+          if (!allowMissingMetadata) {
+            throw new RuntimeException("Found that schema metadata is missing"
+                + " from file. This is likely caused by"
+                + " a writer earlier than HIVE-4243. Will"
+                + " not try to reconcile schemas");
+          } else {
+            LOG.warn("Column names are missing from this file. This is"
+                + " caused by a writer earlier than HIVE-4243. The reader will"
+                + " reconcile schemas based on index. File type: " +
+                this.fileSchema + ", reader type: " + this.readerSchema);
+            positionalLevels = isAcid ? 2 : 1;
+          }
+        }
+      }
+      buildConversion(fileSchema, this.readerSchema, positionalLevels);
     } else {
       this.readerSchema = fileSchema;
-      this.readerFileTypes = new TypeDescription[this.readerSchema.getMaximumId() + 1];
-      buildSameSchemaFileTypesArray();
+      this.readerFileTypes =
+        new TypeDescription[this.readerSchema.getMaximumId() + 1];
+      buildIdentityConversion(this.readerSchema);
     }
     this.ppdSafeConversion = populatePpdSafeConversion();
   }
 
+  // Return false iff fileSchema is a struct whose fields all have names like _col[0-9]+
+  private boolean hasColumnNames(TypeDescription fileSchema) {
+    if (fileSchema.getCategory() != TypeDescription.Category.STRUCT) {
+      return true;
+    }
+    for (String fieldName : fileSchema.getFieldNames()) {
+      if (!missingMetadataPattern.matcher(fieldName).matches()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
   public TypeDescription getReaderSchema() {
     return readerSchema;
   }
 
   /**
+   * Returns the non-ACID (aka base) reader type description.
+   *
+   * @return the reader type ignoring the ACID rowid columns, if any
+   */
+  public TypeDescription getReaderBaseSchema() {
+    return isAcid ? getBaseRow(readerSchema) : readerSchema;
+  }
+
+  /**
+   * Does the file include ACID columns?
+   * @return is this an ACID file?
+   */
+  boolean isAcid() {
+    return isAcid;
+  }
+
+  /**
    * Is there Schema Evolution data type conversion?
    * @return
    */
@@ -93,6 +166,10 @@ public class SchemaEvolution {
     return readerFileTypes[id];
   }
 
+  public boolean[] getFileIncluded() {
+    return fileIncluded;
+  }
+
   /**
    * Check if column is safe for ppd evaluation
    * @param colId reader column id
@@ -100,10 +177,8 @@ public class SchemaEvolution {
    */
   public boolean isPPDSafeConversion(final int colId) {
     if (hasConversion()) {
-      if (colId < 0 || colId >= ppdSafeConversion.length) {
-        return false;
-      }
-      return ppdSafeConversion[colId];
+      return !(colId < 0 || colId >= ppdSafeConversion.length) &&
+          ppdSafeConversion[colId];
     }
 
     // when there is no schema evolution PPD is safe
@@ -137,11 +212,8 @@ public class SchemaEvolution {
     if (fileType.getCategory().isPrimitive()) {
       if (fileType.getCategory().equals(readerType.getCategory())) {
         // for decimals alone do equality check to not mess up with precision change
-        if (fileType.getCategory().equals(TypeDescription.Category.DECIMAL) &&
-            !fileType.equals(readerType)) {
-          return false;
-        }
-        return true;
+        return !(fileType.getCategory() == TypeDescription.Category.DECIMAL &&
+            !fileType.equals(readerType));
       }
 
       // only integer and string evolutions are safe
@@ -192,10 +264,22 @@ public class SchemaEvolution {
     return false;
   }
 
-  void buildConversionFileTypesArray(TypeDescription fileType,
-                                     TypeDescription readerType) {
+  /**
+   * Build the mapping from the file type to the reader type. For pre-HIVE-4243
+   * ORC files, the top level structure is matched using position within the
+   * row. Otherwise, structs fields are matched by name.
+   * @param fileType the type in the file
+   * @param readerType the type in the reader
+   * @param positionalLevels the number of structure levels that must be
+   *                         mapped by position rather than field name. Pre
+   *                         HIVE-4243 files have either 1 or 2 levels matched
+   *                         positionally depending on whether they are ACID.
+   */
+  void buildConversion(TypeDescription fileType,
+                       TypeDescription readerType,
+                       int positionalLevels) {
     // if the column isn't included, don't map it
-    if (included != null && !included[readerType.getId()]) {
+    if (readerIncluded != null && !readerIncluded[readerType.getId()]) {
       return;
     }
     boolean isOk = true;
@@ -239,8 +323,8 @@ public class SchemaEvolution {
           List<TypeDescription> readerChildren = readerType.getChildren();
           if (fileChildren.size() == readerChildren.size()) {
             for(int i=0; i < fileChildren.size(); ++i) {
-              buildConversionFileTypesArray(fileChildren.get(i),
-                                            readerChildren.get(i));
+              buildConversion(fileChildren.get(i),
+                              readerChildren.get(i), 0);
             }
           } else {
             isOk = false;
@@ -248,16 +332,38 @@ public class SchemaEvolution {
           break;
         }
         case STRUCT: {
-          // allow either side to have fewer fields than the other
-          List<TypeDescription> fileChildren = fileType.getChildren();
           List<TypeDescription> readerChildren = readerType.getChildren();
+          List<TypeDescription> fileChildren = fileType.getChildren();
           if (fileChildren.size() != readerChildren.size()) {
             hasConversion = true;
           }
-          int jointSize = Math.min(fileChildren.size(), readerChildren.size());
-          for(int i=0; i < jointSize; ++i) {
-            buildConversionFileTypesArray(fileChildren.get(i),
-                                          readerChildren.get(i));
+
+          if (positionalLevels == 0) {
+            List<String> readerFieldNames = readerType.getFieldNames();
+            List<String> fileFieldNames = fileType.getFieldNames();
+            Map<String, TypeDescription> fileTypesIdx = new HashMap<>();
+            for (int i = 0; i < fileFieldNames.size(); i++) {
+              fileTypesIdx.put(fileFieldNames.get(i), fileChildren.get(i));
+            }
+
+            for (int i = 0; i < readerFieldNames.size(); i++) {
+              String readerFieldName = readerFieldNames.get(i);
+              TypeDescription readerField = readerChildren.get(i);
+
+              TypeDescription fileField = fileTypesIdx.get(readerFieldName);
+              if (fileField == null) {
+                continue;
+              }
+
+              buildConversion(fileField, readerField, 0);
+            }
+          } else {
+            int jointSize = Math.min(fileChildren.size(),
+                                     readerChildren.size());
+            for (int i = 0; i < jointSize; ++i) {
+              buildConversion(fileChildren.get(i), readerChildren.get(i),
+                  positionalLevels - 1);
+            }
           }
           break;
         }
@@ -273,45 +379,31 @@ public class SchemaEvolution {
       hasConversion = true;
     }
     if (isOk) {
-      int id = readerType.getId();
-      if (readerFileTypes[id] != null) {
-        throw new RuntimeException("reader to file type entry already" +
-                                   " assigned");
-      }
-      readerFileTypes[id] = fileType;
+      readerFileTypes[readerType.getId()] = fileType;
+      fileIncluded[fileType.getId()] = true;
     } else {
-      throw new IllegalArgumentException(String.format
-                                         ("ORC does not support type" +
-                                          " conversion from file type %s" +
-                                          " (%d) to reader type %s (%d)",
-                                          fileType.toString(),
-                                          fileType.getId(),
-                                          readerType.toString(),
-                                          readerType.getId()));
+      throw new IllegalEvolutionException(
+          String.format("ORC does not support type conversion from file" +
+                        " type %s (%d) to reader type %s (%d)",
+                        fileType.toString(), fileType.getId(),
+                        readerType.toString(), readerType.getId()));
     }
   }
 
-  /**
-   * Use to make a reader to file type array when the schema is the same.
-   * @return
-   */
-  private void buildSameSchemaFileTypesArray() {
-    buildSameSchemaFileTypesArrayRecurse(readerSchema);
-  }
-
-  void buildSameSchemaFileTypesArrayRecurse(TypeDescription readerType) {
-    if (included != null && !included[readerType.getId()]) {
+  void buildIdentityConversion(TypeDescription readerType) {
+    int id = readerType.getId();
+    if (readerIncluded != null && !readerIncluded[id]) {
       return;
     }
-    int id = readerType.getId();
     if (readerFileTypes[id] != null) {
       throw new RuntimeException("reader to file type entry already assigned");
     }
     readerFileTypes[id] = readerType;
+    fileIncluded[id] = true;
     List<TypeDescription> children = readerType.getChildren();
     if (children != null) {
       for (TypeDescription child : children) {
-        buildSameSchemaFileTypesArrayRecurse(child);
+        buildIdentityConversion(child);
       }
     }
   }
@@ -341,8 +433,19 @@ public class SchemaEvolution {
     return result;
   }
 
+  /**
+   * Get the underlying base row from an ACID event struct.
+   * @param typeDescription the ACID event schema.
+   * @return the subtype for the real row
+   */
+  static TypeDescription getBaseRow(TypeDescription typeDescription) {
+    final int ACID_ROW_OFFSET = 5;
+    return typeDescription.getChildren().get(ACID_ROW_OFFSET);
+  }
+
   public static final List<String> acidEventFieldNames=
     new ArrayList<String>();
+
   static {
     acidEventFieldNames.add("operation");
     acidEventFieldNames.add("originalTransaction");

http://git-wip-us.apache.org/repos/asf/orc/blob/9ba93782/java/core/src/test/org/apache/orc/TestOrcTimezone1.java
----------------------------------------------------------------------
diff --git a/java/core/src/test/org/apache/orc/TestOrcTimezone1.java b/java/core/src/test/org/apache/orc/TestOrcTimezone1.java
index 72dc455..7be9aa5 100644
--- a/java/core/src/test/org/apache/orc/TestOrcTimezone1.java
+++ b/java/core/src/test/org/apache/orc/TestOrcTimezone1.java
@@ -170,7 +170,7 @@ public class TestOrcTimezone1 {
     boolean[] include = new boolean[schema.getMaximumId() + 1];
     include[schema.getChildren().get(col).getId()] = true;
     RecordReader rows = reader.rows
-        (new Reader.Options().include(include));
+        (reader.options().include(include));
     assertEquals(true, rows.nextBatch(batch));
     assertEquals(Timestamp.valueOf("2000-03-12 15:00:00"),
         ts.asScratchTimestamp(0));

http://git-wip-us.apache.org/repos/asf/orc/blob/9ba93782/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
----------------------------------------------------------------------
diff --git a/java/core/src/test/org/apache/orc/TestVectorOrcFile.java b/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
index 42d1176..af20d1f 100644
--- a/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
+++ b/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
@@ -1199,9 +1199,9 @@ public class TestVectorOrcFile {
 
     // read the contents and make sure they match
     RecordReader rows1 = reader.rows(
-        new Reader.Options().include(new boolean[]{true, true, false}));
+        reader.options().include(new boolean[]{true, true, false}));
     RecordReader rows2 = reader.rows(
-        new Reader.Options().include(new boolean[]{true, false, true}));
+        reader.options().include(new boolean[]{true, false, true}));
     r1 = new Random(1);
     r2 = new Random(2);
     VectorizedRowBatch batch1 = reader.getSchema().createRowBatch(1000);
@@ -1946,7 +1946,7 @@ public class TestVectorOrcFile {
     boolean[] columns = new boolean[reader.getStatistics().length];
     columns[5] = true; // long column
     columns[9] = true; // text column
-    rows = reader.rows(new Reader.Options()
+    rows = reader.rows(reader.options()
         .range(offsetOfStripe2, offsetOfStripe4 - offsetOfStripe2)
         .include(columns));
     rows.seekToRow(lastRowOfStripe2);
@@ -2146,7 +2146,7 @@ public class TestVectorOrcFile {
           .lessThan("int1", PredicateLeaf.Type.LONG, 600000L)
         .end()
         .build();
-    RecordReader rows = reader.rows(new Reader.Options()
+    RecordReader rows = reader.rows(reader.options()
         .range(0L, Long.MAX_VALUE)
         .include(new boolean[]{true, true, true})
         .searchArgument(sarg, new String[]{null, "int1", "string1"}));
@@ -2171,7 +2171,7 @@ public class TestVectorOrcFile {
           .lessThan("int1", PredicateLeaf.Type.LONG, 0L)
         .end()
         .build();
-    rows = reader.rows(new Reader.Options()
+    rows = reader.rows(reader.options()
         .range(0L, Long.MAX_VALUE)
         .include(new boolean[]{true, true, true})
         .searchArgument(sarg, new String[]{null, "int1", "string1"}));
@@ -2187,7 +2187,7 @@ public class TestVectorOrcFile {
           .end()
         .end()
         .build();
-    rows = reader.rows(new Reader.Options()
+    rows = reader.rows(reader.options()
         .range(0L, Long.MAX_VALUE)
         .include(new boolean[]{true, true, true})
         .searchArgument(sarg, new String[]{null, "int1", "string1"}));

http://git-wip-us.apache.org/repos/asf/orc/blob/9ba93782/java/core/src/test/org/apache/orc/impl/TestRecordReaderImpl.java
----------------------------------------------------------------------
diff --git a/java/core/src/test/org/apache/orc/impl/TestRecordReaderImpl.java b/java/core/src/test/org/apache/orc/impl/TestRecordReaderImpl.java
index 01a5200..6d1955d 100644
--- a/java/core/src/test/org/apache/orc/impl/TestRecordReaderImpl.java
+++ b/java/core/src/test/org/apache/orc/impl/TestRecordReaderImpl.java
@@ -1687,7 +1687,7 @@ public class TestRecordReaderImpl {
     writer.close();
     Reader reader = OrcFile.createReader(path, OrcFile.readerOptions(conf));
 
-    RecordReader recordReader = reader.rows(new Reader.Options()
+    RecordReader recordReader = reader.rows(reader.options()
         .dataReader(mockedDataReader));
 
     recordReader.close();

http://git-wip-us.apache.org/repos/asf/orc/blob/9ba93782/java/core/src/test/org/apache/orc/impl/TestSchemaEvolution.java
----------------------------------------------------------------------
diff --git a/java/core/src/test/org/apache/orc/impl/TestSchemaEvolution.java b/java/core/src/test/org/apache/orc/impl/TestSchemaEvolution.java
index c28af94..f654fbe 100644
--- a/java/core/src/test/org/apache/orc/impl/TestSchemaEvolution.java
+++ b/java/core/src/test/org/apache/orc/impl/TestSchemaEvolution.java
@@ -17,12 +17,14 @@
  */
 package org.apache.orc.impl;
 
+import static junit.framework.TestCase.assertSame;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Arrays;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -45,6 +47,7 @@ public class TestSchemaEvolution {
   public TestName testCaseName = new TestName();
 
   Configuration conf;
+  Reader.Options options;
   Path testFilePath;
   FileSystem fs;
   Path workDir = new Path(System.getProperty("test.tmp.dir",
@@ -53,6 +56,7 @@ public class TestSchemaEvolution {
   @Before
   public void setup() throws Exception {
     conf = new Configuration();
+    options = new Reader.Options(conf);
     fs = FileSystem.getLocal(conf);
     testFilePath = new Path(workDir, "TestOrcFile." +
         testCaseName.getMethodName() + ".orc");
@@ -65,25 +69,26 @@ public class TestSchemaEvolution {
         .addField("f1", TypeDescription.createInt())
         .addField("f2", TypeDescription.createString())
         .addField("f3", TypeDescription.createDecimal().withPrecision(38).withScale(10));
-    SchemaEvolution same1 = new SchemaEvolution(fileStruct1, null);
+    SchemaEvolution same1 = new SchemaEvolution(fileStruct1, options);
     assertFalse(same1.hasConversion());
     TypeDescription readerStruct1 = TypeDescription.createStruct()
         .addField("f1", TypeDescription.createInt())
         .addField("f2", TypeDescription.createString())
         .addField("f3", TypeDescription.createDecimal().withPrecision(38).withScale(10));
-    SchemaEvolution both1 = new SchemaEvolution(fileStruct1, readerStruct1, null);
+    SchemaEvolution both1 = new SchemaEvolution(fileStruct1, readerStruct1, options);
     assertFalse(both1.hasConversion());
     TypeDescription readerStruct1diff = TypeDescription.createStruct()
         .addField("f1", TypeDescription.createLong())
         .addField("f2", TypeDescription.createString())
         .addField("f3", TypeDescription.createDecimal().withPrecision(38).withScale(10));
-    SchemaEvolution both1diff = new SchemaEvolution(fileStruct1, readerStruct1diff, null);
+    SchemaEvolution both1diff = new SchemaEvolution(fileStruct1, readerStruct1diff, options);
     assertTrue(both1diff.hasConversion());
     TypeDescription readerStruct1diffPrecision = TypeDescription.createStruct()
         .addField("f1", TypeDescription.createInt())
         .addField("f2", TypeDescription.createString())
         .addField("f3", TypeDescription.createDecimal().withPrecision(12).withScale(10));
-    SchemaEvolution both1diffPrecision = new SchemaEvolution(fileStruct1, readerStruct1diffPrecision, null);
+    SchemaEvolution both1diffPrecision = new SchemaEvolution(fileStruct1,
+        readerStruct1diffPrecision, options);
     assertTrue(both1diffPrecision.hasConversion());
   }
 
@@ -99,7 +104,7 @@ public class TestSchemaEvolution {
             .addField("f4", TypeDescription.createDouble())
             .addField("f5", TypeDescription.createBoolean()))
         .addField("f6", TypeDescription.createChar().withMaxLength(100));
-    SchemaEvolution same2 = new SchemaEvolution(fileStruct2, null);
+    SchemaEvolution same2 = new SchemaEvolution(fileStruct2, options);
     assertFalse(same2.hasConversion());
     TypeDescription readerStruct2 = TypeDescription.createStruct()
         .addField("f1", TypeDescription.createUnion()
@@ -111,7 +116,7 @@ public class TestSchemaEvolution {
             .addField("f4", TypeDescription.createDouble())
             .addField("f5", TypeDescription.createBoolean()))
         .addField("f6", TypeDescription.createChar().withMaxLength(100));
-    SchemaEvolution both2 = new SchemaEvolution(fileStruct2, readerStruct2, null);
+    SchemaEvolution both2 = new SchemaEvolution(fileStruct2, readerStruct2, options);
     assertFalse(both2.hasConversion());
     TypeDescription readerStruct2diff = TypeDescription.createStruct()
         .addField("f1", TypeDescription.createUnion()
@@ -123,7 +128,7 @@ public class TestSchemaEvolution {
             .addField("f4", TypeDescription.createDouble())
             .addField("f5", TypeDescription.createByte()))
         .addField("f6", TypeDescription.createChar().withMaxLength(100));
-    SchemaEvolution both2diff = new SchemaEvolution(fileStruct2, readerStruct2diff, null);
+    SchemaEvolution both2diff = new SchemaEvolution(fileStruct2, readerStruct2diff, options);
     assertTrue(both2diff.hasConversion());
     TypeDescription readerStruct2diffChar = TypeDescription.createStruct()
         .addField("f1", TypeDescription.createUnion()
@@ -135,7 +140,7 @@ public class TestSchemaEvolution {
             .addField("f4", TypeDescription.createDouble())
             .addField("f5", TypeDescription.createBoolean()))
         .addField("f6", TypeDescription.createChar().withMaxLength(80));
-    SchemaEvolution both2diffChar = new SchemaEvolution(fileStruct2, readerStruct2diffChar, null);
+    SchemaEvolution both2diffChar = new SchemaEvolution(fileStruct2, readerStruct2diffChar, options);
     assertTrue(both2diffChar.hasConversion());
   }
 
@@ -159,7 +164,7 @@ public class TestSchemaEvolution {
     Reader reader = OrcFile.createReader(testFilePath,
         OrcFile.readerOptions(conf).filesystem(fs));
     TypeDescription schemaOnRead = TypeDescription.createDouble();
-    RecordReader rows = reader.rows(new Reader.Options().schema(schemaOnRead));
+    RecordReader rows = reader.rows(reader.options().schema(schemaOnRead));
     batch = schemaOnRead.createRowBatch();
     rows.nextBatch(batch);
     assertEquals(74.72, ((DoubleColumnVector) batch.cols[0]).vector[0], 0.00000000001);
@@ -172,14 +177,14 @@ public class TestSchemaEvolution {
         .addField("f1", TypeDescription.createInt())
         .addField("f2", TypeDescription.createString())
         .addField("f3", TypeDescription.createDecimal().withPrecision(38).withScale(10));
-    SchemaEvolution same1 = new SchemaEvolution(fileStruct1, null);
+    SchemaEvolution same1 = new SchemaEvolution(fileStruct1, options);
     assertTrue(same1.isPPDSafeConversion(0));
     assertFalse(same1.hasConversion());
     TypeDescription readerStruct1 = TypeDescription.createStruct()
         .addField("f1", TypeDescription.createInt())
         .addField("f2", TypeDescription.createString())
         .addField("f3", TypeDescription.createDecimal().withPrecision(38).withScale(10));
-    SchemaEvolution both1 = new SchemaEvolution(fileStruct1, readerStruct1, null);
+    SchemaEvolution both1 = new SchemaEvolution(fileStruct1, readerStruct1, options);
     assertFalse(both1.hasConversion());
     assertTrue(both1.isPPDSafeConversion(0));
     assertTrue(both1.isPPDSafeConversion(1));
@@ -191,7 +196,7 @@ public class TestSchemaEvolution {
         .addField("f1", TypeDescription.createLong())
         .addField("f2", TypeDescription.createString())
         .addField("f3", TypeDescription.createDecimal().withPrecision(38).withScale(10));
-    SchemaEvolution both1diff = new SchemaEvolution(fileStruct1, readerStruct1diff, null);
+    SchemaEvolution both1diff = new SchemaEvolution(fileStruct1, readerStruct1diff, options);
     assertTrue(both1diff.hasConversion());
     assertFalse(both1diff.isPPDSafeConversion(0));
     assertTrue(both1diff.isPPDSafeConversion(1));
@@ -203,8 +208,9 @@ public class TestSchemaEvolution {
         .addField("f1", TypeDescription.createInt())
         .addField("f2", TypeDescription.createString())
         .addField("f3", TypeDescription.createDecimal().withPrecision(12).withScale(10));
-    SchemaEvolution both1diffPrecision = new SchemaEvolution(fileStruct1, readerStruct1diffPrecision,
-        new boolean[] {true, false, false, true});
+    options.include(new boolean[] {true, false, false, true});
+    SchemaEvolution both1diffPrecision = new SchemaEvolution(fileStruct1,
+        readerStruct1diffPrecision, options);
     assertTrue(both1diffPrecision.hasConversion());
     assertFalse(both1diffPrecision.isPPDSafeConversion(0));
     assertFalse(both1diffPrecision.isPPDSafeConversion(1)); // column not included
@@ -217,7 +223,8 @@ public class TestSchemaEvolution {
         .addField("f2", TypeDescription.createString())
         .addField("f3", TypeDescription.createDecimal().withPrecision(38).withScale(10))
         .addField("f4", TypeDescription.createBoolean());
-    both1 = new SchemaEvolution(fileStruct1, readerStruct1, null);
+    options.include(null);
+    both1 = new SchemaEvolution(fileStruct1, readerStruct1, options);
     assertTrue(both1.hasConversion());
     assertFalse(both1.isPPDSafeConversion(0));
     assertTrue(both1.isPPDSafeConversion(1));
@@ -231,13 +238,13 @@ public class TestSchemaEvolution {
     // byte -> short -> int -> long
     TypeDescription fileSchema = TypeDescription.createStruct()
         .addField("f1", TypeDescription.createByte());
-    SchemaEvolution schemaEvolution = new SchemaEvolution(fileSchema, null);
+    SchemaEvolution schemaEvolution = new SchemaEvolution(fileSchema, options);
     assertFalse(schemaEvolution.hasConversion());
 
     // byte -> short
     TypeDescription readerSchema = TypeDescription.createStruct()
         .addField("f1", TypeDescription.createShort());
-    schemaEvolution = new SchemaEvolution(fileSchema, readerSchema, null);
+    schemaEvolution = new SchemaEvolution(fileSchema, readerSchema, options);
     assertTrue(schemaEvolution.hasConversion());
     assertFalse(schemaEvolution.isPPDSafeConversion(0));
     assertTrue(schemaEvolution.isPPDSafeConversion(1));
@@ -245,7 +252,7 @@ public class TestSchemaEvolution {
     // byte -> int
     readerSchema = TypeDescription.createStruct()
         .addField("f1", TypeDescription.createInt());
-    schemaEvolution = new SchemaEvolution(fileSchema, readerSchema, null);
+    schemaEvolution = new SchemaEvolution(fileSchema, readerSchema, options);
     assertTrue(schemaEvolution.hasConversion());
     assertFalse(schemaEvolution.isPPDSafeConversion(0));
     assertTrue(schemaEvolution.isPPDSafeConversion(1));
@@ -253,7 +260,7 @@ public class TestSchemaEvolution {
     // byte -> long
     readerSchema = TypeDescription.createStruct()
         .addField("f1", TypeDescription.createLong());
-    schemaEvolution = new SchemaEvolution(fileSchema, readerSchema, null);
+    schemaEvolution = new SchemaEvolution(fileSchema, readerSchema, options);
     assertTrue(schemaEvolution.hasConversion());
     assertFalse(schemaEvolution.isPPDSafeConversion(0));
     assertTrue(schemaEvolution.isPPDSafeConversion(1));
@@ -261,13 +268,13 @@ public class TestSchemaEvolution {
     // short -> int -> long
     fileSchema = TypeDescription.createStruct()
         .addField("f1", TypeDescription.createShort());
-    schemaEvolution = new SchemaEvolution(fileSchema, null);
+    schemaEvolution = new SchemaEvolution(fileSchema, options);
     assertFalse(schemaEvolution.hasConversion());
 
     // unsafe conversion short -> byte
     readerSchema = TypeDescription.createStruct()
         .addField("f1", TypeDescription.createByte());
-    schemaEvolution = new SchemaEvolution(fileSchema, readerSchema, null);
+    schemaEvolution = new SchemaEvolution(fileSchema, readerSchema, options);
     assertTrue(schemaEvolution.hasConversion());
     assertFalse(schemaEvolution.isPPDSafeConversion(0));
     assertFalse(schemaEvolution.isPPDSafeConversion(1));
@@ -275,7 +282,7 @@ public class TestSchemaEvolution {
     // short -> int
     readerSchema = TypeDescription.createStruct()
         .addField("f1", TypeDescription.createInt());
-    schemaEvolution = new SchemaEvolution(fileSchema, readerSchema, null);
+    schemaEvolution = new SchemaEvolution(fileSchema, readerSchema, options);
     assertTrue(schemaEvolution.hasConversion());
     assertFalse(schemaEvolution.isPPDSafeConversion(0));
     assertTrue(schemaEvolution.isPPDSafeConversion(1));
@@ -283,7 +290,7 @@ public class TestSchemaEvolution {
     // short -> long
     readerSchema = TypeDescription.createStruct()
         .addField("f1", TypeDescription.createLong());
-    schemaEvolution = new SchemaEvolution(fileSchema, readerSchema, null);
+    schemaEvolution = new SchemaEvolution(fileSchema, readerSchema, options);
     assertTrue(schemaEvolution.hasConversion());
     assertFalse(schemaEvolution.isPPDSafeConversion(0));
     assertTrue(schemaEvolution.isPPDSafeConversion(1));
@@ -291,13 +298,13 @@ public class TestSchemaEvolution {
     // int -> long
     fileSchema = TypeDescription.createStruct()
         .addField("f1", TypeDescription.createInt());
-    schemaEvolution = new SchemaEvolution(fileSchema, null);
+    schemaEvolution = new SchemaEvolution(fileSchema, options);
     assertFalse(schemaEvolution.hasConversion());
 
     // unsafe conversion int -> byte
     readerSchema = TypeDescription.createStruct()
         .addField("f1", TypeDescription.createByte());
-    schemaEvolution = new SchemaEvolution(fileSchema, readerSchema, null);
+    schemaEvolution = new SchemaEvolution(fileSchema, readerSchema, options);
     assertTrue(schemaEvolution.hasConversion());
     assertFalse(schemaEvolution.isPPDSafeConversion(0));
     assertFalse(schemaEvolution.isPPDSafeConversion(1));
@@ -305,7 +312,7 @@ public class TestSchemaEvolution {
     // unsafe conversion int -> short
     readerSchema = TypeDescription.createStruct()
         .addField("f1", TypeDescription.createShort());
-    schemaEvolution = new SchemaEvolution(fileSchema, readerSchema, null);
+    schemaEvolution = new SchemaEvolution(fileSchema, readerSchema, options);
     assertTrue(schemaEvolution.hasConversion());
     assertFalse(schemaEvolution.isPPDSafeConversion(0));
     assertFalse(schemaEvolution.isPPDSafeConversion(1));
@@ -313,7 +320,7 @@ public class TestSchemaEvolution {
     // int -> long
     readerSchema = TypeDescription.createStruct()
         .addField("f1", TypeDescription.createLong());
-    schemaEvolution = new SchemaEvolution(fileSchema, readerSchema, null);
+    schemaEvolution = new SchemaEvolution(fileSchema, readerSchema, options);
     assertTrue(schemaEvolution.hasConversion());
     assertFalse(schemaEvolution.isPPDSafeConversion(0));
     assertTrue(schemaEvolution.isPPDSafeConversion(1));
@@ -321,14 +328,14 @@ public class TestSchemaEvolution {
     // long
     fileSchema = TypeDescription.createStruct()
         .addField("f1", TypeDescription.createLong());
-    schemaEvolution = new SchemaEvolution(fileSchema, null);
+    schemaEvolution = new SchemaEvolution(fileSchema, options);
     assertTrue(schemaEvolution.isPPDSafeConversion(0));
     assertFalse(schemaEvolution.hasConversion());
 
     // unsafe conversion long -> byte
     readerSchema = TypeDescription.createStruct()
         .addField("f1", TypeDescription.createByte());
-    schemaEvolution = new SchemaEvolution(fileSchema, readerSchema, null);
+    schemaEvolution = new SchemaEvolution(fileSchema, readerSchema, options);
     assertTrue(schemaEvolution.hasConversion());
     assertFalse(schemaEvolution.isPPDSafeConversion(0));
     assertFalse(schemaEvolution.isPPDSafeConversion(1));
@@ -336,7 +343,7 @@ public class TestSchemaEvolution {
     // unsafe conversion long -> short
     readerSchema = TypeDescription.createStruct()
         .addField("f1", TypeDescription.createShort());
-    schemaEvolution = new SchemaEvolution(fileSchema, readerSchema, null);
+    schemaEvolution = new SchemaEvolution(fileSchema, readerSchema, options);
     assertTrue(schemaEvolution.hasConversion());
     assertFalse(schemaEvolution.isPPDSafeConversion(0));
     assertFalse(schemaEvolution.isPPDSafeConversion(1));
@@ -344,7 +351,7 @@ public class TestSchemaEvolution {
     // unsafe conversion long -> int
     readerSchema = TypeDescription.createStruct()
         .addField("f1", TypeDescription.createInt());
-    schemaEvolution = new SchemaEvolution(fileSchema, readerSchema, null);
+    schemaEvolution = new SchemaEvolution(fileSchema, readerSchema, options);
     assertTrue(schemaEvolution.hasConversion());
     assertFalse(schemaEvolution.isPPDSafeConversion(0));
     assertFalse(schemaEvolution.isPPDSafeConversion(1));
@@ -352,7 +359,7 @@ public class TestSchemaEvolution {
     // invalid
     readerSchema = TypeDescription.createStruct()
         .addField("f1", TypeDescription.createString());
-    schemaEvolution = new SchemaEvolution(fileSchema, readerSchema, null);
+    schemaEvolution = new SchemaEvolution(fileSchema, readerSchema, options);
     assertTrue(schemaEvolution.hasConversion());
     assertFalse(schemaEvolution.isPPDSafeConversion(0));
     assertFalse(schemaEvolution.isPPDSafeConversion(1));
@@ -360,7 +367,7 @@ public class TestSchemaEvolution {
     // invalid
     readerSchema = TypeDescription.createStruct()
         .addField("f1", TypeDescription.createFloat());
-    schemaEvolution = new SchemaEvolution(fileSchema, readerSchema, null);
+    schemaEvolution = new SchemaEvolution(fileSchema, readerSchema, options);
     assertTrue(schemaEvolution.hasConversion());
     assertFalse(schemaEvolution.isPPDSafeConversion(0));
     assertFalse(schemaEvolution.isPPDSafeConversion(1));
@@ -368,7 +375,7 @@ public class TestSchemaEvolution {
     // invalid
     readerSchema = TypeDescription.createStruct()
         .addField("f1", TypeDescription.createTimestamp());
-    schemaEvolution = new SchemaEvolution(fileSchema, readerSchema, null);
+    schemaEvolution = new SchemaEvolution(fileSchema, readerSchema, options);
     assertTrue(schemaEvolution.hasConversion());
     assertFalse(schemaEvolution.isPPDSafeConversion(0));
     assertFalse(schemaEvolution.isPPDSafeConversion(1));
@@ -378,14 +385,14 @@ public class TestSchemaEvolution {
   public void testSafePpdEvaluationForStrings() throws IOException {
     TypeDescription fileSchema = TypeDescription.createStruct()
         .addField("f1", TypeDescription.createString());
-    SchemaEvolution schemaEvolution = new SchemaEvolution(fileSchema, null);
+    SchemaEvolution schemaEvolution = new SchemaEvolution(fileSchema, options);
     assertTrue(schemaEvolution.isPPDSafeConversion(0));
     assertFalse(schemaEvolution.hasConversion());
 
     // string -> char
     TypeDescription readerSchema = TypeDescription.createStruct()
         .addField("f1", TypeDescription.createChar());
-    schemaEvolution = new SchemaEvolution(fileSchema, readerSchema, null);
+    schemaEvolution = new SchemaEvolution(fileSchema, readerSchema, options);
     assertTrue(schemaEvolution.hasConversion());
     assertFalse(schemaEvolution.isPPDSafeConversion(0));
     assertFalse(schemaEvolution.isPPDSafeConversion(1));
@@ -393,21 +400,21 @@ public class TestSchemaEvolution {
     // string -> varchar
     readerSchema = TypeDescription.createStruct()
         .addField("f1", TypeDescription.createVarchar());
-    schemaEvolution = new SchemaEvolution(fileSchema, readerSchema, null);
+    schemaEvolution = new SchemaEvolution(fileSchema, readerSchema, options);
     assertTrue(schemaEvolution.hasConversion());
     assertFalse(schemaEvolution.isPPDSafeConversion(0));
     assertTrue(schemaEvolution.isPPDSafeConversion(1));
 
     fileSchema = TypeDescription.createStruct()
         .addField("f1", TypeDescription.createChar());
-    schemaEvolution = new SchemaEvolution(fileSchema, null);
+    schemaEvolution = new SchemaEvolution(fileSchema, options);
     assertTrue(schemaEvolution.isPPDSafeConversion(0));
     assertFalse(schemaEvolution.hasConversion());
 
     // char -> string
     readerSchema = TypeDescription.createStruct()
         .addField("f1", TypeDescription.createString());
-    schemaEvolution = new SchemaEvolution(fileSchema, readerSchema, null);
+    schemaEvolution = new SchemaEvolution(fileSchema, readerSchema, options);
     assertTrue(schemaEvolution.hasConversion());
     assertFalse(schemaEvolution.isPPDSafeConversion(0));
     assertFalse(schemaEvolution.isPPDSafeConversion(1));
@@ -415,21 +422,21 @@ public class TestSchemaEvolution {
     // char -> varchar
     readerSchema = TypeDescription.createStruct()
         .addField("f1", TypeDescription.createVarchar());
-    schemaEvolution = new SchemaEvolution(fileSchema, readerSchema, null);
+    schemaEvolution = new SchemaEvolution(fileSchema, readerSchema, options);
     assertTrue(schemaEvolution.hasConversion());
     assertFalse(schemaEvolution.isPPDSafeConversion(0));
     assertFalse(schemaEvolution.isPPDSafeConversion(1));
 
     fileSchema = TypeDescription.createStruct()
         .addField("f1", TypeDescription.createVarchar());
-    schemaEvolution = new SchemaEvolution(fileSchema, null);
+    schemaEvolution = new SchemaEvolution(fileSchema, options);
     assertTrue(schemaEvolution.isPPDSafeConversion(0));
     assertFalse(schemaEvolution.hasConversion());
 
     // varchar -> string
     readerSchema = TypeDescription.createStruct()
         .addField("f1", TypeDescription.createString());
-    schemaEvolution = new SchemaEvolution(fileSchema, readerSchema, null);
+    schemaEvolution = new SchemaEvolution(fileSchema, readerSchema, options);
     assertTrue(schemaEvolution.hasConversion());
     assertFalse(schemaEvolution.isPPDSafeConversion(0));
     assertTrue(schemaEvolution.isPPDSafeConversion(1));
@@ -437,7 +444,7 @@ public class TestSchemaEvolution {
     // varchar -> char
     readerSchema = TypeDescription.createStruct()
         .addField("f1", TypeDescription.createChar());
-    schemaEvolution = new SchemaEvolution(fileSchema, readerSchema, null);
+    schemaEvolution = new SchemaEvolution(fileSchema, readerSchema, options);
     assertTrue(schemaEvolution.hasConversion());
     assertFalse(schemaEvolution.isPPDSafeConversion(0));
     assertFalse(schemaEvolution.isPPDSafeConversion(1));
@@ -445,7 +452,7 @@ public class TestSchemaEvolution {
     // invalid
     readerSchema = TypeDescription.createStruct()
         .addField("f1", TypeDescription.createDecimal());
-    schemaEvolution = new SchemaEvolution(fileSchema, readerSchema, null);
+    schemaEvolution = new SchemaEvolution(fileSchema, readerSchema, options);
     assertTrue(schemaEvolution.hasConversion());
     assertFalse(schemaEvolution.isPPDSafeConversion(0));
     assertFalse(schemaEvolution.isPPDSafeConversion(1));
@@ -453,7 +460,7 @@ public class TestSchemaEvolution {
     // invalid
     readerSchema = TypeDescription.createStruct()
         .addField("f1", TypeDescription.createDate());
-    schemaEvolution = new SchemaEvolution(fileSchema, readerSchema, null);
+    schemaEvolution = new SchemaEvolution(fileSchema, readerSchema, options);
     assertTrue(schemaEvolution.hasConversion());
     assertFalse(schemaEvolution.isPPDSafeConversion(0));
     assertFalse(schemaEvolution.isPPDSafeConversion(1));
@@ -461,9 +468,532 @@ public class TestSchemaEvolution {
     // invalid
     readerSchema = TypeDescription.createStruct()
         .addField("f1", TypeDescription.createInt());
-    schemaEvolution = new SchemaEvolution(fileSchema, readerSchema, null);
+    schemaEvolution = new SchemaEvolution(fileSchema, readerSchema, options);
     assertTrue(schemaEvolution.hasConversion());
     assertFalse(schemaEvolution.isPPDSafeConversion(0));
     assertFalse(schemaEvolution.isPPDSafeConversion(1));
   }
+
+  private boolean[] includeAll(TypeDescription readerType) {
+    int numColumns = readerType.getMaximumId() + 1;
+    boolean[] result = new boolean[numColumns];
+    Arrays.fill(result, true);
+    return result;
+  }
+
+  @Test
+  public void testAddFieldToEnd() {
+    TypeDescription fileType =
+        TypeDescription.fromString("struct<a:int,b:string>");
+    TypeDescription readerType =
+        TypeDescription.fromString("struct<a:int,b:string,c:double>");
+    boolean[] included = includeAll(readerType);
+    options.tolerateMissingSchema(false);
+    SchemaEvolution transition =
+        new SchemaEvolution(fileType, readerType, options.include(included));
+
+    // a -> a
+    TypeDescription reader = readerType.getChildren().get(0);
+    TypeDescription mapped = transition.getFileType(reader);
+    TypeDescription original = fileType.getChildren().get(0);
+    assertSame(original, mapped);
+
+    // b -> b
+    reader = readerType.getChildren().get(1);
+    mapped = transition.getFileType(reader);
+    original = fileType.getChildren().get(1);
+    assertSame(original, mapped);
+
+    // c -> null
+    reader = readerType.getChildren().get(2);
+    mapped = transition.getFileType(reader);
+    original = null;
+    assertSame(original, mapped);
+  }
+
+  @Test
+  public void testAddFieldBeforeEnd() {
+    TypeDescription fileType =
+        TypeDescription.fromString("struct<a:int,b:string>");
+    TypeDescription readerType =
+        TypeDescription.fromString("struct<a:int,c:double,b:string>");
+    boolean[] included = includeAll(readerType);
+    options.tolerateMissingSchema(false);
+    SchemaEvolution transition =
+        new SchemaEvolution(fileType, readerType, options.include(included));
+
+    // a -> a
+    TypeDescription reader = readerType.getChildren().get(0);
+    TypeDescription mapped = transition.getFileType(reader);
+    TypeDescription original = fileType.getChildren().get(0);
+    assertSame(original, mapped);
+
+    // c -> null
+    reader = readerType.getChildren().get(1);
+    mapped = transition.getFileType(reader);
+    original = null;
+    assertSame(original, mapped);
+
+    // b -> b
+    reader = readerType.getChildren().get(2);
+    mapped = transition.getFileType(reader);
+    original = fileType.getChildren().get(1);
+    assertSame(original, mapped);
+  }
+
+  @Test
+  public void testRemoveLastField() {
+    TypeDescription fileType =
+        TypeDescription.fromString("struct<a:int,b:string,c:double>");
+    TypeDescription readerType =
+        TypeDescription.fromString("struct<a:int,b:string>");
+    boolean[] included = includeAll(readerType);
+    options.tolerateMissingSchema(false);
+    SchemaEvolution transition =
+        new SchemaEvolution(fileType, readerType, options.include(included));
+
+    // a -> a
+    TypeDescription reader = readerType.getChildren().get(0);
+    TypeDescription mapped = transition.getFileType(reader);
+    TypeDescription original = fileType.getChildren().get(0);
+    assertSame(original, mapped);
+
+    // b -> b
+    reader = readerType.getChildren().get(1);
+    mapped = transition.getFileType(reader);
+    original = fileType.getChildren().get(1);
+    assertSame(original, mapped);
+  }
+
+  @Test
+  public void testRemoveFieldBeforeEnd() {
+    TypeDescription fileType =
+        TypeDescription.fromString("struct<a:int,b:string,c:double>");
+    TypeDescription readerType =
+        TypeDescription.fromString("struct<a:int,c:double>");
+    boolean[] included = includeAll(readerType);
+    options.tolerateMissingSchema(false);
+    SchemaEvolution transition =
+        new SchemaEvolution(fileType, readerType, options.include(included));
+
+    // a -> a
+    TypeDescription reader = readerType.getChildren().get(0);
+    TypeDescription mapped = transition.getFileType(reader);
+    TypeDescription original = fileType.getChildren().get(0);
+    assertSame(original, mapped);
+
+    // c -> c
+    reader = readerType.getChildren().get(1);
+    mapped = transition.getFileType(reader);
+    original = fileType.getChildren().get(2);
+    assertSame(original, mapped);
+
+  }
+
+  @Test
+  public void testRemoveAndAddField() {
+    TypeDescription fileType =
+        TypeDescription.fromString("struct<a:int,b:string>");
+    TypeDescription readerType =
+        TypeDescription.fromString("struct<a:int,c:double>");
+    boolean[] included = includeAll(readerType);
+    options.tolerateMissingSchema(false);
+    SchemaEvolution transition =
+        new SchemaEvolution(fileType, readerType, options.include(included));
+
+    // a -> a
+    TypeDescription reader = readerType.getChildren().get(0);
+    TypeDescription mapped = transition.getFileType(reader);
+    TypeDescription original = fileType.getChildren().get(0);
+    assertSame(original, mapped);
+
+    // c -> null
+    reader = readerType.getChildren().get(1);
+    mapped = transition.getFileType(reader);
+    original = null;
+    assertSame(original, mapped);
+  }
+
+  @Test
+  public void testReorderFields() {
+    TypeDescription fileType =
+        TypeDescription.fromString("struct<a:int,b:string>");
+    TypeDescription readerType =
+        TypeDescription.fromString("struct<b:string,a:int>");
+    boolean[] included = includeAll(readerType);
+    options.tolerateMissingSchema(false);
+    SchemaEvolution transition =
+        new SchemaEvolution(fileType, readerType, options.include(included));
+
+    // b -> b
+    TypeDescription reader = readerType.getChildren().get(0);
+    TypeDescription mapped = transition.getFileType(reader);
+    TypeDescription original = fileType.getChildren().get(1);
+    assertSame(original, mapped);
+
+    // a -> a
+    reader = readerType.getChildren().get(1);
+    mapped = transition.getFileType(reader);
+    original = fileType.getChildren().get(0);
+    assertSame(original, mapped);
+  }
+
+  @Test
+  public void testAddFieldEndOfStruct() {
+    TypeDescription fileType =
+        TypeDescription.fromString("struct<a:struct<b:int>,c:string>");
+    TypeDescription readerType =
+        TypeDescription.fromString("struct<a:struct<b:int,d:double>,c:string>");
+    boolean[] included = includeAll(readerType);
+    options.tolerateMissingSchema(false);
+    SchemaEvolution transition =
+        new SchemaEvolution(fileType, readerType, options.include(included));
+
+    // a -> a
+    TypeDescription reader = readerType.getChildren().get(0);
+    TypeDescription mapped = transition.getFileType(reader);
+    TypeDescription original = fileType.getChildren().get(0);
+    assertSame(original, mapped);
+
+    // a.b -> a.b
+    TypeDescription readerChild = reader.getChildren().get(0);
+    mapped = transition.getFileType(readerChild);
+    TypeDescription originalChild = original.getChildren().get(0);
+    assertSame(originalChild, mapped);
+
+    // a.d -> null
+    readerChild = reader.getChildren().get(1);
+    mapped = transition.getFileType(readerChild);
+    originalChild = null;
+    assertSame(originalChild, mapped);
+
+    // c -> c
+    reader = readerType.getChildren().get(1);
+    mapped = transition.getFileType(reader);
+    original = fileType.getChildren().get(1);
+    assertSame(original, mapped);
+  }
+
+  @Test
+  public void testAddFieldBeforeEndOfStruct() {
+    TypeDescription fileType =
+        TypeDescription.fromString("struct<a:struct<b:int>,c:string>");
+    TypeDescription readerType =
+        TypeDescription.fromString("struct<a:struct<d:double,b:int>,c:string>");
+    boolean[] included = includeAll(readerType);
+    options.tolerateMissingSchema(false);
+    SchemaEvolution transition =
+        new SchemaEvolution(fileType, readerType, options.include(included));
+
+    // a -> a
+    TypeDescription reader = readerType.getChildren().get(0);
+    TypeDescription mapped = transition.getFileType(reader);
+    TypeDescription original = fileType.getChildren().get(0);
+    assertSame(original, mapped);
+
+    // a.b -> a.b
+    TypeDescription readerChild = reader.getChildren().get(1);
+    mapped = transition.getFileType(readerChild);
+    TypeDescription originalChild = original.getChildren().get(0);
+    assertSame(originalChild, mapped);
+
+    // a.d -> null
+    readerChild = reader.getChildren().get(0);
+    mapped = transition.getFileType(readerChild);
+    originalChild = null;
+    assertSame(originalChild, mapped);
+
+    // c -> c
+    reader = readerType.getChildren().get(1);
+    mapped = transition.getFileType(reader);
+    original = fileType.getChildren().get(1);
+    assertSame(original, mapped);
+  }
+
+  /**
+   * A newly added struct that looks identical to an existing field must map to null, not to that field.
+   */
+  @Test
+  public void testAddSimilarField() {
+    TypeDescription fileType =
+        TypeDescription.fromString("struct<a:struct<b:int>>");
+    TypeDescription readerType =
+        TypeDescription.fromString("struct<a:struct<b:int>,c:struct<b:int>>");
+    boolean[] included = includeAll(readerType);
+    options.tolerateMissingSchema(false);
+    SchemaEvolution transition =
+        new SchemaEvolution(fileType, readerType, options.include(included));
+
+    // a -> a
+    TypeDescription reader = readerType.getChildren().get(0);
+    TypeDescription mapped = transition.getFileType(reader);
+    TypeDescription original = fileType.getChildren().get(0);
+    assertSame(original, mapped);
+
+    // a.b -> a.b
+    TypeDescription readerChild = reader.getChildren().get(0);
+    mapped = transition.getFileType(readerChild);
+    TypeDescription originalChild = original.getChildren().get(0);
+    assertSame(originalChild, mapped);
+
+    // c -> null
+    reader = readerType.getChildren().get(1);
+    mapped = transition.getFileType(reader);
+    original = null;
+    assertSame(original, mapped);
+
+    // c.b -> null
+    readerChild = reader.getChildren().get(0);
+    mapped = transition.getFileType(readerChild);
+    original = null;
+    assertSame(original, mapped);
+  }
+
+  /**
+   * A struct may evolve into the same shape as a sibling; it must still map to its own file column.
+   */
+  @Test
+  public void testConvergentEvolution() {
+    TypeDescription fileType = TypeDescription
+        .fromString("struct<a:struct<a:int,b:string>,c:struct<a:int>>");
+    TypeDescription readerType = TypeDescription.fromString(
+        "struct<a:struct<a:int,b:string>,c:struct<a:int,b:string>>");
+    boolean[] included = includeAll(readerType);
+    options.tolerateMissingSchema(false);
+    SchemaEvolution transition =
+        new SchemaEvolution(fileType, readerType, options.include(included));
+
+    // c -> c
+    TypeDescription reader = readerType.getChildren().get(1);
+    TypeDescription mapped = transition.getFileType(reader);
+    TypeDescription original = fileType.getChildren().get(1);
+    assertSame(original, mapped);
+
+    // c.a -> c.a
+    TypeDescription readerchild = reader.getChildren().get(0);
+    mapped = transition.getFileType(readerchild);
+    original = original.getChildren().get(0);
+    assertSame(original, mapped);
+
+    // c.b -> null
+    readerchild = reader.getChildren().get(1);
+    mapped = transition.getFileType(readerchild);
+    original = null;
+    assertSame(original, mapped);
+  }
+
+  @Test
+  public void testMapEvolution() {
+    TypeDescription fileType =
+        TypeDescription
+            .fromString("struct<a:map<struct<a:int>,struct<a:int>>>");
+    TypeDescription readerType = TypeDescription.fromString(
+        "struct<a:map<struct<a:int,b:string>,struct<a:int,c:string>>>");
+    boolean[] included = includeAll(readerType);
+    options.tolerateMissingSchema(false);
+    SchemaEvolution transition =
+        new SchemaEvolution(fileType, readerType, options.include(included));
+
+    // a -> a
+    TypeDescription reader = readerType.getChildren().get(0);
+    TypeDescription mapped = transition.getFileType(reader);
+    TypeDescription original = fileType.getChildren().get(0);
+    assertSame(original, mapped);
+
+    // a.key -> a.key
+    TypeDescription readerchild = reader.getChildren().get(0);
+    mapped = transition.getFileType(readerchild);
+    original = original.getChildren().get(0);
+    assertSame(original, mapped);
+
+    // a.value -> a.value
+    readerchild = reader.getChildren().get(1);
+    mapped = transition.getFileType(readerchild);
+    original = fileType.getChildren().get(0).getChildren().get(1);
+    assertSame(original, mapped);
+  }
+
+  @Test
+  public void testListEvolution() {
+    TypeDescription fileType =
+        TypeDescription.fromString("struct<a:array<struct<b:int>>>");
+    TypeDescription readerType =
+        TypeDescription.fromString("struct<a:array<struct<b:int,c:string>>>");
+    boolean[] included = includeAll(readerType);
+    options.tolerateMissingSchema(false);
+    SchemaEvolution transition =
+        new SchemaEvolution(fileType, readerType, options.include(included));
+
+    // a -> a
+    TypeDescription reader = readerType.getChildren().get(0);
+    TypeDescription mapped = transition.getFileType(reader);
+    TypeDescription original = fileType.getChildren().get(0);
+    assertSame(original, mapped);
+
+    // a.element -> a.element
+    TypeDescription readerchild = reader.getChildren().get(0);
+    mapped = transition.getFileType(readerchild);
+    original = original.getChildren().get(0);
+    assertSame(original, mapped);
+
+    // a.b -> a.b
+    readerchild = reader.getChildren().get(0).getChildren().get(0);
+    mapped = transition.getFileType(readerchild);
+    original = original.getChildren().get(0);
+    assertSame(original, mapped);
+
+    // a.c -> null
+    readerchild = reader.getChildren().get(0).getChildren().get(1);
+    mapped = transition.getFileType(readerchild);
+    original = null;
+    assertSame(original, mapped);
+  }
+
+  @Test(expected = SchemaEvolution.IllegalEvolutionException.class)
+  public void testIncompatibleTypes() {
+    TypeDescription fileType = TypeDescription.fromString("struct<a:int>");
+    TypeDescription readerType = TypeDescription.fromString("struct<a:date>");
+    boolean[] included = includeAll(readerType);
+    options.tolerateMissingSchema(false);
+    SchemaEvolution transition =
+        new SchemaEvolution(fileType, readerType, options.include(included));
+  }
+
+  @Test
+  public void testAcidNamedEvolution() {
+    TypeDescription fileType = TypeDescription.fromString(
+        "struct<operation:int,originalTransaction:bigint,bucket:int," +
+            "rowId:bigint,currentTransaction:bigint," +
+            "row:struct<x:int,z:bigint,y:string>>");
+    TypeDescription readerType = TypeDescription.fromString(
+        "struct<x:int,y:string,z:bigint>");
+    SchemaEvolution evo = new SchemaEvolution(fileType, readerType, options);
+    assertTrue(evo.isAcid());
+    assertEquals("struct<operation:int,originalTransaction:bigint,bucket:int," +
+        "rowId:bigint,currentTransaction:bigint," +
+        "row:struct<x:int,y:string,z:bigint>>", evo.getReaderSchema().toString());
+    assertEquals("struct<x:int,y:string,z:bigint>",
+        evo.getReaderBaseSchema().toString());
+    // the root, the ACID metadata columns, row, and x should keep their column ids
+    for(int c=0; c < 8; ++c) {
+      assertEquals("column " + c, c, evo.getFileType(c).getId());
+    }
+    // y and z should swap places
+    assertEquals(9, evo.getFileType(8).getId());
+    assertEquals(8, evo.getFileType(9).getId());
+  }
+
+  @Test
+  public void testAcidPositionEvolutionAddField() {
+    TypeDescription fileType = TypeDescription.fromString(
+        "struct<operation:int,originalTransaction:bigint,bucket:int," +
+            "rowId:bigint,currentTransaction:bigint," +
+            "row:struct<_col0:int,_col1:string>>");
+    TypeDescription readerType = TypeDescription.fromString(
+        "struct<x:int,y:string,z:bigint>");
+    SchemaEvolution evo = new SchemaEvolution(fileType, readerType, options);
+    assertTrue(evo.isAcid());
+    assertEquals("struct<operation:int,originalTransaction:bigint,bucket:int," +
+        "rowId:bigint,currentTransaction:bigint," +
+        "row:struct<x:int,y:string,z:bigint>>", evo.getReaderSchema().toString());
+    assertEquals("struct<x:int,y:string,z:bigint>",
+        evo.getReaderBaseSchema().toString());
+    // every column present in the file (ids 0-8) should map to the same id
+    for(int c=0; c < 9; ++c) {
+      assertEquals("column " + c, c, evo.getFileType(c).getId());
+    }
+    // the file doesn't have z
+    assertEquals(null, evo.getFileType(9));
+  }
+
+  @Test
+  public void testAcidPositionEvolutionRemoveField() {
+    TypeDescription fileType = TypeDescription.fromString(
+        "struct<operation:int,originalTransaction:bigint,bucket:int," +
+            "rowId:bigint,currentTransaction:bigint," +
+            "row:struct<_col0:int,_col1:string,_col2:double>>");
+    TypeDescription readerType = TypeDescription.fromString(
+        "struct<x:int,y:string>");
+    SchemaEvolution evo = new SchemaEvolution(fileType, readerType, options);
+    assertTrue(evo.isAcid());
+    assertEquals("struct<operation:int,originalTransaction:bigint,bucket:int," +
+        "rowId:bigint,currentTransaction:bigint," +
+        "row:struct<x:int,y:string>>", evo.getReaderSchema().toString());
+    assertEquals("struct<x:int,y:string>",
+        evo.getReaderBaseSchema().toString());
+    // the first stuff should be an identity
+    boolean[] fileInclude = evo.getFileIncluded();
+    for(int c=0; c < 9; ++c) {
+      assertEquals("column " + c, c, evo.getFileType(c).getId());
+      assertTrue("column " + c, fileInclude[c]);
+    }
+    // don't read the last column
+    assertFalse(fileInclude[9]);
+  }
+
+  @Test
+  public void testAcidPositionSubstructure() {
+    TypeDescription fileType = TypeDescription.fromString(
+        "struct<operation:int,originalTransaction:bigint,bucket:int," +
+            "rowId:bigint,currentTransaction:bigint," +
+            "row:struct<_col0:int,_col1:struct<z:int,x:double,y:string>," +
+            "_col2:double>>");
+    TypeDescription readerType = TypeDescription.fromString(
+        "struct<a:int,b:struct<x:double,y:string,z:int>,c:double>");
+    SchemaEvolution evo = new SchemaEvolution(fileType, readerType, options);
+    assertTrue(evo.isAcid());
+    // the first stuff should be an identity
+    boolean[] fileInclude = evo.getFileIncluded();
+    for(int c=0; c < 9; ++c) {
+      assertEquals("column " + c, c, evo.getFileType(c).getId());
+    }
+    assertEquals(10, evo.getFileType(9).getId());
+    assertEquals(11, evo.getFileType(10).getId());
+    assertEquals(9, evo.getFileType(11).getId());
+    assertEquals(12, evo.getFileType(12).getId());
+    assertEquals(13, fileInclude.length);
+    for(int c=0; c < fileInclude.length; ++c) {
+      assertTrue("column " + c, fileInclude[c]);
+    }
+  }
+
+  @Test
+  public void testNonAcidPositionSubstructure() {
+    TypeDescription fileType = TypeDescription.fromString(
+        "struct<_col0:int,_col1:struct<x:double,z:int>," +
+            "_col2:double>");
+    TypeDescription readerType = TypeDescription.fromString(
+        "struct<a:int,b:struct<x:double,y:string,z:int>,c:double>");
+    SchemaEvolution evo = new SchemaEvolution(fileType, readerType, options);
+    assertFalse(evo.isAcid());
+    // the first stuff should be an identity
+    boolean[] fileInclude = evo.getFileIncluded();
+    assertEquals(0, evo.getFileType(0).getId());
+    assertEquals(1, evo.getFileType(1).getId());
+    assertEquals(2, evo.getFileType(2).getId());
+    assertEquals(3, evo.getFileType(3).getId());
+    assertEquals(null, evo.getFileType(4));
+    assertEquals(4, evo.getFileType(5).getId());
+    assertEquals(5, evo.getFileType(6).getId());
+    assertEquals(6, fileInclude.length);
+    for(int c=0; c < fileInclude.length; ++c) {
+      assertTrue("column " + c, fileInclude[c]);
+    }
+  }
+
+  @Test
+  public void testFileIncludeWithNoEvolution() {
+    TypeDescription fileType = TypeDescription.fromString(
+        "struct<a:int,b:double,c:string>");
+    SchemaEvolution evo = new SchemaEvolution(fileType,
+        options.include(new boolean[]{true, false, true, false}));
+    assertFalse(evo.isAcid());
+    assertEquals("struct<a:int,b:double,c:string>",
+        evo.getReaderBaseSchema().toString());
+    boolean[] fileInclude = evo.getFileIncluded();
+    assertTrue(fileInclude[0]);
+    assertFalse(fileInclude[1]);
+    assertTrue(fileInclude[2]);
+    assertFalse(fileInclude[3]);
+  }
 }

http://git-wip-us.apache.org/repos/asf/orc/blob/9ba93782/java/mapreduce/src/java/org/apache/orc/mapred/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/java/mapreduce/src/java/org/apache/orc/mapred/OrcInputFormat.java b/java/mapreduce/src/java/org/apache/orc/mapred/OrcInputFormat.java
index ac8ca61..f742f7c 100644
--- a/java/mapreduce/src/java/org/apache/orc/mapred/OrcInputFormat.java
+++ b/java/mapreduce/src/java/org/apache/orc/mapred/OrcInputFormat.java
@@ -110,10 +110,11 @@ public class OrcInputFormat<V extends WritableComparable>
                                             long length) {
     TypeDescription schema =
         TypeDescription.fromString(OrcConf.MAPRED_INPUT_SCHEMA.getString(conf));
-    Reader.Options options = new Reader.Options()
+    Reader.Options options = reader.options()
         .range(start, length)
         .useZeroCopy(OrcConf.USE_ZEROCOPY.getBoolean(conf))
-        .skipCorruptRecords(OrcConf.SKIP_CORRUPT_DATA.getBoolean(conf));
+        .skipCorruptRecords(OrcConf.SKIP_CORRUPT_DATA.getBoolean(conf))
+        .tolerateMissingSchema(OrcConf.TOLERATE_MISSING_SCHEMA.getBoolean(conf));
     if (schema != null) {
       options.schema(schema);
     } else {