You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2016/12/16 02:38:08 UTC

[3/4] incubator-carbondata git commit: Fix union issue and add test case

Fix union issue and add test case


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/462f6422
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/462f6422
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/462f6422

Branch: refs/heads/master
Commit: 462f64226428fc255938d8752226cda262ad0ae4
Parents: 526243b
Author: QiangCai <qi...@qq.com>
Authored: Thu Dec 8 19:06:33 2016 +0800
Committer: jackylk <ja...@huawei.com>
Committed: Fri Dec 16 10:13:39 2016 +0800

----------------------------------------------------------------------
 .../impl/DictionaryBasedResultCollector.java    |   29 +-
 .../DictionaryBasedResultCollectorTest.java     |    9 +-
 .../carbondata/examples/CarbonExample.scala     |   12 +
 .../apache/carbondata/spark/CarbonFilters.scala |    7 +
 .../CarbonDecoderOptimizerHelper.scala          |   24 +-
 .../readsupport/SparkRowReadSupportImpl.java    |    5 +-
 .../apache/carbondata/spark/CarbonFilters.scala |    6 +
 .../spark/sql/CarbonDataFrameWriter.scala       |    3 +-
 .../sql/CarbonDatasourceHadoopRelation.scala    |    4 +-
 .../spark/sql/CarbonDictionaryDecoder.scala     |   60 +-
 .../scala/org/apache/spark/sql/CarbonScan.scala |   44 +-
 .../org/apache/spark/sql/CarbonSource.scala     |    3 +-
 .../sql/optimizer/CarbonLateDecodeRule.scala    |  124 +-
 integration/spark2/src/test/resources/data.csv  |   11 +
 .../AllDataTypesTestCaseAggregate.scala         | 1161 ++++++++++++++++++
 .../spark/sql/common/util/CarbonFunSuite.scala  |   49 +
 .../sql/common/util/CarbonSessionTest.scala     |   74 ++
 .../apache/spark/sql/common/util/PlanTest.scala |   59 +
 .../spark/sql/common/util/QueryTest.scala       |  149 +++
 19 files changed, 1654 insertions(+), 179 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/462f6422/core/src/main/java/org/apache/carbondata/scan/collector/impl/DictionaryBasedResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/collector/impl/DictionaryBasedResultCollector.java b/core/src/main/java/org/apache/carbondata/scan/collector/impl/DictionaryBasedResultCollector.java
index 108677f..2462caa 100644
--- a/core/src/main/java/org/apache/carbondata/scan/collector/impl/DictionaryBasedResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/scan/collector/impl/DictionaryBasedResultCollector.java
@@ -20,11 +20,13 @@ package org.apache.carbondata.scan.collector.impl;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.carbon.metadata.encoder.Encoding;
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
@@ -35,6 +37,7 @@ import org.apache.carbondata.scan.model.QueryDimension;
 import org.apache.carbondata.scan.model.QueryMeasure;
 import org.apache.carbondata.scan.result.AbstractScannedResult;
 
+import org.apache.commons.lang3.ArrayUtils;
 /**
  * It is not a collector it is just a scanned result holder.
  */
