Posted to commits@carbondata.apache.org by aj...@apache.org on 2020/12/17 11:32:06 UTC

[carbondata] branch master updated: [CARBONDATA-4087] Handled issue with huge data (exceeding 32K records) after enabling local dictionary in Presto

This is an automated email from the ASF dual-hosted git repository.

ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new b8c3702  [CARBONDATA-4087] Handled issue with huge data (exceeding 32K records) after enabling local dictionary in Presto
b8c3702 is described below

commit b8c3702c054bf8080ed8ea3e2883e34451570d7e
Author: akkio-97 <ak...@gmail.com>
AuthorDate: Wed Dec 16 14:16:03 2020 +0530

    [CARBONDATA-4087] Handled issue with huge data (exceeding 32K records) after enabling local dictionary in Presto
    
    Why is this PR needed?
    After enabling the local dictionary, this issue appears when doing a SELECT on array(varchar) columns whose data exceeds the batch size of 32K records.
    The int array used to store the dictionary surrogate values has a default size of only 32K, so an ArrayIndexOutOfBoundsException is thrown when the page size exceeds 32K.
    
    What changes were proposed in this PR?
    If the int array is not large enough to accommodate all the string values, grow it from the default size to the actual page size.
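    
    A minimal sketch of the guard-and-grow pattern behind this fix (a standalone
    illustration, not CarbonData's actual class; the real methods
    getIntArraySize()/increaseIntArraySize() appear in the diff below, and the
    DEFAULT_BATCH_SIZE name here is illustrative):
    
        // Holds one dictionary surrogate key per row of the current page.
        public class SurrogateBuffer {
          private static final int DEFAULT_BATCH_SIZE = 32000;
          private int[] ints = new int[DEFAULT_BATCH_SIZE];
    
          public int getIntArraySize() {
            return ints.length;
          }
    
          // replaces the backing array; no copy is needed because the
          // array is refilled from scratch for every page
          public void increaseIntArraySize(int size) {
            this.ints = new int[size];
          }
    
          public void fillPage(int rowsNum) {
            // grow only when the page holds more rows than the default batch size
            if (getIntArraySize() < rowsNum) {
              increaseIntArraySize(rowsNum);
            }
            for (int i = 0; i < rowsNum; i++) {
              ints[i] = i; // stand-in for CarbonUtil.getSurrogateInternal(...)
            }
          }
        }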
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    Yes
    
    This closes #4055
---
 .../impl/LocalDictDimensionDataChunkStore.java     |  6 ++++
 .../result/vector/impl/CarbonColumnVectorImpl.java |  8 +++++
 .../carbondata/presto/server/PrestoTestUtil.scala  | 18 +++++++++++
 .../carbondata/presto/server/PrestoTestUtil.scala  | 19 ++++++++++++
 .../PrestoTestNonTransactionalTableFiles.scala     | 36 ++++++++++++++++++++++
 .../command/management/CarbonAddLoadCommand.scala  |  2 +-
 .../testsuite/addsegment/AddSegmentTestCase.scala  |  4 ++-
 7 files changed, 91 insertions(+), 2 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/LocalDictDimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/LocalDictDimensionDataChunkStore.java
index 2c1b1b3..6e2938e 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/LocalDictDimensionDataChunkStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/LocalDictDimensionDataChunkStore.java
@@ -25,6 +25,7 @@ import org.apache.carbondata.core.datastore.chunk.store.DimensionDataChunkStore;
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
 import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
 import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
+import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl;
 import org.apache.carbondata.core.scan.result.vector.impl.directread.ColumnarVectorWrapperDirectFactory;
 import org.apache.carbondata.core.scan.result.vector.impl.directread.ConvertibleVector;
 import org.apache.carbondata.core.util.CarbonUtil;
@@ -84,6 +85,11 @@ public class LocalDictDimensionDataChunkStore implements DimensionDataChunkStore
     vector = ColumnarVectorWrapperDirectFactory
         .getDirectVectorWrapperFactory(vectorInfo, vector, invertedIndex, nullBitset,
             vectorInfo.deletedRows, false, false);
+    // for complex types such as array of string, grow the surrogate int array if needed
+    if (vectorInfo.vector.getType().isComplexType()
+        && ((CarbonColumnVectorImpl) dictionaryVector).getIntArraySize() < rowsNum) {
+      ((CarbonColumnVectorImpl) dictionaryVector).increaseIntArraySize(rowsNum);
+    }
     for (int i = 0; i < rowsNum; i++) {
       int surrogate = CarbonUtil.getSurrogateInternal(data, i * columnValueSize, columnValueSize);
       // If complex string primitive value is null then surrogate will be unequal to
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java
index 4952224..cf5c3c7 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java
@@ -509,6 +509,14 @@ public class CarbonColumnVectorImpl implements CarbonColumnVector {
     return lengths;
   }
 
+  public int getIntArraySize() {
+    return ints.length;
+  }
+
+  public void increaseIntArraySize(int size) {
+    this.ints = new int[size];
+  }
+
   public int[] getOffsets() {
     return offsets;
   }
diff --git a/integration/presto/src/test/prestodb/org/apache/carbondata/presto/server/PrestoTestUtil.scala b/integration/presto/src/test/prestodb/org/apache/carbondata/presto/server/PrestoTestUtil.scala
index 0413baf..646d191 100644
--- a/integration/presto/src/test/prestodb/org/apache/carbondata/presto/server/PrestoTestUtil.scala
+++ b/integration/presto/src/test/prestodb/org/apache/carbondata/presto/server/PrestoTestUtil.scala
@@ -170,4 +170,22 @@ object PrestoTestUtil {
       }
     }
   }
+
+  // this method depends on the prestodb JDBC PrestoArray class
+  def validateHugeDataForArrayWithLocalDict(actualResult: List[Map[String, Any]]): Unit = {
+    assert(actualResult.size == 100 * 1000)
+    val data1 = actualResult(actualResult.size - 2)("arraystring")
+      .asInstanceOf[PrestoArray]
+      .getArray()
+      .asInstanceOf[Array[Object]]
+    val data2 = actualResult(actualResult.size - 1)("arraystring")
+      .asInstanceOf[PrestoArray]
+      .getArray()
+      .asInstanceOf[Array[Object]]
+    assert(data1.size == 3)
+    assert(data2.size == 1)
+
+    assert(data1.sameElements(Array("India", "China", "Japan")))
+    assert(data2.sameElements(Array("Korea")))
+  }
 }
diff --git a/integration/presto/src/test/prestosql/org/apache/carbondata/presto/server/PrestoTestUtil.scala b/integration/presto/src/test/prestosql/org/apache/carbondata/presto/server/PrestoTestUtil.scala
index 7d51f1f..9044933 100644
--- a/integration/presto/src/test/prestosql/org/apache/carbondata/presto/server/PrestoTestUtil.scala
+++ b/integration/presto/src/test/prestosql/org/apache/carbondata/presto/server/PrestoTestUtil.scala
@@ -170,4 +170,23 @@ object PrestoTestUtil {
       }
     }
   }
+
+  // this method depends on the prestosql JDBC PrestoArray class
+  def validateHugeDataForArrayWithLocalDict(actualResult: List[Map[String, Any]]): Unit = {
+    assert(actualResult.size == 100 * 1000)
+    val data1 = actualResult(actualResult.size - 2)("arraystring")
+      .asInstanceOf[PrestoArray]
+      .getArray()
+      .asInstanceOf[Array[Object]]
+    val data2 = actualResult(actualResult.size - 1)("arraystring")
+      .asInstanceOf[PrestoArray]
+      .getArray()
+      .asInstanceOf[Array[Object]]
+    assert(data1.size == 3)
+    assert(data2.size == 1)
+
+    assert(data1.sameElements(Array("India", "China", "Japan")))
+    assert(data2.sameElements(Array("Korea")))
+  }
+
 }
diff --git a/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoTestNonTransactionalTableFiles.scala b/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoTestNonTransactionalTableFiles.scala
index 2e4f814..c56a77c 100644
--- a/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoTestNonTransactionalTableFiles.scala
+++ b/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoTestNonTransactionalTableFiles.scala
@@ -777,4 +777,40 @@ class PrestoTestNonTransactionalTableFiles
 
   }
 
+  test("test Array of varchar type with huge data enabling local dictionary") {
+    val writerPathComplex = storePath + "/sdk_output/files9"
+    import scala.collection.JavaConverters._
+    FileUtils.deleteDirectory(new File(writerPathComplex))
+    prestoServer.execute("drop table if exists sdk_output.files9")
+    prestoServer
+      .execute(
+        "create table sdk_output.files9(arrayString ARRAY(varchar)) with(format='CARBON') ")
+    val fields8 = List(new StructField("intField", DataTypes.STRING))
+    val structType8 = new Field("arrayString", "array", fields8.asJava)
+    try {
+      val builder = CarbonWriter.builder()
+      val writer =
+        builder.outputPath(writerPathComplex)
+          .uniqueIdentifier(System.nanoTime()).withBlockSize(2).enableLocalDictionary(true)
+          .withCsvInput(new Schema(Array[Field](structType8))).writtenBy("presto").build()
+
+      var i = 0
+      while (i < 50000) {
+        val array = Array[String]("India" + "\001" + "China" + "\001" + "Japan")
+        writer.write(array)
+        val array1 = Array[String]("Korea")
+        writer.write(array1)
+        i += 1
+      }
+      writer.close()
+    } catch {
+      case e: Exception =>
+        assert(false)
+    }
+    val actualResult: List[Map[String, Any]] = prestoServer
+      .executeQuery("select * from files9 ")
+    PrestoTestUtil.validateHugeDataForArrayWithLocalDict(actualResult)
+    FileUtils.deleteDirectory(new File(writerPathComplex))
+  }
+
 }
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
index 7dd0660..4defda0 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
@@ -93,7 +93,7 @@ case class CarbonAddLoadCommand(
     var givenPath = options.getOrElse(
       "path", throw new UnsupportedOperationException("PATH is mandatory"))
     // remove file separator if already present
-    if (givenPath.charAt(givenPath.length - 1) == CarbonCommonConstants.FILE_SEPARATOR) {
+    if (givenPath.charAt(givenPath.length - 1) == '/') {
       givenPath = givenPath.substring(0, givenPath.length - 1)
     }
     val inputPath = givenPath
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
index 76f6ee8..ad3b50f 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
@@ -74,10 +74,12 @@ class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll {
     checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(20)))
     checkAnswer(sql("select count(empname) from addsegment1"), Seq(Row(20)))
 
+    // cannot add a segment from the same path again, so delete the previously added segment
+    sql("delete from table addsegment1 where segment.id in (2)")
     sql(s"alter table addsegment1 add segment options('path'='$newPathWithLineSeparator', " +
         s"'format'='carbon')")
       .collect()
-    checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(30)))
+    checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(20)))
     FileFactory.deleteAllFilesOfDir(new File(newPath))
   }