You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by om...@apache.org on 2021/01/26 19:34:51 UTC

[orc] branch master updated: ORC-741: Schema Evolution missing column not handled in filters.

This is an automated email from the ASF dual-hosted git repository.

omalley pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/orc.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c41def  ORC-741: Schema Evolution missing column not handled in filters.
1c41def is described below

commit 1c41def3de61f833df8515cf4d613ecef3fc6b4b
Author: Pavan Lanka <pl...@apple.com>
AuthorDate: Fri Jan 22 15:05:04 2021 -0800

    ORC-741: Schema Evolution missing column not handled in filters.
    
    Fixes #634
    
    Signed-off-by: Owen O'Malley <om...@apache.org>
---
 .../java/org/apache/orc/impl/RecordReaderImpl.java | 14 ++---
 .../test/org/apache/orc/TestRowFilteringSkip.java  | 68 ++++++++++++++++++++++
 .../apache/orc/mapred/TestOrcFileEvolution.java    | 42 +++++++++++--
 3 files changed, 112 insertions(+), 12 deletions(-)

diff --git a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
index 9b6d563..508ca65 100644
--- a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
@@ -100,7 +100,8 @@ public class RecordReaderImpl implements RecordReader {
    *
    * @param evolution the mapping from reader to file schema
    * @param columnName  the fully qualified column name to look for
-   * @return the file column number or -1 if the column wasn't found
+   * @return the file column number or -1 if the column wasn't found in the file schema
+   * @throws IllegalArgumentException if the column was not found in the reader schema
    */
   static int findColumns(SchemaEvolution evolution,
                          String columnName) {
@@ -110,10 +111,9 @@ public class RecordReaderImpl implements RecordReader {
       TypeDescription fileColumn = evolution.getFileType(readerColumn);
       return fileColumn == null ? -1 : fileColumn.getId();
     } catch (IllegalArgumentException e) {
-      if (LOG.isDebugEnabled()){
-        LOG.debug("{}", e.getMessage());
-      }
-      return -1;
+      throw new IllegalArgumentException("Filter could not find column with name: " +
+                                         columnName + " on " + evolution.getReaderBaseSchema(),
+                                         e);
     }
   }
 
@@ -231,11 +231,9 @@ public class RecordReaderImpl implements RecordReader {
     if (options.getPreFilterColumnNames() != null) {
       for (String colName : options.getPreFilterColumnNames()) {
         int expandColId = findColumns(evolution, colName);
+        // If the column is not present in the file then this can be ignored from read.
         if (expandColId != -1) {
           filterColIds.add(expandColId);
-        } else {
-          throw new IllegalArgumentException("Filter could not find column with name: " +
-              colName + " on " + evolution.getReaderBaseSchema());
         }
       }
       LOG.info("Filter Columns: " + filterColIds);
diff --git a/java/core/src/test/org/apache/orc/TestRowFilteringSkip.java b/java/core/src/test/org/apache/orc/TestRowFilteringSkip.java
index 0be05b3..10d5594 100644
--- a/java/core/src/test/org/apache/orc/TestRowFilteringSkip.java
+++ b/java/core/src/test/org/apache/orc/TestRowFilteringSkip.java
@@ -1237,6 +1237,74 @@ public class TestRowFilteringSkip {
   }
 
   @Test
+  public void testSchemaEvolutionMissingColumn() throws Exception {
+    // Set the row stride to a multiple of the batch size
+    final int INDEX_STRIDE = 16 * ColumnBatchRows;
+    final int NUM_BATCHES = 10;
+
+    TypeDescription fileSchema = TypeDescription.createStruct()
+      .addField("int1", TypeDescription.createInt())
+      .addField("ts2", TypeDescription.createTimestamp());
+
+    try (Writer writer = OrcFile.createWriter(testFilePath,
+                                              OrcFile.writerOptions(conf)
+                                                .setSchema(fileSchema)
+                                                .rowIndexStride(INDEX_STRIDE))) {
+      VectorizedRowBatch batch = fileSchema.createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      TimestampColumnVector col2 = (TimestampColumnVector) batch.cols[1];
+      for (int b=0; b < NUM_BATCHES; ++b) {
+        batch.reset();
+        batch.size = ColumnBatchRows;
+        for (int row = 0; row < batch.size; row++) {
+          col1.vector[row] = row;
+          if ((row % 2) == 0)
+            col2.set(row, Timestamp.valueOf((1900+row)+"-04-01 12:34:56.9"));
+          else {
+            col2.isNull[row] = true;
+            col2.set(row, null);
+          }
+        }
+        col1.isRepeating = true;
+        col1.noNulls = false;
+        col2.noNulls = false;
+        writer.addRowBatch(batch);
+      }
+    }
+
+    TypeDescription readSchema = fileSchema
+      .clone()
+      .addField("missing", TypeDescription.createInt());
+    Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf));
+
+    try (RecordReaderImpl rows = (RecordReaderImpl) reader.rows(
+      reader.options()
+        .schema(readSchema)
+        .setRowFilter(new String[]{"missing"}, TestRowFilteringSkip::notNullFilterMissing))) {
+      VectorizedRowBatch batch = readSchema.createRowBatchV2();
+
+      long rowCount = 0;
+      while (rows.nextBatch(batch)) {
+        // All rows are selected as NullTreeReader does not support filters
+        Assert.assertFalse(batch.selectedInUse);
+        rowCount += batch.size;
+      }
+      Assert.assertEquals(reader.getNumberOfRows(), rowCount);
+    }
+  }
+
+  private static void notNullFilterMissing(VectorizedRowBatch batch) {
+    int selIdx = 0;
+    for (int i = 0; i < batch.size; i++) {
+      if (!batch.cols[2].isNull[i]) {
+        batch.selected[selIdx++] = i;
+      }
+    }
+    batch.selectedInUse = true;
+    batch.size = selIdx;
+  }
+
+  @Test
   public void testcustomFileTimestampRoundRobbinRowFilterCallback() throws Exception {
     testFilePath = new Path(getClass().getClassLoader().
         getSystemResource("orc_split_elim.orc").getPath());
diff --git a/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcFileEvolution.java b/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcFileEvolution.java
index 7e907b7..0b5e037 100644
--- a/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcFileEvolution.java
+++ b/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcFileEvolution.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.mapred.Reporter;
 import org.apache.orc.*;
 import org.apache.orc.TypeDescription.Category;
 import org.apache.orc.impl.SchemaEvolution;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -233,9 +234,22 @@ public class TestOrcFileEvolution {
   }
 
   @Test
+  public void testMissingColumnFromReaderSchema() {
+    // Expect failure if the column is missing from the reader schema, as column a that is added
+    // by the SArg is missing from the reader schema
+    Assert.assertThrows("Field a not found in", IllegalArgumentException.class,
+                        () -> checkEvolution("struct<b:int,c:string>",
+                                       "struct<b:int,c:string>",
+                                       struct(1, "foo"),
+                                       struct(1, "foo", null),
+                                       true, true, false));
+
+  }
+
+  @Test
   public void testPreHive4243CheckEqual() {
     // Expect success on equal schemas
-    checkEvolution("struct<_col0:int,_col1:string>",
+    checkEvolutionPosn("struct<_col0:int,_col1:string>",
                    "struct<_col0:int,_col1:string>",
                    struct(1, "foo"),
                    struct(1, "foo", null), false, addSarg, false);
@@ -245,7 +259,7 @@ public class TestOrcFileEvolution {
   public void testPreHive4243Check() {
     // Expect exception on strict compatibility check
     thrown.expectMessage("HIVE-4243");
-    checkEvolution("struct<_col0:int,_col1:string>",
+    checkEvolutionPosn("struct<_col0:int,_col1:string>",
                    "struct<_col0:int,_col1:string,_col2:double>",
                    struct(1, "foo"),
                    struct(1, "foo", null), false, addSarg, false);
@@ -253,7 +267,7 @@ public class TestOrcFileEvolution {
 
   @Test
   public void testPreHive4243AddColumn() {
-    checkEvolution("struct<_col0:int,_col1:string>",
+    checkEvolutionPosn("struct<_col0:int,_col1:string>",
                    "struct<_col0:int,_col1:string,_col2:double>",
                    struct(1, "foo"),
                    struct(1, "foo", null), true, addSarg, false);
@@ -263,7 +277,7 @@ public class TestOrcFileEvolution {
   public void testPreHive4243AddColumnMiddle() {
     // Expect exception on type mismatch
     thrown.expect(SchemaEvolution.IllegalEvolutionException.class);
-    checkEvolution("struct<_col0:int,_col1:double>",
+    checkEvolutionPosn("struct<_col0:int,_col1:double>",
                    "struct<_col0:int,_col1:date,_col2:double>",
                    struct(1, 1.0),
                    null, true, addSarg, false);
@@ -312,6 +326,26 @@ public class TestOrcFileEvolution {
         false, addSarg, true);
   }
 
+  private void checkEvolutionPosn(String writerType, String readerType,
+                                  Object inputRow, Object expectedOutput,
+                                  boolean tolerateSchema, boolean addSarg,
+                                  boolean positional) {
+    SearchArgument sArg = null;
+    String[] sCols = null;
+    if (addSarg) {
+      sArg = SearchArgumentFactory
+        .newBuilder()
+        .lessThan("_col0", PredicateLeaf.Type.LONG, 10L)
+        .build();
+      sCols = new String[]{null, "_col0", null};
+    }
+
+    checkEvolution(writerType, readerType,
+                   inputRow, expectedOutput,
+                   tolerateSchema,
+                   sArg, sCols, positional);
+  }
+
   private void checkEvolution(String writerType, String readerType,
                               Object inputRow, Object expectedOutput,
                               boolean tolerateSchema, boolean addSarg,