You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2020/04/20 07:19:40 UTC

[carbondata] branch master updated: [HOTFIX] Avoid reading table status file again per query

This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new d1b4221  [HOTFIX] Avoid reading table status file again per query
d1b4221 is described below

commit d1b4221c28542bed634aed7fb995d519ea363536
Author: ajantha-bhat <aj...@gmail.com>
AuthorDate: Wed Apr 15 11:41:47 2020 +0530

    [HOTFIX] Avoid reading table status file again per query
    
    Why is this PR needed?
    After the add segment feature, we take the snapshot twice (which leads to reading
    the table status file twice); we can reuse the original snapshot instead.
    a. First time: snapshot in LateDecodeStrategy.pruneFilterProjectRaw()
    b. Second time: snapshot in CarbonTableInputFormat.getSplits()
    
    What changes were proposed in this PR?
    Take the snapshot only once and reuse it in the second place.
    
    This closes #3709
---
 .../hadoop/api/CarbonTableInputFormat.java         |  4 ++++
 .../carbondata/spark/rdd/CarbonScanRDD.scala       | 12 ++++++++++
 .../strategy/CarbonLateDecodeStrategy.scala        | 26 ++++++++++++++++++++--
 3 files changed, 40 insertions(+), 2 deletions(-)

diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 7a8a2e7..7d2c2f7 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -572,4 +572,8 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
     }
     return readCommittedScope;
   }
+
+  public void setReadCommittedScope(ReadCommittedScope readCommittedScope) {
+    this.readCommittedScope = readCommittedScope;
+  }
 }
\ No newline at end of file
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index df2b6a6..5424b66 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -50,6 +50,7 @@ import org.apache.carbondata.core.index.{IndexFilter, Segment}
 import org.apache.carbondata.core.indexstore.PartitionSpec
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
+import org.apache.carbondata.core.readcommitter.ReadCommittedScope
 import org.apache.carbondata.core.scan.expression.Expression
 import org.apache.carbondata.core.scan.expression.conditional.ImplicitExpression
 import org.apache.carbondata.core.scan.expression.logical.AndExpression
@@ -98,6 +99,8 @@ class CarbonScanRDD[T: ClassTag](
 
   private var segmentsToAccess: Array[Segment] = _
 
+  private var readCommittedScope: ReadCommittedScope = _
+
   @transient val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
 
   override def internalGetPartitions: Array[Partition] = {
@@ -133,6 +136,11 @@ class CarbonScanRDD[T: ClassTag](
       // get splits
       getSplitsStartTime = System.currentTimeMillis()
       if (null == splits) {
+        format match {
+          case inputFormat: CarbonTableInputFormat[Object] =>
+            inputFormat.setReadCommittedScope(readCommittedScope)
+          case _ =>
+        }
         splits = format.getSplits(job)
       }
       getSplitsEndTime = System.currentTimeMillis()
@@ -781,4 +789,8 @@ class CarbonScanRDD[T: ClassTag](
       indexFilter.setExpression(new AndExpression(indexFilter.getExpression, expressionVal))
     }
   }
+
+  def setReadCommittedScope(readCommittedScope: ReadCommittedScope): Unit = {
+    this.readCommittedScope = readCommittedScope
+  }
 }
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
index 4beb091..d05d8f9 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
@@ -349,8 +349,14 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
       scanBuilder: (Seq[Attribute], Seq[Expression], Seq[Filter], Seq[PartitionSpec])
         => RDD[InternalRow]): CodegenSupport = {
     val table = relation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
-    val extraRdd = MixedFormatHandler.extraRDD(relation, rawProjects, filterPredicates,
-      new TableStatusReadCommittedScope(table.identifier, FileFactory.getConfiguration),
+    val readCommittedScope = new TableStatusReadCommittedScope(
+      table.identifier,
+      FileFactory.getConfiguration)
+    val extraRdd = MixedFormatHandler.extraRDD(
+      relation,
+      rawProjects,
+      filterPredicates,
+      readCommittedScope,
       table.identifier)
     val projects = rawProjects.map {p =>
       p.transform {
@@ -441,6 +447,14 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
         metadata,
         updateRequestedColumns.asInstanceOf[Seq[Attribute]],
         extraRdd)
+      if (scan.isInstanceOf[CarbonDataSourceScan] && table.carbonTable.isTransactionalTable) {
+        // set the read committed scope and reuse in the scanRDD
+        scan.inputRDDs().head match {
+          case rdd: CarbonScanRDD[InternalRow] =>
+            rdd.setReadCommittedScope(readCommittedScope)
+          case _ =>
+        }
+      }
       // Check whether spark should handle row filters in case of vector flow.
       if (!vectorPushRowFilters && scan.isInstanceOf[CarbonDataSourceScan]) {
         // Here carbon only do page pruning and row level pruning will be done by spark.
@@ -522,6 +536,14 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
         metadata,
         newRequestedColumns,
         extraRdd)
+      if (scan.isInstanceOf[CarbonDataSourceScan] && table.carbonTable.isTransactionalTable) {
+        // set the read committed scope and reuse in the scanRDD
+        scan.inputRDDs().head match {
+          case rdd: CarbonScanRDD[InternalRow] =>
+            rdd.setReadCommittedScope(readCommittedScope)
+          case _ =>
+        }
+      }
       // Check whether spark should handle row filters in case of vector flow.
       if (!vectorPushRowFilters &&
           scan.isInstanceOf[CarbonDataSourceScan] &&