You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by qi...@apache.org on 2020/05/31 14:56:00 UTC

[carbondata] branch master updated: [CARBONDATA-3835] Fix global sort issues

This is an automated email from the ASF dual-hosted git repository.

qiangcai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 76e5489  [CARBONDATA-3835] Fix global sort issues
76e5489 is described below

commit 76e54891e7925d26c97eaeab09ca9a3368836ce8
Author: ajantha-bhat <aj...@gmail.com>
AuthorDate: Sat May 30 08:24:39 2020 +0530

    [CARBONDATA-3835] Fix global sort issues
    
    Why is this PR needed?
    For global sort without partition, string values arrive as byte[]; if we use the string comparator (StringSerializableComparator), it calls toString on the byte[], which yields the object's identity (address) rather than its contents, so the comparison goes wrong.
    For global sort with partition, when the sort column is a partition column, the data was being sorted on the first column instead of the partition column.
    
    What changes were proposed in this PR?
    Change the data type to ByteType before choosing a comparator.
    Look up the sort column by its index instead of just taking the first columns.
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    Yes
    
    This closes #3779
---
 .../spark/load/DataLoadProcessBuilderOnSpark.scala | 15 ++++--
 .../carbondata/spark/load/GlobalSortHelper.scala   |  8 ++-
 .../command/management/CommonLoadUtils.scala       |  7 ++-
 .../org/apache/spark/sql/test/util/QueryTest.scala | 17 +++++-
 .../StandardPartitionGlobalSortTestCase.scala      | 60 ++++++++++++++++++++++
 5 files changed, 94 insertions(+), 13 deletions(-)

diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index 55eee11..e7e1baf 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.command.ExecutionErrors
+import org.apache.spark.sql.types.{ByteType, DateType, LongType, StringType, TimestampType}
 import org.apache.spark.sql.util.{SparkSQLUtil, SparkTypeConverter}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.unsafe.types.UTF8String
@@ -237,10 +238,15 @@ object DataLoadProcessBuilderOnSpark {
         CarbonProperties.getInstance().getGlobalSortRddStorageLevel()))
     }
     val sortColumnsLength = model.getCarbonDataLoadSchema.getCarbonTable.getSortColumns.size()
-    val sortColumnDataTypes = dataTypes.take(sortColumnsLength)
-    val rowComparator = GlobalSortHelper.generateRowComparator(sortColumnDataTypes)
+    var sortColumnDataTypes = dataTypes.take(sortColumnsLength)
+    sortColumnDataTypes = sortColumnDataTypes.map {
+      case StringType => ByteType
+      case TimestampType | DateType => LongType
+      case datatype => datatype
+    }
+    val rowComparator = GlobalSortHelper.generateRowComparator(sortColumnDataTypes.zipWithIndex)
     val sortRDD = rdd.sortBy(row =>
-      getKey(row, sortColumnsLength, sortColumnDataTypes),
+      getKey(row, sortColumnsLength),
       true,
       numPartitions)(
       rowComparator, classTag[Array[AnyRef]])
