Posted to commits@carbondata.apache.org by ku...@apache.org on 2020/09/22 10:39:38 UTC

[carbondata] branch master updated: [CARBONDATA-4001] Fix SI global sort load on partition table

This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new b7548d5  [CARBONDATA-4001] Fix SI global sort load on partition table
b7548d5 is described below

commit b7548d5fe1763306e0bc1e94b71c406b76ea180e
Author: ajantha-bhat <aj...@gmail.com>
AuthorDate: Mon Sep 21 18:56:39 2020 +0530

    [CARBONDATA-4001] Fix SI global sort load on partition table
    
    Why is this PR needed?
    On a partition table, when an SI is created with global sort and data is
    loaded, the SI shows 0 rows because the main table query used to fill the
    index returns 0 rows. A minimal reproduction is sketched below.
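    
    A minimal reproduction of the scenario, in the style of the test added by
    this PR (the table and index names here are illustrative, and "spark" is
    assumed to be an existing SparkSession):
    
        // partitioned main table with a global sort secondary index
        spark.sql("create table t (name string, id string) partitioned by (dt string) stored as carbondata")
        spark.sql("create index t_idx on table t(id) as 'carbondata' properties('sort_scope'='global_sort')")
        spark.sql("insert into t select 'xx', '1', '2020'")
        // before this fix, the SI load read 0 rows from the main table,
        // so the index table ended up empty
        spark.sql("select id from t_idx").show()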
    
    What changes were proposed in this PR?
    For a partition table, the local sort SI flow sets current.segmentfile
    in CarbonSecondaryIndexRDD. For the global sort flow this value was not
    set, so the main table query returned 0 rows. This change sets the value
    for the global sort flow as well, as sketched below.
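    
    A condensed sketch of the mechanism, pieced together from the diff below
    (SecondaryIndexCreator and CarbonScanRDD); this is not the complete code:
    
        // SecondaryIndexCreator: for a partitioned main table, derive the segment
        // file name that the output committer will use (segmentId + "_" + factTimestamp)
        // and push it into every CarbonScanRDD found in the dataframe's RDD lineage
        val currentSegmentFileName =
          if (mainTable.isHivePartitionTable) {
            eachSegment + CarbonCommonConstants.UNDERSCORE + carbonLoadModel.getFactTimeStamp
          } else {
            null
          }
    
        // CarbonScanRDD: before computing input splits, expose that name to the
        // main table query via the "current.segment" configuration, matching the
        // value written by the carbon output committer
        if (!StringUtils.isEmpty(currentSegmentFileName)) {
          job.getConfiguration.set(CarbonCommonConstants.CURRENT_SEGMENTFILE, currentSegmentFileName)
        }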
    
    This closes #3944
---
 .../carbondata/hadoop/api/CarbonInputFormat.java   | 24 +++++++--------
 .../secondaryindex/TestSIWithSecondryIndex.scala   | 24 +++++++++++++++
 .../carbondata/spark/rdd/CarbonScanRDD.scala       | 18 +++++++++++-
 .../secondaryindex/rdd/SecondaryIndexCreator.scala | 34 ++++++++++++++++++----
 4 files changed, 81 insertions(+), 19 deletions(-)

diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
index 43cbe1f..9765913 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -466,7 +466,7 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
    * get data blocks of given segment
    */
   protected List<CarbonInputSplit> getDataBlocksOfSegment(JobContext job, CarbonTable carbonTable,
-      IndexFilter expression, List<Segment> segmentIds,
+      IndexFilter expression, List<Segment> validSegments,
       List<Segment> invalidSegments, List<String> segmentsToBeRefreshed)
       throws IOException {
 
@@ -474,7 +474,7 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
     QueryStatistic statistic = new QueryStatistic();
 
     List<ExtendedBlocklet> prunedBlocklets =
-        getPrunedBlocklets(job, carbonTable, expression, segmentIds, invalidSegments,
+        getPrunedBlocklets(job, carbonTable, expression, validSegments, invalidSegments,
             segmentsToBeRefreshed);
     List<CarbonInputSplit> resultFilteredBlocks = new ArrayList<>();
     for (ExtendedBlocklet blocklet : prunedBlocklets) {
@@ -508,7 +508,7 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
    * First pruned with default blocklet index, then pruned with CG and FG index
    */
   private List<ExtendedBlocklet> getPrunedBlocklets(JobContext job, CarbonTable carbonTable,
-      IndexFilter filter, List<Segment> segmentIds, List<Segment> invalidSegments,
+      IndexFilter filter, List<Segment> validSegments, List<Segment> invalidSegments,
       List<String> segmentsToBeRefreshed) throws IOException {
     ExplainCollector.addPruningInfo(carbonTable.getTableName());
     filter = filter == null ? new IndexFilter(carbonTable, null) : filter;
@@ -529,7 +529,7 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
     if (isDistributedPruningEnabled) {
       try {
         prunedBlocklets =
-            getDistributedSplit(carbonTable, filter.getResolver(), partitionsToPrune, segmentIds,
+            getDistributedSplit(carbonTable, filter.getResolver(), partitionsToPrune, validSegments,
                 invalidSegments, segmentsToBeRefreshed, false, job.getConfiguration());
       } catch (Exception e) {
         // Check if fallback is disabled then directly throw exception otherwise try driver
@@ -537,15 +537,15 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
         if (CarbonProperties.getInstance().isFallBackDisabled()) {
           throw e;
         }
-        prunedBlocklets = defaultIndex.prune(segmentIds, filter, partitionsToPrune);
+        prunedBlocklets = defaultIndex.prune(validSegments, filter, partitionsToPrune);
       }
     } else {
       if (carbonTable.isTransactionalTable()) {
         IndexExprWrapper indexExprWrapper =
             IndexChooser.getDefaultIndex(getOrCreateCarbonTable(job.getConfiguration()), null);
-        IndexUtil.loadIndexes(carbonTable, indexExprWrapper, segmentIds);
+        IndexUtil.loadIndexes(carbonTable, indexExprWrapper, validSegments);
       }
-      prunedBlocklets = defaultIndex.prune(segmentIds, filter, partitionsToPrune);
+      prunedBlocklets = defaultIndex.prune(validSegments, filter, partitionsToPrune);
 
       if (ExplainCollector.enabled()) {
         ExplainCollector.setDefaultIndexPruningBlockHit(getBlockCount(prunedBlocklets));
@@ -562,7 +562,7 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
 
       if (cgIndexExprWrapper != null) {
         // Prune segments from already pruned blocklets
-        IndexUtil.pruneSegments(segmentIds, prunedBlocklets);
+        IndexUtil.pruneSegments(validSegments, prunedBlocklets);
         List<ExtendedBlocklet> cgPrunedBlocklets = new ArrayList<>();
         boolean isCGPruneFallback = false;
         // Again prune with CG index.
@@ -570,10 +570,10 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
           if (distributedCG && indexJob != null) {
             cgPrunedBlocklets = IndexUtil
                 .executeIndexJob(carbonTable, filter.getResolver(), indexJob, partitionsToPrune,
-                    segmentIds, invalidSegments, IndexLevel.CG, new ArrayList<>(),
+                    validSegments, invalidSegments, IndexLevel.CG, new ArrayList<>(),
                     job.getConfiguration());
           } else {
-            cgPrunedBlocklets = cgIndexExprWrapper.prune(segmentIds, partitionsToPrune);
+            cgPrunedBlocklets = cgIndexExprWrapper.prune(validSegments, partitionsToPrune);
           }
         } catch (Exception e) {
           isCGPruneFallback = true;
@@ -603,11 +603,11 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
         List<ExtendedBlocklet> fgPrunedBlocklets;
         if (fgIndexExprWrapper != null) {
           // Prune segments from already pruned blocklets
-          IndexUtil.pruneSegments(segmentIds, prunedBlocklets);
+          IndexUtil.pruneSegments(validSegments, prunedBlocklets);
           // Prune segments from already pruned blocklets
           fgPrunedBlocklets = IndexUtil
               .executeIndexJob(carbonTable, filter.getResolver(), indexJob, partitionsToPrune,
-                  segmentIds, invalidSegments, fgIndexExprWrapper.getIndexLevel(),
+                  validSegments, invalidSegments, fgIndexExprWrapper.getIndexLevel(),
                   new ArrayList<>(), job.getConfiguration());
           // note that the 'fgPrunedBlocklets' has extra index related info compared with
           // 'prunedBlocklets', so the intersection should keep the elements in 'fgPrunedBlocklets'
diff --git a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithSecondryIndex.scala b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithSecondryIndex.scala
index 47a2110..26ad960 100644
--- a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithSecondryIndex.scala
+++ b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithSecondryIndex.scala
@@ -126,6 +126,30 @@ class TestSIWithSecondryIndex extends QueryTest with BeforeAndAfterAll {
     sql("drop table table1")
   }
 
+  test("test create secondary index global sort on partition table") {
+    sql("drop table if exists partition_carbon_table")
+    sql("create table partition_carbon_table (name string, id string, country string) PARTITIONED BY(dateofjoin " +
+      "string) stored as carbondata")
+    // create SI before inserting the data
+    sql("create index partition_carbon_table_index on table partition_carbon_table(id, country) as 'carbondata' properties" +
+        "('sort_scope'='global_sort', 'Global_sort_partitions'='3')")
+    sql("insert into partition_carbon_table select 'xx', '2', 'china', '2020' " +
+        "union all select 'xx', '1', 'india', '2021'")
+    checkAnswerWithoutSort(sql("select id, country from partition_carbon_table_index"),
+      Seq(Row("1", "india"), Row("2", "china")))
+    // check for valid sort_scope
+    checkExistence(sql("describe formatted partition_carbon_table_index"), true, "Sort Scope global_sort")
+    sql("drop index partition_carbon_table_index on partition_carbon_table")
+    // create SI after inserting the data
+    sql("create index partition_carbon_table_index on table partition_carbon_table(id, country) as 'carbondata' properties" +
+        "('sort_scope'='global_sort', 'Global_sort_partitions'='3')")
+    checkAnswerWithoutSort(sql("select id, country from partition_carbon_table_index"),
+      Seq(Row("1", "india"), Row("2", "china")))
+    // check for valid sort_scope
+    checkExistence(sql("describe formatted partition_carbon_table_index"), true, "Sort Scope global_sort")
+    sql("drop table partition_carbon_table")
+  }
+
   test("test array<string> and string as index columns on secondary index with global sort") {
     sql("drop table if exists complextable")
     sql(
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 2669a5f..6929169 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -26,6 +26,7 @@ import scala.reflect.ClassTag
 import scala.util.Random
 import scala.util.control.Breaks.{break, breakable}
 
+import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
@@ -100,6 +101,11 @@ class CarbonScanRDD[T: ClassTag](
   // doesn't validate the segment and allows query on the segments without validation.
   private var validateSegmentToAccess: Boolean = true
 
+  // Used to set the current segment file name for a partitioned table
+  // during global sort SI load, so that the main table can be queried.
+  // It is the same name the output committer uses for the segment file.
+  private var currentSegmentFileName: String = _
+
   @transient val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
 
   override def internalGetPartitions: Array[Partition] = {
@@ -124,7 +130,12 @@ class CarbonScanRDD[T: ClassTag](
       }
       // initialise query_id for job
       job.getConfiguration.set("query.id", queryId)
-
+      if (!StringUtils.isEmpty(currentSegmentFileName)) {
+        // For querying the partitioned main table during SI global sort load,
+        // set the current segment file configuration ("current.segment") to the
+        // same value used by the carbon output committer
+        job.getConfiguration.set(CarbonCommonConstants.CURRENT_SEGMENTFILE, currentSegmentFileName)
+      }
       if (null != segmentsToAccess) {
         CarbonInputFormat
           .setSegmentsToAccess(job.getConfiguration, segmentsToAccess.toList.asJava)
@@ -801,4 +812,9 @@ class CarbonScanRDD[T: ClassTag](
   def setValidateSegmentToAccess(needValidate: Boolean): Unit = {
     validateSegmentToAccess = needValidate
   }
+
+  def setCurrentSegmentFileName(segmentFileName: String): Unit = {
+    currentSegmentFileName = segmentFileName;
+  }
+
 }
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
index a6f3120..e611f8b 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
@@ -208,15 +208,27 @@ object SecondaryIndexCreator {
                 try {
                   val configuration = FileFactory.getConfiguration
                   configuration.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, eachSegment)
-                  def findCarbonScanRDD(rdd: RDD[_]): Unit = {
+                  val currentSegmentFileName = if (mainTable.isHivePartitionTable) {
+                    eachSegment + CarbonCommonConstants.UNDERSCORE +
+                    carbonLoadModel.getFactTimeStamp
+                  } else {
+                    null
+                  }
+
+                  def findCarbonScanRDD(rdd: RDD[_], currentSegmentFileName: String): Unit = {
                     rdd match {
                       case carbonScanRDD: CarbonScanRDD[_] =>
                         carbonScanRDD.setValidateSegmentToAccess(false)
+                        if (currentSegmentFileName != null) {
+                          carbonScanRDD.setCurrentSegmentFileName(currentSegmentFileName)
+                        }
                       case others =>
-                        others.dependencies.foreach {x => findCarbonScanRDD(x.rdd)}
+                        others.dependencies
+                          .foreach { x => findCarbonScanRDD(x.rdd, currentSegmentFileName) }
                     }
                   }
-                  findCarbonScanRDD(dataFrame.rdd)
+
+                  findCarbonScanRDD(dataFrame.rdd, currentSegmentFileName)
                   // accumulator to collect segment metadata
                   val segmentMetaDataAccumulator = sc.sparkSession.sqlContext
                     .sparkContext
@@ -571,9 +583,19 @@ object SecondaryIndexCreator {
         case p: Project =>
           Project(p.projectList :+ positionId, p.child)
       }
-      carbonTable.getTableInfo
-        .getFactTable
-        .getTableProperties.put("isPositionIDRequested", "true")
+      val tableProperties = if (carbonTable.isHivePartitionTable) {
+        // in case of a partition table, the TableProperties object in carbonEnv is not the
+        // same as the one in the carbonTable object, so update it from carbonEnv itself.
+        CarbonEnv.getCarbonTable(Some(carbonTable.getDatabaseName), carbonTable.getTableName)(
+          sparkSession).getTableInfo
+          .getFactTable
+          .getTableProperties
+      } else {
+        carbonTable.getTableInfo
+          .getFactTable
+          .getTableProperties
+      }
+      tableProperties.put("isPositionIDRequested", "true")
       SparkSQLUtil.execute(newLogicalPlan, sparkSession)
     } finally {
       CarbonUtils