Posted to commits@orc.apache.org by pl...@apache.org on 2023/05/15 17:43:21 UTC

[orc] branch main updated: ORC-1413 fix for ORC row level filter issue with ACID table (#1482)

This is an automated email from the ASF dual-hosted git repository.

planka pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/orc.git


The following commit(s) were added to refs/heads/main by this push:
     new 09cd71cd5 ORC-1413 fix for ORC row level filter issue with ACID table (#1482)
09cd71cd5 is described below

commit 09cd71cd50eece97facfc002f82f7ce7e19a072e
Author: Zoltan Ratkai <11...@users.noreply.github.com>
AuthorDate: Mon May 15 19:43:14 2023 +0200

    ORC-1413 fix for ORC row level filter issue with ACID table (#1482)
    
    ### What changes were proposed in this pull request?
    This PR fixes the ORC row-level filter issue with ACID tables.
    
    ### Why are the changes needed?
    Without this fix, Hive cannot work with ORC 1.8.3 when ACID tables and the row-level filter are enabled.
    
    ### How was this patch tested?
    Unit tests added.
---
 .../src/java/org/apache/orc/impl/ParserUtils.java  |  17 ++-
 .../test/org/apache/orc/TestOrcFilterContext.java  | 132 ++++++++++++++++++++-
 2 files changed, 143 insertions(+), 6 deletions(-)
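For context, a rough sketch (not part of the patch) of the ACID event schema that the fix and the new tests deal with. The class name AcidSchemaSketch is made up for illustration; TypeDescription.fromString, SchemaEvolution.createEventSchema, and the field layout are taken from the test changes below (rowId at index 3, the wrapped user row struct at index 5).

    import org.apache.orc.TypeDescription;
    import org.apache.orc.impl.SchemaEvolution;

    // Sketch: print the ACID event schema wrapping the user schema used in the tests.
    public class AcidSchemaSketch {
      public static void main(String[] args) {
        TypeDescription userSchema =
            TypeDescription.fromString("struct<int1:int,string1:string>");
        // The user columns end up nested under the last child ("row"); the new
        // test relies on rowId being at index 3 and the row struct at index 5.
        TypeDescription acidSchema = SchemaEvolution.createEventSchema(userSchema);
        System.out.println(acidSchema);
      }
    }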

diff --git a/java/core/src/java/org/apache/orc/impl/ParserUtils.java b/java/core/src/java/org/apache/orc/impl/ParserUtils.java
index 6491fb8ba..f493f078a 100644
--- a/java/core/src/java/org/apache/orc/impl/ParserUtils.java
+++ b/java/core/src/java/org/apache/orc/impl/ParserUtils.java
@@ -411,18 +411,22 @@ public class ParserUtils {
     private final ColumnVector[] result;
     private int resultIdx = 0;
 
-    ColumnFinder(TypeDescription schema, VectorizedRowBatch batch, int levels) {
+    ColumnFinder(TypeDescription schema, ColumnVector[] columnVectors, int levels) {
       if (schema.getCategory() == TypeDescription.Category.STRUCT) {
-        top = batch.cols;
+        top = columnVectors;
         result = new ColumnVector[levels];
       } else {
         result = new ColumnVector[levels + 1];
-        current = batch.cols[0];
+        current = columnVectors[0];
         top = null;
         addResult(current);
       }
     }
 
+    ColumnFinder(TypeDescription schema, VectorizedRowBatch vectorizedRowBatch, int levels) {
+      this(schema, vectorizedRowBatch.cols, levels);
+    }
+
     private void addResult(ColumnVector vector) {
       result[resultIdx] = vector;
       resultIdx += 1;
@@ -459,8 +463,11 @@ public class ParserUtils {
                                                  boolean isCaseSensitive,
                                                  VectorizedRowBatch batch) {
     List<String> names = ParserUtils.splitName(source);
-    ColumnFinder result = new ColumnFinder(schema, batch, names.size());
-    findColumn(removeAcid(schema), names, isCaseSensitive, result);
+    TypeDescription schemaToUse = removeAcid(schema);
+    ColumnVector[] columnVectors = SchemaEvolution.checkAcidSchema(schema)
+                  ? ((StructColumnVector) batch.cols[batch.cols.length - 1]).fields : batch.cols;
+    ColumnFinder result = new ColumnFinder(schemaToUse, columnVectors, names.size());
+    findColumn(schemaToUse, names, isCaseSensitive, result);
     return result.result;
   }
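To make the change above concrete, here is a minimal sketch of the lookup it introduces: for an ACID batch, the user's column vectors live inside the trailing "row" struct, so the search starts from that struct's fields rather than from batch.cols. The class name AcidColumnLookupSketch is made up, and the snippet assumes SchemaEvolution.checkAcidSchema is accessible from the caller, as it is for ParserUtils in the same package.

    import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
    import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    import org.apache.orc.TypeDescription;
    import org.apache.orc.impl.SchemaEvolution;

    public class AcidColumnLookupSketch {
      public static void main(String[] args) {
        TypeDescription userSchema =
            TypeDescription.fromString("struct<int1:int,string1:string>");
        TypeDescription acidSchema = SchemaEvolution.createEventSchema(userSchema);
        VectorizedRowBatch batch = acidSchema.createRowBatch();
        // Mirror of the new logic in ParserUtils.findColumnVectors: descend into
        // the last top-level struct when the schema is the ACID event schema.
        ColumnVector[] searchRoot = SchemaEvolution.checkAcidSchema(acidSchema)
            ? ((StructColumnVector) batch.cols[batch.cols.length - 1]).fields
            : batch.cols;
        // searchRoot[0] now backs int1 and searchRoot[1] backs string1,
        // matching removeAcid(acidSchema).
        System.out.println(searchRoot.length);
      }
    }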
 
diff --git a/java/core/src/test/org/apache/orc/TestOrcFilterContext.java b/java/core/src/test/org/apache/orc/TestOrcFilterContext.java
index c38e9081d..d5b0ae135 100644
--- a/java/core/src/test/org/apache/orc/TestOrcFilterContext.java
+++ b/java/core/src/test/org/apache/orc/TestOrcFilterContext.java
@@ -25,10 +25,24 @@ import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.orc.impl.OrcFilterContextImpl;
+import org.apache.orc.impl.SchemaEvolution;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Random;
+import java.util.Arrays;
+
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -61,9 +75,22 @@ public class TestOrcFilterContext {
                                            TypeDescription.createList(TypeDescription.createChar()))
                 )
     );
+  private static Configuration configuration;
+  private static FileSystem fileSystem;
+  private static final Path workDir = new Path(System.getProperty("test.tmp.dir",
+          "target" + File.separator + "test"
+                  + File.separator + "tmp"));
+  private static final Path filePath = new Path(workDir, "orc_filter_file.orc");
+
+  private static final int RowCount = 400;
+
   private final OrcFilterContext filterContext = new OrcFilterContextImpl(schema, false)
     .setBatch(schema.createRowBatch());
-
+  TypeDescription typeDescriptionACID =
+          TypeDescription.fromString("struct<int1:int,string1:string>");
+  TypeDescription acidSchema = SchemaEvolution.createEventSchema(typeDescriptionACID);
+  private final OrcFilterContext filterContextACID = new OrcFilterContextImpl(acidSchema, true)
+          .setBatch(acidSchema.createRowBatch());
   @BeforeEach
   public void setup() {
     filterContext.reset();
@@ -225,4 +252,107 @@ public class TestOrcFilterContext {
     assertTrue(OrcFilterContext.isNull(vectorBranch, 1));
     assertTrue(OrcFilterContext.isNull(vectorBranch, 2));
   }
+  
+  @Test
+  public void testACIDTable() {
+    ColumnVector[] columnVector = filterContextACID.findColumnVector("string1");
+    assertEquals(1, columnVector.length);
+    assertTrue(columnVector[0] instanceof BytesColumnVector, "Expected a  BytesColumnVector, but found "+ columnVector[0].getClass());
+    columnVector = filterContextACID.findColumnVector("int1");
+    assertEquals(1, columnVector.length);
+    assertTrue(columnVector[0] instanceof LongColumnVector, "Expected a  LongColumnVector, but found "+ columnVector[0].getClass());
+  }
+
+  @Test
+  public void testRowFilterWithACIDTable() throws IOException {
+    createAcidORCFile();
+    readSingleRowWithFilter(new Random().nextInt(RowCount));
+    fileSystem.delete(filePath, false);
+    
+  }
+
+  private void createAcidORCFile() throws IOException {
+    configuration = new Configuration();
+    fileSystem = FileSystem.get(configuration);
+
+    try (Writer writer = OrcFile.createWriter(filePath,
+            OrcFile.writerOptions(configuration)
+                    .fileSystem(fileSystem)
+                    .overwrite(true)
+                    .rowIndexStride(8192)
+                    .setSchema(acidSchema))) {
+
+      Random random = new Random(1024);
+      VectorizedRowBatch vectorizedRowBatch = acidSchema.createRowBatch();
+      for (int rowId = 0; rowId < RowCount; rowId++) {
+        long v = random.nextLong();
+        populateColumnValues(acidSchema, vectorizedRowBatch.cols,vectorizedRowBatch.size, v);
+        // Populate the rowId
+        ((LongColumnVector) vectorizedRowBatch.cols[3]).vector[vectorizedRowBatch.size] = rowId;
+        StructColumnVector row = (StructColumnVector) vectorizedRowBatch.cols[5];
+        ((LongColumnVector) row.fields[0]).vector[vectorizedRowBatch.size] = rowId;
+        vectorizedRowBatch.size += 1;
+        if (vectorizedRowBatch.size == vectorizedRowBatch.getMaxSize()) {
+          writer.addRowBatch(vectorizedRowBatch);
+          vectorizedRowBatch.reset();
+        }
+      }
+      if (vectorizedRowBatch.size > 0) {
+        writer.addRowBatch(vectorizedRowBatch);
+        vectorizedRowBatch.reset();
+      }
+    }
+  }
+  
+  private void populateColumnValues(TypeDescription typeDescription, ColumnVector[] columnVectors, int index, long value) {
+    for (int columnId = 0; columnId < typeDescription.getChildren().size() ; columnId++) {
+      switch (typeDescription.getChildren().get(columnId).getCategory()) {
+        case INT:
+          ((LongColumnVector)columnVectors[columnId]).vector[index] = value;
+          break;
+        case LONG:
+          ((LongColumnVector)columnVectors[columnId]).vector[index] = value;
+          break;
+        case STRING:
+          ((BytesColumnVector) columnVectors[columnId]).setVal(index,
+                  ("String-"+ index).getBytes(StandardCharsets.UTF_8));
+          break;
+        case STRUCT:
+          populateColumnValues(typeDescription.getChildren().get(columnId), ((StructColumnVector)columnVectors[columnId]).fields, index, value);
+          break;           
+        default:
+          throw new IllegalArgumentException();
+      }
+    }
+  }
+  
+  private void readSingleRowWithFilter(int id) throws IOException {
+    Reader reader = OrcFile.createReader(filePath, OrcFile.readerOptions(configuration).filesystem(fileSystem));
+    SearchArgument searchArgument = SearchArgumentFactory.newBuilder()
+            .in("int1", PredicateLeaf.Type.LONG, new Long(id))
+            .build();
+    Reader.Options readerOptions = reader.options()
+            .searchArgument(searchArgument, new String[] {"int1"})
+            .useSelected(true)
+            .allowSARGToFilter(true);
+    VectorizedRowBatch vectorizedRowBatch = acidSchema.createRowBatch();
+    long rowCount = 0;
+    try (RecordReader recordReader = reader.rows(readerOptions)) {
+      assertTrue(recordReader.nextBatch(vectorizedRowBatch));
+      rowCount += vectorizedRowBatch.size;
+      assertEquals(6, vectorizedRowBatch.cols.length);
+      assertTrue(vectorizedRowBatch.cols[5] instanceof StructColumnVector);
+      assertTrue(((StructColumnVector) vectorizedRowBatch.cols[5]).fields[0] instanceof LongColumnVector);
+      assertTrue(((StructColumnVector) vectorizedRowBatch.cols[5]).fields[1] instanceof BytesColumnVector);
+      assertEquals(id, ((LongColumnVector) ((StructColumnVector) vectorizedRowBatch.cols[5]).fields[0]).vector[vectorizedRowBatch.selected[0]]);
+      checkStringColumn(id, vectorizedRowBatch);
+      assertFalse(recordReader.nextBatch(vectorizedRowBatch));
+    }
+    assertEquals(1, rowCount);
+  }
+
+  private static void checkStringColumn(int id, VectorizedRowBatch vectorizedRowBatch) {
+    BytesColumnVector bytesColumnVector = (BytesColumnVector) ((StructColumnVector) vectorizedRowBatch.cols[5]).fields[1];
+    assertEquals("String-"+ id, bytesColumnVector.toString(id));
+  }
 }