You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2019/01/30 10:39:12 UTC

[carbondata] 08/27: [CARBONDATA-3257] Fix for NO_SORT load and describe formatted being in NO_SORT flow even with Sort Columns given

This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/carbondata.git

commit abcfad297825095841df34f00df4215191814621
Author: manishnalla1994 <ma...@gmail.com>
AuthorDate: Mon Jan 21 17:23:37 2019 +0530

    [CARBONDATA-3257] Fix for NO_SORT load and describe formatted being in NO_SORT flow even with Sort Columns given
    
    Problem: After a version upgrade, data load follows the NO_SORT flow even when sort columns are given. Also, DESCRIBE FORMATTED displays the wrong sort scope after a table refresh.
    
    Solution: Added a condition to check for the presence of Sort Columns.
    
    This closes #3083
---
 .../core/constants/CarbonCommonConstants.java      |  1 +
 .../sdv/generated/SetParameterTestCase.scala       |  8 +++---
 .../command/carbonTableSchemaCommon.scala          | 12 ---------
 .../command/management/CarbonLoadDataCommand.scala | 31 +++++++++++++++-------
 .../table/CarbonDescribeFormattedCommand.scala     | 18 ++++++++++---
 5 files changed, 42 insertions(+), 28 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index b7d9761..86bf5f1 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -426,6 +426,7 @@ public final class CarbonCommonConstants {
    */
   public static final String DICTIONARY_PATH = "dictionary_path";
   public static final String SORT_COLUMNS = "sort_columns";
+  public static final String SORT_SCOPE = "sort_scope";
   public static final String RANGE_COLUMN = "range_column";
   public static final String PARTITION_TYPE = "partition_type";
   public static final String NUM_PARTITIONS = "num_partitions";
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SetParameterTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SetParameterTestCase.scala
index 8c336d8..54d9e3f 100644
--- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SetParameterTestCase.scala
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SetParameterTestCase.scala
@@ -209,11 +209,11 @@ class SetParameterTestCase extends QueryTest with BeforeAndAfterAll {
     sql("SET carbon.options.sort.scope=local_sort")
     sql(
       "create table carbon_table(empno int, empname String, designation String, doj Timestamp," +
-      "workgroupcategory int) STORED BY 'org.apache.carbondata.format'")
-    checkExistence(sql("DESC FORMATTED carbon_table"), true, "LOCAL_SORT")
-    val sortscope=sql("DESC FORMATTED carbon_table").collect().filter(_.getString(1).trim.equals("LOCAL_SORT"))
+      "workgroupcategory int) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_COLUMNS'='empno,empname')")
+    checkExistence(sql("DESC FORMATTED carbon_table"), true, "local_sort")
+    val sortscope=sql("DESC FORMATTED carbon_table").collect().filter(_.getString(1).trim.equals("local_sort"))
     assertResult(1)(sortscope.length)
-    assertResult("LOCAL_SORT")(sortscope(0).getString(1).trim)
+    assertResult("local_sort")(sortscope(0).getString(1).trim)
   }
 
   test("TC_011-test SET property to Enable Unsafe Sort") {
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index 2ce9d89..b6b4e8d 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -854,18 +854,6 @@ class TableNewProcessor(cm: TableModel) {
       tableSchema.getTableId,
       cm.databaseNameOp.getOrElse("default"))
     tablePropertiesMap.put("bad_record_path", badRecordsPath)
-    if (tablePropertiesMap.get("sort_columns") != null) {
-      val sortCol = tablePropertiesMap.get("sort_columns")
-      if ((!sortCol.trim.isEmpty) && tablePropertiesMap.get("sort_scope") == null) {
-        // If sort_scope is not specified, but sort_columns are present, set sort_scope as
-        // local_sort in carbon_properties (cannot add in table properties as if user sets carbon
-        // properties it won't be reflected as table properties is given higher priority)
-        if (CarbonProperties.getInstance().getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE) ==
-            null) {
-          tablePropertiesMap.put("sort_scope", "LOCAL_SORT")
-        }
-      }
-    }
     tableSchema.setTableProperties(tablePropertiesMap)
     if (cm.bucketFields.isDefined) {
       val bucketCols = cm.bucketFields.get.bucketColumns.map { b =>
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 0030156..242a467 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -209,15 +209,28 @@ case class CarbonLoadDataCommand(
     * 4. Session property CARBON_OPTIONS_SORT_SCOPE
     * 5. Default Sort Scope LOAD_SORT_SCOPE
     */
-    optionsFinal.put("sort_scope",
-      options.getOrElse("sort_scope",
-        carbonProperty.getProperty(
-          CarbonLoadOptionConstants.CARBON_TABLE_LOAD_SORT_SCOPE + table.getDatabaseName + "." +
-          table.getTableName,
-          tableProperties.asScala.getOrElse("sort_scope",
-            carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
-              carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
-                CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))))))
+    if (tableProperties.get(CarbonCommonConstants.SORT_COLUMNS) != null &&
+        tableProperties.get(CarbonCommonConstants.SORT_SCOPE) == null) {
+      // If Sort Columns are given for the table but Sort Scope is not specified in the table
+      // properties, use the sort scope set in carbon properties, or LOCAL_SORT by default
+      optionsFinal
+        .put(CarbonCommonConstants.SORT_SCOPE,
+          carbonProperty
+            .getProperty(
+              CarbonLoadOptionConstants.CARBON_TABLE_LOAD_SORT_SCOPE + table.getDatabaseName + "." +
+              table.getTableName, carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+                SortScopeOptions.getSortScope("LOCAL_SORT").toString)))
+    } else {
+      optionsFinal.put(CarbonCommonConstants.SORT_SCOPE,
+        options.getOrElse(CarbonCommonConstants.SORT_SCOPE,
+          carbonProperty.getProperty(
+            CarbonLoadOptionConstants.CARBON_TABLE_LOAD_SORT_SCOPE + table.getDatabaseName + "." +
+            table.getTableName,
+            tableProperties.asScala.getOrElse(CarbonCommonConstants.SORT_SCOPE,
+              carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
+                carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+                  CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))))))
+    }
 
     optionsFinal
       .put("bad_record_path", CarbonBadRecordUtil.getBadRecordsPath(options.asJava, table))
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
index 69db4e0..e541139 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
@@ -30,11 +30,11 @@ import org.apache.spark.sql.execution.command.MetadataCommand
 import org.apache.spark.sql.hive.CarbonRelation
 
 import org.apache.carbondata.common.Strings
