Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/09/05 06:22:26 UTC

[1/2] carbondata git commit: [CARBONDATA-2853] Implement min/max index for streaming segment

Repository: carbondata
Updated Branches:
  refs/heads/master 526e3bfa1 -> 21a72bf2e


http://git-wip-us.apache.org/repos/asf/carbondata/blob/21a72bf2/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
index 9e83924..89f00c9 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
@@ -19,7 +19,9 @@ package org.apache.carbondata.streaming.segment;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.carbondata.common.CarbonIterator;
@@ -29,21 +31,31 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
 import org.apache.carbondata.core.locks.CarbonLockFactory;
 import org.apache.carbondata.core.locks.ICarbonLock;
 import org.apache.carbondata.core.locks.LockUsage;
+import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.reader.CarbonIndexFileReader;
 import org.apache.carbondata.core.statusmanager.FileFormat;
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
 import org.apache.carbondata.core.statusmanager.SegmentStatus;
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.core.util.CarbonMetadataUtil;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.core.util.comparator.Comparator;
+import org.apache.carbondata.core.util.comparator.SerializableComparator;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.core.writer.CarbonIndexFileWriter;
 import org.apache.carbondata.format.BlockIndex;
 import org.apache.carbondata.format.BlockletIndex;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
 import org.apache.carbondata.streaming.CarbonStreamRecordWriter;
+import org.apache.carbondata.streaming.index.StreamFileIndex;
 
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
@@ -221,10 +233,48 @@ public class StreamSegment {
     }
   }
 
+  public static BlockletMinMaxIndex collectMinMaxIndex(SimpleStatsResult[] dimStats,
+      SimpleStatsResult[] mrsStats) {
+    BlockletMinMaxIndex minMaxIndex = new BlockletMinMaxIndex();
+    byte[][] maxIndexes = new byte[dimStats.length + mrsStats.length][];
+    for (int index = 0; index < dimStats.length; index++) {
+      maxIndexes[index] =
+          CarbonUtil.getValueAsBytes(dimStats[index].getDataType(), dimStats[index].getMax());
+    }
+    for (int index = 0; index < mrsStats.length; index++) {
+      maxIndexes[dimStats.length + index] =
+          CarbonUtil.getValueAsBytes(mrsStats[index].getDataType(), mrsStats[index].getMax());
+    }
+    minMaxIndex.setMaxValues(maxIndexes);
+
+    byte[][] minIndexes = new byte[maxIndexes.length][];
+    for (int index = 0; index < dimStats.length; index++) {
+      minIndexes[index] =
+          CarbonUtil.getValueAsBytes(dimStats[index].getDataType(), dimStats[index].getMin());
+    }
+    for (int index = 0; index < mrsStats.length; index++) {
+      minIndexes[dimStats.length + index] =
+          CarbonUtil.getValueAsBytes(mrsStats[index].getDataType(), mrsStats[index].getMin());
+    }
+    minMaxIndex.setMinValues(minIndexes);
+    return minMaxIndex;
+  }
+
+  /**
+   * create a StreamFileIndex from the file name, blocklet min/max index, measure data types and row count
+   */
+  private static StreamFileIndex createStreamBlockIndex(String fileName,
+      BlockletMinMaxIndex minMaxIndex, DataType[] msrDataTypes, int blockletRowCount) {
+    StreamFileIndex streamFileIndex =
+        new StreamFileIndex(fileName, minMaxIndex, blockletRowCount);
+    streamFileIndex.setMsrDataTypes(msrDataTypes);
+    return streamFileIndex;
+  }
+
   /**
    * invoke CarbonStreamOutputFormat to append batch data to existing carbondata file
    */
-  public static void appendBatchData(CarbonIterator<Object[]> inputIterators,
+  public static StreamFileIndex appendBatchData(CarbonIterator<Object[]> inputIterators,
       TaskAttemptContext job, CarbonLoadModel carbonLoadModel) throws Exception {
     CarbonStreamRecordWriter writer = null;
     try {
@@ -235,11 +285,15 @@ public class StreamSegment {
           writer.getSegmentDir(),
           writer.getFileName(),
           CarbonTablePath.getCarbonStreamIndexFileName());
-
+      int blockletRowCount = 0;
       while (inputIterators.hasNext()) {
         writer.write(null, inputIterators.next());
+        blockletRowCount++;
       }
       inputIterators.close();
+
+      return createStreamBlockIndex(writer.getFileName(), writer.getBatchMinMaxIndex(),
+          writer.getMeasureDataTypes(), blockletRowCount);
     } catch (Throwable ex) {
       if (writer != null) {
         LOGGER.error(ex, "Failed to append batch data to stream segment: " +
@@ -331,6 +385,7 @@ public class StreamSegment {
             } else if (blockIndex.getFile_size() < file.getSize()) {
               FileFactory.truncateFile(filePath, fileType, blockIndex.getFile_size());
             }
+            break;
           }
         }
       } finally {
@@ -357,11 +412,208 @@ public class StreamSegment {
   }
 
   /**
-   * update carbonindex file after a stream batch.
+   * read the index file and return its list of BlockIndex entries
+   *
+   * @param indexPath path of the index file
+   * @param fileType  file type of the index file
+   * @return the list of BlockIndex in the index file
+   * @throws IOException
+   */
+  private static List<BlockIndex> readIndexFile(String indexPath, FileFactory.FileType fileType)
+      throws IOException {
+    List<BlockIndex> blockIndexList = new ArrayList<>();
+    CarbonFile index = FileFactory.getCarbonFile(indexPath, fileType);
+    if (index.exists()) {
+      CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
+      try {
+        indexReader.openThriftReader(indexPath);
+        while (indexReader.hasNext()) {
+          blockIndexList.add(indexReader.readBlockIndexInfo());
+        }
+      } finally {
+        indexReader.closeThriftReader();
+      }
+    }
+    return blockIndexList;
+  }
+
+  /**
+   * combine the min/max index of the new blocklet with the min/max index of the stream file
+   * 1. if the file index is null, drop the min/max index (old stream file without min/max index)
+   * 2. if the file index is not null,
+   * 2.1 if the blocklet index is null or empty, use the file's min/max index
+   * 2.2 if the blocklet index is not null, merge the two indexes
+   */
+  private static void mergeBatchMinMax(StreamFileIndex blockletIndex, BlockletMinMaxIndex fileIndex)
+      throws IOException {
+    if (fileIndex == null) {
+      // backward compatibility
+      // do not create a min/max index for an old stream file (which has no min/max index).
+      blockletIndex.setMinMaxIndex(null);
+      return;
+    }
+
+    DataType[] msrDataTypes = blockletIndex.getMsrDataTypes();
+    SerializableComparator[] comparators = new SerializableComparator[msrDataTypes.length];
+    for (int index = 0; index < comparators.length; index++) {
+      comparators[index] = Comparator.getComparatorByDataTypeForMeasure(msrDataTypes[index]);
+    }
+
+    // min value
+    byte[][] minValues = blockletIndex.getMinMaxIndex().getMinValues();
+    byte[][] mergedMinValues = fileIndex.getMinValues();
+    if (minValues == null || minValues.length == 0) {
+      // use file index
+      blockletIndex.getMinMaxIndex().setMinValues(mergedMinValues);
+    } else if (mergedMinValues != null && mergedMinValues.length != 0) {
+      if (minValues.length != mergedMinValues.length) {
+        throw new IOException("the lengths of the min values should be same.");
+      }
+      int dimCount = minValues.length - msrDataTypes.length;
+      for (int index = 0; index < minValues.length; index++) {
+        if (index < dimCount) {
+          if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(minValues[index], mergedMinValues[index])
+              > 0) {
+            minValues[index] = mergedMinValues[index];
+          }
+        } else {
+          Object object = DataTypeUtil.getMeasureObjectFromDataType(
+              minValues[index], msrDataTypes[index - dimCount]);
+          Object mergedObject = DataTypeUtil.getMeasureObjectFromDataType(
+              mergedMinValues[index], msrDataTypes[index - dimCount]);
+          if (comparators[index - dimCount].compare(object, mergedObject) > 0) {
+            minValues[index] = mergedMinValues[index];
+          }
+        }
+      }
+    }
+
+    // max value
+    byte[][] maxValues = blockletIndex.getMinMaxIndex().getMaxValues();
+    byte[][] mergedMaxValues = fileIndex.getMaxValues();
+    if (maxValues == null || maxValues.length == 0) {
+      blockletIndex.getMinMaxIndex().setMaxValues(mergedMaxValues);
+    } else if (mergedMaxValues != null && mergedMaxValues.length != 0) {
+      if (maxValues.length != mergedMaxValues.length) {
+        throw new IOException("the lengths of the max values should be same.");
+      }
+      int dimCount = maxValues.length - msrDataTypes.length;
+      for (int index = 0; index < maxValues.length; index++) {
+        if (index < dimCount) {
+          if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(maxValues[index], mergedMaxValues[index])
+              < 0) {
+            maxValues[index] = mergedMaxValues[index];
+          }
+        } else {
+          Object object = DataTypeUtil.getMeasureObjectFromDataType(
+              maxValues[index], msrDataTypes[index - dimCount]);
+          Object mergedObject = DataTypeUtil.getMeasureObjectFromDataType(
+              mergedMaxValues[index], msrDataTypes[index - dimCount]);
+          if (comparators[index - dimCount].compare(object, mergedObject) < 0) {
+            maxValues[index] = mergedMaxValues[index];
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * merge blocklet min/max to generate batch min/max
+   */
+  public static BlockletMinMaxIndex mergeBlockletMinMax(BlockletMinMaxIndex to,
+      BlockletMinMaxIndex from, DataType[] msrDataTypes) {
+    if (to == null) {
+      return from;
+    }
+    if (from == null) {
+      return to;
+    }
+
+    SerializableComparator[] comparators = new SerializableComparator[msrDataTypes.length];
+    for (int index = 0; index < comparators.length; index++) {
+      comparators[index] = Comparator.getComparatorByDataTypeForMeasure(msrDataTypes[index]);
+    }
+
+    // min value
+    byte[][] minValues = to.getMinValues();
+    byte[][] mergedMinValues = from.getMinValues();
+    int dimCount1 = minValues.length - msrDataTypes.length;
+    for (int index = 0; index < minValues.length; index++) {
+      if (index < dimCount1) {
+        if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(minValues[index], mergedMinValues[index])
+            > 0) {
+          minValues[index] = mergedMinValues[index];
+        }
+      } else {
+        Object object = DataTypeUtil.getMeasureObjectFromDataType(
+            minValues[index], msrDataTypes[index - dimCount1]);
+        Object mergedObject = DataTypeUtil.getMeasureObjectFromDataType(
+            mergedMinValues[index], msrDataTypes[index - dimCount1]);
+        if (comparators[index - dimCount1].compare(object, mergedObject) > 0) {
+          minValues[index] = mergedMinValues[index];
+        }
+      }
+    }
+
+    // max value
+    byte[][] maxValues = to.getMaxValues();
+    byte[][] mergedMaxValues = from.getMaxValues();
+    int dimCount2 = maxValues.length - msrDataTypes.length;
+    for (int index = 0; index < maxValues.length; index++) {
+      if (index < dimCount2) {
+        if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(maxValues[index], mergedMaxValues[index])
+            < 0) {
+          maxValues[index] = mergedMaxValues[index];
+        }
+      } else {
+        Object object = DataTypeUtil.getMeasureObjectFromDataType(
+            maxValues[index], msrDataTypes[index - dimCount2]);
+        Object mergedObject = DataTypeUtil.getMeasureObjectFromDataType(
+            mergedMaxValues[index], msrDataTypes[index - dimCount2]);
+        if (comparators[index - dimCount2].compare(object, mergedObject) < 0) {
+          maxValues[index] = mergedMaxValues[index];
+        }
+      }
+    }
+    return to;
+  }
+
+  /**
+   * merge the new blocklet index with the old file index to create the new file index
    */
-  public static void updateIndexFile(String segmentDir) throws IOException {
+  private static void updateStreamFileIndex(Map<String, StreamFileIndex> indexMap,
+      String indexPath, FileFactory.FileType fileType) throws IOException {
+    List<BlockIndex> blockIndexList = readIndexFile(indexPath, fileType);
+    for (BlockIndex blockIndex : blockIndexList) {
+      BlockletMinMaxIndex fileIndex = CarbonMetadataUtil
+          .convertExternalMinMaxIndex(blockIndex.getBlock_index().getMin_max_index());
+      StreamFileIndex blockletIndex = indexMap.get(blockIndex.getFile_name());
+      if (blockletIndex == null) {
+        // every stream file should have an index entry
+        indexMap.put(blockIndex.getFile_name(),
+            new StreamFileIndex(blockIndex.getFile_name(), fileIndex, blockIndex.getNum_rows()));
+      } else {
+        // merge the file's minMaxIndex into the existing StreamFileIndex
+        blockletIndex.setRowCount(blockletIndex.getRowCount() + blockIndex.getNum_rows());
+        mergeBatchMinMax(blockletIndex, fileIndex);
+      }
+    }
+  }
+
+  /**
+   * update carbon index file after a stream batch.
+   */
+  public static void updateIndexFile(String segmentDir,
+      StreamFileIndex[] blockIndexes) throws IOException {
     FileFactory.FileType fileType = FileFactory.getFileType(segmentDir);
     String filePath = CarbonTablePath.getCarbonStreamIndexFilePath(segmentDir);
+    // update min/max index
+    Map<String, StreamFileIndex> indexMap = new HashMap<>();
+    for (StreamFileIndex fileIndex : blockIndexes) {
+      indexMap.put(fileIndex.getFileName(), fileIndex);
+    }
+    updateStreamFileIndex(indexMap, filePath, fileType);
+
     String tempFilePath = filePath + CarbonCommonConstants.TEMPWRITEFILEEXTENSION;
     CarbonIndexFileWriter writer = new CarbonIndexFileWriter();
     try {
@@ -372,10 +624,19 @@ public class StreamSegment {
         blockIndex = new BlockIndex();
         blockIndex.setFile_name(file.getName());
         blockIndex.setFile_size(file.getSize());
-        // TODO need to collect these information
-        blockIndex.setNum_rows(-1);
         blockIndex.setOffset(-1);
-        blockIndex.setBlock_index(new BlockletIndex());
+        // set min/max index
+        BlockletIndex blockletIndex = new BlockletIndex();
+        blockIndex.setBlock_index(blockletIndex);
+        StreamFileIndex streamFileIndex = indexMap.get(blockIndex.getFile_name());
+        if (streamFileIndex != null) {
+          blockletIndex.setMin_max_index(
+              CarbonMetadataUtil.convertMinMaxIndex(streamFileIndex.getMinMaxIndex()));
+          blockIndex.setNum_rows(streamFileIndex.getRowCount());
+        } else {
+          blockIndex.setNum_rows(-1);
+        }
+        // write block index
         writer.writeThrift(blockIndex);
       }
       writer.close();
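
A side note on the merging logic above (mergeBatchMinMax and mergeBlockletMinMax): per column the rule is simply to keep the smaller of the two mins and the larger of the two maxes, comparing dimensions as unsigned byte arrays and measures through a data-type-aware comparator (DataTypeUtil.getMeasureObjectFromDataType plus Comparator.getComparatorByDataTypeForMeasure). Below is a minimal standalone sketch of the dimension case only; the class and method names are illustrative, not CarbonData API.

    import java.util.Comparator;

    public final class MinMaxMergeSketch {
      // unsigned byte-array comparison, in the spirit of ByteUtil.UnsafeComparer
      static final Comparator<byte[]> UNSIGNED = (a, b) -> {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
          int c = (a[i] & 0xff) - (b[i] & 0xff);
          if (c != 0) {
            return c;
          }
        }
        return a.length - b.length;
      };

      // per-column merge rule: the smaller value wins for min, the larger for max
      static byte[] mergeMin(byte[] a, byte[] b) { return UNSIGNED.compare(a, b) <= 0 ? a : b; }
      static byte[] mergeMax(byte[] a, byte[] b) { return UNSIGNED.compare(a, b) >= 0 ? a : b; }

      public static void main(String[] args) {
        byte[] oldMin = {2, 0};
        byte[] newMin = {1, 9};
        byte[] merged = mergeMin(oldMin, newMin);
        System.out.println(merged[0]); // prints 1: the new blocklet lowered the column min
      }
    }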


[2/2] carbondata git commit: [CARBONDATA-2853] Implement min/max index for streaming segment

Posted by ja...@apache.org.
[CARBONDATA-2853] Implement min/max index for streaming segment

Implement a file-level min/max index (driver side) and a blocklet-level min/max index (worker side) on stream files to improve read performance.

This closes #2644
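
For readers of this patch, the information recorded per stream data file in the segment's .carbonindex file boils down to the fields below. This is an illustrative data model only (the real entry is the thrift BlockIndex/BlockletIndex written by StreamSegment.updateIndexFile); the class name here is hypothetical.

    // Illustrative only: what the patch records per stream data file.
    final class StreamIndexEntrySketch {
      String fileName;     // stream data file inside the segment directory
      long fileSize;       // used to build input splits
      long numRows;        // row count of the file, -1 for old files without statistics
      byte[][] minValues;  // one entry per column: dimensions first, then measures
      byte[][] maxValues;
      // driver side: StreamPruner compares the query filter against minValues/maxValues
      //              and skips whole files that cannot contain matching rows
      // worker side: CarbonStreamRecordReader applies the same kind of check per blocklet
    }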


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/21a72bf2
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/21a72bf2
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/21a72bf2

Branch: refs/heads/master
Commit: 21a72bf2efa9d76c38d7da92b7bd4eaa5cd4a3ea
Parents: 526e3bf
Author: QiangCai <qi...@qq.com>
Authored: Sat Aug 25 17:18:16 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Wed Sep 5 14:22:03 2018 +0800

----------------------------------------------------------------------
 .../carbondata/core/stream/StreamFile.java      |  63 +++++
 .../carbondata/core/stream/StreamPruner.java    | 143 ++++++++++
 .../core/util/CarbonMetadataUtil.java           |  37 ++-
 .../hadoop/api/CarbonTableInputFormat.java      | 111 ++++----
 .../carbondata/spark/rdd/StreamHandoffRDD.scala |   6 +-
 .../streaming/CarbonAppendableStreamSink.scala  |  18 +-
 .../TestStreamingTableOperation.scala           |  96 +++++++
 .../streaming/CarbonStreamRecordReader.java     |  16 +-
 .../streaming/CarbonStreamRecordWriter.java     |  48 +++-
 .../streaming/StreamBlockletWriter.java         |  87 +++++-
 .../streaming/index/StreamFileIndex.java        |  77 ++++++
 .../streaming/segment/StreamSegment.java        | 275 ++++++++++++++++++-
 12 files changed, 883 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/21a72bf2/core/src/main/java/org/apache/carbondata/core/stream/StreamFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/stream/StreamFile.java b/core/src/main/java/org/apache/carbondata/core/stream/StreamFile.java
new file mode 100644
index 0000000..64c916f
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/stream/StreamFile.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.stream;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
+
+@InterfaceAudience.Internal
+public class StreamFile {
+
+  private String segmentNo;
+
+  private String filePath;
+
+  private long fileSize;
+
+  private BlockletMinMaxIndex minMaxIndex;
+
+  public StreamFile(String segmentNo, String filePath, long fileSize) {
+    this.segmentNo = segmentNo;
+    this.filePath = filePath;
+    this.fileSize = fileSize;
+  }
+
+  public String getSegmentNo() {
+    return segmentNo;
+  }
+
+  public String getFilePath() {
+    return filePath;
+  }
+
+  public void setFilePath(String filePath) {
+    this.filePath = filePath;
+  }
+
+  public long getFileSize() {
+    return fileSize;
+  }
+
+  public BlockletMinMaxIndex getMinMaxIndex() {
+    return minMaxIndex;
+  }
+
+  public void setMinMaxIndex(BlockletMinMaxIndex minMaxIndex) {
+    this.minMaxIndex = minMaxIndex;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/21a72bf2/core/src/main/java/org/apache/carbondata/core/stream/StreamPruner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/stream/StreamPruner.java b/core/src/main/java/org/apache/carbondata/core/stream/StreamPruner.java
new file mode 100644
index 0000000..ac3589f
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/stream/StreamPruner.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.stream;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.reader.CarbonIndexFileReader;
+import org.apache.carbondata.core.scan.filter.FilterUtil;
+import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.util.CarbonMetadataUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.format.BlockIndex;
+
+@InterfaceAudience.Internal
+public class StreamPruner {
+
+  private CarbonTable carbonTable;
+  private FilterExecuter filterExecuter;
+
+  public StreamPruner(CarbonTable carbonTable) {
+    this.carbonTable = carbonTable;
+  }
+
+  public void init(FilterResolverIntf filterExp) {
+    if (filterExp != null) {
+      // cache all columns
+      List<CarbonColumn> minMaxCacheColumns = new ArrayList<>();
+      for (CarbonDimension dimension : carbonTable.getDimensions()) {
+        if (!dimension.isComplex()) {
+          minMaxCacheColumns.add(dimension);
+        }
+      }
+      minMaxCacheColumns.addAll(carbonTable.getMeasures());
+      // prepare cardinality of all dimensions
+      List<ColumnSchema> listOfColumns =
+          carbonTable.getTableInfo().getFactTable().getListOfColumns();
+      int[] columnCardinality = new int[listOfColumns.size()];
+      for (int index = 0; index < columnCardinality.length; index++) {
+        columnCardinality[index] = Integer.MAX_VALUE;
+      }
+      // initialize the filter executor
+      SegmentProperties segmentProperties =
+          new SegmentProperties(listOfColumns, columnCardinality);
+      filterExecuter = FilterUtil.getFilterExecuterTree(
+          filterExp, segmentProperties, null, minMaxCacheColumns);
+    }
+  }
+
+  public List<StreamFile> prune(List<Segment> segments) throws IOException {
+    if (filterExecuter == null) {
+      // if the filter is null, list all stream files
+      return listAllStreamFiles(segments, false);
+    } else {
+      List<StreamFile> streamFileList = new ArrayList<>();
+      for (StreamFile streamFile : listAllStreamFiles(segments, true)) {
+        if (isScanRequire(streamFile)) {
+          // the stream file may contain matching rows, so keep it for scanning
+          streamFileList.add(streamFile);
+          streamFile.setMinMaxIndex(null);
+        }
+      }
+      return streamFileList;
+    }
+  }
+
+  private boolean isScanRequire(StreamFile streamFile) {
+    // backward compatibility: old stream files have no min/max index, so always scan them
+    if (streamFile.getMinMaxIndex() == null) {
+      return true;
+    }
+    byte[][] maxValue = streamFile.getMinMaxIndex().getMaxValues();
+    byte[][] minValue = streamFile.getMinMaxIndex().getMinValues();
+    BitSet bitSet = filterExecuter.isScanRequired(maxValue, minValue);
+    if (!bitSet.isEmpty()) {
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  // TODO optimize and move this code to StreamSegment, but that class is in the streaming module.
+  private List<StreamFile> listAllStreamFiles(List<Segment> segments, boolean withMinMax)
+      throws IOException {
+    List<StreamFile> streamFileList = new ArrayList<>();
+    for (Segment segment : segments) {
+      String segmentDir = CarbonTablePath.getSegmentPath(
+          carbonTable.getAbsoluteTableIdentifier().getTablePath(), segment.getSegmentNo());
+      String indexFile = CarbonTablePath.getCarbonStreamIndexFilePath(segmentDir);
+      FileFactory.FileType fileType = FileFactory.getFileType(indexFile);
+      if (FileFactory.isFileExist(indexFile, fileType)) {
+        CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
+        indexReader.openThriftReader(indexFile);
+        try {
+          while (indexReader.hasNext()) {
+            BlockIndex blockIndex = indexReader.readBlockIndexInfo();
+            String filePath = segmentDir + File.separator + blockIndex.getFile_name();
+            long length = blockIndex.getFile_size();
+            StreamFile streamFile = new StreamFile(segment.getSegmentNo(), filePath, length);
+            streamFileList.add(streamFile);
+            if (withMinMax) {
+              if (blockIndex.getBlock_index() != null
+                  && blockIndex.getBlock_index().getMin_max_index() != null) {
+                streamFile.setMinMaxIndex(CarbonMetadataUtil
+                    .convertExternalMinMaxIndex(blockIndex.getBlock_index().getMin_max_index()));
+              }
+            }
+          }
+        } finally {
+          indexReader.closeThriftReader();
+        }
+      }
+    }
+    return streamFileList;
+  }
+}
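
To make the isScanRequire decision above concrete: a stream file is skipped when the filter value cannot fall inside the [min, max] range stored for the filtered column. The toy, self-contained illustration below uses strings in place of the encoded byte arrays and a hypothetical class name; the real check goes through FilterExecuter.isScanRequired.

    public class PruneDecisionSketch {
      public static void main(String[] args) {
        String min = "city_1", max = "city_9";     // per-file min/max of the 'city' column
        String filterValue = "city_95";            // predicate: city = 'city_95'
        boolean scanRequired = min.compareTo(filterValue) <= 0
            && filterValue.compareTo(max) <= 0;
        System.out.println(scanRequired);          // false -> the whole file is pruned
      }
    }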

http://git-wip-us.apache.org/repos/asf/carbondata/blob/21a72bf2/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
index 70443d8..571a247 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
@@ -96,14 +96,41 @@ public class CarbonMetadataUtil {
     return footer;
   }
 
-  public static BlockletIndex getBlockletIndex(
-      org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex info) {
+  /**
+   * convert external thrift BlockletMinMaxIndex to BlockletMinMaxIndex of carbon metadata
+   */
+  public static org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex
+      convertExternalMinMaxIndex(BlockletMinMaxIndex minMaxIndex) {
+    if (minMaxIndex == null) {
+      return null;
+    }
+
+    return new org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex(
+            minMaxIndex.getMin_values(), minMaxIndex.getMax_values());
+  }
+
+  /**
+   * convert BlockletMinMaxIndex of carbon metadata to external thrift BlockletMinMaxIndex
+   */
+  public static BlockletMinMaxIndex convertMinMaxIndex(
+      org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex minMaxIndex) {
+    if (minMaxIndex == null) {
+      return null;
+    }
+
     BlockletMinMaxIndex blockletMinMaxIndex = new BlockletMinMaxIndex();
 
-    for (int i = 0; i < info.getMinMaxIndex().getMaxValues().length; i++) {
-      blockletMinMaxIndex.addToMax_values(ByteBuffer.wrap(info.getMinMaxIndex().getMaxValues()[i]));
-      blockletMinMaxIndex.addToMin_values(ByteBuffer.wrap(info.getMinMaxIndex().getMinValues()[i]));
+    for (int i = 0; i < minMaxIndex.getMaxValues().length; i++) {
+      blockletMinMaxIndex.addToMax_values(ByteBuffer.wrap(minMaxIndex.getMaxValues()[i]));
+      blockletMinMaxIndex.addToMin_values(ByteBuffer.wrap(minMaxIndex.getMinValues()[i]));
     }
+
+    return blockletMinMaxIndex;
+  }
+
+  public static BlockletIndex getBlockletIndex(
+      org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex info) {
+    BlockletMinMaxIndex blockletMinMaxIndex = convertMinMaxIndex(info.getMinMaxIndex());
     BlockletBTreeIndex blockletBTreeIndex = new BlockletBTreeIndex();
     blockletBTreeIndex.setStart_key(info.getBtreeIndex().getStartKey());
     blockletBTreeIndex.setEnd_key(info.getBtreeIndex().getEndKey());
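
The two converters added here are inverse wrappers between the thrift representation (lists of ByteBuffer) and the carbon metadata representation (byte[][]) of the min/max values. A small standalone sketch of that round trip, deliberately not using the CarbonData types (class and method names are illustrative):

    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.List;

    public class MinMaxConvertSketch {
      static List<ByteBuffer> toThriftForm(byte[][] values) {
        List<ByteBuffer> out = new ArrayList<>();
        for (byte[] v : values) {
          out.add(ByteBuffer.wrap(v));
        }
        return out;
      }

      static byte[][] toCarbonForm(List<ByteBuffer> values) {
        byte[][] out = new byte[values.size()][];
        for (int i = 0; i < values.size(); i++) {
          ByteBuffer buf = values.get(i).duplicate();
          out[i] = new byte[buf.remaining()];
          buf.get(out[i]);
        }
        return out;
      }

      public static void main(String[] args) {
        byte[][] mins = { {1, 2}, {3} };
        byte[][] roundTrip = toCarbonForm(toThriftForm(mins));
        System.out.println(roundTrip[0][1] + " " + roundTrip[1][0]); // prints "2 3"
      }
    }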

http://git-wip-us.apache.org/repos/asf/carbondata/blob/21a72bf2/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index ec201b9..4f85975 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -17,7 +17,6 @@
 
 package org.apache.carbondata.hadoop.api;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -34,7 +33,6 @@ import org.apache.carbondata.core.datamap.TableDataMap;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
-import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.PartitionInfo;
 import org.apache.carbondata.core.metadata.schema.SchemaReader;
@@ -48,7 +46,6 @@ import org.apache.carbondata.core.mutate.data.BlockMappingVO;
 import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope;
 import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
 import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope;
-import org.apache.carbondata.core.reader.CarbonIndexFileReader;
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
@@ -56,9 +53,10 @@ import org.apache.carbondata.core.statusmanager.FileFormat;
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
 import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
+import org.apache.carbondata.core.stream.StreamFile;
+import org.apache.carbondata.core.stream.StreamPruner;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
-import org.apache.carbondata.format.BlockIndex;
 import org.apache.carbondata.hadoop.CarbonInputSplit;
 
 import org.apache.commons.logging.Log;
@@ -158,12 +156,12 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
         streamSegments = segments.getStreamSegments();
         streamSegments = getFilteredSegment(job, streamSegments, true, readCommittedScope);
         if (validSegments.size() == 0) {
-          return getSplitsOfStreaming(job, identifier, streamSegments);
+          return getSplitsOfStreaming(job, streamSegments, carbonTable);
         }
         List<Segment> filteredSegmentToAccess =
             getFilteredSegment(job, segments.getValidSegments(), true, readCommittedScope);
         if (filteredSegmentToAccess.size() == 0) {
-          return getSplitsOfStreaming(job, identifier, streamSegments);
+          return getSplitsOfStreaming(job, streamSegments, carbonTable);
         } else {
           setSegmentsToAccess(job.getConfiguration(), filteredSegmentToAccess);
         }
@@ -173,7 +171,7 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
                 getSegmentsToAccess(job, readCommittedScope));
         streamSegments = segments.getStreamSegments();
         if (filteredNormalSegments.size() == 0) {
-          return getSplitsOfStreaming(job, identifier, streamSegments);
+          return getSplitsOfStreaming(job, streamSegments, carbonTable);
         }
         setSegmentsToAccess(job.getConfiguration(),filteredNormalSegments);
       }
@@ -231,7 +229,8 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
     }
 
     // add all splits of streaming
-    List<InputSplit> splitsOfStreaming = getSplitsOfStreaming(job, identifier, streamSegments);
+    List<InputSplit> splitsOfStreaming =
+        getSplitsOfStreaming(job, streamSegments, carbonTable, filterInterface);
     if (!splitsOfStreaming.isEmpty()) {
       splits.addAll(splitsOfStreaming);
     }
@@ -339,64 +338,60 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
     return filteredSegmentToAccess;
   }
 
+  public List<InputSplit> getSplitsOfStreaming(JobContext job, List<Segment> streamSegments,
+      CarbonTable carbonTable) throws IOException {
+    return getSplitsOfStreaming(job, streamSegments, carbonTable, null);
+  }
+
   /**
    * use file list in .carbonindex file to get the split of streaming.
    */
-  public List<InputSplit> getSplitsOfStreaming(JobContext job, AbsoluteTableIdentifier identifier,
-      List<Segment> streamSegments) throws IOException {
+  public List<InputSplit> getSplitsOfStreaming(JobContext job, List<Segment> streamSegments,
+      CarbonTable carbonTable, FilterResolverIntf filterResolverIntf) throws IOException {
     List<InputSplit> splits = new ArrayList<InputSplit>();
     if (streamSegments != null && !streamSegments.isEmpty()) {
       numStreamSegments = streamSegments.size();
       long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
       long maxSize = getMaxSplitSize(job);
-      for (Segment segment : streamSegments) {
-        String segmentDir =
-            CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo());
-        FileFactory.FileType fileType = FileFactory.getFileType(segmentDir);
-        if (FileFactory.isFileExist(segmentDir, fileType)) {
-          SegmentIndexFileStore segmentIndexFileStore = new SegmentIndexFileStore();
-          segmentIndexFileStore.readAllIIndexOfSegment(segmentDir);
-          Map<String, byte[]> carbonIndexMap = segmentIndexFileStore.getCarbonIndexMap();
-          CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
-          for (byte[] fileData : carbonIndexMap.values()) {
-            indexReader.openThriftReader(fileData);
-            try {
-              // map block index
-              while (indexReader.hasNext()) {
-                BlockIndex blockIndex = indexReader.readBlockIndexInfo();
-                String filePath = segmentDir + File.separator + blockIndex.getFile_name();
-                Path path = new Path(filePath);
-                long length = blockIndex.getFile_size();
-                if (length != 0) {
-                  BlockLocation[] blkLocations;
-                  FileSystem fs = FileFactory.getFileSystem(path);
-                  FileStatus file = fs.getFileStatus(path);
-                  blkLocations = fs.getFileBlockLocations(path, 0, length);
-                  long blockSize = file.getBlockSize();
-                  long splitSize = computeSplitSize(blockSize, minSize, maxSize);
-                  long bytesRemaining = length;
-                  while (((double) bytesRemaining) / splitSize > 1.1) {
-                    int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
-                    splits.add(makeSplit(segment.getSegmentNo(), path, length - bytesRemaining,
-                        splitSize, blkLocations[blkIndex].getHosts(),
-                        blkLocations[blkIndex].getCachedHosts(), FileFormat.ROW_V1));
-                    bytesRemaining -= splitSize;
-                  }
-                  if (bytesRemaining != 0) {
-                    int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
-                    splits.add(makeSplit(segment.getSegmentNo(), path, length - bytesRemaining,
-                        bytesRemaining, blkLocations[blkIndex].getHosts(),
-                        blkLocations[blkIndex].getCachedHosts(), FileFormat.ROW_V1));
-                  }
-                } else {
-                  //Create empty hosts array for zero length files
-                  splits.add(makeSplit(segment.getSegmentNo(), path, 0, length, new String[0],
-                      FileFormat.ROW_V1));
-                }
-              }
-            } finally {
-              indexReader.closeThriftReader();
-            }
+      if (filterResolverIntf == null) {
+        if (carbonTable != null) {
+          Expression filter = getFilterPredicates(job.getConfiguration());
+          if (filter != null) {
+            carbonTable.processFilterExpression(filter, null, null);
+            filterResolverIntf = carbonTable.resolveFilter(filter);
+          }
+        }
+      }
+      StreamPruner streamPruner = new StreamPruner(carbonTable);
+      streamPruner.init(filterResolverIntf);
+      List<StreamFile> streamFiles = streamPruner.prune(streamSegments);
+
+      for (StreamFile streamFile : streamFiles) {
+        Path path = new Path(streamFile.getFilePath());
+        long length = streamFile.getFileSize();
+        if (length != 0) {
+          BlockLocation[] blkLocations;
+          FileSystem fs = FileFactory.getFileSystem(path);
+          FileStatus file = fs.getFileStatus(path);
+          blkLocations = fs.getFileBlockLocations(path, 0, length);
+          long blockSize = file.getBlockSize();
+          long splitSize = computeSplitSize(blockSize, minSize, maxSize);
+          long bytesRemaining = length;
+          // split the stream file into smaller splits
+          // allow 10% slop to avoid generating a very small split at the end
+          while (((double) bytesRemaining) / splitSize > 1.1) {
+            int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
+            splits.add(
+                makeSplit(streamFile.getSegmentNo(), path, length - bytesRemaining,
+                    splitSize, blkLocations[blkIndex].getHosts(),
+                    blkLocations[blkIndex].getCachedHosts(), FileFormat.ROW_V1));
+            bytesRemaining -= splitSize;
+          }
+          if (bytesRemaining != 0) {
+            int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
+            splits.add(makeSplit(streamFile.getSegmentNo(), path, length - bytesRemaining,
+                bytesRemaining, blkLocations[blkIndex].getHosts(),
+                blkLocations[blkIndex].getCachedHosts(), FileFormat.ROW_V1));
           }
         }
       }
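
For the 10% slop rule in the split loop above, here is a standalone arithmetic sketch with made-up sizes: a 260 MB stream file and a 128 MB split size produce two splits of 128 MB and 132 MB, rather than 128/128/4 MB. The class name and the concrete sizes are hypothetical; splitSize in the real code comes from computeSplitSize(blockSize, minSize, maxSize).

    public class SplitSlopSketch {
      public static void main(String[] args) {
        long length = 260L * 1024 * 1024;     // hypothetical stream file size
        long splitSize = 128L * 1024 * 1024;  // hypothetical computed split size
        long bytesRemaining = length;
        while (((double) bytesRemaining) / splitSize > 1.1) {
          System.out.println("offset=" + (length - bytesRemaining) + " size=" + splitSize);
          bytesRemaining -= splitSize;
        }
        if (bytesRemaining != 0) {
          // the remainder stays in one split, so no tiny trailing split is created
          System.out.println("offset=" + (length - bytesRemaining) + " size=" + bytesRemaining);
        }
      }
    }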

http://git-wip-us.apache.org/repos/asf/carbondata/blob/21a72bf2/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
index b63e0dc..994cb3d 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
@@ -205,8 +205,8 @@ class StreamHandoffRDD[K, V](
     segmentList.add(Segment.toSegment(handOffSegmentId, null))
     val splits = inputFormat.getSplitsOfStreaming(
       job,
-      carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getAbsoluteTableIdentifier,
-      segmentList
+      segmentList,
+      carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     )
 
     (0 until splits.size()).map { index =>
@@ -334,9 +334,9 @@ object StreamHandoffRDD {
     } catch {
       case ex: Exception =>
         loadStatus = SegmentStatus.LOAD_FAILURE
+        LOGGER.error(ex, s"Handoff failed on streaming segment $handoffSegmenId")
         errorMessage = errorMessage + ": " + ex.getCause.getMessage
         LOGGER.error(errorMessage)
-        LOGGER.error(ex, s"Handoff failed on streaming segment $handoffSegmenId")
     }
 
     if (loadStatus == SegmentStatus.LOAD_FAILURE) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/21a72bf2/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
index ffaac86..196baa6 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
@@ -19,8 +19,6 @@ package org.apache.spark.sql.execution.streaming
 
 import java.util.Date
 
-import scala.collection.JavaConverters._
-
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
@@ -49,6 +47,7 @@ import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePost
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.spark.rdd.StreamHandoffRDD
 import org.apache.carbondata.streaming.{CarbonStreamException, CarbonStreamOutputFormat}
+import org.apache.carbondata.streaming.index.StreamFileIndex
 import org.apache.carbondata.streaming.parser.CarbonStreamParser
 import org.apache.carbondata.streaming.segment.StreamSegment
 
@@ -236,7 +235,7 @@ object CarbonAppendableStreamSink {
 
     // run write data file job
     SQLExecution.withNewExecutionId(sparkSession, queryExecution) {
-      var result: Array[TaskCommitMessage] = null
+      var result: Array[(TaskCommitMessage, StreamFileIndex)] = null
       try {
         committer.setupJob(job)
         // initialize dictionary server
@@ -275,7 +274,8 @@ object CarbonAppendableStreamSink {
 
         // update data file info in index file
         StreamSegment.updateIndexFile(
-          CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId))
+          CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId),
+          result.map(_._2))
 
       } catch {
         // catch fault of executor side
@@ -286,7 +286,7 @@ object CarbonAppendableStreamSink {
           committer.abortJob(job)
           throw new CarbonStreamException("Job failed to write data file", t)
       }
-      committer.commitJob(job, result)
+      committer.commitJob(job, result.map(_._1))
       LOGGER.info(s"Job ${ job.getJobID } committed.")
     }
   }
@@ -302,8 +302,7 @@ object CarbonAppendableStreamSink {
       sparkAttemptNumber: Int,
       committer: FileCommitProtocol,
       iterator: Iterator[InternalRow],
-      rowSchema: StructType
-  ): TaskCommitMessage = {
+      rowSchema: StructType): (TaskCommitMessage, StreamFileIndex) = {
 
     val jobId = CarbonInputFormatUtil.getJobId(new Date, sparkStageId)
     val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
@@ -325,6 +324,7 @@ object CarbonAppendableStreamSink {
     committer.setupTask(taskAttemptContext)
 
     try {
+      var blockIndex: StreamFileIndex = null
       Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
 
         val parserName = taskAttemptContext.getConfiguration.get(
@@ -335,13 +335,13 @@ object CarbonAppendableStreamSink {
           Class.forName(parserName).newInstance.asInstanceOf[CarbonStreamParser]
         streamParser.initialize(taskAttemptContext.getConfiguration, rowSchema)
 
-        StreamSegment.appendBatchData(new InputIterator(iterator, streamParser),
+        blockIndex = StreamSegment.appendBatchData(new InputIterator(iterator, streamParser),
           taskAttemptContext, carbonLoadModel)
       })(catchBlock = {
         committer.abortTask(taskAttemptContext)
         LOGGER.error(s"Job $jobId aborted.")
       })
-      committer.commitTask(taskAttemptContext)
+      (committer.commitTask(taskAttemptContext), blockIndex)
     } catch {
       case t: Throwable =>
         throw new CarbonStreamException("Task failed while writing rows", t)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/21a72bf2/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index 4dde81e..ab5539a 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -25,6 +25,7 @@ import java.util.concurrent.Executors
 
 import scala.collection.mutable
 
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
@@ -42,6 +43,7 @@ import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatus}
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.spark.exception.ProcessMetaDataException
+import org.apache.carbondata.spark.rdd.CarbonScanRDD
 import org.apache.carbondata.streaming.parser.CarbonStreamParser
 
 class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
@@ -656,6 +658,9 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
   }
 
   test("query on stream table with dictionary, sort_columns") {
+    val batchParts =
+      partitionNums("select * from streaming.stream_table_filter")
+
     executeStreamingIngest(
       tableName = "stream_table_filter",
       batchNums = 2,
@@ -668,6 +673,12 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
       autoHandoff = false
     )
 
+    val totalParts =
+      partitionNums("select * from streaming.stream_table_filter")
+    assert(totalParts > batchParts)
+
+    val streamParts = totalParts - batchParts
+
     // non-filter
     val result = sql("select * from streaming.stream_table_filter order by id, name").collect()
     assert(result != null)
@@ -680,63 +691,79 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     assert(result(50).getString(1) == "batch_1")
 
     // filter
+    assert(batchParts >= partitionNums("select * from stream_table_filter where id >= 100000001"))
+
     checkAnswer(
       sql("select * from stream_table_filter where id = 1"),
       Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+    assert(streamParts >= partitionNums("select * from stream_table_filter where id = 1"))
 
     checkAnswer(
       sql("select * from stream_table_filter where id > 49 and id < 100000002"),
       Seq(Row(50, "name_50", "city_50", 500000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")),
         Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+    assert(totalParts >= partitionNums("select * from stream_table_filter where id > 49 and id < 100000002"))
 
     checkAnswer(
       sql("select * from stream_table_filter where id between 50 and 100000001"),
       Seq(Row(50, "name_50", "city_50", 500000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")),
         Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+    assert(totalParts >= partitionNums("select * from stream_table_filter where id between 50 and 100000001"))
 
     checkAnswer(
       sql("select * from stream_table_filter where name in ('name_9','name_10', 'name_11', 'name_12') and id <> 10 and id not in (11, 12)"),
       Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+    assert(streamParts >= partitionNums("select * from stream_table_filter where name in ('name_9','name_10', 'name_11', 'name_12') and id <> 10 and id not in (11, 12)"))
 
     checkAnswer(
       sql("select * from stream_table_filter where name = 'name_3'"),
       Seq(Row(3, "name_3", "city_3", 30000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+    assert(streamParts >= partitionNums("select * from stream_table_filter where name = 'name_3'"))
 
     checkAnswer(
       sql("select * from stream_table_filter where name like '%me_3%' and id < 30"),
       Seq(Row(3, "name_3", "city_3", 30000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+    assert(streamParts >= partitionNums("select * from stream_table_filter where name like '%me_3%' and id < 30"))
 
     checkAnswer(sql("select count(*) from stream_table_filter where name like '%ame%'"),
       Seq(Row(49)))
+    assert(totalParts == partitionNums("select count(*) from stream_table_filter where name like '%ame%'"))
 
     checkAnswer(sql("select count(*) from stream_table_filter where name like '%batch%'"),
       Seq(Row(5)))
+    assert(totalParts == partitionNums("select count(*) from stream_table_filter where name like '%batch%'"))
 
     checkAnswer(
       sql("select * from stream_table_filter where name >= 'name_3' and id < 4"),
       Seq(Row(3, "name_3", "city_3", 30000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+    assert(streamParts >= partitionNums("select * from stream_table_filter where name >= 'name_3' and id < 4"))
 
     checkAnswer(
       sql("select * from stream_table_filter where id in (9, 10, 11, 12) and name <> 'name_10' and name not in ('name_11', 'name_12')"),
       Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+    assert(streamParts >= partitionNums("select * from stream_table_filter where id in (9, 10, 11, 12) and name <> 'name_10' and name not in ('name_11', 'name_12')"))
 
     checkAnswer(
       sql("select * from stream_table_filter where city = 'city_1'"),
       Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")),
         Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+    assert(totalParts >= partitionNums("select * from stream_table_filter where city = 'city_1'"))
 
     checkAnswer(
       sql("select * from stream_table_filter where city like '%ty_1%' and ( id < 10 or id >= 100000001)"),
       Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")),
         Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+    assert(totalParts >= partitionNums("select * from stream_table_filter where city like '%ty_1%' and ( id < 10 or id >= 100000001)"))
 
     checkAnswer(sql("select count(*) from stream_table_filter where city like '%city%'"),
       Seq(Row(54)))
+    assert(totalParts == partitionNums("select count(*) from stream_table_filter where city like '%city%'"))
 
     checkAnswer(
       sql("select * from stream_table_filter where city > 'city_09' and city < 'city_10'"),
       Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")),
         Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+    assert(totalParts >= partitionNums("select * from stream_table_filter where city > 'city_09' and city < 'city_10'"))
 
     checkAnswer(
       sql("select * from stream_table_filter where city between 'city_09' and 'city_1'"),
@@ -746,204 +773,252 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     checkAnswer(
       sql("select * from stream_table_filter where id in (9, 10, 11, 12) and city <> 'city_10' and city not in ('city_11', 'city_12')"),
       Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+    assert(streamParts >= partitionNums("select * from stream_table_filter where id in (9, 10, 11, 12) and city <> 'city_10' and city not in ('city_11', 'city_12')"))
 
     checkAnswer(
       sql("select * from stream_table_filter where salary = 90000"),
       Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+    assert(streamParts >= partitionNums("select * from stream_table_filter where salary = 90000"))
 
     checkAnswer(
       sql("select * from stream_table_filter where salary > 80000 and salary <= 100000"),
       Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
         Row(10, "name_10", "city_10", 100000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+    assert(streamParts >= partitionNums("select * from stream_table_filter where salary > 80000 and salary <= 100000"))
 
     checkAnswer(
       sql("select * from stream_table_filter where salary between 80001 and 90000"),
       Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+    assert(streamParts >= partitionNums("select * from stream_table_filter where salary between 80001 and 90000"))
 
     checkAnswer(
       sql("select * from stream_table_filter where id in (9, 10, 11, 12) and salary <> 100000.0 and salary not in (110000.0, 120000.0)"),
       Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+    assert(streamParts >= partitionNums("select * from stream_table_filter where id in (9, 10, 11, 12) and salary <> 100000.0 and salary not in (110000.0, 120000.0)"))
 
     checkAnswer(
       sql("select * from stream_table_filter where tax = 0.04 and id < 100"),
       Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+    assert(streamParts >= partitionNums("select * from stream_table_filter where tax = 0.04 and id < 100"))
 
     checkAnswer(
       sql("select * from stream_table_filter where tax >= 0.04 and id < 100"),
       Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+    assert(streamParts >= partitionNums("select * from stream_table_filter where tax >= 0.04 and id < 100"))
 
     checkAnswer(
       sql("select * from stream_table_filter where tax < 0.05 and tax > 0.02 and id < 100"),
       Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+    assert(streamParts >= partitionNums("select * from stream_table_filter where tax < 0.05 and tax > 0.02 and id < 100"))
 
     checkAnswer(
       sql("select * from stream_table_filter where tax between 0.02 and 0.04 and id < 100"),
       Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+    assert(streamParts >= partitionNums("select * from stream_table_filter where tax between 0.02 and 0.04 and id < 100"))
 
     checkAnswer(
       sql("select * from stream_table_filter where id in (9, 10) and tax <> 0.01"),
       Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+    assert(streamParts >= partitionNums("select * from stream_table_filter where id in (9, 10) and tax <> 0.01"))
 
     checkAnswer(
       sql("select * from stream_table_filter where percent = 80.04 and id < 100"),
       Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+    assert(streamParts >= partitionNums("select * from stream_table_filter where percent = 80.04 and id < 100"))
 
     checkAnswer(
       sql("select * from stream_table_filter where percent >= 80.04 and id < 100"),
       Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+    assert(streamParts >= partitionNums("select * from stream_table_filter where percent >= 80.04 and id < 100"))
 
     checkAnswer(
       sql("select * from stream_table_filter where percent < 80.05 and percent > 80.02 and id < 100"),
       Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+    assert(streamParts >= partitionNums("select * from stream_table_filter where percent < 80.05 and percent > 80.02 and id < 100"))
 
     checkAnswer(
       sql("select * from stream_table_filter where percent between 80.02 and 80.05 and id < 100"),
       Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+    assert(streamParts >= partitionNums("select * from stream_table_filter where percent between 80.02 and 80.05 and id < 100"))
 
     checkAnswer(
       sql("select * from stream_table_filter where id in (9, 10) and percent <> 80.01"),
       Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+    assert(streamParts >= partitionNums("select * from stream_table_filter where id in (9, 10) and percent <> 80.01"))
 
     checkAnswer(
       sql("select * from stream_table_filter where birthday between '1990-01-04' and '1990-01-05'"),
       Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
         Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
         Row(100000005, "batch_5", "city_5", 0.5, BigDecimal.valueOf(0.05), 80.05, Date.valueOf("1990-01-05"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Timestamp.valueOf("2010-01-05 10:01:01.0"))))
+    assert(totalParts >= partitionNums("select * from stream_table_filter where birthday between '1990-01-04' and '1990-01-05'"))
 
     checkAnswer(
       sql("select * from stream_table_filter where birthday = '1990-01-04'"),
       Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
         Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+    assert(totalParts >= partitionNums("select * from stream_table_filter where birthday = '1990-01-04'"))
 
     checkAnswer(
       sql("select * from stream_table_filter where birthday > '1990-01-03' and birthday <= '1990-01-04'"),
       Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
         Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+    assert(totalParts >= partitionNums("select * from stream_table_filter where birthday > '1990-01-03' and birthday <= '1990-01-04'"))
 
     checkAnswer(
       sql("select * from stream_table_filter where birthday between '1990-01-04' and '1990-01-05'"),
       Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
         Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
         Row(100000005, "batch_5", "city_5", 0.5, BigDecimal.valueOf(0.05), 80.05, Date.valueOf("1990-01-05"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Timestamp.valueOf("2010-01-05 10:01:01.0"))))
+    assert(totalParts >= partitionNums("select * from stream_table_filter where birthday between '1990-01-04' and '1990-01-05'"))
 
     checkAnswer(
       sql("select * from stream_table_filter where id in (9, 10) and birthday <> '1990-01-01'"),
       Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+    assert(streamParts >= partitionNums("select * from stream_table_filter where id in (9, 10) and birthday <> '1990-01-01'"))
 
     checkAnswer(
       sql("select * from stream_table_filter where register = '2010-01-04 10:01:01'"),
       Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
         Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+    assert(totalParts >= partitionNums("select * from stream_table_filter where register = '2010-01-04 10:01:01'"))
 
     checkAnswer(
       sql("select * from stream_table_filter where register > '2010-01-03 10:01:01' and register <= '2010-01-04 10:01:01'"),
       Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
         Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+    assert(totalParts >= partitionNums("select * from stream_table_filter where register > '2010-01-03 10:01:01' and register <= '2010-01-04 10:01:01'"))
 
     checkAnswer(
       sql("select * from stream_table_filter where register between '2010-01-04 10:01:01' and '2010-01-05 10:01:01'"),
       Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
         Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
         Row(100000005, "batch_5", "city_5", 0.5, BigDecimal.valueOf(0.05), 80.05, Date.valueOf("1990-01-05"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Timestamp.valueOf("2010-01-05 10:01:01.0"))))
+    assert(totalParts >= partitionNums("select * from stream_table_filter where register between '2010-01-04 10:01:01' and '2010-01-05 10:01:01'"))
 
     checkAnswer(
       sql("select * from stream_table_filter where id in (9, 10) and register <> '2010-01-01 10:01:01'"),
       Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+    assert(totalParts >= partitionNums("select * from stream_table_filter where id in (9, 10) and register <> '2010-01-01 10:01:01'"))
 
     checkAnswer(
       sql("select * from stream_table_filter where updated = '2010-01-04 10:01:01'"),
       Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
         Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+    assert(totalParts >= partitionNums("select * from stream_table_filter where updated = '2010-01-04 10:01:01'"))
 
     checkAnswer(
       sql("select * from stream_table_filter where updated > '2010-01-03 10:01:01' and register <= '2010-01-04 10:01:01'"),
       Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
         Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+    assert(totalParts >= partitionNums("select * from stream_table_filter where updated > '2010-01-03 10:01:01' and register <= '2010-01-04 10:01:01'"))
 
     checkAnswer(
       sql("select * from stream_table_filter where updated between '2010-01-04 10:01:01' and '2010-01-05 10:01:01'"),
       Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
         Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
         Row(100000005, "batch_5", "city_5", 0.5, BigDecimal.valueOf(0.05), 80.05, Date.valueOf("1990-01-05"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Timestamp.valueOf("2010-01-05 10:01:01.0"))))
+    assert(totalParts >= partitionNums("select * from stream_table_filter where updated between '2010-01-04 10:01:01' and '2010-01-05 10:01:01'"))
 
     checkAnswer(
       sql("select * from stream_table_filter where id in (9, 10) and updated <> '2010-01-01 10:01:01'"),
       Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+    assert(streamParts >= partitionNums("select * from stream_table_filter where id in (9, 10) and updated <> '2010-01-01 10:01:01'"))
 
     checkAnswer(
       sql("select * from stream_table_filter where id is null order by name"),
       Seq(Row(null, "", "", null, null, null, null, null, null),
         Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+    assert(totalParts >= partitionNums("select * from stream_table_filter where id is null order by name"))
 
     checkAnswer(
       sql("select * from stream_table_filter where name = ''"),
       Seq(Row(null, "", "", null, null, null, null, null, null)))
+    assert(streamParts >= partitionNums("select * from stream_table_filter where name = ''"))
 
     checkAnswer(
       sql("select * from stream_table_filter where id is null and name <> ''"),
       Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+    assert(totalParts == partitionNums("select * from stream_table_filter where id is null and name <> ''"))
 
     checkAnswer(
       sql("select * from stream_table_filter where city = ''"),
       Seq(Row(null, "", "", null, null, null, null, null, null)))
+    assert(streamParts >= partitionNums("select * from stream_table_filter where city = ''"))
 
     checkAnswer(
       sql("select * from stream_table_filter where id is null and city <> ''"),
       Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+    assert(totalParts == partitionNums("select * from stream_table_filter where id is null and city <> ''"))
 
     checkAnswer(
       sql("select * from stream_table_filter where salary is null"),
       Seq(Row(null, "", "", null, null, null, null, null, null)))
+    assert(totalParts == partitionNums("select * from stream_table_filter where salary is null"))
 
     checkAnswer(
       sql("select * from stream_table_filter where id is null and salary is not null"),
       Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+    assert(totalParts == partitionNums("select * from stream_table_filter where id is null and salary is not null"))
 
     checkAnswer(
       sql("select * from stream_table_filter where tax is null"),
       Seq(Row(null, "", "", null, null, null, null, null, null)))
+    assert(totalParts == partitionNums("select * from stream_table_filter where tax is null"))
 
     checkAnswer(
       sql("select * from stream_table_filter where id is null and tax is not null"),
       Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+    assert(totalParts == partitionNums("select * from stream_table_filter where id is null and tax is not null"))
 
     checkAnswer(
       sql("select * from stream_table_filter where percent is null"),
       Seq(Row(null, "", "", null, null, null, null, null, null)))
+    assert(totalParts == partitionNums("select * from stream_table_filter where percent is null"))
 
     checkAnswer(
       sql("select * from stream_table_filter where id is null and percent is not null"),
       Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+    assert(totalParts == partitionNums("select * from stream_table_filter where id is null and percent is not null"))
 
     checkAnswer(
       sql("select * from stream_table_filter where birthday is null"),
       Seq(Row(null, "", "", null, null, null, null, null, null)))
+    assert(1 == partitionNums("select * from stream_table_filter where birthday is null"))
 
     checkAnswer(
       sql("select * from stream_table_filter where id is null and birthday is not null"),
       Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+    assert(totalParts == partitionNums("select * from stream_table_filter where id is null and birthday is not null"))
 
     checkAnswer(
       sql("select * from stream_table_filter where register is null"),
       Seq(Row(null, "", "", null, null, null, null, null, null)))
+    assert(1 == partitionNums("select * from stream_table_filter where register is null"))
 
     checkAnswer(
       sql("select * from stream_table_filter where id is null and register is not null"),
       Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+    assert(totalParts == partitionNums("select * from stream_table_filter where id is null and register is not null"))
 
     checkAnswer(
       sql("select * from stream_table_filter where updated is null"),
       Seq(Row(null, "", "", null, null, null, null, null, null)))
+    assert(1 == partitionNums("select * from stream_table_filter where updated is null"))
 
     checkAnswer(
       sql("select * from stream_table_filter where id is null and updated is not null"),
       Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+    assert(totalParts == partitionNums("select * from stream_table_filter where id is null and updated is not null"))
 
     // agg
     checkAnswer(
       sql("select count(*), max(id), min(name), cast(avg(id) as integer), sum(id) " +
           "from stream_table_filter where id >= 2 and id <= 100000004"),
       Seq(Row(51, 100000004, "batch_1", 7843162, 400001276)))
+    assert(totalParts >= partitionNums(
+      "select count(*), max(id), min(name), cast(avg(id) as integer), sum(id) " +
+      "from stream_table_filter where id >= 2 and id <= 100000004"))
 
     checkAnswer(
       sql("select city, count(id), sum(id), cast(avg(id) as integer), " +
@@ -956,6 +1031,14 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
       Seq(Row("city_1", 2, 100000002, 50000001, 10000.0, 0.1),
         Row("city_2", 1, 100000002, 100000002, 0.2, 0.2),
         Row("city_3", 2, 100000006, 50000003, 30000.0, 0.3)))
+    assert(totalParts >= partitionNums(
+      "select city, count(id), sum(id), cast(avg(id) as integer), " +
+      "max(salary), min(salary) " +
+      "from stream_table_filter " +
+      "where name in ('batch_1', 'batch_2', 'batch_3', 'name_1', 'name_2', 'name_3') " +
+      "and city <> '' " +
+      "group by city " +
+      "order by city"))
 
     // batch loading
     for(_ <- 0 to 2) {
@@ -2508,4 +2591,17 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     serverSocket
   }
 
+  def findCarbonScanRDD(rdd: RDD[_]): RDD[_] = {
+    if (rdd.isInstanceOf[CarbonScanRDD[_]]) {
+      rdd
+    } else {
+      findCarbonScanRDD(rdd.dependencies(0).rdd)
+    }
+  }
+
+  def partitionNums(sqlString: String): Int = {
+    val rdd = findCarbonScanRDD(sql(sqlString).rdd)
+    rdd.partitions.length
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/21a72bf2/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordReader.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordReader.java b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordReader.java
index c4b501d..02e0ddf 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordReader.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordReader.java
@@ -35,6 +35,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
+import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
@@ -52,6 +53,7 @@ import org.apache.carbondata.core.scan.filter.intf.RowImpl;
 import org.apache.carbondata.core.scan.filter.intf.RowIntf;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.util.CarbonMetadataUtil;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.format.BlockletHeader;
@@ -413,7 +415,19 @@ public class CarbonStreamRecordReader extends RecordReader<Void, Object> {
   }
 
   private boolean isScanRequired(BlockletHeader header) {
-    // TODO require to implement min-max index
+    if (filter != null && header.getBlocklet_index() != null) {
+      BlockletMinMaxIndex minMaxIndex = CarbonMetadataUtil.convertExternalMinMaxIndex(
+          header.getBlocklet_index().getMin_max_index());
+      if (minMaxIndex != null) {
+        BitSet bitSet =
+            filter.isScanRequired(minMaxIndex.getMaxValues(), minMaxIndex.getMinValues());
+        if (bitSet.isEmpty()) {
+          return false;
+        } else {
+          return true;
+        }
+      }
+    }
     return true;
   }
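
For readers following the reader-side change above: the blocklet header now carries a min/max index, and isScanRequired asks the filter executor whether any value inside [min, max] could match; an empty BitSet means the whole blocklet is skipped. Below is a minimal stand-alone sketch of that pruning idea for a single equality filter, assuming plain unsigned byte-wise comparison; the class and method names are illustrative only, not CarbonData APIs.

  public final class MinMaxPruneExample {

    // unsigned lexicographic comparison of two byte arrays
    static int compareUnsigned(byte[] a, byte[] b) {
      int len = Math.min(a.length, b.length);
      for (int i = 0; i < len; i++) {
        int diff = (a[i] & 0xFF) - (b[i] & 0xFF);
        if (diff != 0) {
          return diff;
        }
      }
      return a.length - b.length;
    }

    // a blocklet needs scanning for "col = literal" only when the literal
    // falls inside the [min, max] range recorded for that column
    static boolean isScanRequired(byte[] literal, byte[] min, byte[] max) {
      return compareUnsigned(literal, min) >= 0 && compareUnsigned(literal, max) <= 0;
    }

    public static void main(String[] args) {
      byte[] min = "city_1".getBytes();
      byte[] max = "city_9".getBytes();
      System.out.println(isScanRequired("city_5".getBytes(), min, max)); // true -> scan
      System.out.println(isScanRequired("town_1".getBytes(), min, max)); // false -> prune
    }
  }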
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/21a72bf2/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
index bd622f0..6325528 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
@@ -30,10 +30,12 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.CarbonMetadataUtil;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.DataTypeUtil;
@@ -51,12 +53,12 @@ import org.apache.carbondata.processing.loading.parser.RowParser;
 import org.apache.carbondata.processing.loading.parser.impl.RowParserImpl;
 import org.apache.carbondata.processing.store.writer.AbstractFactDataWriter;
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+import org.apache.carbondata.streaming.segment.StreamSegment;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskID;
-
 /**
  * Stream record writer
  */
@@ -95,6 +97,10 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
   private boolean isFirstRow = true;
   private boolean hasException = false;
 
+  // batch level min/max index, merged from each blocklet's min/max
+  private BlockletMinMaxIndex batchMinMaxIndex;
+  private boolean isClosed = false;
+
   CarbonStreamRecordWriter(TaskAttemptContext job) throws IOException {
     initialize(job);
   }
@@ -132,7 +138,6 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
   }
 
   private void initializeAtFirstRow() throws IOException, InterruptedException {
-
     // initialize metadata
     isNoDictionaryDimensionColumn =
         CarbonDataProcessorUtil.getNoDictionaryMapping(configuration.getDataFields());
@@ -144,20 +149,19 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
       measureDataTypes[i] =
           dataFields[dimensionWithComplexCount + i].getColumn().getDataType();
     }
-
     // initialize parser and converter
     rowParser = new RowParserImpl(dataFields, configuration);
     badRecordLogger = BadRecordsLoggerProvider.createBadRecordLogger(configuration);
     converter = new RowConverterImpl(configuration.getDataFields(), configuration, badRecordLogger);
     configuration.setCardinalityFinder(converter);
     converter.initialize();
-
     // initialize encoder
     nullBitSet = new BitSet(dataFields.length);
     int rowBufferSize = hadoopConf.getInt(CarbonStreamOutputFormat.CARBON_ENCODER_ROW_BUFFER_SIZE,
         CarbonStreamOutputFormat.CARBON_ENCODER_ROW_BUFFER_SIZE_DEFAULT);
-    output = new StreamBlockletWriter(maxCacheSize, maxRowNums, rowBufferSize);
-
+    output = new StreamBlockletWriter(maxCacheSize, maxRowNums, rowBufferSize,
+        isNoDictionaryDimensionColumn.length, measureCount,
+        measureDataTypes);
     // initialize data writer
     String filePath = segmentDir + File.separator + fileName;
     FileFactory.FileType fileType = FileFactory.getFileType(filePath);
@@ -170,7 +174,6 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
       outputStream = FileFactory.getDataOutputStream(filePath, fileType);
       writeFileHeader();
     }
-
     isFirstRow = false;
   }
 
@@ -178,7 +181,6 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
     if (isFirstRow) {
       initializeAtFirstRow();
     }
-
     // null bit set
     nullBitSet.clear();
     Object[] rowData = (Object[]) value;
@@ -203,7 +205,6 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
       }
       int dimCount = 0;
       Object columnValue;
-
       // primitive type dimension
       for (; dimCount < isNoDictionaryDimensionColumn.length; dimCount++) {
         columnValue = currentRow.getObject(dimCount);
@@ -212,9 +213,13 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
             byte[] col = (byte[]) columnValue;
             output.writeShort(col.length);
             output.writeBytes(col);
+            output.dimStatsCollectors[dimCount].update(col);
           } else {
             output.writeInt((int) columnValue);
+            output.dimStatsCollectors[dimCount].update(ByteUtil.toBytes((int) columnValue));
           }
+        } else {
+          output.dimStatsCollectors[dimCount].updateNull(0);
         }
       }
       // complex type dimension
@@ -234,19 +239,25 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
           dataType = measureDataTypes[msrCount];
           if (dataType == DataTypes.BOOLEAN) {
             output.writeBoolean((boolean) columnValue);
+            output.msrStatsCollectors[msrCount].update((byte) ((boolean) columnValue ? 1 : 0));
           } else if (dataType == DataTypes.SHORT) {
             output.writeShort((short) columnValue);
+            output.msrStatsCollectors[msrCount].update((short) columnValue);
           } else if (dataType == DataTypes.INT) {
             output.writeInt((int) columnValue);
+            output.msrStatsCollectors[msrCount].update((int) columnValue);
           } else if (dataType == DataTypes.LONG) {
             output.writeLong((long) columnValue);
+            output.msrStatsCollectors[msrCount].update((long) columnValue);
           } else if (dataType == DataTypes.DOUBLE) {
             output.writeDouble((double) columnValue);
+            output.msrStatsCollectors[msrCount].update((double) columnValue);
           } else if (DataTypes.isDecimal(dataType)) {
             BigDecimal val = (BigDecimal) columnValue;
             byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte(val);
             output.writeShort(bigDecimalInBytes.length);
             output.writeBytes(bigDecimalInBytes);
+            output.msrStatsCollectors[msrCount].update((BigDecimal) columnValue);
           } else {
             String msg =
                 "unsupported data type:" + dataFields[dimCount + msrCount].getColumn().getDataType()
@@ -254,6 +265,8 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
             LOGGER.error(msg);
             throw new IOException(msg);
           }
+        } else {
+          output.msrStatsCollectors[msrCount].updateNull(0);
         }
       }
     }
@@ -294,12 +307,27 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
     }
     output.apppendBlocklet(outputStream);
     outputStream.flush();
+    if (!isClosed) {
+      batchMinMaxIndex = StreamSegment.mergeBlockletMinMax(
+          batchMinMaxIndex, output.generateBlockletMinMax(), measureDataTypes);
+    }
     // reset data
     output.reset();
   }
 
-  @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException {
+  public BlockletMinMaxIndex getBatchMinMaxIndex() {
+    return StreamSegment.mergeBlockletMinMax(
+        batchMinMaxIndex, output.generateBlockletMinMax(), measureDataTypes);
+  }
+
+  public DataType[] getMeasureDataTypes() {
+    return measureDataTypes;
+  }
+
+  @Override
+  public void close(TaskAttemptContext context) throws IOException, InterruptedException {
     try {
+      isClosed = true;
       // append remain buffer data
       if (!hasException && !isFirstRow) {
         appendBlockletToDataFile();
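
The writer-side pattern above is uniform: each non-null dimension or measure value updates its column's stats collector right after it is written into the row buffer, and nulls are recorded separately so that empty columns do not distort the bounds. A reduced sketch of such a collector for a long measure, assuming only min/max and a null count are needed (hypothetical class, not the ColumnPageStatsCollector API):

  public final class LongMinMaxCollector {

    private long min = Long.MAX_VALUE;
    private long max = Long.MIN_VALUE;
    private int nullCount = 0;

    // called once per non-null value while the row is being encoded
    public void update(long value) {
      if (value < min) {
        min = value;
      }
      if (value > max) {
        max = value;
      }
    }

    // called when the null bit is set for this column
    public void updateNull() {
      nullCount++;
    }

    public long getMin() {
      return min;
    }

    public long getMax() {
      return max;
    }

    public int getNullCount() {
      return nullCount;
    }
  }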

http://git-wip-us.apache.org/repos/asf/carbondata/blob/21a72bf2/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletWriter.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletWriter.java b/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletWriter.java
index 7b2176b..d4322b4 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletWriter.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletWriter.java
@@ -22,10 +22,20 @@ import java.io.IOException;
 
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
+import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsCollector;
+import org.apache.carbondata.core.datastore.page.statistics.KeyPageStatsCollector;
+import org.apache.carbondata.core.datastore.page.statistics.PrimitivePageStatsCollector;
+import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
+import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.util.CarbonMetadataUtil;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.format.BlockletHeader;
+import org.apache.carbondata.format.BlockletIndex;
 import org.apache.carbondata.format.BlockletInfo;
 import org.apache.carbondata.format.MutationType;
+import org.apache.carbondata.streaming.segment.StreamSegment;
 
 /**
  * stream blocklet writer
@@ -39,13 +49,49 @@ public class StreamBlockletWriter {
   private int rowIndex = -1;
   private Compressor compressor = CompressorFactory.getInstance().getCompressor();
 
-  StreamBlockletWriter(int maxSize, int maxRowNum, int rowSize) {
+  private int dimCountWithoutComplex;
+  private int measureCount;
+  private DataType[] measureDataTypes;
+
+  // blocklet level stats
+  ColumnPageStatsCollector[] dimStatsCollectors;
+  ColumnPageStatsCollector[] msrStatsCollectors;
+  // blocklet level Min/Max
+  private BlockletMinMaxIndex blockletMinMaxIndex;
+
+  StreamBlockletWriter(int maxSize, int maxRowNum, int rowSize, int dimCountWithoutComplex,
+      int measureCount, DataType[] measureDataTypes) {
     buffer = new byte[maxSize];
     this.maxSize = maxSize;
     this.maxRowNum = maxRowNum;
     this.rowSize = rowSize;
+    this.dimCountWithoutComplex = dimCountWithoutComplex;
+    this.measureCount = measureCount;
+    this.measureDataTypes = measureDataTypes;
+    initializeStatsCollector();
+  }
+
+  private void initializeStatsCollector() {
+    // dimension stats collectors
+    // no need to collect stats for complex columns,
+    // so the collectors cover only dictionary and no-dictionary dimensions
+    dimStatsCollectors = new ColumnPageStatsCollector[dimCountWithoutComplex];
+    // measure stats collectors
+    msrStatsCollectors = new ColumnPageStatsCollector[measureCount];
+
+    int dimCount = 0;
+    for (; dimCount < dimCountWithoutComplex; dimCount++) {
+      dimStatsCollectors[dimCount] =
+          KeyPageStatsCollector.newInstance(DataTypes.BYTE_ARRAY);
+    }
+
+    for (int msrCount = 0; msrCount < measureCount; msrCount++) {
+      msrStatsCollectors[msrCount] =
+          PrimitivePageStatsCollector.newInstance(measureDataTypes[msrCount]);
+    }
   }
 
+
   private void ensureCapacity(int space) {
     int newcount = space + count;
     if (newcount > buffer.length) {
@@ -58,6 +104,8 @@ public class StreamBlockletWriter {
   void reset() {
     count = 0;
     rowIndex = -1;
+    initializeStatsCollector();
+    blockletMinMaxIndex = null;
   }
 
   byte[] getBytes() {
@@ -134,6 +182,36 @@ public class StreamBlockletWriter {
     count += len;
   }
 
+  private SimpleStatsResult[] getDimStats() {
+    if (dimStatsCollectors == null) {
+      return new SimpleStatsResult[0];
+    }
+    SimpleStatsResult[] stats = new SimpleStatsResult[dimStatsCollectors.length];
+    int dimCount = 0;
+    for (; dimCount < dimStatsCollectors.length; dimCount++) {
+      stats[dimCount] = dimStatsCollectors[dimCount].getPageStats();
+    }
+    return stats;
+  }
+
+  private SimpleStatsResult[] getMsrStats() {
+    if (msrStatsCollectors == null) {
+      return new SimpleStatsResult[0];
+    }
+    SimpleStatsResult[] stats = new SimpleStatsResult[msrStatsCollectors.length];
+    for (int mrsCount = 0; mrsCount < msrStatsCollectors.length; mrsCount++) {
+      stats[mrsCount] = msrStatsCollectors[mrsCount].getPageStats();
+    }
+    return stats;
+  }
+
+  BlockletMinMaxIndex generateBlockletMinMax() {
+    if (blockletMinMaxIndex == null) {
+      blockletMinMaxIndex = StreamSegment.collectMinMaxIndex(getDimStats(), getMsrStats());
+    }
+    return blockletMinMaxIndex;
+  }
+
   void apppendBlocklet(DataOutputStream outputStream) throws IOException {
     outputStream.write(CarbonStreamOutputFormat.CARBON_SYNC_MARKER);
 
@@ -143,6 +221,13 @@ public class StreamBlockletWriter {
     blockletHeader.setBlocklet_length(getCount());
     blockletHeader.setMutation(MutationType.INSERT);
     blockletHeader.setBlocklet_info(blockletInfo);
+    // add blocklet level min/max
+    blockletMinMaxIndex = generateBlockletMinMax();
+    if (blockletInfo.getNum_rows() > 1) {
+      BlockletIndex blockletIndex = new BlockletIndex();
+      blockletIndex.setMin_max_index(CarbonMetadataUtil.convertMinMaxIndex(blockletMinMaxIndex));
+      blockletHeader.setBlocklet_index(blockletIndex);
+    }
     byte[] headerBytes = CarbonUtil.getByteArray(blockletHeader);
     outputStream.writeInt(headerBytes.length);
     outputStream.write(headerBytes);
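
The blocklet header now embeds the min/max index, and the writer folds each blocklet's index into a batch-level one via StreamSegment.mergeBlockletMinMax. The essence of that merge is element-wise: keep the smaller of the two mins and the larger of the two maxes per column. A simplified sketch for the byte-encoded dimension case follows (the real merge also has to pick a comparator per measure data type); names here are illustrative only.

  public final class MinMaxMergeExample {

    // unsigned lexicographic comparison of two byte arrays
    static int compareUnsigned(byte[] a, byte[] b) {
      int len = Math.min(a.length, b.length);
      for (int i = 0; i < len; i++) {
        int diff = (a[i] & 0xFF) - (b[i] & 0xFF);
        if (diff != 0) {
          return diff;
        }
      }
      return a.length - b.length;
    }

    // per column, keep the smaller of the two mins
    static byte[][] mergeMins(byte[][] accMins, byte[][] newMins) {
      byte[][] merged = new byte[accMins.length][];
      for (int i = 0; i < accMins.length; i++) {
        merged[i] = compareUnsigned(accMins[i], newMins[i]) <= 0 ? accMins[i] : newMins[i];
      }
      return merged;
    }

    // per column, keep the larger of the two maxes
    static byte[][] mergeMaxes(byte[][] accMaxes, byte[][] newMaxes) {
      byte[][] merged = new byte[accMaxes.length][];
      for (int i = 0; i < accMaxes.length; i++) {
        merged[i] = compareUnsigned(accMaxes[i], newMaxes[i]) >= 0 ? accMaxes[i] : newMaxes[i];
      }
      return merged;
    }

    public static void main(String[] args) {
      byte[][] mins = mergeMins(
          new byte[][] { "city_2".getBytes() }, new byte[][] { "city_1".getBytes() });
      System.out.println(new String(mins[0])); // city_1
    }
  }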

http://git-wip-us.apache.org/repos/asf/carbondata/blob/21a72bf2/streaming/src/main/java/org/apache/carbondata/streaming/index/StreamFileIndex.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/index/StreamFileIndex.java b/streaming/src/main/java/org/apache/carbondata/streaming/index/StreamFileIndex.java
new file mode 100644
index 0000000..fa8a694
--- /dev/null
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/index/StreamFileIndex.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.streaming.index;
+
+import java.io.Serializable;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+
+@InterfaceAudience.Internal
+public class StreamFileIndex implements Serializable {
+
+  /**
+   * the name of the file; it does not contain the whole path.
+   */
+  private String fileName;
+
+  private BlockletMinMaxIndex minMaxIndex;
+
+  private long rowCount;
+
+  private DataType[] msrDataTypes;
+
+  public StreamFileIndex(String fileName, BlockletMinMaxIndex minMaxIndex, long rowCount) {
+    this.fileName = fileName;
+    this.minMaxIndex = minMaxIndex;
+    this.rowCount = rowCount;
+  }
+
+  public String getFileName() {
+    return fileName;
+  }
+
+  public void setFileName(String fileName) {
+    this.fileName = fileName;
+  }
+
+  public BlockletMinMaxIndex getMinMaxIndex() {
+    return minMaxIndex;
+  }
+
+  public void setMinMaxIndex(BlockletMinMaxIndex minMaxIndex) {
+    this.minMaxIndex = minMaxIndex;
+  }
+
+  public long getRowCount() {
+    return rowCount;
+  }
+
+  public void setRowCount(long rowCount) {
+    this.rowCount = rowCount;
+  }
+
+  public DataType[] getMsrDataTypes() {
+    return msrDataTypes;
+  }
+
+  public void setMsrDataTypes(DataType[] msrDataTypes) {
+    this.msrDataTypes = msrDataTypes;
+  }
+}
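
Finally, a hedged sketch of how a caller might assemble the new StreamFileIndex when a streaming batch is committed. The helper name and its parameters (streamFileName, appendedRowCount) are assumptions for illustration; the constructor and the getBatchMinMaxIndex, getMeasureDataTypes and setMsrDataTypes calls are the ones this patch introduces.

  import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
  import org.apache.carbondata.streaming.CarbonStreamRecordWriter;
  import org.apache.carbondata.streaming.index.StreamFileIndex;

  public final class StreamFileIndexExample {

    // build the per-file index entry for one committed streaming batch
    static StreamFileIndex buildFileIndex(CarbonStreamRecordWriter writer,
        String streamFileName, long appendedRowCount) {
      // min/max merged across every blocklet the writer produced for this batch
      BlockletMinMaxIndex batchMinMax = writer.getBatchMinMaxIndex();
      StreamFileIndex fileIndex =
          new StreamFileIndex(streamFileName, batchMinMax, appendedRowCount);
      // measure data types are needed later to compare and merge min/max values
      fileIndex.setMsrDataTypes(writer.getMeasureDataTypes());
      return fileIndex;
    }
  }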