Posted to commits@carbondata.apache.org by aj...@apache.org on 2019/11/15 13:52:50 UTC

[carbondata] branch master updated: [CARBONDATA-3579] Support merging index files when adding new partition

This is an automated email from the ASF dual-hosted git repository.

ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 28bcfa3  [CARBONDATA-3579] Support merging index files when adding new partition
28bcfa3 is described below

commit 28bcfa3a8c0f1ec8142fac1f81a0e050282762df
Author: Jacky Li <ja...@qq.com>
AuthorDate: Thu Nov 14 02:20:08 2019 +0800

    [CARBONDATA-3579] Support merging index files when adding new partition
    
    Normally, an application uses the Carbon SDK to write files into a
    partition folder, then adds the folder to a partitioned carbon table.
    If many threads write to the same partition folder, there will be
    many carbon index files, which is bad for query performance since
    all index files need to be read by the Spark driver.
    
    So, a better approach is to merge the index files when adding a new
    partition to the carbon table.
    
    This closes #3452
---
 .../StandardPartitionTableQueryTestCase.scala      | 50 +++++++++++++++++++++-
 .../carbondata/events/AlterTableEvents.scala       |  6 +--
 .../spark/sql/events/MergeIndexEventListener.scala | 24 +++++++----
 .../CarbonAlterTableAddHivePartitionCommand.scala  | 23 ++++++++--
 4 files changed, 88 insertions(+), 15 deletions(-)
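
For context, here is a rough sketch of the workflow this change targets,
distilled from the new test case in the diff below; the table name, output
path, rows and the "spark" session handle are illustrative, and the imports
used by the new test (CarbonWriter, CarbonTablePath, CarbonEnv) are assumed:

    // create a partitioned carbon table and locate its schema file
    spark.sql("create table pt (id int, name string) partitioned by (email string) stored as carbondata")
    val schemaFile = CarbonTablePath.getSchemaFilePath(
      CarbonEnv.getCarbonTable(None, "pt")(spark).getTablePath)

    // one or more writers (possibly concurrent threads) use the Carbon SDK to
    // write into the same partition folder; each writer leaves its own index file
    val writer = CarbonWriter.builder()
      .outputPath("/tmp/pt/email=def")   // illustrative partition folder
      .writtenBy("example")
      .withSchemaFile(schemaFile)
      .withCsvInput()
      .build()
    writer.write(Array("2", "red", "def"))
    writer.close()

    // adding the folder as a partition now also merges the per-writer
    // carbon index files of the newly added segment
    spark.sql("alter table pt add partition (email='def') location '/tmp/pt/email=def'")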

diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
index fb4b511..e151547 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
@@ -20,7 +20,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.strategy.CarbonDataSourceScan
 import org.apache.spark.sql.test.Spark2TestQueryExecutor
 import org.apache.spark.sql.test.util.QueryTest
-import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.{CarbonEnv, DataFrame, Row}
 import org.apache.spark.util.SparkUtil
 import org.scalatest.BeforeAndAfterAll
 
@@ -28,6 +28,8 @@ import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandExcepti
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.sdk.file.{CarbonWriter, Field, Schema}
 import org.apache.carbondata.spark.rdd.CarbonScanRDD
 
 class StandardPartitionTableQueryTestCase extends QueryTest with BeforeAndAfterAll {
@@ -314,6 +316,52 @@ test("Creation of partition table should fail if the colname in table schema and
     FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(location))
   }
 
+  test("sdk write and add partition based on location on partition table"){
+    sql("drop table if exists partitionTable")
+    sql("create table partitionTable (id int,name String) partitioned by(email string) stored as carbondata")
+    sql("insert into partitionTable select 1,'blue','abc'")
+    val schemaFile =
+      CarbonTablePath.getSchemaFilePath(
+        CarbonEnv.getCarbonTable(None, "partitionTable")(sqlContext.sparkSession).getTablePath)
+
+    val sdkWritePath = metaStoreDB +"/" +"def"
+    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(sdkWritePath))
+
+    (1 to 3).foreach { i =>
+      val writer = CarbonWriter.builder()
+        .outputPath(sdkWritePath)
+        .writtenBy("test")
+        .withSchemaFile(schemaFile)
+        .withCsvInput()
+        .build()
+      writer.write(Seq("2", "red", "def").toArray)
+      writer.write(Seq("3", "black", "def").toArray)
+      writer.close()
+    }
+
+    sql(s"alter table partitionTable add partition (email='def') location '$sdkWritePath'")
+    sql("show partitions partitionTable").show(false)
+    checkAnswer(sql("show partitions partitionTable"), Seq(Row("email=abc"), Row("email=def")))
+    checkAnswer(sql("select email from partitionTable"), Seq(Row("abc"), Row("def"), Row("def"), Row("def"), Row("def"), Row("def"), Row("def")))
+    checkAnswer(sql("select count(*) from partitionTable"), Seq(Row(7)))
+    checkAnswer(sql("select id from partitionTable where email = 'def'"), Seq(Row(2), Row(3), Row(2), Row(3), Row(2), Row(3)))
+    // alter table add partition should merge index files
+    assert(FileFactory.getCarbonFile(sdkWritePath)
+      .listFiles()
+      .exists(_.getName.contains(".carbonindexmerge")))
+
+    // do compaction to sort data written by sdk
+    sql("alter table partitionTable compact 'major'")
+    assert(sql("show segments for table partitionTable").collectAsList().get(0).getString(1).contains("Compacted"))
+    checkAnswer(sql("show partitions partitionTable"), Seq(Row("email=abc"), Row("email=def")))
+    checkAnswer(sql("select email from partitionTable"), Seq(Row("abc"), Row("def"), Row("def"), Row("def"), Row("def"), Row("def"), Row("def")))
+    checkAnswer(sql("select count(*) from partitionTable"), Seq(Row(7)))
+    checkAnswer(sql("select id from partitionTable where email = 'def'"), Seq(Row(2), Row(3), Row(2), Row(3), Row(2), Row(3)))
+
+    sql("drop table if exists partitionTable")
+    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(sdkWritePath))
+  }
+
   test("add partition with static column partition with load command") {
     sql(
       """
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
index 56f4d29..ea73ebe 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
@@ -205,9 +205,9 @@ case class AlterTableCompactionAbortEvent(sparkSession: SparkSession,
 /**
  * Compaction Event for handling merge index in alter DDL
  *
- * @param sparkSession
- * @param carbonTable
- * @param alterTableModel
+ * @param sparkSession spark session
+ * @param carbonTable carbon table
+ * @param alterTableModel alter request
  */
 case class AlterTableMergeIndexEvent(sparkSession: SparkSession,
     carbonTable: CarbonTable,
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
index 967f390..fcee9f1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
@@ -84,12 +84,20 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
                             carbonMainTable
                               .getTableName
                           }")
-              val validSegments: mutable.Buffer[Segment] = CarbonDataMergerUtil.getValidSegmentList(
-                carbonMainTable.getAbsoluteTableIdentifier, carbonMainTable.isChildTable).asScala
-              val validSegmentIds: mutable.Buffer[String] = mutable.Buffer[String]()
-              validSegments.foreach { segment =>
-                validSegmentIds += segment.getSegmentNo
-              }
+              val segmentsToMerge =
+                if (alterTableMergeIndexEvent.alterTableModel.customSegmentIds.isEmpty) {
+                  val validSegments = CarbonDataMergerUtil.getValidSegmentList(
+                    carbonMainTable.getAbsoluteTableIdentifier,
+                    carbonMainTable.isChildTable).asScala
+                  val validSegmentIds: mutable.Buffer[String] = mutable.Buffer[String]()
+                  validSegments.foreach { segment =>
+                    validSegmentIds += segment.getSegmentNo
+                  }
+                  validSegmentIds
+                } else {
+                  alterTableMergeIndexEvent.alterTableModel.customSegmentIds.get
+                }
+
               val loadFolderDetailsArray = SegmentStatusManager
                 .readLoadMetadata(carbonMainTable.getMetadataPath)
               val segmentFileNameMap: java.util.Map[String, String] = new util.HashMap[String,
@@ -105,14 +113,14 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
               // old store is also upgraded to new store
               CarbonMergeFilesRDD.mergeIndexFiles(
                 sparkSession = sparkSession,
-                segmentIds = validSegmentIds,
+                segmentIds = segmentsToMerge,
                 segmentFileNameToSegmentIdMap = segmentFileNameMap,
                 tablePath = carbonMainTable.getTablePath,
                 carbonTable = carbonMainTable,
                 mergeIndexProperty = true,
                 readFileFooterFromCarbonDataFile = true)
               // clear Block dataMap Cache
-              MergeIndexUtil.clearBlockDataMapCache(carbonMainTable, validSegmentIds)
+              MergeIndexUtil.clearBlockDataMapCache(carbonMainTable, segmentsToMerge)
               val requestMessage = "Compaction request completed for table " +
                 s"${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName }"
               LOGGER.info(requestMessage)
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
index a7bf5f4..f29b7b2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
@@ -24,7 +24,7 @@ import scala.collection.JavaConverters._
 import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, AlterTableDropPartitionCommand, AtomicRunnableCommand}
+import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, AlterTableDropPartitionCommand, AlterTableModel, AtomicRunnableCommand}
 import org.apache.spark.sql.optimizer.CarbonFilters
 
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -36,7 +36,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.SegmentStatus
 import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.events.{OperationContext, OperationListenerBus, PostAlterTableHivePartitionCommandEvent, PreAlterTableHivePartitionCommandEvent}
+import org.apache.carbondata.events.{AlterTableMergeIndexEvent, OperationContext, OperationListenerBus, PostAlterTableHivePartitionCommandEvent, PreAlterTableHivePartitionCommandEvent}
 import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
 
@@ -116,7 +116,7 @@ case class CarbonAlterTableAddHivePartitionCommand(
       if (segmentFile != null) {
         val indexToSchemas = SegmentFileStore.getSchemaFiles(segmentFile, table.getTablePath)
         val tableColums = table.getTableInfo.getFactTable.getListOfColumns.asScala
-        var isSameSchema = indexToSchemas.asScala.exists{ case(key, columnSchemas) =>
+        val isSameSchema = indexToSchemas.asScala.exists{ case(key, columnSchemas) =>
           columnSchemas.asScala.exists { col =>
             tableColums.exists(p => p.getColumnUniqueId.equals(col.getColumnUniqueId))
           } && columnSchemas.size() == tableColums.length
@@ -153,6 +153,23 @@ case class CarbonAlterTableAddHivePartitionCommand(
         CarbonLoaderUtil.addDataIndexSizeIntoMetaEntry(newMetaEntry, loadModel.getSegmentId, table)
         // Make the load as success in table status
         CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, loadModel, false, false)
+
+        // Normally, an application uses the Carbon SDK to write files into a partition folder,
+        // then adds the folder to a partitioned carbon table.
+        // If many threads write to the same partition folder, there will be many carbon index
+        // files, which is bad for query performance since all index files need to be read by
+        // the spark driver.
+        // So, here we trigger merging of the index files by sending an event
+        val alterTableModel = AlterTableModel(
+          dbName = Some(table.getDatabaseName),
+          tableName = table.getTableName,
+          segmentUpdateStatusManager = None,
+          compactionType = "", // not required for triggering index merge
+          factTimeStamp = Some(System.currentTimeMillis()),
+          alterSql = null,
+          customSegmentIds = Some(Seq(loadModel.getSegmentId).toList))
+        val mergeIndexEvent = AlterTableMergeIndexEvent(sparkSession, table, alterTableModel)
+        OperationListenerBus.getInstance.fireEvent(mergeIndexEvent, new OperationContext)
       }
     }
     Seq.empty[Row]