Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/09/18 10:01:33 UTC

[14/51] [abbrv] carbondata git commit: [CARBONDATA-649] fix for update with rand function

[CARBONDATA-649] fix for update with rand function

This closes #1296
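
The gist of the change, as read from the diff below: rand() is non-deterministic, and Spark's planner treats non-deterministic expressions specially, which broke CarbonData's UPDATE rewrite when the assigned value was rand(). The commit introduces a CustomDeterministicExpression wrapper that reports deterministic = true while the Carbon optimizer rules run, and the physical strategies strip the wrapper again before the scan is built. The failing pattern, taken verbatim from the new test case:

    Update rand set (num) = (rand())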


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/8b38e0b3
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/8b38e0b3
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/8b38e0b3

Branch: refs/heads/branch-1.2
Commit: 8b38e0b3844d2220d6639d25bcafbab7a7af75f7
Parents: 590bbb9
Author: ashwini-krishnakumar <as...@gmail.com>
Authored: Thu Sep 7 07:36:32 2017 +0000
Committer: Ravindra Pesala <ra...@gmail.com>
Committed: Mon Sep 11 14:07:09 2017 +0530

----------------------------------------------------------------------
 .../iud/UpdateCarbonTableTestCase.scala         | 30 +++++++++++
 .../sql/CustomDeterministicExpression.scala     | 41 +++++++++++++++
 .../spark/sql/hive/CarbonStrategies.scala       | 52 ++++++++++--------
 .../spark/sql/optimizer/CarbonOptimizer.scala   | 55 ++++++++++++++++----
 .../sql/CustomDeterministicExpression.scala     | 42 +++++++++++++++
 .../execution/CarbonLateDecodeStrategy.scala    | 49 +++++++++--------
 .../sql/optimizer/CarbonLateDecodeRule.scala    | 43 +++++++++++++--
 7 files changed, 251 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b38e0b3/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
index 623416b..4186fa2 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
@@ -448,6 +448,36 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
     sql("DROP TABLE IF EXISTS default.carbon1")
   }
 
+  test("update table in carbondata with rand() ") {
+
+    sql("""CREATE TABLE iud.rand(imei string,age int,task bigint,num double,level decimal(10,3),name string)STORED BY 'org.apache.carbondata.format' """)
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/update01.csv' INTO TABLE iud.rand OPTIONS('DELIMITER'=',' , 'QUOTECHAR'='"','BAD_RECORDS_ACTION'='FORCE','FILEHEADER'='imei,age,task,num,level,name')""").collect
+
+    sql("select substring(name,1,2 ) , name ,getTupleId() as tupleId , rand()  from  iud.rand").show(100)
+
+    sql("select name , substring(name,1,2 ) ,getTupleId() as tupleId , num , rand() from  iud.rand").show(100)
+
+    sql("Update  rand set (num) = (rand())").show()
+
+    sql("Update  rand set (num) = (rand(9))").show()
+
+    sql("Update  rand set (name) = ('Lily')").show()
+
+    sql("select name ,  num from  iud.rand").show(100)
+
+    sql("select  imei , age , name , num  from  iud.rand").show(100)
+
+    sql("select rand() , getTupleId() as tupleId from  iud.rand").show(100)
+
+    sql("select * from  iud.rand").show(100)
+
+    sql("select  imei , rand() , num from  iud.rand").show(100)
+
+    sql("select  name , rand()  from  iud.rand").show(100)
+
+    sql("DROP TABLE IF EXISTS iud.rand")
+  }
+
   override def afterAll {
     sql("use default")
     sql("drop database  if exists iud cascade")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b38e0b3/integration/spark/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala
new file mode 100644
index 0000000..d745be2
--- /dev/null
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenContext, GeneratedExpressionCode}
+import org.apache.spark.sql.types.{DataType, StringType}
+
+/**
+ * Custom expression to override the deterministic property
+ *
+ */
+case class CustomDeterministicExpression(nonDt: Expression ) extends Expression with Serializable{
+  override def nullable: Boolean = true
+
+  override def eval(input: InternalRow): Any = null
+
+  override protected def genCode(ctx: CodeGenContext,
+      ev: GeneratedExpressionCode): String = ev.code
+  override def deterministic: Boolean = true
+
+  override def dataType: DataType = StringType
+
+  override def children: Seq[Expression] = Seq()
+}
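
Editor's note: CustomDeterministicExpression holds the original non-deterministic expression in nonDt but reports deterministic = true; eval returns null and genCode passes ev.code through unchanged, which is only safe because the wrapper is stripped again before execution. A minimal round-trip sketch (not in the commit), assuming Spark 1.6's Rand(seed: Long) constructor:

    import org.apache.spark.sql.CustomDeterministicExpression
    import org.apache.spark.sql.catalyst.expressions.Rand

    val wrapped = CustomDeterministicExpression(Rand(42L))
    assert(wrapped.deterministic)      // the optimizer-visible property is overridden
    val original = wrapped match {     // the strategies recover the child by pattern match
      case CustomDeterministicExpression(e) => e
    }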

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b38e0b3/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
index 13ff2a9..204225b 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources.{DescribeCommand => LogicalDescribeCommand, LogicalRelation}
 import org.apache.spark.sql.hive.execution.{DropTable, HiveNativeCommand}
 import org.apache.spark.sql.hive.execution.command._
-import org.apache.spark.sql.optimizer.CarbonDecoderRelation
+import org.apache.spark.sql.optimizer.{CarbonDecoderRelation}
 import org.apache.spark.sql.types.IntegerType
 import org.apache.spark.sql.types.StringType
 
@@ -63,15 +63,15 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = {
       plan match {
         case PhysicalOperation(projectList, predicates, l: LogicalRelation)
-            if l.relation.isInstanceOf[CarbonDatasourceRelation] =>
+          if l.relation.isInstanceOf[CarbonDatasourceRelation] =>
           if (isStarQuery(plan)) {
             carbonRawScanForStarQuery(projectList, predicates, l)(sqlContext) :: Nil
           } else {
             carbonRawScan(projectList, predicates, l)(sqlContext) :: Nil
           }
         case InsertIntoCarbonTable(relation: CarbonDatasourceRelation,
-            _, child: LogicalPlan, overwrite, _) =>
-            ExecutedCommand(LoadTableByInsert(relation, child, overwrite)) :: Nil
+        _, child: LogicalPlan, overwrite, _) =>
+          ExecutedCommand(LoadTableByInsert(relation, child, overwrite)) :: Nil
         case CarbonDictionaryCatalystDecoder(relations, profile, aliasMap, _, child) =>
           CarbonDictionaryDecoder(relations,
             profile,
@@ -85,21 +85,27 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
     /**
      * Create carbon scan
      */
-    private def carbonRawScan(projectList: Seq[NamedExpression],
-      predicates: Seq[Expression],
-      logicalRelation: LogicalRelation)(sc: SQLContext): SparkPlan = {
+    private def carbonRawScan(projectListRaw: Seq[NamedExpression],
+        predicates: Seq[Expression],
+        logicalRelation: LogicalRelation)(sc: SQLContext): SparkPlan = {
 
       val relation = logicalRelation.relation.asInstanceOf[CarbonDatasourceRelation]
       val tableName: String =
         relation.carbonRelation.metaData.carbonTable.getFactTableName.toLowerCase
       // Check out any expressions are there in project list. if they are present then we need to
       // decode them as well.
+
+      val projectList = projectListRaw.map {p =>
+        p.transform {
+          case CustomDeterministicExpression(exp) => exp
+        }
+      }.asInstanceOf[Seq[NamedExpression]]
       val newProjectList = projectList.map { element =>
         element match {
           case a@Alias(s: ScalaUDF, name)
             if (name.equalsIgnoreCase(CarbonCommonConstants.POSITION_ID) ||
-              name.equalsIgnoreCase(
-                CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)) =>
+                name.equalsIgnoreCase(
+                  CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)) =>
             AttributeReference(name, StringType, true)().withExprId(a.exprId)
           case other => other
         }
@@ -154,8 +160,8 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
      * Create carbon scan for star query
      */
     private def carbonRawScanForStarQuery(projectList: Seq[NamedExpression],
-      predicates: Seq[Expression],
-      logicalRelation: LogicalRelation)(sc: SQLContext): SparkPlan = {
+        predicates: Seq[Expression],
+        logicalRelation: LogicalRelation)(sc: SQLContext): SparkPlan = {
       val relation = logicalRelation.relation.asInstanceOf[CarbonDatasourceRelation]
       val tableName: String =
         relation.carbonRelation.metaData.carbonTable.getFactTableName.toLowerCase
@@ -194,10 +200,10 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
     }
 
     def getCarbonDecoder(logicalRelation: LogicalRelation,
-      sc: SQLContext,
-      tableName: String,
-      projectExprsNeedToDecode: Seq[Attribute],
-      scan: CarbonScan): CarbonDictionaryDecoder = {
+        sc: SQLContext,
+        tableName: String,
+        projectExprsNeedToDecode: Seq[Attribute],
+        scan: CarbonScan): CarbonDictionaryDecoder = {
       val relation = CarbonDecoderRelation(logicalRelation.attributeMap,
         logicalRelation.relation.asInstanceOf[CarbonDatasourceRelation])
       val attrs = projectExprsNeedToDecode.map { attr =>
@@ -227,7 +233,7 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
         relation: CarbonDatasourceRelation,
         allAttrsNotDecode: util.Set[Attribute]): AttributeReference = {
       if (relation.carbonRelation.metaData.dictionaryMap.get(attr.name).getOrElse(false) &&
-        !allAttrsNotDecode.asScala.exists(p => p.name.equals(attr.name))) {
+          !allAttrsNotDecode.asScala.exists(p => p.name.equals(attr.name))) {
         AttributeReference(attr.name,
           IntegerType,
           attr.nullable,
@@ -240,7 +246,7 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
     private def isStarQuery(plan: LogicalPlan) = {
       plan match {
         case LogicalFilter(condition, l: LogicalRelation)
-            if l.relation.isInstanceOf[CarbonDatasourceRelation] =>
+          if l.relation.isInstanceOf[CarbonDatasourceRelation] =>
           true
         case l: LogicalRelation if l.relation.isInstanceOf[CarbonDatasourceRelation] => true
         case _ => false
@@ -252,7 +258,7 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case DropTable(tableName, ifNotExists)
         if CarbonEnv.get.carbonMetastore
-            .isTablePathExists(toTableIdentifier(tableName.toLowerCase))(sqlContext) =>
+          .isTablePathExists(toTableIdentifier(tableName.toLowerCase))(sqlContext) =>
         val identifier = toTableIdentifier(tableName.toLowerCase)
         ExecutedCommand(DropTableCommand(ifNotExists, identifier.database, identifier.table)) :: Nil
       case ShowLoadsCommand(databaseName, table, limit) =>
@@ -260,7 +266,7 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
       case LoadTable(databaseNameOp, tableName, factPathFromUser, dimFilesPath,
       options, isOverwriteExist, inputSqlString, dataFrame, _) =>
         val isCarbonTable = CarbonEnv.get.carbonMetastore
-            .tableExists(TableIdentifier(tableName, databaseNameOp))(sqlContext)
+          .tableExists(TableIdentifier(tableName, databaseNameOp))(sqlContext)
         if (isCarbonTable || options.nonEmpty) {
           ExecutedCommand(LoadTable(databaseNameOp, tableName, factPathFromUser, dimFilesPath,
             options, isOverwriteExist, inputSqlString, dataFrame)) :: Nil
@@ -269,15 +275,15 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
         }
       case alterTable@AlterTableCompaction(altertablemodel) =>
         val isCarbonTable = CarbonEnv.get.carbonMetastore
-            .tableExists(TableIdentifier(altertablemodel.tableName,
-                 altertablemodel.dbName))(sqlContext)
+          .tableExists(TableIdentifier(altertablemodel.tableName,
+            altertablemodel.dbName))(sqlContext)
         if (isCarbonTable) {
           if (altertablemodel.compactionType.equalsIgnoreCase("minor") ||
               altertablemodel.compactionType.equalsIgnoreCase("major")) {
             ExecutedCommand(alterTable) :: Nil
           } else {
             throw new MalformedCarbonCommandException(
-                "Unsupported alter operation on carbon table")
+              "Unsupported alter operation on carbon table")
           }
         } else {
           ExecutedCommand(HiveNativeCommand(altertablemodel.alterSql)) :: Nil
@@ -305,7 +311,7 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
         }
       case DescribeFormattedCommand(sql, tblIdentifier) =>
         val isTable = CarbonEnv.get.carbonMetastore
-            .tableExists(tblIdentifier)(sqlContext)
+          .tableExists(tblIdentifier)(sqlContext)
         if (isTable) {
           val describe =
             LogicalDescribeCommand(UnresolvedRelation(tblIdentifier, None), isExtended = false)
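
Editor's note: most of this hunk is re-indentation; the functional change is at the top of carbonRawScan, where the projection is unwrapped before the scan is assembled. The same transform, isolated as a standalone sketch (hypothetical helper name):

    import org.apache.spark.sql.CustomDeterministicExpression
    import org.apache.spark.sql.catalyst.expressions.NamedExpression

    def stripWrapper(projections: Seq[NamedExpression]): Seq[NamedExpression] =
      projections.map { p =>
        p.transform { case CustomDeterministicExpression(e) => e }
      }.asInstanceOf[Seq[NamedExpression]]

The identical unwrap reappears in spark2's CarbonLateDecodeStrategy.pruneFilterProjectRaw further down.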

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b38e0b3/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
index 02ac5f8..914203f 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
@@ -59,7 +59,7 @@ object CarbonOptimizer {
     }
   }
 
-// get the carbon relation from plan.
+  // get the carbon relation from plan.
   def collectCarbonRelation(plan: LogicalPlan): Seq[CarbonDecoderRelation] = {
     plan collect {
       case l: LogicalRelation if l.relation.isInstanceOf[CarbonDatasourceRelation] =>
@@ -73,7 +73,7 @@ object CarbonOptimizer {
  * decoder plan.
  */
 class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
-    extends Rule[LogicalPlan] with PredicateHelper {
+  extends Rule[LogicalPlan] with PredicateHelper {
   val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
   def apply(logicalPlan: LogicalPlan): LogicalPlan = {
     if (relations.nonEmpty && !isOptimized(logicalPlan)) {
@@ -101,7 +101,7 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
         val newPlan = updatePlan transform {
           case Project(pList, child) if (!isTransformed) =>
             val (dest: Seq[NamedExpression], source: Seq[NamedExpression]) = pList
-                .splitAt(pList.size - cols.size)
+              .splitAt(pList.size - cols.size)
             val diff = cols.diff(dest.map(_.name))
             if (diff.size > 0) {
               sys.error(s"Unknown column(s) ${diff.mkString(",")} in table ${table.tableName}")
@@ -284,7 +284,7 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
 
         case union: Union
           if !(union.left.isInstanceOf[CarbonDictionaryTempDecoder] ||
-              union.right.isInstanceOf[CarbonDictionaryTempDecoder]) =>
+               union.right.isInstanceOf[CarbonDictionaryTempDecoder]) =>
           val leftCondAttrs = new util.HashSet[AttributeReferenceWrapper]
           val rightCondAttrs = new util.HashSet[AttributeReferenceWrapper]
           val leftLocalAliasMap = CarbonAliasDecoderRelation()
@@ -369,7 +369,7 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
             }
           } else {
             CarbonFilters
-                .selectFilters(splitConjunctivePredicates(filter.condition), attrsOnConds, aliasMap)
+              .selectFilters(splitConjunctivePredicates(filter.condition), attrsOnConds, aliasMap)
           }
 
           var child = filter.child
@@ -391,7 +391,7 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
 
         case j: Join
           if !(j.left.isInstanceOf[CarbonDictionaryTempDecoder] ||
-              j.right.isInstanceOf[CarbonDictionaryTempDecoder]) =>
+               j.right.isInstanceOf[CarbonDictionaryTempDecoder]) =>
           val attrsOnJoin = new util.HashSet[Attribute]
           j.condition match {
             case Some(expression) =>
@@ -706,7 +706,38 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
         if profile.isInstanceOf[IncludeProfile] && profile.isEmpty =>
         child
     }
-    finalPlan
+    val updateDtrFn = finalPlan transform {
+      case p@Project(projectList: Seq[NamedExpression], cd) =>
+        if (cd.isInstanceOf[Filter] || cd.isInstanceOf[LogicalRelation]) {
+          p.transformAllExpressions {
+            case a@Alias(exp, _)
+              if !exp.deterministic && !exp.isInstanceOf[CustomDeterministicExpression] =>
+              Alias(CustomDeterministicExpression(exp), a.name)(a.exprId, a.qualifiers,
+                a.explicitMetadata)
+            case exp: NamedExpression
+              if !exp.deterministic && !exp.isInstanceOf[CustomDeterministicExpression] =>
+              CustomDeterministicExpression(exp)
+          }
+        } else {
+          p
+        }
+      case f@Filter(condition: Expression, cd) =>
+        if (cd.isInstanceOf[Project] || cd.isInstanceOf[LogicalRelation]) {
+          f.transformAllExpressions {
+            case a@Alias(exp, _)
+              if !exp.deterministic && !exp.isInstanceOf[CustomDeterministicExpression] =>
+              Alias(CustomDeterministicExpression(exp), a.name)(a.exprId, a.qualifiers,
+                a.explicitMetadata)
+            case exp: NamedExpression
+              if !exp.deterministic && !exp.isInstanceOf[CustomDeterministicExpression] =>
+              CustomDeterministicExpression(exp)
+          }
+        } else {
+          f
+        }
+    }
+
+    updateDtrFn
   }
 
   private def collectInformationOnAttributes(plan: LogicalPlan,
@@ -812,14 +843,14 @@ case class CarbonDecoderRelation(
   def contains(attr: Attribute): Boolean = {
     val exists =
       attributeMap.exists(entry => entry._1.name.equalsIgnoreCase(attr.name) &&
-          entry._1.exprId.equals(attr.exprId)) ||
-          extraAttrs.exists(entry => entry.name.equalsIgnoreCase(attr.name) &&
-              entry.exprId.equals(attr.exprId))
+                                   entry._1.exprId.equals(attr.exprId)) ||
+      extraAttrs.exists(entry => entry.name.equalsIgnoreCase(attr.name) &&
+                                 entry.exprId.equals(attr.exprId))
     exists
   }
 
   def fillAttributeMap(attrMap: java.util.HashMap[AttributeReferenceWrapper,
-      CarbonDecoderRelation]): Unit = {
+    CarbonDecoderRelation]): Unit = {
     attributeMap.foreach { attr =>
       attrMap.put(AttributeReferenceWrapper(attr._1), this)
     }
@@ -827,3 +858,5 @@ case class CarbonDecoderRelation(
 
   lazy val dictionaryMap = carbonRelation.carbonRelation.metaData.dictionaryMap
 }
+
+
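
Editor's note: the new transform at the end of the rule fires only on the plan shapes an UPDATE produces: a Project directly over a Filter or LogicalRelation, or a Filter directly over a Project or LogicalRelation. Non-deterministic expressions found there are wrapped, and the Alias is rebuilt with the same exprId so attribute references elsewhere in the plan still resolve. Schematically (column names taken from the test table, shapes illustrative):

    // before: Project [imei, rand() AS num, getTupleId() AS tupleId]
    //         +- Filter <update condition>
    //            +- LogicalRelation CarbonDatasourceRelation
    //
    // after:  Project [imei, CustomDeterministicExpression(rand()) AS num, ...]
    //         +- Filter <update condition>
    //            +- LogicalRelation CarbonDatasourceRelation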

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b38e0b3/integration/spark2/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala
new file mode 100644
index 0000000..6312746
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
+import org.apache.spark.sql.types.{DataType, StringType}
+
+/**
+ * Custom expression to override the deterministic property .
+ */
+case class CustomDeterministicExpression(nonDt: Expression ) extends Expression with Serializable{
+  override def nullable: Boolean = true
+
+  override def eval(input: InternalRow): Any = null
+
+  override def dataType: DataType = StringType
+
+  override def children: Seq[Expression] = Seq()
+
+  override def deterministic: Boolean = true
+
+  def childexp : Expression = nonDt
+
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = ev.copy("")
+}
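
Editor's note: the spark2 variant differs only in the codegen API (doGenCode with CodegenContext/ExprCode instead of genCode) and in exposing the wrapped child via childexp. Note that children is Seq(), so tree traversals never descend into the non-deterministic child; that is what keeps later rules from re-matching it. A sketch (not in the commit), using a Spark 2.x non-deterministic leaf expression:

    import org.apache.spark.sql.CustomDeterministicExpression
    import org.apache.spark.sql.catalyst.expressions.MonotonicallyIncreasingID

    val wrapped = CustomDeterministicExpression(MonotonicallyIncreasingID())
    assert(wrapped.deterministic)
    assert(wrapped.find(!_.deterministic).isEmpty) // the child is hidden from the walk
    val inner = wrapped.childexp                   // but still recoverable via the accessor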

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b38e0b3/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
index eac0a28..bc09067 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.optimizer.CarbonDecoderRelation
+import org.apache.spark.sql.optimizer.{CarbonDecoderRelation}
 import org.apache.spark.sql.sources.{BaseRelation, Filter}
 import org.apache.spark.sql.types.{AtomicType, IntegerType, StringType}
 
@@ -59,7 +59,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
           filters,
           (a, f, needDecoder) => toCatalystRDD(l, a, relation.buildScan(
             a.map(_.name).toArray, f), needDecoder)) ::
-            Nil
+        Nil
       case CarbonDictionaryCatalystDecoder(relations, profile, aliasMap, _, child) =>
         if ((profile.isInstanceOf[IncludeProfile] && profile.isEmpty) ||
             !CarbonDictionaryDecoder.
@@ -139,10 +139,15 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
 
   protected def pruneFilterProjectRaw(
       relation: LogicalRelation,
-      projects: Seq[NamedExpression],
+      rawProjects: Seq[NamedExpression],
       filterPredicates: Seq[Expression],
       scanBuilder: (Seq[Attribute], Seq[Expression], Seq[Filter],
         ArrayBuffer[AttributeReference]) => RDD[InternalRow]) = {
+    val projects = rawProjects.map {p =>
+      p.transform {
+        case CustomDeterministicExpression(exp) => exp
+      }
+    }.asInstanceOf[Seq[NamedExpression]]
 
     val projectSet = AttributeSet(projects.flatMap(_.references))
     val filterSet = AttributeSet(filterPredicates.flatMap(_.references))
@@ -162,7 +167,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
       val handledPredicates = filterPredicates.filterNot(unhandledPredicates.contains)
       val unhandledSet = AttributeSet(unhandledPredicates.flatMap(_.references))
       AttributeSet(handledPredicates.flatMap(_.references)) --
-          (projectSet ++ unhandledSet).map(relation.attributeMap)
+      (projectSet ++ unhandledSet).map(relation.attributeMap)
     }
 
     // Combines all Catalyst filter `Expression`s that are either not convertible to data source
@@ -213,12 +218,12 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
       // when the columns of this projection are enough to evaluate all filter conditions,
       // just do a scan followed by a filter, with no extra project.
       val requestedColumns = projects
-          // Safe due to if above.
-          .asInstanceOf[Seq[Attribute]]
-          // Match original case of attributes.
-          .map(relation.attributeMap)
-          // Don't request columns that are only referenced by pushed filters.
-          .filterNot(handledSet.contains)
+        // Safe due to if above.
+        .asInstanceOf[Seq[Attribute]]
+        // Match original case of attributes.
+        .map(relation.attributeMap)
+        // Don't request columns that are only referenced by pushed filters.
+        .filterNot(handledSet.contains)
       val updateRequestedColumns = updateRequestedColumnsFunc(requestedColumns, table, needDecoder)
 
       val updateProject = projects.map { expr =>
@@ -227,7 +232,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
           val dict = map.get(attr.name)
           if (dict.isDefined && dict.get) {
             attr = AttributeReference(attr.name, IntegerType, attr.nullable, attr.metadata)(attr
-                .exprId, attr.qualifier)
+              .exprId, attr.qualifier)
           }
         }
         attr
@@ -245,17 +250,17 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
 
       var newProjectList: Seq[Attribute] = Seq.empty
       val updatedProjects = projects.map {
-          case a@Alias(s: ScalaUDF, name)
-            if name.equalsIgnoreCase(CarbonCommonConstants.POSITION_ID) ||
-                name.equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID) =>
-            val reference = AttributeReference(name, StringType, true)().withExprId(a.exprId)
-            newProjectList :+= reference
-            reference
-          case other => other
+        case a@Alias(s: ScalaUDF, name)
+          if name.equalsIgnoreCase(CarbonCommonConstants.POSITION_ID) ||
+             name.equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID) =>
+          val reference = AttributeReference(name, StringType, true)().withExprId(a.exprId)
+          newProjectList :+= reference
+          reference
+        case other => other
       }
       // Don't request columns that are only referenced by pushed filters.
       val requestedColumns =
-      (projectSet ++ filterSet -- handledSet).map(relation.attributeMap).toSeq ++ newProjectList
+        (projectSet ++ filterSet -- handledSet).map(relation.attributeMap).toSeq ++ newProjectList
       val updateRequestedColumns = updateRequestedColumnsFunc(requestedColumns, table, needDecoder)
       val scan = getDataSourceScan(relation,
         updateRequestedColumns.asInstanceOf[Seq[Attribute]],
@@ -454,9 +459,9 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
       case c@EqualTo(Literal(v, t), Cast(a: Attribute, _)) =>
         CastExpressionOptimization.checkIfCastCanBeRemove(c)
       case Not(EqualTo(a: Attribute, Literal(v, t))) =>
-          Some(sources.Not(sources.EqualTo(a.name, v)))
+        Some(sources.Not(sources.EqualTo(a.name, v)))
       case Not(EqualTo(Literal(v, t), a: Attribute)) =>
-          Some(sources.Not(sources.EqualTo(a.name, v)))
+        Some(sources.Not(sources.EqualTo(a.name, v)))
       case c@Not(EqualTo(Cast(a: Attribute, _), Literal(v, t))) =>
         CastExpressionOptimization.checkIfCastCanBeRemove(c)
       case c@Not(EqualTo(Literal(v, t), Cast(a: Attribute, _))) =>
@@ -534,6 +539,6 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
     val supportCodegen =
       sqlContext.conf.wholeStageEnabled && sqlContext.conf.wholeStageMaxNumFields >= cols.size
     supportCodegen && vectorizedReader.toBoolean &&
-      cols.forall(_.dataType.isInstanceOf[AtomicType])
+    cols.forall(_.dataType.isInstanceOf[AtomicType])
   }
 }
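
Editor's note: as in CarbonStrategies, this hunk is mostly re-indentation plus the unwrap, here at the top of pruneFilterProjectRaw. The ordering appears to matter: the wrapper declares no children, so its references are empty, and computing projectSet before unwrapping would prune away columns used only under a wrapper. A sketch of that property (not in the commit):

    import org.apache.spark.sql.CustomDeterministicExpression
    import org.apache.spark.sql.catalyst.expressions.AttributeReference
    import org.apache.spark.sql.types.DoubleType

    val num = AttributeReference("num", DoubleType)()
    assert(CustomDeterministicExpression(num).references.isEmpty) // column invisible
    assert(num.references.contains(num))                          // visible once unwrapped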

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b38e0b3/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
index 0dca0d4..c6dd905 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
@@ -51,7 +51,7 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
     plan collect {
       case l: LogicalRelation if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
         CarbonDecoderRelation(l.attributeMap,
-        l.relation.asInstanceOf[CarbonDatasourceHadoopRelation])
+          l.relation.asInstanceOf[CarbonDatasourceHadoopRelation])
     }
   }
 
@@ -94,7 +94,7 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
         val newCols = cols.map {
           case a@Alias(s: ScalaUDF, name)
             if name.equalsIgnoreCase(CarbonCommonConstants.POSITION_ID) ||
-                name.equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID) =>
+               name.equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID) =>
             udfExists = true
             projectionToBeAdded :+= a
             AttributeReference(name, StringType, nullable = true)().withExprId(a.exprId)
@@ -311,7 +311,7 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
             )
 
             if (hasCarbonRelation(child) && condAttrs.size() > 0 &&
-              !child.isInstanceOf[CarbonDictionaryCatalystDecoder]) {
+                !child.isInstanceOf[CarbonDictionaryCatalystDecoder]) {
               CarbonDictionaryTempDecoder(condAttrs,
                 new util.HashSet[AttributeReferenceWrapper](),
                 child, false, Some(localAliasMap))
@@ -389,7 +389,7 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
             Filter(filter.condition, child)
           }
 
-         case j: Join
+        case j: Join
           if !(j.left.isInstanceOf[CarbonDictionaryTempDecoder] ||
                j.right.isInstanceOf[CarbonDictionaryTempDecoder]) =>
           val attrsOnJoin = new util.HashSet[Attribute]
@@ -720,7 +720,39 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
           cd
         }
     }
-    finalPlan
+
+    val updateDtrFn = finalPlan transform {
+      case p@Project(projectList: Seq[NamedExpression], cd) =>
+        if (cd.isInstanceOf[Filter] || cd.isInstanceOf[LogicalRelation]) {
+          p.transformAllExpressions {
+            case a@Alias(exp, _)
+              if !exp.deterministic && !exp.isInstanceOf[CustomDeterministicExpression] =>
+              Alias(CustomDeterministicExpression(exp), a.name)(a.exprId, a.qualifier,
+                a.explicitMetadata, a.isGenerated)
+            case exp: NamedExpression
+              if !exp.deterministic && !exp.isInstanceOf[CustomDeterministicExpression] =>
+              CustomDeterministicExpression(exp)
+          }
+        } else {
+          p
+        }
+      case f@Filter(condition: Expression, cd) =>
+        if (cd.isInstanceOf[Project] || cd.isInstanceOf[LogicalRelation]) {
+          f.transformAllExpressions {
+            case a@Alias(exp, _)
+              if !exp.deterministic && !exp.isInstanceOf[CustomDeterministicExpression] =>
+              Alias(CustomDeterministicExpression(exp), a.name)(a.exprId, a.qualifier,
+                a.explicitMetadata, a.isGenerated)
+            case exp: NamedExpression
+              if !exp.deterministic && !exp.isInstanceOf[CustomDeterministicExpression] =>
+              CustomDeterministicExpression(exp)
+          }
+        } else {
+          f
+        }
+    }
+
+    updateDtrFn
   }
 
   private def collectInformationOnAttributes(plan: LogicalPlan,
@@ -841,3 +873,4 @@ case class CarbonDecoderRelation(
 
   lazy val dictionaryMap = carbonRelation.carbonRelation.metaData.dictionaryMap
 }
+
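
Editor's note: this rule's new transform mirrors the spark module's CarbonOptimizer change, with the Spark 2.x Alias signature (qualifier and isGenerated instead of qualifiers). The shared pattern, reduced to a standalone sketch (hypothetical function name):

    import org.apache.spark.sql.CustomDeterministicExpression
    import org.apache.spark.sql.catalyst.expressions.Alias
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

    def markNonDeterministic(plan: LogicalPlan): LogicalPlan =
      plan.transformAllExpressions {
        case a @ Alias(exp, _)
          if !exp.deterministic && !exp.isInstanceOf[CustomDeterministicExpression] =>
          Alias(CustomDeterministicExpression(exp), a.name)(a.exprId, a.qualifier,
            a.explicitMetadata, a.isGenerated)
      }

Rebuilding the Alias with the original exprId keeps every downstream AttributeReference valid.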