You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2020/04/20 07:19:40 UTC
[carbondata] branch master updated: [HOTFIX] Avoid reading table
status file again per query
This is an automated email from the ASF dual-hosted git repository.
kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new d1b4221 [HOTFIX] Avoid reading table status file again per query
d1b4221 is described below
commit d1b4221c28542bed634aed7fb995d519ea363536
Author: ajantha-bhat <aj...@gmail.com>
AuthorDate: Wed Apr 15 11:41:47 2020 +0530
[HOTFIX] Avoid reading table status file again per query
Why is this PR needed?
After the add segment feature was introduced, we take the snapshot twice (which leads to reading
the table status file twice); we can reuse the original snapshot instead:
a. The first snapshot is taken in CarbonLateDecodeStrategy.pruneFilterProjectRaw()
b. The second snapshot is taken in CarbonTableInputFormat.getSplits()
What changes were proposed in this PR?
Take the snapshot only once and reuse it in the second place
This closes #3709
---
.../hadoop/api/CarbonTableInputFormat.java | 4 ++++
.../carbondata/spark/rdd/CarbonScanRDD.scala | 12 ++++++++++
.../strategy/CarbonLateDecodeStrategy.scala | 26 ++++++++++++++++++++--
3 files changed, 40 insertions(+), 2 deletions(-)
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 7a8a2e7..7d2c2f7 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -572,4 +572,8 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
}
return readCommittedScope;
}
+
+ public void setReadCommittedScope(ReadCommittedScope readCommittedScope) {
+ this.readCommittedScope = readCommittedScope;
+ }
}
\ No newline at end of file
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index df2b6a6..5424b66 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -50,6 +50,7 @@ import org.apache.carbondata.core.index.{IndexFilter, Segment}
import org.apache.carbondata.core.indexstore.PartitionSpec
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
+import org.apache.carbondata.core.readcommitter.ReadCommittedScope
import org.apache.carbondata.core.scan.expression.Expression
import org.apache.carbondata.core.scan.expression.conditional.ImplicitExpression
import org.apache.carbondata.core.scan.expression.logical.AndExpression
@@ -98,6 +99,8 @@ class CarbonScanRDD[T: ClassTag](
private var segmentsToAccess: Array[Segment] = _
+ private var readCommittedScope: ReadCommittedScope = _
+
@transient val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
override def internalGetPartitions: Array[Partition] = {
@@ -133,6 +136,11 @@ class CarbonScanRDD[T: ClassTag](
// get splits
getSplitsStartTime = System.currentTimeMillis()
if (null == splits) {
+ format match {
+ case inputFormat: CarbonTableInputFormat[Object] =>
+ inputFormat.setReadCommittedScope(readCommittedScope)
+ case _ =>
+ }
splits = format.getSplits(job)
}
getSplitsEndTime = System.currentTimeMillis()
@@ -781,4 +789,8 @@ class CarbonScanRDD[T: ClassTag](
indexFilter.setExpression(new AndExpression(indexFilter.getExpression, expressionVal))
}
}
+
+ def setReadCommittedScope(readCommittedScope: ReadCommittedScope): Unit = {
+ this.readCommittedScope = readCommittedScope
+ }
}
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
index 4beb091..d05d8f9 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
@@ -349,8 +349,14 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
scanBuilder: (Seq[Attribute], Seq[Expression], Seq[Filter], Seq[PartitionSpec])
=> RDD[InternalRow]): CodegenSupport = {
val table = relation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
- val extraRdd = MixedFormatHandler.extraRDD(relation, rawProjects, filterPredicates,
- new TableStatusReadCommittedScope(table.identifier, FileFactory.getConfiguration),
+ val readCommittedScope = new TableStatusReadCommittedScope(
+ table.identifier,
+ FileFactory.getConfiguration)
+ val extraRdd = MixedFormatHandler.extraRDD(
+ relation,
+ rawProjects,
+ filterPredicates,
+ readCommittedScope,
table.identifier)
val projects = rawProjects.map {p =>
p.transform {
@@ -441,6 +447,14 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
metadata,
updateRequestedColumns.asInstanceOf[Seq[Attribute]],
extraRdd)
+ if (scan.isInstanceOf[CarbonDataSourceScan] && table.carbonTable.isTransactionalTable) {
+ // set the read committed scope and reuse in the scanRDD
+ scan.inputRDDs().head match {
+ case rdd: CarbonScanRDD[InternalRow] =>
+ rdd.setReadCommittedScope(readCommittedScope)
+ case _ =>
+ }
+ }
// Check whether spark should handle row filters in case of vector flow.
if (!vectorPushRowFilters && scan.isInstanceOf[CarbonDataSourceScan]) {
// Here carbon only do page pruning and row level pruning will be done by spark.
@@ -522,6 +536,14 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
metadata,
newRequestedColumns,
extraRdd)
+ if (scan.isInstanceOf[CarbonDataSourceScan] && table.carbonTable.isTransactionalTable) {
+ // set the read committed scope and reuse in the scanRDD
+ scan.inputRDDs().head match {
+ case rdd: CarbonScanRDD[InternalRow] =>
+ rdd.setReadCommittedScope(readCommittedScope)
+ case _ =>
+ }
+ }
// Check whether spark should handle row filters in case of vector flow.
if (!vectorPushRowFilters &&
scan.isInstanceOf[CarbonDataSourceScan] &&