Posted to commits@carbondata.apache.org by ch...@apache.org on 2016/07/18 14:32:18 UTC

[1/2] incubator-carbondata git commit: Added Spark 1.6.2 support in Carbon

Repository: incubator-carbondata
Updated Branches:
  refs/heads/master 66f1e1952 -> 6948cb05d


Added Spark 1.6.2 support in Carbon

Refactored code

Reverted the wrong check-in

Refactored code

Added support for all versions of Spark 1.5 and 1.6

Fixed review comments

Fixed review comment

Fixed review comment


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/ce6a1305
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/ce6a1305
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/ce6a1305

Branch: refs/heads/master
Commit: ce6a1305bd2ea0112ac437e64bf785e3f34e1749
Parents: 66f1e19
Author: ravipesala <ra...@gmail.com>
Authored: Tue Jul 12 14:11:08 2016 +0530
Committer: chenliang613 <ch...@apache.org>
Committed: Mon Jul 18 22:15:28 2016 +0800

----------------------------------------------------------------------
 .../spark/sql/CarbonCatalystOperators.scala     |  14 -
 .../org/apache/spark/sql/CarbonContext.scala    |  11 +-
 .../apache/spark/sql/CodeGenerateFactory.scala  | 155 ++++
 .../spark/sql/hive/CarbonStrategies.scala       |  23 +-
 .../spark/sql/optimizer/CarbonOptimizer.scala   | 736 ++++++++++---------
 .../apache/spark/util/ScalaCompilerUtil.scala   |  35 +
 .../org/carbondata/spark/CarbonFilters.scala    |  31 +-
 pom.xml                                         |   6 +-
 8 files changed, 609 insertions(+), 402 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce6a1305/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
index 14c1cca..c7333cf 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
@@ -93,17 +93,3 @@ abstract class CarbonProfile(attributes: Seq[Attribute]) extends Serializable {
 case class IncludeProfile(attributes: Seq[Attribute]) extends CarbonProfile(attributes)
 
 case class ExcludeProfile(attributes: Seq[Attribute]) extends CarbonProfile(attributes)
-
-case class FakeCarbonCast(child: Literal, dataType: DataType)
-  extends LeafExpression with CodegenFallback {
-
-  override def toString: String = s"FakeCarbonCast($child as ${ dataType.simpleString })"
-
-  override def checkInputDataTypes(): TypeCheckResult = {
-    TypeCheckResult.TypeCheckSuccess
-  }
-
-  override def nullable: Boolean = child.nullable
-
-  override def eval(input: InternalRow): Any = child.value
-}

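Note on this removal: FakeCarbonCast was a pass-through wrapper that carried literals to Carbon's filter translation untouched; the CarbonFilters changes further down now match bare Catalyst Literal nodes, so the wrapper has no remaining callers. A minimal sketch of inspecting a predicate the new way (the "city"/"NY" names are illustrative only):

    import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, EqualTo, Literal}
    import org.apache.spark.sql.types.StringType

    // With the wrapper gone, pushdown code can match the raw Literal directly.
    val city: Attribute = AttributeReference("city", StringType)()
    EqualTo(city, Literal("NY")) match {
      case EqualTo(a: Attribute, Literal(v, _)) => println(s"pushable: ${a.name} = $v")
      case _ => println("not pushable")
    }
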
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce6a1305/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
index a317c1c..d47d255 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
@@ -22,13 +22,14 @@ import java.io.File
 import scala.language.implicitConversions
 
 import org.apache.spark.{Logging, SparkContext}
-import org.apache.spark.sql.catalyst.ParserDialect
+import org.apache.spark.sql.catalyst.{CatalystConf, ParserDialect}
 import org.apache.spark.sql.catalyst.analysis.{Analyzer, OverrideCatalog}
-import org.apache.spark.sql.catalyst.optimizer.{DefaultOptimizer, Optimizer}
+import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.command.PartitionData
 import org.apache.spark.sql.hive._
 import org.apache.spark.sql.optimizer.CarbonOptimizer
+import org.apache.spark.util.Utils
 
 import org.carbondata.common.logging.LogServiceFactory
 import org.carbondata.core.constants.CarbonCommonConstants
@@ -54,6 +55,7 @@ class CarbonContext(
   }
 
   CarbonContext.addInstance(sc, this)
+  CodeGenerateFactory.init(sc.version)
 
   var lastSchemaUpdatedTime = System.currentTimeMillis()
 
@@ -71,7 +73,10 @@ class CarbonContext(
 
   @transient
   override protected[sql] lazy val optimizer: Optimizer =
-    new CarbonOptimizer(DefaultOptimizer, conf)
+    CarbonOptimizer.optimizer(
+      CodeGenerateFactory.createDefaultOptimizer(conf, sc),
+      conf.asInstanceOf[CarbonSQLConf],
+      sc.version)
 
   protected[sql] override def getSQLDialect(): ParserDialect = new CarbonSQLDialect(this)
 

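Taken together, these hooks pick the optimizer implementation from the Spark version that is actually running rather than the one Carbon was compiled against. A rough sketch of the call flow, assuming sc and conf are the context's SparkContext and SQL conf (simplified from the code above):

    // at CarbonContext construction time
    CodeGenerateFactory.init(sc.version)  // compile version-specific factories once

    // later, when Spark first touches the lazy optimizer field
    val base = CodeGenerateFactory.createDefaultOptimizer(conf, sc)  // reflective DefaultOptimizer lookup
    val carbonOptimizer =
      CarbonOptimizer.optimizer(base, conf.asInstanceOf[CarbonSQLConf], sc.version)
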
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce6a1305/integration/spark/src/main/scala/org/apache/spark/sql/CodeGenerateFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CodeGenerateFactory.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CodeGenerateFactory.scala
new file mode 100644
index 0000000..7ab2bf7
--- /dev/null
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CodeGenerateFactory.scala
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.catalyst.CatalystConf
+import org.apache.spark.sql.catalyst.optimizer.Optimizer
+import org.apache.spark.sql.catalyst.plans.logical.{Expand, LogicalPlan}
+import org.apache.spark.util.{ScalaCompilerUtil, Utils}
+
+private[sql] class CodeGenerateFactory(version: String) {
+
+  val optimizerFactory = if (version.equals("1.6.2")) {
+    ScalaCompilerUtil.compiledCode(CodeTemplates.spark1_6_2_OptimizerString)
+      .asInstanceOf[AbstractCarbonOptimizerFactory]
+  } else if (version.startsWith("1.6") || version.startsWith("1.5")) {
+    ScalaCompilerUtil.compiledCode(CodeTemplates.defaultOptimizerString)
+      .asInstanceOf[AbstractCarbonOptimizerFactory]
+  } else {
+    throw new UnsupportedOperationException(s"Spark version $version is not supported")
+  }
+
+  val expandFactory = if (version.startsWith("1.5")) {
+    ScalaCompilerUtil.compiledCode(CodeTemplates.spark1_5ExpandString)
+      .asInstanceOf[AbstractCarbonExpandFactory]
+  } else if (version.startsWith("1.6")) {
+    new AbstractCarbonExpandFactory {
+      override def createExpand(expand: Expand, child: LogicalPlan): Expand = {
+        val loader = Utils.getContextOrSparkClassLoader
+        try {
+          val cons = loader.loadClass("org.apache.spark.sql.catalyst.plans.logical.Expand")
+            .getDeclaredConstructors
+          cons.head.setAccessible(true)
+          cons.head.newInstance(expand.projections, expand.output, child).asInstanceOf[Expand]
+        } catch {
+          case e: Exception => null
+        }
+      }
+    }
+  } else {
+    throw new UnsupportedOperationException(s"Spark version $version is not supported")
+  }
+
+}
+
+object CodeGenerateFactory {
+
+  private var codeGenerateFactory: CodeGenerateFactory = _
+
+  def init(version: String): Unit = {
+    if (codeGenerateFactory == null) {
+      codeGenerateFactory = new CodeGenerateFactory(version)
+    }
+  }
+
+  def getInstance(): CodeGenerateFactory = {
+    codeGenerateFactory
+  }
+
+  def createDefaultOptimizer(conf: CatalystConf, sc: SparkContext): Optimizer = {
+    val name = "org.apache.spark.sql.catalyst.optimizer.DefaultOptimizer"
+    val loader = Utils.getContextOrSparkClassLoader
+    try {
+      val cons = loader.loadClass(name + "$").getDeclaredConstructors
+      cons.head.setAccessible(true)
+      cons.head.newInstance().asInstanceOf[Optimizer]
+    } catch {
+      case e: Exception =>
+        loader.loadClass(name).getConstructor(classOf[CatalystConf])
+          .newInstance(conf).asInstanceOf[Optimizer]
+    }
+  }
+
+}
+
+object CodeTemplates {
+
+  val spark1_6_2_OptimizerString =
+    s"""
+       import org.apache.spark.sql._;
+       import org.apache.spark.sql.optimizer._;
+       import org.apache.spark.sql.catalyst.plans.logical._;
+       import org.apache.spark.sql.catalyst._;
+       import org.apache.spark.sql.catalyst.optimizer.Optimizer;
+
+       new AbstractCarbonOptimizerFactory {
+         override def createOptimizer(optimizer: Optimizer, conf: CarbonSQLConf): Optimizer = {
+           class CarbonOptimizer1(optimizer: Optimizer, conf: CarbonSQLConf)
+             extends Optimizer(conf) {
+             override val batches = Nil;
+             override def execute(plan: LogicalPlan): LogicalPlan = {
+               CarbonOptimizer.execute(plan, optimizer);
+             }
+           }
+           new CarbonOptimizer1(optimizer, conf);
+         }
+       }
+    """
+
+  val defaultOptimizerString =
+    s"""
+       import org.apache.spark.sql._;
+       import org.apache.spark.sql.optimizer._;
+       import org.apache.spark.sql.catalyst.plans.logical._;
+       import org.apache.spark.sql.catalyst._;
+       import org.apache.spark.sql.catalyst.optimizer.Optimizer;
+
+       new AbstractCarbonOptimizerFactory {
+         override def createOptimizer(optimizer: Optimizer, conf: CarbonSQLConf): Optimizer = {
+           class CarbonOptimizer2(optimizer: Optimizer, conf: CarbonSQLConf) extends Optimizer {
+             val batches = Nil;
+             override def execute(plan: LogicalPlan): LogicalPlan = {
+               CarbonOptimizer.execute(plan, optimizer);
+             }
+           }
+           new CarbonOptimizer2(optimizer, conf);
+         }
+       }
+    """
+
+  val spark1_5ExpandString =
+    s"""
+       import org.apache.spark.sql._
+       import org.apache.spark.sql.catalyst.plans.logical.{Expand, LogicalPlan}
+       new AbstractCarbonExpandFactory {
+         override def createExpand(expand: Expand, child: LogicalPlan): Expand = {
+           Expand(expand.bitmasks, expand.groupByExprs, expand.gid, child)
+         }
+       }
+    """
+}
+
+abstract class AbstractCarbonOptimizerFactory {
+  def createOptimizer(optimizer: Optimizer, conf: CarbonSQLConf) : Optimizer
+}
+
+abstract class AbstractCarbonExpandFactory {
+  def createExpand(expand: Expand, child: LogicalPlan) : Expand
+}
+

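Why these factories are compiled at runtime at all: the two templates differ exactly where Catalyst's Optimizer base class changed shape between releases (the 1.6.2 subclass must call Optimizer(conf), while the 1.5/1.6.0/1.6.1 base class has no such constructor), so no single statically compiled subclass links against every supported Spark. Compiling the template text against whatever spark-catalyst is on the classpath sidesteps that. A usage sketch, where stockOptimizer and carbonConf stand in for the context's real optimizer and conf:

    CodeGenerateFactory.init(sc.version)
    val factory = CodeGenerateFactory.getInstance()
    // wraps the stock optimizer so CarbonOptimizer.execute runs over its output
    val carbonOptimizer = factory.optimizerFactory.createOptimizer(stockOptimizer, carbonConf)
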
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce6a1305/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
index f620f13..834aec0 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
@@ -59,15 +59,12 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
 
     def apply(plan: LogicalPlan): Seq[SparkPlan] = {
       plan match {
-        case PhysicalOperation(projectList, predicates,
-        l@LogicalRelation(carbonRelation: CarbonDatasourceRelation, _)) =>
+        case PhysicalOperation(projectList, predicates, l: LogicalRelation)
+            if l.relation.isInstanceOf[CarbonDatasourceRelation] =>
           if (isStarQuery(plan)) {
-            carbonRawScanForStarQuery(projectList, predicates, carbonRelation, l)(sqlContext) :: Nil
+            carbonRawScanForStarQuery(projectList, predicates, l)(sqlContext) :: Nil
           } else {
-            carbonRawScan(projectList,
-              predicates,
-              carbonRelation,
-              l)(sqlContext) :: Nil
+            carbonRawScan(projectList, predicates, l)(sqlContext) :: Nil
           }
         case CarbonDictionaryCatalystDecoder(relations, profile, aliasMap, _, child) =>
           CarbonDictionaryDecoder(relations,
@@ -84,9 +81,9 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
      */
     private def carbonRawScan(projectList: Seq[NamedExpression],
       predicates: Seq[Expression],
-      relation: CarbonDatasourceRelation,
       logicalRelation: LogicalRelation)(sc: SQLContext): SparkPlan = {
 
+      val relation = logicalRelation.relation.asInstanceOf[CarbonDatasourceRelation]
       val tableName: String =
         relation.carbonRelation.metaData.carbonTable.getFactTableName.toLowerCase
      // Check whether any expressions are present in the project list. If they are present then we need to
@@ -128,9 +125,8 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
      */
     private def carbonRawScanForStarQuery(projectList: Seq[NamedExpression],
       predicates: Seq[Expression],
-      relation: CarbonDatasourceRelation,
       logicalRelation: LogicalRelation)(sc: SQLContext): SparkPlan = {
-
+      val relation = logicalRelation.relation.asInstanceOf[CarbonDatasourceRelation]
       val tableName: String =
         relation.carbonRelation.metaData.carbonTable.getFactTableName.toLowerCase
      // Check whether any expressions are present in the project list. If they are present then we need to
@@ -213,9 +209,10 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
 
     private def isStarQuery(plan: LogicalPlan) = {
       plan match {
-        case LogicalFilter(condition,
-        LogicalRelation(carbonRelation: CarbonDatasourceRelation, _)) => true
-        case LogicalRelation(carbonRelation: CarbonDatasourceRelation, _) => true
+        case LogicalFilter(condition, l: LogicalRelation)
+            if l.relation.isInstanceOf[CarbonDatasourceRelation] =>
+          true
+        case l: LogicalRelation if l.relation.isInstanceOf[CarbonDatasourceRelation] => true
         case _ => false
       }
     }

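The pattern rewrites in this file are the version-compatibility fix itself: LogicalRelation gained a constructor field between Spark 1.5 and 1.6, so destructuring it positionally compiles against only one line. Matching on the type and testing the relation field avoids the extractor entirely:

    // version-sensitive: depends on LogicalRelation's unapply arity
    //   case l @ LogicalRelation(rel: CarbonDatasourceRelation, _) => ...
    // version-neutral: no positional destructuring
    //   case l: LogicalRelation if l.relation.isInstanceOf[CarbonDatasourceRelation] => ...
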
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce6a1305/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
index f1922d4..8ad3c5b 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
@@ -23,11 +23,10 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.CatalystConf
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, _}
+import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.types.{IntegerType, StringType}
@@ -37,427 +36,460 @@ import org.carbondata.spark.CarbonFilters
 /**
  * Carbon Optimizer to add dictionary decoder.
  */
-class CarbonOptimizer(optimizer: Optimizer, conf: CatalystConf)
-  extends Optimizer with PredicateHelper {
+object CarbonOptimizer {
 
-  val batches = Nil
+  def optimizer(optimizer: Optimizer, conf: CarbonSQLConf, version: String): Optimizer = {
+    CodeGenerateFactory.getInstance().optimizerFactory.createOptimizer(optimizer, conf)
+  }
 
-  override def execute(plan: LogicalPlan): LogicalPlan = {
+  def execute(plan: LogicalPlan, optimizer: Optimizer): LogicalPlan = {
     val executedPlan: LogicalPlan = optimizer.execute(plan)
-    val relations = collectCarbonRelation(plan)
+    val relations = CarbonOptimizer.collectCarbonRelation(plan)
     if (relations.nonEmpty) {
-      new ResolveCarbonFunctions(relations)(executedPlan)
+      new ResolveCarbonFunctions(relations).apply(executedPlan)
     } else {
       executedPlan
     }
   }
 
-  /**
-   * It does two jobs. 1. Change the datatype for dictionary encoded column 2. Add the dictionary
-   * decoder plan.
-   */
-  class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation]) extends
-    Rule[LogicalPlan] {
-    def apply(plan: LogicalPlan): LogicalPlan = {
-      transformCarbonPlan(plan, relations)
+  // Get the carbon relations from the plan.
+  def collectCarbonRelation(plan: LogicalPlan): Seq[CarbonDecoderRelation] = {
+    plan collect {
+      case l: LogicalRelation if l.relation.isInstanceOf[CarbonDatasourceRelation] =>
+        CarbonDecoderRelation(l.attributeMap, l.relation.asInstanceOf[CarbonDatasourceRelation])
     }
+  }
+}
+
+/**
+ * It does two jobs: 1. change the data type of dictionary-encoded columns, and 2. add the dictionary
+ * decoder plan.
+ */
+class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
+  extends Rule[LogicalPlan] with PredicateHelper {
 
-    /**
-     * Steps for changing the plan.
-     * 1. It finds out the join condition columns and dimension aggregate columns which are need to
-     * be decoded just before that plan executes.
-     * 2. Plan starts transform by adding the decoder to the plan where it needs the decoded data
-     * like dimension aggregate columns decoder under aggregator and join condition decoder under
-     * join children.
-     */
-    def transformCarbonPlan(plan: LogicalPlan,
-        relations: Seq[CarbonDecoderRelation]): LogicalPlan = {
-      var decoder = false
-      val aliasMap = CarbonAliasDecoderRelation()
-      // collect alias information before hand.
-      collectInformationOnAttributes(plan, aliasMap)
-      val transFormedPlan =
-        plan transformDown {
-          case cd: CarbonDictionaryTempDecoder if cd.isOuter =>
+  def apply(plan: LogicalPlan): LogicalPlan = {
+    transformCarbonPlan(plan, relations)
+  }
+
+  /**
+   * Steps for changing the plan.
+   * 1. It finds the join-condition columns and dimension aggregate columns which need to
+   * be decoded just before that plan executes.
+   * 2. The plan is then transformed by adding decoders wherever decoded data is needed, e.g. a
+   * dimension-aggregate decoder under an aggregate and a join-condition decoder under the
+   * join children.
+   */
+  def transformCarbonPlan(plan: LogicalPlan,
+      relations: Seq[CarbonDecoderRelation]): LogicalPlan = {
+    var decoder = false
+    val aliasMap = CarbonAliasDecoderRelation()
+    // collect alias information beforehand.
+    collectInformationOnAttributes(plan, aliasMap)
+    val transFormedPlan =
+      plan transformDown {
+        case cd: CarbonDictionaryTempDecoder if cd.isOuter =>
+          decoder = true
+          cd
+        case sort: Sort if !sort.child.isInstanceOf[CarbonDictionaryTempDecoder] =>
+          val attrsOnSort = new util.HashSet[Attribute]()
+          sort.order.map { s =>
+            s.collect {
+              case attr: AttributeReference
+                if isDictionaryEncoded(attr, relations, aliasMap) =>
+                attrsOnSort.add(aliasMap.getOrElse(attr, attr))
+            }
+          }
+          var child = sort.child
+          if (attrsOnSort.size() > 0 && !child.isInstanceOf[Sort]) {
+            child = CarbonDictionaryTempDecoder(attrsOnSort,
+              new util.HashSet[Attribute](), sort.child)
+          }
+          if (!decoder) {
             decoder = true
-            cd
-          case sort: Sort if !sort.child.isInstanceOf[CarbonDictionaryTempDecoder] =>
-            val attrsOnSort = new util.HashSet[Attribute]()
-            sort.order.map { s =>
-              s.collect {
+            CarbonDictionaryTempDecoder(new util.HashSet[Attribute](),
+              new util.HashSet[Attribute](),
+              Sort(sort.order, sort.global, child),
+              isOuter = true)
+          } else {
+            Sort(sort.order, sort.global, child)
+          }
+
+        case agg: Aggregate if !agg.child.isInstanceOf[CarbonDictionaryTempDecoder] =>
+          val attrsOndimAggs = new util.HashSet[Attribute]
+          agg.aggregateExpressions.map {
+            case attr: AttributeReference =>
+            case a@Alias(attr: AttributeReference, name) => aliasMap.put(a.toAttribute, attr)
+            case aggExp: AggregateExpression =>
+              aggExp.transform {
+                case aggExp: AggregateExpression =>
+                  collectDimensionAggregates(aggExp, attrsOndimAggs, aliasMap)
+                  aggExp
+                case a@Alias(attr: Attribute, name) =>
+                  aliasMap.put(a.toAttribute, attr)
+                  a
+              }
+            case others =>
+              others.collect {
+                case a@ Alias(attr: AttributeReference, _) => aliasMap.put(a.toAttribute, attr)
+                case a@Alias(exp, _) if !exp.isInstanceOf[AttributeReference] =>
+                  aliasMap.put(a.toAttribute, new AttributeReference("", StringType)())
                 case attr: AttributeReference
                   if isDictionaryEncoded(attr, relations, aliasMap) =>
-                  attrsOnSort.add(aliasMap.getOrElse(attr, attr))
+                  attrsOndimAggs.add(aliasMap.getOrElse(attr, attr))
               }
-            }
-            var child = sort.child
-            if (attrsOnSort.size() > 0 && !child.isInstanceOf[Sort]) {
-              child = CarbonDictionaryTempDecoder(attrsOnSort,
-                new util.HashSet[Attribute](), sort.child)
-            }
-            if (!decoder) {
-              decoder = true
-              CarbonDictionaryTempDecoder(new util.HashSet[Attribute](),
-                new util.HashSet[Attribute](),
-                Sort(sort.order, sort.global, child),
-                isOuter = true)
-            } else {
-              Sort(sort.order, sort.global, child)
-            }
-
-          case agg: Aggregate if !agg.child.isInstanceOf[CarbonDictionaryTempDecoder] =>
-            val attrsOndimAggs = new util.HashSet[Attribute]
-            agg.aggregateExpressions.map {
+          }
+          var child = agg.child
+          // In case the child is also an aggregate, push the decoder down to the child
+          if (attrsOndimAggs.size() > 0 && !child.equals(agg)) {
+            child = CarbonDictionaryTempDecoder(attrsOndimAggs,
+              new util.HashSet[Attribute](),
+              agg.child)
+          }
+          if (!decoder) {
+            decoder = true
+            CarbonDictionaryTempDecoder(new util.HashSet[Attribute](),
+              new util.HashSet[Attribute](),
+              Aggregate(agg.groupingExpressions, agg.aggregateExpressions, child),
+              isOuter = true)
+          } else {
+            Aggregate(agg.groupingExpressions, agg.aggregateExpressions, child)
+          }
+        case expand: Expand if !expand.child.isInstanceOf[CarbonDictionaryTempDecoder] =>
+          val attrsOnExpand = new util.HashSet[Attribute]
+          expand.projections.map {s =>
+            s.map {
               case attr: AttributeReference =>
               case a@Alias(attr: AttributeReference, name) => aliasMap.put(a.toAttribute, attr)
-              case aggExp: AggregateExpression =>
-                aggExp.transform {
-                  case aggExp: AggregateExpression =>
-                    collectDimensionAggregates(aggExp, attrsOndimAggs, aliasMap)
-                    aggExp
-                  case a@Alias(attr: Attribute, name) =>
-                    aliasMap.put(a.toAttribute, attr)
-                    a
-                }
               case others =>
                 others.collect {
                   case attr: AttributeReference
                     if isDictionaryEncoded(attr, relations, aliasMap) =>
-                    attrsOndimAggs.add(aliasMap.getOrElse(attr, attr))
+                    attrsOnExpand.add(aliasMap.getOrElse(attr, attr))
                 }
             }
-            var child = agg.child
-            // Incase if the child also aggregate then push down decoder to child
-            if (attrsOndimAggs.size() > 0 && !child.equals(agg)) {
-              child = CarbonDictionaryTempDecoder(attrsOndimAggs,
-                new util.HashSet[Attribute](),
-                agg.child)
-            }
-            if (!decoder) {
-              decoder = true
-              CarbonDictionaryTempDecoder(new util.HashSet[Attribute](),
-                new util.HashSet[Attribute](),
-                Aggregate(agg.groupingExpressions, agg.aggregateExpressions, child),
-                isOuter = true)
-            } else {
-              Aggregate(agg.groupingExpressions, agg.aggregateExpressions, child)
-            }
-
-          case filter: Filter if !filter.child.isInstanceOf[CarbonDictionaryTempDecoder] =>
-            val attrsOnConds = new util.HashSet[Attribute]
-            CarbonFilters
-              .selectFilters(splitConjunctivePredicates(filter.condition), attrsOnConds, aliasMap)
-
-            var child = filter.child
-            if (attrsOnConds.size() > 0 && !child.isInstanceOf[Filter]) {
-              child = CarbonDictionaryTempDecoder(attrsOnConds,
-                new util.HashSet[Attribute](),
-                filter.child)
-            }
+          }
+          var child = expand.child
+          if (attrsOnExpand.size() > 0 && !child.isInstanceOf[Expand]) {
+            child = CarbonDictionaryTempDecoder(attrsOnExpand,
+              new util.HashSet[Attribute](),
+              expand.child)
+          }
+          if (!decoder) {
+            decoder = true
+            CarbonDictionaryTempDecoder(new util.HashSet[Attribute](),
+              new util.HashSet[Attribute](),
+              CodeGenerateFactory.getInstance().expandFactory.createExpand(expand, child),
+              isOuter = true)
+          } else {
+            CodeGenerateFactory.getInstance().expandFactory.createExpand(expand, child)
+          }
+        case filter: Filter if !filter.child.isInstanceOf[CarbonDictionaryTempDecoder] =>
+          val attrsOnConds = new util.HashSet[Attribute]
+          CarbonFilters
+            .selectFilters(splitConjunctivePredicates(filter.condition), attrsOnConds, aliasMap)
+
+          var child = filter.child
+          if (attrsOnConds.size() > 0 && !child.isInstanceOf[Filter]) {
+            child = CarbonDictionaryTempDecoder(attrsOnConds,
+              new util.HashSet[Attribute](),
+              filter.child)
+          }
 
-            if (!decoder) {
-              decoder = true
-              CarbonDictionaryTempDecoder(new util.HashSet[Attribute](),
-                new util.HashSet[Attribute](),
-                Filter(filter.condition, child),
-                isOuter = true)
-            } else {
-              Filter(filter.condition, child)
-            }
+          if (!decoder) {
+            decoder = true
+            CarbonDictionaryTempDecoder(new util.HashSet[Attribute](),
+              new util.HashSet[Attribute](),
+              Filter(filter.condition, child),
+              isOuter = true)
+          } else {
+            Filter(filter.condition, child)
+          }
 
-          case j: Join
+        case j: Join
             if !(j.left.isInstanceOf[CarbonDictionaryTempDecoder] ||
                  j.right.isInstanceOf[CarbonDictionaryTempDecoder]) =>
-            val attrsOnJoin = new util.HashSet[Attribute]
-            j.condition match {
-              case Some(expression) =>
-                expression.collect {
-                  case attr: AttributeReference
-                    if isDictionaryEncoded(attr, relations, aliasMap) =>
-                    attrsOnJoin.add(aliasMap.getOrElse(attr, attr))
-                }
-              case _ =>
-            }
+          val attrsOnJoin = new util.HashSet[Attribute]
+          j.condition match {
+            case Some(expression) =>
+              expression.collect {
+                case attr: AttributeReference
+                  if isDictionaryEncoded(attr, relations, aliasMap) =>
+                  attrsOnJoin.add(aliasMap.getOrElse(attr, attr))
+              }
+            case _ =>
+          }
 
-            val leftCondAttrs = new util.HashSet[Attribute]
-            val rightCondAttrs = new util.HashSet[Attribute]
-            if (attrsOnJoin.size() > 0) {
+          val leftCondAttrs = new util.HashSet[Attribute]
+          val rightCondAttrs = new util.HashSet[Attribute]
+          if (attrsOnJoin.size() > 0) {
 
-              attrsOnJoin.asScala.map { attr =>
-                if (qualifierPresence(j.left, attr)) {
-                  leftCondAttrs.add(attr)
-                }
-                if (qualifierPresence(j.right, attr)) {
-                  rightCondAttrs.add(attr)
-                }
-              }
-              var leftPlan = j.left
-              var rightPlan = j.right
-              if (leftCondAttrs.size() > 0 &&
-                  !leftPlan.isInstanceOf[CarbonDictionaryCatalystDecoder]) {
-                leftPlan = CarbonDictionaryTempDecoder(leftCondAttrs,
-                  new util.HashSet[Attribute](),
-                  j.left)
-              }
-              if (rightCondAttrs.size() > 0 &&
-                  !rightPlan.isInstanceOf[CarbonDictionaryCatalystDecoder]) {
-                rightPlan = CarbonDictionaryTempDecoder(rightCondAttrs,
-                  new util.HashSet[Attribute](),
-                  j.right)
+            attrsOnJoin.asScala.map { attr =>
+              if (qualifierPresence(j.left, attr)) {
+                leftCondAttrs.add(attr)
               }
-              if (!decoder) {
-                decoder = true
-                CarbonDictionaryTempDecoder(new util.HashSet[Attribute](),
-                  new util.HashSet[Attribute](),
-                  Join(leftPlan, rightPlan, j.joinType, j.condition),
-                  isOuter = true)
-              } else {
-                Join(leftPlan, rightPlan, j.joinType, j.condition)
+              if (qualifierPresence(j.right, attr)) {
+                rightCondAttrs.add(attr)
               }
-            } else {
-              j
             }
-
-          case p: Project
-            if relations.nonEmpty && !p.child.isInstanceOf[CarbonDictionaryTempDecoder] =>
-            val attrsOnProjects = new util.HashSet[Attribute]
-            p.projectList.map {
-              case attr: AttributeReference =>
-              case a@Alias(attr: AttributeReference, name) => aliasMap.put(a.toAttribute, attr)
-              case others =>
-                others.collect {
-                  case attr: AttributeReference
-                    if isDictionaryEncoded(attr, relations, aliasMap) =>
-                    attrsOnProjects.add(aliasMap.getOrElse(attr, attr))
-                }
+            var leftPlan = j.left
+            var rightPlan = j.right
+            if (leftCondAttrs.size() > 0 &&
+                !leftPlan.isInstanceOf[CarbonDictionaryCatalystDecoder]) {
+              leftPlan = CarbonDictionaryTempDecoder(leftCondAttrs,
+                new util.HashSet[Attribute](),
+                j.left)
             }
-            var child = p.child
-            if (attrsOnProjects.size() > 0 && !child.isInstanceOf[Project]) {
-              child = CarbonDictionaryTempDecoder(attrsOnProjects,
+            if (rightCondAttrs.size() > 0 &&
+                !rightPlan.isInstanceOf[CarbonDictionaryCatalystDecoder]) {
+              rightPlan = CarbonDictionaryTempDecoder(rightCondAttrs,
                 new util.HashSet[Attribute](),
-                p.child)
+                j.right)
             }
             if (!decoder) {
               decoder = true
               CarbonDictionaryTempDecoder(new util.HashSet[Attribute](),
                 new util.HashSet[Attribute](),
-                Project(p.projectList, child),
+                Join(leftPlan, rightPlan, j.joinType, j.condition),
                 isOuter = true)
             } else {
-              Project(p.projectList, child)
-            }
-
-          case l@LogicalRelation(carbonRelation: CarbonDatasourceRelation, _) =>
-            if (!decoder) {
-              decoder = true
-              CarbonDictionaryTempDecoder(new util.HashSet[Attribute](),
-                new util.HashSet[Attribute](), l, isOuter = true)
-            } else {
-              l
+              Join(leftPlan, rightPlan, j.joinType, j.condition)
             }
-
-          case others => others
-        }
-
-      val processor = new CarbonDecoderProcessor
-      processor.updateDecoders(processor.getDecoderList(transFormedPlan))
-      updateProjection(updateTempDecoder(transFormedPlan, aliasMap))
-    }
-
-    private def updateTempDecoder(plan: LogicalPlan,
-        aliasMap: CarbonAliasDecoderRelation): LogicalPlan = {
-      var allAttrsNotDecode: util.Set[Attribute] = new util.HashSet[Attribute]()
-      val marker = new CarbonPlanMarker
-      plan transformDown {
-        case cd: CarbonDictionaryTempDecoder if !cd.processed =>
-          cd.processed = true
-          allAttrsNotDecode = cd.attrsNotDecode
-          marker.pushMarker(cd.attrsNotDecode)
-          if (cd.isOuter) {
-            CarbonDictionaryCatalystDecoder(relations,
-              ExcludeProfile(cd.attrsNotDecode.asScala.toSeq),
-              aliasMap,
-              isOuter = true,
-              cd.child)
           } else {
-            CarbonDictionaryCatalystDecoder(relations,
-              IncludeProfile(cd.attrList.asScala.toSeq),
-              aliasMap,
-              isOuter = false,
-              cd.child)
+            j
           }
-        case cd: CarbonDictionaryCatalystDecoder =>
-          cd
-        case sort: Sort =>
-          val sortExprs = sort.order.map { s =>
-            s.transform {
-              case attr: AttributeReference =>
-                updateDataType(attr, relations, allAttrsNotDecode, aliasMap)
-            }.asInstanceOf[SortOrder]
-          }
-          Sort(sortExprs, sort.global, sort.child)
-        case agg: Aggregate if !agg.child.isInstanceOf[CarbonDictionaryCatalystDecoder] =>
-          val aggExps = agg.aggregateExpressions.map { aggExp =>
-            aggExp.transform {
-              case attr: AttributeReference =>
-                updateDataType(attr, relations, allAttrsNotDecode, aliasMap)
-            }
-          }.asInstanceOf[Seq[NamedExpression]]
 
-          val grpExps = agg.groupingExpressions.map { gexp =>
-            gexp.transform {
-              case attr: AttributeReference =>
-                updateDataType(attr, relations, allAttrsNotDecode, aliasMap)
-            }
-          }
-          Aggregate(grpExps, aggExps, agg.child)
-        case filter: Filter =>
-          val filterExps = filter.condition transform {
+        case p: Project
+            if relations.nonEmpty && !p.child.isInstanceOf[CarbonDictionaryTempDecoder] =>
+          val attrsOnProjects = new util.HashSet[Attribute]
+          p.projectList.map {
             case attr: AttributeReference =>
-              updateDataType(attr, relations, allAttrsNotDecode, aliasMap)
-            case l: Literal => FakeCarbonCast(l, l.dataType)
+            case a@Alias(attr: AttributeReference, name) => aliasMap.put(a.toAttribute, attr)
+            case others =>
+              others.collect {
+                case attr: AttributeReference
+                  if isDictionaryEncoded(attr, relations, aliasMap) =>
+                  attrsOnProjects.add(aliasMap.getOrElse(attr, attr))
+              }
           }
-          Filter(filterExps, filter.child)
-        case j: Join =>
-          marker.pushJoinMarker(allAttrsNotDecode)
-          j
-        case p: Project if relations.nonEmpty =>
-          val prExps = p.projectList.map { prExp =>
-            prExp.transform {
-              case attr: AttributeReference =>
-                updateDataType(attr, relations, allAttrsNotDecode, aliasMap)
-            }
-          }.asInstanceOf[Seq[NamedExpression]]
-          Project(prExps, p.child)
-        case l@LogicalRelation(carbonRelation: CarbonDatasourceRelation, _) =>
-          allAttrsNotDecode = marker.revokeJoin()
-          l
-        case others => others
-      }
-    }
-
-    private def updateProjection(plan: LogicalPlan): LogicalPlan = {
-      val transFormedPlan = plan transform {
-        case p@Project(projectList: Seq[NamedExpression], cd: CarbonDictionaryCatalystDecoder) =>
-          if (cd.child.isInstanceOf[Filter] || cd.child.isInstanceOf[LogicalRelation]) {
-            Project(projectList: Seq[NamedExpression], cd.child)
+          var child = p.child
+          if (attrsOnProjects.size() > 0 && !child.isInstanceOf[Project]) {
+            child = CarbonDictionaryTempDecoder(attrsOnProjects,
+              new util.HashSet[Attribute](),
+              p.child)
+          }
+          if (!decoder) {
+            decoder = true
+            CarbonDictionaryTempDecoder(new util.HashSet[Attribute](),
+              new util.HashSet[Attribute](),
+              Project(p.projectList, child),
+              isOuter = true)
           } else {
-            p
+            Project(p.projectList, child)
           }
-        case f@Filter(condition: Expression, cd: CarbonDictionaryCatalystDecoder) =>
-          if (cd.child.isInstanceOf[Project] || cd.child.isInstanceOf[LogicalRelation]) {
-            Filter(condition, cd.child)
+
+        case l: LogicalRelation if l.relation.isInstanceOf[CarbonDatasourceRelation] =>
+          if (!decoder) {
+            decoder = true
+            CarbonDictionaryTempDecoder(new util.HashSet[Attribute](),
+              new util.HashSet[Attribute](), l, isOuter = true)
           } else {
-            f
+            l
           }
+
+        case others => others
       }
-      // Remove unnecessary decoders
-      val finalPlan = transFormedPlan transform {
-        case CarbonDictionaryCatalystDecoder(_, profile, _, false, child)
-          if profile.isInstanceOf[IncludeProfile] && profile.isEmpty => child
-      }
-      finalPlan
-    }
 
-    private def collectInformationOnAttributes(plan: LogicalPlan,
-        aliasMap: CarbonAliasDecoderRelation) {
-      plan transformUp {
-        case project: Project =>
-          project.projectList.map { p =>
-            p transform {
-              case a@Alias(attr: Attribute, name) =>
-                aliasMap.put(a.toAttribute, attr)
-                a
-              case a@Alias(_, name) =>
-                aliasMap.put(a.toAttribute, new AttributeReference("", StringType)())
-                a
-            }
+    val processor = new CarbonDecoderProcessor
+    processor.updateDecoders(processor.getDecoderList(transFormedPlan))
+    updateProjection(updateTempDecoder(transFormedPlan, aliasMap))
+  }
+
+  private def updateTempDecoder(plan: LogicalPlan,
+      aliasMap: CarbonAliasDecoderRelation): LogicalPlan = {
+    var allAttrsNotDecode: util.Set[Attribute] = new util.HashSet[Attribute]()
+    val marker = new CarbonPlanMarker
+    plan transformDown {
+      case cd: CarbonDictionaryTempDecoder if !cd.processed =>
+        cd.processed = true
+        allAttrsNotDecode = cd.attrsNotDecode
+        marker.pushMarker(cd.attrsNotDecode)
+        if (cd.isOuter) {
+          CarbonDictionaryCatalystDecoder(relations,
+            ExcludeProfile(cd.attrsNotDecode.asScala.toSeq),
+            aliasMap,
+            isOuter = true,
+            cd.child)
+        } else {
+          CarbonDictionaryCatalystDecoder(relations,
+            IncludeProfile(cd.attrList.asScala.toSeq),
+            aliasMap,
+            isOuter = false,
+            cd.child)
+        }
+      case cd: CarbonDictionaryCatalystDecoder =>
+        cd
+      case sort: Sort =>
+        val sortExprs = sort.order.map { s =>
+          s.transform {
+            case attr: AttributeReference =>
+              updateDataType(attr, relations, allAttrsNotDecode, aliasMap)
+          }.asInstanceOf[SortOrder]
+        }
+        Sort(sortExprs, sort.global, sort.child)
+      case agg: Aggregate if !agg.child.isInstanceOf[CarbonDictionaryCatalystDecoder] =>
+        val aggExps = agg.aggregateExpressions.map { aggExp =>
+          aggExp.transform {
+            case attr: AttributeReference =>
+              updateDataType(attr, relations, allAttrsNotDecode, aliasMap)
           }
-          project
-        case agg: Aggregate =>
-          agg.aggregateExpressions.map { aggExp =>
-            aggExp.transform {
-              case a@Alias(attr: Attribute, name) =>
-                aliasMap.put(a.toAttribute, attr)
-                a
-              case a@Alias(_, name) =>
-                aliasMap.put(a.toAttribute, new AttributeReference("", StringType)())
-                a
-            }
+        }.asInstanceOf[Seq[NamedExpression]]
+
+        val grpExps = agg.groupingExpressions.map { gexp =>
+          gexp.transform {
+            case attr: AttributeReference =>
+              updateDataType(attr, relations, allAttrsNotDecode, aliasMap)
           }
-          agg
-      }
+        }
+        Aggregate(grpExps, aggExps, agg.child)
+      case filter: Filter =>
+        val filterExps = filter.condition transform {
+          case attr: AttributeReference =>
+            updateDataType(attr, relations, allAttrsNotDecode, aliasMap)
+        }
+        Filter(filterExps, filter.child)
+      case j: Join =>
+        marker.pushJoinMarker(allAttrsNotDecode)
+        j
+      case p: Project if relations.nonEmpty =>
+        val prExps = p.projectList.map { prExp =>
+          prExp.transform {
+            case attr: AttributeReference =>
+              updateDataType(attr, relations, allAttrsNotDecode, aliasMap)
+          }
+        }.asInstanceOf[Seq[NamedExpression]]
+        Project(prExps, p.child)
+      case l: LogicalRelation if l.relation.isInstanceOf[CarbonDatasourceRelation] =>
+        allAttrsNotDecode = marker.revokeJoin()
+        l
+      case others => others
     }
+  }
 
-    // Collect aggregates on dimensions so that we can add decoder to it.
-    private def collectDimensionAggregates(aggExp: AggregateExpression,
-        attrsOndimAggs: util.HashSet[Attribute],
-        aliasMap: CarbonAliasDecoderRelation) {
-      aggExp collect {
-        case attr: AttributeReference if isDictionaryEncoded(attr, relations, aliasMap) =>
-          attrsOndimAggs.add(aliasMap.getOrElse(attr, attr))
-        case a@Alias(attr: Attribute, name) => aliasMap.put(a.toAttribute, attr)
-      }
+  private def updateProjection(plan: LogicalPlan): LogicalPlan = {
+    val transFormedPlan = plan transform {
+      case p@Project(projectList: Seq[NamedExpression], cd: CarbonDictionaryCatalystDecoder) =>
+        if (cd.child.isInstanceOf[Filter] || cd.child.isInstanceOf[LogicalRelation]) {
+          Project(projectList: Seq[NamedExpression], cd.child)
+        } else {
+          p
+        }
+      case f@Filter(condition: Expression, cd: CarbonDictionaryCatalystDecoder) =>
+        if (cd.child.isInstanceOf[Project] || cd.child.isInstanceOf[LogicalRelation]) {
+          Filter(condition, cd.child)
+        } else {
+          f
+        }
     }
+    // Remove unnecessary decoders
+    val finalPlan = transFormedPlan transform {
+      case CarbonDictionaryCatalystDecoder(_, profile, _, false, child)
+          if profile.isInstanceOf[IncludeProfile] && profile.isEmpty =>
+        child
+    }
+    finalPlan
+  }
 
-    /**
-     * Update the attribute datatype with [IntegerType] if the carbon column is encoded with
-     * dictionary.
-     *
-     */
-    private def updateDataType(attr: Attribute,
-        relations: Seq[CarbonDecoderRelation],
-        allAttrsNotDecode: util.Set[Attribute],
-        aliasMap: CarbonAliasDecoderRelation) = {
-      val uAttr = aliasMap.getOrElse(attr, attr)
-      val relation = relations.find(p => p.contains(uAttr))
-      if (relation.isDefined) {
-        relation.get.carbonRelation.carbonRelation.metaData.dictionaryMap.get(uAttr.name) match {
-          case Some(true) if !allAttrsNotDecode.asScala.exists(p => p.name.equals(uAttr.name)) =>
-            val newAttr = AttributeReference(attr.name,
-              IntegerType,
-              attr.nullable,
-              attr.metadata)(attr.exprId, attr.qualifiers)
-            relation.get.addAttribute(newAttr)
-            newAttr
-          case _ => attr
+  private def collectInformationOnAttributes(plan: LogicalPlan,
+      aliasMap: CarbonAliasDecoderRelation) {
+    plan transformUp {
+      case project: Project =>
+        project.projectList.map { p =>
+          p transform {
+            case a@Alias(attr: Attribute, name) =>
+              aliasMap.put(a.toAttribute, attr)
+              a
+            case a@Alias(_, name) =>
+              aliasMap.put(a.toAttribute, new AttributeReference("", StringType)())
+              a
+          }
         }
-      } else {
-        attr
-      }
+        project
+      case agg: Aggregate =>
+        agg.aggregateExpressions.map { aggExp =>
+          aggExp.transform {
+            case a@Alias(attr: Attribute, name) =>
+              aliasMap.put(a.toAttribute, attr)
+              a
+            case a@Alias(_, name) =>
+              aliasMap.put(a.toAttribute, new AttributeReference("", StringType)())
+              a
+          }
+        }
+        agg
     }
+  }
 
-    private def isDictionaryEncoded(attr: Attribute,
-        relations: Seq[CarbonDecoderRelation],
-        aliasMap: CarbonAliasDecoderRelation): Boolean = {
-      val uAttr = aliasMap.getOrElse(attr, attr)
-      val relation = relations.find(p => p.contains(uAttr))
-      if (relation.isDefined) {
-        relation.get.carbonRelation.carbonRelation.metaData.dictionaryMap.get(uAttr.name) match {
-          case Some(true) => true
-          case _ => false
-        }
-      } else {
-        false
+  // Collect aggregates on dimensions so that we can add decoders to them.
+  private def collectDimensionAggregates(aggExp: AggregateExpression,
+      attrsOndimAggs: util.HashSet[Attribute],
+      aliasMap: CarbonAliasDecoderRelation) {
+    aggExp collect {
+      case attr: AttributeReference if isDictionaryEncoded(attr, relations, aliasMap) =>
+        attrsOndimAggs.add(aliasMap.getOrElse(attr, attr))
+      case a@Alias(attr: Attribute, name) => aliasMap.put(a.toAttribute, attr)
+    }
+  }
+
+  /**
+   * Updates the attribute data type to [[IntegerType]] if the carbon column is
+   * dictionary-encoded.
+   */
+  private def updateDataType(attr: Attribute,
+      relations: Seq[CarbonDecoderRelation],
+      allAttrsNotDecode: util.Set[Attribute],
+      aliasMap: CarbonAliasDecoderRelation): Attribute = {
+    val uAttr = aliasMap.getOrElse(attr, attr)
+    val relation = relations.find(p => p.contains(uAttr))
+    if (relation.isDefined) {
+      relation.get.carbonRelation.carbonRelation.metaData.dictionaryMap.get(uAttr.name) match {
+        case Some(true) if !allAttrsNotDecode.asScala.exists(p => p.name.equals(uAttr.name)) =>
+          val newAttr = AttributeReference(attr.name,
+            IntegerType,
+            attr.nullable,
+            attr.metadata)(attr.exprId, attr.qualifiers)
+          relation.get.addAttribute(newAttr)
+          newAttr
+        case _ => attr
       }
+    } else {
+      attr
     }
+  }
 
-    def qualifierPresence(plan: LogicalPlan, attr: Attribute): Boolean = {
-      var present = false
-      plan collect {
-        case l: LogicalRelation if l.attributeMap.contains(attr) =>
-          present = true
+  private def isDictionaryEncoded(attr: Attribute,
+      relations: Seq[CarbonDecoderRelation],
+      aliasMap: CarbonAliasDecoderRelation): Boolean = {
+    val uAttr = aliasMap.getOrElse(attr, attr)
+    val relation = relations.find(p => p.contains(uAttr))
+    if (relation.isDefined) {
+      relation.get.carbonRelation.carbonRelation.metaData.dictionaryMap.get(uAttr.name) match {
+        case Some(true) => true
+        case _ => false
       }
-      present
+    } else {
+      false
     }
   }
 
-  // get the carbon relation from plan.
-  def collectCarbonRelation(plan: LogicalPlan): Seq[CarbonDecoderRelation] = {
+  def qualifierPresence(plan: LogicalPlan, attr: Attribute): Boolean = {
+    var present = false
     plan collect {
-      case l@LogicalRelation(carbonRelation: CarbonDatasourceRelation, _) =>
-        CarbonDecoderRelation(l.attributeMap, carbonRelation)
+      case l: LogicalRelation if l.attributeMap.contains(attr) =>
+        present = true
     }
+    present
   }
 }
 

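Most of this diff is a mechanical hoist: ResolveCarbonFunctions moves from an inner class of the old CarbonOptimizer to the top level so the runtime-compiled optimizer subclasses (see CodeGenerateFactory above) can reach the shared CarbonOptimizer.execute. Aside from the new Expand case, which goes through expandFactory so it constructs the right Expand for both 1.5 and 1.6, the rule still works in two passes; conceptually, for a filter on a dictionary-encoded column:

    // pass 1 (transformCarbonPlan): insert temp decoders where decoded values are needed
    //   Filter(city = 'NY')                       Filter(city = 'NY')
    //     Relation                    becomes       CarbonDictionaryTempDecoder({city})
    //                                                 Relation
    //
    // pass 2 (updateTempDecoder / updateProjection): rewrite the temp decoders into
    // CarbonDictionaryCatalystDecoder nodes with include/exclude profiles, and retype
    // dictionary columns to IntegerType wherever they are read above the decoder
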
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce6a1305/integration/spark/src/main/scala/org/apache/spark/util/ScalaCompilerUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/util/ScalaCompilerUtil.scala b/integration/spark/src/main/scala/org/apache/spark/util/ScalaCompilerUtil.scala
new file mode 100644
index 0000000..516ba58
--- /dev/null
+++ b/integration/spark/src/main/scala/org/apache/spark/util/ScalaCompilerUtil.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import scala.tools.reflect.ToolBox
+
+/**
+ * It compiles the code dynamically at runtime and returns the object
+ */
+object ScalaCompilerUtil {
+
+  def compiledCode(code: String): Any = {
+    import scala.reflect.runtime.universe._
+    val cm = runtimeMirror(Utils.getContextOrSparkClassLoader)
+    val toolbox = cm.mkToolBox()
+
+    val tree = toolbox.parse(code)
+    toolbox.compile(tree)()
+  }
+}

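compiledCode parses the source string with a runtime-mirror ToolBox, compiles it, and invokes the resulting thunk, so the caller gets back the value of the snippet's last expression. A small self-contained usage sketch (the Runnable is an arbitrary example, not code from this patch):

    val obj = ScalaCompilerUtil.compiledCode(
      "new Runnable { override def run(): Unit = println(\"compiled at runtime\") }"
    ).asInstanceOf[Runnable]
    obj.run()  // prints: compiled at runtime
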
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce6a1305/integration/spark/src/main/scala/org/carbondata/spark/CarbonFilters.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/CarbonFilters.scala b/integration/spark/src/main/scala/org/carbondata/spark/CarbonFilters.scala
index e02bc7f..a7a8313 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/CarbonFilters.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/CarbonFilters.scala
@@ -18,13 +18,11 @@
 package org.carbondata.spark
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.spark.sql.sources
-import org.apache.spark.sql.FakeCarbonCast
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.optimizer.CarbonAliasDecoderRelation
+import org.apache.spark.sql.sources
 import org.apache.spark.sql.types.StructType
 
 import org.carbondata.core.carbon.metadata.datatype.DataType
@@ -125,7 +123,7 @@ object CarbonFilters {
 
         case EqualTo(a: Attribute, Literal(v, t)) =>
           Some(sources.EqualTo(a.name, v))
-        case EqualTo(FakeCarbonCast(l@Literal(v, t), b), a: Attribute) =>
+        case EqualTo(l@Literal(v, t), a: Attribute) =>
           Some(sources.EqualTo(a.name, v))
         case EqualTo(Cast(a: Attribute, _), Literal(v, t)) =>
           Some(sources.EqualTo(a.name, v))
@@ -191,35 +189,35 @@ object CarbonFilters {
           (transformExpression(left) ++ transformExpression(right)).reduceOption(new
               AndExpression(_, _))
 
-        case EqualTo(a: Attribute, FakeCarbonCast(l@Literal(v, t), b)) => new
+        case EqualTo(a: Attribute, l@Literal(v, t)) => new
             Some(new EqualToExpression(transformExpression(a).get, transformExpression(l).get))
-        case EqualTo(FakeCarbonCast(l@Literal(v, t), b), a: Attribute) => new
+        case EqualTo(l@Literal(v, t), a: Attribute) => new
             Some(new EqualToExpression(transformExpression(a).get, transformExpression(l).get))
-        case EqualTo(Cast(a: Attribute, _), FakeCarbonCast(l@Literal(v, t), b)) => new
+        case EqualTo(Cast(a: Attribute, _), l@Literal(v, t)) => new
             Some(new EqualToExpression(transformExpression(a).get, transformExpression(l).get))
-        case EqualTo(FakeCarbonCast(l@Literal(v, t), b), Cast(a: Attribute, _)) => new
+        case EqualTo(l@Literal(v, t), Cast(a: Attribute, _)) => new
             Some(new EqualToExpression(transformExpression(a).get, transformExpression(l).get))
 
-        case Not(EqualTo(a: Attribute, FakeCarbonCast(l@Literal(v, t), b))) => new
+        case Not(EqualTo(a: Attribute, l@Literal(v, t))) => new
             Some(new NotEqualsExpression(transformExpression(a).get, transformExpression(l).get))
-        case Not(EqualTo(FakeCarbonCast(l@Literal(v, t), b), a: Attribute)) => new
+        case Not(EqualTo(l@Literal(v, t), a: Attribute)) => new
             Some(new NotEqualsExpression(transformExpression(a).get, transformExpression(l).get))
-        case Not(EqualTo(Cast(a: Attribute, _), FakeCarbonCast(l@Literal(v, t), b))) => new
+        case Not(EqualTo(Cast(a: Attribute, _), l@Literal(v, t))) => new
             Some(new NotEqualsExpression(transformExpression(a).get, transformExpression(l).get))
-        case Not(EqualTo(FakeCarbonCast(l@Literal(v, t), b), Cast(a: Attribute, _))) => new
+        case Not(EqualTo(l@Literal(v, t), Cast(a: Attribute, _))) => new
             Some(new NotEqualsExpression(transformExpression(a).get, transformExpression(l).get))
 
-        case Not(In(a: Attribute, list)) if !list.exists(!_.isInstanceOf[FakeCarbonCast]) =>
+        case Not(In(a: Attribute, list)) if !list.exists(!_.isInstanceOf[Literal]) =>
           Some(new NotInExpression(transformExpression(a).get,
             new ListExpression(list.map(transformExpression(_).get).asJava)))
-        case In(a: Attribute, list) if !list.exists(!_.isInstanceOf[FakeCarbonCast]) =>
+        case In(a: Attribute, list) if !list.exists(!_.isInstanceOf[Literal]) =>
           Some(new InExpression(transformExpression(a).get,
             new ListExpression(list.map(transformExpression(_).get).asJava)))
         case Not(In(Cast(a: Attribute, _), list))
-          if !list.exists(!_.isInstanceOf[FakeCarbonCast]) =>
+          if !list.exists(!_.isInstanceOf[Literal]) =>
           Some(new NotInExpression(transformExpression(a).get,
             new ListExpression(list.map(transformExpression(_).get).asJava)))
-        case In(Cast(a: Attribute, _), list) if !list.exists(!_.isInstanceOf[FakeCarbonCast]) =>
+        case In(Cast(a: Attribute, _), list) if !list.exists(!_.isInstanceOf[Literal]) =>
           Some(new InExpression(transformExpression(a).get,
             new ListExpression(list.map(transformExpression(_).get).asJava)))
 
@@ -227,7 +225,6 @@ object CarbonFilters {
           Some(new CarbonColumnExpression(name,
             CarbonScalaUtil.convertSparkToCarbonDataType(
               getActualCarbonDataType(name, carbonTable))))
-        case FakeCarbonCast(literal, dataType) => transformExpression(literal)
         case Literal(name, dataType) => Some(new
             CarbonLiteralExpression(name, CarbonScalaUtil.convertSparkToCarbonDataType(dataType)))
         case Cast(left, right) if !left.isInstanceOf[Literal] => transformExpression(left)

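With the wrapper gone, both translation paths in this file consume plain literals: the Spark sources path used for filter pruning, and Carbon's own expression tree. Roughly, an equality predicate now flows through as:

    // Catalyst:  EqualTo(AttributeReference("city", ...), Literal("NY"))
    // sources:   sources.EqualTo("city", "NY")
    // Carbon:    new EqualToExpression(new CarbonColumnExpression("city", ...),
    //                                  new CarbonLiteralExpression("NY", ...))
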
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce6a1305/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d5f5e33..61609b5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -114,16 +114,16 @@
       </properties>
     </profile>
     <profile>
-      <id>spark-1.5.2</id>
+      <id>spark-1.5</id>
       <!-- default -->
       <properties>
         <spark.version>1.5.2</spark.version>
       </properties>
     </profile>
     <profile>
-      <id>spark-1.6.1</id>
+      <id>spark-1.6</id>
       <properties>
-        <spark.version>1.6.1</spark.version>
+        <spark.version>1.6.2</spark.version>
       </properties>
     </profile>
     <profile>

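Renaming the profile ids to track minor lines makes the intent explicit: a build against the 1.6 line is selected with mvn clean install -Pspark-1.6 (spark-1.5 remains the commented default), and because the pinned patch release is an ordinary property, passing -Dspark.version=1.6.1 on the command line should still override it for builds that need that patch release.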

[2/2] incubator-carbondata git commit: [CARBONDATA-50] Added Spark 1.6.2 support in Carbon; also supports all versions of Spark 1.5 and 1.6. This closes #35

Posted by ch...@apache.org.
[CARBONDATA-50] Added Spark 1.6.2 support in Carbon; also supports all versions of Spark 1.5 and 1.6. This closes #35


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/6948cb05
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/6948cb05
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/6948cb05

Branch: refs/heads/master
Commit: 6948cb05ddc2736ea3d82ecdd3647f6c40896b02
Parents: 66f1e19 ce6a130
Author: chenliang613 <ch...@apache.org>
Authored: Mon Jul 18 22:31:44 2016 +0800
Committer: chenliang613 <ch...@apache.org>
Committed: Mon Jul 18 22:31:44 2016 +0800

----------------------------------------------------------------------
 .../spark/sql/CarbonCatalystOperators.scala     |  14 -
 .../org/apache/spark/sql/CarbonContext.scala    |  11 +-
 .../apache/spark/sql/CodeGenerateFactory.scala  | 155 ++++
 .../spark/sql/hive/CarbonStrategies.scala       |  23 +-
 .../spark/sql/optimizer/CarbonOptimizer.scala   | 736 ++++++++++---------
 .../apache/spark/util/ScalaCompilerUtil.scala   |  35 +
 .../org/carbondata/spark/CarbonFilters.scala    |  31 +-
 pom.xml                                         |   6 +-
 8 files changed, 609 insertions(+), 402 deletions(-)
----------------------------------------------------------------------