You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/06/15 11:50:17 UTC
[12/42] carbondata git commit: Added batch sort to load options and
added test cases
Added batch sort to load options and added test cases
Added sort_scope to load options
rebase
rebase
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/d734f530
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/d734f530
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/d734f530
Branch: refs/heads/branch-1.1
Commit: d734f53006308a675af30acefa798c814ada3329
Parents: 211c23b
Author: ravipesala <ra...@gmail.com>
Authored: Thu May 11 23:54:30 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Thu Jun 15 12:56:20 2017 +0530
----------------------------------------------------------------------
.../core/constants/CarbonCommonConstants.java | 10 +-
.../carbondata/hadoop/CarbonInputSplit.java | 16 +-
.../dataload/TestBatchSortDataLoad.scala | 230 +++++++++++++++++++
.../spark/sql/catalyst/CarbonDDLSqlParser.scala | 11 +-
.../execution/command/carbonTableSchema.scala | 4 +
.../execution/command/carbonTableSchema.scala | 4 +
.../DataLoadFailAllTypeSortTest.scala | 27 ++-
.../processing/model/CarbonLoadModel.java | 30 ++-
.../newflow/DataLoadProcessBuilder.java | 12 +-
.../newflow/sort/SortScopeOptions.java | 63 +++++
.../processing/newflow/sort/SorterFactory.java | 7 +-
.../newflow/sort/unsafe/UnsafeSortDataRows.java | 5 +-
.../sortandgroupby/sortdata/SortParameters.java | 13 ++
.../util/CarbonDataProcessorUtil.java | 51 ++++
14 files changed, 449 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d734f530/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 269a75f..e1f3e9d 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1104,15 +1104,15 @@ public final class CarbonCommonConstants {
/**
* Sorts the data in batches and writes the batch data to store with index file.
*/
- public static final String LOAD_USE_BATCH_SORT = "carbon.load.use.batch.sort";
+ public static final String LOAD_SORT_SCOPE = "carbon.load.sort.scope";
/**
- * If set to true, the sorting scope is smaller and more index tree will be created,
+ * If set to BATCH_SORT, the sorting scope is smaller and more index trees will be created,
* thus loading is faster but query maybe slower.
- * If set to false, the sorting scope is bigger and one index tree per data node will be created,
- * thus loading is slower but query is faster.
+ * If set to LOCAL_SORT, the sorting scope is bigger and one index tree per data node will be
+ * created, thus loading is slower but query is faster.
*/
- public static final String LOAD_USE_BATCH_SORT_DEFAULT = "false";
+ public static final String LOAD_SORT_SCOPE_DEFAULT = "LOCAL_SORT";
/**
* Size of batch data to keep in memory, as a thumb rule it supposed
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d734f530/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
index 0dcaba2..08661a2 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
@@ -31,6 +31,7 @@ import org.apache.carbondata.core.datastore.block.Distributable;
import org.apache.carbondata.core.datastore.block.TableBlockInfo;
import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
import org.apache.carbondata.core.mutate.UpdateVO;
+import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.hadoop.internal.index.Block;
@@ -84,7 +85,11 @@ public class CarbonInputSplit extends FileSplit
ColumnarFormatVersion version) {
super(path, start, length, locations);
this.segmentId = segmentId;
- this.taskId = CarbonTablePath.DataFileUtil.getTaskNo(path.getName());
+ String taskNo = CarbonTablePath.DataFileUtil.getTaskNo(path.getName());
+ if (taskNo.contains("_")) {
+ taskNo = taskNo.split("_")[0];
+ }
+ this.taskId = taskNo;
this.bucketId = CarbonTablePath.DataFileUtil.getBucketNo(path.getName());
this.invalidSegments = new ArrayList<>();
this.version = version;
@@ -237,10 +242,11 @@ public class CarbonInputSplit extends FileSplit
String filePath1 = this.getPath().getName();
String filePath2 = other.getPath().getName();
if (CarbonTablePath.isCarbonDataFile(filePath1)) {
- int firstTaskId = Integer.parseInt(CarbonTablePath.DataFileUtil.getTaskNo(filePath1));
- int otherTaskId = Integer.parseInt(CarbonTablePath.DataFileUtil.getTaskNo(filePath2));
- if (firstTaskId != otherTaskId) {
- return firstTaskId - otherTaskId;
+ byte[] firstTaskId = CarbonTablePath.DataFileUtil.getTaskNo(filePath1).getBytes();
+ byte[] otherTaskId = CarbonTablePath.DataFileUtil.getTaskNo(filePath2).getBytes();
+ int compare = ByteUtil.compare(firstTaskId, otherTaskId);
+ if (compare != 0) {
+ return compare;
}
int firstBucketNo = Integer.parseInt(CarbonTablePath.DataFileUtil.getBucketNo(filePath1));
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d734f530/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala
new file mode 100644
index 0000000..70007c6
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.dataload
+
+import java.io.{BufferedWriter, File, FileWriter, FilenameFilter}
+
+import org.apache.spark.sql.common.util.QueryTest
+import org.apache.spark.sql.Row
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+/**
+ * Load tests for the 'sort_scope' / 'batch_sort_size_inmb' load options and the matching
+ * carbon properties.
+ *
+ * The tests share tables and therefore depend on running in declaration order:
+ * carbon_load1 and carbon_load2 are created by the first two tests and are read again
+ * by later ones.
+ */
+class TestBatchSortDataLoad extends QueryTest with BeforeAndAfterAll {
+  // Path of the generated CSV input; assigned by buildTestData().
+  var filePath: String = _
+
+
+  /**
+   * Writes the CSV input file: one header line followed by 200000 data rows whose
+   * values cycle through 0..999.
+   */
+  def buildTestData() = {
+    filePath = s"${integrationPath}/spark-common-test/target/big.csv"
+    val file = new File(filePath)
+    val writer = new BufferedWriter(new FileWriter(file))
+    // Close the writer even when a write fails so the file handle is not leaked.
+    try {
+      writer.write("c1,c2,c3, c4, c5, c6, c7, c8, c9, c10")
+      writer.newLine()
+      for(i <- 0 until 200000) {
+        writer.write("a" + i%1000 + "," +
+                     "b" + i%1000 + "," +
+                     "c" + i%1000 + "," +
+                     "d" + i%1000 + "," +
+                     "e" + i%1000 + "," +
+                     "f" + i%1000 + "," +
+                     i%1000 + "," +
+                     i%1000 + "," +
+                     i%1000 + "," +
+                     i%1000 + "\n")
+        if ( i % 10000 == 0) {
+          writer.flush()
+        }
+      }
+    } finally {
+      writer.close()
+    }
+  }
+
+  /** Drops every table this suite creates. */
+  def dropTable() = {
+    sql("DROP TABLE IF EXISTS carbon_load1")
+    sql("DROP TABLE IF EXISTS carbon_load2")
+    sql("DROP TABLE IF EXISTS carbon_load3")
+    sql("DROP TABLE IF EXISTS carbon_load4")
+    sql("DROP TABLE IF EXISTS carbon_load5")
+    sql("DROP TABLE IF EXISTS carbon_load6")
+  }
+
+
+
+  override def beforeAll {
+    dropTable
+    buildTestData
+  }
+
+
+
+  test("test batch sort load by passing option to load command") {
+
+    sql(
+      """
+        | CREATE TABLE carbon_load1(c1 string, c2 string, c3 string, c4 string, c5 string,
+        | c6 string, c7 int, c8 int, c9 int, c10 int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' into table carbon_load1 " +
+        s"OPTIONS('sort_scope'='batch_sort', 'batch_sort_size_inmb'='1')")
+
+    checkAnswer(sql("select count(*) from carbon_load1"), Seq(Row(200000)))
+
+    assert(getIndexfileCount("carbon_load1") == 12, "Something wrong in batch sort")
+  }
+
+  test("test batch sort load by passing option to load command and compare with normal load") {
+
+    sql(
+      """
+        | CREATE TABLE carbon_load2(c1 string, c2 string, c3 string, c4 string, c5 string,
+        | c6 string, c7 int, c8 int, c9 int, c10 int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' into table carbon_load2 ")
+
+    // carbon_load1 was loaded with batch sort by the previous test.
+    checkAnswer(sql("select * from carbon_load1 where c1='a1' order by c1"),
+      sql("select * from carbon_load2 where c1='a1' order by c1"))
+
+  }
+
+  test("test batch sort load by passing option and compaction") {
+
+    // Three more loads on top of the one done by the first test: 4 x 200000 rows.
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' into table carbon_load1 " +
+        s"OPTIONS('sort_scope'='batch_sort', 'batch_sort_size_inmb'='1')")
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' into table carbon_load1 " +
+        s"OPTIONS('sort_scope'='batch_sort', 'batch_sort_size_inmb'='1')")
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' into table carbon_load1 " +
+        s"OPTIONS('sort_scope'='batch_sort', 'batch_sort_size_inmb'='1')")
+    sql("alter table carbon_load1 compact 'major'")
+    // NOTE(review): presumably waits for compaction to settle - confirm whether it is needed.
+    Thread.sleep(4000)
+    checkAnswer(sql("select count(*) from carbon_load1"), Seq(Row(800000)))
+
+    assert(getIndexfileCount("carbon_load1", "0.1") == 1, "Something wrong in compaction after batch sort")
+
+  }
+
+  test("test batch sort load by passing option in one load and with out option in other load and then do compaction") {
+
+    sql(
+      """
+        | CREATE TABLE carbon_load5(c1 string, c2 string, c3 string, c4 string, c5 string,
+        | c6 string, c7 int, c8 int, c9 int, c10 int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' into table carbon_load5 " +
+        s"OPTIONS('sort_scope'='batch_sort', 'batch_sort_size_inmb'='1')")
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' into table carbon_load5 ")
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' into table carbon_load5 " +
+        s"OPTIONS('sort_scope'='batch_sort', 'batch_sort_size_inmb'='1')")
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' into table carbon_load5 ")
+
+    checkAnswer(sql("select count(*) from carbon_load5"), Seq(Row(800000)))
+
+    checkAnswer(sql("select * from carbon_load1 where c1='a1' order by c1"),
+      sql("select * from carbon_load5 where c1='a1' order by c1"))
+
+    sql("alter table carbon_load5 compact 'major'")
+    // NOTE(review): presumably waits for compaction to settle - confirm whether it is needed.
+    Thread.sleep(4000)
+
+    assert(getIndexfileCount("carbon_load5", "0.1") == 1,
+      "Something wrong in compaction after batch sort")
+
+    checkAnswer(sql("select * from carbon_load1 where c1='a1' order by c1"),
+      sql("select * from carbon_load5 where c1='a1' order by c1"))
+
+  }
+
+  test("test batch sort load by passing option with single pass") {
+
+    sql(
+      """
+        | CREATE TABLE carbon_load3(c1 string, c2 string, c3 string, c4 string, c5 string,
+        | c6 string, c7 int, c8 int, c9 int, c10 int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' into table carbon_load3 " +
+        s"OPTIONS('sort_scope'='batch_sort', 'batch_sort_size_inmb'='1', 'single_pass'='true')")
+
+    checkAnswer(sql("select count(*) from carbon_load3"), Seq(Row(200000)))
+
+    assert(getIndexfileCount("carbon_load3") == 12, "Something wrong in batch sort")
+
+    checkAnswer(sql("select * from carbon_load3 where c1='a1' order by c1"),
+      sql("select * from carbon_load2 where c1='a1' order by c1"))
+
+  }
+
+  test("test batch sort load by with out passing option but through carbon properties") {
+    // CarbonProperties is process-wide state: restore it in finally so that a failed
+    // assertion cannot leave batch sort switched on for the tests that run afterwards.
+    try {
+      CarbonProperties.getInstance().addProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, "BATCH_SORT")
+      CarbonProperties.getInstance().addProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB, "1")
+      sql(
+        """
+          | CREATE TABLE carbon_load4(c1 string, c2 string, c3 string, c4 string, c5 string,
+          | c6 string, c7 int, c8 int, c9 int, c10 int)
+          | STORED BY 'org.apache.carbondata.format'
+        """.stripMargin)
+
+      sql(s"LOAD DATA LOCAL INPATH '$filePath' into table carbon_load4 " )
+
+      checkAnswer(sql("select count(*) from carbon_load4"), Seq(Row(200000)))
+
+      assert(getIndexfileCount("carbon_load4") == 12, "Something wrong in batch sort")
+    } finally {
+      CarbonProperties.getInstance().
+        addProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+          CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)
+      CarbonProperties.getInstance().addProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB, "0")
+    }
+  }
+
+  test("test batch sort load by with out passing option but through carbon properties with default size") {
+    // Same reasoning as above: always restore the process-wide sort scope.
+    try {
+      CarbonProperties.getInstance().addProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, "BATCH_SORT")
+      sql(
+        """
+          | CREATE TABLE carbon_load6(c1 string, c2 string, c3 string, c4 string, c5 string,
+          | c6 string, c7 int, c8 int, c9 int, c10 int)
+          | STORED BY 'org.apache.carbondata.format'
+        """.stripMargin)
+
+      sql(s"LOAD DATA LOCAL INPATH '$filePath' into table carbon_load6 " )
+
+      checkAnswer(sql("select count(*) from carbon_load6"), Seq(Row(200000)))
+
+      assert(getIndexfileCount("carbon_load6") == 1, "Something wrong in batch sort")
+    } finally {
+      CarbonProperties.getInstance().
+        addProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+          CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)
+    }
+  }
+
+  /**
+   * Counts the ".carbonindex" files in one segment directory of a table in the default
+   * database.
+   *
+   * @param tableName table whose segment directory is inspected
+   * @param segmentNo segment id used in the "Segment_" directory name, "0" by default
+   * @return number of index files, or 0 when the segment directory cannot be listed
+   */
+  def getIndexfileCount(tableName: String, segmentNo: String = "0"): Int = {
+    val store = storeLocation +"/default/"+ tableName + "/Fact/Part0/Segment_"+segmentNo
+    val list = new File(store).list(new FilenameFilter {
+      override def accept(dir: File, name: String) = name.endsWith(".carbonindex")
+    })
+    // File.list returns null when the directory is missing or unreadable; report 0 so the
+    // calling assertion fails with its own message instead of a NullPointerException.
+    if (list == null) 0 else list.length
+  }
+
+  override def afterAll {
+    dropTable
+    // filePath stays null when beforeAll failed before the CSV path was assigned.
+    if (filePath != null) {
+      new File(filePath).delete()
+    }
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d734f530/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index afc4a58..a701c72 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -35,6 +35,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.metadata.datatype.DataType
import org.apache.carbondata.core.util.DataTypeUtil
import org.apache.carbondata.processing.constants.LoggerAction
+import org.apache.carbondata.processing.newflow.sort.SortScopeOptions
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
import org.apache.carbondata.spark.util.CommonUtil
@@ -753,7 +754,7 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
"COMPLEX_DELIMITER_LEVEL_1", "COMPLEX_DELIMITER_LEVEL_2", "COLUMNDICT",
"SERIALIZATION_NULL_FORMAT", "BAD_RECORDS_LOGGER_ENABLE", "BAD_RECORDS_ACTION",
"ALL_DICTIONARY_PATH", "MAXCOLUMNS", "COMMENTCHAR", "DATEFORMAT",
- "SINGLE_PASS", "IS_EMPTY_DATA_BAD_RECORD"
+ "SINGLE_PASS", "IS_EMPTY_DATA_BAD_RECORD", "SORT_SCOPE", "BATCH_SORT_SIZE_INMB"
)
var isSupported = true
val invalidOptions = StringBuilder.newBuilder
@@ -808,6 +809,14 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
}
}
+ if (options.exists(_._1.equalsIgnoreCase("SORT_SCOPE"))) {
+ val optionValue: String = options.get("sort_scope").get.head._2
+ if (!SortScopeOptions.isValidSortOption(optionValue)) {
+ throw new MalformedCarbonCommandException(
+ "option SORT_SCOPE can have option either BATCH_SORT or LOCAL_SORT or GLOBAL_SORT")
+ }
+ }
+
// check for duplicate options
val duplicateOptions = options filter {
case (_, optionlist) => optionlist.size > 1
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d734f530/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 1192e08..494beff 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -405,6 +405,8 @@ case class LoadTable(
val dateFormat = options.getOrElse("dateformat", null)
validateDateFormat(dateFormat, table)
val maxColumns = options.getOrElse("maxcolumns", null)
+ val sortScope = options.getOrElse("sort_scope", null)
+ val batchSortSizeInMB = options.getOrElse("batch_sort_size_inmb", null)
carbonLoadModel.setEscapeChar(checkDefaultValue(escapeChar, "\\"))
carbonLoadModel.setQuoteChar(checkDefaultValue(quoteChar, "\""))
@@ -428,6 +430,8 @@ case class LoadTable(
carbonLoadModel
.setIsEmptyDataBadRecord(
DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + "," + isEmptyDataBadRecord)
+ carbonLoadModel.setSortScope(sortScope)
+ carbonLoadModel.setBatchSortSizeInMb(batchSortSizeInMB)
// when single_pass=true, and not use all dict
val useOnePass = options.getOrElse("single_pass", "false").trim.toLowerCase match {
case "true" =>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d734f530/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index e2405f2..09824d8 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -417,6 +417,8 @@ case class LoadTable(
val dateFormat = options.getOrElse("dateformat", null)
validateDateFormat(dateFormat, table)
val maxColumns = options.getOrElse("maxcolumns", null)
+ val sortScope = options.getOrElse("sort_scope", null)
+ val batchSortSizeInMB = options.getOrElse("batch_sort_size_inmb", null)
carbonLoadModel.setEscapeChar(checkDefaultValue(escapeChar, "\\"))
carbonLoadModel.setQuoteChar(checkDefaultValue(quoteChar, "\""))
carbonLoadModel.setCommentChar(checkDefaultValue(commentChar, "#"))
@@ -439,6 +441,8 @@ case class LoadTable(
carbonLoadModel
.setIsEmptyDataBadRecord(
DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + "," + isEmptyDataBadRecord)
+ carbonLoadModel.setSortScope(sortScope)
+ carbonLoadModel.setBatchSortSizeInMb(batchSortSizeInMB)
val useOnePass = options.getOrElse("single_pass", "false").trim.toLowerCase match {
case "true" =>
true
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d734f530/integration/spark2/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala
index 0465aa7..5e91574 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala
@@ -116,9 +116,9 @@ class DataLoadFailAllTypeSortTest extends QueryTest with BeforeAndAfterAll {
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
CarbonProperties.getInstance()
- .addProperty(CarbonCommonConstants.LOAD_USE_BATCH_SORT, "true");
+ .addProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, "batch_sort")
CarbonProperties.getInstance()
- .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FAIL");
+ .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FAIL")
sql("create table data_bm(name String, dob long, weight int) " +
"STORED BY 'org.apache.carbondata.format'")
val testData = s"$resourcesPath/badrecords/dummy.csv"
@@ -132,7 +132,8 @@ class DataLoadFailAllTypeSortTest extends QueryTest with BeforeAndAfterAll {
}
finally {
CarbonProperties.getInstance()
- .addProperty(CarbonCommonConstants.LOAD_USE_BATCH_SORT, "false");
+ .addProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+ CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT);
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT);
@@ -148,9 +149,9 @@ class DataLoadFailAllTypeSortTest extends QueryTest with BeforeAndAfterAll {
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
CarbonProperties.getInstance()
- .addProperty(CarbonCommonConstants.LOAD_USE_BATCH_SORT, "true");
+ .addProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, "BATCH_SORT")
CarbonProperties.getInstance()
- .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FORCE");
+ .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FORCE")
sql("create table data_bmf(name String, dob long, weight int) " +
"STORED BY 'org.apache.carbondata.format'")
val testData = s"$resourcesPath/badrecords/dummy.csv"
@@ -166,10 +167,11 @@ class DataLoadFailAllTypeSortTest extends QueryTest with BeforeAndAfterAll {
}
finally {
CarbonProperties.getInstance()
- .addProperty(CarbonCommonConstants.LOAD_USE_BATCH_SORT, "false");
+ .addProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+ CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
- CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT);
+ CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)
}
}
@@ -182,7 +184,7 @@ class DataLoadFailAllTypeSortTest extends QueryTest with BeforeAndAfterAll {
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
CarbonProperties.getInstance()
- .addProperty(CarbonCommonConstants.LOAD_USE_BATCH_SORT, "true");
+ .addProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, "BATCH_SORT")
sql("create table data_bm_no_good_data(name String, dob long, weight int) " +
"STORED BY 'org.apache.carbondata.format'")
val testData = s"$resourcesPath/badrecords/dummy2.csv"
@@ -198,10 +200,11 @@ class DataLoadFailAllTypeSortTest extends QueryTest with BeforeAndAfterAll {
}
finally {
CarbonProperties.getInstance()
- .addProperty(CarbonCommonConstants.LOAD_USE_BATCH_SORT, "false");
+ .addProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+ CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
- CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT);
+ CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)
}
}
@@ -214,7 +217,7 @@ class DataLoadFailAllTypeSortTest extends QueryTest with BeforeAndAfterAll {
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
CarbonProperties.getInstance()
- .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FAIL");
+ .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FAIL")
sql("create table data_tbm(name String, dob long, weight int) " +
"USING org.apache.spark.sql.CarbonSource OPTIONS('bucketnumber'='4', " +
"'bucketcolumns'='name', 'tableName'='data_tbm')")
@@ -232,7 +235,7 @@ class DataLoadFailAllTypeSortTest extends QueryTest with BeforeAndAfterAll {
finally {
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
- CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT);
+ CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d734f530/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
index d8f84bf..3a2e2eb 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
@@ -171,7 +171,15 @@ public class CarbonLoadModel implements Serializable {
*/
private boolean preFetch;
- private String numberOfcolumns;
+ /**
+ * Sort scope requested for this load (for example BATCH_SORT or LOCAL_SORT)
+ */
+ private String sortScope;
+
+ /**
+ * Batch sort size in mb.
+ */
+ private String batchSortSizeInMb;
/**
* get escape char
*
@@ -391,6 +399,8 @@ public class CarbonLoadModel implements Serializable {
copy.dictionaryServerPort = dictionaryServerPort;
copy.preFetch = preFetch;
copy.isEmptyDataBadRecord = isEmptyDataBadRecord;
+ copy.sortScope = sortScope;
+ copy.batchSortSizeInMb = batchSortSizeInMb;
return copy;
}
@@ -442,6 +452,8 @@ public class CarbonLoadModel implements Serializable {
copyObj.dictionaryServerPort = dictionaryServerPort;
copyObj.preFetch = preFetch;
copyObj.isEmptyDataBadRecord = isEmptyDataBadRecord;
+ copyObj.sortScope = sortScope;
+ copyObj.batchSortSizeInMb = batchSortSizeInMb;
return copyObj;
}
@@ -773,4 +785,20 @@ public class CarbonLoadModel implements Serializable {
public void setIsEmptyDataBadRecord(String isEmptyDataBadRecord) {
this.isEmptyDataBadRecord = isEmptyDataBadRecord;
}
+
+ public String getSortScope() {
+ return sortScope;
+ }
+
+ public void setSortScope(String sortScope) {
+ this.sortScope = sortScope;
+ }
+
+ public String getBatchSortSizeInMb() {
+ return batchSortSizeInMb;
+ }
+
+ public void setBatchSortSizeInMb(String batchSortSizeInMb) {
+ this.batchSortSizeInMb = batchSortSizeInMb;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d734f530/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
index 8865518..5c7c035 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
@@ -35,6 +35,8 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.processing.model.CarbonLoadModel;
import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants;
+import org.apache.carbondata.processing.newflow.sort.SortScopeOptions;
+import org.apache.carbondata.processing.newflow.steps.CarbonRowDataWriterProcessorStepImpl;
import org.apache.carbondata.processing.newflow.steps.DataConverterProcessorStepImpl;
import org.apache.carbondata.processing.newflow.steps.DataConverterProcessorWithBucketingStepImpl;
import org.apache.carbondata.processing.newflow.steps.DataWriterBatchProcessorStepImpl;
@@ -53,14 +55,12 @@ public final class DataLoadProcessBuilder {
public AbstractDataLoadProcessorStep build(CarbonLoadModel loadModel, String storeLocation,
CarbonIterator[] inputIterators) throws Exception {
- boolean batchSort = Boolean.parseBoolean(CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.LOAD_USE_BATCH_SORT,
- CarbonCommonConstants.LOAD_USE_BATCH_SORT_DEFAULT));
CarbonDataLoadConfiguration configuration =
createConfiguration(loadModel, storeLocation);
+ SortScopeOptions.SortScope sortScope = CarbonDataProcessorUtil.getSortScope(configuration);
if (configuration.getBucketingInfo() != null) {
return buildInternalForBucketing(inputIterators, configuration);
- } else if (batchSort) {
+ } else if (sortScope.equals(SortScopeOptions.SortScope.BATCH_SORT)) {
return buildInternalForBatchSort(inputIterators, configuration);
} else {
return buildInternal(inputIterators, configuration);
@@ -158,6 +158,10 @@ public final class DataLoadProcessBuilder {
loadModel.getIsEmptyDataBadRecord().split(",")[1]);
configuration.setDataLoadProperty(DataLoadProcessorConstants.FACT_FILE_PATH,
loadModel.getFactFilePath());
+ configuration
+ .setDataLoadProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, loadModel.getSortScope());
+ configuration.setDataLoadProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
+ loadModel.getBatchSortSizeInMb());
CarbonMetadata.getInstance().addCarbonTable(carbonTable);
List<CarbonDimension> dimensions =
carbonTable.getDimensionByTableName(carbonTable.getFactTableName());
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d734f530/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortScopeOptions.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortScopeOptions.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortScopeOptions.java
new file mode 100644
index 0000000..f2534db
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortScopeOptions.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.newflow.sort;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+
+/**
+ * Maps the textual sort scope given for a data load to a {@link SortScope} constant and
+ * tells whether a given text names one of the supported scopes.
+ */
+public class SortScopeOptions {
+
+  /**
+   * Converts a sort scope name into its enum constant. The name is upper-cased with
+   * {@link String#toUpperCase()} before it is compared.
+   *
+   * @param sortScope scope name; when null, CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT
+   *                  is used instead
+   * @return the matching scope, or LOCAL_SORT when the name is not recognised
+   */
+  public static SortScope getSortScope(String sortScope) {
+    String scopeName =
+        sortScope != null ? sortScope : CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT;
+    String normalized = scopeName.toUpperCase();
+    if ("BATCH_SORT".equals(normalized)) {
+      return SortScope.BATCH_SORT;
+    }
+    if ("NO_SORT".equals(normalized)) {
+      return SortScope.NO_SORT;
+    }
+    // "LOCAL_SORT" and every unrecognised name both resolve to local sort.
+    return SortScope.LOCAL_SORT;
+  }
+
+  /**
+   * Tells whether the given text names a supported sort scope. The text is upper-cased
+   * with {@link String#toUpperCase()} before it is compared.
+   *
+   * @param sortScope scope name to check, may be null
+   * @return true for BATCH_SORT, LOCAL_SORT and NO_SORT; false for null or any other text
+   */
+  public static boolean isValidSortOption(String sortScope) {
+    if (sortScope == null) {
+      return false;
+    }
+    String normalized = sortScope.toUpperCase();
+    return "BATCH_SORT".equals(normalized)
+        || "LOCAL_SORT".equals(normalized)
+        || "NO_SORT".equals(normalized);
+  }
+
+  /**
+   * Sort scopes that can be selected for a data load.
+   */
+  public enum SortScope {
+    NO_SORT, BATCH_SORT, LOCAL_SORT;
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d734f530/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SorterFactory.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SorterFactory.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SorterFactory.java
index 60cca69..39a21ad 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SorterFactory.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SorterFactory.java
@@ -29,6 +29,7 @@ import org.apache.carbondata.processing.newflow.sort.impl.ParallelReadMergeSorte
import org.apache.carbondata.processing.newflow.sort.impl.UnsafeBatchParallelReadMergeSorterImpl;
import org.apache.carbondata.processing.newflow.sort.impl.UnsafeParallelReadMergeSorterImpl;
import org.apache.carbondata.processing.newflow.sort.impl.UnsafeParallelReadMergeSorterWithBucketingImpl;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
public class SorterFactory {
@@ -39,9 +40,7 @@ public class SorterFactory {
boolean offheapsort = Boolean.parseBoolean(CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT));
- boolean batchSort = Boolean.parseBoolean(CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.LOAD_USE_BATCH_SORT,
- CarbonCommonConstants.LOAD_USE_BATCH_SORT_DEFAULT));
+ SortScopeOptions.SortScope sortScope = CarbonDataProcessorUtil.getSortScope(configuration);
Sorter sorter;
if (offheapsort) {
if (configuration.getBucketingInfo() != null) {
@@ -58,7 +57,7 @@ public class SorterFactory {
sorter = new ParallelReadMergeSorterImpl(counter);
}
}
- if (batchSort) {
+ if (sortScope.equals(SortScopeOptions.SortScope.BATCH_SORT)) {
if (configuration.getBucketingInfo() == null) {
sorter = new UnsafeBatchParallelReadMergeSorterImpl(counter);
} else {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d734f530/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
index df3825a..898b73d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
@@ -98,9 +98,10 @@ public class UnsafeSortDataRows {
.getProperty(CarbonCommonConstants.ENABLE_INMEMORY_MERGE_SORT,
CarbonCommonConstants.ENABLE_INMEMORY_MERGE_SORT_DEFAULT));
- this.maxSizeAllowed = Integer.parseInt(CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB, "0"));
+ this.maxSizeAllowed = parameters.getBatchSortSizeinMb();
if (maxSizeAllowed <= 0) {
+ // If user does not input any memory size, then take half the size of usable memory configured
+ // in sort memory size.
this.maxSizeAllowed = UnsafeMemoryManager.INSTANCE.getUsableMemory() / 2;
} else {
this.maxSizeAllowed = this.maxSizeAllowed * 1024 * 1024;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d734f530/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
index 3c3a9d8..07149f7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
@@ -114,6 +114,8 @@ public class SortParameters {
private int numberOfCores;
+ private int batchSortSizeinMb;
+
public SortParameters getCopy() {
SortParameters parameters = new SortParameters();
parameters.tempFileLocation = tempFileLocation;
@@ -138,6 +140,7 @@ public class SortParameters {
parameters.taskNo = taskNo;
parameters.noDictionaryDimnesionColumn = noDictionaryDimnesionColumn;
parameters.numberOfCores = numberOfCores;
+ parameters.batchSortSizeinMb = batchSortSizeinMb;
return parameters;
}
@@ -317,6 +320,14 @@ public class SortParameters {
this.numberOfCores = numberOfCores;
}
+ /**
+  * Returns the batch sort size held by these sort parameters.
+  * The unit is presumably megabytes, going by the field name.
+  *
+  * @return the stored batch sort size
+  */
+ public int getBatchSortSizeinMb() {
+ return batchSortSizeinMb;
+ }
+
+ /**
+  * Stores the batch sort size in these sort parameters.
+  * The unit is presumably megabytes, going by the parameter name.
+  *
+  * @param batchSortSizeinMb the batch sort size to store
+  */
+ public void setBatchSortSizeinMb(int batchSortSizeinMb) {
+ this.batchSortSizeinMb = batchSortSizeinMb;
+ }
+
public static SortParameters createSortParameters(CarbonDataLoadConfiguration configuration) {
SortParameters parameters = new SortParameters();
CarbonTableIdentifier tableIdentifier =
@@ -334,6 +345,8 @@ public class SortParameters {
parameters.setComplexDimColCount(configuration.getComplexDimensionCount());
parameters.setNoDictionaryDimnesionColumn(
CarbonDataProcessorUtil.getNoDictionaryMapping(configuration.getDataFields()));
+ parameters.setBatchSortSizeinMb(CarbonDataProcessorUtil.getBatchSortSizeinMb(configuration));
+
parameters.setObserver(new SortObserver());
// get sort buffer size
parameters.setSortBufferSize(Integer.parseInt(carbonProperties
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d734f530/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index 41bfbed..a4de24e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -56,8 +56,10 @@ import org.apache.carbondata.processing.datatypes.GenericDataType;
import org.apache.carbondata.processing.datatypes.PrimitiveDataType;
import org.apache.carbondata.processing.datatypes.StructDataType;
import org.apache.carbondata.processing.model.CarbonDataLoadSchema;
+import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
import org.apache.carbondata.processing.newflow.DataField;
import org.apache.carbondata.processing.newflow.row.CarbonRow;
+import org.apache.carbondata.processing.newflow.sort.SortScopeOptions;
import org.apache.commons.lang3.ArrayUtils;
@@ -522,4 +524,53 @@ public final class CarbonDataProcessorUtil {
return aggType;
}
+ /**
+  * Resolves the sort scope to use for a data load.
+  * A value stored in the configuration under CarbonCommonConstants.LOAD_SORT_SCOPE takes
+  * precedence; when it is absent the carbon property with the same key is read, with
+  * CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT as its default.
+  *
+  * @param configuration data load configuration that may carry a load-level sort scope
+  * @return the resolved sort scope; the default sort scope when resolving fails
+  */
+ public static SortScopeOptions.SortScope getSortScope(CarbonDataLoadConfiguration configuration) {
+   SortScopeOptions.SortScope sortScope;
+   try {
+     // Look the load-level value up once; it wins over the carbon properties.
+     Object loadLevelScope =
+         configuration.getDataLoadProperty(CarbonCommonConstants.LOAD_SORT_SCOPE);
+     if (loadLevelScope == null) {
+       sortScope = SortScopeOptions.getSortScope(CarbonProperties.getInstance()
+           .getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+               CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT));
+     } else {
+       sortScope = SortScopeOptions.getSortScope(loadLevelScope.toString());
+     }
+   } catch (Exception e) {
+     // Best effort: never fail the load because of this option, but say why we fell back.
+     sortScope = SortScopeOptions.getSortScope(CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT);
+     LOGGER.warn("sort scope is set to " + sortScope
+         + " because the configured value could not be resolved: " + e.getMessage());
+   }
+   return sortScope;
+ }
+
+ /**
+  * Resolves the batch sort size for a data load.
+  * A value stored in the configuration under CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB
+  * takes precedence; when it is absent the carbon property with the same key is read, with
+  * "0" as its default.
+  *
+  * @param configuration data load configuration that may carry a load-level batch sort size
+  * @return the parsed size, or 0 when no value is set or the value cannot be read as an int
+  */
+ public static int getBatchSortSizeinMb(CarbonDataLoadConfiguration configuration) {
+   int batchSortSizeInMb;
+   try {
+     // Look the load-level value up once; it wins over the carbon properties.
+     Object loadLevelSize =
+         configuration.getDataLoadProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB);
+     if (loadLevelSize == null) {
+       batchSortSizeInMb = Integer.parseInt(CarbonProperties.getInstance()
+           .getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB, "0"));
+     } else {
+       batchSortSizeInMb = Integer.parseInt(loadLevelSize.toString());
+     }
+   } catch (Exception e) {
+     // Best effort: keep the fallback to 0, but do not hide a malformed setting.
+     batchSortSizeInMb = 0;
+     LOGGER.warn("batch sort size in MB is set to 0 because the configured value "
+         + "could not be read: " + e.getMessage());
+   }
+   return batchSortSizeInMb;
+ }
+
}
\ No newline at end of file