You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by aj...@apache.org on 2019/11/15 13:52:50 UTC
[carbondata] branch master updated: [CARBONDATA-3579] Support
merging index files when adding new partition
This is an automated email from the ASF dual-hosted git repository.
ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 28bcfa3 [CARBONDATA-3579] Support merging index files when adding new partition
28bcfa3 is described below
commit 28bcfa3a8c0f1ec8142fac1f81a0e050282762df
Author: Jacky Li <ja...@qq.com>
AuthorDate: Thu Nov 14 02:20:08 2019 +0800
[CARBONDATA-3579] Support merging index files when adding new partition
Typically, an application uses the Carbon SDK to write files into a
partition folder and then adds that folder to a partitioned carbon table.
If many threads write to the same partition folder, many carbon index
files are produced, which hurts query performance because every index
file has to be read into the Spark driver.
A better approach is therefore to merge the index files when the new
partition is added to the carbon table.
This closes #3452
---
.../StandardPartitionTableQueryTestCase.scala | 50 +++++++++++++++++++++-
.../carbondata/events/AlterTableEvents.scala | 6 +--
.../spark/sql/events/MergeIndexEventListener.scala | 24 +++++++----
.../CarbonAlterTableAddHivePartitionCommand.scala | 23 ++++++++--
4 files changed, 88 insertions(+), 15 deletions(-)
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
index fb4b511..e151547 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
@@ -20,7 +20,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.strategy.CarbonDataSourceScan
import org.apache.spark.sql.test.Spark2TestQueryExecutor
import org.apache.spark.sql.test.util.QueryTest
-import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.{CarbonEnv, DataFrame, Row}
import org.apache.spark.util.SparkUtil
import org.scalatest.BeforeAndAfterAll
@@ -28,6 +28,8 @@ import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandExcepti
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.sdk.file.{CarbonWriter, Field, Schema}
import org.apache.carbondata.spark.rdd.CarbonScanRDD
class StandardPartitionTableQueryTestCase extends QueryTest with BeforeAndAfterAll {
@@ -314,6 +316,52 @@ test("Creation of partition table should fail if the colname in table schema and
FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(location))
}
+ test("sdk write and add partition based on location on partition table"){
+ sql("drop table if exists partitionTable")
+ sql("create table partitionTable (id int,name String) partitioned by(email string) stored as carbondata")
+ sql("insert into partitionTable select 1,'blue','abc'")
+ val schemaFile =
+ CarbonTablePath.getSchemaFilePath(
+ CarbonEnv.getCarbonTable(None, "partitionTable")(sqlContext.sparkSession).getTablePath)
+
+ val sdkWritePath = metaStoreDB +"/" +"def"
+ FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(sdkWritePath))
+
+ (1 to 3).foreach { i =>
+ val writer = CarbonWriter.builder()
+ .outputPath(sdkWritePath)
+ .writtenBy("test")
+ .withSchemaFile(schemaFile)
+ .withCsvInput()
+ .build()
+ writer.write(Seq("2", "red", "def").toArray)
+ writer.write(Seq("3", "black", "def").toArray)
+ writer.close()
+ }
+
+ sql(s"alter table partitionTable add partition (email='def') location '$sdkWritePath'")
+ sql("show partitions partitionTable").show(false)
+ checkAnswer(sql("show partitions partitionTable"), Seq(Row("email=abc"), Row("email=def")))
+ checkAnswer(sql("select email from partitionTable"), Seq(Row("abc"), Row("def"), Row("def"), Row("def"), Row("def"), Row("def"), Row("def")))
+ checkAnswer(sql("select count(*) from partitionTable"), Seq(Row(7)))
+ checkAnswer(sql("select id from partitionTable where email = 'def'"), Seq(Row(2), Row(3), Row(2), Row(3), Row(2), Row(3)))
+ // alter table add partition should merge index files
+ assert(FileFactory.getCarbonFile(sdkWritePath)
+ .listFiles()
+ .exists(_.getName.contains(".carbonindexmerge")))
+
+ // do compaction to sort data written by sdk
+ sql("alter table partitionTable compact 'major'")
+ assert(sql("show segments for table partitionTable").collectAsList().get(0).getString(1).contains("Compacted"))
+ checkAnswer(sql("show partitions partitionTable"), Seq(Row("email=abc"), Row("email=def")))
+ checkAnswer(sql("select email from partitionTable"), Seq(Row("abc"), Row("def"), Row("def"), Row("def"), Row("def"), Row("def"), Row("def")))
+ checkAnswer(sql("select count(*) from partitionTable"), Seq(Row(7)))
+ checkAnswer(sql("select id from partitionTable where email = 'def'"), Seq(Row(2), Row(3), Row(2), Row(3), Row(2), Row(3)))
+
+ sql("drop table if exists partitionTable")
+ FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(sdkWritePath))
+ }
+
test("add partition with static column partition with load command") {
sql(
"""
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
index 56f4d29..ea73ebe 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
@@ -205,9 +205,9 @@ case class AlterTableCompactionAbortEvent(sparkSession: SparkSession,
/**
* Compaction Event for handling merge index in alter DDL
*
- * @param sparkSession
- * @param carbonTable
- * @param alterTableModel
+ * @param sparkSession spark session
+ * @param carbonTable carbon table
+ * @param alterTableModel alter request
*/
case class AlterTableMergeIndexEvent(sparkSession: SparkSession,
carbonTable: CarbonTable,
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
index 967f390..fcee9f1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
@@ -84,12 +84,20 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
carbonMainTable
.getTableName
}")
- val validSegments: mutable.Buffer[Segment] = CarbonDataMergerUtil.getValidSegmentList(
- carbonMainTable.getAbsoluteTableIdentifier, carbonMainTable.isChildTable).asScala
- val validSegmentIds: mutable.Buffer[String] = mutable.Buffer[String]()
- validSegments.foreach { segment =>
- validSegmentIds += segment.getSegmentNo
- }
+ val segmentsToMerge =
+ if (alterTableMergeIndexEvent.alterTableModel.customSegmentIds.isEmpty) {
+ val validSegments = CarbonDataMergerUtil.getValidSegmentList(
+ carbonMainTable.getAbsoluteTableIdentifier,
+ carbonMainTable.isChildTable).asScala
+ val validSegmentIds: mutable.Buffer[String] = mutable.Buffer[String]()
+ validSegments.foreach { segment =>
+ validSegmentIds += segment.getSegmentNo
+ }
+ validSegmentIds
+ } else {
+ alterTableMergeIndexEvent.alterTableModel.customSegmentIds.get
+ }
+
val loadFolderDetailsArray = SegmentStatusManager
.readLoadMetadata(carbonMainTable.getMetadataPath)
val segmentFileNameMap: java.util.Map[String, String] = new util.HashMap[String,
@@ -105,14 +113,14 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
// old store is also upgraded to new store
CarbonMergeFilesRDD.mergeIndexFiles(
sparkSession = sparkSession,
- segmentIds = validSegmentIds,
+ segmentIds = segmentsToMerge,
segmentFileNameToSegmentIdMap = segmentFileNameMap,
tablePath = carbonMainTable.getTablePath,
carbonTable = carbonMainTable,
mergeIndexProperty = true,
readFileFooterFromCarbonDataFile = true)
// clear Block dataMap Cache
- MergeIndexUtil.clearBlockDataMapCache(carbonMainTable, validSegmentIds)
+ MergeIndexUtil.clearBlockDataMapCache(carbonMainTable, segmentsToMerge)
val requestMessage = "Compaction request completed for table " +
s"${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName }"
LOGGER.info(requestMessage)
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
index a7bf5f4..f29b7b2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
@@ -24,7 +24,7 @@ import scala.collection.JavaConverters._
import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, AlterTableDropPartitionCommand, AtomicRunnableCommand}
+import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, AlterTableDropPartitionCommand, AlterTableModel, AtomicRunnableCommand}
import org.apache.spark.sql.optimizer.CarbonFilters
import org.apache.carbondata.common.logging.LogServiceFactory
@@ -36,7 +36,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.statusmanager.SegmentStatus
import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.events.{OperationContext, OperationListenerBus, PostAlterTableHivePartitionCommandEvent, PreAlterTableHivePartitionCommandEvent}
+import org.apache.carbondata.events.{AlterTableMergeIndexEvent, OperationContext, OperationListenerBus, PostAlterTableHivePartitionCommandEvent, PreAlterTableHivePartitionCommandEvent}
import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
import org.apache.carbondata.processing.util.CarbonLoaderUtil
@@ -116,7 +116,7 @@ case class CarbonAlterTableAddHivePartitionCommand(
if (segmentFile != null) {
val indexToSchemas = SegmentFileStore.getSchemaFiles(segmentFile, table.getTablePath)
val tableColums = table.getTableInfo.getFactTable.getListOfColumns.asScala
- var isSameSchema = indexToSchemas.asScala.exists{ case(key, columnSchemas) =>
+ val isSameSchema = indexToSchemas.asScala.exists{ case(key, columnSchemas) =>
columnSchemas.asScala.exists { col =>
tableColums.exists(p => p.getColumnUniqueId.equals(col.getColumnUniqueId))
} && columnSchemas.size() == tableColums.length
@@ -153,6 +153,23 @@ case class CarbonAlterTableAddHivePartitionCommand(
CarbonLoaderUtil.addDataIndexSizeIntoMetaEntry(newMetaEntry, loadModel.getSegmentId, table)
// Make the load as success in table status
CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, loadModel, false, false)
+
+ // Normally, application will use Carbon SDK to write files into a partition folder, then
+ // add the folder to partitioned carbon table.
+ // If there are many threads writes to the same partition folder, there will be many
+ // carbon index files, and it is not good for query performance since all index files
+ // need to be read to spark driver.
+ // So, here trigger to merge the index files by sending an event
+ val alterTableModel = AlterTableModel(
+ dbName = Some(table.getDatabaseName),
+ tableName = table.getTableName,
+ segmentUpdateStatusManager = None,
+ compactionType = "", // to trigger index merge, this is not required
+ factTimeStamp = Some(System.currentTimeMillis()),
+ alterSql = null,
+ customSegmentIds = Some(Seq(loadModel.getSegmentId).toList))
+ val mergeIndexEvent = AlterTableMergeIndexEvent(sparkSession, table, alterTableModel)
+ OperationListenerBus.getInstance.fireEvent(mergeIndexEvent, new OperationContext)
}
}
Seq.empty[Row]