@@ -52,9 +55,31 @@ public class DictionaryBasedResultCollector extends AbstractScannedResultCollect
    * it will keep track of how many record is processed, to handle limit scenario
    */
   @Override public List<Object[]> collectData(AbstractScannedResult scannedResult, int batchSize) {
+
     List<Object[]> listBasedResult = new ArrayList<>(batchSize);
     boolean isMsrsPresent = measureDatatypes.length > 0;
+
     QueryDimension[] queryDimensions = tableBlockExecutionInfos.getQueryDimensions();
+    List<Integer> dictionaryIndexes = new ArrayList<Integer>();
+    for (int i = 0; i < queryDimensions.length; i++) {
+      if(queryDimensions[i].getDimension().hasEncoding(Encoding.DICTIONARY) ||
+          queryDimensions[i].getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY) ) {
+        dictionaryIndexes.add(queryDimensions[i].getDimension().getOrdinal());
+      }
+    }
+    int[] primitive = ArrayUtils.toPrimitive(dictionaryIndexes.toArray(
+        new Integer[dictionaryIndexes.size()]));
+    Arrays.sort(primitive);
+    int[] actualIndexInSurrogateKey = new int[dictionaryIndexes.size()];
+    int index = 0;
+    for (int i = 0; i < queryDimensions.length; i++) {
+      if(queryDimensions[i].getDimension().hasEncoding(Encoding.DICTIONARY) ||
+          queryDimensions[i].getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY) ) {
+        actualIndexInSurrogateKey[index++] = Arrays.binarySearch(primitive,
+            queryDimensions[i].getDimension().getOrdinal());
+      }
+    }
+
     QueryMeasure[] queryMeasures = tableBlockExecutionInfos.getQueryMeasures();
     Map<Integer, GenericQueryType> comlexDimensionInfoMap =
         tableBlockExecutionInfos.getComlexDimensionInfoMap();
@@ -99,7 +124,7 @@ public class DictionaryBasedResultCollector extends AbstractScannedResultCollect
                     .getDirectDictionaryGenerator(queryDimensions[i].getDimension().getDataType());
             if (directDictionaryGenerator != null) {
               row[order[i]] = directDictionaryGenerator.getValueFromSurrogate(
-                  surrogateResult[dictionaryColumnIndex++]);
+                  surrogateResult[actualIndexInSurrogateKey[dictionaryColumnIndex++]]);
             }
           } else if (complexDataTypeArray[i]) {
             row[order[i]] = comlexDimensionInfoMap
@@ -107,7 +132,7 @@ public class DictionaryBasedResultCollector extends AbstractScannedResultCollect
                 .getDataBasedOnDataTypeFromSurrogates(
                     ByteBuffer.wrap(complexTypeKeyArray[complexTypeColumnIndex++]));
           } else {
-            row[order[i]] = surrogateResult[dictionaryColumnIndex++];
+            row[order[i]] = surrogateResult[actualIndexInSurrogateKey[dictionaryColumnIndex++]];
           }
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/462f6422/core/src/test/java/org/apache/carbondata/scan/collector/impl/DictionaryBasedResultCollectorTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/scan/collector/impl/DictionaryBasedResultCollectorTest.java b/core/src/test/java/org/apache/carbondata/scan/collector/impl/DictionaryBasedResultCollectorTest.java
index 7f21d90..48eab1e 100644
--- a/core/src/test/java/org/apache/carbondata/scan/collector/impl/DictionaryBasedResultCollectorTest.java
+++ b/core/src/test/java/org/apache/carbondata/scan/collector/impl/DictionaryBasedResultCollectorTest.java
@@ -19,14 +19,12 @@
 package org.apache.carbondata.scan.collector.impl;
 
 import java.nio.ByteBuffer;
-import java.util.BitSet;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 import org.apache.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
 import org.apache.carbondata.core.carbon.metadata.blocklet.datachunk.PresenceMeta;
 import org.apache.carbondata.core.carbon.metadata.datatype.DataType;
