You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/09/18 10:01:33 UTC
[14/51] [abbrv] carbondata git commit: [CARBONDATA-649] fix for update with rand function
[CARBONDATA-649] fix for update with rand function
This closes #1296
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/8b38e0b3
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/8b38e0b3
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/8b38e0b3
Branch: refs/heads/branch-1.2
Commit: 8b38e0b3844d2220d6639d25bcafbab7a7af75f7
Parents: 590bbb9
Author: ashwini-krishnakumar <as...@gmail.com>
Authored: Thu Sep 7 07:36:32 2017 +0000
Committer: Ravindra Pesala <ra...@gmail.com>
Committed: Mon Sep 11 14:07:09 2017 +0530
----------------------------------------------------------------------
.../iud/UpdateCarbonTableTestCase.scala | 30 +++++++++++
.../sql/CustomDeterministicExpression.scala | 41 +++++++++++++++
.../spark/sql/hive/CarbonStrategies.scala | 52 ++++++++++--------
.../spark/sql/optimizer/CarbonOptimizer.scala | 55 ++++++++++++++++----
.../sql/CustomDeterministicExpression.scala | 42 +++++++++++++++
.../execution/CarbonLateDecodeStrategy.scala | 49 +++++++++--------
.../sql/optimizer/CarbonLateDecodeRule.scala | 43 +++++++++++++--
7 files changed, 251 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b38e0b3/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
index 623416b..4186fa2 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
@@ -448,6 +448,36 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
sql("DROP TABLE IF EXISTS default.carbon1")
}
+ test("update table in carbondata with rand() ") {
+ // Smoke test for rand() in select/update: nothing is asserted, results are only shown.
+ sql("""CREATE TABLE iud.rand(imei string,age int,task bigint,num double,level decimal(10,3),name string)STORED BY 'org.apache.carbondata.format' """)
+ sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/update01.csv' INTO TABLE iud.rand OPTIONS('DELIMITER'=',' , 'QUOTECHAR'='"','BAD_RECORDS_ACTION'='FORCE','FILEHEADER'='imei,age,task,num,level,name')""").collect
+
+ sql("select substring(name,1,2 ) , name ,getTupleId() as tupleId , rand() from iud.rand").show(100)
+
+ sql("select name , substring(name,1,2 ) ,getTupleId() as tupleId , num , rand() from iud.rand").show(100)
+
+ sql("Update rand set (num) = (rand())").show()
+
+ sql("Update rand set (num) = (rand(9))").show()
+
+ sql("Update rand set (name) = ('Lily')").show()
+
+ sql("select name , num from iud.rand").show(100)
+
+ sql("select imei , age , name , num from iud.rand").show(100)
+
+ sql("select rand() , getTupleId() as tupleId from iud.rand").show(100)
+
+ sql("select * from iud.rand").show(100)
+
+ sql("select imei , rand() , num from iud.rand").show(100)
+
+ sql("select name , rand() from iud.rand").show(100)
+
+ sql("DROP TABLE IF EXISTS iud.rand")
+ }
+
override def afterAll {
sql("use default")
sql("drop database if exists iud cascade")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b38e0b3/integration/spark/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala
new file mode 100644
index 0000000..d745be2
--- /dev/null
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenContext, GeneratedExpressionCode}
+import org.apache.spark.sql.types.{DataType, StringType}
+
+/**
+ * Wrapper that reports deterministic = true for the wrapped expression nonDt (e.g. rand()).
+ * Placeholder only: eval returns null; carbonRawScan in CarbonStrategies unwraps it again.
+ */
+case class CustomDeterministicExpression(nonDt: Expression ) extends Expression with Serializable{
+ override def nullable: Boolean = true
+
+ override def eval(input: InternalRow): Any = null // never yields a real value
+
+ override protected def genCode(ctx: CodeGenContext,
+ ev: GeneratedExpressionCode): String = ev.code // returns ev.code as-is
+ override def deterministic: Boolean = true // always true, whatever nonDt reports
+
+ override def dataType: DataType = StringType // fixed type; not taken from nonDt
+
+ override def children: Seq[Expression] = Seq() // nonDt is not exposed as a child
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b38e0b3/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
index 13ff2a9..204225b 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources.{DescribeCommand => LogicalDescribeCommand, LogicalRelation}
import org.apache.spark.sql.hive.execution.{DropTable, HiveNativeCommand}
import org.apache.spark.sql.hive.execution.command._
-import org.apache.spark.sql.optimizer.CarbonDecoderRelation
+import org.apache.spark.sql.optimizer.{CarbonDecoderRelation}
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.StringType
@@ -63,15 +63,15 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
def apply(plan: LogicalPlan): Seq[SparkPlan] = {
plan match {
case PhysicalOperation(projectList, predicates, l: LogicalRelation)
- if l.relation.isInstanceOf[CarbonDatasourceRelation] =>
+ if l.relation.isInstanceOf[CarbonDatasourceRelation] =>
if (isStarQuery(plan)) {
carbonRawScanForStarQuery(projectList, predicates, l)(sqlContext) :: Nil
} else {
carbonRawScan(projectList, predicates, l)(sqlContext) :: Nil
}
case InsertIntoCarbonTable(relation: CarbonDatasourceRelation,
- _, child: LogicalPlan, overwrite, _) =>
- ExecutedCommand(LoadTableByInsert(relation, child, overwrite)) :: Nil
+ _, child: LogicalPlan, overwrite, _) =>
+ ExecutedCommand(LoadTableByInsert(relation, child, overwrite)) :: Nil
case CarbonDictionaryCatalystDecoder(relations, profile, aliasMap, _, child) =>
CarbonDictionaryDecoder(relations,
profile,
@@ -85,21 +85,27 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
/**
* Create carbon scan
*/
- private def carbonRawScan(projectList: Seq[NamedExpression],
- predicates: Seq[Expression],
- logicalRelation: LogicalRelation)(sc: SQLContext): SparkPlan = {
+ private def carbonRawScan(projectListRaw: Seq[NamedExpression],
+ predicates: Seq[Expression],
+ logicalRelation: LogicalRelation)(sc: SQLContext): SparkPlan = {
val relation = logicalRelation.relation.asInstanceOf[CarbonDatasourceRelation]
val tableName: String =
relation.carbonRelation.metaData.carbonTable.getFactTableName.toLowerCase
// Check out any expressions are there in project list. if they are present then we need to
// decode them as well.
+
+ val projectList = projectListRaw.map {p =>
+ p.transform {
+ case CustomDeterministicExpression(exp) => exp
+ }
+ }.asInstanceOf[Seq[NamedExpression]]
val newProjectList = projectList.map { element =>
element match {
case a@Alias(s: ScalaUDF, name)
if (name.equalsIgnoreCase(CarbonCommonConstants.POSITION_ID) ||
- name.equalsIgnoreCase(
- CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)) =>
+ name.equalsIgnoreCase(
+ CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)) =>
AttributeReference(name, StringType, true)().withExprId(a.exprId)
case other => other
}
@@ -154,8 +160,8 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
* Create carbon scan for star query
*/
private def carbonRawScanForStarQuery(projectList: Seq[NamedExpression],
- predicates: Seq[Expression],
- logicalRelation: LogicalRelation)(sc: SQLContext): SparkPlan = {
+ predicates: Seq[Expression],
+ logicalRelation: LogicalRelation)(sc: SQLContext): SparkPlan = {
val relation = logicalRelation.relation.asInstanceOf[CarbonDatasourceRelation]
val tableName: String =
relation.carbonRelation.metaData.carbonTable.getFactTableName.toLowerCase
@@ -194,10 +200,10 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
}
def getCarbonDecoder(logicalRelation: LogicalRelation,
- sc: SQLContext,
- tableName: String,
- projectExprsNeedToDecode: Seq[Attribute],
- scan: CarbonScan): CarbonDictionaryDecoder = {
+ sc: SQLContext,
+ tableName: String,
+ projectExprsNeedToDecode: Seq[Attribute],
+ scan: CarbonScan): CarbonDictionaryDecoder = {
val relation = CarbonDecoderRelation(logicalRelation.attributeMap,
logicalRelation.relation.asInstanceOf[CarbonDatasourceRelation])
val attrs = projectExprsNeedToDecode.map { attr =>
@@ -227,7 +233,7 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
relation: CarbonDatasourceRelation,
allAttrsNotDecode: util.Set[Attribute]): AttributeReference = {
if (relation.carbonRelation.metaData.dictionaryMap.get(attr.name).getOrElse(false) &&
- !allAttrsNotDecode.asScala.exists(p => p.name.equals(attr.name))) {
+ !allAttrsNotDecode.asScala.exists(p => p.name.equals(attr.name))) {
AttributeReference(attr.name,
IntegerType,
attr.nullable,
@@ -240,7 +246,7 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
private def isStarQuery(plan: LogicalPlan) = {
plan match {
case LogicalFilter(condition, l: LogicalRelation)
- if l.relation.isInstanceOf[CarbonDatasourceRelation] =>
+ if l.relation.isInstanceOf[CarbonDatasourceRelation] =>
true
case l: LogicalRelation if l.relation.isInstanceOf[CarbonDatasourceRelation] => true
case _ => false
@@ -252,7 +258,7 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case DropTable(tableName, ifNotExists)
if CarbonEnv.get.carbonMetastore
- .isTablePathExists(toTableIdentifier(tableName.toLowerCase))(sqlContext) =>
+ .isTablePathExists(toTableIdentifier(tableName.toLowerCase))(sqlContext) =>
val identifier = toTableIdentifier(tableName.toLowerCase)
ExecutedCommand(DropTableCommand(ifNotExists, identifier.database, identifier.table)) :: Nil
case ShowLoadsCommand(databaseName, table, limit) =>
@@ -260,7 +266,7 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
case LoadTable(databaseNameOp, tableName, factPathFromUser, dimFilesPath,
options, isOverwriteExist, inputSqlString, dataFrame, _) =>
val isCarbonTable = CarbonEnv.get.carbonMetastore
- .tableExists(TableIdentifier(tableName, databaseNameOp))(sqlContext)
+ .tableExists(TableIdentifier(tableName, databaseNameOp))(sqlContext)
if (isCarbonTable || options.nonEmpty) {
ExecutedCommand(LoadTable(databaseNameOp, tableName, factPathFromUser, dimFilesPath,
options, isOverwriteExist, inputSqlString, dataFrame)) :: Nil
@@ -269,15 +275,15 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
}
case alterTable@AlterTableCompaction(altertablemodel) =>
val isCarbonTable = CarbonEnv.get.carbonMetastore
- .tableExists(TableIdentifier(altertablemodel.tableName,
- altertablemodel.dbName))(sqlContext)
+ .tableExists(TableIdentifier(altertablemodel.tableName,
+ altertablemodel.dbName))(sqlContext)
if (isCarbonTable) {
if (altertablemodel.compactionType.equalsIgnoreCase("minor") ||
altertablemodel.compactionType.equalsIgnoreCase("major")) {
ExecutedCommand(alterTable) :: Nil
} else {
throw new MalformedCarbonCommandException(
- "Unsupported alter operation on carbon table")
+ "Unsupported alter operation on carbon table")
}
} else {
ExecutedCommand(HiveNativeCommand(altertablemodel.alterSql)) :: Nil
@@ -305,7 +311,7 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
}
case DescribeFormattedCommand(sql, tblIdentifier) =>
val isTable = CarbonEnv.get.carbonMetastore
- .tableExists(tblIdentifier)(sqlContext)
+ .tableExists(tblIdentifier)(sqlContext)
if (isTable) {
val describe =
LogicalDescribeCommand(UnresolvedRelation(tblIdentifier, None), isExtended = false)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b38e0b3/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
index 02ac5f8..914203f 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
@@ -59,7 +59,7 @@ object CarbonOptimizer {
}
}
-// get the carbon relation from plan.
+ // get the carbon relation from plan.
def collectCarbonRelation(plan: LogicalPlan): Seq[CarbonDecoderRelation] = {
plan collect {
case l: LogicalRelation if l.relation.isInstanceOf[CarbonDatasourceRelation] =>
@@ -73,7 +73,7 @@ object CarbonOptimizer {
* decoder plan.
*/
class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
- extends Rule[LogicalPlan] with PredicateHelper {
+ extends Rule[LogicalPlan] with PredicateHelper {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
def apply(logicalPlan: LogicalPlan): LogicalPlan = {
if (relations.nonEmpty && !isOptimized(logicalPlan)) {
@@ -101,7 +101,7 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
val newPlan = updatePlan transform {
case Project(pList, child) if (!isTransformed) =>
val (dest: Seq[NamedExpression], source: Seq[NamedExpression]) = pList
- .splitAt(pList.size - cols.size)
+ .splitAt(pList.size - cols.size)
val diff = cols.diff(dest.map(_.name))
if (diff.size > 0) {
sys.error(s"Unknown column(s) ${diff.mkString(",")} in table ${table.tableName}")
@@ -284,7 +284,7 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
case union: Union
if !(union.left.isInstanceOf[CarbonDictionaryTempDecoder] ||
- union.right.isInstanceOf[CarbonDictionaryTempDecoder]) =>
+ union.right.isInstanceOf[CarbonDictionaryTempDecoder]) =>
val leftCondAttrs = new util.HashSet[AttributeReferenceWrapper]
val rightCondAttrs = new util.HashSet[AttributeReferenceWrapper]
val leftLocalAliasMap = CarbonAliasDecoderRelation()
@@ -369,7 +369,7 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
}
} else {
CarbonFilters
- .selectFilters(splitConjunctivePredicates(filter.condition), attrsOnConds, aliasMap)
+ .selectFilters(splitConjunctivePredicates(filter.condition), attrsOnConds, aliasMap)
}
var child = filter.child
@@ -391,7 +391,7 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
case j: Join
if !(j.left.isInstanceOf[CarbonDictionaryTempDecoder] ||
- j.right.isInstanceOf[CarbonDictionaryTempDecoder]) =>
+ j.right.isInstanceOf[CarbonDictionaryTempDecoder]) =>
val attrsOnJoin = new util.HashSet[Attribute]
j.condition match {
case Some(expression) =>
@@ -706,7 +706,38 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
if profile.isInstanceOf[IncludeProfile] && profile.isEmpty =>
child
}
- finalPlan
+ val updateDtrFn = finalPlan transform {
+ case p@Project(projectList: Seq[NamedExpression], cd) =>
+ if (cd.isInstanceOf[Filter] || cd.isInstanceOf[LogicalRelation]) {
+ p.transformAllExpressions {
+ case a@Alias(exp, _)
+ if !exp.deterministic && !exp.isInstanceOf[CustomDeterministicExpression] =>
+ Alias(CustomDeterministicExpression(exp), a.name)(a.exprId, a.qualifiers,
+ a.explicitMetadata)
+ case exp: NamedExpression
+ if !exp.deterministic && !exp.isInstanceOf[CustomDeterministicExpression] =>
+ CustomDeterministicExpression(exp)
+ }
+ } else {
+ p
+ }
+ case f@Filter(condition: Expression, cd) =>
+ if (cd.isInstanceOf[Project] || cd.isInstanceOf[LogicalRelation]) {
+ f.transformAllExpressions {
+ case a@Alias(exp, _)
+ if !exp.deterministic && !exp.isInstanceOf[CustomDeterministicExpression] =>
+ Alias(CustomDeterministicExpression(exp), a.name)(a.exprId, a.qualifiers,
+ a.explicitMetadata)
+ case exp: NamedExpression
+ if !exp.deterministic && !exp.isInstanceOf[CustomDeterministicExpression] =>
+ CustomDeterministicExpression(exp)
+ }
+ } else {
+ f
+ }
+ }
+
+ updateDtrFn
}
private def collectInformationOnAttributes(plan: LogicalPlan,
@@ -812,14 +843,14 @@ case class CarbonDecoderRelation(
def contains(attr: Attribute): Boolean = {
val exists =
attributeMap.exists(entry => entry._1.name.equalsIgnoreCase(attr.name) &&
- entry._1.exprId.equals(attr.exprId)) ||
- extraAttrs.exists(entry => entry.name.equalsIgnoreCase(attr.name) &&
- entry.exprId.equals(attr.exprId))
+ entry._1.exprId.equals(attr.exprId)) ||
+ extraAttrs.exists(entry => entry.name.equalsIgnoreCase(attr.name) &&
+ entry.exprId.equals(attr.exprId))
exists
}
def fillAttributeMap(attrMap: java.util.HashMap[AttributeReferenceWrapper,
- CarbonDecoderRelation]): Unit = {
+ CarbonDecoderRelation]): Unit = {
attributeMap.foreach { attr =>
attrMap.put(AttributeReferenceWrapper(attr._1), this)
}
@@ -827,3 +858,5 @@ case class CarbonDecoderRelation(
lazy val dictionaryMap = carbonRelation.carbonRelation.metaData.dictionaryMap
}
+
+
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b38e0b3/integration/spark2/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala
new file mode 100644
index 0000000..6312746
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
+import org.apache.spark.sql.types.{DataType, StringType}
+
+/**
+ * Wrapper reporting deterministic = true for nonDt; a placeholder whose eval returns null.
+ */
+case class CustomDeterministicExpression(nonDt: Expression ) extends Expression with Serializable{
+ override def nullable: Boolean = true
+
+ override def eval(input: InternalRow): Any = null // never yields a real value
+
+ override def dataType: DataType = StringType // fixed type; not taken from nonDt
+
+ override def children: Seq[Expression] = Seq() // nonDt is not exposed as a child
+
+ override def deterministic: Boolean = true // always true, whatever nonDt reports
+
+ def childexp : Expression = nonDt // accessor for the wrapped expression
+
+ override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = ev.copy("")
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b38e0b3/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
index eac0a28..bc09067 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.optimizer.CarbonDecoderRelation
+import org.apache.spark.sql.optimizer.{CarbonDecoderRelation}
import org.apache.spark.sql.sources.{BaseRelation, Filter}
import org.apache.spark.sql.types.{AtomicType, IntegerType, StringType}
@@ -59,7 +59,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
filters,
(a, f, needDecoder) => toCatalystRDD(l, a, relation.buildScan(
a.map(_.name).toArray, f), needDecoder)) ::
- Nil
+ Nil
case CarbonDictionaryCatalystDecoder(relations, profile, aliasMap, _, child) =>
if ((profile.isInstanceOf[IncludeProfile] && profile.isEmpty) ||
!CarbonDictionaryDecoder.
@@ -139,10 +139,15 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
protected def pruneFilterProjectRaw(
relation: LogicalRelation,
- projects: Seq[NamedExpression],
+ rawProjects: Seq[NamedExpression],
filterPredicates: Seq[Expression],
scanBuilder: (Seq[Attribute], Seq[Expression], Seq[Filter],
ArrayBuffer[AttributeReference]) => RDD[InternalRow]) = {
+ val projects = rawProjects.map {p =>
+ p.transform {
+ case CustomDeterministicExpression(exp) => exp
+ }
+ }.asInstanceOf[Seq[NamedExpression]]
val projectSet = AttributeSet(projects.flatMap(_.references))
val filterSet = AttributeSet(filterPredicates.flatMap(_.references))
@@ -162,7 +167,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
val handledPredicates = filterPredicates.filterNot(unhandledPredicates.contains)
val unhandledSet = AttributeSet(unhandledPredicates.flatMap(_.references))
AttributeSet(handledPredicates.flatMap(_.references)) --
- (projectSet ++ unhandledSet).map(relation.attributeMap)
+ (projectSet ++ unhandledSet).map(relation.attributeMap)
}
// Combines all Catalyst filter `Expression`s that are either not convertible to data source
@@ -213,12 +218,12 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
// when the columns of this projection are enough to evaluate all filter conditions,
// just do a scan followed by a filter, with no extra project.
val requestedColumns = projects
- // Safe due to if above.
- .asInstanceOf[Seq[Attribute]]
- // Match original case of attributes.
- .map(relation.attributeMap)
- // Don't request columns that are only referenced by pushed filters.
- .filterNot(handledSet.contains)
+ // Safe due to if above.
+ .asInstanceOf[Seq[Attribute]]
+ // Match original case of attributes.
+ .map(relation.attributeMap)
+ // Don't request columns that are only referenced by pushed filters.
+ .filterNot(handledSet.contains)
val updateRequestedColumns = updateRequestedColumnsFunc(requestedColumns, table, needDecoder)
val updateProject = projects.map { expr =>
@@ -227,7 +232,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
val dict = map.get(attr.name)
if (dict.isDefined && dict.get) {
attr = AttributeReference(attr.name, IntegerType, attr.nullable, attr.metadata)(attr
- .exprId, attr.qualifier)
+ .exprId, attr.qualifier)
}
}
attr
@@ -245,17 +250,17 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
var newProjectList: Seq[Attribute] = Seq.empty
val updatedProjects = projects.map {
- case a@Alias(s: ScalaUDF, name)
- if name.equalsIgnoreCase(CarbonCommonConstants.POSITION_ID) ||
- name.equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID) =>
- val reference = AttributeReference(name, StringType, true)().withExprId(a.exprId)
- newProjectList :+= reference
- reference
- case other => other
+ case a@Alias(s: ScalaUDF, name)
+ if name.equalsIgnoreCase(CarbonCommonConstants.POSITION_ID) ||
+ name.equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID) =>
+ val reference = AttributeReference(name, StringType, true)().withExprId(a.exprId)
+ newProjectList :+= reference
+ reference
+ case other => other
}
// Don't request columns that are only referenced by pushed filters.
val requestedColumns =
- (projectSet ++ filterSet -- handledSet).map(relation.attributeMap).toSeq ++ newProjectList
+ (projectSet ++ filterSet -- handledSet).map(relation.attributeMap).toSeq ++ newProjectList
val updateRequestedColumns = updateRequestedColumnsFunc(requestedColumns, table, needDecoder)
val scan = getDataSourceScan(relation,
updateRequestedColumns.asInstanceOf[Seq[Attribute]],
@@ -454,9 +459,9 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
case c@EqualTo(Literal(v, t), Cast(a: Attribute, _)) =>
CastExpressionOptimization.checkIfCastCanBeRemove(c)
case Not(EqualTo(a: Attribute, Literal(v, t))) =>
- Some(sources.Not(sources.EqualTo(a.name, v)))
+ Some(sources.Not(sources.EqualTo(a.name, v)))
case Not(EqualTo(Literal(v, t), a: Attribute)) =>
- Some(sources.Not(sources.EqualTo(a.name, v)))
+ Some(sources.Not(sources.EqualTo(a.name, v)))
case c@Not(EqualTo(Cast(a: Attribute, _), Literal(v, t))) =>
CastExpressionOptimization.checkIfCastCanBeRemove(c)
case c@Not(EqualTo(Literal(v, t), Cast(a: Attribute, _))) =>
@@ -534,6 +539,6 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
val supportCodegen =
sqlContext.conf.wholeStageEnabled && sqlContext.conf.wholeStageMaxNumFields >= cols.size
supportCodegen && vectorizedReader.toBoolean &&
- cols.forall(_.dataType.isInstanceOf[AtomicType])
+ cols.forall(_.dataType.isInstanceOf[AtomicType])
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b38e0b3/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
index 0dca0d4..c6dd905 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
@@ -51,7 +51,7 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
plan collect {
case l: LogicalRelation if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
CarbonDecoderRelation(l.attributeMap,
- l.relation.asInstanceOf[CarbonDatasourceHadoopRelation])
+ l.relation.asInstanceOf[CarbonDatasourceHadoopRelation])
}
}
@@ -94,7 +94,7 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
val newCols = cols.map {
case a@Alias(s: ScalaUDF, name)
if name.equalsIgnoreCase(CarbonCommonConstants.POSITION_ID) ||
- name.equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID) =>
+ name.equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID) =>
udfExists = true
projectionToBeAdded :+= a
AttributeReference(name, StringType, nullable = true)().withExprId(a.exprId)
@@ -311,7 +311,7 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
)
if (hasCarbonRelation(child) && condAttrs.size() > 0 &&
- !child.isInstanceOf[CarbonDictionaryCatalystDecoder]) {
+ !child.isInstanceOf[CarbonDictionaryCatalystDecoder]) {
CarbonDictionaryTempDecoder(condAttrs,
new util.HashSet[AttributeReferenceWrapper](),
child, false, Some(localAliasMap))
@@ -389,7 +389,7 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
Filter(filter.condition, child)
}
- case j: Join
+ case j: Join
if !(j.left.isInstanceOf[CarbonDictionaryTempDecoder] ||
j.right.isInstanceOf[CarbonDictionaryTempDecoder]) =>
val attrsOnJoin = new util.HashSet[Attribute]
@@ -720,7 +720,39 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
cd
}
}
- finalPlan
+
+ val updateDtrFn = finalPlan transform {
+ case p@Project(projectList: Seq[NamedExpression], cd) =>
+ if (cd.isInstanceOf[Filter] || cd.isInstanceOf[LogicalRelation]) {
+ p.transformAllExpressions {
+ case a@Alias(exp, _)
+ if !exp.deterministic && !exp.isInstanceOf[CustomDeterministicExpression] =>
+ Alias(CustomDeterministicExpression(exp), a.name)(a.exprId, a.qualifier,
+ a.explicitMetadata, a.isGenerated)
+ case exp: NamedExpression
+ if !exp.deterministic && !exp.isInstanceOf[CustomDeterministicExpression] =>
+ CustomDeterministicExpression(exp)
+ }
+ } else {
+ p
+ }
+ case f@Filter(condition: Expression, cd) =>
+ if (cd.isInstanceOf[Project] || cd.isInstanceOf[LogicalRelation]) {
+ f.transformAllExpressions {
+ case a@Alias(exp, _)
+ if !exp.deterministic && !exp.isInstanceOf[CustomDeterministicExpression] =>
+ Alias(CustomDeterministicExpression(exp), a.name)(a.exprId, a.qualifier,
+ a.explicitMetadata, a.isGenerated)
+ case exp: NamedExpression
+ if !exp.deterministic && !exp.isInstanceOf[CustomDeterministicExpression] =>
+ CustomDeterministicExpression(exp)
+ }
+ } else {
+ f
+ }
+ }
+
+ updateDtrFn
}
private def collectInformationOnAttributes(plan: LogicalPlan,
@@ -841,3 +873,4 @@ case class CarbonDecoderRelation(
lazy val dictionaryMap = carbonRelation.carbonRelation.metaData.dictionaryMap
}
+