You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2018/10/25 11:59:01 UTC
carbondata git commit: [CARBONDATA-3011] Add carbon property to
configure vector based row pruning push down
Repository: carbondata
Updated Branches:
refs/heads/master 9578786b2 -> de6e98b08
[CARBONDATA-3011] Add carbon property to configure vector based row pruning push down
Added the below configuration to carbon to enable or disable row filter push down for vector.
carbon.push.rowfilters.for.vector
When enabled, complete row filters will be handled by carbon in case of vector.
If it is disabled, then only page level pruning will be done by carbon and row level filtering will be done by spark for vector.
There is no change in flow for non-vector based queries.
The default value is true.
This closes #2818
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/de6e98b0
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/de6e98b0
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/de6e98b0
Branch: refs/heads/master
Commit: de6e98b085723811b0894e659c3c4ce9770f7ca2
Parents: 9578786
Author: ravipesala <ra...@gmail.com>
Authored: Tue Oct 16 10:32:18 2018 +0530
Committer: kumarvishal09 <ku...@gmail.com>
Committed: Thu Oct 25 17:28:29 2018 +0530
----------------------------------------------------------------------
.../core/constants/CarbonCommonConstants.java | 12 +++
.../carbondata/core/scan/model/QueryModel.java | 13 ++++
.../carbondata/core/util/CarbonProperties.java | 8 ++
.../carbondata/spark/rdd/CarbonScanRDD.scala | 17 +++-
.../strategy/CarbonLateDecodeStrategy.scala | 82 +++++++++++++++++---
5 files changed, 120 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/de6e98b0/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index fa5227b..72da3bd 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1725,6 +1725,18 @@ public final class CarbonCommonConstants {
*/
public static final String CARBON_WRITTEN_BY_APPNAME = "carbon.writtenby.app.name";
+ /**
+ * When enabled complete row filters will be handled by carbon in case of vector.
+ * If it is disabled then only page level pruning will be done by carbon and row level filtering
+ * will be done by spark for vector.
+ * There is no change in flow for non-vector based queries.
+ */
+ @CarbonProperty
+ public static final String CARBON_PUSH_ROW_FILTERS_FOR_VECTOR =
+ "carbon.push.rowfilters.for.vector";
+
+ public static final String CARBON_PUSH_ROW_FILTERS_FOR_VECTOR_DEFAULT = "true";
+
//////////////////////////////////////////////////////////////////////////////////////////
// Unused constants and parameters start here
//////////////////////////////////////////////////////////////////////////////////////////
http://git-wip-us.apache.org/repos/asf/carbondata/blob/de6e98b0/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
index d90c35e..0951da0 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
@@ -124,6 +124,11 @@ public class QueryModel {
private boolean preFetchData = true;
+ /**
+ * It fills the vector directly from decoded column page with out any staging and conversions
+ */
+ private boolean isDirectVectorFill;
+
private QueryModel(CarbonTable carbonTable) {
tableBlockInfos = new ArrayList<TableBlockInfo>();
invalidSegmentIds = new ArrayList<>();
@@ -406,6 +411,14 @@ public class QueryModel {
this.preFetchData = preFetchData;
}
+ public boolean isDirectVectorFill() {
+ return isDirectVectorFill;
+ }
+
+ public void setDirectVectorFill(boolean directVectorFill) {
+ isDirectVectorFill = directVectorFill;
+ }
+
@Override
public String toString() {
return String.format("scan on table %s.%s, %d projection columns with filter (%s)",
http://git-wip-us.apache.org/repos/asf/carbondata/blob/de6e98b0/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index e6d48e5..49d89e7 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -1500,6 +1500,12 @@ public final class CarbonProperties {
return spillPercentage;
}
+ public boolean isPushRowFiltersForVector() {
+ String pushFilters = getProperty(CarbonCommonConstants.CARBON_PUSH_ROW_FILTERS_FOR_VECTOR,
+ CarbonCommonConstants.CARBON_PUSH_ROW_FILTERS_FOR_VECTOR_DEFAULT);
+ return Boolean.parseBoolean(pushFilters);
+ }
+
private void validateSortMemorySpillPercentage() {
String spillPercentageStr = carbonProperties.getProperty(
CARBON_LOAD_SORT_MEMORY_SPILL_PERCENTAGE,
@@ -1558,4 +1564,6 @@ public final class CarbonProperties {
CarbonCommonConstants.CARBON_MINMAX_ALLOWED_BYTE_COUNT_DEFAULT);
}
}
+
+
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/de6e98b0/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 1a7eae2..33031fc 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -90,6 +90,8 @@ class CarbonScanRDD[T: ClassTag](
}
private var vectorReader = false
+ private var directScan = false
+
private val bucketedTable = tableInfo.getFactTable.getBucketingInfo
@transient val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
@@ -228,9 +230,13 @@ class CarbonScanRDD[T: ClassTag](
statistic.addStatistics(QueryStatisticsConstants.BLOCK_ALLOCATION, System.currentTimeMillis)
statisticRecorder.recordStatisticsForDriver(statistic, queryId)
statistic = new QueryStatistic()
- val carbonDistribution = CarbonProperties.getInstance().getProperty(
- CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
- CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT)
+ val carbonDistribution = if (directScan) {
+ CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_MERGE_FILES
+ } else {
+ CarbonProperties.getInstance().getProperty(
+ CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
+ CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT)
+ }
// If bucketing is enabled on table then partitions should be grouped based on buckets.
if (bucketedTable != null) {
var i = 0
@@ -437,6 +443,7 @@ class CarbonScanRDD[T: ClassTag](
case _ =>
// create record reader for CarbonData file format
if (vectorReader) {
+ model.setDirectVectorFill(directScan)
val carbonRecordReader = createVectorizedCarbonRecordReader(model,
inputMetricsStats,
"true")
@@ -748,4 +755,8 @@ class CarbonScanRDD[T: ClassTag](
vectorReader = boolean
}
+ // TODO find the better way set it.
+ def setDirectScanSupport(isDirectScan: Boolean): Unit = {
+ directScan = isDirectScan
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/de6e98b0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
index f0184cd..04b65e8 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
@@ -57,6 +57,8 @@ import org.apache.carbondata.spark.util.CarbonScalaUtil
private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
val PUSHED_FILTERS = "PushedFilters"
+ val vectorPushRowFilters = CarbonProperties.getInstance().isPushRowFiltersForVector
+
/*
Spark 2.3.1 plan there can be case of multiple projections like below
Project [substring(name, 1, 2)#124, name#123, tupleId#117, cast(rand(-6778822102499951904)#125
@@ -213,11 +215,12 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
rdd: RDD[InternalRow],
needDecode: ArrayBuffer[AttributeReference]):
RDD[InternalRow] = {
+ val scanRdd = rdd.asInstanceOf[CarbonScanRDD[InternalRow]]
if (needDecode.nonEmpty) {
- rdd.asInstanceOf[CarbonScanRDD[InternalRow]].setVectorReaderSupport(false)
+ scanRdd.setVectorReaderSupport(false)
getDecoderRDD(relation, needDecode, rdd, output)
} else {
- rdd.asInstanceOf[CarbonScanRDD[InternalRow]]
+ scanRdd
.setVectorReaderSupport(supportBatchedDataSource(relation.relation.sqlContext, output))
rdd
}
@@ -300,7 +303,10 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
}
}
}
-
+ // in case of the global dictionary if it has the filter then it needs to decode all data before
+ // applying the filter in spark's side. So we should disable vectorPushRowFilters option
+ // in case of filters on global dictionary.
+ val hasDictionaryFilterCols = hasFilterOnDictionaryColumn(filterSet, table)
if (projects.map(_.toAttribute) == projects &&
projectSet.size == projects.size &&
filterSet.subsetOf(projectSet)) {
@@ -337,19 +343,37 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
metadata,
needDecoder,
updateRequestedColumns.asInstanceOf[Seq[Attribute]])
- filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan)
+ // Check whether spark should handle row filters in case of vector flow.
+ if (!vectorPushRowFilters && scan.isInstanceOf[CarbonDataSourceScan]
+ && !hasDictionaryFilterCols) {
+ // Here carbon only do page pruning and row level pruning will be done by spark.
+ scan.inputRDDs().head match {
+ case rdd: CarbonScanRDD[InternalRow] =>
+ rdd.setDirectScanSupport(true)
+ case _ =>
+ }
+ filterPredicates.reduceLeftOption(expressions.And).map(execution.FilterExec(_, scan))
+ .getOrElse(scan)
+ } else {
+ filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan)
+ }
} else {
var newProjectList: Seq[Attribute] = Seq.empty
+ // In case of implicit exist we should disable vectorPushRowFilters as it goes in IUD flow
+ // to get the positionId or tupleID
+ var implictsExisted = false
val updatedProjects = projects.map {
case a@Alias(s: ScalaUDF, name)
if name.equalsIgnoreCase(CarbonCommonConstants.POSITION_ID) ||
name.equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID) =>
val reference = AttributeReference(name, StringType, true)().withExprId(a.exprId)
newProjectList :+= reference
+ implictsExisted = true
reference
case a@Alias(s: ScalaUDF, name)
if name.equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_SEGMENTID) =>
+ implictsExisted = true
val reference =
AttributeReference(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID,
StringType, true)().withExprId(a.exprId)
@@ -363,7 +387,24 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
// Don't request columns that are only referenced by pushed filters.
val requestedColumns =
(projectSet ++ filterSet -- handledSet).map(relation.attributeMap).toSeq ++ newProjectList
- val updateRequestedColumns = updateRequestedColumnsFunc(requestedColumns, table, needDecoder)
+
+ var updateRequestedColumns =
+ if (!vectorPushRowFilters && !implictsExisted && !hasDictionaryFilterCols) {
+ updateRequestedColumnsFunc(
+ (projectSet ++ filterSet).map(relation.attributeMap).toSeq,
+ table,
+ needDecoder)
+ } else {
+ updateRequestedColumnsFunc(requestedColumns, table, needDecoder)
+ }
+ val supportBatch =
+ supportBatchedDataSource(relation.relation.sqlContext,
+ updateRequestedColumns.asInstanceOf[Seq[Attribute]]) &&
+ needDecoder.isEmpty
+ if (!vectorPushRowFilters && !supportBatch && !implictsExisted && !hasDictionaryFilterCols) {
+ // revert for row scan
+ updateRequestedColumns = updateRequestedColumnsFunc(requestedColumns, table, needDecoder)
+ }
val scan = getDataSourceScan(relation,
updateRequestedColumns.asInstanceOf[Seq[Attribute]],
partitions,
@@ -374,10 +415,27 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
metadata,
needDecoder,
updateRequestedColumns.asInstanceOf[Seq[Attribute]])
- execution.ProjectExec(
- updateRequestedColumnsFunc(updatedProjects, table,
- needDecoder).asInstanceOf[Seq[NamedExpression]],
- filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan))
+ // Check whether spark should handle row filters in case of vector flow.
+ if (!vectorPushRowFilters && scan.isInstanceOf[CarbonDataSourceScan]
+ && !implictsExisted && !hasDictionaryFilterCols) {
+ // Here carbon only do page pruning and row level pruning will be done by spark.
+ scan.inputRDDs().head match {
+ case rdd: CarbonScanRDD[InternalRow] =>
+ rdd.setDirectScanSupport(true)
+ case _ =>
+ }
+ execution.ProjectExec(
+ updateRequestedColumnsFunc(updatedProjects, table,
+ needDecoder).asInstanceOf[Seq[NamedExpression]],
+ filterPredicates.reduceLeftOption(expressions.And).map(
+ execution.FilterExec(_, scan)).getOrElse(scan))
+ } else {
+ execution.ProjectExec(
+ updateRequestedColumnsFunc(updatedProjects, table,
+ needDecoder).asInstanceOf[Seq[NamedExpression]],
+ filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan))
+ }
+
}
}
@@ -457,6 +515,12 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
}
}
+ private def hasFilterOnDictionaryColumn(filterColumns: AttributeSet,
+ relation: CarbonDatasourceHadoopRelation): Boolean = {
+ val map = relation.carbonRelation.metaData.dictionaryMap
+ filterColumns.exists(c => map.get(c.name).getOrElse(false))
+ }
+
private def getPartitioning(carbonTable: CarbonTable,
output: Seq[Attribute]): Partitioning = {
val info: BucketingInfo = carbonTable.getBucketingInfo(carbonTable.getTableName)