+import org.apache.carbondata.core.carbon.metadata.encoder.Encoding;
 import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -74,6 +72,9 @@ public class DictionaryBasedResultCollectorTest {
     QueryDimension queryDimension1 = new QueryDimension("QDCol1");
     queryDimension1.setQueryOrder(1);
     ColumnSchema columnSchema = new ColumnSchema();
+    List encodeList= new ArrayList<Encoding>();
+    encodeList.add(Encoding.DICTIONARY);
+    columnSchema.setEncodingList(encodeList);
     queryDimension1.setDimension(new CarbonDimension(columnSchema, 0, 0, 0, 0));
     QueryDimension queryDimension2 = new QueryDimension("QDCol2");
     queryDimension2.setQueryOrder(2);

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/462f6422/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
index 4aff45a..c2e135a 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
@@ -153,6 +153,18 @@ object CarbonExample {
         |where t1.stringField = t2.stringField
       """.stripMargin).show
 
+    spark.sql(
+      """
+        |with t1 as (
+        |select * from carbon_table
+        |union all
+        |select * from carbon_table
+        |)
+        |select t1.*, t2.*
+        |from t1, carbon_table t2
+        |where t1.stringField = t2.stringField
+      """.stripMargin).show
+
     // Drop table
     spark.sql("DROP TABLE IF EXISTS carbon_table")
     spark.sql("DROP TABLE IF EXISTS csv_table")

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/462f6422/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala
index 2a580dc..5e58235 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala
@@ -83,6 +83,13 @@ object CarbonFilters {
             new ListExpression(
               convertToJavaList(values.map(f => getCarbonLiteralExpression(name, f)).toList))))
 
+        case sources.IsNull(name) =>
+          Some(new EqualToExpression(getCarbonExpression(name),
+            getCarbonLiteralExpression(name, null), true))
+        case sources.IsNotNull(name) =>
+          Some(new NotEqualsExpression(getCarbonExpression(name),
+            getCarbonLiteralExpression(name, null), true))
+
         case sources.And(lhs, rhs) =>
           (createFilter(lhs) ++ createFilter(rhs)).reduceOption(new AndExpression(_, _))
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/462f6422/integration/spark-common/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala
index 7909a13..6e84e7e 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala
@@ -29,7 +29,7 @@ abstract class AbstractNode
 
 case class Node(cd: CarbonDictionaryTempDecoder) extends AbstractNode
 
-case class BinaryCarbonNode(left: util.List[AbstractNode], right: util.List[AbstractNode])
+case class ArrayCarbonNode(children: Seq[util.List[AbstractNode]])
   extends AbstractNode
 
 case class CarbonDictionaryTempDecoder(
@@ -70,9 +70,16 @@ class CarbonDecoderProcessor {
       case j: BinaryNode =>
         val leftList = new util.ArrayList[AbstractNode]
         val rightList = new util.ArrayList[AbstractNode]
-        nodeList.add(BinaryCarbonNode(leftList, rightList))
+        nodeList.add(ArrayCarbonNode(Seq(leftList, rightList)))
         process(j.left, leftList)
         process(j.right, rightList)
+      case u: Union =>
+        val nodeListSeq = u.children.map { child =>
+          val list = new util.ArrayList[AbstractNode]
+          process(child, list)
+          list
+        }
+        nodeList.add(ArrayCarbonNode(nodeListSeq))
       case e: UnaryNode => process(e.child, nodeList)
       case _ =>
     }
@@ -91,13 +98,12 @@ class CarbonDecoderProcessor {
         decoderNotDecode.asScala.foreach(cd.attrsNotDecode.add)
         decoderNotDecode.asScala.foreach(cd.attrList.remove)
         decoderNotDecode.addAll(cd.attrList)
-      case BinaryCarbonNode(left: util.List[AbstractNode], right: util.List[AbstractNode]) =>
-        val leftNotDecode = new util.HashSet[AttributeReferenceWrapper]
-        val rightNotDecode = new util.HashSet[AttributeReferenceWrapper]
-        updateDecoderInternal(left.asScala, leftNotDecode)
-        updateDecoderInternal(right.asScala, rightNotDecode)
-        decoderNotDecode.addAll(leftNotDecode)
-        decoderNotDecode.addAll(rightNotDecode)
+      case ArrayCarbonNode(children) =>
+        children.foreach { child =>
+          val notDecode = new util.HashSet[AttributeReferenceWrapper]
+          updateDecoderInternal(child.asScala, notDecode)
+          decoderNotDecode.addAll(notDecode)
+        }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/462f6422/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
index 1bfcdea..499ef0c 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
@@ -39,7 +39,10 @@ public class SparkRowReadSupportImpl extends AbstractDictionaryDecodedReadSuppor
 
   @Override public Row readRow(Object[] data) {
     for (int i = 0; i < dictionaries.length; i++) {
-      if (dictionaries[i] == null && data[i] != null) {
+      if (data[i] == null) {
+        continue;
+      }
+      if (dictionaries[i] == null) {
         if (carbonColumns[i].hasEncoding(Encoding.DIRECT_DICTIONARY)) {
           //convert the long to timestamp in case of direct dictionary column
           if (DataType.TIMESTAMP == carbonColumns[i].getDataType()) {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/462f6422/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala
index 2cd4eb7..6d9fb24 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala
@@ -83,6 +83,12 @@ object CarbonFilters {
             new ListExpression(
               convertToJavaList(values.map(f => getCarbonLiteralExpression(name, f)).toList))))
 
+        case sources.IsNull(name) =>
+          Some(new EqualToExpression(getCarbonExpression(name),
+            getCarbonLiteralExpression(name, null), true))
+        case sources.IsNotNull(name) =>
+          Some(new NotEqualsExpression(getCarbonExpression(name),
+            getCarbonLiteralExpression(name, null), true))
         case sources.And(lhs, rhs) =>
           (createFilter(lhs) ++ createFilter(rhs)).reduceOption(new AndExpression(_, _))
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/462f6422/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
index 5db5d14..3057bee 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
@@ -146,7 +146,8 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {
     s"""
           CREATE TABLE IF NOT EXISTS ${options.dbName}.${options.tableName}
           (${ carbonSchema.mkString(", ") })
-          using 'org.apache.spark.sql.CarbonRelationProvider'
+          using org.apache.spark.sql.CarbonSource
+          OPTIONS('dbName'='${options.dbName}', 'tableName'='${options.tableName}')
       """
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/462f6422/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index 997106c..09a58ba 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -21,9 +21,9 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.Job
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.execution.command.{ExecutedCommandExec, LoadTableByInsert}
+import org.apache.spark.sql.execution.command.LoadTableByInsert
 import org.apache.spark.sql.hive.CarbonRelation
-import org.apache.spark.sql.sources.{BaseRelation, Filter, InsertableRelation, PrunedFilteredScan}
+import org.apache.spark.sql.sources.{BaseRelation, Filter, InsertableRelation}
 import org.apache.spark.sql.types.StructType
 
 import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/462f6422/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index 3b63021..9a625ee 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -303,41 +303,41 @@ class CarbonDecoderRDD(
       (carbonTable.getFactTableName, carbonTable.getAbsoluteTableIdentifier)
     }.toMap
 
-      val cacheProvider: CacheProvider = CacheProvider.getInstance
-      val forwardDictionaryCache: Cache[DictionaryColumnUniqueIdentifier, Dictionary] =
-        cacheProvider.createCache(CacheType.FORWARD_DICTIONARY, storepath)
-      val dicts: Seq[Dictionary] = getDictionary(absoluteTableIdentifiers,
-        forwardDictionaryCache)
-      val dictIndex = dicts.zipWithIndex.filter(x => x._1 != null).map(x => x._2)
-      // add a task completion listener to clear dictionary that is a decisive factor for
-      // LRU eviction policy
-      val dictionaryTaskCleaner = TaskContext.get
-      dictionaryTaskCleaner.addTaskCompletionListener(context =>
-        dicts.foreach { dictionary =>
-          if (null != dictionary) {
-            dictionary.clear
-          }
+    val cacheProvider: CacheProvider = CacheProvider.getInstance
+    val forwardDictionaryCache: Cache[DictionaryColumnUniqueIdentifier, Dictionary] =
+      cacheProvider.createCache(CacheType.FORWARD_DICTIONARY, storepath)
+    val dicts: Seq[Dictionary] = getDictionary(absoluteTableIdentifiers,
+      forwardDictionaryCache)
+    val dictIndex = dicts.zipWithIndex.filter(x => x._1 != null).map(x => x._2)
+    // add a task completion listener to clear dictionary that is a decisive factor for
+    // LRU eviction policy
+    val dictionaryTaskCleaner = TaskContext.get
+    dictionaryTaskCleaner.addTaskCompletionListener(context =>
+      dicts.foreach { dictionary =>
+        if (null != dictionary) {
+          dictionary.clear
         }
-      )
-      val iter = firstParent[Row].iterator(split, context)
-      new Iterator[Row] {
-        var flag = true
-        var total = 0L
-        override final def hasNext: Boolean = iter.hasNext
+      }
+    )
+    val iter = firstParent[Row].iterator(split, context)
+    new Iterator[Row] {
+      var flag = true
+      var total = 0L
+      override final def hasNext: Boolean = iter.hasNext
 
-        override final def next(): Row = {
-          val startTime = System.currentTimeMillis()
-          val data = iter.next().asInstanceOf[GenericRow].toSeq.toArray
-          dictIndex.foreach { index =>
-            if ( data(index) != null) {
-              data(index) = DataTypeUtil.getDataBasedOnDataType(dicts(index)
-                  .getDictionaryValueForKey(data(index).asInstanceOf[Int]),
-                getDictionaryColumnIds(index)._3)
-            }
+      override final def next(): Row = {
+        val startTime = System.currentTimeMillis()
+        val data = iter.next().asInstanceOf[GenericRow].toSeq.toArray
+        dictIndex.foreach { index =>
+          if ( data(index) != null) {
+            data(index) = DataTypeUtil.getDataBasedOnDataType(dicts(index)
+                .getDictionaryValueForKey(data(index).asInstanceOf[Int]),
+              getDictionaryColumnIds(index)._3)
           }
-          new GenericRow(data)
         }
+        new GenericRow(data)
       }
+    }
   }
 
   private def isRequiredToDecode = {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/462f6422/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonScan.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonScan.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonScan.scala
index 9e42b44..19c3c9c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonScan.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonScan.scala
@@ -78,39 +78,27 @@ case class CarbonScan(
       attributesRaw = attributeOut
     }
 
-    val dimensions = carbonTable.getDimensionByTableName(carbonTable.getFactTableName)
-    val measures = carbonTable.getMeasureByTableName(carbonTable.getFactTableName)
-    val dimAttr = new Array[Attribute](dimensions.size())
-    val msrAttr = new Array[Attribute](measures.size())
+    val columns = carbonTable.getCreateOrderColumn(carbonTable.getFactTableName)
+    val colAttr = new Array[Attribute](columns.size())
     attributesRaw.foreach { attr =>
-      val carbonDimension =
-        carbonTable.getDimensionByName(carbonTable.getFactTableName, attr.name)
-      if(carbonDimension != null) {
-        dimAttr(dimensions.indexOf(carbonDimension)) = attr
-      } else {
-        val carbonMeasure =
-          carbonTable.getMeasureByName(carbonTable.getFactTableName, attr.name)
-        if(carbonMeasure != null) {
-          msrAttr(measures.indexOf(carbonMeasure)) = attr
-        }
-      }
+    val column =
+        carbonTable.getColumnByName(carbonTable.getFactTableName, attr.name)
+      if(column != null) {
+        colAttr(columns.indexOf(column)) = attr
+       }
     }
-
-    attributesRaw = dimAttr.filter(f => f != null) ++ msrAttr.filter(f => f != null)
+    attributesRaw = colAttr.filter(f => f != null)
 
     var queryOrder: Integer = 0
     attributesRaw.foreach { attr =>
-      val carbonDimension =
-        carbonTable.getDimensionByName(carbonTable.getFactTableName, attr.name)
-      if (carbonDimension != null) {
-        val dim = new QueryDimension(attr.name)
-        dim.setQueryOrder(queryOrder)
-        queryOrder = queryOrder + 1
-        selectedDims += dim
-      } else {
-        val carbonMeasure =
-          carbonTable.getMeasureByName(carbonTable.getFactTableName, attr.name)
-        if (carbonMeasure != null) {
+      val carbonColumn = carbonTable.getColumnByName(carbonTable.getFactTableName, attr.name)
+      if (carbonColumn != null) {
+        if (carbonColumn.isDimesion()) {
+          val dim = new QueryDimension(attr.name)
+          dim.setQueryOrder(queryOrder)
+          queryOrder = queryOrder + 1
+          selectedDims += dim
+         } else {
           val m1 = new QueryMeasure(attr.name)
           m1.setQueryOrder(queryOrder)
           queryOrder = queryOrder + 1

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/462f6422/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index 2ba8a03..b639ea8 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{DecimalType, StructType}
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.spark.CarbonOption
 
 /**
@@ -54,7 +55,7 @@ class CarbonSource extends CreatableRelationProvider
         "the path to store carbon file is the 'storePath' specified when creating CarbonContext")
 
     val options = new CarbonOption(parameters)
-    val storePath = sqlContext.sparkSession.conf.get(CarbonCommonConstants.STORE_LOCATION)
+    val storePath = CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION)
     val tablePath = new Path(storePath + "/" + options.dbName + "/" + options.tableName)
     val isExists = tablePath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
         .exists(tablePath)

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/462f6422/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
index fb9df70..b3a7d5a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
@@ -165,34 +165,27 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
         case union: Union
           if !(union.children(0).isInstanceOf[CarbonDictionaryTempDecoder] ||
             union.children(1).isInstanceOf[CarbonDictionaryTempDecoder]) =>
-          val leftCondAttrs = new util.HashSet[AttributeReferenceWrapper]
-          val rightCondAttrs = new util.HashSet[AttributeReferenceWrapper]
-          union.children(0).output.foreach(attr =>
-            leftCondAttrs.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr))))
-          union.children(1).output.foreach(attr =>
-            rightCondAttrs.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr))))
-          var leftPlan = union.children(0)
-          var rightPlan = union.children(1)
-          if (hasCarbonRelation(leftPlan) && leftCondAttrs.size() > 0 &&
-            !leftPlan.isInstanceOf[CarbonDictionaryCatalystDecoder]) {
-            leftPlan = CarbonDictionaryTempDecoder(leftCondAttrs,
-              new util.HashSet[AttributeReferenceWrapper](),
-              union.children(0))
-          }
-          if (hasCarbonRelation(rightPlan) && rightCondAttrs.size() > 0 &&
-            !rightPlan.isInstanceOf[CarbonDictionaryCatalystDecoder]) {
-            rightPlan = CarbonDictionaryTempDecoder(rightCondAttrs,
-              new util.HashSet[AttributeReferenceWrapper](),
-              union.children(1))
+          val children = union.children.map { child =>
+            val condAttrs = new util.HashSet[AttributeReferenceWrapper]
+            child.output.foreach(attr =>
+              condAttrs.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr))))
+            if (hasCarbonRelation(child) && condAttrs.size() > 0 &&
+              !child.isInstanceOf[CarbonDictionaryCatalystDecoder]) {
+              CarbonDictionaryTempDecoder(condAttrs,
+                new util.HashSet[AttributeReferenceWrapper](),
+                union.children(0))
+            } else {
+              child
+            }
           }
           if (!decoder) {
             decoder = true
             CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](),
               new util.HashSet[AttributeReferenceWrapper](),
-              Union(leftPlan, rightPlan),
+              Union(children),
               isOuter = true)
           } else {
-            Union(leftPlan, rightPlan)
+            Union(children)
           }
         case agg: Aggregate if !agg.child.isInstanceOf[CarbonDictionaryTempDecoder] =>
           val attrsOndimAggs = new util.HashSet[AttributeReferenceWrapper]
@@ -487,68 +480,32 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
       case cd: CarbonDictionaryCatalystDecoder =>
         cd
       case sort: Sort =>
-        val tmpAttrMap = new mutable.HashMap[AttributeReferenceWrapper, Attribute]()
-        if (sort.child.isInstanceOf[CarbonDictionaryTempDecoder]) {
-          val tempDecoder = sort.child.asInstanceOf[CarbonDictionaryTempDecoder]
-          tempDecoder.attrList.asScala.foreach{attr => tmpAttrMap.put(attr, attr.attr)}
-        }
         val sortExprs = sort.order.map { s =>
           s.transform {
             case attr: AttributeReference =>
-              val tempAttr = tmpAttrMap.get(AttributeReferenceWrapper(attr))
-              if(tempAttr.isDefined) {
-                tempAttr.get
-              } else {
-                updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
-              }
+              updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
           }.asInstanceOf[SortOrder]
         }
         Sort(sortExprs, sort.global, sort.child)
       case agg: Aggregate if !agg.child.isInstanceOf[CarbonDictionaryCatalystDecoder] =>
-        val tmpAttrMap = new mutable.HashMap[AttributeReferenceWrapper, Attribute]()
-        if (agg.child.isInstanceOf[CarbonDictionaryTempDecoder]) {
-          val tempDecoder = agg.child.asInstanceOf[CarbonDictionaryTempDecoder]
-          tempDecoder.attrList.asScala.foreach{attr => tmpAttrMap.put(attr, attr.attr)}
-        }
-
         val aggExps = agg.aggregateExpressions.map { aggExp =>
           aggExp.transform {
             case attr: AttributeReference =>
-              val tempAttr = tmpAttrMap.get(AttributeReferenceWrapper(attr))
-              if(tempAttr.isDefined) {
-                tempAttr.get
-              } else {
-                updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
-              }
+              updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
           }
         }.asInstanceOf[Seq[NamedExpression]]
 
         val grpExps = agg.groupingExpressions.map { gexp =>
           gexp.transform {
             case attr: AttributeReference =>
-              val tempAttr = tmpAttrMap.get(AttributeReferenceWrapper(attr))
-              if(tempAttr.isDefined) {
-                tempAttr.get
-              } else {
-                updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
-              }
+              updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
           }
         }
         Aggregate(grpExps, aggExps, agg.child)
       case expand: Expand =>
-        val tmpAttrMap = new mutable.HashMap[AttributeReferenceWrapper, Attribute]()
-        if (expand.child.isInstanceOf[CarbonDictionaryTempDecoder]) {
-          val tempDecoder = expand.child.asInstanceOf[CarbonDictionaryTempDecoder]
-          tempDecoder.attrList.asScala.foreach{attr => tmpAttrMap.put(attr, attr.attr)}
-        }
         expand.transformExpressions {
           case attr: AttributeReference =>
-            val tempAttr = tmpAttrMap.get(AttributeReferenceWrapper(attr))
-            if(tempAttr.isDefined) {
-              tempAttr.get
-            } else {
-              updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
-            }
+            updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
         }
       case filter: Filter =>
         filter
@@ -559,71 +516,36 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
         marker.pushBinaryMarker(allAttrsNotDecode)
         u
       case p: Project if relations.nonEmpty =>
-        val tmpAttrMap = new mutable.HashMap[AttributeReferenceWrapper, Attribute]()
-        if (p.child.isInstanceOf[CarbonDictionaryTempDecoder]) {
-          val tempDecoder = p.child.asInstanceOf[CarbonDictionaryTempDecoder]
-          tempDecoder.attrList.asScala.foreach{attr => tmpAttrMap.put(attr, attr.attr)}
-        }
         val prExps = p.projectList.map { prExp =>
           prExp.transform {
             case attr: AttributeReference =>
-              val tempAttr = tmpAttrMap.get(AttributeReferenceWrapper(attr))
-              if(tempAttr.isDefined) {
-                tempAttr.get
-              } else {
-                updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
-              }
+              updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
           }
         }.asInstanceOf[Seq[NamedExpression]]
         Project(prExps, p.child)
       case wd: Window if relations.nonEmpty =>
-        val tmpAttrMap = new mutable.HashMap[AttributeReferenceWrapper, Attribute]()
-        if (wd.child.isInstanceOf[CarbonDictionaryTempDecoder]) {
-          val tempDecoder = wd.child.asInstanceOf[CarbonDictionaryTempDecoder]
-          tempDecoder.attrList.asScala.foreach{attr => tmpAttrMap.put(attr, attr.attr)}
-        }
         val prExps = wd.output.map { prExp =>
           prExp.transform {
             case attr: AttributeReference =>
-              val tempAttr = tmpAttrMap.get(AttributeReferenceWrapper(attr))
-              if(tempAttr.isDefined) {
-                tempAttr.get
-              } else {
-                updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
-              }
+              updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
           }
         }.asInstanceOf[Seq[Attribute]]
         val wdExps = wd.windowExpressions.map { gexp =>
           gexp.transform {
             case attr: AttributeReference =>
-              val tempAttr = tmpAttrMap.get(AttributeReferenceWrapper(attr))
-              if(tempAttr.isDefined) {
-                tempAttr.get
-              } else {
-                updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
-              }
+              updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
           }
         }.asInstanceOf[Seq[NamedExpression]]
         val partitionSpec = wd.partitionSpec.map{ exp =>
           exp.transform {
             case attr: AttributeReference =>
-              val tempAttr = tmpAttrMap.get(AttributeReferenceWrapper(attr))
-              if(tempAttr.isDefined) {
-                tempAttr.get
-              } else {
-                updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
-              }
+              updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
           }
         }
         val orderSpec = wd.orderSpec.map { exp =>
           exp.transform {
             case attr: AttributeReference =>
-              val tempAttr = tmpAttrMap.get(AttributeReferenceWrapper(attr))
-              if(tempAttr.isDefined) {
-                tempAttr.get
-              } else {
-                updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
-              }
+              updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
           }
         }.asInstanceOf[Seq[SortOrder]]
         Window(wdExps, partitionSpec, orderSpec, wd.child)

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/462f6422/integration/spark2/src/test/resources/data.csv
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/resources/data.csv b/integration/spark2/src/test/resources/data.csv
new file mode 100644
index 0000000..4ff67da
--- /dev/null
+++ b/integration/spark2/src/test/resources/data.csv
@@ -0,0 +1,11 @@
+empno,empname,designation,doj,workgroupcategory,workgroupcategoryname,deptno,deptname,projectcode,projectjoindate,projectenddate,attendance,utilization,salary
+11,arvind,SE,17-01-2007,1,developer,10,network,928478,17-02-2007,29-11-2016,96,96.2,5040.56
+12,krithin,SSE,29-05-2008,1,developer,11,protocol,928378,29-06-2008,30-12-2016,85,95.1,7124.21
+13,madhan,TPL,07-07-2009,2,tester,10,network,928478,07-08-2009,30-12-2016,88,99,9054.235
+14,anandh,SA,29-12-2010,3,manager,11,protocol,928278,29-01-2011,29-06-2016,77,92.2,11248.25
+15,ayushi,SSA,09-11-2011,1,developer,12,security,928375,09-12-2011,29-05-2016,99,91.5,13245.48
+16,pramod,SE,14-10-2012,1,developer,13,configManagement,928478,14-11-2012,29-12-2016,86,93,5040.56
+17,gawrav,PL,22-09-2013,2,tester,12,security,928778,22-10-2013,15-11-2016,78,97.45,9574.24
+18,sibi,TL,15-08-2014,2,tester,14,Learning,928176,15-09-2014,29-05-2016,84,98.23,7245.25
+19,shivani,PL,12-05-2015,1,developer,10,network,928977,12-06-2015,12-11-2016,88,91.678,11254.24
+20,bill,PM,01-12-2015,3,manager,14,Learning,928479,01-01-2016,30-11-2016,75,94.22,13547.25