Posted to commits@carbondata.apache.org by ra...@apache.org on 2019/02/14 07:27:02 UTC

[carbondata] branch master updated: [CARBONDATA-3287]Remove the validation for same schema in a location and fix drop datamap issue

This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 6091eb7  [CARBONDATA-3287]Remove the validation for same schema in a location and fix drop datamap issue
6091eb7 is described below

commit 6091eb7ea2989aa8e95ee4ee806c5e2c9e4b17a0
Author: akashrn5 <ak...@gmail.com>
AuthorDate: Mon Feb 4 16:07:02 2019 +0530

    [CARBONDATA-3287]Remove the validation for same schema in a location and fix drop datamap issue
    
    ### Why this PR?
    Currently we have a validation that fails the query if two carbondata files in a location have different schemas. There is no need to fail here; looking at Parquet's behavior in the same situation makes this clear, since Parquet does not fail in this case.
    
    Instead of failing, we can read the latest schema from the latest carbondata file in the given location, read all the files based on that schema, and return the query output. A column that is not present in some data files will simply have null values for those files (the sketch below illustrates how the latest file is picked).
    
    Note that we still do not merge schemas; the behavior stays the same as before, except that we now take the latest schema.
    
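    Below is a minimal, self-contained sketch of the "pick the latest file by modification time" idea, using plain java.io.File in place of CarbonData's CarbonFile abstraction; the real logic is getLatestIndexFile in CarbonTable.java and the data-file lookup in CarbonUtil.java (see the diff). The path in main() and the ".carbonindex" literal are illustrative assumptions.
    
    ```java
    import java.io.File;
    
    public class LatestFilePicker {
    
      // Returns the most recently modified file under dir (recursing into
      // subdirectories) whose name ends with the given extension, or null
      // if no such file exists.
      static File latestFileWithExt(File dir, String ext) {
        File latest = null;
        long latestTs = 0L;
        File[] files = dir.listFiles();
        if (files == null) {
          return null;
        }
        for (File f : files) {
          File candidate = f.isDirectory() ? latestFileWithExt(f, ext) : f;
          if (candidate != null && candidate.getName().endsWith(ext)
              && candidate.lastModified() > latestTs) {
            latest = candidate;
            latestTs = candidate.lastModified();
          }
        }
        return latest;
      }
    
      public static void main(String[] args) {
        // hypothetical SDK output folder containing carbon index files
        File latest = latestFileWithExt(new File("/tmp/sdk_output"), ".carbonindex");
        System.out.println(latest == null ? "no index file" : latest.getAbsolutePath());
      }
    }
    ```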
    ### Points to Observe
    1. Given one data file with columns a,b,c and a second file with columns a,b,c,d,e, reading the location creates a table with whichever column set (3 or 5 columns) is the latest, when the user does not specify a schema. If the user specifies a schema, the table is created with the specified schema.
    2. The only **validation** that remains: if a column with the same name is present in both data files at the location but with a different datatype, the query fails (illustrated in the sketch after this list).
    3. When the first query is fired, the datamap is created for the table. If a new column is present in another data file, the datamap is not updated because the table name stays the same; so when the column list differs, we drop the datamap and create it again.
    
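    A minimal sketch of the validation in point 2 above, with plain column-name to datatype maps standing in for CarbonData's ColumnSchema objects (the real check is isSameColumnAndDifferentDatatypeInSchema in BlockletDataMapUtil, shown in the diff):
    
    ```java
    import java.util.Map;
    
    public class SchemaCompatibilityCheck {
    
      // Returns false only when a column name appears in both schemas with a
      // different datatype; extra or missing columns are allowed.
      static boolean isCompatible(Map<String, String> fileSchema,
          Map<String, String> tableSchema) {
        for (Map.Entry<String, String> tableCol : tableSchema.entrySet()) {
          for (Map.Entry<String, String> fileCol : fileSchema.entrySet()) {
            if (fileCol.getKey().equalsIgnoreCase(tableCol.getKey())
                && !fileCol.getValue().equalsIgnoreCase(tableCol.getValue())) {
              return false;
            }
          }
        }
        return true;
      }
    
      public static void main(String[] args) {
        // a superset of columns is compatible (prints true)...
        System.out.println(isCompatible(
            Map.of("a", "int", "b", "string", "d", "double"),
            Map.of("a", "int", "b", "string")));
        // ...but a datatype mismatch on a common column is not (prints false)
        System.out.println(isCompatible(
            Map.of("a", "string"),
            Map.of("a", "int")));
      }
    }
    ```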
    This closes #3121
---
 .../core/datamap/DataMapStoreManager.java          | 12 +++++++-
 .../carbondata/core/datamap/TableDataMap.java      | 16 +++++++---
 .../carbondata/core/datamap/dev/DataMap.java       |  4 +--
 .../datamap/dev/cgdatamap/CoarseGrainDataMap.java  |  4 +--
 .../datamap/dev/fgdatamap/FineGrainDataMap.java    |  4 +--
 .../indexstore/blockletindex/BlockDataMap.java     |  8 ++---
 .../core/metadata/schema/table/CarbonTable.java    | 27 +++++++++++------
 .../scan/executor/impl/AbstractQueryExecutor.java  | 22 ++++++++------
 .../carbondata/core/scan/model/QueryModel.java     | 34 ++++++++++++++++------
 .../carbondata/core/util/BlockletDataMapUtil.java  | 33 ++++++++++++---------
 .../apache/carbondata/core/util/CarbonUtil.java    | 18 +++++++++---
 .../PrestoTestNonTransactionalTableFiles.scala     |  2 +-
 .../datasource/SparkCarbonDataSourceTest.scala     |  8 ++---
 13 files changed, 126 insertions(+), 66 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
index baf4739..c5cf55d 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
@@ -52,6 +52,7 @@ import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
 import static org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider.MV;
 import static org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider.PREAGGREGATE;
 
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.Logger;
 
@@ -322,6 +323,15 @@ public final class DataMapStoreManager {
         tableIndices = allDataMaps.get(tableUniqueName);
       }
     }
+    // in case of fileformat or sdk, when table is dropped or schema is changed the datamaps are
+    // not cleared, they need to be cleared by using API, so compare the columns, if not same, clear
+    // the datamaps on that table
+    if (allDataMaps.size() > 0 && !CollectionUtils.isEmpty(allDataMaps.get(tableUniqueName))
+        && !allDataMaps.get(tableUniqueName).get(0).getTable().getTableInfo().getFactTable()
+        .getListOfColumns().equals(table.getTableInfo().getFactTable().getListOfColumns())) {
+      clearDataMaps(tableUniqueName);
+      tableIndices = null;
+    }
     TableDataMap dataMap = null;
     if (tableIndices != null) {
       dataMap = getTableDataMap(dataMapSchema.getDataMapName(), tableIndices);
@@ -422,7 +432,7 @@ public final class DataMapStoreManager {
       blockletDetailsFetcher = getBlockletDetailsFetcher(table);
     }
     segmentPropertiesFetcher = (SegmentPropertiesFetcher) blockletDetailsFetcher;
-    TableDataMap dataMap = new TableDataMap(table.getAbsoluteTableIdentifier(),
+    TableDataMap dataMap = new TableDataMap(table,
         dataMapSchema, dataMapFactory, blockletDetailsFetcher, segmentPropertiesFetcher);
 
     tableIndices.add(dataMap);
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
index 86390e8..0d46fd8 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
@@ -43,6 +43,7 @@ import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
 import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
@@ -64,6 +65,8 @@ import org.apache.log4j.Logger;
 @InterfaceAudience.Internal
 public final class TableDataMap extends OperationEventListener {
 
+  private CarbonTable table;
+
   private AbsoluteTableIdentifier identifier;
 
   private DataMapSchema dataMapSchema;
@@ -80,10 +83,11 @@ public final class TableDataMap extends OperationEventListener {
   /**
    * It is called to initialize and load the required table datamap metadata.
    */
-  TableDataMap(AbsoluteTableIdentifier identifier, DataMapSchema dataMapSchema,
+  TableDataMap(CarbonTable table, DataMapSchema dataMapSchema,
       DataMapFactory dataMapFactory, BlockletDetailsFetcher blockletDetailsFetcher,
       SegmentPropertiesFetcher segmentPropertiesFetcher) {
-    this.identifier = identifier;
+    this.identifier = table.getAbsoluteTableIdentifier();
+    this.table = table;
     this.dataMapSchema = dataMapSchema;
     this.dataMapFactory = dataMapFactory;
     this.blockletDetailsFetcher = blockletDetailsFetcher;
@@ -115,8 +119,8 @@ public final class TableDataMap extends OperationEventListener {
       } else {
         segmentProperties = segmentPropertiesFetcher.getSegmentProperties(segment);
         for (DataMap dataMap : dataMaps.get(segment)) {
-          pruneBlocklets
-              .addAll(dataMap.prune(filterExp, segmentProperties, partitions, identifier));
+          pruneBlocklets.addAll(dataMap
+              .prune(filterExp, segmentProperties, partitions, table));
         }
       }
       blocklets.addAll(addSegmentId(
@@ -126,6 +130,10 @@ public final class TableDataMap extends OperationEventListener {
     return blocklets;
   }
 
+  public CarbonTable getTable() {
+    return table;
+  }
+
   /**
    * Pass the valid segments and prune the datamap using filter expression
    *
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
index f31b7f3..c52cc41 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
@@ -24,7 +24,7 @@ import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.indexstore.Blocklet;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
 import org.apache.carbondata.core.memory.MemoryException;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 
@@ -52,7 +52,7 @@ public interface DataMap<T extends Blocklet> {
    * blocklets where these filters can exist.
    */
   List<T> prune(Expression filter, SegmentProperties segmentProperties,
-      List<PartitionSpec> partitions, AbsoluteTableIdentifier identifier) throws IOException;
+      List<PartitionSpec> partitions, CarbonTable carbonTable) throws IOException;
 
   // TODO Move this method to Abstract class
   /**
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/CoarseGrainDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/CoarseGrainDataMap.java
index fc1f104..b4af9d9 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/CoarseGrainDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/CoarseGrainDataMap.java
@@ -25,7 +25,7 @@ import org.apache.carbondata.core.datamap.dev.DataMap;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.indexstore.Blocklet;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.scan.expression.Expression;
 
 /**
@@ -37,7 +37,7 @@ public abstract class CoarseGrainDataMap implements DataMap<Blocklet> {
 
   @Override
   public List<Blocklet> prune(Expression expression, SegmentProperties segmentProperties,
-      List<PartitionSpec> partitions, AbsoluteTableIdentifier identifier) throws IOException {
+      List<PartitionSpec> partitions, CarbonTable carbonTable) throws IOException {
     throw new UnsupportedOperationException("Filter expression not supported");
   }
 
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/FineGrainDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/FineGrainDataMap.java
index a6732a6..03b2bfb 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/FineGrainDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/FineGrainDataMap.java
@@ -24,7 +24,7 @@ import org.apache.carbondata.common.annotations.InterfaceStability;
 import org.apache.carbondata.core.datamap.dev.DataMap;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.scan.expression.Expression;
 
 /**
@@ -36,7 +36,7 @@ public abstract class FineGrainDataMap implements DataMap<FineGrainBlocklet> {
 
   @Override
   public List<FineGrainBlocklet> prune(Expression filter, SegmentProperties segmentProperties,
-      List<PartitionSpec> partitions, AbsoluteTableIdentifier identifier) throws IOException {
+      List<PartitionSpec> partitions, CarbonTable carbonTable) throws IOException {
     throw new UnsupportedOperationException("Filter expression not supported");
   }
 
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
index e29dfef..a7818c2 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
@@ -45,7 +45,6 @@ import org.apache.carbondata.core.indexstore.row.DataMapRow;
 import org.apache.carbondata.core.indexstore.row.DataMapRowImpl;
 import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema;
 import org.apache.carbondata.core.memory.MemoryException;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
 import org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex;
 import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
@@ -708,17 +707,18 @@ public class BlockDataMap extends CoarseGrainDataMap
 
   @Override
   public List<Blocklet> prune(Expression expression, SegmentProperties properties,
-      List<PartitionSpec> partitions, AbsoluteTableIdentifier identifier) throws IOException {
+      List<PartitionSpec> partitions, CarbonTable carbonTable) throws IOException {
     FilterResolverIntf filterResolverIntf = null;
     if (expression != null) {
       QueryModel.FilterProcessVO processVO =
           new QueryModel.FilterProcessVO(properties.getDimensions(), properties.getMeasures(),
               new ArrayList<CarbonDimension>());
-      QueryModel.processFilterExpression(processVO, expression, null, null);
+      QueryModel.processFilterExpression(processVO, expression, null, null, carbonTable);
       // Optimize Filter Expression and fit RANGE filters is conditions apply.
       FilterOptimizer rangeFilterOptimizer = new RangeFilterOptmizer(expression);
       rangeFilterOptimizer.optimizeFilter();
-      filterResolverIntf = CarbonTable.resolveFilter(expression, identifier);
+      filterResolverIntf =
+          CarbonTable.resolveFilter(expression, carbonTable.getAbsoluteTableIdentifier());
     }
     return prune(filterResolverIntf, properties, partitions);
   }
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index c4adab4..8ed781a 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -246,7 +246,7 @@ public class CarbonTable implements Serializable {
       String tableName,
       Configuration configuration) throws IOException {
     TableInfo tableInfoInfer = CarbonUtil.buildDummyTableInfo(tablePath, "null", "null");
-    CarbonFile carbonFile = getFirstIndexFile(FileFactory.getCarbonFile(tablePath, configuration));
+    CarbonFile carbonFile = getLatestIndexFile(FileFactory.getCarbonFile(tablePath, configuration));
     if (carbonFile == null) {
       throw new RuntimeException("Carbon index file not exists.");
     }
@@ -265,22 +265,31 @@ public class CarbonTable implements Serializable {
     return CarbonTable.buildFromTableInfo(tableInfoInfer);
   }
 
-  private static CarbonFile getFirstIndexFile(CarbonFile tablePath) {
+  private static CarbonFile getLatestIndexFile(CarbonFile tablePath) {
     CarbonFile[] carbonFiles = tablePath.listFiles();
+    CarbonFile latestCarbonIndexFile = null;
+    long latestIndexFileTimestamp = 0L;
     for (CarbonFile carbonFile : carbonFiles) {
-      if (carbonFile.isDirectory()) {
+      if (carbonFile.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT)
+          && carbonFile.getLastModifiedTime() > latestIndexFileTimestamp) {
+        latestCarbonIndexFile = carbonFile;
+        latestIndexFileTimestamp = carbonFile.getLastModifiedTime();
+      } else if (carbonFile.isDirectory()) {
         // if the list has directories that doesn't contain index files,
         // continue checking other files/directories in the list.
-        if (getFirstIndexFile(carbonFile) == null) {
+        if (getLatestIndexFile(carbonFile) == null) {
           continue;
         } else {
-          return getFirstIndexFile(carbonFile);
+          return getLatestIndexFile(carbonFile);
         }
-      } else if (carbonFile.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT)) {
-        return carbonFile;
       }
     }
-    return null;
+    if (latestCarbonIndexFile != null) {
+      return latestCarbonIndexFile;
+    } else {
+      // returning null only if the path doesn't have index files.
+      return null;
+    }
   }
 
   public static CarbonTable buildDummyTable(String tablePath) throws IOException {
@@ -1058,7 +1067,7 @@ public class CarbonTable implements Serializable {
         new QueryModel.FilterProcessVO(getDimensionByTableName(getTableName()),
             getMeasureByTableName(getTableName()), getImplicitDimensionByTableName(getTableName()));
     QueryModel.processFilterExpression(processVO, filterExpression, isFilterDimensions,
-        isFilterMeasures);
+        isFilterMeasures, this);
 
     if (null != filterExpression) {
       // Optimize Filter Expression and fit RANGE filters is conditions apply.
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index ab7c577..f81a3dc 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -273,12 +273,15 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
     if (queryModel.getTable().isTransactionalTable()) {
       return;
     }
-    // First validate the schema of the carbondata file
-    boolean sameColumnSchemaList = BlockletDataMapUtil.isSameColumnSchemaList(columnsInTable,
-        queryModel.getTable().getTableInfo().getFactTable().getListOfColumns());
+    // First validate the schema of the carbondata file if the same column name have different
+    // datatype
+    boolean sameColumnSchemaList = BlockletDataMapUtil
+        .isSameColumnAndDifferentDatatypeInSchema(columnsInTable,
+            queryModel.getTable().getTableInfo().getFactTable().getListOfColumns());
     if (!sameColumnSchemaList) {
-      LOGGER.error("Schema of " + filePath + " doesn't match with the table's schema");
-      throw new IOException("All the files doesn't have same schema. "
+      LOGGER.error("Datatype of the common columns present in " + filePath + " doesn't match with"
+          + "the column's datatype in table schema");
+      throw new IOException("All common columns present in the files doesn't have same datatype. "
           + "Unsupported operation on nonTransactional table. Check logs.");
     }
     List<ProjectionDimension> dimensions = queryModel.getProjectionDimensions();
@@ -331,10 +334,11 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
   private void createFilterExpression(QueryModel queryModel, SegmentProperties properties) {
     Expression expression = queryModel.getFilterExpression();
     if (expression != null) {
-      QueryModel.FilterProcessVO processVO =
-          new QueryModel.FilterProcessVO(properties.getDimensions(), properties.getMeasures(),
-              new ArrayList<CarbonDimension>());
-      QueryModel.processFilterExpression(processVO, expression, null, null);
+      QueryModel.FilterProcessVO processVO = new QueryModel.FilterProcessVO(
+          properties.getDimensions(),
+          properties.getMeasures(),
+          new ArrayList<CarbonDimension>());
+      QueryModel.processFilterExpression(processVO, expression, null, null, queryModel.getTable());
       // Optimize Filter Expression and fit RANGE filters is conditions apply.
       FilterOptimizer rangeFilterOptimizer = new RangeFilterOptmizer(expression);
       rangeFilterOptimizer.optimizeFilter();
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
index d7dcee0..d6017f5 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
@@ -145,29 +145,33 @@ public class QueryModel {
   }
 
   public static void processFilterExpression(FilterProcessVO processVO, Expression filterExpression,
-      final boolean[] isFilterDimensions, final boolean[] isFilterMeasures) {
+      final boolean[] isFilterDimensions, final boolean[] isFilterMeasures,
+      CarbonTable carbonTable) {
     if (null != filterExpression) {
       if (null != filterExpression.getChildren() && filterExpression.getChildren().size() == 0) {
         if (filterExpression instanceof ConditionalExpression) {
           List<ColumnExpression> listOfCol =
               ((ConditionalExpression) filterExpression).getColumnList();
           for (ColumnExpression expression : listOfCol) {
-            setDimAndMsrColumnNode(processVO, expression, isFilterDimensions, isFilterMeasures);
+            setDimAndMsrColumnNode(processVO, expression, isFilterDimensions, isFilterMeasures,
+                carbonTable);
           }
         }
       }
       for (Expression expression : filterExpression.getChildren()) {
         if (expression instanceof ColumnExpression) {
           setDimAndMsrColumnNode(processVO, (ColumnExpression) expression, isFilterDimensions,
-              isFilterMeasures);
+              isFilterMeasures, carbonTable);
         } else if (expression instanceof UnknownExpression) {
           UnknownExpression exp = ((UnknownExpression) expression);
           List<ColumnExpression> listOfColExpression = exp.getAllColumnList();
           for (ColumnExpression col : listOfColExpression) {
-            setDimAndMsrColumnNode(processVO, col, isFilterDimensions, isFilterMeasures);
+            setDimAndMsrColumnNode(processVO, col, isFilterDimensions, isFilterMeasures,
+                carbonTable);
           }
         } else {
-          processFilterExpression(processVO, expression, isFilterDimensions, isFilterMeasures);
+          processFilterExpression(processVO, expression, isFilterDimensions, isFilterMeasures,
+              carbonTable);
         }
       }
     }
@@ -184,7 +188,7 @@ public class QueryModel {
   }
 
   private static void setDimAndMsrColumnNode(FilterProcessVO processVO, ColumnExpression col,
-      boolean[] isFilterDimensions, boolean[] isFilterMeasures) {
+      boolean[] isFilterDimensions, boolean[] isFilterMeasures, CarbonTable table) {
     CarbonDimension dim;
     CarbonMeasure msr;
     String columnName;
@@ -209,13 +213,25 @@ public class QueryModel {
       if (null != isFilterMeasures) {
         isFilterMeasures[msr.getOrdinal()] = true;
       }
-    } else {
+    } else if (null != CarbonUtil.findDimension(processVO.getImplicitDimensions(), columnName)) {
       // check if this is an implicit dimension
-      dim = CarbonUtil
-          .findDimension(processVO.getImplicitDimensions(), columnName);
+      dim = CarbonUtil.findDimension(processVO.getImplicitDimensions(), columnName);
       col.setCarbonColumn(dim);
       col.setDimension(dim);
       col.setDimension(true);
+    } else {
+      // in case of sdk or fileformat, there can be chance that each carbondata file may have
+      // different schema, so every segment properties will have dims and measures based on
+      // corresponding segment. So the filter column may not be present in it. so generate the
+      // dimension and measure from the carbontable
+      CarbonDimension dimension =
+          table.getDimensionByName(table.getTableName(), col.getColumnName());
+      CarbonMeasure measure = table.getMeasureByName(table.getTableName(), col.getColumnName());
+      col.setDimension(dimension);
+      col.setMeasure(measure);
+      col.setCarbonColumn(dimension == null ? measure : dimension);
+      col.setDimension(null != dimension);
+      col.setMeasure(null != measure);
     }
   }
 
diff --git a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
index b81bc75..68aad72 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
@@ -109,10 +109,10 @@ public class BlockletDataMapUtil {
         isTransactionalTable);
     for (DataFileFooter footer : indexInfo) {
       if ((!isTransactionalTable) && (tableColumnList.size() != 0) &&
-          !isSameColumnSchemaList(footer.getColumnInTable(), tableColumnList)) {
-        LOG.error("Schema of " + identifier.getIndexFileName()
-            + " doesn't match with the table's schema");
-        throw new IOException("All the files doesn't have same schema. "
+          !isSameColumnAndDifferentDatatypeInSchema(footer.getColumnInTable(), tableColumnList)) {
+        LOG.error("Datatype of the common columns present in " + identifier.getIndexFileName()
+            + " doesn't match with the column's datatype in table schema");
+        throw new IOException("All common columns present in the files doesn't have same datatype. "
             + "Unsupported operation on nonTransactional table. Check logs.");
       }
       if ((tableColumnList != null) && (tableColumnList.size() == 0)) {
@@ -252,16 +252,23 @@ public class BlockletDataMapUtil {
     return true;
   }
 
-  public static boolean isSameColumnSchemaList(List<ColumnSchema> indexFileColumnList,
-      List<ColumnSchema> tableColumnList) {
-    if (indexFileColumnList.size() != tableColumnList.size()) {
-      LOG.error("Index file's column size is " + indexFileColumnList.size()
-          + " but table's column size is " + tableColumnList.size());
-      return false;
-    }
+  /**
+   * This method validates whether the schema present in index and table contains the same column
+   * name but with different dataType.
+   */
+  public static boolean isSameColumnAndDifferentDatatypeInSchema(
+      List<ColumnSchema> indexFileColumnList, List<ColumnSchema> tableColumnList) {
     for (int i = 0; i < tableColumnList.size(); i++) {
-      if (!tableColumnList.contains(indexFileColumnList.get(i))) {
-        return false;
+      for (int j = 0; j < indexFileColumnList.size(); j++) {
+        if (indexFileColumnList.get(j).getColumnName()
+            .equalsIgnoreCase(tableColumnList.get(i).getColumnName()) && !indexFileColumnList.get(j)
+            .getDataType().getName()
+            .equalsIgnoreCase(tableColumnList.get(i).getDataType().getName())) {
+          LOG.error("Datatype of the Column " + indexFileColumnList.get(j).getColumnName()
+              + " present in index file, is not same as datatype of the column with same name"
+              + "present in table");
+          return false;
+        }
       }
     }
     return true;
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 3fb54f0..2b1cd6e 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -2188,9 +2188,14 @@ public final class CarbonUtil {
     CarbonFile segment = FileFactory.getCarbonFile(path, configuration);
 
     CarbonFile[] dataFiles = segment.listFiles();
+    CarbonFile latestCarbonFile = null;
+    long latestDatafileTimestamp = 0L;
+    // get the latest carbondatafile to get the latest schema in the folder
     for (CarbonFile dataFile : dataFiles) {
-      if (dataFile.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT)) {
-        return dataFile.getAbsolutePath();
+      if (dataFile.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT)
+          && dataFile.getLastModifiedTime() > latestDatafileTimestamp) {
+        latestCarbonFile = dataFile;
+        latestDatafileTimestamp = dataFile.getLastModifiedTime();
       } else if (dataFile.isDirectory()) {
         // if the list has directories that doesn't contain data files,
         // continue checking other files/directories in the list.
@@ -2201,8 +2206,13 @@ public final class CarbonUtil {
         }
       }
     }
-    //returning null only if the path doesn't have data files.
-    return null;
+
+    if (latestCarbonFile != null) {
+      return latestCarbonFile.getAbsolutePath();
+    } else {
+      //returning null only if the path doesn't have data files.
+      return null;
+    }
   }
 
   /**
diff --git a/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoTestNonTransactionalTableFiles.scala b/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoTestNonTransactionalTableFiles.scala
index bdee4a1..97691d6 100644
--- a/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoTestNonTransactionalTableFiles.scala
+++ b/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoTestNonTransactionalTableFiles.scala
@@ -272,7 +272,7 @@ class PrestoTestNonTransactionalTableFiles extends FunSuiteLike with BeforeAndAf
           .executeQuery("select count(*) as RESULT from files ")
       }
     assert(exception.getMessage()
-      .contains("All the files doesn't have same schema"))
+      .contains("All common columns present in the files doesn't have same datatype"))
     cleanTestData()
   }
 
diff --git a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
index 329a250..fa37548 100644
--- a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
+++ b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
@@ -1225,12 +1225,8 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
     assert(spark.sql("select * from sdkout").collect().length == 5)
     buildTestDataOtherDataType(5, null, warehouse1+"/sdk1", 2)
     spark.sql("refresh table sdkout")
-    intercept[Exception] {
-      spark.sql("select * from sdkout").show()
-    }
-    intercept[Exception] {
-      spark.sql("select * from sdkout where salary=100").show()
-    }
+    assert(spark.sql("select * from sdkout").count() == 10)
+    assert(spark.sql("select * from sdkout where salary=100").count() == 1)
     FileFactory.deleteAllFilesOfDir(new File(warehouse1+"/sdk1"))
   }