@@ -273,8 +279,7 @@ object DataLoadProcessBuilderOnSpark {
   }
 
   def getKey(row: Array[AnyRef],
-      sortColumnsLength: Int,
-      dataTypes: Seq[org.apache.spark.sql.types.DataType]): Array[AnyRef] = {
+      sortColumnsLength: Int): Array[AnyRef] = {
     val key: Array[AnyRef] = new Array[AnyRef](sortColumnsLength)
     System.arraycopy(row, 0, key, 0, sortColumnsLength)
     key
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/load/GlobalSortHelper.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/load/GlobalSortHelper.scala
index 91ed27e..00891b9 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/load/GlobalSortHelper.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/load/GlobalSortHelper.scala
@@ -50,7 +50,7 @@ object GlobalSortHelper {
 
   def sortBy(updatedRdd: RDD[InternalRow],
       numPartitions: Int,
-      dataTypes: Seq[DataType]
+      dataTypes: Seq[(DataType, Int)]
   ): RDD[InternalRow] = {
     val keyExtractors = generateKeyExtractor(dataTypes)
     val rowComparator = generateRowComparator(dataTypes)
@@ -68,9 +68,8 @@ object GlobalSortHelper {
     key
   }
 
-  def generateKeyExtractor(dataTypes: Seq[DataType]): Array[KeyExtractor] = {
+  def generateKeyExtractor(dataTypes: Seq[(DataType, Int)]): Array[KeyExtractor] = {
     dataTypes
-      .zipWithIndex
       .map { attr =>
         attr._1 match {
           case StringType => UTF8StringKeyExtractor(attr._2)
@@ -91,9 +90,8 @@ object GlobalSortHelper {
       .toArray
   }
 
-  def generateRowComparator(dataTypes: Seq[DataType]): InternalRowComparator = {
+  def generateRowComparator(dataTypes: Seq[(DataType, Int)]): InternalRowComparator = {
     val comparators = dataTypes
-      .zipWithIndex
       .map { attr =>
         val comparator = attr._1 match {
           case StringType => new StringSerializableComparator()
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
index ce699d6..20d29d8 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
@@ -518,7 +518,12 @@ object CommonLoadUtils {
         } else {
           attributes.take(table.getSortColumns.size())
         }
-      val dataTypes = sortColumns.map(_.dataType)
+      val attributesWithIndex = attributes.zipWithIndex
+      val dataTypes = sortColumns.map { column =>
+        val attributeWithIndex =
+          attributesWithIndex.find(x => x._1.name.equalsIgnoreCase(column.name))
+        (column.dataType, attributeWithIndex.get._2)
+      }
       val sortedRDD: RDD[InternalRow] =
         GlobalSortHelper.sortBy(updatedRdd, numPartitions, dataTypes)
       val outputOrdering = sortColumns.map(SortOrder(_, Ascending))
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala b/integration/spark/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala
index ef9d1b2..c08370b 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala
@@ -90,6 +90,13 @@ class QueryTest extends PlanTest {
     }
   }
 
+  protected def checkAnswerWithoutSort(df: DataFrame, expectedAnswer: Seq[Row]): Unit = {
+    QueryTest.checkAnswer(df, expectedAnswer, needSort = false) match {
+      case Some(errorMessage) => fail(errorMessage)
+      case None =>
+    }
+  }
+
   protected def checkAnswer(df: DataFrame, expectedAnswer: Row): Unit = {
     checkAnswer(df, Seq(expectedAnswer))
   }
@@ -219,8 +226,14 @@ object QueryTest {
    * @param df the [[DataFrame]] to be executed
    * @param expectedAnswer the expected result in a [[Seq]] of [[Row]]s.
    */
-  def checkAnswer(df: DataFrame, expectedAnswer: Seq[Row]): Option[String] = {
-    val isSorted = df.logicalPlan.collect { case s: logical.Sort => s }.nonEmpty
+  def checkAnswer(df: DataFrame,
+      expectedAnswer: Seq[Row],
+      needSort: Boolean = true): Option[String] = {
+    val isSorted = if (needSort) {
+      df.logicalPlan.collect { case s: logical.Sort => s }.nonEmpty
+    } else {
+      true
+    }
     def prepareAnswer(answer: Seq[Row]): Seq[Row] = {
       // Converts data to types that we can do equality comparison using Scala collections.
       // For BigDecimal type, the Scala type has a better definition of equality test (similar to
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala
index 3deb8ba..a9c773d 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala
@@ -48,6 +48,66 @@ class StandardPartitionGlobalSortTestCase extends QueryTest with BeforeAndAfterA
     sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE originTable OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
   }
 
+  test("test global sort column as partition column") {
+    sql("drop table if exists table1")
+    sql(s"""
+         | create table table1(
+         | timestampField TIMESTAMP
+         | )
+         | stored as carbondata
+         | tblproperties('sort_scope'='global_sort', 'sort_columns'='charField')
+         | partitioned by (charField String)
+       """.stripMargin)
+    sql(
+      s"""
+         | INSERT INTO TABLE table1
+         | SELECT
+         | '2015-07-01 00:00:00', 'aaa'
+       """.stripMargin)
+    checkAnswer(sql(s"SELECT charField FROM table1"), Seq(Row("aaa")))
+    sql("drop table if exists table1")
+  }
+
+  test("test global sort order") {
+    sql("DROP TABLE IF EXISTS gs_test")
+    sql("create table gs_test (id string) " +
+        "using carbondata options('sort_scope'='global_sort', 'local_dictionary_enable'='false', 'sort_columns'='id','global_sort_partitions'='1')")
+    sql("insert into gs_test select 'abc1' union all select 'abc5' union all select 'abc10' union all select 'abc20' ")
+    checkAnswerWithoutSort(sql("select * from gs_test"), Seq(Row("abc1"), Row("abc10"), Row("abc20"), Row("abc5")))
+    sql("DROP TABLE IF EXISTS gs_test")
+  }
+
+  test("test global sort order in old insert flow") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT, "true");
+    sql("DROP TABLE IF EXISTS gs_test")
+    sql("create table gs_test (id string) " +
+        "using carbondata options('sort_scope'='global_sort', 'local_dictionary_enable'='false', 'sort_columns'='id','global_sort_partitions'='1')")
+    sql("insert into gs_test select 'abc1' union all select 'abc5' union all select 'abc10' union all select 'abc20' ")
+    checkAnswerWithoutSort(sql("select id from gs_test"), Seq(Row("abc1"), Row("abc10"), Row("abc20"), Row("abc5")))
+    sql("DROP TABLE IF EXISTS gs_test")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT, "false");
+  }
+
+  test("test global sort partition order") {
+    sql("DROP TABLE IF EXISTS gs_test")
+    sql("create table gs_test (id string) partitioned by (val int)" +
+        "stored as carbondata tblproperties('sort_scope'='global_sort', 'local_dictionary_enable'='false', 'sort_columns'='id','global_sort_partitions'='1')")
+    sql("insert into gs_test select 'abc1', 1 union all select 'abc5', 1 union all select 'abc10', 1 union all select 'abc20', 1 ")
+    checkAnswerWithoutSort(sql("select id from gs_test"), Seq(Row("abc1"), Row("abc10"), Row("abc20"), Row("abc5")))
+    sql("DROP TABLE IF EXISTS gs_test")
+  }
+
+  test("test global sort order in old insert partition flow") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT, "true");
+    sql("DROP TABLE IF EXISTS gs_test")
+    sql("create table gs_test (id string) " +
+        "using carbondata options('sort_scope'='global_sort', 'local_dictionary_enable'='false', 'sort_columns'='id','global_sort_partitions'='1')")
+    sql("insert into gs_test select 'abc1' union all select 'abc5' union all select 'abc10' union all select 'abc20' ")
+    checkAnswerWithoutSort(sql("select id from gs_test"), Seq(Row("abc1"), Row("abc10"), Row("abc20"), Row("abc5")))
+    sql("DROP TABLE IF EXISTS gs_test")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT, "false");
+  }
+
   test("data loading for global sort partition table for one partition column") {
     sql(
       """