You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by om...@apache.org on 2016/08/01 23:48:19 UTC

[2/2] orc git commit: HIVE-14310. ORC schema evolutions should not completely disable PPD (Prasanth Jayachandran reviewed by Owen O'Malley)

HIVE-14310. ORC schema evolutions should not completely disable PPD
 (Prasanth Jayachandran reviewed by Owen O'Malley)

Signed-off-by: Owen O'Malley <om...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/orc/repo
Commit: http://git-wip-us.apache.org/repos/asf/orc/commit/14b818ad
Tree: http://git-wip-us.apache.org/repos/asf/orc/tree/14b818ad
Diff: http://git-wip-us.apache.org/repos/asf/orc/diff/14b818ad

Branch: refs/heads/master
Commit: 14b818adc9ee2cbb04aef77490cdd2f9cb5c45da
Parents: 90a0cf8
Author: Owen O'Malley <om...@apache.org>
Authored: Mon Aug 1 16:31:13 2016 -0700
Committer: Owen O'Malley <om...@apache.org>
Committed: Mon Aug 1 16:40:29 2016 -0700

----------------------------------------------------------------------
 .../orc/impl/ConvertTreeReaderFactory.java      |   7 +-
 .../org/apache/orc/impl/RecordReaderImpl.java   |  32 +-
 .../org/apache/orc/impl/SchemaEvolution.java    | 150 +++++-
 .../apache/orc/impl/TestSchemaEvolution.java    | 469 +++++++++++++++++++
 4 files changed, 610 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/orc/blob/14b818ad/java/core/src/java/org/apache/orc/impl/ConvertTreeReaderFactory.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/ConvertTreeReaderFactory.java b/java/core/src/java/org/apache/orc/impl/ConvertTreeReaderFactory.java
index 87f379f..9290f5c 100644
--- a/java/core/src/java/org/apache/orc/impl/ConvertTreeReaderFactory.java
+++ b/java/core/src/java/org/apache/orc/impl/ConvertTreeReaderFactory.java
@@ -1349,8 +1349,6 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
 
     private DecimalTreeReader decimalTreeReader;
 
-    private final TypeDescription fileType;
-    private final TypeDescription readerType;
     private DecimalColumnVector fileDecimalColVector;
     private int filePrecision;
     private int fileScale;
@@ -1361,10 +1359,8 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
     DecimalFromDecimalTreeReader(int columnId, TypeDescription fileType, TypeDescription readerType)
         throws IOException {
       super(columnId);
-      this.fileType = fileType;
       filePrecision = fileType.getPrecision();
       fileScale = fileType.getScale();
-      this.readerType = readerType;
       readerPrecision = readerType.getPrecision();
       readerScale = readerType.getScale();
       decimalTreeReader = new DecimalTreeReader(columnId, filePrecision, fileScale);
@@ -2791,8 +2787,7 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
     }
   }
 
-  public static boolean canConvert(TypeDescription fileType, TypeDescription readerType)
-    throws IOException {
+  public static boolean canConvert(TypeDescription fileType, TypeDescription readerType) {
 
     Category readerTypeCategory = readerType.getCategory();
 

http://git-wip-us.apache.org/repos/asf/orc/blob/14b818ad/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
index a052ca5..7aa6e71 100644
--- a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
@@ -44,8 +44,6 @@ import org.apache.orc.StripeInformation;
 import org.apache.orc.TimestampColumnStatistics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.io.DiskRange;
 import org.apache.hadoop.hive.common.io.DiskRangeList;
@@ -164,21 +162,11 @@ public class RecordReaderImpl implements RecordReader {
     this.bufferSize = fileReader.bufferSize;
     this.rowIndexStride = fileReader.rowIndexStride;
     SearchArgument sarg = options.getSearchArgument();
-    // We want to use the sarg for predicate evaluation but we have data type
-    // conversion (i.e Schema Evolution), so we currently ignore it.
-    if (sarg != null && rowIndexStride != 0 && !evolution.hasConversion()) {
-      sargApp = new SargApplier(
-          sarg, options.getColumnNames(), rowIndexStride, types,
-          included.length);
+    if (sarg != null && rowIndexStride != 0) {
+      sargApp = new SargApplier(sarg, options.getColumnNames(), rowIndexStride,
+          included.length, evolution);
     } else {
       sargApp = null;
-      if (evolution.hasConversion()) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Skipping stripe elimination for {} since the" +
-                    " schema has data type conversion",
-                    fileReader.path);
-        }
-      }
     }
     long rows = 0;
     long skippedRows = 0;
@@ -720,9 +708,10 @@ public class RecordReaderImpl implements RecordReader {
     private final long rowIndexStride;
     // same as the above array, but indices are set to true
     private final boolean[] sargColumns;
+    private SchemaEvolution evolution;
 
     public SargApplier(SearchArgument sarg, String[] columnNames, long rowIndexStride,
-        List<OrcProto.Type> types, int includedCount) {
+        int includedCount, final SchemaEvolution evolution) {
       this.sarg = sarg;
       sargLeaves = sarg.getLeaves();
       filterColumns = mapSargColumnsToOrcInternalColIdx(sargLeaves, columnNames, 0);
@@ -735,6 +724,7 @@ public class RecordReaderImpl implements RecordReader {
           sargColumns[i] = true;
         }
       }
+      this.evolution = evolution;
     }
 
     /**
@@ -764,10 +754,14 @@ public class RecordReaderImpl implements RecordReader {
             }
             OrcProto.ColumnStatistics stats = entry.getStatistics();
             OrcProto.BloomFilter bf = null;
-            if (bloomFilterIndices != null && bloomFilterIndices[filterColumns[pred]] != null) {
-              bf = bloomFilterIndices[filterColumns[pred]].getBloomFilter(rowGroup);
+            if (bloomFilterIndices != null && bloomFilterIndices[columnIx] != null) {
+              bf = bloomFilterIndices[columnIx].getBloomFilter(rowGroup);
+            }
+            if (evolution != null && evolution.isPPDSafeConversion(columnIx)) {
+              leafValues[pred] = evaluatePredicateProto(stats, sargLeaves.get(pred), bf);
+            } else {
+              leafValues[pred] = TruthValue.YES_NO_NULL;
             }
-            leafValues[pred] = evaluatePredicateProto(stats, sargLeaves.get(pred), bf);
             if (LOG.isTraceEnabled()) {
               LOG.trace("Stats = " + stats);
               LOG.trace("Setting " + sargLeaves.get(pred) + " to " + leafValues[pred]);

http://git-wip-us.apache.org/repos/asf/orc/blob/14b818ad/java/core/src/java/org/apache/orc/impl/SchemaEvolution.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/SchemaEvolution.java b/java/core/src/java/org/apache/orc/impl/SchemaEvolution.java
index 849ce0f..fd5c7c1 100644
--- a/java/core/src/java/org/apache/orc/impl/SchemaEvolution.java
+++ b/java/core/src/java/org/apache/orc/impl/SchemaEvolution.java
@@ -18,7 +18,6 @@
 
 package org.apache.orc.impl;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -32,35 +31,41 @@ import org.slf4j.LoggerFactory;
  * see if there has been schema evolution.
  */
 public class SchemaEvolution {
+  // indexed by reader column id
   private final TypeDescription[] readerFileTypes;
+  // indexed by reader column id
   private final boolean[] included;
+  private final TypeDescription fileSchema;
   private final TypeDescription readerSchema;
   private boolean hasConversion = false;
-  private static final Logger LOG =
-    LoggerFactory.getLogger(SchemaEvolution.class);
+  // indexed by reader column id
+  private final boolean[] ppdSafeConversion;
 
-  public SchemaEvolution(TypeDescription readerSchema, boolean[] included) {
-    this.included = (included == null ? null : Arrays.copyOf(included,
-                                                             included.length));
-    this.readerSchema = readerSchema;
-
-    readerFileTypes = new TypeDescription[this.readerSchema.getMaximumId()+ 1];
-    buildSameSchemaFileTypesArray();
+  public SchemaEvolution(TypeDescription fileSchema, boolean[] includedCols) {
+    this(fileSchema, null, includedCols);
   }
 
   public SchemaEvolution(TypeDescription fileSchema,
                          TypeDescription readerSchema,
-                         boolean[] included) throws IOException {
-    this.included = (included == null ? null : Arrays.copyOf(included,
-                                                             included.length));
-    if (checkAcidSchema(fileSchema)) {
-      this.readerSchema = createEventSchema(readerSchema);
+                         boolean[] includedCols) {
+    this.included = includedCols == null ? null :
+      Arrays.copyOf(includedCols, includedCols.length);
+    this.hasConversion = false;
+    this.fileSchema = fileSchema;
+    if (readerSchema != null) {
+      if (checkAcidSchema(fileSchema)) {
+        this.readerSchema = createEventSchema(readerSchema);
+      } else {
+        this.readerSchema = readerSchema;
+      }
+      this.readerFileTypes = new TypeDescription[this.readerSchema.getMaximumId() + 1];
+      buildConversionFileTypesArray(fileSchema, this.readerSchema);
     } else {
-      this.readerSchema = readerSchema;
+      this.readerSchema = fileSchema;
+      this.readerFileTypes = new TypeDescription[this.readerSchema.getMaximumId() + 1];
+      buildSameSchemaFileTypesArray();
     }
-
-    readerFileTypes = new TypeDescription[this.readerSchema.getMaximumId()+ 1];
-    buildConversionFileTypesArray(fileSchema, this.readerSchema);
+    this.ppdSafeConversion = populatePpdSafeConversion();
   }
 
   public TypeDescription getReaderSchema() {
@@ -81,16 +86,114 @@ public class SchemaEvolution {
 
   /**
    * Get the file type by reader type id.
-   * @param readerType
+   * @param id reader column id
    * @return
    */
   public TypeDescription getFileType(int id) {
     return readerFileTypes[id];
   }
 
+  /**
+   * Check if column is safe for ppd evaluation
+   * @param colId reader column id
+   * @return true if the specified column is safe for ppd evaluation else false
+   */
+  public boolean isPPDSafeConversion(final int colId) {
+    if (hasConversion()) {
+      if (colId < 0 || colId >= ppdSafeConversion.length) {
+        return false;
+      }
+      return ppdSafeConversion[colId];
+    }
+
+    // when there is no schema evolution PPD is safe
+    return true;
+  }
+
+  private boolean[] populatePpdSafeConversion() {
+    if (fileSchema == null || readerSchema == null || readerFileTypes == null) {
+      return null;
+    }
+
+    boolean[] result = new boolean[readerSchema.getMaximumId() + 1];
+    boolean safePpd = validatePPDConversion(fileSchema, readerSchema);
+    result[readerSchema.getId()] = safePpd;
+    List<TypeDescription> children = readerSchema.getChildren();
+    if (children != null) {
+      for (TypeDescription child : children) {
+        TypeDescription fileType = getFileType(child.getId());
+        safePpd = validatePPDConversion(fileType, child);
+        result[child.getId()] = safePpd;
+      }
+    }
+    return result;
+  }
+
+  private boolean validatePPDConversion(final TypeDescription fileType,
+      final TypeDescription readerType) {
+    if (fileType == null) {
+      return false;
+    }
+    if (fileType.getCategory().isPrimitive()) {
+      if (fileType.getCategory().equals(readerType.getCategory())) {
+        // for decimals alone do equality check to not mess up with precision change
+        if (fileType.getCategory().equals(TypeDescription.Category.DECIMAL) &&
+            !fileType.equals(readerType)) {
+          return false;
+        }
+        return true;
+      }
+
+      // only integer and string evolutions are safe
+      // byte -> short -> int -> long
+      // string <-> char <-> varchar
+      // NOTE: Float to double evolution is not safe as floats are stored as doubles in ORC's
+      // internal index, but when doing predicate evaluation for queries like "select * from
+      // orc_float where f = 74.72" the constant on the filter is converted from string -> double
+      // so the precisions will be different and the comparison will fail.
+      // Soon, we should convert all sargs that compare equality between floats or
+      // doubles to range predicates.
+
+      // Similarly string -> char and varchar -> char and vice versa is not possible, as ORC stores
+      // char with padded spaces in its internal index.
+      switch (fileType.getCategory()) {
+        case BYTE:
+          if (readerType.getCategory().equals(TypeDescription.Category.SHORT) ||
+              readerType.getCategory().equals(TypeDescription.Category.INT) ||
+              readerType.getCategory().equals(TypeDescription.Category.LONG)) {
+            return true;
+          }
+          break;
+        case SHORT:
+          if (readerType.getCategory().equals(TypeDescription.Category.INT) ||
+              readerType.getCategory().equals(TypeDescription.Category.LONG)) {
+            return true;
+          }
+          break;
+        case INT:
+          if (readerType.getCategory().equals(TypeDescription.Category.LONG)) {
+            return true;
+          }
+          break;
+        case STRING:
+          if (readerType.getCategory().equals(TypeDescription.Category.VARCHAR)) {
+            return true;
+          }
+          break;
+        case VARCHAR:
+          if (readerType.getCategory().equals(TypeDescription.Category.STRING)) {
+            return true;
+          }
+          break;
+        default:
+          break;
+      }
+    }
+    return false;
+  }
+
   void buildConversionFileTypesArray(TypeDescription fileType,
-                                     TypeDescription readerType
-                                     ) throws IOException {
+                                     TypeDescription readerType) {
     // if the column isn't included, don't map it
     if (included != null && !included[readerType.getId()]) {
       return;
@@ -177,7 +280,8 @@ public class SchemaEvolution {
       }
       readerFileTypes[id] = fileType;
     } else {
-      throw new IOException(String.format("ORC does not support type" +
+      throw new IllegalArgumentException(String.format
+                                         ("ORC does not support type" +
                                           " conversion from file type %s" +
                                           " (%d) to reader type %s (%d)",
                                           fileType.toString(),

http://git-wip-us.apache.org/repos/asf/orc/blob/14b818ad/java/core/src/test/org/apache/orc/impl/TestSchemaEvolution.java
----------------------------------------------------------------------
diff --git a/java/core/src/test/org/apache/orc/impl/TestSchemaEvolution.java b/java/core/src/test/org/apache/orc/impl/TestSchemaEvolution.java
new file mode 100644
index 0000000..c28af94
--- /dev/null
+++ b/java/core/src/test/org/apache/orc/impl/TestSchemaEvolution.java
@@ -0,0 +1,469 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+public class TestSchemaEvolution {
+
+  @Rule
+  public TestName testCaseName = new TestName();
+
+  Configuration conf;
+  Path testFilePath;
+  FileSystem fs;
+  Path workDir = new Path(System.getProperty("test.tmp.dir",
+      "target" + File.separator + "test" + File.separator + "tmp"));
+
+  @Before
+  public void setup() throws Exception {
+    conf = new Configuration();
+    fs = FileSystem.getLocal(conf);
+    testFilePath = new Path(workDir, "TestOrcFile." +
+        testCaseName.getMethodName() + ".orc");
+    fs.delete(testFilePath, false);
+  }
+
+  @Test
+  public void testDataTypeConversion1() throws IOException {
+    TypeDescription fileStruct1 = TypeDescription.createStruct()
+        .addField("f1", TypeDescription.createInt())
+        .addField("f2", TypeDescription.createString())
+        .addField("f3", TypeDescription.createDecimal().withPrecision(38).withScale(10));
+    SchemaEvolution same1 = new SchemaEvolution(fileStruct1, null);
+    assertFalse(same1.hasConversion());
+    TypeDescription readerStruct1 = TypeDescription.createStruct()
+        .addField("f1", TypeDescription.createInt())
+        .addField("f2", TypeDescription.createString())
+        .addField("f3", TypeDescription.createDecimal().withPrecision(38).withScale(10));
+    SchemaEvolution both1 = new SchemaEvolution(fileStruct1, readerStruct1, null);
+    assertFalse(both1.hasConversion());
+    TypeDescription readerStruct1diff = TypeDescription.createStruct()
+        .addField("f1", TypeDescription.createLong())
+        .addField("f2", TypeDescription.createString())
+        .addField("f3", TypeDescription.createDecimal().withPrecision(38).withScale(10));
+    SchemaEvolution both1diff = new SchemaEvolution(fileStruct1, readerStruct1diff, null);
+    assertTrue(both1diff.hasConversion());
+    TypeDescription readerStruct1diffPrecision = TypeDescription.createStruct()
+        .addField("f1", TypeDescription.createInt())
+        .addField("f2", TypeDescription.createString())
+        .addField("f3", TypeDescription.createDecimal().withPrecision(12).withScale(10));
+    SchemaEvolution both1diffPrecision = new SchemaEvolution(fileStruct1, readerStruct1diffPrecision, null);
+    assertTrue(both1diffPrecision.hasConversion());
+  }
+
+  @Test
+  public void testDataTypeConversion2() throws IOException {
+    TypeDescription fileStruct2 = TypeDescription.createStruct()
+        .addField("f1", TypeDescription.createUnion()
+            .addUnionChild(TypeDescription.createByte())
+            .addUnionChild(TypeDescription.createDecimal()
+                .withPrecision(20).withScale(10)))
+        .addField("f2", TypeDescription.createStruct()
+            .addField("f3", TypeDescription.createDate())
+            .addField("f4", TypeDescription.createDouble())
+            .addField("f5", TypeDescription.createBoolean()))
+        .addField("f6", TypeDescription.createChar().withMaxLength(100));
+    SchemaEvolution same2 = new SchemaEvolution(fileStruct2, null);
+    assertFalse(same2.hasConversion());
+    TypeDescription readerStruct2 = TypeDescription.createStruct()
+        .addField("f1", TypeDescription.createUnion()
+            .addUnionChild(TypeDescription.createByte())
+            .addUnionChild(TypeDescription.createDecimal()
+                .withPrecision(20).withScale(10)))
+        .addField("f2", TypeDescription.createStruct()
+            .addField("f3", TypeDescription.createDate())
+            .addField("f4", TypeDescription.createDouble())
+            .addField("f5", TypeDescription.createBoolean()))
+        .addField("f6", TypeDescription.createChar().withMaxLength(100));
+    SchemaEvolution both2 = new SchemaEvolution(fileStruct2, readerStruct2, null);
+    assertFalse(both2.hasConversion());
+    TypeDescription readerStruct2diff = TypeDescription.createStruct()
+        .addField("f1", TypeDescription.createUnion()
+            .addUnionChild(TypeDescription.createByte())
+            .addUnionChild(TypeDescription.createDecimal()
+                .withPrecision(20).withScale(10)))
+        .addField("f2", TypeDescription.createStruct()
+            .addField("f3", TypeDescription.createDate())
+            .addField("f4", TypeDescription.createDouble())
+            .addField("f5", TypeDescription.createByte()))
+        .addField("f6", TypeDescription.createChar().withMaxLength(100));
+    SchemaEvolution both2diff = new SchemaEvolution(fileStruct2, readerStruct2diff, null);
+    assertTrue(both2diff.hasConversion());
+    TypeDescription readerStruct2diffChar = TypeDescription.createStruct()
+        .addField("f1", TypeDescription.createUnion()
+            .addUnionChild(TypeDescription.createByte())
+            .addUnionChild(TypeDescription.createDecimal()
+                .withPrecision(20).withScale(10)))
+        .addField("f2", TypeDescription.createStruct()
+            .addField("f3", TypeDescription.createDate())
+            .addField("f4", TypeDescription.createDouble())
+            .addField("f5", TypeDescription.createBoolean()))
+        .addField("f6", TypeDescription.createChar().withMaxLength(80));
+    SchemaEvolution both2diffChar = new SchemaEvolution(fileStruct2, readerStruct2diffChar, null);
+    assertTrue(both2diffChar.hasConversion());
+  }
+
+  @Test
+  public void testFloatToDoubleEvolution() throws Exception {
+    testFilePath = new Path(workDir, "TestOrcFile." +
+        testCaseName.getMethodName() + ".orc");
+    TypeDescription schema = TypeDescription.createFloat();
+    Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf).setSchema(schema).stripeSize(100000)
+            .bufferSize(10000));
+    VectorizedRowBatch batch = new VectorizedRowBatch(1, 1024);
+    DoubleColumnVector dcv = new DoubleColumnVector(1024);
+    batch.cols[0] = dcv;
+    batch.reset();
+    batch.size = 1;
+    dcv.vector[0] = 74.72f;
+    writer.addRowBatch(batch);
+    writer.close();
+
+    Reader reader = OrcFile.createReader(testFilePath,
+        OrcFile.readerOptions(conf).filesystem(fs));
+    TypeDescription schemaOnRead = TypeDescription.createDouble();
+    RecordReader rows = reader.rows(new Reader.Options().schema(schemaOnRead));
+    batch = schemaOnRead.createRowBatch();
+    rows.nextBatch(batch);
+    assertEquals(74.72, ((DoubleColumnVector) batch.cols[0]).vector[0], 0.00000000001);
+    rows.close();
+  }
+
+  @Test
+  public void testSafePpdEvaluation() throws IOException {
+    TypeDescription fileStruct1 = TypeDescription.createStruct()
+        .addField("f1", TypeDescription.createInt())
+        .addField("f2", TypeDescription.createString())
+        .addField("f3", TypeDescription.createDecimal().withPrecision(38).withScale(10));
+    SchemaEvolution same1 = new SchemaEvolution(fileStruct1, null);
+    assertTrue(same1.isPPDSafeConversion(0));
+    assertFalse(same1.hasConversion());
+    TypeDescription readerStruct1 = TypeDescription.createStruct()
+        .addField("f1", TypeDescription.createInt())
+        .addField("f2", TypeDescription.createString())
+        .addField("f3", TypeDescription.createDecimal().withPrecision(38).withScale(10));
+    SchemaEvolution both1 = new SchemaEvolution(fileStruct1, readerStruct1, null);
+    assertFalse(both1.hasConversion());
+    assertTrue(both1.isPPDSafeConversion(0));
+    assertTrue(both1.isPPDSafeConversion(1));
+    assertTrue(both1.isPPDSafeConversion(2));
+    assertTrue(both1.isPPDSafeConversion(3));
+
+    // int -> long
+    TypeDescription readerStruct1diff = TypeDescription.createStruct()
+        .addField("f1", TypeDescription.createLong())
+        .addField("f2", TypeDescription.createString())
+        .addField("f3", TypeDescription.createDecimal().withPrecision(38).withScale(10));
+    SchemaEvolution both1diff = new SchemaEvolution(fileStruct1, readerStruct1diff, null);
+    assertTrue(both1diff.hasConversion());
+    assertFalse(both1diff.isPPDSafeConversion(0));
+    assertTrue(both1diff.isPPDSafeConversion(1));
+    assertTrue(both1diff.isPPDSafeConversion(2));
+    assertTrue(both1diff.isPPDSafeConversion(3));
+
+    // decimal(38,10) -> decimal(12, 10)
+    TypeDescription readerStruct1diffPrecision = TypeDescription.createStruct()
+        .addField("f1", TypeDescription.createInt())
+        .addField("f2", TypeDescription.createString())
+        .addField("f3", TypeDescription.createDecimal().withPrecision(12).withScale(10));
+    SchemaEvolution both1diffPrecision = new SchemaEvolution(fileStruct1, readerStruct1diffPrecision,
+        new boolean[] {true, false, false, true});
+    assertTrue(both1diffPrecision.hasConversion());
+    assertFalse(both1diffPrecision.isPPDSafeConversion(0));
+    assertFalse(both1diffPrecision.isPPDSafeConversion(1)); // column not included
+    assertFalse(both1diffPrecision.isPPDSafeConversion(2)); // column not included
+    assertFalse(both1diffPrecision.isPPDSafeConversion(3));
+
+    // add columns
+    readerStruct1 = TypeDescription.createStruct()
+        .addField("f1", TypeDescription.createInt())
+        .addField("f2", TypeDescription.createString())
+        .addField("f3", TypeDescription.createDecimal().withPrecision(38).withScale(10))
+        .addField("f4", TypeDescription.createBoolean());
+    both1 = new SchemaEvolution(fileStruct1, readerStruct1, null);
+    assertTrue(both1.hasConversion());
+    assertFalse(both1.isPPDSafeConversion(0));
+    assertTrue(both1.isPPDSafeConversion(1));
+    assertTrue(both1.isPPDSafeConversion(2));
+    assertTrue(both1.isPPDSafeConversion(3));
+    assertFalse(both1.isPPDSafeConversion(4));
+  }
+
+  @Test
+  public void testSafePpdEvaluationForInts() throws IOException {
+    // byte -> short -> int -> long
+    TypeDescription fileSchema = TypeDescription.createStruct()
+        .addField("f1", TypeDescription.createByte());
+    SchemaEvolution schemaEvolution = new SchemaEvolution(fileSchema, null);
+    assertFalse(schemaEvolution.hasConversion());
+
+    // byte -> short
+    TypeDescription readerSchema = TypeDescription.createStruct()
+        .addField("f1", TypeDescription.createShort());
+    schemaEvolution = new SchemaEvolution(fileSchema, readerSchema, null);
+    assertTrue(schemaEvolution.hasConversion());
+    assertFalse(schemaEvolution.isPPDSafeConversion(0));
+    assertTrue(schemaEvolution.isPPDSafeConversion(1));
+
+    // byte -> int
+    readerSchema = TypeDescription.createStruct()
+        .addField("f1", TypeDescription.createInt());
+    schemaEvolution = new SchemaEvolution(fileSchema, readerSchema, null);
+    assertTrue(schemaEvolution.hasConversion());
+    assertFalse(schemaEvolution.isPPDSafeConversion(0));
+    assertTrue(schemaEvolution.isPPDSafeConversion(1));
+
+    // byte -> long
+    readerSchema = TypeDescription.createStruct()
+        .addField("f1", TypeDescription.createLong());
+    schemaEvolution = new SchemaEvolution(fileSchema, readerSchema, null);
+    assertTrue(schemaEvolution.hasConversion());
+    assertFalse(schemaEvolution.isPPDSafeConversion(0));
+    assertTrue(schemaEvolution.isPPDSafeConversion(1));
+
+    // short -> int -> long
+    fileSchema = TypeDescription.createStruct()
+        .addField("f1", TypeDescription.createShort());
+    schemaEvolution = new SchemaEvolution(fileSchema, null);
+    assertFalse(schemaEvolution.hasConversion());
+
+    // unsafe conversion short -> byte
+    readerSchema = TypeDescription.createStruct()
+        .addField("f1", TypeDescription.createByte());
+    schemaEvolution = new SchemaEvolution(fileSchema, readerSchema, null);
+    assertTrue(schemaEvolution.hasConversion());
+    assertFalse(schemaEvolution.isPPDSafeConversion(0));
+    assertFalse(schemaEvolution.isPPDSafeConversion(1));
+
+    // short -> int
+    readerSchema = TypeDescription.createStruct()
+        .addField("f1", TypeDescription.createInt());
+    schemaEvolution = new SchemaEvolution(fileSchema, readerSchema, null);
+    assertTrue(schemaEvolution.hasConversion());
+    assertFalse(schemaEvolution.isPPDSafeConversion(0));
+    assertTrue(schemaEvolution.isPPDSafeConversion(1));
+
+    // short -> long
+    readerSchema = TypeDescription.createStruct()
+        .addField("f1", TypeDescription.createLong());
+    schemaEvolution = new SchemaEvolution(fileSchema, readerSchema, null);
+    assertTrue(schemaEvolution.hasConversion());
+    assertFalse(schemaEvolution.isPPDSafeConversion(0));
+    assertTrue(schemaEvolution.isPPDSafeConversion(1));
+
+    // int -> long
+    fileSchema = TypeDescription.createStruct()
+        .addField("f1", TypeDescription.createInt());
+    schemaEvolution = new SchemaEvolution(fileSchema, null);
+    assertFalse(schemaEvolution.hasConversion());
+
+    // unsafe conversion int -> byte
+    readerSchema = TypeDescription.createStruct()
+        .addField("f1", TypeDescription.createByte());
+    schemaEvolution = new SchemaEvolution(fileSchema, readerSchema, null);
+    assertTrue(schemaEvolution.hasConversion());
+    assertFalse(schemaEvolution.isPPDSafeConversion(0));
+    assertFalse(schemaEvolution.isPPDSafeConversion(1));
+
+    // unsafe conversion int -> short
+    readerSchema = TypeDescription.createStruct()
+        .addField("f1", TypeDescription.createShort());
+    schemaEvolution = new SchemaEvolution(fileSchema, readerSchema, null);
+    assertTrue(schemaEvolution.hasConversion());
+    assertFalse(schemaEvolution.isPPDSafeConversion(0));
+    assertFalse(schemaEvolution.isPPDSafeConversion(1));
+
+    // int -> long
+    readerSchema = TypeDescription.createStruct()
+        .addField("f1", TypeDescription.createLong());
+    schemaEvolution = new SchemaEvolution(fileSchema, readerSchema, null);
+    assertTrue(schemaEvolution.hasConversion());
+    assertFalse(schemaEvolution.isPPDSafeConversion(0));
+    assertTrue(schemaEvolution.isPPDSafeConversion(1));
+
+    // long
+    fileSchema = TypeDescription.createStruct()
+        .addField("f1", TypeDescription.createLong());
+    schemaEvolution = new SchemaEvolution(fileSchema, null);
+    assertTrue(schemaEvolution.isPPDSafeConversion(0));
+    assertFalse(schemaEvolution.hasConversion());
+
+    // unsafe conversion long -> byte
+    readerSchema = TypeDescription.createStruct()
+        .addField("f1", TypeDescription.createByte());
+    schemaEvolution = new SchemaEvolution(fileSchema, readerSchema, null);
+    assertTrue(schemaEvolution.hasConversion());
+    assertFalse(schemaEvolution.isPPDSafeConversion(0));
+    assertFalse(schemaEvolution.isPPDSafeConversion(1));
+
+    // unsafe conversion long -> short
+    readerSchema = TypeDescription.createStruct()
+        .addField("f1", TypeDescription.createShort());
+    schemaEvolution = new SchemaEvolution(fileSchema, readerSchema, null);
+    assertTrue(schemaEvolution.hasConversion());
+    assertFalse(schemaEvolution.isPPDSafeConversion(0));
+    assertFalse(schemaEvolution.isPPDSafeConversion(1));
+
+    // unsafe conversion long -> int
+    readerSchema = TypeDescription.createStruct()
+        .addField("f1", TypeDescription.createInt());
+    schemaEvolution = new SchemaEvolution(fileSchema, readerSchema, null);
+    assertTrue(schemaEvolution.hasConversion());
+    assertFalse(schemaEvolution.isPPDSafeConversion(0));
+    assertFalse(schemaEvolution.isPPDSafeConversion(1));
+
+    // invalid
+    readerSchema = TypeDescription.createStruct()
+        .addField("f1", TypeDescription.createString());
+    schemaEvolution = new SchemaEvolution(fileSchema, readerSchema, null);
+    assertTrue(schemaEvolution.hasConversion());
+    assertFalse(schemaEvolution.isPPDSafeConversion(0));
+    assertFalse(schemaEvolution.isPPDSafeConversion(1));
+
+    // invalid
+    readerSchema = TypeDescription.createStruct()
+        .addField("f1", TypeDescription.createFloat());
+    schemaEvolution = new SchemaEvolution(fileSchema, readerSchema, null);
+    assertTrue(schemaEvolution.hasConversion());
+    assertFalse(schemaEvolution.isPPDSafeConversion(0));
+    assertFalse(schemaEvolution.isPPDSafeConversion(1));
+
+    // invalid
+    readerSchema = TypeDescription.createStruct()
+        .addField("f1", TypeDescription.createTimestamp());
+    schemaEvolution = new SchemaEvolution(fileSchema, readerSchema, null);
+    assertTrue(schemaEvolution.hasConversion());
+    assertFalse(schemaEvolution.isPPDSafeConversion(0));
+    assertFalse(schemaEvolution.isPPDSafeConversion(1));
+  }
+
+  @Test // PPD-safety expectations when the file column f1 is string, char or varchar
+  public void testSafePpdEvaluationForStrings() throws IOException {
+    TypeDescription fileSchema = TypeDescription.createStruct()
+        .addField("f1", TypeDescription.createString());
+    SchemaEvolution schemaEvolution = new SchemaEvolution(fileSchema, null); // no reader schema supplied
+    assertTrue(schemaEvolution.isPPDSafeConversion(0)); // id 0 is presumably the root struct - confirm
+    assertFalse(schemaEvolution.hasConversion());
+
+    // string -> char: a conversion is reported; neither id 0 nor id 1 is PPD safe
+    TypeDescription readerSchema = TypeDescription.createStruct()
+        .addField("f1", TypeDescription.createChar());
+    schemaEvolution = new SchemaEvolution(fileSchema, readerSchema, null);
+    assertTrue(schemaEvolution.hasConversion());
+    assertFalse(schemaEvolution.isPPDSafeConversion(0));
+    assertFalse(schemaEvolution.isPPDSafeConversion(1));
+
+    // string -> varchar: a conversion is reported; id 1 is PPD safe, id 0 is not
+    readerSchema = TypeDescription.createStruct()
+        .addField("f1", TypeDescription.createVarchar());
+    schemaEvolution = new SchemaEvolution(fileSchema, readerSchema, null);
+    assertTrue(schemaEvolution.hasConversion());
+    assertFalse(schemaEvolution.isPPDSafeConversion(0));
+    assertTrue(schemaEvolution.isPPDSafeConversion(1));
+
+    fileSchema = TypeDescription.createStruct()
+        .addField("f1", TypeDescription.createChar());
+    schemaEvolution = new SchemaEvolution(fileSchema, null); // file column is now char; no reader schema
+    assertTrue(schemaEvolution.isPPDSafeConversion(0));
+    assertFalse(schemaEvolution.hasConversion());
+
+    // char -> string: a conversion is reported; neither id 0 nor id 1 is PPD safe
+    readerSchema = TypeDescription.createStruct()
+        .addField("f1", TypeDescription.createString());
+    schemaEvolution = new SchemaEvolution(fileSchema, readerSchema, null);
+    assertTrue(schemaEvolution.hasConversion());
+    assertFalse(schemaEvolution.isPPDSafeConversion(0));
+    assertFalse(schemaEvolution.isPPDSafeConversion(1));
+
+    // char -> varchar: a conversion is reported; neither id 0 nor id 1 is PPD safe
+    readerSchema = TypeDescription.createStruct()
+        .addField("f1", TypeDescription.createVarchar());
+    schemaEvolution = new SchemaEvolution(fileSchema, readerSchema, null);
+    assertTrue(schemaEvolution.hasConversion());
+    assertFalse(schemaEvolution.isPPDSafeConversion(0));
+    assertFalse(schemaEvolution.isPPDSafeConversion(1));
+
+    fileSchema = TypeDescription.createStruct()
+        .addField("f1", TypeDescription.createVarchar());
+    schemaEvolution = new SchemaEvolution(fileSchema, null); // file column is now varchar; no reader schema
+    assertTrue(schemaEvolution.isPPDSafeConversion(0));
+    assertFalse(schemaEvolution.hasConversion());
+
+    // varchar -> string: a conversion is reported; id 1 is PPD safe, id 0 is not
+    readerSchema = TypeDescription.createStruct()
+        .addField("f1", TypeDescription.createString());
+    schemaEvolution = new SchemaEvolution(fileSchema, readerSchema, null);
+    assertTrue(schemaEvolution.hasConversion());
+    assertFalse(schemaEvolution.isPPDSafeConversion(0));
+    assertTrue(schemaEvolution.isPPDSafeConversion(1));
+
+    // varchar -> char: a conversion is reported; neither id 0 nor id 1 is PPD safe
+    readerSchema = TypeDescription.createStruct()
+        .addField("f1", TypeDescription.createChar());
+    schemaEvolution = new SchemaEvolution(fileSchema, readerSchema, null);
+    assertTrue(schemaEvolution.hasConversion());
+    assertFalse(schemaEvolution.isPPDSafeConversion(0));
+    assertFalse(schemaEvolution.isPPDSafeConversion(1));
+
+    // invalid: varchar -> decimal; neither id 0 nor id 1 is PPD safe
+    readerSchema = TypeDescription.createStruct()
+        .addField("f1", TypeDescription.createDecimal());
+    schemaEvolution = new SchemaEvolution(fileSchema, readerSchema, null);
+    assertTrue(schemaEvolution.hasConversion());
+    assertFalse(schemaEvolution.isPPDSafeConversion(0));
+    assertFalse(schemaEvolution.isPPDSafeConversion(1));
+
+    // invalid: varchar -> date; neither id 0 nor id 1 is PPD safe
+    readerSchema = TypeDescription.createStruct()
+        .addField("f1", TypeDescription.createDate());
+    schemaEvolution = new SchemaEvolution(fileSchema, readerSchema, null);
+    assertTrue(schemaEvolution.hasConversion());
+    assertFalse(schemaEvolution.isPPDSafeConversion(0));
+    assertFalse(schemaEvolution.isPPDSafeConversion(1));
+
+    // invalid: varchar -> int; neither id 0 nor id 1 is PPD safe
+    readerSchema = TypeDescription.createStruct()
+        .addField("f1", TypeDescription.createInt());
+    schemaEvolution = new SchemaEvolution(fileSchema, readerSchema, null);
+    assertTrue(schemaEvolution.hasConversion());
+    assertFalse(schemaEvolution.isPPDSafeConversion(0));
+    assertFalse(schemaEvolution.isPPDSafeConversion(1));
+  }
+}