-import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
 import org.apache.carbondata.core.metadata.datatype.DataTypes
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 
 private[sql] case class CarbonDescribeFormattedCommand(
     child: SparkPlan,
@@ -54,10 +54,22 @@ private[sql] case class CarbonDescribeFormattedCommand(
 
     val carbonTable = relation.carbonTable
     val tblProps = carbonTable.getTableInfo.getFactTable.getTableProperties.asScala
+    // If there are no Sort Columns, the sort scope is NO_SORT. Otherwise use the Sort Scope
+    // from table properties, then from carbon properties, and fall back to LOCAL_SORT
+    // when it is not given in either.
     val sortScope = if (carbonTable.getNumberOfSortColumns == 0) {
       "NO_SORT"
     } else {
-      tblProps.getOrElse("sort_scope", CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)
+      if (tblProps.contains(CarbonCommonConstants.SORT_SCOPE)) {
+        tblProps.get(CarbonCommonConstants.SORT_SCOPE).toString
+      } else {
+        tblProps
+          .getOrElse(CarbonCommonConstants.SORT_SCOPE,
+            CarbonProperties.getInstance()
+              .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
+                CarbonProperties.getInstance().getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+                  "LOCAL_SORT")))
+      }
     }
     val streaming: String = if (carbonTable.isStreamingSink) {
       "sink"