You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2019/01/09 08:36:44 UTC
carbondata git commit: [CARBONDATA-3201] Added load level SORT_SCOPE
Added SORT_SCOPE in Load Options & in SET Command
Repository: carbondata
Updated Branches:
refs/heads/master 4e27b86df -> 77d2b4e8d
[CARBONDATA-3201] Added load level SORT_SCOPE
Added SORT_SCOPE in Load Options & in SET Command
1. Added load level SORT_SCOPE
2. Added Sort_Scope for PreAgg
3. Added sort_scope msg for LoadDataCommand
4. Added property CARBON.TABLE.LOAD.SORT.SCOPE.<database>.<table> to set table level sort_scope property
5. Removed test case verifying LOAD_OPTIONS with SORT_SCOPE
Load level SORT_SCOPE
LOAD DATA INPATH 'path/to/data.csv'
INTO TABLE my_table
OPTIONS (
'sort_scope'='no_sort'
)
Priority of SORT_SCOPE (highest to lowest)
1. Load level option (if provided in LOAD DATA OPTIONS)
2. Session property CARBON.TABLE.LOAD.SORT.SCOPE.<database>.<table> (if set)
3. Table level SORT_SCOPE from TBLPROPERTIES (if provided)
4. Session property CARBON.OPTIONS.SORT.SCOPE (if set)
5. Default sort scope
This closes #3014
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/77d2b4e8
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/77d2b4e8
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/77d2b4e8
Branch: refs/heads/master
Commit: 77d2b4e8d132f768b83438845f6fb9660a74fe1f
Parents: 4e27b86
Author: namanrastogi <na...@gmail.com>
Authored: Fri Dec 21 13:03:30 2018 +0530
Committer: kumarvishal09 <ku...@gmail.com>
Committed: Wed Jan 9 14:06:17 2019 +0530
----------------------------------------------------------------------
.../constants/CarbonLoadOptionConstants.java | 6 ++++
.../carbondata/core/util/SessionParams.java | 8 ++++-
.../TestCreateTableWithSortScope.scala | 19 -----------
.../streaming/StreamSinkFactory.scala | 2 +-
.../spark/sql/catalyst/CarbonDDLSqlParser.scala | 3 +-
.../CarbonAlterTableCompactionCommand.scala | 4 +--
.../management/CarbonLoadDataCommand.scala | 35 +++++++++++++++++---
.../preaaggregate/PreAggregateListeners.scala | 7 ++--
.../preaaggregate/PreAggregateTableHelper.scala | 3 +-
.../preaaggregate/PreAggregateUtil.scala | 2 ++
.../execution/command/CarbonHiveCommands.scala | 19 ++++++++---
.../commands/SetCommandTestCase.scala | 28 ++++++++++++++++
.../processing/loading/events/LoadEvents.java | 13 +++++++-
13 files changed, 111 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/77d2b4e8/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
index 5cf6163..eef2bef 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
@@ -81,6 +81,12 @@ public final class CarbonLoadOptionConstants {
"carbon.options.sort.scope";
/**
+ * option to specify table level sort_scope
+ */
+ @CarbonProperty(dynamicConfigurable = true)
+ public static final String CARBON_TABLE_LOAD_SORT_SCOPE = "carbon.table.load.sort.scope.";
+
+ /**
* option to specify the batch sort size inmb
*/
@CarbonProperty(dynamicConfigurable = true)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/77d2b4e8/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
index f49747f..d9aa214 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
@@ -161,7 +161,7 @@ public class SessionParams implements Serializable, Cloneable {
isValid = CarbonUtil.isValidSortOption(value);
if (!isValid) {
throw new InvalidConfigurationException("The sort scope " + key
- + " can have only either BATCH_SORT or LOCAL_SORT or NO_SORT.");
+ + " can have only either NO_SORT, BATCH_SORT, LOCAL_SORT or GLOBAL_SORT.");
}
break;
case CARBON_OPTIONS_BATCH_SORT_SIZE_INMB:
@@ -229,6 +229,12 @@ public class SessionParams implements Serializable, Cloneable {
if (!isValid) {
throw new InvalidConfigurationException("Invalid value " + value + " for key " + key);
}
+ } else if (key.startsWith(CarbonLoadOptionConstants.CARBON_TABLE_LOAD_SORT_SCOPE)) {
+ isValid = CarbonUtil.isValidSortOption(value);
+ if (!isValid) {
+ throw new InvalidConfigurationException("The sort scope " + key
+ + " can have only either NO_SORT, BATCH_SORT, LOCAL_SORT or GLOBAL_SORT.");
+ }
} else {
throw new InvalidConfigurationException(
"The key " + key + " not supported for dynamic configuration.");
http://git-wip-us.apache.org/repos/asf/carbondata/blob/77d2b4e8/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableWithSortScope.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableWithSortScope.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableWithSortScope.scala
index 890475d..dac9124 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableWithSortScope.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableWithSortScope.scala
@@ -32,25 +32,6 @@ class TestCreateTableWithSortScope extends QueryTest with BeforeAndAfterAll {
sql("DROP TABLE IF EXISTS tableWithBatchSort")
sql("DROP TABLE IF EXISTS tableWithNoSort")
sql("DROP TABLE IF EXISTS tableWithUnsupportSortScope")
- sql("DROP TABLE IF EXISTS tableLoadWithSortScope")
- }
-
- test("Do not support load data with specify sort scope") {
- sql(
- s"""
- | CREATE TABLE tableLoadWithSortScope(
- | intField INT,
- | stringField STRING
- | )
- | STORED BY 'carbondata'
- | TBLPROPERTIES('SORT_COLUMNS'='stringField')
- """.stripMargin)
-
- val exception_loaddata_sortscope: Exception = intercept[Exception] {
- sql("LOAD DATA LOCAL INPATH '/path/to/data' INTO TABLE tableLoadWithSortScope " +
- "OPTIONS('SORT_SCOPE'='GLOBAL_SORT')")
- }
- assert(exception_loaddata_sortscope.getMessage.contains("Error: Invalid option(s): sort_scope"))
}
test("test create table with sort scope in normal cases") {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/77d2b4e8/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
index b382693..2138580 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
@@ -109,7 +109,7 @@ object StreamSinkFactory {
carbonLoadModel.setSegmentId(segmentId)
// Used to generate load commands for child tables in case auto-handoff is fired.
- val loadMetaEvent = new LoadMetadataEvent(carbonTable, false)
+ val loadMetaEvent = new LoadMetadataEvent(carbonTable, false, parameters.asJava)
OperationListenerBus.getInstance().fireEvent(loadMetaEvent, operationContext)
// start server if necessary
val server = startDictionaryServer(
http://git-wip-us.apache.org/repos/asf/carbondata/blob/77d2b4e8/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 4b14879..a669931 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -1105,7 +1105,8 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
"SORT_COLUMN_BOUNDS",
"LOAD_MIN_SIZE_INMB",
"RANGE_COLUMN",
- "SCALE_FACTOR"
+ "SCALE_FACTOR",
+ "SORT_SCOPE"
)
var isSupported = true
val invalidOptions = StringBuilder.newBuilder
http://git-wip-us.apache.org/repos/asf/carbondata/blob/77d2b4e8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index 6defb0a..419fa16 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -95,9 +95,9 @@ case class CarbonAlterTableCompactionCommand(
// If set to true then only loadCommands for compaction will be created.
val loadMetadataEvent =
if (alterTableModel.compactionType.equalsIgnoreCase(CompactionType.STREAMING.name())) {
- new LoadMetadataEvent(table, false)
+ new LoadMetadataEvent(table, false, Map.empty[String, String].asJava)
} else {
- new LoadMetadataEvent(table, true)
+ new LoadMetadataEvent(table, true, Map.empty[String, String].asJava)
}
OperationListenerBus.getInstance().fireEvent(loadMetadataEvent, operationContext)
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/77d2b4e8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 67172af..95c0767 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -142,7 +142,7 @@ case class CarbonLoadDataCommand(
}
operationContext.setProperty("isOverwrite", isOverwriteTable)
if(CarbonUtil.hasAggregationDataMap(table)) {
- val loadMetadataEvent = new LoadMetadataEvent(table, false)
+ val loadMetadataEvent = new LoadMetadataEvent(table, false, options.asJava)
OperationListenerBus.getInstance().fireEvent(loadMetadataEvent, operationContext)
}
Seq.empty
@@ -191,10 +191,34 @@ case class CarbonLoadDataCommand(
optionsFinal
.put("complex_delimiter_level_4",
ComplexDelimitersEnum.COMPLEX_DELIMITERS_LEVEL_4.value())
- optionsFinal.put("sort_scope", tableProperties.asScala.getOrElse("sort_scope",
- carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
- carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
- CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))))
+
+ /**
+ * Priority of sort_scope assignment :
+ * -----------------------------------
+ *
+ * 1. Load Options ->
+ * LOAD DATA INPATH 'data.csv' INTO TABLE tableName OPTIONS('sort_scope'='no_sort')
+ *
+ * 2. Session property CARBON_TABLE_LOAD_SORT_SCOPE ->
+ * SET CARBON.TABLE.LOAD.SORT.SCOPE.database.table=no_sort
+ * SET CARBON.TABLE.LOAD.SORT.SCOPE.database.table=batch_sort
+ * SET CARBON.TABLE.LOAD.SORT.SCOPE.database.table=local_sort
+ * SET CARBON.TABLE.LOAD.SORT.SCOPE.database.table=global_sort
+ *
+ * 3. Sort Scope provided in TBLPROPERTIES
+ * 4. Session property CARBON_OPTIONS_SORT_SCOPE
+ * 5. Default Sort Scope LOAD_SORT_SCOPE
+ */
+ optionsFinal.put("sort_scope",
+ options.getOrElse("sort_scope",
+ carbonProperty.getProperty(
+ CarbonLoadOptionConstants.CARBON_TABLE_LOAD_SORT_SCOPE + table.getDatabaseName + "." +
+ table.getTableName,
+ tableProperties.asScala.getOrElse("sort_scope",
+ carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
+ carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+ CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))))))
+
optionsFinal
.put("bad_record_path", CarbonBadRecordUtil.getBadRecordsPath(options.asJava, table))
val factPath = if (dataFrame.isDefined) {
@@ -304,6 +328,7 @@ case class CarbonLoadDataCommand(
}
val partitionStatus = SegmentStatus.SUCCESS
val columnar = sparkSession.conf.get("carbon.is.columnar.storage", "true").toBoolean
+ LOGGER.info("Sort Scope : " + carbonLoadModel.getSortScope)
if (carbonLoadModel.getUseOnePass) {
loadDataUsingOnePass(
sparkSession,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/77d2b4e8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
index eb98264..4a0b492 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
@@ -349,7 +349,8 @@ object CompactionProcessMetaListener extends OperationEventListener {
TableIdentifier(childTableName, Some(childDatabaseName)),
childDataFrame,
false,
- sparkSession)
+ sparkSession,
+ mutable.Map.empty[String, String])
val uuid = Option(operationContext.getProperty("uuid")).
getOrElse(UUID.randomUUID()).toString
operationContext.setProperty("uuid", uuid)
@@ -377,7 +378,8 @@ object CompactionProcessMetaListener extends OperationEventListener {
TableIdentifier(childTableName, Some(childDatabaseName)),
childDataFrame,
false,
- sparkSession)
+ sparkSession,
+ mutable.Map.empty[String, String])
val uuid = Option(operationContext.getProperty("uuid")).getOrElse(UUID.randomUUID()).toString
loadCommand.processMetadata(sparkSession)
operationContext.setProperty(table.getTableName + "_Compaction", loadCommand)
@@ -453,6 +455,7 @@ object LoadProcessMetaListener extends OperationEventListener {
childDataFrame,
isOverwrite,
sparkSession,
+ tableEvent.getOptions.asScala,
timeseriesParentTableName = childSelectQuery._2)
operationContext.setProperty("uuid", uuid)
loadCommand.operationContext.setProperty("uuid", uuid)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/77d2b4e8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala
index 4116ed6..b729347 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala
@@ -275,7 +275,8 @@ case class PreAggregateTableHelper(
tableIdentifier,
dataFrame,
isOverwrite = false,
- sparkSession = sparkSession)
+ sparkSession = sparkSession,
+ mutable.Map.empty[String, String])
loadCommand.processMetadata(sparkSession)
Seq.empty
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/77d2b4e8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
index 4e5b764..0314dd8 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.execution.command.preaaggregate
import scala.collection.JavaConverters._
+import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, CarbonSession, SparkSession, _}
@@ -895,6 +896,7 @@ object PreAggregateUtil {
dataFrame: DataFrame,
isOverwrite: Boolean,
sparkSession: SparkSession,
+ options: mutable.Map[String, String],
timeseriesParentTableName: String = ""): CarbonLoadDataCommand = {
val headers = columns.asScala.filter { column =>
!column.getColumnName.equalsIgnoreCase(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/77d2b4e8/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
index 9f97828..5cc5bc8 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.command.table.CarbonDropTableCommand
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
-import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonCommonConstantsInternal}
+import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonCommonConstantsInternal, CarbonLoadOptionConstants}
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, SessionParams}
case class CarbonDropDatabaseCommand(command: DropDatabaseCommand)
@@ -84,10 +84,7 @@ case class CarbonSetCommand(command: SetCommand)
object CarbonSetCommand {
def validateAndSetValue(sessionParams: SessionParams, key: String, value: String): Unit = {
val isCarbonProperty: Boolean = CarbonProperties.getInstance().isCarbonProperty(key)
- if (isCarbonProperty) {
- sessionParams.addProperty(key, value)
- }
- else if (key.startsWith(CarbonCommonConstants.CARBON_INPUT_SEGMENTS)) {
+ if (key.startsWith(CarbonCommonConstants.CARBON_INPUT_SEGMENTS)) {
if (key.split("\\.").length == 5) {
sessionParams.addProperty(key.toLowerCase(), value)
}
@@ -117,6 +114,18 @@ object CarbonSetCommand {
"property should be in \" carbon.load.datamaps.parallel.<database_name>" +
".<table_name>=<true/false> \" format.")
}
+ } else if (key.startsWith(CarbonLoadOptionConstants.CARBON_TABLE_LOAD_SORT_SCOPE)) {
+ if (key.split("\\.").length == 7) {
+ sessionParams.addProperty(key.toLowerCase(), value)
+ }
+ else {
+ throw new MalformedCarbonCommandException(
+ "property should be in \" carbon.table.load.sort.scope.<database_name>" +
+ ".<table_name>=<sort_sope> \" format.")
+ }
+ }
+ else if (isCarbonProperty) {
+ sessionParams.addProperty(key, value)
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/77d2b4e8/integration/spark2/src/test/scala/org/apache/spark/carbondata/commands/SetCommandTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/commands/SetCommandTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/commands/SetCommandTestCase.scala
index 1610d8d..8b03630 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/commands/SetCommandTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/commands/SetCommandTestCase.scala
@@ -128,6 +128,34 @@ class SetCommandTestCase extends Spark2QueryTest with BeforeAndAfterAll{
sql(s"set ${CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS}"))
}
}
+
+ test(s"test set carbon.table.load.sort.scope for valid options") {
+ checkAnswer(
+ sql(s"set carbon.table.load.sort.scope.db.tbl=no_sort"),
+ sql(s"set carbon.table.load.sort.scope.db.tbl"))
+
+ checkAnswer(
+ sql(s"set carbon.table.load.sort.scope.db.tbl=batch_sort"),
+ sql(s"set carbon.table.load.sort.scope.db.tbl"))
+
+ checkAnswer(
+ sql(s"set carbon.table.load.sort.scope.db.tbl=local_sort"),
+ sql(s"set carbon.table.load.sort.scope.db.tbl"))
+
+ checkAnswer(
+ sql(s"set carbon.table.load.sort.scope.db.tbl=global_sort"),
+ sql(s"set carbon.table.load.sort.scope.db.tbl"))
+ }
+
+ test(s"test set carbon.table.load.sort.scope for invalid options")
+ {
+ intercept[InvalidConfigurationException] {
+ checkAnswer(
+ sql(s"set carbon.table.load.sort.scope.db.tbl=fake_sort"),
+ sql(s"set carbon.table.load.sort.scope.db.tbl"))
+ }
+ }
+
override def afterAll {
sqlContext.sparkSession.catalog.clearCache()
sql("reset")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/77d2b4e8/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java b/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java
index 1e53817..c55af83 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java
@@ -17,6 +17,8 @@
package org.apache.carbondata.processing.loading.events;
+import java.util.Map;
+
import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.events.Event;
@@ -101,9 +103,13 @@ public class LoadEvents {
public static class LoadMetadataEvent extends Event {
private CarbonTable carbonTable;
private boolean isCompaction;
- public LoadMetadataEvent(CarbonTable carbonTable, boolean isCompaction) {
+ private Map<String, String> options;
+
+ public LoadMetadataEvent(CarbonTable carbonTable, boolean isCompaction,
+ Map<String, String> options) {
this.carbonTable = carbonTable;
this.isCompaction = isCompaction;
+ this.options = options;
}
public boolean isCompaction() {
return isCompaction;
@@ -111,6 +117,11 @@ public class LoadEvents {
public CarbonTable getCarbonTable() {
return carbonTable;
}
+
+
+ public Map<String, String> getOptions() {
+ return options;
+ }
}
public static class LoadTablePostStatusUpdateEvent extends Event {