Posted to commits@hive.apache.org by om...@apache.org on 2016/05/20 21:22:49 UTC

[11/27] hive git commit: HIVE-11417. Move the ReaderImpl and RowReaderImpl to the ORC module, by making shims for the row by row reader. (omalley reviewed by prasanth_j)

http://git-wip-us.apache.org/repos/asf/hive/blob/ffb79509/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java
deleted file mode 100644
index 4192588..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java
+++ /dev/null
@@ -1,586 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.io.orc;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
-import com.google.common.collect.Lists;
-import org.apache.commons.lang.builder.HashCodeBuilder;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.io.DiskRange;
-import org.apache.hadoop.hive.common.io.DiskRangeList;
-import org.apache.hadoop.hive.common.io.DiskRangeList.CreateHelper;
-import org.apache.hadoop.hive.common.io.DiskRangeList.MutateHelper;
-import org.apache.hadoop.hive.shims.HadoopShims;
-import org.apache.hadoop.hive.shims.ShimLoader;
-import org.apache.hadoop.hive.shims.HadoopShims.ByteBufferPoolShim;
-import org.apache.hadoop.hive.shims.HadoopShims.ZeroCopyReaderShim;
-import org.apache.orc.StripeInformation;
-import org.apache.orc.impl.BufferChunk;
-import org.apache.orc.CompressionCodec;
-import org.apache.orc.DataReader;
-import org.apache.orc.impl.DataReaderProperties;
-import org.apache.orc.impl.DirectDecompressionCodec;
-import org.apache.orc.OrcProto;
-
-import com.google.common.collect.ComparisonChain;
-import org.apache.orc.impl.InStream;
-import org.apache.orc.impl.OrcIndex;
-import org.apache.orc.impl.OutStream;
-
-/**
- * Stateless methods shared between RecordReaderImpl and EncodedReaderImpl.
- */
-public class RecordReaderUtils {
-  private static final HadoopShims SHIMS = ShimLoader.getHadoopShims();
-
-  private static class DefaultDataReader implements DataReader {
-    private FSDataInputStream file = null;
-    private final ByteBufferAllocatorPool pool;
-    private ZeroCopyReaderShim zcr = null;
-    private final FileSystem fs;
-    private final Path path;
-    private final boolean useZeroCopy;
-    private final CompressionCodec codec;
-    private final int bufferSize;
-    private final int typeCount;
-
-    private DefaultDataReader(DefaultDataReader other) {
-      this.pool = other.pool;
-      this.zcr = other.zcr;
-      this.bufferSize = other.bufferSize;
-      this.typeCount = other.typeCount;
-      this.fs = other.fs;
-      this.path = other.path;
-      this.useZeroCopy = other.useZeroCopy;
-      this.codec = other.codec;
-    }
-
-    private DefaultDataReader(DataReaderProperties properties) {
-      this.fs = properties.getFileSystem();
-      this.path = properties.getPath();
-      this.useZeroCopy = properties.getZeroCopy();
-      this.codec = WriterImpl.createCodec(properties.getCompression());
-      this.bufferSize = properties.getBufferSize();
-      this.typeCount = properties.getTypeCount();
-      if (useZeroCopy) {
-        this.pool = new ByteBufferAllocatorPool();
-      } else {
-        this.pool = null;
-      }
-    }
-
-    @Override
-    public void open() throws IOException {
-      this.file = fs.open(path);
-      if (useZeroCopy) {
-        zcr = RecordReaderUtils.createZeroCopyShim(file, codec, pool);
-      } else {
-        zcr = null;
-      }
-    }
-
-    @Override
-    public OrcIndex readRowIndex(StripeInformation stripe,
-                                 OrcProto.StripeFooter footer,
-                                 boolean[] included,
-                                 OrcProto.RowIndex[] indexes,
-                                 boolean[] sargColumns,
-                                 OrcProto.BloomFilterIndex[] bloomFilterIndices
-                                 ) throws IOException {
-      if (file == null) {
-        open();
-      }
-      if (footer == null) {
-        footer = readStripeFooter(stripe);
-      }
-      if (indexes == null) {
-        indexes = new OrcProto.RowIndex[typeCount];
-      }
-      if (bloomFilterIndices == null) {
-        bloomFilterIndices = new OrcProto.BloomFilterIndex[typeCount];
-      }
-      long offset = stripe.getOffset();
-      List<OrcProto.Stream> streams = footer.getStreamsList();
-      for (int i = 0; i < streams.size(); i++) {
-        OrcProto.Stream stream = streams.get(i);
-        OrcProto.Stream nextStream = null;
-        if (i < streams.size() - 1) {
-          nextStream = streams.get(i+1);
-        }
-        int col = stream.getColumn();
-        int len = (int) stream.getLength();
-        // Row index and bloom filter streams are interleaved; check whether the sarg column
-        // has a bloom filter and combine the I/O to read both streams for that column together.
-        if (stream.hasKind() && (stream.getKind() == OrcProto.Stream.Kind.ROW_INDEX)) {
-          boolean readBloomFilter = false;
-          if (sargColumns != null && sargColumns[col] &&
-              nextStream.getKind() == OrcProto.Stream.Kind.BLOOM_FILTER) {
-            len += nextStream.getLength();
-            i += 1;
-            readBloomFilter = true;
-          }
-          if ((included == null || included[col]) && indexes[col] == null) {
-            byte[] buffer = new byte[len];
-            file.readFully(offset, buffer, 0, buffer.length);
-            ByteBuffer bb = ByteBuffer.wrap(buffer);
-            indexes[col] = OrcProto.RowIndex.parseFrom(InStream.create("index",
-                Lists.<DiskRange>newArrayList(new BufferChunk(bb, 0)), stream.getLength(),
-                codec, bufferSize));
-            if (readBloomFilter) {
-              bb.position((int) stream.getLength());
-              bloomFilterIndices[col] = OrcProto.BloomFilterIndex.parseFrom(InStream.create(
-                  "bloom_filter", Lists.<DiskRange>newArrayList(new BufferChunk(bb, 0)),
-                  nextStream.getLength(), codec, bufferSize));
-            }
-          }
-        }
-        offset += len;
-      }
-
-      OrcIndex index = new OrcIndex(indexes, bloomFilterIndices);
-      return index;
-    }
-
-    @Override
-    public OrcProto.StripeFooter readStripeFooter(StripeInformation stripe) throws IOException {
-      if (file == null) {
-        open();
-      }
-      long offset = stripe.getOffset() + stripe.getIndexLength() + stripe.getDataLength();
-      int tailLength = (int) stripe.getFooterLength();
-
-      // read the footer
-      ByteBuffer tailBuf = ByteBuffer.allocate(tailLength);
-      file.readFully(offset, tailBuf.array(), tailBuf.arrayOffset(), tailLength);
-      return OrcProto.StripeFooter.parseFrom(InStream.createCodedInputStream("footer",
-          Lists.<DiskRange>newArrayList(new BufferChunk(tailBuf, 0)),
-          tailLength, codec, bufferSize));
-    }
-
-    @Override
-    public DiskRangeList readFileData(
-        DiskRangeList range, long baseOffset, boolean doForceDirect) throws IOException {
-      return RecordReaderUtils.readDiskRanges(file, zcr, baseOffset, range, doForceDirect);
-    }
-
-    @Override
-    public void close() throws IOException {
-      if (file != null) {
-        file.close();
-      }
-      if (pool != null) {
-        pool.clear();
-      }
-    }
-
-    @Override
-    public boolean isTrackingDiskRanges() {
-      return zcr != null;
-    }
-
-    @Override
-    public void releaseBuffer(ByteBuffer buffer) {
-      zcr.releaseBuffer(buffer);
-    }
-
-    @Override
-    public DataReader clone() {
-      return new DefaultDataReader(this);
-    }
-
-  }
-
-  public static DataReader createDefaultDataReader(DataReaderProperties properties) {
-    return new DefaultDataReader(properties);
-  }
-
-  public static boolean[] findPresentStreamsByColumn(
-      List<OrcProto.Stream> streamList, List<OrcProto.Type> types) {
-    boolean[] hasNull = new boolean[types.size()];
-    for(OrcProto.Stream stream: streamList) {
-      if (stream.hasKind() && (stream.getKind() == OrcProto.Stream.Kind.PRESENT)) {
-        hasNull[stream.getColumn()] = true;
-      }
-    }
-    return hasNull;
-  }
-
-  /**
-   * Does region A overlap region B? The end points are inclusive on both sides.
-   * @param leftA A's left point
-   * @param rightA A's right point
-   * @param leftB B's left point
-   * @param rightB B's right point
-   * @return Does region A overlap region B?
-   */
-  static boolean overlap(long leftA, long rightA, long leftB, long rightB) {
-    if (leftA <= leftB) {
-      return rightA >= leftB;
-    }
-    return rightB >= leftA;
-  }
-
-  public static void addEntireStreamToRanges(
-      long offset, long length, CreateHelper list, boolean doMergeBuffers) {
-    list.addOrMerge(offset, offset + length, doMergeBuffers, false);
-  }
-
-  public static void addRgFilteredStreamToRanges(OrcProto.Stream stream,
-      boolean[] includedRowGroups, boolean isCompressed, OrcProto.RowIndex index,
-      OrcProto.ColumnEncoding encoding, OrcProto.Type type, int compressionSize, boolean hasNull,
-      long offset, long length, CreateHelper list, boolean doMergeBuffers) {
-    for (int group = 0; group < includedRowGroups.length; ++group) {
-      if (!includedRowGroups[group]) continue;
-      int posn = getIndexPosition(
-          encoding.getKind(), type.getKind(), stream.getKind(), isCompressed, hasNull);
-      long start = index.getEntry(group).getPositions(posn);
-      final long nextGroupOffset;
-      boolean isLast = group == (includedRowGroups.length - 1);
-      nextGroupOffset = isLast ? length : index.getEntry(group + 1).getPositions(posn);
-
-      start += offset;
-      long end = offset + estimateRgEndOffset(
-          isCompressed, isLast, nextGroupOffset, length, compressionSize);
-      list.addOrMerge(start, end, doMergeBuffers, true);
-    }
-  }
-
-  public static long estimateRgEndOffset(boolean isCompressed, boolean isLast,
-      long nextGroupOffset, long streamLength, int bufferSize) {
-    // Figure out the worst-case last location.
-    // If adjacent groups have the same compressed block offset, then stretch the slop
-    // by a factor of 2 to safely accommodate the next compression block:
-    // one for the current compression block and another for the next compression block.
-    long slop = isCompressed ? 2 * (OutStream.HEADER_SIZE + bufferSize) : WORST_UNCOMPRESSED_SLOP;
-    return isLast ? streamLength : Math.min(streamLength, nextGroupOffset + slop);
-  }
-
-  private static final int BYTE_STREAM_POSITIONS = 1;
-  private static final int RUN_LENGTH_BYTE_POSITIONS = BYTE_STREAM_POSITIONS + 1;
-  private static final int BITFIELD_POSITIONS = RUN_LENGTH_BYTE_POSITIONS + 1;
-  private static final int RUN_LENGTH_INT_POSITIONS = BYTE_STREAM_POSITIONS + 1;
-
-  /**
-   * Get the offset within the column's index positions at which the given
-   * stream's positions start.
-   * @param columnEncoding the encoding of the column
-   * @param columnType the type of the column
-   * @param streamType the kind of the stream
-   * @param isCompressed is the file compressed
-   * @param hasNulls does the column have a PRESENT stream?
-   * @return the offset of the stream's first position within the index entry
-   */
-  public static int getIndexPosition(OrcProto.ColumnEncoding.Kind columnEncoding,
-                              OrcProto.Type.Kind columnType,
-                              OrcProto.Stream.Kind streamType,
-                              boolean isCompressed,
-                              boolean hasNulls) {
-    if (streamType == OrcProto.Stream.Kind.PRESENT) {
-      return 0;
-    }
-    int compressionValue = isCompressed ? 1 : 0;
-    int base = hasNulls ? (BITFIELD_POSITIONS + compressionValue) : 0;
-    switch (columnType) {
-      case BOOLEAN:
-      case BYTE:
-      case SHORT:
-      case INT:
-      case LONG:
-      case FLOAT:
-      case DOUBLE:
-      case DATE:
-      case STRUCT:
-      case MAP:
-      case LIST:
-      case UNION:
-        return base;
-      case CHAR:
-      case VARCHAR:
-      case STRING:
-        if (columnEncoding == OrcProto.ColumnEncoding.Kind.DICTIONARY ||
-            columnEncoding == OrcProto.ColumnEncoding.Kind.DICTIONARY_V2) {
-          return base;
-        } else {
-          if (streamType == OrcProto.Stream.Kind.DATA) {
-            return base;
-          } else {
-            return base + BYTE_STREAM_POSITIONS + compressionValue;
-          }
-        }
-      case BINARY:
-        if (streamType == OrcProto.Stream.Kind.DATA) {
-          return base;
-        }
-        return base + BYTE_STREAM_POSITIONS + compressionValue;
-      case DECIMAL:
-        if (streamType == OrcProto.Stream.Kind.DATA) {
-          return base;
-        }
-        return base + BYTE_STREAM_POSITIONS + compressionValue;
-      case TIMESTAMP:
-        if (streamType == OrcProto.Stream.Kind.DATA) {
-          return base;
-        }
-        return base + RUN_LENGTH_INT_POSITIONS + compressionValue;
-      default:
-        throw new IllegalArgumentException("Unknown type " + columnType);
-    }
-  }
-
-  // For uncompressed streams, the worst-case overlap into the following set
-  // of rows (a long vint literal group).
-  static final int WORST_UNCOMPRESSED_SLOP = 2 + 8 * 512;
-
-  /**
-   * Is this stream part of a dictionary?
-   * @return is this part of a dictionary?
-   */
-  public static boolean isDictionary(OrcProto.Stream.Kind kind,
-                              OrcProto.ColumnEncoding encoding) {
-    assert kind != OrcProto.Stream.Kind.DICTIONARY_COUNT;
-    OrcProto.ColumnEncoding.Kind encodingKind = encoding.getKind();
-    return kind == OrcProto.Stream.Kind.DICTIONARY_DATA ||
-      (kind == OrcProto.Stream.Kind.LENGTH &&
-       (encodingKind == OrcProto.ColumnEncoding.Kind.DICTIONARY ||
-        encodingKind == OrcProto.ColumnEncoding.Kind.DICTIONARY_V2));
-  }
-
-  /**
-   * Build a string representation of a list of disk ranges.
-   * @param range ranges to stringify
-   * @return the resulting string
-   */
-  public static String stringifyDiskRanges(DiskRangeList range) {
-    StringBuilder buffer = new StringBuilder();
-    buffer.append("[");
-    boolean isFirst = true;
-    while (range != null) {
-      if (!isFirst) {
-        buffer.append(", {");
-      } else {
-        buffer.append("{");
-      }
-      isFirst = false;
-      buffer.append(range.toString());
-      buffer.append("}");
-      range = range.next;
-    }
-    buffer.append("]");
-    return buffer.toString();
-  }
-
-  /**
-   * Read the list of ranges from the file.
-   * @param file the file to read
-   * @param base the base of the stripe
-   * @param range the disk ranges within the stripe to read
-   * @return the bytes read for each disk range, which is the same length as
-   *    ranges
-   * @throws IOException
-   */
-  static DiskRangeList readDiskRanges(FSDataInputStream file,
-                                 ZeroCopyReaderShim zcr,
-                                 long base,
-                                 DiskRangeList range,
-                                 boolean doForceDirect) throws IOException {
-    if (range == null) return null;
-    DiskRangeList prev = range.prev;
-    if (prev == null) {
-      prev = new MutateHelper(range);
-    }
-    while (range != null) {
-      if (range.hasData()) {
-        range = range.next;
-        continue;
-      }
-      int len = (int) (range.getEnd() - range.getOffset());
-      long off = range.getOffset();
-      if (zcr != null) {
-        file.seek(base + off);
-        boolean hasReplaced = false;
-        while (len > 0) {
-          ByteBuffer partial = zcr.readBuffer(len, false);
-          BufferChunk bc = new BufferChunk(partial, off);
-          if (!hasReplaced) {
-            range.replaceSelfWith(bc);
-            hasReplaced = true;
-          } else {
-            range.insertAfter(bc);
-          }
-          range = bc;
-          int read = partial.remaining();
-          len -= read;
-          off += read;
-        }
-      } else {
-        // Don't use HDFS ByteBuffer API because it has no readFully, and is buggy and pointless.
-        byte[] buffer = new byte[len];
-        file.readFully((base + off), buffer, 0, buffer.length);
-        ByteBuffer bb = null;
-        if (doForceDirect) {
-          bb = ByteBuffer.allocateDirect(len);
-          bb.put(buffer);
-          bb.position(0);
-          bb.limit(len);
-        } else {
-          bb = ByteBuffer.wrap(buffer);
-        }
-        range = range.replaceSelfWith(new BufferChunk(bb, range.getOffset()));
-      }
-      range = range.next;
-    }
-    return prev.next;
-  }
-
-
-  static List<DiskRange> getStreamBuffers(DiskRangeList range, long offset, long length) {
-    // This assumes sorted ranges (as do many other parts of the ORC code).
-    ArrayList<DiskRange> buffers = new ArrayList<DiskRange>();
-    if (length == 0) return buffers;
-    long streamEnd = offset + length;
-    boolean inRange = false;
-    while (range != null) {
-      if (!inRange) {
-        if (range.getEnd() <= offset) {
-          range = range.next;
-          continue; // Skip until we are in range.
-        }
-        inRange = true;
-        if (range.getOffset() < offset) {
-          // Partial first buffer, add a slice of it.
-          buffers.add(range.sliceAndShift(offset, Math.min(streamEnd, range.getEnd()), -offset));
-          if (range.getEnd() >= streamEnd) break; // Partial first buffer is also partial last buffer.
-          range = range.next;
-          continue;
-        }
-      } else if (range.getOffset() >= streamEnd) {
-        break;
-      }
-      if (range.getEnd() > streamEnd) {
-        // Partial last buffer (may also be the first buffer), add a slice of it.
-        buffers.add(range.sliceAndShift(range.getOffset(), streamEnd, -offset));
-        break;
-      }
-      // Buffer that belongs entirely to one stream.
-      // TODO: ideally we would want to reuse the object and remove it from the list, but we cannot
-      //       because bufferChunks is also used by clearStreams for zcr. Create a useless dup.
-      buffers.add(range.sliceAndShift(range.getOffset(), range.getEnd(), -offset));
-      if (range.getEnd() == streamEnd) break;
-      range = range.next;
-    }
-    return buffers;
-  }
-
-  static ZeroCopyReaderShim createZeroCopyShim(FSDataInputStream file,
-      CompressionCodec codec, ByteBufferAllocatorPool pool) throws IOException {
-    if ((codec == null || ((codec instanceof DirectDecompressionCodec)
-            && ((DirectDecompressionCodec) codec).isAvailable()))) {
-      /* codec is null or is available */
-      return ShimLoader.getHadoopShims().getZeroCopyReader(file, pool);
-    }
-    return null;
-  }
-
-  // this is an implementation copied from ElasticByteBufferPool in hadoop-2,
-  // which lacks a clear()/clean() operation
-  public final static class ByteBufferAllocatorPool implements ByteBufferPoolShim {
-    private static final class Key implements Comparable<Key> {
-      private final int capacity;
-      private final long insertionGeneration;
-
-      Key(int capacity, long insertionGeneration) {
-        this.capacity = capacity;
-        this.insertionGeneration = insertionGeneration;
-      }
-
-      @Override
-      public int compareTo(Key other) {
-        return ComparisonChain.start().compare(capacity, other.capacity)
-            .compare(insertionGeneration, other.insertionGeneration).result();
-      }
-
-      @Override
-      public boolean equals(Object rhs) {
-        if (rhs == null) {
-          return false;
-        }
-        try {
-          Key o = (Key) rhs;
-          return (compareTo(o) == 0);
-        } catch (ClassCastException e) {
-          return false;
-        }
-      }
-
-      @Override
-      public int hashCode() {
-        return new HashCodeBuilder().append(capacity).append(insertionGeneration)
-            .toHashCode();
-      }
-    }
-
-    private final TreeMap<Key, ByteBuffer> buffers = new TreeMap<Key, ByteBuffer>();
-
-    private final TreeMap<Key, ByteBuffer> directBuffers = new TreeMap<Key, ByteBuffer>();
-
-    private long currentGeneration = 0;
-
-    private final TreeMap<Key, ByteBuffer> getBufferTree(boolean direct) {
-      return direct ? directBuffers : buffers;
-    }
-
-    public void clear() {
-      buffers.clear();
-      directBuffers.clear();
-    }
-
-    @Override
-    public ByteBuffer getBuffer(boolean direct, int length) {
-      TreeMap<Key, ByteBuffer> tree = getBufferTree(direct);
-      Map.Entry<Key, ByteBuffer> entry = tree.ceilingEntry(new Key(length, 0));
-      if (entry == null) {
-        return direct ? ByteBuffer.allocateDirect(length) : ByteBuffer
-            .allocate(length);
-      }
-      tree.remove(entry.getKey());
-      return entry.getValue();
-    }
-
-    @Override
-    public void putBuffer(ByteBuffer buffer) {
-      TreeMap<Key, ByteBuffer> tree = getBufferTree(buffer.isDirect());
-      while (true) {
-        Key key = new Key(buffer.capacity(), currentGeneration++);
-        if (!tree.containsKey(key)) {
-          tree.put(key, buffer);
-          return;
-        }
-        // Buffers are indexed by (capacity, generation).
-        // If our key is not unique on the first try, bump the generation and try again.
-      }
-    }
-  }
-}
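
The range-merging helpers above boil down to two small pieces of arithmetic: the
inclusive-endpoint overlap test and the worst-case row-group end estimate. A minimal,
self-contained sketch restating them follows; the class and method names are
illustrative only, and the 3-byte compression-block header is an assumption standing
in for OutStream.HEADER_SIZE.

public final class RangeMathSketch {
  // Assumed 3-byte compression block header (stands in for OutStream.HEADER_SIZE).
  private static final int HEADER_SIZE = 3;
  // Worst-case overrun for uncompressed streams (a long vint literal group),
  // mirroring RecordReaderUtils.WORST_UNCOMPRESSED_SLOP above.
  private static final int WORST_UNCOMPRESSED_SLOP = 2 + 8 * 512;

  // Inclusive-endpoint overlap test, same logic as RecordReaderUtils.overlap().
  static boolean overlaps(long leftA, long rightA, long leftB, long rightB) {
    return leftA <= leftB ? rightA >= leftB : rightB >= leftA;
  }

  // Worst-case end offset of a row group, same logic as estimateRgEndOffset():
  // compressed streams get slop for two full compression blocks past the next
  // group's recorded start; the last group always runs to the end of the stream.
  static long estimateEnd(boolean isCompressed, boolean isLast,
                          long nextGroupOffset, long streamLength, int bufferSize) {
    long slop = isCompressed ? 2L * (HEADER_SIZE + bufferSize) : WORST_UNCOMPRESSED_SLOP;
    return isLast ? streamLength : Math.min(streamLength, nextGroupOffset + slop);
  }

  public static void main(String[] args) {
    // [0, 10] and [10, 20] share a single inclusive endpoint, so they overlap.
    System.out.println(overlaps(0, 10, 10, 20));                                // true
    // 256 KiB compression buffers: slop = 2 * (3 + 262144) = 524294 bytes.
    System.out.println(estimateEnd(true, false, 1_000_000, 5_000_000, 262144)); // 1524294
  }
}

As a worked example of the index layout, getIndexPosition() above returns
4 + 1 + 1 = 6 for the LENGTH stream of a DIRECT-encoded STRING column in a
compressed file with a PRESENT stream: a base of BITFIELD_POSITIONS + 1, plus
BYTE_STREAM_POSITIONS, plus one more for compression.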

http://git-wip-us.apache.org/repos/asf/hive/blob/ffb79509/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SchemaEvolution.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SchemaEvolution.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SchemaEvolution.java
deleted file mode 100644
index 046665b..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SchemaEvolution.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.io.orc;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.orc.TypeDescription;
-
-/**
- * Take the file types and the (optional) configuration column names/types and see if there
- * has been schema evolution.
- */
-public class SchemaEvolution {
-  private final Map<TypeDescription, TypeDescription> readerToFile;
-  private final boolean[] included;
-  private final TypeDescription readerSchema;
-  private static final Log LOG = LogFactory.getLog(SchemaEvolution.class);
-
-  public SchemaEvolution(TypeDescription readerSchema, boolean[] included) {
-    this.included = included;
-    readerToFile = null;
-    this.readerSchema = readerSchema;
-  }
-
-  public SchemaEvolution(TypeDescription fileSchema,
-                         TypeDescription readerSchema,
-                         boolean[] included) throws IOException {
-    readerToFile = new HashMap<>(readerSchema.getMaximumId() + 1);
-    this.included = included;
-    if (checkAcidSchema(fileSchema)) {
-      this.readerSchema = createEventSchema(readerSchema);
-    } else {
-      this.readerSchema = readerSchema;
-    }
-    buildMapping(fileSchema, this.readerSchema);
-  }
-
-  public TypeDescription getReaderSchema() {
-    return readerSchema;
-  }
-
-  public TypeDescription getFileType(TypeDescription readerType) {
-    TypeDescription result;
-    if (readerToFile == null) {
-      if (included == null || included[readerType.getId()]) {
-        result = readerType;
-      } else {
-        result = null;
-      }
-    } else {
-      result = readerToFile.get(readerType);
-    }
-    return result;
-  }
-
-  void buildMapping(TypeDescription fileType,
-                    TypeDescription readerType) throws IOException {
-    // if the column isn't included, don't map it
-    if (included != null && !included[readerType.getId()]) {
-      return;
-    }
-    boolean isOk = true;
-    // check the easy case first
-    if (fileType.getCategory() == readerType.getCategory()) {
-      switch (readerType.getCategory()) {
-        case BOOLEAN:
-        case BYTE:
-        case SHORT:
-        case INT:
-        case LONG:
-        case DOUBLE:
-        case FLOAT:
-        case STRING:
-        case TIMESTAMP:
-        case BINARY:
-        case DATE:
-          // these are always a match
-          break;
-        case CHAR:
-        case VARCHAR:
-          // HIVE-13648: Look at ORC data type conversion edge cases (CHAR, VARCHAR, DECIMAL)
-          isOk = fileType.getMaxLength() == readerType.getMaxLength();
-          break;
-        case DECIMAL:
-          // HIVE-13648: Look at ORC data type conversion edge cases (CHAR, VARCHAR, DECIMAL)
-          // TODO we don't enforce scale and precision checks, but probably should
-          break;
-        case UNION:
-        case MAP:
-        case LIST: {
-          // these must be an exact match
-          List<TypeDescription> fileChildren = fileType.getChildren();
-          List<TypeDescription> readerChildren = readerType.getChildren();
-          if (fileChildren.size() == readerChildren.size()) {
-            for(int i=0; i < fileChildren.size(); ++i) {
-              buildMapping(fileChildren.get(i), readerChildren.get(i));
-            }
-          } else {
-            isOk = false;
-          }
-          break;
-        }
-        case STRUCT: {
-          // allow either side to have fewer fields than the other
-          List<TypeDescription> fileChildren = fileType.getChildren();
-          List<TypeDescription> readerChildren = readerType.getChildren();
-          int jointSize = Math.min(fileChildren.size(), readerChildren.size());
-          for(int i=0; i < jointSize; ++i) {
-            buildMapping(fileChildren.get(i), readerChildren.get(i));
-          }
-          break;
-        }
-        default:
-          throw new IllegalArgumentException("Unknown type " + readerType);
-      }
-    } else {
-      /*
-       * Check for the few cases where we will not convert.
-       */
-
-      isOk = ConvertTreeReaderFactory.canConvert(fileType, readerType);
-    }
-    if (isOk) {
-      readerToFile.put(readerType, fileType);
-    } else {
-      throw new IOException(
-          String.format(
-              "ORC does not support type conversion from file type %s (%d) to reader type %s (%d)",
-              fileType.toString(), fileType.getId(),
-              readerType.toString(), readerType.getId()));
-    }
-  }
-
-  private static boolean checkAcidSchema(TypeDescription type) {
-    if (type.getCategory().equals(TypeDescription.Category.STRUCT)) {
-      List<String> rootFields = type.getFieldNames();
-      if (acidEventFieldNames.equals(rootFields)) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  /**
-   * @param typeDescr the type description of the row
-   * @return ORC types for the ACID event based on the row's type description
-   */
-  public static TypeDescription createEventSchema(TypeDescription typeDescr) {
-    TypeDescription result = TypeDescription.createStruct()
-        .addField("operation", TypeDescription.createInt())
-        .addField("originalTransaction", TypeDescription.createLong())
-        .addField("bucket", TypeDescription.createInt())
-        .addField("rowId", TypeDescription.createLong())
-        .addField("currentTransaction", TypeDescription.createLong())
-        .addField("row", typeDescr.clone());
-    return result;
-  }
-
-  public static final List<String> acidEventFieldNames= new ArrayList<String>();
-  static {
-    acidEventFieldNames.add("operation");
-    acidEventFieldNames.add("originalTransaction");
-    acidEventFieldNames.add("bucket");
-    acidEventFieldNames.add("rowId");
-    acidEventFieldNames.add("currentTransaction");
-    acidEventFieldNames.add("row");
-  }
-}
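
For reference, the ACID event wrapper that SchemaEvolution.createEventSchema() builds
can be previewed with the public org.apache.orc.TypeDescription API alone. A minimal
sketch follows; the row fields ("id", "name") and the class name are made up for
illustration.

import org.apache.orc.TypeDescription;

public class AcidEventSchemaSketch {
  public static void main(String[] args) {
    // A made-up row schema for illustration.
    TypeDescription row = TypeDescription.createStruct()
        .addField("id", TypeDescription.createLong())
        .addField("name", TypeDescription.createString());
    // Wrap it with the same six fields, in the same order, as acidEventFieldNames above.
    TypeDescription event = TypeDescription.createStruct()
        .addField("operation", TypeDescription.createInt())
        .addField("originalTransaction", TypeDescription.createLong())
        .addField("bucket", TypeDescription.createInt())
        .addField("rowId", TypeDescription.createLong())
        .addField("currentTransaction", TypeDescription.createLong())
        .addField("row", row.clone());
    // Prints the ORC type string, e.g.
    // struct<operation:int,originalTransaction:bigint,bucket:int,rowId:bigint,
    //   currentTransaction:bigint,row:struct<id:bigint,name:string>>
    System.out.println(event);
  }
}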