You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2019/01/30 10:39:12 UTC
[carbondata] 08/27: [CARBONDATA-3257] Fix for NO_SORT load and
describe formatted being in NO_SORT flow even with Sort Columns given
This is an automated email from the ASF dual-hosted git repository.
ravipesala pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/carbondata.git
commit abcfad297825095841df34f00df4215191814621
Author: manishnalla1994 <ma...@gmail.com>
AuthorDate: Mon Jan 21 17:23:37 2019 +0530
[CARBONDATA-3257] Fix for NO_SORT load and describe formatted being in NO_SORT flow even with Sort Columns given
Problem: After a version upgrade, data load runs in the NO_SORT flow even when sort columns are given. Also, DESCRIBE FORMATTED displays the wrong sort scope after a refresh.
Solution: Added a condition to check for the presence of Sort Columns.
This closes #3083
---
.../core/constants/CarbonCommonConstants.java | 1 +
.../sdv/generated/SetParameterTestCase.scala | 8 +++---
.../command/carbonTableSchemaCommon.scala | 12 ---------
.../command/management/CarbonLoadDataCommand.scala | 31 +++++++++++++++-------
.../table/CarbonDescribeFormattedCommand.scala | 18 ++++++++++---
5 files changed, 42 insertions(+), 28 deletions(-)
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index b7d9761..86bf5f1 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -426,6 +426,7 @@ public final class CarbonCommonConstants {
*/
public static final String DICTIONARY_PATH = "dictionary_path";
public static final String SORT_COLUMNS = "sort_columns";
+ public static final String SORT_SCOPE = "sort_scope";
public static final String RANGE_COLUMN = "range_column";
public static final String PARTITION_TYPE = "partition_type";
public static final String NUM_PARTITIONS = "num_partitions";
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SetParameterTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SetParameterTestCase.scala
index 8c336d8..54d9e3f 100644
--- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SetParameterTestCase.scala
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SetParameterTestCase.scala
@@ -209,11 +209,11 @@ class SetParameterTestCase extends QueryTest with BeforeAndAfterAll {
sql("SET carbon.options.sort.scope=local_sort")
sql(
"create table carbon_table(empno int, empname String, designation String, doj Timestamp," +
- "workgroupcategory int) STORED BY 'org.apache.carbondata.format'")
- checkExistence(sql("DESC FORMATTED carbon_table"), true, "LOCAL_SORT")
- val sortscope=sql("DESC FORMATTED carbon_table").collect().filter(_.getString(1).trim.equals("LOCAL_SORT"))
+ "workgroupcategory int) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_COLUMNS'='empno,empname')")
+ checkExistence(sql("DESC FORMATTED carbon_table"), true, "local_sort")
+ val sortscope=sql("DESC FORMATTED carbon_table").collect().filter(_.getString(1).trim.equals("local_sort"))
assertResult(1)(sortscope.length)
- assertResult("LOCAL_SORT")(sortscope(0).getString(1).trim)
+ assertResult("local_sort")(sortscope(0).getString(1).trim)
}
test("TC_011-test SET property to Enable Unsafe Sort") {
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index 2ce9d89..b6b4e8d 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -854,18 +854,6 @@ class TableNewProcessor(cm: TableModel) {
tableSchema.getTableId,
cm.databaseNameOp.getOrElse("default"))
tablePropertiesMap.put("bad_record_path", badRecordsPath)
- if (tablePropertiesMap.get("sort_columns") != null) {
- val sortCol = tablePropertiesMap.get("sort_columns")
- if ((!sortCol.trim.isEmpty) && tablePropertiesMap.get("sort_scope") == null) {
- // If sort_scope is not specified, but sort_columns are present, set sort_scope as
- // local_sort in carbon_properties (cannot add in table properties as if user sets carbon
- // properties it won't be reflected as table properties is given higher priority)
- if (CarbonProperties.getInstance().getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE) ==
- null) {
- tablePropertiesMap.put("sort_scope", "LOCAL_SORT")
- }
- }
- }
tableSchema.setTableProperties(tablePropertiesMap)
if (cm.bucketFields.isDefined) {
val bucketCols = cm.bucketFields.get.bucketColumns.map { b =>
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 0030156..242a467 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -209,15 +209,28 @@ case class CarbonLoadDataCommand(
* 4. Session property CARBON_OPTIONS_SORT_SCOPE
* 5. Default Sort Scope LOAD_SORT_SCOPE
*/
- optionsFinal.put("sort_scope",
- options.getOrElse("sort_scope",
- carbonProperty.getProperty(
- CarbonLoadOptionConstants.CARBON_TABLE_LOAD_SORT_SCOPE + table.getDatabaseName + "." +
- table.getTableName,
- tableProperties.asScala.getOrElse("sort_scope",
- carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
- carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
- CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))))))
+ if (tableProperties.get(CarbonCommonConstants.SORT_COLUMNS) != null &&
+ tableProperties.get(CarbonCommonConstants.SORT_SCOPE) == null) {
+ // If Sort Columns are given for the table but Sort Scope is not set in the table properties,
+ // use the sort scope configured in carbon properties, defaulting to LOCAL_SORT
+ optionsFinal
+ .put(CarbonCommonConstants.SORT_SCOPE,
+ carbonProperty
+ .getProperty(
+ CarbonLoadOptionConstants.CARBON_TABLE_LOAD_SORT_SCOPE + table.getDatabaseName + "." +
+ table.getTableName, carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+ SortScopeOptions.getSortScope("LOCAL_SORT").toString)))
+ } else {
+ optionsFinal.put(CarbonCommonConstants.SORT_SCOPE,
+ options.getOrElse(CarbonCommonConstants.SORT_SCOPE,
+ carbonProperty.getProperty(
+ CarbonLoadOptionConstants.CARBON_TABLE_LOAD_SORT_SCOPE + table.getDatabaseName + "." +
+ table.getTableName,
+ tableProperties.asScala.getOrElse(CarbonCommonConstants.SORT_SCOPE,
+ carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
+ carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+ CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))))))
+ }
optionsFinal
.put("bad_record_path", CarbonBadRecordUtil.getBadRecordsPath(options.asJava, table))
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
index 69db4e0..e541139 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
@@ -30,11 +30,11 @@ import org.apache.spark.sql.execution.command.MetadataCommand
import org.apache.spark.sql.hive.CarbonRelation
import org.apache.carbondata.common.Strings
-import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
import org.apache.carbondata.core.metadata.datatype.DataTypes
import org.apache.carbondata.core.metadata.schema.partition.PartitionType
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
private[sql] case class CarbonDescribeFormattedCommand(
child: SparkPlan,
@@ -54,10 +54,22 @@ private[sql] case class CarbonDescribeFormattedCommand(
val carbonTable = relation.carbonTable
val tblProps = carbonTable.getTableInfo.getFactTable.getTableProperties.asScala
+ // If the table has no Sort Columns, the sort scope is NO_SORT. Otherwise use the Sort Scope
+ // from the table properties, then from the carbon properties, and fall back to LOCAL_SORT
+ // when it is not specified in either.
val sortScope = if (carbonTable.getNumberOfSortColumns == 0) {
"NO_SORT"
} else {
- tblProps.getOrElse("sort_scope", CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)
+ if (tblProps.contains(CarbonCommonConstants.SORT_SCOPE)) {
+ tblProps.get(CarbonCommonConstants.SORT_SCOPE).toString
+ } else {
+ tblProps
+ .getOrElse(CarbonCommonConstants.SORT_SCOPE,
+ CarbonProperties.getInstance()
+ .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
+ CarbonProperties.getInstance().getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+ "LOCAL_SORT")))
+ }
}
val streaming: String = if (carbonTable.isStreamingSink) {
"sink"