You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ak...@apache.org on 2021/10/28 06:49:16 UTC

[carbondata] branch master updated: [CARBONDATA-4194] Fixed presto read after update/delete from spark

This is an automated email from the ASF dual-hosted git repository.

akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 07b41a5  [CARBONDATA-4194] Fixed presto read after update/delete from spark
07b41a5 is described below

commit 07b41a5382f554646f231e192cf39c8f28302a05
Author: nihal0107 <ni...@gmail.com>
AuthorDate: Fri Sep 24 17:01:29 2021 +0530

    [CARBONDATA-4194] Fixed presto read after update/delete from spark
    
    Why is this PR needed?
    After an update/delete is performed with Spark on a table that contains an array/struct
    column, reading that table from Presto throws a ClassCastException. This happens because,
    after an update/delete, the column vector is of type ColumnarVectorWrapperDirectWithDeleteDelta,
    but the code unconditionally casts it to CarbonColumnVectorImpl.
    After fixing this (by adding an instanceof check), reading started throwing an
    IllegalArgumentException instead.
    This is because:
    
    1. When local dictionary is enabled, CarbondataPageSource.load calls
    ComplexTypeStreamReader.putComplexObject before the correct number of rows
    has been set (the deleted rows are not subtracted), so an IllegalArgumentException
    is thrown while building the block for the child elements.
    2. The position count is wrong for struct columns: the number of deleted rows
    should be subtracted in LocalDictDimensionDataChunkStore.fillVector.
    No change is needed for array columns, because the data length of an array
    already accounts for deleted rows in
    ColumnVectorInfo.getUpdatedPageSizeForChildVector.
    
    What changes were proposed in this PR?
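    First, fixed the ClassCastException by adding an instanceof check to the if condition
    before casting to CarbonColumnVectorImpl. Then, in DirectCompressCodec.decodeAndFillVector,
    subtracted the deleted row count from the page size used to compute the number of child
    elements in each row, before ComplexTypeStreamReader.putComplexObject is called. Also
    handled deleted rows for struct columns in LocalDictDimensionDataChunkStore.fillVector,
    and renamed the CarbonColumnVectorImpl helpers setNumberOfChildElementsForArray/ForStruct
    to setNumberOfElementsInEachRowForArray/ForStruct.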
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    No
    
    This Closes #4224
---
 .../impl/LocalDictDimensionDataChunkStore.java     | 12 ++++++++-
 .../encoding/compress/DirectCompressCodec.java     |  8 ++++--
 .../result/vector/impl/CarbonColumnVectorImpl.java | 30 +++++++++++-----------
 3 files changed, 32 insertions(+), 18 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/LocalDictDimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/LocalDictDimensionDataChunkStore.java
index 6e2938e..08f8176 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/LocalDictDimensionDataChunkStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/LocalDictDimensionDataChunkStore.java
@@ -22,6 +22,7 @@ import java.util.BitSet;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.chunk.store.DimensionDataChunkStore;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
 import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
 import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
@@ -67,8 +68,16 @@ public class LocalDictDimensionDataChunkStore implements DimensionDataChunkStore
     int rowsNum = dataLength / columnValueSize;
     CarbonColumnVector vector = vectorInfo.vector;
     if (vector.getType().isComplexType()) {
+      if (DataTypes.isStructType(vector.getType())) {
+        int deletedRow = vectorInfo.deletedRows != null ? vectorInfo.deletedRows.cardinality() : 0;
+        rowsNum = dataLength - deletedRow;
+      } else {
+        // this is not required to be changed in the case of the array because
+        // datalength of the array already taking care of deleted rows in
+        // ColumnVectorInfo.getUpdatedPageSizeForChildVector
+        rowsNum = dataLength;
+      }
       vector = vectorInfo.vectorStack.peek();
-      rowsNum = dataLength;
       CarbonColumnVector sliceVector = vector.getColumnVector();
       // use rowsNum as positionCount in order to create dictionary block
       sliceVector.setPositionCount(rowsNum);
@@ -87,6 +96,7 @@ public class LocalDictDimensionDataChunkStore implements DimensionDataChunkStore
             vectorInfo.deletedRows, false, false);
     // this check is in case of array of string type
     if (vectorInfo.vector.getType().isComplexType()
+        && dictionaryVector instanceof CarbonColumnVectorImpl
         && ((CarbonColumnVectorImpl) dictionaryVector).getIntArraySize() < rowsNum) {
       ((CarbonColumnVectorImpl) dictionaryVector).increaseIntArraySize(rowsNum);
     }
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
index 7b1fe16..d01a54e 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
@@ -262,12 +262,16 @@ public class DirectCompressCodec implements ColumnPageCodec {
         CarbonColumnVector parentVector = vectorInfo.vectorStack.peek();
         CarbonColumnVectorImpl parentVectorImpl =
             (CarbonColumnVectorImpl) (parentVector.getColumnVector());
+        int deletedRowCount = vectorInfo.deletedRows != null ?
+            vectorInfo.deletedRows.cardinality() : 0;
         // parse the parent page data,
         // save the information about number of child in each row in parent vector
         if (DataTypes.isStructType(parentVectorImpl.getType())) {
-          parentVectorImpl.setNumberOfChildElementsForStruct(pageData, pageSize);
+          parentVectorImpl
+              .setNumberOfElementsInEachRowForStruct(pageData, pageSize - deletedRowCount);
         } else {
-          parentVectorImpl.setNumberOfChildElementsForArray(pageData, pageSize);
+          parentVectorImpl
+              .setNumberOfElementsInEachRowForArray(pageData, pageSize - deletedRowCount);
         }
         for (CarbonColumnVector childVector : parentVector.getColumnVector().getChildrenVector()) {
           // push each child
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java
index 715bd9a..1edf117 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java
@@ -79,7 +79,7 @@ public class CarbonColumnVectorImpl implements CarbonColumnVector {
 
   private boolean loaded;
 
-  private List<Integer> childElementsForEachRow;
+  private List<Integer> numberOfChildElementsInEachRow;
 
   private CarbonDictionary localDictionary;
 
@@ -121,41 +121,41 @@ public class CarbonColumnVectorImpl implements CarbonColumnVector {
   }
 
   public List<Integer> getNumberOfChildrenElementsInEachRow() {
-    return childElementsForEachRow;
+    return numberOfChildElementsInEachRow;
   }
 
-  public void setNumberOfChildElementsInEachRow(List<Integer> childrenElements) {
-    this.childElementsForEachRow = childrenElements;
+  public void setNumberOfChildElementsInEachRow(List<Integer> numberOfChildElementsInEachRow) {
+    this.numberOfChildElementsInEachRow = numberOfChildElementsInEachRow;
   }
 
-  public void setNumberOfChildElementsForArray(byte[] parentPageData, int pageSize) {
+  public void setNumberOfElementsInEachRowForArray(byte[] parentPageData, int pageSize) {
     // for complex array type, go through parent page to get the child information
     ByteBuffer childInfoBuffer = ByteBuffer.wrap(parentPageData);
-    List<Integer> childElementsForEachRow = new ArrayList<>();
+    List<Integer> numberOfArrayElementsInEachRow = new ArrayList<>();
     // Parent page array data looks like
     // number of children in each row [4 byte], Offset [4 byte],
     // number of children in each row [4 byte], Offset [4 byte]...
-    while (pageSize != childElementsForEachRow.size()) {
-      // get the number of children in current row
-      childElementsForEachRow.add(childInfoBuffer.getInt());
+    while (pageSize != numberOfArrayElementsInEachRow.size()) {
+      // get the number of array elements in current row
+      numberOfArrayElementsInEachRow.add(childInfoBuffer.getInt());
       // skip offset
       childInfoBuffer.getInt();
     }
-    setNumberOfChildElementsInEachRow(childElementsForEachRow);
+    setNumberOfChildElementsInEachRow(numberOfArrayElementsInEachRow);
   }
 
-  public void setNumberOfChildElementsForStruct(byte[] parentPageData, int pageSize) {
+  public void setNumberOfElementsInEachRowForStruct(byte[] parentPageData, int pageSize) {
     // for complex struct type, go through parent page to get the child information
     ByteBuffer childInfoBuffer = ByteBuffer.wrap(parentPageData);
-    List<Integer> childElementsForEachRow = new ArrayList<>();
+    List<Integer> numberOfStructElementsInEachRow = new ArrayList<>();
     // Parent page struct data looks like
     // number of children in each row [2 byte], number of children in each row [2 byte],
     // number of children in each row [2 byte], number of children in each row [2 byte]...
-    while (pageSize != childElementsForEachRow.size()) {
+    while (pageSize != numberOfStructElementsInEachRow.size()) {
       int elements = childInfoBuffer.getShort();
-      childElementsForEachRow.add(elements);
+      numberOfStructElementsInEachRow.add(elements);
     }
-    setNumberOfChildElementsInEachRow(childElementsForEachRow);
+    setNumberOfChildElementsInEachRow(numberOfStructElementsInEachRow);
   }
 
   public CarbonDictionary getLocalDictionary() {