Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/03/23 15:52:30 UTC

[5/5] carbondata git commit: [CARBONDATA-2224][File Level Reader Support] External File level reader support

[CARBONDATA-2224][File Level Reader Support] External File level reader support

The file-level reader reads any carbondata file placed at an external file path.
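
A rough usage sketch (mirroring the tests added in this patch): a directory of carbondata
files written by the SDK CarbonWriter can be mounted as an external table through the
'Carbonfile' provider and queried. The table name and '$writerPath' below are placeholders.

    spark.sql("DROP TABLE IF EXISTS sdkOutputTable")
    spark.sql(
      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'Carbonfile' LOCATION
         |'$writerPath'""".stripMargin)
    spark.sql("SELECT count(*) FROM sdkOutputTable").show()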

This closes #2055


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/9a25dc6a
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/9a25dc6a
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/9a25dc6a

Branch: refs/heads/master
Commit: 9a25dc6af4901fdc3951259a3a08bd089aeb9e36
Parents: 982d03f
Author: sounakr <so...@gmail.com>
Authored: Sat Feb 24 07:55:14 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Fri Mar 23 21:20:24 2018 +0530

----------------------------------------------------------------------
 .../core/metadata/schema/table/CarbonTable.java |   6 +
 .../apache/carbondata/core/util/CarbonUtil.java | 209 +++++-
 .../hadoop/api/CarbonFileInputFormat.java       | 682 +++++++++++++++++++
 .../carbondata/hadoop/util/SchemaReader.java    |  17 +-
 integration/spark-common-test/pom.xml           |   6 +
 ...FileInputFormatWithExternalCarbonTable.scala | 240 +++++++
 ...tCreateTableUsingSparkCarbonFileFormat.scala | 327 +++++++++
 ...tSparkCarbonFileFormatWithSparkSession.scala | 176 +++++
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |  64 +-
 .../VectorizedCarbonRecordReader.java           |  22 +-
 .../management/CarbonLoadDataCommand.scala      |   4 +-
 .../command/table/CarbonDropTableCommand.scala  |   2 +-
 .../datasources/CarbonFileFormat.scala          | 443 ------------
 .../datasources/SparkCarbonFileFormat.scala     | 269 ++++++++
 .../datasources/SparkCarbonTableFormat.scala    | 443 ++++++++++++
 .../sql/execution/strategy/DDLStrategy.scala    |  27 +-
 .../spark/sql/hive/CarbonAnalysisRules.scala    |   2 +-
 .../spark/sql/parser/CarbonSparkSqlParser.scala |  47 +-
 .../spark/sql/hive/CarbonSessionState.scala     |  17 +-
 .../spark/sql/hive/CarbonSessionState.scala     |  16 +-
 ....apache.spark.sql.sources.DataSourceRegister |   3 +-
 .../sdk/file/CSVCarbonWriterTest.java           |   6 +-
 22 files changed, 2522 insertions(+), 506 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/9a25dc6a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index f14672f..278dc96 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -826,6 +826,12 @@ public class CarbonTable implements Serializable {
     return external != null && external.equalsIgnoreCase("true");
   }
 
+  public boolean isFileLevelExternalTable() {
+    String external = tableInfo.getFactTable().getTableProperties().get("_filelevelexternal");
+    return external != null && external.equalsIgnoreCase("true");
+  }
+
+
   public long size() throws IOException {
     Map<String, Long> dataIndexSize = CarbonUtil.calculateDataIndexSize(this);
     Long dataSize = dataIndexSize.get(CarbonCommonConstants.CARBON_TOTAL_DATA_SIZE);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9a25dc6a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 5bb1c8e..337c150 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -41,6 +41,7 @@ import org.apache.carbondata.core.datastore.columnar.ColumnGroupModel;
 import org.apache.carbondata.core.datastore.columnar.UnBlockIndexer;
 import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.exception.InvalidConfigurationException;
 import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
@@ -52,18 +53,26 @@ import org.apache.carbondata.core.metadata.SegmentFileStore;
 import org.apache.carbondata.core.metadata.ValueEncoderMeta;
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
 import org.apache.carbondata.core.metadata.blocklet.SegmentInfo;
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypeAdapter;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.datatype.DecimalType;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.metadata.schema.SchemaEvolution;
+import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry;
 import org.apache.carbondata.core.metadata.schema.table.AggregationDataMapSchema;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.metadata.schema.table.TableSchema;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.metadata.schema.table.column.ParentColumnTableRelation;
 import org.apache.carbondata.core.mutate.UpdateVO;
+import org.apache.carbondata.core.reader.CarbonHeaderReader;
 import org.apache.carbondata.core.reader.ThriftReader;
 import org.apache.carbondata.core.reader.ThriftReader.TBaseCreator;
 import org.apache.carbondata.core.scan.model.ProjectionDimension;
@@ -77,6 +86,8 @@ import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.format.BlockletHeader;
 import org.apache.carbondata.format.DataChunk2;
 import org.apache.carbondata.format.DataChunk3;
+import org.apache.carbondata.format.FileHeader;
+
 
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
@@ -1280,7 +1291,7 @@ public final class CarbonUtil {
     int counter = 0;
     for (int i = 0; i < wrapperColumnSchemaList.size(); i++) {
       if (CarbonUtil.hasEncoding(wrapperColumnSchemaList.get(i).getEncodingList(),
-          org.apache.carbondata.core.metadata.encoder.Encoding.DICTIONARY)) {
+          Encoding.DICTIONARY)) {
         cardinality.add(dictionaryColumnCardinality[counter]);
         counter++;
       } else if (!wrapperColumnSchemaList.get(i).isDimensionColumn()) {
@@ -2069,6 +2080,202 @@ public final class CarbonUtil {
     return tableInfo;
   }
 
+  public static ColumnSchema thriftColumnSchmeaToWrapperColumnSchema(
+      org.apache.carbondata.format.ColumnSchema externalColumnSchema) {
+    ColumnSchema wrapperColumnSchema = new ColumnSchema();
+    wrapperColumnSchema.setColumnUniqueId(externalColumnSchema.getColumn_id());
+    wrapperColumnSchema.setColumnName(externalColumnSchema.getColumn_name());
+    wrapperColumnSchema.setColumnar(externalColumnSchema.isColumnar());
+    DataType dataType = thriftDataTyopeToWrapperDataType(externalColumnSchema.data_type);
+    if (DataTypes.isDecimal(dataType)) {
+      DecimalType decimalType = (DecimalType) dataType;
+      decimalType.setPrecision(externalColumnSchema.getPrecision());
+      decimalType.setScale(externalColumnSchema.getScale());
+    }
+    wrapperColumnSchema.setDataType(dataType);
+    wrapperColumnSchema.setDimensionColumn(externalColumnSchema.isDimension());
+    List<Encoding> encoders = new ArrayList<Encoding>();
+    for (org.apache.carbondata.format.Encoding encoder : externalColumnSchema.getEncoders()) {
+      encoders.add(fromExternalToWrapperEncoding(encoder));
+    }
+    wrapperColumnSchema.setEncodingList(encoders);
+    wrapperColumnSchema.setNumberOfChild(externalColumnSchema.getNum_child());
+    wrapperColumnSchema.setPrecision(externalColumnSchema.getPrecision());
+    wrapperColumnSchema.setColumnGroup(externalColumnSchema.getColumn_group_id());
+    wrapperColumnSchema.setScale(externalColumnSchema.getScale());
+    wrapperColumnSchema.setDefaultValue(externalColumnSchema.getDefault_value());
+    wrapperColumnSchema.setSchemaOrdinal(externalColumnSchema.getSchemaOrdinal());
+    Map<String, String> properties = externalColumnSchema.getColumnProperties();
+    if (properties != null) {
+      if (properties.get(CarbonCommonConstants.SORT_COLUMNS) != null) {
+        wrapperColumnSchema.setSortColumn(true);
+      }
+    }
+    wrapperColumnSchema.setFunction(externalColumnSchema.getAggregate_function());
+    List<org.apache.carbondata.format.ParentColumnTableRelation> parentColumnTableRelation =
+        externalColumnSchema.getParentColumnTableRelations();
+    if (null != parentColumnTableRelation) {
+      wrapperColumnSchema.setParentColumnTableRelations(
+          fromThriftToWrapperParentTableColumnRelations(parentColumnTableRelation));
+    }
+    return wrapperColumnSchema;
+  }
+
+  static List<ParentColumnTableRelation> fromThriftToWrapperParentTableColumnRelations(
+      List<org.apache.carbondata.format.ParentColumnTableRelation> thirftParentColumnRelation) {
+    List<ParentColumnTableRelation> parentColumnTableRelationList = new ArrayList<>();
+    for (org.apache.carbondata.format.ParentColumnTableRelation carbonTableRelation :
+        thirftParentColumnRelation) {
+      RelationIdentifier relationIdentifier =
+          new RelationIdentifier(carbonTableRelation.getRelationIdentifier().getDatabaseName(),
+              carbonTableRelation.getRelationIdentifier().getTableName(),
+              carbonTableRelation.getRelationIdentifier().getTableId());
+      ParentColumnTableRelation parentColumnTableRelation =
+          new ParentColumnTableRelation(relationIdentifier, carbonTableRelation.getColumnId(),
+              carbonTableRelation.getColumnName());
+      parentColumnTableRelationList.add(parentColumnTableRelation);
+    }
+    return parentColumnTableRelationList;
+  }
+
+  static Encoding fromExternalToWrapperEncoding(
+      org.apache.carbondata.format.Encoding encoderThrift) {
+    switch (encoderThrift) {
+      case DICTIONARY:
+        return Encoding.DICTIONARY;
+      case DELTA:
+        return Encoding.DELTA;
+      case RLE:
+        return Encoding.RLE;
+      case INVERTED_INDEX:
+        return Encoding.INVERTED_INDEX;
+      case BIT_PACKED:
+        return Encoding.BIT_PACKED;
+      case DIRECT_DICTIONARY:
+        return Encoding.DIRECT_DICTIONARY;
+      default:
+        throw new IllegalArgumentException(encoderThrift.toString() + " is not supported");
+    }
+  }
+
+  static DataType thriftDataTyopeToWrapperDataType(
+      org.apache.carbondata.format.DataType dataTypeThrift) {
+    switch (dataTypeThrift) {
+      case BOOLEAN:
+        return DataTypes.BOOLEAN;
+      case STRING:
+        return DataTypes.STRING;
+      case SHORT:
+        return DataTypes.SHORT;
+      case INT:
+        return DataTypes.INT;
+      case LONG:
+        return DataTypes.LONG;
+      case DOUBLE:
+        return DataTypes.DOUBLE;
+      case DECIMAL:
+        return DataTypes.createDefaultDecimalType();
+      case DATE:
+        return DataTypes.DATE;
+      case TIMESTAMP:
+        return DataTypes.TIMESTAMP;
+      case ARRAY:
+        return DataTypes.createDefaultArrayType();
+      case STRUCT:
+        return DataTypes.createDefaultStructType();
+      default:
+        return DataTypes.STRING;
+    }
+  }
+
+  public static List<String> getFilePathExternalFilePath(String path) {
+
+    // return the list of carbondata files in the given path.
+    CarbonFile segment = FileFactory.getCarbonFile(path, FileFactory.getFileType(path));
+    CarbonFile[] dataFiles = segment.listFiles(new CarbonFileFilter() {
+      @Override public boolean accept(CarbonFile file) {
+
+        if (file.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT)) {
+          return true;
+        }
+        return false;
+      }
+    });
+    List<String> filePaths = new ArrayList<>(dataFiles.length);
+    for (CarbonFile dataFile : dataFiles) {
+      filePaths.add(dataFile.getAbsolutePath());
+    }
+    return filePaths;
+  }
+
+  /**
+   * Infers the table schema for an external table. When no schema file exists, the schema is
+   * read from the header of the first carbondata file under the given path; otherwise the
+   * schema file at the given path is read.
+   * @param carbonDataFilePath path to the carbondata files, or to the schema file
+   */
+  public static org.apache.carbondata.format.TableInfo inferSchemaFileExternalTable(
+      String carbonDataFilePath, AbsoluteTableIdentifier absoluteTableIdentifier,
+      boolean schemaExists) throws IOException {
+    TBaseCreator createTBase = new ThriftReader.TBaseCreator() {
+      public org.apache.thrift.TBase<org.apache.carbondata.format.TableInfo,
+          org.apache.carbondata.format.TableInfo._Fields> create() {
+        return new org.apache.carbondata.format.TableInfo();
+      }
+    };
+    if (!schemaExists) {
+      List<String> filePaths =
+          getFilePathExternalFilePath(carbonDataFilePath + "/Fact/Part0/Segment_null");
+      if (filePaths.isEmpty()) {
+        // fail fast instead of hitting a NullPointerException below
+        LOGGER.error("CarbonData file is not present in the table location");
+        throw new IOException("CarbonData file is not present in the table location");
+      }
+      String firstFilePath = filePaths.get(0);
+      CarbonHeaderReader carbonHeaderReader = new CarbonHeaderReader(firstFilePath);
+      FileHeader fileHeader = carbonHeaderReader.readHeader();
+      List<ColumnSchema> columnSchemaList = new ArrayList<ColumnSchema>();
+      List<org.apache.carbondata.format.ColumnSchema> table_columns = fileHeader.getColumn_schema();
+      for (int i = 0; i < table_columns.size(); i++) {
+        ColumnSchema col = thriftColumnSchmeaToWrapperColumnSchema(table_columns.get(i));
+        col.setColumnReferenceId(col.getColumnUniqueId());
+        columnSchemaList.add(col);
+      }
+      TableSchema tableSchema = new TableSchema();
+      tableSchema.setTableName(absoluteTableIdentifier.getTableName());
+      tableSchema.setBucketingInfo(null);
+      tableSchema.setSchemaEvalution(null);
+      tableSchema.setTableId(UUID.randomUUID().toString());
+      tableSchema.setListOfColumns(columnSchemaList);
+
+      ThriftWrapperSchemaConverterImpl thriftWrapperSchemaConverter =
+          new ThriftWrapperSchemaConverterImpl();
+      SchemaEvolutionEntry schemaEvolutionEntry = new SchemaEvolutionEntry();
+      schemaEvolutionEntry.setTimeStamp(System.currentTimeMillis());
+      SchemaEvolution schemaEvol = new SchemaEvolution();
+      List<SchemaEvolutionEntry> schEntryList = new ArrayList<>();
+      schEntryList.add(schemaEvolutionEntry);
+      schemaEvol.setSchemaEvolutionEntryList(schEntryList);
+      tableSchema.setSchemaEvalution(schemaEvol);
+
+      org.apache.carbondata.format.TableSchema thriftFactTable =
+          thriftWrapperSchemaConverter.fromWrapperToExternalTableSchema(tableSchema);
+      org.apache.carbondata.format.TableInfo tableInfo =
+          new org.apache.carbondata.format.TableInfo(thriftFactTable,
+              new ArrayList<org.apache.carbondata.format.TableSchema>());
+
+      tableInfo.setDataMapSchemas(null);
+      return tableInfo;
+    } else {
+      ThriftReader thriftReader = new ThriftReader(carbonDataFilePath, createTBase);
+      thriftReader.open();
+      org.apache.carbondata.format.TableInfo tableInfo =
+          (org.apache.carbondata.format.TableInfo) thriftReader.read();
+      thriftReader.close();
+      return tableInfo;
+    }
+  }
+
 
   public static void dropDatabaseDirectory(String databasePath)
       throws IOException, InterruptedException {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9a25dc6a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
new file mode 100644
index 0000000..b86b1cc
--- /dev/null
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
@@ -0,0 +1,682 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hadoop.api;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.DataMapChooser;
+import org.apache.carbondata.core.datamap.DataMapLevel;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.exception.InvalidConfigurationException;
+import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory;
+import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+import org.apache.carbondata.core.metadata.schema.PartitionInfo;
+import org.apache.carbondata.core.metadata.schema.partition.PartitionType;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.mutate.UpdateVO;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.filter.SingleTableProvider;
+import org.apache.carbondata.core.scan.filter.TableProvider;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.stats.QueryStatistic;
+import org.apache.carbondata.core.stats.QueryStatisticsConstants;
+import org.apache.carbondata.core.stats.QueryStatisticsRecorder;
+import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeConverter;
+import org.apache.carbondata.core.util.DataTypeConverterImpl;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
+import org.apache.carbondata.hadoop.CarbonProjection;
+import org.apache.carbondata.hadoop.CarbonRecordReader;
+import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
+import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport;
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
+import org.apache.carbondata.hadoop.util.ObjectSerializationUtil;
+import org.apache.carbondata.hadoop.util.SchemaReader;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+
+/**
+ * Input format for reading CarbonData files placed at an external file path.
+ *
+ * @param <T>
+ */
+@InterfaceAudience.User
+@InterfaceStability.Evolving
+public class CarbonFileInputFormat<T> extends FileInputFormat<Void, T> implements Serializable {
+
+  public static final String READ_SUPPORT_CLASS = "carbon.read.support.class";
+  // comma separated list of input segment numbers
+  public static final String INPUT_SEGMENT_NUMBERS =
+      "mapreduce.input.carboninputformat.segmentnumbers";
+  private static final String VALIDATE_INPUT_SEGMENT_IDs =
+      "mapreduce.input.carboninputformat.validsegments";
+  // comma separated list of input files
+  public static final String INPUT_FILES = "mapreduce.input.carboninputformat.files";
+  private static final String ALTER_PARTITION_ID = "mapreduce.input.carboninputformat.partitionid";
+  private static final Log LOG = LogFactory.getLog(CarbonFileInputFormat.class);
+  private static final String FILTER_PREDICATE =
+      "mapreduce.input.carboninputformat.filter.predicate";
+  private static final String COLUMN_PROJECTION = "mapreduce.input.carboninputformat.projection";
+  private static final String TABLE_INFO = "mapreduce.input.carboninputformat.tableinfo";
+  private static final String CARBON_READ_SUPPORT = "mapreduce.input.carboninputformat.readsupport";
+  private static final String CARBON_CONVERTER = "mapreduce.input.carboninputformat.converter";
+  private static final String DATA_MAP_DSTR = "mapreduce.input.carboninputformat.datamapdstr";
+  public static final String DATABASE_NAME = "mapreduce.input.carboninputformat.databaseName";
+  public static final String TABLE_NAME = "mapreduce.input.carboninputformat.tableName";
+  private static final String PARTITIONS_TO_PRUNE =
+      "mapreduce.input.carboninputformat.partitions.to.prune";
+  public static final String UPADTE_T =
+      "mapreduce.input.carboninputformat.partitions.to.prune";
+
+  // a cache for carbon table, it will be used in task side
+  private CarbonTable carbonTable;
+
+  /**
+   * Set the `tableInfo` in `configuration`
+   */
+  public static void setTableInfo(Configuration configuration, TableInfo tableInfo)
+      throws IOException {
+    if (null != tableInfo) {
+      configuration.set(TABLE_INFO, CarbonUtil.encodeToString(tableInfo.serialize()));
+    }
+  }
+
+  /**
+   * Get TableInfo object from `configuration`
+   */
+  private static TableInfo getTableInfo(Configuration configuration) throws IOException {
+    String tableInfoStr = configuration.get(TABLE_INFO);
+    if (tableInfoStr == null) {
+      return null;
+    } else {
+      TableInfo output = new TableInfo();
+      output.readFields(
+          new DataInputStream(
+              new ByteArrayInputStream(CarbonUtil.decodeStringToBytes(tableInfoStr))));
+      return output;
+    }
+  }
+
+
+  public static void setTablePath(Configuration configuration, String tablePath) {
+    configuration.set(FileInputFormat.INPUT_DIR, tablePath);
+  }
+
+  public static void setPartitionIdList(Configuration configuration, List<String> partitionIds) {
+    configuration.set(ALTER_PARTITION_ID, partitionIds.toString());
+  }
+
+
+  public static void setDataMapJob(Configuration configuration, DataMapJob dataMapJob)
+      throws IOException {
+    if (dataMapJob != null) {
+      String toString = ObjectSerializationUtil.convertObjectToString(dataMapJob);
+      configuration.set(DATA_MAP_DSTR, toString);
+    }
+  }
+
+  public static DataMapJob getDataMapJob(Configuration configuration) throws IOException {
+    String jobString = configuration.get(DATA_MAP_DSTR);
+    if (jobString != null) {
+      return (DataMapJob) ObjectSerializationUtil.convertStringToObject(jobString);
+    }
+    return null;
+  }
+
+  /**
+   * It sets unresolved filter expression.
+   *
+   * @param configuration
+   * @param filterExpression
+   */
+  public static void setFilterPredicates(Configuration configuration, Expression filterExpression) {
+    if (filterExpression == null) {
+      return;
+    }
+    try {
+      String filterString = ObjectSerializationUtil.convertObjectToString(filterExpression);
+      configuration.set(FILTER_PREDICATE, filterString);
+    } catch (Exception e) {
+      throw new RuntimeException("Error while setting filter expression to Job", e);
+    }
+  }
+
+  public static void setColumnProjection(Configuration configuration, CarbonProjection projection) {
+    if (projection == null || projection.isEmpty()) {
+      return;
+    }
+    String[] allColumns = projection.getAllColumns();
+    StringBuilder builder = new StringBuilder();
+    for (String column : allColumns) {
+      builder.append(column).append(",");
+    }
+    String columnString = builder.toString();
+    columnString = columnString.substring(0, columnString.length() - 1);
+    configuration.set(COLUMN_PROJECTION, columnString);
+  }
+
+  public static String getColumnProjection(Configuration configuration) {
+    return configuration.get(COLUMN_PROJECTION);
+  }
+
+
+  /**
+   * Set list of segments to access
+   */
+  public static void setSegmentsToAccess(Configuration configuration, List<Segment> validSegments) {
+    configuration.set(INPUT_SEGMENT_NUMBERS, CarbonUtil.convertToString(validSegments));
+  }
+
+  /**
+   * Set `CARBON_INPUT_SEGMENTS` from property to configuration
+   */
+  public static void setQuerySegment(Configuration conf, AbsoluteTableIdentifier identifier) {
+    String dbName = identifier.getCarbonTableIdentifier().getDatabaseName().toLowerCase();
+    String tbName = identifier.getCarbonTableIdentifier().getTableName().toLowerCase();
+    String segmentNumbersFromProperty = CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_INPUT_SEGMENTS + dbName + "." + tbName, "*");
+    if (!segmentNumbersFromProperty.trim().equals("*")) {
+      CarbonFileInputFormat
+          .setSegmentsToAccess(conf, Segment.toSegmentList(segmentNumbersFromProperty.split(",")));
+    }
+  }
+
+  /**
+   * set whether the segments to access must be validated
+   */
+  public static void setValidateSegmentsToAccess(Configuration configuration, Boolean validate) {
+    configuration.set(CarbonFileInputFormat.VALIDATE_INPUT_SEGMENT_IDs, validate.toString());
+  }
+
+  /**
+   * get whether the segments to access must be validated
+   */
+  public static boolean getValidateSegmentsToAccess(Configuration configuration) {
+    return configuration.get(CarbonFileInputFormat.VALIDATE_INPUT_SEGMENT_IDs, "true")
+        .equalsIgnoreCase("true");
+  }
+
+  /**
+   * set list of partitions to prune
+   */
+  public static void setPartitionsToPrune(Configuration configuration,
+      List<PartitionSpec> partitions) {
+    if (partitions == null) {
+      return;
+    }
+    try {
+      String partitionString =
+          ObjectSerializationUtil.convertObjectToString(new ArrayList<>(partitions));
+      configuration.set(PARTITIONS_TO_PRUNE, partitionString);
+    } catch (Exception e) {
+      throw new RuntimeException("Error while setting patition information to Job", e);
+    }
+  }
+
+  /**
+   * get list of partitions to prune
+   */
+  private static List<PartitionSpec> getPartitionsToPrune(Configuration configuration)
+      throws IOException {
+    String partitionString = configuration.get(PARTITIONS_TO_PRUNE);
+    if (partitionString != null) {
+      return (List<PartitionSpec>) ObjectSerializationUtil.convertStringToObject(partitionString);
+    }
+    return null;
+  }
+
+  public AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration)
+      throws IOException {
+    String tablePath = configuration.get(INPUT_DIR, "");
+    try {
+      return AbsoluteTableIdentifier
+          .from(tablePath, getDatabaseName(configuration), getTableName(configuration));
+    } catch (InvalidConfigurationException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   * The configuration FileInputFormat.INPUT_DIR
+   * is used to get the table path to read.
+   *
+   * @param job
+   * @return List<InputSplit> list of CarbonInputSplit
+   * @throws IOException
+   */
+  @Override
+  public List<InputSplit> getSplits(JobContext job) throws IOException {
+    AbsoluteTableIdentifier identifier = getAbsoluteTableIdentifier(job.getConfiguration());
+    CarbonTable carbonTable = getOrCreateCarbonTable(job.getConfiguration());
+    if (null == carbonTable) {
+      throw new IOException("Missing/Corrupt schema file for table.");
+    }
+    //    TableDataMap blockletMap = DataMapStoreManager.getInstance()
+    //        .getDataMap(identifier, BlockletDataMap.NAME, BlockletDataMapFactory.class.getName());
+
+    if (getValidateSegmentsToAccess(job.getConfiguration())) {
+      // get all valid segments and set them into the configuration
+      // check for externalTable segment (Segment_null)
+      // process and resolve the expression
+      Expression filter = getFilterPredicates(job.getConfiguration());
+      TableProvider tableProvider = new SingleTableProvider(carbonTable);
+      // this will be null in case of corrupt schema file.
+      PartitionInfo partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName());
+      CarbonInputFormatUtil.processFilterExpression(filter, carbonTable, null, null);
+
+      FilterResolverIntf filterInterface = CarbonInputFormatUtil
+          .resolveFilter(filter, carbonTable.getAbsoluteTableIdentifier(), tableProvider);
+
+      String segmentDir = CarbonTablePath.getSegmentPath(identifier.getTablePath(), "null");
+      FileFactory.FileType fileType = FileFactory.getFileType(segmentDir);
+      if (FileFactory.isFileExist(segmentDir, fileType)) {
+        // if external table Segments are found, add it to the List
+        List<Segment> externalTableSegments = new ArrayList<Segment>();
+        Segment seg = new Segment("null", null);
+        externalTableSegments.add(seg);
+
+        Map<String, String> indexFiles =
+            new SegmentIndexFileStore().getIndexFilesFromSegment(segmentDir);
+
+        if (indexFiles.size() == 0) {
+          throw new RuntimeException("Index file not present to read the carbondata file");
+        }
+        // do block filtering and get split
+        List<InputSplit> splits =
+            getSplits(job, filterInterface, externalTableSegments, null, partitionInfo, null);
+
+        return splits;
+      }
+    }
+    return null;
+  }
+
+
+
+  /**
+   * {@inheritDoc}
+   * Configurations FileInputFormat.INPUT_DIR, CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS
+   * are used to get the table path and the segments to read.
+   *
+   * @return
+   * @throws IOException
+   */
+  private List<InputSplit> getSplits(JobContext job, FilterResolverIntf filterResolver,
+      List<Segment> validSegments, BitSet matchedPartitions, PartitionInfo partitionInfo,
+      List<Integer> oldPartitionIdList) throws IOException {
+
+    List<InputSplit> result = new LinkedList<InputSplit>();
+    UpdateVO invalidBlockVOForSegmentId = null;
+    Boolean isIUDTable = false;
+
+    AbsoluteTableIdentifier absoluteTableIdentifier =
+        getOrCreateCarbonTable(job.getConfiguration()).getAbsoluteTableIdentifier();
+    SegmentUpdateStatusManager updateStatusManager =
+        new SegmentUpdateStatusManager(absoluteTableIdentifier);
+
+    isIUDTable = (updateStatusManager.getUpdateStatusDetails().length != 0);
+
+    // for each segment fetch blocks matching filter in Driver BTree
+    List<CarbonInputSplit> dataBlocksOfSegment =
+        getDataBlocksOfSegment(job, absoluteTableIdentifier, filterResolver, matchedPartitions,
+            validSegments, partitionInfo, oldPartitionIdList);
+    for (CarbonInputSplit inputSplit : dataBlocksOfSegment) {
+
+      // Get the UpdateVO for tables on which IUD operations are being performed.
+      if (isIUDTable) {
+        invalidBlockVOForSegmentId =
+            updateStatusManager.getInvalidTimestampRange(inputSplit.getSegmentId());
+      }
+      String[] deleteDeltaFilePath = null;
+      if (isIUDTable) {
+        // When IUD has been performed on this table, skip blocks that have been
+        // invalidated by those operations.
+        if (CarbonUtil
+            .isInvalidTableBlock(inputSplit.getSegmentId(), inputSplit.getPath().toString(),
+                invalidBlockVOForSegmentId, updateStatusManager)) {
+          continue;
+        }
+        // Delete delta files for a block are fetched only when IUD has been performed
+        try {
+          deleteDeltaFilePath = updateStatusManager
+              .getDeleteDeltaFilePath(inputSplit.getPath().toString(), inputSplit.getSegmentId());
+        } catch (Exception e) {
+          throw new IOException(e);
+        }
+      }
+      inputSplit.setDeleteDeltaFiles(deleteDeltaFilePath);
+      result.add(inputSplit);
+    }
+    return result;
+  }
+
+  protected Expression getFilterPredicates(Configuration configuration) {
+    try {
+      String filterExprString = configuration.get(FILTER_PREDICATE);
+      if (filterExprString == null) {
+        return null;
+      }
+      Object filter = ObjectSerializationUtil.convertStringToObject(filterExprString);
+      return (Expression) filter;
+    } catch (IOException e) {
+      throw new RuntimeException("Error while reading filter expression", e);
+    }
+  }
+
+  /**
+   * get data blocks of given segment
+   */
+  private List<CarbonInputSplit> getDataBlocksOfSegment(JobContext job,
+      AbsoluteTableIdentifier absoluteTableIdentifier, FilterResolverIntf resolver,
+      BitSet matchedPartitions, List<Segment> segmentIds, PartitionInfo partitionInfo,
+      List<Integer> oldPartitionIdList) throws IOException {
+
+    QueryStatisticsRecorder recorder = CarbonTimeStatisticsFactory.createDriverRecorder();
+    QueryStatistic statistic = new QueryStatistic();
+
+    // get tokens for all the required FileSystem for table path
+    TokenCache.obtainTokensForNamenodes(job.getCredentials(),
+        new Path[] { new Path(absoluteTableIdentifier.getTablePath()) }, job.getConfiguration());
+    boolean distributedCG = Boolean.parseBoolean(CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP,
+            CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT));
+    DataMapExprWrapper dataMapExprWrapper =
+        DataMapChooser.get().choose(getOrCreateCarbonTable(job.getConfiguration()), resolver);
+    DataMapJob dataMapJob = getDataMapJob(job.getConfiguration());
+    List<PartitionSpec> partitionsToPrune = getPartitionsToPrune(job.getConfiguration());
+    List<ExtendedBlocklet> prunedBlocklets;
+    if (distributedCG || dataMapExprWrapper.getDataMapType() == DataMapLevel.FG) {
+      DistributableDataMapFormat datamapDstr =
+          new DistributableDataMapFormat(absoluteTableIdentifier, dataMapExprWrapper,
+              segmentIds, partitionsToPrune,
+              BlockletDataMapFactory.class.getName());
+      prunedBlocklets = dataMapJob.execute(datamapDstr, resolver);
+      // Apply expression on the blocklets.
+      prunedBlocklets = dataMapExprWrapper.pruneBlocklets(prunedBlocklets);
+    } else {
+      prunedBlocklets = dataMapExprWrapper.prune(segmentIds, partitionsToPrune);
+    }
+
+    List<CarbonInputSplit> resultFilteredBlocks = new ArrayList<>();
+    int partitionIndex = 0;
+    List<Integer> partitionIdList = new ArrayList<>();
+    if (partitionInfo != null && partitionInfo.getPartitionType() != PartitionType.NATIVE_HIVE) {
+      partitionIdList = partitionInfo.getPartitionIds();
+    }
+    for (ExtendedBlocklet blocklet : prunedBlocklets) {
+      long partitionId = CarbonTablePath.DataFileUtil.getTaskIdFromTaskNo(
+          CarbonTablePath.DataFileUtil.getTaskNo(blocklet.getPath()));
+
+      // oldPartitionIdList is only used by the alter table partition command because it changes
+      // the partition info first and then reads data.
+      // Other normal queries should use the newest partitionIdList.
+      if (partitionInfo != null && partitionInfo.getPartitionType() != PartitionType.NATIVE_HIVE) {
+        if (oldPartitionIdList != null) {
+          partitionIndex = oldPartitionIdList.indexOf((int)partitionId);
+        } else {
+          partitionIndex = partitionIdList.indexOf((int)partitionId);
+        }
+      }
+      if (partitionIndex != -1) {
+        // the matchedPartitions variable will be null in two cases:
+        // 1. the table is not a partition table
+        // 2. the table is a partition table, and all partitions are matched by the query
+        // For a partition table, the task id in the carbondata file name is the partition id;
+        // if this partition is not required, it is skipped here.
+        if (matchedPartitions == null || matchedPartitions.get(partitionIndex)) {
+          CarbonInputSplit inputSplit = convertToCarbonInputSplit(blocklet);
+          if (inputSplit != null) {
+            resultFilteredBlocks.add(inputSplit);
+          }
+        }
+      }
+    }
+    statistic
+        .addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_DRIVER, System.currentTimeMillis());
+    recorder.recordStatisticsForDriver(statistic, job.getConfiguration().get("query.id"));
+    return resultFilteredBlocks;
+  }
+
+  private CarbonInputSplit convertToCarbonInputSplit(ExtendedBlocklet blocklet) throws IOException {
+    CarbonInputSplit split =
+        CarbonInputSplit.from(blocklet.getSegmentId(),
+            blocklet.getBlockletId(), new FileSplit(new Path(blocklet.getPath()), 0,
+                blocklet.getLength(), blocklet.getLocations()),
+            ColumnarFormatVersion.valueOf((short) blocklet.getDetailInfo().getVersionNumber()),
+            blocklet.getDataMapWriterPath());
+    split.setDetailInfo(blocklet.getDetailInfo());
+    return split;
+  }
+
+  @Override
+  public RecordReader<Void, T> createRecordReader(InputSplit inputSplit,
+      TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+    Configuration configuration = taskAttemptContext.getConfiguration();
+    QueryModel queryModel = createQueryModel(inputSplit, taskAttemptContext);
+    CarbonReadSupport<T> readSupport = getReadSupportClass(configuration);
+    return new CarbonRecordReader<T>(queryModel, readSupport);
+  }
+
+  public QueryModel createQueryModel(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
+      throws IOException {
+    Configuration configuration = taskAttemptContext.getConfiguration();
+    CarbonTable carbonTable = getOrCreateCarbonTable(configuration);
+    TableProvider tableProvider = new SingleTableProvider(carbonTable);
+
+    // query plan includes projection column
+    String projectionString = getColumnProjection(configuration);
+    String[] projectionColumnNames = null;
+    if (projectionString != null) {
+      projectionColumnNames = projectionString.split(",");
+    }
+    QueryModel queryModel = carbonTable.createQueryWithProjection(
+        projectionColumnNames, getDataTypeConverter(configuration));
+
+    // set the filter to the query model in order to filter blocklet before scan
+    Expression filter = getFilterPredicates(configuration);
+    boolean[] isFilterDimensions = new boolean[carbonTable.getDimensionOrdinalMax()];
+    // getAllMeasures returns list of visible and invisible columns
+    boolean[] isFilterMeasures =
+        new boolean[carbonTable.getAllMeasures().size()];
+    CarbonInputFormatUtil.processFilterExpression(filter, carbonTable, isFilterDimensions,
+        isFilterMeasures);
+    queryModel.setIsFilterDimensions(isFilterDimensions);
+    queryModel.setIsFilterMeasures(isFilterMeasures);
+    FilterResolverIntf filterIntf = CarbonInputFormatUtil
+        .resolveFilter(filter, carbonTable.getAbsoluteTableIdentifier(), tableProvider);
+    queryModel.setFilterExpressionResolverTree(filterIntf);
+
+    // update the file level index store if there are invalid segments
+    if (inputSplit instanceof CarbonMultiBlockSplit) {
+      CarbonMultiBlockSplit split = (CarbonMultiBlockSplit) inputSplit;
+      List<String> invalidSegments = split.getAllSplits().get(0).getInvalidSegments();
+      if (invalidSegments.size() > 0) {
+        queryModel.setInvalidSegmentIds(invalidSegments);
+      }
+      List<UpdateVO> invalidTimestampRangeList =
+          split.getAllSplits().get(0).getInvalidTimestampRange();
+      if ((null != invalidTimestampRangeList) && (invalidTimestampRangeList.size() > 0)) {
+        queryModel.setInvalidBlockForSegmentId(invalidTimestampRangeList);
+      }
+    }
+    return queryModel;
+  }
+
+  private CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IOException {
+    CarbonTable carbonTableTemp;
+    if (carbonTable == null) {
+      // carbon table should be created either from deserialized table info (schema saved in
+      // hive metastore) or by reading schema in HDFS (schema saved in HDFS)
+      TableInfo tableInfo = getTableInfo(configuration);
+      CarbonTable localCarbonTable;
+      if (tableInfo != null) {
+        localCarbonTable = CarbonTable.buildFromTableInfo(tableInfo);
+      } else {
+        String schemaPath = CarbonTablePath
+            .getSchemaFilePath(getAbsoluteTableIdentifier(configuration).getTablePath());
+        if (!FileFactory.isFileExist(schemaPath, FileFactory.getFileType(schemaPath))) {
+          TableInfo tableInfoInfer =
+              SchemaReader.inferSchema(getAbsoluteTableIdentifier(configuration));
+          localCarbonTable = CarbonTable.buildFromTableInfo(tableInfoInfer);
+        } else {
+          localCarbonTable =
+              SchemaReader.readCarbonTableFromStore(getAbsoluteTableIdentifier(configuration));
+        }
+      }
+      this.carbonTable = localCarbonTable;
+      return localCarbonTable;
+    } else {
+      carbonTableTemp = this.carbonTable;
+      return carbonTableTemp;
+    }
+  }
+
+
+  public CarbonReadSupport<T> getReadSupportClass(Configuration configuration) {
+    String readSupportClass = configuration.get(CARBON_READ_SUPPORT);
+    //By default it uses dictionary decoder read class
+    CarbonReadSupport<T> readSupport = null;
+    if (readSupportClass != null) {
+      try {
+        Class<?> myClass = Class.forName(readSupportClass);
+        Constructor<?> constructor = myClass.getConstructors()[0];
+        Object object = constructor.newInstance();
+        if (object instanceof CarbonReadSupport) {
+          readSupport = (CarbonReadSupport) object;
+        }
+      } catch (ClassNotFoundException ex) {
+        LOG.error("Class " + readSupportClass + "not found", ex);
+      } catch (Exception ex) {
+        LOG.error("Error while creating " + readSupportClass, ex);
+      }
+    } else {
+      readSupport = new DictionaryDecodeReadSupport<>();
+    }
+    return readSupport;
+  }
+
+  @Override
+  protected boolean isSplitable(JobContext context, Path filename) {
+    try {
+      // Don't split the file if it is on a local file system
+      FileSystem fileSystem = filename.getFileSystem(context.getConfiguration());
+      if (fileSystem instanceof LocalFileSystem) {
+        return false;
+      }
+    } catch (Exception e) {
+      return true;
+    }
+    return true;
+  }
+
+  /**
+   * return valid segments to access
+   */
+  private String[] getSegmentsToAccess(JobContext job) {
+    String segmentString = job.getConfiguration().get(INPUT_SEGMENT_NUMBERS, "");
+    if (segmentString.trim().isEmpty()) {
+      return new String[0];
+    }
+    return segmentString.split(",");
+  }
+
+  public static DataTypeConverter getDataTypeConverter(Configuration configuration)
+      throws IOException {
+    String converter = configuration.get(CARBON_CONVERTER);
+    if (converter == null) {
+      return new DataTypeConverterImpl();
+    }
+    return (DataTypeConverter) ObjectSerializationUtil.convertStringToObject(converter);
+  }
+
+  public static void setDatabaseName(Configuration configuration, String databaseName) {
+    if (null != databaseName) {
+      configuration.set(DATABASE_NAME, databaseName);
+    }
+  }
+
+  public static String getDatabaseName(Configuration configuration)
+      throws InvalidConfigurationException {
+    String databaseName = configuration.get(DATABASE_NAME);
+    if (null == databaseName) {
+      throw new InvalidConfigurationException("Database name is not set.");
+    }
+    return databaseName;
+  }
+
+  public static void setTableName(Configuration configuration, String tableName) {
+    if (null != tableName) {
+      configuration.set(TABLE_NAME, tableName);
+    }
+  }
+
+  public static String getTableName(Configuration configuration)
+      throws InvalidConfigurationException {
+    String tableName = configuration.get(TABLE_NAME);
+    if (tableName == null) {
+      throw new InvalidConfigurationException("Table name is not set");
+    }
+    return tableName;
+  }
+
+  public org.apache.hadoop.mapred.RecordReader<Void, T> getRecordReader(
+      org.apache.hadoop.mapred.InputSplit split, JobConf job, Reporter reporter)
+      throws IOException {
+    return null;
+  }
+}
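
A hedged sketch (not from this patch) of driving the new CarbonFileInputFormat directly: the
static setters shown above place the external table path and identifiers into a Hadoop
Configuration; the path, database and table names below are assumptions.

    import org.apache.hadoop.conf.Configuration
    import org.apache.carbondata.hadoop.api.CarbonFileInputFormat

    val conf = new Configuration()
    // external path that holds the carbondata and index files (placeholder)
    CarbonFileInputFormat.setTablePath(conf, "/tmp/WriterOutput")
    CarbonFileInputFormat.setDatabaseName(conf, "default")
    CarbonFileInputFormat.setTableName(conf, "sdkOutputTable")
    // a MapReduce job would then register this format and let it compute splits
    // (getSplits) and create record readers (createRecordReader)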

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9a25dc6a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java
index dfa8dd1..ab7c333 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java
@@ -28,7 +28,6 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.TableInfo;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 /**
  * TODO: It should be removed after store manager implementation.
@@ -59,6 +58,7 @@ public class SchemaReader {
       throw new IOException("File does not exist: " + schemaFilePath);
     }
   }
+
   /**
    * the method returns the Wrapper TableInfo
    *
@@ -79,4 +79,19 @@ public class SchemaReader {
         carbonTableIdentifier.getTableName(),
         identifier.getTablePath());
   }
+
+
+  public static TableInfo inferSchema(AbsoluteTableIdentifier identifier)
+      throws IOException {
+    // This routine infers the schema from the carbondata file header
+    // (ColumnSchema -> TableSchema -> TableInfo)
+    // and returns the wrapper TableInfo.
+    org.apache.carbondata.format.TableInfo tableInfo =
+        CarbonUtil.inferSchemaFileExternalTable(identifier.getTablePath(), identifier, false);
+    SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
+    TableInfo wrapperTableInfo = schemaConverter
+        .fromExternalToWrapperTableInfo(tableInfo, identifier.getDatabaseName(),
+            identifier.getTableName(), identifier.getTablePath());
+    return wrapperTableInfo;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9a25dc6a/integration/spark-common-test/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/pom.xml b/integration/spark-common-test/pom.xml
index b7f19fd..1c6cee9 100644
--- a/integration/spark-common-test/pom.xml
+++ b/integration/spark-common-test/pom.xml
@@ -105,6 +105,12 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>org.apache.carbondata</groupId>
+      <artifactId>carbondata-store-sdk</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-hive-thriftserver_${scala.binary.version}</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9a25dc6a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
new file mode 100644
index 0000000..8b1f63f
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.createTable
+
+import java.io.File
+
+import org.apache.commons.io.FileUtils
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.sdk.file.{CarbonWriter, Schema}
+
+
+class TestCarbonFileInputFormatWithExternalCarbonTable extends QueryTest with BeforeAndAfterAll {
+
+  var writerPath = new File(this.getClass.getResource("/").getPath
+                            +
+                            "../." +
+                            "./src/test/resources/SparkCarbonFileFormat/WriterOutput/")
+    .getCanonicalPath
+  // getCanonicalPath gives a path with \ on Windows, but the code expects /. Should this be handled in the code?
+  writerPath = writerPath.replace("\\", "/");
+
+
+  def buildTestData(persistSchema:Boolean) = {
+
+    FileUtils.deleteDirectory(new File(writerPath))
+
+    val schema = new StringBuilder()
+      .append("[ \n")
+      .append("   {\"name\":\"string\"},\n")
+      .append("   {\"age\":\"int\"},\n")
+      .append("   {\"height\":\"double\"}\n")
+      .append("]")
+      .toString()
+
+    try {
+      val builder = CarbonWriter.builder()
+      val writer =
+      if (persistSchema) {
+        builder.persistSchemaFile(true)
+        builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).buildWriterForCSVInput()
+      } else {
+        builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).buildWriterForCSVInput()
+      }
+
+      var i = 0
+      while (i < 100) {
+        writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2)))
+        i += 1
+      }
+      writer.close()
+    } catch {
+      // swallow any failure while writing test data; the tests assert on the output path
+      case _: Throwable => None
+    }
+  }
+
+  def cleanTestData() = {
+    FileUtils.deleteDirectory(new File(writerPath))
+  }
+
+  def deleteIndexFile(path: String, extension: String) : Unit = {
+    val file: CarbonFile = FileFactory
+      .getCarbonFile(path, FileFactory.getFileType(path))
+
+    for (eachDir <- file.listFiles) {
+      if (!eachDir.isDirectory) {
+        if (eachDir.getName.endsWith(extension)) {
+          CarbonUtil.deleteFoldersAndFilesSilent(eachDir)
+        }
+      } else {
+        deleteIndexFile(eachDir.getPath, extension)
+      }
+    }
+  }
+
+  override def beforeAll(): Unit = {
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+    // create carbon table and insert data
+  }
+
+  override def afterAll(): Unit = {
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+  }
+
+  // TODO: remove the segment dependency and the tableIdentifier dependency
+  test("read carbondata files (sdk Writer Output) using the Carbonfile ") {
+    buildTestData(false)
+    assert(new File(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+    //new provider Carbonfile
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'Carbonfile' LOCATION
+         |'$writerPath' """.stripMargin)
+
+    sql("Describe formatted sdkOutputTable").show(false)
+
+    sql("select * from sdkOutputTable").show(false)
+
+    sql("select * from sdkOutputTable limit 3").show(false)
+
+    sql("select name from sdkOutputTable").show(false)
+
+    sql("select age from sdkOutputTable").show(false)
+
+    sql("select * from sdkOutputTable where age > 2 and age < 8").show(200, false)
+
+    sql("select * from sdkOutputTable where name = 'robot3'").show(200, false)
+
+    sql("select * from sdkOutputTable where name like 'robo%' limit 5").show(200, false)
+
+    sql("select * from sdkOutputTable where name like '%obot%' limit 2").show(200, false)
+
+    sql("select sum(age) from sdkOutputTable where name like 'robot1%' ").show(200, false)
+
+    sql("select count(*) from sdkOutputTable where name like 'robot%' ").show(200, false)
+
+    sql("select count(*) from sdkOutputTable").show(200, false)
+
+    sql("DROP TABLE sdkOutputTable")
+    // drop table should not delete the files
+    assert(new File(writerPath).exists())
+    cleanTestData()
+  }
+
+  test("should not allow to alter datasource carbontable ") {
+    buildTestData(false)
+    assert(new File(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+    //data source file format
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'Carbonfile' LOCATION
+         |'$writerPath' """.stripMargin)
+
+    val exception = intercept[MalformedCarbonCommandException]
+    {
+      sql("Alter table sdkOutputTable change age age BIGINT")
+    }
+    assert(exception.getMessage()
+      .contains("Unsupported alter operation on Carbon external fileformat table"))
+
+    sql("DROP TABLE sdkOutputTable")
+    // drop table should not delete the files
+    assert(new File(writerPath).exists())
+    cleanTestData()
+  }
+
+  test("Read sdk writer output file without index file should fail") {
+    buildTestData(false)
+    deleteIndexFile(writerPath, CarbonCommonConstants.UPDATE_INDEX_FILE_EXT)
+    assert(new File(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+    //data source file format
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'Carbonfile' LOCATION
+         |'$writerPath' """.stripMargin)
+
+    //org.apache.spark.SparkException: Index file not present to read the carbondata file
+    val exception = intercept[java.lang.RuntimeException]
+    {
+      sql("select * from sdkOutputTable").show(false)
+    }
+    assert(exception.getMessage().contains("Index file not present to read the carbondata file"))
+
+    sql("DROP TABLE sdkOutputTable")
+    // drop table should not delete the files
+    assert(new File(writerPath).exists())
+    cleanTestData()
+  }
+
+
+  test("Read sdk writer output file without Carbondata file should fail") {
+    buildTestData(false)
+    deleteIndexFile(writerPath, CarbonCommonConstants.FACT_FILE_EXT)
+    assert(new File(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+    val exception = intercept[Exception] {
+      //data source file format
+      sql(
+        s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'Carbonfile' LOCATION
+           |'$writerPath' """.stripMargin)
+    }
+    assert(exception.getMessage()
+      .contains("Operation not allowed: Invalid table path provided:"))
+
+
+    // drop table should not delete the files
+    assert(new File(writerPath).exists())
+    cleanTestData()
+  }
+
+
+  test("Read sdk writer output file without any file should fail") {
+    buildTestData(false)
+    deleteIndexFile(writerPath, CarbonCommonConstants.FACT_FILE_EXT)
+    deleteIndexFile(writerPath, CarbonCommonConstants.UPDATE_INDEX_FILE_EXT)
+    assert(new File(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+    val exception = intercept[Exception] {
+      //data source file format
+      sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'Carbonfile' LOCATION
+         |'$writerPath' """.stripMargin)
+
+      sql("select * from sdkOutputTable").show(false)
+    }
+    assert(exception.getMessage()
+        .contains("Operation not allowed: Invalid table path provided:"))
+
+    // drop table should not delete the files
+    assert(new File(writerPath).exists())
+    cleanTestData()
+  }
+}
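
For quick reference, the flow exercised by the tests above reduces to: write carbondata files with the SDK CarbonWriter, then expose the output directory through the new 'Carbonfile' provider and query it with plain SQL. The sketch below is a minimal condensation of that flow, not additional project code; it assumes a Carbon-enabled SparkSession named `spark` (as the test harness provides) and a hypothetical local output path, and uses only the writer and DDL calls shown in the test.

    import org.apache.carbondata.sdk.file.{CarbonWriter, Schema}

    // assumption: `spark` is a Carbon-enabled SparkSession; the path is a writable placeholder
    val outputPath = "/tmp/carbonfile-sdk-output"
    val jsonSchema = """[{"name":"string"}, {"age":"int"}, {"height":"double"}]"""

    // write 100 rows of carbondata files with the SDK writer
    val writer = CarbonWriter.builder()
      .withSchema(Schema.parseJson(jsonSchema))
      .outputPath(outputPath)
      .buildWriterForCSVInput()
    (0 until 100).foreach { i =>
      writer.write(Array[String]("robot" + i, i.toString, (i.toDouble / 2).toString))
    }
    writer.close()

    // mount the SDK output as a file-level external table and query it
    spark.sql(s"CREATE EXTERNAL TABLE sdkOutput STORED BY 'Carbonfile' LOCATION '$outputPath'")
    spark.sql("SELECT count(*) FROM sdkOutput").show()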

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9a25dc6a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala
new file mode 100644
index 0000000..d284e50
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.createTable
+
+import java.io.File
+
+import org.apache.commons.io.FileUtils
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.sdk.file.{CarbonWriter, Schema}
+
+class TestCreateTableUsingSparkCarbonFileFormat extends QueryTest with BeforeAndAfterAll {
+
+
+  override def beforeAll(): Unit = {
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+  }
+
+  override def afterAll(): Unit = {
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+  }
+
+  var writerPath = new File(this.getClass.getResource("/").getPath
+                            +
+                            "../." +
+                            "./src/test/resources/SparkCarbonFileFormat/WriterOutput/")
+    .getCanonicalPath
+  // getCanonicalPath returns '\' as the separator on Windows, while the rest of the code expects '/'; normalize it here
+  writerPath = writerPath.replace("\\", "/");
+
+  val filePath = writerPath + "/Fact/Part0/Segment_null/"
+
+  def buildTestData(persistSchema:Boolean) = {
+
+    FileUtils.deleteDirectory(new File(writerPath))
+
+    val schema = new StringBuilder()
+      .append("[ \n")
+      .append("   {\"name\":\"string\"},\n")
+      .append("   {\"age\":\"int\"},\n")
+      .append("   {\"height\":\"double\"}\n")
+      .append("]")
+      .toString()
+
+    try {
+      val builder = CarbonWriter.builder()
+      val writer =
+        if (persistSchema) {
+          builder.persistSchemaFile(true)
+          builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).buildWriterForCSVInput()
+        } else {
+          builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).buildWriterForCSVInput()
+        }
+
+      var i = 0
+      while (i < 100) {
+        writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2)))
+        i += 1
+      }
+      writer.close()
+    } catch {
+      // ignore failures while producing the test data
+      case _: Throwable => None
+    }
+  }
+
+  def cleanTestData() = {
+    FileUtils.deleteDirectory(new File(writerPath))
+  }
+
+  def deleteIndexFile(path: String, extension: String) : Unit = {
+    val file: CarbonFile = FileFactory
+      .getCarbonFile(path, FileFactory.getFileType(path))
+
+    for (eachDir <- file.listFiles) {
+      if (!eachDir.isDirectory) {
+        if (eachDir.getName.endsWith(extension)) {
+          CarbonUtil.deleteFoldersAndFilesSilent(eachDir)
+        }
+      } else {
+        deleteIndexFile(eachDir.getPath, extension)
+      }
+    }
+  }
+
+  // TODO: remove the segment dependency and the tableIdentifier dependency
+  test("read carbondata files (sdk Writer Output) using the SparkCarbonFileFormat ") {
+    buildTestData(false)
+    assert(new File(filePath).exists())
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+    //data source file format
+    if (sqlContext.sparkContext.version.startsWith("2.1")) {
+      //data source file format
+      sql(s"""CREATE TABLE sdkOutputTable USING Carbonfile OPTIONS (PATH '$filePath') """)
+    } else if (sqlContext.sparkContext.version.startsWith("2.2")) {
+      //data source file format
+      sql(
+        s"""CREATE TABLE sdkOutputTable USING Carbonfile LOCATION
+           |'$filePath' """.stripMargin)
+    } else {
+      // TODO: support other Spark versions
+    }
+
+    sql("Describe formatted sdkOutputTable").show(false)
+
+    sql("select * from sdkOutputTable").show(false)
+
+    sql("select * from sdkOutputTable limit 3").show(false)
+
+    sql("select name from sdkOutputTable").show(false)
+
+    sql("select age from sdkOutputTable").show(false)
+
+    sql("select * from sdkOutputTable where age > 2 and age < 8").show(200,false)
+
+    sql("select * from sdkOutputTable where name = 'robot3'").show(200,false)
+
+    sql("select * from sdkOutputTable where name like 'robo%' limit 5").show(200,false)
+
+    sql("select * from sdkOutputTable where name like '%obot%' limit 2").show(200,false)
+
+    sql("select sum(age) from sdkOutputTable where name like 'robot1%' ").show(200,false)
+
+    sql("select count(*) from sdkOutputTable where name like 'robot%' ").show(200,false)
+
+    sql("select count(*) from sdkOutputTable").show(200,false)
+
+    sql("DROP TABLE sdkOutputTable")
+    // drop table should not delete the files
+    assert(new File(filePath).exists())
+    cleanTestData()
+  }
+
+
+  test("should not allow to alter datasource carbontable ") {
+    buildTestData(false)
+    assert(new File(filePath).exists())
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+
+    if (sqlContext.sparkContext.version.startsWith("2.1")) {
+      //data source file format
+      sql(s"""CREATE TABLE sdkOutputTable USING Carbonfile OPTIONS (PATH '$filePath') """)
+    } else if (sqlContext.sparkContext.version.startsWith("2.2")) {
+      //data source file format
+      sql(
+        s"""CREATE TABLE sdkOutputTable USING Carbonfile LOCATION
+           |'$filePath' """.stripMargin)
+    } else {
+      // TODO: support other Spark versions
+    }
+
+    val exception = intercept[MalformedCarbonCommandException]
+      {
+        sql("Alter table sdkOutputTable change age age BIGINT")
+      }
+    assert(exception.getMessage().contains("Unsupported alter operation on hive table"))
+
+    sql("DROP TABLE sdkOutputTable")
+    // drop table should not delete the files
+    assert(new File(filePath).exists())
+    cleanTestData()
+  }
+
+  test("Read sdk writer output file without Carbondata file should fail") {
+    buildTestData(false)
+    deleteIndexFile(writerPath, CarbonCommonConstants.FACT_FILE_EXT)
+    assert(new File(filePath).exists())
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+    val exception = intercept[org.apache.spark.SparkException] {
+      //data source file format
+      if (sqlContext.sparkContext.version.startsWith("2.1")) {
+        //data source file format
+        sql(s"""CREATE TABLE sdkOutputTable USING Carbonfile OPTIONS (PATH '$filePath') """)
+      } else if (sqlContext.sparkContext.version.startsWith("2.2")) {
+        //data source file format
+        sql(
+          s"""CREATE TABLE sdkOutputTable USING Carbonfile LOCATION
+             |'$filePath' """.stripMargin)
+      } else {
+        // TODO: support other Spark versions
+      }
+    }
+    assert(exception.getMessage()
+      .contains("CarbonData file is not present in the location mentioned in DDL"))
+
+    // drop table should not delete the files
+    assert(new File(filePath).exists())
+    cleanTestData()
+  }
+
+
+  test("Read sdk writer output file without any file should fail") {
+    buildTestData(false)
+    deleteIndexFile(writerPath, CarbonCommonConstants.UPDATE_INDEX_FILE_EXT)
+    deleteIndexFile(writerPath, CarbonCommonConstants.FACT_FILE_EXT)
+    assert(new File(filePath).exists())
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+    val exception = intercept[org.apache.spark.SparkException] {
+      //data source file format
+      if (sqlContext.sparkContext.version.startsWith("2.1")) {
+        //data source file format
+        sql(s"""CREATE TABLE sdkOutputTable USING Carbonfile OPTIONS (PATH '$filePath') """)
+      } else if (sqlContext.sparkContext.version.startsWith("2.2")) {
+        //data source file format
+        sql(
+          s"""CREATE TABLE sdkOutputTable USING Carbonfile LOCATION
+             |'$filePath' """.stripMargin)
+      } else {
+        // TODO: support other Spark versions
+      }
+
+      sql("select * from sdkOutputTable").show(false)
+    }
+    assert(exception.getMessage()
+      .contains("CarbonData file is not present in the location mentioned in DDL"))
+
+    // drop table should not delete the files
+    assert(new File(filePath).exists())
+    cleanTestData()
+  }
+
+  test("Read sdk writer output file withSchema") {
+    buildTestData(true)
+    assert(new File(filePath).exists())
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+    //data source file format
+    if (sqlContext.sparkContext.version.startsWith("2.1")) {
+      //data source file format
+      sql(s"""CREATE TABLE sdkOutputTable USING Carbonfile OPTIONS (PATH '$filePath') """)
+    } else if (sqlContext.sparkContext.version.startsWith("2.2")) {
+      //data source file format
+      sql(
+        s"""CREATE TABLE sdkOutputTable USING Carbonfile LOCATION
+           |'$filePath' """.stripMargin)
+    } else {
+      // TODO: support other Spark versions
+    }
+
+    sql("Describe formatted sdkOutputTable").show(false)
+
+    sql("select * from sdkOutputTable").show(false)
+
+    sql("select * from sdkOutputTable limit 3").show(false)
+
+    sql("select name from sdkOutputTable").show(false)
+
+    sql("select age from sdkOutputTable").show(false)
+
+    sql("select * from sdkOutputTable where age > 2 and age < 8").show(200, false)
+
+    sql("select * from sdkOutputTable where name = 'robot3'").show(200, false)
+
+    sql("select * from sdkOutputTable where name like 'robo%' limit 5").show(200, false)
+
+    sql("select * from sdkOutputTable where name like '%obot%' limit 2").show(200, false)
+
+    sql("select sum(age) from sdkOutputTable where name like 'robot1%' ").show(200, false)
+
+    sql("select count(*) from sdkOutputTable where name like 'robot%' ").show(200, false)
+
+    sql("select count(*) from sdkOutputTable").show(200, false)
+
+    sql("DROP TABLE sdkOutputTable")
+
+    // drop table should not delete the files
+    assert(new File(filePath).exists())
+    cleanTestData()
+  }
+
+  test("Read sdk writer output file without index file should fail") {
+    buildTestData(false)
+    deleteIndexFile(writerPath, CarbonCommonConstants.UPDATE_INDEX_FILE_EXT)
+    assert(new File(filePath).exists())
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+    if (sqlContext.sparkContext.version.startsWith("2.1")) {
+      //data source file format
+      sql(s"""CREATE TABLE sdkOutputTable USING Carbonfile OPTIONS (PATH '$filePath') """)
+    } else if (sqlContext.sparkContext.version.startsWith("2.2")) {
+      //data source file format
+      sql(
+        s"""CREATE TABLE sdkOutputTable USING Carbonfile LOCATION
+           |'$filePath' """.stripMargin)
+    } else {
+      // TODO: support other Spark versions
+    }
+    //org.apache.spark.SparkException: Index file not present to read the carbondata file
+    val exception = intercept[org.apache.spark.SparkException]
+      {
+        sql("select * from sdkOutputTable").show(false)
+      }
+    assert(exception.getMessage().contains("Index file not present to read the carbondata file"))
+
+    sql("DROP TABLE sdkOutputTable")
+    // drop table should not delete the files
+    assert(new File(filePath).exists())
+    cleanTestData()
+  }
+}
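
Because the data-source DDL differs between Spark 2.1 and 2.2, the tests above repeat the same version check before every CREATE TABLE. A small helper capturing that branching might look like the sketch below; it is only an illustration of the pattern used in this suite, with `filePath` assumed to point at the segment directory produced by the SDK writer (writerPath + "/Fact/Part0/Segment_null/").

    import org.apache.spark.sql.SparkSession

    // hypothetical helper mirroring the version branching repeated in the tests above
    def createCarbonfileTable(spark: SparkSession, tableName: String, filePath: String): Unit = {
      if (spark.sparkContext.version.startsWith("2.1")) {
        // Spark 2.1: the path is passed through OPTIONS
        spark.sql(s"CREATE TABLE $tableName USING Carbonfile OPTIONS (PATH '$filePath')")
      } else if (spark.sparkContext.version.startsWith("2.2")) {
        // Spark 2.2: the path can be given as LOCATION
        spark.sql(s"CREATE TABLE $tableName USING Carbonfile LOCATION '$filePath'")
      } else {
        // other Spark versions are not covered by these tests
      }
    }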

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9a25dc6a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala
new file mode 100644
index 0000000..9a46676
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.createTable
+
+import java.io.File
+
+import org.apache.commons.io.FileUtils
+import org.apache.spark.sql.SparkSession
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.sdk.file.{CarbonWriter, Schema}
+
+
+object TestSparkCarbonFileFormatWithSparkSession {
+
+  var writerPath = new File(this.getClass.getResource("/").getPath
+                            +
+                            "../." +
+                            "./src/test/resources/SparkCarbonFileFormat/WriterOutput/")
+    .getCanonicalPath
+  // getCanonicalPath returns '\' as the separator on Windows, while the rest of the code expects '/'; normalize it here
+  writerPath = writerPath.replace("\\", "/");
+
+  val filePath = writerPath + "/Fact/Part0/Segment_null/"
+
+  def buildTestData(persistSchema:Boolean) = {
+
+    FileUtils.deleteDirectory(new File(writerPath))
+
+    val schema = new StringBuilder()
+      .append("[ \n")
+      .append("   {\"name\":\"string\"},\n")
+      .append("   {\"age\":\"int\"},\n")
+      .append("   {\"height\":\"double\"}\n")
+      .append("]")
+      .toString()
+
+    try {
+      val builder = CarbonWriter.builder()
+      val writer =
+        if (persistSchema) {
+          builder.persistSchemaFile(true)
+          builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).buildWriterForCSVInput()
+        } else {
+          builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).buildWriterForCSVInput()
+        }
+
+      var i = 0
+      while (i < 100) {
+        writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2)))
+        i += 1
+      }
+      writer.close()
+    } catch {
+      // ignore failures while producing the test data
+      case _: Throwable => None
+    }
+  }
+
+  def cleanTestData() = {
+    FileUtils.deleteDirectory(new File(writerPath))
+  }
+
+  def deleteIndexFile(path: String, extension: String) : Unit = {
+    val file: CarbonFile = FileFactory
+      .getCarbonFile(path, FileFactory.getFileType(path))
+
+    for (eachDir <- file.listFiles) {
+      if (!eachDir.isDirectory) {
+        if (eachDir.getName.endsWith(extension)) {
+          CarbonUtil.deleteFoldersAndFilesSilent(eachDir)
+        }
+      } else {
+        deleteIndexFile(eachDir.getPath, extension)
+      }
+    }
+  }
+
+  def main(args: Array[String]): Unit = {
+    val rootPath = new File(this.getClass.getResource("/").getPath
+                            + "../../../..").getCanonicalPath
+    val storeLocation = s"$rootPath/examples/spark2/target/store"
+    val warehouse = s"$rootPath/examples/spark2/target/warehouse"
+    val metastoredb = s"$rootPath/examples/spark2/target/metastore_db"
+
+    // clean data folder
+    val clean = (path: String) => FileUtils.deleteDirectory(new File(path))
+    clean(storeLocation)
+    clean(warehouse)
+    clean(metastoredb)
+
+    val spark = SparkSession
+      .builder()
+      .master("local")
+      .appName("TestSparkCarbonFileFormatWithSparkSession")
+      .enableHiveSupport()
+      .config("spark.sql.warehouse.dir", warehouse)
+      .config("javax.jdo.option.ConnectionURL",
+        s"jdbc:derby:;databaseName=$metastoredb;create=true")
+      .getOrCreate()
+
+    CarbonProperties.getInstance()
+      .addProperty("carbon.storelocation", storeLocation)
+
+    spark.sparkContext.setLogLevel("WARN")
+
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd HH:mm:ss")
+      .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy/MM/dd")
+    buildTestData(false)
+    assert(new File(filePath).exists())
+    //data source file format
+    if (spark.sparkContext.version.startsWith("2.1")) {
+      //data source file format
+      spark.sql(s"""CREATE TABLE sdkOutputTable USING Carbonfile OPTIONS (PATH '$filePath') """)
+    } else if (spark.sparkContext.version.startsWith("2.2")) {
+      //data source file format
+      spark.sql(
+        s"""CREATE TABLE sdkOutputTable USING Carbonfile LOCATION
+           |'$filePath' """.stripMargin)
+    } else {
+      // TODO: support other Spark versions
+    }
+
+    spark.sql("Describe formatted sdkOutputTable").show(false)
+
+    spark.sql("select * from sdkOutputTable").show(false)
+
+    spark.sql("select * from sdkOutputTable limit 3").show(false)
+
+    spark.sql("select name from sdkOutputTable").show(false)
+
+    spark.sql("select age from sdkOutputTable").show(false)
+
+    spark.sql("select * from sdkOutputTable where age > 2 and age < 8").show(200,false)
+
+    spark.sql("select * from sdkOutputTable where name = 'robot3'").show(200,false)
+
+    spark.sql("select * from sdkOutputTable where name like 'robo%' limit 5").show(200,false)
+
+    spark.sql("select * from sdkOutputTable where name like '%obot%' limit 2").show(200,false)
+
+    spark.sql("select sum(age) from sdkOutputTable where name like 'robot1%' ").show(200,false)
+
+    spark.sql("select count(*) from sdkOutputTable where name like 'robot%' ").show(200,false)
+
+    spark.sql("select count(*) from sdkOutputTable").show(200,false)
+
+    spark.sql("DROP TABLE sdkOutputTable")
+    // drop table should not delete the files
+    assert(new File(filePath).exists())
+    cleanTestData()
+
+    spark.stop()
+  }
+}
\ No newline at end of file
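
Outside the test harness, the standalone runner above also shows what has to be in place before the Carbonfile data source can be used: a Hive-enabled SparkSession plus a carbon store location and date formats registered through CarbonProperties. A trimmed-down sketch of that bootstrap follows, with the directory paths as placeholder assumptions to adjust for the local environment.

    import org.apache.spark.sql.SparkSession
    import org.apache.carbondata.core.constants.CarbonCommonConstants
    import org.apache.carbondata.core.util.CarbonProperties

    // placeholder directories; adjust to the local environment
    val warehouse = "/tmp/carbon/warehouse"
    val metastoredb = "/tmp/carbon/metastore_db"
    val storeLocation = "/tmp/carbon/store"

    val spark = SparkSession.builder()
      .master("local")
      .appName("CarbonfileBootstrap")
      .enableHiveSupport()
      .config("spark.sql.warehouse.dir", warehouse)
      .config("javax.jdo.option.ConnectionURL",
        s"jdbc:derby:;databaseName=$metastoredb;create=true")
      .getOrCreate()

    CarbonProperties.getInstance()
      .addProperty("carbon.storelocation", storeLocation)
      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd HH:mm:ss")
      .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy/MM/dd")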

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9a25dc6a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index b5c9543..874584d 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -42,7 +42,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.block.Distributable
 import org.apache.carbondata.core.indexstore.PartitionSpec
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.core.metadata.schema.table.TableInfo
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
 import org.apache.carbondata.core.scan.expression.Expression
 import org.apache.carbondata.core.scan.filter.FilterUtil
 import org.apache.carbondata.core.scan.model.QueryModel
@@ -50,7 +50,7 @@ import org.apache.carbondata.core.stats.{QueryStatistic, QueryStatisticsConstant
 import org.apache.carbondata.core.statusmanager.FileFormat
 import org.apache.carbondata.core.util._
 import org.apache.carbondata.hadoop._
-import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
+import org.apache.carbondata.hadoop.api.{CarbonFileInputFormat, CarbonTableInputFormat}
 import org.apache.carbondata.hadoop.streaming.{CarbonStreamInputFormat, CarbonStreamRecordReader}
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
 import org.apache.carbondata.spark.InitInputMetrics
@@ -90,13 +90,21 @@ class CarbonScanRDD(
     val jobConf = new JobConf(conf)
     SparkHadoopUtil.get.addCredentials(jobConf)
     val job = Job.getInstance(jobConf)
-    val format = prepareInputFormatForDriver(job.getConfiguration)
-
+    val fileLevelExternal = tableInfo.getFactTable().getTableProperties().get("_filelevelexternal")
+    val format = if (fileLevelExternal != null && fileLevelExternal.equalsIgnoreCase("true")) {
+      prepareFileInputFormatForDriver(job.getConfiguration)
+    } else {
+      prepareInputFormatForDriver(job.getConfiguration)
+    }
     // initialise query_id for job
     job.getConfiguration.set("query.id", queryId)
 
     // get splits
     val splits = format.getSplits(job)
+    if ((splits == null) && format.isInstanceOf[CarbonFileInputFormat[Object]]) {
+      throw new SparkException(
+        "CarbonData file not exist in the segment_null (SDK writer Output) path")
+    }
 
     // separate split
     // 1. for batch splits, invoke distributeSplits method to create partitions
@@ -113,7 +121,7 @@ class CarbonScanRDD(
     }
     val batchPartitions = distributeColumnarSplits(columnarSplits)
     // check and remove InExpression from filterExpression
-    checkAndRemoveInExpressinFromFilterExpression(format, batchPartitions)
+    checkAndRemoveInExpressinFromFilterExpression(batchPartitions)
     if (streamSplits.isEmpty) {
       batchPartitions.toArray
     } else {
@@ -358,7 +366,9 @@ class CarbonScanRDD(
         case _ =>
           // create record reader for CarbonData file format
           if (vectorReader) {
-            val carbonRecordReader = createVectorizedCarbonRecordReader(model, inputMetricsStats)
+            val carbonRecordReader = createVectorizedCarbonRecordReader(model,
+              inputMetricsStats,
+              "true")
             if (carbonRecordReader == null) {
               new CarbonRecordReader(model,
                 format.getReadSupportClass(attemptContext.getConfiguration), inputMetricsStats)
@@ -435,6 +445,16 @@ class CarbonScanRDD(
     createInputFormat(conf)
   }
 
+  def prepareFileInputFormatForDriver(conf: Configuration): CarbonFileInputFormat[Object] = {
+    CarbonFileInputFormat.setTableInfo(conf, tableInfo)
+    CarbonFileInputFormat.setDatabaseName(conf, tableInfo.getDatabaseName)
+    CarbonFileInputFormat.setTableName(conf, tableInfo.getFactTable.getTableName)
+    if (partitionNames != null) {
+      CarbonFileInputFormat.setPartitionsToPrune(conf, partitionNames.asJava)
+    }
+    createFileInputFormat(conf)
+  }
+
   private def prepareInputFormatForExecutor(conf: Configuration): CarbonTableInputFormat[Object] = {
     CarbonTableInputFormat.setCarbonReadSupport(conf, readSupport)
     val tableInfo1 = getTableInfo
@@ -445,6 +465,32 @@ class CarbonScanRDD(
     createInputFormat(conf)
   }
 
+  private def createFileInputFormat(conf: Configuration): CarbonFileInputFormat[Object] = {
+    val format = new CarbonFileInputFormat[Object]
+    CarbonFileInputFormat.setTablePath(conf,
+      identifier.appendWithLocalPrefix(identifier.getTablePath))
+    CarbonFileInputFormat.setQuerySegment(conf, identifier)
+    CarbonFileInputFormat.setFilterPredicates(conf, filterExpression)
+    CarbonFileInputFormat.setColumnProjection(conf, columnProjection)
+    CarbonFileInputFormat.setDataMapJob(conf, new SparkDataMapJob)
+    if (CarbonProperties.getInstance().getProperty(
+      CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP,
+      CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT).toBoolean) {
+      CarbonTableInputFormat.setDataMapJob(conf, new SparkDataMapJob)
+    }
+
+    // if segment validation is disabled in the thread-local session info, propagate that to CarbonTableInputFormat
+    val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
+    if (carbonSessionInfo != null) {
+      CarbonTableInputFormat.setValidateSegmentsToAccess(conf, carbonSessionInfo.getSessionParams
+        .getProperty(CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS +
+                     identifier.getCarbonTableIdentifier.getDatabaseName + "." +
+                     identifier.getCarbonTableIdentifier.getTableName, "true").toBoolean)
+    }
+    format
+  }
+
+
   private def createInputFormat(conf: Configuration): CarbonTableInputFormat[Object] = {
     val format = new CarbonTableInputFormat[Object]
     CarbonTableInputFormat.setTablePath(conf,
@@ -489,7 +535,6 @@ class CarbonScanRDD(
    * @param identifiedPartitions
    */
   private def checkAndRemoveInExpressinFromFilterExpression(
-      format: CarbonTableInputFormat[Object],
       identifiedPartitions: mutable.Buffer[Partition]) = {
     if (null != filterExpression) {
       if (identifiedPartitions.nonEmpty &&
@@ -537,12 +582,13 @@ class CarbonScanRDD(
   }
 
   def createVectorizedCarbonRecordReader(queryModel: QueryModel,
-      inputMetricsStats: InputMetricsStats): RecordReader[Void, Object] = {
+      inputMetricsStats: InputMetricsStats, enableBatch: String): RecordReader[Void, Object] = {
     val name = "org.apache.carbondata.spark.vectorreader.VectorizedCarbonRecordReader"
     try {
       val cons = Class.forName(name).getDeclaredConstructors
       cons.head.setAccessible(true)
-      cons.head.newInstance(queryModel, inputMetricsStats).asInstanceOf[RecordReader[Void, Object]]
+      cons.head.newInstance(queryModel, inputMetricsStats, enableBatch)
+        .asInstanceOf[RecordReader[Void, Object]]
     } catch {
       case e: Exception =>
         LOGGER.error(e)
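
To keep the control flow visible without re-reading the whole hunk: the essence of the CarbonScanRDD change is the dispatch shown below, where the file-level input format is chosen only when the table carries the _filelevelexternal property and the existing table input format is used otherwise. This is a simplified extract of the diff above, not additional code; `tableInfo`, `job`, and the two prepare methods are the members shown in the hunk.

    // simplified extract of the input-format dispatch introduced in CarbonScanRDD
    val fileLevelExternal = tableInfo.getFactTable().getTableProperties().get("_filelevelexternal")
    val format = if (fileLevelExternal != null && fileLevelExternal.equalsIgnoreCase("true")) {
      prepareFileInputFormatForDriver(job.getConfiguration)   // file-level external table (SDK output)
    } else {
      prepareInputFormatForDriver(job.getConfiguration)       // regular carbon table
    }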

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9a25dc6a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
index 73da878..903bf44 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
@@ -91,10 +91,21 @@ class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> {
 
   private InputMetricsStats inputMetricsStats;
 
-  public VectorizedCarbonRecordReader(QueryModel queryModel, InputMetricsStats inputMetricsStats) {
+  public VectorizedCarbonRecordReader(QueryModel queryModel, InputMetricsStats inputMetricsStats,
+      String enableBatch) {
     this.queryModel = queryModel;
     this.inputMetricsStats = inputMetricsStats;
-    enableReturningBatches();
+    if (enableBatch.equals("true")) {
+      enableReturningBatches();
+    }
+  }
+
+
+  /*
+   * Can be called before any rows are returned to enable returning columnar batches directly.
+   */
+  public void enableReturningBatches() {
+    returnColumnarBatch = true;
   }
 
   /**
@@ -273,12 +284,7 @@ class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> {
     if (columnarBatch == null) initBatch();
   }
 
-  /*
-   * Can be called before any rows are returned to enable returning columnar batches directly.
-   */
-  private void enableReturningBatches() {
-    returnColumnarBatch = true;
-  }
+
 
   /**
    * Advances to the next batch of rows. Returns false if there are no more.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9a25dc6a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 18c268c..7a6aa53 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Sort}
 import org.apache.spark.sql.execution.LogicalRDD
 import org.apache.spark.sql.execution.SQLExecution.EXECUTION_ID_KEY
 import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataLoadTableFileMapping, UpdateTableModel}
-import org.apache.spark.sql.execution.datasources.{CarbonFileFormat, CatalogFileIndex, FindDataSourceTable, HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, FindDataSourceTable, HadoopFsRelation, LogicalRelation, SparkCarbonTableFormat}
 import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.optimizer.CarbonFilters
 import org.apache.spark.sql.types._
@@ -1020,7 +1020,7 @@ case class CarbonLoadDataCommand(
       partitionSchema = partitionSchema,
       dataSchema = dataSchema,
       bucketSpec = catalogTable.bucketSpec,
-      fileFormat = new CarbonFileFormat,
+      fileFormat = new SparkCarbonTableFormat,
       options = options.toMap)(sparkSession = sparkSession)
 
     CarbonReflectionUtils.getLogicalRelation(hdfsRelation,