Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/09/21 00:27:20 UTC
[10/23] carbondata git commit: [CARBONDATA-1438] Unify the sort column and sort scope in create table command
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cb6f65/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortcolumns/TestSortColumns.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortcolumns/TestSortColumns.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortcolumns/TestSortColumns.scala
index bd1264a..9e4f3b7 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortcolumns/TestSortColumns.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortcolumns/TestSortColumns.scala
@@ -43,11 +43,10 @@ class TestSortColumns extends QueryTest with BeforeAndAfterAll {
" workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, " +
"projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int," +
"utilization int,salary int) STORED BY 'org.apache.carbondata.format' tblproperties" +
- "('dictionary_exclude'='empno','sort_columns'='empno')")
+ "('dictionary_exclude'='empno','sort_columns'='empno', 'SORT_SCOPE'='BATCH_SORT')")
sql(
s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE sorttable1a OPTIONS
- |('DELIMITER'= ',', 'QUOTECHAR'= '\"','SORT_SCOPE'='BATCH_SORT',
- |'batch_sort_size_inmb'='64')""".stripMargin)
+ |('DELIMITER'= ',', 'QUOTECHAR'= '\"', 'batch_sort_size_inmb'='64')""".stripMargin)
checkAnswer(sql("select empname from sorttable1a"),
sql("select empname from origintable1 order by empname"))
}
@@ -61,11 +60,11 @@ class TestSortColumns extends QueryTest with BeforeAndAfterAll {
" workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, " +
"projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int," +
"utilization int,salary int) STORED BY 'org.apache.carbondata.format' tblproperties" +
- "('dictionary_exclude'='empno,empname,workgroupcategoryname','sort_columns'='empno,empname')")
+ "('dictionary_exclude'='empno,empname,workgroupcategoryname','sort_columns'='empno,empname'," +
+ "'SORT_SCOPE'='BATCH_SORT')")
sql(
s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE sorttable1b OPTIONS
- |('DELIMITER'= ',', 'QUOTECHAR'= '\"','SORT_SCOPE'='BATCH_SORT',
- |'batch_sort_size_inmb'='64')""".stripMargin)
+ |('DELIMITER'= ',', 'QUOTECHAR'= '\"', 'batch_sort_size_inmb'='64')""".stripMargin)
checkAnswer(sql("select empname from sorttable1b"),
sql("select empname from origintable1 order by empname"))
}
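The test changes above reflect the new contract of this commit: SORT_SCOPE is declared once as a table property at CREATE TABLE time, and LOAD DATA no longer accepts it as an option (the load now only tunes batch_sort_size_inmb). A minimal sketch of the new usage, assuming the suite's QueryTest-style `sql` handle and `resourcesPath`; the table name sorttable_demo is hypothetical:

    // SORT_SCOPE now lives in tblproperties; the load only tunes the batch size.
    sql(
      "CREATE TABLE sorttable_demo(empno Int, empname String) " +
      "STORED BY 'org.apache.carbondata.format' " +
      "tblproperties('sort_columns'='empno', 'SORT_SCOPE'='BATCH_SORT')")
    sql(
      s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE sorttable_demo
         |OPTIONS('DELIMITER'= ',', 'batch_sort_size_inmb'='64')""".stripMargin)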
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cb6f65/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala
index f2a4a7d..a73b0df 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala
@@ -52,7 +52,7 @@ object ValidateUtil {
if (sortScope != null) {
// Don't support use global sort on partitioned table.
if (carbonTable.getPartitionInfo(carbonTable.getFactTableName) != null &&
- sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT.toString)) {
+ sortScope.equalsIgnoreCase(SortScopeOptions.SortScope.GLOBAL_SORT.toString)) {
throw new MalformedCarbonCommandException("Don't support use global sort on partitioned " +
"table.")
}
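The one-line change above makes the GLOBAL_SORT check case-insensitive, so a lowercase or mixed-case scope on a partitioned table is now rejected as well. A minimal standalone sketch of the difference, with hypothetical values:

    val sortScope = "global_sort"
    sortScope.equals("GLOBAL_SORT")           // false: the old check let this slip through
    sortScope.equalsIgnoreCase("GLOBAL_SORT") // true: the new check rejects it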
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cb6f65/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 661f724..d0309ba 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -845,8 +845,8 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
"COMPLEX_DELIMITER_LEVEL_1", "COMPLEX_DELIMITER_LEVEL_2", "COLUMNDICT",
"SERIALIZATION_NULL_FORMAT", "BAD_RECORDS_LOGGER_ENABLE", "BAD_RECORDS_ACTION",
"ALL_DICTIONARY_PATH", "MAXCOLUMNS", "COMMENTCHAR", "DATEFORMAT", "BAD_RECORD_PATH",
- "SINGLE_PASS", "IS_EMPTY_DATA_BAD_RECORD", "SORT_SCOPE", "BATCH_SORT_SIZE_INMB",
- "GLOBAL_SORT_PARTITIONS", "HEADER"
+ "BATCH_SORT_SIZE_INMB", "GLOBAL_SORT_PARTITIONS", "SINGLE_PASS",
+ "IS_EMPTY_DATA_BAD_RECORD", "HEADER"
)
var isSupported = true
val invalidOptions = StringBuilder.newBuilder
@@ -901,14 +901,6 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
}
}
- if (options.exists(_._1.equalsIgnoreCase("SORT_SCOPE"))) {
- val optionValue: String = options.get("sort_scope").get.head._2
- if (!SortScopeOptions.isValidSortOption(optionValue)) {
- throw new MalformedCarbonCommandException(
- "option SORT_SCOPE can have option either BATCH_SORT or LOCAL_SORT or GLOBAL_SORT")
- }
- }
-
// check for duplicate options
val duplicateOptions = options filter {
case (_, optionlist) => optionlist.size > 1
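SORT_SCOPE is dropped here from the whitelist of supported LOAD DATA options, and its per-load validation is deleted, because the commit moves that validation to create-table time (see the CreateTable hunks below). A sketch of the resulting behavior, assuming the surrounding QueryTest/ScalaTest harness; the exact exception raised for an unsupported option is an assumption:

    // A load that still passes SORT_SCOPE now fails option validation,
    // presumably surfacing as a MalformedCarbonCommandException:
    intercept[MalformedCarbonCommandException] {
      sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE sorttable1a
             |OPTIONS('SORT_SCOPE'='BATCH_SORT')""".stripMargin)
    }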
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cb6f65/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 98ceae8..8a39b0a 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -39,9 +39,10 @@ import org.codehaus.jackson.map.ObjectMapper
import org.apache.carbondata.api.CarbonStore
import org.apache.carbondata.common.constants.LoggerAction
import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.dictionary.server.DictionaryServer
+import org.apache.carbondata.core.exception.InvalidConfigurationException
import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.{CarbonMetadata, CarbonTableIdentifier}
import org.apache.carbondata.core.metadata.encoder.Encoding
@@ -173,6 +174,14 @@ case class CreateTable(cm: TableModel) extends RunnableCommand {
val tableInfo: TableInfo = TableNewProcessor(cm)
+ // Add validation for sort scope when create table
+ val sortScope = tableInfo.getFactTable.getTableProperties
+ .getOrDefault("sort_scope", CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)
+ if (!CarbonUtil.isValidSortOption(sortScope)) {
+ throw new InvalidConfigurationException(s"Passing invalid SORT_SCOPE '$sortScope'," +
+ s" valid SORT_SCOPE are 'NO_SORT', 'BATCH_SORT', 'LOCAL_SORT' and 'GLOBAL_SORT' ")
+ }
+
if (tableInfo.getFactTable.getListOfColumns.isEmpty) {
sys.error("No Dimensions found. Table should have at least one dimesnion !")
}
@@ -433,10 +442,25 @@ case class LoadTable(
val dateFormat = options.getOrElse("dateformat", null)
ValidateUtil.validateDateFormat(dateFormat, table, tableName)
val maxColumns = options.getOrElse("maxcolumns", null)
- val sortScope = options.getOrElse("sort_scope", null)
+ val tableProperties = table.getTableInfo.getFactTable.getTableProperties
+ val sortScopeDefault = CarbonProperties.getInstance().
+ getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
+ CarbonProperties.getInstance().getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+ CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))
+ val sortScope = if (null == tableProperties) {
+ sortScopeDefault
+ } else {
+ tableProperties.getOrDefault("sort_scope", sortScopeDefault)
+ }
+
ValidateUtil.validateSortScope(table, sortScope)
- val batchSortSizeInMB = options.getOrElse("batch_sort_size_inmb", null)
- val globalSortPartitions = options.getOrElse("global_sort_partitions", null)
+ val carbonProperty: CarbonProperties = CarbonProperties.getInstance()
+ val batchSortSizeInMB = options.getOrElse("batch_sort_size_inmb", carbonProperty
+ .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB,
+ carbonProperty.getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
+ CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT)))
+ val globalSortPartitions = options.getOrElse("global_sort_partitions", carbonProperty
+ .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS, null))
ValidateUtil.validateGlobalSortPartitions(globalSortPartitions)
// if there isn't file header in csv file and load sql doesn't provide FILEHEADER option,
@@ -884,6 +908,9 @@ private[sql] case class DescribeCommandFormatted(
results ++= Seq(("CARBON Store Path: ", relation.tableMeta.storePath, ""))
val carbonTable = relation.tableMeta.carbonTable
results ++= Seq(("Table Block Size : ", carbonTable.getBlockSizeInMB + " MB", ""))
+ results ++= Seq(("SORT_SCOPE", carbonTable.getTableInfo.getFactTable
+ .getTableProperties.getOrDefault("sort_scope", CarbonCommonConstants
+ .LOAD_SORT_SCOPE_DEFAULT), CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))
results ++= Seq(("", "", ""), ("##Detailed Column property", "", ""))
if (colPropStr.length() > 0) {
results ++= Seq((colPropStr, "", ""))
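The LoadTable hunk above resolves the effective sort scope through a precedence chain: the table's own sort_scope property first, then the session-level CARBON_OPTIONS_SORT_SCOPE, then the system-level LOAD_SORT_SCOPE, then the compiled-in default. A minimal sketch of that chain in isolation, mirroring the diff; the names fallback and effectiveSortScope are hypothetical:

    // Precedence: table property > carbon.options.sort.scope
    //             > carbon.load.sort.scope > built-in default
    val props = CarbonProperties.getInstance()
    val fallback = props.getProperty(
      CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
      props.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
        CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))
    val effectiveSortScope =
      if (null == tableProperties) fallback
      else tableProperties.getOrDefault("sort_scope", fallback)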
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cb6f65/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index de16f69..3f0153e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -43,6 +43,7 @@ import org.apache.carbondata.core.cache.CacheProvider
import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.dictionary.server.DictionaryServer
+import org.apache.carbondata.core.exception.InvalidConfigurationException
import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
@@ -345,6 +346,14 @@ case class CreateTable(cm: TableModel, createDSTable: Boolean = true) extends Ru
val tableInfo: TableInfo = TableNewProcessor(cm)
+ // Add validation for sort scope when create table
+ val sortScope = tableInfo.getFactTable.getTableProperties
+ .getOrDefault("sort_scope", CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)
+ if (!CarbonUtil.isValidSortOption(sortScope)) {
+ throw new InvalidConfigurationException(s"Passing invalid SORT_SCOPE '$sortScope'," +
+ s" valid SORT_SCOPE are 'NO_SORT', 'BATCH_SORT', 'LOCAL_SORT' and 'GLOBAL_SORT' ")
+ }
+
if (tableInfo.getFactTable.getListOfColumns.size <= 0) {
sys.error("No Dimensions found. Table should have at least one dimesnion !")
}
@@ -562,16 +571,12 @@ case class LoadTable(
.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS, null)))
optionsFinal.put("maxcolumns", options.getOrElse("maxcolumns", null))
- optionsFinal.put("sort_scope", options
- .getOrElse("sort_scope",
- carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
- carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
- CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))))
optionsFinal.put("batch_sort_size_inmb", options.getOrElse("batch_sort_size_inmb",
carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB,
carbonProperty.getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT))))
+
optionsFinal.put("bad_record_path", options.getOrElse("bad_record_path",
carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
carbonProperty.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
@@ -640,6 +645,15 @@ case class LoadTable(
val carbonProperty: CarbonProperties = CarbonProperties.getInstance()
carbonProperty.addProperty("zookeeper.enable.lock", "false")
val optionsFinal = getFinalOptions(carbonProperty)
+
+ val tableProperties = relation.tableMeta.carbonTable.getTableInfo
+ .getFactTable.getTableProperties
+
+ optionsFinal.put("sort_scope", tableProperties.getOrDefault("sort_scope",
+ carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
+ carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+ CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))))
+
try {
val factPath = if (dataFrame.isDefined) {
""
@@ -677,7 +691,6 @@ case class LoadTable(
ValidateUtil.validateDateFormat(dateFormat, table, tableName)
ValidateUtil.validateSortScope(table, sort_scope)
-
if (bad_records_logger_enable.toBoolean ||
LoggerAction.REDIRECT.name().equalsIgnoreCase(bad_records_action)) {
if (!CarbonUtil.isValidBadStorePath(bad_record_path)) {
@@ -1140,6 +1153,9 @@ private[sql] case class DescribeCommandFormatted(
results ++= Seq(("CARBON Store Path: ", relation.tableMeta.storePath, ""))
val carbonTable = relation.tableMeta.carbonTable
results ++= Seq(("Table Block Size : ", carbonTable.getBlockSizeInMB + " MB", ""))
+ results ++= Seq(("SORT_SCOPE", carbonTable.getTableInfo.getFactTable
+ .getTableProperties.getOrDefault("sort_scope", CarbonCommonConstants
+ .LOAD_SORT_SCOPE_DEFAULT), CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))
results ++= Seq(("", "", ""), ("##Detailed Column property", "", ""))
if (colPropStr.length() > 0) {
results ++= Seq((colPropStr, "", ""))
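The spark2 module mirrors the spark module: sort_scope resolution moves out of getFinalOptions and into the load path, where the table's properties are available; create-time validation rejects unknown scopes; and DESCRIBE FORMATTED now reports SORT_SCOPE. A sketch of what a user observes after this commit; the table name t1 and the bogus scope HEAP_SORT are hypothetical:

    // Rejected at create time rather than at load time:
    sql("CREATE TABLE t1(id Int) STORED BY 'carbondata' " +
        "tblproperties('SORT_SCOPE'='HEAP_SORT')")
    // => InvalidConfigurationException: Passing invalid SORT_SCOPE 'HEAP_SORT',
    //    valid SORT_SCOPE are 'NO_SORT', 'BATCH_SORT', 'LOCAL_SORT' and 'GLOBAL_SORT'

    // DESCRIBE FORMATTED now lists the scope next to the default:
    sql("DESC FORMATTED t1").show()  // row: SORT_SCOPE | <table's scope> | <default>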