Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/12/24 13:26:14 UTC
[18/50] [abbrv] carbondata git commit: [CARBONDATA-1880] Combine input small files for GLOBAL_SORT
[CARBONDATA-1880] Combine input small files for GLOBAL_SORT
Combine small input files for GLOBAL_SORT loading to avoid the CarbonData small-file issue
This closes #1669
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/694ee774
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/694ee774
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/694ee774
Branch: refs/heads/fgdatamap
Commit: 694ee774cb8ca55ee24cd906368c3cf9cc96b0eb
Parents: 28c9418
Author: QiangCai <qi...@qq.com>
Authored: Fri Dec 15 22:02:28 2017 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Wed Dec 20 14:58:37 2017 +0800
----------------------------------------------------------------------
.../core/constants/CarbonCommonConstants.java | 4 +
.../carbondata/core/util/CarbonProperties.java | 14 ++
.../hadoop/CarbonMultiBlockSplit.java | 17 ++-
...ompactionSupportGlobalSortFunctionTest.scala | 4 +-
...mpactionSupportGlobalSortParameterTest.scala | 5 -
.../MajorCompactionIgnoreInMinorTest.scala | 16 +-
.../dataload/TestGlobalSortDataLoad.scala | 33 ++++-
.../testsuite/datamap/DataMapWriterSuite.scala | 1 +
.../load/DataLoadProcessBuilderOnSpark.scala | 148 ++++++++++++++++---
.../load/DataLoadProcessorStepOnSpark.scala | 7 +-
.../carbondata/spark/rdd/CarbonScanRDD.scala | 121 +++++++++++----
.../apache/spark/sql/util/CarbonException.scala | 8 +-
.../apache/spark/sql/util/SparkSQLUtil.scala | 25 ++++
.../spark/rdd/CarbonDataRDDFactory.scala | 2 +-
.../sql/CarbonDatasourceHadoopRelation.scala | 2 +-
15 files changed, 333 insertions(+), 74 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/694ee774/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index f67b0c5..9534099 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1277,6 +1277,10 @@ public final class CarbonCommonConstants {
public static final String CARBON_CUSTOM_BLOCK_DISTRIBUTION = "carbon.custom.block.distribution";
public static final String CARBON_CUSTOM_BLOCK_DISTRIBUTION_DEFAULT = "false";
+ @CarbonProperty
+ public static final String CARBON_COMBINE_SMALL_INPUT_FILES = "carbon.mergeSmallFileRead.enable";
+ public static final String CARBON_COMBINE_SMALL_INPUT_FILES_DEFAULT = "false";
+
public static final int DICTIONARY_DEFAULT_CARDINALITY = 1;
@CarbonProperty
public static final String SPARK_SCHEMA_STRING_LENGTH_THRESHOLD =
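
For reference, a minimal Scala sketch (not part of this commit) of how the new flag is switched on; the addProperty call follows the same pattern the tests below use for LOAD_SORT_SCOPE:

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties

// The feature is off by default ("false"), so it must be enabled explicitly
// before triggering a GLOBAL_SORT load.
CarbonProperties.getInstance()
  .addProperty(CarbonCommonConstants.CARBON_COMBINE_SMALL_INPUT_FILES, "true")
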
http://git-wip-us.apache.org/repos/asf/carbondata/blob/694ee774/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index fe396cb..11aea99 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -106,6 +106,7 @@ public final class CarbonProperties {
validateLockType();
validateCarbonCSVReadBufferSizeByte();
validateHandoffSize();
+ validateCombineSmallInputFiles();
}
private void validateCarbonCSVReadBufferSizeByte() {
@@ -205,6 +206,19 @@ public final class CarbonProperties {
}
}
+ private void validateCombineSmallInputFiles() {
+ String combineSmallInputFilesStr =
+ carbonProperties.getProperty(CarbonCommonConstants.CARBON_COMBINE_SMALL_INPUT_FILES);
+ boolean isValidBooleanValue = CarbonUtil.validateBoolean(combineSmallInputFilesStr);
+ if (!isValidBooleanValue) {
+ LOGGER.warn("The combine small files value \"" + combineSmallInputFilesStr
+ + "\" is invalid. Using the default value \""
+ + CarbonCommonConstants.CARBON_COMBINE_SMALL_INPUT_FILES_DEFAULT + "\"");
+ carbonProperties.setProperty(CarbonCommonConstants.CARBON_COMBINE_SMALL_INPUT_FILES,
+ CarbonCommonConstants.CARBON_COMBINE_SMALL_INPUT_FILES_DEFAULT);
+ }
+ }
+
private void validateEnableUnsafeSort() {
String unSafeSortStr = carbonProperties.getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT);
boolean isValidBooleanValue = CarbonUtil.validateBoolean(unSafeSortStr);
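
The new validator delegates to CarbonUtil.validateBoolean, the same helper the unsafe-sort check below it uses. As an illustration only (an assumed equivalent, not the CarbonUtil source), the contract amounts to:

// Accept only the strings "true"/"false" (case-insensitive); anything else,
// including an unset (null) property, falls back to the default.
def validateBooleanProperty(value: String, default: String): String =
  if (value != null &&
      (value.equalsIgnoreCase("true") || value.equalsIgnoreCase("false"))) {
    value
  } else {
    default
  }
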
http://git-wip-us.apache.org/repos/asf/carbondata/blob/694ee774/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
index 96fe909..aed3449 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
@@ -47,15 +47,19 @@ public class CarbonMultiBlockSplit extends InputSplit implements Writable {
private FileFormat fileFormat = FileFormat.COLUMNAR_V3;
+ private long length;
+
public CarbonMultiBlockSplit() {
splitList = null;
locations = null;
+ length = 0;
}
public CarbonMultiBlockSplit(AbsoluteTableIdentifier identifier, List<CarbonInputSplit> splitList,
String[] locations) throws IOException {
this.splitList = splitList;
this.locations = locations;
+ calculateLength();
}
public CarbonMultiBlockSplit(AbsoluteTableIdentifier identifier, List<CarbonInputSplit> splitList,
@@ -63,6 +67,7 @@ public class CarbonMultiBlockSplit extends InputSplit implements Writable {
this.splitList = splitList;
this.locations = locations;
this.fileFormat = fileFormat;
+ calculateLength();
}
/**
@@ -75,11 +80,19 @@ public class CarbonMultiBlockSplit extends InputSplit implements Writable {
@Override
public long getLength() throws IOException, InterruptedException {
+ return length;
+ }
+
+ public void setLength(long length) {
+ this.length = length;
+ }
+
+ private void calculateLength() {
long total = 0;
- for (InputSplit split: splitList) {
+ for (CarbonInputSplit split : splitList) {
total += split.getLength();
}
- return total;
+ length = total;
}
@Override
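
The motivation for caching: getLength() is called repeatedly on the driver during partition planning (the bin packing added to CarbonScanRDD below sorts multi-block splits by length), so the total is now summed once in the constructor instead of on every call, and setLength lets callers restore or override the value externally, presumably after deserialization. A minimal sketch of the pattern, with illustrative names only:

class LengthCachingSplit(splitLengths: Seq[Long]) {
  private var cached: Long = splitLengths.sum // summed once at construction
  def getLength: Long = cached                // O(1) on every planner call
  def setLength(len: Long): Unit = cached = len
}
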
http://git-wip-us.apache.org/repos/asf/carbondata/blob/694ee774/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala
index 4958f55..9014edb 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala
@@ -439,7 +439,7 @@ class CompactionSupportGlobalSortFunctionTest extends QueryTest with BeforeAndAf
}
sql("ALTER TABLE compaction_globalsort COMPACT 'MAJOR'")
- assert(getIndexFileCount("compaction_globalsort", "0.1") === 3)
+ assert(getIndexFileCount("compaction_globalsort", "0.1") === 2)
checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(72)))
checkAnswer(sql("SELECT * FROM compaction_globalsort order by name, id"),
sql("SELECT * FROM carbon_localsort order by name, id"))
@@ -454,7 +454,7 @@ class CompactionSupportGlobalSortFunctionTest extends QueryTest with BeforeAndAf
}
sql("ALTER TABLE compaction_globalsort COMPACT 'MINOR'")
- assert(getIndexFileCount("compaction_globalsort", "0.1") === 3)
+ assert(getIndexFileCount("compaction_globalsort", "0.1") === 2)
checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(72)))
checkAnswer(sql("SELECT * FROM compaction_globalsort order by name, id"),
sql("SELECT * FROM carbon_localsort order by name, id"))
http://git-wip-us.apache.org/repos/asf/carbondata/blob/694ee774/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala
index f9959fa..02c602a 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala
@@ -17,8 +17,6 @@
package org.apache.carbondata.spark.testsuite.datacompaction
-import java.io.{File, FilenameFilter}
-
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.spark.sql.Row
@@ -26,10 +24,7 @@ import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
-import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.core.metadata.CarbonMetadata
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.util.path.CarbonStorePath
class CompactionSupportGlobalSortParameterTest extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll {
val filePath: String = s"$resourcesPath/globalsort"
http://git-wip-us.apache.org/repos/asf/carbondata/blob/694ee774/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
index ed63fdf..61de615 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
@@ -59,16 +59,16 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
"('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
)
// compaction will happen here.
- sql("alter table ignoremajor compact 'major'"
- )
- sql("LOAD DATA LOCAL INPATH '" + csvFilePath1 + "' INTO TABLE ignoremajor OPTIONS" +
+ sql("alter table ignoremajor compact 'major'")
+
+ sql("LOAD DATA LOCAL INPATH '" + csvFilePath1 + "' INTO TABLE ignoremajor OPTIONS" +
"('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
- )
- sql("LOAD DATA LOCAL INPATH '" + csvFilePath2 + "' INTO TABLE ignoremajor OPTIONS" +
+ )
+ sql("LOAD DATA LOCAL INPATH '" + csvFilePath2 + "' INTO TABLE ignoremajor OPTIONS" +
"('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
- )
- sql("alter table ignoremajor compact 'minor'"
- )
+ )
+ sql("alter table ignoremajor compact 'minor'"
+ )
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/694ee774/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
index 6bbc763..9ce9675 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
@@ -17,6 +17,10 @@
package org.apache.carbondata.spark.testsuite.dataload
+import java.io.{File, FileWriter}
+
+import org.apache.commons.io.FileUtils
+
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
@@ -105,7 +109,7 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo
sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort " +
"OPTIONS('BAD_RECORDS_ACTION'='REDIRECT')")
- assert(getIndexFileCount("carbon_globalsort") === 3)
+ assert(getIndexFileCount("carbon_globalsort") === 2)
checkAnswer(sql("SELECT COUNT(*) FROM carbon_globalsort"), Seq(Row(11)))
}
@@ -115,7 +119,7 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo
sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort " +
"OPTIONS('SINGLE_PASS'='TRUE')")
- assert(getIndexFileCount("carbon_globalsort") === 3)
+ assert(getIndexFileCount("carbon_globalsort") === 2)
checkAnswer(sql("SELECT COUNT(*) FROM carbon_globalsort"), Seq(Row(12)))
checkAnswer(sql("SELECT * FROM carbon_globalsort ORDER BY name"),
sql("SELECT * FROM carbon_localsort_once ORDER BY name"))
@@ -164,7 +168,7 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo
sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort")
sql("ALTER TABLE carbon_globalsort COMPACT 'MAJOR'")
- assert(getIndexFileCount("carbon_globalsort") === 3)
+ assert(getIndexFileCount("carbon_globalsort") === 2)
checkAnswer(sql("SELECT COUNT(*) FROM carbon_globalsort"), Seq(Row(24)))
checkAnswer(sql("SELECT * FROM carbon_globalsort ORDER BY name, id"),
sql("SELECT * FROM carbon_localsort_twice ORDER BY name, id"))
@@ -223,7 +227,7 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo
sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort")
sql("DELETE FROM carbon_globalsort WHERE id = 1").show
- assert(getIndexFileCount("carbon_globalsort") === 3)
+ assert(getIndexFileCount("carbon_globalsort") === 2)
checkAnswer(sql("SELECT COUNT(*) FROM carbon_globalsort"), Seq(Row(11)))
checkAnswer(sql("SELECT * FROM carbon_globalsort ORDER BY name, id"),
sql("SELECT * FROM carbon_localsort_delete ORDER BY name, id"))
@@ -250,6 +254,27 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo
sql("SELECT * FROM carbon_localsort_update ORDER BY name, id"))
}
+ test("LOAD with small files") {
+ val inputPath = new File("target/small_files").getCanonicalPath
+ val folder = new File(inputPath)
+ if (folder.exists()) {
+ FileUtils.deleteDirectory(folder)
+ }
+ folder.mkdir()
+ for (i <- 0 to 100) {
+ val file = s"$folder/file$i.csv"
+ val writer = new FileWriter(file)
+ writer.write("id,name,city,age\n")
+ writer.write(s"$i,name_$i,city_$i,${ i % 100 }")
+ writer.close()
+ }
+ sql(s"LOAD DATA LOCAL INPATH '$inputPath' INTO TABLE carbon_globalsort")
+ val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "carbon_globalsort")
+ val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
+ val segmentDir = carbonTablePath.getSegmentDir("0", "0")
+ assertResult(5)(new File(segmentDir).listFiles().length)
+ }
+
// ----------------------------------- INSERT INTO -----------------------------------
test("INSERT INTO") {
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, "GLOBAL_SORT")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/694ee774/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
index c137fc7..f73a202 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
@@ -70,6 +70,7 @@ class DataMapWriterSuite extends QueryTest with BeforeAndAfterAll {
sqlContext.sparkContext.parallelize(1 to numRows)
.map(x => ("a", "b", x))
.toDF("c1", "c2", "c3")
+ .sort("c3")
}
def dropTable(): Unit = {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/694ee774/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index c14e0a7..2537a0c 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -17,17 +17,26 @@
package org.apache.carbondata.spark.load
-import java.util.Comparator
+import java.text.SimpleDateFormat
+import java.util.{Comparator, Date, Locale}
+
+import scala.collection.mutable.ArrayBuffer
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.io.NullWritable
+import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
-import org.apache.spark.{SparkContext, TaskContext}
+import org.apache.hadoop.mapreduce.{TaskAttemptID, TaskType}
+import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
+import org.apache.hadoop.mapreduce.task.{JobContextImpl, TaskAttemptContextImpl}
+import org.apache.spark.TaskContext
import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.rdd.NewHadoopRDD
-import org.apache.spark.sql.DataFrame
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.command.ExecutionErrors
+import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, PartitionedFile}
+import org.apache.spark.sql.util.SparkSQLUtil.sessionState
import org.apache.spark.storage.StorageLevel
import org.apache.carbondata.common.logging.LogServiceFactory
@@ -36,10 +45,11 @@ import org.apache.carbondata.core.datastore.row.CarbonRow
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus}
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.processing.loading.{DataLoadProcessBuilder, FailureCauses}
-import org.apache.carbondata.processing.loading.csvinput.{CSVInputFormat, StringArrayWritable}
+import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.processing.sort.sortdata.{NewRowComparator, NewRowComparatorForNormalDims, SortParameters}
import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
+import org.apache.carbondata.spark.rdd.SerializableConfiguration
import org.apache.carbondata.spark.util.CommonUtil
/**
@@ -49,7 +59,7 @@ object DataLoadProcessBuilderOnSpark {
private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
def loadDataUsingGlobalSort(
- sc: SparkContext,
+ sparkSession: SparkSession,
dataFrame: Option[DataFrame],
model: CarbonLoadModel,
hadoopConf: Configuration): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
@@ -57,21 +67,13 @@ object DataLoadProcessBuilderOnSpark {
dataFrame.get.rdd
} else {
// input data from files
- CommonUtil.configureCSVInputFormat(hadoopConf, model)
- hadoopConf.set(FileInputFormat.INPUT_DIR, model.getFactFilePath)
val columnCount = model.getCsvHeaderColumns.length
- val jobConf = new JobConf(hadoopConf)
- SparkHadoopUtil.get.addCredentials(jobConf)
- new NewHadoopRDD[NullWritable, StringArrayWritable](
- sc,
- classOf[CSVInputFormat],
- classOf[NullWritable],
- classOf[StringArrayWritable],
- jobConf)
- .map(x => DataLoadProcessorStepOnSpark.toStringArrayRow(x._2, columnCount))
+ csvFileScanRDD(sparkSession, model, hadoopConf)
+ .map(DataLoadProcessorStepOnSpark.toStringArrayRow(_, columnCount))
}
model.setPartitionId("0")
+ val sc = sparkSession.sparkContext
val modelBroadcast = sc.broadcast(model)
val partialSuccessAccum = sc.accumulator(0, "Partial Success Accumulator")
@@ -160,4 +162,112 @@ object DataLoadProcessBuilderOnSpark {
Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors)))
}
}
+
+ /**
+ * creates a RDD that does reading of multiple CSV files
+ */
+ def csvFileScanRDD(
+ spark: SparkSession,
+ model: CarbonLoadModel,
+ hadoopConf: Configuration
+ ): RDD[InternalRow] = {
+ // 1. partition
+ val defaultMaxSplitBytes = sessionState(spark).conf.filesMaxPartitionBytes
+ val openCostInBytes = sessionState(spark).conf.filesOpenCostInBytes
+ val defaultParallelism = spark.sparkContext.defaultParallelism
+ CommonUtil.configureCSVInputFormat(hadoopConf, model)
+ hadoopConf.set(FileInputFormat.INPUT_DIR, model.getFactFilePath)
+ val jobConf = new JobConf(hadoopConf)
+ SparkHadoopUtil.get.addCredentials(jobConf)
+ val jobContext = new JobContextImpl(jobConf, null)
+ val inputFormat = new CSVInputFormat()
+ val rawSplits = inputFormat.getSplits(jobContext).toArray
+ val splitFiles = rawSplits.map { split =>
+ val fileSplit = split.asInstanceOf[FileSplit]
+ PartitionedFile(
+ InternalRow.empty,
+ fileSplit.getPath.toString,
+ fileSplit.getStart,
+ fileSplit.getLength,
+ fileSplit.getLocations)
+ }.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
+ val totalBytes = splitFiles.map(_.length + openCostInBytes).sum
+ val bytesPerCore = totalBytes / defaultParallelism
+
+ val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
+ LOGGER.info(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
+ s"open cost is considered as scanning $openCostInBytes bytes.")
+
+ val partitions = new ArrayBuffer[FilePartition]
+ val currentFiles = new ArrayBuffer[PartitionedFile]
+ var currentSize = 0L
+
+ def closePartition(): Unit = {
+ if (currentFiles.nonEmpty) {
+ val newPartition =
+ FilePartition(
+ partitions.size,
+ currentFiles.toArray.toSeq)
+ partitions += newPartition
+ }
+ currentFiles.clear()
+ currentSize = 0
+ }
+
+ splitFiles.foreach { file =>
+ if (currentSize + file.length > maxSplitBytes) {
+ closePartition()
+ }
+ // Add the given file to the current partition.
+ currentSize += file.length + openCostInBytes
+ currentFiles += file
+ }
+ closePartition()
+
+ // 2. read function
+ val serializableConfiguration = new SerializableConfiguration(jobConf)
+ val readFunction = new (PartitionedFile => Iterator[InternalRow]) with Serializable {
+ override def apply(file: PartitionedFile): Iterator[InternalRow] = {
+ new Iterator[InternalRow] {
+ val hadoopConf = serializableConfiguration.value
+ val jobTrackerId: String = {
+ val formatter = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
+ formatter.format(new Date())
+ }
+ val attemptId = new TaskAttemptID(jobTrackerId, 0, TaskType.MAP, 0, 0)
+ val hadoopAttemptContext = new TaskAttemptContextImpl(hadoopConf, attemptId)
+ val inputSplit =
+ new FileSplit(new Path(file.filePath), file.start, file.length, file.locations)
+ var finished = false
+ val inputFormat = new CSVInputFormat()
+ val reader = inputFormat.createRecordReader(inputSplit, hadoopAttemptContext)
+ reader.initialize(inputSplit, hadoopAttemptContext)
+
+ override def hasNext: Boolean = {
+ if (!finished) {
+ if (reader != null) {
+ if (reader.nextKeyValue()) {
+ true
+ } else {
+ finished = true
+ reader.close()
+ false
+ }
+ } else {
+ finished = true
+ false
+ }
+ } else {
+ false
+ }
+ }
+
+ override def next(): InternalRow = {
+ new GenericInternalRow(reader.getCurrentValue.get().asInstanceOf[Array[Any]])
+ }
+ }
+ }
+ }
+ new FileScanRDD(spark, readFunction, partitions)
+ }
}
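
csvFileScanRDD follows the same bin-packing policy Spark uses when planning file scans: sort files largest-first, then greedily fill a partition until adding the next file would exceed maxSplitBytes, charging every file an extra openCostInBytes so that opening many tiny files is not treated as free. A self-contained sketch of just that policy (local names, not the CarbonData API):

import scala.collection.mutable.ArrayBuffer

def packFiles(
    fileLengths: Seq[Long],
    defaultMaxSplitBytes: Long,
    openCostInBytes: Long,
    defaultParallelism: Int): Seq[Seq[Long]] = {
  // Largest files first, mirroring the sortBy(...).reverse above.
  val sorted = fileLengths.sorted(Ordering[Long].reverse)
  val totalBytes = sorted.map(_ + openCostInBytes).sum
  val bytesPerCore = totalBytes / defaultParallelism
  val maxSplitBytes =
    math.min(defaultMaxSplitBytes, math.max(openCostInBytes, bytesPerCore))

  val partitions = ArrayBuffer[Seq[Long]]()
  val current = ArrayBuffer[Long]()
  var currentSize = 0L

  def closePartition(): Unit = {
    if (current.nonEmpty) partitions += current.toList
    current.clear()
    currentSize = 0
  }

  sorted.foreach { len =>
    // Close the bin first if this file would push it past the cap.
    if (currentSize + len > maxSplitBytes) closePartition()
    currentSize += len + openCostInBytes
    current += len
  }
  closePartition()
  partitions.toList
}

For example, packFiles(Seq.fill(100)(1024L), 128 << 20, 4 << 20, 4) packs the 100 tiny files into roughly four partitions instead of 100 single-file tasks.
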
http://git-wip-us.apache.org/repos/asf/carbondata/blob/694ee774/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
index 5e6ba98..154d3ed 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
@@ -23,6 +23,8 @@ import com.univocity.parsers.common.TextParsingException
import org.apache.spark.{Accumulator, SparkEnv, TaskContext}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException
@@ -30,7 +32,6 @@ import org.apache.carbondata.core.datastore.row.CarbonRow
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.processing.loading.{DataLoadProcessBuilder, TableProcessingOperations}
import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl
-import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable
import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.processing.loading.parser.impl.RowParserImpl
@@ -45,9 +46,9 @@ import org.apache.carbondata.spark.util.Util
object DataLoadProcessorStepOnSpark {
private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
- def toStringArrayRow(row: StringArrayWritable, columnCount: Int): StringArrayRow = {
+ def toStringArrayRow(row: InternalRow, columnCount: Int): StringArrayRow = {
val outRow = new StringArrayRow(new Array[String](columnCount))
- outRow.setValues(row.get())
+ outRow.setValues(row.asInstanceOf[GenericInternalRow].values.asInstanceOf[Array[String]])
}
def toRDDIterator(
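
The rewritten toStringArrayRow works because the read function in csvFileScanRDD wraps the parser's String[] directly in a GenericInternalRow (see the Array[Any] cast above), so unwrapping here is a cast back to the very same array, not a per-row copy. A small sketch of that hand-off:

import org.apache.spark.sql.catalyst.expressions.GenericInternalRow

val backing = Array("1", "name_1", "city_1", "20") // what the CSV reader parsed
val row = new GenericInternalRow(backing.asInstanceOf[Array[Any]])
// Unwrapping recovers the same String[] instance that was wrapped.
val values = row.values.asInstanceOf[Array[String]]
assert(values eq backing)
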
http://git-wip-us.apache.org/repos/asf/carbondata/blob/694ee774/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index e58bfd4..09dbd71 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -31,6 +31,8 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.spark._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.hive.DistributionUtil
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.util.SparkSQLUtil.sessionState
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -55,7 +57,7 @@ import org.apache.carbondata.spark.util.SparkDataTypeConverterImpl
* level filtering in driver side.
*/
class CarbonScanRDD(
- @transient sc: SparkContext,
+ @transient spark: SparkSession,
val columnProjection: CarbonProjection,
var filterExpression: Expression,
identifier: AbsoluteTableIdentifier,
@@ -63,7 +65,7 @@ class CarbonScanRDD(
@transient tableInfo: TableInfo,
inputMetricsStats: InitInputMetrics,
@transient val partitionNames: Seq[String])
- extends CarbonRDDWithTableInfo[InternalRow](sc, Nil, serializedTableInfo) {
+ extends CarbonRDDWithTableInfo[InternalRow](spark.sparkContext, Nil, serializedTableInfo) {
private val queryId = sparkContext.getConf.get("queryId", System.nanoTime() + "")
private val jobTrackerId: String = {
@@ -186,31 +188,74 @@ class CarbonScanRDD(
}
}
noOfNodes = nodeBlockMapping.size
- } else {
- if (CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.CARBON_USE_BLOCKLET_DISTRIBUTION,
- CarbonCommonConstants.CARBON_USE_BLOCKLET_DISTRIBUTION_DEFAULT).toBoolean) {
- // Use blocklet distribution
- // Randomize the blocklets for better shuffling
- Random.shuffle(splits.asScala).zipWithIndex.foreach { splitWithIndex =>
- val multiBlockSplit =
- new CarbonMultiBlockSplit(identifier,
- Seq(splitWithIndex._1.asInstanceOf[CarbonInputSplit]).asJava,
- splitWithIndex._1.getLocations)
- val partition = new CarbonSparkPartition(id, splitWithIndex._2, multiBlockSplit)
- result.add(partition)
+ } else if (CarbonProperties.getInstance().getProperty(
+ CarbonCommonConstants.CARBON_USE_BLOCKLET_DISTRIBUTION,
+ CarbonCommonConstants.CARBON_USE_BLOCKLET_DISTRIBUTION_DEFAULT).toBoolean) {
+ // Use blocklet distribution
+ // Randomize the blocklets for better shuffling
+ Random.shuffle(splits.asScala).zipWithIndex.foreach { splitWithIndex =>
+ val multiBlockSplit =
+ new CarbonMultiBlockSplit(identifier,
+ Seq(splitWithIndex._1.asInstanceOf[CarbonInputSplit]).asJava,
+ splitWithIndex._1.getLocations)
+ val partition = new CarbonSparkPartition(id, splitWithIndex._2, multiBlockSplit)
+ result.add(partition)
+ }
+ } else if (CarbonProperties.getInstance().getProperty(
+ CarbonCommonConstants.CARBON_COMBINE_SMALL_INPUT_FILES,
+ CarbonCommonConstants.CARBON_COMBINE_SMALL_INPUT_FILES_DEFAULT).toBoolean) {
+
+ // sort blocks in reverse order of length
+ val blockSplits = splits
+ .asScala
+ .map(_.asInstanceOf[CarbonInputSplit])
+ .groupBy(f => f.getBlockPath)
+ .map { blockSplitEntry =>
+ new CarbonMultiBlockSplit(identifier,
+ blockSplitEntry._2.asJava,
+ blockSplitEntry._2.flatMap(f => f.getLocations).distinct.toArray)
+ }.toArray.sortBy(_.getLength)(implicitly[Ordering[Long]].reverse)
+
+ val defaultMaxSplitBytes = sessionState(spark).conf.filesMaxPartitionBytes
+ val openCostInBytes = sessionState(spark).conf.filesOpenCostInBytes
+ val defaultParallelism = spark.sparkContext.defaultParallelism
+ val totalBytes = blockSplits.map(_.getLength + openCostInBytes).sum
+ val bytesPerCore = totalBytes / defaultParallelism
+
+ val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
+ LOGGER.info(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
+ s"open cost is considered as scanning $openCostInBytes bytes.")
+
+ val currentFiles = new ArrayBuffer[CarbonMultiBlockSplit]
+ var currentSize = 0L
+
+ def closePartition(): Unit = {
+ if (currentFiles.nonEmpty) {
+ result.add(combineSplits(currentFiles, currentSize, result.size()))
}
- } else {
- // Use block distribution
- splits.asScala.map(_.asInstanceOf[CarbonInputSplit]).
- groupBy(f => f.getBlockPath).values.zipWithIndex.foreach { splitWithIndex =>
- val multiBlockSplit =
- new CarbonMultiBlockSplit(identifier,
- splitWithIndex._1.asJava,
- splitWithIndex._1.flatMap(f => f.getLocations).distinct.toArray)
- val partition = new CarbonSparkPartition(id, splitWithIndex._2, multiBlockSplit)
- result.add(partition)
+ currentFiles.clear()
+ currentSize = 0
+ }
+
+ blockSplits.foreach { file =>
+ if (currentSize + file.getLength > maxSplitBytes) {
+ closePartition()
}
+ // Add the given file to the current partition.
+ currentSize += file.getLength + openCostInBytes
+ currentFiles += file
+ }
+ closePartition()
+ } else {
+ // Use block distribution
+ splits.asScala.map(_.asInstanceOf[CarbonInputSplit])
+ .groupBy(f => f.getBlockPath).values.zipWithIndex.foreach { splitWithIndex =>
+ val multiBlockSplit =
+ new CarbonMultiBlockSplit(identifier,
+ splitWithIndex._1.asJava,
+ splitWithIndex._1.flatMap(f => f.getLocations).distinct.toArray)
+ val partition = new CarbonSparkPartition(id, splitWithIndex._2, multiBlockSplit)
+ result.add(partition)
}
}
@@ -232,6 +277,32 @@ class CarbonScanRDD(
result.asScala
}
+ def combineSplits(
+ splits: ArrayBuffer[CarbonMultiBlockSplit],
+ size: Long,
+ partitionId: Int
+ ): CarbonSparkPartition = {
+ val carbonInputSplits = splits.flatMap(_.getAllSplits.asScala)
+
+ // Computes total number of bytes can be retrieved from each host.
+ val hostToNumBytes = mutable.HashMap.empty[String, Long]
+ splits.foreach { split =>
+ split.getLocations.filter(_ != "localhost").foreach { host =>
+ hostToNumBytes(host) = hostToNumBytes.getOrElse(host, 0L) + split.getLength
+ }
+ }
+ // Takes the first 3 hosts with the most data to be retrieved
+ val locations = hostToNumBytes
+ .toSeq
+ .sortBy(_._2)(implicitly[Ordering[Long]].reverse)
+ .take(3)
+ .map(_._1)
+ .toArray
+
+ val multiBlockSplit = new CarbonMultiBlockSplit(null, carbonInputSplits.asJava, locations)
+ new CarbonSparkPartition(id, partitionId, multiBlockSplit)
+ }
+
override def internalCompute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
val queryStartTime = System.currentTimeMillis
val carbonPropertiesFilePath = System.getProperty("carbon.properties.filepath", null)
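
combineSplits picks a merged partition's preferred locations by data volume rather than taking the first split's hosts. A standalone sketch of the same heuristic (illustrative signature, not the CarbonData API):

import scala.collection.mutable

// Each entry pairs one split's candidate hosts with its length in bytes.
def preferredLocations(splits: Seq[(Array[String], Long)]): Array[String] = {
  val hostToNumBytes = mutable.HashMap.empty[String, Long]
  splits.foreach { case (hosts, length) =>
    hosts.filter(_ != "localhost").foreach { host =>
      hostToNumBytes(host) = hostToNumBytes.getOrElse(host, 0L) + length
    }
  }
  // Keep the three hosts that can serve the most bytes locally.
  hostToNumBytes.toSeq
    .sortBy(_._2)(Ordering[Long].reverse)
    .take(3)
    .map(_._1)
    .toArray
}
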
http://git-wip-us.apache.org/repos/asf/carbondata/blob/694ee774/integration/spark-common/src/main/scala/org/apache/spark/sql/util/CarbonException.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/util/CarbonException.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/util/CarbonException.scala
index 9fd7099..7dabddb 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/util/CarbonException.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/util/CarbonException.scala
@@ -17,8 +17,8 @@
package org.apache.spark.sql.util
- import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.AnalysisException
- object CarbonException {
- def analysisException(message: String): Nothing = throw new AnalysisException(message)
- }
+object CarbonException {
+ def analysisException(message: String): Nothing = throw new AnalysisException(message)
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/694ee774/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
new file mode 100644
index 0000000..370f80c
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.util
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.internal.SessionState
+
+object SparkSQLUtil {
+ def sessionState(sparkSession: SparkSession): SessionState = sparkSession.sessionState
+}
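
SparkSession.sessionState is not public API across all the Spark 2.x releases this code targets (it is private[sql] in earlier 2.x), which is presumably why this one-line helper lives under the org.apache.spark.sql namespace. A usage sketch, assuming a local session:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.util.SparkSQLUtil.sessionState

val spark = SparkSession.builder().appName("demo").master("local").getOrCreate()
// The two knobs the bin packing in this commit reads:
val maxSplitBytes = sessionState(spark).conf.filesMaxPartitionBytes
val openCost = sessionState(spark).conf.filesOpenCostInBytes
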
http://git-wip-us.apache.org/repos/asf/carbondata/blob/694ee774/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 0b786b5..72c979a 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -342,7 +342,7 @@ object CarbonDataRDDFactory {
status = if (carbonTable.getPartitionInfo(carbonTable.getTableName) != null) {
loadDataForPartitionTable(sqlContext, dataFrame, carbonLoadModel, hadoopConf)
} else if (isSortTable && sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT)) {
- DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(sqlContext.sparkContext,
+ DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(sqlContext.sparkSession,
dataFrame, carbonLoadModel, hadoopConf)
} else if (dataFrame.isDefined) {
loadDataFrame(sqlContext, dataFrame, carbonLoadModel)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/694ee774/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index 148fca8..ca0c51d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -76,7 +76,7 @@ case class CarbonDatasourceHadoopRelation(
requiredColumns.foreach(projection.addColumn)
val inputMetricsStats: CarbonInputMetrics = new CarbonInputMetrics
new CarbonScanRDD(
- sparkSession.sparkContext,
+ sparkSession,
projection,
filterExpression.orNull,
identifier,