Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/02/03 19:43:39 UTC
[32/50] [abbrv] carbondata git commit: [CARBONDATA-2093] Use small file feature of global sort to minimise the carbondata file count
This closes #1876
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/e527c059
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/e527c059
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/e527c059
Branch: refs/heads/branch-1.3
Commit: e527c059e81e58503d568c82a3e7ac822a8a5b47
Parents: 8875775
Author: ravipesala <ra...@gmail.com>
Authored: Sun Jan 28 20:37:21 2018 +0530
Committer: QiangCai <qi...@qq.com>
Committed: Sat Feb 3 16:36:30 2018 +0800
----------------------------------------------------------------------
.../StandardPartitionTableLoadingTestCase.scala | 77 ++++++++++-
.../load/DataLoadProcessBuilderOnSpark.scala | 130 +------------------
.../carbondata/spark/util/DataLoadingUtil.scala | 127 ++++++++++++++++++
.../management/CarbonLoadDataCommand.scala | 94 ++++++--------
.../sort/sortdata/SortParameters.java | 4 +
5 files changed, 249 insertions(+), 183 deletions(-)
----------------------------------------------------------------------
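For context before the diffs: the read-side behaviour exercised by the new partition tests is switched through a CarbonProperties flag. A minimal sketch of toggling it (the property constants are the ones used in the test below; the surrounding code is illustrative only):

    import org.apache.carbondata.core.constants.CarbonCommonConstants
    import org.apache.carbondata.core.util.CarbonProperties

    // Merge many small carbondata files into fewer scan partitions when
    // planning a query -- the mode the new read test verifies.
    CarbonProperties.getInstance().addProperty(
      CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
      CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_MERGE_FILES)

    // ... run queries against the partitioned table ...

    // Restore the default afterwards, as the test's finally block does.
    CarbonProperties.getInstance().addProperty(
      CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
      CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT)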
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e527c059/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
index 16f252b..669d6e7 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
@@ -16,11 +16,12 @@
*/
package org.apache.carbondata.spark.testsuite.standardpartition
-import java.io.{File, IOException}
+import java.io.{File, FileWriter, IOException}
import java.util
import java.util.concurrent.{Callable, ExecutorService, Executors}
import org.apache.commons.io.FileUtils
+import org.apache.spark.sql.execution.BatchedDataSourceScanExec
import org.apache.spark.sql.test.util.QueryTest
import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row}
import org.scalatest.BeforeAndAfterAll
@@ -30,7 +31,8 @@ import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFi
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.CarbonMetadata
import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.spark.rdd.CarbonScanRDD
class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfterAll {
var executorService: ExecutorService = _
@@ -409,6 +411,75 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
sql("select * from casesensitivepartition where empno=17"))
}
+ test("Partition LOAD with small files") {
+ sql("DROP TABLE IF EXISTS smallpartitionfiles")
+ sql(
+ """
+ | CREATE TABLE smallpartitionfiles(id INT, name STRING, age INT) PARTITIONED BY(city STRING)
+ | STORED BY 'org.apache.carbondata.format'
+ """.stripMargin)
+ val inputPath = new File("target/small_files").getCanonicalPath
+ val folder = new File(inputPath)
+ if (folder.exists()) {
+ FileUtils.deleteDirectory(folder)
+ }
+ folder.mkdir()
+ for (i <- 0 to 100) {
+ val file = s"$folder/file$i.csv"
+ val writer = new FileWriter(file)
+ writer.write("id,name,city,age\n")
+ writer.write(s"$i,name_$i,city_${i % 5},${ i % 100 }")
+ writer.close()
+ }
+ sql(s"LOAD DATA LOCAL INPATH '$inputPath' INTO TABLE smallpartitionfiles")
+ FileUtils.deleteDirectory(folder)
+ val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "smallpartitionfiles")
+ val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
+ val segmentDir = carbonTablePath.getSegmentDir("0", "0")
+ assert(new File(segmentDir).listFiles().length < 50)
+ }
+
+ test("verify partition read with small files") {
+ try {
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
+ CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_MERGE_FILES)
+ sql("DROP TABLE IF EXISTS smallpartitionfilesread")
+ sql(
+ """
+ | CREATE TABLE smallpartitionfilesread(id INT, name STRING, age INT) PARTITIONED BY
+ | (city STRING)
+ | STORED BY 'org.apache.carbondata.format'
+ """.stripMargin)
+ val inputPath = new File("target/small_files").getCanonicalPath
+ val folder = new File(inputPath)
+ if (folder.exists()) {
+ FileUtils.deleteDirectory(folder)
+ }
+ folder.mkdir()
+ for (i <- 0 until 100) {
+ val file = s"$folder/file$i.csv"
+ val writer = new FileWriter(file)
+ writer.write("id,name,city,age\n")
+ writer.write(s"$i,name_$i,city_${ i },${ i % 100 }")
+ writer.close()
+ }
+ sql(s"LOAD DATA LOCAL INPATH '$inputPath' INTO TABLE smallpartitionfilesread")
+ FileUtils.deleteDirectory(folder)
+ val dataFrame = sql("select * from smallpartitionfilesread")
+ val scanRdd = dataFrame.queryExecution.sparkPlan.collect {
+ case b: BatchedDataSourceScanExec if b.rdd.isInstanceOf[CarbonScanRDD] => b.rdd
+ .asInstanceOf[CarbonScanRDD]
+ }.head
+ assert(scanRdd.getPartitions.length < 10)
+ assertResult(100)(dataFrame.count)
+ } finally {
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
+ CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT)
+ }
+ }
+
+
+
def restoreData(dblocation: String, tableName: String) = {
val destination = dblocation + CarbonCommonConstants.FILE_SEPARATOR + tableName
val source = dblocation+ "_back" + CarbonCommonConstants.FILE_SEPARATOR + tableName
@@ -435,6 +506,8 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
override def afterAll = {
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
+ CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT)
dropTable
if (executorService != null && !executorService.isShutdown) {
executorService.shutdownNow()
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e527c059/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index 781b484..8be70a9 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -17,26 +17,12 @@
package org.apache.carbondata.spark.load
-import java.text.SimpleDateFormat
-import java.util.{Comparator, Date, Locale}
-
-import scala.collection.mutable.ArrayBuffer
+import java.util.Comparator
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapreduce.{TaskAttemptID, TaskType}
-import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
-import org.apache.hadoop.mapreduce.task.{JobContextImpl, TaskAttemptContextImpl}
import org.apache.spark.TaskContext
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
-import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
-import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.command.ExecutionErrors
-import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, PartitionedFile}
-import org.apache.spark.sql.util.SparkSQLUtil.sessionState
import org.apache.spark.storage.StorageLevel
import org.apache.carbondata.common.logging.LogServiceFactory
@@ -45,12 +31,10 @@ import org.apache.carbondata.core.datastore.row.CarbonRow
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus}
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.processing.loading.{DataLoadProcessBuilder, FailureCauses}
-import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.processing.sort.sortdata.{NewRowComparator, NewRowComparatorForNormalDims, SortParameters}
import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
-import org.apache.carbondata.spark.rdd.SerializableConfiguration
-import org.apache.carbondata.spark.util.CommonUtil
+import org.apache.carbondata.spark.util.DataLoadingUtil
/**
* Use sortBy operator in spark to load the data
@@ -68,7 +52,7 @@ object DataLoadProcessBuilderOnSpark {
} else {
// input data from files
val columnCount = model.getCsvHeaderColumns.length
- csvFileScanRDD(sparkSession, model, hadoopConf)
+ DataLoadingUtil.csvFileScanRDD(sparkSession, model, hadoopConf)
.map(DataLoadProcessorStepOnSpark.toStringArrayRow(_, columnCount))
}
@@ -166,112 +150,4 @@ object DataLoadProcessBuilderOnSpark {
Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors)))
}
}
-
- /**
- * creates a RDD that does reading of multiple CSV files
- */
- def csvFileScanRDD(
- spark: SparkSession,
- model: CarbonLoadModel,
- hadoopConf: Configuration
- ): RDD[InternalRow] = {
- // 1. partition
- val defaultMaxSplitBytes = sessionState(spark).conf.filesMaxPartitionBytes
- val openCostInBytes = sessionState(spark).conf.filesOpenCostInBytes
- val defaultParallelism = spark.sparkContext.defaultParallelism
- CommonUtil.configureCSVInputFormat(hadoopConf, model)
- hadoopConf.set(FileInputFormat.INPUT_DIR, model.getFactFilePath)
- val jobConf = new JobConf(hadoopConf)
- SparkHadoopUtil.get.addCredentials(jobConf)
- val jobContext = new JobContextImpl(jobConf, null)
- val inputFormat = new CSVInputFormat()
- val rawSplits = inputFormat.getSplits(jobContext).toArray
- val splitFiles = rawSplits.map { split =>
- val fileSplit = split.asInstanceOf[FileSplit]
- PartitionedFile(
- InternalRow.empty,
- fileSplit.getPath.toString,
- fileSplit.getStart,
- fileSplit.getLength,
- fileSplit.getLocations)
- }.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
- val totalBytes = splitFiles.map(_.length + openCostInBytes).sum
- val bytesPerCore = totalBytes / defaultParallelism
-
- val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
- LOGGER.info(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
- s"open cost is considered as scanning $openCostInBytes bytes.")
-
- val partitions = new ArrayBuffer[FilePartition]
- val currentFiles = new ArrayBuffer[PartitionedFile]
- var currentSize = 0L
-
- def closePartition(): Unit = {
- if (currentFiles.nonEmpty) {
- val newPartition =
- FilePartition(
- partitions.size,
- currentFiles.toArray.toSeq)
- partitions += newPartition
- }
- currentFiles.clear()
- currentSize = 0
- }
-
- splitFiles.foreach { file =>
- if (currentSize + file.length > maxSplitBytes) {
- closePartition()
- }
- // Add the given file to the current partition.
- currentSize += file.length + openCostInBytes
- currentFiles += file
- }
- closePartition()
-
- // 2. read function
- val serializableConfiguration = new SerializableConfiguration(jobConf)
- val readFunction = new (PartitionedFile => Iterator[InternalRow]) with Serializable {
- override def apply(file: PartitionedFile): Iterator[InternalRow] = {
- new Iterator[InternalRow] {
- val hadoopConf = serializableConfiguration.value
- val jobTrackerId: String = {
- val formatter = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
- formatter.format(new Date())
- }
- val attemptId = new TaskAttemptID(jobTrackerId, 0, TaskType.MAP, 0, 0)
- val hadoopAttemptContext = new TaskAttemptContextImpl(hadoopConf, attemptId)
- val inputSplit =
- new FileSplit(new Path(file.filePath), file.start, file.length, file.locations)
- var finished = false
- val inputFormat = new CSVInputFormat()
- val reader = inputFormat.createRecordReader(inputSplit, hadoopAttemptContext)
- reader.initialize(inputSplit, hadoopAttemptContext)
-
- override def hasNext: Boolean = {
- if (!finished) {
- if (reader != null) {
- if (reader.nextKeyValue()) {
- true
- } else {
- finished = true
- reader.close()
- false
- }
- } else {
- finished = true
- false
- }
- } else {
- false
- }
- }
-
- override def next(): InternalRow = {
- new GenericInternalRow(reader.getCurrentValue.get().asInstanceOf[Array[Any]])
- }
- }
- }
- }
- new FileScanRDD(spark, readFunction, partitions)
- }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e527c059/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
index 5e9f7fe..8b4c232 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
@@ -17,12 +17,28 @@
package org.apache.carbondata.spark.util
+import java.text.SimpleDateFormat
+import java.util.{Date, Locale}
+
import scala.collection.{immutable, mutable}
import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.mapreduce.{TaskAttemptID, TaskType}
+import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
+import org.apache.hadoop.mapreduce.task.{JobContextImpl, TaskAttemptContextImpl}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, PartitionedFile}
import org.apache.spark.sql.util.CarbonException
+import org.apache.spark.sql.util.SparkSQLUtil.sessionState
import org.apache.carbondata.common.constants.LoggerAction
import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
@@ -32,10 +48,13 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants
+import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
import org.apache.carbondata.processing.util.{CarbonLoaderUtil, DeleteLoadFolders, TableOptionConstant}
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+import org.apache.carbondata.spark.load.DataLoadProcessBuilderOnSpark.LOGGER
import org.apache.carbondata.spark.load.ValidateUtil
+import org.apache.carbondata.spark.rdd.SerializableConfiguration
/**
* the util object of data loading
@@ -403,4 +422,112 @@ object DataLoadingUtil {
}
}
+ /**
+ * creates an RDD that reads multiple CSV files
+ */
+ def csvFileScanRDD(
+ spark: SparkSession,
+ model: CarbonLoadModel,
+ hadoopConf: Configuration
+ ): RDD[InternalRow] = {
+ // 1. partition
+ val defaultMaxSplitBytes = sessionState(spark).conf.filesMaxPartitionBytes
+ val openCostInBytes = sessionState(spark).conf.filesOpenCostInBytes
+ val defaultParallelism = spark.sparkContext.defaultParallelism
+ CommonUtil.configureCSVInputFormat(hadoopConf, model)
+ hadoopConf.set(FileInputFormat.INPUT_DIR, model.getFactFilePath)
+ val jobConf = new JobConf(hadoopConf)
+ SparkHadoopUtil.get.addCredentials(jobConf)
+ val jobContext = new JobContextImpl(jobConf, null)
+ val inputFormat = new CSVInputFormat()
+ val rawSplits = inputFormat.getSplits(jobContext).toArray
+ val splitFiles = rawSplits.map { split =>
+ val fileSplit = split.asInstanceOf[FileSplit]
+ PartitionedFile(
+ InternalRow.empty,
+ fileSplit.getPath.toString,
+ fileSplit.getStart,
+ fileSplit.getLength,
+ fileSplit.getLocations)
+ }.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
+ val totalBytes = splitFiles.map(_.length + openCostInBytes).sum
+ val bytesPerCore = totalBytes / defaultParallelism
+
+ val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
+ LOGGER.info(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
+ s"open cost is considered as scanning $openCostInBytes bytes.")
+
+ val partitions = new ArrayBuffer[FilePartition]
+ val currentFiles = new ArrayBuffer[PartitionedFile]
+ var currentSize = 0L
+
+ def closePartition(): Unit = {
+ if (currentFiles.nonEmpty) {
+ val newPartition =
+ FilePartition(
+ partitions.size,
+ currentFiles.toArray.toSeq)
+ partitions += newPartition
+ }
+ currentFiles.clear()
+ currentSize = 0
+ }
+
+ splitFiles.foreach { file =>
+ if (currentSize + file.length > maxSplitBytes) {
+ closePartition()
+ }
+ // Add the given file to the current partition.
+ currentSize += file.length + openCostInBytes
+ currentFiles += file
+ }
+ closePartition()
+
+ // 2. read function
+ val serializableConfiguration = new SerializableConfiguration(jobConf)
+ val readFunction = new (PartitionedFile => Iterator[InternalRow]) with Serializable {
+ override def apply(file: PartitionedFile): Iterator[InternalRow] = {
+ new Iterator[InternalRow] {
+ val hadoopConf = serializableConfiguration.value
+ val jobTrackerId: String = {
+ val formatter = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
+ formatter.format(new Date())
+ }
+ val attemptId = new TaskAttemptID(jobTrackerId, 0, TaskType.MAP, 0, 0)
+ val hadoopAttemptContext = new TaskAttemptContextImpl(hadoopConf, attemptId)
+ val inputSplit =
+ new FileSplit(new Path(file.filePath), file.start, file.length, file.locations)
+ var finished = false
+ val inputFormat = new CSVInputFormat()
+ val reader = inputFormat.createRecordReader(inputSplit, hadoopAttemptContext)
+ reader.initialize(inputSplit, hadoopAttemptContext)
+
+ override def hasNext: Boolean = {
+ if (!finished) {
+ if (reader != null) {
+ if (reader.nextKeyValue()) {
+ true
+ } else {
+ finished = true
+ reader.close()
+ false
+ }
+ } else {
+ finished = true
+ false
+ }
+ } else {
+ false
+ }
+ }
+
+ override def next(): InternalRow = {
+ new GenericInternalRow(reader.getCurrentValue.get().asInstanceOf[Array[Any]])
+ }
+ }
+ }
+ }
+ new FileScanRDD(spark, readFunction, partitions)
+ }
+
}
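The core of the small-file handling moved in above is a greedy bin-packing of the CSV input splits: sort the splits largest-first, then fill a partition until the next file would push it past maxSplitBytes, where maxSplitBytes = min(filesMaxPartitionBytes, max(openCostInBytes, totalBytes / defaultParallelism)). Stripped of the Hadoop and Spark plumbing, the packing amounts to the following self-contained sketch (SplitFile and Bin are illustrative stand-ins for Spark's PartitionedFile and FilePartition):

    import scala.collection.mutable.ArrayBuffer

    // Stand-ins for Spark's PartitionedFile/FilePartition, for illustration.
    case class SplitFile(path: String, length: Long)
    case class Bin(index: Int, files: Seq[SplitFile])

    def packSplits(
        files: Seq[SplitFile],
        defaultMaxSplitBytes: Long, // spark.sql.files.maxPartitionBytes
        openCostInBytes: Long,      // spark.sql.files.openCostInBytes
        defaultParallelism: Int): Seq[Bin] = {
      // Largest-first ordering gives the greedy pass a tighter packing.
      val sorted = files.sortBy(_.length)(Ordering[Long].reverse)
      val totalBytes = sorted.map(_.length + openCostInBytes).sum
      val bytesPerCore = totalBytes / defaultParallelism
      val maxSplitBytes =
        math.min(defaultMaxSplitBytes, math.max(openCostInBytes, bytesPerCore))

      val bins = new ArrayBuffer[Bin]
      val current = new ArrayBuffer[SplitFile]
      var currentSize = 0L

      def close(): Unit = {
        if (current.nonEmpty) bins += Bin(bins.size, current.toList)
        current.clear()
        currentSize = 0
      }

      sorted.foreach { f =>
        // Start a new bin once the next file would overflow the budget.
        if (currentSize + f.length > maxSplitBytes) close()
        currentSize += f.length + openCostInBytes
        current += f
      }
      close()
      bins.toList
    }

Because every file also pays openCostInBytes on top of its real length, a directory of tiny CSVs collapses into a handful of partitions instead of one task per file, which in turn keeps the number of carbondata files written per segment small (the load test above asserts fewer than 50 files in the segment directory).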
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e527c059/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 8e6c20e..7d49c11 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -26,22 +26,18 @@ import scala.collection.mutable
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.io.NullWritable
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.rdd.{NewHadoopRDD, RDD}
+import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, UnresolvedAttribute}
import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, GenericInternalRow}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.execution.LogicalRDD
import org.apache.spark.sql.execution.SQLExecution.EXECUTION_ID_KEY
import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataLoadTableFileMapping, UpdateTableModel}
-import org.apache.spark.sql.execution.datasources.{CarbonFileFormat, CatalogFileIndex, HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.{CarbonFileFormat, CatalogFileIndex, FindDataSourceTable, HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.sql.optimizer.CarbonFilters
import org.apache.spark.sql.types.{StringType, StructField, StructType}
@@ -60,22 +56,21 @@ import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
-import org.apache.carbondata.core.util.path.{CarbonStorePath}
+import org.apache.carbondata.core.util.path.CarbonStorePath
import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
import org.apache.carbondata.events.exception.PreEventException
import org.apache.carbondata.hadoop.util.ObjectSerializationUtil
import org.apache.carbondata.processing.exception.DataLoadingException
import org.apache.carbondata.processing.loading.TableProcessingOperations
-import org.apache.carbondata.processing.loading.csvinput.{CSVInputFormat, StringArrayWritable}
import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePreExecutionEvent}
-import org.apache.carbondata.processing.loading.exception.{NoRetryException}
-import org.apache.carbondata.processing.loading.model.{CarbonLoadModel}
+import org.apache.carbondata.processing.loading.exception.NoRetryException
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.processing.util.CarbonLoaderUtil
import org.apache.carbondata.spark.dictionary.provider.SecureDictionaryServiceProvider
import org.apache.carbondata.spark.dictionary.server.SecureDictionaryServer
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, CarbonDropPartitionCommitRDD, CarbonDropPartitionRDD}
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, DataLoadingUtil, GlobalDictionaryUtil}
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataLoadingUtil, GlobalDictionaryUtil}
case class CarbonLoadDataCommand(
databaseNameOp: Option[String],
@@ -95,6 +90,10 @@ case class CarbonLoadDataCommand(
var table: CarbonTable = _
+ var logicalPartitionRelation: LogicalRelation = _
+
+ var sizeInBytes: Long = _
+
override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
@@ -113,6 +112,15 @@ case class CarbonLoadDataCommand(
}
relation.carbonTable
}
+ if (table.isHivePartitionTable) {
+ logicalPartitionRelation =
+ new FindDataSourceTable(sparkSession).apply(
+ sparkSession.sessionState.catalog.lookupRelation(
+ TableIdentifier(tableName, databaseNameOp))).collect {
+ case l: LogicalRelation => l
+ }.head
+ sizeInBytes = logicalPartitionRelation.relation.sizeInBytes
+ }
operationContext.setProperty("isOverwrite", isOverwriteTable)
if(CarbonUtil.hasAggregationDataMap(table)) {
val loadMetadataEvent = new LoadMetadataEvent(table, false)
@@ -500,20 +508,7 @@ case class CarbonLoadDataCommand(
operationContext: OperationContext) = {
val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
val identifier = TableIdentifier(table.getTableName, Some(table.getDatabaseName))
- val logicalPlan =
- sparkSession.sessionState.catalog.lookupRelation(
- identifier)
- val catalogTable: CatalogTable = logicalPlan.collect {
- case l: LogicalRelation => l.catalogTable.get
- case c // To make compatabile with spark 2.1 and 2.2 we need to compare classes
- if c.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.CatalogRelation") ||
- c.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.HiveTableRelation") ||
- c.getClass.getName.equals(
- "org.apache.spark.sql.catalyst.catalog.UnresolvedCatalogRelation") =>
- CarbonReflectionUtils.getFieldOfCatalogTable(
- "tableMeta",
- c).asInstanceOf[CatalogTable]
- }.head
+ val catalogTable: CatalogTable = logicalPartitionRelation.catalogTable.get
val currentPartitions =
CarbonFilters.getPartitions(Seq.empty[Expression], sparkSession, identifier)
// Clean up the already dropped partitioned data
@@ -581,10 +576,6 @@ case class CarbonLoadDataCommand(
} else {
// input data from csv files. Convert to logical plan
- CommonUtil.configureCSVInputFormat(hadoopConf, carbonLoadModel)
- hadoopConf.set(FileInputFormat.INPUT_DIR, carbonLoadModel.getFactFilePath)
- val jobConf = new JobConf(hadoopConf)
- SparkHadoopUtil.get.addCredentials(jobConf)
val attributes =
StructType(carbonLoadModel.getCsvHeaderColumns.map(
StructField(_, StringType))).toAttributes
@@ -603,28 +594,27 @@ case class CarbonLoadDataCommand(
}
val len = rowDataTypes.length
var rdd =
- new NewHadoopRDD[NullWritable, StringArrayWritable](
- sparkSession.sparkContext,
- classOf[CSVInputFormat],
- classOf[NullWritable],
- classOf[StringArrayWritable],
- jobConf).map { case (key, value) =>
- val data = new Array[Any](len)
- var i = 0
- val input = value.get()
- val inputLen = Math.min(input.length, len)
- while (i < inputLen) {
- data(i) = UTF8String.fromString(input(i))
- // If partition column then update empty value with special string otherwise spark
- // makes it as null so we cannot internally handle badrecords.
- if (partitionColumns(i)) {
- if (input(i) != null && input(i).isEmpty) {
- data(i) = UTF8String.fromString(CarbonCommonConstants.MEMBER_DEFAULT_VAL)
+ DataLoadingUtil.csvFileScanRDD(
+ sparkSession,
+ model = carbonLoadModel,
+ hadoopConf)
+ .map { row =>
+ val data = new Array[Any](len)
+ var i = 0
+ val input = row.asInstanceOf[GenericInternalRow].values.asInstanceOf[Array[String]]
+ val inputLen = Math.min(input.length, len)
+ while (i < inputLen) {
+ data(i) = UTF8String.fromString(input(i))
+ // If this is a partition column, replace an empty value with a special string;
+ // otherwise Spark turns it into null and bad records cannot be handled internally.
+ if (partitionColumns(i)) {
+ if (input(i) != null && input(i).isEmpty) {
+ data(i) = UTF8String.fromString(CarbonCommonConstants.MEMBER_DEFAULT_VAL)
+ }
}
+ i = i + 1
}
- i = i + 1
- }
- InternalRow.fromSeq(data)
+ InternalRow.fromSeq(data)
}
// Only select the required columns
@@ -638,10 +628,6 @@ case class CarbonLoadDataCommand(
}
Project(output, LogicalRDD(attributes, rdd)(sparkSession))
}
- // TODO need to find a way to avoid double lookup
- val sizeInBytes =
- CarbonEnv.getInstance(sparkSession).carbonMetastore.lookupRelation(
- catalogTable.identifier)(sparkSession).asInstanceOf[CarbonRelation].sizeInBytes
val convertRelation = convertToLogicalRelation(
catalogTable,
sizeInBytes,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e527c059/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
index a2248ee..98d150e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
@@ -403,6 +403,10 @@ public class SortParameters implements Serializable {
LOGGER.info("temp file location: " + StringUtils.join(parameters.getTempFileLocation(), ","));
int numberOfCores = carbonProperties.getNumberOfCores() / 2;
+ // When loading from a partition, use the writing cores count it specifies
+ if (configuration.getWritingCoresCount() > 0) {
+ numberOfCores = configuration.getWritingCoresCount();
+ }
parameters.setNumberOfCores(numberOfCores > 0 ? numberOfCores : 1);
parameters.setFileWriteBufferSize(Integer.parseInt(carbonProperties
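The SortParameters change lets a partition-driven load dictate the sort/write parallelism instead of always taking half the configured Carbon cores. The rule reduces to the following one-liner (a sketch of the logic, not the actual API):

    // writingCoresCount > 0 means the caller (e.g. a partition load) chose the
    // parallelism; otherwise use half of Carbon's configured cores, minimum 1.
    def sortCores(carbonCores: Int, writingCoresCount: Int): Int =
      if (writingCoresCount > 0) writingCoresCount
      else math.max(carbonCores / 2, 1)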