You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2019/01/16 14:36:34 UTC
[carbondata] branch master updated: [CARBONDATA-3239] Fix ArrayIndexOutOfBoundsException in DataSkewRangePartitioner
This is an automated email from the ASF dual-hosted git repository.
ravipesala pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 1a2c051 [CARBONDATA-3239] Fix ArrayIndexOutOfBoundsException in DataSkewRangePartitioner
1a2c051 is described below
commit 1a2c0513e46b317b7d0f13b0b83acdff9213e7fe
Author: QiangCai <qi...@qq.com>
AuthorDate: Thu Jan 10 15:30:31 2019 +0800
[CARBONDATA-3239] Fix ArrayIndexOutOfBoundsException in DataSkewRangePartitioner
Fix ArrayIndexOutOfBoundsException in DataSkewRangePartitioner and add test case
This closes #3061
---
.../src/test/resources/range_column/dataskew.csv | 31 +++---
.../dataload/TestRangeColumnDataLoad.scala | 104 +++++++++++++++++++--
.../apache/spark/DataSkewRangePartitioner.scala | 3 +
3 files changed, 119 insertions(+), 19 deletions(-)
diff --git a/integration/spark-common-test/src/test/resources/range_column/dataskew.csv b/integration/spark-common-test/src/test/resources/range_column/dataskew.csv
index fb77a5d..2a4e2c5 100644
--- a/integration/spark-common-test/src/test/resources/range_column/dataskew.csv
+++ b/integration/spark-common-test/src/test/resources/range_column/dataskew.csv
@@ -1,11 +1,20 @@
-id,name,city,age
-1,,wuhan,91
-2,,hangzhou,102
-3,,beijing,112
-4,,shenzhen,124
-5,e,shenzhen,65
-6,f,beijing,76
-7,g,hangzhou,37
-8,h,wuhan,48
-9,i,,89
-10,j,,50
\ No newline at end of file
+1,abc_1001
+2,null
+3,abc_1001
+4,null
+5,abc_1005
+6,null
+7,abc_1007
+8,null
+9,null
+10,abc_1001
+11,abc_1001
+12,null
+13,abc_1003
+14,null
+15,abc_1005
+16,null
+17,abc_1001
+18,null
+19,null
+20,null
\ No newline at end of file
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestRangeColumnDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestRangeColumnDataLoad.scala
index 14f11e5..4803fb2 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestRangeColumnDataLoad.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestRangeColumnDataLoad.scala
@@ -17,6 +17,10 @@
package org.apache.carbondata.spark.testsuite.dataload
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.classTag
+
+import org.apache.spark.DataSkewRangePartitioner
import org.apache.spark.sql.Row
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
@@ -25,8 +29,10 @@ import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException
import org.apache.carbondata.core.datamap.Segment
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
+import org.apache.carbondata.core.metadata.datatype.DataTypes
import org.apache.carbondata.core.metadata.{CarbonMetadata, SegmentFileStore}
import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.spark.load.PrimtiveOrdering
class TestRangeColumnDataLoad extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll {
var filePath: String = s"$resourcesPath/globalsort"
@@ -97,19 +103,101 @@ class TestRangeColumnDataLoad extends QueryTest with BeforeAndAfterEach with Bef
test("range_column with data skew") {
sql(
"""
- | CREATE TABLE carbon_range_column4(id INT, name STRING, city STRING, age INT)
- | STORED BY 'org.apache.carbondata.format'
- | TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT', 'SORT_COLUMNS'='name, city')
+ | CREATE TABLE carbon_range_column4(c1 int, c2 string)
+ | STORED AS carbondata
+ | TBLPROPERTIES('sort_columns'='c1,c2', 'sort_scope'='local_sort')
""".stripMargin)
val dataSkewPath = s"$resourcesPath/range_column"
- sql(s"LOAD DATA LOCAL INPATH '$dataSkewPath' INTO TABLE carbon_range_column4 " +
- "OPTIONS('GLOBAL_SORT_PARTITIONS'='5', 'range_column'='name', " +
- "'BAD_RECORDS_ACTION'='force')")
+ sql(
+ s"""LOAD DATA LOCAL INPATH '$dataSkewPath'
+ | INTO TABLE carbon_range_column4
+ | OPTIONS('FILEHEADER'='c1,c2', 'range_column'='c2', 'global_sort_partitions'='10')
+ """.stripMargin)
+
+ assert(getIndexFileCount("carbon_range_column4") === 9)
+ checkAnswer(sql("SELECT COUNT(*) FROM carbon_range_column4"), Seq(Row(20)))
+ }
+
+ test("DataSkewRangePartitioner.combineDataSkew") {
+ val partitioner =
+ new DataSkewRangePartitioner(1, null)(new PrimtiveOrdering(DataTypes.STRING),
+ classTag[Object])
+
+ testCombineDataSkew(
+ partitioner,
+ Array("a", "b"),
+ 0)
+
+ testCombineDataSkew(
+ partitioner,
+ Array("a", "a"),
+ 1,
+ Array(0),
+ Array(2))
+
+ testCombineDataSkew(
+ partitioner,
+ Array("a", "b", "c"),
+ 0)
+
+ testCombineDataSkew(
+ partitioner,
+ Array("a", "b", "b", "c", "c", "c"),
+ 2,
+ Array(1, 2),
+ Array(2, 3))
+
+ testCombineDataSkew(
+ partitioner,
+ Array("a", "b", "b", "b", "c", "c"),
+ 2,
+ Array(1, 2),
+ Array(3, 2))
+
+ testCombineDataSkew(
+ partitioner,
+ Array("a", "a", "b", "b", "c", "c"),
+ 3,
+ Array(0, 1, 2),
+ Array(2, 2, 2))
+
+ testCombineDataSkew(
+ partitioner,
+ Array("a", "a", "a", "b", "c", "c"),
+ 2,
+ Array(0, 2),
+ Array(3, 2))
+
+ testCombineDataSkew(
+ partitioner,
+ Array("a", "a", "a", "b", "b", "c"),
+ 2,
+ Array(0, 1),
+ Array(3, 2))
+
+ testCombineDataSkew(
+ partitioner,
+ Array("a", "a", "b", "b", "b", "c"),
+ 2,
+ Array(0, 1),
+ Array(2, 3))
+ }
- assert(getIndexFileCount("carbon_range_column4") === 5)
- checkAnswer(sql("SELECT COUNT(*) FROM carbon_range_column4"), Seq(Row(10)))
+ private def testCombineDataSkew(partitioner: DataSkewRangePartitioner[Object, Nothing],
+ bounds: Array[String], skewCount: Int, skewIndexes: Array[Int] = null,
+ skewWeights: Array[Int] = null
+ ): Unit = {
+ val boundsBuffer = new ArrayBuffer[Object]()
+ bounds.map(_.getBytes()).foreach(boundsBuffer += _)
+ val (_, actualSkewCount, actualSkewIndexes, actualSkewWeights) =
+ partitioner.combineDataSkew(boundsBuffer)
+ assertResult(skewCount)(actualSkewCount)
+ if (skewCount > 0) {
+ assertResult(skewIndexes)(actualSkewIndexes)
+ assertResult(skewWeights)(actualSkewWeights)
+ }
}
private def getIndexFileCount(tableName: String, segmentNo: String = "0"): Int = {
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/DataSkewRangePartitioner.scala b/integration/spark-common/src/main/scala/org/apache/spark/DataSkewRangePartitioner.scala
index 07ad011..5f5c376 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/DataSkewRangePartitioner.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/DataSkewRangePartitioner.scala
@@ -192,6 +192,9 @@ class DataSkewRangePartitioner[K: Ordering : ClassTag, V](
}
}
}
+ if (dataSkewCountTmp > 1) {
+ dataSkewNumTmp += dataSkewCountTmp
+ }
if (dataSkewIndexTmp.size > 0) {
(finalBounds.toArray, dataSkewIndexTmp.size, dataSkewIndexTmp.toArray, dataSkewNumTmp.toArray)
} else {