You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by om...@apache.org on 2016/05/20 21:23:02 UTC

[24/27] hive git commit: HIVE-11417. Move the ReaderImpl and RowReaderImpl to the ORC module, by making shims for the row by row reader. (omalley reviewed by prasanth_j)

http://git-wip-us.apache.org/repos/asf/hive/blob/ffb79509/orc/src/java/org/apache/orc/impl/RecordReaderUtils.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/RecordReaderUtils.java b/orc/src/java/org/apache/orc/impl/RecordReaderUtils.java
new file mode 100644
index 0000000..1067957
--- /dev/null
+++ b/orc/src/java/org/apache/orc/impl/RecordReaderUtils.java
@@ -0,0 +1,578 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.impl;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.io.DiskRange;
+import org.apache.hadoop.hive.common.io.DiskRangeList;
+import org.apache.hadoop.hive.common.io.DiskRangeList.CreateHelper;
+import org.apache.hadoop.hive.common.io.DiskRangeList.MutateHelper;
+import org.apache.orc.CompressionCodec;
+import org.apache.orc.DataReader;
+import org.apache.orc.OrcProto;
+
+import com.google.common.collect.ComparisonChain;
+import org.apache.orc.StripeInformation;
+
+/**
+ * Stateless methods shared between RecordReaderImpl and EncodedReaderImpl.
+ */
+public class RecordReaderUtils {
+  private static final HadoopShims SHIMS = HadoopShims.Factory.get();
+
+  private static class DefaultDataReader implements DataReader {
+    private FSDataInputStream file = null;
+    private final ByteBufferAllocatorPool pool;
+    private HadoopShims.ZeroCopyReaderShim zcr = null;
+    private final FileSystem fs;
+    private final Path path;
+    private final boolean useZeroCopy;
+    private final CompressionCodec codec;
+    private final int bufferSize;
+    private final int typeCount;
+
+    private DefaultDataReader(DefaultDataReader other) {
+      this.pool = other.pool;
+      this.bufferSize = other.bufferSize;
+      this.typeCount = other.typeCount;
+      this.fs = other.fs;
+      this.path = other.path;
+      this.useZeroCopy = other.useZeroCopy;
+      this.codec = other.codec;
+    }
+
+    private DefaultDataReader(DataReaderProperties properties) {
+      this.fs = properties.getFileSystem();
+      this.path = properties.getPath();
+      this.useZeroCopy = properties.getZeroCopy();
+      this.codec = WriterImpl.createCodec(properties.getCompression());
+      this.bufferSize = properties.getBufferSize();
+      this.typeCount = properties.getTypeCount();
+      if (useZeroCopy) {
+        this.pool = new ByteBufferAllocatorPool();
+      } else {
+        this.pool = null;
+      }
+    }
+
+    @Override
+    public void open() throws IOException {
+      this.file = fs.open(path);
+      if (useZeroCopy) {
+        zcr = RecordReaderUtils.createZeroCopyShim(file, codec, pool);
+      } else {
+        zcr = null;
+      }
+    }
+
+    @Override
+    public OrcIndex readRowIndex(StripeInformation stripe,
+                                 OrcProto.StripeFooter footer,
+                                 boolean[] included,
+                                 OrcProto.RowIndex[] indexes,
+                                 boolean[] sargColumns,
+                                 OrcProto.BloomFilterIndex[] bloomFilterIndices
+                                 ) throws IOException {
+      if (file == null) {
+        open();
+      }
+      if (footer == null) {
+        footer = readStripeFooter(stripe);
+      }
+      if (indexes == null) {
+        indexes = new OrcProto.RowIndex[typeCount];
+      }
+      if (bloomFilterIndices == null) {
+        bloomFilterIndices = new OrcProto.BloomFilterIndex[typeCount];
+      }
+      long offset = stripe.getOffset();
+      List<OrcProto.Stream> streams = footer.getStreamsList();
+      for (int i = 0; i < streams.size(); i++) {
+        OrcProto.Stream stream = streams.get(i);
+        OrcProto.Stream nextStream = null;
+        if (i < streams.size() - 1) {
+          nextStream = streams.get(i+1);
+        }
+        int col = stream.getColumn();
+        int len = (int) stream.getLength();
+        // row index stream and bloom filter are interlaced, check if the sarg column contains bloom
+        // filter and combine the io to read row index and bloom filters for that column together
+        if (stream.hasKind() && (stream.getKind() == OrcProto.Stream.Kind.ROW_INDEX)) {
+          boolean readBloomFilter = false;
+          if (sargColumns != null && sargColumns[col] &&
+              nextStream.getKind() == OrcProto.Stream.Kind.BLOOM_FILTER) {
+            len += nextStream.getLength();
+            i += 1;
+            readBloomFilter = true;
+          }
+          if ((included == null || included[col]) && indexes[col] == null) {
+            byte[] buffer = new byte[len];
+            file.readFully(offset, buffer, 0, buffer.length);
+            ByteBuffer bb = ByteBuffer.wrap(buffer);
+            indexes[col] = OrcProto.RowIndex.parseFrom(InStream.create("index",
+                Lists.<DiskRange>newArrayList(new BufferChunk(bb, 0)), stream.getLength(),
+                codec, bufferSize));
+            if (readBloomFilter) {
+              bb.position((int) stream.getLength());
+              bloomFilterIndices[col] = OrcProto.BloomFilterIndex.parseFrom(InStream.create(
+                  "bloom_filter", Lists.<DiskRange>newArrayList(new BufferChunk(bb, 0)),
+                  nextStream.getLength(), codec, bufferSize));
+            }
+          }
+        }
+        offset += len;
+      }
+
+      OrcIndex index = new OrcIndex(indexes, bloomFilterIndices);
+      return index;
+    }
+
+    @Override
+    public OrcProto.StripeFooter readStripeFooter(StripeInformation stripe) throws IOException {
+      if (file == null) {
+        open();
+      }
+      long offset = stripe.getOffset() + stripe.getIndexLength() + stripe.getDataLength();
+      int tailLength = (int) stripe.getFooterLength();
+
+      // read the footer
+      ByteBuffer tailBuf = ByteBuffer.allocate(tailLength);
+      file.readFully(offset, tailBuf.array(), tailBuf.arrayOffset(), tailLength);
+      return OrcProto.StripeFooter.parseFrom(InStream.createCodedInputStream("footer",
+          Lists.<DiskRange>newArrayList(new BufferChunk(tailBuf, 0)),
+          tailLength, codec, bufferSize));
+    }
+
+    @Override
+    public DiskRangeList readFileData(
+        DiskRangeList range, long baseOffset, boolean doForceDirect) throws IOException {
+      return RecordReaderUtils.readDiskRanges(file, zcr, baseOffset, range, doForceDirect);
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (pool != null) {
+        pool.clear();
+      }
+      // close both zcr and file
+      try (HadoopShims.ZeroCopyReaderShim myZcr = zcr) {
+        if (file != null) {
+          file.close();
+        }
+      }
+    }
+
+    @Override
+    public boolean isTrackingDiskRanges() {
+      return zcr != null;
+    }
+
+    @Override
+    public void releaseBuffer(ByteBuffer buffer) {
+      zcr.releaseBuffer(buffer);
+    }
+
+    @Override
+    public DataReader clone() {
+      return new DefaultDataReader(this);
+    }
+
+  }
+
+  public static DataReader createDefaultDataReader(DataReaderProperties properties) {
+    return new DefaultDataReader(properties);
+  }
+
+  public static boolean[] findPresentStreamsByColumn(
+      List<OrcProto.Stream> streamList, List<OrcProto.Type> types) {
+    boolean[] hasNull = new boolean[types.size()];
+    for(OrcProto.Stream stream: streamList) {
+      if (stream.hasKind() && (stream.getKind() == OrcProto.Stream.Kind.PRESENT)) {
+        hasNull[stream.getColumn()] = true;
+      }
+    }
+    return hasNull;
+  }
+
+  /**
+   * Does region A overlap region B? The end points are inclusive on both sides.
+   * @param leftA A's left point
+   * @param rightA A's right point
+   * @param leftB B's left point
+   * @param rightB B's right point
+   * @return Does region A overlap region B?
+   */
+  static boolean overlap(long leftA, long rightA, long leftB, long rightB) {
+    if (leftA <= leftB) {
+      return rightA >= leftB;
+    }
+    return rightB >= leftA;
+  }
+
+  public static void addEntireStreamToRanges(
+      long offset, long length, CreateHelper list, boolean doMergeBuffers) {
+    list.addOrMerge(offset, offset + length, doMergeBuffers, false);
+  }
+
+  public static void addRgFilteredStreamToRanges(OrcProto.Stream stream,
+      boolean[] includedRowGroups, boolean isCompressed, OrcProto.RowIndex index,
+      OrcProto.ColumnEncoding encoding, OrcProto.Type type, int compressionSize, boolean hasNull,
+      long offset, long length, CreateHelper list, boolean doMergeBuffers) {
+    for (int group = 0; group < includedRowGroups.length; ++group) {
+      if (!includedRowGroups[group]) continue;
+      int posn = getIndexPosition(
+          encoding.getKind(), type.getKind(), stream.getKind(), isCompressed, hasNull);
+      long start = index.getEntry(group).getPositions(posn);
+      final long nextGroupOffset;
+      boolean isLast = group == (includedRowGroups.length - 1);
+      nextGroupOffset = isLast ? length : index.getEntry(group + 1).getPositions(posn);
+
+      start += offset;
+      long end = offset + estimateRgEndOffset(
+          isCompressed, isLast, nextGroupOffset, length, compressionSize);
+      list.addOrMerge(start, end, doMergeBuffers, true);
+    }
+  }
+
+  public static long estimateRgEndOffset(boolean isCompressed, boolean isLast,
+      long nextGroupOffset, long streamLength, int bufferSize) {
+    // figure out the worst case last location
+    // if adjacent groups have the same compressed block offset then stretch the slop
+    // by factor of 2 to safely accommodate the next compression block.
+    // One for the current compression block and another for the next compression block.
+    long slop = isCompressed ? 2 * (OutStream.HEADER_SIZE + bufferSize) : WORST_UNCOMPRESSED_SLOP;
+    return isLast ? streamLength : Math.min(streamLength, nextGroupOffset + slop);
+  }
+
+  private static final int BYTE_STREAM_POSITIONS = 1;
+  private static final int RUN_LENGTH_BYTE_POSITIONS = BYTE_STREAM_POSITIONS + 1;
+  private static final int BITFIELD_POSITIONS = RUN_LENGTH_BYTE_POSITIONS + 1;
+  private static final int RUN_LENGTH_INT_POSITIONS = BYTE_STREAM_POSITIONS + 1;
+
+  /**
+   * Get the offset in the index positions for the column that the given
+   * stream starts.
+   * @param columnEncoding the encoding of the column
+   * @param columnType the type of the column
+   * @param streamType the kind of the stream
+   * @param isCompressed is the file compressed
+   * @param hasNulls does the column have a PRESENT stream?
+   * @return the number of positions that will be used for that stream
+   */
+  public static int getIndexPosition(OrcProto.ColumnEncoding.Kind columnEncoding,
+                              OrcProto.Type.Kind columnType,
+                              OrcProto.Stream.Kind streamType,
+                              boolean isCompressed,
+                              boolean hasNulls) {
+    if (streamType == OrcProto.Stream.Kind.PRESENT) {
+      return 0;
+    }
+    int compressionValue = isCompressed ? 1 : 0;
+    int base = hasNulls ? (BITFIELD_POSITIONS + compressionValue) : 0;
+    switch (columnType) {
+      case BOOLEAN:
+      case BYTE:
+      case SHORT:
+      case INT:
+      case LONG:
+      case FLOAT:
+      case DOUBLE:
+      case DATE:
+      case STRUCT:
+      case MAP:
+      case LIST:
+      case UNION:
+        return base;
+      case CHAR:
+      case VARCHAR:
+      case STRING:
+        if (columnEncoding == OrcProto.ColumnEncoding.Kind.DICTIONARY ||
+            columnEncoding == OrcProto.ColumnEncoding.Kind.DICTIONARY_V2) {
+          return base;
+        } else {
+          if (streamType == OrcProto.Stream.Kind.DATA) {
+            return base;
+          } else {
+            return base + BYTE_STREAM_POSITIONS + compressionValue;
+          }
+        }
+      case BINARY:
+        if (streamType == OrcProto.Stream.Kind.DATA) {
+          return base;
+        }
+        return base + BYTE_STREAM_POSITIONS + compressionValue;
+      case DECIMAL:
+        if (streamType == OrcProto.Stream.Kind.DATA) {
+          return base;
+        }
+        return base + BYTE_STREAM_POSITIONS + compressionValue;
+      case TIMESTAMP:
+        if (streamType == OrcProto.Stream.Kind.DATA) {
+          return base;
+        }
+        return base + RUN_LENGTH_INT_POSITIONS + compressionValue;
+      default:
+        throw new IllegalArgumentException("Unknown type " + columnType);
+    }
+  }
+
+  // for uncompressed streams, what is the most overlap with the following set
+  // of rows (long vint literal group).
+  static final int WORST_UNCOMPRESSED_SLOP = 2 + 8 * 512;
+
+  /**
+   * Is this stream part of a dictionary?
+   * @return is this part of a dictionary?
+   */
+  public static boolean isDictionary(OrcProto.Stream.Kind kind,
+                              OrcProto.ColumnEncoding encoding) {
+    assert kind != OrcProto.Stream.Kind.DICTIONARY_COUNT;
+    OrcProto.ColumnEncoding.Kind encodingKind = encoding.getKind();
+    return kind == OrcProto.Stream.Kind.DICTIONARY_DATA ||
+      (kind == OrcProto.Stream.Kind.LENGTH &&
+       (encodingKind == OrcProto.ColumnEncoding.Kind.DICTIONARY ||
+        encodingKind == OrcProto.ColumnEncoding.Kind.DICTIONARY_V2));
+  }
+
+  /**
+   * Build a string representation of a list of disk ranges.
+   * @param range ranges to stringify
+   * @return the resulting string
+   */
+  public static String stringifyDiskRanges(DiskRangeList range) {
+    StringBuilder buffer = new StringBuilder();
+    buffer.append("[");
+    boolean isFirst = true;
+    while (range != null) {
+      if (!isFirst) {
+        buffer.append(", {");
+      } else {
+        buffer.append("{");
+      }
+      isFirst = false;
+      buffer.append(range.toString());
+      buffer.append("}");
+      range = range.next;
+    }
+    buffer.append("]");
+    return buffer.toString();
+  }
+
+  /**
+   * Read the list of ranges from the file.
+   * @param file the file to read
+   * @param base the base of the stripe
+   * @param range the disk ranges within the stripe to read
+   * @return the bytes read for each disk range, which is the same length as
+   *    ranges
+   * @throws IOException
+   */
+  static DiskRangeList readDiskRanges(FSDataInputStream file,
+                                      HadoopShims.ZeroCopyReaderShim zcr,
+                                 long base,
+                                 DiskRangeList range,
+                                 boolean doForceDirect) throws IOException {
+    if (range == null) return null;
+    DiskRangeList prev = range.prev;
+    if (prev == null) {
+      prev = new MutateHelper(range);
+    }
+    while (range != null) {
+      if (range.hasData()) {
+        range = range.next;
+        continue;
+      }
+      int len = (int) (range.getEnd() - range.getOffset());
+      long off = range.getOffset();
+      if (zcr != null) {
+        file.seek(base + off);
+        boolean hasReplaced = false;
+        while (len > 0) {
+          ByteBuffer partial = zcr.readBuffer(len, false);
+          BufferChunk bc = new BufferChunk(partial, off);
+          if (!hasReplaced) {
+            range.replaceSelfWith(bc);
+            hasReplaced = true;
+          } else {
+            range.insertAfter(bc);
+          }
+          range = bc;
+          int read = partial.remaining();
+          len -= read;
+          off += read;
+        }
+      } else {
+        // Don't use HDFS ByteBuffer API because it has no readFully, and is buggy and pointless.
+        byte[] buffer = new byte[len];
+        file.readFully((base + off), buffer, 0, buffer.length);
+        ByteBuffer bb = null;
+        if (doForceDirect) {
+          bb = ByteBuffer.allocateDirect(len);
+          bb.put(buffer);
+          bb.position(0);
+          bb.limit(len);
+        } else {
+          bb = ByteBuffer.wrap(buffer);
+        }
+        range = range.replaceSelfWith(new BufferChunk(bb, range.getOffset()));
+      }
+      range = range.next;
+    }
+    return prev.next;
+  }
+
+
+  static List<DiskRange> getStreamBuffers(DiskRangeList range, long offset, long length) {
+    // This assumes sorted ranges (as do many other parts of ORC code.
+    ArrayList<DiskRange> buffers = new ArrayList<DiskRange>();
+    if (length == 0) return buffers;
+    long streamEnd = offset + length;
+    boolean inRange = false;
+    while (range != null) {
+      if (!inRange) {
+        if (range.getEnd() <= offset) {
+          range = range.next;
+          continue; // Skip until we are in range.
+        }
+        inRange = true;
+        if (range.getOffset() < offset) {
+          // Partial first buffer, add a slice of it.
+          buffers.add(range.sliceAndShift(offset, Math.min(streamEnd, range.getEnd()), -offset));
+          if (range.getEnd() >= streamEnd) break; // Partial first buffer is also partial last buffer.
+          range = range.next;
+          continue;
+        }
+      } else if (range.getOffset() >= streamEnd) {
+        break;
+      }
+      if (range.getEnd() > streamEnd) {
+        // Partial last buffer (may also be the first buffer), add a slice of it.
+        buffers.add(range.sliceAndShift(range.getOffset(), streamEnd, -offset));
+        break;
+      }
+      // Buffer that belongs entirely to one stream.
+      // TODO: ideally we would want to reuse the object and remove it from the list, but we cannot
+      //       because bufferChunks is also used by clearStreams for zcr. Create a useless dup.
+      buffers.add(range.sliceAndShift(range.getOffset(), range.getEnd(), -offset));
+      if (range.getEnd() == streamEnd) break;
+      range = range.next;
+    }
+    return buffers;
+  }
+
+  static HadoopShims.ZeroCopyReaderShim createZeroCopyShim(FSDataInputStream file,
+      CompressionCodec codec, ByteBufferAllocatorPool pool) throws IOException {
+    if ((codec == null || ((codec instanceof DirectDecompressionCodec)
+            && ((DirectDecompressionCodec) codec).isAvailable()))) {
+      /* codec is null or is available */
+      return SHIMS.getZeroCopyReader(file, pool);
+    }
+    return null;
+  }
+
+  // this is an implementation copied from ElasticByteBufferPool in hadoop-2,
+  // which lacks a clear()/clean() operation
+  public final static class ByteBufferAllocatorPool implements HadoopShims.ByteBufferPoolShim {
+    private static final class Key implements Comparable<Key> {
+      private final int capacity;
+      private final long insertionGeneration;
+
+      Key(int capacity, long insertionGeneration) {
+        this.capacity = capacity;
+        this.insertionGeneration = insertionGeneration;
+      }
+
+      @Override
+      public int compareTo(Key other) {
+        return ComparisonChain.start().compare(capacity, other.capacity)
+            .compare(insertionGeneration, other.insertionGeneration).result();
+      }
+
+      @Override
+      public boolean equals(Object rhs) {
+        if (rhs == null) {
+          return false;
+        }
+        try {
+          Key o = (Key) rhs;
+          return (compareTo(o) == 0);
+        } catch (ClassCastException e) {
+          return false;
+        }
+      }
+
+      @Override
+      public int hashCode() {
+        return new HashCodeBuilder().append(capacity).append(insertionGeneration)
+            .toHashCode();
+      }
+    }
+
+    private final TreeMap<Key, ByteBuffer> buffers = new TreeMap<Key, ByteBuffer>();
+
+    private final TreeMap<Key, ByteBuffer> directBuffers = new TreeMap<Key, ByteBuffer>();
+
+    private long currentGeneration = 0;
+
+    private final TreeMap<Key, ByteBuffer> getBufferTree(boolean direct) {
+      return direct ? directBuffers : buffers;
+    }
+
+    public void clear() {
+      buffers.clear();
+      directBuffers.clear();
+    }
+
+    @Override
+    public ByteBuffer getBuffer(boolean direct, int length) {
+      TreeMap<Key, ByteBuffer> tree = getBufferTree(direct);
+      Map.Entry<Key, ByteBuffer> entry = tree.ceilingEntry(new Key(length, 0));
+      if (entry == null) {
+        return direct ? ByteBuffer.allocateDirect(length) : ByteBuffer
+            .allocate(length);
+      }
+      tree.remove(entry.getKey());
+      return entry.getValue();
+    }
+
+    @Override
+    public void putBuffer(ByteBuffer buffer) {
+      TreeMap<Key, ByteBuffer> tree = getBufferTree(buffer.isDirect());
+      while (true) {
+        Key key = new Key(buffer.capacity(), currentGeneration++);
+        if (!tree.containsKey(key)) {
+          tree.put(key, buffer);
+          return;
+        }
+        // Buffers are indexed by (capacity, generation).
+        // If our key is not unique on the first try, we try again
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/ffb79509/orc/src/java/org/apache/orc/impl/SchemaEvolution.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/SchemaEvolution.java b/orc/src/java/org/apache/orc/impl/SchemaEvolution.java
new file mode 100644
index 0000000..2c80aaa
--- /dev/null
+++ b/orc/src/java/org/apache/orc/impl/SchemaEvolution.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.orc.TypeDescription;
+
+/**
+ * Take the file types and the (optional) configuration column names/types and see if there
+ * has been schema evolution.
+ */
+public class SchemaEvolution {
+  private final Map<TypeDescription, TypeDescription> readerToFile;
+  private final boolean[] included;
+  private final TypeDescription readerSchema;
+  private static final Log LOG = LogFactory.getLog(SchemaEvolution.class);
+
+  public SchemaEvolution(TypeDescription readerSchema, boolean[] included) {
+    this.included = included;
+    readerToFile = null;
+    this.readerSchema = readerSchema;
+  }
+
+  public SchemaEvolution(TypeDescription fileSchema,
+                         TypeDescription readerSchema,
+                         boolean[] included) throws IOException {
+    readerToFile = new HashMap<>(readerSchema.getMaximumId() + 1);
+    this.included = included;
+    if (checkAcidSchema(fileSchema)) {
+      this.readerSchema = createEventSchema(readerSchema);
+    } else {
+      this.readerSchema = readerSchema;
+    }
+    buildMapping(fileSchema, this.readerSchema);
+  }
+
+  public TypeDescription getReaderSchema() {
+    return readerSchema;
+  }
+
+  public TypeDescription getFileType(TypeDescription readerType) {
+    TypeDescription result;
+    if (readerToFile == null) {
+      if (included == null || included[readerType.getId()]) {
+        result = readerType;
+      } else {
+        result = null;
+      }
+    } else {
+      result = readerToFile.get(readerType);
+    }
+    return result;
+  }
+
+  void buildMapping(TypeDescription fileType,
+                    TypeDescription readerType) throws IOException {
+    // if the column isn't included, don't map it
+    if (included != null && !included[readerType.getId()]) {
+      return;
+    }
+    boolean isOk = true;
+    // check the easy case first
+    if (fileType.getCategory() == readerType.getCategory()) {
+      switch (readerType.getCategory()) {
+        case BOOLEAN:
+        case BYTE:
+        case SHORT:
+        case INT:
+        case LONG:
+        case DOUBLE:
+        case FLOAT:
+        case STRING:
+        case TIMESTAMP:
+        case BINARY:
+        case DATE:
+          // these are always a match
+          break;
+        case CHAR:
+        case VARCHAR:
+          // HIVE-13648: Look at ORC data type conversion edge cases (CHAR, VARCHAR, DECIMAL)
+          isOk = fileType.getMaxLength() == readerType.getMaxLength();
+          break;
+        case DECIMAL:
+          // HIVE-13648: Look at ORC data type conversion edge cases (CHAR, VARCHAR, DECIMAL)
+          // TODO we don't enforce scale and precision checks, but probably should
+          break;
+        case UNION:
+        case MAP:
+        case LIST: {
+          // these must be an exact match
+          List<TypeDescription> fileChildren = fileType.getChildren();
+          List<TypeDescription> readerChildren = readerType.getChildren();
+          if (fileChildren.size() == readerChildren.size()) {
+            for(int i=0; i < fileChildren.size(); ++i) {
+              buildMapping(fileChildren.get(i), readerChildren.get(i));
+            }
+          } else {
+            isOk = false;
+          }
+          break;
+        }
+        case STRUCT: {
+          // allow either side to have fewer fields than the other
+          List<TypeDescription> fileChildren = fileType.getChildren();
+          List<TypeDescription> readerChildren = readerType.getChildren();
+          int jointSize = Math.min(fileChildren.size(), readerChildren.size());
+          for(int i=0; i < jointSize; ++i) {
+            buildMapping(fileChildren.get(i), readerChildren.get(i));
+          }
+          break;
+        }
+        default:
+          throw new IllegalArgumentException("Unknown type " + readerType);
+      }
+    } else {
+      /*
+       * Check for the few cases where will not convert....
+       */
+
+      isOk = ConvertTreeReaderFactory.canConvert(fileType, readerType);
+    }
+    if (isOk) {
+      readerToFile.put(readerType, fileType);
+    } else {
+      throw new IOException(
+          String.format(
+              "ORC does not support type conversion from file type %s (%d) to reader type %s (%d)",
+              fileType.toString(), fileType.getId(),
+              readerType.toString(), readerType.getId()));
+    }
+  }
+
+  private static boolean checkAcidSchema(TypeDescription type) {
+    if (type.getCategory().equals(TypeDescription.Category.STRUCT)) {
+      List<String> rootFields = type.getFieldNames();
+      if (acidEventFieldNames.equals(rootFields)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * @param typeDescr
+   * @return ORC types for the ACID event based on the row's type description
+   */
+  public static TypeDescription createEventSchema(TypeDescription typeDescr) {
+    TypeDescription result = TypeDescription.createStruct()
+        .addField("operation", TypeDescription.createInt())
+        .addField("originalTransaction", TypeDescription.createLong())
+        .addField("bucket", TypeDescription.createInt())
+        .addField("rowId", TypeDescription.createLong())
+        .addField("currentTransaction", TypeDescription.createLong())
+        .addField("row", typeDescr.clone());
+    return result;
+  }
+
+  public static final List<String> acidEventFieldNames= new ArrayList<String>();
+  static {
+    acidEventFieldNames.add("operation");
+    acidEventFieldNames.add("originalTransaction");
+    acidEventFieldNames.add("bucket");
+    acidEventFieldNames.add("rowId");
+    acidEventFieldNames.add("currentTransaction");
+    acidEventFieldNames.add("row");
+  }
+}