You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink itself was not preserved in this plain-text copy).
Posted to commits@carbondata.apache.org by ra...@apache.org on 2019/01/16 14:36:34 UTC

[carbondata] branch master updated: [CARBONDATA-3239] Fix ArrayIndexOutOfBoundsException in DataSkewRangePartitioner

This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 1a2c051  [CARBONDATA-3239] Fix ArrayIndexOutOfBoundsException in DataSkewRangePartitioner
1a2c051 is described below

commit 1a2c0513e46b317b7d0f13b0b83acdff9213e7fe
Author: QiangCai <qi...@qq.com>
AuthorDate: Thu Jan 10 15:30:31 2019 +0800

    [CARBONDATA-3239] Fix ArrayIndexOutOfBoundsException in DataSkewRangePartitioner
    
    Fix ArrayIndexOutOfBoundsException in DataSkewRangePartitioner and add test case
    
    This closes #3061
---
 .../src/test/resources/range_column/dataskew.csv   |  31 +++---
 .../dataload/TestRangeColumnDataLoad.scala         | 104 +++++++++++++++++++--
 .../apache/spark/DataSkewRangePartitioner.scala    |   3 +
 3 files changed, 119 insertions(+), 19 deletions(-)

diff --git a/integration/spark-common-test/src/test/resources/range_column/dataskew.csv b/integration/spark-common-test/src/test/resources/range_column/dataskew.csv
index fb77a5d..2a4e2c5 100644
--- a/integration/spark-common-test/src/test/resources/range_column/dataskew.csv
+++ b/integration/spark-common-test/src/test/resources/range_column/dataskew.csv
@@ -1,11 +1,20 @@
-id,name,city,age
-1,,wuhan,91
-2,,hangzhou,102
-3,,beijing,112
-4,,shenzhen,124
-5,e,shenzhen,65
-6,f,beijing,76
-7,g,hangzhou,37
-8,h,wuhan,48
-9,i,,89
-10,j,,50
\ No newline at end of file
+1,abc_1001
+2,null
+3,abc_1001
+4,null
+5,abc_1005
+6,null
+7,abc_1007
+8,null
+9,null
+10,abc_1001
+11,abc_1001
+12,null
+13,abc_1003
+14,null
+15,abc_1005
+16,null
+17,abc_1001
+18,null
+19,null
+20,null
\ No newline at end of file
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestRangeColumnDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestRangeColumnDataLoad.scala
index 14f11e5..4803fb2 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestRangeColumnDataLoad.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestRangeColumnDataLoad.scala
@@ -17,6 +17,10 @@
 
 package org.apache.carbondata.spark.testsuite.dataload
 
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.classTag
+
+import org.apache.spark.DataSkewRangePartitioner
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
@@ -25,8 +29,10 @@ import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException
 import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
