Posted to commits@carbondata.apache.org by ja...@apache.org on 2020/01/19 02:31:34 UTC
[carbondata] branch master updated: [CARBONDATA-3663] Support loading stage files in batches
This is an automated email from the ASF dual-hosted git repository.
jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 3d88685 [CARBONDATA-3663] Support loading stage files in batches
3d88685 is described below
commit 3d88685757b4f5eef330bc543e0d8d20f70bb8df
Author: liuzhi <37...@qq.com>
AuthorDate: Tue Jan 14 14:46:28 2020 +0800
[CARBONDATA-3663] Support loading stage files in batches
Why is this PR needed?
When there are a lot of stage files in the stage directory, loading all of them in one go makes the loading time uncontrollable.
Users need a way to specify the number of stage files processed per run, so that the execution time of the command can be kept under control.
What changes were proposed in this PR?
Add a load option batch_file_count that lets users specify the number of stage files to process in each run (a usage sketch follows after this description).
Does this PR introduce any user interface change?
Yes
Is any new testcase added?
Yes
This closes #3578
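
For reference, a minimal usage sketch (not part of this commit; the table name, batch size, and helper name are illustrative, and a SparkSession with CarbonData already configured is assumed) showing how repeated INSERT ... STAGE calls drain the stage directory in bounded batches:
```
import org.apache.spark.sql.SparkSession

// Assumes `spark` already has CarbonData wired in (e.g. a CarbonSession) and that
// stage files have been written for `tableName` by the Flink writer.
def loadOneBatch(spark: SparkSession, tableName: String, batchFileCount: Int): Unit = {
  // Each call picks up at most `batchFileCount` of the oldest unprocessed stage files,
  // keeping the execution time of a single command bounded.
  spark.sql(s"INSERT INTO $tableName STAGE OPTIONS('batch_file_count' = '$batchFileCount')")
}

// Repeat to drain the stage directory incrementally, as the new TestCarbonWriter case does:
//   loadOneBatch(spark, "table1", 5)   // loads the first 5 stage files
//   loadOneBatch(spark, "table1", 5)   // loads the next 5 stage files
```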
---
docs/dml-of-carbondata.md | 19 ++++-
.../carbon/flink/TestCarbonPartitionWriter.scala | 22 +++---
.../org/apache/carbon/flink/TestCarbonWriter.scala | 87 ++++++++++++++++++++--
.../spark/load/DataLoadProcessBuilderOnSpark.scala | 44 ++++++++++-
.../management/CarbonInsertFromStageCommand.scala | 78 +++++++++----------
.../spark/sql/parser/CarbonSpark2SqlParser.scala | 10 ++-
6 files changed, 190 insertions(+), 70 deletions(-)
diff --git a/docs/dml-of-carbondata.md b/docs/dml-of-carbondata.md
index e148bd0..1cda75a 100644
--- a/docs/dml-of-carbondata.md
+++ b/docs/dml-of-carbondata.md
@@ -316,12 +316,29 @@ CarbonData DML statements are documented here,which includes:
You can use this command to insert them into the table, so that making them visible for query.
```
- INSERT INTO <CARBONDATA TABLE> STAGE
+ INSERT INTO <CARBONDATA TABLE> STAGE OPTIONS(property_name=property_value, ...)
```
+ **Supported Properties:**
+
+| Property | Description |
+| ------------------------------------------------------- | ------------------------------------------------------------ |
+| [BATCH_FILE_COUNT](#batch_file_count) | The number of stage files per processing |
+
+-
+ You can use the following options to load data:
+
+ - ##### BATCH_FILE_COUNT:
+ The number of stage files per processing.
+
+ ```
+ OPTIONS('batch_file_count'='5')
+ ```
Examples:
```
INSERT INTO table1 STAGE
+
+ INSERT INTO table1 STAGE OPTIONS('batch_file_count' = '5')
```
### Load Data Using Static Partition
diff --git a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala
index fe2fa38..c92d6fc 100644
--- a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala
+++ b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala
@@ -34,7 +34,6 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.spark.sql.Row
import org.apache.spark.sql.test.util.QueryTest
-import org.junit.Test
import scala.collection.JavaConverters._
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
@@ -43,9 +42,8 @@ class TestCarbonPartitionWriter extends QueryTest {
val tableName = "test_flink_partition"
- @Test
- def testLocal(): Unit = {
- sql(s"drop table if exists $tableName").collect()
+ test("Writing flink data to local partition carbon table") {
+ sql(s"DROP TABLE IF EXISTS $tableName").collect()
sql(
s"""
| CREATE TABLE $tableName (stringField string, intField int, shortField short)
@@ -122,17 +120,16 @@ class TestCarbonPartitionWriter extends QueryTest {
sql(s"INSERT INTO $tableName STAGE")
- checkAnswer(sql(s"select count(1) from $tableName"), Seq(Row(1000)))
+ checkAnswer(sql(s"SELECT count(1) FROM $tableName"), Seq(Row(1000)))
} finally {
- sql(s"drop table if exists $tableName").collect()
+ sql(s"DROP TABLE IF EXISTS $tableName").collect()
delDir(new File(dataPath))
}
}
- @Test
- def testComplexType(): Unit = {
- sql(s"drop table if exists $tableName").collect()
+ test("Test complex type") {
+ sql(s"DROP TABLE IF EXISTS $tableName").collect()
sql(
s"""
| CREATE TABLE $tableName (stringField string, intField int, shortField short,
@@ -212,14 +209,14 @@ class TestCarbonPartitionWriter extends QueryTest {
sql(s"INSERT INTO $tableName STAGE")
- checkAnswer(sql(s"select count(1) from $tableName"), Seq(Row(1000)))
+ checkAnswer(sql(s"SELECT count(1) FROM $tableName"), Seq(Row(1000)))
- val rows = sql(s"select * from $tableName limit 1").collect()
+ val rows = sql(s"SELECT * FROM $tableName limit 1").collect()
assertResult(1)(rows.length)
assertResult(Array[Byte](2, 3, 4))(rows(0).get(rows(0).fieldIndex("binaryfield")).asInstanceOf[GenericRowWithSchema](0))
} finally {
- sql(s"drop table if exists $tableName").collect()
+ sql(s"DROP TABLE IF EXISTS $tableName").collect()
delDir(new File(dataPath))
}
}
@@ -231,7 +228,6 @@ class TestCarbonPartitionWriter extends QueryTest {
val properties = new Properties
properties.setProperty(CarbonLocalProperty.DATA_TEMP_PATH, dataTempPath)
properties.setProperty(CarbonLocalProperty.DATA_PATH, dataPath)
- properties.setProperty(CarbonLocalProperty.COMMIT_THRESHOLD, "100")
properties
}
diff --git a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala
index 9195863..a297dcf 100644
--- a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala
+++ b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala
@@ -27,7 +27,6 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.spark.sql.Row
import org.apache.spark.sql.test.util.QueryTest
-import org.junit.Test
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.util.path.CarbonTablePath
@@ -36,9 +35,8 @@ class TestCarbonWriter extends QueryTest {
val tableName = "test_flink"
- @Test
- def testLocal(): Unit = {
- sql(s"drop table if exists $tableName").collect()
+ test("Writing flink data to local carbon table") {
+ sql(s"DROP TABLE IF EXISTS $tableName").collect()
sql(
s"""
| CREATE TABLE $tableName (stringField string, intField int, shortField short)
@@ -103,14 +101,91 @@ class TestCarbonWriter extends QueryTest {
sql(s"INSERT INTO $tableName STAGE")
- checkAnswer(sql(s"select count(1) from $tableName"), Seq(Row(1000)))
+ checkAnswer(sql(s"SELECT count(1) FROM $tableName"), Seq(Row(1000)))
// ensure the stage snapshot file and all stage files are deleted
assertResult(false)(FileFactory.isFileExist(CarbonTablePath.getStageSnapshotFile(tablePath)))
assertResult(true)(FileFactory.getCarbonFile(CarbonTablePath.getStageDir(tablePath)).listFiles().isEmpty)
} finally {
- sql(s"drop table if exists $tableName").collect()
+ sql(s"DROP TABLE IF EXISTS $tableName").collect()
+ new File(dataPath).delete()
+ }
+ }
+
+ test("test batch_file_count option") {
+ sql(s"DROP TABLE IF EXISTS $tableName").collect()
+ sql(
+ s"""
+ | CREATE TABLE $tableName (stringField string, intField int, shortField short)
+ | STORED AS carbondata
+ """.stripMargin
+ ).collect()
+
+ val rootPath = System.getProperty("user.dir") + "/target/test-classes"
+
+ val dataTempPath = rootPath + "/data/temp/"
+ val dataPath = rootPath + "/data/"
+ new File(dataPath).delete()
+ new File(dataPath).mkdir()
+
+ try {
+ val tablePath = storeLocation + "/" + tableName + "/"
+
+ val writerProperties = newWriterProperties(dataTempPath, dataPath, storeLocation)
+ val carbonProperties = newCarbonProperties(storeLocation)
+
+ writerProperties.put(CarbonLocalProperty.COMMIT_THRESHOLD, "100")
+
+ val environment = StreamExecutionEnvironment.getExecutionEnvironment
+ environment.setParallelism(1)
+ environment.setRestartStrategy(RestartStrategies.noRestart)
+
+ val dataCount = 1000
+ val source = new TestSource(dataCount) {
+ @throws[InterruptedException]
+ override def get(index: Int): Array[AnyRef] = {
+ val data = new Array[AnyRef](3)
+ data(0) = "test" + index
+ data(1) = index.asInstanceOf[AnyRef]
+ data(2) = 12345.asInstanceOf[AnyRef]
+ data
+ }
+
+ @throws[InterruptedException]
+ override def onFinish(): Unit = {
+ Thread.sleep(5000L)
+ }
+ }
+ val stream = environment.addSource(source)
+ val factory = CarbonWriterFactory.builder("Local").build(
+ "default",
+ tableName,
+ tablePath,
+ new Properties,
+ writerProperties,
+ carbonProperties
+ )
+ val streamSink = StreamingFileSink.forBulkFormat(new Path(ProxyFileSystem.DEFAULT_URI), factory).build
+
+ stream.addSink(streamSink)
+
+ try environment.execute
+ catch {
+ case exception: Exception =>
+ // TODO
+ throw new UnsupportedOperationException(exception)
+ }
+
+ sql(s"INSERT INTO $tableName STAGE OPTIONS ('batch_file_count' = '5')")
+
+ checkAnswer(sql(s"SELECT count(1) FROM $tableName"), Seq(Row(500)))
+
+ sql(s"INSERT INTO $tableName STAGE OPTIONS ('batch_file_count' = '5')")
+
+ checkAnswer(sql(s"SELECT count(1) FROM $tableName"), Seq(Row(1000)))
+ } finally {
+ sql(s"DROP TABLE IF EXISTS $tableName").collect()
new File(dataPath).delete()
}
}
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index dc97cd9..ae859c0 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -22,16 +22,20 @@ import java.util.Comparator
import scala.collection.JavaConverters._
import org.apache.hadoop.conf.Configuration
-import org.apache.spark.{Accumulator, DataSkewRangePartitioner, TaskContext}
+import org.apache.hadoop.mapreduce.InputSplit
+import org.apache.spark.{Accumulator, CarbonInputMetrics, DataSkewRangePartitioner, TaskContext}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.execution.command.ExecutionErrors
-import org.apache.spark.sql.util.SparkSQLUtil
+import org.apache.spark.sql.util.{SparkSQLUtil, SparkTypeConverter}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.unsafe.types.UTF8String
import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.converter.SparkDataTypeConverterImpl
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.row.CarbonRow
import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes, StructField, StructType}
@@ -40,6 +44,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, Ca
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus}
import org.apache.carbondata.core.util._
import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer
+import org.apache.carbondata.hadoop.CarbonProjection
import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat
import org.apache.carbondata.processing.loading.{CarbonDataLoadConfiguration, DataField, DataLoadProcessBuilder, FailureCauses}
import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants
@@ -47,8 +52,9 @@ import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.processing.sort.sortdata.{NewRowComparator, NewRowComparatorForNormalDims, SortParameters}
import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, TableOptionConstant}
-import org.apache.carbondata.spark.rdd.StringArrayRow
+import org.apache.carbondata.spark.rdd.{CarbonScanRDD, StringArrayRow}
import org.apache.carbondata.spark.util.CommonUtil
+import org.apache.carbondata.store.CarbonRowReadSupport
/**
* Use sortBy operator in spark to load the data
@@ -423,6 +429,38 @@ object DataLoadProcessBuilderOnSpark {
}
loadModel
}
+
+ /**
+ * create DataFrame basing on specified splits
+ */
+ def createInputDataFrame(
+ sparkSession: SparkSession,
+ carbonTable: CarbonTable,
+ splits: Seq[InputSplit]
+ ): DataFrame = {
+ val columns = carbonTable
+ .getCreateOrderColumn
+ .asScala
+ .map(_.getColName)
+ .toArray
+ val schema = SparkTypeConverter.createSparkSchema(carbonTable, columns)
+ val rdd: RDD[InternalRow] = new CarbonScanRDD[CarbonRow](
+ sparkSession,
+ columnProjection = new CarbonProjection(columns),
+ null,
+ carbonTable.getAbsoluteTableIdentifier,
+ carbonTable.getTableInfo.serialize,
+ carbonTable.getTableInfo,
+ new CarbonInputMetrics,
+ null,
+ classOf[SparkDataTypeConverterImpl],
+ classOf[CarbonRowReadSupport],
+ splits.asJava)
+ .map { row =>
+ new GenericInternalRow(row.getData.asInstanceOf[Array[Any]])
+ }
+ SparkSQLUtil.execute(rdd, schema, sparkSession)
+ }
}
class PrimtiveOrdering(dataType: DataType) extends Ordering[Object] {
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
index a4dd45b..0d1121d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
@@ -28,7 +28,7 @@ import com.google.gson.Gson
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.InputSplit
import org.apache.log4j.Logger
-import org.apache.spark.sql.{CarbonEnv, CarbonUtils, Row, SparkSession}
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
import org.apache.spark.sql.execution.command.{Checker, DataCommand}
import org.apache.spark.sql.util.SparkSQLUtil
@@ -56,7 +56,8 @@ import org.apache.carbondata.spark.load.DataLoadProcessBuilderOnSpark
*/
case class CarbonInsertFromStageCommand(
databaseNameOp: Option[String],
- tableName: String
+ tableName: String,
+ options: Map[String, String]
) extends DataCommand {
@transient var LOGGER: Logger = _
@@ -113,7 +114,16 @@ case class CarbonInsertFromStageCommand(
// 8) delete the snapshot file
// 1) read all existing stage files
- val stageFiles = listStageFiles(stagePath, hadoopConf)
+ val batchSize = try {
+ Integer.valueOf(options.getOrElse("batch_file_count", Integer.MAX_VALUE.toString))
+ } catch {
+ case _: NumberFormatException =>
+ throw new MalformedCarbonCommandException("Option [batch_file_count] is not a number.")
+ }
+ if (batchSize < 1) {
+ throw new MalformedCarbonCommandException("Option [batch_file_count] is less than 1.")
+ }
+ val stageFiles = listStageFiles(stagePath, hadoopConf, batchSize)
if (stageFiles.isEmpty) {
// no stage files, so do nothing
LOGGER.warn("files not found under stage metadata folder")
@@ -258,25 +268,14 @@ case class CarbonInsertFromStageCommand(
LOGGER.info(s"start to load ${splits.size} files into " +
s"${table.getDatabaseName}.${table.getTableName}")
val start = System.currentTimeMillis()
- try {
- CarbonUtils
- .threadSet(CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
- table.getDatabaseName + CarbonCommonConstants.POINT + table.getTableName,
- splits.map(s => s.asInstanceOf[CarbonInputSplit].getSegmentId).mkString(","))
- val dataFrame = SparkSQLUtil.createInputDataFrame(spark, table)
- DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(
- spark,
- Option(dataFrame),
- loadModel,
- SparkSQLUtil.sessionState(spark).newHadoopConf()
- ).map { row =>
+ val dataFrame = DataLoadProcessBuilderOnSpark.createInputDataFrame(spark, table, splits)
+ DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(
+ spark,
+ Option(dataFrame),
+ loadModel,
+ SparkSQLUtil.sessionState(spark).newHadoopConf()
+ ).map { row =>
(row._1, FailureCauses.NONE == row._2._2.failureCauses)
- }
- } finally {
- CarbonUtils
- .threadUnset(CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
- table.getDatabaseName + "." +
- table.getTableName)
}
LOGGER.info(s"finish data loading, time taken ${System.currentTimeMillis() - start}ms")
@@ -316,26 +315,11 @@ case class CarbonInsertFromStageCommand(
val start = System.currentTimeMillis()
partitionDataList.map {
case (partition, splits) =>
- LOGGER.info(s"start to load ${ splits.size } files into " +
- s"${ table.getDatabaseName }.${ table.getTableName }. " +
- s"Partition information: ${ partition.mkString(",") }")
- val dataFrame = try {
- // Segments should be set for query here, because consider a scenario where custom
- // compaction is triggered, so it can happen that all the segments might be taken into
- // consideration instead of custom segments if we do not set, leading to duplicate data in
- // compacted segment. To avoid this, segments to be considered are to be set in threadset.
- CarbonUtils
- .threadSet(CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
- table.getDatabaseName + CarbonCommonConstants.POINT +
- table.getTableName,
- splits.map(split => split.asInstanceOf[CarbonInputSplit].getSegmentId).mkString(","))
- SparkSQLUtil.createInputDataFrame(spark, table)
- } finally {
- CarbonUtils.threadUnset(
- CarbonCommonConstants.CARBON_INPUT_SEGMENTS + table.getDatabaseName +
- CarbonCommonConstants.POINT +
- table.getTableName)
- }
+ LOGGER.info(s"start to load ${splits.size} files into " +
+ s"${table.getDatabaseName}.${table.getTableName}. " +
+ s"Partition information: ${partition.mkString(",")}")
+ val dataFrame =
+ DataLoadProcessBuilderOnSpark.createInputDataFrame(spark, table, splits)
val columns = dataFrame.columns
val header = columns.mkString(",")
val selectColumns = columns.filter(!partition.contains(_))
@@ -457,7 +441,8 @@ case class CarbonInsertFromStageCommand(
*/
private def listStageFiles(
loadDetailsDir: String,
- hadoopConf: Configuration
+ hadoopConf: Configuration,
+ batchSize: Int
): Array[(CarbonFile, CarbonFile)] = {
val dir = FileFactory.getCarbonFile(loadDetailsDir, hadoopConf)
if (dir.exists()) {
@@ -467,13 +452,20 @@ case class CarbonInsertFromStageCommand(
}.map { file =>
(file.getName.substring(0, file.getName.indexOf(".")), file)
}.toMap
- allFiles.filter { file =>
+ val stageFiles = allFiles.filter { file =>
!file.getName.endsWith(CarbonTablePath.SUCCESS_FILE_SUBFIX)
}.filter { file =>
successFiles.contains(file.getName)
+ }.sortWith {
+ (file1, file2) => file1.getLastModifiedTime < file2.getLastModifiedTime
}.map { file =>
(file, successFiles(file.getName))
}
+ if (stageFiles.length <= batchSize) {
+ stageFiles
+ } else {
+ stageFiles.dropRight(stageFiles.length - batchSize)
+ }
} else {
Array.empty
}
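
For clarity, a standalone sketch of the batching rule implemented in listStageFiles above (illustrative only; the StageFile case class is hypothetical and stands in for CarbonFile): stage files are ordered by last-modified time and only the oldest batchSize entries are kept.
```
// Illustrative sketch of the sort + dropRight selection used above; StageFile is a
// hypothetical stand-in for CarbonFile.
case class StageFile(name: String, lastModifiedTime: Long)

def selectBatch(files: Seq[StageFile], batchSize: Int): Seq[StageFile] = {
  // oldest files first, so earlier stage data is loaded before newer data
  val oldestFirst = files.sortWith(_.lastModifiedTime < _.lastModifiedTime)
  if (oldestFirst.length <= batchSize) {
    oldestFirst
  } else {
    // drop the newest files so that at most batchSize files are processed this run
    oldestFirst.dropRight(oldestFirst.length - batchSize)
  }
}
```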
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 10b661a..ee094d7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -523,12 +523,14 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
}
/**
- * INSERT INTO [dbName.]tableName STAGE
+ * INSERT INTO [dbName.]tableName STAGE [OPTIONS (key1=value1, key2=value2, ...)]
*/
protected lazy val insertStageData: Parser[LogicalPlan] =
- INSERT ~ INTO ~> (ident <~ ".").? ~ ident <~ STAGE <~ opt(";") ^^ {
- case dbName ~ tableName =>
- CarbonInsertFromStageCommand(dbName, tableName)
+ INSERT ~ INTO ~> (ident <~ ".").? ~ ident ~ STAGE ~
+ (OPTIONS ~> "(" ~> repsep(loadOptions, ",") <~ ")").? <~ opt(";") ^^ {
+ case dbName ~ tableName ~ stage ~ options =>
+ CarbonInsertFromStageCommand(dbName, tableName,
+ options.getOrElse(List[(String, String)]()).toMap[String, String])
}
protected lazy val cleanFiles: Parser[LogicalPlan] =
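
As a closing note (illustrative, not from the commit), the extended insertStageData rule accepts statements such as the following, mapping them roughly to the command shown above:
```
// Illustrative only; option keys are assumed to be lower-cased by the existing loadOptions rule.
//
//   INSERT INTO table1 STAGE
//     => CarbonInsertFromStageCommand(None, "table1", Map())
//
//   INSERT INTO db1.table1 STAGE OPTIONS('batch_file_count' = '5')
//     => CarbonInsertFromStageCommand(Some("db1"), "table1", Map("batch_file_count" -> "5"))
```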