Posted to commits@carbondata.apache.org by ma...@apache.org on 2018/09/07 14:43:44 UTC

[1/2] carbondata git commit: [CARBONDATA-2910] Support backward compatibility in fileformat and add tests for loads with different sort orders

Repository: carbondata
Updated Branches:
  refs/heads/master b6bd90d80 -> 3894e1d05


http://git-wip-us.apache.org/repos/asf/carbondata/blob/3894e1d0/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
index 837bc4f..dcc76d8 100644
--- a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
+++ b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
@@ -17,6 +17,11 @@
 package org.apache.spark.sql.carbondata.datasource
 
 
+import java.io.File
+import java.util
+
+import scala.collection.JavaConverters._
+
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.carbondata.datasource.TestUtil._
 import org.scalatest.{BeforeAndAfterAll, FunSuite}
@@ -24,6 +29,9 @@ import org.scalatest.{BeforeAndAfterAll, FunSuite}
 import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.metadata.datatype.DataTypes
+import org.apache.carbondata.hadoop.testutil.StoreCreator
+import org.apache.carbondata.sdk.file.{CarbonWriter, Field, Schema}
 
 class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
 
@@ -346,7 +354,7 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
     df.write.format("carbon").save(warehouse1 + "/test_folder/")
     if (!spark.sparkContext.version.startsWith("2.1")) {
       spark
-        .sql(s"create table test123 (c1 string, c2 string, arrayc array<int>, structc struct<_1:string, _2:decimal(38,18)>, shortc smallint,intc int, longc bigint,  doublec double, bigdecimalc decimal(38,18)) using carbon location '$warehouse1/test_folder/'")
+        .sql(s"create table test123 (c1 string, c2 string, shortc smallint,intc int, longc bigint,  doublec double, bigdecimalc decimal(38,18), arrayc array<int>, structc struct<_1:string, _2:decimal(38,18)>) using carbon location '$warehouse1/test_folder/'")
 
       checkAnswer(spark.sql("select * from test123"),
         spark.read.format("carbon").load(warehouse1 + "/test_folder/"))
@@ -613,6 +621,152 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
       FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(warehouse1 + "/test_folder"))
     }
   }
+
+  test("test read using old data") {
+    val store = new StoreCreator(new File(warehouse1).getAbsolutePath,
+      new File(warehouse1 + "../../../../../hadoop/src/test/resources/data.csv").getCanonicalPath,
+      false)
+    store.createCarbonStore()
+    FileFactory.deleteAllFilesOfDir(new File(warehouse1+"/testdb/testtable/Fact/Part0/Segment_0/0"))
+    val dfread = spark.read.format("carbon").load(warehouse1+"/testdb/testtable/Fact/Part0/Segment_0")
+    dfread.show(false)
+    spark.sql("drop table if exists parquet_table")
+  }
+
+  test("test read using different sort order data") {
+    if (!spark.sparkContext.version.startsWith("2.1")) {
+      spark.sql("drop table if exists old_comp")
+      FileFactory.deleteAllFilesOfDir(new File(warehouse1 + "/testdb"))
+      val store = new StoreCreator(new File(warehouse1).getAbsolutePath,
+        new File(warehouse1 + "../../../../../hadoop/src/test/resources/data.csv").getCanonicalPath,
+        false)
+      store.setSortColumns(new util.ArrayList[String](Seq("name").asJava))
+      var model = store.createTableAndLoadModel(false)
+      model.setSegmentId("0")
+      store.createCarbonStore(model)
+      FileFactory.deleteAllFilesOfDir(new File(warehouse1 + "/testdb/testtable/Fact/Part0/Segment_0/0"))
+      store.setSortColumns(new util.ArrayList[String](Seq("country,phonetype").asJava))
+      model = store.createTableAndLoadModel(false)
+      model.setSegmentId("1")
+      store.createCarbonStore(model)
+      FileFactory.deleteAllFilesOfDir(new File(warehouse1 + "/testdb/testtable/Fact/Part0/Segment_1/0"))
+      store.setSortColumns(new util.ArrayList[String](Seq("date").asJava))
+      model = store.createTableAndLoadModel(false)
+      model.setSegmentId("2")
+      store.createCarbonStore(model)
+      FileFactory.deleteAllFilesOfDir(new File(warehouse1 + "/testdb/testtable/Fact/Part0/Segment_2/0"))
+      store.setSortColumns(new util.ArrayList[String](Seq("serialname").asJava))
+      model = store.createTableAndLoadModel(false)
+      model.setSegmentId("3")
+      store.createCarbonStore(model)
+      FileFactory.deleteAllFilesOfDir(new File(warehouse1 + "/testdb/testtable/Fact/Part0/Segment_3/0"))
+      spark.sql(s"create table old_comp(id int, date string, country string, name string, phonetype string, serialname string, salary int) using carbon options(path='$warehouse1/testdb/testtable/Fact/Part0/', 'sort_columns'='name')")
+
+      assert(spark.sql("select * from old_comp where country='china'").count() == 3396)
+      assert(spark.sql("select * from old_comp ").count() == 4000)
+      spark.sql("drop table if exists old_comp")
+
+      spark.sql(s"create table old_comp1 using carbon options(path='$warehouse1/testdb/testtable/Fact/Part0/')")
+      assert(spark.sql("select * from old_comp1 where country='china'").count() == 3396)
+      assert(spark.sql("select * from old_comp1 ").count() == 4000)
+      spark.sql("drop table if exists old_comp1")
+      FileFactory.deleteAllFilesOfDir(new File(warehouse1 + "/testdb"))
+    }
+  }
+
+
+  test("test write sdk and read with spark using different sort order data") {
+    spark.sql("drop table if exists sdkout")
+    FileFactory.deleteAllFilesOfDir(new File(warehouse1+"/sdk"))
+    buildTestDataOtherDataType(5, Array("age", "address"), warehouse1+"/sdk")
+    spark.sql(s"create table sdkout using carbon options(path='$warehouse1/sdk')")
+    assert(spark.sql("select * from sdkout").collect().length == 5)
+    buildTestDataOtherDataType(5, Array("name","salary"), warehouse1+"/sdk")
+    spark.sql("refresh table sdkout")
+    assert(spark.sql("select * from sdkout where name = 'name1'").collect().length == 2)
+    assert(spark.sql("select * from sdkout where salary=100").collect().length == 2)
+    buildTestDataOtherDataType(5, Array("name","age"), warehouse1+"/sdk")
+    spark.sql("refresh table sdkout")
+    assert(spark.sql("select * from sdkout where name='name0'").collect().length == 3)
+    assert(spark.sql("select * from sdkout").collect().length == 15)
+    assert(spark.sql("select * from sdkout where salary=100").collect().length == 3)
+    assert(spark.sql("select * from sdkout where address='address1'").collect().length == 3)
+    buildTestDataOtherDataType(5, Array("name","salary"), warehouse1+"/sdk")
+    spark.sql("refresh table sdkout")
+    assert(spark.sql("select * from sdkout where name='name0'").collect().length == 4)
+    assert(spark.sql("select * from sdkout").collect().length == 20)
+    assert(spark.sql("select * from sdkout where salary=100").collect().length == 4)
+    assert(spark.sql("select * from sdkout where address='address1'").collect().length == 4)
+    FileFactory.deleteAllFilesOfDir(new File(warehouse1+"/sdk"))
+  }
+
+  test("test write sdk with different schema and read with spark") {
+    spark.sql("drop table if exists sdkout")
+    FileFactory.deleteAllFilesOfDir(new File(warehouse1+"/sdk1"))
+    buildTestDataOtherDataType(5, Array("age", "address"), warehouse1+"/sdk1")
+    spark.sql(s"create table sdkout using carbon options(path='$warehouse1/sdk1')")
+    assert(spark.sql("select * from sdkout").collect().length == 5)
+    buildTestDataOtherDataType(5, null, warehouse1+"/sdk1", 2)
+    spark.sql("refresh table sdkout")
+    intercept[Exception] {
+      spark.sql("select * from sdkout").show()
+    }
+    intercept[Exception] {
+      spark.sql("select * from sdkout where salary=100").show()
+    }
+    FileFactory.deleteAllFilesOfDir(new File(warehouse1+"/sdk1"))
+  }
+
+  // Prepare SDK writer output with the given sort columns and an optional column count
+  def buildTestDataOtherDataType(rows: Int, sortColumns: Array[String], writerPath: String, colCount: Int = -1): Any = {
+    var fields: Array[Field] = new Array[Field](6)
+    // schema written by the SDK: boolean, numeric and string columns
+    fields(0) = new Field("male", DataTypes.BOOLEAN)
+    fields(1) = new Field("age", DataTypes.INT)
+    fields(2) = new Field("height", DataTypes.DOUBLE)
+    fields(3) = new Field("name", DataTypes.STRING)
+    fields(4) = new Field("address", DataTypes.STRING)
+    fields(5) = new Field("salary", DataTypes.LONG)
+
+    if (colCount > 0) {
+      val fieldsToWrite: Array[Field] = new Array[Field](colCount)
+      var i = 0
+      while (i < colCount) {
+        fieldsToWrite(i) = fields(i)
+        i += 1
+      }
+      fields = fieldsToWrite
+    }
+
+    try {
+      val builder = CarbonWriter.builder()
+      val writer =
+        builder.outputPath(writerPath)
+          .isTransactionalTable(false)
+          .uniqueIdentifier(System.nanoTime()).withBlockSize(2).sortBy(sortColumns)
+          .buildWriterForCSVInput(new Schema(fields))
+
+      var i = 0
+      while (i < rows) {
+        val array = Array[String]("true",
+          String.valueOf(i),
+          String.valueOf(i.toDouble / 2),
+          "name" + i,
+          "address" + i,
+          (i * 100).toString)
+        if (colCount > 0) {
+          writer.write(array.slice(0, colCount))
+        } else {
+          writer.write(array)
+        }
+        i += 1
+      }
+      writer.close()
+    } catch {
+      case ex: Exception => throw new RuntimeException(ex)
+      case _ => None
+    }
+  }
   override protected def beforeAll(): Unit = {
     drop
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3894e1d0/streaming/src/test/java/org/apache/carbondata/streaming/CarbonStreamOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/carbondata/streaming/CarbonStreamOutputFormatTest.java b/streaming/src/test/java/org/apache/carbondata/streaming/CarbonStreamOutputFormatTest.java
index af79483..d675973 100644
--- a/streaming/src/test/java/org/apache/carbondata/streaming/CarbonStreamOutputFormatTest.java
+++ b/streaming/src/test/java/org/apache/carbondata/streaming/CarbonStreamOutputFormatTest.java
@@ -70,7 +70,8 @@ public class CarbonStreamOutputFormatTest extends TestCase {
             tablePath,
             new CarbonTableIdentifier(dbName, tableName, UUID.randomUUID().toString()));
 
-    CarbonTable table = StoreCreator.createTable(identifier);
+    CarbonTable table = new StoreCreator(new File("target/store").getAbsolutePath(),
+        new File("../hadoop/src/test/resources/data.csv").getCanonicalPath()).createTable(identifier);
 
     String factFilePath = new File("../hadoop/src/test/resources/data.csv").getCanonicalPath();
     carbonLoadModel = StoreCreator.buildCarbonLoadModel(table, factFilePath, identifier);


[2/2] carbondata git commit: [CARBONDATA-2910] Support backward compatibility in fileformat and add tests for loads with different sort orders

Posted by ma...@apache.org.
[CARBONDATA-2910] Support backward compatibility in fileformat and add tests for loads with different sort orders

1. Data loaded by an old version with all columns in dictionary-exclude can now be read by the fileformat when the segment folder is given as the read path (a minimal read sketch follows this list).
2. Users can now specify different sort options per load when writing data through the SDK, and the fileformat can read the result (a writer sketch follows the commit message below).
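
A minimal sketch of point 1, assuming an existing SparkSession `spark` and a Segment folder produced by an older CarbonData version; the path below is illustrative and mirrors the new "test read using old data" test:

    // Read a legacy segment folder directly with the carbon fileformat.
    val oldSegment = "/tmp/store/testdb/testtable/Fact/Part0/Segment_0"  // illustrative path
    val df = spark.read.format("carbon").load(oldSegment)
    df.show(false)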

This closes #2685
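
A minimal sketch of point 2, assuming an existing SparkSession `spark` and a writable output directory; the builder calls mirror the new buildTestDataOtherDataType helper in SparkCarbonDataSourceTest, and the table and path names are illustrative:

    import org.apache.carbondata.core.metadata.datatype.DataTypes
    import org.apache.carbondata.sdk.file.{CarbonWriter, Field, Schema}

    val fields = Array(
      new Field("name", DataTypes.STRING),
      new Field("salary", DataTypes.LONG))

    // One SDK load; each call may use its own sort columns.
    def load(sortColumns: Array[String], outputPath: String): Unit = {
      val writer = CarbonWriter.builder()
        .outputPath(outputPath)
        .isTransactionalTable(false)
        .uniqueIdentifier(System.nanoTime())
        .sortBy(sortColumns)
        .buildWriterForCSVInput(new Schema(fields))
      (0 until 5).foreach(i => writer.write(Array("name" + i, (i * 100).toString)))
      writer.close()
    }

    load(Array("name"), "/tmp/sdk_out")    // first load sorted by name
    load(Array("salary"), "/tmp/sdk_out")  // second load sorted by salary

    // The fileformat reads both loads, regardless of their sort orders.
    spark.sql("create table sdk_out using carbon options(path='/tmp/sdk_out')")
    spark.sql("select * from sdk_out where salary = 100").show()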


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/3894e1d0
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/3894e1d0
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/3894e1d0

Branch: refs/heads/master
Commit: 3894e1d050cc39959a6445f97a7850ac922b7bd8
Parents: b6bd90d
Author: ravipesala <ra...@gmail.com>
Authored: Thu Aug 30 20:41:06 2018 +0530
Committer: manishgupta88 <to...@gmail.com>
Committed: Fri Sep 7 20:17:44 2018 +0530

----------------------------------------------------------------------
 .../carbondata/core/datamap/TableDataMap.java   |  34 ++++
 .../carbondata/core/datamap/dev/DataMap.java    |  13 +-
 .../dev/cgdatamap/CoarseGrainDataMap.java       |  12 ++
 .../datamap/dev/fgdatamap/FineGrainDataMap.java |  12 ++
 .../indexstore/blockletindex/BlockDataMap.java  |  27 +++-
 .../blockletindex/BlockletDataMapFactory.java   |   3 +-
 .../core/metadata/schema/table/CarbonTable.java |  11 +-
 .../executor/impl/AbstractQueryExecutor.java    |  97 ++++++++++++
 .../core/scan/executor/util/QueryUtil.java      |  19 +++
 .../core/scan/expression/ColumnExpression.java  |   7 +
 .../carbondata/core/scan/model/QueryModel.java  |  64 ++++++--
 .../core/scan/model/QueryModelBuilder.java      |  21 ++-
 .../util/AbstractDataFileFooterConverter.java   |  12 ++
 .../core/util/BlockletDataMapUtil.java          |  13 +-
 .../hadoop/api/CarbonFileInputFormat.java       |  11 +-
 .../hadoop/api/CarbonInputFormat.java           |  26 ++--
 .../hadoop/api/CarbonTableInputFormat.java      |  23 ++-
 .../hadoop/testutil/StoreCreator.java           | 101 +++++++++---
 .../hadoop/ft/CarbonTableInputFormatTest.java   |  27 ++--
 .../hadoop/ft/CarbonTableOutputFormatTest.java  |   3 +-
 ...ithColumnMetCacheAndCacheLevelProperty.scala |   4 +-
 .../TestNonTransactionalCarbonTable.scala       |  19 +--
 .../execution/datasources/CarbonFileIndex.scala |  10 +-
 .../datasources/SparkCarbonFileFormat.scala     |  14 +-
 .../datasource/SparkCarbonDataSourceTest.scala  | 156 ++++++++++++++++++-
 .../streaming/CarbonStreamOutputFormatTest.java |   3 +-
 26 files changed, 617 insertions(+), 125 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/3894e1d0/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
index aed8c60..a272777 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
@@ -36,6 +36,7 @@ import org.apache.carbondata.core.indexstore.PartitionSpec;
 import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 import org.apache.carbondata.events.Event;
 import org.apache.carbondata.events.OperationContext;
@@ -79,6 +80,39 @@ public final class TableDataMap extends OperationEventListener {
     return blockletDetailsFetcher;
   }
 
+
+  /**
+   * Prune the datamap for the given valid segments using an unresolved filter expression.
+   *
+   * @param segments  valid segments to prune
+   * @param filterExp filter expression; if null, all blocklets of the segments are returned
+   * @return pruned blocklets
+   */
+  public List<ExtendedBlocklet> prune(List<Segment> segments, Expression filterExp,
+      List<PartitionSpec> partitions) throws IOException {
+    List<ExtendedBlocklet> blocklets = new ArrayList<>();
+    SegmentProperties segmentProperties;
+    Map<Segment, List<DataMap>> dataMaps = dataMapFactory.getDataMaps(segments);
+    for (Segment segment : segments) {
+      List<Blocklet> pruneBlocklets = new ArrayList<>();
+      // if filter is not passed then return all the blocklets
+      if (filterExp == null) {
+        pruneBlocklets = blockletDetailsFetcher.getAllBlocklets(segment, partitions);
+      } else {
+        segmentProperties = segmentPropertiesFetcher.getSegmentProperties(segment);
+        for (DataMap dataMap : dataMaps.get(segment)) {
+
+          pruneBlocklets
+              .addAll(dataMap.prune(filterExp, segmentProperties, partitions, identifier));
+        }
+      }
+      blocklets.addAll(addSegmentId(
+          blockletDetailsFetcher.getExtendedBlocklets(pruneBlocklets, segment),
+          segment.toString()));
+    }
+    return blocklets;
+  }
+
   /**
    * Pass the valid segments and prune the datamap using filter expression
    *

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3894e1d0/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
index d846281..456776b 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
@@ -24,6 +24,8 @@ import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.indexstore.Blocklet;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
 import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 
 /**
@@ -38,12 +40,19 @@ public interface DataMap<T extends Blocklet> {
   void init(DataMapModel dataMapModel) throws MemoryException, IOException;
 
   /**
-   * Prune the datamap with filter expression and partition information. It returns the list of
-   * blocklets where these filters can exist.
+   * Prune the datamap with resolved filter expression and partition information.
+   * It returns the list of blocklets where these filters can exist.
    */
   List<T> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties,
       List<PartitionSpec> partitions) throws IOException;
 
+  /**
+   * Prune the datamap with filter expression and partition information. It returns the list of
+   * blocklets where these filters can exist.
+   */
+  List<T> prune(Expression filter, SegmentProperties segmentProperties,
+      List<PartitionSpec> partitions, AbsoluteTableIdentifier identifier) throws IOException;
+
   // TODO Move this method to Abstract class
   /**
    * Validate whether the current segment needs to be fetching the required data

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3894e1d0/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/CoarseGrainDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/CoarseGrainDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/CoarseGrainDataMap.java
index 62a1d1b..25c4c94 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/CoarseGrainDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/CoarseGrainDataMap.java
@@ -16,10 +16,17 @@
  */
 package org.apache.carbondata.core.datamap.dev.cgdatamap;
 
+import java.io.IOException;
+import java.util.List;
+
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.annotations.InterfaceStability;
 import org.apache.carbondata.core.datamap.dev.DataMap;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.indexstore.Blocklet;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.scan.expression.Expression;
 
 /**
  * DataMap for Coarse Grain level, see {@link org.apache.carbondata.core.datamap.DataMapLevel#CG}
@@ -28,4 +35,9 @@ import org.apache.carbondata.core.indexstore.Blocklet;
 @InterfaceStability.Evolving
 public abstract class CoarseGrainDataMap implements DataMap<Blocklet> {
 
+  @Override
+  public List<Blocklet> prune(Expression expression, SegmentProperties segmentProperties,
+      List<PartitionSpec> partitions, AbsoluteTableIdentifier identifier) throws IOException {
+    throw new UnsupportedOperationException("Filter expression not supported");
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3894e1d0/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/FineGrainDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/FineGrainDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/FineGrainDataMap.java
index 18389b2..7431742 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/FineGrainDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/FineGrainDataMap.java
@@ -16,9 +16,16 @@
  */
 package org.apache.carbondata.core.datamap.dev.fgdatamap;
 
+import java.io.IOException;
+import java.util.List;
+
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.annotations.InterfaceStability;
 import org.apache.carbondata.core.datamap.dev.DataMap;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.scan.expression.Expression;
 
 /**
  * DataMap for Fine Grain level, see {@link org.apache.carbondata.core.datamap.DataMapLevel#FG}
@@ -27,4 +34,9 @@ import org.apache.carbondata.core.datamap.dev.DataMap;
 @InterfaceStability.Evolving
 public abstract class FineGrainDataMap implements DataMap<FineGrainBlocklet> {
 
+  @Override
+  public List<FineGrainBlocklet> prune(Expression filter, SegmentProperties segmentProperties,
+      List<PartitionSpec> partitions, AbsoluteTableIdentifier identifier) throws IOException {
+    throw new UnsupportedOperationException("Filter expression not supported");
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3894e1d0/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
index 1938400..2dbf6a0 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
@@ -40,17 +40,24 @@ import org.apache.carbondata.core.indexstore.row.DataMapRow;
 import org.apache.carbondata.core.indexstore.row.DataMapRowImpl;
 import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema;
 import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
 import org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex;
 import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.profiler.ExplainCollector;
+import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor;
 import org.apache.carbondata.core.scan.filter.FilterUtil;
 import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
 import org.apache.carbondata.core.scan.filter.executer.ImplicitColumnFilterExecutor;
+import org.apache.carbondata.core.scan.filter.intf.FilterOptimizer;
+import org.apache.carbondata.core.scan.filter.optimizer.RangeFilterOptmizer;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.scan.model.QueryModel;
 import org.apache.carbondata.core.util.BlockletDataMapUtil;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.CarbonUtil;
@@ -102,7 +109,8 @@ public class BlockDataMap extends CoarseGrainDataMap
     BlockletDataMapModel blockletDataMapInfo = (BlockletDataMapModel) dataMapModel;
     DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
     List<DataFileFooter> indexInfo = fileFooterConverter
-        .getIndexInfo(blockletDataMapInfo.getFilePath(), blockletDataMapInfo.getFileData());
+        .getIndexInfo(blockletDataMapInfo.getFilePath(), blockletDataMapInfo.getFileData(),
+            blockletDataMapInfo.getCarbonTable().isTransactionalTable());
     Path path = new Path(blockletDataMapInfo.getFilePath());
     // store file path only in case of partition table, non transactional table and flat folder
     // structure
@@ -632,6 +640,23 @@ public class BlockDataMap extends CoarseGrainDataMap
   }
 
   @Override
+  public List<Blocklet> prune(Expression expression, SegmentProperties properties,
+      List<PartitionSpec> partitions, AbsoluteTableIdentifier identifier) throws IOException {
+    FilterResolverIntf filterResolverIntf = null;
+    if (expression != null) {
+      QueryModel.FilterProcessVO processVO =
+          new QueryModel.FilterProcessVO(properties.getDimensions(), properties.getMeasures(),
+              new ArrayList<CarbonDimension>());
+      QueryModel.processFilterExpression(processVO, expression, null, null);
+      // Optimize the filter expression and fit RANGE filters if conditions apply.
+      FilterOptimizer rangeFilterOptimizer = new RangeFilterOptmizer(expression);
+      rangeFilterOptimizer.optimizeFilter();
+      filterResolverIntf = CarbonTable.resolveFilter(expression, identifier);
+    }
+    return prune(filterResolverIntf, properties, partitions);
+  }
+
+  @Override
   public List<Blocklet> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties,
       List<PartitionSpec> partitions) {
     if (memoryDMStore.getRowCount() == 0) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3894e1d0/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
index f36afa0..da2fa39 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
@@ -49,6 +49,7 @@ import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 import org.apache.carbondata.core.util.BlockletDataMapUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.events.Event;
@@ -387,7 +388,7 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
     List<CoarseGrainDataMap> dataMaps = getDataMaps(segment);
     for (CoarseGrainDataMap dataMap : dataMaps) {
       blocklets.addAll(
-          dataMap.prune(null, getSegmentProperties(segment), partitions));
+          dataMap.prune((FilterResolverIntf) null, getSegmentProperties(segment), partitions));
     }
     return blocklets;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3894e1d0/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index c66d168..21f24d6 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -1017,7 +1017,10 @@ public class CarbonTable implements Serializable {
 
   public void processFilterExpression(Expression filterExpression,
       boolean[] isFilterDimensions, boolean[] isFilterMeasures) {
-    QueryModel.processFilterExpression(this, filterExpression, isFilterDimensions,
+    QueryModel.FilterProcessVO processVO =
+        new QueryModel.FilterProcessVO(getDimensionByTableName(getTableName()),
+            getMeasureByTableName(getTableName()), getImplicitDimensionByTableName(getTableName()));
+    QueryModel.processFilterExpression(processVO, filterExpression, isFilterDimensions,
         isFilterMeasures);
 
     if (null != filterExpression) {
@@ -1031,11 +1034,11 @@ public class CarbonTable implements Serializable {
   /**
    * Resolve the filter expression.
    */
-  public FilterResolverIntf resolveFilter(Expression filterExpression) {
+  public static FilterResolverIntf resolveFilter(Expression filterExpression,
+      AbsoluteTableIdentifier identifier) {
     try {
       FilterExpressionProcessor filterExpressionProcessor = new FilterExpressionProcessor();
-      return filterExpressionProcessor.getFilterResolver(
-          filterExpression, getAbsoluteTableIdentifier());
+      return filterExpressionProcessor.getFilterResolver(filterExpression, identifier);
     } catch (Exception e) {
       throw new RuntimeException("Error while resolving filter expression", e);
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3894e1d0/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index ece2f8d..bd1eb1c 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -52,12 +52,16 @@ import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.scan.executor.QueryExecutor;
 import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
 import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
 import org.apache.carbondata.core.scan.executor.util.QueryUtil;
 import org.apache.carbondata.core.scan.executor.util.RestructureUtil;
+import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.filter.FilterUtil;
+import org.apache.carbondata.core.scan.filter.intf.FilterOptimizer;
+import org.apache.carbondata.core.scan.filter.optimizer.RangeFilterOptmizer;
 import org.apache.carbondata.core.scan.model.ProjectionDimension;
 import org.apache.carbondata.core.scan.model.ProjectionMeasure;
 import org.apache.carbondata.core.scan.model.QueryModel;
@@ -214,12 +218,20 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
         if (null == fileFooter) {
           blockInfo.setDetailInfo(null);
           fileFooter = CarbonUtil.readMetadatFile(blockInfo);
+          // In case of a non-transactional table, set columnUniqueId to the column name to
+          // support backward compatibility. For non-transactional tables the column unique id
+          // is always equal to the column name.
+          if (!queryModel.getTable().isTransactionalTable()) {
+            QueryUtil.updateColumnUniqueIdForNonTransactionTable(fileFooter.getColumnInTable());
+          }
           filePathToFileFooterMapping.put(blockInfo.getFilePath(), fileFooter);
           blockInfo.setDetailInfo(blockletDetailInfo);
         }
         if (null == segmentProperties) {
           segmentProperties = new SegmentProperties(fileFooter.getColumnInTable(),
               fileFooter.getSegmentInfo().getColumnCardinality());
+          createFilterExpression(queryModel, segmentProperties);
+          updateColumns(queryModel, fileFooter.getColumnInTable(), blockInfo.getFilePath());
           filePathToSegmentPropertiesMap.put(blockInfo.getFilePath(), segmentProperties);
         }
         readAndFillBlockletInfo(tableBlockInfos, blockInfo,
@@ -228,6 +240,9 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
         if (null == segmentProperties) {
           segmentProperties = new SegmentProperties(blockInfo.getDetailInfo().getColumnSchemas(),
               blockInfo.getDetailInfo().getDimLens());
+          createFilterExpression(queryModel, segmentProperties);
+          updateColumns(queryModel, blockInfo.getDetailInfo().getColumnSchemas(),
+              blockInfo.getFilePath());
           filePathToSegmentPropertiesMap.put(blockInfo.getFilePath(), segmentProperties);
         }
         tableBlockInfos.add(blockInfo);
@@ -242,6 +257,88 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
   }
 
   /**
+   * Updates the dimensions and measures of the query model. In some scenarios, for example the
+   * SDK, a user can configure sort options per load: if the first load has an integer column c1
+   * configured as a sort column, Carbon treats it as a dimension. If a later load drops that
+   * sort option, c1 becomes a measure, because integers are measures by default. This method
+   * moves such columns between measures and dimensions according to the index file schema.
+   */
+  private void updateColumns(QueryModel queryModel, List<ColumnSchema> columnsInTable,
+      String filePath) throws IOException {
+    if (queryModel.getTable().isTransactionalTable()) {
+      return;
+    }
+    // First validate the schema of the carbondata file
+    boolean sameColumnSchemaList = BlockletDataMapUtil.isSameColumnSchemaList(columnsInTable,
+        queryModel.getTable().getTableInfo().getFactTable().getListOfColumns());
+    if (!sameColumnSchemaList) {
+      LOGGER.error("Schema of " + filePath + " doesn't match with the table's schema");
+      throw new IOException("All the files doesn't have same schema. "
+          + "Unsupported operation on nonTransactional table. Check logs.");
+    }
+    List<ProjectionDimension> dimensions = queryModel.getProjectionDimensions();
+    List<ProjectionMeasure> measures = queryModel.getProjectionMeasures();
+    List<ProjectionDimension> updatedDims = new ArrayList<>();
+    List<ProjectionMeasure> updatedMsrs = new ArrayList<>();
+
+    // Move a projection dimension to measures when the index file schema marks it as a measure
+    for (ProjectionDimension dimension : dimensions) {
+      int index = columnsInTable.indexOf(dimension.getDimension().getColumnSchema());
+      if (index > -1) {
+        if (!columnsInTable.get(index).isDimensionColumn()) {
+          ProjectionMeasure measure = new ProjectionMeasure(
+              new CarbonMeasure(columnsInTable.get(index), dimension.getDimension().getOrdinal(),
+                  dimension.getDimension().getSchemaOrdinal()));
+          measure.setOrdinal(dimension.getOrdinal());
+          updatedMsrs.add(measure);
+        } else {
+          updatedDims.add(dimension);
+        }
+      } else {
+        updatedDims.add(dimension);
+      }
+    }
+
+    // Move a projection measure to dimensions when the index file schema marks it as a dimension.
+    for (ProjectionMeasure measure : measures) {
+      int index = columnsInTable.indexOf(measure.getMeasure().getColumnSchema());
+      if (index > -1) {
+        if (columnsInTable.get(index).isDimensionColumn()) {
+          ProjectionDimension dimension = new ProjectionDimension(
+              new CarbonDimension(columnsInTable.get(index), measure.getMeasure().getOrdinal(),
+                  measure.getMeasure().getSchemaOrdinal(), -1, -1));
+          dimension.setOrdinal(measure.getOrdinal());
+          updatedDims.add(dimension);
+        } else {
+          updatedMsrs.add(measure);
+        }
+      } else {
+        updatedMsrs.add(measure);
+      }
+    }
+    // Clear and update the query model projections.
+    dimensions.clear();
+    dimensions.addAll(updatedDims);
+    measures.clear();
+    measures.addAll(updatedMsrs);
+  }
+
+  private void createFilterExpression(QueryModel queryModel, SegmentProperties properties) {
+    Expression expression = queryModel.getFilterExpression();
+    if (expression != null) {
+      QueryModel.FilterProcessVO processVO =
+          new QueryModel.FilterProcessVO(properties.getDimensions(), properties.getMeasures(),
+              new ArrayList<CarbonDimension>());
+      QueryModel.processFilterExpression(processVO, expression, null, null);
+      // Optimize the filter expression and fit RANGE filters if conditions apply.
+      FilterOptimizer rangeFilterOptimizer = new RangeFilterOptmizer(expression);
+      rangeFilterOptimizer.optimizeFilter();
+      queryModel.setFilterExpressionResolverTree(
+          CarbonTable.resolveFilter(expression, queryModel.getAbsoluteTableIdentifier()));
+    }
+  }
+
+  /**
    * Read the file footer of block file and get the blocklets to query
    */
   private void readAndFillBlockletInfo(List<TableBlockInfo> tableBlockInfos,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3894e1d0/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
index 00c7913..2285284 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
@@ -48,6 +48,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.scan.complextypes.ArrayQueryType;
 import org.apache.carbondata.core.scan.complextypes.MapQueryType;
 import org.apache.carbondata.core.scan.complextypes.PrimitiveQueryType;
@@ -730,4 +731,22 @@ public class QueryUtil {
       return new BitSet(1);
     }
   }
+
+  /**
+   * In case of a non-transactional table, set columnUniqueId to the column name to support
+   * backward compatibility. For non-transactional tables the column unique id is always
+   * equal to the column name.
+   */
+  public static void updateColumnUniqueIdForNonTransactionTable(List<ColumnSchema> columnSchemas) {
+    for (ColumnSchema columnSchema : columnSchemas) {
+      // In case of complex types only add the name after removing parent names.
+      int index = columnSchema.getColumnName().lastIndexOf(".");
+      if (index >= 0) {
+        columnSchema.setColumnUniqueId(columnSchema.getColumnName()
+            .substring(index + 1, columnSchema.getColumnName().length()));
+      } else {
+        columnSchema.setColumnUniqueId(columnSchema.getColumnName());
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3894e1d0/core/src/main/java/org/apache/carbondata/core/scan/expression/ColumnExpression.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/expression/ColumnExpression.java b/core/src/main/java/org/apache/carbondata/core/scan/expression/ColumnExpression.java
index 39ad312..766d249 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/expression/ColumnExpression.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/expression/ColumnExpression.java
@@ -98,6 +98,13 @@ public class ColumnExpression extends LeafExpression {
     return dataType;
   }
 
+  public void reset() {
+    dimension = null;
+    measure = null;
+    isDimension = false;
+    isMeasure = false;
+  }
+
   @Override
   public ExpressionResult evaluate(RowIntf value) {
     return new ExpressionResult(dataType, (null == value ? null : value.getVal(colIndex)));

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3894e1d0/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
index 31c7a86..6df98e6 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
@@ -66,6 +66,11 @@ public class QueryModel {
   private FilterResolverIntf filterExpressionResolverTree;
 
   /**
+   * filter expression tree
+   */
+  private Expression filterExpression;
+
+  /**
    * table block information in which query will be executed
    */
   private List<TableBlockInfo> tableBlockInfos;
@@ -128,7 +133,7 @@ public class QueryModel {
     return new QueryModel(carbonTable);
   }
 
-  public static void processFilterExpression(CarbonTable carbonTable, Expression filterExpression,
+  public static void processFilterExpression(FilterProcessVO processVO, Expression filterExpression,
       final boolean[] isFilterDimensions, final boolean[] isFilterMeasures) {
     if (null != filterExpression) {
       if (null != filterExpression.getChildren() && filterExpression.getChildren().size() == 0) {
@@ -136,22 +141,22 @@ public class QueryModel {
           List<ColumnExpression> listOfCol =
               ((ConditionalExpression) filterExpression).getColumnList();
           for (ColumnExpression expression : listOfCol) {
-            setDimAndMsrColumnNode(carbonTable, expression, isFilterDimensions, isFilterMeasures);
+            setDimAndMsrColumnNode(processVO, expression, isFilterDimensions, isFilterMeasures);
           }
         }
       }
       for (Expression expression : filterExpression.getChildren()) {
         if (expression instanceof ColumnExpression) {
-          setDimAndMsrColumnNode(carbonTable, (ColumnExpression) expression, isFilterDimensions,
+          setDimAndMsrColumnNode(processVO, (ColumnExpression) expression, isFilterDimensions,
               isFilterMeasures);
         } else if (expression instanceof UnknownExpression) {
           UnknownExpression exp = ((UnknownExpression) expression);
           List<ColumnExpression> listOfColExpression = exp.getAllColumnList();
           for (ColumnExpression col : listOfColExpression) {
-            setDimAndMsrColumnNode(carbonTable, col, isFilterDimensions, isFilterMeasures);
+            setDimAndMsrColumnNode(processVO, col, isFilterDimensions, isFilterMeasures);
           }
         } else {
-          processFilterExpression(carbonTable, expression, isFilterDimensions, isFilterMeasures);
+          processFilterExpression(processVO, expression, isFilterDimensions, isFilterMeasures);
         }
       }
     }
@@ -167,18 +172,16 @@ public class QueryModel {
     return null;
   }
 
-  private static void setDimAndMsrColumnNode(CarbonTable carbonTable, ColumnExpression col,
+  private static void setDimAndMsrColumnNode(FilterProcessVO processVO, ColumnExpression col,
       boolean[] isFilterDimensions, boolean[] isFilterMeasures) {
     CarbonDimension dim;
     CarbonMeasure msr;
     String columnName;
     columnName = col.getColumnName();
+    col.reset();
     dim = CarbonUtil
-        .findDimension(carbonTable.getDimensionByTableName(carbonTable.getTableName()), columnName);
-    msr = getCarbonMetadataMeasure(columnName,
-        carbonTable.getMeasureByTableName(carbonTable.getTableName()));
-    col.setDimension(false);
-    col.setMeasure(false);
+        .findDimension(processVO.getCarbonDimensions(), columnName);
+    msr = getCarbonMetadataMeasure(columnName, processVO.getCarbonMeasures());
 
     if (null != dim) {
       // Dimension Column
@@ -198,8 +201,7 @@ public class QueryModel {
     } else {
       // check if this is an implicit dimension
       dim = CarbonUtil
-          .findDimension(carbonTable.getImplicitDimensionByTableName(carbonTable.getTableName()),
-              columnName);
+          .findDimension(processVO.getImplicitDimensions(), columnName);
       col.setCarbonColumn(dim);
       col.setDimension(dim);
       col.setDimension(true);
@@ -269,6 +271,14 @@ public class QueryModel {
     this.filterExpressionResolverTree = filterExpressionResolverTree;
   }
 
+  public Expression getFilterExpression() {
+    return filterExpression;
+  }
+
+  public void setFilterExpression(Expression filterExpression) {
+    this.filterExpression = filterExpression;
+  }
+
   /**
    * @return the absoluteTableIdentifier
    */
@@ -401,4 +411,32 @@ public class QueryModel {
   public void setFreeUnsafeMemory(boolean freeUnsafeMemory) {
     this.freeUnsafeMemory = freeUnsafeMemory;
   }
+
+  public static class FilterProcessVO {
+
+    private List<CarbonDimension> carbonDimensions;
+
+    private List<CarbonMeasure> carbonMeasures;
+
+    private List<CarbonDimension> implicitDimensions;
+
+    public FilterProcessVO(List<CarbonDimension> carbonDimensions,
+        List<CarbonMeasure> carbonMeasures, List<CarbonDimension> implicitDimensions) {
+      this.carbonDimensions = carbonDimensions;
+      this.carbonMeasures = carbonMeasures;
+      this.implicitDimensions = implicitDimensions;
+    }
+
+    public List<CarbonDimension> getCarbonDimensions() {
+      return carbonDimensions;
+    }
+
+    public List<CarbonMeasure> getCarbonMeasures() {
+      return carbonMeasures;
+    }
+
+    public List<CarbonDimension> getImplicitDimensions() {
+      return implicitDimensions;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3894e1d0/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModelBuilder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModelBuilder.java b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModelBuilder.java
index ced80f2..f1bbe15 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModelBuilder.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModelBuilder.java
@@ -311,14 +311,19 @@ public class QueryModelBuilder {
     queryModel.setReadPageByPage(readPageByPage);
     queryModel.setProjection(projection);
 
-    // set the filter to the query model in order to filter blocklet before scan
-    boolean[] isFilterDimensions = new boolean[table.getDimensionOrdinalMax()];
-    boolean[] isFilterMeasures = new boolean[table.getAllMeasures().size()];
-    table.processFilterExpression(filterExpression, isFilterDimensions, isFilterMeasures);
-    queryModel.setIsFilterDimensions(isFilterDimensions);
-    queryModel.setIsFilterMeasures(isFilterMeasures);
-    FilterResolverIntf filterIntf = table.resolveFilter(filterExpression);
-    queryModel.setFilterExpressionResolverTree(filterIntf);
+    if (table.isTransactionalTable()) {
+      // set the filter to the query model in order to filter blocklet before scan
+      boolean[] isFilterDimensions = new boolean[table.getDimensionOrdinalMax()];
+      boolean[] isFilterMeasures = new boolean[table.getAllMeasures().size()];
+      table.processFilterExpression(filterExpression, isFilterDimensions, isFilterMeasures);
+      queryModel.setIsFilterDimensions(isFilterDimensions);
+      queryModel.setIsFilterMeasures(isFilterMeasures);
+      FilterResolverIntf filterIntf =
+          CarbonTable.resolveFilter(filterExpression, table.getAbsoluteTableIdentifier());
+      queryModel.setFilterExpressionResolverTree(filterIntf);
+    } else {
+      queryModel.setFilterExpression(filterExpression);
+    }
     return queryModel;
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3894e1d0/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
index 2e92d84..168a526 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
@@ -43,6 +43,7 @@ import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.metadata.schema.table.column.ParentColumnTableRelation;
 import org.apache.carbondata.core.reader.CarbonIndexFileReader;
+import org.apache.carbondata.core.scan.executor.util.QueryUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.format.BlockIndex;
 
@@ -135,6 +136,14 @@ public abstract class AbstractDataFileFooterConverter {
    * @throws IOException problem while reading the index file
    */
   public List<DataFileFooter> getIndexInfo(String filePath, byte[] fileData) throws IOException {
+    return getIndexInfo(filePath, fileData, true);
+  }
+
+  /**
+   * Below method will be used to get the index info from index file
+   */
+  public List<DataFileFooter> getIndexInfo(String filePath, byte[] fileData,
+      boolean isTransactionalTable) throws IOException {
     CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
     List<DataFileFooter> dataFileFooters = new ArrayList<DataFileFooter>();
     String parentPath = filePath.substring(0, filePath.lastIndexOf("/"));
@@ -153,6 +162,9 @@ public abstract class AbstractDataFileFooterConverter {
       for (int i = 0; i < table_columns.size(); i++) {
         columnSchemaList.add(thriftColumnSchemaToWrapperColumnSchema(table_columns.get(i)));
       }
+      if (!isTransactionalTable) {
+        QueryUtil.updateColumnUniqueIdForNonTransactionTable(columnSchemaList);
+      }
       // get the segment info
       SegmentInfo segmentInfo = getSegmentInfo(readIndexHeader.getSegment_info());
       BlockletIndex blockletIndex = null;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3894e1d0/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
index 6ede653..8e8b075 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
@@ -91,17 +91,18 @@ public class BlockletDataMapUtil {
           identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
               .getIndexFileName()) });
     }
-    DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
     Map<String, BlockMetaInfo> blockMetaInfoMap = new HashMap<>();
-    List<DataFileFooter> indexInfo = fileFooterConverter.getIndexInfo(
-        identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
-            .getIndexFileName(), indexFileStore.getFileData(identifier.getIndexFileName()));
     CarbonTable carbonTable = identifierWrapper.getCarbonTable();
     if (carbonTable != null) {
       isTransactionalTable = carbonTable.getTableInfo().isTransactionalTable();
       tableColumnList =
           carbonTable.getTableInfo().getFactTable().getListOfColumns();
     }
+    DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
+    List<DataFileFooter> indexInfo = fileFooterConverter.getIndexInfo(
+        identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
+            .getIndexFileName(), indexFileStore.getFileData(identifier.getIndexFileName()),
+        isTransactionalTable);
     for (DataFileFooter footer : indexInfo) {
       if ((!isTransactionalTable) && (tableColumnList.size() != 0) &&
           !isSameColumnSchemaList(footer.getColumnInTable(), tableColumnList)) {
@@ -247,7 +248,7 @@ public class BlockletDataMapUtil {
     return true;
   }
 
-  private static boolean isSameColumnSchemaList(List<ColumnSchema> indexFileColumnList,
+  public static boolean isSameColumnSchemaList(List<ColumnSchema> indexFileColumnList,
       List<ColumnSchema> tableColumnList) {
     if (indexFileColumnList.size() != tableColumnList.size()) {
       LOG.error("Index file's column size is " + indexFileColumnList.size()
@@ -255,7 +256,7 @@ public class BlockletDataMapUtil {
       return false;
     }
     for (int i = 0; i < tableColumnList.size(); i++) {
-      if (!indexFileColumnList.get(i).equalsWithStrictCheck(tableColumnList.get(i))) {
+      if (!tableColumnList.contains(indexFileColumnList.get(i))) {
         return false;
       }
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3894e1d0/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
index 9d50d69..9e5edc1 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
@@ -36,7 +36,6 @@ import org.apache.carbondata.core.metadata.schema.table.TableInfo;
 import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope;
 import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
 import org.apache.carbondata.core.scan.expression.Expression;
-import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.hadoop.CarbonInputSplit;
@@ -121,12 +120,10 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se
           readCommittedScope = new LatestFilesReadCommittedScope(identifier.getTablePath());
         }
       }
-      Expression filter = getFilterPredicates(job.getConfiguration());
       // this will be null in case of corrupt schema file.
       PartitionInfo partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName());
-      carbonTable.processFilterExpression(filter, null, null);
+      Expression filter = getFilterPredicates(job.getConfiguration());
 
-      FilterResolverIntf filterInterface = carbonTable.resolveFilter(filter);
 
       // if external table Segments are found, add it to the List
       List<Segment> externalTableSegments = new ArrayList<Segment>();
@@ -147,7 +144,7 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se
       }
       // do block filtering and get split
       List<InputSplit> splits =
-          getSplits(job, filterInterface, externalTableSegments, null, partitionInfo, null);
+          getSplits(job, filter, externalTableSegments, null, partitionInfo, null);
       if (getColumnProjection(job.getConfiguration()) == null) {
         // If the user projection is empty, use default all columns as projections.
         // All column name will be filled inside getSplits, so can update only here.
@@ -167,7 +164,7 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se
    * @return
    * @throws IOException
    */
-  private List<InputSplit> getSplits(JobContext job, FilterResolverIntf filterResolver,
+  private List<InputSplit> getSplits(JobContext job, Expression expression,
       List<Segment> validSegments, BitSet matchedPartitions, PartitionInfo partitionInfo,
       List<Integer> oldPartitionIdList) throws IOException {
 
@@ -176,7 +173,7 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se
 
     // for each segment fetch blocks matching filter in Driver BTree
     List<CarbonInputSplit> dataBlocksOfSegment =
-        getDataBlocksOfSegment(job, carbonTable, filterResolver, matchedPartitions,
+        getDataBlocksOfSegment(job, carbonTable, expression, matchedPartitions,
             validSegments, partitionInfo, oldPartitionIdList);
     numBlocks = dataBlocksOfSegment.size();
     result.addAll(dataBlocksOfSegment);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3894e1d0/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
index bcbbb10..b497e3a 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -30,8 +30,10 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.constants.CarbonCommonConstantsInternal;
 import org.apache.carbondata.core.datamap.DataMapChooser;
 import org.apache.carbondata.core.datamap.DataMapJob;
+import org.apache.carbondata.core.datamap.DataMapStoreManager;
 import org.apache.carbondata.core.datamap.DataMapUtil;
 import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datamap.TableDataMap;
 import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
 import org.apache.carbondata.core.datamap.dev.expr.DataMapWrapperSimpleInfo;
 import org.apache.carbondata.core.exception.InvalidConfigurationException;
@@ -390,7 +392,7 @@ m filterExpression
    * get data blocks of given segment
    */
   protected List<CarbonInputSplit> getDataBlocksOfSegment(JobContext job, CarbonTable carbonTable,
-      FilterResolverIntf resolver, BitSet matchedPartitions, List<Segment> segmentIds,
+      Expression expression, BitSet matchedPartitions, List<Segment> segmentIds,
       PartitionInfo partitionInfo, List<Integer> oldPartitionIdList) throws IOException {
 
     QueryStatisticsRecorder recorder = CarbonTimeStatisticsFactory.createDriverRecorder();
@@ -400,7 +402,7 @@ m filterExpression
     TokenCache.obtainTokensForNamenodes(job.getCredentials(),
         new Path[] { new Path(carbonTable.getTablePath()) }, job.getConfiguration());
     List<ExtendedBlocklet> prunedBlocklets =
-        getPrunedBlocklets(job, carbonTable, resolver, segmentIds);
+        getPrunedBlocklets(job, carbonTable, expression, segmentIds);
 
     List<CarbonInputSplit> resultFilteredBlocks = new ArrayList<>();
     int partitionIndex = 0;
@@ -447,10 +449,13 @@ m filterExpression
    * First pruned with default blocklet datamap, then pruned with CG and FG datamaps
    */
   private List<ExtendedBlocklet> getPrunedBlocklets(JobContext job, CarbonTable carbonTable,
-      FilterResolverIntf resolver, List<Segment> segmentIds) throws IOException {
+      Expression expression, List<Segment> segmentIds) throws IOException {
     ExplainCollector.addPruningInfo(carbonTable.getTableName());
-    if (resolver != null) {
-      ExplainCollector.setFilterStatement(resolver.getFilterExpression().getStatement());
+    FilterResolverIntf resolver = null;
+    if (expression != null) {
+      carbonTable.processFilterExpression(expression, null, null);
+      resolver = CarbonTable.resolveFilter(expression, carbonTable.getAbsoluteTableIdentifier());
+      ExplainCollector.setFilterStatement(expression.getStatement());
     } else {
       ExplainCollector.setFilterStatement("none");
     }
@@ -461,10 +466,13 @@ m filterExpression
     DataMapJob dataMapJob = DataMapUtil.getDataMapJob(job.getConfiguration());
     List<PartitionSpec> partitionsToPrune = getPartitionsToPrune(job.getConfiguration());
     // First prune using default datamap on driver side.
-    DataMapExprWrapper dataMapExprWrapper = DataMapChooser
-        .getDefaultDataMap(getOrCreateCarbonTable(job.getConfiguration()), resolver);
-    List<ExtendedBlocklet> prunedBlocklets =
-        dataMapExprWrapper.prune(segmentIds, partitionsToPrune);
+    TableDataMap defaultDataMap = DataMapStoreManager.getInstance().getDefaultDataMap(carbonTable);
+    List<ExtendedBlocklet> prunedBlocklets = null;
+    if (carbonTable.isTransactionalTable()) {
+      prunedBlocklets = defaultDataMap.prune(segmentIds, resolver, partitionsToPrune);
+    } else {
+      prunedBlocklets = defaultDataMap.prune(segmentIds, expression, partitionsToPrune);
+    }
 
     if (prunedBlocklets.size() == 0) {
       return prunedBlocklets;
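
Note: a hedged usage sketch of the new filter flow (mirroring the input format test further below; job setup and imports omitted). The caller only hands an Expression to the job configuration; resolution against the table schema now happens inside getPrunedBlocklets, and only for transactional tables.

  // Hedged sketch: supply the raw filter Expression through the job configuration.
  Expression filter = new EqualToExpression(
      new ColumnExpression("country", DataTypes.STRING),
      new LiteralExpression("china", DataTypes.STRING));
  CarbonTableInputFormat.setFilterPredicates(job.getConfiguration(), filter);
  // getSplits()/getPrunedBlocklets() then decide per table whether to resolve the
  // expression or to pass it to the default datamap as-is.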

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3894e1d0/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 4f85975..05b73dd 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -198,11 +198,11 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
     Expression filter = getFilterPredicates(job.getConfiguration());
     // this will be null in case of corrupt schema file.
     PartitionInfo partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName());
-    carbonTable.processFilterExpression(filter, null, null);
 
     // prune partitions for filter query on partition table
     BitSet matchedPartitions = null;
     if (partitionInfo != null && partitionInfo.getPartitionType() != PartitionType.NATIVE_HIVE) {
+      carbonTable.processFilterExpression(filter, null, null);
       matchedPartitions = setMatchedPartitions(null, filter, partitionInfo, null);
       if (matchedPartitions != null) {
         if (matchedPartitions.cardinality() == 0) {
@@ -213,11 +213,9 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
       }
     }
 
-    FilterResolverIntf filterInterface = carbonTable.resolveFilter(filter);
-
     // do block filtering and get split
     List<InputSplit> splits =
-        getSplits(job, filterInterface, filteredSegmentToAccess, matchedPartitions, partitionInfo,
+        getSplits(job, filter, filteredSegmentToAccess, matchedPartitions, partitionInfo,
             null, updateStatusManager);
     // pass the invalid segment to task side in order to remove index entry in task side
     if (invalidSegments.size() > 0) {
@@ -229,8 +227,7 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
     }
 
     // add all splits of streaming
-    List<InputSplit> splitsOfStreaming =
-        getSplitsOfStreaming(job, streamSegments, carbonTable, filterInterface);
+    List<InputSplit> splitsOfStreaming = getSplitsOfStreaming(job, streamSegments, carbonTable);
     if (!splitsOfStreaming.isEmpty()) {
       splits.addAll(splitsOfStreaming);
     }
@@ -358,7 +355,8 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
           Expression filter = getFilterPredicates(job.getConfiguration());
           if (filter != null) {
             carbonTable.processFilterExpression(filter, null, null);
-            filterResolverIntf = carbonTable.resolveFilter(filter);
+            filterResolverIntf =
+                CarbonTable.resolveFilter(filter, carbonTable.getAbsoluteTableIdentifier());
           }
         }
       }
@@ -442,9 +440,7 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
       if (null == carbonTable) {
         throw new IOException("Missing/Corrupt schema file for table.");
       }
-
       carbonTable.processFilterExpression(filter, null, null);
-
       // prune partitions for filter query on partition table
       String partitionIds = job.getConfiguration().get(ALTER_PARTITION_ID);
       // matchedPartitions records partitionIndex, not partitionId
@@ -461,9 +457,8 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
         }
       }
 
-      FilterResolverIntf filterInterface = carbonTable.resolveFilter(filter);
       // do block filtering and get split
-      List<InputSplit> splits = getSplits(job, filterInterface, segmentList, matchedPartitions,
+      List<InputSplit> splits = getSplits(job, filter, segmentList, matchedPartitions,
           partitionInfo, oldPartitionIdList, new SegmentUpdateStatusManager(carbonTable));
       // pass the invalid segment to task side in order to remove index entry in task side
       if (invalidSegments.size() > 0) {
@@ -513,7 +508,7 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
    * @return
    * @throws IOException
    */
-  private List<InputSplit> getSplits(JobContext job, FilterResolverIntf filterResolver,
+  private List<InputSplit> getSplits(JobContext job, Expression expression,
       List<Segment> validSegments, BitSet matchedPartitions, PartitionInfo partitionInfo,
       List<Integer> oldPartitionIdList, SegmentUpdateStatusManager updateStatusManager)
       throws IOException {
@@ -527,7 +522,7 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
 
     // for each segment fetch blocks matching filter in Driver BTree
     List<org.apache.carbondata.hadoop.CarbonInputSplit> dataBlocksOfSegment =
-        getDataBlocksOfSegment(job, carbonTable, filterResolver, matchedPartitions,
+        getDataBlocksOfSegment(job, carbonTable, expression, matchedPartitions,
             validSegments, partitionInfo, oldPartitionIdList);
     numBlocks = dataBlocksOfSegment.size();
     for (org.apache.carbondata.hadoop.CarbonInputSplit inputSplit : dataBlocksOfSegment) {
@@ -615,7 +610,7 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
               toBeCleanedSegments);
     }
     List<ExtendedBlocklet> blocklets =
-        blockletMap.prune(filteredSegment, null, partitions);
+        blockletMap.prune(filteredSegment, (FilterResolverIntf) null, partitions);
     for (ExtendedBlocklet blocklet : blocklets) {
       String blockName = blocklet.getPath();
       blockName = CarbonTablePath.getCarbonDataFileName(blockName);
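
Note on the explicit cast above: after this change the default datamap exposes prune overloads for both FilterResolverIntf and Expression (see getPrunedBlocklets earlier in this patch), so a bare null argument would be ambiguous to the compiler; the cast pins the unfiltered call to the resolver variant, roughly:

  // Hedged sketch: an unfiltered prune call, disambiguated by the cast.
  List<ExtendedBlocklet> blocklets =
      blockletMap.prune(filteredSegment, (FilterResolverIntf) null, partitions);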

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3894e1d0/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java
index 65ab426..935c52d 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java
@@ -101,18 +101,32 @@ public class StoreCreator {
 
   private static LogService LOG =
       LogServiceFactory.getLogService(StoreCreator.class.getCanonicalName());
-  private static AbsoluteTableIdentifier absoluteTableIdentifier;
-  private static String storePath = null;
+  private AbsoluteTableIdentifier absoluteTableIdentifier;
+  private String storePath = null;
+  private String csvPath;
+  private boolean dictionary;
+  private List<String> sortColumns = new ArrayList<>();
+
+  public StoreCreator(String storePath, String csvPath) {
+    this(storePath, csvPath, false);
+  }
 
-  static {
-    storePath = new File("target/store").getAbsolutePath();
+  public StoreCreator(String storePath, String csvPath, boolean dictionary) {
+    this.storePath = storePath;
+    this.csvPath = csvPath;
     String dbName = "testdb";
     String tableName = "testtable";
+    sortColumns.add("date");
+    sortColumns.add("country");
+    sortColumns.add("name");
+    sortColumns.add("phonetype");
+    sortColumns.add("serialname");
     absoluteTableIdentifier = AbsoluteTableIdentifier.from(storePath + "/testdb/testtable",
         new CarbonTableIdentifier(dbName, tableName, UUID.randomUUID().toString()));
+    this.dictionary = dictionary;
   }
 
-  public static AbsoluteTableIdentifier getAbsoluteTableIdentifier() {
+  public AbsoluteTableIdentifier getAbsoluteTableIdentifier() {
     return absoluteTableIdentifier;
   }
 
@@ -159,30 +173,41 @@ public class StoreCreator {
   /**
    * Create store without any restructure
    */
-  public static void createCarbonStore() throws Exception {
+  public void createCarbonStore() throws Exception {
     CarbonLoadModel loadModel = createTableAndLoadModel();
     loadData(loadModel, storePath);
   }
 
   /**
+   * Create store without any restructure
+   */
+  public void createCarbonStore(CarbonLoadModel loadModel) throws Exception {
+    loadData(loadModel, storePath);
+  }
+
+  /**
    * Method to clear the data maps
    */
-  public static void clearDataMaps() throws IOException {
+  public void clearDataMaps() throws IOException {
     DataMapStoreManager.getInstance().clearDataMaps(absoluteTableIdentifier);
   }
 
-  public static CarbonLoadModel createTableAndLoadModel() throws Exception {
-    String factFilePath =
-        new File("../hadoop/src/test/resources/data.csv").getCanonicalPath();
-    File storeDir = new File(storePath);
-    CarbonUtil.deleteFoldersAndFiles(storeDir);
+  public CarbonLoadModel createTableAndLoadModel(boolean deleteOldStore) throws Exception {
+    if (deleteOldStore) {
+      File storeDir = new File(storePath);
+      CarbonUtil.deleteFoldersAndFiles(storeDir);
+    }
 
     CarbonTable table = createTable(absoluteTableIdentifier);
-    writeDictionary(factFilePath, table);
-    return buildCarbonLoadModel(table, factFilePath, absoluteTableIdentifier);
+    writeDictionary(csvPath, table);
+    return buildCarbonLoadModel(table, csvPath, absoluteTableIdentifier);
+  }
+
+  public CarbonLoadModel createTableAndLoadModel() throws Exception {
+    return createTableAndLoadModel(true);
   }
 
-  public static CarbonTable createTable(
+  public CarbonTable createTable(
       AbsoluteTableIdentifier identifier) throws IOException {
     TableInfo tableInfo = new TableInfo();
     tableInfo.setDatabaseName(identifier.getCarbonTableIdentifier().getDatabaseName());
@@ -190,14 +215,21 @@ public class StoreCreator {
     tableSchema.setTableName(identifier.getCarbonTableIdentifier().getTableName());
     List<ColumnSchema> columnSchemas = new ArrayList<ColumnSchema>();
     ArrayList<Encoding> encodings = new ArrayList<>();
-    encodings.add(Encoding.DICTIONARY);
+    if (dictionary) {
+      encodings.add(Encoding.DICTIONARY);
+    }
+    int schemaOrdinal = 0;
     ColumnSchema id = new ColumnSchema();
-    id.setColumnName("ID");
+    id.setColumnName("id");
     id.setDataType(DataTypes.INT);
     id.setEncodingList(encodings);
     id.setColumnUniqueId(UUID.randomUUID().toString());
     id.setColumnReferenceId(id.getColumnUniqueId());
     id.setDimensionColumn(true);
+    id.setSchemaOrdinal(schemaOrdinal++);
+    if (sortColumns.contains(id.getColumnName())) {
+      id.setSortColumn(true);
+    }
     columnSchemas.add(id);
 
     ColumnSchema date = new ColumnSchema();
@@ -206,8 +238,11 @@ public class StoreCreator {
     date.setEncodingList(encodings);
     date.setColumnUniqueId(UUID.randomUUID().toString());
     date.setDimensionColumn(true);
-    date.setSortColumn(true);
     date.setColumnReferenceId(id.getColumnUniqueId());
+    date.setSchemaOrdinal(schemaOrdinal++);
+    if (sortColumns.contains(date.getColumnName())) {
+      date.setSortColumn(true);
+    }
     columnSchemas.add(date);
 
     ColumnSchema country = new ColumnSchema();
@@ -217,6 +252,10 @@ public class StoreCreator {
     country.setColumnUniqueId(UUID.randomUUID().toString());
     country.setDimensionColumn(true);
     country.setSortColumn(true);
+    country.setSchemaOrdinal(schemaOrdinal++);
+    if (sortColumns.contains(country.getColumnName())) {
+      country.setSortColumn(true);
+    }
     country.setColumnReferenceId(id.getColumnUniqueId());
     columnSchemas.add(country);
 
@@ -226,7 +265,10 @@ public class StoreCreator {
     name.setEncodingList(encodings);
     name.setColumnUniqueId(UUID.randomUUID().toString());
     name.setDimensionColumn(true);
-    name.setSortColumn(true);
+    name.setSchemaOrdinal(schemaOrdinal++);
+    if (sortColumns.contains(name.getColumnName())) {
+      name.setSortColumn(true);
+    }
     name.setColumnReferenceId(id.getColumnUniqueId());
     columnSchemas.add(name);
 
@@ -236,7 +278,10 @@ public class StoreCreator {
     phonetype.setEncodingList(encodings);
     phonetype.setColumnUniqueId(UUID.randomUUID().toString());
     phonetype.setDimensionColumn(true);
-    phonetype.setSortColumn(true);
+    phonetype.setSchemaOrdinal(schemaOrdinal++);
+    if (sortColumns.contains(phonetype.getColumnName())) {
+      phonetype.setSortColumn(true);
+    }
     phonetype.setColumnReferenceId(id.getColumnUniqueId());
     columnSchemas.add(phonetype);
 
@@ -246,10 +291,12 @@ public class StoreCreator {
     serialname.setEncodingList(encodings);
     serialname.setColumnUniqueId(UUID.randomUUID().toString());
     serialname.setDimensionColumn(true);
-    serialname.setSortColumn(true);
+    serialname.setSchemaOrdinal(schemaOrdinal++);
+    if (sortColumns.contains(serialname.getColumnName())) {
+      serialname.setSortColumn(true);
+    }
     serialname.setColumnReferenceId(id.getColumnUniqueId());
     columnSchemas.add(serialname);
-
     ColumnSchema salary = new ColumnSchema();
     salary.setColumnName("salary");
     salary.setDataType(DataTypes.INT);
@@ -257,6 +304,7 @@ public class StoreCreator {
     salary.setColumnUniqueId(UUID.randomUUID().toString());
     salary.setDimensionColumn(false);
     salary.setColumnReferenceId(id.getColumnUniqueId());
+    salary.setSchemaOrdinal(schemaOrdinal++);
     columnSchemas.add(salary);
 
     tableSchema.setListOfColumns(columnSchemas);
@@ -297,7 +345,7 @@ public class StoreCreator {
     return CarbonMetadata.getInstance().getCarbonTable(tableInfo.getTableUniqueName());
   }
 
-  private static void writeDictionary(String factFilePath, CarbonTable table) throws Exception {
+  private void writeDictionary(String factFilePath, CarbonTable table) throws Exception {
     BufferedReader reader = new BufferedReader(new InputStreamReader(
         new FileInputStream(factFilePath), "UTF-8"));
     List<CarbonDimension> dims = table.getDimensionByTableName(table.getTableName());
@@ -349,6 +397,10 @@ public class StoreCreator {
     reader.close();
   }
 
+  public void setSortColumns(List<String> sortColumns) {
+    this.sortColumns = sortColumns;
+  }
+
   /**
    * Execute graph which will further load data
    *
@@ -477,7 +529,8 @@ public class StoreCreator {
   }
 
   public static void main(String[] args) throws Exception {
-    StoreCreator.createCarbonStore();
+    new StoreCreator(new File("target/store").getAbsolutePath(),
+        new File("../hadoop/src/test/resources/data.csv").getCanonicalPath()).createCarbonStore();
   }
 
 }
\ No newline at end of file
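
Note: a hedged usage sketch of the instance-based StoreCreator (paths and the sort-column choice are illustrative; imports and exception handling omitted):

  // Hedged sketch: build a test store with a caller-chosen sort order.
  StoreCreator creator = new StoreCreator(
      new File("target/store").getAbsolutePath(),
      new File("../hadoop/src/test/resources/data.csv").getCanonicalPath());
  // Override the default sort columns before the table schema is created.
  creator.setSortColumns(Arrays.asList("name", "country"));
  creator.createCarbonStore();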

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3894e1d0/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableInputFormatTest.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableInputFormatTest.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableInputFormatTest.java
index fb9ff9c..136d3cc 100644
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableInputFormatTest.java
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableInputFormatTest.java
@@ -56,13 +56,16 @@ import org.junit.Test;
 
 public class CarbonTableInputFormatTest {
   // changed setUp to static init block to avoid un wanted multiple time store creation
+  private static StoreCreator creator;
   static {
     CarbonProperties.getInstance().
         addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, "/tmp/carbon/badrecords");
     CarbonProperties.getInstance()
         .addProperty(CarbonCommonConstants.CARBON_SYSTEM_FOLDER_LOCATION, "/tmp/carbon/");
     try {
-      StoreCreator.createCarbonStore();
+      creator = new StoreCreator(new File("target/store").getAbsolutePath(),
+          new File("../hadoop/src/test/resources/data.csv").getCanonicalPath());
+      creator.createCarbonStore();
     } catch (Exception e) {
       Assert.fail("create table failed: " + e.getMessage());
     }
@@ -73,10 +76,10 @@ public class CarbonTableInputFormatTest {
     JobConf jobConf = new JobConf(new Configuration());
     Job job = Job.getInstance(jobConf);
     job.getConfiguration().set("query.id", UUID.randomUUID().toString());
-    String tblPath = StoreCreator.getAbsoluteTableIdentifier().getTablePath();
+    String tblPath = creator.getAbsoluteTableIdentifier().getTablePath();
     FileInputFormat.addInputPath(job, new Path(tblPath));
-    CarbonTableInputFormat.setDatabaseName(job.getConfiguration(), StoreCreator.getAbsoluteTableIdentifier().getDatabaseName());
-    CarbonTableInputFormat.setTableName(job.getConfiguration(), StoreCreator.getAbsoluteTableIdentifier().getTableName());
+    CarbonTableInputFormat.setDatabaseName(job.getConfiguration(), creator.getAbsoluteTableIdentifier().getDatabaseName());
+    CarbonTableInputFormat.setTableName(job.getConfiguration(), creator.getAbsoluteTableIdentifier().getTableName());
     Expression expression = new EqualToExpression(new ColumnExpression("country", DataTypes.STRING),
         new LiteralExpression("china", DataTypes.STRING));
     CarbonTableInputFormat.setFilterPredicates(job.getConfiguration(), expression);
@@ -92,12 +95,12 @@ public class CarbonTableInputFormatTest {
     JobConf jobConf = new JobConf(new Configuration());
     Job job = Job.getInstance(jobConf);
     job.getConfiguration().set("query.id", UUID.randomUUID().toString());
-    String tblPath = StoreCreator.getAbsoluteTableIdentifier().getTablePath();
+    String tblPath = creator.getAbsoluteTableIdentifier().getTablePath();
     FileInputFormat.addInputPath(job, new Path(tblPath));
-    CarbonTableInputFormat.setDatabaseName(job.getConfiguration(), StoreCreator.getAbsoluteTableIdentifier().getDatabaseName());
-    CarbonTableInputFormat.setTableName(job.getConfiguration(), StoreCreator.getAbsoluteTableIdentifier().getTableName());
+    CarbonTableInputFormat.setDatabaseName(job.getConfiguration(), creator.getAbsoluteTableIdentifier().getDatabaseName());
+    CarbonTableInputFormat.setTableName(job.getConfiguration(), creator.getAbsoluteTableIdentifier().getTableName());
     // list files to get the carbondata file
-    String segmentPath = CarbonTablePath.getSegmentPath(StoreCreator.getAbsoluteTableIdentifier().getTablePath(), "0");
+    String segmentPath = CarbonTablePath.getSegmentPath(creator.getAbsoluteTableIdentifier().getTablePath(), "0");
     File segmentDir = new File(segmentPath);
     if (segmentDir.exists() && segmentDir.isDirectory()) {
       File[] files = segmentDir.listFiles(new FileFilter() {
@@ -134,7 +137,7 @@ public class CarbonTableInputFormatTest {
       Assert.assertTrue("failed", false);
       throw e;
     } finally {
-      StoreCreator.clearDataMaps();
+      creator.clearDataMaps();
     }
   }
 
@@ -153,7 +156,7 @@ public class CarbonTableInputFormatTest {
       e.printStackTrace();
       Assert.assertTrue("failed", false);
     } finally {
-      StoreCreator.clearDataMaps();
+      creator.clearDataMaps();
     }
   }
 
@@ -173,7 +176,7 @@ public class CarbonTableInputFormatTest {
     } catch (Exception e) {
       Assert.assertTrue("failed", false);
     } finally {
-      StoreCreator.clearDataMaps();
+      creator.clearDataMaps();
     }
   }
 
@@ -245,7 +248,7 @@ public class CarbonTableInputFormatTest {
     job.setMapperClass(Map.class);
     job.setInputFormatClass(CarbonTableInputFormat.class);
     job.setOutputFormatClass(TextOutputFormat.class);
-    AbsoluteTableIdentifier abs = StoreCreator.getAbsoluteTableIdentifier();
+    AbsoluteTableIdentifier abs = creator.getAbsoluteTableIdentifier();
     if (projection != null) {
       CarbonTableInputFormat.setColumnProjection(job.getConfiguration(), projection);
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3894e1d0/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableOutputFormatTest.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableOutputFormatTest.java
index 0d8c38b..379fdaf 100644
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableOutputFormatTest.java
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableOutputFormatTest.java
@@ -54,7 +54,8 @@ public class CarbonTableOutputFormatTest {
     CarbonProperties.getInstance()
         .addProperty(CarbonCommonConstants.CARBON_SYSTEM_FOLDER_LOCATION, "/tmp/carbon/");
     try {
-      carbonLoadModel = StoreCreator.createTableAndLoadModel();
+      carbonLoadModel = new StoreCreator(new File("target/store").getAbsolutePath(),
+          new File("../hadoop/src/test/resources/data.csv").getCanonicalPath()).createTableAndLoadModel();
     } catch (Exception e) {
       Assert.fail("create table failed: " + e.getMessage());
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3894e1d0/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
index 53d8f10..a1d4290 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
@@ -33,6 +33,7 @@ import org.apache.carbondata.core.indexstore.blockletindex.{BlockDataMap, Blockl
 import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema
 import org.apache.carbondata.core.indexstore.Blocklet
 import org.apache.carbondata.core.metadata.datatype.DataTypes
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
 import org.apache.carbondata.core.scan.expression.conditional.NotEqualsExpression
 import org.apache.carbondata.core.scan.expression.logical.AndExpression
@@ -300,7 +301,8 @@ class TestQueryWithColumnMetCacheAndCacheLevelProperty extends QueryTest with Be
     val notEqualsExpression = new NotEqualsExpression(columnExpression, literalNullExpression)
     val equalsExpression = new NotEqualsExpression(columnExpression, literalValueExpression)
     val andExpression = new AndExpression(notEqualsExpression, equalsExpression)
-    val resolveFilter: FilterResolverIntf = carbonTable.resolveFilter(andExpression)
+    val resolveFilter: FilterResolverIntf =
+      CarbonTable.resolveFilter(andExpression, carbonTable.getAbsoluteTableIdentifier)
     val exprWrapper = DataMapChooser.getDefaultDataMap(carbonTable, resolveFilter)
     val segment = new Segment("0")
     // get the pruned blocklets
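
Note: for quick reference, a hedged sketch of the relocated resolver entry point used above (Java form; filterExpression and carbonTable are assumed to be in scope):

  // Filter processing stays on the table instance; resolution is now a
  // static call that takes the table identifier explicitly.
  carbonTable.processFilterExpression(filterExpression, null, null);
  FilterResolverIntf resolved =
      CarbonTable.resolveFilter(filterExpression, carbonTable.getAbsoluteTableIdentifier());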

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3894e1d0/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
index 87d5622..113066a 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
@@ -1040,19 +1040,14 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     buildTestDataOtherDataType(3, Array[String]("age"))
     // put other sdk writer output to same path,
     // which has same column names but different sort column
-    val exception =
-    intercept[IOException] {
-      sql("select * from sdkOutputTable").show(false)
-    }
-    assert(exception.getMessage()
-      .contains("Problem in loading segment blocks."))
+    checkAnswer(sql("select * from sdkOutputTable"),
+      Seq(Row(true, 0, 0.0),
+          Row(true, 1, 0.5),
+          Row(true, 2, 1.0),
+          Row(true, 0, 0.0),
+          Row(true, 1, 0.5),
+          Row(true, 2, 1.0)))
 
-    val exception1 =
-      intercept[IOException] {
-        sql("select count(*) from sdkOutputTable").show(false)
-      }
-    assert(exception1.getMessage()
-      .contains("Problem in loading segment blocks."))
 
     sql("DROP TABLE sdkOutputTable")
     // drop table should not delete the files

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3894e1d0/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala
index c330fcb..d970892 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala
+++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala
@@ -28,7 +28,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{AtomicType, StructType}
 
 import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, HDFSCarbonFile}
 import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope
@@ -84,9 +84,13 @@ class CarbonFileIndex(
       val hadoopConf = sparkSession.sessionState.newHadoopConf()
       // convert t sparks source filter
       val filters = dataFilters.flatMap(DataSourceStrategy.translateFilter)
-
+      val dataTypeMap = dataSchema.map(f => f.name -> f.dataType).toMap
       // convert to carbon filter expressions
-      val filter: Option[CarbonExpression] = filters.flatMap { filter =>
+      val filter: Option[CarbonExpression] = filters.filterNot{ ref =>
+        ref.references.exists{ p =>
+          !dataTypeMap(p).isInstanceOf[AtomicType]
+        }
+      }.flatMap { filter =>
         CarbonSparkDataSourceUtil.createCarbonFilter(dataSchema, filter)
       }.reduceOption(new AndExpression(_, _))
       val model = CarbonSparkDataSourceUtil.prepareLoadModel(parameters, dataSchema)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3894e1d0/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
index b088b98..406e2c9 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
+++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
@@ -92,14 +92,18 @@ class SparkCarbonFileFormat extends FileFormat
     val tableInfo = SchemaReader.inferSchema(AbsoluteTableIdentifier.from(tablePath, "", ""), false)
     val table = CarbonTable.buildFromTableInfo(tableInfo)
     var schema = new StructType
-    tableInfo.getFactTable.getListOfColumns.asScala.foreach { col =>
+    val fields = tableInfo.getFactTable.getListOfColumns.asScala.map { col =>
       // TODO find better way to know its a child
       if (!col.getColumnName.contains(".")) {
-        schema = schema.add(
-          col.getColumnName,
-          SparkTypeConverter.convertCarbonToSparkDataType(col, table))
+        Some((col.getSchemaOrdinal,
+          StructField(col.getColumnName,
+            SparkTypeConverter.convertCarbonToSparkDataType(col, table))))
+      } else {
+        None
       }
-    }
+    }.filter(_.nonEmpty).map(_.get)
+    // Maintain the schema order.
+    fields.sortBy(_._1).foreach(f => schema = schema.add(f._2))
     Some(schema)
   }