+import org.apache.carbondata.core.metadata.datatype.DataTypes
 import org.apache.carbondata.core.metadata.{CarbonMetadata, SegmentFileStore}
 import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.spark.load.PrimtiveOrdering
 
 class TestRangeColumnDataLoad extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll {
   var filePath: String = s"$resourcesPath/globalsort"
@@ -97,19 +103,101 @@ class TestRangeColumnDataLoad extends QueryTest with BeforeAndAfterEach with Bef
   test("range_column with data skew") {
     sql(
       """
-        | CREATE TABLE carbon_range_column4(id INT, name STRING, city STRING, age INT)
-        | STORED BY 'org.apache.carbondata.format'
-        | TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT', 'SORT_COLUMNS'='name, city')
+        | CREATE TABLE carbon_range_column4(c1 int, c2 string)
+        | STORED AS carbondata
+        | TBLPROPERTIES('sort_columns'='c1,c2', 'sort_scope'='local_sort')
       """.stripMargin)
 
     val dataSkewPath = s"$resourcesPath/range_column"
 
-    sql(s"LOAD DATA LOCAL INPATH '$dataSkewPath' INTO TABLE carbon_range_column4 " +
-        "OPTIONS('GLOBAL_SORT_PARTITIONS'='5', 'range_column'='name', " +
-        "'BAD_RECORDS_ACTION'='force')")
+    sql( // load dataskew.csv (20 rows, 11 with the literal c2 value `null`) using c2 as the range column
+      s"""LOAD DATA LOCAL INPATH '$dataSkewPath'
+         | INTO TABLE carbon_range_column4
+         | OPTIONS('FILEHEADER'='c1,c2', 'range_column'='c2', 'global_sort_partitions'='10')
+        """.stripMargin)
+
+    assert(getIndexFileCount("carbon_range_column4") === 9) // NOTE(review): one fewer index file than global_sort_partitions (10); the reason is not visible here — confirm against DataSkewRangePartitioner
+    checkAnswer(sql("SELECT COUNT(*) FROM carbon_range_column4"), Seq(Row(20))) // every row of dataskew.csv must be loaded
+  }
+
+  test("DataSkewRangePartitioner.combineDataSkew") { // each case feeds sorted bounds to combineDataSkew and checks the reported skew count, indexes and weights
+    val partitioner =
+      new DataSkewRangePartitioner(1, null)(new PrimtiveOrdering(DataTypes.STRING), // NOTE(review): null second argument assumed unused by combineDataSkew — confirm
+        classTag[Object])
+
+    testCombineDataSkew( // all bounds distinct: no skew expected
+      partitioner,
+      Array("a", "b"),
+      0)
+
+    testCombineDataSkew( // a single value repeated up to the end of the bounds
+      partitioner,
+      Array("a", "a"),
+      1,
+      Array(0), // expected indexes look like positions among the distinct bound values
+      Array(2)) // expected weights: how many times each skewed value repeats
+
+    testCombineDataSkew( // all bounds distinct: no skew expected
+      partitioner,
+      Array("a", "b", "c"),
+      0)
+
+    testCombineDataSkew( // two repeated runs, the longer one ("c" x3) at the end of the bounds
+      partitioner,
+      Array("a", "b", "b", "c", "c", "c"),
+      2,
+      Array(1, 2),
+      Array(2, 3))
+
+    testCombineDataSkew( // two repeated runs, the longer one ("b" x3) in the middle
+      partitioner,
+      Array("a", "b", "b", "b", "c", "c"),
+      2,
+      Array(1, 2),
+      Array(3, 2))
+
+    testCombineDataSkew( // every value repeated: three skewed runs
+      partitioner,
+      Array("a", "a", "b", "b", "c", "c"),
+      3,
+      Array(0, 1, 2),
+      Array(2, 2, 2))
+
+    testCombineDataSkew( // repeated runs at the start and the end, separated by a single value
+      partitioner,
+      Array("a", "a", "a", "b", "c", "c"),
+      2,
+      Array(0, 2),
+      Array(3, 2))
+
+    testCombineDataSkew( // two adjacent repeated runs at the start, then a single value
+      partitioner,
+      Array("a", "a", "a", "b", "b", "c"),
+      2,
+      Array(0, 1),
+      Array(3, 2))
+
+    testCombineDataSkew( // two adjacent repeated runs (the second longer), then a single value
+      partitioner,
+      Array("a", "a", "b", "b", "b", "c"),
+      2,
+      Array(0, 1),
+      Array(2, 3))
+  }
 
-    assert(getIndexFileCount("carbon_range_column4") === 5)
-    checkAnswer(sql("SELECT COUNT(*) FROM carbon_range_column4"), Seq(Row(10)))
+  private def testCombineDataSkew(partitioner: DataSkewRangePartitioner[Object, Nothing], // runs combineDataSkew on `bounds` and asserts the reported skew details
+      bounds: Array[String], skewCount: Int, skewIndexes: Array[Int] = null, // the null defaults are only left in place when skewCount is 0
+      skewWeights: Array[Int] = null
+  ): Unit = {
+    val boundsBuffer = new ArrayBuffer[Object]()
+    bounds.map(_.getBytes()).foreach(boundsBuffer += _) // bounds are passed as byte arrays (platform default charset)
+    val (_, actualSkewCount, actualSkewIndexes, actualSkewWeights) = // the combined bounds (first element) are not checked here
+      partitioner.combineDataSkew(boundsBuffer)
+    assertResult(skewCount)(actualSkewCount)
+    if (skewCount > 0) { // indexes and weights are compared only when skew is expected
+      assertResult(skewIndexes)(actualSkewIndexes)
+      assertResult(skewWeights)(actualSkewWeights)
+    }
   }
 
   private def getIndexFileCount(tableName: String, segmentNo: String = "0"): Int = {
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/DataSkewRangePartitioner.scala b/integration/spark-common/src/main/scala/org/apache/spark/DataSkewRangePartitioner.scala
index 07ad011..5f5c376 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/DataSkewRangePartitioner.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/DataSkewRangePartitioner.scala
@@ -192,6 +192,9 @@ class DataSkewRangePartitioner[K: Ordering : ClassTag, V](
         }
       }
     }
+    if (dataSkewCountTmp > 1) {
+      dataSkewNumTmp += dataSkewCountTmp
+    }
     if (dataSkewIndexTmp.size > 0) {
       (finalBounds.toArray, dataSkewIndexTmp.size, dataSkewIndexTmp.toArray, dataSkewNumTmp.toArray)
     } else {