You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/11/13 22:11:51 UTC

[25/49] carbondata git commit: [CARBONDATA-1572][Streaming] Support streaming ingest and query

[CARBONDATA-1572][Streaming] Support streaming ingest and query

This PR adds support for streaming ingest from Spark Structured Streaming:
1. Add a row format writer that supports appending batch data.

2. Support StreamSinkProvider and append batch data to the row format file.

3. Add a row format reader that supports splitting a row format file into small blocks.

4. Support queries on streaming row format files.

This closes #1470


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/d7393da9
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/d7393da9
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/d7393da9

Branch: refs/heads/fgdatamap
Commit: d7393da9890c2360831d17d23145b78f8da70575
Parents: fa19331
Author: QiangCai <qi...@qq.com>
Authored: Wed Oct 18 11:13:00 2017 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Thu Nov 9 16:26:53 2017 +0800

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |  10 +
 .../core/datastore/compression/Compressor.java  |   4 +
 .../datastore/compression/SnappyCompressor.java |  13 +
 .../core/datastore/impl/FileFactory.java        |  49 +-
 .../apache/carbondata/core/locks/LockUsage.java |   2 +-
 .../core/metadata/datatype/DataTypes.java       |  37 +-
 .../core/metadata/schema/table/CarbonTable.java |  37 +
 .../carbondata/core/scan/filter/FilterUtil.java |  22 +
 .../filter/executer/AndFilterExecuterImpl.java  |   7 +
 .../executer/ExcludeFilterExecuterImpl.java     |  34 +
 .../scan/filter/executer/FilterExecuter.java    |   4 +
 .../executer/IncludeFilterExecuterImpl.java     |  31 +
 .../filter/executer/OrFilterExecuterImpl.java   |   7 +
 .../RestructureExcludeFilterExecutorImpl.java   |   7 +
 .../RestructureIncludeFilterExecutorImpl.java   |   7 +
 .../executer/RowLevelFilterExecuterImpl.java    |   9 +
 .../filter/executer/TrueFilterExecutor.java     |   5 +
 .../executer/ValueBasedFilterExecuterImpl.java  |   6 +
 .../carbondata/core/scan/model/QueryModel.java  |  49 +-
 .../core/statusmanager/FileFormat.java          |  40 ++
 .../core/statusmanager/LoadMetadataDetails.java |  13 +
 .../statusmanager/SegmentStatusManager.java     |  29 +-
 .../apache/carbondata/core/util/CarbonUtil.java |   8 +
 .../core/util/path/CarbonTablePath.java         |  22 +-
 examples/spark2/pom.xml                         |   5 +
 .../spark2/src/main/resources/streamSample.csv  |   6 +
 .../carbondata/examples/StreamExample.scala     | 213 ++++++
 format/src/main/thrift/carbondata.thrift        |   6 +-
 format/src/main/thrift/carbondata_index.thrift  |   1 +
 .../carbondata/hadoop/CarbonInputFormat.java    |  23 +-
 .../carbondata/hadoop/CarbonInputSplit.java     |  35 +
 .../hadoop/CarbonMultiBlockSplit.java           |  19 +
 .../hadoop/api/CarbonTableInputFormat.java      | 110 ++-
 .../streaming/CarbonStreamInputFormat.java      | 114 ++++
 .../streaming/CarbonStreamOutputFormat.java     |  75 ++
 .../streaming/CarbonStreamRecordReader.java     | 676 +++++++++++++++++++
 .../streaming/CarbonStreamRecordWriter.java     | 305 +++++++++
 .../hadoop/streaming/StreamBlockletReader.java  | 248 +++++++
 .../hadoop/streaming/StreamBlockletWriter.java  | 152 +++++
 .../hadoop/util/CarbonInputFormatUtil.java      |   6 +-
 .../carbondata/hadoop/util/CarbonTypeUtil.java  | 101 +++
 .../hadoop/test/util/StoreCreator.java          |  78 ++-
 .../hive/MapredCarbonInputFormat.java           |   2 +-
 .../presto/CarbonVectorizedRecordReader.java    |   1 +
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |  86 ++-
 integration/spark2/pom.xml                      |   2 +-
 .../sql/CarbonDatasourceHadoopRelation.scala    |   5 +-
 .../org/apache/spark/sql/CarbonSession.scala    |   2 +
 .../org/apache/spark/sql/CarbonSource.scala     |  36 +-
 pom.xml                                         |  11 +-
 .../loading/csvinput/CSVInputFormat.java        |  45 +-
 .../store/writer/AbstractFactDataWriter.java    |   2 +-
 .../util/CarbonDataProcessorUtil.java           |  19 +
 .../processing/util/CarbonLoaderUtil.java       |   2 +-
 streaming/pom.xml                               | 127 ++++
 .../streaming/CarbonStreamException.java        |  32 +
 .../streaming/parser/CSVStreamParserImp.java    |  45 ++
 .../streaming/parser/CarbonStreamParser.java    |  38 ++
 .../streaming/segment/StreamSegment.java        | 373 ++++++++++
 .../streaming/StreamSinkFactory.scala           | 160 +++++
 .../streaming/CarbonAppendableStreamSink.scala  | 292 ++++++++
 .../CarbonStreamingQueryListener.scala          |  67 ++
 62 files changed, 3831 insertions(+), 141 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 711b237..17936d9 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -532,6 +532,16 @@ public final class CarbonCommonConstants {
    * LOAD_STATUS PARTIAL_SUCCESS
    */
   public static final String STORE_LOADSTATUS_PARTIAL_SUCCESS = "Partial Success";
+
+  /**
+   * STORE_LOADSTATUS_STREAMING
+   */
+  public static final String STORE_LOADSTATUS_STREAMING = "Streaming";
+
+  /**
+   * STORE_LOADSTATUS_STREAMING_FINISH
+   */
+  public static final String STORE_LOADSTATUS_STREAMING_FINISH = "Streaming Finish";
   /**
    * LOAD_STATUS
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/core/src/main/java/org/apache/carbondata/core/datastore/compression/Compressor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/Compressor.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/Compressor.java
index cd31984..a32651a 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/Compressor.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/Compressor.java
@@ -25,6 +25,8 @@ public interface Compressor {
 
   byte[] compressByte(byte[] unCompInput);
 
+  byte[] compressByte(byte[] unCompInput, int byteSize);
+
   byte[] unCompressByte(byte[] compInput);
 
   byte[] unCompressByte(byte[] compInput, int offset, int length);
@@ -61,5 +63,7 @@ public interface Compressor {
 
   long rawCompress(long inputAddress, int inputSize, long outputAddress) throws IOException;
 
+  long rawUncompress(byte[] input, byte[] output) throws IOException;
+
   int maxCompressedLength(int inputSize);
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/core/src/main/java/org/apache/carbondata/core/datastore/compression/SnappyCompressor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/SnappyCompressor.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/SnappyCompressor.java
index 4022680..f234f80 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/SnappyCompressor.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/SnappyCompressor.java
@@ -63,6 +63,15 @@ public class SnappyCompressor implements Compressor {
     }
   }
 
+  @Override public byte[] compressByte(byte[] unCompInput, int byteSize) {
+    try {
+      return Snappy.rawCompress(unCompInput, byteSize);
+    } catch (IOException e) {
+      LOGGER.error(e, e.getMessage());
+      return null;
+    }
+  }
+
   @Override public byte[] unCompressByte(byte[] compInput) {
     try {
       return Snappy.uncompress(compInput);
@@ -228,6 +237,10 @@ public class SnappyCompressor implements Compressor {
     return snappyNative.rawCompress(inputAddress, inputSize, outputAddress);
   }
 
+  public long rawUncompress(byte[] input, byte[] output) throws IOException {
+    return snappyNative.rawUncompress(input, 0, input.length, output, 0);
+  }
+
   @Override
   public int maxCompressedLength(int inputSize) {
     return snappyNative.maxCompressedLength(inputSize);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
index 97f0b3f..e4e4ae2 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
@@ -26,6 +26,7 @@ import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.channels.FileChannel;
 import java.util.zip.GZIPInputStream;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -81,11 +82,9 @@ public final class FileFactory {
   public static FileType getFileType(String path) {
     if (path.startsWith(CarbonCommonConstants.HDFSURL_PREFIX)) {
       return FileType.HDFS;
-    }
-    else if (path.startsWith(CarbonCommonConstants.ALLUXIOURL_PREFIX)) {
+    } else if (path.startsWith(CarbonCommonConstants.ALLUXIOURL_PREFIX)) {
       return FileType.ALLUXIO;
-    }
-    else if (path.startsWith(CarbonCommonConstants.VIEWFSURL_PREFIX)) {
+    } else if (path.startsWith(CarbonCommonConstants.VIEWFSURL_PREFIX)) {
       return FileType.VIEWFS;
     }
     return FileType.LOCAL;
@@ -438,6 +437,48 @@ public final class FileFactory {
   }
 
   /**
+   * this method will truncate the file to the new size.
+   * @param path
+   * @param fileType
+   * @param newSize
+   * @throws IOException
+   */
+  public static void truncateFile(String path, FileType fileType, long newSize) throws IOException {
+    path = path.replace("\\", "/");
+    FileChannel fileChannel = null;
+    switch (fileType) {
+      case LOCAL:
+        path = getUpdatedFilePath(path, fileType);
+        fileChannel = new FileOutputStream(path, true).getChannel();
+        try {
+          fileChannel.truncate(newSize);
+        } finally {
+          if (fileChannel != null) {
+            fileChannel.close();
+          }
+        }
+        return;
+      case HDFS:
+      case ALLUXIO:
+      case VIEWFS:
+        Path pt = new Path(path);
+        FileSystem fs = pt.getFileSystem(configuration);
+        fs.truncate(pt, newSize);
+        return;
+      default:
+        fileChannel = new FileOutputStream(path, true).getChannel();
+        try {
+          fileChannel.truncate(newSize);
+        } finally {
+          if (fileChannel != null) {
+            fileChannel.close();
+          }
+        }
+        return;
+    }
+  }
+
+  /**
    * for creating a new Lock file and if it is successfully created
    * then in case of abrupt shutdown then the stream to that file will be closed.
    *

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/core/src/main/java/org/apache/carbondata/core/locks/LockUsage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/locks/LockUsage.java b/core/src/main/java/org/apache/carbondata/core/locks/LockUsage.java
index 1738c64..434129c 100644
--- a/core/src/main/java/org/apache/carbondata/core/locks/LockUsage.java
+++ b/core/src/main/java/org/apache/carbondata/core/locks/LockUsage.java
@@ -32,5 +32,5 @@ public class LockUsage {
   public static final String DELETE_SEGMENT_LOCK = "delete_segment.lock";
   public static final String CLEAN_FILES_LOCK = "clean_files.lock";
   public static final String DROP_TABLE_LOCK = "droptable.lock";
-
+  public static final String STREAMING_LOCK = "streaming.lock";
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataTypes.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataTypes.java b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataTypes.java
index 43dad72..8686583 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataTypes.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataTypes.java
@@ -48,25 +48,24 @@ public class DataTypes {
   public static final DataType STRUCT = StructType.STRUCT;
   public static final DataType MAP = MapType.MAP;
 
-  // these IDs are used within this package only
-  static final int STRING_TYPE_ID = 0;
-  static final int DATE_TYPE_ID = 1;
-  static final int TIMESTAMP_TYPE_ID = 2;
-  static final int BOOLEAN_TYPE_ID = 3;
-  static final int SHORT_TYPE_ID = 4;
-  static final int INT_TYPE_ID = 5;
-  static final int FLOAT_TYPE_ID = 6;
-  static final int LONG_TYPE_ID = 7;
-  static final int DOUBLE_TYPE_ID = 8;
-  static final int NULL_TYPE_ID = 9;
-  static final int BYTE_TYPE_ID = 10;
-  static final int BYTE_ARRAY_TYPE_ID = 11;
-  static final int SHORT_INT_TYPE_ID = 12;
-  static final int LEGACY_LONG_TYPE_ID = 13;
-  static final int DECIMAL_TYPE_ID = 20;
-  static final int ARRAY_TYPE_ID = 21;
-  static final int STRUCT_TYPE_ID = 22;
-  static final int MAP_TYPE_ID = 23;
+  public static final int STRING_TYPE_ID = 0;
+  public static final int DATE_TYPE_ID = 1;
+  public static final int TIMESTAMP_TYPE_ID = 2;
+  public static final int BOOLEAN_TYPE_ID = 3;
+  public static final int SHORT_TYPE_ID = 4;
+  public static final int INT_TYPE_ID = 5;
+  public static final int FLOAT_TYPE_ID = 6;
+  public static final int LONG_TYPE_ID = 7;
+  public static final int DOUBLE_TYPE_ID = 8;
+  public static final int NULL_TYPE_ID = 9;
+  public static final int BYTE_TYPE_ID = 10;
+  public static final int BYTE_ARRAY_TYPE_ID = 11;
+  public static final int SHORT_INT_TYPE_ID = 12;
+  public static final int LEGACY_LONG_TYPE_ID = 13;
+  public static final int DECIMAL_TYPE_ID = 20;
+  public static final int ARRAY_TYPE_ID = 21;
+  public static final int STRUCT_TYPE_ID = 22;
+  public static final int MAP_TYPE_ID = 23;
 
   /**
    * create a DataType instance from uniqueId of the DataType

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index e1a7143..d4aaa29 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -124,6 +124,8 @@ public class CarbonTable implements Serializable {
    */
   private int numberOfNoDictSortColumns;
 
+  private int dimensionOrdinalMax;
+
   private CarbonTable() {
     this.tableDimensionsMap = new HashMap<String, List<CarbonDimension>>();
     this.tableImplicitDimensionsMap = new HashMap<String, List<CarbonDimension>>();
@@ -259,6 +261,8 @@ public class CarbonTable implements Serializable {
     fillVisibleDimensions(tableSchema.getTableName());
     fillVisibleMeasures(tableSchema.getTableName());
     addImplicitDimension(dimensionOrdinal, implicitDimensions);
+
+    dimensionOrdinalMax = dimensionOrdinal;
   }
 
   /**
@@ -431,6 +435,30 @@ public class CarbonTable implements Serializable {
   }
 
   /**
+   * This method will give storage order column list
+   */
+  public List<CarbonColumn> getStreamStorageOrderColumn(String tableName) {
+    List<CarbonDimension> dimensions = tableDimensionsMap.get(tableName);
+    List<CarbonMeasure> measures = tableMeasuresMap.get(tableName);
+    List<CarbonColumn> columnList = new ArrayList<>(dimensions.size() + measures.size());
+    List<CarbonColumn> complexdimensionList = new ArrayList<>(dimensions.size());
+    for (CarbonColumn column : dimensions) {
+      if (column.isComplex()) {
+        complexdimensionList.add(column);
+      } else {
+        columnList.add(column);
+      }
+    }
+    columnList.addAll(complexdimensionList);
+    for (CarbonColumn column : measures) {
+      if (!(column.getColName().equals("default_dummy_measure"))) {
+        columnList.add(column);
+      }
+    }
+    return columnList;
+  }
+
+  /**
    * to get particular measure from a table
    *
    * @param tableName
@@ -665,4 +693,13 @@ public class CarbonTable implements Serializable {
     String streaming = getTableInfo().getFactTable().getTableProperties().get("streaming");
     return streaming != null && streaming.equalsIgnoreCase("true");
   }
+
+  public int getDimensionOrdinalMax() {
+    return dimensionOrdinalMax;
+  }
+
+  public void setDimensionOrdinalMax(int dimensionOrdinalMax) {
+    this.dimensionOrdinalMax = dimensionOrdinalMax;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
index 6943b8b..56b27aa 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
@@ -57,6 +57,7 @@ import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.scan.expression.ColumnExpression;
@@ -1639,4 +1640,25 @@ public final class FilterUtil {
       }
     }
   }
+
+  public static void updateIndexOfColumnExpression(Expression exp, int dimOridnalMax) {
+    if (exp.getChildren() == null || exp.getChildren().size() == 0) {
+      if (exp instanceof ColumnExpression) {
+        ColumnExpression ce = (ColumnExpression) exp;
+        CarbonColumn column = ce.getCarbonColumn();
+        if (column.isDimension()) {
+          ce.setColIndex(column.getOrdinal());
+        } else {
+          ce.setColIndex(dimOridnalMax + column.getOrdinal());
+        }
+      }
+    } else {
+      if (exp.getChildren().size() > 0) {
+        List<Expression> children = exp.getChildren();
+        for (int i = 0; i < children.size(); i++) {
+          updateIndexOfColumnExpression(children.get(i), dimOridnalMax);
+        }
+      }
+    }
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/AndFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/AndFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/AndFilterExecuterImpl.java
index f79e788..6b256f1 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/AndFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/AndFilterExecuterImpl.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.util.BitSet;
 
 import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
+import org.apache.carbondata.core.scan.filter.intf.RowIntf;
 import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
 import org.apache.carbondata.core.util.BitSetGroup;
 
@@ -49,6 +50,12 @@ public class AndFilterExecuterImpl implements FilterExecuter {
     return leftFilters;
   }
 
+  @Override public boolean applyFilter(RowIntf value, int dimOrdinalMax)
+      throws FilterUnsupportedException, IOException {
+    return leftExecuter.applyFilter(value, dimOrdinalMax) &&
+        rightExecuter.applyFilter(value, dimOrdinalMax);
+  }
+
   @Override public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue) {
     BitSet leftFilters = leftExecuter.isScanRequired(blockMaxValue, blockMinValue);
     if (leftFilters.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java
index 87e7dea..df94d46 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java
@@ -27,10 +27,12 @@ import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.scan.filter.FilterUtil;
+import org.apache.carbondata.core.scan.filter.intf.RowIntf;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
 import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
 import org.apache.carbondata.core.util.BitSetGroup;
+import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.core.util.comparator.Comparator;
@@ -45,6 +47,7 @@ public class ExcludeFilterExecuterImpl implements FilterExecuter {
   protected SegmentProperties segmentProperties;
   protected boolean isDimensionPresentInCurrentBlock = false;
   protected boolean isMeasurePresentInCurrentBlock = false;
+  private SerializableComparator comparator;
   /**
    * is dimension column data is natural sorted
    */
@@ -71,6 +74,9 @@ public class ExcludeFilterExecuterImpl implements FilterExecuter {
           .prepareKeysFromSurrogates(msrColumnEvaluatorInfo.getFilterValues(), segmentProperties,
               null, null, msrColumnEvaluatorInfo.getMeasure(), msrColumnExecutorInfo);
       isMeasurePresentInCurrentBlock = true;
+
+      DataType msrType = getMeasureDataType(msrColumnEvaluatorInfo);
+      comparator = Comparator.getComparatorByDataTypeForMeasure(msrType);
     }
 
   }
@@ -127,6 +133,34 @@ public class ExcludeFilterExecuterImpl implements FilterExecuter {
     return null;
   }
 
+  @Override public boolean applyFilter(RowIntf value, int dimOrdinalMax) {
+    if (isDimensionPresentInCurrentBlock) {
+      byte[][] filterValues = dimColumnExecuterInfo.getFilterKeys();
+      byte[] col = (byte[])value.getVal(dimColEvaluatorInfo.getDimension().getOrdinal());
+      for (int i = 0; i < filterValues.length; i++) {
+        if (0 == ByteUtil.UnsafeComparer.INSTANCE.compareTo(col, 0, col.length,
+            filterValues[i], 0, filterValues[i].length)) {
+          return false;
+        }
+      }
+    } else if (isMeasurePresentInCurrentBlock) {
+      Object[] filterValues = msrColumnExecutorInfo.getFilterKeys();
+      Object col = value.getVal(msrColumnEvaluatorInfo.getMeasure().getOrdinal() + dimOrdinalMax);
+      for (int i = 0; i < filterValues.length; i++) {
+        if (filterValues[i] == null) {
+          if (null == col) {
+            return false;
+          }
+          continue;
+        }
+        if (comparator.compare(col, filterValues[i]) == 0) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
   private DataType getMeasureDataType(MeasureColumnResolvedFilterInfo msrColumnEvaluatorInfo) {
     if (msrColumnEvaluatorInfo.getType() == DataTypes.BOOLEAN) {
       return DataTypes.BOOLEAN;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/FilterExecuter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/FilterExecuter.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/FilterExecuter.java
index 93640fa..85891dc 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/FilterExecuter.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/FilterExecuter.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.util.BitSet;
 
 import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
+import org.apache.carbondata.core.scan.filter.intf.RowIntf;
 import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
 import org.apache.carbondata.core.util.BitSetGroup;
 
@@ -34,6 +35,9 @@ public interface FilterExecuter {
   BitSetGroup applyFilter(BlocksChunkHolder blocksChunkHolder, boolean useBitsetPipeLine)
       throws FilterUnsupportedException, IOException;
 
+  boolean applyFilter(RowIntf value, int dimOrdinalMax)
+      throws FilterUnsupportedException, IOException;
+
   /**
    * API will verify whether the block can be shortlisted based on block
    * max and min key.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
index 0022a72..fe1421c 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
@@ -27,6 +27,7 @@ import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.scan.filter.FilterUtil;
+import org.apache.carbondata.core.scan.filter.intf.RowIntf;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
 import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
@@ -78,6 +79,8 @@ public class IncludeFilterExecuterImpl implements FilterExecuter {
               null, null, msrColumnEvaluatorInfo.getMeasure(), msrColumnExecutorInfo);
       isMeasurePresentInCurrentBlock = true;
 
+      DataType msrType = getMeasureDataType(msrColumnEvaluatorInfo);
+      comparator = Comparator.getComparatorByDataTypeForMeasure(msrType);
     }
 
   }
@@ -147,6 +150,34 @@ public class IncludeFilterExecuterImpl implements FilterExecuter {
     return null;
   }
 
+  @Override public boolean applyFilter(RowIntf value, int dimOrdinalMax) {
+    if (isDimensionPresentInCurrentBlock) {
+      byte[][] filterValues = dimColumnExecuterInfo.getFilterKeys();
+      byte[] col = (byte[])value.getVal(dimColumnEvaluatorInfo.getDimension().getOrdinal());
+      for (int i = 0; i < filterValues.length; i++) {
+        if (0 == ByteUtil.UnsafeComparer.INSTANCE.compareTo(col, 0, col.length,
+            filterValues[i], 0, filterValues[i].length)) {
+          return true;
+        }
+      }
+    } else if (isMeasurePresentInCurrentBlock) {
+      Object[] filterValues = msrColumnExecutorInfo.getFilterKeys();
+      Object col = value.getVal(msrColumnEvaluatorInfo.getMeasure().getOrdinal() + dimOrdinalMax);
+      for (int i = 0; i < filterValues.length; i++) {
+        if (filterValues[i] == null) {
+          if (null == col) {
+            return true;
+          }
+          continue;
+        }
+        if (comparator.compare(col, filterValues[i]) == 0) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
   private DataType getMeasureDataType(MeasureColumnResolvedFilterInfo msrColumnEvaluatorInfo) {
     if (msrColumnEvaluatorInfo.getType() == DataTypes.BOOLEAN) {
       return DataTypes.BOOLEAN;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/OrFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/OrFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/OrFilterExecuterImpl.java
index 7eed8ee..87273bb 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/OrFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/OrFilterExecuterImpl.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.util.BitSet;
 
 import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
+import org.apache.carbondata.core.scan.filter.intf.RowIntf;
 import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
 import org.apache.carbondata.core.util.BitSetGroup;
 
@@ -43,6 +44,12 @@ public class OrFilterExecuterImpl implements FilterExecuter {
     return leftFilters;
   }
 
+  @Override public boolean applyFilter(RowIntf value, int dimOrdinalMax)
+      throws FilterUnsupportedException, IOException {
+    return leftExecuter.applyFilter(value, dimOrdinalMax) ||
+        rightExecuter.applyFilter(value, dimOrdinalMax);
+  }
+
   @Override public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue) {
     BitSet leftFilters = leftExecuter.isScanRequired(blockMaxValue, blockMinValue);
     BitSet rightFilters = rightExecuter.isScanRequired(blockMaxValue, blockMinValue);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RestructureExcludeFilterExecutorImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RestructureExcludeFilterExecutorImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RestructureExcludeFilterExecutorImpl.java
index 2c0d39f..5707eb4 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RestructureExcludeFilterExecutorImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RestructureExcludeFilterExecutorImpl.java
@@ -19,7 +19,9 @@ package org.apache.carbondata.core.scan.filter.executer;
 import java.io.IOException;
 import java.util.BitSet;
 
+import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
 import org.apache.carbondata.core.scan.filter.FilterUtil;
+import org.apache.carbondata.core.scan.filter.intf.RowIntf;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
 import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
@@ -54,6 +56,11 @@ public class RestructureExcludeFilterExecutorImpl extends RestructureEvaluatorIm
             numberOfRows, !isDefaultValuePresentInFilterValues);
   }
 
+  /**
+   * Row-level filtering is not implemented for this executer; the call always fails.
+   *
+   * @param value row to test (unused)
+   * @param dimOrdinalMax unused
+   * @return never returns normally
+   * @throws FilterUnsupportedException always
+   */
+  @Override public boolean applyFilter(RowIntf value, int dimOrdinalMax)
+      throws FilterUnsupportedException {
+    throw new FilterUnsupportedException("Unsupported RestructureExcludeFilterExecutorImpl on row");
+  }
+
   @Override public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue) {
     BitSet bitSet = new BitSet(1);
     bitSet.flip(0, 1);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RestructureIncludeFilterExecutorImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RestructureIncludeFilterExecutorImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RestructureIncludeFilterExecutorImpl.java
index 5ec6971..8bcc53f 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RestructureIncludeFilterExecutorImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RestructureIncludeFilterExecutorImpl.java
@@ -19,7 +19,9 @@ package org.apache.carbondata.core.scan.filter.executer;
 import java.io.IOException;
 import java.util.BitSet;
 
+import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
 import org.apache.carbondata.core.scan.filter.FilterUtil;
+import org.apache.carbondata.core.scan.filter.intf.RowIntf;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
 import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
@@ -53,6 +55,11 @@ public class RestructureIncludeFilterExecutorImpl extends RestructureEvaluatorIm
             numberOfRows, isDefaultValuePresentInFilterValues);
   }
 
+  /**
+   * Row-level filtering is not implemented for this executer; the call always fails.
+   *
+   * @param value row to test (unused)
+   * @param dimOrdinalMax unused
+   * @return never returns normally
+   * @throws FilterUnsupportedException always
+   */
+  @Override public boolean applyFilter(RowIntf value, int dimOrdinalMax)
+      throws FilterUnsupportedException {
+    throw new FilterUnsupportedException("Unsupported RestructureIncludeFilterExecutorImpl on row");
+  }
+
   public BitSet isScanRequired(byte[][] blkMaxVal, byte[][] blkMinVal) {
     BitSet bitSet = new BitSet(1);
     bitSet.set(0, isDefaultValuePresentInFilterValues);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
index 10664c8..777f564 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
@@ -266,6 +266,15 @@ public class RowLevelFilterExecuterImpl implements FilterExecuter {
     return bitSetGroup;
   }
 
+  /**
+   * Evaluates the filter expression {@code exp} against a single row and returns the
+   * boolean value of the result.
+   *
+   * @param value row the expression is evaluated on
+   * @param dimOrdinalMax not used by this implementation
+   * @return the boolean outcome of the expression for this row
+   * @throws FilterUnsupportedException when the evaluation raises it, or when it raises a
+   *         FilterIllegalMemberException (which is wrapped)
+   * @throws IOException declared for the interface; not thrown by the code visible here
+   */
+  @Override public boolean applyFilter(RowIntf value, int dimOrdinalMax)
+      throws FilterUnsupportedException, IOException {
+    try {
+      return exp.evaluate(value).getBoolean();
+    } catch (FilterIllegalMemberException e) {
+      // surface an illegal member as an unsupported filter, keeping the original as cause
+      throw new FilterUnsupportedException(e);
+    }
+  }
+
   /**
    * Method will read the members of particular dimension block and create
    * a row instance for further processing of the filters

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/TrueFilterExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/TrueFilterExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/TrueFilterExecutor.java
index 91cebc5..92396ae 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/TrueFilterExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/TrueFilterExecutor.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.BitSet;
 
 import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
+import org.apache.carbondata.core.scan.filter.intf.RowIntf;
 import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
 import org.apache.carbondata.core.util.BitSetGroup;
 
@@ -44,6 +45,10 @@ public class TrueFilterExecutor implements FilterExecuter {
     return group;
   }
 
+  /**
+   * Accepts every row; both arguments are ignored.
+   *
+   * @param value row to test (unused)
+   * @param dimOrdinalMax unused
+   * @return always true
+   */
+  @Override public boolean applyFilter(RowIntf value, int dimOrdinalMax) {
+    return true;
+  }
+
   /**
    * API will verify whether the block can be shortlisted based on block
    * max and min key.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ValueBasedFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ValueBasedFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ValueBasedFilterExecuterImpl.java
index 6dc1375..516447f 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ValueBasedFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ValueBasedFilterExecuterImpl.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.util.BitSet;
 
 import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
+import org.apache.carbondata.core.scan.filter.intf.RowIntf;
 import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
 import org.apache.carbondata.core.util.BitSetGroup;
 
@@ -32,6 +33,11 @@ public class ValueBasedFilterExecuterImpl implements FilterExecuter {
     return new BitSetGroup(0);
   }
 
+  /**
+   * Row-level filtering is not implemented for this executer; the call always fails.
+   *
+   * @param value row to test (unused)
+   * @param dimOrdinalMax unused
+   * @return never returns normally
+   * @throws FilterUnsupportedException always
+   * @throws IOException declared for the interface; never thrown here
+   */
+  @Override public boolean applyFilter(RowIntf value, int dimOrdinalMax)
+      throws FilterUnsupportedException, IOException {
+    throw new FilterUnsupportedException("Unsupported ValueBasedFilterExecuterImpl on row");
+  }
+
   @Override public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue) {
     return new BitSet(1);
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
index 20be2fd..66dfa61 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
@@ -109,6 +109,9 @@ public class QueryModel implements Serializable {
   private Map<String, UpdateVO> invalidSegmentBlockIdMap =
       new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
 
+  private boolean[] isFilterDimensions;
+  private boolean[] isFilterMeasures;
+
   public QueryModel() {
     tableBlockInfos = new ArrayList<TableBlockInfo>();
     queryDimension = new ArrayList<QueryDimension>();
@@ -136,9 +139,14 @@ public class QueryModel implements Serializable {
     queryModel.setQueryDimension(queryPlan.getDimensions());
     queryModel.setQueryMeasures(queryPlan.getMeasures());
     if (null != queryPlan.getFilterExpression()) {
+      boolean[] isFilterDimensions = new boolean[carbonTable.getDimensionOrdinalMax()];
+      boolean[] isFilterMeasures =
+          new boolean[carbonTable.getNumberOfMeasures(carbonTable.getFactTableName())];
       processFilterExpression(queryPlan.getFilterExpression(),
           carbonTable.getDimensionByTableName(factTableName),
-          carbonTable.getMeasureByTableName(factTableName));
+          carbonTable.getMeasureByTableName(factTableName), isFilterDimensions, isFilterMeasures);
+      queryModel.setIsFilterDimensions(isFilterDimensions);
+      queryModel.setIsFilterMeasures(isFilterMeasures);
     }
     //TODO need to remove this code, and executor will load the table
     // from file metadata
@@ -146,28 +154,32 @@ public class QueryModel implements Serializable {
   }
 
   public static void processFilterExpression(Expression filterExpression,
-      List<CarbonDimension> dimensions, List<CarbonMeasure> measures) {
+      List<CarbonDimension> dimensions, List<CarbonMeasure> measures,
+      final boolean[] isFilterDimensions, final boolean[] isFilterMeasures) {
     if (null != filterExpression) {
       if (null != filterExpression.getChildren() && filterExpression.getChildren().size() == 0) {
         if (filterExpression instanceof ConditionalExpression) {
           List<ColumnExpression> listOfCol =
               ((ConditionalExpression) filterExpression).getColumnList();
           for (ColumnExpression expression : listOfCol) {
-            setDimAndMsrColumnNode(dimensions, measures, expression);
+            setDimAndMsrColumnNode(dimensions, measures, expression, isFilterDimensions,
+                isFilterMeasures);
           }
         }
       }
       for (Expression expression : filterExpression.getChildren()) {
         if (expression instanceof ColumnExpression) {
-          setDimAndMsrColumnNode(dimensions, measures, (ColumnExpression) expression);
+          setDimAndMsrColumnNode(dimensions, measures, (ColumnExpression) expression,
+              isFilterDimensions, isFilterMeasures);
         } else if (expression instanceof UnknownExpression) {
           UnknownExpression exp = ((UnknownExpression) expression);
           List<ColumnExpression> listOfColExpression = exp.getAllColumnList();
           for (ColumnExpression col : listOfColExpression) {
-            setDimAndMsrColumnNode(dimensions, measures, col);
+            setDimAndMsrColumnNode(dimensions, measures, col, isFilterDimensions, isFilterMeasures);
           }
         } else {
-          processFilterExpression(expression, dimensions, measures);
+          processFilterExpression(expression, dimensions, measures, isFilterDimensions,
+              isFilterMeasures);
         }
       }
     }
@@ -184,7 +196,8 @@ public class QueryModel implements Serializable {
   }
 
   private static void setDimAndMsrColumnNode(List<CarbonDimension> dimensions,
-      List<CarbonMeasure> measures, ColumnExpression col) {
+      List<CarbonMeasure> measures, ColumnExpression col, boolean[] isFilterDimensions,
+      boolean[] isFilterMeasures) {
     CarbonDimension dim;
     CarbonMeasure msr;
     String columnName;
@@ -199,10 +212,16 @@ public class QueryModel implements Serializable {
       col.setCarbonColumn(dim);
       col.setDimension(dim);
       col.setDimension(true);
+      if (null != isFilterDimensions) {
+        isFilterDimensions[dim.getOrdinal()] = true;
+      }
     } else {
       col.setCarbonColumn(msr);
       col.setMeasure(msr);
       col.setMeasure(true);
+      if (null != isFilterMeasures) {
+        isFilterMeasures[msr.getOrdinal()] = true;
+      }
     }
   }
 
@@ -378,4 +397,20 @@ public class QueryModel implements Serializable {
   public void setConverter(DataTypeConverter converter) {
     this.converter = converter;
   }
+
+  /**
+   * Returns the per-dimension filter flags held by this model. The array itself is
+   * returned (no copy); it is null if it was never set.
+   */
+  public boolean[] getIsFilterDimensions() {
+    return isFilterDimensions;
+  }
+
+  /**
+   * Stores the per-dimension filter flags. The given array is kept by reference, not copied.
+   *
+   * @param isFilterDimensions flags to store
+   */
+  public void setIsFilterDimensions(boolean[] isFilterDimensions) {
+    this.isFilterDimensions = isFilterDimensions;
+  }
+
+  /**
+   * Returns the per-measure filter flags held by this model. The array itself is
+   * returned (no copy); it is null if it was never set.
+   */
+  public boolean[] getIsFilterMeasures() {
+    return isFilterMeasures;
+  }
+
+  /**
+   * Stores the per-measure filter flags. The given array is kept by reference, not copied.
+   *
+   * @param isFilterMeasures flags to store
+   */
+  public void setIsFilterMeasures(boolean[] isFilterMeasures) {
+    this.isFilterMeasures = isFilterMeasures;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java
new file mode 100644
index 0000000..83a4813
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.statusmanager;
+
+/**
+ * the data file format which was supported
+ */
+public enum FileFormat {
+  carbondata, rowformat;
+
+  public static FileFormat getByOrdinal(int ordinal) {
+    if (ordinal < 0 || ordinal >= FileFormat.values().length) {
+      return carbondata;
+    }
+
+    switch (ordinal) {
+      case 0:
+        return carbondata;
+      case 1:
+        return rowformat;
+    }
+
+    return carbondata;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
index 3f83c72..7748d17 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
@@ -70,6 +70,11 @@ public class LoadMetadataDetails implements Serializable {
    */
   private String majorCompacted;
 
+  /**
+   * the file format of this segment
+   */
+  private FileFormat fileFormat = FileFormat.carbondata;
+
   public String getPartitionCount() {
     return partitionCount;
   }
@@ -339,4 +344,12 @@ public class LoadMetadataDetails implements Serializable {
   public void setUpdateStatusFileName(String updateStatusFileName) {
     this.updateStatusFileName = updateStatusFileName;
   }
+
+  /**
+   * Returns the file format of this segment ({@code FileFormat.carbondata} unless it
+   * was changed through {@code setFileFormat}).
+   */
+  public FileFormat getFileFormat() {
+    return fileFormat;
+  }
+
+  /**
+   * Sets the file format of this segment.
+   *
+   * @param fileFormat the format to record; stored as given, without a null check
+   */
+  public void setFileFormat(FileFormat fileFormat) {
+    this.fileFormat = fileFormat;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
index 28d3f18..30304d9 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
@@ -100,9 +100,10 @@ public class SegmentStatusManager {
   public ValidAndInvalidSegmentsInfo getValidAndInvalidSegments() throws IOException {
 
     // @TODO: move reading LoadStatus file to separate class
-    List<String> listOfValidSegments = new ArrayList<String>(10);
-    List<String> listOfValidUpdatedSegments = new ArrayList<String>(10);
-    List<String> listOfInvalidSegments = new ArrayList<String>(10);
+    List<String> listOfValidSegments = new ArrayList<>(10);
+    List<String> listOfValidUpdatedSegments = new ArrayList<>(10);
+    List<String> listOfInvalidSegments = new ArrayList<>(10);
+    List<String> listOfStreamSegments = new ArrayList<>(10);
     CarbonTablePath carbonTablePath = CarbonStorePath
             .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
                     absoluteTableIdentifier.getCarbonTableIdentifier());
@@ -125,6 +126,10 @@ public class SegmentStatusManager {
                   || CarbonCommonConstants.MARKED_FOR_UPDATE
                   .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())
                   || CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS
+                  .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())
+                  || CarbonCommonConstants.STORE_LOADSTATUS_STREAMING
+                  .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())
+                  || CarbonCommonConstants.STORE_LOADSTATUS_STREAMING_FINISH
                   .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())) {
             // check for merged loads.
             if (null != loadMetadataDetails.getMergedLoadName()) {
@@ -144,6 +149,13 @@ public class SegmentStatusManager {
 
               listOfValidUpdatedSegments.add(loadMetadataDetails.getLoadName());
             }
+            if (CarbonCommonConstants.STORE_LOADSTATUS_STREAMING
+                .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())
+                || CarbonCommonConstants.STORE_LOADSTATUS_STREAMING_FINISH
+                .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())) {
+              listOfStreamSegments.add(loadMetadataDetails.getLoadName());
+              continue;
+            }
             listOfValidSegments.add(loadMetadataDetails.getLoadName());
           } else if ((CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
                   .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())
@@ -169,7 +181,7 @@ public class SegmentStatusManager {
       }
     }
     return new ValidAndInvalidSegmentsInfo(listOfValidSegments, listOfValidUpdatedSegments,
-            listOfInvalidSegments);
+            listOfInvalidSegments, listOfStreamSegments);
   }
 
   /**
@@ -642,12 +654,15 @@ public class SegmentStatusManager {
     private final List<String> listOfValidSegments;
     private final List<String> listOfValidUpdatedSegments;
     private final List<String> listOfInvalidSegments;
+    private final List<String> listOfStreamSegments;
 
     private ValidAndInvalidSegmentsInfo(List<String> listOfValidSegments,
-        List<String> listOfValidUpdatedSegments, List<String> listOfInvalidUpdatedSegments) {
+        List<String> listOfValidUpdatedSegments, List<String> listOfInvalidUpdatedSegments,
+        List<String> listOfStreamSegments) {
       this.listOfValidSegments = listOfValidSegments;
       this.listOfValidUpdatedSegments = listOfValidUpdatedSegments;
       this.listOfInvalidSegments = listOfInvalidUpdatedSegments;
+      this.listOfStreamSegments = listOfStreamSegments;
     }
     public List<String> getInvalidSegments() {
       return listOfInvalidSegments;
@@ -655,5 +670,9 @@ public class SegmentStatusManager {
     public List<String> getValidSegments() {
       return listOfValidSegments;
     }
+
+    /**
+     * Returns the list of stream segments handed to the constructor. The list is
+     * returned as is, without a defensive copy.
+     */
+    public List<String> getStreamSegments() {
+      return listOfStreamSegments;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 17a4b5f..016b8b3 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -78,6 +78,7 @@ import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
 import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.core.writer.ThriftWriter;
+import org.apache.carbondata.format.BlockletHeader;
 import org.apache.carbondata.format.DataChunk2;
 import org.apache.carbondata.format.DataChunk3;
 
@@ -1358,6 +1359,13 @@ public final class CarbonUtil {
     return thriftByteArray;
   }
 
+  /**
+   * Reads a {@link BlockletHeader} out of the given bytes. The whole array is handed to
+   * {@code read} (offset 0, length {@code data.length}) together with a creator that
+   * supplies an empty BlockletHeader.
+   *
+   * @param data serialized blocklet header; must not be null because its length is read
+   * @return the object produced by {@code read}, cast to BlockletHeader
+   * @throws IOException propagated from {@code read}
+   */
+  public static BlockletHeader readBlockletHeader(byte[] data) throws IOException {
+    return (BlockletHeader) read(data, new ThriftReader.TBaseCreator() {
+      @Override public TBase create() {
+        return new BlockletHeader();
+      }
+    }, 0, data.length);
+  }
 
   public static DataChunk3 readDataChunk3(ByteBuffer dataChunkBuffer, int offset, int length)
       throws IOException {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
index 02a000a..caa046f 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
@@ -53,6 +53,10 @@ public class CarbonTablePath extends Path {
   public static final String INDEX_FILE_EXT = ".carbonindex";
   public static final String MERGE_INDEX_FILE_EXT = ".carbonindexmerge";
 
+  private static final String STREAMING_DIR = ".streaming";
+  private static final String STREAMING_LOG_DIR = "log";
+  private static final String STREAMING_CHECKPOINT_DIR = "checkpoint";
+
   private String tablePath;
   private CarbonTableIdentifier carbonTableIdentifier;
 
@@ -428,6 +432,14 @@ public class CarbonTablePath extends Path {
         + INDEX_FILE_EXT;
   }
 
+  /**
+   * Returns the name of the stream index file, which is the regular carbon index file
+   * name built with fixed zero arguments: {@code getCarbonIndexFileName(0, 0, 0, "0")}.
+   */
+  public static String getCarbonStreamIndexFileName() {
+    return getCarbonIndexFileName(0, 0, 0, "0");
+  }
+
+  /**
+   * Returns the path of the stream index file inside the given segment directory.
+   *
+   * @param segmentDir directory of the segment
+   * @return {@code segmentDir}, the file separator and the stream index file name
+   */
+  public static String getCarbonStreamIndexFilePath(String segmentDir) {
+    return segmentDir + File.separator + getCarbonStreamIndexFileName();
+  }
+
   /**
    * Below method will be used to get the carbon index filename
    *
@@ -440,7 +452,7 @@ public class CarbonTablePath extends Path {
     return taskNo + "-" + factUpdatedTimeStamp + indexFileExtension;
   }
 
-  private String getSegmentDir(String partitionId, String segmentId) {
+  public String getSegmentDir(String partitionId, String segmentId) {
     return getPartitionDir(partitionId) + File.separator + SEGMENT_PREFIX + segmentId;
   }
 
@@ -456,6 +468,14 @@ public class CarbonTablePath extends Path {
     return tablePath + File.separator + FACT_DIR;
   }
 
+  /**
+   * Returns the streaming log directory of this table: the {@code log} directory below
+   * the {@code .streaming} directory of the table path.
+   */
+  public String getStreamingLogDir() {
+    return tablePath + File.separator + STREAMING_DIR + File.separator + STREAMING_LOG_DIR;
+  }
+
+  /**
+   * Returns the streaming checkpoint directory of this table: the {@code checkpoint}
+   * directory below the {@code .streaming} directory of the table path.
+   */
+  public String getStreamingCheckpointDir() {
+    return tablePath + File.separator + STREAMING_DIR + File.separator + STREAMING_CHECKPOINT_DIR;
+  }
+
   public CarbonTableIdentifier getCarbonTableIdentifier() {
     return carbonTableIdentifier;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/examples/spark2/pom.xml
----------------------------------------------------------------------
diff --git a/examples/spark2/pom.xml b/examples/spark2/pom.xml
index af25771..227da7d 100644
--- a/examples/spark2/pom.xml
+++ b/examples/spark2/pom.xml
@@ -40,6 +40,11 @@
       <version>${project.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.carbondata</groupId>
+      <artifactId>carbondata-streaming</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-sql_${scala.binary.version}</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/examples/spark2/src/main/resources/streamSample.csv
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/resources/streamSample.csv b/examples/spark2/src/main/resources/streamSample.csv
new file mode 100644
index 0000000..590ea90
--- /dev/null
+++ b/examples/spark2/src/main/resources/streamSample.csv
@@ -0,0 +1,6 @@
+id,name,city,salary,file
+100000001,batch_1,city_1,0.1,school_1:school_11$20
+100000002,batch_2,city_2,0.2,school_2:school_22$30
+100000003,batch_3,city_3,0.3,school_3:school_33$40
+100000004,batch_4,city_4,0.4,school_4:school_44$50
+100000005,batch_5,city_5,0.5,school_5:school_55$60

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamExample.scala
new file mode 100644
index 0000000..c31a0aa
--- /dev/null
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamExample.scala
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples
+
+import java.io.{File, PrintWriter}
+import java.net.ServerSocket
+
+import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+
+// scalastyle:off println
+/**
+ * Example of streaming ingest into a carbondata table: a batch load is followed by a
+ * structured-streaming job that reads records from a local socket and appends them to
+ * the same table, while a third thread periodically prints the table row count.
+ */
+object StreamExample {
+  /**
+   * Creates the table, loads the sample CSV, runs the streaming ingest until the user
+   * presses enter and finally runs a few queries over batch and streaming data.
+   */
+  def main(args: Array[String]): Unit = {
+
+    // setup paths
+    val rootPath = new File(this.getClass.getResource("/").getPath
+                            + "../../../..").getCanonicalPath
+    val storeLocation = s"$rootPath/examples/spark2/target/store"
+    val warehouse = s"$rootPath/examples/spark2/target/warehouse"
+    val metastoredb = s"$rootPath/examples/spark2/target"
+    val streamTableName = "stream_table"
+
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+
+    import org.apache.spark.sql.CarbonSession._
+    val spark = SparkSession
+      .builder()
+      .master("local")
+      .appName("StreamExample")
+      .config("spark.sql.warehouse.dir", warehouse)
+      .getOrCreateCarbonSession(storeLocation, metastoredb)
+
+    spark.sparkContext.setLogLevel("ERROR")
+
+    val requireCreateTable = true
+    val useComplexDataType = false
+
+    if (requireCreateTable) {
+      // drop table if exists previously
+      spark.sql(s"DROP TABLE IF EXISTS ${ streamTableName }")
+      // Create target carbon table and populate with initial data
+      if (useComplexDataType) {
+        spark.sql(
+          s"""
+             | CREATE TABLE ${ streamTableName }(
+             | id INT,
+             | name STRING,
+             | city STRING,
+             | salary FLOAT,
+             | file struct<school:array<string>, age:int>
+             | )
+             | STORED BY 'carbondata'
+             | TBLPROPERTIES('sort_columns'='name', 'dictionary_include'='city')
+             | """.stripMargin)
+      } else {
+        spark.sql(
+          s"""
+             | CREATE TABLE ${ streamTableName }(
+             | id INT,
+             | name STRING,
+             | city STRING,
+             | salary FLOAT
+             | )
+             | STORED BY 'carbondata'
+             | """.stripMargin)
+      }
+
+      val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.
+        lookupRelation(Some("default"), streamTableName)(spark).asInstanceOf[CarbonRelation].
+        tableMeta.carbonTable
+      val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
+      // batch load
+      val path = s"$rootPath/examples/spark2/src/main/resources/streamSample.csv"
+      spark.sql(
+        s"""
+           | LOAD DATA LOCAL INPATH '$path'
+           | INTO TABLE $streamTableName
+           | OPTIONS('HEADER'='true')
+         """.stripMargin)
+
+      // streaming ingest
+      val serverSocket = new ServerSocket(7071)
+      val thread1 = startStreaming(spark, tablePath)
+      val thread2 = writeSocket(serverSocket)
+      val thread3 = showTableCount(spark, streamTableName)
+
+      System.out.println("type enter to interrupt streaming")
+      System.in.read()
+      thread1.interrupt()
+      thread2.interrupt()
+      thread3.interrupt()
+      serverSocket.close()
+    }
+
+    spark.sql(s"select count(*) from ${ streamTableName }").show(100, truncate = false)
+
+    spark.sql(s"select * from ${ streamTableName }").show(100, truncate = false)
+
+    // record(id = 100000001) comes from batch segment_0
+    // record(id = 1) comes from stream segment_1
+    spark.sql(s"select * " +
+              s"from ${ streamTableName } " +
+              s"where id = 100000001 or id = 1 limit 100").show(100, truncate = false)
+
+    // range filter: the batch ids start at 100000001, so only streamed records can match
+    spark.sql(s"select * " +
+              s"from ${ streamTableName } " +
+              s"where id < 10 limit 100").show(100, truncate = false)
+
+    if (useComplexDataType) {
+      // complex
+      spark.sql(s"select file.age, file.school " +
+                s"from ${ streamTableName } " +
+                s"where file.age = 30 ").show(100, truncate = false)
+    }
+
+    spark.stop()
+    System.out.println("streaming finished")
+  }
+
+  /**
+   * Starts a thread that prints the row count of the table every 3 seconds
+   * (at most 1001 times) and returns that thread.
+   */
+  def showTableCount(spark: SparkSession, tableName: String): Thread = {
+    val thread = new Thread() {
+      override def run(): Unit = {
+        try {
+          for (_ <- 0 to 1000) {
+            spark.sql(s"select count(*) from $tableName").show(truncate = false)
+            Thread.sleep(1000 * 3)
+          }
+        } catch {
+          // main() interrupts this thread to end the polling; leave the loop quietly
+          case _: InterruptedException =>
+        }
+      }
+    }
+    thread.start()
+    thread
+  }
+
+  /**
+   * Starts a thread that runs a streaming query reading from the local socket (port 7071)
+   * and writing to the carbondata table at `tablePath`; returns that thread.
+   */
+  def startStreaming(spark: SparkSession, tablePath: CarbonTablePath): Thread = {
+    val thread = new Thread() {
+      override def run(): Unit = {
+        var qry: StreamingQuery = null
+        try {
+          val readSocketDF = spark.readStream
+            .format("socket")
+            .option("host", "localhost")
+            .option("port", 7071)
+            .load()
+
+          // Write data from socket stream to carbondata file
+          qry = readSocketDF.writeStream
+            .format("carbondata")
+            .trigger(ProcessingTime("5 seconds"))
+            .option("checkpointLocation", tablePath.getStreamingCheckpointDir)
+            .option("tablePath", tablePath.getPath)
+            .start()
+
+          qry.awaitTermination()
+        } catch {
+          case _: InterruptedException =>
+            println("Done reading and writing streaming data")
+        } finally {
+          // qry is still null when setting up or starting the query failed
+          if (qry != null) {
+            qry.stop()
+          }
+        }
+      }
+    }
+    thread.start()
+    thread
+  }
+
+  /**
+   * Starts a thread that accepts one client on `serverSocket` and sends it generated CSV
+   * records in batches, pausing 2 seconds after each batch; returns that thread.
+   */
+  def writeSocket(serverSocket: ServerSocket): Thread = {
+    val thread = new Thread() {
+      override def run(): Unit = {
+        // wait for a client connection request and accept it
+        val clientSocket = serverSocket.accept()
+        val socketWriter = new PrintWriter(clientSocket.getOutputStream())
+        try {
+          var index = 0
+          for (_ <- 1 to 1000) {
+            // write 101 records per iteration (0 to 100 is inclusive)
+            for (_ <- 0 to 100) {
+              index = index + 1
+              socketWriter.println(index.toString + ",name_" + index
+                                   + ",city_" + index + "," + (index * 10000.00).toString +
+                                   ",school_" + index + ":school_" + index + index + "$" + index)
+            }
+            socketWriter.flush()
+            Thread.sleep(2000)
+          }
+        } catch {
+          // main() interrupts this thread to stop the producer; fall through to the cleanup
+          case _: InterruptedException =>
+        } finally {
+          socketWriter.close()
+          System.out.println("Socket closed")
+        }
+      }
+    }
+    thread.start()
+    thread
+  }
+}
+// scalastyle:on println

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/format/src/main/thrift/carbondata.thrift
----------------------------------------------------------------------
diff --git a/format/src/main/thrift/carbondata.thrift b/format/src/main/thrift/carbondata.thrift
index 8acd0b1..1c15f3d 100644
--- a/format/src/main/thrift/carbondata.thrift
+++ b/format/src/main/thrift/carbondata.thrift
@@ -152,7 +152,7 @@ struct DataChunk3{
  */
 struct BlockletInfo{
     1: required i32 num_rows;	// Number of rows in this blocklet
-    2: required list<DataChunk> column_data_chunks;	// Information about all column chunks in this blocklet
+    2: optional list<DataChunk> column_data_chunks;	// Information about all column chunks in this blocklet
 }
 
 /**
@@ -209,6 +209,8 @@ struct FileHeader{
 	2: required list<schema.ColumnSchema> column_schema;  // Description of columns in this file
 	3: optional bool is_footer_present; //  To check whether footer is present or not
 	4: optional i64 time_stamp; // Timestamp to compare column schema against master schema
+	5: optional bool is_splitable; // Whether file is splittable or not
+	6: optional binary sync_marker; // 16 bytes sync marker
 }
 
 /**
@@ -225,7 +227,7 @@ enum MutationType {
 struct BlockletHeader{
 	1: required i32 blocklet_length; // Length of blocklet data
 	2: required MutationType mutation; // Mutation type of this blocklet
-	3: required BlockletIndex blocklet_index;  // Index for the following blocklet
+	3: optional BlockletIndex blocklet_index;  // Index for the following blocklet
 	4: required BlockletInfo blocklet_info;  // Info for the following blocklet
 	5: optional dictionary.ColumnDictionaryChunk dictionary; // Blocklet local dictionary
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/format/src/main/thrift/carbondata_index.thrift
----------------------------------------------------------------------
diff --git a/format/src/main/thrift/carbondata_index.thrift b/format/src/main/thrift/carbondata_index.thrift
index 4df085a..60ec769 100644
--- a/format/src/main/thrift/carbondata_index.thrift
+++ b/format/src/main/thrift/carbondata_index.thrift
@@ -42,4 +42,5 @@ struct BlockIndex{
   3: required i64 offset; // Offset of the footer
   4: required carbondata.BlockletIndex block_index;	// Blocklet index
   5: optional carbondata.BlockletInfo3 blocklet_info;
+  6: optional i64 file_size // Record the valid size for appendable carbon file
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
index 4e8591e..e5aac84 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
@@ -133,7 +133,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
   /**
    * Get TableInfo object from `configuration`
    */
-  private TableInfo getTableInfo(Configuration configuration) throws IOException {
+  private static TableInfo getTableInfo(Configuration configuration) throws IOException {
     String tableInfoStr = configuration.get(TABLE_INFO);
     if (tableInfoStr == null) {
       return null;
@@ -192,6 +192,21 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
       return this.carbonTable;
     }
   }
+
+  public static CarbonTable createCarbonTable(Configuration configuration) throws IOException {
+    // carbon table should be created either from deserialized table info (schema saved in
+    // hive metastore) or by reading schema in HDFS (schema saved in HDFS)
+    TableInfo tableInfo = getTableInfo(configuration);
+    CarbonTable carbonTable;
+    if (tableInfo != null) {
+      carbonTable = CarbonTable.buildFromTableInfo(tableInfo);
+    } else {
+      carbonTable = SchemaReader.readCarbonTableFromStore(
+          getAbsoluteTableIdentifier(configuration));
+    }
+    return carbonTable;
+  }
+
   public static void setTablePath(Configuration configuration, String tablePath)
       throws IOException {
     configuration.set(FileInputFormat.INPUT_DIR, tablePath);
@@ -295,7 +310,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
     configuration.set(CarbonInputFormat.INPUT_FILES, CarbonUtil.getSegmentString(validFiles));
   }
 
-  private AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration)
+  private static AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration)
       throws IOException {
     String dirs = configuration.get(INPUT_DIR, "");
     String[] inputPaths = StringUtils.split(dirs);
@@ -351,7 +366,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
       Expression filter = getFilterPredicates(job.getConfiguration());
       CarbonTable carbonTable = getOrCreateCarbonTable(job.getConfiguration());
       TableProvider tableProvider = new SingleTableProvider(carbonTable);
-      CarbonInputFormatUtil.processFilterExpression(filter, carbonTable);
+      CarbonInputFormatUtil.processFilterExpression(filter, carbonTable, null, null);
       BitSet matchedPartitions = null;
       PartitionInfo partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName());
       if (partitionInfo != null) {
@@ -812,7 +827,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
 
     // set the filter to the query model in order to filter blocklet before scan
     Expression filter = getFilterPredicates(configuration);
-    CarbonInputFormatUtil.processFilterExpression(filter, carbonTable);
+    CarbonInputFormatUtil.processFilterExpression(filter, carbonTable, null, null);
     FilterResolverIntf filterIntf = CarbonInputFormatUtil
         .resolveFilter(filter, carbonTable.getAbsoluteTableIdentifier(), tableProvider);
     queryModel.setFilterExpressionResolverTree(filterIntf);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
index dde4c76..f7b372f 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
@@ -33,6 +33,7 @@ import org.apache.carbondata.core.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
 import org.apache.carbondata.core.mutate.UpdateVO;
+import org.apache.carbondata.core.statusmanager.FileFormat;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
@@ -81,6 +82,8 @@ public class CarbonInputSplit extends FileSplit
 
   private BlockletDetailInfo detailInfo;
 
+  private FileFormat fileFormat = FileFormat.carbondata;
+
   public CarbonInputSplit() {
     segmentId = null;
     taskId = "0";
@@ -111,6 +114,30 @@ public class CarbonInputSplit extends FileSplit
     this.numberOfBlocklets = numberOfBlocklets;
   }
 
+  public CarbonInputSplit(String segmentId, Path path, long start, long length, String[] locations,
+      FileFormat fileFormat) {
+    super(path, start, length, locations);
+    this.segmentId = segmentId;
+    this.fileFormat = fileFormat;
+    taskId = "0";
+    bucketId = "0";
+    numberOfBlocklets = 0;
+    invalidSegments = new ArrayList<>();
+    version = CarbonProperties.getInstance().getFormatVersion();
+  }
+
+  public CarbonInputSplit(String segmentId, Path path, long start, long length, String[] locations,
+      String[] inMemoryHosts, FileFormat fileFormat) {
+    super(path, start, length, locations, inMemoryHosts);
+    this.segmentId = segmentId;
+    this.fileFormat = fileFormat;
+    taskId = "0";
+    bucketId = "0";
+    numberOfBlocklets = 0;
+    invalidSegments = new ArrayList<>();
+    version = CarbonProperties.getInstance().getFormatVersion();
+  }
+
   /**
    * Constructor to initialize the CarbonInputSplit with blockStorageIdMap
    * @param segmentId
@@ -363,4 +390,12 @@ public class CarbonInputSplit extends FileSplit
   public void setDetailInfo(BlockletDetailInfo detailInfo) {
     this.detailInfo = detailInfo;
   }
+
+  public FileFormat getFileFormat() {
+    return fileFormat;
+  }
+
+  public void setFormat(FileFormat fileFormat) {
+    this.fileFormat = fileFormat;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
index 1f8ccfc..d3fa2c2 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.statusmanager.FileFormat;
 
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -44,6 +45,8 @@ public class CarbonMultiBlockSplit extends InputSplit implements Writable {
    */
   private String[] locations;
 
+  private FileFormat fileFormat = FileFormat.carbondata;
+
   public CarbonMultiBlockSplit() {
     splitList = null;
     locations = null;
@@ -55,6 +58,13 @@ public class CarbonMultiBlockSplit extends InputSplit implements Writable {
     this.locations = locations;
   }
 
+  public CarbonMultiBlockSplit(AbsoluteTableIdentifier identifier, List<CarbonInputSplit> splitList,
+      String[] locations, FileFormat fileFormat) throws IOException {
+    this.splitList = splitList;
+    this.locations = locations;
+    this.fileFormat = fileFormat;
+  }
+
   /**
    * Return all splits for scan
    * @return split list for scan
@@ -88,6 +98,7 @@ public class CarbonMultiBlockSplit extends InputSplit implements Writable {
     for (int i = 0; i < locations.length; i++) {
       out.writeUTF(locations[i]);
     }
+    out.writeInt(fileFormat.ordinal());
   }
 
   @Override
@@ -105,6 +116,14 @@ public class CarbonMultiBlockSplit extends InputSplit implements Writable {
     for (int i = 0; i < len; i++) {
       locations[i] = in.readUTF();
     }
+    fileFormat = FileFormat.getByOrdinal(in.readInt());
+  }
+
+  public FileFormat getFileFormat() {
+    return fileFormat;
   }
 
+  public void setFileFormat(FileFormat fileFormat) {
+    this.fileFormat = fileFormat;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 9fbeb8a..e22a5c6 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.hadoop.api;
 
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
+import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
@@ -33,6 +34,8 @@ import java.util.Map;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.DataMapStoreManager;
 import org.apache.carbondata.core.datamap.TableDataMap;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMap;
 import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory;
@@ -46,6 +49,7 @@ import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
 import org.apache.carbondata.core.mutate.SegmentUpdateDetails;
 import org.apache.carbondata.core.mutate.UpdateVO;
 import org.apache.carbondata.core.mutate.data.BlockMappingVO;
+import org.apache.carbondata.core.reader.CarbonIndexFileReader;
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor;
 import org.apache.carbondata.core.scan.filter.SingleTableProvider;
@@ -56,6 +60,7 @@ import org.apache.carbondata.core.scan.model.QueryModel;
 import org.apache.carbondata.core.stats.QueryStatistic;
 import org.apache.carbondata.core.stats.QueryStatisticsConstants;
 import org.apache.carbondata.core.stats.QueryStatisticsRecorder;
+import org.apache.carbondata.core.statusmanager.FileFormat;
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
 import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
 import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
@@ -64,6 +69,7 @@ import org.apache.carbondata.core.util.DataTypeConverter;
 import org.apache.carbondata.core.util.DataTypeConverterImpl;
 import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.format.BlockIndex;
 import org.apache.carbondata.hadoop.CarbonInputSplit;
 import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
 import org.apache.carbondata.hadoop.CarbonProjection;
@@ -77,6 +83,8 @@ import org.apache.carbondata.hadoop.util.SchemaReader;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.InvalidPathException;
 import org.apache.hadoop.fs.LocalFileSystem;
@@ -128,7 +136,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
   /**
    * Get TableInfo object from `configuration`
    */
-  private TableInfo getTableInfo(Configuration configuration) throws IOException {
+  public static TableInfo getTableInfo(Configuration configuration) throws IOException {
     String tableInfoStr = configuration.get(TABLE_INFO);
     if (tableInfoStr == null) {
       return null;
@@ -278,6 +286,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
     List<String> invalidSegments = new ArrayList<>();
     List<UpdateVO> invalidTimestampsList = new ArrayList<>();
     List<String> validSegments = Arrays.asList(getSegmentsToAccess(job));
+    List<String> streamSegments = null;
     // get all valid segments and set them into the configuration
     SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(identifier);
     if (validSegments.size() == 0) {
@@ -285,8 +294,9 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
       SegmentStatusManager.ValidAndInvalidSegmentsInfo segments =
           segmentStatusManager.getValidAndInvalidSegments();
       validSegments = segments.getValidSegments();
+      streamSegments = segments.getStreamSegments();
       if (validSegments.size() == 0) {
-        return new ArrayList<>(0);
+        return getSplitsOfStreaming(job, identifier, streamSegments);
       }
       // remove entry in the segment index if there are invalid segments
       invalidSegments.addAll(segments.getInvalidSegments());
@@ -327,7 +337,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
       throw new IOException("Missing/Corrupt schema file for table.");
     }
     PartitionInfo partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName());
-    CarbonInputFormatUtil.processFilterExpression(filter, carbonTable);
+    CarbonInputFormatUtil.processFilterExpression(filter, carbonTable, null, null);
 
     // prune partitions for filter query on partition table
     BitSet matchedPartitions = null;
@@ -356,9 +366,93 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
             .setInvalidTimestampRange(invalidTimestampsList);
       }
     }
+
+    // add all splits of streaming
+    List<InputSplit> splitsOfStreaming = getSplitsOfStreaming(job, identifier, streamSegments);
+    if (!splitsOfStreaming.isEmpty()) {
+      splits.addAll(splitsOfStreaming);
+    }
+    return splits;
+  }
+
+  /**
+   * Use the file list in the .carbonindex file to get the splits of streaming segments.
+   */
+  private List<InputSplit> getSplitsOfStreaming(JobContext job, AbsoluteTableIdentifier identifier,
+      List<String> streamSegments) throws IOException {
+    List<InputSplit> splits = new ArrayList<InputSplit>();
+    if (streamSegments != null && !streamSegments.isEmpty()) {
+
+      CarbonTablePath tablePath = CarbonStorePath.getCarbonTablePath(identifier);
+      long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
+      long maxSize = getMaxSplitSize(job);
+      for (String segmentId : streamSegments) {
+        String segmentDir = tablePath.getSegmentDir("0", segmentId);
+        FileFactory.FileType fileType = FileFactory.getFileType(segmentDir);
+        if (FileFactory.isFileExist(segmentDir, fileType)) {
+          String indexName = CarbonTablePath.getCarbonStreamIndexFileName();
+          String indexPath = segmentDir + File.separator + indexName;
+          CarbonFile index = FileFactory.getCarbonFile(indexPath, fileType);
+          // index file exists
+          if (index.exists()) {
+            // open a reader over the index file
+            CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
+            try {
+              // create splits for each block index entry in the index file
+              indexReader.openThriftReader(indexPath);
+              while (indexReader.hasNext()) {
+                BlockIndex blockIndex = indexReader.readBlockIndexInfo();
+                String filePath = segmentDir + File.separator + blockIndex.getFile_name();
+                Path path = new Path(filePath);
+                long length = blockIndex.getFile_size();
+                if (length != 0) {
+                  BlockLocation[] blkLocations;
+                  FileSystem fs = FileFactory.getFileSystem(path);
+                  FileStatus file = fs.getFileStatus(path);
+                  blkLocations = fs.getFileBlockLocations(path, 0, length);
+                  long blockSize = file.getBlockSize();
+                  long splitSize = computeSplitSize(blockSize, minSize, maxSize);
+                  long bytesRemaining = length;
+                  while (((double) bytesRemaining) / splitSize > 1.1) {
+                    int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
+                    splits.add(makeSplit(segmentId, path, length - bytesRemaining, splitSize,
+                        blkLocations[blkIndex].getHosts(),
+                        blkLocations[blkIndex].getCachedHosts(), FileFormat.rowformat));
+                    bytesRemaining -= splitSize;
+                  }
+                  if (bytesRemaining != 0) {
+                    int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
+                    splits.add(makeSplit(segmentId, path, length - bytesRemaining, bytesRemaining,
+                        blkLocations[blkIndex].getHosts(),
+                        blkLocations[blkIndex].getCachedHosts(), FileFormat.rowformat));
+                  }
+                } else {
+                  //Create empty hosts array for zero length files
+                  splits.add(makeSplit(segmentId, path, 0, length, new String[0],
+                      FileFormat.rowformat));
+                }
+              }
+            } finally {
+              indexReader.closeThriftReader();
+            }
+          }
+        }
+      }
+    }
     return splits;
   }
 
+  protected FileSplit makeSplit(String segmentId, Path file, long start, long length,
+      String[] hosts, FileFormat fileFormat) {
+    return new CarbonInputSplit(segmentId, file, start, length, hosts, fileFormat);
+  }
+
+
+  protected FileSplit makeSplit(String segmentId, Path file, long start, long length,
+      String[] hosts, String[] inMemoryHosts, FileFormat fileFormat) {
+    return new CarbonInputSplit(segmentId, file, start, length, hosts, inMemoryHosts, fileFormat);
+  }
+
   /**
    * Read data in one segment. For alter table partition statement
    * @param job
@@ -387,7 +481,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
         throw new IOException("Missing/Corrupt schema file for table.");
       }
 
-      CarbonInputFormatUtil.processFilterExpression(filter, carbonTable);
+      CarbonInputFormatUtil.processFilterExpression(filter, carbonTable, null, null);
 
       TableProvider tableProvider = new SingleTableProvider(carbonTable);
       // prune partitions for filter query on partition table
@@ -633,7 +727,13 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
 
     // set the filter to the query model in order to filter blocklet before scan
     Expression filter = getFilterPredicates(configuration);
-    CarbonInputFormatUtil.processFilterExpression(filter, carbonTable);
+    boolean[] isFilterDimensions = new boolean[carbonTable.getDimensionOrdinalMax()];
+    boolean[] isFilterMeasures =
+        new boolean[carbonTable.getNumberOfMeasures(carbonTable.getFactTableName())];
+    CarbonInputFormatUtil.processFilterExpression(filter, carbonTable, isFilterDimensions,
+        isFilterMeasures);
+    queryModel.setIsFilterDimensions(isFilterDimensions);
+    queryModel.setIsFilterMeasures(isFilterMeasures);
     FilterResolverIntf filterIntf = CarbonInputFormatUtil
         .resolveFilter(filter, carbonTable.getAbsoluteTableIdentifier(), tableProvider);
     queryModel.setFilterExpressionResolverTree(filterIntf);